Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Stream analytics using Oracle Cloud Infrastructure Streaming and Oracle Database
Introduction
Oracle Cloud Infrastructure Streaming is a highly scalable and highly available streaming service on Oracle Cloud Infrastructure (OCI). The Stream service is fully serverless and API compatible with Apache Kafka.
Oracle SQL access to Kafka is an Oracle PL/SQL package and an external table preprocessor. It enables Oracle Database to read and process events and messages from Kafka Topics as Views or Tables in Oracle Database. Once the data from Kafka is in an Oracle Database table, or visible from an Oracle Database view, it can be queried with the full power of Oracle PL/SQL, just like any other data in Oracle Database.
When you retrieve data with Oracle SQL access to Kafka enabled views, data is not persisted in the Oracle Database. But when you use the Oracle SQL access to Kafka powered tables, it is persisted in the Oracle Database. So Oracle SQL access to Kafka gives developers complete flexibility about the persistence of the streaming data on Oracle Database. For a good overview and use case for Oracle SQL access to Kafka, refer to this blog post, Integrating Data-in-Motion with Data-at-Rest using Oracle SQL Access to Kafka Views.
In short, Oracle SQL access to Kafka allows data in motion to be processed together with data at rest (inside Oracle Database tables) using Oracle PL/SQL. Hence applications, for example Java Database Connectivity (JDBC) applications, can process real-time events from Kafka and critical Oracle Database data within Oracle Database transactions, which give ACID guarantees. This is not easily possible when an application fetches Kafka events and data from Oracle Database separately.
Benefits
- Customers can use Oracle SQL access to Kafka to run real-time stream analytics jobs using Oracle PL/SQL by directly reading data from the Stream service, without ever moving it to an external data store.
- Customers can also use Oracle SQL access to Kafka purely for data movement from the Stream service to the Oracle Database without any processing.
- The stream processing operation can be performed within the context of an Oracle ACID transaction controlled by the application.
- Oracle SQL access to Kafka acts only as a Kafka consumer and never as a Kafka producer. Oracle SQL access to Kafka (OSaK) handles all offset management and stores this information in metadata tables in Oracle Database. As a result, it enables exactly-once processing semantics: it can commit the partition offsets for Kafka and the Stream service together with the application data in a single ACID-compliant Oracle Database transaction. This eliminates lost or re-read streaming records.
Use Cases
Imagine any use case where you want to relate or join your streaming data with your relational data, for example:
- You want to combine the streaming data from your chatty IoT devices (most probably present on your customer premises) with the relevant customer information stored in your source-of-truth relational Oracle Database.
- You want to calculate precise exponential moving averages for stock prices that are streaming into the Stream service. You need to do this with exactly-once semantics. You want to combine this data with static information about that stock like its name, company-id, market cap, etc., stored in your Oracle Database.
Please note that, because Oracle SQL access to Kafka is a PL/SQL package that must be manually installed on the Oracle Database server host, it works only with self-managed (on-premises or on-cloud) Oracle Database installations. It cannot work with serverless Oracle Database offerings like Oracle Autonomous Database on Oracle Cloud Infrastructure (OCI).
Since the Stream service is API compatible with Kafka, it works seamlessly with Oracle SQL access to Kafka. From the Oracle SQL access to Kafka point of view, a Stream service stream pool is a Kafka cluster and a Stream service stream is a topic in the Kafka cluster.
In this tutorial, we will show how easily we can integrate Oracle SQL access to Kafka with the Stream service.
Note: if you are already familiar with Oracle SQL access to Kafka, the Stream service, and Kafka and want to use Oracle SQL access to Kafka with the Stream service, you may jump directly to Set Up Oracle Cloud Infrastructure Streaming Clusters for Oracle SQL access to Kafka step 2.1. Skim through the rest of the tutorial as needed.
Prerequisites
- OCI account or tenancy with authorization to create and use the Stream service.
- Apache Kafka cluster 0.10.2 or later. The Stream service satisfies this requirement.
- Oracle Database 12.2 or later where Oracle SQL access to Kafka will be installed (version 19c is used in this tutorial). We are using Oracle Linux as the platform OS for the Oracle Database. Instructions by and large remain the same for other platforms.
- Java 8 or later installed on the Oracle Database host.
Oracle Cloud Infrastructure Streaming and Oracle SQL Access to Kafka Integration
Create a Stream Pool and Stream in Oracle Cloud Infrastructure
- Log in to your OCI account/tenancy and set up a Stream service stream pool named StreampoolForOsak and stream named StreamForOsak as follows.
- We now create a stream named StreamForOsak in the streampool StreampoolForOsak we just created.
For these resource creations, you can use any of your existing compartments. For our convenience, we have created a new compartment named OssOsakIntegration and all the resources are in the same compartment.
In the Stream service terminology, Kafka topics are called Streams. So, from the Oracle SQL access to Kafka point of view, stream StreamForOsak is a Kafka topic with three partitions.
We are now done with the Stream service stream creation. Hereafter, the terms Stream service and Kafka are interchangeable. Similarly, the terms Stream and Kafka Topic are interchangeable.
Create a User, Group, and Policies in Oracle Cloud Infrastructure
If you already have an existing user with the right authorizations for using the Stream service, step 2 can be skipped entirely.
- To use the Stream service streams with Oracle SQL access to Kafka, we need to create a new OCI user for it. We create a new user for this purpose with the username OssOsakUser, from the OCI web console as follows:
- For user OssOsakUser to authenticate itself with the Stream service (using Kafka APIs), we need to create an auth-token for this new user, as follows:
After you generate the token, this is your first and only chance to view and copy it. Copy the auth-token and store it securely where you can retrieve it later. You need it in later steps, specifically when we configure the Kafka cluster for Oracle SQL access to Kafka. Oracle SQL access to Kafka will use this username, OssOsakUser, and its auth-token to access the Stream service, using Kafka Consumer APIs internally.
- The OssOsakUser also needs to have the right set of privileges in OCI IAM to access the Stream service cluster.
In OCI, users acquire privileges with the help of policies assigned to user-groups they are part of. Hence, we need to create a group for OssOsakUser, followed by a policy for that group.
Create a user group as shown below:
- Add user OssOsakUser to the user-group OssOsakUserGroup.
- To authorize OssOsakUser to use the Stream service stream, specifically to publish and read messages from it, we need to create the following policy in the tenancy. Once created, this policy grants privileges to all users in group OssOsakUserGroup. As user OssOsakUser is in that group, it acquires the same privileges.
The policy statements in text form are as follows:
Allow group OssOsakUserGroup to manage streams in compartment OssOsakIntegration
Allow group OssOsakUserGroup to manage stream-push in compartment OssOsakIntegration
Allow group OssOsakUserGroup to manage stream-pull in compartment OssOsakIntegration
Allow group OssOsakUserGroup to manage stream-family in compartment OssOsakIntegration
Install Oracle SQL Access to Kafka on an Oracle Database Host
1. The Oracle SQL access to Kafka kit is available as part of the widely used Oracle SQL Developer.
Use the official SQL Developer link to download the latest version of SQL Developer. As of the writing of this tutorial, the latest SQL Developer version is 20.4.
Make sure to download Oracle SQL Developer for the same platform as your Oracle Database host. On our dev-box, we download Oracle SQL Developer for Oracle Linux RPM (as our Oracle Database is running on Oracle Linux Platform).
After the download is complete, extract the contents of the RPM/zip file of the Oracle SQL Developer to any directory/folder using the `unzip` or `tar` command.
tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm
2. Go to the directory where Oracle SQL Developer contents were extracted and find the Oracle SQL access to Kafka ZIP file named `orakafka.zip` as follows:
$ find . -name 'orakafka*'
./sqldeveloper/orakafka
./sqldeveloper/orakafka/orakafka.zip
The Oracle SQL access to Kafka kit is in the `orakafka.zip` file. We are not interested in the rest of the contents of the SQL developer for this demo. As far as installation and usage of Oracle SQL access to Kafka are concerned, `orakafka.zip` is all we need.
3. Copy the `orakafka.zip` file to the Oracle Database host using the `scp` command or GUI-based FTP clients like FileZilla.
SSH into the Oracle Database node and move `orakafka.zip` to `/home/oracle` (home directory of the oracle user) with the `mv` command.
4. For the rest of the instructions for installing Oracle SQL access to Kafka, we need to switch to the oracle user on the Oracle Database host.
Make sure your current working directory is `/home/oracle`. We have `orakafka.zip` already in the same directory.
[opc@dbass ~]$ sudo su - oracle
Last login: Sat Feb 20 09:31:12 UTC 2021
[oracle@dbass ~]$ pwd
/home/oracle
[oracle@dbass ~]$ ls -al
total 3968
drwx------ 6 oracle oinstall 4096 Feb 19 17:39 .
drwxr-xr-x 5 root root 4096 Feb 18 15:15 ..
-rw------- 1 oracle oinstall 4397 Feb 20 09:31 .bash_history
-rw-r--r-- 1 oracle oinstall 18 Nov 22 2019 .bash_logout
-rw-r--r-- 1 oracle oinstall 203 Feb 18 15:15 .bash_profile
-rw-r--r-- 1 oracle oinstall 545 Feb 18 15:20 .bashrc
-rw-r--r-- 1 oracle oinstall 172 Apr 1 2020 .kshrc
drwxr----- 3 oracle oinstall 4096 Feb 19 17:37 .pki
drwxr-xr-x 2 oracle oinstall 4096 Feb 18 15:20 .ssh
-rw-r--r-- 1 oracle oinstall 4002875 Feb 19 17:38 orakafka.zip
5. Extract or unzip `orakafka.zip`. This will create a new directory named `orakafka-<version>` with the extracted contents. In our case, it is `orakafka-1.2.0` as follows:
[oracle@dbass ~]$ unzip orakafka.zip
Archive: orakafka.zip
creating: orakafka-1.2.0/
extracting: orakafka-1.2.0/kit_version.txt
inflating: orakafka-1.2.0/orakafka_distro_install.sh
extracting: orakafka-1.2.0/orakafka.zip
inflating: orakafka-1.2.0/README
6. Now we follow the instructions found in the _**orakafka-1.2.0/README**_ for the setup of Oracle SQL access to Kafka. We follow _simple install_ for single-instance Oracle Database.
This README doc has instructions for Oracle SQL access to Kafka installation on Oracle Real Application Clusters (Oracle RAC) as well. By and large, in the case of Oracle RAC, we need to replicate the following steps on all nodes of Oracle RAC. Please follow the README for details.
[oracle@dbass ~]$ cd orakafka-1.2.0/
[oracle@dbass orakafka-1.2.0]$ ls -al
total 3944
drwxrwxr-x 2 oracle oinstall 4096 Feb 20 09:12 .
drwx------ 6 oracle oinstall 4096 Feb 19 17:39 ..
-rw-r--r-- 1 oracle oinstall 6771 Oct 16 03:11 README
-rw-r--r-- 1 oracle oinstall 5 Oct 16 03:11 kit_version.txt
-rw-rw-r-- 1 oracle oinstall 3996158 Oct 16 03:11 orakafka.zip
-rwxr-xr-x 1 oracle oinstall 17599 Oct 16 03:11 orakafka_distro_install.sh
7. As per _./orakafka-1.2.0/README_, we install Oracle SQL access to Kafka on the Oracle Database host with the help of _./orakafka-1.2.0/orakafka\_distro\_install.sh_ script. Argument -p lets us specify the location or base directory for the Oracle SQL access to Kafka installation on this host.
We choose the newly created empty directory named ora\_kafka\_home as the OSaK base directory on this host. So the full path of the OSaK base directory will be _/home/oracle/ora\_kafka\_home_.
[oracle@dbass ~]$ ./orakafka-1.2.0/orakafka_distro_install.sh -p ./ora_kafka_home/
Step Create Product Home::
--------------------------------------------------------------
...../home/oracle/ora_kafka_home already exists..
Step Create Product Home: completed.
PRODUCT_HOME=/home/oracle/ora_kafka_home
Step Create app_data home::
--------------------------------------------------------------
..... creating /home/oracle/ora_kafka_home/app_data and subdirectories
......Generated CONF_KIT_HOME_SCRIPT=/home/oracle/ora_kafka_home/app_data/scripts/configure_kit_home.sh
......Generated CONF_APP_DATA_HOME_SCRIPT=/home/oracle/ora_kafka_home/configure_app_data_home.sh
Step Create app_data home: completed.
APP_DATA_HOME=/home/oracle/ora_kafka_home/app_data
Step unzip_kit::
--------------------------------------------------------------
.....checking for existing binaries in /home/oracle/ora_kafka_home/orakafka
.....unzip kit into /home/oracle/ora_kafka_home/orakafka
Archive: /home/oracle/orakafka-1.2.0/orakafka.zip
creating: /home/oracle/ora_kafka_home/orakafka/
extracting: /home/oracle/ora_kafka_home/orakafka/kit_version.txt
inflating: /home/oracle/ora_kafka_home/orakafka/README
creating: /home/oracle/ora_kafka_home/orakafka/doc/
inflating: /home/oracle/ora_kafka_home/orakafka/doc/README_INSTALL
creating: /home/oracle/ora_kafka_home/orakafka/jlib/
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/osakafka.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/kafka-clients-2.5.0.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-simple-1.7.28.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/lz4-java-no-jni-1.7.1.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/snappy-java-no-jni-1.1.7.3.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/zstd-no-jni-1.4.4-7.jar
inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-api-1.7.30.jar
creating: /home/oracle/ora_kafka_home/orakafka/bin/
inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka_stream.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka.sh
creating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/removeuser_cluster.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/setup_all.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/remove_cluster.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/verify_install.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/add_cluster.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/config_util.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_views.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/install.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/set_java_home.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/list_clusters.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/adduser_cluster.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/uninstall.sh
inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_cluster.sh
creating: /home/oracle/ora_kafka_home/orakafka/conf/
inflating: /home/oracle/ora_kafka_home/orakafka/conf/orakafka.properties.template
creating: /home/oracle/ora_kafka_home/orakafka/sql/
inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkatab.plb
inflating: /home/oracle/ora_kafka_home/orakafka/sql/catnoorakafka.sql
inflating: /home/oracle/ora_kafka_home/orakafka/sql/catorakafka.sql
inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaus.plb
inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_uninstall.sql
inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkab.plb
inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaub.plb
inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_install.sql
inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkas.sql
creating: /home/oracle/ora_kafka_home/orakafka/lib/
inflating: /home/oracle/ora_kafka_home/orakafka/lib/libsnappyjava.so
inflating: /home/oracle/ora_kafka_home/orakafka/lib/libzstd-jni.so
inflating: /home/oracle/ora_kafka_home/orakafka/lib/liblz4-java.so
Step unzip_kit: completed.
Successfully installed orakafka kit in /home/oracle/ora_kafka_home
8. Configure JAVA\_HOME for Oracle SQL access to Kafka. We find the Java path on the node as follows:
[oracle@dbass ~]$ java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home'
java.home = /usr/java/jre1.8.0_271-amd64
[oracle@dbass ~]$ export JAVA_HOME=/usr/java/jre1.8.0_271-amd64
To set Java home for OSaK, we use the _/home/oracle/ora\_kafka\_home/orakafka/bin/orakafka.sh_ script with the _set\_java\_home_ option.
[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh set_java_home -p $JAVA_HOME
Step1: Check for valid JAVA_HOME
--------------------------------------------------------------
Found /usr/java/jre1.8.0_271-amd64/bin/java, JAVA_HOME path is valid.
Step1 succeeded.
Step2: JAVA version check
--------------------------------------------------------------
java version "1.8.0_271"
Java(TM) SE Runtime Environment (build 1.8.0_271-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.271-b09, mixed mode)
Java version >= 1.8
Step2 succeeded.
Step3: Creating configure_java.sh script
--------------------------------------------------------------
Wrote to /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
Step3 succeeded.
Successfully configured JAVA_HOME in /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/set_java_home.log.2021.02.20-04.38.23
[oracle@dbass bin]$
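The `java.home` lookup in step 8 can be wrapped in a small helper so that the value does not have to be copied by hand. The following is a minimal sketch, not part of the OSaK kit: the function name `extract_java_home` is our own, and it assumes the settings output contains a line of the form `java.home = <path>`, as in the output shown above.

```shell
#!/bin/sh
# Sketch: pull the java.home value out of `java -XshowSettings:properties` output.
# extract_java_home is a hypothetical helper name, not an OSaK command.
extract_java_home() {
    # Reads the settings text on stdin and prints whatever follows
    # "java.home = " on the first matching line.
    sed -n 's/^[[:space:]]*java\.home[[:space:]]*=[[:space:]]*//p' | head -n 1
}

# Usage on the database host (java prints the settings on stderr,
# hence the redirection order):
#   JAVA_HOME=$(java -XshowSettings:properties -version 2>&1 >/dev/null | extract_java_home)
#   export JAVA_HOME
```

The exported value can then be passed to `./orakafka.sh set_java_home -p $JAVA_HOME` exactly as in the step above.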
9. Verify installation of OSaK with _verify\_install_ option of script _orakafka.sh_ as follows:
[oracle@dbass ~]$ cd ora_kafka_home/
[oracle@dbass ora_kafka_home]$ cd orakafka/bin/
[oracle@dbass bin]$ ./orakafka.sh verify_install
Check all files/dirs ownership - passed
Check directory privileges - passed
Check expected executable files - passed
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/verify_install.log.2021.02.19-18.10.15
### Set Up Oracle Cloud Infrastructure Stream Clusters for Oracle SQL Access to Kafka
1. Under _ora\_kafka\_home_, that is _/home/oracle/ora\_kafka\_home_, we have two more READMEs as follows:
[oracle@dbass ~]$ find ora_kafka_home/ -name "README*"
ora_kafka_home/orakafka/README
ora_kafka_home/orakafka/doc/README_INSTALL
_~/ora\_kafka\_home/orakafka/README_ is the README for the release of OSaK that we have installed. Please read through it.
_ora\_kafka\_home/orakafka/doc/README\_INSTALL_ is the README for the actual setup of a Kafka cluster for Oracle SQL access to Kafka. The steps from step 2 onwards largely follow this README. Accordingly, in the next steps we use the _~/ora\_kafka\_home/orakafka/bin/orakafka.sh_ script to add a Kafka cluster and to add an Oracle Database user for that cluster.
2. Add our Stream service stream pool to Oracle SQL access to Kafka.
As mentioned earlier, from the Oracle SQL access to Kafka point of view, the Stream service stream pool is a Kafka cluster.
We use the _add\_cluster_ option of the _orakafka.sh_ script to add the cluster to OSaK. We use the _\-c_ argument to name the cluster as _kc1_.
[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh add_cluster -c kc1
Step1: Creation of filesystem cluster config directory
--------------------------------------------------------------
Filesystem cluster directory creation completed.
Configure security properties at /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties.
Step1 succeeded.
Step2: Generate DDL for creation of cluster config DB directory
--------------------------------------------------------------
Execute the following SQL script while connected as sysdba
to create cluster config database directory:
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql
Step2 successfully generated script.
****SUMMARY*****
TODO tasks:
- Configure security properties at /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties
- Execute the following SQL while connected as sysdba:
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/add_cluster.log.2021.02.20-05.23.30
[oracle@dbass bin]$
We get two TODO tasks as per output from the above execution of the _add\_cluster_ command.
1. For the first task, we add security properties for our Stream service stream to `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties`.
Where do we get the Kafka-compatible security configs for the Stream service cluster, that is, the streampool? We get them from the OCI web console as shown below.
Please also take note of the bootstrap server endpoint (_cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092_ from the screenshot below). It is not related to the security configs of the cluster, but we need it in later steps for connecting to our Stream service and Kafka cluster.

We write the above config values, in the following format, to the `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties` file using any text editor, for example vi.

For clarity, we have the same configs in text format as follows:
```
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>" password="YOUR_AUTH_TOKEN";
sasl.plain.username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>"
sasl.plain.password ="YOUR_AUTH_TOKEN";
```
**Note:** default Kafka Consumer Configs are already pre-populated in _orakafka.properties_ for this cluster. If there is a need, one can modify these configs.
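Because the SASL username is assembled from three separate values, it is easy to mistype. The sketch below generates the three standard Kafka client security properties from its arguments. The function name `print_oss_security_props` and the sample argument values are our own assumptions and not part of OSaK. The two `sasl.plain.*` lines from the listing above are left out and can be added by hand in the same way.

```shell
#!/bin/sh
# Sketch: print Kafka client security properties for an OCI stream pool.
# print_oss_security_props is a hypothetical helper, not an OSaK command.
# Arguments: tenancy name, OCI user name, stream pool OCID, auth token.
print_oss_security_props() {
    tenancy=$1
    user=$2
    pool_ocid=$3
    token=$4
    # The SASL username has the form <tenancy>/<user>/<streampool OCID>.
    sasl_user="${tenancy}/${user}/${pool_ocid}"
    printf 'security.protocol=SASL_SSL\n'
    printf 'sasl.mechanism=PLAIN\n'
    printf 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";\n' \
        "$sasl_user" "$token"
}

# Usage (placeholder values; review the output before appending it to
# /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties):
#   print_oss_security_props "<YOUR_TENANCY_NAME>" OssOsakUser "<YOUR_OCI_STREAMPOOL_OCID>" "$AUTH_TOKEN"
```

Reading the token from an environment variable, as in the usage comment, keeps it out of the shell history.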
2. For the second TODO task, run the generated SQL script as follows:
```
[oracle@dbass ~]$ sqlplus / as sysdba
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 05:37:28 2021
Version 19.9.0.0.0
Copyright (c) 1982, 2020, Oracle. All rights reserved.
Connected to:
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
SQL> alter session set container=p0;
Session altered.
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql
Creating database directory "KC1_CONF_DIR"..
Directory created.
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_create_KC1_CONF_DIR.log
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
```
Take note of directory object _KC1\_CONF\_DIR_ created by the above SQL script.
**Note:** As dictated in add\_cluster output, we run the SQL script as a _SYSDBA_ user, inside the PDB of our interest (_p0_ PDB here).
3. To make sure that the cluster configuration is working, we can leverage the _test\_cluster_ option of script _orakafka.sh_.
We pass the cluster name with the -c argument and the bootstrap server with the -b argument. We have the bootstrap server information from step 4.2.1.
```
[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh test_cluster -c kc1 -b cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
Kafka cluster configuration test succeeded.
Kafka cluster configuration test - "KC1"
------------------------------------------------------
KAFKA_BOOTSTRAP_SERVERS=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
LOCATION_FILE="/home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/orakafka_stream_KC1.loc"
ora_kafka_operation=metadata
kafka_bootstrap_servers=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
List topics output:
StreamForOsak,3
...
For full log, please look at /home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/test_KC1.log.2021.02.20-09.01.12
Test of Kafka cluster configuration for "KC1" completed.
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/test_cluster.log.2021.02.20-09.01.12
[oracle@dbass bin]$
```
As you can see, our Kafka cluster configuration test succeeded. The output lists the topics, that is, the Stream service streams, and each topic name is followed by the number of partitions for that stream. Here the number of partitions is three, as the Stream service stream StreamForOsak has exactly three partitions.
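If you want to check the topic list from a script rather than by eye, the `name,partitions` lines can be parsed with a few lines of shell. This is only a sketch: the helper name `topic_partitions` is our own, and it assumes the topic lines have the `StreamForOsak,3` shape shown in the output above.

```shell
#!/bin/sh
# Sketch: look up a topic's partition count in "name,partitions" lines.
# topic_partitions is a hypothetical helper name, not an OSaK command.
topic_partitions() {
    # $1 = topic name; the text to search arrives on stdin.
    # Prints the partition count, or returns non-zero if the topic is absent.
    awk -F, -v topic="$1" '
        { sub(/^[ \t]+/, "", $1) }                 # tolerate leading indentation
        $1 == topic { print $2; found = 1; exit }
        END { if (!found) exit 1 }
    '
}

# Usage:
#   ./orakafka.sh test_cluster -c kc1 -b <bootstrap-server> | topic_partitions StreamForOsak
```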
3. Configure an Oracle Database user for the added Kafka cluster.
We have already created a pluggable database (PDB) level Oracle Database user named books\_admin for the PDB named _p0_. We use the _adduser\_cluster_ option to grant the required permissions for user books\_admin on Kafka cluster _kc1_.
[oracle@dbass bin]$ ./orakafka.sh adduser_cluster -c kc1 -u books_admin
Step1: Generate DDL to grant permissions on cluster configuration directory to "BOOKS_ADMIN"
--------------------------------------------------------------------------------------------
Execute the following script while connected as sysdba
to grant permissions on cluster conf directory :
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql
Step1 successfully generated script.
***********SUMMARY************
TODO tasks:
1. Execute the following SQL while connected as sysdba:
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/adduser_cluster.log.2021.02.20-10.45.57
[oracle@dbass bin]$
Similar to the previous step, we have one TODO task to perform.
As per the output, we have an auto-generated SQL script. We need to run this script (as sysdba), to give the Oracle Database user books_admin permissions over the cluster configuration directory for cluster kc1.
This directory in our case is /home/oracle/ora_kafka_home/app_data/clusters/KC1.
[oracle@dbass ~]$ sqlplus / as sysdba
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 10:56:17 2021
Version 19.9.0.0.0
Copyright (c) 1982, 2020, Oracle. All rights reserved.
Connected to:
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
SQL> alter session set container=p0;
Session altered.
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql
PL/SQL procedure successfully completed.
Granting permissions on "KC1_CONF_DIR" to "BOOKS_ADMIN"
Grant succeeded.
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_adduser_cluster_KC1_user1.log
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
[oracle@dbass ~]$
- We now leverage the install option of orakafka.sh. With the -r argument, we pass the parent directory for this user's data related to its OSaK activities on the Kafka clusters it is added to.
This command auto-generates two Data Definition Language (DDL) scripts for us as follows:
[oracle@dbass bin]$ ./orakafka.sh install -u books_admin -r /home/oracle/ora_kafka_home/books_admin_user_data_dir
Step1: Creation of filesystem location and default directories
--------------------------------------------------------------
Created filesystem location directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir
Created filesystem default directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir
Step1 succeeded.
Step2: Generate DDL for creation of DB location and default directories
--------------------------------------------------------------
Execute the following SQL script while connected as sysdba
to setup database directories:
@/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql
On failure, to cleanup location and default directory setup, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"
Step2 successfully generated script.
Step3: Install ORA_KAFKA package in "BOOKS_ADMIN" user schema
--------------------------------------------------------------
Execute the following script in user schema "BOOKS_ADMIN" to install ORA_KAFKA package in the user schema
@/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql
On failure, to cleanup ORA_KAFKA package from user schema, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"
Step3 successfully generated script.
***********SUMMARY************
TODO tasks:
1. Execute the following SQL while connected as sysdba:
@/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql
2. Execute the following SQL in user schema:
@/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install.log.2021.02.20-11.33.09
[oracle@dbass bin]$
- The first task is to run an SQL script to register the two directories, namely /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir and /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir, with Oracle Database as directory objects. As prescribed in the output, we need to run it with SYSDBA privileges, as follows:
[oracle@dbass ~]$ sqlplus / as sysdba
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:46:04 2021
Version 19.9.0.0.0
Copyright (c) 1982, 2020, Oracle. All rights reserved.
Connected to:
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
SQL> alter session set container=p0;
Session altered.
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql
Checking if user exists..
PL/SQL procedure successfully completed.
Creating location and default directories..
PL/SQL procedure successfully completed.
Directory created.
Directory created.
Grant succeeded.
Grant succeeded.
Creation of location dir "BOOKS_ADMIN_KAFKA_LOC_DIR" and default dir "BOOKS_ADMIN_KAFKA_DEF_DIR" completed.
Grant of required permissions on "BOOKS_ADMIN_KAFKA_LOC_DIR","BOOKS_ADMIN_KAFKA_DEF_DIR" to "BOOKS_ADMIN" completed.
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/setup_db_dirs_user1.log
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
[oracle@dbass ~]$
Take note of directory objects created by the above SQL scripts namely BOOKS_ADMIN_KAFKA_LOC_DIR and BOOKS_ADMIN_KAFKA_DEF_DIR.
Now let us get to the second TODO task.
- Here we run another SQL script as books_admin (and not sysdba) on p0 PDB. This script will install the Oracle SQL access to Kafka package and objects in the schema belonging to books_admin.
[oracle@dbass ~]$ sqlplus books_admin@p0pdb #p0pdb is tns entry for p0 pdb
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:52:33 2021
Version 19.9.0.0.0
Copyright (c) 1982, 2020, Oracle. All rights reserved.
Enter password:
Last Successful login time: Fri Feb 19 2021 14:04:59 +00:00
Connected to:
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
SQL> ALTER SESSION SET CONTAINER=p0;
Session altered.
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql
Verifying user schema..
PL/SQL procedure successfully completed.
Verifying that location and default directories are accessible..
PL/SQL procedure successfully completed.
Installing ORA_KAFKA package in user schema..
.. Creating ORA_KAFKA artifacts
Table created.
Table created.
Table created.
Table created.
Package created.
No errors.
Package created.
No errors.
Package body created.
No errors.
Package body created.
No errors.
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install_orakafka_user1.log
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
Version 19.9.0.0.0
[oracle@dbass ~]$
Register the Stream Service Cluster with Oracle Database and Create Views for the Stream
In this step we register the cluster kc1 with Oracle Database and create views for the stream StreamForOsak.
All the SQL queries for this step are to be run as user books_admin and on p0 PDB.
- Register the cluster with Oracle SQL access to Kafka procedure ORA_KAFKA.REGISTER_CLUSTER.
BEGIN
ORA_KAFKA.REGISTER_CLUSTER
('kc1', -- cluster name
'cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092', -- fill up your bootstrap server here
'BOOKS_ADMIN_KAFKA_DEF_DIR', -- default directory for external table created in previous steps
'BOOKS_ADMIN_KAFKA_LOC_DIR', -- this directory object too is created in previous steps
'KC1_CONF_DIR', --config dir for cluster
'Registering kc1 for this session'); --description
END;
/
- Create an Oracle Database table named BOOKS, which needs to have the same schema as messages in our Kafka Topic StreamForOsak.
CREATE TABLE BOOKS ( -- empty reference table; its columns must correspond to the fields of the Kafka messages
id int,
title varchar2(50),
author_name varchar2(50),
author_email varchar2(50),
publisher varchar2(50)
);
- Create a view for our topic StreamForOsak with the help of the stored procedure CREATE_VIEWS from the Oracle SQL access to Kafka package, as follows.
DECLARE
application_id VARCHAR2(128);
views_created INTEGER;
BEGIN
ORA_KAFKA.CREATE_VIEWS
('kc1', -- cluster name
'OsakApp0', -- consumer group name that OSS or Kafka cluster sees.
'StreamForOsak', -- Kafka topic aka OSS stream name
'CSV', -- format type
'BOOKS', -- database reference table
views_created, -- output
application_id); --output
dbms_output.put_line('views created = ' || views_created);
dbms_output.put_line('application id = ' || application_id);
END;
/
Oracle SQL access to Kafka supports two formats for Kafka messages, CSV and JSON. Here we are using CSV. So an example Kafka message complying with BOOKS table schema can be 101, Famous Book, John Smith, john@smith.com, First Software.
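To make the correspondence between a CSV message and the BOOKS reference table concrete, here is a minimal Python sketch that checks a message against that schema before it is produced. It is not part of Oracle SQL access to Kafka. The function name parse_book_message is hypothetical, trimming the space after each comma is an assumption of this sketch, and the 50-character check only approximates the varchar2(50) limit, which the database may count in bytes.

```python
import csv

# Column names of the BOOKS reference table, in declaration order.
BOOKS_COLUMNS = ["id", "title", "author_name", "author_email", "publisher"]
MAX_TEXT_LENGTH = 50  # every text column is declared as varchar2(50)


def parse_book_message(message: str) -> dict:
    """Split one CSV message into BOOKS columns; raise ValueError if it does not fit."""
    # skipinitialspace drops the blank after each comma (an assumption of this sketch).
    fields = next(csv.reader([message], skipinitialspace=True))
    if len(fields) != len(BOOKS_COLUMNS):
        raise ValueError(f"expected {len(BOOKS_COLUMNS)} fields, got {len(fields)}")
    row = dict(zip(BOOKS_COLUMNS, fields))
    row["id"] = int(row["id"])  # raises ValueError for a non-numeric id
    for column in BOOKS_COLUMNS[1:]:
        if len(row[column]) > MAX_TEXT_LENGTH:
            raise ValueError(f"{column} exceeds {MAX_TEXT_LENGTH} characters")
    return row


print(parse_book_message("101, Famous Book, John Smith, john@smith.com, First Software"))
```

A check like this on the producer side can catch messages with a missing field or a non-numeric id before they reach the stream.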
tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm
- The above step creates a view for each partition of the topic. Since topic StreamForOsak has three partitions, we will have three views. The name of each view is in the format KV_<CLUSTER_NAME>_<GROUP_NAME>_<TOPIC_NAME>_<PARTITION_NUMBER>.
Here we have three views as shown below, one for each partition.
So view KV_KC1_OSAKAPP0_STREAMFOROSAK_0 is mapped to partition 0, view KV_KC1_OSAKAPP0_STREAMFOROSAK_1 is mapped to partition 1, and view KV_KC1_OSAKAPP0_STREAMFOROSAK_2 is mapped to partition 2 of the topic StreamForOsak.
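The naming pattern can be sketched as a small Python helper that lists the view names to expect for a given cluster, consumer group, and topic. The helper view_names is hypothetical. Upper-casing the names and numbering partitions from 0 are assumptions taken from the view names seen in this tutorial, not from the package documentation.

```python
def view_names(cluster: str, group: str, topic: str, partitions: int) -> list:
    """Return the expected view name for each partition, numbered from 0."""
    # Upper-casing mirrors the view names shown in this tutorial (an assumption).
    prefix = f"KV_{cluster}_{group}_{topic}".upper()
    return [f"{prefix}_{partition}" for partition in range(partitions)]


for name in view_names("kc1", "OsakApp0", "StreamForOsak", 3):
    print(name)
```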
Produce Data to the Stream
- Produce a test message to stream StreamForOsak, using the Produce Test Message button from the Oracle Cloud Infrastructure (OCI) web console as shown below.
When we click Produce Test Message, a window with a field to enter our test message displays, as shown below.
We enter the below message:
200, Andrew Miller Part 1, MS Brown, mb@example.com, First Software
Needless to say, we could have also used standard Kafka Producer API in Java/Python or OCI SDK for the Stream service, for producing the message to the Stream service stream.
- After we click Produce on the above window, we publish the message to stream StreamForOsak. We can use the Load Message utility to see the partition where our message landed.
As shown above, our message landed in partition 2 of the topic StreamForOsak.
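For producing messages programmatically instead of through the console, the following is a rough Python sketch using the third-party kafka-python client. It rests on several assumptions: the helper names are hypothetical, the SASL_SSL/PLAIN settings and the tenancy/user/stream pool OCID username format follow the usual Kafka compatibility setup for the Stream service, and every credential is a placeholder that you supply. Check the connection settings of your own stream pool before relying on it.

```python
def oci_kafka_config(bootstrap_server, tenancy, user, stream_pool_ocid, auth_token):
    """Build keyword arguments for kafka-python's KafkaProducer (assumed settings)."""
    return {
        "bootstrap_servers": bootstrap_server,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # Assumed username format: <tenancy>/<user>/<stream pool OCID>
        "sasl_plain_username": f"{tenancy}/{user}/{stream_pool_ocid}",
        "sasl_plain_password": auth_token,
    }


def format_book_message(book_id, title, author_name, author_email, publisher):
    """Join the BOOKS fields into the CSV layout used in this tutorial."""
    return ", ".join([str(book_id), title, author_name, author_email, publisher])


def produce_book(config, topic, message):
    """Send one message and return the (partition, offset) it was written to."""
    from kafka import KafkaProducer  # third-party package: kafka-python

    producer = KafkaProducer(**config)
    try:
        metadata = producer.send(topic, value=message.encode("utf-8")).get(timeout=30)
        return metadata.partition, metadata.offset
    finally:
        producer.close()


# Example call (needs real credentials and network access, so it is left commented out):
# config = oci_kafka_config("cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092",
#                           "my_tenancy", "my_user", "my_stream_pool_ocid", "my_auth_token")
# print(produce_book(config, "StreamForOsak",
#                    format_book_message(200, "Andrew Miller Part 1", "MS Brown",
#                                        "mb@example.com", "First Software")))
```

The partition returned by produce_book tells you which of the three views will show the message.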
Retrieve the Stream Service Messages Using the Database Views Set Up by Oracle SQL access to Kafka Stored Procedures
We can run a simple SQL query:
SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2;
Or, as shown below, we can simply open the view in SQL Developer to see the data in view KV_KC1_OSAKAPP0_STREAMFOROSAK_2:
We chose view KV_KC1_OSAKAPP0_STREAMFOROSAK_2, as it corresponds to partition 2 of the stream StreamForOsak. So each time we run the SELECT SQL query for this view, it goes to the corresponding partition of its associated topic, and it fetches new uncommitted messages (for the consumer group OsakApp0), as rows in the view. Each message gets its own row in the view. Please note, data of these views is not persisted.
Curious eyes can also observe that Oracle SQL access to Kafka fetches and stores metadata such as the offset of the message, its partition, and its timestamp in additional columns.
Most probably, you want to query this view again and again, and for each run of the query, you want the view to move sequentially in the stream. You can achieve this easily with the following canonical code snippet:
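LOOP
  ORA_KAFKA.NEXT_OFFSET('KV_KC1_OSAKAPP0_STREAMFOROSAK_2'); -- bind the view to the next set of offsets
  SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2; -- read the new rows
  ORA_KAFKA.UPDATE_OFFSET('KV_KC1_OSAKAPP0_STREAMFOROSAK_2'); -- advance the offsets past the rows read
  COMMIT;
END LOOP;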
As noted earlier, when we use Oracle SQL access to Kafka, stream offsets are managed by Oracle Database and not the Stream service. They live in system tables that track the offset positions for all partitions accessed by the KV_KC1_OSAKAPP0_STREAMFOROSAK_2 view.
The NEXT_OFFSET call simply binds the KV_KC1_OSAKAPP0_STREAMFOROSAK_2 view to a new set of offsets that represent new data living in the Oracle Cloud Infrastructure Streaming partition accessed by the view. The UPDATE_OFFSET communicates how many rows were read for each partition and advances the offsets to new positions.
The COMMIT guarantees that this unit of work is ACID compliant.
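From a client application, one pass of that cycle could look like the Python sketch below. It assumes a DB-API style connection, such as one returned by the python-oracledb driver's connect call, opened as books_admin on the p0 PDB. The function name poll_view_once is hypothetical, and rolling back on failure is a design choice of this sketch so that offsets only advance together with a successful read.

```python
import re

# Accept only simple identifiers, because the view name is placed into the SQL text.
_IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*$")


def poll_view_once(connection, view_name):
    """Run one NEXT_OFFSET / SELECT / UPDATE_OFFSET / COMMIT cycle and return the rows."""
    if not _IDENTIFIER.match(view_name):
        raise ValueError(f"not a simple identifier: {view_name!r}")
    cursor = connection.cursor()
    try:
        cursor.callproc("ORA_KAFKA.NEXT_OFFSET", [view_name])
        cursor.execute(f"SELECT * FROM {view_name}")
        rows = cursor.fetchall()
        cursor.callproc("ORA_KAFKA.UPDATE_OFFSET", [view_name])
        connection.commit()  # the read and the offset advance commit as one unit
        return rows
    except Exception:
        connection.rollback()  # leave the offsets where they were
        raise
    finally:
        cursor.close()
```

Calling poll_view_once repeatedly, for example once per polling interval, moves through the partition sequentially, with each call returning only the rows that arrived since the previous committed call.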
Conclusion
In this tutorial, we have focused on how we can install Oracle SQL access to Kafka and use it with the Stream service. Since the Stream service is Kafka compatible, it is just like a Kafka cluster for Oracle SQL access to Kafka.
Please note, here we have barely scratched the surface as far as functionality offered by Oracle SQL access to Kafka is concerned. Oracle SQL access to Kafka is highly configurable. We can even store the streaming data in Oracle Database tables, which are persisted to the disk, unlike the views. Please refer to README files provided in the Oracle SQL access to Kafka installation and the below references for more details.
Related Links
- Oracle PL/SQL developer documentation for Oracle SQL access to Kafka
- Oracle blog on Oracle SQL access to Kafka
- Oracle SQL access to Kafka within Oracle Big Data SQL
- Oracle Cloud Infrastructure Streaming
- Stream service Kafka API compatibility
Acknowledgements
- Author - Mayur Raleraskar, Solutions Architect
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Stream analytics using Oracle Cloud Infrastructure Streaming and Oracle Database
F41231-02
November 2021
Copyright © 2021, Oracle and/or its affiliates.