注:

使用 Oracle Cloud Infrastructure Streaming 和 Oracle Database 进行流分析

简介

Oracle Cloud Infrastructure Streaming 是 Oracle Cloud Infrastructure (OCI) 上一个高度可扩展的高可用性流处理服务。流服务完全无服务器,API 可与 Apache Kafka 兼容。

Oracle SQL 访问 Kafka 是一个 Oracle PL/SQL 程序包和一个外部表预处理程序。它使 Oracle Database 能够读取和处理 Kafka 主题中的事件和消息作为 Oracle Database 中的视图或表。从 Kafka 到数据位于 Oracle Database 表中或从 Oracle Database 视图中可见后,可以使用 Oracle PL/SQL 的全部功能进行查询,就像 Oracle Database 中的任何其他数据一样。

通过 Oracle SQL 访问启用了 Kafka 的视图来检索数据时,数据不会保留在 Oracle Database 中。但是,使用 Oracle SQL 访问基于 Kafka 的表时,该访问会持久保留在 Oracle Database 中。因此,访问 Kafka 的 Oracle SQL 后,开发人员可以灵活地在 Oracle Database 上保存流数据。有关 Oracle SQL 访问 Kafka 的完整概览和用例,请参阅此博客文章使用 Oracle SQL 访问 Kafka 视图将移动中的数据与静态数据集成

简而言之,通过对 Kafka 的 Oracle SQL 访问,可以使用 Oracle PL/SQL 处理动态数据(在 Oracle Database 表中)静态数据。因此,应用程序(例如,Java 数据库连接 (JDBC) 应用程序)可以在 Oracle Database 事务处理中处理来自 Kafka 和关键 Oracle Database 数据的实时事件,从而提供 ACID 保证。当应用程序从 Oracle Database 中单独提取 Kafka 事件和数据时,这一点并不容易。

福利

  1. 客户可以使用对 Kafka 的 Oracle SQL 访问来使用 Oracle PL/SQL 运行实时流分析作业,直接从流服务读取数据,而无需将数据移到外部数据存储中。
  2. 客户还可以使用 Oracle SQL 访问 Kafka,而实现从流服务到 Oracle Database 的数据移动,而不进行任何处理。
  3. 流处理操作可以在应用程序控制的 Oracle ACID 事务处理的上下文中执行。
  4. Oracle SQL 访问 Kafka 仅充当 Kafka 使用者,也不充当 Kafka 生产者。整个偏移管理由 OSaK 处理。它将这些信息存储在 Oracle Database 中的元数据表中。因此,通过对 Kafka 的 Oracle SQL 访问,它可以完全一次处理语义,因为它可以将 Kafka 和流服务和应用程序数据分区偏移提交到 Oracle Database 的单个符合 ACID 的 Oracle Database 事务处理。这样可以避免流记录丢失或重新读取。

用例

假定您要将流数据与关系数据关联或联接的任何用例,例如:

  1. 您希望将来自聊天 IoT 设备(很可能存在于客户终端中)的流数据与存储在源事实关系 Oracle Database 中的相关客户信息组合起来。
  2. 您希望计算流向流处理服务的股票价格的精确指数移动平均值。您需要使用完全一致的语义来实现这一点。您希望将此数据与存储在 Oracle Database 中的有关该股票的静态信息相结合,例如名称、公司标识、市场上限等。

请注意,由于对 Kafka 的 Oracle SQL 访问是需要在 Oracle 服务器主机上手动安装的 PL/SQL 程序包,因此它只能用于自我管理的(内部部署或云端)Oracle Database 安装。它不能与 Oracle Cloud Infrastructure (OCI) 上的 Oracle Autonomous Database 等无服务器 Oracle Database 产品配合使用。

由于流服务与 Kafka 兼容,因此它可以与 Oracle SQL 访问 Kafka 无缝工作。从 Oracle SQL 访问 Kafka 的观点中,流服务流池是 Kafka 集群,流服务流是 Kafka 集群中的主题。

在本教程中,我们将展示如何轻松地将 Oracle SQL 访问 Kafka 与流服务集成。

注意:如果您已经熟悉对 Kafka、流服务和 Kafka 的 Oracle SQL 访问,并且希望使用流服务的 Oracle SQL 访问来访问 Kafka,则可以直接跳转到对 Kafka 步骤 2.1 的 Oracle Cloud Infrastructure Streaming 集群。根据需要浏览教程的其余部分。

先决条件

通过 Oracle Cloud Infrastructure Streaming 和 Oracle SQL 访问 Kafka 集成

在 Oracle Cloud Infrastructure 中创建流池和流

  1. 登录到 OCI 账户 / 租户并设置名为 StreampoolForOsak 的流服务流池和名为 StreamForOsak 的流服务流池,如下所示。

    在 OCI 上创建 Streampool

  2. 我们现在在刚刚创建的 streampool StreampoolForOsak 中创建名为 StreamForOsak 的流。

    在 OCI 中创建流

