12 Using the Kafka REST Proxy Handler
Learn how to use the Kafka REST Proxy Handler to stream messages to the Kafka REST Proxy distributed by Confluent.
12.1 Overview
The Kafka REST Proxy Handler allows Kafka messages to be streamed over HTTPS. The typical use case is streaming Kafka messages from an on-premises Oracle GoldenGate installation to the cloud, or from cloud to cloud.
The Kafka REST proxy provides a RESTful interface to a Kafka cluster. It makes it easy for you to:
- produce and consume messages,
- view the state of the cluster,
- and perform administrative actions without using the native Kafka protocol or clients.
Kafka REST Proxy is part of the Confluent Open Source and Confluent Enterprise distributions. It is not available in the Apache Kafka distribution. To access Kafka through the REST proxy, you must install the Confluent distribution of Kafka; see https://docs.confluent.io/current/kafka-rest/docs/index.html.
12.2 Setting Up and Starting the Kafka REST Proxy Handler Services
You have several installation formats to choose from including ZIP or tar archives, Docker, and Packages.
- Using the Kafka REST Proxy Handler
- Downloading the Dependencies
- Classpath Configuration
- Kafka REST Proxy Handler Configuration
- Review a Sample Configuration
- Security
- Generating a Keystore or Truststore
- Using Templates to Resolve the Topic Name and Message Key
- Kafka REST Proxy Handler Formatter Properties
12.2.1 Using the Kafka REST Proxy Handler
You must download and install the Confluent Open Source or Confluent Enterprise distribution because the Kafka REST Proxy is not included in the Apache, Cloudera, or Hortonworks distributions. You can choose from several installation formats, including ZIP or TAR archives, Docker, and packages.
The Kafka REST Proxy depends on ZooKeeper, Kafka, and the Schema Registry.
12.2.2 Downloading the Dependencies
You can review and download the Jersey RESTful Web Services in Java client dependency from:
https://jersey.github.io/download.html
You can review and download the Jersey Apache Connector dependencies from the Maven repository:
https://jersey.github.io/project-info/2.27/jersey/project/jersey-apache-connector/dependencies.html
12.2.3 Classpath Configuration
The Kafka REST Proxy Handler uses the Jersey project jersey-client version 2.27 and jersey-connectors-apache version 2.27 to connect to Kafka. Oracle GoldenGate for Big Data does not include the required dependencies, so you must obtain them; see Downloading the Dependencies.
You must configure these dependencies using the gg.classpath property in the Java Adapter properties file. This is an example of a correctly configured classpath for the Kafka REST Proxy Handler:
gg.classpath=dirprm:{path_to_jersey_client_jars}/jaxrs-ri/lib/*:{path_to_jersey_client_jars}/jaxrs-ri/api/*:{path_to_jersey_client_jars}/jaxrs-ri/ext/*:{path_to_jersey_client_jars}/connector/*
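Because the classpath must be a single colon-separated value, it can help to generate it rather than type it by hand. The following is a minimal sketch, not part of the product: build_gg_classpath is a hypothetical helper, and its argument stands in for {path_to_jersey_client_jars}. The subdirectory layout is taken from the example above.

```shell
# Hypothetical helper: print a gg.classpath line for a given Jersey download directory.
# The argument replaces {path_to_jersey_client_jars}; the subdirectories
# (jaxrs-ri/lib, jaxrs-ri/api, jaxrs-ri/ext, connector) follow the documented example.
build_gg_classpath() {
  jersey_home=$1
  # The format string is single-quoted so the shell never expands the '*' wildcards.
  printf 'gg.classpath=dirprm:%s/jaxrs-ri/lib/*:%s/jaxrs-ri/api/*:%s/jaxrs-ri/ext/*:%s/connector/*\n' \
    "$jersey_home" "$jersey_home" "$jersey_home" "$jersey_home"
}
```

For example, build_gg_classpath /opt/jersey prints a line that you could paste into the Java Adapter properties file; /opt/jersey is only an illustrative location.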
12.2.4 Kafka REST Proxy Handler Configuration
The following are the configurable values for the Kafka REST Proxy Handler. Oracle recommends that you store the Kafka REST Proxy properties file in the Oracle GoldenGate dirprm directory.
To enable the selection of the Kafka REST Proxy Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkarestproxy and the other Kafka REST Proxy Handler properties as follows:
Properties | Required/ Optional | Legal Values | Default | Explanation |
---|---|---|---|---|
gg.handler.name.type | Required | kafkarestproxy | None | The configuration to select the Kafka REST Proxy Handler. |
gg.handler.name.topicMappingTemplate | Required | A template string value to resolve the Kafka topic name at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
gg.handler.name.keyMappingTemplate | Required | A template string value to resolve the Kafka message key at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
gg.handler.name.postDataUrl | Required | The listener address of the REST Proxy. | None | Set to the URL of the Kafka REST proxy. |
gg.handler.name.format | Required | | None | Set to the REST proxy payload data format. |
gg.handler.name.payloadsize | Optional | A value representing the payload size in megabytes. | | Set to the maximum size of the payload of the HTTP messages. |
gg.handler.name.apiVersion | Optional | | | Sets the API version to use. |
gg.handler.name.mode | Optional | | | Sets how operations are processed. |
gg.handler.name.trustStore | Optional | Path to the truststore. | None | Path to the truststore file that holds certificates from trusted certificate authorities (CA). These CAs are used to verify certificates presented by the server during an SSL connection; see Generating a Keystore or Truststore. |
gg.handler.name.trustStorePassword | Optional | Password of the truststore. | None | The truststore password. |
gg.handler.name.keyStore | Optional | Path to the keystore. | None | Path to the keystore file that holds the private key and identity certificate, which are presented to other parties (server or client) to verify its identity; see Generating a Keystore or Truststore. |
gg.handler.name.keyStorePassword | Optional | Password of the keystore. | None | The keystore password. |
gg.handler.name.proxy | Optional | | None | Proxy URL in the http://host:port format. |
gg.handler.name.proxyUserName | Optional | Any string. | None | The proxy user name. |
gg.handler.name.proxyPassword | Optional | Any string. | None | The proxy password. |
| | Optional | Integer value. | None | The amount of time allowed for the server to respond. |
| | Optional | Integer value. | None | The amount of time to wait to establish the connection to the host. |
See Using Templates to Resolve the Topic Name and Message Key for more information.
12.2.5 Review a Sample Configuration
The following is a sample configuration for the Kafka REST Proxy Handler from the Java Adapter properties file:
gg.handlerlist=kafkarestproxy
#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx
#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properties
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234
#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password
#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
12.2.6 Security
Security is possible between the following:
- Kafka REST Proxy clients and the Kafka REST Proxy server. The Oracle GoldenGate REST Proxy Handler is a Kafka REST Proxy client.
- The Kafka REST Proxy server and Kafka brokers. Oracle recommends that you thoroughly review the security documentation and configuration of the Kafka REST Proxy server; see https://docs.confluent.io/current/kafka-rest/docs/index.html
REST Proxy supports SSL for securing communication between clients, such as the Kafka REST Proxy Handler, and the Kafka REST Proxy server. To configure SSL:
- Generate a keystore using the scripts; see Generating a Keystore or Truststore.
- Update the Kafka REST Proxy server configuration in the kafka-rest.properties file with these properties:
  listeners=https://hostname:8083
  confluent.rest.auth.propagate.method=SSL
  #Configuration Options for HTTPS
  ssl.client.auth=true
  ssl.keystore.location={keystore_file_path}/server.keystore.jks
  ssl.keystore.password=test1234
  ssl.key.password=test1234
  ssl.truststore.location={keystore_file_path}/server.truststore.jks
  ssl.truststore.password=test1234
  ssl.keystore.type=JKS
  ssl.truststore.type=JKS
  ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
- Restart your server.
To disable mutual authentication, change the ssl.client.auth property from true to false.
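Switching mutual authentication on or off is a one-line change in kafka-rest.properties. As a minimal sketch, the hypothetical helper below rewrites an existing ssl.client.auth line in place. The function name and the temporary-file approach are illustrative assumptions, and the location of the properties file depends on your installation.

```shell
# Hypothetical helper: set ssl.client.auth in a kafka-rest.properties file.
set_client_auth() {
  props_file=$1   # path to kafka-rest.properties (depends on your install)
  value=$2        # true or false
  tmp_file="${props_file}.tmp"
  # Rewrite only the ssl.client.auth line; every other line passes through unchanged.
  # If the property is not present in the file, nothing is changed.
  sed "s/^ssl\.client\.auth=.*/ssl.client.auth=${value}/" "$props_file" > "$tmp_file" &&
    mv "$tmp_file" "$props_file"
}
```

After running set_client_auth with the path to your properties file and the value false, restart the Kafka REST Proxy server so that the change takes effect.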
12.2.7 Generating a Keystore or Truststore
Generating a Truststore
You execute this script to generate the ca-cert, ca-key, and truststore.jks truststore files.
#!/bin/bash
PASSWORD=password
CLIENT_PASSWORD=password
VALIDITY=365
Then you generate a CA as in this example:
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -passin pass:$PASSWORD -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN" -nodes
Lastly, you add the CA to the server's truststore using keytool:
keytool -keystore truststore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD -keypass $PASSWORD
Generating a Keystore
You run this script and pass the fqdn as an argument to generate the ca-cert.srl, cert-file, cert-signed, and keystore.jks keystore files.
#!/bin/bash
PASSWORD=password
VALIDITY=365
if [ $# -lt 1 ]; then
  echo "`basename $0` host fqdn|user_name|app_name"
  exit 1
fi
CNAME=$1
ALIAS=`echo $CNAME|cut -f1 -d"."`
Then you generate the keystore with keytool as in this example:
keytool -noprompt -keystore keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD -keypass $PASSWORD
Next, you sign all the certificates in the keystore with the CA:
keytool -keystore keystore.jks -alias $ALIAS -certreq -file cert-file -storepass $PASSWORD
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
Lastly, you import both the CA and the signed certificate into the keystore:
keytool -keystore keystore.jks -alias CARoot -import -file ca-cert -storepass $PASSWORD
keytool -keystore keystore.jks -alias $ALIAS -import -file cert-signed -storepass $PASSWORD
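The keystore script derives the key alias from the name you pass in by keeping everything before the first period. The sketch below isolates that one step so you can see which alias a given name produces; alias_from_cname is a hypothetical function name, and the host name in the usage note is an illustrative assumption.

```shell
# Derive the keystore alias from a fully qualified name, using the same
# cut invocation as the keystore script: keep the text before the first period.
alias_from_cname() {
  echo "$1" | cut -f1 -d"."
}
```

For example, alias_from_cname kafka01.example.com yields kafka01, while a name without a period, such as a user or application name, is used unchanged. To inspect the finished keystore, keytool -list -keystore keystore.jks -storepass $PASSWORD lists its entries.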
12.2.7.1 Setting Metacolumn Output
The following are the configurable values for the Kafka REST Proxy Handler metacolumns template property that controls metacolumn output.
Table 12-1 Metacolumns Template Property
Properties | Required/ Optional | Legal Values | Default | Explanation |
---|---|---|---|---|
gg.handler.name.format.metaColumnsTemplate | Optional | ${static} for a static value. It propagates a static value. Therefore, ${static.MYVALUE} propagates MYVALUE to the generated field. | None | The current metacolumn information can be configured in a simple manner, which removes the explicit need to use insertOpKey, updateOpKey, deleteOpKey, truncateOpKey, includeTableName, includeOpTimestamp, includeOpType, includePosition, includeCurrentTimestamp, and useIso8601Format. It is a comma-delimited string consisting of one or more templated values that represent the template. |
This is an example that would produce a list of metacolumns:
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}
Explanation of the Metacolumn Keywords
The metacolumns functionality allows you to select the metadata fields that you want to see in the generated output messages. The format of the metacolumn syntax is:
- ${keyword[fieldName].argument}: The keyword is fixed based on the metacolumn syntax. Optionally, you can provide a field name between the square brackets. If a field name is not provided, then the default field name is used. The argument is required to resolve the metacolumn value.
- ${alltokens}: All of the Oracle GoldenGate tokens.
- ${token}: The value of a specific Oracle GoldenGate token. The token key should follow token using the period (.) operator. For example: ${token.MYTOKEN}
- ${sys}: A system environmental variable. The variable name should follow sys using the period (.) operator. For example: ${sys.MYVAR}
- ${env}: An Oracle GoldenGate environment variable. The variable name should follow env using the period (.) operator. For example: ${env.someVariable}
- ${javaprop}: A Java JVM variable. The variable name should follow javaprop using the period (.) operator. For example: ${javaprop.MYVAR}
- ${optype}: Operation type.
- ${position}: Record position.
- ${timestamp}: Record timestamp.
- ${catalog}: Catalog name.
- ${schema}: Schema name.
- ${table}: Table name.
- ${objectname}: The fully qualified table name.
- ${csn}: Source Commit Sequence Number.
- ${xid}: Source transaction ID.
- ${currenttimestamp}: Current timestamp.
- ${opseqno}: Record sequence number within the transaction.
- ${timestampmicro}: Record timestamp in microseconds after epoch.
- ${currenttimestampmicro}: Current timestamp in microseconds after epoch.
- ${txind}: This is the transactional indicator from the source trail file. The values of a transaction are B for the first operation, M for the middle operations, E for the last operation, or W for whole if there is only one operation. Filtering operations or the use of coordinated apply negate the usefulness of this field.
- ${static}: Use to inject a field with a static value into the output. The value desired should be the argument. If the desired value is abc, then the syntax would be ${static.abc} or ${static[FieldName].abc}.
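Since the template value is a comma-delimited string of ${...} keywords, it is easy to mistype. As a minimal sketch, the hypothetical helper below assembles the value from bare keyword names; metacolumns_template is not part of Oracle GoldenGate and only formats text.

```shell
# Hypothetical helper: join metacolumn keywords into a metaColumnsTemplate value.
# Arguments are bare keywords such as optype or token.ROWID; each is wrapped in ${...}.
metacolumns_template() {
  result=""
  for keyword in "$@"; do
    # Add a comma before every keyword except the first.
    if [ -n "$result" ]; then
      result="${result},"
    fi
    # \$ keeps the dollar sign literal so the output contains ${keyword} text.
    result="${result}\${${keyword}}"
  done
  printf '%s\n' "$result"
}
```

Calling metacolumns_template optype timestampmicro currenttimestampmicro produces ${optype},${timestampmicro},${currenttimestampmicro}, which is the value used for gg.handler.kafkarestproxy.format.metaColumnsTemplate in the sample configuration.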
Sample Configuration:
gg.handlerlist=kafkarestproxy
#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=http://localhost:8083
gg.handler.kafkarestproxy.apiVersion=v1
gg.handler.kafkarestproxy.format=json
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.mode=tx
#Server auth properties
#gg.handler.kafkarestproxy.trustStore=/keys/truststore.jks
#gg.handler.kafkarestproxy.trustStorePassword=test1234
#Client auth properties
#gg.handler.kafkarestproxy.keyStore=/keys/keystore.jks
#gg.handler.kafkarestproxy.keyStorePassword=test1234
#Proxy properties
#gg.handler.kafkarestproxy.proxy=http://proxyurl:80
#gg.handler.kafkarestproxy.proxyUserName=username
#gg.handler.kafkarestproxy.proxyPassword=password
#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
12.2.8 Using Templates to Resolve the Topic Name and Message Key
The Kafka REST Proxy Handler can resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to combine static values and keywords. At runtime, each keyword is replaced with a value taken from the context of the current processing. The templates use the following configuration properties:
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
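To make the idea of template resolution concrete, the sketch below mimics it for the two keywords that appear in the sample configuration, ${fullyQualifiedTableName} and ${primaryKeys}. It is an illustration only: the handler performs the resolution internally, resolve_template is a hypothetical function, and the table name and key values in the usage note are made-up sample data.

```shell
# Illustrative only: mimic how a mapping template resolves for one operation.
resolve_template() {
  template=$1      # for example '${fullyQualifiedTableName}'
  table_name=$2    # fully qualified table name of the current operation
  primary_keys=$3  # primary key values, already joined with '_'
  # [$] matches a literal dollar sign; static text in the template is left untouched.
  # Values containing '|', '&', or '\' are not handled by this simple sketch.
  printf '%s\n' "$template" |
    sed -e 's|[$]{fullyQualifiedTableName}|'"$table_name"'|g' \
        -e 's|[$]{primaryKeys}|'"$primary_keys"'|g'
}
```

With the sample values QASOURCE.TCUSTMER and JANE_WA, the template ${fullyQualifiedTableName} resolves to QASOURCE.TCUSTMER, and a template that mixes static text with a keyword, such as ogg_${fullyQualifiedTableName}, resolves to ogg_QASOURCE.TCUSTMER.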
Template Modes
The Kafka REST Proxy Handler can be configured to send one message per operation (insert, update, delete). Alternatively, it can be configured to group operations into messages at the transaction level.
Template Keywords
The following table includes a column that indicates whether each keyword is supported for transaction-level messages.
Keyword | Explanation | Transaction Message Support |
---|---|---|
${fullyQualifiedTableName} | Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names. | No |
| | Resolves to the catalog name. | No |
| | Resolves to the schema name. | No |
| | Resolves to the short table name. | No |
| | Resolves to the type of the operation. | No |
${primaryKeys} | Resolves to the concatenated primary key values delimited by an underscore (_) character. | No |
| | The sequence number of the source trail file followed by the offset (RBA). | Yes |
| | The operation timestamp from the source trail file. | Yes |
| | Resolves to “”. | Yes |
| | Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended. | Yes |
| | Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside the square brackets. | No |
| | Resolves to a column value where the key is the fully-qualified table name and the value is the column name to be resolved. | No |
| | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting. | Yes |
| | Resolves to a NULL string. | Yes |
| | It is possible to write a custom value resolver. If required, contact Oracle Support. | Implementation dependent |
Example Templates
The following describes example template configuration values and the resolved values.
Example Template | Resolved Value |
---|---|
|
|
|
|
|
|
12.2.9 Kafka REST Proxy Handler Formatter Properties
The following are the configurable values for the Kafka REST Proxy Handler Formatter.
Table 12-2 Kafka REST Proxy Handler Formatter Properties
Properties | Required/ Optional | Legal Values | Default | Explanation |
---|---|---|---|---|
gg.handler.name.format.includeOpType | Optional | | | Set to Set to |
gg.handler.name.format.includeOpTimestamp | Optional | | | Set to Set to |
gg.handler.name.format.includeCurrentTimestamp | Optional | | | Set to Set to |
gg.handler.name.format.includePosition | Optional | | | Set to Set to |
gg.handler.name.format.includePrimaryKeys | Optional | | | Set to Set to |
gg.handler.name.format.includeTokens | Optional | | | Set to Set to |
gg.handler.name.format.insertOpKey | Optional | Any string. | | The value of the field |
gg.handler.name.format.updateOpKey | Optional | Any string. | | The value of the field |
gg.handler.name.format.deleteOpKey | Optional | Any string. | | The value of the field |
gg.handler.name.format.truncateOpKey | Optional | Any string. | | The value of the field |
gg.handler.name.format.treatAllColumnsAsStrings | Optional | | | Set to Set to |
gg.handler.name.format.mapLargeNumbersAsStrings | Optional | | | Set to |
gg.handler.name.format.iso8601Format | Optional | | | Set to |
gg.handler.name.format.pkUpdateHandling | Optional | | | It is only applicable if you are modeling row messages with the |
12.3 Consuming the Records
A simple way to consume data from Kafka topics through the Kafka REST Proxy is to use curl.
Consume JSON Data
- Create a consumer for JSON data.
  curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
  https://localhost:8082/consumers/my_json_consumer
- Subscribe to a topic.
  curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
  https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
- Consume records.
  curl -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Consume Avro Data
- Create a consumer for Avro data.
  curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
  https://localhost:8082/consumers/my_avro_consumer
- Subscribe to a topic.
  curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
  https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
- Consume records.
  curl -k -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
  https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
Note:
If you are using curl from the machine hosting the REST proxy, then unset the http_proxy environmental variable before consuming the messages. If you are using curl from the local machine to get messages from the Kafka REST Proxy, then setting the http_proxy environmental variable may be required.
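The subscribe and consume requests share the same consumer instance URL, so building it once avoids copy-and-paste mistakes. The following is a minimal sketch: consumer_instance_url is a hypothetical helper, and the host, group, and instance names in the usage note are the ones from the examples above.

```shell
# Hypothetical helper: build the base URL of a consumer instance on the REST Proxy.
consumer_instance_url() {
  base_url=$1   # for example https://localhost:8082
  group=$2      # consumer group name, for example my_json_consumer
  instance=$3   # consumer instance name, for example my_consumer_instance
  printf '%s/consumers/%s/instances/%s\n' "$base_url" "$group" "$instance"
}

# Usage (commented out because it needs a running REST Proxy):
# INSTANCE_URL=$(consumer_instance_url https://localhost:8082 my_json_consumer my_consumer_instance)
# curl -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" "$INSTANCE_URL/records"
# curl -k -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" "$INSTANCE_URL"
```

The final DELETE request is not part of the steps above. It is the call that the Confluent REST Proxy v2 API provides for removing a consumer instance when you have finished with it.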
12.4 Performance Considerations
Several configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, affect performance.
The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. It allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka REST Proxy Handler POSTs the data to the Kafka Producer.
Setting the Replicat GROUPTRANSOPS to a larger number allows the Replicat to call the POST less frequently, improving performance. The default value for GROUPTRANSOPS is 1000, and performance can be improved by increasing the value to 2500, 5000, or even 10000.
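A quick back-of-the-envelope calculation shows why a larger GROUPTRANSOPS value helps. The sketch below is a rough estimate under a simplifying assumption: each grouped target transaction carries up to GROUPTRANSOPS operations and results in one POST. Actual grouping follows source transaction boundaries, so real counts will differ, and estimate_posts is a hypothetical helper.

```shell
# Rough estimate of POST calls for a given number of source operations,
# assuming one POST per grouped transaction of up to GROUPTRANSOPS operations.
estimate_posts() {
  total_ops=$1
  group_size=$2
  # Integer ceiling division: a partly filled final group still needs a POST.
  echo $(( (total_ops + group_size - 1) / group_size ))
}
```

Under that assumption, one million operations need about 1000 POST calls at the default value of 1000 and about 100 at a value of 10000.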
12.5 Kafka REST Proxy Handler Metacolumns Template Property
Problems Starting Kafka REST Proxy server
The script that starts the Kafka REST Proxy server appends its CLASSPATH to the environment CLASSPATH variable. If set, the environment CLASSPATH can contain JAR files that conflict with the correct execution of the Kafka REST Proxy server and may prevent it from starting. Oracle recommends that you unset the CLASSPATH environmental variable before starting your Kafka REST Proxy server. Reset the CLASSPATH to “” to overcome the problem.
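In a shell session, that recommendation can be applied as follows. This is a minimal sketch: the start command in the final comment is an assumption about a typical Confluent installation, and {confluent_home} is a placeholder for your own install directory.

```shell
# Clear any inherited CLASSPATH before launching the Kafka REST Proxy server.
unset CLASSPATH

# Confirm that the variable is no longer defined in this shell.
if [ -z "${CLASSPATH+set}" ]; then
  echo "CLASSPATH is unset"
fi

# Then start the server from the same shell, for example (paths are assumptions):
# {confluent_home}/bin/kafka-rest-start {confluent_home}/etc/kafka-rest/kafka-rest.properties
```

Because unset affects only the current shell and its child processes, start the server from the same session in which you ran it.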