注意:
- 本教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure 免费套餐。
- 它对 Oracle Cloud Infrastructure 身份证明、租户和区间使用示例值。完成实验室后,请使用特定于云环境的那些值替换这些值。
使用 OCI Functions 将数据流式传输到 Oracle Autonomous Transaction Processing Database
简介
在本教程中,我们将学习如何使用 Oracle Cloud Infrastructure (OCI) 服务构建和部署实时数据流处理管道。Pipeline 利用 OCI Connector Hub 和 OCI Functions 将数据从 OCI 流传输到 Oracle Autonomous Database。该函数使用 Java 编写,采用资源主用户身份验证方法来安全访问 OCI 服务,并使用数据库 wallet 与 Oracle Autonomous Database 建立连接。
OCI Streaming: OCI 提供了一系列用于在云端处理数据的工具,OCI Streaming 是专为高吞吐量实时数据流量身定制的服务之一。通过利用 OCI Streaming,开发人员可以构建可扩展且可靠的数据管道,从而高效地摄取、处理和分发数据流。
OCI 函数: OCI 函数是一个完全托管的多租户、高度可扩展的按需函数即服务平台。它基于企业级 OCI,由 Fn Project 开源引擎提供支持。当您需要专注于编写代码以满足业务需求时,请使用 OCI Functions。
资源主用户验证:您可以使用资源主用户来验证和访问 OCI 资源。资源主体由临时会话令牌和安全凭证组成,使 OCI 函数能够向其他 OCI 服务(例如 OCI 流处理)验证自身。
OCI Connector Hub: OCI Connector Hub 是一个云消息总线平台,提供单一平台来描述、执行和监视在 OCI 服务之间移动数据时的交互。
注:
- 本教程仅为教育和学习目的而设计。它为学习者提供了一个在受控环境中进行实验和获得实践经验的环境。需要注意的是,本教程中使用的安全配置和做法可能不适合实际情况。
- 现实应用的安全注意事项往往更加复杂和动态。因此,在实施生产环境中演示的任何技术或配置之前,必须进行全面的安全评估和审查。本审查应涵盖安全的所有方面,包括访问控制、加密、监视和合规性,以确保系统符合组织的安全政策和标准。
- 从实验室环境过渡到实际部署时,安全性始终是重中之重。
目标
- 构建可实时处理消息的无缝数据流处理管道。将新消息发布到 OCI 流后,OCI Functions 会自动转发和处理该消息,然后将数据加载到 Oracle Autonomous Transaction Processing (ATP) 数据库中。此端到端解决方案展示了使用 OCI 服务的高效数据摄取和转换,确保了流处理与数据库系统之间安全、可扩展的集成。
先决条件
-
Oracle Cloud Infrastructure:
-
具有管理员级别访问权限的 Oracle 账户。
-
用于创建资源的区间。
注:请注意区间名称和区间 ID。
-
具有专用子网的 VCN。有关详细信息,请参阅创建虚拟云网络。
注:请注意 VCN OCID 和子网名称,您需要在流池和函数中使用它。
-
从 VCN 在专用子网中创建流池。有关更多信息,请参阅创建流池。
-
在之前创建的同一流池上创建名为
myfirststream
的流。有关详细信息,请参阅创建流。 -
Oracle Autonomous Database。有关详细信息,请参阅 Oracle Autonomous Database 。
-
用于存储密钥的 OCI Vault。有关详细信息,请参阅创建 Vault 。
-
用于允许登录到 OCI 容器注册表的 OCI 验证令牌。有关更多信息,请参见 Generating an Auth Token to Enable Login to Oracle Cloud Infrastructure Registry 。
-
-
OCI Cloud Shell 环境:
- 如果您没有 OCI Cloud Shell,则需要使用 OCI Cloud Shell 才能将该函数部署到 OCI,您可以改用堡垒主机。有关详细信息,请参阅堡垒概览。
任务 1:设置动态组
登录到 OCI 控制台,导航到您的域,单击动态组并创建包含以下信息的组。
-
组名:输入
MyFunctions
。ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentOCID'}
任务 2: 创建策略
转到 OCI 控制台,导航到策略并使用以下信息创建策略。
-
策略名称:输入
FunctionsPolicies
。Allow dynamic-group MyFunctions to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME, STREAM_PRODUCE} in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group MyFunctions to read repos in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group MyFunctions to read objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group MyFunctions to read secret-bundles in compartment YOUR-COMPARTMENT-NAME
任务 3:创建 OCI 容器注册表
-
转到 OCI 控制台,导航到开发人员服务、容器和对象,选择容器注册表,然后单击创建资料档案库为函数映像创建专用资料档案库。
-
资料档案库名称:输入
lab/fn-java-event-to-atp
。
-
-
检查系统信息库并记下 Namespace 。
-
打开应在其中安装 OCI CLI 和 Docker 的 OCI Cloud Shell,然后继续登录注册表。检查您所在地区的正确 URL。在本教程中,我们使用 Brazil East (Sao Paulo) ,其中注册表 URL 为
gru.ocir.io
。注:您需要在“先决条件”部分中创建用户验证令牌。
docker login gru.ocir.io Username: <your container namespace>/youruser Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
任务 4:创建 OCI 对象存储存储桶以存储数据库 Wallet
转到 OCI 控制台,导航到存储、对象存储和归档存储、存储桶,然后单击创建存储桶以创建一个名为 Wallet
的新存储桶来存储数据库 wallet zip 文件。
任务 5:下载数据库 Wallet 并将其存储在 OCI 对象存储存储桶中
注:此代码示例演示如何使用 wallet 安全地连接到 Oracle Autonomous Database。要继续,您需要配置 Wallet 身份证明并建立密码以在整个教程中使用。
-
转到 OCI 控制台,导航到 Oracle Database ,单击 Autonomous Database 并选择 ATP。
-
单击数据库连接。
-
输入您的钱包密码并记下,您需要在应用程序中使用此密码。
-
转到在任务 4 中创建的 OCI 对象存储存储桶,然后单击上载以上载下载的 wallet zip 文件。
任务 6:将数据库身份证明和 Wallet 密码安全地存储在 Vault 中
注:作为先决条件,请确保您已创建 Vault。
-
转到 OCI 控制台,导航到身份与安全、 Vault ,然后选择您的 Vault。
-
单击 Secrets(密钥)、 Create Secret(创建密钥)并使用以下信息创建密钥。
密钥名称 值 MYRDBMS_WALLET_PASSWORD
纯文本形式的钱包密码 MYRDBMS_DB_PASSWORD
以纯文本表示的数据库口令 注意确保为密文类型模板选择纯文本。
对 MYRDBMS_DB_PASSWORD
重复此过程,并记下将用于函数配置的两个 OCID。
任务 7:创建 Java OCI 函数以接收事件消息并插入到 ATP 数据库
注意请确保选择您的专用子网,即流池的同一子网。
-
转到 OCI 控制台,导航到 Developer Services(开发人员服务)、 Functions(函数)、 Applications(应用程序)并单击 Create application(创建应用程序)。
-
转到已安装 Docker、OCI CLI、Fn Project CLI 的 OCI Cloud Shell,然后运行以下命令来初始化函数。
注:如果执行了任务,则您的 Docker 登录命令现在已执行,如果未执行,请继续执行任务 3 中的 Docker 日志步骤。
-
检查当前上下文,因为您使用的是 OCI Cloud Shell,所以应该已在当前上下文中设置它。
fn list context
-
运行以下命令来更新函数设置,以便能够从在任务 3 中创建的容器注册表中提取映像。
fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE/lab fn list context
注:在本教程中,我们使用的是 Brazil East(Sao Paulo) 区域,如果您使用的是其他区域,则需要更改 API URL 和 REGISTRY 位置。
-
运行以下命令检查您创建的应用程序是否列在 OCI Cloud Shell 上。
注:确保 OCI Cloud Shell 使用网络:公共
fn list apps
-
从此处获取 Java 函数示例代码: fn-java-event-to-atp.zip ,然后将其上载到 OCI Cloud Shell,然后继续解压缩文件。
-
单击右上角的滚轮菜单,然后选择上载以上载文件。
-
运行以下命令以解压缩文件。
# check your file is there ls -lrt # unzip the file unzip fn-java-event-to-atp.zip # check again ls -lrt
注:此简洁的 Java 项目从 OCI Connector Hub 检索 JSON 数据输入,根据预定义的格式对其进行语法分析,并将提取的信息无缝插入到 ATP 数据库中。要确保正确插入数据,请参阅以下步骤,以获得有关 ATP 数据库中所需消息格式和相应表的指导。
-
在 OCI Cloud Shell 中,运行以下命令来构建代码并部署函数。
fn deploy --app MyApp # After deploy complete, check the function is there: fn list functions MyApp |grep fn-java-event-to-atp
-
转到 OCI 控制台,导航到开发人员服务、函数、应用程序,选择您的应用程序 (
Myapp
),然后单击您的函数。注本教程的 Java 示例代码利用配置变量建立数据库连接。确保从环境中提供适当的值,以确保成功执行。
-
创建以下所有配置。
密钥名称/密钥 | 值 |
---|---|
WALLET_BUCKET | Wallet |
WALLET_OBJECT | zip 文件的名称 |
BUCKET_NAMESPACE | 您的存储桶名称空间 |
DB_USER | 您的数据库用户名 |
DB_SERVICE_NAME | 在数据库连接详细信息中找到的数据库服务名 |
DB_PASSWORD_OCID_VAULT | 粘贴密钥 OCID |
DB_WALLET_PASSWORD_OCID_VAULT | 粘贴密钥 OCID |
任务 8:在 ATP 数据库上创建新表
在 ATP 数据库上创建一个名为 MY_TABLE
的新表,以便从流中接收数据。
-
选择下载 wallet 的 Oracle Autonomous Database,单击数据库操作下拉菜单,然后单击 SQL 。
-
运行以下表创建 DDL 命令。
注意:此实验室使用 ADMIN 用户进行数据库连接。替换您自己的用户名(如果不同)。
CREATE TABLE "ADMIN"."MY_TABLE" ( "CODE" VARCHAR2(50 BYTE), "NAME" VARCHAR2(500 BYTE) ) ;
任务 9:将 OCI Connector Hub 设置为在新消息到达流时调用函数
现在,您已创建用于存储数据的函数和新表,现在是将所有内容连接在一起的时候了!我们将通过设置从 OCI Streaming、通过 OCI Connector Hub 流入到 OCI Functions 的管道来实现此目的。
-
转到 OCI 控制台,导航到 Analytics & AI 、 Messaging ,然后单击 Connector Hub 。
-
单击创建连接器并输入以下信息。
- 连接器名称:输入
StreamingToFN
。 - 来源:选择流处理。
- 目标:选择函数。
注意作为先决条件,请确保您已创建名为
myfirststream
的流池和流。 - 连接器名称:输入
-
在配置源中,选择流池和流。
-
在配置目标中,选择 MyApp 作为函数应用程序,选择 fn-java-event-to-atp 作为函数。
注意:您可能需要按照创建连接器页上的建议创建其他策略。
任务 10:在 OCI 流处理上创建消息并验证管道工作
注意:提供的 Java 代码示例设计为以特定的 JSON 结构处理消息。代码解析此 JSON 以提取必要的数据,然后将其插入到
MY_TABLE
表中。与预期 JSON 格式的任何偏差都将导致解析错误并阻止成功的数据库插入。
JSON 示例消息:
{"code": "001", "name":"Larry"}
执行相应步骤:
-
转到 OCI 控制台,导航到 Analytics & AI 、 Messaging 、 Streaming ,选择流 (
myfirststream
),然后单击 Produce Test Message 。 -
在数据中,输入示例 JSON 消息并单击生成。
注意流管道在首次执行时可能会遇到冷启动,从而导致明显的延迟。这是连接器集线器首次尝试发送消息时发生的函数初始化结果。
任务 11:验证数据库中到达的数据
-
选择下载 wallet 的 Oracle Autonomous Database,单击数据库操作下拉菜单,然后单击 SQL 。
-
运行下面的查询。
select * from admin.my_table;
故障排除和技巧
-
为函数启用日志并检查是否有任何错误。有关日志记录的更多信息,请参阅存储和查看函数日志。
-
验证所有策略是否都已就位,如“先决条件”部分所述。
-
为 OCI Connector Hub 启用日志。有关更多信息,请参见 Logs for Connector Hub 。
相关链接
确认
- 作者 — Joao Tarla(Oracle LAD A-Team 解决方案工程师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Stream Data to Oracle Autonomous Transaction Processing Database using OCI Functions
G25955-01
February 2025
Copyright ©2025, Oracle and/or its affiliates.