Nota:

Análisis de flujos con Oracle Cloud Infrastructure Streaming y Oracle Database

Introducción

Oracle Cloud Infrastructure Streaming es un servicio de transmisión altamente escalable y de alta disponibilidad en Oracle Cloud Infrastructure (OCI). El servicio Stream no tiene ningún servidor y es compatible con las API de Apache Kafka.

El acceso SQL de Oracle a Kafka es un paquete Oracle PL/SQL y un preprocesador de tabla externa. Permite a Oracle Database leer y procesar eventos y mensajes de temas de Kafka como vistas o tablas en Oracle Database. Una vez que los datos de Kafka están en una tabla de Oracle Database o son visibles desde una vista de Oracle Database, se pueden consultar con toda la potencia de Oracle PL/SQL, al igual que cualquier otro dato de Oracle Database.

Al recuperar datos con acceso SQL de Oracle a las vistas activadas para Kafka, los datos no se conservan en Oracle Database. Sin embargo, cuando utiliza el acceso SQL de Oracle a las tablas con tecnología Kafka, se mantiene en Oracle Database. Por lo tanto, el acceso SQL de Oracle a Kafka ofrece a los desarrolladores flexibilidad completa sobre la persistencia de los datos de transmisión en Oracle Database. Para obtener una buena visión general y un caso de uso para el acceso SQL de Oracle a Kafka, consulte esta publicación de blog Integración de datos en movimiento con datos en curso mediante Oracle SQL Access to Kafka Views.

En resumen, el acceso SQL de Oracle a Kafka permite que los datos en movimiento se procesen con datos estáticos (dentro de las tablas de Oracle Database) mediante PL/SQL de Oracle. Por lo tanto, las aplicaciones, por ejemplo, Java Database Connectivity (JDBC), pueden procesar eventos en tiempo real desde Kafka y datos críticos de Oracle Database, en transacciones de Oracle Database con garantías de ACID. Esto no es posible fácilmente cuando una aplicación recupera eventos y datos de Kafka de Oracle Database por separado.

Ventajas

  1. Los clientes pueden utilizar el acceso SQL de Oracle a Kafka para ejecutar trabajos de análisis de flujos en tiempo real con Oracle PL/SQL mediante la lectura directa de los datos del servicio Stream, sin cambiarlos a un almacén de datos externo.
  2. Los clientes también pueden utilizar el acceso SQL de Oracle a Kafka exclusivamente para mover datos del servicio de flujo a Oracle Database sin ningún procesamiento.
  3. La operación de procesamiento de flujo se puede realizar en el contexto de una transacción ACID de Oracle controlada por la aplicación.
  4. El acceso SQL de Oracle a Kafka solo actúa como consumidor de Kafka y nunca como productor de Kafka. La gestión de compensación completa la gestiona OSaK. Almacena esta información en tablas de metadatos en Oracle Database. Por lo tanto, el acceso SQL de Oracle a Kafka permite semántica de procesamiento exactamente una vez, ya que puede confirmar el desfase de partición para Kafka y el servicio Stream y datos de aplicación, en una única transacción de Oracle Database compatible con ACID a Oracle Database. Esto elimina la pérdida o relectura de registros de transmisión.

Casos de uso

Imagine cualquier caso de uso en el que desee relacionar o unir los datos de transmisión con sus datos relacionales, por ejemplo:

  1. Desea combinar los datos de transmisión de sus dispositivos IoT de chat (probablemente presentes en las instalaciones del cliente) con la información de cliente relevante, almacenada en su Oracle Database relacional de verdad de origen.
  2. Desea calcular medias móviles exponenciales precisas para los precios de las acciones que están transmitiendo al servicio de flujo. Debe hacerlo con una semántica exactamente una vez. Desea combinar estos datos con información estática sobre ese stock como su nombre, ID de compañía, límite de mercado, etc., almacenado en Oracle Database.

Tenga en cuenta que, dado que el acceso SQL de Oracle a Kafka es un paquete PL/SQL que se debe instalar manualmente en el host del servidor de Oracle, solo puede trabajar con instalaciones de Oracle Database autogestionadas (locales o en la nube). No puede trabajar con ofertas sin servidor de Oracle Database, como Oracle Autonomous Database en Oracle Cloud Infrastructure (OCI).

Como el servicio Stream es compatible con la API de Kafka, funciona perfectamente con el acceso SQL de Oracle a Kafka. Desde el acceso SQL de Oracle al punto de vista de Kafka, un pool de flujos de servicio es un cluster de Kafka y un flujo de servicio de flujos es un tema del cluster de Kafka.

En este tutorial, mostraremos la facilidad con la que podemos integrar el acceso de Oracle SQL a Kafka con el servicio Stream.

Nota: si ya está familiarizado con el acceso SQL de Oracle a Kafka, el servicio Stream y Kafka y desea utilizar el acceso SQL de Oracle a Kafka con el servicio Stream, puede saltar directamente para configurar clusters de Oracle Cloud Infrastructure Streaming para el acceso SQL de Oracle a Kafka, paso 2.1. Aprenda el resto del tutorial según sea necesario.

