Kafka Bridge

The Kafka Bridge microservice takes messages containing JSON, XML, or plain text from a configured input (Pulsar or Kafka) and enqueues them to the configured output (Pulsar or Kafka). Either the input or the output must be a Kafka topic.

Note:

This microservice was previously called Kafka Adapter (kafka-adapter in the Helm chart and commands). It is now Kafka Bridge (kafka-bridge).

This microservice is part of the Event microservice pipeline. See Understanding the Event Pipeline in Unified Assurance Concepts for conceptual information. It can also participate in other pipelines that need to connect to Kafka.

You can enable redundancy for this microservice when you deploy it. See Configuring Microservice Redundancy for general information.

You can enable autoscaling for this microservice when you deploy it, if you are using an internal Pulsar topic as the input. See Configuring Autoscaling and Kafka Bridge Autoscaling Configuration.

This microservice provides additional Prometheus monitoring metrics. See Kafka Bridge Self-Monitoring Metrics.

Kafka Bridge Prerequisites

Before deploying the microservice, confirm that the following prerequisites are met:

  1. A microservice cluster is set up. See Microservice Cluster Setup.

  2. The Apache Pulsar microservice is deployed. See Pulsar.

  3. You know the Kafka or Pulsar topic to send messages to or get them from.

  4. You have created the appropriate Kubernetes secrets for your environment, which you will use when deploying the microservice. See Kafka Bridge Secrets.

Kafka Bridge Secrets

The Kafka Bridge microservice supports two types of Kubernetes secrets, which are mapped to different security and authentication requirements.

Secret Requirements for Security Protocols

For the one-way TLS/SSL security protocol, you need a certificate secret containing a truststore. If the truststore is password-protected, you also need a credential secret containing the truststore password.

For the mutual TLS (mTLS) security protocol, you need a certificate secret containing a truststore and a keystore. If the truststore, the keystore, or both are password-protected, you also need a credential secret containing the truststore password, the keystore password, or both.

Certificate Secret

The certificate secret consists of one or more files for certificates, keystores, truststores, or key files for TLS and mTLS. The secret name is of the format:

<microservice-release-name>-cert-secret

To create a certificate secret, use the following command:

a1k create secret generic <microservice-release-name>-cert-secret --from-file=<path-to>/<KafkaCert>.crt -n <namespace>

where:

  - <microservice-release-name> is the release name that you will use when you deploy the microservice.
  - <path-to>/<KafkaCert>.crt is the path to and file name of the certificate to include in the secret.
  - <namespace> is the namespace of your microservice cluster.
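For mutual TLS, the same command pattern can carry multiple files in one secret. The following is a sketch only; the truststore and keystore file names are hypothetical placeholders, and the supported file formats depend on your environment:

```shell
# Hypothetical mTLS example: bundle a truststore and a keystore
# into one certificate secret (file names are placeholders).
a1k create secret generic <microservice-release-name>-cert-secret \
  --from-file=<path-to>/<truststore>.jks \
  --from-file=<path-to>/<keystore>.p12 \
  -n <namespace>
```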

Credential Secret

The credential secret is a text-type secret, and it consists of key-value pairs. The secret name is of the format:

<microservice-release-name>-cred-secret

The supported keys for a credential secret are:

To create a credential secret, use the following commands:

In the commands:
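As an illustration of the general shape of such a command, a text-type secret can be created from literal key-value pairs. The key names below are hypothetical placeholders; substitute the supported keys for your release:

```shell
# Hypothetical example: create a credential secret from key-value
# pairs (key names here are placeholders, not the supported keys).
a1k create secret generic <microservice-release-name>-cred-secret \
  --from-literal=<truststore-password-key>=<truststore_password> \
  --from-literal=<keystore-password-key>=<keystore_password> \
  -n <namespace>
```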

Note:

Although the secret names of the certificate secret and credential secret are set by default, you can override them when you deploy the microservice by using the following configuration parameters:

  - KAFKA_CERT_SECRET_NAME_OVERRIDE
  - KAFKA_CRED_SECRET_NAME_OVERRIDE

See Default Kafka Bridge Configuration for more details.

Deploying Kafka Bridge

To deploy the microservice, run the following commands:

su - assure1
export NAMESPACE=<namespace>
export WEBFQDN=<WebFQDN> 
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE \
  --set global.imageRegistry=$WEBFQDN \
  --set configData.STREAM_OUTPUT="<output_topic>" \
  --set configData.STREAM_INPUT="<input_topic>" \
  --set configData.LOG_LEVEL=DEBUG \
  --set configData.<tlsConfigPrefix>_KEYSTORE="<key_store>.p12" \
  --set configData.<tlsConfigPrefix>_CA="<CA_cert>" \
  --set configData.<tlsConfigPrefix>_TRUSTSTORE_TYPE="<truststore_type>" \
  --set configData.KAFKA_CERT_SECRET_NAME_OVERRIDE="cert-secret"

In the commands:

  - <namespace> is the namespace of your microservice cluster.
  - <WebFQDN> is the fully qualified domain name of the server where Unified Assurance is installed.
  - <microservice-release-name> is the name to use for the microservice release.
  - <input_topic> and <output_topic> are the input and output topics. See Example Input and Output Stream Formats.
  - <tlsConfigPrefix> is the TLS configuration prefix set in the Kafka stream URL. See Dynamic Parameters for TLS and SASL Configurations.

You can also use the Unified Assurance UI to deploy microservices. See Deploying a Microservice by Using the UI for more information.

Changing Kafka Bridge Configuration Parameters

When running the install command, you can optionally change default configuration parameter values by including them in the command with additional --set arguments. You can add as many additional --set arguments as you need.

For example:
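One possible invocation, overriding the redundancy poll period and the credential secret name (the values shown are illustrative, not recommendations):

```shell
# Illustrative install with two extra --set overrides appended
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE \
  --set global.imageRegistry=$WEBFQDN \
  --set configData.REDUNDANCY_POLL_PERIOD=10 \
  --set configData.KAFKA_CRED_SECRET_NAME_OVERRIDE="cred-secret"
```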

Default Kafka Bridge Configuration

The following table describes the default configuration parameters found in the Helm chart under configData for the microservice.

| Name | Default Value | Possible Values | Notes |
| --- | --- | --- | --- |
| LOG_LEVEL | INFO | FATAL, ERROR, WARN, INFO, DEBUG | The logging level for the microservice. |
| STREAM_INPUT | "" | Text | The Pulsar or Kafka topic that the microservice subscribes to. See Example Input and Output Stream Formats for examples. |
| STREAM_OUTPUT | "" | Text | The Pulsar or Kafka topic that the microservice publishes to. See Example Input and Output Stream Formats for examples. |
| REDUNDANCY_POLL_PERIOD | 5 | Integer greater than 0 | The number of seconds between status checks from the secondary microservice to the primary microservice. |
| REDUNDANCY_FAILOVER_THRESHOLD | 4 | Integer greater than 0 | The number of times the primary microservice must fail checks before the secondary microservice becomes active. |
| REDUNDANCY_FALLBACK_THRESHOLD | 1 | Integer greater than 0 | The number of times the primary microservice must succeed checks before the secondary microservice becomes inactive. |
| KAFKA_CERT_SECRET_NAME_OVERRIDE | "" | Text | Overrides the default certificate secret name. |
| KAFKA_CRED_SECRET_NAME_OVERRIDE | "" | Text | Overrides the default credential secret name. |

Other supported and required values depend on your security and streaming configuration. See Dynamic Parameters for TLS and SASL Configurations.

Example Input and Output Stream Formats

You can specify Pulsar or Kafka topics in the STREAM_INPUT and STREAM_OUTPUT configuration parameters.

For Kafka URLs, use the following format:

kafka://<username>@<host:port>/<topic>?tlsConfigPrefix=<prefix_1>&saslConfigPrefix=<prefix_2>

where:

  - <username> is the user name for SASL authentication, if configured. Omit it otherwise.
  - <host:port> is the address of the Kafka broker. You can specify multiple brokers in a comma-separated list.
  - <topic> is the Kafka topic.
  - <prefix_1> is the prefix for the dynamic TLS configuration parameters. See Dynamic Parameters for TLS and SASL Configurations.
  - <prefix_2> is the prefix for the dynamic SASL configuration parameters. See Dynamic Parameters for TLS and SASL Configurations.

The security protocol set up on the microservice also depends on the prefix parameters:

For example:

For Kafka, you can also include the following query parameters:

  - offset: The offset to start consuming from.
  - consumerGroup: The consumer group that the microservice joins.
  - maxMessageSize: The maximum message size.

For example, to use some of these query parameters, set the stream to:

kafka://<host:port>/<topic>?tlsConfigPrefix=KAFKA_TLS&offset=newest&consumerGroup=grp1&maxMessageSize=1024
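Because the stream value follows ordinary URL syntax, its parts can be pulled apart with standard shell parameter expansion. The URL below is illustrative; a real deployment would use your own broker address and topic:

```shell
# Illustrative stream URL in the format described above
url="kafka://user@broker1.example.com:9092/kafka-topic?tlsConfigPrefix=KAFKA_TLS&offset=newest"

rest="${url#kafka://}"        # drop the scheme
query="${rest#*\?}"           # everything after the first '?'
hostpath="${rest%%\?*}"       # authority plus topic path
topic="${hostpath#*/}"        # text after the first '/'
authority="${hostpath%%/*}"   # user@host:port portion

echo "$authority"   # user@broker1.example.com:9092
echo "$topic"       # kafka-topic
echo "$query"       # tlsConfigPrefix=KAFKA_TLS&offset=newest
```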

Dynamic Parameters for TLS and SASL Configurations

If Kafka is configured with TLS, parameters prefixed with the stream URL's <tlsConfigPrefix> become available dynamically. Those used elsewhere in this document include:

  - <tlsConfigPrefix>_CA
  - <tlsConfigPrefix>_CERT
  - <tlsConfigPrefix>_KEY
  - <tlsConfigPrefix>_KEYSTORE
  - <tlsConfigPrefix>_TRUSTSTORE_TYPE

Note:

If you provide <tlsConfigPrefix>_KEYSTORE, <tlsConfigPrefix>_KEY, and <tlsConfigPrefix>_CERT, then <tlsConfigPrefix>_KEYSTORE takes precedence.

If Kafka is configured with SASL, the following parameters are dynamically available:

Kafka Bridge Autoscaling Configuration

Autoscaling is supported for the Kafka Bridge microservice when you specify an internal Pulsar topic as the input. See Configuring Autoscaling for general information and details about the standard autoscaling configurations.

The Kafka Bridge microservice also supports the additional configurations described in the following table.

| Name | Default Value | Possible Values | Notes |
| --- | --- | --- | --- |
| thresholds.backlogSize | 1000 | Integer | The number of items that must be in the backlog before autoscaling starts additional processes. |
| thresholds.totalEventsProcessed | 400 | Integer | The total number of events processed by the microservice. If the average number of events processed in the configured polling interval exceeds the threshold, pods are scaled. |
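These thresholds can be overridden at deployment with additional --set arguments, following the same pattern as other configuration parameters. A sketch, with illustrative values:

```shell
# Illustrative autoscaling threshold overrides at install time
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE \
  --set global.imageRegistry=$WEBFQDN \
  --set thresholds.backlogSize=2000 \
  --set thresholds.totalEventsProcessed=800
```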

Kafka Bridge Self-Monitoring Metrics

The Kafka Bridge microservice exposes the self-monitoring metrics described in the following table to Prometheus.

| Metric Name | Type | Description |
| --- | --- | --- |
| received_events_total | Counter | The number of events received. |
| sent_events_total | Counter | The number of events sent. |
| avg_time | Gauge | The average time to process events. |
| reception_success_per_second | Gauge | The rate of successfully received messages per second. |
| sending_success_per_second | Gauge | The rate of successfully sent messages per second. |
| failed_events_total | Counter | The total number of failed events. |
| outstanding_metric | Gauge | The number of pending events. |
| starts_per_second | Gauge | The rate of messages started per second. |
| fails_per_second | Gauge | The rate of messages failed per second. |
| finished_per_second | Gauge | The rate of messages finished per second. |

Note:

Metric names in the database include a prefix that indicates the service that inserted them. The prefix is prom_ for metrics inserted by Prometheus. For example, received_events_total is stored as prom_received_events_total in the database.

Example Kafka Messages

Example text message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "text",
    "text": "A plaintext message"
  }
}

Example XML message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "xml",
    "xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
  }
}

Example JSON message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
    }
  ],
  "_domain": "fault",
  "_type": "event",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "json",
    "json": {
      "event": {
        "EventID": 0,
        "EventKey": "",
        "EventCategory": 3,
        "EventType": "",
        "Ack": 0,
        "Action": "",
        "Actor": "",
        "Count": 1,
        "Customer": "",
        "Department": "",
        "Details": {},
        "DeviceType": "",
        "Duration": 0.000,
        "EscalationFlag": 0,
        "ExpireTime": 0,
        "FirstReported": "2006-01-02T15:04:05.999999999Z",
        "GeoPath": {
          "type": "LineString",
          "coordinates": [[0,0],[0,0]]
        },
        "GeoLocation": {
          "type": "Point",
          "coordinates": [0,0]
        },
        "IPAddress": "",
        "LastChanged": "2006-01-02T15:04:05.999999999Z",
        "LastReported": "2006-01-02T15:04:05.999999999Z",
        "Location": "",
        "Method": "",
        "Node": "",
        "OrigSeverity": 1,
        "OwnerName": "",
        "RootCauseFlag": 0,
        "RootCauseID": 0,
        "Score": 0,
        "Service": "",
        "ServiceImpact": 0,
        "Severity": "Unknown",
        "SubDeviceType": "",
        "SubMethod": "",
        "SubNode": "",
        "Summary": "",
        "TicketFlag": 0,
        "TicketID": "",
        "ZoneID": 0
      }
    }
  }
}