8.1.4 Apache Kafka

The Oracle GoldenGate capture (Extract) for Kafka is used to read messages from a Kafka topic or topics and convert data into logical change records written to GoldenGate trail files. This section explains how to use Oracle GoldenGate capture for Kafka.

8.1.4.1 Overview

Kafka has gained market traction in recent years and become a leader in the enterprise messaging space. Kafka is a cluster-based messaging system that provides high availability, fail over, data integrity through redundancy, and high performance. Kafka is now the leading application for implementations of the Enterprise Service Bus architecture. Kafka Capture extract process reads messages from Kafka and transforms those messages into logical change records which are written to Oracle GoldenGate trail files. The generated trail files can then be used to propagate the data in the trail file to various RDBMS implementations or other integrations supported by Oracle GoldenGate replicat processes.

8.1.4.2 Prerequisites

8.1.4.2.1 Set up Credential Store Entry to Detect Source Type

The database type for capture is based on the prefix in the database credential userid. The generic format for userid is as follows: <dbtype>://<db-user>@<comma separated list of server addresses>:<port>

The userid value for Kafka capture should be any value with the prefix kafka://. You can set up Credential Store Entry in Administration Service/ DB Connections.

Example
alter credentialstore add user kafka:// password somepass alias kafka

Note:

You can specify a dummy Password for Kafka while setting up the credentials.

8.1.4.3 General Terms and Functionality of Kafka Capture

8.1.4.3.1 Kafka Streams

As a Kafka consumer, you can read from one or more topics. Additionally, each topic can be divided into one or more partitions. Each discrete topic/partition combination is a Kafka stream. This topic discusses Kafka streams extensively and it is important to clearly define the term here.

The following is an example of five Kafka streams:
  • Topic: TEST1 Partition: 0
  • Topic: TEST1 Partition: 1
  • Topic: TEST2 Partition: 0
  • Topic: TEST2 Partition: 1
  • Topic: TEST2 Partition: 2

8.1.4.3.2 Kafka Message Order

Messages received from the KafkaConsumer for an individual stream should be in the order as stored in the Kafka commit log. However, Kafka streams move independently from one another and the order in which messages are received from different streams is nondeterministic.

For example, Kafka Capture is consuming messages from two streams:
  • Stream 1: Topic TEST1, partition 0
  • Stream 2: Topic TEST1, partition 1
Stream 1 in Topic|partition|offset|timestamp format total of 5 messages.
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
Stream 2 to Topic|partition|offset|timestamp format total of 5 messages.
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255
The Kafka Consumer could deliver the messages in the following order on run 1.
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255
On a secondary run messages could be delivered in the following order.
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255

Note:

In the two runs that the messages belonging to the same Kafka stream are delivered in order as they occur in that stream. However, messages from different streams are interlaced in a nondeterministic manner.

8.1.4.3.3 Kafka Message Timestamps

Each Kafka message has a timestamp associated with it. The timestamp on the Kafka message maps to the operation timestamp for the record in the generated trail file. Timestamps on Kafka messages are not guaranteed to be monotonically increasing even in the case where extract is reading from only one stream (single topic and partition). Kafka has no requirement that Kafka message timestamps are monotonically increasing even within a stream. The Kafka Producer provides an API whereby the message timestamp can be explicitly set on messages. This means a Kafka Producer can set the Kafka message timestamp to any value.

When reading from multiple topics and/or a topic with multiple partitions it is almost certain that trail files generated by Kafka capture will not have operation timestamps that are monotonically increasing. Kafka streams move independently from one another and there is no guarantee of delivery order for messages received from different streams. Messages from different streams can interlace in any random order when the Kafka Consumer is reading them from a Kafka cluster.

8.1.4.3.4 Kafka Message Coordinates

Kafka Capture performs message gap checking to ensure message consistency withing the context of a message stream. For every Kafka stream from which Kafka capture is consuming messages, there should be no gap in the Kafka message offset sequence.

If a gap is found in the message offset sequence, then the Kafka capture logs an error and the Kafka Capture extract process will abend.

Message gap checking can be disabled by setting the following in the parameter file.

SETENV (PERFORMMESSAGEGAPCHECK = "false").

8.1.4.3.5 Start Extract Modes

Extract can be configured to start replication from two distinct points. You can select Extract starting positions from the UI in the Extract Options step, under the Begin section. You can either click Now or define a Custom Time.

