注:

将 Oracle Cloud Infrastructure Streaming 和 OCI 数据流与微批处理结合使用来流式 AVRO 消息

简介

在当今数据驱动的格局中,处理和分析实时数据流的能力对于企业而言至关重要,因为企业希望获得洞察并快速应对不断变化的情况。流数据处理技术已成为处理大量连续数据流的强大解决方案。在本教程中,我们将探讨一种创新方法,即使用 Oracle Cloud Infrastructure (OCI) 流处理来高效流式传输 AVRO 消息,并结合微批处理技术,并通过基于开源 FN 项目的 Oracle Functions 的无服务器功能进行增强。

AVRO 和流数据简介

AVRO 是一种广泛采用的数据序列化格式,以其代表复杂数据结构的效率及其与各种编程语言的兼容性而闻名。与流处理技术集成时,AVRO 几乎可以实时传输和处理数据,从而在没有通常与批处理关联的延迟的情况下提取有价值的洞察。

OCI 流:提供实时数据

Oracle Cloud Infrastructure (OCI) 提供了一系列用于在云中处理数据的工具,OCI 流处理是专为实现高吞吐量实时数据流量而定制的服务。通过利用 OCI 流处理,开发人员可以构建可扩展的可靠数据管道,从而高效摄取、处理和分发数据流。

OCI 数据流托管 Spark:无锁定解决方案

Oracle Cloud Infrastructure (OCI) 数据流是完全托管的 Apache Spark 服务,它对非常大的数据集执行处理任务,而无需部署或管理基础设施。

微批量处理微批量处理涉及使用时间或大小作为标准将传入数据流分解为紧凑批处理。然后将这些批处理为较小的作业。与流处理中的记录常量和单独处理不同,微批处理在处理之前引入了一些延迟和存储,从而可以进一步控制对数据执行的操作。与按时间间隔处理大数据集的传统批处理不同,微批处理可提供几乎实时的结果处理和交付。

发挥协同作用:OCI 流处理、OCI 数据流和 Oracle Functions

此教程深入分析了 OCI 流处理、OCI 数据流管理 Spark 流处理和 Oracle Functions 的融合。我们将指导您完成设置端到端流数据管道的过程,该管道将摄取 AVRO 编码的消息,并使用 OCI 数据流管理的 Spark 微批处理功能有效地处理消息,并引入了 Oracle Functions 的无服务器事件驱动处理。

目标

使用 OCI 流处理和 OCI 数据流管理的 Spark 微批处理,使用 AVRO 格式创建高效的实时数据处理管道。

重要提示:本教程仅针对教育和学习目的而设计。它为学员提供一个在受控环境中进行实验和获得实践经验的环境。值得注意的是,本实验室采用的安全配置和实践可能不适合实际情况。

现实世界应用的安全注意事项往往更为复杂和动态。因此,在实施生产环境中展示的任何技术或配置之前,必须进行全面的安全评估和审查。此审查应涵盖安全的所有方面,包括访问控制、加密、监视和合规性,以确保系统符合组织的安全策略和标准。

从实验室环境转变为实际部署时,安全性始终应是首要任务。

流程流
T0_1

高级架构
T0_1

先决条件 - Oracle Cloud Infrastructure

先决条件 - 本地计算机环境

任务 1:设置动态组

  1. 转到域,单击动态组并创建以下组。

    组名:MyFunctions

    ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
    

    组名:ContainerIntances

    ALL {resource.type='compute-container-instances',  resource.compartment.id = 'pasteYourCompartmentID'}
    

    组名:DataFlowDynamicGroup

    ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
    

任务 2:创建策略

任务 3:创建存储桶并上载 AVRO 方案

  1. 转到存储桶,单击创建存储桶并创建一个名为 avro-schema-bucket 的新存储桶来存储 AVRO 模式文件。

    T3_1

  2. 现在,选择您的存储桶和记下名称空间,以后需要它。

    T3_1

  3. 将文件 user.asvc 上载到此创建的存储桶中。

    T3_1

任务 4:创建专用 OCI 流处理主题

  1. 转到 Analytics & AI ,然后单击 Streaming 并创建一个名为 FrontDoorTopic 的新流。

    T4_0

  2. 选择流池,单击 PrivatePool ,然后单击 Kafka 连接设置选项和字段的 TAKE NOTE ,以后需要它。

    T4_0

任务 5:创建 AUTH TOKEN

