11 Kafka Cluster Management Procedures

The following sections describe the procedures to manage the Kafka cluster.

11.1 Creating Topics for OCNADD

Create the topics (MAIN, SCP, SEPP, and NRF) using the admin service before starting data ingestion. For more details on topics and partitions, see the "Kafka PVC Storage Requirements" section of the Oracle Communications Network Analytics Data Director Benchmarking Guide.

To create a topic, connect to any worker node and send a POST curl request to the API endpoint described below.

API Endpoint: <ClusterIP:Admin Port>/ocnadd-admin-svc/v1/topic
{
    "topicName":"<topicname>",
    "partitions":"3",
    "replicationFactor":"2",
    "retentionMs":"120000"
}
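As an illustration, the request can be sent with curl. The cluster IP, admin port, and topic name below are example placeholders (assumptions), not fixed OCNADD values; the actual curl call is left commented out because it requires network access to the admin service:

```shell
# Example placeholders (assumptions): substitute your deployment's values.
CLUSTER_IP="10.0.0.10"
ADMIN_PORT="9181"
ENDPOINT="http://${CLUSTER_IP}:${ADMIN_PORT}/ocnadd-admin-svc/v1/topic"

# Request body with the fields shown above.
BODY='{"topicName":"SCP","partitions":"3","replicationFactor":"2","retentionMs":"120000"}'

# Send the request from a node that can reach the admin service:
# curl -X POST "${ENDPOINT}" -H "Content-Type: application/json" -d "${BODY}"
echo "POST ${ENDPOINT}"
echo "${BODY}"
```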

Note:

  • If worker node access is not available, the adminservice Service-Type can be changed to LoadBalancer or NodePort in the admin service values.yaml (a helm upgrade is required for any such change).
  • For the LoadBalancer service, ensure that the admin port is not blocked in the cluster.

11.2 Adding Partitions to an Existing Topic

The topics in OCNADD must be created with the necessary partitions for the supported MPS during deployment. If the traffic load increases beyond the supported MPS, it may be necessary to increase the number of partitions in the existing topics. This section describes the procedure to increase the number of partitions in the corresponding topic.

Caution:

The number of partitions cannot be decreased using this procedure. If partition reduction is required, the topic must be recreated with the desired number of partitions; in that case, complete data loss for the concerned topic is expected.

To add partitions to an existing topic:
  1. Log in to the bastion host.
  2. Enter the Kafka broker POD using the following command:
    "kubectl exec -it kafka-broker-0 -n <namespace> --  bash"

    Example:

    kubectl exec -it kafka-broker-0 -n ocnadd-deploy -- bash
  3. Inside the POD change directory to "/ocnadd/kafka/bin"
  4. Run the following command:
    "unset JMX_PORT"
  5. Describe the corresponding topic using the following command:
    ./kafka-topics.sh --topic <topic_name> --describe --bootstrap-server kafka-broker-0:9092

    Example:

    ./kafka-topics.sh --topic SCP --describe --bootstrap-server kafka-broker-0:9092

    The command lists the topic details, such as the number of partitions, replication factor, retentionMs, and so on.

  6. Add partitions in the topic using the following command:
    ./kafka-topics.sh --topic <topic_name> --alter --bootstrap-server kafka-broker-0:9092 --partitions <partition_count>
  7. Verify that the partitions have been added to the topic using the following command:
    ./kafka-topics.sh --topic <topic_name> --describe --bootstrap-server kafka-broker-0:9092 

    Example:

    ./kafka-topics.sh --topic SCP --describe --bootstrap-server kafka-broker-0:9092
  8. Exit from the Kafka Broker POD (container).
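The in-pod commands from steps 3 to 7 can be collected into a small helper script. This is only a sketch: the topic name, target partition count, and script file name below are example values (assumptions):

```shell
# Example values (assumptions) -- adjust to your topic and target load.
TOPIC="SCP"
PARTITIONS=6          # must be greater than the topic's current partition count
BROKER="kafka-broker-0:9092"

# Generate the script that will be run inside the Kafka broker pod.
cat > add_partitions.sh <<EOF
cd /ocnadd/kafka/bin
unset JMX_PORT
./kafka-topics.sh --topic ${TOPIC} --describe --bootstrap-server ${BROKER}
./kafka-topics.sh --topic ${TOPIC} --alter --bootstrap-server ${BROKER} --partitions ${PARTITIONS}
./kafka-topics.sh --topic ${TOPIC} --describe --bootstrap-server ${BROKER}
EOF

# Copy it into the pod and run it there, for example:
# kubectl cp add_partitions.sh <namespace>/kafka-broker-0:/tmp/add_partitions.sh
# kubectl exec -it kafka-broker-0 -n <namespace> -- bash /tmp/add_partitions.sh
cat add_partitions.sh
```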

11.3 Enabling Kafka Log Retention Policy

In Kafka, the log retention strategy determines how long the data is kept in the broker's logs before it is purged to free up storage space.

By default, these parameters are not available in the OCNADD helm chart. They are customizable and can be added in 'ocnadd/charts/ocnaddkafka/templates/scripts-config.yaml' using a helm upgrade. A Kafka broker restart is expected.

There are two main approaches to log retention in Kafka:
  • Time-based retention:

    Once the logs reach the specified age, they are considered eligible for deletion, and the broker will start a background task to remove the log segments that are older than the retention time. The time-based retention policy applies to all logs in a topic, including both the active logs that are being written and the inactive logs that have already been compacted.

    The retention time is usually set using the "log.retention.hours" or "log.retention.minutes" configuration.

    Parameters used:

    log.retention.minutes=5

    The default value for "log.retention.minutes" is set to 5 minutes.

  • Size-based retention:

    Once the logs reach the specified size threshold, the broker will start a background task to remove the oldest log segments to ensure that the total log size remains below the specified limit. The size-based retention policy applies to all logs in a topic, including both the active logs that are being written and the inactive logs that have already been compacted.

    Parameters for size-based retention:
    #The maximum size of a single log file
    log.segment.bytes=1073741824
     
    #The maximum size of the log before deleting it
    log.retention.bytes=32212254720
     
    #Enable the log cleanup process to run on the server
    log.cleaner.enable=true
     
    #The default cleanup policy: discards old segments when their retention time or size limit has been reached
    log.cleanup.policy=delete
     
    #The interval at which log segments are checked to see if they can be deleted
    log.retention.check.interval.ms=1000
     
    #The amount of time to sleep when there are no logs to clean
    log.cleaner.backoff.ms=1000
     
    #The number of background threads to use for log cleaning
    log.cleaner.threads=5
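As a hedged aside, standard Kafka tooling also allows a per-topic retention override with kafka-configs.sh, which can be useful when a single topic needs a different retention than the broker-wide parameters above. This is not part of the OCNADD helm flow described in this section; the topic name and retention value are example assumptions, and the command is only composed and printed here:

```shell
# Example values (assumptions).
TOPIC="SCP"
RETENTION_MS=300000   # 5 minutes, matching log.retention.minutes=5 above

# Compose the per-topic override; run it inside the broker pod from
# /ocnadd/kafka/bin after 'unset JMX_PORT'.
CMD="./kafka-configs.sh --bootstrap-server kafka-broker-0:9092 \
--entity-type topics --entity-name ${TOPIC} --alter \
--add-config retention.ms=${RETENTION_MS}"
echo "${CMD}"
```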

Calculate "log.retention.bytes":

The log retention size can be calculated as follows:

Example: For an 80% threshold of the PVC claim size, "log.retention.bytes" is calculated as: (pvc (in bytes) / TotalPartition) * (threshold / 100)

where TotalPartition is the sum of the partitions of all the topics.

If any topic has a replication factor of 2, then the number of partitions counted for that topic is twice its configured partition count. For more information about the number of partitions per topic, see the "Kafka PVC Storage Requirements" section of the Oracle Communications Network Analytics Data Director Benchmarking Guide.
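The formula can be checked with shell arithmetic. The PVC size (300 GiB) and total partition count (8) below are example assumptions; with the 80% threshold they reproduce the "log.retention.bytes" value shown in the size-based parameters above:

```shell
# Example values (assumptions): 300 GiB PVC, 8 total partitions across all
# topics (replicated partitions counted twice), 80% threshold.
PVC_BYTES=$((300 * 1024 * 1024 * 1024))   # 322122547200 bytes
TOTAL_PARTITIONS=8
THRESHOLD=80

# (pvc (in bytes) / TotalPartition) * threshold / 100
RETENTION_BYTES=$(( PVC_BYTES / TOTAL_PARTITIONS * THRESHOLD / 100 ))
echo "log.retention.bytes=${RETENTION_BYTES}"   # prints log.retention.bytes=32212254720
```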

Note:

  • It is important to choose time-based and size-based retention policies that balance the need to retain data for downstream processing against the need to free up disk space. If the retention size or retention time is set too high, large amounts of disk space may be consumed; if it is set too low, important data may be lost.
  • Kafka allows a combination of time-based and size-based retention by setting both "log.retention.hours" or "log.retention.minutes" and "log.retention.bytes". Whichever limit is reached first triggers log deletion.
  • The "log.segment.bytes" is used to control the size of log segments in a topic's partition and is usually set to a relatively high value (for example, 1 GB) to reduce the number of segment files and minimize the overhead of file management. It is recommended to set this value lower based on smaller PVC size.