Requisitos

Oracle Cloud Infrastructure Streaming y Oracle SQL Access to Kafka Integration

Crear un flujo y pool de flujos en Oracle Cloud Infrastructure

  1. Conéctese a su cuenta/arrendamiento de OCI y configure un pool de flujos de servicio de flujo denominado StreampoolForOsak y el flujo denominado StreamForOsak de la siguiente manera.

    Creación de Streampool en OCI

  2. Ahora creamos un flujo denominado StreamForOsak en el pool de flujos StreampoolForOsak que acabamos de crear.

    Creación de flujos en OCI

Para estas creaciones de recursos, puede utilizar cualquiera de sus compartimentos existentes. Para nuestra comodidad, hemos creado un nuevo compartimento denominado OssOsakIntegration y todos los recursos están en el mismo compartimento.

En la terminología del servicio Stream, los temas de Kafka se denominan Streams. Por lo tanto, desde el acceso SQL de Oracle hasta el punto de vista de Kafka, el flujo StreamForOsak es un tema de Kafka con tres particiones.

Ahora se realiza la creación del flujo de servicio de flujo. A continuación, los términos Servicio de Flujo y Kafka son intercambiables. De forma similar, los términos Stream y Kafka Topic son intercambiables.

Creación de un usuario, grupo y políticas en Oracle Cloud Infrastructure

Si ya tiene un usuario existente con las autorizaciones adecuadas para utilizar el servicio Stream, el paso 2 se puede omitir por completo.

  1. Para utilizar los flujos de servicio de flujo con el acceso SQL de Oracle a Kafka, debemos crear un nuevo usuario de OCI para ello. Creamos un nuevo usuario para este fin con el nombre de usuario OssOsakUser, desde la consola web de OCI de la siguiente manera:

    Creación de Usuario de OCI

  2. Para que el usuario OssOsakUser se autentique con el servicio Stream (con las API de Kafka), necesitamos crear un token de autenticación para este nuevo usuario, de la siguiente forma:

    Creación de token de autorización de usuario

    Después de haber generado el token, primero tiene la única oportunidad de ver y copiar el token de autenticación. Por lo tanto, copie el token de autenticación y manténgalo seguro en un lugar desde donde pueda acceder a él más tarde. Lo necesita en pasos posteriores, específicamente cuando configuramos el cluster de Kafka para el acceso SQL de Oracle a Kafka. El acceso de Oracle SQL a Kafka utilizará este nombre de usuario: OssOsakUser y su token de autenticación para acceder al servicio de flujo mediante las API de consumidor de Kafka de forma interna.

  3. OssOsakUser también necesita tener el conjunto adecuado de privilegios en OCI IAM para acceder al cluster de servicio de flujo.

    En OCI, los usuarios adquieren privilegios con la ayuda de las políticas asignadas a los grupos de usuarios de los que forman parte. Por lo tanto, necesitamos crear un grupo para OssOsakUser, seguido de una política para ese grupo.

    Cree un grupo de usuarios como se muestra a continuación:

    Creación de grupo de usuarios

  4. Agregue el usuario OssOsakUser al grupo de usuarios OssOsakUserGroup.

    Agregue el usuario al grupo

  5. Para autorizar a OssOsakUser a utilizar el flujo de servicio de flujo, específicamente para publicar y leer mensajes de él, debemos crear la siguiente política en el arrendamiento. Esta política, una vez creada, otorga privilegios a todos los usuarios del grupo OssOsakUserGroup. Como el usuario OssOsakUser está en el mismo grupo, adquiere los mismos privilegios.

    policy

    El fragmento de texto para la misma política anterior es el siguiente.

    Allow group OssOsakUserGroup to manage streams in compartment OssOsakIntegration  
    Allow group OssOsakUserGroup to manage stream-push in compartment OssOsakIntegration   
    Allow group OssOsakUserGroup to manage stream-pull in compartment OssOsakIntegration  
    Allow group OssOsakUserGroup to manage stream-family in compartment OssOsakIntegration
    

