注:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于云环境的那些值。
将 Oracle Cloud Infrastructure Streaming 和 OCI 数据流与微批处理结合使用来流式 AVRO 消息
简介
在当今数据驱动的格局中,处理和分析实时数据流的能力对于企业而言至关重要,因为企业希望获得洞察并快速应对不断变化的情况。流数据处理技术已成为处理大量连续数据流的强大解决方案。在本教程中,我们将探讨一种创新方法,即使用 Oracle Cloud Infrastructure (OCI) 流处理来高效流式传输 AVRO 消息,并结合微批处理技术,并通过基于开源 FN 项目的 Oracle Functions 的无服务器功能进行增强。
AVRO 和流数据简介
AVRO 是一种广泛采用的数据序列化格式,以其代表复杂数据结构的效率及其与各种编程语言的兼容性而闻名。与流处理技术集成时,AVRO 几乎可以实时传输和处理数据,从而在没有通常与批处理关联的延迟的情况下提取有价值的洞察。
OCI 流:提供实时数据
Oracle Cloud Infrastructure (OCI) 提供了一系列用于在云中处理数据的工具,OCI 流处理是专为实现高吞吐量实时数据流量而定制的服务。通过利用 OCI 流处理,开发人员可以构建可扩展的可靠数据管道,从而高效摄取、处理和分发数据流。
OCI 数据流托管 Spark:无锁定解决方案
Oracle Cloud Infrastructure (OCI) 数据流是完全托管的 Apache Spark 服务,它对非常大的数据集执行处理任务,而无需部署或管理基础设施。
微批量处理微批量处理涉及使用时间或大小作为标准将传入数据流分解为紧凑批处理。然后将这些批处理为较小的作业。与流处理中的记录常量和单独处理不同,微批处理在处理之前引入了一些延迟和存储,从而可以进一步控制对数据执行的操作。与按时间间隔处理大数据集的传统批处理不同,微批处理可提供几乎实时的结果处理和交付。
发挥协同作用:OCI 流处理、OCI 数据流和 Oracle Functions
此教程深入分析了 OCI 流处理、OCI 数据流管理 Spark 流处理和 Oracle Functions 的融合。我们将指导您完成设置端到端流数据管道的过程,该管道将摄取 AVRO 编码的消息,并使用 OCI 数据流管理的 Spark 微批处理功能有效地处理消息,并引入了 Oracle Functions 的无服务器事件驱动处理。
目标
使用 OCI 流处理和 OCI 数据流管理的 Spark 微批处理,使用 AVRO 格式创建高效的实时数据处理管道。
重要提示:本教程仅针对教育和学习目的而设计。它为学员提供一个在受控环境中进行实验和获得实践经验的环境。值得注意的是,本实验室采用的安全配置和实践可能不适合实际情况。
现实世界应用的安全注意事项往往更为复杂和动态。因此,在实施生产环境中展示的任何技术或配置之前,必须进行全面的安全评估和审查。此审查应涵盖安全的所有方面,包括访问控制、加密、监视和合规性,以确保系统符合组织的安全策略和标准。
从实验室环境转变为实际部署时,安全性始终应是首要任务。
流程流
高级架构
先决条件 - Oracle Cloud Infrastructure
- 具有管理员级别访问权限的 Oracle 账户
- 用于创建资源的区间:请注意 COMPARMENT ID
- 具有两个子网(专用和公共)的 VCN,请参阅创建网络教程
- 请确保子网对服务网关具有正确的入站规则,对于专用子网和公共子网,其端口 443 和 80
先决条件 - 本地计算机环境
-
PRIVATE 子网上的 Oracle Linux 计算实例。这对于访问 PRIVATE 子网上的资源非常重要,例如本教程期间将部署的 OCI 流处理、函数和容器实例。
-
堡垒主机,用于连接到 Oracle Linux 计算实例并执行教程任务。有关详细信息,请参阅堡垒概览。
-
本地 OCI-CLI 设置。有关详细信息,请参阅安装 CLI 教程。
-
如果使用的是 Oracle Linux,则本地文档能够构建映像,请参见 Install DOCKER on Oracle Linux 。
-
本地 Python 版本 3.9.16(至少)已安装用于测试
-
本地 Java JDK 11.0.8
-
本地 Maven 3.5.4
-
能够将函数部署到 OCI 的本地 FN CLI: Installing FN CLI
任务 1:设置动态组
-
转到域,单击动态组并创建以下组。
组名:MyFunctions
ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
组名:ContainerIntances
ALL {resource.type='compute-container-instances', resource.compartment.id = 'pasteYourCompartmentID'}
组名:DataFlowDynamicGroup
ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
任务 2:创建策略
-
转到策略并创建以下策略。
策略名:FunctionsPolicies
Allow dynamic-group MyFunctions to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group ContainerIntances to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group MyFunctions to read secret-bundles in compartment YOUR-COMPARTMENT-NAME
策略名:StreamTopicPolicies
Allow dynamic-group ContainerIntances to read objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group ContainerIntances use stream-push in compartment YOUR-COMPARTMENT-NAME
策略名:ContainerInstancesPolicy
Allow dynamic-group ContainerIntances to read repos in compartment YOUR-COMPARTMENT-NAME
策略名:DataFlowPolicies
Allow dynamic-group DataFlowDynamicGroup to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to manage data-catalog-metastores in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to manage object-family in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to read secret-bundles in compartment YOUR-COMPARTMENT-NAME Allow service dataflow to read objects in compartment YOUR-COMPARTMENT-NAME
任务 3:创建存储桶并上载 AVRO 方案
-
转到存储桶,单击创建存储桶并创建一个名为 avro-schema-bucket 的新存储桶来存储 AVRO 模式文件。
-
现在,选择您的存储桶和记下名称空间,以后需要它。
-
将文件 user.asvc 上载到此创建的存储桶中。
任务 4:创建专用 OCI 流处理主题
-
转到 Analytics & AI ,然后单击 Streaming 并创建一个名为 FrontDoorTopic 的新流。
-
选择流池,单击 PrivatePool ,然后单击 Kafka 连接设置选项和字段的 TAKE NOTE ,以后需要它。
任务 5:创建 AUTH TOKEN
为用户创建 AUTH TOKEN,这是使用 Kafka 主题所必需的
-
单击右上方的用户图标,然后选择用户设置选项。
-
单击 Auth Tokens ,然后生成新令牌和令牌的 TAKE NOTE 。
任务 6:创建容器注册表
-
转到开发人员服务菜单,单击容器注册表并创建以下专用资料档案库。
资料档案库名称 类型 api-avro-sample_a 专用 api-avro-sample_b 专用 fn-recep-avro 专用 -
检查系统信息库和名称空间的 TAKE NOTE 。
-
打开安装了 OCI CLI 和 Docker 的终端 shell,然后继续在注册表中登录。检查您的区域的正确 URL 是什么。在本教程中,我们使用的是 Brazil East (Sao Paulo) ,注册表 URL 为 gru.ocir.io 。
docker login gru.ocir.io Username: <your container namespace>/youruser Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
任务 7:创建 OCI Vault
创建 OCI Vault 并提供本教程稍后将使用的所需变量。
-
转到 Identify & Security ,单击 Vault ,然后单击 Create Vault 。
-
选择新的 Vault,并为其创建主加密密钥。
-
创建名为 AUTH_KEY 的新密钥并粘贴先前创建的验证密钥。
-
重复密钥创建过程并创建以下新密钥:
变量名称 值 KAFKA_BOOTSTRAPSERVER “OCI 流处理配置中的助推服务器” KAFKA_TOPIC “FrontDoorTopic” KAFKA_USERNAME “您的用户名 + OCI 流处理配置中的功能集 ID” AUTH_KEY “在以前的步骤中创建的 AUTH 令牌” -
记下为每个密钥创建的密钥 OCID 并创建新配置文件。
-
config.properties 文件包含从应用程序到 Vault 密钥 OCID 的映射变量。应用程序将使用此文件确定在运行时需要收集哪些 Vault 密钥。
-
在本地计算机上创建新文件,您可以在其中访问 OCI-CLI:
替换为每个安全的 OCID
文件名:config.propertieskafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
-
-
转到存储桶,单击创建存储桶并创建一个名为 config 的新存储桶来存储
config.properties
文件。 -
将 config.properties 文件上载到存储桶 config
ls -lrt config.properties oci os object put -bn config --file config.properties --force
任务 8:创建简单的 AVRO 消息并使用提供的示例 python 代码将其保存到文件中
-
打开安装了 OCI CLI、Docker 和 Python3 的 shell 终端,并基于之前在对象存储中上载的相同 AVRO 方案创建包含单个消息的新 AVRO 文件。
注意:您必须检查 Python 版本,在本教程中使用的是 Python 3.9.16 ,早期版本可能无法运行。
-
从此处获取代码 Create_avro_sample.zip 。
-
将其解压缩到您选择的位置,然后运行程序以生成示例 AVRO 消息:
cd ~ mkdir create_avro_sample cd create_avro_sample unzip CreateAVRO_SampleFile.zip # Check the files are there ls -lrt # install the python dependencies on requirements.txt pip3 install -r requirements.txt # Run the program and create an AVRO message file python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
任务 9:创建 OCI 函数以接收 AVRO 消息并发布到 OCI 流处理主题
-
转到开发人员服务,在函数下,单击应用程序,然后单击创建应用程序。
-
转到安装了 Docker、OCI CLI 和 FN CLI 的终端 shell,然后运行以下命令来初始化函数。
注:如果您按照这些步骤操作,则现在已经执行了 Docker 登录命令(如果尚未执行),请继续执行创建容器注册表任务上的 Docker 登录步骤。
fn create context oci-cloud --provider oracle fn use context oci-cloud fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
注意:在本教程中,我们使用的是巴西东部(圣保罗)区域,如果您使用的是其他区域,则需要更改 api-url 和注册表位置。
-
创建一个简单的 Hello-world 函数,以确保您的所有设置都正确无误。
fn init --runtime python fn-recep-avro cd fn-recep-avro fn deploy --app MyReceptionApp fn invoke MyReceptionApp fn-recep-avro
-
获取文件 fn-recep-avro.zip 中的 AVRO 函数样例代码并替换之前创建的 hello-world 代码。必须同时获取文件 func.py 和 requirements.txt 才能工作。
# Check you have the right code for func.py & requirements.txt (you got from zip file) ls -lrt
-
构建新代码并部署函数
fn deploy --app MyReceptionApp
-
要调用函数,我们需要将 AVRO 消息作为参数传递,为此,我们将使用先前步骤中创建的示例 AVRO 消息文件。首次调用函数时,需要更长的时间,因为它需要启动。
# Check where you created the sample avro message file ls -lrt ../create_avro_sample/ # Invoke the function to check if it's working as expected echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
任务 10:创建 API 网关以公开函数
-
在控制台中,单击开发人员服务,然后在 API 管理下,单击网关,然后单击创建网关。
-
创建后,单击部署选项,然后单击创建部署。
名称:RecepFunction
路径前缀: /- 在 "Authentication"(验证)中,选择 No Authentication(无身份验证),因为这是一个简单的实验并且未实施 API 验证。此处的主要目标是演示通过 API 传递二进制 AVRO 消息的 HTTPS 调用,为此,我们将不会为此简单实验室实施任何验证方法。
- 在迈向现实生活环境之前,请确保遵循 API 网关的安全实践。
- 有关详细信息,请参阅保护 API 网关和资源。
路由 1:路径: /
Methos: POST
后端类型:Oracle 函数
应用程序: 选择您的函数 -
检查 API 网关端点并注意事项。
-
打开 Linux shell 终端并调用 API 网关。将 API URL 替换为您在前面的步骤中找到的正确端点。
cd create_avro_sample/ ls -lrt curl -X POST -H "Content-Type: application/octet-stream" \ -d "$(echo -n -e "$(cat meu_file.bin)")" \ https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
检查点
任务 11:为 API 类型 A 构建容器映像
注: API 类型 A 和 B 代码基本相同,只有不同的标题消息才能模拟两个不同的 API 。
-
从 API 类型 A 获取代码,并在 Linux shell 终端 api-avro-sample_a.zip 中解压缩该代码。
-
获取先前步骤中获取的容器注册表名称空间,并按照以下模式创建应用程序注册表位置。目录 URL 基于您的区域,即 Brasil East(SaoPaulo) 的 gru.ocir.io
[ocir url]/[您的名称空间]/api-avro-sample_a:latest
-
在 Linux shell 终端中,为此 API 构建和推送 docker 映像。
ls -lrt docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
任务 12:为 API 类型 B 构建容器映像
-
从 API type B 获取代码并将其解压缩到 Linux shell 终端 api-avro-sample_b.zip 中。
-
获取先前步骤中获取的容器注册表名称空间,并按照以下模式创建应用程序注册表位置。目录 URL 基于您的区域,即 Brasil East(SaoPaulo) 的 gru.ocir.io
[ocir url]/[您的名称空间]/api-avro-sample_b:latest
-
在 Linux shell 终端中,为此 API 构建和推送 docker 映像。
ls -lrt docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
-
如果图像已成功推送,请检查容器注册表页。
任务 13:在容器服务中部署 API
-
转到开发人员服务、容器实例,然后单击创建容器实例。
-
为 api-type-b 重复步骤 1,为 TYPE B API 选择正确的映像。
-
转到开发人员服务、容器实例,然后单击创建容器实例并重复部署 API 类型 B 的步骤
-
从容器实例获取内部 FQDN 地址。
- 单击容器实例,并记下每个内部 FQDN 地址。
-
转到 Identify & Secutiry ,单击 Vault ,选择 VAULT 并创建两个新的 secrets 。
密钥名称 值 API 类型 _A_URL 粘贴 API 类型 A 的内部 FQDN 专用地址 API 类型 _B_URL 粘贴 API 类型 B 的内部 FQDN 专用地址 记下每个密钥 OCID
您的 Vault 现在应如下所示:
-
编辑上载到 config 存储桶的 config.properties 文件,并为密钥 OCID 添加新条目
ls -lrt config.properties vi config.properties api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID # After save, please upload the new version to Object Storage cat config.properties oci os object put -bn config --file config.properties --force
文件应如下所示:
-
任务 14:使用 create_avro_sample.py 测试 API
-
转到 Linux shell 终端,您将从任务 7 中保存 create_avro_sample.py ,然后创建一些新消息来测试 API 调用。我们正在创建具有不同 ID 的两个新的 AVRO 文件(1010 个,1020 个),我们将在 Spark Stream (DataFlow) 程序中用作过滤器。
ls -lrt python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}' python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
-
调用通过 AVRO 消息进行测试的 API 正常工作。转到容器实例页面,获取每个 API api-type-a 和 api-type-b 的内部 FQDN 地址。请记住,要替换 API 中对应的内部 FQDN 地址的以下 URL。
ls -lrt type* curl -i -X POST -H "Content-Type: application/octet-stream" \ --data-binary "@type_a_message.bin" \ xxx.xx.x.xxx curl -i -X POST -H "Content-Type: application/octet-stream" \ --data-binary "@type_b_message.bin" \ xxx.xxx.xx.xxx
任务 15:设置 Java Spark 流处理应用程序
-
转到存储桶,单击创建存储桶并创建两个名为 dataflow-app-avro 和 dataflow-logs-avro 的新存储桶,这将用于上载 java 应用程序。
-
请仔细检查 java 环境版本。
Java
java 11.0.8 2020-07-14 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
Maven
Apache Maven 3.5.4 (Red Hat 3.5.4-5) Maven home: /usr/share/maven Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64 Default locale: en_US, platform encoding: ANSI_X3.4-1968 OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
-
下载示例代码并将其解压缩到具有 oci-cli、docker、java 和 maven 的本地环境中: spark-consume-avro-message.zip 。
unzip spark-consume-avro-message.zip cd spark-consume-avro-message ls -lrt
下面简要介绍了用于调用容器实例类型 A 和 B 的代理代码。
检查主程序文件 .src/main/java/example/Example.java....
-
由于此 Java 程序使用 lib 处理 spark-avro,因此我们需要打包依赖项以将其传递到数据流。为此,我们将使用数据流相关性打包程序,如果您需要更多详细信息,可以转至数据流相关性打包程序。
软件包 org.apache.spark:spark-avro_2.12:3.2.1 已在 packages.txt 文件中声明,只需要通过运行对其进行打包:
docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
-
使用 oci-cli 将 archive.zip 文件上载到名为 dataflow-app-avro 的存储桶。
oci os object put -bn dataflow-app-avro --file archive.zip --force
-
编译 java 应用程序并将其打包并上载到存储桶 dataflow-app-avro
ls -lrt mvn clean install
... 压缩了编译日志的行数 ...# upload the JAR file to the storage bucket oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
-
检查当前的 dataflow-app-avro 存储桶,并确保它如下所示。
-
转到 Analytics & AI ,然后在数据湖下,单击数据流,选择左侧菜单专用端点,然后单击创建专用端点。
-
由于我们将 PRIVATE 子网用于容器实例和 OCI 流处理池,因此需要专用端点。
-
确保使用 OCI 容器实例和 OCI 流处理池中的内部 FQDN 填充 DNS 区域,并用逗号分隔。
-
-
转到 Analytics & AI ,然后在数据湖下,单击数据流,然后单击创建应用程序。
-
创建后,选择 spark-lab-avro 数据流,然后单击 Run 启动程序,启动程序通常最多需要 8 分钟。
-
-
检查“Running dataflow(正在运行的数据流)”应用程序,打开将显示当前作业并且应用程序正在运行的 SparkUI。
任务 16:验证流
调用函数并通过消息检查所有流是否按预期工作。
-
打开 Linux shell 终端,在其中创建样例消息 type_a_message.bin 和 type_b_message.bin 并发送消息。将 API URL 替换为您创建的 API 网关中的正确端点。
cd create_avro_sample/ ls -lrt curl -X POST -H "Content-Type: application/octet-stream" \ -d "$(echo -n -e "$(cat type_a_message.bin)")" \ https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
-
检查容器实例上的日志是否调用了 API 类型 A 。
您可以重复该过程并发送 type_b_message.bin 文件,它将调用类型 B 容器实例。
相关链接
确认
- 作者 - Joao Tarla(Oracle LAD A-Team 解决方案工程师)
更多学习资源
探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Stream AVRO Messages with Oracle Cloud Infrastructure Streaming and OCI Data Flow with Micro-Batch processing
F86501-01
September 2023
Copyright © 2023, Oracle and/or its affiliates.