This chapter explains the Kafka Handler and includes examples so that you can understand this functionality.
The Oracle GoldenGate for Big Data Kafka Handler is designed to stream change capture data from an Oracle GoldenGate trail to a Kafka topic. Additionally, the Kafka Handler provides optional functionality to publish the associated schemas for messages to a separate schema topic. Schema publication for Avro and JSON is supported.
Apache Kafka is an open source, distributed, partitioned, and replicated messaging service. Kafka and its associated documentation are available at http://kafka.apache.org/.
Kafka can be run as a single instance or as a cluster on multiple servers. Each Kafka server instance is called a broker. A Kafka topic is a category or feed name to which messages are published by the producers and retrieved by consumers.
The Kafka Handler implements a Kafka producer that writes serialized change capture data from multiple source tables either to a single configured topic or to separate Kafka topics, one per source table, when the topic name corresponds to the fully qualified source table name.
Transaction Versus Operation Mode
The Kafka Handler sends instances of the Kafka ProducerRecord class to the Kafka producer API, which in turn publishes the ProducerRecord to a Kafka topic. The Kafka ProducerRecord is effectively the implementation of a Kafka message. The ProducerRecord has two components: a key and a value. The Kafka Handler represents both the key and the value as byte arrays. This section describes how the Kafka Handler publishes data.
Transaction Mode
Transaction mode is indicated by the following configuration of the Kafka Handler:
gg.handler.name.Mode=tx
In transaction mode, the serialized data for every operation in a transaction from the source Oracle GoldenGate trail files is concatenated. The concatenated operation data is the value of the Kafka ProducerRecord object, and the key of the ProducerRecord object is NULL. As a result, each Kafka message contains the data from 1 to N operations, where N is the number of operations in the transaction. With grouped transactions, the data for all of the operations in a grouped transaction is concatenated into a single Kafka message, which can produce very large Kafka messages containing data for a large number of operations.
Operation Mode
Operation mode is indicated by the following configuration of the Kafka Handler:
gg.handler.name.Mode=op
In operation mode, the serialized data for each operation is placed into an individual ProducerRecord object as the value. The ProducerRecord key is the fully qualified table name of the source operation. The ProducerRecord is sent immediately using the Kafka Producer API, so there is a one-to-one relationship between incoming operations and Kafka messages produced.
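The difference between the two modes can be sketched in Java as follows. This is an illustration under stated assumptions, not the Kafka Handler's code: `SimpleRecord` is a hypothetical stand-in for the Kafka ProducerRecord (a byte-array key and value), and each operation is assumed to have already been serialized by the formatter.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Kafka ProducerRecord: a key and a value as byte arrays. */
final class SimpleRecord {
    final byte[] key;   // null in transaction mode
    final byte[] value;

    SimpleRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

/** A source operation, assumed to be already serialized by the formatter. */
final class SerializedOp {
    final String fullyQualifiedTableName;
    final byte[] payload;

    SerializedOp(String fullyQualifiedTableName, byte[] payload) {
        this.fullyQualifiedTableName = fullyQualifiedTableName;
        this.payload = payload;
    }
}

final class ModeSketch {
    /** Mode=tx: one record per transaction, NULL key, all operation payloads concatenated. */
    static List<SimpleRecord> txMode(List<SerializedOp> transaction) {
        ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
        for (SerializedOp op : transaction) {
            concatenated.write(op.payload, 0, op.payload.length);
        }
        List<SimpleRecord> records = new ArrayList<>();
        records.add(new SimpleRecord(null, concatenated.toByteArray()));
        return records;
    }

    /** Mode=op: one record per operation, keyed by the fully qualified table name. */
    static List<SimpleRecord> opMode(List<SerializedOp> transaction) {
        List<SimpleRecord> records = new ArrayList<>();
        for (SerializedOp op : transaction) {
            byte[] key = op.fullyQualifiedTableName.getBytes(StandardCharsets.UTF_8);
            records.add(new SimpleRecord(key, op.payload));
        }
        return records;
    }
}
```

For a transaction with two operations, `txMode` returns one record whose value holds both payloads, while `opMode` returns two records.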
Blocking Versus Non-Blocking Mode
The Kafka Handler can send messages to Kafka in either blocking mode (synchronous) or non-blocking mode (asynchronous).
Blocking Mode
Blocking mode is set by the following configuration property of the Kafka Handler:
gg.handler.name.BlockingSend=true
Messages are delivered to Kafka on a synchronous basis. The Kafka Handler does not send the next message until the current message has been written to the intended topic and an acknowledgement has been received. Blocking mode provides the best guarantee of message delivery, at the cost of reduced performance.
You must never set the Kafka Producer linger.ms variable in blocking mode, because this causes the Kafka producer to wait for the entire timeout period before sending the message to the Kafka broker. In that situation, the Kafka Handler is waiting for acknowledgement that the message has been sent while, at the same time, the Kafka Producer is buffering messages to be sent to the Kafka brokers.
Non-Blocking Mode
Non-blocking mode is set by the following configuration property of the Kafka Handler:
gg.handler.name.BlockingSend=false
Messages are delivered to Kafka on an asynchronous basis. Kafka messages are published one after the other without waiting for acknowledgements. The Kafka Producer client may buffer incoming messages in order to increase throughput.
On each transaction commit, the Kafka producer flush call is invoked to ensure that all outstanding messages are transferred to the Kafka cluster. Invocation of the Kafka producer flush call is not affected by the linger.ms duration. This allows the Kafka Handler to safely checkpoint, ensuring zero data loss.
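The two send patterns can be sketched in Java as follows. To keep the example self-contained, `AsyncProducer` is a hypothetical in-memory stand-in for the Kafka producer: `send` returns immediately with a future that completes when the message is acknowledged on a background thread, and `flush` waits for every outstanding send. With the real Kafka client, the corresponding calls are `KafkaProducer.send(...)`, which returns a `Future<RecordMetadata>`, and `KafkaProducer.flush()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a Kafka producer; delivery happens on a background thread. */
final class AsyncProducer implements AutoCloseable {
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final List<CompletableFuture<Void>> outstanding = new ArrayList<>();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    /** Returns immediately; the future completes once the message has been "acknowledged". */
    CompletableFuture<Void> send(String message) {
        CompletableFuture<Void> ack = CompletableFuture.runAsync(() -> delivered.add(message), io);
        outstanding.add(ack);
        return ack;
    }

    /** Blocks until every outstanding send has been acknowledged. */
    void flush() {
        CompletableFuture.allOf(outstanding.toArray(new CompletableFuture<?>[0])).join();
        outstanding.clear();
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

final class DeliverySketch {
    /** BlockingSend=true: wait for each acknowledgement before sending the next message. */
    static void sendBlocking(AsyncProducer producer, List<String> messages) {
        for (String m : messages) {
            producer.send(m).join();
        }
    }

    /** BlockingSend=false: send without waiting, then flush once on transaction commit. */
    static void sendNonBlockingAndCommit(AsyncProducer producer, List<String> messages) {
        for (String m : messages) {
            producer.send(m);
        }
        producer.flush(); // all messages are acknowledged before a checkpoint would be taken
    }
}
```

Blocking mode waits once per message, while non-blocking mode waits once per commit, which is where its throughput advantage comes from.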
You can control when the Kafka Producer flushes data to the Kafka broker with a number of configurable properties in the Kafka producer configuration file. To enable batch sending of messages by the Kafka Producer, set both the batch.size and linger.ms properties in the Kafka producer configuration file. The batch.size property controls the maximum number of bytes to buffer before a send to Kafka, while the linger.ms property controls the maximum number of milliseconds to wait before sending data. Data is sent to Kafka once batch.size is reached or the linger.ms period expires, whichever comes first. Setting only batch.size, without linger.ms, causes messages to be sent to Kafka immediately.
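The send rule can be modeled with a small Java class. This is a simplified sketch, not the Kafka producer's implementation (the real producer batches per partition and has other send triggers); `BatchRule` and its parameters are hypothetical names. It shows why a linger of 0 ms sends buffered data immediately.

```java
/**
 * Simplified model of the rule above: buffered data is sent once batch.size bytes
 * have accumulated or linger.ms has elapsed, whichever comes first.
 */
final class BatchRule {
    private final int batchSizeBytes; // models batch.size
    private final long lingerMs;      // models linger.ms

    BatchRule(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /**
     * @param bufferedBytes bytes currently waiting in the batch
     * @param waitedMs      milliseconds since the first buffered message arrived
     */
    boolean shouldSend(int bufferedBytes, long waitedMs) {
        if (bufferedBytes == 0) {
            return false; // nothing to send
        }
        return bufferedBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }
}
```

With `new BatchRule(102400, 100)`, a 500-byte batch waits until 100 ms have passed; with a linger of 0, the same batch is sent at once.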
Topic Name Selection
The topic is resolved at runtime using this configuration parameter:
gg.handler.name.topicMappingTemplate
You can configure a static string, keywords, or a combination of static strings and keywords to dynamically resolve the topic name at runtime based on the context of the current operation. See Using Templates to Resolve the Topic Name and Message Key.
Kafka Broker Settings
To enable the automatic creation of topics, set the auto.create.topics.enable property to true in the Kafka broker configuration. The default value for this property is true.
If the auto.create.topics.enable property is set to false in the Kafka broker configuration, then all the required topics should be created manually before starting the Replicat process.
Schema Propagation
The schema data for all tables is delivered to the schema topic configured with the schemaTopicName property. For more information, see Schema Propagation.
Instructions for configuring the Kafka Handler components and running the handler are described in this section.
You must install and correctly configure Kafka either as a single node or a clustered instance. Information on how to install and configure Apache Kafka is available at:
http://kafka.apache.org/documentation.html
If you are using a Kafka distribution other than Apache Kafka, then consult the documentation for your specific Kafka distribution for installation and configuration instructions.
ZooKeeper, a prerequisite component for Kafka, and the Kafka broker (or brokers) must be up and running.
Oracle recommends and considers it best practice that the data topic and the schema topic (if applicable) are preconfigured on the running Kafka brokers. You can create Kafka topics dynamically, but this relies on the Kafka brokers being configured to allow dynamic topics.
If the Kafka broker is not collocated with the Kafka Handler process, then the remote host and port must be reachable from the machine running the Kafka Handler.
Two things must be configured in the gg.classpath configuration variable so that the Kafka Handler can connect to Kafka and run: the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Handler Client Dependencies.
The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.
The default location of the Kafka client JARs is Kafka_Home/libs/*.
The gg.classpath must be configured precisely. The path to the Kafka Producer properties file should contain only the directory path, with no wildcard appended; including the * wildcard in the path to the Kafka Producer properties file causes the file not to be picked up. Conversely, the path to the dependency JARs should include the * wildcard character in order to include all of the JAR files in that directory in the associated classpath. Do not use *.jar.
The following is an example of the correctly configured classpath:
gg.classpath={kafka install dir}/libs/*
Table 8-1 Configuration Properties for Kafka Handler
| Property Name | Required | Property Value | Default | Description |
|---|---|---|---|---|
| gg.handlerlist | Yes | | | List of handlers to be used. |
| gg.handler.name.Type | Yes | kafka | | Type of handler to use. For example, Kafka, Flume, HDFS. |
| gg.handler.name.KafkaProducerConfigFile | No | Any custom file name | | Filename in the classpath that holds Apache Kafka properties to configure the Apache Kafka producer. |
| gg.handler.name.Format | No | Formatter class or short code | | Formatter to use to format the payload. |
| gg.handler.name.SchemaTopicName | Yes, when schema delivery is required. | Name of the schema topic | | Topic name where schema data will be delivered. If this property is not set, schema will not be propagated. Schemas will be propagated only for Avro formatters. |
| gg.handler.name.SchemaPrClassName | No | Fully qualified class name of a custom class that implements the required Oracle GoldenGate for Big Data Kafka Handler interface | Provided implementation class | Schemas are also propagated as a Kafka ProducerRecord; by default, the key is the fully qualified table name. |
| gg.handler.name.BlockingSend | No | true or false | | If this property is set to true, delivery to Kafka works in a completely synchronous model: the next payload is sent only after the current payload has been written to the intended topic and an acknowledgement has been received. In transaction mode, this provides exactly-once semantics. If this property is set to false, delivery to Kafka works in an asynchronous model: payloads are sent one after the other without waiting for acknowledgements, and Kafka internal queues may buffer contents to increase throughput. Checkpoints are made only when acknowledgements are received from Kafka brokers using Java callbacks. |
| gg.handler.name.Mode | No | tx or op | | In operation mode, each change capture data record (insert, update, delete, and so on) payload is represented as a Kafka ProducerRecord and is flushed one at a time. In transaction mode, all operations within a source transaction are represented as a single Kafka ProducerRecord, and this combined byte payload is flushed on a transaction commit event. |
| gg.handler.name.topicMappingTemplate | Yes | A template string value to resolve the Kafka topic name at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
| gg.handler.name.keyMappingTemplate | Yes | A template string value to resolve the Kafka message key at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
A sample configuration for the Kafka Handler from the Adapter properties file is:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkahandler.BlockingSend = true
A sample Replicat configuration and a Java Adapter Properties file for a Kafka integration can be found in the following directory:
GoldenGate_install_directory/AdapterExamples/big-data/kafka
The Kafka Handler must access a Kafka producer configuration file in order to publish messages to Kafka. The file name of the Kafka producer configuration file is controlled by the following configuration in the Kafka Handler properties:
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
The Kafka Handler will attempt to locate and load the Kafka producer configuration file using the Java classpath. Therefore the Java classpath must include the directory containing the Kafka Producer Configuration File.
The Kafka producer configuration file contains Kafka proprietary properties. The Kafka documentation provides configuration information for the 0.8.2.0 Kafka producer interface properties. The Kafka Handler uses these properties to resolve the host and port of the Kafka brokers, and properties in the Kafka producer configuration file control the behavior of the interaction between the Kafka producer client and the Kafka brokers.
A sample configuration file for the Kafka producer is as follows:
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 0
max.request.size = 1048576
send.buffer.bytes = 131072
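Because the Kafka Handler locates this file through the Java classpath, the lookup behaves like a classpath resource load. The following Java sketch shows that mechanism; `ProducerConfigLoader` is a hypothetical helper, not part of the Kafka Handler. It also illustrates why the wildcard rule in Classpath Configuration matters: a classpath entry such as dirprm/* expands to the JAR files in that directory only, so a properties file in dirprm is found as a resource only when dirprm itself is on the classpath.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: loads a properties file by resource name from the Java classpath. */
final class ProducerConfigLoader {
    static Properties load(String fileName) {
        ClassLoader loader = ProducerConfigLoader.class.getClassLoader();
        // getResourceAsStream searches the classpath entries; a "dir/*" entry contributes
        // only the JAR files in dir, not loose files such as custom_kafka_producer.properties.
        try (InputStream in = loader.getResourceAsStream(fileName)) {
            if (in == null) {
                throw new IllegalStateException("Not found on the classpath: " + fileName);
            }
            Properties props = new Properties();
            props.load(in); // accepts "key = value" lines and # comment lines
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `ProducerConfigLoader.load("custom_kafka_producer.properties").getProperty("bootstrap.servers")` would return localhost:9092 for the sample file above, provided its directory is on the classpath.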
The Kafka Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are used to dynamically replace the keyword with the context of the current processing. The templates use the following configuration properties:
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
Template Modes
Source database transactions are made up of one or more individual operations: inserts, updates, and deletes. The Kafka Handler can be configured to send one message per operation (insert, update, delete), or to group operations into messages at the transaction level. Many template keywords resolve data based on the context of an individual source database operation, so many of them do not work when sending messages at the transaction level. For example, ${fullyQualifiedTableName} resolves to the fully qualified source table name of an operation, but a transaction can contain operations on many source tables. Resolving the fully qualified table name for a transaction-level message is therefore non-deterministic, and the Kafka Handler abends at runtime.
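A configuration-time check can catch this problem before Replicat starts. The following Java sketch is hypothetical (`TemplateModeCheck` is not part of the Kafka Handler): it extracts the keyword names from a template and rejects any keyword that the caller lists as operation-only, that is, marked No in the Transaction Message Support column of the keyword table.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TemplateModeCheck {
    // Captures the keyword name in ${keyword} or ${keyword[arguments]}.
    private static final Pattern KEYWORD = Pattern.compile("\\$\\{([A-Za-z]+)");

    static Set<String> keywordsIn(String template) {
        Set<String> found = new LinkedHashSet<>();
        Matcher m = KEYWORD.matcher(template);
        while (m.find()) {
            found.add(m.group(1));
        }
        return found;
    }

    /** Throws IllegalArgumentException if the template uses an operation-only keyword. */
    static void validateForTransactionMode(String template, Set<String> operationOnlyKeywords) {
        for (String keyword : keywordsIn(template)) {
            if (operationOnlyKeywords.contains(keyword)) {
                throw new IllegalArgumentException(
                    "${" + keyword + "} resolves per operation and cannot be used in transaction mode");
            }
        }
    }
}
```

Running this check against the topic and key templates when Mode=tx turns a runtime abend into an immediate configuration error.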
Template Keywords
The following table includes a column indicating whether each keyword is supported for transaction-level messages.
| Keyword | Explanation | Transaction Message Support |
|---|---|---|
| ${fullyQualifiedTableName} | Resolves to the fully qualified table name, including the period (.) delimiter between the catalog, schema, and table names. | No |
| | Resolves to the catalog name. | No |
| | Resolves to the schema name. | No |
| | Resolves to the short table name. | No |
| | Resolves to the type of the operation (for example, insert, update, or delete). | No |
| | Resolves to the concatenated primary key values delimited by an underscore (_) character. | No |
| | The sequence number of the source trail file followed by the offset (RBA). | Yes |
| | The operation timestamp from the source trail file. | Yes |
| | Resolves to “”. | Yes |
| | Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended. | Yes |
| ${staticMap[]} | Resolves to a static value where the key is the fully qualified table name. The keys and values are designated inside the square brackets in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]} | No |
| | Resolves to a column value where the key is the fully qualified table name and the value is the column name to be resolved. For example: ${columnValue[dbo.table1=col1,dbo.table2=col2]} | No |
| ${currentTimestamp} or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based date format patterns. Examples: ${currentTimestamp} ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} | Yes |
| | Resolves to a NULL string. | Yes |
| | It is possible to write a custom value resolver. If required, contact Oracle Support. | Implementation dependent |
Example Templates
The following describes example template configuration values and the resolved values.
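To make template resolution concrete, the following Java sketch resolves a template against the values of the current operation. It is a hypothetical, minimal resolver rather than the Kafka Handler's implementation: simple keywords are looked up in a map of per-operation values, and ${staticMap[...]} is resolved using the operation's fully qualified table name. The context keys used in the usage example, such as groupName, are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical minimal template resolver; not the Kafka Handler's implementation. */
final class TemplateResolver {
    // ${keyword} or ${keyword[arguments]}; group 2 is null when there is no [...] part.
    private static final Pattern KEYWORD =
        Pattern.compile("\\$\\{([A-Za-z]+)(?:\\[([^\\]]*)\\])?\\}");

    static String resolve(String template, Map<String, String> context) {
        Matcher m = KEYWORD.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String keyword = m.group(1);
            String value;
            if (keyword.equals("staticMap")) {
                value = staticMap(m.group(2), context.get("fullyQualifiedTableName"));
            } else if (context.containsKey(keyword)) {
                value = context.get(keyword);
            } else {
                throw new IllegalArgumentException("Unresolvable keyword: ${" + keyword + "}");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value)); // copies static text too
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Looks up the value configured for the current table in "table=value,..." pairs. */
    private static String staticMap(String args, String tableName) {
        if (args == null) {
            throw new IllegalArgumentException("${staticMap[...]} requires table=value pairs");
        }
        Map<String, String> mapping = new HashMap<>();
        for (String entry : args.split(",")) {
            String[] kv = entry.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed staticMap entry: " + entry);
            }
            mapping.put(kv[0].trim(), kv[1].trim());
        }
        String value = mapping.get(tableName);
        if (value == null) {
            throw new IllegalArgumentException("No staticMap entry for " + tableName);
        }
        return value;
    }
}
```

For example, with fullyQualifiedTableName set to dbo.table1, the template prefix_${staticMap[dbo.table1=value1,dbo.table2=value2]} resolves to prefix_value1, and a purely static template such as oggtopic resolves to itself.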
The Kafka Handler provides the ability to publish schemas to a schema topic. Currently, the Avro Row and Operation formatters are the only formatters that are enabled for schema publishing. If the Kafka Handler schemaTopicName property is set, then the schema is published for the following events:
The Avro schema for a specific table will be published the first time an operation for that table is encountered.
If the Kafka Handler receives a metadata change event, the schema is flushed. The regenerated Avro schema for a specific table is published the next time an operation for that table is encountered.
If the Avro wrapping functionality is enabled, then the generic wrapper Avro schema is published the first time any operation is encountered. The generic wrapper Avro schema functionality can be enabled in the Avro formatter configuration, see Avro Row Formatter and Avro Operation Formatter.
The Kafka ProducerRecord value is the schema, and the key is the fully qualified table name.
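The publication rules above can be sketched as follows. `SchemaPublisher` is a hypothetical class, not the Kafka Handler's code; instead of calling Kafka, it records the keys of the schema records that would be sent to the schema topic, and it leaves out Avro schema generation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of when a table's schema is published to the schema topic. */
final class SchemaPublisher {
    private final Set<String> published = new HashSet<>();
    // Keys of the records that would be sent to the schema topic, in order.
    final List<String> schemaTopicKeys = new ArrayList<>();

    /** Called for every operation; publishes the table's schema the first time it is seen. */
    void onOperation(String fullyQualifiedTableName) {
        if (published.add(fullyQualifiedTableName)) {
            // Key = fully qualified table name; the value would be the generated Avro schema.
            schemaTopicKeys.add(fullyQualifiedTableName);
        }
    }

    /** A metadata change event flushes the schema, so the next operation republishes it. */
    void onMetadataChange(String fullyQualifiedTableName) {
        published.remove(fullyQualifiedTableName);
    }
}
```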
Avro over Kafka can be problematic because Avro messages depend directly on an Avro schema. Avro messages are binary and therefore not human readable; to deserialize an Avro message, the receiver must first have the correct Avro schema. Because each table from the source database results in a separate Avro schema, the receiver of a Kafka message cannot determine which Avro schema to use to deserialize individual messages when the source Oracle GoldenGate trail file includes operations from multiple tables. To solve this problem, you can wrap the specialized Avro messages in a generic Avro message wrapper. This generic Avro wrapper provides the fully qualified table name, the hashcode of the schema string, and the wrapped Avro message. The receiver can use the fully qualified table name and the hashcode of the schema string to resolve the associated schema of the wrapped message, and then use that schema to deserialize the wrapped message.
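On the receiving side, the lookup described above can be sketched as a cache keyed by table name and schema hash. `WrappedSchemaCache` is hypothetical, the Avro decoding step is left out, and the sketch assumes that the wrapper's hash is Java's String.hashCode() of the schema text; confirm the actual hash in the Avro formatter documentation before relying on it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical receiver-side cache: schemas read from the schema topic are indexed by
 * fully qualified table name and schema hash (assumed to be String.hashCode()).
 */
final class WrappedSchemaCache {
    private final Map<String, String> schemas = new HashMap<>();

    private static String cacheKey(String tableName, int schemaHash) {
        return tableName + "#" + schemaHash;
    }

    /** Register a schema record from the schema topic (key = table name, value = schema). */
    void register(String tableName, String schemaText) {
        schemas.put(cacheKey(tableName, schemaText.hashCode()), schemaText);
    }

    /** Find the schema for a wrapped message from its table name and schema hash fields. */
    String resolve(String tableName, int schemaHash) {
        String schema = schemas.get(cacheKey(tableName, schemaHash));
        return Objects.requireNonNull(schema, "No schema yet for " + tableName + " / " + schemaHash);
    }
}
```

Keying on the hash as well as the table name lets a receiver hold the old and new schema of a table side by side after a metadata change.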
Oracle recommends that you do not use the linger.ms setting in the Kafka producer configuration file when gg.handler.name.BlockingSend=true. This causes each send to block for at least linger.ms, leading to major performance issues, because the Kafka Handler configuration and the Kafka Producer configuration are in conflict with each other. This configuration results in a temporary deadlock scenario, where the Kafka Handler is waiting for send acknowledgement while the Kafka producer is waiting for more messages before sending. The deadlock resolves once the linger.ms period has expired. This behavior repeats for every message sent.
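A pre-flight check for this conflict can be sketched in Java. `LingerConflictCheck` is a hypothetical helper, not part of the Kafka Handler: it takes the BlockingSend setting and the loaded Kafka producer properties, and reports a conflict when blocking mode is combined with a linger.ms greater than 0.

```java
import java.util.Properties;

/** Hypothetical check for BlockingSend=true combined with a non-zero linger.ms. */
final class LingerConflictCheck {
    static boolean conflicts(boolean blockingSend, Properties producerConfig) {
        String linger = producerConfig.getProperty("linger.ms");
        if (!blockingSend || linger == null) {
            return false; // non-blocking mode, or linger.ms not set at all
        }
        return Long.parseLong(linger.trim()) > 0;
    }
}
```

The sample configuration in this chapter (BlockingSend = true with linger.ms = 0) passes this check.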
For the best performance, Oracle recommends that you set the Kafka Handler to operate in operation mode using non-blocking (asynchronous) calls to the Kafka producer by using the following configuration in your Java Adapter properties file:
gg.handler.name.mode = op
gg.handler.name.BlockingSend = false
Additionally, Oracle recommends that you set the batch.size and linger.ms values in the Kafka Producer properties file. The appropriate batch.size and linger.ms values are highly dependent upon the use case. Typically, higher values result in better throughput, but latency is increased. Smaller values in these properties reduce latency, though overall throughput decreases. If you have a high volume of input data from the source trail files, then set batch.size and linger.ms as high as possible.
Setting the Replicat parameter GROUPTRANSOPS also improves performance. The recommended setting is 10000.
If you need to have the serialized operations from the source trail file delivered in individual Kafka messages, then the Kafka Handler must be set to operation mode.
gg.handler.name.mode = op
The result is many more Kafka messages and performance is adversely affected.
Kafka version 0.9.0.0 introduced security through SSL/TLS and SASL (Kerberos). You can secure the Kafka Handler using one or both of the SSL/TLS and SASL (Kerberos) security offerings. The Kafka producer client libraries abstract security functionality away from the integrations that use those libraries, so the Kafka Handler itself is effectively independent of the security configuration. Enabling security requires setting up security for the Kafka cluster and the connecting machines, and then adding the required security properties to the Kafka producer properties file that the Kafka Handler uses for processing. For detailed instructions about securing the Kafka cluster, see the Kafka documentation at
http://kafka.apache.org/documentation.html#security_configclients
Metadata change events are now handled in the Kafka Handler. This is only relevant if you have configured a schema topic and the formatter used supports schema propagation (currently Avro row and Avro Operation formatters). The next time an operation is encountered for a table for which the schema has changed, the updated schema is published to the schema topic.
To support metadata change events, the Oracle GoldenGate process capturing changes in the source database must support the Oracle GoldenGate metadata in trail feature, which was introduced in Oracle GoldenGate 12c (12.2).
The Kafka Producer Configuration file supports the use of compression. One of the configurable options is Snappy, an open source compression and decompression (codec) library that tends to provide better performance than other codec libraries. The Snappy JAR does not run on all platforms: Snappy generally works on Linux systems, but may or may not work on other UNIX and Windows implementations. If you want to use Snappy compression, then you should test Snappy on all required systems before implementing compression using Snappy. If Snappy does not port to all required systems, then Oracle recommends using an alternate codec library.
You can use the command line Kafka producer to write dummy data to a Kafka topic, and a Kafka consumer to read this data from the Kafka topic. Use this to verify the setup and the read/write permissions to Kafka topics on disk. For further details, refer to the online Kafka documentation.
Java classpath problems are among the most common problems. Typically, they appear as a ClassNotFoundException in the log4j log file, though they may also appear as an error resolving the classpath if there is a typographical error in the gg.classpath variable. The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You are responsible for obtaining the correct version of the Kafka client libraries and for configuring the gg.classpath property in the Java Adapter Properties file so that it correctly resolves the Kafka client libraries, as described in Classpath Configuration.
The Kafka Handler does not support Kafka versions 0.8.2.2 and older. The typical outcome when running with an unsupported version of Kafka is a runtime Java exception, java.lang.NoSuchMethodError, indicating that the org.apache.kafka.clients.producer.KafkaProducer.flush() method cannot be found. If this error is encountered, you must migrate to Kafka version 0.9.0.0 or later.
Typically, this problem appears as the following exception:
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Verify that the gg.handler.kafkahandler.KafkaProducerConfigFile configuration variable sets the Kafka Producer Configuration file name correctly. Check the gg.classpath variable to verify that the classpath includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.
When the Kafka Handler is unable to connect to Kafka, warnings such as the following are logged:
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
The connection retry interval expires and the Kafka Handler process abends. Ensure that the Kafka brokers are running and that the host and port provided in the Kafka Producer Properties file are correct. Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
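A complementary check, run from the machine hosting the Kafka Handler, is to open a TCP connection to each entry in bootstrap.servers. The following Java sketch is a hypothetical helper, not part of Oracle GoldenGate: it only verifies that a TCP connection can be opened to each host:port pair, and does not speak the Kafka protocol or check security settings.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reachability check for the brokers listed in bootstrap.servers. */
final class BrokerReachability {
    /** Returns the entries that could not be reached, each with the reason. */
    static List<String> unreachable(String bootstrapServers, int timeoutMs) {
        List<String> failed = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                failed.add(hostPort + " (expected host:port)");
                continue;
            }
            String host = hostPort.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(hostPort.substring(colon + 1));
            } catch (NumberFormatException e) {
                failed.add(hostPort + " (invalid port)");
                continue;
            }
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
            } catch (IOException e) {
                // For example, java.net.ConnectException: Connection refused
                failed.add(hostPort + " (" + e + ")");
            }
        }
        return failed;
    }
}
```

For example, `BrokerReachability.unreachable("localhost:9092", 2000)` returns an empty list when a broker is listening on that port, and an entry containing the ConnectException otherwise.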