20 Using the Kafka Connect Handler
The Kafka Connect Handler is an extension of the standard Kafka messaging functionality.
This chapter describes how to use the Kafka Connect Handler.
- Overview
- Detailed Functionality
- Setting Up and Running the Kafka Connect Handler
- Kafka Connect Handler Performance Considerations
- Kafka Interceptor Support
- Kafka Partition Selection
- Troubleshooting the Kafka Connect Handler
20.1 Overview
The Oracle GoldenGate Kafka Connect Handler is an extension of the standard Kafka messaging functionality. Kafka Connect is a functional layer on top of the standard Kafka Producer and Consumer interfaces. It provides standardization for messaging to make it easier to add new source and target systems into your topology.
Confluent is a primary adopter of Kafka Connect and their Confluent Platform offering includes extensions over the standard Kafka Connect functionality. This includes Avro serialization and deserialization, and an Avro schema registry. Much of the Kafka Connect functionality is available in Apache Kafka. A number of open source Kafka Connect integrations are found at:
https://www.confluent.io/product/connectors/
The Kafka Connect Handler is a Kafka Connect source connector. You can capture database changes from any database supported by Oracle GoldenGate and stream that change data through the Kafka Connect layer to Kafka. You can also connect to Oracle Event Hub Cloud Services (EHCS) with this handler.
Kafka Connect uses proprietary objects to define the schemas (org.apache.kafka.connect.data.Schema) and the messages (org.apache.kafka.connect.data.Struct). The Kafka Connect Handler can be configured to manage what data is published and the structure of the published data.
The Kafka Connect Handler does not support any of the pluggable formatters that are supported by the Kafka Handler.
Parent topic: Using the Kafka Connect Handler
20.2 Detailed Functionality
JSON Converter
The Kafka Connect framework provides converters to convert in-memory Kafka Connect messages to a serialized format suitable for transmission over a network. These converters are selected using configuration in the Kafka Producer properties file.
Kafka Connect and the JSON converter are available as part of the Apache Kafka download. The JSON Converter converts the Kafka keys and values to JSON, which is then sent to a Kafka topic. You identify the JSON Converters with the following configuration in the Kafka Producer properties file:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
The format of the messages is the message schema information followed by the payload information. JSON is a self-describing format, so you should not need to include the schema information in each message published to Kafka.
To omit the JSON schema information from the messages, set the following:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
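With schemas enabled, each message the JsonConverter produces is an envelope containing a schema block and a payload block; with schemas disabled, only the bare payload object is sent. A sketch of the envelope shape follows (the table and column names are illustrative):

```json
{
  "schema": {
    "type": "struct",
    "name": "QASOURCE.TCUSTMER",
    "optional": false,
    "fields": [
      { "field": "CUST_CODE", "type": "string", "optional": true }
    ]
  },
  "payload": {
    "CUST_CODE": "1001"
  }
}
```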
Avro Converter
Confluent provides Kafka installations, support for Kafka, and extended functionality built on top of Kafka to help realize the full potential of Kafka. Confluent provides both open source versions of Kafka (Confluent Open Source) and an enterprise edition (Confluent Enterprise), which is available for purchase.
A common Kafka use case is to send Avro messages over Kafka. This can create a problem on the receiving end, because deserializing an Avro message requires the Avro schema with which it was written. Schema evolution compounds the problem, because received messages must be matched up with the exact Avro schema used to generate the message on the producer side. Deserializing Avro messages with an incorrect Avro schema can cause runtime failure, incomplete data, or incorrect data. Confluent has solved this problem by using a schema registry and the Confluent schema converters.
The following shows the configuration of the Kafka Producer properties file.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
When messages are published to Kafka, the Avro schema is registered and stored in the schema registry. When messages are consumed from Kafka, the exact Avro schema used to create the message can be retrieved from the schema registry to deserialize the Avro message. This matching of Avro messages to their corresponding Avro schemas on the receiving side solves the problem.
Following are the requirements to use the Avro Converters:

- This functionality is available in both versions of Confluent Kafka (open source and enterprise).
- The Confluent schema registry service must be running.
- Source database tables must have an associated Avro schema. Messages associated with different Avro schemas must be sent to different Kafka topics.
- The Confluent Avro converters and the schema registry client must be available in the classpath.
The schema registry keeps track of Avro schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Source messages have Avro schemas based on the source database table schema, so Avro schemas are unique for each source table. Publishing messages for multiple source tables to a single topic makes it appear to the schema registry that the schema is evolving every time a message arrives from a source table different from the previous message.
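The per-topic tracking described above can be sketched with a toy model. This is illustrative only; a real registry (such as Confluent's) also enforces compatibility rules before accepting a new schema version, and the topic and schema names here are assumptions:

```python
# Toy model of a schema registry that tracks schema versions per topic.
class ToySchemaRegistry:
    def __init__(self):
        self.versions = {}  # topic -> ordered list of registered schemas

    def register(self, topic, schema):
        """Register a schema under a topic; return its version number."""
        schemas = self.versions.setdefault(topic, [])
        if schema not in schemas:
            schemas.append(schema)  # appears as schema "evolution"
        return schemas.index(schema) + 1

registry = ToySchemaRegistry()

# One topic per source table: the subject holds a single, stable schema.
registry.register("QASOURCE.TCUSTMER", "tcustmer_schema_v1")

# Multiple tables publishing to one shared topic: each distinct table
# schema registers as a new "evolved" version of the same subject.
registry.register("shared_topic", "tcustmer_schema_v1")
registry.register("shared_topic", "tcustord_schema_v1")
print(len(registry.versions["shared_topic"]))  # 2
```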
Parent topic: Using the Kafka Connect Handler
20.3 Setting Up and Running the Kafka Connect Handler
This section describes how to configure the Kafka Connect Handler components and run the handler.
Classpath Configuration
Two things must be configured in the gg.classpath configuration variable so that the Kafka Connect Handler can connect to Kafka and run: the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Connect Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Connect Handler Client Dependencies. The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.
The default location of the Kafka Connect client JARs is the Kafka_Home/libs/* directory.
The gg.classpath variable must be configured precisely. Pathing to the Kafka Producer properties file should contain the path with no wildcard appended. The inclusion of the asterisk (*) wildcard in the path to the Kafka Producer properties file causes it to be discarded. Pathing to the dependency JARs should include the * wildcard character to include all of the JAR files in that directory in the associated classpath. Do not use *.jar.
Following is an example of a correctly configured Apache Kafka classpath:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
Following is an example of a correctly configured Confluent Kafka classpath:
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
- Kafka Connect Handler Configuration
- Using Templates to Resolve the Topic Name and Message Key
- Configuring Security in the Kafka Connect Handler
Parent topic: Using the Kafka Connect Handler
20.3.1 Kafka Connect Handler Configuration
Meta-column fields can be configured with the following property:
gg.handler.name.metaColumnsTemplate
To output the metacolumns as in previous versions, configure the following:
gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}
To also include the primary key columns and the tokens, configure as follows:
gg.handler.name.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}
For more information, see the configuration property gg.handler.name.metaColumnsTemplate.
Table 20-1 Kafka Connect Handler Configuration Properties

Properties | Required/ Optional | Legal Values | Default | Explanation
---|---|---|---|---
gg.handler.name.type | Required | kafkaconnect | None | The configuration to select the Kafka Connect Handler.
gg.handler.name.kafkaProducerConfigFile | Required | string | None | A path to a properties file containing the Kafka and Kafka Connect configuration properties.
gg.handler.name.topicMappingTemplate | Required | A template string value to resolve the Kafka topic name at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key.
gg.handler.name.keyMappingTemplate | Required | A template string value to resolve the Kafka message key at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key.
gg.handler.name.includeTokens | Optional | true or false | false | Set to true to include a map field in output messages. The key is tokens, and the value is a map of the token keys and values from the source trail file. Set to false to suppress this field.
gg.handler.name.messageFormatting | Optional | row or op | row | Controls how output messages are modeled. Set to row and the output messages are modeled as row data. Set to op and the output messages are modeled as operations messages.
gg.handler.name.insertOpKey | Optional | any string | I | The value of the field op_type that indicates an insert operation.
gg.handler.name.updateOpKey | Optional | any string | U | The value of the field op_type that indicates an update operation.
gg.handler.name.deleteOpKey | Optional | any string | D | The value of the field op_type that indicates a delete operation.
gg.handler.name.truncateOpKey | Optional | any string | T | The value of the field op_type that indicates a truncate operation.
gg.handler.name.treatAllColumnsAsStrings | Optional | true or false | false | Set to true to treat all output fields as strings. Set to false and the handler maps the corresponding field type from the source trail file to the best corresponding Kafka Connect data type.
gg.handler.name.mapLargeNumbersAsStrings | Optional | true or false | false | Large numbers are mapped to number fields as doubles. It is possible to lose precision in certain scenarios. If set to true, these fields are mapped as strings to preserve precision.
gg.handler.name.pkUpdateHandling | Optional | abend, update, or delete-insert | abend | Only applicable if modeling row messages (gg.handler.name.messageFormatting=row). Controls how update operations that change a primary key are handled: abend terminates the process, update processes the operation as a normal update, and delete-insert processes the operation as a delete followed by an insert. Not applicable when modeling operations messages, because the before and after image values are propagated to the message.
gg.handler.name.metaColumnsTemplate | Optional | Any of the metacolumns keywords. | None | A comma-delimited string consisting of one or more templated values that represent the template. See Setting Metacolumn Output.
gg.handler.name.includeIsMissingFields | Optional | true or false | true | Set to true to include an isMissing boolean field for each source column. Set this property to allow downstream applications to differentiate whether a null value is actually null in the source trail file or whether the value is missing in the source trail file.
gg.handler.name.enableDecimalLogicalType | Optional | true or false | false | Set to true to enable decimal logical types in Kafka Connect. Decimal logical types allow numbers that will not fit in a 64-bit data type to be represented.
gg.handler.name.oracleNumberScale | Optional | Positive integer | 38 | Only applicable if gg.handler.name.enableDecimalLogicalType=true. Some source data types do not have a fixed scale associated with them. Scale must be set for Kafka Connect decimal logical types. For source types that do not have a scale in the metadata, the value of this parameter is used to set the scale.
gg.handler.name.EnableTimestampLogicalType | Optional | true or false | false | Set to true to enable the Kafka Connect timestamp logical type. The Kafka Connect timestamp logical type is an integer measurement of milliseconds since the Java epoch. This means precision greater than milliseconds is not possible if the timestamp logical type is used. Use of this property requires that the gg.format.timestamp property be set. This property is the timestamp formatting string, which is used to determine the output of timestamps in string format. For example, gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSS. Ensure that the goldengate.userexit.timestamp property is not set in the configuration file. Setting this property prevents parsing the input timestamp into a Java object, which is required for logical timestamps.
gg.handler.name.metaHeadersTemplate | Optional | Comma-delimited list of metacolumn keywords. | None | Allows the user to select metacolumns to inject context-based key-value pairs into Kafka message headers using the metacolumn keyword syntax.
gg.handler.name.schemaNamespace | Optional | Any string without characters that violate the Kafka Connect Avro schema naming requirements. | None | Used to control the generated Kafka Connect schema name. If it is not set, then the schema name is the same as the qualified source table name. For example, if the source table is QASOURCE.TCUSTMER, then the Kafka Connect schema name is the same. This property allows you to control the generated schema name.
See Using Templates to Resolve the Topic Name and Message Key for more information.
Review a Sample Configuration

gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
Parent topic: Setting Up and Running the Kafka Connect Handler
20.3.2 Using Templates to Resolve the Topic Name and Message Key
The Kafka Connect Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are used to dynamically replace the keyword with the context of the current processing. Templates are applicable to the following configuration parameters:
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
Template Modes
The Kafka Connect Handler can only send operation messages. The Kafka Connect Handler cannot group operation messages into a larger transaction message.
Template Keywords

Keyword | Explanation
---|---
${fullyQualifiedTableName} | Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names. For example, test.dbo.table1.
${catalogName} | Resolves to the catalog name.
${schemaName} | Resolves to the schema name.
${tableName} | Resolves to the short table name.
${opType} | Resolves to the type of the operation: (INSERT, UPDATE, DELETE, or TRUNCATE).
${primaryKeys} | Resolves to the concatenated primary key values delimited by an underscore (_) character.
${position} | The sequence number of the source trail file followed by the offset (RBA).
${opTimestamp} | The operation timestamp from the source trail file.
${emptyString} | Resolves to "".
${groupName} | Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.
${staticMap[]} | Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside of the square brace in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]}.
${columnValue[]} | Resolves to a column value where the key is the fully-qualified table name and the value is the column name to be resolved. For example: ${columnValue[dbo.table1=col1,dbo.table2=col2]}.
${currentTimestamp} Or ${currentTimestamp[]} | Resolves to the current timestamp. You can control the format of the current timestamp using the Java based formatting of the SimpleDateFormat class. Examples: ${currentTimestamp} or ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}.
${null} | Resolves to a NULL string.
${custom[]} | It is possible to write a custom value resolver. If required, contact Oracle Support.
Example Templates

The following describes example template configuration values and the resolved values.

Example Template | Resolved Value
---|---
${groupName}_${fullyQualifiedTableName} | KC001_dbo.table1
prefix_${schemaName}_${tableName}_suffix | prefix_dbo_table1_suffix
${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]} | 2017-05-17 11:45:34.254
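As a sketch of how template resolution works, the following illustrative Python function substitutes keywords from a context map. The resolver and the context values are assumptions for illustration, not the actual Oracle GoldenGate implementation:

```python
import re

def resolve_template(template, context):
    """Replace each ${keyword} in the template with its value from context."""
    def substitute(match):
        keyword = match.group(1)
        if keyword not in context:
            raise KeyError("unknown template keyword: " + keyword)
        return context[keyword]
    return re.sub(r"\$\{(\w+)\}", substitute, template)

# Example context for one operation on source table QASOURCE.TCUSTMER.
context = {
    "fullyQualifiedTableName": "QASOURCE.TCUSTMER",
    "schemaName": "QASOURCE",
    "tableName": "TCUSTMER",
    "primaryKeys": "CUST_CODE",
    "groupName": "RKC001",
}

topic = resolve_template("${fullyQualifiedTableName}", context)
key = resolve_template("${groupName}_${tableName}", context)
print(topic)  # QASOURCE.TCUSTMER
print(key)    # RKC001_TCUSTMER
```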
Parent topic: Setting Up and Running the Kafka Connect Handler
20.3.3 Configuring Security in the Kafka Connect Handler
Kafka version 0.9.0.0 introduced security through SSL/TLS or Kerberos, and the Kafka Connect Handler can be secured using either. The Kafka Producer client libraries abstract the security functionality from the integrations that use those libraries, so the Kafka Connect Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster and connecting machines, and then configuring the Kafka Producer properties file that the handler uses for processing with the required security properties.
You may encounter the inability to decrypt the Kerberos password from the keytab file. This causes the Kerberos authentication to fall back to interactive mode, which cannot work because it is invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.
Parent topic: Setting Up and Running the Kafka Connect Handler
20.4 Kafka Connect Handler Performance Considerations
There are multiple configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, that affect performance.
The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. The GROUPTRANSOPS parameter allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka Connect Handler calls flush on the Kafka Producer to push the messages to Kafka for write durability, followed by a checkpoint. The flush call is an expensive call, and setting the Replicat GROUPTRANSOPS setting to a larger amount allows Replicat to call flush less frequently, thereby improving performance.
The default setting for GROUPTRANSOPS is 1000, and performance improvements can be obtained by increasing the value to 2500, 5000, or even 10000.
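For example, a Replicat parameter file might raise the grouping explicitly. The Replicat name and MAP statement below are illustrative:

```
REPLICAT rkc
-- Group up to 2500 source transactions into one target transaction,
-- reducing the frequency of expensive Kafka Producer flush calls.
GROUPTRANSOPS 2500
MAP QASOURCE.*, TARGET QASOURCE.*;
```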
Op mode (gg.handler.kafkaconnect.mode=op) can also provide better performance than Tx mode (gg.handler.kafkaconnect.mode=tx).
A number of Kafka Producer properties can affect performance. The following are the parameters with significant impact:

- linger.ms
- batch.size
- acks
- buffer.memory
- compression.type
Oracle recommends that you start with the default values for these parameters and perform performance testing to obtain a baseline for performance. Review the Kafka documentation for each of these parameters to understand its role, then adjust the parameters and perform additional performance testing to ascertain the performance effect of each parameter.
Parent topic: Using the Kafka Connect Handler
20.5 Kafka Interceptor Support
The Kafka Producer client framework supports the use of Producer Interceptors. A Producer Interceptor is simply a user exit from the Kafka Producer client whereby the Interceptor object is instantiated and receives notifications of Kafka message send calls and Kafka message send acknowledgement calls.
The typical use case for Interceptors is monitoring. Kafka Producer Interceptors must conform to the interface org.apache.kafka.clients.producer.ProducerInterceptor. The Kafka Connect Handler supports Producer Interceptor usage.
The requirements to use Interceptors in the Handlers are as follows:

- The Kafka Producer configuration property interceptor.classes must be configured with the class name of the Interceptor(s) to be invoked.
- In order to invoke the Interceptor(s), the JAR files plus any dependency JARs must be available to the JVM. Therefore, the JAR files containing the Interceptor(s) plus any dependency JARs must be added to the gg.classpath in the Handler configuration file.

For more information, see the Kafka documentation.
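For example, the Kafka Producer properties file could list Confluent's monitoring interceptor. This is shown purely as an illustration; any class implementing org.apache.kafka.clients.producer.ProducerInterceptor may be listed, and the named interceptor's JAR must be on gg.classpath:

```
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
```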
Parent topic: Using the Kafka Connect Handler
20.6 Kafka Partition Selection
Kafka topics comprise one or more partitions. Distribution to multiple partitions is a good way to improve Kafka ingest performance, because the Kafka client parallelizes message sending to different topic/partition combinations. Partition selection is controlled by the following calculation in the Kafka client:
(Hash of the Kafka message key) modulus (the number of partitions) = selected partition number
The Kafka message key is selected by the following configuration value:
gg.handler.{your handler name}.keyMappingTemplate=
If this parameter is set to a value which generates a static key, all messages will go to the same partition. The following are examples of static keys:
gg.handler.{your handler name}.keyMappingTemplate=StaticValue
gg.handler.{your handler name}.keyMappingTemplate=${null}
If this parameter is set to a value which generates a key that changes infrequently, partition selection also changes infrequently. In the following example, the table name is used as the message key. Every operation for a specific source table will have the same key and will thereby route to the same partition:
gg.handler.{your handler name}.keyMappingTemplate=${tableName}
The recommended setting for configuration of the mapping key is the following:
gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}
This generates a Kafka message key that is the concatenated and delimited primary key values.
Operations for each row should have unique primary key(s), thereby generating a unique Kafka message key for each row. Another important consideration is that Kafka messages sent to different partitions are not guaranteed to be delivered to a Kafka consumer in the original order sent; this is part of the Kafka specification, and order is only maintained within a partition. Using primary keys as the Kafka message key means that operations for the same row, which have the same primary key(s), generate the same Kafka message key and are therefore sent to the same Kafka partition. In this way, order is maintained for operations on the same row.
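The selection rule and its ordering consequence can be sketched in a few lines. This is an illustrative stand-in: the real Kafka Java client hashes the serialized key bytes with murmur2, while zlib.crc32 is used here simply as a deterministic hash:

```python
import zlib

def select_partition(message_key, num_partitions):
    """(Hash of the Kafka message key) modulus (number of partitions)."""
    key_hash = zlib.crc32(message_key.encode("utf-8"))
    return key_hash % num_partitions

# A static key routes every message to the same partition.
partitions = {select_partition("StaticValue", 6) for _ in range(100)}
print(len(partitions))  # 1

# A primary-key-based key keeps all operations for the same row on the
# same partition, which preserves per-row ordering across messages.
assert select_partition("CUST_CODE=1001", 6) == select_partition("CUST_CODE=1001", 6)
```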
At the DEBUG log level, the Kafka message coordinates (topic, partition, and offset) are logged to the .log file for successfully sent messages.
Parent topic: Using the Kafka Connect Handler
20.7 Troubleshooting the Kafka Connect Handler
- Java Classpath for Kafka Connect Handler
- Invalid Kafka Version
- Kafka Producer Properties File Not Found
- Kafka Connection Problem
Parent topic: Using the Kafka Connect Handler
20.7.1 Java Classpath for Kafka Connect Handler
Issues with the Java classpath are one of the most common problems. The indication of a classpath problem is a ClassNotFoundException in the Oracle GoldenGate Java log4j log file, or an error while resolving the classpath if there is a typographic error in the gg.classpath variable.
The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You are required to obtain the correct version of the Kafka client libraries and to properly configure the gg.classpath property in the Java Adapter properties file so that the Kafka client libraries resolve correctly, as described in Setting Up and Running the Kafka Connect Handler.
Parent topic: Troubleshooting the Kafka Connect Handler
20.7.2 Invalid Kafka Version
Kafka Connect was introduced in Kafka version 0.9.0.0. The Kafka Connect Handler does not work with Kafka versions 0.8.2.2 and older. Attempting to use Kafka Connect with Kafka version 0.8.2.2 typically results in a ClassNotFoundException error at runtime.
Parent topic: Troubleshooting the Kafka Connect Handler
20.7.3 Kafka Producer Properties File Not Found
Typically, the following exception message occurs:
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Verify that the gg.handler.kafkahandler.KafkaProducerConfigFile configuration property for the Kafka Producer configuration file name is set correctly.
Ensure that the gg.classpath variable includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.
Parent topic: Troubleshooting the Kafka Connect Handler
20.7.4 Kafka Connection Problem
Typically, the following exception message appears:
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
When this occurs, the connection retry interval expires and the Kafka Connect Handler process abends. Ensure that the Kafka Brokers are running and that the host and port provided in the Kafka Producer properties file are correct.
Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
Parent topic: Troubleshooting the Kafka Connect Handler