Instalación del acceso SQL de Oracle a Kafka en un host de Oracle Database

  1. El acceso de Oracle SQL al kit de Kafka está disponible como parte de una ampliamente utilizada SQL Developer.

    Utilice el enlace oficial de SQL Developer para descargar la última versión de SQL Developer. A partir de la escritura de este tutorial, la última versión de SQL Developer es la 20.4.

    Asegúrese de descargar Oracle SQL Developer para la misma plataforma que el host de Oracle Database. En nuestro entorno de desarrollo, descargamos Oracle SQL Developer for Oracle Linux RPM (ya que nuestra instancia de Oracle Database se ejecuta en Oracle Linux Platform).

    Una vez finalizada la descarga, extraiga el contenido del archivo RPM/zip de Oracle SQL Developer a cualquier directorio/carpeta mediante el comando unzip o tar.

    tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm
    
  2. Vaya al directorio en el que se extrajo el contenido de Oracle SQL Developer y busque el acceso SQL de Oracle al archivo ZIP de Kafka denominado orakafka.zip de la siguiente manera:

    $ find . -name 'orakafta*'  
    ./sqldeveloper/orakafta  
    ./sqldeveloper/orakafta/orakafta.zip
    

    El acceso de Oracle SQL al kit de Kafka está en el archivo orakafka.zip. No nos interesa el resto del contenido del desarrollador SQL para esta demostración. Por lo que respecta a la instalación y el uso del acceso SQL de Oracle a Kafka, orakafka.zip es todo lo que necesitamos.

  3. Copie el archivo orakafka.zip en el host de Oracle Database mediante el comando scp o los clientes FTP basados en GUI como FileZilla.

    Establezca una conexión SSH en el nodo de Oracle Database y mueva orakafka.zip a /home/oracle(directorio raíz del usuario oracle) con mv_ command.

  4. Para el resto de instrucciones para instalar el acceso SQL de Oracle a Kafka, debemos cambiar al usuario oracle en el host de Oracle Database.

    Asegúrese de que el directorio de trabajo actual es /home/oracle. Ya tenemos orakafka.zip en el mismo directorio.

    [opc@dbass ~]$ sudo su - oracle  
    Last login: Sat Feb 20 09:31:12 UTC 2021  
    [oracle@dbass ~]$ pwd  
    /home/oracle  
    [oracle@dbass ~]$ ls -al  
    total 3968  
    drwx------ 6 oracle oinstall    4096 Feb 19 17:39 .  
    drwxr-xr-x 5 root   root        4096 Feb 18 15:15 ..  
    -rw------- 1 oracle oinstall    4397 Feb 20 09:31 .bash_history  
    -rw-r--r-- 1 oracle oinstall      18 Nov 22  2019 .bash_logout  
    -rw-r--r-- 1 oracle oinstall     203 Feb 18 15:15 .bash_profile  
    -rw-r--r-- 1 oracle oinstall     545 Feb 18 15:20 .bashrc  
    -rw-r--r-- 1 oracle oinstall     172 Apr  1  2020 .kshrc  
    drwxr----- 3 oracle oinstall    4096 Feb 19 17:37 .pki  
    drwxr-xr-x 2 oracle oinstall    4096 Feb 18 15:20 .ssh  
    -rw-r--r-- 1 oracle oinstall 4002875 Feb 19 17:38 orakafka.zip
    
  5. Extraiga o descomprima orakafka.zip. Se creará un nuevo directorio denominado orakafka-<version> con el contenido extraído. En nuestro caso, es orakafka-1.2.0 de la siguiente manera:

    [oracle@dbass ~]$ unzip orakafka.zip  
    Archive:  orakafka.zip 
    creating: orakafka-1.2.0/  
    

extracción: orakafka-1.2.0/kit_version.txt
inflando: orakafka-1.2.0/orakafka_distro_install.sh
extrayendo: orakafka-1.2.0/orakafka.zip
inflando: orakafka-1.2.0/README


6. Now we follow the instructions found in the _**orakafka-1.2.0/README**_ for the setup of Oracle SQL access to Kafka. We follow _simple install_ for single-instance Oracle Database. 

This README doc has instructions for Oracle SQL access to Kafka installation on Oracle Real Application Clusters (Oracle RAC) as well. By and large, in the case of Oracle RAC, we need to replicate the following steps on all nodes of Oracle RAC. Please follow the README for details.


[oracle@dbass ~]$ cd orakafka-1.2.0/
[oracle@dbass orakafka-1.2.0]$ ls -al
total 3944
drwxrwxr-x 2 oracle oinstall    4096 Feb 20 09:12 .
drwx—— 6 oracle oinstall    4096 Feb 19 17:39 ..
-rw-r–r– 1 oracle oinstall    6771 Oct 16 03:11 README 
-rw-r–r– 1 oracle oinstall       5 Oct 16 03:11 kit_version.txt
-rw-rw-r– 1 oracle oinstall 3996158 Oct 16 03:11 orakafka.zip
-rwxr-xr-x 1 oracle oinstall   17599 Oct 16 03:11 orakafka_distro_install.sh


tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm


7. As per _./orakafka-1.2.0/README_, we install Oracle SQL access to Kafka on the Oracle Database host with the help of _./orakafka-1.2.0/orakafka\_distro\_install.sh_ script. Argument -p lets us specify the location or base directory for the Oracle SQL access to Kafka installation on this host. 

   We choose the newly created empty directory named ora\_kafka\_home as the OSaK base directory on this host. So the full path of the OSaK base directory will be _/home/oracle/ora\_kafka\_home_.

[oracle@dbass ~]$ ./orakafka-1.2.0/orakafka_distro_install.sh -p ./ora_kafka_home/
 
 Step Create Product Home::
————————————————————–
…../home/oracle/ora_kafka_home already exists..
Step Create Product Home: completed.
PRODUCT_HOME=/home/oracle/ora_kafka_home
 
 Step Create app_data home::
————————————————————–
….. creating /home/oracle/ora_kafka_home/app_data and subdirectories
……Generated CONF_KIT_HOME_SCRIPT=/home/oracle/ora_kafka_home/app_data/scripts/configure_kit_home.sh
……Generated CONF_APP_DATA_HOME_SCRIPT=/home/oracle/ora_kafka_home/configure_app_data_home.sh
Step Create app_data home: completed.
APP_DATA_HOME=/home/oracle/ora_kafka_home/app_data
 
 Step unzip_kit::
