5 Using the Kafka Handler

The Oracle GoldenGate for Big Data Kafka Handler is designed to stream change capture data from an Oracle GoldenGate trail to a Kafka topic. Additionally, the Kafka Handler provides optional functionality to publish the associated schemas for messages to a separate schema topic. Schema publication is currently supported only for Avro schemas because of the direct dependency of Avro messages upon an Avro schema.

Apache Kafka is an open source, distributed, partitioned, and replicated messaging service. Kafka and its associated documentation are available at http://kafka.apache.org.

Kafka can be run as a single instance or as a cluster on multiple servers. Each Kafka server instance is called a broker. A Kafka topic is a category or feed name to which messages are published by the producers and retrieved by consumers.

The Kafka Handler implements a Kafka producer that writes serialized change capture data from multiple tables to one topic.

5.1 Setup and Running

Instructions for setting up each of the Kafka Handler components and running the handler are described in the following sections.

5.1.1 Runtime Prerequisites

  • ZooKeeper, a prerequisite component for Kafka, and the Kafka broker (or brokers) must be up and running.

  • It is highly recommended and considered a best practice that the data topic and the schema topic (if applicable) are preconfigured on the running Kafka brokers. It is possible to create Kafka topics dynamically; however, this relies on the Kafka brokers being configured to allow dynamic topics.

  • If the Kafka broker is not collocated with the Oracle GoldenGate for Big Data Kafka Handler process, the remote host:port must be reachable from the machine running the Kafka Handler.

5.1.2 Classpath Configuration

Two things must be configured in the gg.classpath configuration variable in order for the Kafka Handler to connect to Kafka and run. The required items are the Kafka Producer Properties file and the Kafka client jars. The Kafka client jars must match the version of Kafka that the Kafka Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Handler Client Dependencies.

The recommended storage location for the Kafka Producer Properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka client jars is Kafka_Home/libs/*.

The gg.classpath must be configured exactly as shown. The path to the Kafka Producer Properties file must contain only the directory, with no wildcard appended; including the * wildcard in that path causes the file not to be picked up. Conversely, the path to the dependency jars must end with the * wildcard character so that all of the jar files in that directory are included in the classpath. Do not use *.jar. The following is an example of a correctly configured classpath:

gg.classpath=dirprm:/ggwork/kafka/lib/*
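
The wildcard rules above can be checked mechanically before starting the process. The following is a minimal sketch only: the class GgClasspathCheck and its lint method are hypothetical and are not part of Oracle GoldenGate, and the entry separator is passed in as a parameter because it is assumed to be platform dependent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Oracle GoldenGate: checks a gg.classpath
// value against the wildcard rules described in this section.
class GgClasspathCheck {

    // Returns the problems found; an empty list means no rule was violated.
    static List<String> lint(String ggClasspath, String separator) {
        List<String> problems = new ArrayList<>();
        boolean hasPlainDirectory = false; // candidate location of the properties file
        boolean hasJarWildcard = false;    // candidate location of the Kafka client jars
        for (String raw : ggClasspath.split(Pattern.quote(separator))) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            if (entry.endsWith("*.jar")) {
                problems.add("Use * instead of *.jar: " + entry);
            } else if (entry.endsWith("*")) {
                hasJarWildcard = true;
            } else {
                hasPlainDirectory = true;
            }
        }
        if (!hasPlainDirectory) {
            problems.add("No entry without a wildcard for the Kafka Producer Properties file");
        }
        if (!hasJarWildcard) {
            problems.add("No entry ending in * for the Kafka client jars");
        }
        return problems;
    }

    public static void main(String[] args) {
        // The example value from this section produces no problems.
        System.out.println(lint("dirprm:/ggwork/kafka/lib/*", ":"));
    }
}
```

The check is purely syntactic. It does not confirm that the directories exist or that the jars match the Kafka broker version.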

5.1.3 Pluggable Formatters

The Kafka Handler supports all of the big data formatters, which include:

  • Avro Row

  • Avro Operation

  • JSON

  • XML

  • Delimited Text

5.1.4 Kafka Handler Configuration

The following are the configurable values for the Kafka Handler. These properties are located in the Java Adapter properties file and not in the Replicat properties file.

Table 5-1 Configuration Properties for 12.2.0.1 Kafka Handler

gg.handlerlist
  Property Value: kafkahandler (choice of any name)
  Mandatory: Yes
  Description: List of handlers to be used.

gg.handler.kafkahandler.Type
  Property Value: kafka
  Mandatory: Yes
  Description: Type of handler to use. For example, Kafka, Flume, HDFS.

gg.handler.kafkahandler.KafkaProducerConfigFile
  Property Value: Any custom file name
  Mandatory: No. Defaults to kafka-producer-default.properties
  Description: Filename in classpath that holds Apache Kafka properties to configure the Apache Kafka producer.

gg.handler.kafkahandler.TopicName
  Property Value: TopicName
  Mandatory: Yes
  Description: Name of the Kafka topic where payload records will be sent.

gg.handler.kafkahandler.Format
  Property Value: Formatter class or short code
  Mandatory: No. Defaults to delimitedtext.
  Description: Formatter to use to format payload. Can be one of xml, delimitedtext, json, avro_row, avro_op.

gg.handler.kafkahandler.SchemaTopicName
  Property Value: Name of the schema topic
  Mandatory: Yes, when schema delivery is required.
  Description: Topic name where schema data will be delivered. If this property is not set, the schema will not be propagated. Schemas will be propagated only for Avro formatters.

gg.handler.kafkahandler.SchemaPrClassName
  Property Value: Fully qualified class name of a custom class that implements the Oracle GoldenGate for Big Data Kafka Handler's CreateProducerRecord Java interface
  Mandatory: No. Defaults to the provided implementation class: oracle.goldengate.handler.kafka.DefaultProducerRecord
  Description: The schema is also propagated as a ProducerRecord. The default key here is the fully qualified table name. To change this key for schema records, create a custom implementation of the CreateProducerRecord interface and set this property to the fully qualified name of the new class.

gg.handler.kafkahandler.BlockingSend
  Property Value: true | false
  Mandatory: No. Defaults to false.
  Description: If this property is set to true, then delivery to Kafka works in a completely synchronous model. The next payload is sent only after the current payload has been written to the intended topic and an acknowledgement has been received. In transaction mode, this provides exactly once semantics. If this property is set to false, then delivery to Kafka works in an asynchronous model. Payloads are sent one after the other without waiting for acknowledgements. Kafka internal queues may buffer contents to increase throughput. Checkpoints are made only when acknowledgements are received from Kafka brokers using Java callbacks.

gg.handler.kafkahandler.ProducerRecordClass
  Property Value: Fully qualified class name of a custom class that implements the Oracle GoldenGate for Big Data Kafka Handler's CreateProducerRecord Java interface
  Mandatory: No. Defaults to the out-of-box provided implementation class: oracle.goldengate.handler.kafka.DefaultProducerRecord
  Description: The unit of data in Kafka is a ProducerRecord, which holds the key field with the value representing the payload. This key is used for partitioning a Kafka producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. To change this key or behavior, implement the CreateProducerRecord Kafka Handler interface and set this property to the fully qualified name of the custom ProducerRecord class.

gg.handler.kafkahandler.Mode
  Property Value: tx/op
  Mandatory: No. Defaults to tx.
  Description: With the Kafka Handler in operation mode, each change capture data record (Insert, Update, Delete, and so on) payload is represented as a Kafka producer record and is flushed one at a time. With the Kafka Handler in transaction mode, all operations within a source transaction are represented as a single Kafka producer record. This combined byte payload is flushed on a transaction Commit event.

gg.handler.kafkahandler.topicPartitioning
  Property Value: none | table
  Mandatory: None
  Description: Controls whether data published into Kafka should be partitioned by table.
  Set to table, the data for different tables is written to different Kafka topics.
  Set to none, the data from different tables is interlaced in the same topic, as configured in the TopicName property.

5.1.5 Sample Configuration

The properties files are described in the following sections.

5.1.5.1 Java Adapter Properties File

A sample configuration for the Kafka Handler from the Adapter properties file is:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName = oggtopic
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.ProducerRecordClass = com.company.kafka.CustomProducerRecord
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkahandler.BlockingSend = true

A sample Replicat configuration and a Java Adapter Properties file for a Kafka integration can be found at the following directory:

GoldenGate_install_directory/AdapterExamples/big-data/kafka

5.1.6 Kafka Producer Configuration File

The Kafka Handler must access a Kafka producer configuration file in order to publish messages to Kafka. The file name of the Kafka producer configuration file is controlled by the following configuration in the Kafka Handler properties.

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

The Kafka Handler will attempt to locate and load the Kafka producer configuration file using the Java classpath. Therefore the Java classpath must include the directory containing the Kafka Producer Configuration File.

The Kafka producer configuration file contains Kafka proprietary properties. The Kafka documentation provides configuration information for the 0.8.2.0 Kafka producer interface properties. The Kafka Handler uses these properties to resolve the host and port of the Kafka brokers. The properties in the Kafka producer configuration file also control the behavior of the interaction between the Kafka producer client and the Kafka brokers.

A sample configuration file for the Kafka producer is as follows:

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000 
send.buffer.bytes = 5024000
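
To illustrate how a properties file of this shape yields the broker host and port, the following sketch parses the text with java.util.Properties and splits the bootstrap.servers value. It is an illustration under stated assumptions, not the Kafka Handler's actual loading code: the class ProducerConfigSketch and its methods are hypothetical, and the text is passed in as a string rather than located through the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Oracle GoldenGate or Kafka.
class ProducerConfigSketch {

    // Parses properties-file text; "key = value" and "key=value" are both accepted.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits the comma-separated bootstrap.servers value into host and port pairs.
    static List<InetSocketAddress> brokers(Properties props) {
        List<InetSocketAddress> result = new ArrayList<>();
        String servers = props.getProperty("bootstrap.servers", "");
        for (String raw : servers.split(",")) {
            String hostPort = raw.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon > 0) {
                String host = hostPort.substring(0, colon);
                int port = Integer.parseInt(hostPort.substring(colon + 1));
                // Unresolved: no DNS lookup is performed in this sketch.
                result.add(InetSocketAddress.createUnresolved(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = parse("bootstrap.servers=localhost:9092\nacks = 1\n");
        System.out.println(brokers(props));
    }
}
```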

5.2 Detailed Functionality

This section details the modes of operation of the Kafka Handler.

5.2.1 Transaction versus Operation Mode

The Kafka Handler sends instances of the Kafka ProducerRecord class to the Kafka producer API which in turn publishes the ProducerRecord to a Kafka topic. The Kafka ProducerRecord effectively is the implementation of a Kafka message. The ProducerRecord has two components, a key and a value. Both the key and value are represented as byte arrays by the Kafka Handler. This section describes how the Kafka Handler publishes data.

Transaction Mode

Transaction mode is indicated by the following configuration of the Kafka Handler:

gg.handler.name.Mode=tx

In Transaction Mode the serialized data for every operation in a transaction from the source Oracle GoldenGate trail files is concatenated. The concatenated operation data is the value of the Kafka ProducerRecord object. The key of the Kafka ProducerRecord object is NULL. The result is that Kafka messages comprise the data from 1 to N operations, where N is the number of operations in the transaction. In the case of grouped transactions, all of the data for all of the operations for a grouped transaction is concatenated into a single Kafka message. The result can be very large Kafka messages containing data for a large number of operations.

Operation Mode

Operation mode is indicated by the following configuration of the Kafka Handler:

gg.handler.name.Mode=op

In Operation Mode the serialized data for each operation is placed into an individual ProducerRecord object as the value. The ProducerRecord key is the fully qualified table name of the source operation. The ProducerRecord is immediately sent using the Kafka Producer API. This means there is a 1 to 1 relationship between the incoming operations and the number of Kafka messages produced.
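
The difference between the two modes can be sketched as follows. This is an illustration of the message shapes described above, not the Kafka Handler's implementation: the Message class is a stand-in for the Kafka ProducerRecord so that the sketch has no Kafka dependency, and all class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how operations map to messages in each mode.
class ModeSketch {

    // Stand-in for a Kafka ProducerRecord: a nullable key and a byte-array value.
    static final class Message {
        final String key;
        final byte[] value;

        Message(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // One serialized operation together with its fully qualified source table name.
    static final class Op {
        final String table;
        final byte[] data;

        Op(String table, byte[] data) {
            this.table = table;
            this.data = data;
        }
    }

    // Transaction mode: one message per transaction, null key, concatenated value.
    static Message transactionMode(List<Op> transaction) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Op op : transaction) {
            out.write(op.data, 0, op.data.length);
        }
        return new Message(null, out.toByteArray());
    }

    // Operation mode: one message per operation, keyed by the table name.
    static List<Message> operationMode(List<Op> transaction) {
        List<Message> messages = new ArrayList<>();
        for (Op op : transaction) {
            messages.add(new Message(op.table, op.data));
        }
        return messages;
    }
}
```

A transaction of N operations therefore yields one message in transaction mode and N messages in operation mode.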

5.2.2 Blocking versus Non-Blocking Mode

The Kafka Handler can send messages to Kafka in either Blocking mode (synchronous) or Non-Blocking Mode (asynchronous).

Blocking Mode

Blocking mode is set by the following configuration of the Kafka Handler:

gg.handler.name.BlockingSend=true

In this mode messages are delivered to Kafka on a synchronous basis. The Kafka Handler will not send the next message until the current message has been written to the intended topic and an acknowledgement has been received. Blocking mode provides the best guarantee of message delivery; however, its cost is reduced performance.

You must never set the Kafka Producer linger.ms variable when in blocking mode as this will cause the Kafka producer to wait for the entire timeout period before sending the message to the Kafka broker. In this scenario the Kafka Handler is waiting for acknowledgement that the message has been sent while at the same time the Kafka Producer is buffering messages to be sent to the Kafka brokers. Therefore, these settings are at odds with each other.
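
The conflicting combination described above can be detected before the process is started. The following is a minimal sketch, assuming both files have already been loaded into java.util.Properties objects; the class BlockingLingerCheck is hypothetical and not part of Oracle GoldenGate. As an assumption of this sketch, a linger.ms value of 0 is not flagged because it requests no wait.

```java
import java.util.Properties;

// Hypothetical helper: flags BlockingSend=true combined with a positive linger.ms.
class BlockingLingerCheck {

    static boolean conflicts(Properties adapterProps, String handlerName, Properties producerProps) {
        String blockingKey = "gg.handler." + handlerName + ".BlockingSend";
        // BlockingSend defaults to false when the property is absent.
        boolean blocking = Boolean.parseBoolean(adapterProps.getProperty(blockingKey, "false").trim());
        String linger = producerProps.getProperty("linger.ms");
        if (!blocking || linger == null) {
            return false;
        }
        // Assumption: only a positive value makes the producer wait before sending.
        return Long.parseLong(linger.trim()) > 0;
    }

    public static void main(String[] args) {
        Properties adapter = new Properties();
        adapter.setProperty("gg.handler.kafkahandler.BlockingSend", "true");
        Properties producer = new Properties();
        producer.setProperty("linger.ms", "10000");
        System.out.println(conflicts(adapter, "kafkahandler", producer));
    }
}
```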

Non-Blocking Mode

Non-Blocking mode is set by the following configuration of the Kafka Handler:

gg.handler.name.BlockingSend=false

In this mode messages are delivered to Kafka on an asynchronous basis. Kafka messages are published one after the other without waiting for acknowledgements. The Kafka Producer client may buffer incoming messages in order to increase throughput.

On each transaction commit, the Kafka Handler invokes a blocking call on the Kafka producer to flush all operations that the Kafka producer client may have buffered internally. This allows the Kafka Handler to checkpoint safely, ensuring zero data loss. In the worst case, each transaction commit call blocks for a maximum of the linger.ms duration. It is recommended to use small linger.ms values, on the order of milliseconds.

You can control when the Kafka Producer flushes data to the Kafka Broker through a number of configurable properties in the Kafka producer configuration file. To enable batch sending of messages by the Kafka Producer, both the batch.size and linger.ms Kafka Producer properties must be set in the Kafka producer configuration file. The batch.size property controls the maximum number of bytes to buffer before a send to Kafka, while the linger.ms property controls the maximum number of milliseconds to wait before sending data. Data is sent to Kafka once the batch.size is reached or the linger.ms period expires, whichever comes first. Setting only the batch.size property causes messages to be sent immediately to Kafka.
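
The "whichever comes first" rule can be written as a single condition. The sketch below is a simplified model of the rule as stated in this section and not the Kafka producer's internal logic; the class and method names are hypothetical. It assumes that an unset linger.ms behaves as 0, which is consistent with the statement that setting only batch.size causes immediate sends.

```java
// Hypothetical, simplified model of the batching rule described in this section.
class BatchFlushSketch {

    // True when buffered data would be sent under the stated rule.
    static boolean shouldSend(long bufferedBytes, long waitedMs, long batchSize, long lingerMs) {
        return bufferedBytes >= batchSize || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        // batch.size = 102400 and linger.ms = 10000, as in the sample configuration file.
        System.out.println(shouldSend(2048, 50, 102400, 10000));   // small buffer, short wait
        System.out.println(shouldSend(102400, 50, 102400, 10000)); // batch is full
        System.out.println(shouldSend(2048, 0, 102400, 0));        // linger.ms treated as 0
    }
}
```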

5.2.3 Publishing to Multiple Topics

The Kafka Handler allows operation data from the source trail file to be published to separate topics based on the corresponding table name of the operation data. This feature allows sorting of operation data from the source trail file by the source table name. The feature is enabled by setting the following configuration in the Java Adapter properties file:

gg.handler.kafka.topicPartitioning=table 
gg.handler.kafka.mode=op 

The mode must be set to op and the Kafka topic name used is the fully qualified table name of the source operation. 

You can publish to multiple topics using the Kafka Handler. For example, you could publish one topic per table by setting the gg.handler.kafkahandler.topicPartitioning property to table.

The topics are created automatically, with the topic name equal to the fully qualified table name.
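
The topic selection rule described above can be sketched as a small function. This is an illustration only, not the Kafka Handler's code: the class TopicResolverSketch is hypothetical, and the table name MYSCHEMA.ORDERS used in the example is a made-up placeholder.

```java
// Hypothetical sketch of the topic selection rule described in this section.
class TopicResolverSketch {

    static String resolveTopic(String topicPartitioning, String mode,
                               String configuredTopicName, String fullyQualifiedTableName) {
        if ("table".equals(topicPartitioning)) {
            // Multiple topics are supported in op mode only.
            if (!"op".equals(mode)) {
                throw new IllegalArgumentException("topicPartitioning=table requires mode=op");
            }
            return fullyQualifiedTableName;
        }
        // topicPartitioning=none: all tables share the configured topic.
        return configuredTopicName;
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic("table", "op", "oggtopic", "MYSCHEMA.ORDERS"));
        System.out.println(resolveTopic("none", "tx", "oggtopic", "MYSCHEMA.ORDERS"));
    }
}
```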

Kafka Broker Settings

To enable the automatic creation of topics, set the auto.create.topics.enable property to true in the Kafka Broker Configuration. The default value for this property is true.

If the auto.create.topics.enable property is set to false in Kafka Broker configuration, then all the required topics should be created manually before starting the Replicat process.

Schema Propagation

The schema data for all tables is delivered to the schema topic configured with the schemaTopicName property. For more information, see Schema Propagation.

NOTE: Multiple topics are supported in the op mode only. For example, when gg.handler.kafkahandler.topicPartitioning is set to table then gg.handler.kafkahandler.mode should be set to op.

5.3 Schema Propagation

The Kafka Handler provides the ability to publish schemas to a schema topic. Currently the Avro Row and Operation formatters are the only formatters that are enabled for schema publishing. If the Kafka Handler schemaTopicName property is set, the schema is published for the following events:

  • The Avro schema for a specific table will be published the first time an operation for that table is encountered.

  • If the Kafka Handler receives a metadata change event, the schema will be flushed. The regenerated Avro schema for a specific table will be published the next time an operation for that table is encountered.

  • If the Avro wrapping functionality is enabled, the generic wrapper Avro schema will be published the first time any operation is encountered. The generic wrapper Avro schema functionality can be enabled in the Avro formatter configuration. Refer to the Avro Row Formatter or Avro Operation Formatter sections of this document for exact instructions.

The Kafka ProducerRecord value will be the schema and the key will be the fully qualified table name.

Avro over Kafka can be problematic because of the direct dependency of Avro messages on an Avro schema. Avro messages are binary and therefore are not human readable. In order to deserialize an Avro message the receiver must first have the correct Avro schema. Since each table from the source database results in a separate Avro schema this can be problematic. The receiver of a Kafka message has no way to determine which Avro schema to use to deserialize individual messages when the source Oracle GoldenGate trail file includes operations from multiple tables. In order to solve this problem, the functionality was provided to wrap the specialized Avro messages in a generic Avro message wrapper. This generic Avro wrapper provides the fully qualified table name, the hashcode of the schema string, and the wrapped Avro message. The receiver can use the fully qualified table name and the hashcode of the schema string to resolve the associated schema of the wrapped message and then use that schema to deserialize the wrapped message.
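
On the receiving side, the lookup described above amounts to a map keyed by table name and schema hashcode. The following sketch shows that idea with plain Java collections and no Avro dependency. It rests on assumptions: the class SchemaRegistrySketch is hypothetical, schemas are held as strings, and the hashcode is taken to be String.hashCode() of the schema string, which may not match how the wrapper actually computes it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side cache: resolves a schema from the fields that the
// generic wrapper message is described as carrying.
class SchemaRegistrySketch {

    private final Map<String, String> schemas = new HashMap<>();

    private static String cacheKey(String tableName, int schemaHash) {
        return tableName + "#" + schemaHash;
    }

    // Call for each schema read from the schema topic (key = fully qualified table name).
    void register(String tableName, String schemaText) {
        // Assumption: the hashcode is String.hashCode() of the schema string.
        schemas.put(cacheKey(tableName, schemaText.hashCode()), schemaText);
    }

    // Call for each wrapper message read from the data topic; returns null if unknown.
    String resolve(String tableName, int schemaHash) {
        return schemas.get(cacheKey(tableName, schemaHash));
    }
}
```

Keeping the hashcode in the key lets a consumer retain both the old and the regenerated schema for a table after a metadata change, so messages written before the change can still be deserialized.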

5.4 Troubleshooting

This section details troubleshooting options.

5.4.1 Verify Kafka Setup

A command line Kafka producer can be used to write dummy data to a Kafka topic, and a Kafka consumer can be used to read this data from the Kafka topic. This can be used to verify the setup and the read and write permissions to Kafka topics on disk. For further details, refer to the online Kafka documentation at

http://kafka.apache.org/documentation.html#quickstart

5.4.2 Classpath Issues

Java classpath problems are among the most common. They typically manifest as a ClassNotFoundException in the log4j log file, but may also manifest as an error resolving the classpath if there is a typo in the gg.classpath variable. The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You must obtain the correct version of the Kafka client libraries and properly configure the gg.classpath property in the Java Adapter Properties file so that it correctly resolves the Kafka client libraries.

5.4.3 Invalid Kafka Version

The Oracle GoldenGate for Big Data Kafka Handler utilizes the new recommended Kafka producer API introduced in Kafka 0.8.2. Attempting to connect to a version of Kafka older than 0.8.2 will cause a runtime failure. There is no workaround for this issue. Customers must integrate with Kafka 0.8.2 or higher.

5.4.4 Kafka Producer Properties File Not Found

This problem typically manifests itself in the following exception.

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Verify that the gg.handler.kafkahandler.KafkaProducerConfigFile configuration variable is set to the correct Kafka Producer Configuration file name. Also check the gg.classpath variable to verify that the classpath includes the path to the Kafka Producer Properties file and that the path to the properties file does not contain a * wildcard at the end.

5.4.5 Kafka Connection Problem

This problem occurs if the Kafka Handler is unable to connect to Kafka. This problem manifests itself with the following warnings:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused

Ultimately the connection retry interval will expire and the Kafka Handler process will abend. Check that the Kafka brokers are running and that the host and port provided in the Kafka Producer Properties file are correct. Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
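
Reachability can also be checked from the machine that runs the Kafka Handler. The following sketch attempts a plain TCP connection to a host and port using only the JDK; the class BrokerReachability is hypothetical and not part of Oracle GoldenGate. A successful connection shows only that something is listening on that port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: tests whether a TCP connection to host:port can be opened.
class BrokerReachability {

    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Includes "Connection refused" and connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port as given in bootstrap.servers in the sample configuration file.
        System.out.println(canConnect("localhost", 9092, 2000));
    }
}
```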

5.5 Performance Considerations

It is advised not to use the linger.ms setting in the Kafka producer configuration file when gg.handler.{name}.BlockingSend=true. This causes each send to block for at least linger.ms, leading to major performance issues. The problem is that the Kafka Handler configuration and the Kafka Producer configuration are in conflict with each other. This configuration results in a temporary deadlock scenario where the Kafka Handler is waiting for the send acknowledgement while the Kafka producer is waiting for more messages before sending. The deadlock resolves once the linger.ms period has expired. This scenario repeats for every message sent.

For the best performance it is recommended to set the Kafka Handler to operate in transaction mode using non-blocking (asynchronous) calls to the Kafka producer. This is achieved by the following configuration in the Java Adapter properties file.

gg.handler.{name}.Mode = tx
gg.handler.{name}.BlockingSend = false

Additionally, the recommendation is to set the batch.size and linger.ms values in the Kafka Producer properties file. The appropriate values for batch.size and linger.ms are highly dependent upon the use case scenario. Generally, higher values result in better throughput but increased latency. Smaller values reduce latency but decrease overall throughput. If the use case involves a high volume of input data from the source trail files, then you are advised to set batch.size and linger.ms as high as is tolerable.

Use of the Replicat variable GROUPTRANSOPS will also improve performance. The recommended setting for that is 10000.

If it is a requirement that the serialized operations from the source trail file be delivered in individual Kafka messages, then the Kafka Handler must be set to operation mode.

gg.handler.{name}.Mode = op

The result will be many more Kafka messages and performance will be adversely affected.

5.6 Security

Kafka versions 0.8.2.2 and earlier do not provide support for security. Kafka 0.9.0.0 introduced security through SSL/TLS or Kerberos. The Oracle GoldenGate Kafka Handler can be secured using SSL/TLS or Kerberos. The Kafka producer client libraries provide an abstraction of security functionality from the integrations utilizing those libraries. The Oracle GoldenGate Kafka Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster and the connecting machines, and then configuring the Kafka producer properties file (that the Oracle GoldenGate Kafka Handler uses for processing) with the required security properties. For detailed instructions about securing the Kafka cluster, see the Kafka documentation at

http://kafka.apache.org/documentation.html#security_configclients

5.7 Kafka Handler Certification Matrix

The Oracle GoldenGate for Big Data Kafka Handler implements the new recommended Kafka producer interface introduced in Kafka 0.8.2.0. The Kafka Handler is not compatible with Kafka version 0.8.1.0 and older.

The Kafka Handler is compatible with the following versions of Apache Kafka:

  • 0.9.0.x

  • 0.8.2.x

The Kafka Handler is compatible with the following versions of the Hortonworks Data Platform (HDP):

  • HDP 2.4 (Kafka 0.9.0)

  • HDP 2.3 (Kafka 0.8.2.0)

Cloudera (CDH) does not currently include Kafka. Cloudera currently distributes Kafka separately as Cloudera Distribution of Apache Kafka. The Kafka Handler is compatible with the following CDH distributions:

  • Cloudera Distribution of Apache Kafka 2.0.x (Kafka 0.9.0.0)

  • Cloudera Distribution of Apache Kafka 1.x (Kafka 0.8.2.0)

5.8 Metadata Change Events

Metadata change events are now handled in the Kafka Handler. However, this is only relevant if you have configured a schema topic and the formatter used supports schema propagation (currently the Avro Row and Avro Operation formatters). The next time an operation is encountered for a table for which the schema has changed, the updated schema is published to the schema topic.

To support metadata change events, the Oracle GoldenGate process capturing changes in the source database must support both DDL changes and metadata in trail. GoldenGate does not support DDL replication for all database implementations. You are advised to consult the Oracle GoldenGate documentation for your database implementation to understand whether DDL replication is supported.

5.9 Snappy Considerations

The Kafka Producer Configuration file supports the use of compression. One of the configurable options is Snappy. Snappy is an open source compression and decompression (codec) library that tends to provide better performance than other codec libraries. However, Snappy has a shortcoming in that the Snappy jar does not run on all platforms. Snappy seems to work universally on Linux systems, but it can be hit and miss on other Unix and Windows implementations. If you use Snappy compression, you are advised to test Snappy on all required systems before implementing compression using Snappy. If Snappy does not port to all required systems, then Oracle suggests using an alternate codec library.