8.1.4 Apache Kafka
The Oracle GoldenGate capture (Extract) for Kafka is used to read messages from a Kafka topic or topics and convert data into logical change records written to GoldenGate trail files. This section explains how to use Oracle GoldenGate capture for Kafka.
- Overview
- Prerequisites
- General Terms and Functionality of Kafka Capture
- Generic Mutation Builder
- Kafka Connect Mutation Builder
- Example Configuration Files
Parent topic: Source
8.1.4.1 Overview
Kafka has gained market traction in recent years and become a leader in the enterprise messaging space. Kafka is a cluster-based messaging system that provides high availability, failover, data integrity through redundancy, and high performance. Kafka is widely used to implement Enterprise Service Bus architectures. The Kafka Capture Extract process reads messages from Kafka and transforms those messages into logical change records, which are written to Oracle GoldenGate trail files. The generated trail files can then be used to propagate the data to the various RDBMS implementations and other integrations supported by Oracle GoldenGate Replicat processes.
Parent topic: Apache Kafka
8.1.4.2 Prerequisites
8.1.4.2.1 Set up Credential Store Entry to Detect Source Type
Oracle GoldenGate detects the source type from the userid in the credential store entry. The generic format for userid is as follows: <dbtype>://<db-user>@<comma separated list of server addresses>:<port>

The userid value for Kafka capture should be any value with the prefix kafka://. You can set up the Credential Store Entry in Administration Service/DB Connections. For example:

alter credentialstore add user kafka:// password somepass alias kafka

Note:
You can specify a dummy password for Kafka while setting up the credentials.

Parent topic: Prerequisites
8.1.4.3 General Terms and Functionality of Kafka Capture
- Kafka Streams
- Kafka Message Order
- Kafka Message Timestamps
- Kafka Message Coordinates
- Start Extract Modes
- General Configuration Overview
- OGGSOURCE parameter
- The Extract Parameter File
- Kafka Consumer Properties File
Parent topic: Apache Kafka
8.1.4.3.1 Kafka Streams
As a Kafka consumer, you can read from one or more topics. Additionally, each topic can be divided into one or more partitions. Each discrete topic/partition combination is a Kafka stream. Kafka streams are discussed extensively in this topic, so it is important to define the term clearly here. For example, an Extract reading topic TEST1 (which has two partitions) and topic TEST2 (which has three partitions) consumes the following five Kafka streams:
- Topic: TEST1 Partition: 0
- Topic: TEST1 Partition: 1
- Topic: TEST2 Partition: 0
- Topic: TEST2 Partition: 1
- Topic: TEST2 Partition: 2
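The following is a minimal Java sketch, not part of the Kafka Capture implementation, that uses the standard KafkaConsumer API to enumerate the topic/partition combinations (streams) for the two hypothetical topics above; the broker address is a placeholder.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ListStreams {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Each topic/partition combination reported here is one Kafka stream.
            for (String topic : new String[] {"TEST1", "TEST2"}) {
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    System.out.println("Topic: " + p.topic() + " Partition: " + p.partition());
                }
            }
        }
    }
}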
Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.2 Kafka Message Order
Messages received from the KafkaConsumer for an individual stream are in the same order as they are stored in the Kafka commit log. However, Kafka streams move independently from one another, and the order in which messages are received from different streams is nondeterministic.

For example, consider an Extract reading the following two streams (message format: topic|partition|offset|timestamp):

- Stream 1: Topic TEST1, partition 0
- Stream 2: Topic TEST1, partition 1

Contents of Stream 1:
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250

Contents of Stream 2:
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255

Possible delivery order, run 1:
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255

Possible delivery order, run 2:
TEST1|0|0|1588888086210
TEST1|0|1|1588888086220
TEST1|1|0|1588888086215
TEST1|1|1|1588888086225
TEST1|0|2|1588888086230
TEST1|0|3|1588888086240
TEST1|0|4|1588888086250
TEST1|1|2|1588888086235
TEST1|1|3|1588888086245
TEST1|1|4|1588888086255
Note:
In the two runs, the messages belonging to the same Kafka stream are delivered in the order in which they occur in that stream. However, messages from different streams are interlaced in a nondeterministic manner.

Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.3 Kafka Message Timestamps
Each Kafka message has a timestamp associated with it. The timestamp on the Kafka message maps to the operation timestamp for the record in the generated trail file. Timestamps on Kafka messages are not guaranteed to be monotonically increasing even in the case where extract is reading from only one stream (single topic and partition). Kafka has no requirement that Kafka message timestamps are monotonically increasing even within a stream. The Kafka Producer provides an API whereby the message timestamp can be explicitly set on messages. This means a Kafka Producer can set the Kafka message timestamp to any value.
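As an illustration of this, the following hedged Java sketch uses the standard Kafka Producer API to publish a message with an explicitly set, arbitrary timestamp; the topic name and broker address are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExplicitTimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The five-argument ProducerRecord constructor accepts an explicit timestamp,
            // so message timestamps are not required to be monotonically increasing.
            long arbitraryTimestamp = 1588888086210L;
            ProducerRecord<String, String> record =
                new ProducerRecord<>("TEST1", null, arbitraryTimestamp, "key1", "value1");
            producer.send(record);
        }
    }
}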
When reading from multiple topics and/or a topic with multiple partitions it is almost certain that trail files generated by Kafka capture will not have operation timestamps that are monotonically increasing. Kafka streams move independently from one another and there is no guarantee of delivery order for messages received from different streams. Messages from different streams can interlace in any random order when the Kafka Consumer is reading them from a Kafka cluster.
Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.4 Kafka Message Coordinates
Kafka Capture performs message gap checking to ensure message consistency within the context of a message stream. For every Kafka stream from which Kafka Capture is consuming messages, there should be no gap in the Kafka message offset sequence.

If a gap is found in the message offset sequence, then Kafka Capture logs an error and the Kafka Capture Extract process abends.

Message gap checking can be disabled by setting the following in the parameter file:

SETENV (PERFORMMESSAGEGAPCHECK = "false")
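The following Java fragment is a conceptual sketch of what such an offset gap check amounts to; it is not the Kafka Capture implementation. It assumes records polled with the standard KafkaConsumer API.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class OffsetGapCheck {
    // Last offset seen per stream (topic/partition combination).
    private final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    // Throws if a gap is detected in the offset sequence of any stream.
    public void check(ConsumerRecords<byte[], byte[]> records) {
        for (ConsumerRecord<byte[], byte[]> r : records) {
            TopicPartition stream = new TopicPartition(r.topic(), r.partition());
            Long last = lastOffsets.get(stream);
            if (last != null && r.offset() != last + 1) {
                throw new IllegalStateException("Offset gap in " + stream
                        + ": expected " + (last + 1) + " but received " + r.offset());
            }
            lastOffsets.put(stream, r.offset());
        }
    }
}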
Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.5 Start Extract Modes
Extract can be configured to start replication from two distinct points. You can select Extract starting positions from the UI in the Extract Options step, under the Begin section. You can either click Now or define a Custom Time.
8.1.4.3.5.1 Start Earliest
To start the Kafka Capture Extract from the oldest available message in Kafka, add the Extract without a BEGIN position:

ggsci> ADD EXTRACT kafka, TRANLOG
ggsci> ADD EXTTRAIL dirdat/kc, EXTRACT kafka
ggsci> START EXTRACT kafka
Parent topic: Start Extract Modes
8.1.4.3.5.2 Start Timestamp
To start the Kafka Capture Extract from a specific point in time, add the Extract with a BEGIN timestamp:

ggsci> ADD EXTRACT kafka, TRANLOG BEGIN 2019-03-27 23:05:05.123456
ggsci> ADD EXTTRAIL dirdat/kc, EXTRACT kafka
ggsci> START EXTRACT kafka
To start from the current time:

ggsci> ADD EXTRACT kafka, TRANLOG BEGIN NOW
ggsci> ADD EXTTRAIL dirdat/kc, EXTRACT kafka
ggsci> START EXTRACT kafka
Note:
When starting from a point in time, Kafka Capture starts from the first available record in the stream that fits the criteria (time equal to or greater than the configured time). Capture then continues from that first message regardless of the timestamps of subsequent messages. As previously discussed, there is no guarantee or requirement that Kafka message timestamps are monotonically increasing.

Alter Extract
ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN {Timestamp}
ggsci> START EXTRACT kafka
Alter Now
ggsci> STOP EXTRACT kafka
ggsci> ALTER EXTRACT kafka BEGIN NOW
ggsci> START EXTRACT kafka
Parent topic: Start Extract Modes
8.1.4.3.6 General Configuration Overview
The general configuration of Kafka Capture is described in the following topics: the OGGSOURCE parameter, the Extract parameter file, and the Kafka Consumer properties file.
Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.7 OGGSOURCE parameter
OGGSOURCE KAFKA
JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm
JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
OGGSOURCE KAFKA: The first line indicates that the source of replication is Kafka.

JVMCLASSPATH ggjava/ggjava.jar:/kafka/client/path/*:dirprm: The second line sets the Java JVM classpath. The Java classpath provides the pathing to load all the required Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) and Kafka client libraries. The GG for DAA library (ggjava.jar) should be first in the list. The Kafka client libraries, the Kafka Connect framework, and the Kafka Connect converters are not included with the GG for DAA installation and must be obtained independently. Oracle recommends using the same version of the Kafka client as the version of the Kafka broker to which you are connecting. The Dependency Downloading tool can be used to download the dependency libraries. Alternatively, the pathing can be set to a Kafka installation. For more information about Dependency Downloader, see Dependency Downloader.

JVMBOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO: The third line sets the JVM boot options. Use this to configure the maximum Java heap size (-Xmx512m) and the log4j logging parameters that generate the .log file (-Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO).
Note:
Starting from Oracle GoldenGate for Distributed Applications and Analytics (GG for DAA) release 23c, this parameter will be deprecated.

Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.8 The Extract Parameter File
The Extract parameter file is named <extract name>.prm. For example, the Extract parameter file for the Extract process kc would be kc.prm.

EXTRACT KC
-- alter credentialstore add user kafka:// password <somepass> alias kafka
SOURCEDB USERIDALIAS kafka
JVMOPTIONS CLASSPATH ggjava/ggjava.jar:/kafka/client/path/*
JVMOPTIONS BOOTOPTIONS -Xmx512m -Dlog4j.configurationFile=log4j-default.properties -Dgg.log.level=INFO
TRANLOGOPTIONS GETMETADATAFROMVAM
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE QASOURCE.TOPIC1;
EXTRACT KC: The first line sets the name of the Extract process.

TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties: This line sets the name and location of the Kafka Consumer properties file. The Kafka Consumer properties file contains the Kafka-specific configuration for connectivity and security to the Kafka cluster. Documentation on the Kafka Consumer properties can be found in the Kafka Documentation.

EXTTRAIL dirdat/kc: This line sets the location and prefix of the trail files to be generated.

TABLE QASOURCE.TOPIC1;: This line is the Extract TABLE statement. There can be one or more TABLE statements. The schema name in the example is QASOURCE. The schema name is an OGG artifact and is required. It can be set to any legal string. The schema name cannot be wildcarded. Each Extract process supports only one schema name. The configured table name maps to the Kafka topic name. The table configuration does support wildcards. Legal Kafka topic names can have the following characters:
- a-z (lowercase a to z)
- A-Z (uppercase A to Z)
- 0-9 (digits 0 to 9)
- . (period)
- _ (underscore)
- - (hyphen)
Kafka topic names are case-sensitive: MYTOPIC1 and MyTopic1 are different Kafka topics.

Examples of legal Extract TABLE mappings:

TABLE TESTSCHEMA.TEST*;
TABLE TESTSCHEMA.MyTopic1;
TABLE TESTSCHEMA."My.Topic1";
Example of an illegal configuration (more than one schema name used):

TABLE QASOURCE.TEST*;
TABLE TESTSCHEMA.MYTOPIC1;
Example of an illegal configuration (a topic name containing a period is not quoted):

TABLE QASOURCE.My.Topic1;
Example of an illegal configuration (the schema name cannot be wildcarded):

TABLE *.*;
Optional .prm configuration:

SETENV (PERFORMMESSAGEGAPCHECK = "false")
Parent topic: General Terms and Functionality of Kafka Capture
8.1.4.3.9 Kafka Consumer Properties File
The Kafka Consumer properties file contains the properties to configure the Kafka Consumer including how to connect to the Kafka cluster and security parameters.
#Kafka Properties
bootstrap.servers=den02box:9092
group.id=mygroupid
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
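If the Kafka cluster requires authenticated or encrypted connections, the corresponding standard Kafka consumer security properties are also set in this file. The following is a hedged sketch for a SASL_SSL-secured cluster; the property names are standard Kafka client properties, but the mechanism, paths, and credentials shown are placeholders that depend on your cluster.

#Example security settings (placeholder values)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=truststorepass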
8.1.4.3.9.1 Encrypt Kafka Consumer Properties
The sensitive properties in the Kafka Consumer properties file can be replaced with references to credentials stored in the Oracle GoldenGate Credential Store. For more information about how to use Credential Store, see Using Identities in Oracle GoldenGate Credential Store.

For example, the property:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";

can be replaced with:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=ORACLEWALLETUSERNAME[alias domain_name] password=ORACLEWALLETPASSWORD[alias domain_name];
Parent topic: Kafka Consumer Properties File
8.1.4.4 Generic Mutation Builder
The Generic Mutation Builder is the default mutation builder. It maps each source Kafka message into a logical change record with the following three fields (see the sketch after this list):

- id: This is the primary key for the table. It is typed as a string. The value is the coordinates of the message in Kafka in the following format: topic name:partition number:offset. For example, the value for topic TEST, partition 1, and offset 245 would be TEST:1:245.
- key: This is the message key field from the source Kafka message. The field is typed as binary. The value of the field is the key from the source Kafka message propagated as bytes.
- payload: This is the message payload or value from the source Kafka message. The field is typed as binary. The value of the field is the payload from the source Kafka message propagated as bytes.

Additionally:

- All records are propagated as insert operations.
- Each Kafka message creates an operation in its own transaction.
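The following hedged Java fragment illustrates how the id value described above corresponds to the coordinates exposed by the standard Kafka ConsumerRecord API; it is illustrative only and not the GG for DAA implementation.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class MessageCoordinates {
    // Builds the topic:partition:offset coordinate string, for example TEST:1:245.
    public static String idFor(ConsumerRecord<byte[], byte[]> record) {
        return record.topic() + ":" + record.partition() + ":" + record.offset();
    }

    private MessageCoordinates() {}
}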
Sample trail file metadata viewed using logdump:

Logdump 2666 >n
___________________________________________________________________
Hdr-Ind    :     E  (x45)     Partition  :   .  (x00)
UndoFlag   :     .  (x00)     BeforeAfter:   A  (x41)
RecLength  :   196  (x00c4)   IO Time    : 2021/07/22 14:57:25.085.436
IOType     :   170  (xaa)     OrigNode   :     2  (x02)
TransInd   :     .  (x03)     FormatType :     R  (x52)
SyskeyLen  :     0  (x00)     Incomplete :     .  (x00)
DDR/TDR index:   (001, 001)   AuditPos   :     0
Continued  :     N  (x00)     RecCount   :     1  (x01)

2021/07/22 14:57:25.085.436 Metadata             Len 196 RBA 1335
Table Name: QASOURCE.TOPIC1
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table QASOURCE.TOPIC1
Record Length: 20016
Columns: 3
id       64  8000     0  0  0 0 0  8000  8000  0 0 0 0 0 1  0 1  0  12  -1  0 0 0
key      64 16000  8005  0  0 1 0  8000  8000  0 0 0 0 0 1  0 0  4  -3  -1  0 0 0
payload  64  8000 16010  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  4  -4  -1  0 1 0
End of definition
Parent topic: Apache Kafka
8.1.4.5 Kafka Connect Mutation Builder
The Kafka Connect Mutation Builder parses Kafka Connect messages into logical change records that are then written to Oracle GoldenGate trail files.
- Functionality and Limitations of the Kafka Connect Mutation Builder
- Primary Key
- Kafka Message Key
- Kafka Connect Supported Types
- How to Enable the Kafka Connect Mutation Builder
Parent topic: Apache Kafka
8.1.4.5.1 Functionality and Limitations of the Kafka Connect Mutation Builder
- All records are propagated as insert operations.
- Each Kafka message creates an operation in its own transaction.
- The Kafka message key must be a Kafka Connect primitive type or logical type.
- The Kafka message value must be either a primitive type/logical type or a record containing only primitive types, logical types, and container types. A record cannot contain another record as nested records are not currently supported.
- Kafka Connect array data types are mapped into binary fields. The content of the binary field will be the source array converted into a serialized JSON array.
- Kafka Connect map data types are mapped into binary fields. The contents of the binary field will be the source map converted into a serialized JSON document.
- The source Kafka messages must be Kafka Connect messages.
- Kafka Connect Protobuf messages are not currently supported. (The current Kafka Capture functionality only supports primitive or logical types for the Kafka message key. The Kafka Connect Protobuf Converter does not support stand-alone primitives or logical types.)
- Each source topic must contain messages which conform to the same schema. Interlacing messages in the same Kafka topic which conform to different Kafka Connect schema is not currently supported.
- Schema changes are not currently supported.
Parent topic: Kafka Connect Mutation Builder
8.1.4.5.2 Primary Key
A primary key field is created in the output as a column named gg_id. The value of this field is the concatenated topic name, partition, and offset delimited by the : character. For example: TOPIC1:0:1001.
Parent topic: Kafka Connect Mutation Builder
8.1.4.5.3 Kafka Message Key
The message key is mapped into a column named gg_key.
Parent topic: Kafka Connect Mutation Builder
8.1.4.5.4 Kafka Connect Supported Types
The following Kafka Connect primitive and logical types are supported:
- String
- 8 bit Integer
- 16 bit Integer
- 32 bit Integer
- 64 bit Integer
- Boolean
- 32 bit Float
- 64 bit Float
- Bytes (binary)
- Decimal
- Timestamp
- Date
- Time
Supported Container Types
- Array – Only arrays of primitive or logical types are supported. Data is mapped as a binary field the value of which is a JSON array document containing the contents of the source array.
- List – Only lists of primitive or logical types are supported. Data is mapped as a binary field the value of which is a JSON document containing the contents of the source list.
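As an illustration of a message value that satisfies these constraints, the following hedged Java sketch builds a Kafka Connect schema and record using only supported primitive, logical, and container types; the field names are hypothetical and the sketch is not part of GG for DAA.

import java.math.BigDecimal;
import java.util.Arrays;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

public class SupportedTypesExample {
    public static Struct buildValue() {
        // Flat record: primitive, logical, and container types only (no nested records).
        Schema valueSchema = SchemaBuilder.struct().name("example.value")
                .field("string_required", Schema.STRING_SCHEMA)
                .field("long_optional", Schema.OPTIONAL_INT64_SCHEMA)
                .field("double_optional", Schema.OPTIONAL_FLOAT64_SCHEMA)
                .field("decimal_optional", Decimal.builder(2).optional().build())
                .field("timestamp_optional", Timestamp.builder().optional().build())
                // Array of a primitive type; Kafka Capture maps this to a binary
                // field containing a serialized JSON array.
                .field("array_optional",
                        SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build())
                .build();

        return new Struct(valueSchema)
                .put("string_required", "hello")
                .put("long_optional", 42L)
                .put("double_optional", 3.14)
                .put("decimal_optional", new BigDecimal("12.34"))
                .put("timestamp_optional", new java.util.Date())
                .put("array_optional", Arrays.asList(1, 2, 3));
    }
}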
Parent topic: Kafka Connect Mutation Builder
8.1.4.5.5 How to Enable the Kafka Connect Mutation Builder
The Kafka Connect Mutation Builder is enabled by configuring the Kafka Connect key and value converters in the Kafka Consumer properties file.

For the Kafka Connect JSON Converter:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
For the Kafka Connect Avro Converter:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
The Kafka Capture functionality reads the Kafka Consumer properties file. If the Kafka Connect converters are configured, then the Kafka Connect Mutation Builder is invoked.
Sample metadata from the trail file using logdump:
2021/08/03 09:06:05.243.881 Metadata             Len 1951 RBA 1335
Table Name: TEST.KC
*
 1)Name          2)Data Type        3)External Length  4)Fetch Offset      5)Scale         6)Level
 7)Null          8)Bump if Odd      9)Internal Length 10)Binary Length    11)Table Length 12)Most Sig DT
13)Least Sig DT 14)High Precision  15)Low Precision   16)Elementary Item  17)Occurs       18)Key Column
19)Sub DataType 20)Native DataType 21)Character Set   22)Character Length 23)LOB Type     24)Partial Type
25)Remarks
*
TDR version: 11
Definition for table TEST.KC
Record Length: 36422
Columns: 30
gg_id               64  8000     0  0  0 0 0  8000  8000  0 0 0 0 0 1  0 1  0  12  -1  0 0 0
gg_key              64  4000  8005  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  0  -1  -1  0 1 0
string_required     64  4000 12010  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  0  -1  -1  0 1 0
string_optional     64  4000 16015  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  0  -1  -1  0 1 0
byte_required      134    23 20020  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
byte_optional      134    23 20031  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
short_required     134    23 20042  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
short_optional     134    23 20053  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
integer_required   134    23 20064  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
integer_optional   134    23 20075  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   4  -1  0 0 0
long_required      134    23 20086  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0  -5  -1  0 0 0
long_optional      134    23 20097  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0  -5  -1  0 0 0
boolean_required     0     2 20108  0  0 1 0     1     1  0 0 0 0 0 1  0 0  4  -2  -1  0 0 0
boolean_optional     0     2 20112  0  0 1 0     1     1  0 0 0 0 0 1  0 0  4  -2  -1  0 0 0
float_required     141    50 20116  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   6  -1  0 0 0
float_optional     141    50 20127  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   6  -1  0 0 0
double_required    141    50 20138  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   8  -1  0 0 0
double_optional    141    50 20149  0  0 1 0     8     8  8 0 0 0 0 1  0 0  0   8  -1  0 0 0
bytes_required      64  8000 20160  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  4  -4  -1  0 1 0
bytes_optional      64  8000 24165  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  4  -4  -1  0 1 0
decimal_required    64    50 28170  0  0 1 0    50    50  0 0 0 0 0 1  0 0  0  12  -1  0 0 0
decimal_optional    64    50 28225  0  0 1 0    50    50  0 0 0 0 0 1  0 0  0  12  -1  0 0 0
timestamp_required 192    29 28280  0  0 1 0    29    29 29 0 6 0 0 1  0 0  0  11  -1  0 0 0
timestamp_optional 192    29 28312  0  0 1 0    29    29 29 0 6 0 0 1  0 0  0  11  -1  0 0 0
date_required      192    10 28344  0  0 1 0    10    10 10 0 2 0 0 1  0 0  0   9  -1  0 0 0
date_optional      192    10 28357  0  0 1 0    10    10 10 0 2 0 0 1  0 0  0   9  -1  0 0 0
time_required      192    18 28370  0  0 1 0    18    18 18 3 6 0 0 1  0 0  0  10  -1  0 0 0
time_optional      192    18 28391  0  0 1 0    18    18 18 3 6 0 0 1  0 0  0  10  -1  0 0 0
array_optional      64  8000 28412  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  4  -4  -1  0 1 0
map_optional        64  8000 32417  0  0 1 0  4000  4000  0 0 0 0 0 1  0 0  4  -4  -1  0 1 0
End of definition
Parent topic: Kafka Connect Mutation Builder
8.1.4.6 Example Configuration Files
8.1.4.6.1 Example kc.prm file
EXTRACT KC
OGGSOURCE KAFKA
JVMOPTIONS CLASSPATH ggjava/ggjava.jar:/path/to/kafka/libs/*
TRANLOGOPTIONS GETMETADATAFROMVAM
--Uncomment the following line to disable Kafka message gap checking.
--SETENV (PERFORMMESSAGEGAPCHECK = "false")
TRANLOGOPTIONS KAFKACONSUMERPROPERTIES kafka_consumer.properties
EXTTRAIL dirdat/kc
TABLE TEST.KC;
Parent topic: Example Configuration Files
8.1.4.6.2 Example Kafka Consumer Properties File
#Kafka Properties
bootstrap.servers=localhost:9092
group.id=someuniquevalue
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#JSON Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with JSON Kafka Connect Messages
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

#Avro Converter Settings
#Uncomment to use the Kafka Connect Mutation Builder with Avro Kafka Connect Messages
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081
Parent topic: Example Configuration Files