Kafka Bridge
The Kafka Bridge microservice consumes messages containing JSON, XML, or plain-text payloads from a configured input (Pulsar or Kafka) and publishes them to the configured output (Pulsar or Kafka). One end, either the input or the output, must be a Kafka topic.
Note:
This microservice was previously called Kafka Adapter (kafka-adapter in the Helm chart and commands). It is now Kafka Bridge (kafka-bridge).
This microservice is part of the Event microservice pipeline. See Understanding the Event Pipeline in Unified Assurance Concepts for conceptual information. It can also participate in other pipelines that need to connect to Kafka.
You can enable redundancy for this microservice when you deploy it. See Configuring Microservice Redundancy for general information.
You can enable autoscaling for this microservice when you deploy it, if you are using an internal Pulsar topic as input. See Configuring Autoscaling and Kafka Bridge Autoscaling Configuration.
This microservice provides additional Prometheus monitoring metrics. See Kafka Bridge Self-Monitoring Metrics.
Kafka Bridge Prerequisites
Before deploying the microservice, confirm that the following prerequisites are met:
- A microservice cluster is set up. See Microservice Cluster Setup.
- The Apache Pulsar microservice is deployed. See Pulsar.
- You know the Kafka or Pulsar topic to send messages to or get them from.
- You have created the appropriate Kubernetes secrets for your environment, which you will use when deploying the microservice. See Kafka Bridge Secrets.
Kafka Bridge Secrets
The Kafka Bridge microservice supports two types of Kubernetes secrets, which correspond to different security and authentication requirements.
Secret Requirements for Security Protocols
For the one-way TLS/SSL security protocol, you need a certificate secret containing a truststore. If the truststore is password-protected, you also need a credential secret containing the truststore password.
For the mutual TLS (mTLS) security protocol, you need a certificate secret containing a truststore and a keystore. If the truststore, the keystore, or both are password-protected, you also need a credential secret containing the corresponding passwords.
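The secret requirements above can be summarized in a small helper. This is an illustrative sketch only; `required_secrets` is a hypothetical function, not part of the product.

```python
# Illustrative sketch: summarizes which Kubernetes secrets a given TLS setup
# needs, per the rules above. required_secrets is a hypothetical helper.
def required_secrets(mtls, truststore_protected, keystore_protected=False):
    secrets = {"cert-secret": ["truststore"]}
    if mtls:
        secrets["cert-secret"].append("keystore")
    credentials = []
    if truststore_protected:
        credentials.append("truststorePassword")
    if mtls and keystore_protected:
        credentials.append("keystorePassword")
    if credentials:
        secrets["cred-secret"] = credentials
    return secrets

# One-way TLS with an unprotected truststore: certificate secret only.
print(required_secrets(mtls=False, truststore_protected=False))
# mTLS with both stores password-protected: both secrets are needed.
print(required_secrets(mtls=True, truststore_protected=True, keystore_protected=True))
```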
Certificate Secret
The certificate secret consists of one or more files for certificates, keystores, truststores, or key files for TLS and mTLS. The secret name is of the format:
<microservice-release-name>-cert-secret
To create a certificate secret, use the following command:
a1k create secret generic <microservice-release-name>-cert-secret --from-file=<path-to>/<KafkaCert>.crt -n <namespace>
where:
- <microservice-release-name> is the name used for the microservice instance. By default, this is the microservice name (kafka-bridge).
- <namespace> is the namespace where you will deploy the microservice. The default namespace is a1-zone1-pri.
- <path-to> is the file path leading to the required certificate file.
- <KafkaCert> is the required certificate file.
Credential Secret
The credential secret is a text-type secret, and it consists of key-value pairs. The secret name is of the format:
<microservice-release-name>-cred-secret
The supported keys for a credential secret are:
- password: The password used when Kafka Bridge is configured with the Basic Authentication mechanism (PLAIN). Required for Basic Authentication.
- clientSecret: The client secret used when Kafka Bridge is configured with the OAuth mechanism (OAUTH). Required for OAuth.
- keystorePassword: The password for the keystore, if it is password-protected.
- truststorePassword: The password for the truststore, if it is password-protected.
To create a credential secret, use the following commands:
- When Kafka Bridge is configured with the Basic Authentication mechanism:
  a1k create secret generic <microservice-release-name>-cred-secret --from-literal=password=<password> -n <namespace>
- When Kafka Bridge is configured with the OAuth mechanism:
  a1k create secret generic <microservice-release-name>-cred-secret --from-literal=clientSecret=<secret> -n <namespace>
In the commands:
- <microservice-release-name> is the name used for the microservice instance. By default, this is the microservice name (kafka-bridge).
- <namespace> is the namespace used for the microservice instance. The default namespace is a1-zone1-pri.
- <password> is the password you will set when Kafka Bridge is configured with the Basic Authentication mechanism.
- <secret> is the client secret you will set when Kafka Bridge is configured with the OAuth mechanism.
Note:
Although the certificate secret and credential secret names have default values, you can override them when you deploy the microservice by using the following configuration parameters:
- KAFKA_CERT_SECRET_NAME_OVERRIDE to override the certificate secret name.
- KAFKA_CRED_SECRET_NAME_OVERRIDE to override the credential secret name.
See Default Kafka Bridge Configuration for more details.
Deploying Kafka Bridge
To deploy the microservice, run the following commands:
su - assure1
export NAMESPACE=<namespace>
export WEBFQDN=<WebFQDN>
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE \
  --set global.imageRegistry=$WEBFQDN \
  --set configData.STREAM_OUTPUT="<output_topic>" \
  --set configData.STREAM_INPUT="<input_topic>" \
  --set configData.LOG_LEVEL=DEBUG \
  --set configData.<tlsConfigPrefix>_KEYSTORE="<key_store>.p12" \
  --set configData.<tlsConfigPrefix>_CA="<CA_cert>" \
  --set configData.<tlsConfigPrefix>_TRUSTSTORE_TYPE="<truststore_type>" \
  --set configData.KAFKA_CERT_SECRET_NAME_OVERRIDE="cert-secret"
In the commands:
- <namespace> is the namespace where you are deploying the microservice. The default namespace is a1-zone1-pri, but you can change the zone number and, when deploying to a redundant cluster, change pri to sec.
- <WebFQDN> is the fully qualified domain name of the primary presentation server for the cluster.
- <microservice-release-name> is the name to use for the microservice instance. Oracle recommends using the microservice name (kafka-bridge) unless you are deploying multiple instances of the microservice to the same cluster.
- <input_topic> and <output_topic> are the Kafka or Pulsar topics that the microservice subscribes and publishes to. There are no default values for these, and one of them must be a Kafka topic. See Example Input and Output Stream Formats for examples.
- <tlsConfigPrefix> is the configuration prefix used when Kafka is configured with TLS. You must specify this prefix in the Kafka URI that is used as either <input_topic> or <output_topic>.
- <key_store> is the keystore file containing both the client certificate and the private key.
- <CA_cert> is the file containing the trusted Certificate Authorities (CA).
- <truststore_type> is the type of CA file. Accepted values are PKCS12 and PEM.
You can also use the Unified Assurance UI to deploy microservices. See Deploying a Microservice by Using the UI for more information.
Changing Kafka Bridge Configuration Parameters
When running the install command, you can optionally change default configuration parameter values by including them in the command with additional --set arguments. You can add as many additional --set arguments as you need.
For example:
- Set a parameter described in Default Kafka Bridge Configuration by adding --set configData.<parameter_name>=<parameter_value>. For example, --set configData.LOG_LEVEL=DEBUG.
- Enable redundancy for the microservice by adding --set redundancy=enabled. Enabling redundancy is valid when you are using a Unified Assurance Pulsar topic as the input for the microservice.
- Enable autoscaling for the microservice by adding --set autoscaling.enabled=true. Enabling autoscaling is valid when you are using an internal Pulsar topic as the input for the microservice.
- Set an autoscaling parameter described in Kafka Bridge Autoscaling Configuration by adding --set autoscaling.<parameter_name>=<parameter_value>. For example, --set autoscaling.thresholds.totalEventsProcessed=500.
Default Kafka Bridge Configuration
The following table describes the default configuration parameters found in the Helm chart under configData for the microservice.
Name | Default Value | Possible Values | Notes |
---|---|---|---|
LOG_LEVEL | INFO | FATAL, ERROR, WARN, INFO, DEBUG | The logging level for the microservice. |
STREAM_INPUT | "" | Text | The Pulsar or Kafka topic that the microservice subscribes to. See Example Input and Output Stream Formats for examples. |
STREAM_OUTPUT | "" | Text | The Pulsar or Kafka topic that the microservice publishes to. See Example Input and Output Stream Formats for examples. |
REDUNDANCY_POLL_PERIOD | 5 | Integer greater than 0 | The number of seconds between status checks from the secondary microservice to the primary microservice. |
REDUNDANCY_FAILOVER_THRESHOLD | 4 | Integer greater than 0 | The number of times the primary microservice must fail checks before the secondary microservice becomes active. |
REDUNDANCY_FALLBACK_THRESHOLD | 1 | Integer greater than 0 | The number of times the primary microservice must succeed checks before the secondary microservice becomes inactive. |
KAFKA_CERT_SECRET_NAME_OVERRIDE | "" | Text | Use to override the default certificate secret name. |
KAFKA_CRED_SECRET_NAME_OVERRIDE | "" | Text | Use to override the default credential secret name. |
Other supported and required values depend on your security and streaming configuration. See Dynamic Parameters for TLS and SASL Configurations.
Example Input and Output Stream Formats
You can specify Pulsar or Kafka topics in the STREAM_INPUT and STREAM_OUTPUT configuration parameters.
For Kafka URLs, use the following format:
kafka://<username>@<host:port>/<topic>?tlsConfigPrefix=<prefix_1>&saslConfigPrefix=<prefix_2>
where:
- <username> is the username you must set if you are using Basic Authentication (PLAIN). If you are not using Basic Authentication, omit <username>@ from the URL.
- <host:port> specifies the bootstrap servers in Kafka.
- <prefix_1> is the prefix you will use if Kafka Bridge is configured for TLS.
- <prefix_2> is the prefix you will use if Kafka Bridge is configured for SASL.
The security protocol set up on the microservice also depends on the prefix parameters:
- If you omit tlsConfigPrefix and specify saslConfigPrefix, the security protocol is set to SASL_PLAINTEXT.
- If you omit saslConfigPrefix and specify tlsConfigPrefix, the security protocol is set to SSL.
- If you omit both parameters, the security protocol is set to PLAINTEXT.
- If you specify both parameters, the security protocol is set to SASL_SSL.
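The prefix-to-protocol rules above can be expressed as a short function. This is an illustrative sketch; `security_protocol` is a hypothetical name, not part of the microservice.

```python
# Illustrative sketch: maps the presence of the two prefix query parameters
# to the resulting Kafka security protocol, per the rules above.
def security_protocol(tls_prefix, sasl_prefix):
    if tls_prefix and sasl_prefix:
        return "SASL_SSL"
    if tls_prefix:
        return "SSL"
    if sasl_prefix:
        return "SASL_PLAINTEXT"
    return "PLAINTEXT"

print(security_protocol("KAFKA_TLS", None))          # SSL
print(security_protocol(None, "KAFKA_SASL"))         # SASL_PLAINTEXT
print(security_protocol(None, None))                 # PLAINTEXT
print(security_protocol("KAFKA_TLS", "KAFKA_SASL"))  # SASL_SSL
```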
For example:
- To subscribe to the TMF688 Event Processor microservice's Pulsar topic, set STREAM_INPUT to:
  persistent://assure1/event/tmf688-event
- To specify the server URL for a Pulsar topic, set STREAM_INPUT to:
  pulsar+ssl://<host:port>/<topic>
- To publish to a Kafka topic on a server that does not use SSL or authentication, set STREAM_OUTPUT to:
  kafka://<host:port>/<topic>
For Kafka, you can also include the following query parameters:
- Input-only query parameters:
  - offset: The Kafka offset, either oldest or newest. This parameter is optional. The default value is newest.
  - consumerGroup: The Kafka consumer group, in text format. This parameter is mandatory.
  - maxMessageSize: An integer representing the maximum message size in bytes. This parameter is optional. The default value is 1048576.
- Output-only query parameters:
  - compressionCodec: Possible values are none, gzip, snappy, lz4, and zstd. This parameter is optional. The default value is lz4.
  - balanceStrategy: Possible values are range, roundrobin, and sticky. This parameter is optional. The default value is range.

For example, to use some of these query parameters, set the stream to:
kafka://<host:port>/<topic>?tlsConfigPrefix=KAFKA_TLS&offset=newest&consumerGroup=grp1&maxMessageSize=1024
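The URL format above can be parsed with standard tooling. The following is an illustrative sketch using Python's standard library; `parse_kafka_input_url` is a hypothetical helper, not part of the microservice.

```python
from urllib.parse import urlsplit, parse_qs

# Illustrative sketch of the Kafka stream URL format described above.
def parse_kafka_input_url(url):
    parts = urlsplit(url)
    query = {key: values[0] for key, values in parse_qs(parts.query).items()}
    if "consumerGroup" not in query:
        raise ValueError("consumerGroup is mandatory for input streams")
    return {
        # Strip any <username>@ prefix, then split the bootstrap servers.
        "brokers": parts.netloc.rpartition("@")[2].split(","),
        "topic": parts.path.lstrip("/"),
        "offset": query.get("offset", "newest"),
        "consumerGroup": query["consumerGroup"],
        "maxMessageSize": int(query.get("maxMessageSize", 1048576)),
    }

config = parse_kafka_input_url(
    "kafka://broker1.example.com:9092,broker2.example.com:9092/events"
    "?tlsConfigPrefix=KAFKA_TLS&offset=newest&consumerGroup=grp1&maxMessageSize=1024"
)
print(config["brokers"])
print(config["maxMessageSize"])
```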
Dynamic Parameters for TLS and SASL Configurations
If Kafka is configured with TLS, the following parameters are dynamically available:
- <tlsConfigPrefix>_CA: Use this parameter to specify the file containing trusted Certificate Authorities (CA), which is used for server certificate validation. Supported formats are PKCS12 (which includes .p12 and .pfx files) and PEM (which includes .crt and .pem files). For files in PKCS12 format, provide the password using the credential secret. If you do not specify a value for this parameter, the global trusted root CAs are used for the truststore.
- <tlsConfigPrefix>_TRUSTSTORE_TYPE: Use this parameter to specify the type of truststore or CA file. Accepted values are PKCS12 and PEM.
- <tlsConfigPrefix>_KEYSTORE: Use this parameter to specify the path to a keystore file containing both the client certificate and the private key. The keystore file must be of type PKCS12. If Kafka Bridge is configured with mTLS, you must set either <tlsConfigPrefix>_KEYSTORE or both <tlsConfigPrefix>_KEY and <tlsConfigPrefix>_CERT. Leave all three fields blank if Kafka Bridge is configured with one-way TLS/SSL.
- <tlsConfigPrefix>_CERT: Use this parameter to specify the client certificate only. The certificate file must be of PEM type.
- <tlsConfigPrefix>_KEY: Use this parameter to specify the private key only. The key must be PKCS8 compliant and of PEM type.
Note:
If you provide <tlsConfigPrefix>_KEYSTORE, <tlsConfigPrefix>_KEY, and <tlsConfigPrefix>_CERT, then <tlsConfigPrefix>_KEYSTORE takes precedence.
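The precedence rule in the note can be sketched as follows. This is illustrative only; the parameter names echo <tlsConfigPrefix>_KEYSTORE, _CERT, and _KEY, but `client_identity` itself is a hypothetical function.

```python
# Illustrative sketch of the keystore-precedence note above.
def client_identity(keystore=None, cert=None, key=None):
    if keystore:
        # _KEYSTORE wins even when _KEY and _CERT are also provided.
        return ("keystore", keystore)
    if cert and key:
        return ("pem-pair", (cert, key))
    if cert or key:
        raise ValueError("_CERT and _KEY must be provided together")
    return None  # one-way TLS/SSL: no client identity needed

# All three set: the keystore takes precedence.
print(client_identity(keystore="client.p12", cert="client.crt", key="client.key"))
# No client material: one-way TLS.
print(client_identity())
```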
If Kafka is configured with SASL, the following parameters are dynamically available:
- <saslConfigPrefix>_MECHANISM: Use this field to specify the SASL mechanism. This field is required. Accepted values are PLAIN and OAUTH.
- <saslConfigPrefix>_CLIENT_ID: Use this field to specify the OAuth client ID. This field is required when <saslConfigPrefix>_MECHANISM is set to OAUTH.
- <saslConfigPrefix>_OAUTH_ENDPOINT_URI: Use this field to specify the endpoint used to obtain the OAuth access token. This field is required when <saslConfigPrefix>_MECHANISM is set to OAUTH.
- <saslConfigPrefix>_OAUTH_SERVER_TLS_CA: Use this field to specify the CA certificate for verifying the OAuth server. This field is optional when <saslConfigPrefix>_MECHANISM is set to OAUTH.
- <saslConfigPrefix>_AUDIENCE: Use this field to specify the audience for fetching the access token. This field is optional when <saslConfigPrefix>_MECHANISM is set to OAUTH.
- <saslConfigPrefix>_SCOPE: Use this field to specify the scope for fetching the access token. This field is optional when <saslConfigPrefix>_MECHANISM is set to OAUTH.
Kafka Bridge Autoscaling Configuration
Autoscaling is supported for the Kafka Bridge microservice when you specify an internal Pulsar topic as the input. See Configuring Autoscaling for general information and details about the standard autoscaling configurations.
The Kafka Bridge microservice also supports the additional configurations described in the following table.
Name | Default Value | Possible Values | Notes |
---|---|---|---|
thresholds.backlogSize | 1000 | Integer | The number of items that must be in the backlog before autoscaling starts additional pods. |
thresholds.totalEventsProcessed | 400 | Integer | The total number of events processed by the microservice. If the average of total events processed in the configured polling interval exceeds the threshold, pods will be scaled. |
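The two thresholds can be illustrated with a small sketch. This is illustrative only and assumes that exceeding either threshold alone triggers scaling; the actual decision is made by the platform's autoscaler, and `should_scale_up` is a hypothetical function.

```python
# Illustrative sketch of the two autoscaling thresholds in the table above.
# Assumption: exceeding either threshold alone triggers a scale-up.
def should_scale_up(backlog_size, avg_events_processed,
                    backlog_threshold=1000, events_threshold=400):
    return (backlog_size > backlog_threshold
            or avg_events_processed > events_threshold)

print(should_scale_up(backlog_size=1500, avg_events_processed=100))  # True
print(should_scale_up(backlog_size=500, avg_events_processed=100))   # False
```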
Kafka Bridge Self-Monitoring Metrics
The Kafka Bridge microservice exposes the self-monitoring metrics described in the following table to Prometheus.
Metric Name | Type | Description |
---|---|---|
received_events_total | Counter | The number of events received. |
sent_events_total | Counter | The number of events sent. |
avg_time | Gauge | The average time to process events. |
reception_success_per_second | Gauge | The rate of successfully received messages per second. |
sending_success_per_second | Gauge | The rate of successfully sent messages per second. |
failed_events_total | Counter | The total number of failed events. |
outstanding_metric | Gauge | The number of pending events. |
starts_per_second | Gauge | The rate of messages started per second. |
fails_per_second | Gauge | The rate of messages failed per second. |
finished_per_second | Gauge | The rate of messages finished per second. |
Note:
Metric names in the database include a prefix that indicates the service that inserted them. The prefix is prom_ for metrics inserted by Prometheus. For example, received_events_total is stored as prom_received_events_total in the database.
Example Kafka Messages
Example text message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "...",
"_type": "...",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "text",
"text": "A plaintext message"
}
}
Example XML message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "...",
"_type": "...",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "xml",
"xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
}
}
Example JSON message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "fault",
"_type": "event",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "json",
"json": {
"event": {
"EventID": 0,
"EventKey": "",
"EventCategory": 3,
"EventType": "",
"Ack": 0,
"Action": "",
"Actor": "",
"Count": 1,
"Customer": "",
"Department": "",
"Details": {},
"DeviceType": "",
"Duration": 0.000,
"EscalationFlag": 0,
"ExpireTime": 0,
"FirstReported": "2006-01-02T15:04:05.999999999Z",
"GeoPath": {
"type": "LineString",
"coordinates": [[0,0],[0,0]]
},
"GeoLocation": {
"type": "Point",
"coordinates": [0,0]
},
"IPAddress": "",
"LastChanged": "2006-01-02T15:04:05.999999999Z",
"LastReported": "2006-01-02T15:04:05.999999999Z",
"Location": "",
"Method": "",
"Node": "",
"OrigSeverity": 1,
"OwnerName": "",
"RootCauseFlag": 0,
"RootCauseID": 0,
"Score": 0,
"Service": "",
"ServiceImpact": 0,
"Severity": "Unknown",
"SubDeviceType": "",
"SubMethod": "",
"SubNode": "",
"Summary": "",
"TicketFlag": 0,
"TicketID": "",
"ZoneID": 0
}
}
}
}