Kafka Adapter

Overview

The Kafka Adapter microservice is part of the microservice event pipeline. It takes messages containing JSON, XML, or plaintext from a configured input (Pulsar or Kafka) and enqueues them to the configured output (Pulsar, Kafka, or standard output).

Prerequisites

  1. A microservices cluster must be set up. Refer to Microservice Cluster Setup.

  2. Apache Pulsar must be installed. Refer to Apache Pulsar microservice.

  3. A Kafka topic to fetch data from or send data to.

Setup

su - assure1
export NAMESPACE=a1-zone1-pri
export WEBFQDN=<Primary Presentation Web FQDN> 
a1helm install kafka-adapter assure1/kafka-adapter -n $NAMESPACE --set global.imageRegistry=$WEBFQDN

Default Configuration

Name | Value | Possible Values | Notes
LOG_LEVEL | INFO | FATAL, ERROR, WARN, INFO, DEBUG | Logging level used by application.
STREAM_INPUT | persistent://assure1/event/example | Text, 255 characters | Can be Apache Pulsar topic path or Kafka URI
STREAM_OUTPUT | kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic | Text, 255 characters | Can be Apache Pulsar topic path or Kafka URI or stdout.
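
The stream settings accept more than one kind of value, so it can help to check which kind a value is before deploying. The following is a minimal Python sketch, not the adapter's own logic: classify_stream is a hypothetical helper, and it assumes that Pulsar topic paths start with persistent:// or non-persistent://, that Kafka URIs start with kafka://, and that standard output is selected with the literal value stdout.

```python
def classify_stream(value):
    """Return 'pulsar', 'kafka', or 'stdout' for a STREAM_INPUT/STREAM_OUTPUT value.

    Hypothetical helper for illustration; the prefixes checked here are
    assumptions based on the default values in the table above.
    """
    # Pulsar topic paths carry a persistence prefix.
    if value.startswith(("persistent://", "non-persistent://")):
        return "pulsar"
    # Kafka endpoints use the kafka:// URI scheme.
    if value.startswith("kafka://"):
        return "kafka"
    # Assumed literal spelling for standard output (output side only).
    if value == "stdout":
        return "stdout"
    raise ValueError(f"unrecognized stream value: {value!r}")


if __name__ == "__main__":
    print(classify_stream("persistent://assure1/event/example"))
```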

Configuration values can be changed by passing them to the a1helm install command, prefixed with the configData parent key.

Example of setting the log level to DEBUG

a1helm install ... --set configData.LOG_LEVEL=DEBUG

Example Kafka URI format

kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic
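
The URI above packs a comma-separated broker list and a topic name into one string. The following Python sketch shows one way to split it apart for inspection. It is an illustration under assumptions, not the adapter's parser: parse_kafka_uri is a hypothetical name, and the sketch treats everything after a "?" as generic key=value query parameters without assuming any particular parameter names.

```python
from urllib.parse import urlsplit, parse_qs


def parse_kafka_uri(uri):
    """Split a kafka:// URI into (brokers, topic, params).

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(uri)
    if parts.scheme != "kafka":
        raise ValueError(f"not a Kafka URI: {uri!r}")
    # The authority part holds the comma-separated host:port broker list.
    brokers = [b for b in parts.netloc.split(",") if b]
    # The path, minus its leading slash, is the topic name.
    topic = parts.path.lstrip("/")
    if not brokers or not topic:
        raise ValueError(f"Kafka URI needs brokers and a topic: {uri!r}")
    # Keep the last value when a query parameter is repeated.
    params = {k: v[-1] for k, v in parse_qs(parts.query).items()}
    return brokers, topic, params


if __name__ == "__main__":
    uri = "kafka://broker1.something.com:9092,broker2.something.com:9092/example-topic"
    print(parse_kafka_uri(uri))
```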

Query Parameters:

Input-Only Query Parameters:

Output-Only Query Parameters:

Example Messages

Example Text Message

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "text",
    "text": "Some plaintext message"
  }
}

Example XML Message

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "xml",
    "xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
  }
}

Example JSON Message

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "fault",
  "_type": "event",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "json",
    "json": {
      "event": {
        "EventID": 0,
        "EventKey": "",
        "EventCategory": 3,
        "EventType": "",
        "Ack": 0,
        "Action": "",
        "Actor": "",
        "Count": 1,
        "Customer": "",
        "Department": "",
        "Details": {},
        "DeviceType": "",
        "Duration": 0.000,
        "EscalationFlag": 0,
        "ExpireTime": 0,
        "FirstReported": "2006-01-02T15:04:05.999999999Z",
        "GeoPath": {
          "type": "LineString",
          "coordinates": [[0,0],[0,0]]
        },
        "GeoLocation": {
          "type": "Point",
          "coordinates": [0,0]
        },
        "IPAddress": "",
        "LastChanged": "2006-01-02T15:04:05.999999999Z",
        "LastReported": "2006-01-02T15:04:05.999999999Z",
        "Location": "",
        "Method": "",
        "Node": "",
        "OrigSeverity": 1,
        "OwnerName": "",
        "RootCauseFlag": 0,
        "RootCauseID": 0,
        "Score": 0,
        "Service": "",
        "ServiceImpact": 0,
        "Severity": "Unknown",
        "SubDeviceType": "",
        "SubMethod": "",
        "SubNode": "",
        "Summary": "",
        "TicketFlag": 0,
        "TicketID": "",
        "ZoneID": 0
      }
    }
  }
}
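
The three examples above share one envelope shape: message._type names the payload kind (text, xml, or json), and a key with that same name carries the payload. The following Python sketch shows how a downstream consumer might pull the payload out. It assumes this pattern holds for every message; extract_payload is a hypothetical name, not part of the adapter.

```python
import json

# Payload kinds seen in the example messages above.
KNOWN_TYPES = ("text", "xml", "json")


def extract_payload(raw):
    """Return (kind, payload) from a serialized message envelope.

    Hypothetical helper: assumes the payload lives under the key
    named by message["_type"], as in the documented examples.
    """
    envelope = json.loads(raw)
    message = envelope["message"]
    kind = message["_type"]
    if kind not in KNOWN_TYPES:
        raise ValueError(f"unexpected message type: {kind!r}")
    # Text and XML payloads are strings; JSON payloads are already
    # decoded into Python objects by json.loads.
    return kind, message[kind]


if __name__ == "__main__":
    raw = '{"message": {"_type": "text", "text": "Some plaintext message"}}'
    print(extract_payload(raw))
```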