20 Using the Kafka Connect Handler

The Kafka Connect Handler is an extension of the standard Kafka messaging functionality.

This chapter describes how to use the Kafka Connect Handler.

20.1 Overview

The Oracle GoldenGate Kafka Connect Handler is an extension of the standard Kafka messaging functionality. Kafka Connect is a functional layer on top of the standard Kafka Producer and Consumer interfaces. It provides standardization for messaging to make it easier to add new source and target systems into your topology.

Confluent is a primary adopter of Kafka Connect and their Confluent Platform offering includes extensions over the standard Kafka Connect functionality. This includes Avro serialization and deserialization, and an Avro schema registry. Much of the Kafka Connect functionality is available in Apache Kafka. A number of open source Kafka Connect integrations are found at:

https://www.confluent.io/product/connectors/

The Kafka Connect Handler is a Kafka Connect source connector. You can capture database changes from any database supported by Oracle GoldenGate and stream that change data through the Kafka Connect layer to Kafka. You can also connect to Oracle Event Hub Cloud Services (EHCS) with this handler.

Kafka Connect uses proprietary objects to define the schemas (org.apache.kafka.connect.data.Schema) and the messages (org.apache.kafka.connect.data.Struct). The Kafka Connect Handler can be configured to manage what data is published and the structure of the published data.

The Kafka Connect Handler does not support any of the pluggable formatters that are supported by the Kafka Handler.

20.2 Detailed Functionality

JSON Converter

The Kafka Connect framework provides converters to convert in-memory Kafka Connect messages to a serialized format suitable for transmission over a network. These converters are selected using configuration in the Kafka Producer properties file.

Kafka Connect and the JSON converter are available as part of the Apache Kafka download. The JSON Converter converts the Kafka keys and values to JSON, which is then sent to a Kafka topic. You select the JSON Converters with the following configuration in the Kafka Producer properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true

The format of the messages is the message schema information followed by the payload information. Because JSON is a self-describing format, you generally should not include the schema information in each message published to Kafka.

To omit the JSON schema information from the messages, set the following:

key.converter.schemas.enable=false
value.converter.schemas.enable=false
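
For illustration, with schemas.enable=true each serialized key or value is wrapped in the standard Kafka Connect schema and payload envelope, similar to the following sketch (the schema, field names, and values are hypothetical):

{"schema": {"type": "struct", "name": "QASOURCE.TCUSTMER", "fields": [ ... ]}, "payload": {"table": "QASOURCE.TCUSTMER", "op_type": "I", "CUST_CODE": "WILL"}}

With schemas.enable=false, only the payload portion is written to the topic.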

Avro Converter

Confluent provides Kafka installations, support for Kafka, and extended functionality built on top of Kafka to help realize the full potential of Kafka. Confluent provides both open source versions of Kafka (Confluent Open Source) and an enterprise edition (Confluent Enterprise), which is available for purchase.

A common Kafka use case is to send Avro messages over Kafka. This can create a problem on the receiving end because the exact Avro schema is required to deserialize an Avro message. Schema evolution can compound the problem because received messages must be matched with the exact Avro schema used to generate the message on the producer side. Deserializing Avro messages with an incorrect Avro schema can cause runtime failures, incomplete data, or incorrect data. Confluent solves this problem by using a schema registry and the Confluent schema converters.

The following shows the configuration of the Kafka Producer properties file.

key.converter=io.confluent.connect.avro.AvroConverter 
value.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter.schema.registry.url=http://localhost:8081 

When messages are published to Kafka, the Avro schema is registered and stored in the schema registry. When messages are consumed from Kafka, the exact Avro schema used to create the message can be retrieved from the schema registry to deserialize the Avro message. This matches Avro messages to their corresponding Avro schemas on the receiving side, which solves the problem.

Following are the requirements to use the Avro Converters:

  • This functionality is available in both versions of Confluent Kafka (open source or enterprise).

  • The Confluent schema registry service must be running.

  • Source database tables must have an associated Avro schema. Messages associated with different Avro schemas must be sent to different Kafka topics.

  • The Confluent Avro converters and the schema registry client must be available in the classpath.

The schema registry keeps track of Avro schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Source messages have Avro schemas based on the source database table schema, so Avro schemas are unique for each source table. Publishing messages for multiple source tables to a single topic appears to the schema registry as a schema that is evolving every time a message arrives from a source table that is different from the previous message.
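
Because the schema registry tracks schemas by topic, one way to satisfy this requirement is to map each source table to its own topic, as in the sample handler configuration later in this chapter:

gg.handler.name.topicMappingTemplate=${fullyQualifiedTableName}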

20.3 Setting Up and Running the Kafka Connect Handler

Instructions for configuring the Kafka Connect Handler components and running the handler are described in this section.

Classpath Configuration

Two things must be configured in the gg.classpath configuration variable so that the Kafka Connect Handler can connect to Kafka and run. The required items are the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Connect Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Connect Handler Client Dependencies. The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka Connect client JARs is the Kafka_Home/libs/* directory.

The gg.classpath variable must be configured precisely. The path to the Kafka Producer properties file must contain the path with no wildcard appended; including the asterisk (*) wildcard in the path to the Kafka Producer properties file causes it to be discarded. The path to the dependency JARs should include the * wildcard character to include all of the JAR files in that directory in the associated classpath. Do not use *.jar.

Following is an example of a correctly configured Apache Kafka classpath:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

Following is an example of a correctly configured Confluent Kafka classpath:

gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
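
If the Kafka Producer properties file is stored in the Oracle GoldenGate dirprm directory as recommended, include dirprm in the classpath as well; for example, assuming a Confluent installation:

gg.classpath=dirprm:{confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*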

20.3.1 Kafka Connect Handler Configuration

Meta-column fields can be configured by using the following property:

gg.handler.name.metaColumnsTemplate

To output the metacolumns as in previous versions, configure the following:

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}

To also include the primary key columns and the tokens, configure the following:

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}

For more information, see the configuration property:

gg.handler.name.metaColumnsTemplate
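
For illustration, with the metacolumns template above and row message formatting, a value message serialized by the JSON Converter with schemas disabled might resemble the following sketch. The source table, column names, and values are hypothetical, and the exact field layout depends on your configuration:

{"table": "QASOURCE.TCUSTMER", "op_type": "I", "op_ts": "2017-05-17 11:45:34.254000", "current_ts": "2017-05-17 11:45:39.000000", "pos": "00000000000000001234", "CUST_CODE": "WILL", "NAME": "BG SOFTWARE CO."}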

Table 20-1 Kafka Connect Handler Configuration Properties

Properties Required/Optional Legal Values Default Explanation
gg.handler.name.type

Required

kafkaconnect

None

The configuration to select the Kafka Connect Handler.

gg.handler.name.kafkaProducerConfigFile

Required

string

None

A path to a properties file containing the Kafka and Kafka Connect configuration properties.

gg.handler.name.topicMappingTemplate

Required

A template string value to resolve the Kafka topic name at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate

Required

A template string value to resolve the Kafka message key at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.includeTokens

Optional

true | false

false

Set to true to include a map field in output messages. The key is tokens and the value is a map where the keys and values are the token keys and values from the Oracle GoldenGate source trail file.

Set to false to suppress this field.

gg.handler.name.messageFormatting

Optional

row | op

row

Controls how output messages are modeled. Set to row to model the output messages as rows. Set to op to model the output messages as operations messages.

gg.handler.name.insertOpKey

Optional

any string

I

The value of the field op_type to indicate an insert operation.

gg.handler.name.updateOpKey

Optional

any string

U

The value of the field op_type to indicate an update operation.

gg.handler.name.deleteOpKey

Optional

any string

D

The value of the field op_type to indicate a delete operation.

gg.handler.name.truncateOpKey

Optional

any string

T

The value of the field op_type to indicate a truncate operation.

gg.handler.name.treatAllColumnsAsStrings

Optional

true | false

false

Set to true to treat all output fields as strings. Set to false to map the corresponding field type from the source trail file to the best corresponding Kafka Connect data type.

gg.handler.name.mapLargeNumbersAsStrings

Optional

true | false

false

Large numbers are mapped to number fields as doubles. It is possible to lose precision in certain scenarios.

Set to true to map these fields as strings to preserve precision.

gg.handler.name.pkUpdateHandling

Optional

abend | update | delete-insert

abend

Only applicable when modeling row messages (gg.handler.name.messageFormatting=row). Controls how the handler processes update operations that change a primary key value. Not applicable when modeling operations messages, because the before and after images are propagated to the message in the case of an update.

gg.handler.name.metaColumnsTemplate

Optional

Any of the metacolumns keywords.

None

A comma-delimited string of one or more templated values that represents the template. See Setting Metacolumn Output.

gg.handler.name.includeIsMissingFields

Optional

true|false

true

Set to true to include, for each source column, an additional field in the output that indicates whether the column value is missing.

This allows downstream applications to differentiate whether a null value is actually null in the source trail file or whether the column is missing from the source trail file.

gg.handler.name.enableDecimalLogicalType

Optional

true | false

false

Set to true to enable decimal logical types in Kafka Connect. Decimal logical types allow numbers that do not fit in a 64-bit data type to be represented.

gg.handler.name.oracleNumberScale

Optional

Positive integer

38

Only applicable if gg.handler.name.enableDecimalLogicalType=true. Some source data types do not have a fixed scale associated with them. Scale must be set for Kafka Connect decimal logical types. For source types that do not have a scale in the metadata, the value of this parameter is used to set the scale.

gg.handler.name.EnableTimestampLogicalType

Optional

true | false

false

Set to true to enable the Kafka Connect timestamp logical type. The Kafka Connect timestamp logical type is an integer measurement of milliseconds since the Java epoch, which means that precision greater than milliseconds is not possible if the timestamp logical type is used. Use of this property requires that the gg.format.timestamp property be set. This property is the timestamp formatting string, which determines the output format of timestamps in string format. For example, gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSS. Ensure that the goldengate.userexit.timestamp property is not set in the configuration file. Setting that property prevents parsing the input timestamp into a Java object, which is required for logical timestamps.

gg.handler.name.metaHeadersTemplate

Optional

Comma-delimited list of metacolumn keywords.

None

Allows you to select metacolumns to inject context-based key-value pairs into Kafka message headers using the metacolumn keyword syntax.
gg.handler.name.schemaNamespace

Optional

Any string without characters that violate the Kafka Connect Avro schema naming requirements.

None

Used to control the generated Kafka Connect schema name. If it is not set, then the schema name is the same as the fully qualified source table name. For example, if the source table is QASOURCE.TCUSTMER, then the Kafka Connect schema name is QASOURCE.TCUSTMER.

This property allows you to control the generated schema name. For example, if this property is set to com.example.company, then the generated Kafka Connect schema name for the table QASOURCE.TCUSTMER is com.example.company.TCUSTMER.

See Using Templates to Resolve the Topic Name and Message Key for more information.

Review a Sample Configuration

gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
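
A minimal Kafka Producer properties file (kafkaconnect.properties in the Oracle GoldenGate dirprm directory) that pairs with this sample configuration might look like the following sketch; the broker address and converter settings are assumptions for your environment:

bootstrap.servers=localhost:9092
acks=1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false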

20.3.2 Using Templates to Resolve the Topic Name and Message Key

The Kafka Connect Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are dynamically replaced with values from the context of the current processing. Templates are applicable to the following configuration parameters:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

The Kafka Connect Handler can only send operation messages. The Kafka Connect Handler cannot group operation messages into a larger transaction message.

Template Keywords

Keyword Explanation

${fullyQualifiedTableName}

Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names.

For example, test.dbo.table1.

${catalogName}

Resolves to the catalog name.

${schemaName}

Resolves to the schema name.

${tableName}

Resolves to the short table name.

${opType}

Resolves to the type of the operation (INSERT, UPDATE, DELETE, or TRUNCATE).

${primaryKeys}

Resolves to the concatenated primary key values delimited by an underscore (_) character.

${position}

The sequence number of the source trail file followed by the offset (RBA).

${opTimestamp}

The operation timestamp from the source trail file.

${emptyString}

Resolves to “”.

${groupName}

Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.

${staticMap[]}

Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside the square braces in the following format:

${staticMap[dbo.table1=value1,dbo.table2=value2]}

${columnValue[]}

Resolves to a column value where the key is the fully-qualified table name and the value is the column name to be resolved. For example:

${columnValue[dbo.table1=col1,dbo.table2=col2]}

${currentTimestamp}

Or

${currentTimestamp[]}

Resolves to the current timestamp. You can control the format of the current timestamp using the Java based formatting as described in the SimpleDateFormat class, see https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html.

Examples:

${currentTimestamp}
${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

${null}

Resolves to a NULL string.

${custom[]}

It is possible to write a custom value resolver. If required, contact Oracle Support.

Example Templates

The following describes example template configuration values and the resolved values.

Example Template Resolved Value

${groupName}_${fullyQualifiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

2017-05-17 11:45:34.254

20.3.3 Configuring Security in the Kafka Connect Handler

Kafka version 0.9.0.0 introduced security through SSL/TLS or Kerberos. The Kafka Connect Handler can be secured using SSL/TLS or Kerberos. The Kafka producer client libraries provide an abstraction of security functionality from the integrations that use those libraries, so the Kafka Connect Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster and connecting machines, and then configuring the Kafka Producer properties file that the Kafka Connect Handler uses for processing with the required security properties.
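
For example, to secure the connection using SSL/TLS, properties similar to the following might be added to the Kafka Producer properties file. This is a sketch only; the store locations and passwords are placeholders for your environment:

security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=<truststore_password>
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=<keystore_password>
ssl.key.password=<key_password>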

You may encounter an issue where the Kerberos password cannot be decrypted from the keytab file. This causes the Kerberos authentication to fall back to interactive mode, which cannot work because it is invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.

20.4 Kafka Connect Handler Performance Considerations

Multiple configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, affect performance.

The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. The GROUPTRANSOPS parameter allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka Connect Handler calls flush on the Kafka Producer to push the messages to Kafka for write durability, followed by a checkpoint. The flush call is expensive, so setting the Replicat GROUPTRANSOPS parameter to a larger value allows Replicat to call flush less frequently, thereby improving performance.

The default setting for GROUPTRANSOPS is 1000 and performance improvements can be obtained by increasing the value to 2500, 5000, or even 10000.
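
For example, a Replicat parameter file might set the value as follows. This is a sketch only; the Replicat name, properties file name, and MAP statement are assumptions for your environment:

REPLICAT rkconn
TARGETDB LIBFILE libggjava.so SET property=dirprm/kconn.props
GROUPTRANSOPS 2500
MAP QASOURCE.*, TARGET QASOURCE.*;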

Op mode (gg.handler.kafkaconnect.mode=op) can also provide better performance than Tx mode (gg.handler.kafkaconnect.mode=tx).

A number of Kafka Producer properties can affect performance. The following are the parameters with significant impact:

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

Oracle recommends that you start with the default values for these parameters and perform performance testing to obtain a base line for performance. Review the Kafka documentation for each of these parameters to understand its role and adjust the parameters and perform additional performance testing to ascertain the performance effect of each parameter.
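
The following sketch shows where these properties are set in the Kafka Producer properties file. The values shown are illustrative starting points for testing, not tuned recommendations:

linger.ms=10
batch.size=65536
acks=1
buffer.memory=33554432
compression.type=gzip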

20.5 Kafka Interceptor Support

The Kafka Producer client framework supports the use of Producer Interceptors. A Producer Interceptor is simply a user exit from the Kafka Producer client whereby the Interceptor object is instantiated and receives notifications of Kafka message send calls and Kafka message send acknowledgement calls.

The typical use case for Interceptors is monitoring. Kafka Producer Interceptors must conform to the interface org.apache.kafka.clients.producer.ProducerInterceptor. The Kafka Connect Handler supports Producer Interceptor usage.

The requirements to use Interceptors in the Handlers are as follows:

  • The Kafka Producer configuration property "interceptor.classes" must be configured with the class name of the Interceptor(s) to be invoked (see the example following this list).
  • In order to invoke the Interceptor(s), the jar files plus any dependency jars must be available to the JVM. Therefore, the jar files containing the Interceptor(s) plus any dependency jars must be added to the gg.classpath in the Handler configuration file.

    For more information, see Kafka documentation.
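
The following is a sketch of enabling a hypothetical monitoring Interceptor; the class name and JAR path are assumptions for your environment. In the Kafka Producer properties file:

interceptor.classes=com.example.monitoring.SampleProducerInterceptor

In the Handler configuration file:

gg.classpath=dirprm:{kafka_install_dir}/libs/*:/path/to/interceptor/jars/*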

20.6 Kafka Partition Selection

Kafka topics comprise one or more partitions. Distribution to multiple partitions is a good way to improve Kafka ingest performance, because the Kafka client parallelizes message sending to different topic/partition combinations. Partition selection is controlled by the following calculation in the Kafka client.

(Hash of the Kafka message key) modulus (the number of partitions) = selected partition number
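
For example, if the hash of the Kafka message key is 1234 and the topic has 5 partitions, then 1234 modulus 5 = 4, so the message is written to partition 4.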

The Kafka message key is selected by the following configuration value:

gg.handler.{your handler name}.keyMappingTemplate=

If this parameter is set to a value which generates a static key, all messages will go to the same partition. The following is an example of a static key:

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

If this parameter is set to a value which generates a key that changes infrequently, partition selection changes infrequently. In the following example the table name is used as the message key. Every operation for a specific source table will have the same key and thereby route to the same partition:

gg.handler.{your handler name}.keyMappingTemplate=${tableName}

A null Kafka message key distributes to the partitions on a round-robin basis. To do this, set the following:

gg.handler.{your handler name}.keyMappingTemplate=${null}

The recommended setting for configuration of the mapping key is the following:

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

This generates a Kafka message key that is the concatenated and delimited primary key values.

Operations for each row have unique primary key(s), thereby generating a unique Kafka message key for that row. Another important consideration is that Kafka messages sent to different partitions are not guaranteed to be delivered to a Kafka consumer in the original order sent. This is part of the Kafka specification; order is only maintained within a partition. Using primary keys as the Kafka message key means that operations for the same row, which have the same primary key(s), generate the same Kafka message key and are therefore sent to the same Kafka partition. In this way, order is maintained for operations for the same row.

At the DEBUG log level the Kafka message coordinates (topic, partition, and offset) are logged to the .log file for successfully sent messages.

20.7 Troubleshooting the Kafka Connect Handler

20.7.1 Java Classpath for Kafka Connect Handler

Issues with the Java classpath are one of the most common problems. The indication of a classpath problem is a ClassNotFoundException in the Oracle GoldenGate Java log4j log file, or an error while resolving the classpath if there is a typographic error in the gg.classpath variable.

The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You are required to obtain the correct version of the Kafka client libraries and to properly configure the gg.classpath property in the Java Adapter properties file to correctly resolve the Kafka client libraries, as described in Setting Up and Running the Kafka Connect Handler.

20.7.2 Invalid Kafka Version

Kafka Connect was introduced in Kafka version 0.9.0.0. The Kafka Connect Handler does not work with Kafka versions 0.8.2.2 and older. Attempting to use Kafka Connect with Kafka version 0.8.2.2 typically results in a ClassNotFoundException error at runtime.

20.7.3 Kafka Producer Properties File Not Found

Typically, the following exception message occurs:

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Verify that the gg.handler.name.kafkaProducerConfigFile configuration property for the Kafka Producer properties file name is set correctly.

Ensure that the gg.classpath variable includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.

20.7.4 Kafka Connection Problem

Typically, the following exception message appears:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] 

WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1  java.net.ConnectException: Connection refused

When this occurs, the connection retry interval expires and the Kafka Connect Handler process abends. Ensure that the Kafka brokers are running and that the host and port provided in the Kafka Producer properties file are correct.

Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.