Kafka Adapter
Overview
The Kafka Adapter microservice is part of the microservice event pipeline. It reads messages carrying JSON, XML, or plaintext payloads from a configured input (Pulsar or Kafka) and enqueues them to the configured output (Pulsar, Kafka, or standard output).
Prerequisites
- A microservices cluster must be set up. Refer to Microservice Cluster Setup.
- Apache Pulsar must be installed. Refer to Apache Pulsar microservice.
- A Kafka topic to fetch data from or send data to.
Setup
su - assure1
export NAMESPACE=a1-zone1-pri
export WEBFQDN=<Primary Presentation Web FQDN>
a1helm install kafka-adapter assure1/kafka-adapter -n $NAMESPACE --set global.imageRegistry=$WEBFQDN
Default Configuration
Name | Value | Possible Values | Notes |
---|---|---|---|
LOG_LEVEL | INFO | FATAL, ERROR, WARN, INFO, DEBUG | Logging level used by application. |
STREAM_INPUT | persistent://assure1/event/example | Text, 255 characters | Can be Apache Pulsar topic path or Kafka URI |
STREAM_OUTPUT | kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic | Text, 255 characters | Can be Apache Pulsar topic path or Kafka URI or stdout. |
Configurations can be changed by passing the values to the a1helm install command, prefixed with the configData parent key.
Example of setting the log level to DEBUG
a1helm install ... --set configData.LOG_LEVEL=DEBUG
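Values such as the default STREAM_OUTPUT contain commas. Helm's --set parser treats an unescaped comma as a separator between assignments, so a literal comma in a value has to be escaped with a backslash. The sketch below builds the --set arguments from a dictionary. It assumes that a1helm forwards --set to Helm unchanged, which this page does not state, and the helper name set_args is hypothetical.

```python
def set_args(config):
    """Build `--set configData.KEY=VALUE` arguments for each setting.

    Assumption: a1helm passes --set through to Helm, so literal commas
    in a value must be escaped as `\\,`.
    """
    args = []
    for key, value in config.items():
        escaped = str(value).replace(",", "\\,")
        args += ["--set", f"configData.{key}={escaped}"]
    return args


args = set_args({
    "LOG_LEVEL": "DEBUG",
    "STREAM_OUTPUT": "kafka://broker1.something.com:9092,"
                     "broker2.something.com:9092/example-topic",
})
print(" ".join(args))
```

When the printed arguments are pasted into a shell, quote each value so that the shell does not consume the backslash.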
Example Kafka URI format
kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic
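To make the parts of this URI explicit, the following sketch splits it into a broker list, a topic, and any query parameters using Python's standard urllib.parse module. It illustrates the format only and is not the adapter's own parser. The function name parse_kafka_uri is hypothetical.

```python
from urllib.parse import urlsplit, parse_qs


def parse_kafka_uri(uri):
    """Split a kafka:// URI into (brokers, topic, params)."""
    parts = urlsplit(uri)
    if parts.scheme != "kafka":
        raise ValueError(f"not a Kafka URI: {uri!r}")
    # The authority section holds a comma-separated list of host:port pairs.
    brokers = parts.netloc.split(",")
    # The path, minus its leading slash, is the topic name.
    topic = parts.path.lstrip("/")
    # Keep the last value if a parameter is repeated.
    params = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return brokers, topic, params


brokers, topic, params = parse_kafka_uri(
    "kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic"
)
print(brokers, topic, params)
```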
Query Parameters:
- version: Kafka version
- tlsConfigPrefix=EXAMPLE: Use TLS using the additional configuration variable EXAMPLE_CA
- saslConfigPrefix=EXAMPLE: Use SASL using the additional configuration variables EXAMPLE_USERNAME, EXAMPLE_PASSWORD, EXAMPLE_MECHANISM (OAUTHBEARER or PLAIN), EXAMPLE_VERSION, and EXAMPLE_TOKEN
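The prefix parameters name other configuration variables rather than carrying secrets themselves. As a rough illustration, the sketch below lists the variable names implied by the prefixes in a URI. The prefixes KAFKA_TLS and KAFKA_SASL are made up for the example, the function name is hypothetical, and the code assumes the parameters are attached with standard ?key=value&key=value query syntax.

```python
from urllib.parse import urlsplit, parse_qs

# Suffixes taken from the parameter descriptions above.
TLS_SUFFIXES = ("CA",)
SASL_SUFFIXES = ("USERNAME", "PASSWORD", "MECHANISM", "VERSION", "TOKEN")


def expected_variables(uri):
    """Return the configuration variable names implied by the URI's prefixes."""
    query = parse_qs(urlsplit(uri).query)
    names = []
    for param, suffixes in (
        ("tlsConfigPrefix", TLS_SUFFIXES),
        ("saslConfigPrefix", SASL_SUFFIXES),
    ):
        for prefix in query.get(param, []):
            names += [f"{prefix}_{suffix}" for suffix in suffixes]
    return names


uri = (
    "kafka://broker1.something.com:9092/example-topic"
    "?tlsConfigPrefix=KAFKA_TLS&saslConfigPrefix=KAFKA_SASL"
)
print(expected_variables(uri))
```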
Input-Only Query Parameters:
- offset: Kafka offset. Either oldest or newest
- consumerGroup: Kafka consumer group
- maxMessageSize: Integer (bytes)
- maxProcessingTime=100ms: String (s, ms)
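A quick check of input parameters before deployment can catch typos early. The sketch below validates the values listed above. It assumes that maxProcessingTime is a whole number followed by s or ms, which is an interpretation of "String (s, ms)" and not a documented grammar. The function name check_input_params is hypothetical.

```python
import re

INTEGER = re.compile(r"[0-9]+")
# Assumption: durations are whole numbers with an "s" or "ms" unit.
DURATION = re.compile(r"[0-9]+(ms|s)")


def check_input_params(params):
    """Return a list of problems found in input-only query parameters."""
    errors = []
    if "offset" in params and params["offset"] not in ("oldest", "newest"):
        errors.append("offset must be 'oldest' or 'newest'")
    if "maxMessageSize" in params and not INTEGER.fullmatch(params["maxMessageSize"]):
        errors.append("maxMessageSize must be an integer number of bytes")
    if "maxProcessingTime" in params and not DURATION.fullmatch(params["maxProcessingTime"]):
        errors.append("maxProcessingTime must look like 100ms or 2s")
    return errors


good = {"offset": "oldest", "consumerGroup": "example-group",
        "maxMessageSize": "1048576", "maxProcessingTime": "100ms"}
bad = {"offset": "latest", "maxProcessingTime": "1m"}
print(check_input_params(good))
print(check_input_params(bad))
```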
Output-Only Query Parameters:
- compressionCodec: none, gzip, snappy, lz4, or zstd
- balanceStrategy: range, roundrobin, or sticky
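An output URI can also be assembled programmatically. The following sketch joins brokers, topic, and the output-only parameters, rejecting values outside the lists above. It assumes standard ?key=value&key=value query syntax, and build_output_uri is a hypothetical helper name.

```python
from urllib.parse import urlencode

CODECS = {"none", "gzip", "snappy", "lz4", "zstd"}
STRATEGIES = {"range", "roundrobin", "sticky"}


def build_output_uri(brokers, topic, compression_codec=None, balance_strategy=None):
    """Assemble a kafka:// output URI, validating the optional parameters."""
    params = {}
    if compression_codec is not None:
        if compression_codec not in CODECS:
            raise ValueError(f"unknown compressionCodec: {compression_codec!r}")
        params["compressionCodec"] = compression_codec
    if balance_strategy is not None:
        if balance_strategy not in STRATEGIES:
            raise ValueError(f"unknown balanceStrategy: {balance_strategy!r}")
        params["balanceStrategy"] = balance_strategy
    uri = "kafka://" + ",".join(brokers) + "/" + topic
    return uri + ("?" + urlencode(params) if params else "")


output_uri = build_output_uri(
    ["broker1.something.com:9092", "broker2.something.com:9092"],
    "example-topic",
    compression_codec="gzip",
)
print(output_uri)
```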
Example Messages
Example Text Message
{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "text",
    "text": "Some plaintext message"
  }
}
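For testing, an envelope of this shape can be produced with a few lines of Python. This is a minimal sketch that mirrors the field layout of the example above. It makes no claim about which fields the adapter requires, the function name text_envelope is hypothetical, and the elided _domain and _type values are left for the caller to supply.

```python
import json
from datetime import datetime, timezone


def text_envelope(text, domain, msg_type, agent, when=None):
    """Wrap a plaintext payload in an envelope shaped like the example."""
    when = when or datetime.now(timezone.utc)
    # Render as UTC with millisecond precision and a trailing "Z".
    stamp = (
        when.astimezone(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    return {
        "_agents": [{"@timestamp": stamp, **agent}],
        "_domain": domain,
        "_type": msg_type,
        "_version": "5.0.0",
        "@timestamp": stamp,
        "message": {"_type": "text", "text": text},
    }


envelope = text_envelope(
    "Some plaintext message",
    domain="...",    # elided in the example above
    msg_type="...",  # elided in the example above
    agent={"host": "assure1host.pod.dns.com", "app": "app-name"},
    when=datetime(2020, 3, 18, 18, 41, tzinfo=timezone.utc),
)
print(json.dumps(envelope, indent=2))
```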
Example XML Message
{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "xml",
    "xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
  }
}
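The XML document travels as an escaped string inside the JSON envelope, so a consumer has to parse it in a second step. The sketch below shows that second step with Python's standard xml.etree.ElementTree module, using the payload from the example above. Only the message portion of the envelope is reproduced.

```python
import xml.etree.ElementTree as ET

# The "xml" value as it appears after JSON decoding (escapes removed).
xml_text = (
    '<?xml version="1.0" encoding="UTF-8"?>'
    "<note><to>Tove</to><from>Jani</from><heading>Reminder</heading>"
    "<body>Don't forget me this weekend!</body></note>"
)
envelope = {"message": {"_type": "xml", "xml": xml_text}}

# Parse the embedded document and read a few elements.
root = ET.fromstring(envelope["message"]["xml"])
recipient = root.findtext("to")
heading = root.findtext("heading")
print(root.tag, recipient, heading)
```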
Example JSON Message
{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "fault",
  "_type": "event",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "json",
    "json": {
      "event": {
        "EventID": 0,
        "EventKey": "",
        "EventCategory": 3,
        "EventType": "",
        "Ack": 0,
        "Action": "",
        "Actor": "",
        "Count": 1,
        "Customer": "",
        "Department": "",
        "Details": {},
        "DeviceType": "",
        "Duration": 0.000,
        "EscalationFlag": 0,
        "ExpireTime": 0,
        "FirstReported": "2006-01-02T15:04:05.999999999Z",
        "GeoPath": {
          "type": "LineString",
          "coordinates": [[0,0],[0,0]]
        },
        "GeoLocation": {
          "type": "Point",
          "coordinates": [0,0]
        },
        "IPAddress": "",
        "LastChanged": "2006-01-02T15:04:05.999999999Z",
        "LastReported": "2006-01-02T15:04:05.999999999Z",
        "Location": "",
        "Method": "",
        "Node": "",
        "OrigSeverity": 1,
        "OwnerName": "",
        "RootCauseFlag": 0,
        "RootCauseID": 0,
        "Score": 0,
        "Service": "",
        "ServiceImpact": 0,
        "Severity": "Unknown",
        "SubDeviceType": "",
        "SubMethod": "",
        "SubNode": "",
        "Summary": "",
        "TicketFlag": 0,
        "TicketID": "",
        "ZoneID": 0
      }
    }
  }
}
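In all three example messages, the value of message._type (text, xml, or json) is also the name of the key that holds the payload. Based on that observation from the examples, and not on a documented contract, a consumer could extract the payload as sketched below. The helper name payload is hypothetical and the envelope is abbreviated to a few fields.

```python
def payload(envelope):
    """Return the payload stored under the key named by message._type."""
    message = envelope["message"]
    kind = message["_type"]
    if kind not in ("text", "xml", "json"):
        raise ValueError(f"unsupported message type: {kind!r}")
    return message[kind]


# Abbreviated version of the JSON example above.
envelope = {
    "_domain": "fault",
    "_type": "event",
    "_version": "5.0.0",
    "message": {
        "_type": "json",
        "json": {"event": {"Node": "", "Severity": "Unknown", "Count": 1}},
    },
}

event = payload(envelope)["event"]
print(event["Severity"], event["Count"])
```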