注意:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于您的云环境的值。
使用可扩展的云原生流在 Oracle Cloud Infrastructure Object Storage 上处理文件
简介
在我们的应用程序中,我们经常需要处理大量文件。过去,这是以批处理形式完成的,但随着新技术和云的出现,我们现在能够将许多串行进程转换为并行进程。使用消息队列、Kubernetes 集群和事件驱动架构是一些广泛用于充分利用大量处理的技术和架构。
Oracle Cloud Infrastructure (OCI) 提供资源来帮助企业降低可扩展性和成本。让我们来了解一下云原生服务。
-
OCI Object Storage 支持客户以原生格式安全地存储任何类型的数据。OCI 对象存储具有内置冗余功能,非常适合构建需要扩展和灵活性的现代应用,因为它们可用于整合多个数据源以进行分析、备份或归档。
-
OCI Streaming 服务是一个与 Apache Kafka 兼容、无服务器、实时的事件流处理平台,面向开发人员和数据科学家。流处理与 OCI、数据库、GoldenGate 和集成云完全集成。该服务还为 DevOps、数据库、大数据和 SaaS 应用等类别的数百种第三方产品提供现成的集成。
-
OCI 事件服务使用符合云原生计算基金会 (CNCF) 云事件标准的事件跟踪对资源所做的更改。开发人员可以使用 OCI Functions 触发代码、记录到 OCI Streaming 或使用 OCI Notifications 发送警报,从而实时响应所做的更改。
-
OCI Functions 是一项无服务器计算服务,开发人员无需管理任何基础设施即可构建、运行和扩展应用。OCI Functions 与其他 OCI 服务和 SaaS 应用实现了原生集成。OCI Functions 基于开源 Fn 项目,因此开发人员可以创建可轻松移植到其他云和内部部署环境的应用。基于函数的代码通常在很短的时间内运行,是无状态代码,并出于单一逻辑目的执行。客户只需为使用的资源付费。
-
Oracle Cloud Infrastructure Container Engine for Kubernetes (OKE) 是一项托管 Kubernetes 服务,可简化企业级的大规模 Kubernetes 运营。它可减少管理复杂 Kubernetes 基础设施所需的时间、成本和工作。OKE 允许您部署 Kubernetes 集群,以通过自动扩展、更新和安全修补程序确保控制层和 worker 节点上的可靠运行。此外,OKE 还通过虚拟节点提供完全无服务器 Kubernetes 体验。
在本教程中,我们将看到处理大量文件的常见方式,其中应用程序可以将文件存放在 OCI 对象存储的存储桶中,当这些文件被存放时,将生成一个事件,允许触发函数将此文件的 URL 写入 OCI 流处理。
注:我们可以想象,只有一些源应用在 OCI Streaming 中保存文件内容,而我们的应用仅读取此内容,但是在 Kakfa 队列中传输大量数据并不是一个好做法。为此,我们的方法将使用一个名为 Claim-Check 的模式,该模式将完全按照我们的建议执行,而不是通过消息队列发送文件,而是发送对此文件的引用。我们会将读取文件委托给负责处理该文件的应用程序。
本教程将介绍以下组件:OCI 对象存储、事件服务、函数和流处理。
在这个链的结尾,我们将让应用程序使用流队列,但是,我们不会讨论如何处理文件。
目标
- 实施可扩展的事件架构,通过使用 OCI Object Storage、Event Service、Functions 和 Streaming 处理大量文件。
先决条件
-
为存储桶、函数和流配置了 VNC、子网和所有安全设置。
-
配置为管理存储桶、事件服务、函数和流处理的 Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 用户。
任务 1:创建 OCI 流处理实例
OCI Streaming 是一个 Kafka 类型的托管流处理服务。我们可以在市场上使用 Kafka API 和通用 SDK 开发应用。在此任务中,我们将创建 OCI 流处理实例,并将其配置为在两个应用中执行,以发布和使用大量数据。
-
登录到 OCI 控制台,单击分析和 AI 和流。
-
选择区间,然后单击创建流。
-
输入流实例的流名称,并使用默认值保留其他参数。单击创建以初始化实例并等待活动状态。
注:
-
在流处理创建过程中,我们可以选择自动创建默认流池,因此将自动创建默认池。
-
您可以在专用子网中创建流实例。在这种情况下,请注意任务 4 中的函数,它必须位于同一专用子网中,或者位于有权访问专用子网流实例的子网中。检查您的 VCN、子网、安全列表、服务网关或其他安全组件。请确保您的函数可以访问 OCI 流处理实例。
-
-
单击 DefaultPool 链接。
-
单击 Kafka 连接设置并查看连接设置。请记下以下信息,因为这些信息在后续任务中是必需的。
任务 2:创建 OCI 对象存储桶
我们需要创建一个存储桶。存储桶是用于存储对象的逻辑容器,因此用于此演示的所有文件都将存储在此存储桶中。
-
打开 OCI 控制台并导航到存储、存储桶。在存储桶部分中,选择区间,区间将与在任务 1 中创建的 OCI 流处理实例相同。
-
单击创建组并输入组名称。保留其他参数的默认值,然后单击创建。
我们可以看到创建的存储桶。
注:查看存储桶的 OCI IAM 策略。如果您要在演示应用中使用这些存储桶,则需要设置策略。有关详细信息,请参阅对象存储概述和 OCI IAM 策略。
任务 3:为 OCI 事件服务激活 OCI 对象存储存储桶
我们需要启用存储桶以发出事件。因此,单击存储桶详细信息并搜索发出对象事件编辑链接并激活它。
任务 4:创建 OCI 函数
要执行以下任务,请从此处下载代码:OCI_Streaming_Claim_Check.zip 。
-
了解代码
有两个代码文件,即主代码
HelloFunction.java
和 OCI 流处理生产者代码Producer.java
。-
HelloFunction.java
.在代码的这一部分,我们需要捕获来自 OCI 事件服务的数据,因此有 3 个源。
- 上下文:此属性 cmes 来自 RuntimeContext,我们使用
REGION
变量。 - 事件数据: OCI Events Services 将数据生成为
resourceName
。 - 其他事件详细信息数据: OCI 对象存储的 OCI 事件服务生成的数据为
namespace
和bucketName
。
可以装载 OCI 对象存储文件 URL。
主代码可以将 URL 传递给 OCI Streaming 生成器。
- 上下文:此属性 cmes 来自 RuntimeContext,我们使用
-
Producer.java
.这是用于生成 Claim-check 模式的 Kafka 信息的
Message
类结构。仅key
和value
。这是流处理的基本代码。
-
-
构建和部署 OCI 函数
在此步骤中,我们需要使用 OCI CLI 创建 OCI 函数并将代码部署到 OCI 租户中。要创建 OCI 函数,请参阅函数:使用 CLI 入门,然后按照步骤搜索 Java 选项。需要使用以下信息创建函数。
Application: ocistreaming-app (follow the link Functions: Get Started using CLI) fn create app ocistreaming-app --annotation oracle.com/oci/subnetIds='["<the same OCID of your streaming subnet>"]' Context Variable: REGION=<your streaming region name, ex: us-ashburn-1> fn config app ocistreaming-app REGION=us-ashburn-1
请记住在其中部署函数的区间。您需要此信息才能配置 OCI 事件服务。
任务 5:配置 OCI 事件
让我们配置事件规则来触发函数以获取存储桶信息并将其发送到 OCI Streaming。
-
为规则选择相同的区间,然后单击创建规则。
-
输入以下信息。
-
在规则条件部分中。
- 条件:
Event Type
。 - 服务名:
Object Storage
。 - 事件类型:
Object-Create, Object-Delete, Object-Update
。
- 条件:
-
在操作部分中。
- 操作类型:
Functions
。 - 函数区间:
<your function compartment name>
。 - 函数应用程序:
<your function app, in this example ocistreaming-app>
。 - 函数:
fn_stream
。
- 操作类型:
-
任务 6:测试您的事件电路
注:对于专用网络,需要在连接到 OCI 流处理的同一专用子网的堡垒中执行测试代码。
在 OCI_Streaming_Claim_Check.zip 源代码包中,可以找到名为 monitoring
的文件夹和名为 consume.py
的文件。我们可以使用此代码来监控和测试解决方案是否正常工作。
我们需要配置代码。
配置流参数后,您可以运行代码并验证电路,即存储桶、事件、函数和流处理。
相关链接
确认
- 作者 - Cristiano Hoshikawa(Oracle LAD A 团队解决方案工程师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者通过 Oracle Learning YouTube 频道访问更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Process Files on Oracle Cloud Infrastructure Object Storage with a Scalable Cloud Native Flow
F94116-01
March 2024
Copyright © 2024, Oracle and/or its affiliates.