为用户创建 AUTH TOKEN,这是使用 Kafka 主题所必需的

  1. 单击右上方的用户图标,然后选择用户设置选项。

  2. 单击 Auth Tokens ,然后生成新令牌和令牌的 TAKE NOTE

    T4_1

任务 6:创建容器注册表

  1. 转到开发人员服务菜单,单击容器注册表并创建以下专用资料档案库。

    资料档案库名称 类型
    api-avro-sample_a 专用
    api-avro-sample_b 专用
    fn-recep-avro 专用
  2. 检查系统信息库和名称空间TAKE NOTE

    T6_1

  3. 打开安装了 OCI CLI 和 Docker 的终端 shell,然后继续在注册表中登录。检查您的区域的正确 URL 是什么。在本教程中,我们使用的是 Brazil East (Sao Paulo) ,注册表 URL 为 gru.ocir.io

    docker login gru.ocir.io
    Username: <your container namespace>/youruser
    Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
    

    T6_1

任务 7:创建 OCI Vault

创建 OCI Vault 并提供本教程稍后将使用的所需变量。

  1. 转到 Identify & Security ,单击 Vault ,然后单击 Create Vault

    T7_1new

  2. 选择新的 Vault,并为其创建主加密密钥

    T7_1new

  3. 创建名为 AUTH_KEY 的新密钥并粘贴先前创建的验证密钥。

    T7_1new

  4. 重复密钥创建过程并创建以下新密钥:

    变量名称
    KAFKA_BOOTSTRAPSERVER “OCI 流处理配置中的助推服务器”
    KAFKA_TOPIC “FrontDoorTopic”
    KAFKA_USERNAME “您的用户名 + OCI 流处理配置中的功能集 ID”
    AUTH_KEY “在以前的步骤中创建的 AUTH 令牌”
  5. 记下为每个密钥创建的密钥 OCID 并创建新配置文件。

    • config.properties 文件包含从应用程序到 Vault 密钥 OCID 的映射变量。应用程序将使用此文件确定在运行时需要收集哪些 Vault 密钥。

    • 在本地计算机上创建新文件,您可以在其中访问 OCI-CLI:
      替换为每个安全的 OCID
      文件名:config.properties

      kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
      kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
      
  6. 转到存储桶,单击创建存储桶并创建一个名为 config 的新存储桶来存储 config.properties 文件。

  7. config.properties 文件上载到存储桶 config

    ls -lrt config.properties
    oci os object put -bn config --file config.properties --force
    

    T7_1new

任务 8:创建简单的 AVRO 消息并使用提供的示例 python 代码将其保存到文件中

  1. 打开安装了 OCI CLI、Docker 和 Python3 的 shell 终端,并基于之前在对象存储中上载的相同 AVRO 方案创建包含单个消息的新 AVRO 文件。

    注意:您必须检查 Python 版本,在本教程中使用的是 Python 3.9.16 ,早期版本可能无法运行。

  2. 从此处获取代码 Create_avro_sample.zip

  3. 将其解压缩到您选择的位置,然后运行程序以生成示例 AVRO 消息:

    cd ~
    mkdir create_avro_sample
    cd create_avro_sample
    unzip CreateAVRO_SampleFile.zip
    # Check the files are there
    ls -lrt
    # install the python dependencies on requirements.txt
    pip3 install -r requirements.txt
    # Run the program and create an AVRO message file
    python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    

    T8_1 T8_1 T8_1

任务 9:创建 OCI 函数以接收 AVRO 消息并发布到 OCI 流处理主题

  1. 转到开发人员服务,在函数下,单击应用程序,然后单击创建应用程序

    T9_1

  2. 转到安装了 Docker、OCI CLI 和 FN CLI 的终端 shell,然后运行以下命令来初始化函数。

    :如果您按照这些步骤操作,则现在已经执行了 Docker 登录命令(如果尚未执行),请继续执行创建容器注册表任务上的 Docker 登录步骤。

    fn create context oci-cloud --provider oracle
    fn use context oci-cloud
    fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
    fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
    fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
    

    注意:在本教程中,我们使用的是巴西东部(圣保罗)区域,如果您使用的是其他区域,则需要更改 api-url注册表位置。

    T9_1

  3. 创建一个简单的 Hello-world 函数,以确保您的所有设置都正确无误。

    fn init --runtime python fn-recep-avro
    cd fn-recep-avro
    fn deploy --app MyReceptionApp
    fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

  4. 获取文件 fn-recep-avro.zip 中的 AVRO 函数样例代码并替换之前创建的 hello-world 代码。必须同时获取文件 func.pyrequirements.txt 才能工作。

    # Check you have the right code for func.py & requirements.txt (you got from zip file)
    ls -lrt
    

    T9_1

  5. 构建新代码并部署函数

    fn deploy --app MyReceptionApp
    

    T9_1

  6. 要调用函数,我们需要将 AVRO 消息作为参数传递,为此,我们将使用先前步骤中创建的示例 AVRO 消息文件。首次调用函数时,需要更长的时间,因为它需要启动。

    # Check where you created the sample avro message file
    ls -lrt ../create_avro_sample/
    
    # Invoke the function to check if it's working as expected
    echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