8.1.4.3.5.1 Start Earliest
Start Kafka Capture from the oldest available message in Kafka.
ggsci> ADD EXTRACT kafka, TRANLOG
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka
8.1.4.3.5.2 Start Timestamp
Start Kafka Capture from the oldest available message in Kafka.
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN 2019-03-27 23:05:05.123456
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka
Or alternatively, start now as now is a point in time.
ggsci> ADD EXTRACT kafka, TRANLOG BEGIN NOW
ggsci> ADD EXTRAIL dirdat/kc, extract kafka
ggsci> START EXTRACT kafka

Note:

Note on starting from a point in time. Kafka Capture will start from the first available record in the stream which fits the criteria (time equal to or greater than the configured time). Replicat will continue from that first message regardless of the timestamps of subsequent messages. As previously discussed, there is no guarantee or requirement that Kafka message timestamps are monotonically increasing.

Alter Extract

Alter Timestamp
ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN {Timestamp}
ggsci> START EXTRACT kafka 

Alter Now

ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN NOW
ggsci> START EXTRACT kafka 

8.1.4.3.6 General Configuration Overview

8.1.4.3.7 OGGSOURCE parameter

To enable Kafka extract replication, the GLOBALS parameter file must be configured as follows:
OGGSOURCE KAFKA
JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm
JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO

OGGSOURCE KAFKA: The first line indicates that the source of replication is Kafka.

JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm: The second line sets the Java JVM classpath. The Java classpath provides the pathing to load all the required Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) and Kafka client libraries. The Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) library should be first in the list (ggjava.jar). The Kafka client libraries, the Kafka Connect framework, and the Kafka Connect converters are not included with the Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) installation. These libraries must be obtained independently. Oracle recommends you to use the same version of the Kafka client as the version of the Kafka broker to which you are connecting. The Dependency Downloading tool can be used to download the dependency libraries. Alternately, the pathing can be set to a Kafka installation. For more information about Dependency Downloader, see Dependency Downloader.

JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO: The third line is the JVM boot options. Use this to configure the maximum Java heap size (-Xmx512m) and the log4j logging parameters to generate the .log file (-Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO)

Note:

Starting from Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) release 23c, this parameter will be deprecated.

8.1.4.3.8 The Extract Parameter File