对于这些资源创建,您可以使用任何现有区间。为了方便起见,我们创建了一个名为 OssOsakIntegration 的新区间,所有资源都在同一区间中。

在流服务术语中,Kafka 主题称为流。因此,从 Oracle SQL 访问 Kafka 的角度来看,流 StreamForOsak 是一个包含三个分区的 Kafka 主题。

现在,我们已完成流服务的创建。此后,流服务和 Kafka 的术语可以互换。同样,流和 Kafka 主题是可互换的。

在 Oracle Cloud Infrastructure 中创建用户、组和策略

如果现有用户具有使用流服务的正确授权,则可以完全跳过步骤 2。

  1. 要通过 Oracle SQL 访问 Kafka 来使用流服务流,我们需要为其创建新的 OCI 用户。我们使用用户名 OssOsakUser 从 OCI Web 控制台创建新用户,如下所示:

    OCI 用户创建

  2. 要让用户 OssOsakUser 使用流服务进行自我验证(使用 Kafka API),我们需要为此新用户创建验证令牌,如下所示:

    用户验证令牌创建

    生成令牌后,您将获得第一个机会,这是唯一一次查看和复制 auth-token 的机会。因此,请复制验证令牌并使其安全地保留在您以后可以访问该令牌的位置。稍后需要此功能,特别是为访问 Kafka 的 Oracle SQL 配置 Kafka 集群时。对 Kafka 的 Oracle SQL 访问将使用该用户名(即 OssOsakUser 及其验证令牌)在内部使用 Kafka 使用者 API 访问流服务。

  3. OssOsakUser 还需要在 OCI IAM 中具有一组正确的权限才能访问流服务集群。

    在 OCI 中,用户在分配给自己所属用户组的策略的帮助下获得权限。因此,我们需要为 OssOsakUser 创建组,然后为该组创建一个策略。

    按如下所示创建用户组

    创建用户组

  4. 将用户 OssOsakUser 添加到用户组 OssOsakUserGroup。

    将用户添加到组

  5. 要授权 OssOsakUser 使用流服务流(专门用于发布和读取来自该流的消息),我们需要在租户中创建以下策略。此策略在创建后,将权限授予组 OssOsakUserGroup 中的所有用户。由于用户 OssOsakUser 位于同一个组中,因此它会获得相同的特权。

    策略

    上述策略相同的文本片段如下所示。

    Allow group OssOsakUserGroup to manage streams in compartment OssOsakIntegration  
    Allow group OssOsakUserGroup to manage stream-push in compartment OssOsakIntegration   
    Allow group OssOsakUserGroup to manage stream-pull in compartment OssOsakIntegration  
    Allow group OssOsakUserGroup to manage stream-family in compartment OssOsakIntegration
    

在 Oracle Database 主机上安装对 Kafka 的 Oracle SQL 访问权限

  1. 可以在广泛使用的 SQL Developer 中访问 Kafka 工具包。

    使用官方 SQL Developer 链接下载最新版本的 SQL Developer。截至本教程的编写过程,最新 SQL Developer 版本为 20.4。

    请确保下载与 Oracle Database 主机相同的平台的 Oracle SQL Developer。在开发工具包上,我们下载适用于 Oracle Linux RPM 的 Oracle SQL Developer(因为 Oracle Database 正在 Oracle Linux 平台上运行)。

    下载完成后,使用 unziptar 命令将 Oracle SQL Developer 的 RPM/zip 文件的内容提取到任何目录 / 文件夹中。

    tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm
    
  2. 转到提取 Oracle SQL Developer 内容的目录,并按如下方式查找对名为 orakafka.zip 的 Kafka ZIP 文件的 Oracle SQL 访问权限:

    $ find . -name 'orakafta*'  
    ./sqldeveloper/orakafta  
    ./sqldeveloper/orakafta/orakafta.zip
    

    对 Kafka 工具包的 Oracle SQL 访问位于 orakafka.zip 文件中。对于此演示,我们不感兴趣的是 SQL 开发人员的其余内容。至于安装和使用对 Kafka 的 Oracle SQL 访问,我们只需要 orakafka.zip

  3. 使用 scp 命令或基于 GUI 的 FTP 客户机(如 FileZilla)将 orakafka.zip 文件复制到 Oracle Database 主机。

    SSH 进入 Oracle Database 节点,并使用 mv_ commandorakafka.zip 移至 /home/oracle(oracle 用户的主目录)。

  4. 有关安装对 Kafka 的 Oracle SQL 访问的其余说明,需要切换到 Oracle Database 主机上的 oracle 用户。

    确保当前工作目录为 /home/oracleorakafka.zip 已位于同一目录中。

    [opc@dbass ~]$ sudo su - oracle  
    Last login: Sat Feb 20 09:31:12 UTC 2021  
    [oracle@dbass ~]$ pwd  
    /home/oracle  
    [oracle@dbass ~]$ ls -al  
    total 3968  
    drwx------ 6 oracle oinstall    4096 Feb 19 17:39 .  
    drwxr-xr-x 5 root   root        4096 Feb 18 15:15 ..  
    -rw------- 1 oracle oinstall    4397 Feb 20 09:31 .bash_history  
    -rw-r--r-- 1 oracle oinstall      18 Nov 22  2019 .bash_logout  
    -rw-r--r-- 1 oracle oinstall     203 Feb 18 15:15 .bash_profile  
    -rw-r--r-- 1 oracle oinstall     545 Feb 18 15:20 .bashrc  
    -rw-r--r-- 1 oracle oinstall     172 Apr  1  2020 .kshrc  
    drwxr----- 3 oracle oinstall    4096 Feb 19 17:37 .pki  
    drwxr-xr-x 2 oracle oinstall    4096 Feb 18 15:20 .ssh  
    -rw-r--r-- 1 oracle oinstall 4002875 Feb 19 17:38 orakafka.zip
    
  5. 提取或解压缩 orakafka.zip。这将创建一个名为 orakafka-<version> 的新目录,其中包含提取的内容。在本例中,为 orakafka-1.2.0,如下所示:

    [oracle@dbass ~]$ unzip orakafka.zip  
    Archive:  orakafka.zip 
    creating: orakafka-1.2.0/  
    

