6 Kafka Cluster Management Procedures
The following sections describe the procedures to manage the Kafka cluster.
6.1 Creating Topics for OCNADD
- Create the topics (MAIN, SCP, SEPP, PCF, BSF, and NRF) using the configuration service before starting data ingestion.
- If data has to be ingested from third-party NFs, create the topic "NON_ORACLE" using the admin service before starting data ingestion.
- For more information on topic creation and partitions, see the "OCNADD resource requirement" section in Oracle Communications Network Analytics Data Director Benchmarking Guide.
To create a topic, connect to any worker node and send a POST curl request to the API endpoint described below.
API Endpoint: <ClusterIP:Config Port>/ocnadd-configuration/v1/<worker-group>/topic
where <worker-group> = <workerGroupNamespace>:<clusterName>
For example:
curl -k --location --request POST "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/dd-worker-group1:dd-cluster/topic" --header 'Content-Type: application/json' --data-raw '{"topicName":"<topicname>","partitions":"3","replicationFactor":"2","retentionMs":"120000"}'
If intraTLS is enabled, or intraTLS and mTLS are enabled, use the following command:
curl -k --location --cert-type P12 --cert /var/securityfiles/keystore/clientKeyStore.p12:$OCNADD_SERVER_KS_PASSWORD --request POST "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/dd-worker-group1:dd-cluster/topic" --header 'Content-Type: application/json' --data-raw '{"topicName":"<topicname>","partitions":"3","replicationFactor":"2","retentionMs":"120000"}'
Request body:
{
  "topicName": "<topicname>",
  "partitions": "3",
  "replicationFactor": "2",
  "retentionMs": "120000"
}
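If the request body is kept in a file (for example, a hypothetical topic.json containing the payload above), the same request can be sent using curl's @file syntax:
curl -k --location --request POST "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/dd-worker-group1:dd-cluster/topic" --header 'Content-Type: application/json' --data @topic.json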
Note:
- If worker node access is not available, the adminservice Service-Type can be changed to LoadBalancer or NodePort in the admin service values.yaml (a helm upgrade is required for any such change).
- For the LoadBalancer service, ensure that the admin port is not blocked in the cluster.
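To confirm the current Service-Type of the admin service before or after such a change, a quick check is shown below (assuming the service name contains "admin" and the ocnadd-deploy namespace):
kubectl get svc -n ocnadd-deploy | grep -i admin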
6.2 Adding Partitions to an Existing Topic
The topics in OCNADD must be created with the necessary partitions for the supported MPS during deployment. If the traffic load increases beyond the supported MPS, the number of partitions in the existing topics may need to be increased. This section describes the procedure to increase the number of partitions in the corresponding topic.
Caution:
The number of partitions cannot be decreased using this procedure. If partition reduction is required, the topic must be recreated with the desired number of partitions; in that case, complete data loss for the concerned topic is expected.
- Log in to the bastion host.
- Enter any pod in the deployed OCNADD namespace by executing the following command:
kubectl exec -it <pod-name> -n <namespace> -- bash
Example:
kubectl exec -ti -n ocnadd-deploy ocnaddadminservice-xxxxxx -- bash
Note:
The procedure is explained using the SCP topic name and HTTPS curl commands as an example. Modify the commands with respect to the topic name you are updating, and use HTTP if intraTLS is not enabled.
- Describe the corresponding topic using the following command. Provide the worker group name in the command below:
curl -k --location "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic/SCP"
If intraTLS is enabled, or intraTLS and mTLS are enabled, use the following command:
curl -k --location --cert-type P12 --cert /var/securityfiles/keystore/clientKeyStore.p12:$OCNADD_SERVER_KS_PASSWORD "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic/SCP"
The above command lists the topic details such as the number of partitions, replication factor, retentionMs, and so on.
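If the jq utility is available in the container (an assumption; it is not required by the procedure), the JSON response can be pretty-printed for easier reading:
curl -k --location "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic/SCP" | jq .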
- Add or increase partitions in the topic by executing the following command. Provide the worker group name in the command below:
curl -v -k --location --request PUT "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic" --header 'Content-Type: application/json' --data-raw '{ "topicName": "SCP", "partitions": "24" }'
If intraTLS is enabled, or intraTLS and mTLS are enabled, use the following command:
curl -v -k --location --cert-type P12 --cert /var/securityfiles/keystore/clientKeyStore.p12:$OCNADD_SERVER_KS_PASSWORD --request PUT "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic" --header 'Content-Type: application/json' --data-raw '{ "topicName": "SCP", "partitions": "24" }'
- Verify that the partitions have been added to the topic by executing the following command. Provide the worker group name in the command below:
curl -k --location "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic/SCP"
If intraTLS is enabled, or intraTLS and mTLS are enabled, use the following command:
curl -k --location --cert-type P12 --cert /var/securityfiles/keystore/clientKeyStore.p12:$OCNADD_SERVER_KS_PASSWORD "https://ocnaddconfiguration:12590/ocnadd-configuration/v1/<worker-group>/topic/SCP"
- Exit from the POD (container).
6.3 Enabling Kafka Log Retention Policy
In Kafka, the log retention strategy determines how long the data is kept in the broker's logs before it is purged to free up storage space.
- Time-based retention:
Once the logs reach the specified age, they are considered eligible for deletion, and the broker will start a background task to remove the log segments that are older than the retention time. The time-based retention policy applies to all logs in a topic, including both the active logs that are being written and the inactive logs that have already been compacted.
The retention time is usually set using the "log.retention.hours" or "log.retention.minutes" configuration.
Parameters used:
log.retention.minutes=5
The default value for "log.retention.minutes" is set to 5 minutes.
- Size-based retention:
Once the logs reach the specified size threshold, the broker will start a background task to remove the oldest log segments to ensure that the total log size remains below the specified limit. The size-based retention policy applies to all logs in a topic, including both the active logs that are being written and the inactive logs that have already been compacted.
By default, these parameters are not available in the OCNADD helm chart. The parameters are customizable and can be added in 'ocnadd/charts/ocnaddkafka/templates/scripts-config.yaml'; a helm upgrade needs to be performed to apply them. A Kafka broker restart is expected.
Parameters for size-based retention:
For size-based retention add the below parameters in 'ocnadd/charts/ocnaddkafka/templates/scripts-config.yaml' file, and perform a helm upgrade to apply the parameters.
#The maximum size of a single log file
log.segment.bytes=1073741824
#The maximum size of the log before deleting it
log.retention.bytes=32212254720
#Enable the log cleanup process to run on the server
log.cleaner.enable=true
#The default cleanup policy, which discards old segments when their retention time or size limit has been reached
log.cleanup.policy=delete
#The interval at which log segments are checked to see if they can be deleted
log.retention.check.interval.ms=1000
#The amount of time to sleep when there are no logs to clean
log.cleaner.backoff.ms=1000
#The number of background threads to use for log cleaning
log.cleaner.threads=5
Calculate "log.retention.bytes":
The log retention size can be calculated as follows.
Example: For an 80% threshold of the PVC claim size, "log.retention.bytes" is calculated as:
log.retention.bytes = (PVC size in bytes / TotalPartition) * (threshold / 100)
Here, TotalPartition is the sum of the partitions of all the topics. If a topic has a replication factor of 2, its partitions count twice. See the "OCNADD resource requirement" section in Oracle Communications Network Analytics Data Director Benchmarking Guide to determine the number of partitions per topic.
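As a worked example with hypothetical values (a 20 Gi PVC, 48 total partitions with replicas counted, and an 80% threshold), the calculation can be scripted as follows:
# Hypothetical inputs: 20 Gi PVC (21474836480 bytes), 48 total partitions, 80% threshold
PVC_BYTES=21474836480
TOTAL_PARTITIONS=48
THRESHOLD=80
# (pvc / TotalPartition) * threshold/100 = 357913940 bytes per partition (about 341 MB)
echo $(( PVC_BYTES / TOTAL_PARTITIONS * THRESHOLD / 100 ))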
Note:
- It is important to choose time-based and size-based retention policies that balance the need to retain data for downstream processing against the need to free up disk space. If the retention size or retention time is set too high, large amounts of disk space may be consumed; if it is set too low, important data may be lost.
- Kafka also allows a combination of time-based and size-based retention by setting both "log.retention.hours" (or "log.retention.minutes") and "log.retention.bytes", in which case the broker retains logs only until the first of the two limits is reached (see the sketch after this note).
- "log.segment.bytes" controls the size of log segments in a topic's partition and is usually set to a relatively high value (for example, 1 GB) to reduce the number of segment files and minimize the overhead of file management. It is recommended to set this value lower for smaller PVC sizes.
- The above procedures should be applied for all the Kafka Broker clusters corresponding to each of the worker groups.
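For instance, a combined policy using the illustrative values from this section (examples only, not recommendations) would be configured as:
# Logs are deleted as soon as either the time limit or the size limit is reached
log.retention.minutes=5
log.retention.bytes=32212254720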
6.4 Expanding Kafka Storage
With the increase in throughput requirements in OCNADD, the storage allocation in Kafka should also be increased. If storage was previously allocated for a lower throughput, it should be increased to meet the new throughput requirements.
The procedure should be applied to all the available Kafka Broker clusters corresponding to each of the worker groups in the Centralized deployment mode.
Caution:
- PVC expansion should be performed before a release upgrade. For example, if the Data Director is running release 23.3.0 (source release), is planned to be upgraded to 23.4 or higher (target release), and the PVC storage needs to be expanded because of the higher throughput supported in the target release, then the PVC expansion must be done in the source release using the procedures below.
- It is not possible to roll back Kafka to the previous release if the Kafka PVC size was increased after the upgrade. If a rollback is still required, follow the DR procedures for Kafka; this may result in data loss if the data has not already been consumed.
- Check the StorageClass to which your PVC is attached.
Note:
- PVC storage size can only be increased. It cannot be decreased.
- It is mandatory to keep the same PVC storage size for all the Kafka brokers.
- Run the following command to get the list of storage classes:
kubectl get sc
Sample output:
NAME                 PROVISIONER                  RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
occne-esdata-sc      rook-ceph.rbd.csi.ceph.com   Delete          Immediate           true                   125d
occne-esmaster-sc    rook-ceph.rbd.csi.ceph.com   Delete          Immediate           true                   125d
occne-metrics-sc     rook-ceph.rbd.csi.ceph.com   Delete          Immediate           true                   125d
standard (default)   rook-ceph.rbd.csi.ceph.com   Delete          Immediate           true                   125d
- Run the following command to describe a storage class:
kubectl describe sc <storage_class_name>
For example:
kubectl describe sc standard
Sample output:
Name:                  standard
IsDefaultClass:        yes
Provisioner:           rook-ceph.rbd.csi.ceph.com
Parameter:             clusterID=rook-ceph,csi.storage.k8s.io/
AllowVolumeExpansion:  True
MountOptions:          discard
ReclaimPolicy:         Delete
VolumeBindingMode:     Immediate
Events:                <none>
The PVC should belong to a StorageClass that has "AllowVolumeExpansion" set to True:
AllowVolumeExpansion: True
- Run the following command to list all the available PVCs:
kubectl get pvc -n <namespace>
For example:
kubectl get pvc -n ocnadd-deploy
Sample output:
NAME                                STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
backup-mysql-pvc                    Bound    pvc-6e0c8366-fdaa-488e-a09d-f976e74f025b   20Gi       RWO            standard       8d
kafka-broker-security-zookeeper-0   Bound    pvc-4f3c310c-1173-4d3d-824a-ddd2640ab028   5Gi        RWO            standard       8d
kafka-broker-security-zookeeper-1   Bound    pvc-70fffae2-4f2a-4ecc-8f74-abfa003a8c1e   5Gi        RWO            standard       8d
kafka-broker-security-zookeeper-2   Bound    pvc-e1b0b96d-e000-4e5c-a01c-d41cc8f3cfa4   5Gi        RWO            standard       8d
kafka-volume-kafka-broker-0         Bound    pvc-48845cf6-1708-400a-808e-5b9b3cda7242   20Gi       RWO            standard       8d
kafka-volume-kafka-broker-1         Bound    pvc-8acbde89-b223-4015-a170-ad6417b08be7   20Gi       RWO            standard       8d
kafka-volume-kafka-broker-2         Bound    pvc-7b444cba-d1ab-496b-8b60-7c7599ef8754   20Gi       RWO            standard       8d
kafka-volume-kafka-broker-3         Bound    pvc-e1584794-8e62-4f6a-8eff-c0c86e0864a1   20Gi       RWO            standard       8d
ocnadd-cache-volume-ocnaddcache-0   Bound    pvc-5bf8ce9c-7ea6-4cf3-b51e-6ca94104dc5b   1Gi        RWO            standard       8d
ocnadd-cache-volume-ocnaddcache-1   Bound    pvc-ddb0b53c-ec69-4e07-ac44-3ab98bee1d4c   1Gi        RWO            standard       8d
- Run the following command to edit the required PVC and update the storage size:
kubectl edit pvc <pvc_name> -n <namespace>
For example:
kubectl edit pvc kafka-volume-kafka-broker-0 -n ocnadd-deploy
Sample output:
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: <pvc_size>Gi
Repeat steps 2 and 3 for each available Kafka PVC. Increase the size of the next Kafka broker PVC only after confirming that the current Kafka broker PVC's increased size is reflected in the output of step 3. To increase the <pvc_size> for all the Kafka broker pods created during deployment, follow these steps.
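As a non-interactive alternative to kubectl edit, the resize and its confirmation can be scripted; the following sketch assumes the broker PVC kafka-volume-kafka-broker-0, the ocnadd-deploy namespace, and a hypothetical target size of 40Gi:
kubectl patch pvc kafka-volume-kafka-broker-0 -n ocnadd-deploy -p '{"spec":{"resources":{"requests":{"storage":"40Gi"}}}}'
# Confirm the expanded size is reflected before moving on to the next broker PVC
kubectl get pvc kafka-volume-kafka-broker-0 -n ocnadd-deploy -o jsonpath='{.status.capacity.storage}'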
To understand the storage requirement for your Kafka broker pods based on supported throughput, see "Kafka PVC-Storage Requirements" section in Oracle Communications Network Analytics Data Director Benchmarking Guide.
- Run the following command to delete the stateful set:
kubectl delete statefulset --cascade=orphan <statefulset_name> -n <namespace>
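For example, assuming the stateful set is named kafka-broker (as in the values file shown in the next step) and the ocnadd-deploy namespace:
kubectl delete statefulset --cascade=orphan kafka-broker -n ocnadd-deploy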
- In the ocnadd-custom-values-25.1.200.yaml file, update the PVC size (.Values.ocnaddkafka.ocnadd.kafkaBroker.pvcClaimSize) to match the value configured in step 4.
ocnaddkafka:
  ocnadd:
    ################
    # kafka-broker #
    ################
    kafkaBroker:
      name: kafka-broker
      replicas: 4
      pvcClaimSize: <pvc_size>Gi
- Perform the Helm upgrade to recreate the stateful set of the Kafka broker:
helm upgrade <release_name> <worker_grp_chart_name> -n <worker_grp_namespace> -f ocnadd-custom-values_<worker_grp>.yaml
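For example, with hypothetical release, chart, and namespace names:
helm upgrade ocnadd-wg1 ocnadd-wg1-chart -n dd-worker-group1 -f ocnadd-custom-values_wg1.yaml
# Verify that the Kafka broker pods are recreated with the new stateful set
kubectl get pods -n dd-worker-group1 | grep kafka-broker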