The extract process configured is configured via a .prm file. The format for the naming of the parameter file is <extract name>.prm. For example, the extract parameter file for the extract process kc would be kc.prm.
EXTRACT KC
-- alter credentialstore add user kafka:// password <somepass> alias kafka
SOURCEDB USERIDALIAS kafka
JVMOPTIONS CLASSPATH ggjava/ggjava.jar:/kafka/client/path/*
JVMOPTIONS BOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
TRANLOGOPTIONS GETMETADATAFROMVAM
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE QASOURCE.TOPIC1;

EXTRACT KC: The first line sets the name of the extract process.

TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties: This line sets the name and location of the Kafka Consumer properties file. The Kafka Consumer properties is a file containing the Kafka specific configuration which configures connectivity and security to the Kafka cluster. Documentation on the Kafka Consumer properties can be found in: Kafka Documentation.

EXTTRAIL dirdat/kc: The fourth line sets the location and prefix of the trail files to be generated.
TABLE QASOURCE.TOPIC1;: The fifth line is the extract TABLE statement. There can be one or more TABLE statements. The schema name in the example is QASOURCE. The schema name is an OGG artifact and it is required. It can be set to any legal string. The schema name cannot be wildcarded. Each extact process only supports one schema name. The configured table name maps to the Kafka topic name. The table configuration does support wildcards. Legal Kafka topic names can have the following characters.
  • a-z (lowercase a to z)
  • A-Z (uppercase A to Z)
  • 0-9 (digits 0 to 9)
  • . (period)
  • _ (underscore)
  • - (hyphen)
If the topic name contains a period, underscore, or hyphen, please include the table name in quotes in the configuration. Topic names are case sensitive so the topic MYTOPIC1 and MyTopic1 are different Kafka topics.
Examples of legal extract table statements:
TABLE TESTSCHEMA.TEST*;
TABLE TESTSCHEMA.MyTopic1;
TABLE TESTSCHEMA.”My.Topic1”;
Examples of illegal configuration - multiple schema names are used:
TABLE QASOURCE.TEST*;
TABLE TESTSCHEMA.MYTOPIC1;
Example of illegal configuration – Table with special characters not quoted.
TABLE QASOURE.My.Topic1;
Example of illegal configuration – Schema name is a wildcard.
TABLE *.*;

Optional .prm configuration.

Kafka Capture performs message gap checking to ensure message continuity. To disable message gap checking set:
SETENV (PERFORMMESSAGEGAPCHECK = "false")

8.1.4.3.9 Kafka Consumer Properties File

The Kafka Consumer properties file contains the properties to configure the Kafka Consumer including how to connect to the Kafka cluster and security parameters.

Example:
#Kafka Properties
bootstrap.servers=den02box:9092
group.id=mygroupid
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
8.1.4.3.9.1 Encrypt Kafka Producer Properties
The sensitive properties within the Kafka Producer Configuration File can be encrypted using the Oracle GoldenGate Credential Store.

For more information about how to use Credential Store, see Using Identities in Oracle GoldenGate Credential Store.

For example, the following kafka property:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username="alice" password="alice"; 
can be replaced with:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule  required
username=ORACLEWALLETUSERNAME[alias domain_name]  password=ORACLEWALLETPASSWORD[alias
domain_name];

8.1.4.4 Generic Mutation Builder

The default mode is to use the Generic Mutation Builder to transform Kafka messages into trail file operations. Kafka messages are comprised of data in any format. Kafka messages can be delimited text, JSON, Avro, XML, text, etc. This makes the mapping of data from a Kafka message into a logical change record challenging. However, Kafka message keys and payload values are at their fundamental form just byte arrays. The Generic Kafka Replication simply propagates the Kafka message key and Kafka message value as byte arrays. Generic Kafka Replication transforms the data into operations of three fields. The three fields are as follows:
  • id: This is the primary key for the table. It is typed as a string. The value is the coordinates of the message in Kafka in the following format: topic name:partition number:offset. For example, the value for topic TEST, partition 1, and offset 245 would be TEST:1:245.

  • key: This is the message key field from the source Kafka message. The field is typed as binary. The value of the field is the key from the source Kafka message propagated as bytes.

  • payload: This is the message payload or value from the source Kafka message. The field is typed as binary. The value of the field is the payload from the source Kafka message propagated as bytes.

Features of the Generic Mutation Builder
  • All records are propagated as insert operations.
  • Each Kafka message creates an operation in its own transaction.
Logdump 2666 >n
___________________________________________________________________ 
Hdr-Ind    :     E  (x45)     Partition  :     .  (x00)  
UndoFlag   :     .  (x00)     BeforeAfter:     A  (x41)  
RecLength  :   196  (x00c4)   IO Time    : 2021/07/22 14:57:25.085.436   
IOType     :   170  (xaa)     OrigNode   :     2  (x02) 
TransInd   :     .  (x03)     FormatType :     R  (x52) 
SyskeyLen  :     0  (x00)     Incomplete :     .  (x00) 
DDR/TDR index:   (001, 001)     AuditPos   : 0 
Continued  :     N  (x00)     RecCount   :     1  (x01) 

2021/07/22 14:57:25.085.436 Metadata             Len 196 RBA 1335 
Table Name:  QASOURCE.TOPIC1 
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table QASOURCE.TOPIC1
Record Length: 20016
Columns: 3
id        64   8000        0  0  0 0 0   8000   8000      0 0 0 0 0 1    0 1   0   12       -1      0 0 0  
key       64  16000     8005  0  0 1 0   8000   8000      0 0 0 0 0 1    0 0   4   -3       -1      0 0 0  
payload   64   8000    16010  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
End of definition

8.1.4.5 Kafka Connect Mutation Builder

The Kafka Connect Mutation Builder parses Kafka Connect messages into logical change records and that are then written to Oracle GoldenGate trail files.

8.1.4.5.1 Functionality and Limitations of the Kafka Connect Mutation Builder

  • All records are propagated as insert operations.
  • Each Kafka message creates an operation in its own transaction.
  • The Kafka message key must be a Kafka Connect primitive type or logical type.
  • The Kafka message value must be either a primitive type/logical type or a record containing only primitive types, logical types, and container types. A record cannot contain another record as nested records are not currently supported.
  • Kafka Connect array data types are mapped into binary fields. The content of the binary field will be the source array converted into a serialized JSON array.
  • Kafka Connect map data types are mapped into binary fields. The contents of the binary field will be the source map converted into a serialized JSON.
  • The source Kafka messages must be Kafka Connect messages.
  • Kafka Connect Protobuf messages are not currently supported. (The current Kafka Capture functionality only supports primitive or logical types for the Kafka message key. The Kafka Connect Protobuf Converter does not support stand only primitives or logical types.)
  • Each source topic must contain messages which conform to the same schema. Interlacing messages in the same Kafka topic which conform to different Kafka Connect schema is not currently supported.
  • Schema changes are not currently supported.

8.1.4.5.2 Primary Key

A primary key field is created in the output as a column named gg_id. The value of this field is the concatentated topic name, partition, and offset delimited by the : character. For example: TOPIC1:0:1001.

8.1.4.5.3 Kafka Message Key

The message key is mapped into a called named gg_key.

8.1.4.5.4 Kafka Connect Supported Types

Supported Primitive Types
  • String
  • 8 bit Integer
  • 16 bit Integer
  • 32 bit Integer
  • 64 bit Integer
  • Boolean
  • 32 bit Float
  • 64 bit Float
  • Bytes (binary)
Supported Logical Types
  • Decimal
  • Timestamp
  • Date
  • Time

Supported Container Types

  • Array – Only arrays of primitive or logical types are supported. Data is mapped as a binary field the value of which is a JSON array document containing the contents of the source array.
  • List – Only lists of primitive or logical types are supported. Data is mapped as a binary field the value of which is a JSON document containing the contents of the source list.

8.1.4.5.5 How to Enable the Kafka Connect Mutation Builder

The Kafka Connect Mutation Builder is enabled by configuration of the Kafka Connect key and value converters in the Kafka Producer properties file.

For the Kafka Connect JSON Converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

For the Kafka Connect Avro Converter

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

The Kafka Capture functionality reads the Kafka producer properties file. If the Kafka Connect converters are configured, then the Kafka Connect mutation builder is invoked.

Sample metadata from the trail file using logdump

2021/08/03 09:06:05.243.881 Metadata             Len 1951 RBA 1335 
Table Name: TEST.KC 
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table TEST.KC
Record Length: 36422
Columns: 30
gg_id                64   8000        0  0  0 0 0   8000   8000      0 0 0 0 0 1    0 1   0   12       -1      0 0 0  
gg_key               64   4000     8005  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
string_required      64   4000    12010  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
string_optional      64   4000    16015  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   0   -1       -1      0 1 0  
byte_required       134     23    20020  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
byte_optional       134     23    20031  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
short_required      134     23    20042  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
short_optional      134     23    20053  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
integer_required    134     23    20064  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
integer_optional    134     23    20075  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    4       -1      0 0 0  
long_required       134     23    20086  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0   -5       -1      0 0 0  
long_optional       134     23    20097  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0   -5       -1      0 0 0  
boolean_required      0      2    20108  0  0 1 0      1      1      0 0 0 0 0 1    0 0   4   -2       -1      0 0 0  
boolean_optional      0      2    20112  0  0 1 0      1      1      0 0 0 0 0 1    0 0   4   -2       -1      0 0 0  
float_required      141     50    20116  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    6       -1      0 0 0  
float_optional      141     50    20127  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    6       -1      0 0 0  
double_required     141     50    20138  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    8       -1      0 0 0  
double_optional     141     50    20149  0  0 1 0      8      8      8 0 0 0 0 1    0 0   0    8       -1      0 0 0  
bytes_required       64   8000    20160  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
bytes_optional       64   8000    24165  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
decimal_required     64     50    28170  0  0 1 0     50     50      0 0 0 0 0 1    0 0   0   12       -1      0 0 0  
decimal_optional     64     50    28225  0  0 1 0     50     50      0 0 0 0 0 1    0 0   0   12       -1      0 0 0  
timestamp_required  192     29    28280  0  0 1 0     29     29     29 0 6 0 0 1    0 0   0   11       -1      0 0 0  
timestamp_optional  192     29    28312  0  0 1 0     29     29     29 0 6 0 0 1    0 0   0   11       -1      0 0 0  
date_required       192     10    28344  0  0 1 0     10     10     10 0 2 0 0 1    0 0   0    9       -1      0 0 0  
date_optional       192     10    28357  0  0 1 0     10     10     10 0 2 0 0 1    0 0   0    9       -1      0 0 0  
time_required       192     18    28370  0  0 1 0     18     18     18 3 6 0 0 1    0 0   0   10       -1      0 0 0  
time_optional       192     18    28391  0  0 1 0     18     18     18 3 6 0 0 1    0 0   0   10       -1      0 0 0  
array_optional       64   8000    28412  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
map_optional         64   8000    32417  0  0 1 0   4000   4000      0 0 0 0 0 1    0 0   4   -4       -1      0 1 0  
End of definition

8.1.4.6 Example Configuration Files

8.1.4.6.1 Example kc.prm file

EXTRACT KC
OGGSOURCE KAFKA
JVMOOPTIONS CLASSPATH ggjava/ggjava.jar:/path/to/kafka/libs/*
TRANLOGOPTIONS GETMETADATAFROMVAM
--Uncomment the following line to disable Kafka message gap checking.
--SETENV (PERFORMMESSAGEGAPCHECK = "false")
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE TEST.KC;

8.1.4.6.2 Example Kafka Consumer Properties File

#Kafka Properties
bootstrap.servers=localhost:9092
group.id=someuniquevalue
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#JSON Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with JSON Kafka Connect Messages
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

#Avro Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with Avro Kafka Connect Messages
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081