提取:orakafka-1.2.0/kit_version.txt
正在激增:orakafka-1.2.0/orakafka_distro_install.sh
提取:orakafka-1.2.0/orakafka.zip
正在激增:orakafka-1.2.0/README


6. Now we follow the instructions found in the _**orakafka-1.2.0/README**_ for the setup of Oracle SQL access to Kafka. We follow _simple install_ for single-instance Oracle Database. 

This README doc has instructions for Oracle SQL access to Kafka installation on Oracle Real Application Clusters (Oracle RAC) as well. By and large, in the case of Oracle RAC, we need to replicate the following steps on all nodes of Oracle RAC. Please follow the README for details.


[oracle@dbass ~]$ cd orakafka-1.2.0/
[oracle@dbass orakafka-1.2.0]$ ls -al
total 3944
drwxrwxr-x 2 oracle oinstall    4096 Feb 20 09:12 .
drwx—— 6 oracle oinstall    4096 Feb 19 17:39 ..
-rw-r–r– 1 oracle oinstall    6771 Oct 16 03:11 README 
-rw-r–r– 1 oracle oinstall       5 Oct 16 03:11 kit_version.txt
-rw-rw-r– 1 oracle oinstall 3996158 Oct 16 03:11 orakafka.zip
-rwxr-xr-x 1 oracle oinstall   17599 Oct 16 03:11 orakafka_distro_install.sh


tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm


7. As per _./orakafka-1.2.0/README_, we install Oracle SQL access to Kafka on the Oracle Database host with the help of _./orakafka-1.2.0/orakafka\_distro\_install.sh_ script. Argument -p lets us specify the location or base directory for the Oracle SQL access to Kafka installation on this host. 

   We choose the newly created empty directory named ora\_kafka\_home as the OSaK base directory on this host. So the full path of the OSaK base directory will be _/home/oracle/ora\_kafka\_home_.

[oracle@dbass ~]$ ./orakafka-1.2.0/orakafka_distro_install.sh -p ./ora_kafka_home/
 
 Step Create Product Home::
————————————————————–
…../home/oracle/ora_kafka_home already exists..
Step Create Product Home: completed.
PRODUCT_HOME=/home/oracle/ora_kafka_home
 
 Step Create app_data home::
————————————————————–
….. creating /home/oracle/ora_kafka_home/app_data and subdirectories
……Generated CONF_KIT_HOME_SCRIPT=/home/oracle/ora_kafka_home/app_data/scripts/configure_kit_home.sh
……Generated CONF_APP_DATA_HOME_SCRIPT=/home/oracle/ora_kafka_home/configure_app_data_home.sh
Step Create app_data home: completed.
APP_DATA_HOME=/home/oracle/ora_kafka_home/app_data
 
 Step unzip_kit::
