Using Apache Kafka

Apache Kafka is an open source, distributed publish-subscribe messaging platform purpose-built to handle real-time streaming data, supporting distributed pipelines and replay of data feeds for fast, scalable operations. Kafka is a broker-based solution that operates by maintaining streams of data as records within a cluster of servers.

In Big Data Service clusters, you can use Kafka in either of the following ways:

  1. Create a Kafka profile cluster:
    1. Create a cluster.
    2. In the Cluster profile field, select Kafka.
  2. Create a Hadoop_Extended profile cluster, and add Kafka to the cluster:
    1. Create a cluster.
    2. In the Cluster profile field, select Hadoop_Extended.
    3. Add Kafka to the cluster.

Apache Kafka Best Practices

Hardware Requirements

Apache Kafka requires only a small amount of resources for regular operation, especially with some configuration tuning. By default, Kafka can run on one core and 1 GB of memory, with storage scaled to your data-retention requirements.

CPU is rarely a bottleneck because Kafka is I/O heavy. However, a moderately sized CPU with enough threads is important to handle concurrent connections and background tasks.

  • Kafka Broker Node: Eight cores, 64 GB to 128 GB of RAM, two or more 2 TB disks (standard2.8 or higher; preferably DenseIO or equivalent)
  • A minimum of three Kafka broker nodes
  • Hardware Profile: More RAM and high-speed disks are better
  • Install Kafka brokers on worker nodes, because worker nodes can scale horizontally

Recommended Kafka Cluster Topology

  1. Remove the node manager from the worker nodes.
  2. Because Kafka brokers need at least three nodes for replication and high availability, consider provisioning additional worker nodes for Kafka.
  3. Provision additional HDFS worker nodes, and decommission both the data node and the node manager on the nodes repurposed for Kafka.
    Note

    Current worker nodes are modeled as HDFS worker nodes, and here they are repurposed as Kafka broker nodes. Hence, if Kafka broker nodes run alongside HDFS data nodes, HDFS loses effective storage.

Some common parameters to tune while setting up Kafka are as follows:

Feature to Tune                              Parameter to Adjust
Message retention                            Disk size
Client throughput (producer and consumer)    Network capacity
Producer throughput                          Disk I/O
Consumer throughput                          Memory

These parameters vary from case to case and need to be set carefully to achieve better performance. No single setting suits all use cases.
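
For example, message retention is ultimately bounded by broker disk size. A minimal server.properties sketch follows; the values are illustrative assumptions to size against your own disks, not recommendations.

    # Retention settings (illustrative; size against available disk per broker).
    log.retention.hours=168             # keep messages for seven days
    log.retention.bytes=107374182400    # or cap each partition at about 100 GB
    log.segment.bytes=1073741824        # roll log segments at 1 GB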

ZooKeeper

ZooKeeper is an important component of a Kafka cluster that acts as a distributed coordination service. ZooKeeper is in charge of monitoring and preserving the cluster's metadata, coordinating the operations of many nodes, and ensuring the overall stability and consistency of the Kafka cluster.

Big Data Service HA clusters include three ZooKeeper hosts. However, for larger production use cases, we recommend scaling the ZooKeeper hosts horizontally, because ZooKeeper is shared among other services within the Big Data Service cluster.

Performance Considerations

Kafka is optimized out of the box. However, some tuning is required to improve cluster performance. Consider two main metrics:

  • Throughput: The number of messages that arrive in a particular amount of time.
  • Latency: The amount of time it takes to process each message.

Tuning Brokers

You control the number of partitions in a topic. Increasing the number of partitions and the number of brokers in a cluster leads to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster. However, the time required to replicate data across replica sets also increases.
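
For example, you set the partition count and replication factor when creating a topic. The following is a minimal sketch using Kafka's Java AdminClient; the topic name, partition count, and broker address are illustrative assumptions, not recommendations.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Hypothetical broker address; replace with your cluster's listeners.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Twelve partitions allow up to twelve consumers in a group to
                // read in parallel; replication factor 3 matches the minimum
                // of three broker nodes recommended earlier.
                NewTopic topic = new NewTopic("events", 12, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }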

Tuning Producers

You can run a producer in two different modes: synchronous and asynchronous. In synchronous mode, as soon as a message is published, the producer sends a request to the broker. Therefore, if you’re producing 100 messages a second, the producer sends out 100 requests a second to the broker. This decreases throughput and acts as a blocking operation. So when publishing a high number of messages, it’s better to run producers in asynchronous mode.
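
A minimal sketch of the two send modes with the Kafka Java producer follows; the broker address and topic name are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendModesSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("events", "key", "value");

                // Synchronous: block until the broker acknowledges each send,
                // which limits throughput.
                producer.send(record).get();

                // Asynchronous: return immediately and let a callback report
                // success or failure, allowing sends to be batched.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }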

In asynchronous mode, you must tune two parameters for best performance: batch.size and linger.ms (linger time). Batch size is the size of data to be sent in one batch, measured in bytes. For example, if you set the batch size to 100, the producer waits until messages add up to 100 bytes before making a call to the broker. If message production is low and you set a high batch size, the producer waits a long time before eventually producing messages. This reduces throughput and increases message-delivery latency. Therefore, depending on the number of messages being produced, this value must be optimized. The default batch size is 16,384 bytes.

Linger time is another metric based on when a producer decides to send a request to a broker. Using the previous example, if the batch size is set to 100 bytes and you’re only producing 50 bytes per second, the producer must wait two seconds before publishing those messages. To avoid this delay, you can tune the linger time (measured in milliseconds) to ensure that the producer doesn’t wait too long before sending out messages. Setting the linger time to 500 ms in this example makes the producer wait half a second at most.

Compression can also improve latency. By default, Kafka messages aren't compressed, but you can configure producers to compress them. Brokers and consumers then have the added overhead of decompressing messages, but the overall latency should be reduced as the physical size of data transmitted over the network is smaller.
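
Putting these three settings together, a producer configuration sketch might look like the following; the values are illustrative starting points to measure against your own workload, not recommendations.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerTuningSketch {
        // Returns producer properties with the tuning knobs discussed above.
        static Properties tunedProducerProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);        // 64 KB batches (default is 16,384)
            props.put(ProducerConfig.LINGER_MS_CONFIG, 500);           // wait at most 500 ms to fill a batch
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // smaller payloads on the network
            return props;
        }
    }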

Tuning Consumers

Consumers receive messages in batches, similar to how producers publish in batches. If you pull a large number of messages but take a long time to process each one, throughput suffers. Similarly, if you poll the broker for a single message every time, the increased number of requests to the broker can decrease throughput.

Having more partitions and consumers within a consumer group can help improve throughput. But remember that as the number of consumers increases, the number of offset-commit requests to the broker also increases. Because committing an offset means sending a Kafka message to an internal topic, this indirectly increases the load on the broker. Therefore, having an optimal number of consumers is crucial.
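
The consumer settings that govern how many records each poll returns, and how long the broker waits to fill a fetch, can be sketched as follows; the group name and values are illustrative assumptions.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConsumerTuningSketch {
        // Consumer settings that balance batch size per poll against latency.
        static Properties tunedConsumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // hypothetical group
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);     // cap records returned per poll()
            props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);     // let the broker accumulate at least 1 KB
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);    // ...but wait no longer than 500 ms
            return props;
        }
    }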

MirrorMaker Performance Optimization

Kafka MirrorMaker is a tool used to mirror Kafka messages from one data center or cluster to another. Because MirrorMaker internally produces messages to Kafka, most of the optimization techniques already discussed apply here as well. Because mirroring also involves transmitting messages over long distances, a few more configuration parameters can be tuned for better performance.

Note

When tuning, be sure to base your actions on the needs of your business use case.
  • MirrorMaker2 Location: MirrorMaker can be installed either at the source or the destination. However, we recommend installing it at the destination, because producing messages over long distances increases the chances of losing data during transmission.
  • Compression: By default, message compression in Kafka producers is set to none. However, to compress messages going from the source to the destination, change compression to gzip. This helps with larger batch sizes.
  • Batch Size: Increasing the batch size of messages increases throughput. Combining this with compression ensures that a large number of messages are being transmitted quickly. If the target batch size is taking more time than the linger time configured, the batches being sent out aren't completely filled. This decreases compression efficiency and wastes bandwidth. Therefore, tuning batch size along with enabling compression and tuning linger time is important.
  • Linger Time: Increasing linger time to allow batches to be filled completely is important. This might increase latency, but overall throughput improves. You must consider how important latency is for your business use case.
  • Increase Parallelism: To increase throughput further, you can deploy multiple instances of MirrorMaker under the same consumer group. This facilitates multiple MirrorMaker consumers receiving from the same source and producing to the destination in parallel (see the configuration sketch after this list).
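
As a rough illustration, the settings above might be combined in a MirrorMaker 2 properties file as follows. The cluster aliases, addresses, and values are placeholders, and the per-cluster client-override key format ({cluster}.producer.{config}) should be verified against your Kafka version.

    # Hypothetical cluster aliases and bootstrap addresses.
    clusters = source, target
    source.bootstrap.servers = source-broker1:9092
    target.bootstrap.servers = target-broker1:9092

    # Replicate all topics from source to target.
    source->target.enabled = true
    source->target.topics = .*

    # Run several tasks in parallel (a Kafka Connect setting).
    tasks.max = 4

    # Tune the internal producer that writes to the target cluster.
    target.producer.compression.type = gzip
    target.producer.batch.size = 65536
    target.producer.linger.ms = 500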

Production Server Configurations in Kafka Tuning

Several other configuration parameters can be tuned to improve the performance of Kafka in a production environment. The following parameters improve the replication performance of messages within partitions:

  • num.replica.fetchers: Specifies the number of threads used to replicate messages from the leader to the followers. A larger number of replica fetchers improves parallelism in replication.
  • replica.fetch.max.bytes: Indicates the number of bytes of data to fetch from the leader. A larger number indicates a larger chunk of data being fetched, improving the throughput of replication.
  • num.partitions: Sets the default number of partitions for new topics. A topic's partition count caps the number of consumers in a consumer group that can read it in parallel, so increasing partitions increases parallelism and therefore throughput. However, a large number of partitions also consumes more resources, so you must scale up resources as well (see the sketch after this list).
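
A server.properties sketch with these settings might look like the following; the values are illustrative and should be validated against your workload.

    # Replication tuning (values are illustrative).
    num.replica.fetchers=4            # more threads replicating from each leader
    replica.fetch.max.bytes=1048576   # fetch up to 1 MB per partition per request
    num.partitions=8                  # default partition count for new topics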

Balancing Apache Kafka Clusters

Whenever a new broker is added to a Kafka cluster, existing partitions aren't redistributed to the new broker. This means the new broker isn't busy, and if one or more older brokers go down, replication and the pool of potential leaders are reduced. This is known as leader skew. You can avoid this by making sure any newly added broker gets a share of the partitions, so rebalancing the cluster is important. Similarly, if a broker has more than the average number of partitions for a topic, known as broker skew, performance can suffer.
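
One way to correct leader skew programmatically is to trigger a preferred leader election with the Java AdminClient (available in recent Kafka versions); for broker skew, Kafka's partition reassignment tooling can move partitions onto new brokers. A minimal sketch, with a hypothetical topic and broker address:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.ElectionType;
    import org.apache.kafka.common.TopicPartition;

    public class LeaderRebalanceSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                // Return leadership for this partition to its preferred
                // replica, correcting leader skew after brokers rejoin.
                TopicPartition partition = new TopicPartition("events", 0);
                admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(partition))
                     .partitions().get();
            }
        }
    }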

Optimizing Kafka Performance

When running Kafka as a cluster, there are multiple ways to optimize its performance. You can tune various configuration parameters to strike a balance between throughput and latency. Some engineering effort is required to determine the best values for parameters such as linger time, batch size, and the number of partitions. Depending on the use case, you might decide that throughput is more important than latency, that latency is more important than throughput, or that a balance between the two is best.