————————————————————–
…..checking for existing binaries in /home/oracle/ora_kafka_home/orakafka
…..unzip kit into /home/oracle/ora_kafka_home/orakafka
Archive:  /home/oracle/orakafka-1.2.0/orakafka.zip
   creating: /home/oracle/ora_kafka_home/orakafka/
 extracting: /home/oracle/ora_kafka_home/orakafka/kit_version.txt
  inflating: /home/oracle/ora_kafka_home/orakafka/README
   creating: /home/oracle/ora_kafka_home/orakafka/doc/
  inflating: /home/oracle/ora_kafka_home/orakafka/doc/README_INSTALL
   creating: /home/oracle/ora_kafka_home/orakafka/jlib/
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/osakafka.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/kafka-clients-2.5.0.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-simple-1.7.28.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/lz4-java-no-jni-1.7.1.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/snappy-java-no-jni-1.1.7.3.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/zstd-no-jni-1.4.4-7.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-api-1.7.30.jar
   creating: /home/oracle/ora_kafka_home/orakafka/bin/
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka_stream.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka.sh
   creating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/removeuser_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/setup_all.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/remove_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/verify_install.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/add_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/config_util.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_views.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/install.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/set_java_home.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/list_clusters.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/adduser_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/uninstall.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_cluster.sh
   creating: /home/oracle/ora_kafka_home/orakafka/conf/
  inflating: /home/oracle/ora_kafka_home/orakafka/conf/orakafka.properties.template
   creating: /home/oracle/ora_kafka_home/orakafka/sql/
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkatab.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/catnoorakafka.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/catorakafka.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaus.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_uninstall.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkab.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaub.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_install.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkas.sql
   creating: /home/oracle/ora_kafka_home/orakafka/lib/
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/libsnappyjava.so
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/libzstd-jni.so
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/liblz4-java.so
Step unzip_kit: completed.
 
Successfully installed orakafka kit in /home/oracle/ora_kafka_home

8. Configure JAVA\_HOME for Oracle SQL access to Kafka. We find the Java path on the node as follows:

[oracle@dbass ~]$ java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home'
java.home = /usr/java/jre1.8.0_271-amd64
[oracle@dbass ~]$ export JAVA_HOME=/usr/java/jre1.8.0_271-amd64


   To set Java home for OSaK, we use the script _/home/oracle/ora\_kafka\_home/orakafka/bin/orakafka.sh_ script, with _set\_java\_home_ option.