————————————————————–
…..checking for existing binaries in /home/oracle/ora_kafka_home/orakafka
…..unzip kit into /home/oracle/ora_kafka_home/orakafka
Archive:  /home/oracle/orakafka-1.2.0/orakafka.zip
   creating: /home/oracle/ora_kafka_home/orakafka/
 extracting: /home/oracle/ora_kafka_home/orakafka/kit_version.txt
  inflating: /home/oracle/ora_kafka_home/orakafka/README
   creating: /home/oracle/ora_kafka_home/orakafka/doc/
  inflating: /home/oracle/ora_kafka_home/orakafka/doc/README_INSTALL
   creating: /home/oracle/ora_kafka_home/orakafka/jlib/
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/osakafka.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/kafka-clients-2.5.0.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-simple-1.7.28.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/lz4-java-no-jni-1.7.1.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/snappy-java-no-jni-1.1.7.3.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/zstd-no-jni-1.4.4-7.jar
  inflating: /home/oracle/ora_kafka_home/orakafka/jlib/slf4j-api-1.7.30.jar
   creating: /home/oracle/ora_kafka_home/orakafka/bin/
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka_stream.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/orakafka.sh
   creating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/removeuser_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/setup_all.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/remove_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/verify_install.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/add_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/config_util.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_views.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/install.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/set_java_home.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/list_clusters.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/adduser_cluster.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/uninstall.sh
  inflating: /home/oracle/ora_kafka_home/orakafka/bin/scripts/test_cluster.sh
   creating: /home/oracle/ora_kafka_home/orakafka/conf/
  inflating: /home/oracle/ora_kafka_home/orakafka/conf/orakafka.properties.template
   creating: /home/oracle/ora_kafka_home/orakafka/sql/
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkatab.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/catnoorakafka.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/catorakafka.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaus.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_uninstall.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkab.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/pvtorakafkaub.plb
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafka_pkg_install.sql
  inflating: /home/oracle/ora_kafka_home/orakafka/sql/orakafkas.sql
   creating: /home/oracle/ora_kafka_home/orakafka/lib/
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/libsnappyjava.so
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/libzstd-jni.so
  inflating: /home/oracle/ora_kafka_home/orakafka/lib/liblz4-java.so
Step unzip_kit: completed.
 
Successfully installed orakafka kit in /home/oracle/ora_kafka_home

8. Configure JAVA\_HOME for Oracle SQL access to Kafka. We find the Java path on the node as follows:

[oracle@dbass ~]$ java -XshowSettings:properties -version 2>&1 > /dev/null | grep ‘ java.home ’
java.home = /usr/java/jre1.8.0_271-amd64
[oracle@dbass ~]$ export JAVA_HOME=/usr/java/jre1.8.0_271-amd64


   To set Java home for OSaK, we use the script _/home/oracle/ora\_kafka\_home/orakafka/bin/orakafka.sh_ script, with _set\_java\_home_ option.