任务 10:创建 API 网关以公开函数

  1. 在控制台中,单击开发人员服务,然后在 API 管理下,单击网关,然后单击创建网关

    T10_1

  2. 创建后,单击部署选项,然后单击创建部署

    名称:RecepFunction
    路径前缀: /

    • 在 "Authentication"(验证)中,选择 No Authentication(无身份验证),因为这是一个简单的实验并且未实施 API 验证。此处的主要目标是演示通过 API 传递二进制 AVRO 消息的 HTTPS 调用,为此,我们将不会为此简单实验室实施任何验证方法。
    • 在迈向现实生活环境之前,请确保遵循 API 网关的安全实践。
    • 有关详细信息,请参阅保护 API 网关和资源

    路由 1:路径: /

    Methos: POST
    后端类型:Oracle 函数
    应用程序: 选择您的函数

    T9_1

    T9_1

    T9_1

    T9_1

  3. 检查 API 网关端点并注意事项。

    T9_1

  4. 打开 Linux shell 终端并调用 API 网关。将 API URL 替换为您在前面的步骤中找到的正确端点。

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
         -d "$(echo -n -e "$(cat meu_file.bin)")" \
         https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T9_1

检查点

T9_1

任务 11:为 API 类型 A 构建容器映像

API 类型 A 和 B 代码基本相同,只有不同的标题消息才能模拟两个不同的 API

  1. 从 API 类型 A 获取代码,并在 Linux shell 终端 api-avro-sample_a.zip 中解压缩该代码。

  2. 获取先前步骤中获取的容器注册表名称空间,并按照以下模式创建应用程序注册表位置。目录 URL 基于您的区域,即 Brasil East(SaoPaulo) 的 gru.ocir.io

    [ocir url]/[您的名称空间]/api-avro-sample_a:latest

  3. 在 Linux shell 终端中,为此 API 构建和推送 docker 映像。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
    

    T10_1 T10_1

任务 12:为 API 类型 B 构建容器映像

  1. 从 API type B 获取代码并将其解压缩到 Linux shell 终端 api-avro-sample_b.zip 中。

  2. 获取先前步骤中获取的容器注册表名称空间,并按照以下模式创建应用程序注册表位置。目录 URL 基于您的区域,即 Brasil East(SaoPaulo) 的 gru.ocir.io

    [ocir url]/[您的名称空间]/api-avro-sample_b:latest

  3. 在 Linux shell 终端中,为此 API 构建和推送 docker 映像。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
    

    T10_1

    T10_1

  4. 如果图像已成功推送,请检查容器注册表页。

    T10_1

任务 13:在容器服务中部署 API

  1. 转到开发人员服务容器实例,然后单击创建容器实例

    T13_1

    T13_1

  2. api-type-b 重复步骤 1,为 TYPE B API 选择正确的映像。

    1. 转到开发人员服务容器实例,然后单击创建容器实例并重复部署 API 类型 B 的步骤

    2. 从容器实例获取内部 FQDN 地址

      T14_1

      • 单击容器实例,并记下每个内部 FQDN 地址

      T14_1

    3. 转到 Identify & Secutiry ,单击 Vault ,选择 VAULT 并创建两个新的 secrets

      密钥名称
      API 类型 _A_URL 粘贴 API 类型 A 的内部 FQDN 专用地址
      API 类型 _B_URL 粘贴 API 类型 B 的内部 FQDN 专用地址

      记下每个密钥 OCID

      您的 Vault 现在应如下所示:

      T14_1

    4. 编辑上载到 config 存储桶的 config.properties 文件,并为密钥 OCID 添加新条目

      ls -lrt config.properties
      vi config.properties
      api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
      api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID
      
      # After save, please upload the new version to Object Storage
      cat config.properties
      oci os object put -bn config --file config.properties --force
      

      文件应如下所示:
      T14_1

      T14_1