[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh set_java_home -p $JAVA_HOME
 
Step1: Check for valid JAVA_HOME
————————————————————–
Found /usr/java/jre1.8.0_271-amd64/bin/java, JAVA_HOME path is valid.
Step1 succeeded.
 
Step2: JAVA version check
————————————————————–
java version “1.8.0_271”
Java(TM) SE Runtime Environment (build 1.8.0_271-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.271-b09, mixed mode)
Java version >= 1.8
Step2 succeeded.
 
Step3: Creating configure_java.sh script
————————————————————–
Wrote to /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
Step3 succeeded.
 
Successfully configured JAVA_HOME in /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
 
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/set_java_home.log.2021.02.20-04.38.23
 
[oracle@dbass bin]$

9. Verify installation of OSaK with _verify\_install_ option of script _orakafka.sh_ as follows:

[oracle@dbass ~]$ cd ora_kafka_home/
[oracle@dbass ora_kafka_home]$ cd orakafka/bin/
[oracle@dbass bin]$ ./orakafka.sh verify_install
Compruebe la propiedad de todos los archivos/dirs - transferido
Compruebe los privilegios del directorio - transferido
Compruebe los archivos ejecutables esperados - transferido

La información anterior se escribe en /home/oracle/ora_kafka_home/app_data/logs/verify_install.log.2021.02.19-18.10.15


### Set Up Oracle Cloud Infrastructure Stream Clusters for Oracle SQL Access to Kafka

1. Under _ora\_kafka\_home_, that is _/home/oracle/ora\_kafka\_home_, we have two more READMEs as follows:

[oracle@dbass ~]$ find ora_kafka_home/ -name "README*"
ora_kafka_home/orakafka/README
ora_kafka_home/orakafka/doc/README_INSTALL


   _~/ora\_kafka\_home/orakafka/README_ is the readme for this release of OSaK that we have installed. Please read through this readme.

   And _ora\_kafka\_home/orakafka/doc/README\_INSTALL_ is the README for the actual setup of a Kafka cluster for Oracle SQL access to Kafka. The rest of the steps 2 onwards below follow this README by and large. As per the same, we will leverage _~/ora\_kafka\_home/orakafka/bin/orakafka.s_h script, for adding a Kafka cluster, adding an Oracle Database user for using the Kafka cluster in the next steps.

2. Add our Stream service stream pool to Oracle SQL access to Kafka.

   As mentioned earlier, from the Oracle SQL access to Kafka point of view, the Stream service stream pool is a Kafka cluster.

   We use the _add\_cluster_ option of the _orakafka.sh_ script to add the cluster to OSaK. We use the _\-c_ argument to name the cluster as _kc1_.

[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh add_cluster -c kc1

Step1: creación del directorio de configuración de cluster del sistema de archivos
---------------------
Se ha completado la creación del directorio de cluster del sistema de archivos.
Configure las propiedades de seguridad en /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties.
Step1 correcto.

Step2: genere DDL para la creación del directorio de base de datos de configuración de cluster
---------------------
Ejecute el siguiente script SQL mientras está conectado como sysdba
para crear el directorio de base de datos de configuración de cluster:
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql
Step2 ha generado correctamente el script.

****SUMMARY*****

Tareas de TODO:

  1. Configure las propiedades de seguridad en /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties
  2. Ejecute el siguiente SQL mientras está conectado como sysdba:
    @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql

La información anterior se escribe en /home/oracle/ora_kafka_home/app_data/logs/add_cluster.log.2021.02.20-05.23.30

[oracle@dbass bin]$


   We get two TODO tasks as per output from the above execution of the _add\_cluster_ command. 

   1. For the first task, we add security properties for our Stream service stream to `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties`. 

      Where to get Kafka-compatible security configs for the Stream service cluster also known as (AKA) streampool? We get these security configs from the OCI web console as shown below.

      Please take note of the bootstrap server endpoint(_cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092_ from the screenshot below) as well. It is not related to the security configs of the cluster, but we need it in later steps for connecting to our Stream service and Kafka cluster. 

      ![](images/bsend.png " ")

      We write the above config values to OSaK in the following format, to `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties` file using any text editor like say vi. 

      ![](images/osakosssec.png " ")

      For clarity, we have the same configs in text format as follows:

      ```
      security.protocol=SASL_SSL  
      sasl.mechanism=PLAIN  
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>" password=" YOUR_AUTH_TOKEN";  
      sasl.plain.username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>"  
      sasl.plain.password ="YOUR_AUTH_TOKEN";
      ```

      **Note:** default Kafka Consumer Configs are already pre-populated in _orakafka.properties_ for this cluster. If there is a need, one can modify these configs.

   2. For the second TODO task, do SQL script execution as follows: 

      ```
      [oracle@dbass ~]$ sqlplus / as sysdba  
      SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 05:37:28 2021  
      Version 19.9.0.0.0   
      Copyright (c) 1982, 2020, Oracle.  All rights reserved.
      Connected to:  
      Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
      Version 19.9.0.0.0   
      SQL> alter session set container=p0;  
      Session altered.  
      SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql  
      Creating database directory "KC1_CONF_DIR"..  
      Directory created.  
      The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_create_KC1_CONF_DIR.log  
      Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
      Version 19.9.0.0.0
      ```

      Take note of directory object _KC1\_CONF\_DIR_ created by the above SQL script.

      **Note:** As dictated in add\_cluster output, we run the SQL script as a _SYSDBA_ user, inside the PDB of our interest (_p0_ PDB here).

   3. To make sure that the cluster configuration is working, we can leverage the _test\_cluster_ option of script _orakafka.sh_.

      We pass cluster name with -c argument and bootstrap server with -b argument. We have from bootstrap server info from step 4.2.1.

      ```
      [oracle@dbass bin]$ pwd  
      /home/oracle/ora_kafka_home/orakafka/bin  
      [oracle@dbass bin]$ ./orakafka.sh test_cluster -c kc1 -b cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
      Kafka cluster configuration test succeeded.  
         
      Kafka cluster configuration test - "KC1"  
      ------------------------------------------------------  
      KAFKA_BOOTSTRAP_SERVERS=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
      LOCATION_FILE="/home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/orakafka_stream_KC1.loc"  
      ora_kafka_operation=metadata  
      kafka_bootstrap_servers=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
         
      List topics output:  
      StreamForOsak,3  
      ...  
      For full log, please look at /home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/test_KC1.log.2021.02.20-09.01.12  
      Test of Kafka cluster configuration for "KC1" completed.  
         
      The above information is written to /home/oracle/ora_kafka_home/app_data/logs/test_cluster.log.2021.02.20-09.01.12  
         
      [oracle@dbass bin]$
      ```

      As you can see our Kafka cluster configuration test succeeded! We get the list of topics, AKA the Stream service streams in the output here. Topic names are followed by the number of partitions for that stream. Here we have the number of partitions as three, as the Stream service stream StreamForOsak has exactly three partitions.

3. Configure an Oracle Database user for the added Kafka cluster.

   We already have created an Oracle Pluggable Databases (PDB) level Oracle Database user with username books\_admin, for the PDB named _p0_.  We use the _adduser\_cluster_ option to grant required permissions for user books\_admin on Kafka cluster _kc1_. 

[oracle@dbass bin]$ ./orakafka.sh adduser_cluster -c kc1 -u books_admin


   
Step1: Generate DDL to grant permissions on cluster configuration directory to "BOOKS_ADMIN"
--------------------------------------------------------------------------------------------

Execute the following script while connected as sysdba  
to grant permissions on cluster conf directory :  
 @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
Step1 successfully generated script.  
   
***********SUMMARY************  
   
TODO tasks:  
 
1. Execute the following SQL while connected as sysdba:  
      @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
      
   The above information is written to /home/oracle/ora_kafka_home/app_data/logs/adduser_cluster.log.2021.02.20-10.45.57  
      
   [oracle@dbass bin]$

Al igual que en el paso anterior, tenemos que realizar una tarea TODO.

Según la salida, tenemos un script SQL generado automáticamente. Necesitamos ejecutar esta secuencia de comandos (como sysdba), para otorgar al usuario de Oracle Database books_admin permisos sobre el directorio de configuración del cluster para el cluster kc1.

Este directorio en nuestro caso es /home/oracle/ora_kafka_home/app_data/clusters/KC1.


[oracle@dbass ~]$ sqlplus / as sysdba  
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 10:56:17 2021  
Version 19.9.0.0.0  
Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
   
Connected to:  
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
Version 19.9.0.0.0  
   
SQL> alter session set container=p0;  
Session altered.  
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
   
PL/SQL procedure successfully completed.   
Granting permissions on "KC1_CONF_DIR" to "BOOKS_ADMIN"  
Grant succeeded.`

`The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_adduser_cluster_KC1_user1.log  
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
Version 19.9.0.0.0  
[oracle@dbass ~]$

  1. Ahora aprovechamos la opción install de orakafka.sh. Transferimos el directorio principal (con argumento -p) para los datos de este usuario relacionados con sus actividades OSaK en los clusters de Kafka a los que se agrega.

    Este comando genera automáticamente dos scripts de lenguaje de definición de datos (DDL) para nosotros de la siguiente manera:
     

       [oracle@dbass bin]$ ./orakafka.sh install -u books_admin -r /home/oracle/ora_kafka_home/books_admin_user_data_dir  
       
    Step1: Creation of filesystem location and default directories  
    --------------------------------------------------------------  
    Created filesystem location directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir  
    Created filesystem default directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir  
    Step1 succeeded.  
       
    Step2: Generate DDL for creation of DB location and default directories  
    --------------------------------------------------------------  
    Execute the following SQL script while connected as sysdba  
    to setup database directories:  
     @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    On failure, to cleanup location and default directory setup, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"  
    Step2 successfully generated script.  
       
    Step3: Install ORA_KAFKA package in "BOOKS_ADMIN" user schema  
    --------------------------------------------------------------  
    Execute the following script in user schema "BOOKS_ADMIN" to  
    install ORA_KAFKA package in the user schema  
     @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
    On failure, to cleanup ORA_KAFKA package from user schema, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"  
    Step3 successfully generated script.  
       
    ***********SUMMARY************  
       
    TODO tasks:  
       
    1. Execute the following SQL while connected as sysdba:  
       @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    2. Execute the following SQL in user schema:  
       @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
       
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install.log.2021.02.20-11.33.09  
       
    [oracle@dbass bin]
    
  2. La primera tarea es ejecutar un script SQL para registrar los dos directorios, /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir y /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir
    con Oracle DB, como objetos de directorios.

    Como se indica en la salida, es necesario ejecutarla con privilegios SYSDBA, de la siguiente manera:

       [oracle@dbass ~]$ sqlplus / as sysdba  
    SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:46:04 2021  
    Version 19.9.0.0.0  
    Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
    Connected to:  
    Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
       
    SQL> alter session set container=p0;  
       
    Session altered.  
       
    SQL> @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    Checking if user exists..  
       
    PL/SQL procedure successfully completed.  
    Creating location and default directories..  
    PL/SQL procedure successfully completed.  
    Directory created.  
    Directory created.  
    Grant succeeded.  
    Grant succeeded.  
    Creation of location dir "BOOKS_ADMIN_KAFKA_LOC_DIR" and default dir "BOOKS_ADMIN_KAFKA_DEF_DIR" completed.  
    Grant of required permissions on "BOOKS_ADMIN_KAFKA_LOC_DIR","BOOKS_ADMIN_KAFKA_DEF_DIR" to "BOOKS_ADMIN" completed.  
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/setup_db_dirs_user1.log  
    Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
    [oracle@dbass ~]$
    

    Tome nota de los objetos de directorio creados por los scripts SQL anteriores, como BOOKS_ADMIN_KAFKA_LOC_DIR y BOOKS_ADMIN_KAFKA_DEF_DIR.

    Ahora vamos a llegar a la segunda tarea de TODO.

  3. Aquí se ejecuta otro script SQL como books_admin (y no sysdba) en la PDB p0. Este script instalará el acceso SQL de Oracle a los objetos y paquetes de Kafka en el esquema que pertenece a books_admin.

       [oracle@dbass ~]$ sqlplus books_admin@p0pdb #p0pdb is tns entry for p0 pdb  
    SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:52:33 2021  
    Version 19.9.0.0.0  
    Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
    Enter password:  
    Last Successful login time: Fri Feb 19 2021 14:04:59 +00:00  
    Connected to:  
    Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
       
    SQL> ALTER SESSION SET CONTAINER=p0;  
    Session altered.  
       
    SQL> @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
    Verifying user schema..  
       
    PL/SQL procedure successfully completed.  
       
    Verifying that location and default directories are accessible..  
       
    PL/SQL procedure successfully completed.  
       
    Installing ORA_KAFKA package in user schema..  
    .. Creating ORA_KAFKA artifacts  
       
    Table created.  
    Table created.  
    Table created.  
    Table created.  
    Package created.  
    No errors.  
    Package created.  
    No errors.  
    Package body created.  
    No errors.  
    Package body created.  
    No errors.  
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install_orakafka_user1.log  
    Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
    [oracle@dbass ~]$
    

Registrar el cluster de servicio de flujo con Oracle Database y crear vistas para el flujo

En este paso, el cluster kc1 con Oracle Database y crea vistas para el flujo StreamForOsak.

Todas las consultas SQL para este paso se ejecutarán como usuario books_admin y en la PDB p0.

  1. Registre el cluster con acceso SQL de Oracle al procedimiento de Kafka ORA_KAFKA.REGISTER_CLUSTER.

BEGIN  
ORA_KAFKA.REGISTER_CLUSTER  
('kc1', -- cluster name  
'cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092', -- fill up your bootstrap server here   
'BOOKS_ADMIN_KAFKA_DEF_DIR', -- default directory for external table created in previous steps  
'BOOKS_ADMIN_KAFKA_LOC_DIR', -- this directory object too is created in previous steps  
'KC1_CONF_DIR', --config dir for cluster  
'Registering kc1 for this session'); --description   
END;  
/

  1. Cree una tabla de Oracle Database denominada BOOKS, que debe tener el mismo esquema que los mensajes de nuestro tema de Kafka StreamForOsak.

CREATE TABLE BOOKS ( -- reference table. It is empty table. Schema of this table must correspond to Kafka Messages  
id int,  
title varchar2(50),  
author_name varchar2(50),  
author_email varchar2(50),  
publisher varchar2(50)  
);  
/

  1. Cree una vista para nuestro tema StreamForOsak con el procedimiento almacenado de ayuda desde el acceso SQL de Oracle al paquete de Kafka CREATE_VIEWS de la siguiente manera.

DECLARE  
 application_id VARCHAR2(128);  
 views_created INTEGER;  
BEGIN  
ORA_KAFKA.CREATE_VIEWS  
('kc1', -- cluster name  
'OsakApp0', -- consumer group name that OSS or Kafka cluster sees.  
'StreamForOsak', -- Kafka topic aka OSS stream name  
'CSV', -- format type  
'BOOKS', -- database reference table  
views_created, -- output  
application_id); --output  
dbms_output.put_line('views created  = ' || views_created);  
dbms_output.put_line('application id = ' || application_id);  
END;  
/

El acceso SQL de Oracle a Kafka soporta dos formatos para mensajes de Kafka, CSV y JSON. Aquí estamos utilizando CSV. Por lo tanto, un mensaje de Kafka de ejemplo que cumpla con el esquema de la tabla BOOKS puede ser 101, Famous Book, John Smith, john@smith.com, First Software.


tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm

  1. El paso anterior creará una vista para cada partición del tema. Dado que el tema StreamForOsak tiene tres particiones, tendremos tres vistas. El nombre de cada vista tendrá el formato KV_<CLUSTER_NAME>_<GROUP_NAME>_TOPIC_<NUM_OF_PARTITION>.

Aquí tenemos tres vistas, como se muestra a continuación, una para cada partición.

Por lo tanto, la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_0 se asigna a la partición 0, la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_2 se asigna a la partición 1 y, por último, KV_KC1_OSAKAPP0_STREAMFOROSAK_0 se asigna a la partición 2 del tema StreamForOsak.

Producir datos en el flujo

  1. Produzca un mensaje de prueba para transmitir StreamForOsak mediante el botón Producir mensaje de prueba de la consola web de Oracle Cloud Infrastructure (OCI), como se muestra a continuación.

Al hacer clic en Producir mensaje de prueba, se muestra una ventana con un campo para introducir nuestro mensaje de prueba, como se muestra a continuación.

Introducimos el siguiente mensaje:

200, Andrew Miller Part 1, MS Brown, mb@example.com, First Software

No hace falta decirlo, también podríamos haber utilizado la API estándar de productor de Kafka en Java/Python u OCI SDK para el servicio Stream, para producir el mensaje en el flujo de servicio Stream.

  1. Después de hacer clic en Producir en la ventana anterior, publicamos el mensaje para el flujo StreamForOsak. Podemos utilizar la utilidad Cargar mensaje para ver la partición en la que se encuentra nuestro mensaje.

Como se muestra anteriormente, nuestro mensaje aparece en la partición 2 del tema StreamForOsak.

Recuperación de los mensajes del servicio Stream con las vistas de base de datos configuradas por el acceso SQL de Oracle a los procedimientos almacenados de Kafka

Podemos ejecutar una consulta SQL simple:


SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2_

O como se muestra a continuación, simplemente podemos abrir el desarrollador de SQL de vista para ver los datos en la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_2:

Elegimos la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_2, ya que corresponde a la partición 2 del flujo StreamForOsak. Por lo tanto, cada vez que ejecutamos la consulta SELECT SQL para esta vista, va a la partición correspondiente de su tema asociado y recupera nuevos mensajes sin confirmar (para el grupo de consumidores OsakApp0), como filas en la vista. Cada mensaje obtiene su propia fila en la vista. Tenga en cuenta que los datos de estas vistas no se mantienen.

Los ojos curiosos también pueden observar que Oracle SQL accede a Kafka recupera y almacena información de metadatos como el desplazamiento del mensaje, su partición y el registro de hora en columnas adicionales.

Lo más probable es que desee volver a consultar esta vista una y otra vez, y para cada ejecución de la consulta, desea que la vista se mueva secuencialmente en el flujo. Puede lograrlo fácilmente con el siguiente fragmento de código canónico:


LOOP
ORA_KAFKA.NEXT_OFFSET(‘_KV_KC1_OSAKAPP0_STREAMFOROSAK_2_’);

SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2_;

ORA_KAFKA.UPDATE_OFFSET(‘_KV_KC1_OSAKAPP0_STREAMFOROSAK_2_’);

COMMIT;
END LOOP;

Como ya se ha dicho, al utilizar el acceso SQL de Oracle a Kafka, Oracle Database gestiona los desplazamientos de flujo, no el servicio Stream. Activa en tablas del sistema para realizar un seguimiento del posicionamiento de los desfases de todas las particiones a las que accede la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_2.

La llamada NEXT_OFFSET simplemente enlaza la vista KV_KC1_OSAKAPP0_STREAMFOROSAK_2 a un nuevo juego de desfases que representan nuevos datos que viven en la partición de Oracle Cloud Infrastructure Streaming a la que accede la vista. UPDATE_OFFSET comunica cuántas filas se leyeron para cada partición y avanza las compensaciones a nuevas posiciones.

El COMMIT garantiza que esta unidad de trabajo sea conforme con ACID.

Conclusión

En este tutorial, nos hemos centrado en cómo podemos instalar Oracle SQL access to Kafka y utilizarlo con el servicio Stream. Como el servicio Stream es compatible con Kafka, es como un cluster de Kafka para el acceso SQL de Oracle a Kafka.

Tenga en cuenta que aquí apenas hemos reutilizado la superficie en lo que respecta a la funcionalidad que ofrece Oracle SQL para el acceso a Kafka. El acceso SQL de Oracle a Kafka es altamente configurable. Incluso podemos almacenar los datos de transmisión en tablas de Oracle Database, que se mantienen en el disco, a diferencia de las vistas. Consulte los archivos README proporcionados en el acceso SQL de Oracle a la instalación de Kafka y las referencias siguientes para obtener más información.

  1. Documentación para desarrolladores de Oracle PL/SQL para el acceso SQL de Oracle a Kafka
  2. Blog de Oracle sobre acceso SQL de Oracle a Kafka
  3. Acceso de Oracle SQL a Kafka dentro de Oracle Big Data SQL
  4. Oracle Cloud Infrastructure Streaming
  5. Compatibilidad con la API de Kafka del servicio de flujo

Agradecimientos

````````````````````````````````````````` ````````````````````````````````````````````````` ``` ``````` ````````` ``````` ``````` ````````````` ```````` ````````````````` ````` `` ````` `` ```````````` `` `````` ``````` ````````````` ``````` ```````` `` `` ````` ```````` `` ``````` ```````````````` ```````````````` `` `` `` `````` ``````````` `` ````` ``````` `` `` `` `` `` `````````````````````` `` ````` `` ```` `````` `` `` `` `` `````` ````````````` ````````````` `` `` `` ```````````` `` ````` `` `````````````` `` `````` `` `````` `` `` `` `` `` `` `` `` `` `` `````````````` `` `` `` `` ```` `` `` `` `` `` ````` ````` `` `` ````````` ``````` `` ````` `````` `` ```` `` `` `` `` `````` `` `````` `` `` `````````````````` ````````````````````````````` `` `` `````````````````````` `````` `` ```` `` `` ```````````````` `````` `` ``````````` ``````` `````` `` ````` `` `` `` `` `` `` `` `` `` `` `` ``````` `````` ``` `` `` ````````````` `` ````` `` ``````` `` `` ````` `` `` `` `` `` ```` `` `` `` ```` `` ````` `` `` `` `````` `` `` `` `` `` `` `` `` ````` `` ````````` `` `` `` `` `` ```` `` ````` `` ```````` `` `` `` `` `` `` ``` `` `` ``` `` `` ```` `` `` `` `` `` ``````````````````` `````````` `` `` `` `` `` `` `` ``` `` ``

Más recursos de aprendizaje

Explore otras prácticas en docs.oracle.com/learn o acceda a contenido de aprendizaje más gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en un explorador de formación de Oracle.

Para obtener documentación sobre los productos, visite Oracle Help Center.