[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ 。/orakafka.sh set_java_home -p $JAVA_HOME

Step1:检查有效的 JAVA_HOME
---------------------
找到 /usr/java/jre1.8.0_271-amd64/bin/java, JAVA_HOME 路径有效。
Step1 succeeded.
 
Step2: JAVA version check
————————————————————–
java version “1.8.0_271”
Java(TM) SE Runtime Environment (build 1.8.0_271-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.271-b09, mixed mode)
Java version >= 1.8
Step2 succeeded.
 
Step3: Creating configure_java.sh script
————————————————————–
Wrote to /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
Step3 succeeded.
 
Successfully configured JAVA_HOME in /home/oracle/ora_kafka_home/app_data/scripts/configure_java.sh
 
The above information is written to /home/oracle/ora_kafka_home/app_data/logs/set_java_home.log.2021.02.20-04.38.23
 
[oracle@dbass bin]$

9. Verify installation of OSaK with _verify\_install_ option of script _orakafka.sh_ as follows:

[oracle@dbass ~]$ cd ora_kafka_home/
[oracle@dbass ora_kafka_home]$ cd orakafka/bin/
[oracle@dbass bin]$ 。/orakafka.sh verify_install
检查所有文件 / 目录所有权 - 已通过
检查目录权限 - 已通过
检查预期的可执行文件 - 已通过

以上信息将写入 /home/oracle/ora_kafka_home/app_data/logs/verify_install.log.2021.02.19-18.10.15


### Set Up Oracle Cloud Infrastructure Stream Clusters for Oracle SQL Access to Kafka

1. Under _ora\_kafka\_home_, that is _/home/oracle/ora\_kafka\_home_, we have two more READMEs as follows:

[oracle@dbass ~]$ 找到 ora_kafka_home/ -name "README*"
ora_kafka_home/orakafka/README
ora_kafka_home/orakafka/doc/README_INSTALL


   _~/ora\_kafka\_home/orakafka/README_ is the readme for this release of OSaK that we have installed. Please read through this readme.

   And _ora\_kafka\_home/orakafka/doc/README\_INSTALL_ is the README for the actual setup of a Kafka cluster for Oracle SQL access to Kafka. The rest of the steps 2 onwards below follow this README by and large. As per the same, we will leverage _~/ora\_kafka\_home/orakafka/bin/orakafka.s_h script, for adding a Kafka cluster, adding an Oracle Database user for using the Kafka cluster in the next steps.

2. Add our Stream service stream pool to Oracle SQL access to Kafka.

   As mentioned earlier, from the Oracle SQL access to Kafka point of view, the Stream service stream pool is a Kafka cluster.

   We use the _add\_cluster_ option of the _orakafka.sh_ script to add the cluster to OSaK. We use the _\-c_ argument to name the cluster as _kc1_.

[oracle@dbass bin]$ pwd
/home/oracle/ora_kafka_home/orakafka/bin
[oracle@dbass bin]$ ./orakafka.sh add_cluster -c kc1

Step1: 文件系统群集配置目录的创建
---------------------
文件系统群集目录的创建已完成。
在 /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties 上配置安全属性。
Step1 成功。

Step2:生成 DDL 以创建集群配置数据库目录
---------------------
以 sysdba 身份连接时执行以下 SQL 脚本
以创建集群配置数据库目录:
@/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql
Step2 已成功生成脚本。

****汇总*****

TODO 任务:

  1. 在 /home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties 上配置安全属性
  2. 以 sysdba 身份进行连接时执行以下 SQL:
    @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql

以上信息将写入 /home/oracle/ora_kafka_home/app_data/logs/add_cluster.log.2021.02.20-05.23.30

[oracle@dbass bin]$


   We get two TODO tasks as per output from the above execution of the _add\_cluster_ command. 

   1. For the first task, we add security properties for our Stream service stream to `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties`. 

      Where to get Kafka-compatible security configs for the Stream service cluster also known as (AKA) streampool? We get these security configs from the OCI web console as shown below.

      Please take note of the bootstrap server endpoint(_cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092_ from the screenshot below) as well. It is not related to the security configs of the cluster, but we need it in later steps for connecting to our Stream service and Kafka cluster. 

      ![](images/bsend.png " ")

      We write the above config values to OSaK in the following format, to `/home/oracle/ora_kafka_home/app_data/clusters/KC1/conf/orakafka.properties` file using any text editor like say vi. 

      ![](images/osakosssec.png " ")

      For clarity, we have the same configs in text format as follows:

      ```
      security.protocol=SASL_SSL  
      sasl.mechanism=PLAIN  
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>" password=" YOUR_AUTH_TOKEN";  
      sasl.plain.username="<YOUR_TENANCY_NAME>/<YOUR_OCI_USERID_HERE>/<YOUR_OCI_STREAMPOOL_OCID>"  
      sasl.plain.password ="YOUR_AUTH_TOKEN";
      ```

      **Note:** default Kafka Consumer Configs are already pre-populated in _orakafka.properties_ for this cluster. If there is a need, one can modify these configs.

   2. For the second TODO task, do SQL script execution as follows: 

      ```
      [oracle@dbass ~]$ sqlplus / as sysdba  
      SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 05:37:28 2021  
      Version 19.9.0.0.0   
      Copyright (c) 1982, 2020, Oracle.  All rights reserved.
      Connected to:  
      Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
      Version 19.9.0.0.0   
      SQL> alter session set container=p0;  
      Session altered.  
      SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_create_KC1_CONF_DIR.sql  
      Creating database directory "KC1_CONF_DIR"..  
      Directory created.  
      The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_create_KC1_CONF_DIR.log  
      Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
      Version 19.9.0.0.0
      ```

      Take note of directory object _KC1\_CONF\_DIR_ created by the above SQL script.

      **Note:** As dictated in add\_cluster output, we run the SQL script as a _SYSDBA_ user, inside the PDB of our interest (_p0_ PDB here).

   3. To make sure that the cluster configuration is working, we can leverage the _test\_cluster_ option of script _orakafka.sh_.

      We pass cluster name with -c argument and bootstrap server with -b argument. We have from bootstrap server info from step 4.2.1.

      ```
      [oracle@dbass bin]$ pwd  
      /home/oracle/ora_kafka_home/orakafka/bin  
      [oracle@dbass bin]$ ./orakafka.sh test_cluster -c kc1 -b cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
      Kafka cluster configuration test succeeded.  
         
      Kafka cluster configuration test - "KC1"  
      ------------------------------------------------------  
      KAFKA_BOOTSTRAP_SERVERS=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
      LOCATION_FILE="/home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/orakafka_stream_KC1.loc"  
      ora_kafka_operation=metadata  
      kafka_bootstrap_servers=cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092  
         
      List topics output:  
      StreamForOsak,3  
      ...  
      For full log, please look at /home/oracle/ora_kafka_home/app_data/clusters/KC1/logs/test_KC1.log.2021.02.20-09.01.12  
      Test of Kafka cluster configuration for "KC1" completed.  
         
      The above information is written to /home/oracle/ora_kafka_home/app_data/logs/test_cluster.log.2021.02.20-09.01.12  
         
      [oracle@dbass bin]$
      ```

      As you can see our Kafka cluster configuration test succeeded! We get the list of topics, AKA the Stream service streams in the output here. Topic names are followed by the number of partitions for that stream. Here we have the number of partitions as three, as the Stream service stream StreamForOsak has exactly three partitions.

3. Configure an Oracle Database user for the added Kafka cluster.

   We already have created an Oracle Pluggable Databases (PDB) level Oracle Database user with username books\_admin, for the PDB named _p0_.  We use the _adduser\_cluster_ option to grant required permissions for user books\_admin on Kafka cluster _kc1_. 

[oracle@dbass bin]$ ./orakafka.sh adduser_cluster -c kc1 -u books_admin


   
Step1: Generate DDL to grant permissions on cluster configuration directory to "BOOKS_ADMIN"
--------------------------------------------------------------------------------------------

Execute the following script while connected as sysdba  
to grant permissions on cluster conf directory :  
 @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
Step1 successfully generated script.  
   
***********SUMMARY************  
   
TODO tasks:  
 
1. Execute the following SQL while connected as sysdba:  
      @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
      
   The above information is written to /home/oracle/ora_kafka_home/app_data/logs/adduser_cluster.log.2021.02.20-10.45.57  
      
   [oracle@dbass bin]$

与上一步相似,我们还有一项要执行的 TODO 任务。

根据输出结果,有一个自动生成的 SQL 脚本。我们需要运行此脚本(以 sysdba 身份),以授予 Oracle Database 用户 books_admin 对群集 kc1 的群集配置目录的权限。

在本例中,此目录为 /home/oracle/ora_kafka_home/app_data/clusters/KC1


[oracle@dbass ~]$ sqlplus / as sysdba  
SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 10:56:17 2021  
Version 19.9.0.0.0  
Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
   
Connected to:  
Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
Version 19.9.0.0.0  
   
SQL> alter session set container=p0;  
Session altered.  
SQL> @/home/oracle/ora_kafka_home/app_data/scratch/orakafka_adduser_cluster_KC1_user1.sql  
   
PL/SQL procedure successfully completed.   
Granting permissions on "KC1_CONF_DIR" to "BOOKS_ADMIN"  
Grant succeeded.`

`The above information is written to /home/oracle/ora_kafka_home/app_data/logs/orakafka_adduser_cluster_KC1_user1.log  
Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
Version 19.9.0.0.0  
[oracle@dbass ~]$

  1. 现在,我们利用了 orakafka.shinstall 选项。我们传递此用户在添加的 Kafka 集群上与其 OSaK 活动相关的数据的父目录(带有 -p 参数)。

    此命令自动生成两个数据定义语言 (Data Definition Language, DDL) 脚本,如下所示:
     

       [oracle@dbass bin]$ ./orakafka.sh install -u books_admin -r /home/oracle/ora_kafka_home/books_admin_user_data_dir  
       
    Step1: Creation of filesystem location and default directories  
    --------------------------------------------------------------  
    Created filesystem location directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir  
    Created filesystem default directory at /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir  
    Step1 succeeded.  
       
    Step2: Generate DDL for creation of DB location and default directories  
    --------------------------------------------------------------  
    Execute the following SQL script while connected as sysdba  
    to setup database directories:  
     @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    On failure, to cleanup location and default directory setup, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"  
    Step2 successfully generated script.  
       
    Step3: Install ORA_KAFKA package in "BOOKS_ADMIN" user schema  
    --------------------------------------------------------------  
    Execute the following script in user schema "BOOKS_ADMIN" to  
    install ORA_KAFKA package in the user schema  
     @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
    On failure, to cleanup ORA_KAFKA package from user schema, please run "./orakafka.sh uninstall -u 'BOOKS_ADMIN'"  
    Step3 successfully generated script.  
       
    ***********SUMMARY************  
       
    TODO tasks:  
       
    1. Execute the following SQL while connected as sysdba:  
       @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    2. Execute the following SQL in user schema:  
       @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
       
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install.log.2021.02.20-11.33.09  
       
    [oracle@dbass bin]
    
  2. 第一个任务是运行 SQL 脚本,将两个目录(即 /home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_location_dir/home/oracle/ora_kafka_home/books_admin_user_data_dir/orakafka_default_dir
    )注册为 Oracle DB 目录对象。

    如输出中所述,需要使用 SYSDBA 特权运行该命令,如下所示:

       [oracle@dbass ~]$ sqlplus / as sysdba  
    SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:46:04 2021  
    Version 19.9.0.0.0  
    Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
    Connected to:  
    Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
       
    SQL> alter session set container=p0;  
       
    Session altered.  
       
    SQL> @/home/oracle/ora_kafka_home/app_data/scratch/setup_db_dirs_user1.sql  
    Checking if user exists..  
       
    PL/SQL procedure successfully completed.  
    Creating location and default directories..  
    PL/SQL procedure successfully completed.  
    Directory created.  
    Directory created.  
    Grant succeeded.  
    Grant succeeded.  
    Creation of location dir "BOOKS_ADMIN_KAFKA_LOC_DIR" and default dir "BOOKS_ADMIN_KAFKA_DEF_DIR" completed.  
    Grant of required permissions on "BOOKS_ADMIN_KAFKA_LOC_DIR","BOOKS_ADMIN_KAFKA_DEF_DIR" to "BOOKS_ADMIN" completed.  
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/setup_db_dirs_user1.log  
    Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
    [oracle@dbass ~]$
    

    记下以上 SQL 脚本(即 BOOKS_ADMIN_KAFKA_LOC_DIRBOOKS_ADMIN_KAFKA_DEF_DIR)创建的目录对象。

    现在让我们开始第二个 TODO 任务。

  3. 在此处,我们在 p0 PDB 上以 books_admin(而非 sysdba)身份运行另一个 SQL 脚本。此脚本将在属于 books_admin 的方案中安装对 Kafka 程序包和对象的 Oracle SQL 访问权限。

       [oracle@dbass ~]$ sqlplus books_admin@p0pdb #p0pdb is tns entry for p0 pdb  
    SQL*Plus: Release 19.0.0.0.0 - Production on Sat Feb 20 11:52:33 2021  
    Version 19.9.0.0.0  
    Copyright (c) 1982, 2020, Oracle.  All rights reserved.  
    Enter password:  
    Last Successful login time: Fri Feb 19 2021 14:04:59 +00:00  
    Connected to:  
    Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
       
    SQL> ALTER SESSION SET CONTAINER=p0;  
    Session altered.  
       
    SQL> @/home/oracle/ora_kafka_home/app_data/scratch/install_orakafka_user1.sql  
    Verifying user schema..  
       
    PL/SQL procedure successfully completed.  
       
    Verifying that location and default directories are accessible..  
       
    PL/SQL procedure successfully completed.  
       
    Installing ORA_KAFKA package in user schema..  
    .. Creating ORA_KAFKA artifacts  
       
    Table created.  
    Table created.  
    Table created.  
    Table created.  
    Package created.  
    No errors.  
    Package created.  
    No errors.  
    Package body created.  
    No errors.  
    Package body created.  
    No errors.  
    The above information is written to /home/oracle/ora_kafka_home/app_data/logs/install_orakafka_user1.log  
    Disconnected from Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production  
    Version 19.9.0.0.0  
    [oracle@dbass ~]$
    

在 Oracle Database 中注册流服务集群并为流创建视图

在此步骤中,我们将群集 kc1 与 Oracle Database 结合使用,并为流 StreamForOsak 创建视图。

此步骤的所有 SQL 查询将以 books_admin 用户和 p0 PDB 的身份运行。

  1. 使用 Oracle SQL 访问 Kafka 过程 ORA_KAFKA.REGISTER_CLUSTER 注册集群。

BEGIN  
ORA_KAFKA.REGISTER_CLUSTER  
('kc1', -- cluster name  
'cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092', -- fill up your bootstrap server here   
'BOOKS_ADMIN_KAFKA_DEF_DIR', -- default directory for external table created in previous steps  
'BOOKS_ADMIN_KAFKA_LOC_DIR', -- this directory object too is created in previous steps  
'KC1_CONF_DIR', --config dir for cluster  
'Registering kc1 for this session'); --description   
END;  
/

  1. 创建一个名为 BOOKS 的 Oracle Database 表,该表需要与 Kafka 主题 StreamForOsak 中的消息具有相同的方案。

CREATE TABLE BOOKS ( -- reference table. It is empty table. Schema of this table must correspond to Kafka Messages  
id int,  
title varchar2(50),  
author_name varchar2(50),  
author_email varchar2(50),  
publisher varchar2(50)  
);  
/

  1. 使用 Oracle SQL 访问 Kafka 软件包 viz CREATE_VIEWS 中的帮助存储过程,为主题 StreamForOsak 创建一个视图,如下所示。

DECLARE  
 application_id VARCHAR2(128);  
 views_created INTEGER;  
BEGIN  
ORA_KAFKA.CREATE_VIEWS  
('kc1', -- cluster name  
'OsakApp0', -- consumer group name that OSS or Kafka cluster sees.  
'StreamForOsak', -- Kafka topic aka OSS stream name  
'CSV', -- format type  
'BOOKS', -- database reference table  
views_created, -- output  
application_id); --output  
dbms_output.put_line('views created  = ' || views_created);  
dbms_output.put_line('application id = ' || application_id);  
END;  
/

对于 Kafka 消息、CSV 和 JSON,Oracle SQL 访问支持两种格式。此处使用 CSV。因此,符合 BOOKS 表方案的 Kafka 消息示例可以是 101、著名的书、John Smith、john@smith.com、第一个软件。


tar xvf sqldeveloper-20.4.0.379.2205-20.4.0-379.2205.noarch.rpm

  1. 上面的步骤将为主题的每个分区创建一个视图。由于主题 StreamForOsak 具有三个分区,因此我们将有三个视图。每个视图的名称将采用 KV_<CLUSTER_NAME>_<GROUP_NAME>_TOPIC_<NUM_OF_PARTITION> 格式。

此处有三个视图,如下所示,每个分区一个视图。

因此,视图 KV_KC1_OSAKAPP0_STREAMFOROSAK_0 映射到分区 0,视图 KV_KC1_OSAKAPP0_STREAMFOROSAK_2 映射到分区 1,最后,KV_KC1_OSAKAPP0_STREAMFOROSAK_0 映射到主题 StreamForOsak 的分区 2。

为流生成数据

  1. 使用 Oracle Cloud Infrastructure (OCI) Web 控制台中的生成测试消息按钮,生成到流 StreamForOsak 的测试消息,如下所示。

单击生成测试消息时,将显示一个窗口,其中包含用于输入测试消息的字段,如下所示。

我们输入以下消息:

200、Andrew Miller 第 1 部分、MS Brown、mb@example.com、First Software

不用说,我们还可以使用 Java/Python 中的标准 Kafka 生成器 API 或用于流服务的 OCI SDK 来生成流服务流的消息。

  1. 在上面的窗口中单击生成后,将消息发布到流 StreamForOsak。我们可以使用 Load Message 实用程序查看消息到达的分区。

如上所示,我们的消息位于主题 StreamForOsak 的分区 2 中。

使用 Oracle SQL 访问 Kafka 存储过程设置的数据库视图检索流服务消息

我们可以运行简单的 SQL 查询:


SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2_

或者,如下所示,只需打开视图 SQL 开发人员即可查看视图 KV_KC1_OSAKAPP0_STREAMFOROSAK_2 中的数据:

我们选择查看 KV_KC1_OSAKAPP0_STREAMFOROSAK_2,因为它对应于流 StreamForOsak 的分区 2。因此,每次对此视图运行 SELECT SQL 查询时,它都会转到其关联主题的相应分区,并按视图中的行提取新的未提交的消息(对于使用者组 OsakApp0)。每条消息在视图中都有自己的行。请注意,这些视图的数据不会保留。

好奇的眼睛还可以观察到 Oracle SQL 访问 Kafka 可以获取和存储元数据信息,例如其他列中的消息偏移、分区和时间戳。

大多数情况下,您可能希望一次又一次地查询此视图,对于每次查询运行,都希望视图在流中按顺序移动。可以使用以下规范代码片段轻松实现此目的:


LOOP
ORA_KAFKA.NEXT_OFFSET(‘_KV_KC1_OSAKAPP0_STREAMFOROSAK_2_’);

SELECT * FROM KV_KC1_OSAKAPP0_STREAMFOROSAK_2_;

ORA_KAFKA.UPDATE_OFFSET(‘_KV_KC1_OSAKAPP0_STREAMFOROSAK_2_’);

COMMIT;
END LOOP;

如前所述,使用 Oracle SQL 访问 Kafka 时,流失由 Oracle Database 管理,而非流服务管理。它们位于系统表中,跟踪 KV_KC1_OSAKAPP0_STREAMFOROSAK_2 视图访问的所有分区的偏移位置。

NEXT_OFFSET 调用仅将 KV_KC1_OSAKAPP0_STREAMFOROSAK_2 视图绑定到一组新的偏移,这些偏移表示该视图访问的 Oracle Cloud Infrastructure Streaming 分区中的新数据。UPDATE_OFFSET 指示每个分区读取的行数,并将偏移量前进到新位置。

COMMIT 保证此工作单元符合 ACID。

结论

在本教程中,我们重点介绍如何安装对 Kafka 的 Oracle SQL 访问以及如何使用流服务。由于流服务兼容 Kafka,因此它就像一个用于 Oracle SQL 访问 Kafka 的 Kafka 集群。

请注意,就 Oracle SQL 访问 Kafka 所提供的功能而言,目前我们几乎没有暂存表面。对 Kafka 的 Oracle SQL 访问具有高度可配置性。我们甚至可以将流数据存储到 Oracle Database 表中,这些表与视图不同。有关更多详细信息,请参阅对 Kafka 安装的 Oracle SQL 访问中提供的自述文件以及以下参考。

  1. Oracle PL/SQL 开发人员文档,支持访问 Kafka 的 Oracle SQL
  2. 有关 Oracle SQL 访问 Kafka 的 Oracle 博客文章
  3. Oracle SQL 访问 Oracle Big Data SQL 中的 Kafka
  4. Oracle Cloud Infrastructure Streaming
  5. 流服务 Kafka API 兼容性

致谢

```

更多学习资源

docs.oracle.com/learn 上浏览其他实验室,或者在 Oracle Learning YouTube 渠道上访问更多免费学习内容。此外,访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心