8.2.9 Apache Kafka

The Kafka Handler is designed to stream change capture data from an Oracle GoldenGate trail to a Kafka topic.

This chapter describes how to use the Kafka Handler.

8.2.9.1 Apache Kafka

8.2.9.1.1 Overview

The Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) Kafka Handler streams change capture data from an Oracle GoldenGate trail to a Kafka topic. Additionally, the Kafka Handler provides functionality to publish messages to a separate schema topic. Schema publication for Avro and JSON is supported.

Apache Kafka is an open source, distributed, partitioned, and replicated messaging service, see http://kafka.apache.org/.

Kafka can be run as a single instance or as a cluster on multiple servers. Each Kafka server instance is called a broker. A Kafka topic is a category or feed name to which messages are published by the producers and retrieved by consumers.

The Kafka Handler implements a Kafka producer. The Kafka producer writes serialized change data capture data from multiple source tables either to a single configured topic or, when the topic name corresponds to the fully qualified source table name, to a separate Kafka topic for each source table.

8.2.9.1.2 Detailed Functionality

Transaction Versus Operation Mode

The Kafka Handler sends instances of the Kafka ProducerRecord class to the Kafka producer API, which in turn publishes the ProducerRecord to a Kafka topic. The Kafka ProducerRecord effectively is the implementation of a Kafka message. The ProducerRecord has two components: a key and a value. Both the key and value are represented as byte arrays by the Kafka Handler. This section describes how the Kafka Handler publishes data.

Transaction Mode

The following configuration sets the Kafka Handler to transaction mode:

gg.handler.name.Mode=tx

In transaction mode, the serialized data for every operation in a transaction from the source Oracle GoldenGate trail files is concatenated. The concatenated operation data is the value of the Kafka ProducerRecord object, and the key of the Kafka ProducerRecord object is NULL. As a result, each Kafka message comprises data from 1 to N operations, where N is the number of operations in the transaction.

For grouped transactions, all the data for all the operations are concatenated into a single Kafka message. Therefore, grouped transactions may result in very large Kafka messages that contain data for a large number of operations.
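
To make the transaction-mode rule concrete, the following Python sketch models one transaction turning into one message. It is an illustration, not the handler's code. The serialized operations are made-up delimited-text byte strings, and a record is modeled as a (key, value) tuple instead of a Kafka ProducerRecord.

```python
from typing import List, Optional, Tuple

# Stand-in for Kafka's ProducerRecord: (key, value), both as raw bytes.
Record = Tuple[Optional[bytes], bytes]


def tx_mode_record(serialized_ops: List[bytes]) -> Record:
    """Concatenate every serialized operation of one transaction into a single value.

    In transaction mode the key is null, so the whole transaction becomes one message.
    """
    return (None, b"".join(serialized_ops))


# Three hypothetical operations from one (possibly grouped) source transaction.
ops = [b"I|T1|1\n", b"U|T1|1\n", b"D|T2|7\n"]
key, value = tx_mode_record(ops)
print(key, len(value))  # one message carrying all three operations
```

Because the value grows with every operation in the transaction, grouped transactions produce correspondingly larger messages.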

Operation Mode

The following configuration sets the Kafka Handler to operation mode:

gg.handler.name.Mode=op

In operation mode, the serialized data for each operation is placed into an individual ProducerRecord object as the value. The ProducerRecord key is the fully qualified table name of the source operation. The ProducerRecord is immediately sent using the Kafka Producer API. This means that there is a 1 to 1 relationship between the incoming operations and the number of Kafka messages produced.
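
For comparison with transaction mode, this Python sketch models operation mode. It produces one (key, value) record per operation, keyed by the fully qualified table name as described in the paragraph above. The table names and payloads are hypothetical, and the tuple stands in for a Kafka ProducerRecord.

```python
from typing import List, Tuple

# Stand-in for Kafka's ProducerRecord: (key, value), both as raw bytes.
Record = Tuple[bytes, bytes]


def op_mode_records(ops: List[Tuple[str, bytes]]) -> List[Record]:
    """Emit one (key, value) record per operation.

    ops holds (fully_qualified_table_name, serialized_operation) pairs, and the
    key of each record is the table name.
    """
    return [(table.encode("utf-8"), payload) for table, payload in ops]


ops = [("QASOURCE.T1", b"I|1\n"), ("QASOURCE.T1", b"U|1\n"), ("QASOURCE.T2", b"D|7\n")]
records = op_mode_records(ops)
print(len(records))  # 3: one message per operation
```

Sending the same three operations in transaction mode would produce a single message instead of three.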

Topic Name Selection

The topic is resolved at runtime using this configuration parameter:

gg.handler.name.topicMappingTemplate

You can configure a static string, keywords, or a combination of static strings and keywords to dynamically resolve the topic name at runtime based on the context of the current operation, see Using Templates to Resolve the Topic Name and Message Key.

Kafka Broker Settings

To configure topics to be created automatically, set the auto.create.topics.enable property to true. This is the default setting.

If you set the auto.create.topics.enable property to false, then you must manually create topics before you start the Replicat process.

Schema Propagation

The schema data for all tables is delivered to the schema topic that is configured with the schemaTopicName property. For more information, see Schema Propagation.

8.2.9.1.3 Setting Up and Running the Kafka Handler

Instructions for configuring the Kafka Handler components and running the handler are described in this section.

You must install and correctly configure Kafka either as a single node or a clustered instance, see http://kafka.apache.org/documentation.html.

If you are using a Kafka distribution other than Apache Kafka, then consult the documentation for your Kafka distribution for installation and configuration instructions.

ZooKeeper, a prerequisite component for Kafka, and the Kafka broker (or brokers) must be up and running.

Oracle recommends and considers it best practice that the data topic and the schema topic (if applicable) are preconfigured on the running Kafka brokers. You can create Kafka topics dynamically. However, this relies on the Kafka brokers being configured to allow dynamic topics.

If the Kafka broker is not collocated with the Kafka Handler process, then the remote host and port must be reachable from the machine running the Kafka Handler.

8.2.9.1.3.1 Classpath Configuration

For the Kafka Handler to connect to Kafka and run, the Kafka Producer properties file and the Kafka client JARs must be configured in the gg.classpath configuration variable. The Kafka client JARs must match the version of Kafka that the Kafka Handler is connecting to. For a list of the required client JAR files by version, see Kafka Handler Client Dependencies.

The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka client JARs is Kafka_Home/libs/*.

The gg.classpath must be configured precisely. The path to the Kafka Producer Properties file must be the directory path with no wildcard appended. If the * wildcard is included in the path to the Kafka Producer Properties file, the file is not picked up. Conversely, the path to the dependency JARs must include the * wildcard character in order to include all the JAR files in that directory in the associated classpath. Do not use *.jar. The following is an example of a correctly configured classpath:

gg.classpath={kafka install dir}/libs/*
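
The classpath rules above can be checked mechanically. The following Python sketch is a hypothetical pre-flight check, not part of GG for DAA. It assumes a Unix-style ':' separator and that you know which directory holds the Kafka Producer properties file (dirprm in the usage below).

```python
from typing import List


def check_classpath(classpath: str, properties_dir: str, sep: str = ":") -> List[str]:
    """Return warnings for gg.classpath entries that break the wildcard rules.

    Assumptions: ':' separates entries (Unix), and properties_dir is the
    directory that contains the Kafka Producer properties file.
    """
    entries = [e.strip() for e in classpath.split(sep) if e.strip()]
    warnings = []
    for entry in entries:
        if entry.endswith("*.jar"):
            warnings.append(f"{entry}: use '/*' rather than '*.jar' for JAR directories")
    props = properties_dir.rstrip("/")
    if props + "/*" in entries:
        warnings.append(f"{props}/*: the properties directory must not end with a wildcard")
    if props not in [e.rstrip("/") for e in entries]:
        warnings.append(f"{props}: properties directory is missing from the classpath")
    return warnings


print(check_classpath("dirprm:/opt/kafka/libs/*", "dirprm"))  # []
```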

8.2.9.1.3.2 Kafka Handler Configuration

The following are the configurable values for the Kafka Handler. These properties are located in the Java Adapter properties file (not in the Replicat properties file).

To enable the selection of the Kafka Handler, you must first configure the handler type by specifying gg.handler.name.type=kafka and the other Kafka properties as follows:

Table 8-9 Configuration Properties for Kafka Handler

gg.handlerlist
  Required/Optional: Required
  Property Value: name (choice of any name)
  Default: None
  Description: List of handlers to be used.

gg.handler.name.type
  Required/Optional: Required
  Property Value: kafka
  Default: None
  Description: Type of handler to use.

gg.handler.name.KafkaProducerConfigFile
  Required/Optional: Optional
  Property Value: Any custom file name
  Default: kafka-producer-default.properties
  Description: Filename in the classpath that holds the Apache Kafka properties used to configure the Apache Kafka producer.

gg.handler.name.Format
  Required/Optional: Optional
  Property Value: Formatter class or short code
  Default: delimitedtext
  Description: Formatter used to format the payload. Can be one of xml, delimitedtext, json, json_row, avro_row, or avro_op.

gg.handler.name.SchemaTopicName
  Required/Optional: Required when schema delivery is required
  Property Value: Name of the schema topic
  Default: None
  Description: Topic name where schema data is delivered. If this property is not set, schemas are not propagated. Schemas are propagated only for Avro formatters.

gg.handler.name.SchemaPrClassName
  Required/Optional: Optional
  Property Value: Fully qualified class name of a custom class that implements the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) Kafka Handler's CreateProducerRecord Java interface
  Default: The provided implementation class, oracle.goldengate.handler.kafka.ProducerRecord
  Description: Schema is also propagated as a ProducerRecord. The default key is the fully qualified table name. To change the key for schema records, create a custom implementation of the CreateProducerRecord interface and set this property to the fully qualified name of the new class.

gg.handler.name.mode
  Required/Optional: Optional
  Property Value: tx | op
  Default: tx
  Description: In operation mode, each change capture data record (insert, update, delete, and so on) payload is represented as a Kafka ProducerRecord and is flushed one at a time. In transaction mode, all operations within a source transaction are represented as a single Kafka ProducerRecord. This combined byte payload is flushed on a transaction commit event.

gg.handler.name.topicMappingTemplate
  Required/Optional: Required
  Property Value: A template string value to resolve the Kafka topic name at runtime
  Default: None
  Description: See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate
  Required/Optional: Required
  Property Value: A template string value to resolve the Kafka message key at runtime
  Default: None
  Description: See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.logSuccessfullySentMessages
  Required/Optional: Optional
  Property Value: true | false
  Default: true
  Description: When set to true, the Kafka Handler logs, at the INFO level, the messages that have been successfully sent to Kafka. Enabling this property has a negative impact on performance.

gg.handler.name.metaHeadersTemplate
  Required/Optional: Optional
  Property Value: Comma-delimited list of metacolumn keywords
  Default: None
  Description: Allows you to select metacolumns to inject context-based key-value pairs into Kafka message headers using the metacolumn keyword syntax.

8.2.9.1.3.3 Java Adapter Properties File

The following is a sample configuration for the Kafka Handler from the Adapter properties file:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx

You can find a sample Replicat configuration and a Java Adapter Properties file for a Kafka integration in the following directory:

GoldenGate_install_directory/AdapterExamples/big-data/kafka
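
The gg.handler.<name>.<property> naming ties each property to a handler listed in gg.handlerlist. This Python sketch groups properties that way for the sample above. It assumes that gg.handlerlist is a comma-separated list and handles only simple key=value lines (no line continuations, no ':' separators, case-sensitive names), so it is a reading aid rather than a full Java properties parser.

```python
from typing import Dict


def handler_settings(text: str) -> Dict[str, Dict[str, str]]:
    """Group gg.handler.<name>.<property> entries by the names in gg.handlerlist.

    Simplified parsing (assumption): one key=value per line, '#' comments ignored.
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()

    # Assumption: gg.handlerlist holds comma-separated handler names.
    names = [n.strip() for n in props.get("gg.handlerlist", "").split(",") if n.strip()]
    result: Dict[str, Dict[str, str]] = {name: {} for name in names}
    for key, value in props.items():
        for name in names:
            prefix = f"gg.handler.{name}."
            if key.startswith(prefix):
                result[name][key[len(prefix):]] = value
    return result


sample = """gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.Mode = tx"""
print(handler_settings(sample)["kafkahandler"])
```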

8.2.9.1.3.4 Kafka Producer Configuration File

The Kafka Handler must access a Kafka producer configuration file in order to publish messages to Kafka. The file name of the Kafka producer configuration file is controlled by the following configuration in the Kafka Handler properties.

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

The Kafka Handler attempts to locate and load the Kafka producer configuration file by using the Java classpath. Therefore, the Java classpath must include the directory containing the Kafka Producer Configuration File.

The Kafka producer configuration file contains Kafka proprietary properties. The Kafka documentation provides configuration information for the 0.8.2.0 Kafka producer interface properties. The Kafka Handler uses these properties to resolve the host and port of the Kafka brokers, and properties in the Kafka producer configuration file control the behavior of the interaction between the Kafka producer client and the Kafka brokers.

A sample configuration file for the Kafka producer is as follows:

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 0
max.request.size = 1048576 
send.buffer.bytes = 131072
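
The Kafka Handler passes keys and values to the producer as byte arrays, and the samples in this section pair it with ByteArraySerializer for both key.serializer and value.serializer. The following Python sketch is a hypothetical check, not an Oracle or Kafka tool. It parses a producer file like the one above and reports a missing bootstrap.servers entry or a different serializer. Its parser is deliberately minimal and ignores Java properties features such as line continuations.

```python
from typing import Dict, List

BYTE_ARRAY_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value parser; ignores blank lines and '#' comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)  # values such as host:port keep their ':'
            props[key.strip()] = value.strip()
    return props


def producer_problems(props: Dict[str, str]) -> List[str]:
    """List settings that do not match the byte-array records the handler sends."""
    problems = []
    if not props.get("bootstrap.servers"):
        problems.append("bootstrap.servers is not set")
    for name in ("key.serializer", "value.serializer"):
        if props.get(name) != BYTE_ARRAY_SERIALIZER:
            problems.append(f"{name} should be {BYTE_ARRAY_SERIALIZER}")
    return problems
```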
8.2.9.1.3.4.1 Encrypt Kafka Producer Properties
The sensitive properties within the Kafka Producer Configuration File can be encrypted using the Oracle GoldenGate Credential Store.

For more information about how to use Credential Store, see Using Identities in Oracle GoldenGate Credential Store.

For example, the following Kafka property:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";
can be replaced with:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=ORACLEWALLETUSERNAME[alias domain_name] password=ORACLEWALLETPASSWORD[alias domain_name];

8.2.9.1.3.5 Using Templates to Resolve the Topic Name and Message Key

The Kafka Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are used to dynamically resolve content at runtime and inject that resolved value into the resolved string.

The templates use the following configuration properties:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

Source database transactions are made up of one or more individual operations: the individual inserts, updates, and deletes. The Kafka Handler can be configured to send one message per operation (insert, update, delete) or, alternatively, to group operations into messages at the transaction level. Many template keywords resolve data based on the context of an individual source database operation, so many of them do not work when sending messages at the transaction level. For example, ${fullyQualifiedTableName} resolves to the fully qualified source table name of an operation, but a transaction can contain operations for many source tables. Resolving the fully qualified table name for a transaction-level message is therefore non-deterministic, and the Kafka Handler abends at runtime.
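
The following Python sketch illustrates template resolution and the transaction-level failure described above. It is not the Kafka Handler's template engine. The set of keywords treated as operation-scoped is a hypothetical subset chosen for illustration, and a ValueError stands in for the abend.

```python
import re
from typing import Dict

KEYWORD = re.compile(r"\$\{(\w+)\}")
# Hypothetical subset: keywords that need the context of a single operation.
OPERATION_SCOPED = {"fullyQualifiedTableName", "tableName", "primaryKeys"}


def resolve(template: str, context: Dict[str, str], tx_mode: bool) -> str:
    """Replace ${keyword} tokens with values from context; static text is kept.

    In transaction mode, operation-scoped keywords are rejected, mirroring the
    abend described above.
    """
    def substitute(match: "re.Match[str]") -> str:
        keyword = match.group(1)
        if tx_mode and keyword in OPERATION_SCOPED:
            raise ValueError(f"${{{keyword}}} cannot be resolved at the transaction level")
        return context[keyword]

    return KEYWORD.sub(substitute, template)


print(resolve("${fullyQualifiedTableName}", {"fullyQualifiedTableName": "QASOURCE.BDCUSTMER1"}, tx_mode=False))
# prints QASOURCE.BDCUSTMER1
```

A purely static template such as oggtopic resolves the same way in both modes, which is why static topic names are safe with transaction mode.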

For more information about the Template Keywords, see Template Keywords.

8.2.9.1.3.6 Configuring Kafka with Kerberos on a Hadoop Platform

Use these steps to configure a Kafka Handler Replicat with Kerberos to enable a Cloudera instance to process an Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) trail to a Kafka topic:

  1. In GGSCI, add a Kafka Replicat:
    GGSCI> add replicat kafka, exttrail dirdat/gg
  2. Configure a prm file with these properties:
    replicat kafka
    discardfile ./dirrpt/kafkax.dsc, purge
    SETENV (TZ=PST8PDT)
    GETTRUNCATES
    GETUPDATEBEFORES
    ReportCount Every 1000 Records, Rate
    MAP qasource.*, target qatarget.*;
  3. Configure a Replicat properties file as follows:
    ###KAFKA Properties file ###
    gg.log=log4j
    gg.log.level=info
    gg.report.time=30sec
    
    ###Kafka Classpath settings ###
    gg.classpath=/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/*
    jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/scratch/ydama/ogg/v123211/dirprm/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf
    
    ### Kafka handler properties ###
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=kafka-producer.properties
    gg.handler.kafkahandler.format=delimitedtext
    gg.handler.kafkahandler.format.PkUpdateHandling=update
    gg.handler.kafkahandler.mode=op
    gg.handler.kafkahandler.format.includeCurrentTimestamp=false
    gg.handler.kafkahandler.format.fieldDelimiter=|
    gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]
    gg.handler.kafkahandler.topicMappingTemplate=myoggtopic
    gg.handler.kafkahandler.keyMappingTemplate=${position}
  4. Configure a Kafka Producer file with these properties:
    bootstrap.servers=10.245.172.52:9092
    acks=1
    #compression.type=snappy
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=1024
    linger.ms=2000
    
    security.protocol=SASL_PLAINTEXT
    
    sasl.kerberos.service.name=kafka
    sasl.mechanism=GSSAPI
  5. Configure a jaas.conf file with these properties:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/scratch/ydama/ogg/v123211/dirtmp/keytabs/slc06unm/kafka.keytab"
    principal="kafka/slc06unm.us.oracle.com@HADOOPTEST.ORACLE.COM";
    };
  6. Ensure that you have the latest keytab files from the Cloudera instance to connect to secured Kafka topics.

  7. Start the Replicat from GGSCI and use INFO ALL to make sure that it is running.

  8. Review the Replicat report to see the total number of records processed. The report is similar to:

    Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA)
    
    Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved
    
    Built with Java 1.8.0_161 (class version: 52.0)
    
    2018-08-05 22:15:28 INFO OGG-01815 Virtual Memory Facilities for: COM
    anon alloc: mmap(MAP_ANON) anon free: munmap
    file alloc: mmap(MAP_SHARED) file free: munmap
    target directories:
    /scratch/ydama/ogg/v123211/dirtmp.
    
    Database Version:
    
    Database Language and Character Set:
    
    ***********************************************************************
    ** Run Time Messages **
    ***********************************************************************
    
    
    2018-08-05 22:15:28 INFO OGG-02243 Opened trail file /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 at 2018-08-05 22:15:28.258810.
    
    2018-08-05 22:15:28 INFO OGG-03506 The source database character set, as determined from the trail file, is UTF-8.
    
    2018-08-05 22:15:28 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTMER1", target qatarget."BDCUSTMER1".
    
    2018-08-05 22:15:28 INFO OGG-02756 The definition for table QASOURCE.BDCUSTMER1 is obtained from the trail file.
    
    2018-08-05 22:15:28 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, NAME, CITY, STATE.
    
    2018-08-05 22:15:28 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTMER1: CUST_CODE.
    
    2018-08-05 22:15:29 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTORD1", target qatarget."BDCUSTORD1".
    
    2018-08-05 22:15:29 INFO OGG-02756 The definition for table QASOURCE.BDCUSTORD1 is obtained from the trail file.
    
    2018-08-05 22:15:29 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID, PRODUCT_PRICE, PRODUCT_AMOUNT, TRANSACTION_ID.
    
    2018-08-05 22:15:29 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTORD1: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID.
    
    2018-08-05 22:15:33 INFO OGG-01021 Command received from GGSCI: STATS.
    
    2018-08-05 22:16:03 INFO OGG-01971 The previous message, 'INFO OGG-01021', repeated 1 times.
    
    2018-08-05 22:43:27 INFO OGG-01021 Command received from GGSCI: STOP.
    
    ***********************************************************************
    * ** Run Time Statistics ** *
    ***********************************************************************
    
    Last record for the last committed transaction is the following:
    ___________________________________________________________________
    Trail name : /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000
    Hdr-Ind : E (x45) Partition : . (x0c)
    UndoFlag : . (x00) BeforeAfter: A (x41)
    RecLength : 0 (x0000) IO Time : 2015-08-14 12:02:20.022027
    IOType : 100 (x64) OrigNode : 255 (xff)
    TransInd : . (x03) FormatType : R (x52)
    SyskeyLen : 0 (x00) Incomplete : . (x00)
    AuditRBA : 78233 AuditPos : 23968384
    Continued : N (x00) RecCount : 1 (x01)
    
    2015-08-14 12:02:20.022027 GGSPurgedata Len 0 RBA 6473
    TDR Index: 2
    ___________________________________________________________________
    
    Reading /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000, current RBA 6556, 20 records, m_file_seqno = 0, m_file_rba = 6556
    
    Report at 2018-08-05 22:43:27 (activity since 2018-08-05 22:15:28)
    
    From Table QASOURCE.BDCUSTMER1 to qatarget.BDCUSTMER1:
    # inserts: 5
    # updates: 1
    # deletes: 0
    # discards: 0
    From Table QASOURCE.BDCUSTORD1 to qatarget.BDCUSTORD1:
    # inserts: 5
    # updates: 3
    # deletes: 5
    # truncates: 1
    # discards: 0
    
    
  9. Ensure that the secure Kafka topic is created:

    /kafka/bin/kafka-topics.sh --zookeeper slc06unm:2181 --list  
    myoggtopic
  10. Review the contents of the secure Kafka topic:

    1. Create a consumer.properties file containing:

      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
    2. Set this environment variable:

      export KAFKA_OPTS="-Djava.security.auth.login.config=/scratch/ogg/v123211/dirprm/jaas.conf"
      
    3. Run the consumer utility to check the records:

      /kafka/bin/kafka-console-consumer.sh --bootstrap-server sys06:9092 --topic myoggtopic --new-consumer --consumer.config consumer.properties
8.2.9.1.3.7 Kafka SSL Support

Kafka supports SSL connectivity between Kafka clients and the Kafka cluster. SSL connectivity provides both authentication and encryption of messages transported between the client and the server.

SSL can be configured for server authentication (the client authenticates the server) but is generally configured for mutual authentication (the client and the server authenticate each other). In SSL mutual authentication, each side of the connection retrieves a certificate from its keystore and passes it to the other side of the connection, which verifies the certificate against the certificates in its truststore.
When you set up SSL, see the Kafka documentation for the specific Kafka version that you are running. The Kafka documentation also provides information on how to do the following:
  • Set up the Kafka cluster for SSL
  • Create self signed certificates in a keystore/truststore file
  • Configure the Kafka clients for SSL
Oracle recommends that you implement SSL connectivity using the Kafka producer and consumer command line utilities before attempting to use it with Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA). Confirm SSL connectivity between the machine hosting GG for DAA and the Kafka cluster. This proves that SSL connectivity is correctly set up and working before GG for DAA is introduced.
The following is an example of Kafka producer configuration with SSL mutual authentication:
bootstrap.servers=localhost:9092
acks=1
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
security.protocol=SSL
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234
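
Before starting Replicat, it can help to confirm that the producer file carries every setting from the mutual-authentication sample. This Python sketch is a hypothetical pre-flight check keyed to the property names in that sample. It does not cover SASL_SSL and does not validate the keystore or truststore contents.

```python
from typing import Dict, List

# Properties taken from the SSL mutual-authentication sample above.
MUTUAL_TLS_KEYS = (
    "ssl.truststore.location",
    "ssl.truststore.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
)


def missing_mutual_tls_settings(props: Dict[str, str]) -> List[str]:
    """Return the sample's SSL settings that are absent or empty in props."""
    missing = [] if props.get("security.protocol") == "SSL" else ["security.protocol=SSL"]
    missing += [key for key in MUTUAL_TLS_KEYS if not props.get(key)]
    return missing
```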

8.2.9.1.4 Schema Propagation

The Kafka Handler provides the ability to publish schemas to a schema topic. Currently, the Avro Row and Operation formatters are the only formatters that are enabled for schema publishing. If the Kafka Handler schemaTopicName property is set, then the schema is published for the following events:

  • The Avro schema for a specific table is published the first time an operation for that table is encountered.

  • If the Kafka Handler receives a metadata change event, the schema is flushed. The regenerated Avro schema for a specific table is published the next time an operation for that table is encountered.

  • If the Avro wrapping functionality is enabled, then the generic wrapper Avro schema is published the first time that any operation is encountered. The generic wrapper functionality is enabled in the Avro formatter configuration, see Avro Row Formatter and The Avro Operation Formatter.

The Kafka ProducerRecord value is the schema, and the key is the fully qualified table name.
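
The publication rules above can be modeled in a few lines. This Python sketch is a simplified model, not the handler's implementation. Schemas are plain strings, and the schema topic is a list of (key, value) pairs with the fully qualified table name as the key.

```python
from typing import Dict, List, Tuple


class SchemaPublisher:
    """Model of when schemas reach the schema topic.

    A schema is published the first time a table is seen and again after a
    metadata change event flushes it.
    """

    def __init__(self) -> None:
        self.published: Dict[str, str] = {}
        # Each entry is (key = fully qualified table name, value = schema).
        self.schema_topic: List[Tuple[str, str]] = []

    def on_operation(self, table: str, schema: str) -> None:
        if table not in self.published:
            self.published[table] = schema
            self.schema_topic.append((table, schema))

    def on_metadata_change(self, table: str) -> None:
        # Flush only; the regenerated schema goes out with the next operation.
        self.published.pop(table, None)
```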

Because Avro messages depend directly on an Avro schema, users of Avro over Kafka may encounter issues. Avro messages are binary and therefore not human readable. To deserialize an Avro message, the receiver must first have the correct Avro schema, but because each table from the source database results in a separate Avro schema, this can be difficult. When the source Oracle GoldenGate trail file includes operations from multiple tables, the receiver of a Kafka message cannot determine which Avro schema to use to deserialize individual messages. To solve this problem, you can wrap the specialized Avro messages in a generic Avro message wrapper. This generic Avro wrapper provides the fully qualified table name, the hashcode of the schema string, and the wrapped Avro message. The receiver can use the fully qualified table name and the hashcode of the schema string to resolve the associated schema of the wrapped message, and then use that schema to deserialize the wrapped message.
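
On the receiving side, schema resolution can be a lookup keyed by the table name and the schema hash. This Python sketch assumes that the hashcode is Java's String.hashCode() of the schema string (the text above does not name the function) and leaves Avro decoding out. The SchemaRegistry class is hypothetical.

```python
from typing import Dict, Tuple


def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): h = 31*h + c over UTF-16 code units, as a signed 32-bit int.

    Assumption: this is the hashcode carried in the generic wrapper.
    """
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


class SchemaRegistry:
    """Receiver-side cache of schemas keyed by (fully qualified table name, schema hash)."""

    def __init__(self) -> None:
        self._schemas: Dict[Tuple[str, int], str] = {}

    def add(self, table: str, schema: str) -> None:
        # Schemas could be fed from the schema topic, keyed by table name.
        self._schemas[(table, java_string_hash(schema))] = schema

    def resolve(self, table: str, schema_hash: int) -> str:
        # The wrapper's table name and hash select the schema for the wrapped payload.
        return self._schemas[(table, schema_hash)]
```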

8.2.9.1.5 Performance Considerations

For the best performance, Oracle recommends that you set the Kafka Handler to operate in operation mode.

gg.handler.name.mode = op

Additionally, Oracle recommends that you set the batch.size and linger.ms values in the Kafka Producer properties file. These values are highly dependent upon the use case scenario. Typically, higher values result in better throughput but increased latency. Smaller values reduce latency but decrease overall throughput.

Using the Replicat parameter GROUPTRANSOPS also improves performance. The recommended setting is 10000.

If the serialized operations from the source trail file must be delivered in individual Kafka messages, then the Kafka Handler must be set to operation mode.

gg.handler.name.mode = op

8.2.9.1.6 About Security

Kafka version 0.9.0.0 introduced security through SSL/TLS and SASL (Kerberos). You can secure the Kafka Handler using one or both of the SSL/TLS and SASL security offerings. The Kafka producer client libraries provide an abstraction of security functionality from the integrations that use those libraries. The Kafka Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster, connecting machines, and then configuring the Kafka producer properties file with the required security properties. For detailed instructions about securing the Kafka cluster, see the Kafka documentation at

You may encounter the inability to decrypt the Kerberos password from the keytab file. This causes the Kerberos authentication to fall back to interactive mode which cannot work because it is being invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.

8.2.9.1.7 Metadata Change Events

Metadata change events are now handled in the Kafka Handler. This is relevant only if you have configured a schema topic and the formatter used supports schema propagation (currently Avro row and Avro Operation formatters). The next time an operation is encountered for a table for which the schema has changed, the updated schema is published to the schema topic.

To support metadata change events, the Oracle GoldenGate process capturing changes in the source database must support the Oracle GoldenGate metadata in trail feature, which was introduced in Oracle GoldenGate 12c (12.2).

8.2.9.1.8 Snappy Considerations

The Kafka Producer Configuration file supports the use of compression. One of the configurable options is Snappy, an open source compression and decompression (codec) library that provides better performance than other codec libraries. The Snappy JAR does not run on all platforms. Snappy may work on Linux systems though may or may not work on other UNIX and Windows implementations. If you want to use Snappy compression, test Snappy on all required systems before implementing compression using Snappy. If Snappy does not port to all required systems, then Oracle recommends using an alternate codec library.

8.2.9.1.9 Kafka Interceptor Support

The Kafka Producer client framework supports the use of Producer Interceptors. A Producer Interceptor is simply a user exit from the Kafka Producer client whereby the Interceptor object is instantiated and receives notifications of Kafka message send calls and Kafka message send acknowledgement calls.

The typical use case for Interceptors is monitoring. Kafka Producer Interceptors must conform to the interface org.apache.kafka.clients.producer.ProducerInterceptor. The Kafka Handler supports Producer Interceptor usage.

The requirements for using Interceptors in the Handlers are as follows:

  • The Kafka Producer configuration property "interceptor.classes" must be configured with the class name of the Interceptor(s) to be invoked.
  • In order to invoke the Interceptor(s), the jar files plus any dependency jars must be available to the JVM. Therefore, the jar files containing the Interceptor(s) plus any dependency jars must be added to the gg.classpath in the Handler configuration file.

    For more information, see Kafka documentation.
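
Real interceptors are Java classes implementing org.apache.kafka.clients.producer.ProducerInterceptor. That interface defines onSend, which is called before a record is sent and returns the record to send, and onAcknowledgement, which is called when the send is acknowledged or fails. To keep the examples in one language, the following Python sketch only models a monitoring interceptor's bookkeeping against those two callbacks. CountingInterceptor is hypothetical. A Java equivalent would be named in interceptor.classes and its JAR added to gg.classpath.

```python
from typing import Optional


class CountingInterceptor:
    """Python model of a monitoring interceptor (hypothetical).

    on_send and on_acknowledgement play the roles of the Java interface's
    onSend and onAcknowledgement methods.
    """

    def __init__(self) -> None:
        self.sent = 0
        self.acked = 0
        self.failed = 0

    def on_send(self, record: bytes) -> bytes:
        # Runs before the record is sent; must return the (possibly modified) record.
        self.sent += 1
        return record

    def on_acknowledgement(self, error: Optional[Exception]) -> None:
        # Runs when the broker acknowledges the send, or when the send fails.
        if error is None:
            self.acked += 1
        else:
            self.failed += 1
```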

8.2.9.1.10 Kafka Partition Selection

Kafka topics comprise one or more partitions. Distribution to multiple partitions is a good way to improve Kafka ingest performance, because the Kafka client parallelizes message sending to different topic/partition combinations. Partition selection is controlled by the following calculation in the Kafka client:

(Hash of the Kafka message key) modulus (the number of partitions) = selected partition number
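
In the Apache Kafka Java client's default partitioner, the hash in this calculation is murmur2 over the serialized key bytes, and the sign bit is cleared before the modulus. The following Python port is a sketch for experimenting with key distribution. It assumes the default partitioner with a non-null key (a custom partitioner.class would behave differently), and the key strings in the usage are hypothetical.

```python
def murmur2(data: bytes) -> int:
    """Port of Kafka's murmur2 key hash, returned as an unsigned 32-bit value."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995  # mixing constant
    length = len(data)
    h = (0x9747B28C ^ length) & mask  # seed XOR length

    # Mix 4 bytes at a time, read little-endian.
    for i in range(length // 4):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Mix the remaining 1-3 bytes (mirrors the fall-through switch in the Java code).
    tail = data[length & ~3:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def select_partition(key: bytes, num_partitions: int) -> int:
    """(hash of key) modulus (number of partitions), after clearing the sign bit."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


print(select_partition(b"QASOURCE.BDCUSTMER1", 6))
```

The same key always yields the same partition, which is why a static key or a key that rarely changes concentrates messages on one partition.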

The Kafka message key is selected by the following configuration value:

gg.handler.{your handler name}.keyMappingTemplate=

If this parameter is set to a value that generates a static key, all messages go to the same partition. The following is an example of a static key:

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

If this parameter is set to a value which generates a key that changes infrequently, partition selection changes infrequently. In the following example the table name is used as the message key. Every operation for a specific source table will have the same key and thereby route to the same partition:

gg.handler.{your handler name}.keyMappingTemplate=${tableName}
A null Kafka message key distributes to the partitions on a round-robin basis. To do this, set the following:
gg.handler.{your handler name}.keyMappingTemplate=${null}

The recommended setting for configuration of the mapping key is the following:

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

This generates a Kafka message key that is the concatenated and delimited primary key values.

Each row should have a unique primary key, which generates a unique Kafka message key for that row. Another important consideration is that Kafka messages sent to different partitions are not guaranteed to be delivered to a Kafka consumer in the original order sent. This is part of the Kafka specification: order is maintained only within a partition. Using primary keys as the Kafka message key means that operations for the same row, which have the same primary key values, generate the same Kafka message key and are therefore sent to the same Kafka partition. In this way, order is maintained for operations on the same row.
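
The ordering argument can be seen in a small simulation. This Python sketch uses zlib.crc32 as a stand-in for Kafka's key hash (only the property that equal keys map to the same partition matters here) and hypothetical primary key values. Each partition is modeled as a list that preserves send order.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


def route(ops: List[Tuple[str, str]], num_partitions: int) -> Dict[int, List[str]]:
    """Append each (primary_key, operation) to the partition chosen from its key.

    zlib.crc32 is a stand-in for Kafka's murmur2 hash (assumption for illustration).
    """
    partitions: Dict[int, List[str]] = defaultdict(list)
    for primary_key, operation in ops:
        partition = zlib.crc32(primary_key.encode("utf-8")) % num_partitions
        partitions[partition].append(f"{primary_key}:{operation}")
    return partitions


ops = [("1001", "insert"), ("2002", "insert"), ("1001", "update"), ("1001", "delete")]
parts = route(ops, 4)
# All operations for row 1001 share one partition, in the order they were sent.
```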

At the DEBUG log level the Kafka message coordinates (topic, partition, and offset) are logged to the .log file for successfully sent messages.

8.2.9.1.11 Troubleshooting

8.2.9.1.11.1 Verify the Kafka Setup

You can use the command line Kafka producer to write dummy data to a Kafka topic, and you can use a Kafka consumer to read this data from the Kafka topic. Use this method to verify the setup and read/write permissions to Kafka topics on disk, see http://kafka.apache.org/documentation.html#quickstart.

8.2.9.1.11.2 Classpath Issues

Java classpath problems are common. Typical symptoms are a ClassNotFoundException in the log4j log file, or an error resolving the classpath because of a typographic error in the gg.classpath variable. The Kafka client libraries do not ship with the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) product. You must obtain the correct version of the Kafka client libraries and configure the gg.classpath property in the Java Adapter Properties file so that it correctly resolves the Kafka client libraries, as described in Classpath Configuration.

8.2.9.1.11.3 Invalid Kafka Version

The Kafka Handler does not support Kafka versions 0.8.2.2 or older. If you run an unsupported version of Kafka, the runtime Java error java.lang.NoSuchMethodError occurs, indicating that the org.apache.kafka.clients.producer.KafkaProducer.flush() method cannot be found. If you encounter this error, migrate to Kafka version 0.9.0.0 or later.
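One way to detect this condition before starting the handler is a reflection check against the client JARs on the classpath. The following JDK-only sketch (the class name is hypothetical, and it is a diagnostic aid rather than part of GG for DAA) reports whether org.apache.kafka.clients.producer.KafkaProducer can be loaded and whether it has a public no-argument flush() method.

```java
/**
 * Sketch of a pre-flight check for the NoSuchMethodError described above.
 * Reports whether a class can be loaded and whether it has a public
 * no-argument method with the given name (declared or inherited).
 */
final class ClientVersionCheck {

    enum Result { CLASS_MISSING, METHOD_MISSING, OK }

    static Result check(String className, String methodName, ClassLoader loader) {
        final Class<?> cls;
        try {
            // initialize=false: resolve the class without running static initializers.
            cls = Class.forName(className, false, loader);
        } catch (ClassNotFoundException | LinkageError e) {
            return Result.CLASS_MISSING;
        }
        try {
            cls.getMethod(methodName); // public, no-argument
            return Result.OK;
        } catch (NoSuchMethodException e) {
            return Result.METHOD_MISSING;
        }
    }

    public static void main(String[] args) {
        Result r = check("org.apache.kafka.clients.producer.KafkaProducer", "flush",
            ClientVersionCheck.class.getClassLoader());
        switch (r) {
            case CLASS_MISSING:
                System.out.println("KafkaProducer not found: check the Kafka client JARs in gg.classpath");
                break;
            case METHOD_MISSING:
                System.out.println("KafkaProducer.flush() not found: Kafka client 0.8.2.2 or older");
                break;
            default:
                System.out.println("KafkaProducer.flush() found: Kafka client 0.9.0.0 or later");
        }
    }
}
```

Run it with the same JARs that gg.classpath references. A CLASS_MISSING result points to a classpath problem rather than a version problem.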

8.2.9.1.11.4 Kafka Producer Properties File Not Found

This problem typically results in the following exception:

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Check the gg.handler.kafkahandler.kafkaProducerConfigFile configuration property to ensure that the Kafka Producer Configuration file name is set correctly. Check the gg.classpath variable to verify that the classpath includes the path to the Kafka Producer properties file, and that this path does not end with a * wildcard.
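The wildcard rule follows from how Java resolves classpath entries: a directory entry makes the files in it available as resources, while a dir/* entry expands only to the JAR files in that directory. The following JDK-only sketch assumes that the handler locates the properties file as a classpath resource (consistent with the requirement above, but an assumption). It writes a kafka.properties file into a temporary directory and loads it through a class loader whose classpath either includes that directory or not. The class and file names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Sketch: resolving a properties file as a classpath resource. */
final class ProducerPropsLoader {

    /** Loads a properties file from the classpath; fails if no classpath entry contains it. */
    static Properties loadFromClasspath(ClassLoader loader, String resourceName) throws IOException {
        try (InputStream in = loader.getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("not found on the classpath: " + resourceName);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    }

    /**
     * Writes kafka.properties into a temporary "dirprm" directory, then looks it up through a
     * class loader whose classpath does or does not contain that directory.
     * Returns bootstrap.servers, or null when the file cannot be loaded.
     */
    static String resolveBootstrapServers(boolean directoryOnClasspath) {
        try {
            Path dirprm = Files.createTempDirectory("dirprm");
            dirprm.toFile().deleteOnExit();
            Path file = Files.write(dirprm.resolve("kafka.properties"),
                "bootstrap.servers=localhost:9092\n".getBytes(StandardCharsets.ISO_8859_1));
            file.toFile().deleteOnExit(); // deleted before its directory at JVM exit
            URL[] entries = directoryOnClasspath ? new URL[] { dirprm.toUri().toURL() } : new URL[0];
            // Parent null: search only the bootstrap loader and the given entries.
            try (URLClassLoader loader = new URLClassLoader(entries, null)) {
                return loadFromClasspath(loader, "kafka.properties").getProperty("bootstrap.servers");
            }
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("directory on classpath: " + resolveBootstrapServers(true));
        System.out.println("directory missing:      " + resolveBootstrapServers(false));
    }
}
```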

8.2.9.1.11.5 Kafka Connection Problem

This problem occurs when the Kafka Handler is unable to connect to Kafka. You receive the following warnings:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused

The connection retry interval expires, and the Kafka Handler process abends. Ensure that the Kafka Broker is running and that the host and port provided in the Kafka Producer Properties file are correct. You can use network shell commands (such as netstat -l) on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
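When you are on the GG for DAA host rather than the broker host, a plain TCP connection attempt is an alternative to netstat for checking whether anything is listening at the configured host and port. The following is a minimal JDK-only sketch; the class name and the localhost:9092 defaults are assumptions. A successful connection only proves that the port is open, not that it belongs to a Kafka broker or that the security settings are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: checks whether a TCP listener accepts connections at host:port. */
final class BrokerPortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";           // assumed default
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;    // assumed default
        System.out.println(host + ":" + port
            + (isListening(host, port, 3000) ? " accepts connections" : " is not reachable"));
    }
}
```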

8.2.9.1.12 Kafka Handler Client Dependencies

What are the dependencies for the Kafka Handler to connect to Apache Kafka?

The Maven Central repository artifacts for the Kafka client are:

Maven groupId: org.apache.kafka

Maven artifactId: kafka-clients

Maven version: the Kafka version numbers listed for each section

8.2.9.1.12.1 Kafka 2.8.0
kafka-clients-2.8.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
zstd-jni-1.4.9-1.jar
8.2.9.1.12.2 Kafka 2.7.0
kafka-clients-2.7.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
zstd-jni-1.4.5-6.jar
8.2.9.1.12.3 Kafka 2.6.0
kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.9.1.12.4 Kafka 2.5.1
kafka-clients-2.5.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.9.1.12.5 Kafka 2.4.1
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jar
8.2.9.1.12.6 Kafka 2.3.1
kafka-clients-2.3.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.0-1.jar

8.2.9.2 Apache Kafka Connect Handler

The Kafka Connect Handler is an extension of the standard Kafka messaging functionality.

This chapter describes how to use the Kafka Connect Handler.

8.2.9.2.1 Overview

The Oracle GoldenGate Kafka Connect Handler is an extension of the standard Kafka messaging functionality. Kafka Connect is a functional layer on top of the standard Kafka Producer and Consumer interfaces. It standardizes messaging to make it easier to add new source and target systems to your topology.

Confluent is a primary adopter of Kafka Connect, and its Confluent Platform offering includes extensions to the standard Kafka Connect functionality, such as Avro serialization and deserialization and an Avro schema registry. Much of the Kafka Connect functionality is available in Apache Kafka. A number of open source Kafka Connect integrations are listed at:

https://www.confluent.io/product/connectors/

The Kafka Connect Handler is a Kafka Connect source connector. You can capture database changes from any database supported by Oracle GoldenGate and stream those changes through the Kafka Connect layer to Kafka. You can also connect to Oracle Event Hub Cloud Services (EHCS) with this handler.

Kafka Connect uses its own classes to define schemas (org.apache.kafka.connect.data.Schema) and messages (org.apache.kafka.connect.data.Struct). The Kafka Connect Handler can be configured to manage what data is published and the structure of the published data.

The Kafka Connect Handler does not support any of the pluggable formatters that are supported by the Kafka Handler.

8.2.9.2.2 Detailed Functionality

JSON Converter

The Kafka Connect framework provides converters to convert in-memory Kafka Connect messages to a serialized format suitable for transmission over a network. These converters are selected using configuration in the Kafka Producer properties file.

Kafka Connect and the JSON Converter are available as part of the Apache Kafka download. The JSON Converter converts the Kafka keys and values to JSON, which is then sent to a Kafka topic. You select the JSON Converters with the following configuration in the Kafka Producer properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

With this configuration, each message contains the message schema information followed by the payload information. JSON is a self-describing format, so you should not include the schema information in each message published to Kafka.

To omit the JSON schema information from the messages, set the following:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Avro Converter

Confluent provides Kafka installations, support for Kafka, and extended functionality built on top of Kafka to help realize the full potential of Kafka. Confluent provides both open source versions of Kafka (Confluent Open Source) and an enterprise edition (Confluent Enterprise), which is available for purchase.

A common Kafka use case is to send Avro messages over Kafka. This can create a problem on the receiving end, because the Avro schema is required to deserialize an Avro message. Schema evolution compounds the problem, because each received message must be matched with the exact Avro schema that was used to generate it on the producer side. Deserializing Avro messages with an incorrect Avro schema can cause runtime failures, incomplete data, or incorrect data. Confluent solves this problem with a schema registry and the Confluent schema converters.

The following shows the Avro Converter configuration in the Kafka Producer properties file:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

When messages are published to Kafka, the Avro schema is registered and stored in the schema registry. When messages are consumed from Kafka, the exact Avro schema used to create each message can be retrieved from the schema registry to deserialize it. The receiving side can therefore always match an Avro message to its corresponding Avro schema, which solves the problem.

Following are the requirements to use the Avro Converters:

  • This functionality is available in both versions of Confluent Kafka (open source or enterprise).
  • The Confluent schema registry service must be running.
  • Source database tables must have an associated Avro schema. Messages associated with different Avro schemas must be sent to different Kafka topics.
  • The Confluent Avro converters and the schema registry client must be available in the classpath.

The schema registry keeps track of Avro schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Source messages have Avro schemas based on the source database table schema, so Avro schemas are unique for each source table. If messages for multiple source tables are published to a single topic, it appears to the schema registry that the schema evolves every time a message comes from a different source table than the previous message.
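The following sketch illustrates the effect described above. It does not reproduce the schema registry's behavior (which also applies compatibility checks and reuses IDs for previously registered schemas); it only counts, per topic, how often a message's schema differs from the previous message's schema. The class name and the topic and table names are examples.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: how often does the schema on a topic differ from the previous message's schema? */
final class SchemaSwitchCounter {

    /** Each message is {topic, schemaName}; returns the number of per-topic schema switches. */
    static int countSchemaSwitches(List<String[]> messages) {
        Map<String, String> lastSchemaByTopic = new HashMap<>();
        int switches = 0;
        for (String[] message : messages) {
            String previous = lastSchemaByTopic.put(message[0], message[1]);
            if (previous != null && !previous.equals(message[1])) {
                switches++;
            }
        }
        return switches;
    }

    public static void main(String[] args) {
        // Two source tables interleaved on one shared topic.
        List<String[]> shared = Arrays.asList(
            new String[] {"gg_all", "QASOURCE.TCUSTMER"}, new String[] {"gg_all", "QASOURCE.TCUSTORD"},
            new String[] {"gg_all", "QASOURCE.TCUSTMER"}, new String[] {"gg_all", "QASOURCE.TCUSTORD"});
        // The same messages with one topic per source table.
        List<String[]> perTable = Arrays.asList(
            new String[] {"QASOURCE.TCUSTMER", "QASOURCE.TCUSTMER"},
            new String[] {"QASOURCE.TCUSTORD", "QASOURCE.TCUSTORD"},
            new String[] {"QASOURCE.TCUSTMER", "QASOURCE.TCUSTMER"},
            new String[] {"QASOURCE.TCUSTORD", "QASOURCE.TCUSTORD"});
        System.out.println("shared topic:    " + countSchemaSwitches(shared) + " schema switches");
        System.out.println("topic per table: " + countSchemaSwitches(perTable) + " schema switches");
    }
}
```

The shared topic reports 3 switches for only four messages, while the per-table topics report none.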

Protobuf Converter

The Protobuf Converter allows Kafka Connect messages to be formatted in Google Protocol Buffers format. The Protobuf Converter integrates with the Confluent schema registry, and this functionality is available in both the open source and enterprise versions of Confluent. Confluent added the Protobuf Converter in Confluent version 5.5.0.

The following shows the configuration to select the Protobuf Converter in the Kafka Producer Properties file:
key.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

The requirements to use the Protobuf Converter are as follows:

  • This functionality is available in both versions of Confluent Kafka (open source or enterprise) starting in 5.5.0.
  • The Confluent schema registry service must be running.
  • Messages with different schemas (source tables) should be sent to different Kafka topics.
  • The Confluent Protobuf converter and the schema registry client must be available in the classpath.

The schema registry keeps track of Protobuf schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Source messages have Protobuf schemas based on the source database table schema, so Protobuf schemas are unique for each source table. If messages for multiple source tables are published to a single topic, it appears to the schema registry that the schema evolves every time a message comes from a different source table than the previous message.

8.2.9.2.3 Setting Up and Running the Kafka Connect Handler

This section describes how to configure the Kafka Connect Handler components and run the handler.

Classpath Configuration

Two things must be configured in the gg.classpath configuration variable so that the Kafka Connect Handler can connect to Kafka and run: the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Connect Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Connect Handler Client Dependencies. The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka Connect client JARs is the Kafka_Home/libs/* directory.

The gg.classpath variable must be configured precisely. The path to the directory containing the Kafka Producer properties file must not have a wildcard appended. Including the asterisk (*) wildcard in that path causes the properties file to be discarded, because a Java classpath wildcard expands only to the JAR files in the directory. Paths to the dependency JARs should end with the * wildcard character so that all of the JAR files in that directory are included in the classpath. Do not use *.jar.

Following is an example of a correctly configured Apache Kafka classpath:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

Following is an example of a correctly configured Confluent Kafka classpath:

gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
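The classpath rules above can be checked mechanically before starting Replicat. The following Java sketch is a hypothetical lint helper, not a GG for DAA utility. It assumes a Unix-style ':' separator (Windows paths with drive letters would need a different split) and that you know which directory holds the Kafka Producer properties file.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical lint helper for the gg.classpath rules above (Unix ':' separator assumed). */
final class GgClasspathLint {

    static List<String> lint(String classpath, String propertiesDir) {
        List<String> problems = new ArrayList<>();
        boolean propertiesDirFound = false;
        for (String entry : classpath.split(":")) {
            if (entry.endsWith("*.jar")) {
                problems.add(entry + ": use dir/* instead of dir/*.jar");
            }
            boolean wildcard = entry.endsWith("/*");
            String dir = wildcard ? entry.substring(0, entry.length() - 2) : entry;
            if (dir.equals(propertiesDir)) {
                if (wildcard) {
                    problems.add(entry + ": remove the wildcard from the properties file directory");
                } else {
                    propertiesDirFound = true;
                }
            }
        }
        if (!propertiesDirFound) {
            problems.add(propertiesDir + ": properties file directory is not on gg.classpath");
        }
        return problems;
    }

    public static void main(String[] args) {
        String[] candidates = {
            "dirprm:/opt/kafka/libs/*",     // follows the rules
            "dirprm/*:/opt/kafka/libs/*",   // wildcard on the properties directory
            "dirprm:/opt/kafka/libs/*.jar"  // *.jar instead of *
        };
        for (String cp : candidates) {
            System.out.println(cp + " -> " + lint(cp, "dirprm"));
        }
    }
}
```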
8.2.9.2.3.1 Kafka Connect Handler Configuration

The automated output of meta-column fields in generated Kafka Connect messages was removed as of the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) 21c release.

Meta-column fields can be configured with the following property:

gg.handler.name.metaColumnsTemplate

To output the metacolumns as in previous versions, configure the following:

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}

To also include the primary key columns and the tokens, configure the following:

gg.handler.name.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}
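To see which output field each keyword produces, a template can be split into field-name and keyword pairs. This sketch handles only the ${keyword[fieldName]} form shown above and is a hypothetical helper for reading templates, not the handler's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: splits a metaColumnsTemplate into output field -> keyword pairs. */
final class MetaColumnsTemplate {

    // Matches ${keyword[fieldName]}, for example ${optype[op_type]}.
    private static final Pattern KEYWORD = Pattern.compile("\\$\\{(\\w+)\\[([^\\]]+)\\]\\}");

    static Map<String, String> fieldsToKeywords(String template) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps template order
        Matcher m = KEYWORD.matcher(template);
        while (m.find()) {
            fields.put(m.group(2), m.group(1));
        }
        return fields;
    }

    public static void main(String[] args) {
        String template = "${objectname[table]},${optype[op_type]},${timestamp[op_ts]},"
            + "${currenttimestamp[current_ts]},${position[pos]},"
            + "${primarykeycolumns[primary_keys]},${alltokens[tokens]}";
        for (Map.Entry<String, String> e : fieldsToKeywords(template).entrySet()) {
            System.out.println(e.getKey() + " <- ${" + e.getValue() + "}");
        }
    }
}
```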

For more information, see the configuration property:

gg.handler.name.metaColumnsTemplate

Table 8-10 Kafka Connect Handler Configuration Properties

Properties Required/ Optional Legal Values Default Explanation
gg.handler.name.type

Required

kafkaconnect

None

The configuration to select the Kafka Connect Handler.

gg.handler.name.kafkaProducerConfigFile

Required

string

None

Name of the properties file containing the Kafka and Kafka Connect configuration properties. This file must be part of the classpath configured by the gg.classpath property.

gg.handler.name.topicMappingTemplate

Required

A template string value to resolve the Kafka topic name at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate

Required

A template string value to resolve the Kafka message key at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.includeTokens

Optional

true | false

false

Set to true to include a map field in output messages. The key is tokens and the value is a map where the keys and values are the token keys and values from the Oracle GoldenGate source trail file.

Set to false to suppress this field.

gg.handler.name.messageFormatting

Optional

row | op

row

Controls how output messages are modeled. Set to row to model output messages as rows. Set to op to model output messages as operations messages.

gg.handler.name.insertOpKey

Optional

any string

I

The value of the field op_type to indicate an insert operation.

gg.handler.name.updateOpKey

Optional

any string

U

The value of the field op_type to indicate an update operation.

gg.handler.name.deleteOpKey

Optional

any string

D

The value of the field op_type to indicate a delete operation.

gg.handler.name.truncateOpKey

Optional

any string

T

The value of the field op_type to indicate a truncate operation.

gg.handler.name.treatAllColumnsAsStrings

Optional

true | false

false

Set to true to treat all output fields as strings. Set to false to have the handler map each field type from the source trail file to the best corresponding Kafka Connect data type.

gg.handler.name.mapLargeNumbersAsStrings

Optional

true | false

false

Large numbers are mapped to number fields as Doubles, which can lose precision in certain scenarios.

If set to true, these fields are mapped as Strings in order to preserve precision.

gg.handler.name.pkUpdateHandling

Optional

abend | update | delete-insert

abend

Only applicable when modeling row messages (gg.handler.name.messageFormatting=row). Not applicable when modeling operations messages, because the before and after images are propagated to the message in the case of an update.

gg.handler.name.metaColumnsTemplate

Optional

Any of the metacolumn keywords.

None

A comma-delimited string consisting of one or more templated values that represent the template, see Metacolumn Keywords.

gg.handler.name.includeIsMissingFields

Optional

true|false

true

Set to true to include a {column_name}_isMissing field for each column.

This allows downstream applications to differentiate whether a null value is actually null in the source trail file or is missing from the source trail file.

gg.handler.name.enableDecimalLogicalType

Optional

true | false

false

Set to true to enable decimal logical types in Kafka Connect. Decimal logical types allow numbers that do not fit in a 64-bit data type to be represented.

gg.handler.name.oracleNumberScale

Optional

Positive Integer

38

Only applicable if gg.handler.name.enableDecimalLogicalType=true. Some source data types do not have a fixed scale associated with them, but scale must be set for Kafka Connect decimal logical types. For source types that do not have a scale in the metadata, the value of this parameter is used to set the scale.

gg.handler.name.EnableTimestampLogicalType

Optional

true | false

false

Set to true to enable the Kafka Connect timestamp logical type. The Kafka Connect timestamp logical type is an integer measurement of milliseconds since the Java epoch (1970-01-01T00:00:00Z), so precision greater than milliseconds is not possible when the timestamp logical type is used. This property requires that the gg.format.timestamp property be set. gg.format.timestamp is the timestamp formatting string used to determine the output of timestamps in string format, for example, gg.format.timestamp=yyyy-MM-dd HH:mm:ss.SSS. Ensure that the goldengate.userexit.timestamp property is not set in the configuration file. Setting that property prevents parsing the input timestamp into a Java object, which is required for logical timestamps.

gg.handler.name.metaHeadersTemplate

Optional

Comma-delimited list of metacolumn keywords.

None

Allows the user to select metacolumns to inject context-based key-value pairs into Kafka message headers using the metacolumn keyword syntax. See Metacolumn Keywords.

gg.handler.name.schemaNamespace

Optional

Any string without characters that violate the Kafka Connect Avro schema naming requirements.

None

Used to control the generated Kafka Connect schema name. If it is not set, then the schema name is the same as the qualified source table name. For example, if the source table is QASOURCE.TCUSTMER, then the Kafka Connect schema name is also QASOURCE.TCUSTMER.

This property allows you to control the generated schema name. For example, if this property is set to com.example.company, then the generated Kafka Connect schema name for the table QASOURCE.TCUSTMER is com.example.company.TCUSTMER.

gg.handler.name.enableNonnullable

Optional

true | false

false

The default behavior is to set all fields as nullable in the generated Kafka Connect schema. Set this parameter to true to honor the nullable value configured in the target metadata provided by the metadata provider. Setting this property to true can have some adverse side effects:
  1. Setting a field to non-nullable means the field must have a value to be valid. If a field is set as non-nullable and the value is null or missing in the source trail file, a runtime error results.
  2. Setting a field to non-nullable means that truncate operations cannot be propagated. Truncate operations have no field values, so the Kafka Connect converter serialization fails because there is no value for the field.
  3. A schema change that adds a non-nullable field causes a schema backwards compatibility exception in the Confluent schema registry. If this occurs, users need to adjust or disable the compatibility configuration of the Confluent schema registry.

See Using Templates to Resolve the Topic Name and Message Key for more information.

Review a Sample Configuration

gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
8.2.9.2.3.2 Using Templates to Resolve the Topic Name and Message Key

The Kafka Connect Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. At runtime, each keyword is dynamically replaced with a value from the context of the current processing. Templates are applicable to the following configuration parameters:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
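The following sketch shows the idea of template resolution for the keywords used in this chapter (${tableName}, ${fullyQualifiedTableName}, ${primaryKeys}, ${null}) plus static text. It is an illustration under stated assumptions, not the handler's implementation: the class name is hypothetical, and the '_' delimiter between primary key values is an assumption, since the actual delimiter is not documented here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative template resolver for a few keywords; not the handler's implementation. */
final class TemplateResolver {

    private static final Pattern KEYWORD = Pattern.compile("\\$\\{(\\w+)\\}");
    private static final String PK_DELIMITER = "_"; // assumption: the real delimiter may differ

    /** Returns the resolved string, or null for the ${null} template. */
    static String resolve(String template, String schema, String table, List<String> pkValues) {
        if ("${null}".equals(template)) {
            return null; // a null message key
        }
        Matcher m = KEYWORD.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value;
            switch (m.group(1)) {
                case "tableName":
                    value = table;
                    break;
                case "fullyQualifiedTableName":
                    value = schema + "." + table;
                    break;
                case "primaryKeys":
                    value = String.join(PK_DELIMITER, pkValues);
                    break;
                default:
                    throw new IllegalArgumentException("Keyword not covered by this sketch: " + m.group());
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out); // static text passes through unchanged
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> pk = Arrays.asList("WILL", "100");
        String[] templates = {"${fullyQualifiedTableName}", "${primaryKeys}", "gg_${tableName}", "StaticValue", "${null}"};
        for (String t : templates) {
            System.out.println(t + " -> " + resolve(t, "QASOURCE", "TCUSTMER", pk));
        }
    }
}
```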

Template Modes

The Kafka Connect Handler can only send operation messages. The Kafka Connect Handler cannot group operation messages into a larger transaction message.

For more information about the Template Keywords, see Template Keywords.
For example templates, see Example Templates.
8.2.9.2.3.3 Configuring Security in the Kafka Connect Handler

Kafka version 0.9.0.0 introduced security through SSL/TLS or Kerberos. The Kafka Connect Handler can be secured using SSL/TLS or Kerberos. The Kafka producer client libraries abstract security functionality away from the integrations that use them, so the Kafka Connect Handler itself is effectively isolated from security functionality. Enabling security requires setting up security for the Kafka cluster and the connecting machines, and then adding the required security properties to the Kafka Producer properties file that the Kafka Connect Handler uses for processing.
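As an example of the producer-side settings, the following sketch builds Kafka Producer properties for Kerberos over TLS (SASL_SSL). The property keys are standard Kafka client settings; the class name, host names, paths, principal, and passwords are placeholders, and the service name kafka is an assumption that must match the broker's Kerberos principal. The sasl.jaas.config property requires Kafka client 0.10.2 or later; older clients read the JAAS configuration from a file named by the java.security.auth.login.config system property.

```java
import java.io.IOException;
import java.util.Properties;

/** Sketch: Kafka Producer properties for Kerberos (GSSAPI) over TLS. Values are placeholders. */
final class SecureProducerProps {

    static Properties kerberosOverSsl(String brokers, String truststore, String truststorePassword,
                                      String keytab, String principal) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", brokers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "GSSAPI");
        p.setProperty("sasl.kerberos.service.name", "kafka"); // assumption: must match the broker principal
        p.setProperty("sasl.jaas.config",
            "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true keyTab=\"" + keytab + "\" principal=\"" + principal + "\";");
        p.setProperty("ssl.truststore.location", truststore);
        p.setProperty("ssl.truststore.password", truststorePassword);
        return p;
    }

    public static void main(String[] args) throws IOException {
        Properties p = kerberosOverSsl("broker1:9093", "/path/to/truststore.jks", "changeit",
            "/path/to/gg.keytab", "gguser@EXAMPLE.COM");
        p.store(System.out, "Kafka Producer properties (placeholders)");
    }
}
```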

You may find that the Kerberos password cannot be decrypted from the keytab file. This causes the Kerberos authentication to fall back to interactive mode, which cannot work because it is being invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files are not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.
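A quick way to check whether the JRE allows strong encryption is javax.crypto.Cipher.getMaxAllowedKeyLength. The following sketch (hypothetical class name) prints the maximum allowed AES key length: a value above 128 (typically Integer.MAX_VALUE) means that the unlimited-strength policy is active. JDK 8u161 and later enable the unlimited policy by default, so the separate JCE download is needed only on older JREs.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

/** Sketch: reports the JRE's maximum allowed AES key length. */
final class JceCheck {

    /** Returns the maximum allowed AES key length in bits, or -1 if AES is unavailable. */
    static int maxAesKeyLength() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        int max = maxAesKeyLength();
        System.out.println("Maximum AES key length: " + max);
        System.out.println(max > 128
            ? "Unlimited-strength policy is active; AES-256 keys are permitted."
            : "Limited policy; AES-256 keys in a keytab cannot be used. Install the unlimited JCE policy.");
    }
}
```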

8.2.9.2.4 Connecting to a Secure Schema Registry

Your Kafka Connect topology may include a secured schema registry. This topic shows the Kafka producer properties to configure for connectivity to a secured schema registry.

SSL Mutual Auth
key.converter.schema.registry.ssl.truststore.location=
key.converter.schema.registry.ssl.truststore.password=
key.converter.schema.registry.ssl.keystore.location=
key.converter.schema.registry.ssl.keystore.password=
key.converter.schema.registry.ssl.key.password=
value.converter.schema.registry.ssl.truststore.location=
value.converter.schema.registry.ssl.truststore.password=
value.converter.schema.registry.ssl.keystore.location=
value.converter.schema.registry.ssl.keystore.password=
value.converter.schema.registry.ssl.key.password=

SSL Basic Auth

key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info=username:password
key.converter.schema.registry.ssl.truststore.location=
key.converter.schema.registry.ssl.truststore.password=
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=username:password
value.converter.schema.registry.ssl.truststore.location=
value.converter.schema.registry.ssl.truststore.password=
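Because every schema registry client setting is repeated for the key and value converters, a small helper can generate both copies from one set of values so that the two halves cannot drift apart. This is a hypothetical convenience sketch; the values are placeholders based on the SSL Basic Auth example.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: copies schema registry client settings under both converter prefixes. */
final class RegistryClientProps {

    static Properties forBothConverters(Map<String, String> registrySettings) {
        Properties p = new Properties();
        for (Map.Entry<String, String> e : registrySettings.entrySet()) {
            p.setProperty("key.converter." + e.getKey(), e.getValue());
            p.setProperty("value.converter." + e.getKey(), e.getValue());
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> basicAuth = new TreeMap<>();
        basicAuth.put("basic.auth.credentials.source", "USER_INFO");
        basicAuth.put("basic.auth.user.info", "username:password");
        basicAuth.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
        basicAuth.put("schema.registry.ssl.truststore.password", "changeit");
        Properties p = forBothConverters(basicAuth);
        for (String name : new TreeMap<Object, Object>(p).keySet().toArray(new String[0])) {
            System.out.println(name + "=" + p.getProperty(name));
        }
    }
}
```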

8.2.9.2.5 Kafka Connect Handler Performance Considerations

Multiple configuration settings, both in the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) configuration and in the Kafka producer, affect performance.

The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. The GROUPTRANSOPS parameter allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka Connect Handler calls flush on the Kafka Producer to push the messages to Kafka for write durability, followed by a checkpoint. The flush call is expensive, and setting GROUPTRANSOPS to a larger value allows Replicat to call flush less frequently, thereby improving performance.

The default setting for GROUPTRANSOPS is 1000 and performance improvements can be obtained by increasing the value to 2500, 5000, or even 10000.
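A rough worked estimate shows why larger GROUPTRANSOPS values reduce flush overhead. The sketch (hypothetical class name) assumes that source transactions are much smaller than GROUPTRANSOPS, so each grouped target transaction contains about GROUPTRANSOPS operations and triggers one flush. Large source transactions are not split, so real counts can differ.

```java
/** Sketch: approximate number of flush() calls for a given GROUPTRANSOPS value. */
final class FlushEstimate {

    /** Ceiling of totalOps / groupTransOps, assuming small source transactions. */
    static long approxFlushes(long totalOps, long groupTransOps) {
        return (totalOps + groupTransOps - 1) / groupTransOps;
    }

    public static void main(String[] args) {
        long totalOps = 10_000_000L;
        for (long g : new long[] {1000, 2500, 5000, 10000}) {
            System.out.println("GROUPTRANSOPS " + g + " -> about " + approxFlushes(totalOps, g) + " flushes");
        }
    }
}
```

For 10,000,000 operations, this gives about 10,000 flushes at the default of 1000, and 4,000, 2,000, and 1,000 flushes at 2500, 5000, and 10000.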

Op mode (gg.handler.kafkaconnect.mode=op) can also provide better performance than Tx mode (gg.handler.kafkaconnect.mode=tx).

A number of Kafka Producer properties can affect performance. The following are the parameters with significant impact:

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

Oracle recommends that you start with the default values for these parameters and perform performance testing to obtain a performance baseline. Then review the Kafka documentation for each of these parameters to understand its role, adjust the parameters, and perform additional performance testing to determine the performance effect of each parameter.
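One way to follow this advice is to record the defaults as a baseline and derive test configurations that change exactly one parameter at a time. The values below are the producer defaults of the Kafka 2.x clients listed in this chapter (Kafka 3.0 changed the acks default to all). The class name and the linger.ms=10 test value are illustrative, not recommendations.

```java
import java.util.Properties;

/** Sketch: a baseline of Kafka 2.x producer defaults plus one-parameter-at-a-time variants. */
final class ProducerTuningBaseline {

    static Properties baseline() {
        Properties p = new Properties();
        p.setProperty("linger.ms", "0");
        p.setProperty("batch.size", "16384");
        p.setProperty("acks", "1");
        p.setProperty("buffer.memory", "33554432");
        p.setProperty("compression.type", "none");
        return p;
    }

    /** Copy of the baseline with exactly one tuned parameter changed. */
    static Properties variant(String key, String value) {
        Properties p = baseline();
        if (p.getProperty(key) == null) {
            throw new IllegalArgumentException("Not one of the tuned parameters: " + key);
        }
        p.setProperty(key, value);
        return p;
    }

    public static void main(String[] args) {
        System.out.println("baseline: " + baseline());
        System.out.println("test run: " + variant("linger.ms", "10")); // example value only
    }
}
```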

8.2.9.2.6 Kafka Interceptor Support

The Kafka Producer client framework supports the use of Producer Interceptors. A Producer Interceptor is simply a user exit from the Kafka Producer client whereby the Interceptor object is instantiated and receives notifications of Kafka message send calls and Kafka message send acknowledgement calls.

The typical use case for Interceptors is monitoring. Kafka Producer Interceptors must conform to the interface org.apache.kafka.clients.producer.ProducerInterceptor. The Kafka Connect Handler supports Producer Interceptor usage.

The requirements for using Interceptors in the Handlers are as follows:

  • The Kafka Producer configuration property "interceptor.classes" must be configured with the class name of the Interceptor(s) to be invoked.
  • In order to invoke the Interceptor(s), the JAR files plus any dependency JARs must be available to the JVM. Therefore, the JAR files containing the Interceptor(s) plus any dependency JARs must be added to the gg.classpath in the Handler configuration file.

    For more information, see the Kafka documentation.

8.2.9.2.7 Kafka Partition Selection

Kafka topics comprise one or more partitions. Distribution to multiple partitions is a good way to improve Kafka ingest performance, because the Kafka client parallelizes message sending to different topic/partition combinations. Partition selection is controlled by the following calculation in the Kafka client:

(Hash of the Kafka message key) modulus (the number of partitions) = selected partition number
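For a non-null key, the Java Kafka client's default partitioner computes this with the murmur2 hash over the serialized key bytes. The following JDK-only sketch reimplements that calculation for illustration (the class name is hypothetical). It assumes the key is a String serialized as UTF-8 bytes; with the Kafka Connect Handler, the hashed bytes are whatever the configured key.converter produces, so actual partition numbers can differ from this sketch.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of keyed partition selection as done by the Java Kafka client's default partitioner. */
final class KeyedPartition {

    /** murmur2 hash, reimplemented here for illustration. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes; the cases fall through intentionally.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** (hash of key) modulus (number of partitions), with the sign bit cleared first. */
    static int partition(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"TCUSTMER", "WILL_100", "JANE_200"}) {
            System.out.println(key + " -> partition " + partition(key, 6));
        }
    }
}
```

Because the result depends only on the key bytes and the partition count, adding partitions to a topic changes the mapping for existing keys.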

The Kafka message key is selected by the following configuration value:

gg.handler.{your handler name}.keyMappingTemplate=

If this parameter is set to a value that generates a static key, all messages go to the same partition. The following is an example of a static key:

gg.handler.{your handler name}.keyMappingTemplate=StaticValue

If this parameter is set to a value which generates a key that changes infrequently, partition selection changes infrequently. In the following example the table name is used as the message key. Every operation for a specific source table will have the same key and thereby route to the same partition:

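gg.handler.{your handler name}.keyMappingTemplate=${tableName}

A null Kafka message key distributes messages across the partitions. Kafka clients earlier than 2.4 assign null-key messages on a round-robin basis; Kafka 2.4 and later clients use a sticky partitioner that fills a batch for one partition before moving to the next. To use a null key, set the following:

gg.handler.{your handler name}.keyMappingTemplate=${null}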

The recommended setting for configuration of the mapping key is the following:

gg.handler.{your handler name}.keyMappingTemplate=${primaryKeys}

This generates a Kafka message key that is the concatenated and delimited primary key values.

Each row should have a unique primary key, so each row generates a unique Kafka message key. Another important consideration is that Kafka does not guarantee that messages sent to different partitions are delivered to a Kafka consumer in the order in which they were sent; order is maintained only within a partition. Using primary keys as the Kafka message key means that all operations for the same row generate the same Kafka message key and are therefore sent to the same Kafka partition. In this way, order is maintained for operations on the same row.

At the DEBUG log level the Kafka message coordinates (topic, partition, and offset) are logged to the .log file for successfully sent messages.

8.2.9.2.8 Troubleshooting the Kafka Connect Handler

8.2.9.2.8.1 Java Classpath for Kafka Connect Handler

Issues with the Java classpath are one of the most common problems. A classpath problem is indicated by a ClassNotFoundException in the Oracle GoldenGate Java log4j log file, or by an error while resolving the classpath if there is a typographic error in the gg.classpath variable.

The Kafka client libraries do not ship with the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) product. You are required to obtain the correct version of the Kafka client libraries and to configure the gg.classpath property in the Java Adapter Properties file so that it correctly resolves the Kafka client libraries, as described in Setting Up and Running the Kafka Connect Handler.

8.2.9.2.8.2 Invalid Kafka Version

Kafka Connect was introduced in Kafka version 0.9.0.0. The Kafka Connect Handler does not work with Kafka versions 0.8.2.2 and older. Attempting to use Kafka Connect with Kafka version 0.8.2.2 typically results in a ClassNotFoundException error at runtime.

8.2.9.2.8.3 Kafka Producer Properties File Not Found

Typically, the following exception message occurs:

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Verify that the gg.handler.name.kafkaProducerConfigFile configuration property is set to the correct Kafka Producer Configuration file name.

Ensure that the gg.classpath variable includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.

8.2.9.2.8.4 Kafka Connection Problem

Typically, the following exception message appears:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused

When this occurs, the connection retry interval expires and the Kafka Connect Handler process abends. Ensure that the Kafka Brokers are running and that the host and port provided in the Kafka Producer properties file are correct.

Network shell commands (such as, netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.

8.2.9.2.9 Kafka Connect Handler Client Dependencies

What are the dependencies for the Kafka Connect Handler to connect to Apache Kafka Connect?

The Maven Central repository artifacts for Kafka Connect are:

Maven groupId: org.apache.kafka

Maven artifactId: kafka-clients & connect-json

Maven version: the Kafka Connect version numbers listed for each section

8.2.9.2.9.1 Kafka 2.8.0
connect-api-2.8.0.jar
connect-json-2.8.0.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.5.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.8.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
zstd-jni-1.4.9-1.jar
8.2.9.2.9.2 Kafka 2.7.1
connect-api-2.7.1.jar
connect-json-2.7.1.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.5.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.7.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
zstd-jni-1.4.5-6.jar
8.2.9.2.9.3 Kafka 2.6.0
connect-api-2.6.0.jar
connect-json-2.6.0.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-datatype-jdk8-2.10.2.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.9.2.9.4 Kafka 2.5.1
connect-api-2.5.1.jar
connect-json-2.5.1.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-datatype-jdk8-2.10.2.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.5.1.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
8.2.9.2.9.5 Kafka 2.4.1
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jar
8.2.9.2.9.6 Kafka 2.3.1
connect-api-2.3.1.jar
connect-json-2.3.1.jar
jackson-annotations-2.10.0.jar
jackson-core-2.10.0.jar
jackson-databind-2.10.0.jar
jackson-datatype-jdk8-2.10.0.jar
javax.ws.rs-api-2.1.1.jar
kafka-clients-2.3.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.0-1.jar
8.2.9.2.9.7 Kafka 2.2.1
kafka-clients-2.2.1.jar
lz4-java-1.5.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
zstd-jni-1.3.8-1.jar
8.2.9.2.9.8 Kafka 2.1.1
audience-annotations-0.5.0.jar
connect-api-2.1.1.jar
connect-json-2.1.1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-databind-2.9.8.jar
javax.ws.rs-api-2.1.1.jar
jopt-simple-5.0.4.jar
kafka_2.12-2.1.1.jar
kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.7.jar
scala-logging_2.12-3.9.0.jar
scala-reflect-2.12.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
zkclient-0.11.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.7-1.jar
8.2.9.2.9.9 Kafka 2.0.1
audience-annotations-0.5.0.jar
connect-api-2.0.1.jar
connect-json-2.0.1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.7.jar
jackson-databind-2.9.7.jar
javax.ws.rs-api-2.1.jar
jopt-simple-5.0.4.jar
kafka_2.12-2.0.1.jar
kafka-clients-2.0.1.jar
lz4-java-1.4.1.jar
metrics-core-2.2.0.jar
scala-library-2.12.6.jar
scala-logging_2.12-3.9.0.jar
scala-reflect-2.12.6.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
8.2.9.2.9.10 Kafka 1.1.1
kafka-clients-1.1.1.jar
lz4-java-1.4.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
8.2.9.2.9.11 Kafka 1.0.2
kafka-clients-1.0.2.jar
lz4-java-1.4.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar
8.2.9.2.9.12 Kafka 0.11.0.0
connect-api-0.11.0.0.jar
connect-json-0.11.0.0.jar
jackson-annotations-2.8.0.jar
jackson-core-2.8.5.jar
jackson-databind-2.8.5.jar
jopt-simple-5.0.3.jar
kafka_2.11-0.11.0.0.jar
kafka-clients-0.11.0.0.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-2.2.0.jar
scala-library-2.11.11.jar
scala-parser-combinators_2.11-1.0.4.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.2.6.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar
8.2.9.2.9.13 Kafka 0.10.2.0
connect-api-0.10.2.0.jar 
connect-json-0.10.2.0.jar 
jackson-annotations-2.8.0.jar 
jackson-core-2.8.5.jar 
jackson-databind-2.8.5.jar 
jopt-simple-5.0.3.jar 
kafka_2.11-0.10.2.0.jar 
kafka-clients-0.10.2.0.jar 
log4j-1.2.17.jar 
lz4-1.3.0.jar 
metrics-core-2.2.0.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.6.jar 
zkclient-0.10.jar 
zookeeper-3.4.9.jar
8.2.9.2.9.14 Kafka 0.10.1.1
connect-api-0.10.1.1.jar 
connect-json-0.10.1.1.jar 
jackson-annotations-2.6.0.jar 
jackson-core-2.6.3.jar 
jackson-databind-2.6.3.jar 
jline-0.9.94.jar 
jopt-simple-4.9.jar 
kafka_2.11-0.10.1.1.jar 
kafka-clients-0.10.1.1.jar 
log4j-1.2.17.jar 
lz4-1.3.0.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.6.jar 
zkclient-0.9.jar 
zookeeper-3.4.8.jar
8.2.9.2.9.15 Kafka 0.10.0.0
activation-1.1.jar 
connect-api-0.10.0.0.jar 
connect-json-0.10.0.0.jar 
jackson-annotations-2.6.0.jar 
jackson-core-2.6.3.jar 
jackson-databind-2.6.3.jar 
jline-0.9.94.jar 
jopt-simple-4.9.jar 
junit-3.8.1.jar 
kafka_2.11-0.10.0.0.jar 
kafka-clients-0.10.0.0.jar 
log4j-1.2.15.jar 
lz4-1.3.0.jar 
mail-1.4.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.8.jar 
scala-parser-combinators_2.11-1.0.4.jar 
slf4j-api-1.7.21.jar 
slf4j-log4j12-1.7.21.jar 
snappy-java-1.1.2.4.jar 
zkclient-0.8.jar 
zookeeper-3.4.6.jar
8.2.9.2.9.16 Kafka 0.9.0.1
activation-1.1.jar 
connect-api-0.9.0.1.jar 
connect-json-0.9.0.1.jar 
jackson-annotations-2.5.0.jar 
jackson-core-2.5.4.jar 
jackson-databind-2.5.4.jar 
jline-0.9.94.jar 
jopt-simple-3.2.jar 
junit-3.8.1.jar 
kafka_2.11-0.9.0.1.jar 
kafka-clients-0.9.0.1.jar 
log4j-1.2.15.jar 
lz4-1.2.0.jar 
mail-1.4.jar 
metrics-core-2.2.0.jar 
netty-3.7.0.Final.jar 
scala-library-2.11.7.jar 
scala-parser-combinators_2.11-1.0.4.jar 
scala-xml_2.11-1.0.4.jar 
slf4j-api-1.7.6.jar 
slf4j-log4j12-1.7.6.jar 
snappy-java-1.1.1.7.jar 
zkclient-0.7.jar 
zookeeper-3.4.6.jar
8.2.9.2.9.16.1 Confluent Dependencies

Note:

The Confluent dependencies listed below are for the Kafka Connect Avro Converter and the associated Avro Schema Registry client. When integrating with Confluent Kafka Connect, these dependencies are required in addition to the Kafka Connect dependencies for the corresponding Kafka version, which are listed in the previous sections.

8.2.9.2.9.16.1.1 Confluent 6.2.0
avro-1.10.1.jar
commons-compress-1.20.jar
common-utils-6.2.0.jar
connect-api-6.2.0-ccs.jar
connect-json-6.2.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.11.3.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.34.jar
kafka-avro-serializer-6.2.0.jar
kafka-clients-6.2.0-ccs.jar
kafka-connect-avro-converter-6.2.0.jar
kafka-connect-avro-data-6.2.0.jar
kafka-schema-registry-client-6.2.0.jar
kafka-schema-serializer-6.2.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.9-1.jar
8.2.9.2.9.16.1.2 Confluent 6.1.0
avro-1.9.2.jar
commons-compress-1.19.jar
common-utils-6.1.0.jar
connect-api-6.1.0-ccs.jar
connect-json-6.1.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.5.1.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.31.jar
kafka-avro-serializer-6.1.0.jar
kafka-clients-6.1.0-ccs.jar
kafka-connect-avro-converter-6.1.0.jar
kafka-connect-avro-data-6.1.0.jar
kafka-schema-registry-client-6.1.0.jar
kafka-schema-serializer-6.1.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.7.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.5-6.jar
8.2.9.2.9.16.1.3 Confluent 6.0.0
avro-1.9.2.jar
commons-compress-1.19.jar
common-utils-6.0.0.jar
connect-api-6.0.0-ccs.jar
connect-json-6.0.0-ccs.jar
jackson-annotations-2.10.5.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.5.jar
jackson-datatype-jdk8-2.10.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jersey-common-2.30.jar
kafka-avro-serializer-6.0.0.jar
kafka-clients-6.0.0-ccs.jar
kafka-connect-avro-converter-6.0.0.jar
kafka-connect-avro-data-6.0.0.jar
kafka-schema-registry-client-6.0.0.jar
kafka-schema-serializer-6.0.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.6.2.jar
zstd-jni-1.4.4-7.jar
8.2.9.2.9.16.1.4 Confluent 5.5.0
avro-1.9.2.jar
classmate-1.3.4.jar
common-config-5.5.0.jar
commons-compress-1.19.jar
commons-lang3-3.2.1.jar
common-utils-5.5.0.jar
connect-api-5.5.0-ccs.jar
connect-json-5.5.0-ccs.jar
guava-18.0.jar
hibernate-validator-6.0.17.Final.jar
jackson-annotations-2.10.2.jar
jackson-core-2.10.2.jar
jackson-databind-2.10.2.jar
jackson-dataformat-yaml-2.4.5.jar
jackson-datatype-jdk8-2.10.2.jar
jackson-datatype-joda-2.4.5.jar
jakarta.annotation-api-1.3.5.jar
jakarta.el-3.0.2.jar
jakarta.el-api-3.0.3.jar
jakarta.inject-2.6.1.jar
jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api-2.1.6.jar
javax.ws.rs-api-2.1.1.jar
jboss-logging-3.3.2.Final.jar
jersey-bean-validation-2.30.jar
jersey-client-2.30.jar
jersey-common-2.30.jar
jersey-media-jaxb-2.30.jar
jersey-server-2.30.jar
joda-time-2.2.jar
kafka-avro-serializer-5.5.0.jar
kafka-clients-5.5.0-ccs.jar
kafka-connect-avro-converter-5.5.0.jar
kafka-connect-avro-data-5.5.0.jar
kafka-schema-registry-client-5.5.0.jar
kafka-schema-serializer-5.5.0.jar
lz4-java-1.7.1.jar
osgi-resource-locator-1.0.3.jar
slf4j-api-1.7.30.jar
snakeyaml-1.12.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.5.22.jar
swagger-core-1.5.3.jar
swagger-models-1.5.3.jar
zstd-jni-1.4.4-7.jar
8.2.9.2.9.16.1.5 Confluent 5.4.0
avro-1.9.1.jar
common-config-5.4.0.jar
commons-compress-1.19.jar
commons-lang3-3.2.1.jar
common-utils-5.4.0.jar
connect-api-5.4.0-ccs.jar
connect-json-5.4.0-ccs.jar
guava-18.0.jar
jackson-annotations-2.9.10.jar
jackson-core-2.9.9.jar
jackson-databind-2.9.10.1.jar
jackson-dataformat-yaml-2.4.5.jar
jackson-datatype-jdk8-2.9.10.jar
jackson-datatype-joda-2.4.5.jar
javax.ws.rs-api-2.1.1.jar
joda-time-2.2.jar
kafka-avro-serializer-5.4.0.jar
kafka-clients-5.4.0-ccs.jar
kafka-connect-avro-converter-5.4.0.jar
kafka-schema-registry-client-5.4.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
snakeyaml-1.12.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.5.22.jar
swagger-core-1.5.3.jar
swagger-models-1.5.3.jar
zstd-jni-1.4.3-1.jar
8.2.9.2.9.16.1.6 Confluent 5.3.0
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.3.0.jar
commons-compress-1.8.1.jar
common-utils-5.3.0.jar
connect-api-5.3.0-ccs.jar
connect-json-5.3.0-ccs.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.9.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.9.jar
jackson-datatype-jdk8-2.9.9.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
jsr305-3.0.2.jar
kafka-avro-serializer-5.3.0.jar
kafka-clients-5.3.0-ccs.jar
kafka-connect-avro-converter-5.3.0.jar
kafka-schema-registry-client-5.3.0.jar
lz4-java-1.6.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.26.jar
snappy-java-1.1.1.3.jar
spotbugs-annotations-3.1.9.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.14.jar
zstd-jni-1.4.0-1.jar
8.2.9.2.9.16.1.7 Confluent 5.2.1
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.2.1.jar
commons-compress-1.8.1.jar
common-utils-5.2.1.jar
connect-api-2.2.0-cp2.jar
connect-json-2.2.0-cp2.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.8.jar
jackson-datatype-jdk8-2.9.8.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.2.1.jar
kafka-clients-2.2.0-cp2.jar
kafka-connect-avro-converter-5.2.1.jar
kafka-schema-registry-client-5.2.1.jar
lz4-java-1.5.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.8-1.jar
8.2.9.2.9.16.1.8 Confluent 5.1.3
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.1.3.jar
commons-compress-1.8.1.jar
common-utils-5.1.3.jar
connect-api-2.1.1-cp3.jar
connect-json-2.1.1-cp3.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.8.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.8.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.1.3.jar
kafka-clients-2.1.1-cp3.jar
kafka-connect-avro-converter-5.1.3.jar
kafka-schema-registry-client-5.1.3.jar
lz4-java-1.5.0.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.7-1.jar
8.2.9.2.9.16.1.9 Confluent 5.0.3
audience-annotations-0.5.0.jar
avro-1.8.1.jar
common-config-5.0.3.jar
commons-compress-1.8.1.jar
common-utils-5.0.3.jar
connect-api-2.0.1-cp4.jar
connect-json-2.0.1-cp4.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.7.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.7.jar
jackson-mapper-asl-1.9.13.jar
javax.ws.rs-api-2.1.jar
jline-0.9.94.jar
kafka-avro-serializer-5.0.3.jar
kafka-clients-2.0.1-cp4.jar
kafka-connect-avro-converter-5.0.3.jar
kafka-schema-registry-client-5.0.3.jar
lz4-java-1.4.1.jar
netty-3.10.6.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.13.jar
8.2.9.2.9.16.1.10 Confluent 4.1.2
avro-1.8.1.jar
common-config-4.1.2.jar
commons-compress-1.8.1.jar
common-utils-4.1.2.jar
connect-api-1.1.1-cp1.jar
connect-json-1.1.1-cp1.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.6.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.6.jar
jackson-mapper-asl-1.9.13.jar
jline-0.9.94.jar
kafka-avro-serializer-4.1.2.jar
kafka-clients-1.1.1-cp1.jar
kafka-connect-avro-converter-4.1.2.jar
kafka-schema-registry-client-4.1.2.jar
log4j-1.2.16.jar
lz4-java-1.4.1.jar
netty-3.10.5.Final.jar
paranamer-2.7.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.6.1.jar
snappy-java-1.1.1.3.jar
xz-1.5.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar

8.2.9.3 Apache Kafka REST Proxy

The Kafka REST Proxy Handler streams messages to the Kafka REST Proxy distributed by Confluent.

This chapter describes how to use the Kafka REST Proxy Handler.

8.2.9.3.1 Overview

The Kafka REST Proxy Handler allows Kafka messages to be streamed using the HTTPS protocol. The use case for this functionality is to stream Kafka messages from an Oracle GoldenGate on-premises installation to the cloud, or alternatively from cloud to cloud.

The Kafka REST proxy provides a RESTful interface to a Kafka cluster. It makes it easy for you to do the following without using the native Kafka protocol or clients:

  • produce and consume messages

  • view the state of the cluster

  • perform administrative actions

Kafka REST Proxy is part of the Confluent Open Source and Confluent Enterprise distributions. It is not available in the Apache Kafka distribution. To access Kafka through the REST proxy, you must install the Confluent distribution of Kafka, see https://docs.confluent.io/current/kafka-rest/docs/index.html.

8.2.9.3.2 Setting Up and Starting the Kafka REST Proxy Handler Services

You have several installation formats to choose from including ZIP or tar archives, Docker, and Packages.

8.2.9.3.2.1 Using the Kafka REST Proxy Handler

You must download and install the Confluent Open Source or Confluent Enterprise distribution because the Kafka REST Proxy is not included in the Apache, Cloudera, or Hortonworks distributions.

The Kafka REST Proxy depends on ZooKeeper, Kafka, and the Schema Registry.

8.2.9.3.2.2 Downloading the Dependencies

You can review and download the Jersey RESTful Web Services in Java client dependency from:

https://eclipse-ee4j.github.io/jersey/.

You can review and download the Jersey Apache Connector dependencies from the Maven repository: https://mvnrepository.com/artifact/org.glassfish.jersey.connectors/jersey-apache-connector

8.2.9.3.2.3 Classpath Configuration

The Kafka REST Proxy Handler uses the Jersey project jersey-client version 2.27 and jersey-connectors-apache version 2.27 to connect to the Kafka REST Proxy. Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) does not include these dependencies, so you must obtain them, see Downloading the Dependencies.

You have to configure these dependencies using the gg.classpath property in the Java Adapter properties file. This is an example of a correctly configured classpath for the Kafka REST Proxy Handler:

gg.classpath=dirprm:{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}/jaxrs-ri/api/*:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}/connector/*
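
To confirm that each wildcard entry actually resolves to the Jersey JARs, the following hypothetical Java sketch expands a classpath string using the JVM's wildcard rule: an entry ending in /* matches only the .jar and .JAR files directly in that directory, not subdirectories or other files. It uses the platform path separator (: on Linux and UNIX) and is a diagnostic aid, not part of GG for DAA.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: expands a classpath value the way the JVM treats wildcards.
final class ClasspathExpander {

    private ClasspathExpander() {}

    static List<File> expand(String classpath) {
        List<File> entries = new ArrayList<>();
        for (String entry : classpath.split(File.pathSeparator)) {
            if (entry.isEmpty()) {
                continue;
            }
            if (entry.equals("*") || entry.endsWith("/*")) {
                // Wildcard: list the JAR files directly inside the directory.
                String dirName = entry.equals("*") ? "." : entry.substring(0, entry.length() - 2);
                File dir = new File(dirName);
                File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar") || name.endsWith(".JAR"));
                if (jars == null) {
                    System.err.println("Not a readable directory: " + dir);
                    continue;
                }
                Arrays.sort(jars);
                entries.addAll(Arrays.asList(jars));
            } else {
                // Plain entry: a directory (such as dirprm) or a single JAR file.
                entries.add(new File(entry));
            }
        }
        return entries;
    }
}
```

Running ClasspathExpander.expand on your real gg.classpath value should list the JARs from jaxrs-ri/lib, jaxrs-ri/api, jaxrs-ri/ext, and connector; a wildcard entry that contributes nothing usually points at a wrong path.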
8.2.9.3.2.4 Kafka REST Proxy Handler Configuration

The following are the configurable values for the Kafka REST Proxy Handler. Oracle recommends that you store the Kafka REST Proxy properties file in the Oracle GoldenGate dirprm directory.

To enable the selection of the Kafka REST Proxy Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkarestproxy and the other Kafka REST Proxy Handler properties as follows:

Properties Required/ Optional Legal Values Default Explanation

gg.handler.name.type

Required

kafkarestproxy

None

The configuration to select the Kafka REST Proxy Handler.

gg.handler.name.topicMappingTemplate

Required

A template string value to resolve the Kafka topic name at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate

Required

A template string value to resolve the Kafka message key at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.postDataUrl

Required

The listener address of the REST Proxy.

None

Set to the URL of the Kafka REST proxy.

gg.handler.name.format

Required

avro | json

None

Set to the REST proxy payload data format.

gg.handler.name.payloadsize

Optional

A value representing the payload size in megabytes.

5MB

Set to the maximum size of the payload of the HTTP messages.

gg.handler.name.apiVersion

Optional

v1 | v2

v2

Sets the API version to use.

gg.handler.name.mode

Optional

op | tx

op

Sets how operations are processed. In op mode, operations are processed as they are received. In tx mode, operations are cached and processed at the transaction commit.

gg.handler.name.trustStore

Optional

Path to the truststore.

None

Path to the truststore file that holds certificates from trusted certificate authorities (CA). These CAs are used to verify certificates presented by the server during an SSL connection, see Generating a Keystore or Truststore.

gg.handler.name.trustStorePassword

Optional

Password of the truststore.

None

The truststore password.

gg.handler.name.keyStore

Optional

Path to the keystore.

None

Path to the keystore file that holds the private key and identity certificate, which are presented to other parties (server or client) to verify its identity, see Generating a Keystore or Truststore.

gg.handler.name.keyStorePassword

Optional

Password of the keystore.

None

The keystore password.

gg.handler.name.proxy

Optional

http://host:port

None

Proxy URL in the following format: http://host:port

gg.handler.name.proxyUserName

Optional

Any string.

None

The proxy user name.

gg.handler.name.proxyPassword

Optional

Any string.

None

The proxy password.

gg.handler.name.readTimeout

Optional

Integer value.

None

The amount of time allowed for the server to respond.

gg.handler.name.connectionTimeout

Optional

Integer value.

None

The amount of time to wait to establish the connection to the host.

gg.handler.name.format.metaColumnsTemplate

Optional

${alltokens} | ${token} | ${env} | ${sys} | ${javaprop} | ${optype} | ${position} | ${timestamp} | ${catalog} | ${schema} | ${table} | ${objectname} | ${csn} | ${xid} | ${currenttimestamp} | ${opseqno} | ${timestampmicro} | ${currenttimestampmicro} | ${txind} | ${primarykeycolumns} | ${currenttimestampiso8601} | ${static} | ${segno} | ${rba}

None

A comma-delimited string consisting of one or more templated values that represent the template. For more information about the metacolumn keywords, see Metacolumn Keywords. This is an example that would produce a list of metacolumns:
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}

See Using Templates to Resolve the Stream Name and Partition Name for more information.

8.2.9.3.2.5 Review a Sample Configuration

The following is a sample configuration for the Kafka REST Proxy Handler from the Java Adapter properties file:

gg.handlerlist=kafkarestproxy

#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx

#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properties
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234

#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password

#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
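
Missing required properties can be caught before Replicat starts with a small pre-flight check. This hypothetical Java sketch reads Java Adapter properties with java.util.Properties and reports the properties marked Required in the configuration table for a given handler name (kafkarestproxy in the sample above). It is not part of GG for DAA and does not validate the optional properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the required Kafka REST Proxy Handler properties.
final class RestProxyConfigCheck {

    private RestProxyConfigCheck() {}

    // Required properties from the configuration table, relative to gg.handler.<name>.
    static final String[] REQUIRED = {
        "type", "topicMappingTemplate", "keyMappingTemplate", "postDataUrl", "format"
    };

    static List<String> missing(Properties props, String handlerName) {
        List<String> problems = new ArrayList<>();
        String prefix = "gg.handler." + handlerName + ".";
        for (String key : REQUIRED) {
            String value = props.getProperty(prefix + key);
            if (value == null || value.trim().isEmpty()) {
                problems.add(prefix + key);
            }
        }
        // Check the legal values listed in the table for type and format.
        String type = props.getProperty(prefix + "type");
        if (type != null && !type.trim().isEmpty() && !type.trim().equals("kafkarestproxy")) {
            problems.add(prefix + "type (expected kafkarestproxy)");
        }
        String format = props.getProperty(prefix + "format");
        if (format != null && !format.trim().isEmpty() && !format.trim().matches("avro|json")) {
            problems.add(prefix + "format (expected avro or json)");
        }
        return problems;
    }

    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text)); // lines starting with # are ignored
        return props;
    }
}
```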
8.2.9.3.2.6 Security

Security is possible between the following:

  • Kafka REST Proxy clients and the Kafka REST Proxy server. The Oracle GoldenGate REST Proxy Handler is a Kafka REST Proxy client.

  • The Kafka REST Proxy server and Kafka Brokers. Oracle recommends that you thoroughly review the security documentation and configuration of the Kafka REST Proxy server, see https://docs.confluent.io/current/kafka-rest/docs/index.html

The Kafka REST Proxy supports SSL for securing communication between clients, such as the Kafka REST Proxy Handler, and the Kafka REST Proxy server. To configure SSL:

  1. Generate a keystore using the scripts, see Generating a Keystore or Truststore.

  2. Update the Kafka REST Proxy server configuration in the kafka-rest.properties file with these properties:

    listeners=https://hostname:8083
    confluent.rest.auth.propagate.method=SSL

    # Configuration options for HTTPS
    ssl.client.auth=true
    ssl.keystore.location={keystore_file_path}/server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location={keystore_file_path}/server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
  3. Restart your server.

To disable mutual authentication, change the ssl.client.auth property from true to false.
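
On the client side, the trustStore, trustStorePassword, keyStore, and keyStorePassword handler properties correspond to standard JSSE objects. The following Java sketch shows how such JKS files typically become an SSLContext; it is an illustration, not the handler's source, and the class ClientSslContexts is hypothetical. It assumes JKS files like those produced by the scripts in Generating a Keystore or Truststore, and that the private key password equals the keystore password, as the -keypass and -storepass values do in those scripts. Omitting the keystore means no client certificate is presented, which only works when the server sets ssl.client.auth=false.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical sketch: builds a client SSLContext from a JKS truststore and keystore.
final class ClientSslContexts {

    private ClientSslContexts() {}

    static KeyStore loadJks(String path, char[] password) throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }

    static SSLContext build(String trustStorePath, char[] trustStorePassword,
                            String keyStorePath, char[] keyStorePassword)
            throws IOException, GeneralSecurityException {
        TrustManager[] trustManagers = null; // null: use the JVM's default trusted CAs
        if (trustStorePath != null) {
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(loadJks(trustStorePath, trustStorePassword));
            trustManagers = tmf.getTrustManagers();
        }
        KeyManager[] keyManagers = null; // null: present no client certificate
        if (keyStorePath != null) {
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            // Assumes the key password is the same as the keystore password.
            kmf.init(loadJks(keyStorePath, keyStorePassword), keyStorePassword);
            keyManagers = kmf.getKeyManagers();
        }
        // TLSv1.2 is one of the protocols enabled in ssl.enabled.protocols above.
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(keyManagers, trustManagers, null);
        return context;
    }
}
```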

8.2.9.3.2.7 Generating a Keystore or Truststore

Generating a Truststore

You execute this script to generate the ca-cert, ca-key, and truststore.jks truststore files.

#!/bin/bash
PASSWORD=password
CLIENT_PASSWORD=password
VALIDITY=365

Then you generate a CA as in this example:

openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD \
      -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN" \
      -nodes

Lastly, you add the CA to the server's truststore using keytool:

keytool -noprompt -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD \
      -keypass $PASSWORD

Generating a Keystore

You run this script and pass the FQDN as an argument to generate the ca-cert.srl, cert-file, cert-signed, and keystore.jks keystore files.

#!/bin/bash
PASSWORD=password
VALIDITY=365

if [ $# -lt 1 ];
then
echo "`basename $0` host fqdn|user_name|app_name"
exit 1
fi

CNAME=$1
ALIAS=`echo $CNAME|cut -f1 -d"."`

Then you generate the keystore with keytool as in this example:

keytool -noprompt -keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY \
      -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD \
      -keypass $PASSWORD

Next, you create a certificate signing request from the keystore and sign it with the CA:

keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass $PASSWORD
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY \
      -CAcreateserial -passin pass:$PASSWORD

Lastly, you import both the CA and the signed certificate into the keystore:

keytool -noprompt -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD
keytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass $PASSWORD
8.2.9.3.2.8 Using Templates to Resolve the Topic Name and Message Key

The Kafka REST Proxy Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are used to dynamically replace the keyword with the context of the current processing. The templates use the following configuration properties:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

The Kafka REST Proxy Handler can be configured to send one message per operation (insert, update, delete). Alternatively, it can be configured to group operations into messages at the transaction level.
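
The difference between the two modes can be sketched in Java. This is a hypothetical model, not the handler's code: each Operation carries only a transaction ID and an operation type, operations arrive in trail order with each transaction's operations contiguous, and a change of transaction ID stands in for the commit. In op mode every operation is its own message; in tx mode operations are cached until the commit and then emitted as one group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of op mode versus tx mode grouping.
final class ModeGrouping {

    private ModeGrouping() {}

    static final class Operation {
        final String txId;
        final String opType; // I, U, or D

        Operation(String txId, String opType) {
            this.txId = txId;
            this.opType = opType;
        }

        @Override
        public String toString() {
            return txId + ":" + opType;
        }
    }

    // op mode: every operation becomes its own message.
    static List<List<Operation>> opMode(List<Operation> ops) {
        List<List<Operation>> messages = new ArrayList<>();
        for (Operation op : ops) {
            List<Operation> single = new ArrayList<>();
            single.add(op);
            messages.add(single);
        }
        return messages;
    }

    // tx mode: operations are cached and emitted together at transaction commit.
    static List<List<Operation>> txMode(List<Operation> ops) {
        List<List<Operation>> messages = new ArrayList<>();
        List<Operation> cached = new ArrayList<>();
        String currentTx = null;
        for (Operation op : ops) {
            if (currentTx != null && !currentTx.equals(op.txId)) {
                // A new transaction ID means the previous transaction committed.
                messages.add(cached);
                cached = new ArrayList<>();
            }
            currentTx = op.txId;
            cached.add(op);
        }
        if (!cached.isEmpty()) {
            messages.add(cached); // commit of the last transaction
        }
        return messages;
    }
}
```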

For more information about the Template Keywords, see Template Keywords.

Example Templates

The following describes example template configuration values and the resolved values.

Example Template Resolved Value

${groupName}_${fullyQualifiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentDate[yyyy-MM-dd HH:mm:ss.SSS]}

2017-05-17 11:45:34.254
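
To make the substitution concrete, the following hypothetical Java resolver handles only the keywords used in these examples; the handler itself supports the full set described in Template Keywords. It assumes that the currentDate argument is a Java date and time pattern, so month is MM, minutes are mm, and the 24-hour clock is HH. The context map stands in for the values of the operation being processed.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for a few template keywords; not the handler's implementation.
final class TemplateResolver {

    private TemplateResolver() {}

    // Matches ${keyword} or ${keyword[argument]}.
    private static final Pattern KEYWORD = Pattern.compile("\\$\\{(\\w+)(?:\\[([^\\]]*)\\])?\\}");

    static String resolve(String template, Map<String, String> context, LocalDateTime now) {
        Matcher m = KEYWORD.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String keyword = m.group(1);
            String argument = m.group(2);
            String value;
            if (keyword.equals("currentDate")) {
                if (argument == null) {
                    throw new IllegalArgumentException("currentDate requires a date pattern");
                }
                value = now.format(DateTimeFormatter.ofPattern(argument));
            } else {
                // Static keywords such as groupName or tableName come from the context.
                value = context.get(keyword);
                if (value == null) {
                    throw new IllegalArgumentException("Unsupported keyword: " + keyword);
                }
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out); // copy any literal text after the last keyword
        return out.toString();
    }
}
```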

8.2.9.3.2.9 Kafka REST Proxy Handler Formatter Properties

The following are the configurable values for the Kafka REST Proxy Handler Formatter.

Table 8-11 Kafka REST Proxy Handler Formatter Properties

Properties Required/Optional Legal Values Default Explanation
gg.handler.name.format.includeOpType

Optional

true | false

true

Set to true to create a field in the output messages called op_type. The value is an indicator of the type of source database operation (for example, I for insert, U for update, D for delete).

Set to false to omit this field in the output.

gg.handler.name.format.includeOpTimestamp

Optional

true | false

true

Set to true to create a field in the output messages called op_ts. The value is the operation timestamp (commit timestamp) from the source trail file.

Set to false to omit this field in the output.

gg.handler.name.format.includeCurrentTimestamp

Optional

true | false

true

Set to true to create a field in the output messages called current_ts. The value is the current timestamp of when the handler processes the operation.

Set to false to omit this field in the output.

gg.handler.name.format.includePosition

Optional

true | false

true

Set to true to create a field in the output messages called pos. The value is the position (sequence number + offset) of the operation from the source trail file.

Set to false to omit this field in the output.

gg.handler.name.format.includePrimaryKeys

Optional

true | false

true

Set to true to create a field in the output messages called primary_keys. The value is an array of the column names of the primary key columns.

Set to false to omit this field in the output.

gg.handler.name.format.includeTokens

Optional

true | false

true

Set to true to include a map field in output messages. The key is tokens and the value is a map where the keys and values are the token keys and values from the Oracle GoldenGate source trail file.

Set to false to suppress this field.

gg.handler.name.format.insertOpKey

Optional

Any string.

I

The value of the field op_type that indicates an insert operation.

gg.handler.name.format.updateOpKey

Optional

Any string.

U

The value of the field op_type that indicates an update operation.

gg.handler.name.format.deleteOpKey

Optional

Any string.

D

The value of the field op_type that indicates a delete operation.

gg.handler.name.format.truncateOpKey

Optional

Any string.

T

The value of the field op_type that indicates a truncate operation.

gg.handler.name.format.treatAllColumnsAsStrings

Optional

true | false

false

Set to true to treat all output fields as strings.

Set to false and the handler maps the corresponding field type from the source trail file to the best corresponding Kafka data type.

gg.handler.name.format.mapLargeNumbersAsStrings

Optional

true | false

false

Set to true to map large numeric fields as strings to preserve precision. This property is specific to the Avro Formatter; it cannot be used with other formatters.

gg.handler.name.format.iso8601Format

Optional

true | false

false

Set to true to output the current date in the ISO8601 format.

gg.handler.name.format.pkUpdateHandling

Optional

abend | update | delete-insert

abend

It is only applicable if you are modeling row messages with the gg.handler.name.format.messageFormatting=row property. It is not applicable if you are modeling operation messages, because the before and after images are propagated to the message with an update.
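
The three pkUpdateHandling values can be sketched as follows for row-formatted messages. This hypothetical Java sketch assumes that update forwards the change as an ordinary update carrying the after image, and that delete-insert emits a delete of the before image followed by an insert of the after image. The op_type values use the default I, U, and D keys from the table above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of pkUpdateHandling for row-formatted messages.
final class PkUpdateHandling {

    private PkUpdateHandling() {}

    // A row message: the op_type value plus one image of the row.
    static final class RowMessage {
        final String opType;
        final Map<String, Object> row;

        RowMessage(String opType, Map<String, Object> row) {
            this.opType = opType;
            this.row = row;
        }
    }

    // Handles an update whose before and after images carry different primary key values.
    static List<RowMessage> handle(String mode, Map<String, Object> before, Map<String, Object> after) {
        switch (mode) {
            case "abend":
                // Stop processing instead of emitting a row message.
                throw new IllegalStateException("Primary key update encountered with pkUpdateHandling=abend");
            case "update":
                // Forward as an ordinary update carrying the after image.
                return Collections.singletonList(new RowMessage("U", after));
            case "delete-insert": {
                // Delete the row under its old key, then insert it under the new key.
                List<RowMessage> out = new ArrayList<>();
                out.add(new RowMessage("D", before));
                out.add(new RowMessage("I", after));
                return out;
            }
            default:
                throw new IllegalArgumentException("Unknown pkUpdateHandling value: " + mode);
        }
    }
}
```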

8.2.9.3.3 Consuming the Records

A simple way to consume data from Kafka topics through the Kafka REST Proxy is to use curl.

Consume JSON Data

  1. Create a consumer for JSON data.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
    https://localhost:8082/consumers/my_json_consumer
  2. Subscribe to a topic.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
  3. Consume records.

    curl -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

Consume Avro Data

  1. Create a consumer for Avro data.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
    https://localhost:8082/consumers/my_avro_consumer
  2. Subscribe to a topic.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
  3. Consume records.

    curl -k -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
    https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
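
The same three REST Proxy v2 consumer calls can be issued from Java 11 or later with java.net.http. The following sketch only builds the requests; the class and method names are hypothetical, the paths and headers mirror the curl examples above, and the create-consumer body follows the Avro example. Send each request in order with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Unlike curl -k, the default HttpClient validates the server certificate, so a self-signed REST Proxy certificate must be trusted through an SSLContext supplied with HttpClient.newBuilder().sslContext(...).

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical builder for the three Kafka REST Proxy v2 consumer requests (Java 11+).
final class RestProxyConsumerRequests {

    private RestProxyConsumerRequests() {}

    private static final String V2_JSON = "application/vnd.kafka.v2+json";

    // Step 1: create a named consumer instance; format is "json" or "avro".
    static HttpRequest createConsumer(String baseUrl, String group, String instance, String format) {
        String body = "{\"name\": \"" + instance + "\", \"format\": \"" + format
                + "\", \"auto.offset.reset\": \"earliest\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/consumers/" + group))
                .header("Content-Type", V2_JSON)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Step 2: subscribe the instance to a topic.
    static HttpRequest subscribe(String baseUrl, String group, String instance, String topic) {
        String body = "{\"topics\": [\"" + topic + "\"]}";
        return HttpRequest.newBuilder(URI.create(instanceUrl(baseUrl, group, instance) + "/subscription"))
                .header("Content-Type", V2_JSON)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Step 3: fetch records; the embedded format in Accept must match the consumer format.
    static HttpRequest records(String baseUrl, String group, String instance, String format) {
        return HttpRequest.newBuilder(URI.create(instanceUrl(baseUrl, group, instance) + "/records"))
                .header("Accept", "application/vnd.kafka." + format + ".v2+json")
                .GET()
                .build();
    }

    private static String instanceUrl(String baseUrl, String group, String instance) {
        return baseUrl + "/consumers/" + group + "/instances/" + instance;
    }
}
```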

Note:

If you are using curl from the machine hosting the REST proxy, then unset the http_proxy environment variable before consuming the messages. If you are using curl from the local machine to get messages from the Kafka REST Proxy, then setting the http_proxy environment variable may be required.

8.2.9.3.4 Performance Considerations

There are several configuration settings, both in the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) configuration and in the Kafka producer, that affect performance.

The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. It allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka REST Proxy Handler POSTs the data to the Kafka REST Proxy.

Setting the Replicat GROUPTRANSOPS to a larger number allows the Replicat to issue POST requests less frequently, which improves performance. The default value for GROUPTRANSOPS is 1000, and performance can be improved by increasing the value to 2500, 5000, or even 10000.
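
A rough way to see the effect: if each target transaction results in one POST at commit, the number of POSTs falls roughly in inverse proportion to GROUPTRANSOPS. The following Java sketch is only an estimate under stated assumptions: source transactions are small relative to GROUPTRANSOPS, so each grouped target transaction holds about GROUPTRANSOPS operations, and payloadsize never forces one commit to be split across several POSTs. The class name is hypothetical.

```java
// Hypothetical back-of-the-envelope estimate; not a GG for DAA API.
final class GroupTransOpsEstimate {

    private GroupTransOpsEstimate() {}

    // Approximate number of target transactions (and therefore POSTs) for a workload.
    static long approximatePosts(long totalOperations, long groupTransOps) {
        if (groupTransOps <= 0) {
            throw new IllegalArgumentException("groupTransOps must be positive");
        }
        return (totalOperations + groupTransOps - 1) / groupTransOps; // ceiling division
    }
}
```

For 100,000 operations this gives about 100 POSTs at the default of 1000 and about 10 at 10000.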

8.2.9.3.5 Kafka REST Proxy Handler Metacolumns Template Property

Problems Starting Kafka REST Proxy server

The script that starts the Kafka REST Proxy server appends its CLASSPATH to the environment CLASSPATH variable. If set, the environment CLASSPATH can contain JAR files that conflict with the correct execution of the Kafka REST Proxy server and may prevent it from starting. Oracle recommends that you unset the CLASSPATH environment variable before starting your Kafka REST Proxy server. Alternatively, set CLASSPATH to an empty string ("") to overcome the problem.