4 Deploying Unified Operations Message Bus

This chapter describes how to deploy Unified Operations Message Bus.

Unified Operations Message Bus Overview

The Oracle Communications Unified Operations Message Bus (OCUOMB) service is a distributed event store and stream-processing platform. Message Bus clients send events and messages to, and receive them from, the Message Bus service, which in turn sends and receives them through a specific channel called a Topic. This keeps the source and target clients or services loosely coupled and asynchronous. Message Bus uses Apache Kafka in its platform to provide the event store and stream-processing capabilities. For deployment, Message Bus uses Strimzi.

Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. Strimzi provides container images and operators for running Apache Kafka on Kubernetes. Strimzi operators are fundamental for the smooth running of Strimzi. These operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components. These operators simplify the process of:

  • Deploying, running, and upgrading the Kafka cluster and its components.
  • Configuring and securing access to Kafka.
  • Creating and managing Kafka topics.

Operators are a method of packaging, deploying, and managing a Kubernetes application. The Strimzi operators extend Kubernetes functionality and automate common and complex tasks related to a Kafka deployment. By implementing knowledge of Kafka operations in code, Kafka administration tasks are simplified and require less manual intervention. See https://strimzi.io/docs/operators/latest/overview.html for more details on the Strimzi operators. Strimzi has the following operators:

  • Cluster Operator: Deploys and manages the Apache Kafka clusters, Kafka Connect, Kafka Mirror Maker, Kafka Bridge, Kafka Exporter, Cruise Control, and the Entity Operator.
  • Entity Operator: Comprises the Topic Operator and the User Operator.
  • Topic Operator: Manages Kafka topics.

See the Strimzi documentation (https://strimzi.io/documentation/) and the Apache Kafka documentation (https://kafka.apache.org/documentation/) for more information on Strimzi and Apache Kafka.

The Message Bus service provides scripts and Helm charts to deploy and manage the Apache Kafka cluster in Kubernetes by using the Strimzi operator and Kubernetes Custom Resource Definitions. The Message Bus service does not provide any image builder toolkits to build the container images. By default, the Helm charts pull the required container images from the quay.io/strimzi container repository.

Table 4-1 Container Images and Purposes

Container Image                                                            Purpose
quay.io/strimzi/operator:<Strimzi_Operator_version>                        Container image with the Strimzi operator.
quay.io/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version>     Container image with the Apache Kafka and Strimzi distribution.

In the following sections, the Kafka container image is referred to as STRIMZI_KAFKA_IMAGE_NAME.

Note:

See UIM Compatibility Matrix for the latest versions of software.

Message Bus Cloud Native Architecture

The Message Bus service uses Apache Kafka as a distributed event store platform. To run an Apache Kafka cluster on Kubernetes, the Message Bus service uses the Strimzi operator. Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes.

Figure 4-1 Message Bus Cloud Native Architecture



Access to Message Bus

While deploying the Message Bus service in a Kubernetes namespace, the following Kubernetes service objects are created to access the Message Bus pods either internally or externally (through an ingress controller). External access is provided through the ingress controller by using IngressRouteTCP objects.

You can override the value of subDomainNameSeparator. The default separator is ".". You can change this value to "-" to match the wild-card pattern of SSL certificates.

To override, uncomment and change the value in applications.yaml as follows:
#subDomainNameSeparator: "."
#Example hostnames for "-" : quick-sr-messaging-bootstrap.uim.org

Figure 4-2 Process of Accessing the Message Bus



External access to the Message Bus service is supported with TCP+TLS+OAuth 2.0 authentication through the Traefik ingress controller or a Generic ingress controller. Internal access to the Message Bus service is also supported with TCP+TLS+OAuth 2.0 authentication, where TLS is configurable. Access to the Message Bus service is configured through the listeners section in the applications.yaml file.

Note:

  • If the client is in the same Kubernetes cluster, the internal listener is used.
  • If the client is outside the Kubernetes cluster, the ingress listener is used.

The Message Bus is deployed using the scripts provided in Common CNTK. For deployment prerequisites, see "Planning and Validating Your Cloud Environment".

To deploy a Kafka cluster in a Kubernetes namespace, follow these steps:

  1. Deploy the Strimzi operator to manage your Kafka cluster.

    Note:

    This is an administrative one-time activity where additional cluster roles are required.
    1. Create a namespace to deploy Strimzi Operator.
    2. Deploy Strimzi Operator in the namespace. See "Deploying Strimzi Operator" for more information.
  2. Deploy the Message Bus that has Kafka cluster, ZooKeeper cluster, and entity operator.
    1. Create a namespace to deploy the Kafka cluster.
    2. Register the namespace with the Strimzi operator. See "Registering the Namespaces with Strimzi Operator" for more information.
    3. Register the namespace with the Traefik ingress controller.

      Note:

      • The ingress controller (Traefik or Generic) must be available.
      • Register the namespace with the Traefik ingress controller. If you use a Generic ingress controller, ensure that ingress.className is set in the applications.yaml file.
    4. Deploy the Kafka cluster in the namespace. See "Deploying Message Bus" for more information.
  3. Validate the deployment with sample standalone producer and consumer clients. See "Validating Message Bus" and "Internal Access in the Same namespace for Plain" for more information.

Strimzi Operator

Export the Strimzi operator namespace environment variable to run the deployment scripts from COMMON_CNTK:

export STRIMZI_NS=<STRIMZI_OPERATOR_NAMESPACE>

The configurable parameters of the Strimzi Operator charts and their default values are listed in the corresponding subsections within this document.

To override the default values, copy the $COMMON_CNTK/samples/strimzi-operator-override-values.yaml file to the directory $SPEC_PATH/<STRIMZI_PROJECT>, where <STRIMZI_PROJECT> is the Kubernetes namespace where the Strimzi operator is being deployed. See "Assembling the Specifications" for more information.
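
For example, assuming the Strimzi operator is deployed in a namespace named strimzi (a hypothetical name used only for illustration), the export and copy steps might look as follows:

export STRIMZI_NS=strimzi
mkdir -p $SPEC_PATH/strimzi
cp $COMMON_CNTK/samples/strimzi-operator-override-values.yaml $SPEC_PATH/strimzi/strimzi-operator-override-values.yaml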

Create Global Resources

Configure the createGlobalResources value in the strimzi-operator-override-values.yaml sample file. If you require more than one strimzi-cluster-operator in the same cluster, set the value to false. When multiple Strimzi operators are installed, install only the latest Strimzi version to avoid backward-compatibility risks.
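
Before setting createGlobalResources to false, you can check whether another Strimzi operator has already installed the cluster-wide resources. The following is a minimal sketch, assuming you have cluster-level kubectl access:

#List Strimzi CRDs that are already installed in the cluster
kubectl get crd | grep strimzi.io
#List cluster roles created by an existing Strimzi operator
kubectl get clusterroles | grep strimzi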

Private Container Repository

The Strimzi operator pulls the Strimzi component container images from the quay.io registry. If you want to maintain a private container registry, pull the images from the quay.io registry and push them into the private container registry. The images must be pushed with the same name and tag; the repository path can be different. For the Strimzi image and tag names, see "Unified Operations Message Bus Overview".
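
The following is a minimal sketch of mirroring the images into a private registry by using Docker; the registry name myregistry.example.com is a placeholder, and the Strimzi and Kafka version placeholders must be replaced with the actual versions:

#Pull the images from quay.io, retag them with the private registry path (same name and tag), and push them
docker pull quay.io/strimzi/operator:<Strimzi_Operator_version>
docker tag quay.io/strimzi/operator:<Strimzi_Operator_version> myregistry.example.com/strimzi/operator:<Strimzi_Operator_version>
docker push myregistry.example.com/strimzi/operator:<Strimzi_Operator_version>
 
docker pull quay.io/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version>
docker tag quay.io/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version> myregistry.example.com/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version>
docker push myregistry.example.com/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version>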

See the "About Container Image Management" section in UIM Cloud Native Deployment Guide for more information on private container repository management.

To use the private container registry, uncomment and modify the values in $SPEC_PATH/<STRIMZI_PROJECT>/strimzi-operator-override-values.yaml file. Provide the modified strimzi-operator-override-values.yaml file path as an -f option to the Strimzi operator create/upgrade command.

If the private container registry requires authentication, create the Kubernetes secret in the namespace and provide the secret name in the strimzi-operator-override-values.yaml file. Create a secret with the same name in the namespace where the Kafka cluster is planned to be deployed.

strimzi-operator-override-values.yaml file (Sample)

defaultImageRegistry: <Image registry>
defaultImageRepository: <Image Repository>
image:
  imagePullSecrets: <Pull Secret>

The following is a sample command to create Kubernetes secret for the registry. Create the secret in the namespace where the Strimzi operator is being deployed. See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ for creating secret.

kubectl create secret docker-registry <secret-name> --docker-server=<Image Registry> \
                                                    --docker-username=<Username> \
                                                    --docker-password=<Password> \
                                                    -n <STRIMZI_OPERATOR_NAMESPACE>

ImagePullPolicy

The following is a sample ImagePullPolicy configuration for the Strimzi operator. For more information about image pull policies, see https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy.

strimzi-operator-override-values.yaml file (Sample)

image:
  imagePullPolicy: IfNotPresent

Resources

These settings configure the virtual resources (limits and requests). Uncomment or add the below resources section with new values in the strimzi-operator-override-values.yaml file.

resources:
  requests:
    memory: <Mi>
    cpu: <m>
  limits:
    memory: <Gi>
    cpu: <"1">

The default values are as follows:

resources.requests.memory: 500Mi
resources.requests.cpu: 500m
resources.limits.memory: 1Gi
resources.limits.cpu: 1

Along with the above resources, you can provide the following additional configurations:

# Full reconciliation interval in milliseconds
fullReconciliationIntervalMs: 120000
# Operation timeout in milliseconds
operationTimeoutMs: 300000

Deploying Strimzi Operator

Run the following script to deploy the Strimzi operator in the Kubernetes namespace:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c create

Optionally, run the following script to deploy the Strimzi operator in the Kubernetes namespace with a custom image registry and repository:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c create -f $SPEC_PATH/<STRIMZI_OPERATOR_NAMESPACE>/strimzi-operator-override-values.yaml
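
For example, assuming the operator namespace is strimzi (a hypothetical name) and the override file was copied to $SPEC_PATH/strimzi, the deployment commands might look as follows:

export STRIMZI_NS=strimzi
$COMMON_CNTK/scripts/strimzi-operator.sh -p strimzi -c create -f $SPEC_PATH/strimzi/strimzi-operator-override-values.yaml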

Upgrading Strimzi Operator

Run the following script to upgrade the Strimzi operator in the Kubernetes namespace:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c upgrade

Optionally, run the following script to upgrade the Strimzi operator in the Kubernetes namespace with a custom image registry and repository:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c upgrade -f $SPEC_PATH/<STRIMZI_OPERATOR_NAMESPACE>/strimzi-operator-override-values.yaml

Note:

If you are upgrading the strimzi-cluster-operator to a newer version, use the old toolkit for the Strimzi version that is already deployed and the new toolkit when upgrading to the newer version. This applies to the create, upgrade, delete, register, and unregister operations.

Uninstalling Strimzi Operator

Run the following script to uninstall the Strimzi operator from the Kubernetes namespace:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c delete

Validating Strimzi Operator

Validate the Strimzi operator that is installed in the provided namespace by running the following command:

$kubectl get pod -n <STRIMZI_OPERATOR_NAMESPACE>
 
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-*******-***        1/1     Running   0          6m55s

Validate the Helm release installed for the Strimzi operator in the provided namespace by running the following command:

$helm list -n <STRIMZI_OPERATOR_NAMESPACE>
 
NAME                    NAMESPACE                          REVISION        STATUS          CHART                         APP VERSION
strimzi-operator      <STRIMZI_OPERATOR_NAMESPACE>         1               deployed        strimzi-kafka-operator-x.y.z  x.y.z

Restarting the Strimzi Operator

Run the following script to restart the Strimzi Operator:

$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c restart

Registering the Namespaces with Strimzi Operator

To create and manage the Kafka cluster in a Kubernetes namespace, the namespace must be registered with the Strimzi operator so that the operator can monitor the custom resources in it.

Run the following script to register the namespace(s) with the Strimzi operator to monitor and create or manage the Kafka cluster and its components:

$COMMON_CNTK/scripts/register-namespace.sh -p <Namespace to be monitored> -t strimzi
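
For example, to register the sr namespace that is used in the examples later in this chapter:

$COMMON_CNTK/scripts/register-namespace.sh -p sr -t strimzi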

Unregistering the Namespaces with Strimzi Operator

Run the following script to unregister the namespaces from the Strimzi operator:

$COMMON_CNTK/scripts/unregister-namespace.sh -p <Namespace to be un-monitored> -t strimzi

Deploying and Managing Message Bus

A Kafka cluster consists of Kafka brokers and ZooKeeper nodes. After the Strimzi operator is successfully installed in the Kubernetes cluster and a namespace for the Kafka cluster is registered with it for monitoring, you can deploy and manage the Kafka cluster.

Update the applications.yaml file as per your requirement and verify the following configuration elements in the yaml file before deploying the Kafka cluster:

Note:

If applications.yaml is not copied from Common CNTK, copy the $COMMON_CNTK/samples/applications.yaml file to your local directory, for example: $SPEC_PATH/sr/quick, where sr is the Kubernetes namespace and quick is the instance name.

  • The storage class name that is used to create persistent volumes.
  • The Kafka cluster replicas, which is the number of Kafka brokers and ZooKeeper nodes.
  • Virtual resource sizing.
  • The Kafka broker default settings.
  • The listeners to be exposed with authentication and TLS.
  • Authentication details.
  • Metrics enablement.
  • Affinity settings.
  • The partitions, replicas, and retention period values for the default Kafka topics.

See "Configuring the applications.yaml File" for more details.

Deploying Message Bus

Run the following commands to deploy the Kafka cluster with Kafka Topics in a Kubernetes namespace:

$COMMON_CNTK/scripts/create-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus

For example:

In the following command, sr is a namespace and quick an instance name:

$COMMON_CNTK/scripts/create-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus

Upgrading Message Bus

The Kafka cluster upgrade requires persistent storage to be enabled for a rolling update. Oracle recommends that you have multiple replicas so that the service is not down while upgrading.

Update the Kafka cluster configuration in the applications.yaml file and run the following script:

$COMMON_CNTK/scripts/upgrade-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus

For example, run the following command to upgrade the Kafka cluster and Kafka topic running in sr namespace with instance as quick:

$COMMON_CNTK/scripts/upgrade-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus

Deleting Message Bus

Run the following script to delete or uninstall the Kafka cluster and Kafka Topic from the Kubernetes namespace:

$COMMON_CNTK/scripts/delete-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus

For example, run the following command to delete the Kafka cluster and Kafka topics running in the sr namespace with the instance quick:

$COMMON_CNTK/scripts/delete-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus

Validating Message Bus

Check the Kubernetes service objects created for the Kafka cluster. The following sample output shows the internal listener configuration. If external listeners are configured, additional service objects appear:

$kubectl get svc -n sr
 
NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
sr-quick-messaging-kafka-bootstrap    ClusterIP   <clusterIP>      <none>        9091/TCP,9092/TCP            22m
sr-quick-messaging-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP   22m
sr-quick-messaging-zookeeper-client   ClusterIP   <clusterIP>      <none>        2181/TCP                     23m
sr-quick-messaging-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   23m

Check the pods created for the Kafka cluster. The following sample output shows a Kafka and ZooKeeper replica count of 1.

$kubectl get pod -n sr
 
 NAME                                                 READY   STATUS    RESTARTS   AGE
 sr-quick-messaging-entity-operator-*****-****        3/3     Running   0          27h
 sr-quick-messaging-kafka-0                           1/1     Running   0          27h
 sr-quick-messaging-zookeeper-0                       1/1     Running   0          27h

Check the Helm release:

$helm list -n sr
 
NAME                    NAMESPACE       REVISION        UPDATED          STATUS          CHART                   APP VERSION
sr-quick-messaging      sr              1               *****            deployed        kafka-cluster-<x.y.z>     <x.y.z>

Check the persistent volume claims created:

$kubectl get pvc -n sr
 
NAME                                  STATUS   VOLUME                                    CAPACITY   ACCESS MODES   STORAGECLASS   AGE
data-sr-quick-messaging-kafka-0       Bound    <volume>  1Gi        RWO           sc             27h
data-sr-quick-messaging-zookeeper-0   Bound    <volume>  1Gi        RWO           sc             27h

Run a standalone producer or consumer. See "Internal Access in the Same namespace for Plain" to run standalone producer and consumer pods in a Kafka cluster namespace.

Note:

As part of deploying, upgrading, and deleting the Message Bus, the Kafka topics are also created, upgraded, and deleted based on the configuration provided in the input yaml file.

Restarting Message Bus

The restart-applications.sh script with the application name messaging-bus restarts all the subcomponents of the Message Bus, such as Kafka, ZooKeeper, and the Entity Operator. Run the following command to restart:

$COMMON_CNTK/scripts/restart-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus

Note:

The Message Bus service requires multiple replicas (a replica count of 2 or more) so that the service is not down while restarting.

To validate the restart option, see "Validating Message Bus".

Configuring the applications.yaml File

Modify the values in the applications.yaml file and upgrade or create the Message Bus service. The following configurations are available for the Message Bus service:

  • Image Pull Secrets
  • Security Context
  • Cluster Size
  • Storage
  • Broker Defaults
  • JVM Options
  • Kafka Topics
  • Accessing Kafka Cluster
  • Authentication

Using Image Pull Secrets

Use the Image Pull Secrets sample only when you use a private container registry that requires authentication. The authentication details must be provided as a Kubernetes secret object in the namespace where the Kafka cluster is planned to be deployed. The same process is followed while deploying the Strimzi operator.

Note:

Provide the secret name in the kafka-cluster section if you use a secret name that is different from the one used in the Strimzi operator's namespace.

Image Pull Secrets (Sample)

imagePullSecret:
  imagePullSecrets:
    - name: <secret name>

The following is a sample command to create the secret object for registry authentication:

kubectl create secret docker-registry <secret-name> --docker-server=<Image Registry> \
                                                    --docker-username=<Username> \
                                                    --docker-password=<Password> \
                                                     -n <Kafka-Namespace>

See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ to create the secret object.

Security Context

The userSecurity section, which contains securityContext, is applicable only when you want to define privilege and access control settings for a pod or container. The pod security context, which is configured at the pod level, is provided as a sample and is applied to all containers in a given pod.

Note:

If a value is commented out, it is not used. To use a different key-value pair, uncomment the corresponding value in applications.yaml.

See https://strimzi.io/blog/2022/09/09/configuring-security-context-in-pods-managed-by-strimzi/ and https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ for more information.

Security-Context (Sample)
userSecurity:
  securityContext:
    runAsNonRoot: <true/false>
    runAsUser: <userID>
    runAsGroup: <groupID>
    fsGroup: <fsGroup>

Cluster Size

The Message Bus cluster consists of Kafka brokers and ZooKeeper nodes. Modify the replica count for the Kafka brokers and ZooKeeper nodes according to the usage. For high availability of the Message Bus service in a production instance, ensure that the number of replicas is at least 3 for both Kafka and ZooKeeper, and adjust the Kafka broker configuration accordingly:

kafka-cluster:
  replicas:
    kafka: 3
    zookeeper: 3

Storage

The Message Bus uses Strimzi to deploy the Apache Kafka cluster in the Kubernetes cluster. For Strimzi to work as required, an efficient data storage infrastructure is essential. Oracle recommends using block storage because Strimzi is tested with block storage. For more information on data storage, see https://strimzi.io/docs/operators/latest/deploying#considerations-for-data-storage-str.

The Message Bus service stores the events (or messages) in block storage by using Kubernetes persistent volumes. Modify the class, size, and isDeleteClaim values in the storage section under the Kafka cluster. The storage class must have dynamic persistent volume provisioning capability:

kafka-cluster:
  #storage:
    #When storage.type below is set as "persistent-claim", the storage class name & size are mandatory to be set
    #type: persistent-claim
    #class: psrnfsn1
    #size: 1Gi
    #isDeleteClaim: false

For development environments that use ephemeral storage (that is, temporary container storage), do not change these values; they must remain commented out.
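
Before enabling persistent storage, you can verify that a storage class with dynamic provisioning exists. The following is a sketch; the class name psrnfsn1 is taken from the sample above and must be replaced with your storage class:

#List the available storage classes and their provisioners
kubectl get storageclass
#Inspect the storage class referenced in the storage section
kubectl describe storageclass psrnfsn1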

Broker Defaults

The following configuration is applied when topics are auto-created. Modify the following settings in the kafkaConfig section under the Kafka cluster accordingly:

kafka-cluster:
  kafkaConfig:
    #The default replication factor for automatically created topics
    defaultReplicationFactor: 2
    offsetsTopicReplicationFactor: 2
    transactionStateLogReplicationFactor: 2
    transactionStateLogMinIsr: 2
    minInsyncReplicas: 2
    logRetentionMinutes: 30
    numPartitions: 3

Enter the values for the replication factors and minimum in-sync replicas according to the values configured for the Kafka cluster. These values must be less than or equal to the Kafka cluster replica count.

For more information on these values, see the Kafka documentation at: https://kafka.apache.org/documentation/#brokerconfigs
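
After the cluster is deployed, you can verify the effective topic and replication settings with the Kafka CLI. The following is a sketch that reuses the temporary-pod pattern shown later in this chapter, assuming the internal plain listener is enabled and using sr and quick as the sample namespace and instance:

kubectl -n sr run kafka-topic-describe -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 --describe --topic ora-test-topic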

JVM Options

The Message Bus cluster consists of Kafka Brokers and Zookeeper nodes. Modify the jvmOptions for Kafka Brokers and Zookeeper nodes according to the usage. See https://strimzi.io/docs/operators/latest/full/configuring.html#con-common-configuration-jvm-reference for more details.

jvmOptions:
  kafka:
    -Xms: 1024m
    -Xmx: 1024m
    # javaSystemProperties:
    #   - name: <placeHolder>
    #     value: <value>
 
  zookeeper:
    -Xms: 1024m
    -Xmx: 1024m
    # javaSystemProperties:
    #   - name: <placeHolder>
    #     value: <value>

Kafka Topics

In the kafkaTopics section of the applications.yaml file, add or update the Kafka topics that are required by the Message Bus service clients (producers or consumers).

For example:

kafka-topic:
  #List of Kafka topics
  kafkaTopics:
    - name: <topic1>
      partitions: <no_partitions>
      replicas: <no_replicas>
      config: 
        retention: 7200000
        segmentBytes: 1073741824
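
After the Message Bus service is deployed, you can confirm that the topics were created by listing the KafkaTopic custom resources managed by the Topic Operator. The following is a sketch using the sample sr namespace:

kubectl get kafkatopic -n sr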

The following topics, which are defined in the applications.yaml file within the Common CNTK samples, are required for the UTIA integration. These topics are created during the deployment of the Message Bus service by using Common CNTK:

Table 4-2 Topic, Producer, and Consumer Details

Topic                 Producer            Consumer            Additional Details
ora-uim-topology      UIM                 Unified Topology    See UIM System Administrator’s Guide for more details.
ora-fault-topology    Assurance System    Unified Topology    See Unified Topology for more details.
ora-retry-topology    Unified Topology    Unified Topology    See Unified Topology for more details.
ora-dlt-topology      Unified Topology    Unified Topology    See Unified Topology for more details.

Note:

Do not use the default topics (ora-uim-topology, ora-fault-topology, ora-retry-topology, and ora-dlt-topology) for standalone testing. Use only the ora-test-topic topic to test the deployment of the Message Bus service.

Accessing Kafka Cluster

There are various listener type configurations available to access the Message Bus service internally and externally. The authentication configuration is applied across all listener types. As part of the Kafka cluster deployment, Kubernetes service objects are created to provide access to the Kafka cluster pods. These service objects are created based on the listener type configuration in the message-bus section of the applications.yaml file. You can access the Message Bus service in any of the following ways:

  • Accessing within the same cluster (Internal access)
  • Accessing from outside of the cluster (External access)

Note:

When the Message Bus service is deployed, it auto-generates the TLS certificates for the server and clients. Use custom certificates so that the certificates are retained when the service is terminated and created again. See "Using Wild Card Certificates" for more information.

Accessing the Message Bus service from within the same cluster (Internal access)

The internal listener configuration in the applications.yaml file is used when the client services are in the same Kubernetes cluster, which can be in the same namespace or a different namespace. This configuration is enabled by default.

kafka-cluster:
  listeners:
    #Plain is for internal access within the same k8s cluster.
    internal:
      # Enable the tls to true if encryption/decryption is needed for internal access
      #tls: false

See "Configuring Message Bus Listeners" for more information.

Accessing the Message Bus service from outside of the cluster (External access)

The ingress listener configuration in the applications.yaml file is used when the client services are outside of the Kubernetes cluster. This access is achieved using the ingress controller.


# To expose the kafka-cluster to external kafka clients via ingress controller uncomment the following and modify accordingly.
 
# Valid values are TRAEFIK, GENERIC
ingressController: <INGRESS_CONTROLLER>
 
#ingress:
#  #specify className field for ingressClassName of generic ingress controller.
#  #In case of nginx the default values is nginx
#  className: "nginx"
 
#provide loadbalancer port
# if TLS is enabled in global section, then loadbalancerport will be used as external port for Generic or Traefik.
loadbalancerport: <loadBalancer-port>
 
 
kafka-cluster:
  listeners:
    #To expose the kafka-cluster to external kafka clients via ingress controller (traefik or generic) uncomment the following and modify accordingly.
    #ingress:
    #  #The secure port of either ingress controller or external load-balancer. If TLS is Disabled in global, then below ingressSslPort will be used as external port.
    #  ingressSslPort: <LOADBALANCER_PORT>
    #  #If using Generic Ingress controller, below given annotations are mandatory for Message-Bus external access.
    #  #These annotations are required for nginx ingress controller.
    #  annotations:
    #    nginx.ingress.kubernetes.io/ingress.allow-http: "false"
    #    nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
    #    ingress.kubernetes.io/ssl-passthrough: "true"
    #    nginx.ingress.kubernetes.io/ssl-passthrough: "true"

See "Configuring Message Bus Listeners" for more information.

Accessing the Message Bus service using a nodeport listener

The nodeport listener configuration in the applications.yaml file is also used when the client services are outside of the Kubernetes cluster. The access is provided directly through the Kubernetes worker node's port.

Note:

Oracle does not recommend this listener for production. Use it only for debugging when an ingress controller is not deployed.

kafka-cluster:
  listeners:
    #To expose the kafka-cluster to external kafka clients without ingress controller, uncomment the following section and modify accordingly
    #nodeport:
      #default is true. can be turned off if needed  
      #tls: true
      #if you need to expose on a static nodeport, please uncomment the below section and provide values
      #nodePort: 32100

See "Configuring Message Bus Listeners" for more information.

Configuring Authentication

Kafka 2.0.0 or later supports an extensible OAuth 2.0-compatible token-based authentication mechanism called SASL OAUTHBEARER. Strimzi provides extensions that integrate with OAuth 2.0 compliant authorization servers. This means that, in principle, you can use any OAuth 2.0 compliant authorization server to enable centrally managed users for authentication with Kafka.

The Message Bus service uses the Strimzi operator to deploy Kafka brokers, which in turn use OAuth 2.0 token-based authentication when establishing a session to a Kafka broker. With this authentication, Message Bus clients (or Kafka clients) and Kafka brokers communicate with a central OAuth 2.0 compliant authorization server. The Kafka clients use the authorization server to obtain access tokens and are configured with the access tokens issued by the server. Kafka brokers communicate with the authorization server to validate the tokens presented by the clients, thus confirming their identities. Access tokens can be validated by using fast local JWT validation or by using an introspection endpoint.

To configure OAuth 2.0 support for Kafka Brokers in the Message Bus service, you need to update applications.yaml file and create or upgrade the service.

Prerequisites

  • The authorization server (OAuth 2.0 compliant) is up and running. See "Deploying OAM along with OHS for Authentication Service" in Authentication Service.
  • Configure the client for the Kafka broker in the authorization server. See the "Creating a Client" section in Authentication Service.
  • Configure the clients for the Kafka producer or consumer applications in the authorization server. See the "Creating a Client" section in Authentication Service.
  • The Kafka cluster is configured with oauth type authentication. See the following sections.

Enable Authentication on Kafka Cluster:

This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication by using an authorization server.

Note:

Oracle recommends using OAuth 2.0 over an encrypted interface through a listener with TLS enabled. Plain listeners are not recommended.

To enable authentication on the Kafka cluster:

  1. In applications.yaml, un-comment or add the following configurations:
    1. Set the authentication.enabled flag to true and update the loadbalancerhost, loadbalancerport, and ohsHostname values in the $SPEC_PATH/sr/quick/applications.yaml file.
    2. To use fast local JWT validation, set useFastLocalJWTvalidation value to true under kafka-cluster.listeners.authentication. If not set, the introspection endpoint is used for validation.
      # The enabled flag is to enable or disable authentication
      authentication:
        enabled: true
       
      #Uncomment the below host aliases section and provide hostname to ipaddresss mappings
      #This will add entries to POD's /etc/hosts file for hostname resolution when DNS and other options are not applicable.
      #For more details see https://kubernetes.io/docs/tasks/network/customize-hosts-file-for-pods/
       
      #hostAliases:
      #- ip: <ip-address>
        #hostnames:
        #- <hostname-1> # Ex. quick.sr.ohs.uim.org
       
       
      #Sample sub-section for using fast local jwt validation
      kafka-cluster:
        listeners:
          authentication:
            useFastLocalJWTvalidation: true
  2. The Message Bus service uses other configuration values from the Kubernetes Secret (<namespace>-<instance>-oauth-credentials) and ConfigMap (<namespace>-<instance>-oauth-config-cm) objects in the same namespace. These Secret and ConfigMap Kubernetes objects must be created before deploying the Message Bus service for authentication; a verification sketch is shown after this list. See "Adding Common OAuth Secret and ConfigMap" for creating the secret. The configuration values used are:
    • clientID: The client ID to identify the client.
    • clientSecret: The client secret used for authentication.
    • validIssuerUri: The URI of the token issuer used for authentication.
    • introspectionEndpointUri: The URI of the token introspection endpoint.
    • jwksEndpointUri: The endpoint with public keys of authentication server that has to be used for fast local JWT validation.
    • tlsTrustedCertificate: The trusted certificates for TLS connection to the authorization server.
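
The following sketch verifies that the OAuth Secret and ConfigMap objects exist before you deploy the Message Bus service, using the sample sr namespace and quick instance:

kubectl get secret sr-quick-oauth-credentials -n sr
kubectl get configmap sr-quick-oauth-config-cm -n sr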

The following optional values are supported for authentication. See Strimzi documentation https://strimzi.io/docs/operators/in-development/configuring.html#type-KafkaListenerAuthenticationOAuth-reference for details on each value. Add the following optional values as required, under the kafka-cluster.listeners.authentication section in applications.yaml file:

# Additional optional authentication values
kafka-cluster:
  listeners:
    authentication:
      oauthConfig:
        #Enable or disable audience checking
        checkAudience:
        #Enable or disable issuer checking.  By default issuer is checked using the value configured by validIssuerUri 
        checkIssuer:
        #The audience to use when making requests to the authorization server’s token endpoint
        clientAudience:
        #The scope to use when making requests to the authorization server’s token endpoint
        clientScope:
        #The connect timeout in seconds when connecting to authorization server
        connectTimeoutSeconds:
        #Enable or disable TLS hostname verification. Default value is false. 
        disableTlsHostnameVerification:
        #The read timeout in seconds when connecting to authorization server.
        readTimeoutSeconds:
        #URI of the User Info Endpoint to use as a fallback to obtaining the user id
        userInfoEndpointUri:
        #Name of the claim from the JWT authentication token 
        userNameClaim:

Using GC Logs

By default, GC logs are disabled. You can enable them and view the logs on stdout by using kubectl logs <kafka-cluster-pod-name>.

To enable GC logs, update the $SPEC_PATH/<project>/<instance>/applications.yaml file as follows:

  1. Under gcLogs, set enabled to true.
  2. Uncomment the gcLogs option under kafka-cluster to override the common values.
    gcLogs:
      enabled: true

Note:

You do not have to configure fileSize and noOfFiles because the logs are printed on stdout.
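
For example, once GC logging is enabled, you can inspect the GC output on the stdout of a broker pod. The following is a sketch using the sample sr namespace and quick instance:

kubectl logs sr-quick-messaging-kafka-0 -n sr | grep -i "gc"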

Alternate Configuration Options

There are various alternate options for configuring the Message Bus.

Log Level

Kafka uses Apache log4j for logging. By default, the log level is INFO. Update the level for debugging:

logging:
  kafka:
    logLevel: INFO
  zookeeper:
    logLevel: INFO

Choosing Worker Nodes for Running Message Bus Service

Update the Message Bus service configuration section in the applications.yaml file with node affinity or pod affinity and anti-affinity rules to constrain the nodes on which your pods can be scheduled. Alternatively, co-locate the pods on the same node (or on separate nodes) and run either the create or the upgrade script.

Node Affinity

Node affinity is conceptually similar to nodeSelector. It enables you to constrain the nodes on which your pod can be scheduled, based on the node labels.

There are two types of node affinities:

  • Schedule a pod using required node affinity: The scheduler cannot schedule the pod unless the rule is met.
  • Schedule a pod using preferred node affinity: The scheduler tries to find a node that meets the rule. If a matching node is not available, the scheduler still schedules the pod on another node.

Preferred node affinity

The sample configuration for enabling preferred node affinity is as follows:

kafka-cluster:
  affinity:
    nodeAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 1
        preference:
          matchExpressions:
          - key: name
            operator: In
            values:
            - south_zone

The Kubernetes pod is scheduled on a node with the label name: south_zone. If a node with the label name: south_zone is not available, the pod is still scheduled on another node.
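
For the sample rule above to select a node, the node must carry the matching label. The following is a sketch of labeling a node, where <node-name> is a placeholder for an actual worker node:

kubectl label node <node-name> name=south_zone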

Pod Affinity and Anti-Affinity

Pod affinity and anti-affinity allow you to constrain the nodes on which your pod is eligible to be scheduled, based on the labels on other pods.

Similar to node affinity, there are two types of pod affinity and anti-affinity:

  • requiredDuringSchedulingIgnoredDuringExecution
  • preferredDuringSchedulingIgnoredDuringExecution

Pod Affinity

Assign a Kubernetes pod to a node based on the labels on other pods using the Pod Affinity in a Kubernetes cluster. Modify the Kafka cluster override values yaml file.

The sample configuration for enabling the required pod affinity is as follows:

kafka-cluster:
  affinity:
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: app.kubernetes.io/name
                operator: In
                values:
                  - kafka
          topologyKey: "kubernetes.io/hostname"

The Kubernetes pod is scheduled on a node that contains a pod with the label app.kubernetes.io/name: kafka.

Modify the Kafka cluster override values yaml file. The sample configuration for enabling the preferred pod affinity is as follows:

kafka-cluster:
  affinity:
    podAffinity:        
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: app.kubernetes.io/name
                  operator: In
                  values:
                  - kafka
            topologyKey: "kubernetes.io/hostname"

The Kubernetes pod is scheduled on a node that contains a pod with the label app.kubernetes.io/name: kafka. If such a node is not available, the pod is still scheduled on another node.

Pod anti-affinity

Assign a Kubernetes pod to a node based on the labels on other pods by using pod anti-affinity in a Kubernetes cluster.

Modify the Kafka cluster override values yaml file. The sample configuration with required pod anti-affinity is as follows:

kafka-cluster:
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: app.kubernetes.io/name
                operator: In
                values:
                  - kafka
          topologyKey: "kubernetes.io/hostname"

The Kubernetes pod is scheduled on a node that does not contain a pod with the label app.kubernetes.io/name: kafka.

Modify the Kafka cluster's override values yaml file. The sample configuration with preferred pod anti-affinity is as follows:

kafka-cluster:
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: app.kubernetes.io/name
                  operator: In
                  values:
                  - kafka
            topologyKey: "kubernetes.io/hostname"

The Kubernetes pod is scheduled on a node that does not contain a pod with the label app.kubernetes.io/name: kafka. If such a node is not available, the pod is still scheduled on another node.

Managing Message Bus Metrics

Metrics in Message Bus are configured by enabling the JMX Exporter and the Kafka Exporter. The JMX Exporter can be enabled to get JVM metrics of the Kafka cluster, and the Kafka Exporter can be enabled on a Kafka cluster to extract additional Prometheus metrics data from Kafka brokers, relating to offsets, consumer groups, consumer lag, and topics.

See https://strimzi.io/docs/operators/latest/overview.html#metrics-overview_str for more information on metrics from Strimzi.

Enable metrics

Enable Kafka Exporter and JMX Exporter in the $SPEC_PATH/sr/quick/applications.yaml file and upgrade or create the Message Bus service. The sample content is as follows:

kafka-cluster:
  metrics:
    kafkaExporter: 
      enable: true
    jmxExporter:
      enable: true

The above configuration exposes the Prometheus metrics for the Kafka brokers, topics, and consumer groups on the metrics endpoint of the pods. You can view these details in the Prometheus UI by configuring the scrape job, and view them in the form of graphs by using the Grafana dashboard.

See https://github.com/danielqsj/kafka_exporter#metrics to see the exposed metrics.
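
For a quick check that the metrics are being exposed, you can port-forward to a broker pod and query the metrics endpoint. The following is a sketch using the sample sr namespace and quick instance, and it assumes port 9404, which is the port commonly used by the Strimzi Prometheus exporters:

kubectl port-forward pod/sr-quick-messaging-kafka-0 9404:9404 -n sr &
curl -s http://localhost:9404/metrics | head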

Prometheus and Grafana setup

See Setting Up Prometheus and Grafana for more information.

Adding scrape Job in Prometheus

Add the following Scrape job in Prometheus Server. This can be added by editing the config map used by the Prometheus server:

- job_name: Message_bus
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - 'sr'
  relabel_configs:
  - separator: ";"
    regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
    replacement: $1
    action: labelmap
  - source_labels: [__meta_kubernetes_namespace]
    separator: ";"
    regex: (.*)
    target_label: namespace
    replacement: $1
    action: replace
  - source_labels: [__meta_kubernetes_pod_name]
    separator: ";"
    regex: (.*)
    target_label: kubernetes_pod_name
    replacement: $1
    action: replace
  - source_labels: [__meta_kubernetes_pod_node_name]
    separator: ";"
    regex: (.*)
    target_label: node_name
    replacement: $1
    action: replace
  - source_labels: [__meta_kubernetes_pod_host_ip]
    separator: ";"
    regex: (.*)
    target_label: node_ip
    replacement: $1
    action: replace

Sample Grafana dashboards

Add the Prometheus data source and import the sample Grafana dashboards from the Strimzi GitHub repository.

The sample Grafana dashboards for the Kafka and JMX Exporters are available in the Strimzi examples at https://github.com/strimzi/strimzi-kafka-operator/tree/main/examples/metrics/grafana-dashboards.

Installing and Configuring Mirror Maker 2.0

This section describes the installation and configuration of Mirror Maker 2.0.

Configuring Source and Target Message Bus (Kafka cluster) Details

Update the $COMMON_CNTK/samples/messaging/kafka-mirror-maker/values.yaml file with the source and target Kafka cluster details as follows:

sourceCluster:
  #Source Kafka cluster
  name: sr1-quick1-messaging
  #Bootstrap server for connection to the source Kafka cluster
  bootstrapServers: sr1-quick1-messaging-kafka-bootstrap:9092
targetCluster:
  #Target Kafka cluster
  name: sr2-quick2-messaging
  #Bootstrap server for connection to the target Kafka cluster
  bootstrapServers: sr2-quick2-messaging-kafka-bootstrap:9092

In the above configuration:

  • sourceCluster.name is the Helm release name of the source Kafka cluster (sr1-quick1-messaging).
  • sourceCluster.bootstrapServers is the bootstrap server of the source Kafka cluster (sr1-quick1-messaging-kafka-bootstrap:9092).
  • targetCluster.name is the Helm release name of the target Kafka cluster (sr2-quick2-messaging).
  • targetCluster.bootstrapServers is the bootstrap server of the target Kafka cluster (sr2-quick2-messaging-kafka-bootstrap:9092).

Note:

To enable geo-replication between Kafka clusters in different namespaces, use the hostname pattern servicename.namespace.svc.cluster.local when updating $COMMON_CNTK/samples/messaging/kafka-mirror-maker/values.yaml.

For example, if the sr1-quick1-messaging-kafka-bootstrap service is hosted in the strimzi namespace on port 9092 and the client application is in another namespace, use sr1-quick1-messaging-kafka-bootstrap.strimzi.svc.cluster.local as the bootstrap-server URL.

If the target cluster is in another Kubernetes cluster, you must use an external listener to refer to the bootstrap server.

When using Nodeport, use the worker node IP of the target cluster, along with the exposed nodeport, as the target cluster bootstrap address.

When using Ingress, use the hostname of the target cluster as the target cluster bootstrap address.

Installing Mirror Maker

Run the following command to install Mirror Maker in a specific namespace:

helm install mirror-maker $COMMON_CNTK/samples/messaging/kafka-mirror-maker/ -n <namespace> --values $COMMON_CNTK/samples/messaging/kafka-mirror-maker/values.yaml

Validate that Mirror Maker is installed by running the following command:

kubectl get pods -n <namespace>
replication-mirror-maker-mirrormaker2-5c6d7dd7d7-r89cj             1/1     Running   0          67m
kubectl get svc -n <namespace>
replication-mirror-maker-mirrormaker2-api          ClusterIP   <clusterIP>   <none>        8083/TCP                              67m
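
You can also check the status of the KafkaMirrorMaker2 custom resource that the Strimzi operator manages. The following is a sketch; the resource name depends on the values used during installation:

kubectl get kafkamirrormaker2 -n <namespace>
kubectl describe kafkamirrormaker2 <mirror-maker2-resource-name> -n <namespace>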

Uninstalling Mirror Maker

Run the following command to uninstall Mirror Maker from a specific namespace:

helm uninstall mirror-maker -n <namespace>

Delete the topic mm2-offset-syncs.messaging-test.internal from the source cluster (dev1-messaging):

$kubectl -n <SourceKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mm2-offset-syncs.messaging-test.internal

Delete the topics heartbeats, mirrormaker2-cluster-status, mirrormaker2-cluster-offsets, and mirrormaker2-cluster-configs from the target cluster (dev2-messaging):

$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic heartbeats
 
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mirrormaker2-cluster-status
 
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mirrormaker2-cluster-offsets
 
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mirrormaker2-cluster-configs

Client Access

This section describes how event producer and consumer clients access the Message Bus.

Internal Access in the Same namespace for Plain

When the message producer or consumer applications are in the same namespace as the Message Bus service, they can access the Kafka cluster by using the bootstrap Kubernetes service object name and port.

Run the following command to test the standalone producer. Here the project namespace is sr and instance is quick.

$kubectl -n sr run kafka-producer-plain -ti \
--image=<STRIMZI_KAFKA_IMAGE_NAME> \
--rm=true --restart=Never \
-- bin/kafka-console-producer.sh \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic

Type a few lines of text; each ENTER sends a message to the Kafka broker. Press CTRL-C to quit.

Run the following command to test the standalone consumer. Here the project namespace is sr and instance is quick.

$kubectl -n sr run kafka-consumer-plain -ti \
--image=<STRIMZI_KAFKA_IMAGE_NAME> \
--rm=true --restart=Never \
-- bin/kafka-console-consumer.sh \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--group ora-uim-consumer-test --isolation-level read_committed \
--topic  ora-test-topic --from-beginning

You get responses after the validation is successful.

Internal Access in a Different namespace for Plain

When the message producer or consumer applications are in a different namespace from the Message Bus service, they can access the Kafka cluster by using the bootstrap service name and port, but must append the suffix <namespace>.svc.cluster.local to the service name.

See "Internal access - same namespace - plain" section on running the standalone console test producer and consumer pods for testing. Replace the bootstrap-server url with sr-quick-messaging-kafka-bootstrap.sr.svc.cluster.local, where the namespace is sr and instance is quick.

Internal Access in the Same namespace for Authentication

When the message producer or consumer applications are in the same namespace as the Message Bus service, they can access the Kafka cluster by using the bootstrap Kubernetes service object name and port.

Create a test client pod definition:

  1. Copy the following YAML content into the bastion host (or worker node) as the mb-test-client-deployment.yaml file.
  2. Update the hostAliases section according to your OAuth service environment.
  3. Update the STRIMZI_KAFKA_IMAGE_NAME.
  4. Update the OAuth endpoint, client ID, client secret, scope, audience, and any other values that are applicable to your client configuration.
apiVersion: apps/v1
kind: Deployment
metadata:
  name:  mb-test-auth-client-deployment
  labels:
    app:  mb-test-auth-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app:  mb-test-auth-client
  template:
    metadata:
      labels:
        app:  mb-test-auth-client
    spec:
#     <Uncomment below and replace with your OAuth service (OHS) hostname and load balancer IP>
      #hostAliases:
      #- ip: <LOADBALANCER_IP>
      #  hostnames:
      #  - "<OHS_HOSTNAME>"
      containers:
      - name:  mb-test-client
        image: <STRIMZI_KAFKA_IMAGE_NAME>
        command:
        - "tail"
        - "-f"
        - "/dev/null"
        imagePullPolicy: IfNotPresent
        env:
        - name: OAUTH_TOKEN_ENDPOINT_URI
          value: <Update the OAUTH_TOKEN_ENDPOINT_URI>
        - name: OAUTH_CLIENT_ID
          value: <Update the OAUTH_CLIENT_ID>
        - name: OAUTH_CLIENT_SECRET
          value: <Update the OAUTH_CLIENT_SECRET>
       # - name: OAUTH_SCOPE
       #   value: <Uncomment and update OAUTH_SCOPE>
       # - name: OAUTH_AUDIENCE
       #   value: <Uncomment and update OAUTH_AUDIENCE>
        ports:
        - containerPort: 9090
          name: http
          protocol: TCP

Create the authentication properties in a file (mb_test_client.properties).

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

Run the test client container and provide the authentication properties:

#Apply the test client pod definition in the namespace (say "sr").
$kubectl apply -f mb-test-client-deployment.yaml -n sr
 
#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-auth-client-deployment
 
#Sample Output
#mb-test-auth-client-deployment-******-****         1/1     Running   0          98s
 
#Copy the mb_test_client.properties file into the pod
$kubectl -n sr cp mb_test_client.properties mb-test-auth-client-deployment-******-****:/home/kafka/mb_test_client.properties

To test the Message Bus producer client:

  • Start an interactive shell process in the test client pod.
  • Export the environment variables needed for the authentication.
  • Run the console producer command.
  • Enter some string messages.
#Get the newly created pod name
kubectl get pod -n sr | grep mb-test-auth-client-deployment
 
#Exec into the newly created pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
#Run the following test console producer
bin/kafka-console-producer.sh \
--producer.config /home/kafka/mb_test_client.properties \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic

To test the Message Bus consumer client:

  • Start an interactive shell process in the test client pod.
  • Export the environment variables needed for the authentication.
  • Run the console consumer command.
  • You see the string messages previously sent by the producer.
#Get the newly created pod name
kubectl get pod -n sr | grep mb-test-auth-client-deployment
 
#Sample Output
#mb-test-auth-client-deployment-******-****         1/1     Running   0          98s
 
#Exec into the newly created pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
#Run the following test console consumer
bin/kafka-console-consumer.sh \
--consumer.config /home/kafka/mb_test_client.properties \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic \
--from-beginning

External ingress access - SSL and Authentication

External access to the Message Bus is provided through an ingress controller (Traefik or Generic) with TLS enabled. Perform the following in the clients for testing:

  • Export the Message Bus service certificate (that is, sr-quick-messaging-cluster-ca-cert, where sr is the namespace and quick is the instance) and import it into the clients.
  • Export the certificate of the OAuth service and import it into the clients.

    Note:

    This is optional and is required only if OAuth is enabled for SSL.
  • Update the bootstrap and broker DNS names with the load balancer IP in the /etc/hosts file of the clients (that is, event producer or consumer applications).
  • Update the DNS name of the OAuth service with the load balancer IP in the /etc/hosts file of the clients.

    Note:

    This is optional and is required only if the OAuth service requires DNS name to access.
  • Run the producer or consumer script with the SSL and authentication details.

In the following section, the external ingress access test is provided with the Strimzi Kafka container. If you want to test the client code without a Kubernetes cluster, you can download Apache Kafka and perform the same steps.

Add the Message Bus service and OAuth service certificates to the trust store. See the "Import/Export of TLS Certificates" section.

#Run the below command to export and import the Message Bus service certificate into the trust store (mb-test-client-cert-keystore.jks) file.
$COMMON_CNTK/scripts/export-cluster-cert.sh -p sr -i quick -l . -k ./mb-test-client-cert-keystore.jks -a mb-cert
 
#Get the OAuth (OAM) service certificate and import into trust store (mb-test-client-cert-keystore.jks) file (Optional, needed if OAuth is SSL)
keytool -importcert -alias oauth-server -file <Path to OAuth Server certificate, the .pem file> -keystore ./mb-test-client-cert-keystore.jks -trustcacerts -noprompt
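
You can confirm that both certificates were imported into the trust store by listing its entries. The following is a sketch; you are prompted for the store password:

keytool -list -keystore ./mb-test-client-cert-keystore.jks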

Create the following authentication properties in a file (mb_test_client.properties).

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
ssl.endpoint.identification.algorithm=

Create a test client pod definition.

  1. Copy the following YAML content into the bastion host (or worker node) as "mb-test-client-deployment.yaml" file.
  2. Update the Strimzi Kafka image.
  3. Update the hostAliases section according to your OAuth and Message Bus service setup. This will add entries to /etc/hosts file.
  4. Update the OAuth endpoint, client ID, client secret, and trust store password in the env section.

Note:

You can override the value of subDomainNameSeparator. The default is ".". You can change this value to "-" to match the wild-card pattern of SSL certificates.

To override, uncomment and change this value in applications.yaml. See "Using Wild Card Certificates" for more information.

apiVersion: apps/v1
kind: Deployment
metadata:
  name:  mb-test-client-deployment
  labels:
    app: mb-test-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app:  mb-test-client
  template:
    metadata:
      labels:
        app:  mb-test-client
    spec:
#     <Uncomment below and replace with your bootstrap and brokers DNS names> 
#     hostAliases:
#     - ip: <Replace with your LOADBALANCER_IP>
#       hostnames:
#       - "<INSTANCE.PROJECT.messaging.broker0.uim.org>"
#       - "<INSTANCE.PROJECT.messaging.brokerN.uim.org>"
#       - "<INSTANCE.PROJECT.messaging.bootstrap.uim.org>"
#       - "<Replace with OHS_HOSTNAME>"
      containers:
      - name:  mb-test-client
        image: quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
        command:
        - "tail"
        - "-f"
        - "/dev/null"
        imagePullPolicy: IfNotPresent
        env:
        - name: OAUTH_TOKEN_ENDPOINT_URI
          value: <Replace with your OAUTH_TOKEN_ENDPOINT_URI>
        - name: OAUTH_CLIENT_ID
          value: <Replace with your OAUTH_CLIENT_ID>
        - name: OAUTH_CLIENT_SECRET
          value: <Replace with your OAUTH_CLIENT_SECRET>
        #- name: OAUTH_SCOPE
        #  value: <Uncomment and replace with your OAUTH_SCOPE>
        #- name: OAUTH_AUDIENCE
        #  value: <Uncomment and replace with your OAUTH_AUDIENCE>
        - name: KAFKA_OPTS
          value: " \
                  -Djavax.net.ssl.trustStore=/home/kafka/mb-test-client-cert-keystore.jks \
                  -Djavax.net.ssl.trustStorePassword=<Replace with your store password> \
                  -Djavax.net.ssl.trustStoreType=JKS"
        ports:
        - containerPort: 9090
          name: http
          protocol: TCP

Run the test client container and prepare it for authentication and SSL.

#Apply the test client pod definition in the namespace (say "sr").
$kubectl apply -f mb-test-client-deployment.yaml -n sr
 
#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-client-deployment
 
#Sample Output
#mb-test-client-deployment-******-****         1/1     Running   0          98s
 
#Copy the certificate store into the newly created pod.  Replace the pod name below
kubectl -n sr cp mb-test-client-cert-keystore.jks <Replace with mb-test-client-deployment pod name>:/home/kafka/mb-test-client-cert-keystore.jks  
 
#Copy the mb_test_client.properties file into the POD
kubectl -n sr cp mb_test_client.properties <Replace with mb-test-client-deployment pod name>:/home/kafka/mb_test_client.properties

Start a shell session inside the container for the console test producer.

#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-client-deployment
 
#Sample Output
#mb-test-client-deployment-******-****         1/1     Running   0          98s
 
#Exec into the newly created pod
kubectl exec -it <Replace with mb-test-client-deployment pod name> -n sr -- bash
 
#Run the following producer command
bin/kafka-console-producer.sh \
--producer.config /home/kafka/mb_test_client.properties \
--bootstrap-server quick.sr.messaging.bootstrap.uim.org:30443 \
--topic ora-test-topic 

Start a shell session inside the container for the console test consumer:

#Exec into the newly created pod
kubectl exec -it <Replace with mb-test-client-deployment pod name> -n sr -- bash
 
#Run the following consumer command. Replace the bootstrap-server URL according to your environment
bin/kafka-console-consumer.sh \
--consumer.config /home/kafka/mb_test_client.properties \
--bootstrap-server quick.sr.messaging.bootstrap.uim.org:30443 \
--consumer-property group.id=test-client-service \
--topic ora-test-topic --from-beginning

Clean up the newly created test pod:

kubectl delete -f mb-test-client-deployment.yaml -n sr

External node port access

The nodeport listener type allows external access from outside the Kubernetes cluster using the load balancer or Kubernetes worker node IP address and a nodePort (a port on the worker node).

The bootstrap URL is constructed from the worker node IP address and the node port of the bootstrap service.

Get the node port of the external bootstrap service using the following command:

$kubectl get service sr-quick-messaging-kafka-nodeport-bootstrap -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}' -n sr
 
Output: 32100

Get the IP Address of the Kubernetes worker node. Replace the <NODE_NAME> in the following with your node name:

$kubectl get node <NODE_NAME> -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}'
 
Output:
InternalIP  100.xx.xx.142
Hostname    *********

Update the Kafka cluster bootstrap URL to 100.xx.xx.142:32100 in the event producer and consumer applications.

To access with plain (unsecured) settings, see the "Internal access - same namespace - plain" section. Replace the bootstrap URL with the one constructed above.

To access with authentication, see the "Internal access - same namespace - authentication" section. Replace the bootstrap URL with the one constructed above.

To access with SSL and authentication, see the "External ingress access - SSL and Authentication" section. Replace the bootstrap URL with the one constructed above.
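
For example, a console producer run from the Strimzi Kafka container could target the node port directly. This is a sketch that assumes the IP address and node port obtained above and a plain listener; include the producer properties described in the sections referenced above if authentication or SSL is enabled:

#Sketch: console producer against the worker node IP and node port (replace with your values)
bin/kafka-console-producer.sh \
--bootstrap-server 100.xx.xx.142:32100 \
--topic ora-test-topic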

Import/export of TLS certificates

To enable TLS-encrypted access, the ca-certs of the Kafka cluster must be extracted and imported into a key store, and the location of that key store is then used in the producer or consumer properties of the events application.

Export the ca-certs of the Kafka cluster using the following command:

$COMMON_CNTK/scripts/export-cluster-cert.sh -p <Namespace of kafka cluster> \
-i <instance name of kafka cluster> \
-l <directory to export clustercerts temporarily> \
-k <keystore-location> \
-a <alias for cert>

For example:

$COMMON_CNTK/scripts/export-cluster-cert.sh -p sr -i quick -l . -k ./mb-cert-keystore.jks -a mb-sr-quick-cert

The export-cluster-cert.sh script creates a JKS-type truststore by default in the provided key store location. If any other truststore type is created, specify it as a producer or consumer property while running the clients. These exported artifacts can be used in Kafka client applications.
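
For example, a client that uses only SSL (no authentication) could reference the exported truststore with properties similar to the following. This is a sketch; the file name mb_ssl_client.properties is illustrative, and the path and password must match the truststore created by export-cluster-cert.sh:

#Sketch: create minimal SSL-only client properties that reference the exported truststore
cat > mb_ssl_client.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=./mb-cert-keystore.jks
ssl.truststore.password=<Replace with your truststore password>
ssl.truststore.type=JKS
EOF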

Note:

If custom certificates were used during cluster creation, they can be provided directly through a keystore rather than extracting the generated certificates.

Using custom certificates

Custom certificates can be used while creating the Kafka cluster:

Prerequisites:

  • Certificates and keys must be in PEM format.
  • The key must not be encrypted. Encrypted keys are not supported because they require user interaction to enter the passphrase during access.

Creating a custom certificate

To create a custom certificate, see Self-signed SSL Certificates.

Create Kubernetes secret

Run the following command by replacing the placeholders:

kubectl create secret generic <secret-name> --from-file=<key-file-name> --from-file=<certificate-file-name>
 
#For example (secret names must be lowercase and DNS-compliant):
kubectl create secret generic my-custom-cert-secret --from-file=commonkey.pem --from-file=commoncert.pem
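
A quick way to confirm that the secret contains both the key and the certificate is to describe it. This is a sketch; replace the secret name and namespace with your own:

#Sketch: verify that the secret exists and lists both files as data keys
kubectl describe secret my-custom-cert-secret -n sr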

Update Kafka Cluster configuration

Update the customCerts configuration section in Kafka cluster's override values yaml file:

kafka-cluster:
    ## To enable custom or owned certs for TLS, create a Kubernetes secret with the cert and key if not already present, uncomment the section below, and add the respective values.
    ## Note that encrypted keys are not supported because they require user interaction for the passphrase.
    customCerts:
      # Secret in which the cert and key are present
      secretName: <secret-name created above>
      certName: <certificate file used in the secret created above>
      keyName: <key-file used in the secret created above>

Configuring Message Bus Listeners

Message Bus has three listeners (internal, ingress, and nodeport) for accessing the service. These are described in the following sections.

Message Bus Internal Listener

The following is the configuration for the internal listener type, which can be commented out or uncommented as needed.

kafka-cluster:
  listeners:
    # plain is for internal access within the same Kubernetes cluster.
    internal:

From same namespace in cluster

This is an internal access method used by the message producer or consumer clients (or applications) when they are deployed in the same namespace as the Message Bus service. This is enabled by default with the internal listener type. To access the Message Bus, the producer or consumer applications must get the Bootstrap service URL of the Kafka cluster.

To get the Bootstrap service URL of the Kafka cluster run the following command:

kubectl get svc -n sr | grep sr-quick-messaging-kafka-bootstrap
 
sr-quick-messaging-kafka-bootstrap       ClusterIP   <clusterIP>    <none>        9091/TCP,9092/TCP

Note:

The project namespace is sr and the instance is quick.

Use the sr-quick-messaging-kafka-bootstrap:9092 URL in the producer and consumer client configuration in the applications.

From another namespace in cluster

This is an internal access method used by the producer or consumer client applications when they are deployed in a different namespace than the Message Bus service. This is enabled by default with the internal listener type. To access the Message Bus, the producer or consumer client applications must get the Bootstrap service URL of the Kafka cluster and convert it to the pattern serviceName.namespace.svc.cluster.local.

For example, if the sr-quick-messaging-kafka-bootstrap service is hosted in the sr namespace on port 9092, client applications in a different namespace can access the Kafka cluster with the bootstrap URL sr-quick-messaging-kafka-bootstrap.sr.svc.cluster.local:9092, as shown in the sketch below.
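
As a quick connectivity check, a client pod in another namespace could list the topics against the fully qualified bootstrap URL. This is a sketch that assumes the Kafka command-line tools are available in the client pod and that the internal listener does not require authentication; otherwise, pass the client properties described in "Message Bus Client Access":

#Sketch: list topics from a pod in a different namespace using the fully qualified bootstrap URL
bin/kafka-topics.sh \
--bootstrap-server sr-quick-messaging-kafka-bootstrap.sr.svc.cluster.local:9092 \
--list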

Message Bus Ingress Listener

This is an external access method used by message producer or consumer applications when they are deployed outside of the Kubernetes cluster. This is disabled by default and must be enabled in applications.yaml. This external access to the Kafka cluster is provided through the Traefik Ingress Controller or a Generic Ingress Controller. To enable this external access, the ingress listener type configuration must be enabled in the Kafka cluster configuration YAML file.

Ingress listener type

Uncomment the ingress listener type section in the applications.yaml file to expose the Message Bus service outside of the Kubernetes cluster. An ingress controller (Traefik or Generic) must be deployed for this ingress listener type to work, and the Message Bus namespace must be registered with the Traefik operator. In case of a Generic ingress controller, set ingress.className according to your controller.

In case of a Generic ingress controller (for example, nginx), the annotations given under the kafka-cluster.listeners.ingress.annotations tag in applications.yaml are mandatory.

# To expose the kafka-cluster to external kafka clients via ingress controller, uncomment the following and modify accordingly.
# Valid values are TRAEFIK, GENERIC
ingressController: "TRAEFIK"
 
#ingress:
#  #specify className field for ingressClassName of generic ingress controller.
#  #In case of nginx the default value is nginx
#  className: "nginx"
 
#provide loadbalancer port
# if TLS is enabled in global section, then loadbalancerport will be used as external port for Generic or Traefik
loadbalancerport: <loadBalancer-port>
 
kafka-cluster:
  listeners:
    ingress:
      # if TLS is Disabled in global, then ingressSslPort will be used as external port.
      ingressSslPort: <LoadBalancer_SSL_Port>
      # If using Generic Ingress controller, below given annotations are mandatory for Message-Bus external access.
      # These annotations are required for nginx ingress controller in Message-Bus.
      annotations: 
        nginx.ingress.kubernetes.io/ingress.allow-http: "false"
        nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
        ingress.kubernetes.io/ssl-passthrough: "true"
        nginx.ingress.kubernetes.io/ssl-passthrough: "true"
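
After enabling the ingress listener and applying the configuration, you can check that the routing objects were created for the bootstrap and broker endpoints. This is a sketch; the object type depends on your controller, and sr is the example namespace:

#Traefik: list the IngressRouteTCP objects created for the bootstrap and brokers
kubectl get ingressroutetcp -n sr
 
#Generic ingress controller (for example, nginx): list the Ingress objects
kubectl get ingress -n sr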

In the external producer or consumer messaging clients (or applications), the following must be done to access the Kafka cluster through the ingress controller:

  • The Bootstrap server and advertised broker host names must be configured in DNS at client side.
  • Import the TLS certificate and trust stores from the Kafka cluster into client configurations.
  • Add required additional properties in Kafka producer or consumer client configuration.

DNS settings in client applications host

The bootstrap server host name and the advertised broker host names must be configured in the /etc/hosts file of the producer and consumer client applications with the Traefik or load balancer IP address. Host names are preconfigured with the following pattern when the service is deployed with the ingress listener type enabled:

bootstrap-server:  <kafka-cluster-instance-name>.<kafka-cluster-project-name>.messaging.bootstrap.uim.org 
broker-0:          <kafka-cluster-instance-name>.<kafka-cluster-project-name>.messaging.broker0.uim.org 
broker-1:          <kafka-cluster-instance-name>.<kafka-cluster-project-name>.messaging.broker1.uim.org
 
For example, if the instance is quick and the namespace is sr, the hostnames are as follows:
bootstrap-server:  quick.sr.messaging.bootstrap.uim.org 
broker-0:          quick.sr.messaging.broker0.uim.org
broker-1:          quick.sr.messaging.broker1.uim.org

Note:

You can override the value of subDomainNameSeparator. The default value is ".". This value can be changed to "-" to match the wildcard pattern of SSL certificates.

To override the value, uncomment and change it in applications.yaml as follows:

#subDomainNameSeparator: "."

#Example hostnames for "-" : quick-sr-messaging-bootstrap.uim.org
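
For example, the /etc/hosts entries on a client host could look like the following. This is a sketch; 192.0.2.10 stands in for your Traefik or load balancer IP address, and the host names assume the default "." separator with instance quick and namespace sr:

#Sketch: append host entries on the client host (requires root; replace 192.0.2.10 with your load balancer IP)
cat >> /etc/hosts <<'EOF'
192.0.2.10  quick.sr.messaging.bootstrap.uim.org
192.0.2.10  quick.sr.messaging.broker0.uim.org
192.0.2.10  quick.sr.messaging.broker1.uim.org
EOF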

Importing certificates into client applications

See the "Import/export of TLS certificates" section in “Client Access” section for exporting the ca-certs of Kafka cluster to producer or consumer applications.

Message Bus NodePort Listener

This is another external access method used by event producer or consumer client applications when they are deployed outside of the Kubernetes cluster and want to access the Message Bus service without an ingress controller.

Node port

The following configuration in the applications.yaml file exposes the nodeport listener type to access the Message Bus externally with TLS and OAuth 2.0 authentication.

kafka-cluster:
  listeners:
    #To expose the kafka-cluster to external kafka clients without ingress controller, uncomment the following section and modify accordingly.
    nodeport:
      tls: true
      # If you need to expose on a static nodePort, uncomment the nodePort key below and provide a value.
      nodePort: 32100
      authentication: true

When TLS is enabled, the certificates of the Kafka cluster must be imported into the event producer and consumer clients to access the Kafka cluster.

See the "Import/export of TLS certificates" section in “Client Access” section for exporting the auto-generated ca-certs of Kafka cluster.

Debugging and Troubleshooting

NotEnoughReplicasException

If you get org.apache.kafka.common.errors.NotEnoughReplicasException (messages are rejected because there are fewer in-sync replicas than required), the likely reason is that the topic's replicas do not meet the default minInsyncReplicas value configured in the Message Bus service. You can check the topic configuration as shown in the sketch below.
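
For example, to confirm the replication factor and min.insync.replicas of the affected topic, you can describe it from the test client pod. This is a sketch that reuses the client properties and bootstrap placeholder from "Message Bus Client Access":

#Sketch: check the replication factor and min.insync.replicas of the affected topic
bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--topic <Your Topic Name> \
--describe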

Asynchronous auto-commit of offsets failed

If you get the following error in the logs (for example, in the UTIA Consumer), make sure that max.polling.interval.ms is always greater than the time taken to process the records from the last poll, or reduce max.poll.records.

[Consumer clientId=consumer-ora-uim-topology-service-2, groupId=ora-uim-topology-service] Asynchronous auto-commit of offsets failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.. Will continue to join group.

Add these additional properties under the mp.messaging.connector.helidon-kafka section in the override values YAML file.

mp.messaging:
  connector:
    helidon-kafka:
      # The following are default global configuration values which affect all consumer groups.
      max.polling.interval.ms: 300000
      max.poll.records: 500
 
  # The following are channel specific configuration values
  incoming:
    # The toInventoryChannel affects only the ora-uim-topology-service consumer group
    # uncomment and update the specific values
    #toInventoryChannel:
      #max.polling.interval.ms: 300000
      #max.poll.records: 500
 
    # The toRetryChannel affects only the ora-uim-topology-retry-service consumer group
    # Uncomment and update the specific values
    #toRetryChannel:
      #max.polling.interval.ms: 300000
      #max.poll.records: 200
  
    # The toDltChannel affects only the ora-uim-topology-dlt-service consumer group
    # uncomment and update the specific values
    #toDltChannel:
      #max.polling.interval.ms: 300000
      #max.poll.records: 100

Performance Tuning: Consumer Configurations

The following are some performance-related configuration properties for message consumers. See https://kafka.apache.org/documentation/#consumerconfigs for all available consumer configuration properties. A sketch of these settings in a properties file follows the list.

  • max.poll.records (default=500) defines the maximum number of messages that a consumer can poll at once.
  • max.partition.fetch.bytes (default=1048576) defines the maximum number of bytes that the server returns in a poll for a single partition.
  • max.poll.interval.ms (default=300000) defines the maximum time a consumer has to process all messages from a poll before fetching the next poll. If this interval is exceeded, the consumer leaves the consumer group.
  • heartbeat.interval.ms (default=3000) defines the frequency with which a consumer sends heartbeats.
  • session.timeout.ms (default=10000) defines the time within which a consumer must send a heartbeat. If no heartbeat is received within that timeout, the member is considered dead and leaves the group.
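
The following is a minimal sketch of these settings as Kafka consumer properties. The file name consumer.properties is illustrative, and the values shown are the defaults listed above; tune them for your workload:

#Sketch: example consumer tuning properties (values shown are the Kafka defaults)
cat > consumer.properties <<'EOF'
max.poll.records=500
max.partition.fetch.bytes=1048576
max.poll.interval.ms=300000
heartbeat.interval.ms=3000
session.timeout.ms=10000
EOF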

Managing Consumer Groups

For a complete list of options available for consumer groups, see the Apache Kafka documentation on managing consumer groups. The following subsections list some significant operations. See "Message Bus Client Access" for more information.

List consumer groups

#Exec into running message bus test client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash  
 
#Run the below command to list all the consumer groups
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--list

Describe consumer group

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash 
 
#Run the below command to describe specific consumer group to check topics, partitions, offsets
#Replace the command-config, bootstrap, group values accordingly
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--group test-client-service \
--describe

Reset offset of a consumer group

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash  
 
#Run the below command to reset offset for consumer group for topic to latest.  See Apache Kafka documentation for other available options.
#Replace the command-config, bootstrap, group and topic values accordingly
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--group test-client-service \
--reset-offsets \
--topic ora-test-topic \
--to-latest \
--execute
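
Before executing a reset, you can preview the resulting offsets by replacing --execute with --dry-run. A sketch, using the same placeholders as the command above:

#Preview the offset reset without applying it
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--group test-client-service \
--reset-offsets \
--topic ora-test-topic \
--to-latest \
--dry-run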

Topics

For a more detailed list of operations available on topics, see "Apache Kafka Operations". The following subsections list some significant operations. See "Message Bus Client Access" for more information.

Create

Create a topic with three partitions and a replication factor of two.

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
 
#Run the below command to create a topic
bin/kafka-topics.sh \
 --command-config /home/kafka/mb_test_client.properties \
 --bootstrap-server <Your Bootstrap Server URL> \
 --create \
 --topic replicated-2 \
 --replication-factor 2 \
 --partitions 3

List

To list all topics:

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
 
#Run the below command to list all the topics
bin/kafka-topics.sh \
 --command-config /home/kafka/mb_test_client.properties \
 --bootstrap-server <Your Bootstrap Server URL> \
 --list

Describe

Describe the topic to see its partition count, replication factor, and the leader for each partition.

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
#Run the below command to describe the topic
bin/kafka-topics.sh \
 --command-config /home/kafka/mb_test_client.properties \
 --bootstrap-server <Your Bootstrap Server URL> \
 --topic replicated-2 \
 --describe
 
#Sample output
Topic: replicated-2      TopicId: vyalpPOmR0CtYt7Sc-gbxA           PartitionCount: 3               ReplicationFactor: 2          Configs: min.insync.replicas=1,message.format.version=3.0-IV1
 
Topic: replicated-2     Partition: 0     Leader: 1      Replicas: 1,0     Isr: 1,0
Topic: replicated-2     Partition: 1     Leader: 0      Replicas: 0,1     Isr: 0,1
Topic: replicated-2     Partition: 2     Leader: 1      Replicas: 1,0     Isr: 1,0

Alter

You can alter a topic to increase its number of partitions.

#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
#Run the below command to alter the topic (the new partition count must be greater than the current count)
bin/kafka-topics.sh \
 --command-config /home/kafka/mb_test_client.properties \
 --bootstrap-server <Your Bootstrap Server URL> \
 --alter \
 --topic <Your Topic Name> \
 --partitions <New_Partition_Count>

Reassignment

The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. In the following example the partitions for topic (replicated-2) are reassigned to different brokers.

See the "Message Bus Client Access"section for more information on running the message bus test pod with required configuration such as Authentication and SSL.

Create a file called custom-reassignment.json in a terminal:

{"version":"1", "partitions":[{"topic":"replicated-2","partition":"0","replicas":"[0,1]"},{"topic":"replicated-2","partition":1,"replicas":"[1,2]"},{"topic":"replicated-2","partition":"2","replicas":"[0,2]"}]}

Run the following commands for reassignment:

#Copy the custom-reassignment.json file into the newly created pod under /home/kafka directory
$kubectl cp custom-reassignment.json mb-test-auth-client-deployment-*****-****:/home/kafka/custom-reassignment.json -n sr
 
 
#Exec into running test pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
 
#Change directory to /home/kafka
cd /home/kafka
 
#Validate the topic ("replicated-2")
/opt/kafka/bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--topic replicated-2 --describe
Topic: replicated-2     TopicId: vyalpPOmR0CtYt7Sc-gbxA PartitionCount: 3       ReplicationFactor: 2    Configs: min.insync.replicas=1,message.format.version=3.0-IV1
        Topic: replicated-2     Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: replicated-2     Partition: 1    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: replicated-2     Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0
 
#Run reassign-partitions script to reassign the partitions according to the json file
$/opt/kafka/bin/kafka-reassign-partitions.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--reassignment-json-file custom-reassignment.json \
--execute
 
Current partition replica assignment
 
{"version":1,"partitions":[{"topic":"replicated-2","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"replicated-2","partition":1,"replicas":[0,1],"log_dirs":["any","any"]},{"topic":"replicated-2","partition":2,"replicas":[1,0],"log_dirs":["any","any"]}]}
 
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for replicated-2-0,replicated-2-1,replicated-2-2
 
#Verify the reassignment status
$/opt/kafka/bin/kafka-reassign-partitions.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--reassignment-json-file custom-reassignment.json \
--verify
 
Status of partition reassignment:
Reassignment of partition replicated-2-0 is complete.
Reassignment of partition replicated-2-1 is complete.
Reassignment of partition replicated-2-2 is complete.
 
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic replicated-2
 
# Validate the partition assignments
$/opt/kafka/bin/kafka-topics.sh \
  --command-config /home/kafka/mb_test_client.properties \
  --bootstrap-server <Your Bootstrap Server URL> \
  --topic replicated-2 --describe
Topic: replicated-2     TopicId: vyalpPOmR0CtYt7Sc-gbxA PartitionCount: 3       ReplicationFactor: 2    Configs: min.insync.replicas=1,message.format.version=3.0-IV1
        Topic: replicated-2     Partition: 0    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: replicated-2     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: replicated-2     Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
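
To roll back a reassignment, save the "Current partition replica assignment" JSON that the --execute run prints to a file and run the tool again with that file. A sketch, assuming the JSON was saved as rollback.json under /home/kafka:

#Sketch: roll back to the original assignment using the saved JSON
$/opt/kafka/bin/kafka-reassign-partitions.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--reassignment-json-file /home/kafka/rollback.json \
--execute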