任务 14:使用 create_avro_sample.py 测试 API

  1. 转到 Linux shell 终端,您将从任务 7 中保存 create_avro_sample.py ,然后创建一些新消息来测试 API 调用。我们正在创建具有不同 ID 的两个新的 AVRO 文件(1010 个,1020 个),我们将在 Spark Stream (DataFlow) 程序中用作过滤器。

    ls -lrt
    python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
    
    python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
    
    

    T14_1

  2. 调用通过 AVRO 消息进行测试的 API 正常工作。转到容器实例页面,获取每个 API api-type-aapi-type-b内部 FQDN 地址。请记住,要替换 API 中对应的内部 FQDN 地址的以下 URL。

    ls -lrt type*
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       xxx.xx.x.xxx
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_b_message.bin" \
       xxx.xxx.xx.xxx
    
    

    T14_1

任务 15:设置 Java Spark 流处理应用程序

  1. 转到存储桶,单击创建存储桶并创建两个名为 dataflow-app-avrodataflow-logs-avro 的新存储桶,这将用于上载 java 应用程序。

  2. 请仔细检查 java 环境版本。

    Java

    java 11.0.8 2020-07-14 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
    

    Maven

    Apache Maven 3.5.4 (Red Hat 3.5.4-5)
    Maven home: /usr/share/maven
    Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
    Default locale: en_US, platform encoding: ANSI_X3.4-1968
    OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
    
  3. 下载示例代码并将其解压缩到具有 oci-cli、docker、java 和 maven 的本地环境中: spark-consume-avro-message.zip

    unzip spark-consume-avro-message.zip
    cd spark-consume-avro-message
    ls -lrt
    

    T15_1 T15_1

    下面简要介绍了用于调用容器实例类型 AB 的代理代码。

    检查主程序文件 .src/main/java/example/Example.java.... T15_1

  4. 由于此 Java 程序使用 lib 处理 spark-avro,因此我们需要打包依赖项以将其传递到数据流。为此,我们将使用数据流相关性打包程序,如果您需要更多详细信息,可以转至数据流相关性打包程序

    软件包 org.apache.spark:spark-avro_2.12:3.2.1 已在 packages.txt 文件中声明,只需要通过运行对其进行打包:

    docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow  --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
    

    T15_1 T15_1 T15_1

  5. 使用 oci-cli 将 archive.zip 文件上载到名为 dataflow-app-avro 的存储桶。

    oci os object put -bn dataflow-app-avro --file archive.zip --force
    
  6. 编译 java 应用程序并将其打包并上载到存储桶 dataflow-app-avro

    ls -lrt
    mvn clean install
    

    T15_1
    ... 压缩了编译日志的行数 ... T15_1

    # upload the JAR file to the storage bucket
    oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
    

    T15_1

  7. 检查当前的 dataflow-app-avro 存储桶,并确保它如下所示。

    T15_1

  8. 转到 Analytics & AI ,然后在数据湖下,单击数据流,选择左侧菜单专用端点,然后单击创建专用端点

    • 由于我们将 PRIVATE 子网用于容器实例和 OCI 流处理池,因此需要专用端点。

    • 确保使用 OCI 容器实例和 OCI 流处理池中的内部 FQDN 填充 DNS 区域,并用逗号分隔。

      T15_1

  9. 转到 Analytics & AI ,然后在数据湖下,单击数据流,然后单击创建应用程序

    T15_1
    T15_1
    T15_1
    T15_1
    T15_1
    T15_1

    • 创建后,选择 spark-lab-avro 数据流,然后单击 Run 启动程序,启动程序通常最多需要 8 分钟。

      T15_1
      T15_1

  10. 检查“Running dataflow(正在运行的数据流)”应用程序,打开将显示当前作业并且应用程序正在运行的 SparkUI。

    T15_1

    T15_1

    T15_1

任务 16:验证流

调用函数并通过消息检查所有流是否按预期工作。

  1. 打开 Linux shell 终端,在其中创建样例消息 type_a_message.bintype_b_message.bin 并发送消息。将 API URL 替换为您创建的 API 网关中的正确端点。

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
       -d "$(echo -n -e "$(cat type_a_message.bin)")" \
       https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T16_1

  2. 检查容器实例上的日志是否调用了 API 类型 A

    T16_1 T16_1

您可以重复该过程并发送 type_b_message.bin 文件,它将调用类型 B 容器实例。

T9_1

确认

更多学习资源

探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心