注意:
- 本教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure 免费套餐。
- 它对 Oracle Cloud Infrastructure 身份证明、租户和区间使用示例值。完成实验室后,请使用特定于云环境的那些值替换这些值。
使用 OCI 事件服务和 OCI 函数将事件写入 OCI 专用流
简介
Oracle Cloud Infrastructure (OCI) Events Service 规则使用符合 Cloud Native Computing Foundation (CNCF) CloudEvents 标准的事件跟踪 OCI 中的资源更改。这使开发人员能够通过使用 OCI Functions 触发代码、将数据写入 OCI Streaming 或使用 OCI Notifications 发送警报来响应实时更改。
虽然本地支持将事件发送到公共流,但写入专用流需要使用 OCI 函数或定制代码的定制方法。本教程演示如何使用 OCI Functions 从 OCI 事件服务规则捕获事件有效负载并将其写入专用流。
目标
- 自动执行 OCI 事件处理设置,并以可扩展的方式将其发送到 OCI 专用流,以实现安全、定制的处理。
先决条件
-
访问 OCI 租户。
-
管理 OCI 事件服务规则、Oracle 应用和 OCI 流处理服务的权限。
任务 1:设置必需的策略和 Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 权限
此解决方案的每个组件都必须有权访问与之交互的 OCI 资源。要遵循本教程,开发人员必须具有以下权限。
-
用户策略:管理 OCI 事件服务规则、OCI 通知、OCI 函数、OCI 流以及 OCI Vault(可选)和网络系列的使用。
-
服务策略:授予将消息写入 OCI 专用流的功能权限。
详细的政策可以在这里找到:
任务 2:创建专用流
OCI 流处理是一项完全托管的 OCI 服务,其中对静态数据和传输中的数据进行加密,从而确保消息的完整性和安全性。为了增强安全性,您可以使用 OCI Vault 服务来存储和管理您自己的加密密钥,以满足特定的合规性或安全要求。可以在虚拟云网络 (Virtual Cloud Network,VCN) 中配置专用端点,以进一步保护流,将专用 IP 地址与流池关联。这可确保 OCI Streaming 流量保持在 VCN 内,从而完全避免互联网。但是,请注意,使用专用端点的流无法从 Internet 访问,这也限制了通过控制台查看其最新消息的能力。要使用来自专用流的消息,使用方必须对托管专用流的网络具有路由和访问权。
创建流和流池。输入 Stream Name(流名称),然后选择 Create New Stream Pool(创建新流池)以创建流池。在配置流池部分中,输入流池名称,选择专用端点,然后相应地输入 VCN 和子网以及网络详细信息。尽管是可选的,但我们建议为该 NSG 内的所有流量提供一个包含入站规则的网络安全组。有关更多信息,请参阅创建流和创建流池。
您可以使用自己的加密密钥,更好地控制密钥的生命周期。您可以选择调整流中消息的保留。默认值为 1 天,最大值为 7 天。
记下流 OCID 和消息端点。我们需要将此信息传递给函数。
任务 3:创建 OCI 通知主题和 OCI 事件服务规则
作为复习者,以下是事件的工作方式。OCI 服务为资源或数据发出事件。我们创建规则,其中包括用于指定相关事件的筛选器。当筛选器找到匹配事件时,规则必须触发操作。该操作可以是 OCI 通知主题、OCI 流处理服务或 OCI 函数。
-
方法 1:如果要求将事件有效负载发送到公共流,则不需要函数,因为可以本机配置流。
-
方法 2:如果要求仅将特定事件有效负载发送到专用流,则可以直接将该函数作为操作提供。
-
方法 3:如果要求将多个事件有效负载发送到专用流,则理想的方法是将事件有效负载发送到 OCI 通知主题并将 OCI 函数订阅到 OCI 通知主题。
在本教程中,我们将探讨方法 3,这是最复杂的。
任务 4:开发和部署函数
此函数对事件有效负载进行编码并将其发布到提供的消息端点和流。有关详细信息,请参阅创建函数。
-
func.py
:import json import logging import oci from base64 import b64encode def handler(ctx, data): try: # Parse the incoming event payload body = json.loads(data.getvalue()) cfg = ctx.Config() stream_ocid = cfg["stream_ocid"] stream_endpoint = cfg["stream_endpoint"] logging.getLogger().info(f'Function invoked for event: {body}') # Publish the event to the stream publish_to_stream(stream_ocid, stream_endpoint, body) return {"status": 200, "message": "Successfully processed event"} except (Exception, ValueError) as ex: logging.getLogger().error(f'Error processing event payload: {str(ex)}') return {"status": 500, "message": "Internal Server Error"} def publish_to_stream(stream_ocid, stream_endpoint, event_data): signer = oci.auth.signers.get_resource_principals_signer() stream_client = oci.streaming.StreamClient(config={}, signer=signer, service_endpoint=stream_endpoint) # Convert event data to JSON string and encode event_payload = json.dumps(event_data).encode('utf-8') # Build the message list message_list = [ oci.streaming.models.PutMessagesDetailsEntry( key=b64encode("partition-key-1".encode()).decode(), value=b64encode(event_payload).decode() ), ] try: logging.getLogger().info(f"Publishing {len(message_list)} messages to stream {stream_ocid}") put_message_details = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = stream_client.put_messages(stream_ocid, put_message_details) # Log publishing results for entry in put_message_result.data.entries: if entry.error: logging.getLogger().error(f"Error publishing message: {entry.error_message}") else: logging.getLogger().info(f"Published message to partition {entry.partition}, offset {entry.offset}") except Exception as ex: logging.getLogger().error(f"Failed to publish messages to stream: {str(ex)}") raise
-
func.yaml
:schema_version: 20180708 name: events-to-pvt-stream version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: stream_ocid: ocid1.stream.123445 stream_endpoint: https://xyz.us-ashburn-1.oci.oraclecloud.com
-
requirements.txt
:fdk oci
最后一步是告诉函数专用流的位置。此函数使用配置参数,如果要在其他租户中部署,则使其可重用。
任务 5:将函数订阅到通知主题
在此任务中,将部署的功能订阅到 OCI 通知主题。每当 OCI 通知主题收到消息时,都会触发该函数,该函数将为专用流写入事件有效负载。
您可以通过更改函数代码来消除或增强某些字段以满足要求。功能订阅不需要确认。有关详细信息,请参阅创建功能订阅。
验证
数据流可以在多个位置进行验证。
-
验证事件度量页以检查是否有任何事件与提供的规则匹配。故障图必须为空。
-
查看 OCI 通知主题度量。确保已传送所有消息;失败的消息图表中没有数据。
-
检查函数 Invocation 度量。确保没有错误,并且函数没有限制。
-
检查数据是否正在摄取到专用流中。
如果以下任一图表中缺少数据,请在此处停止并为该服务启用日志。日志将解释特定资源执行任务失败的原因。
后续步骤
在本教程中,您将学习如何集成 OCI 事件服务、OCI 通知、OCI 函数和 OCI 流处理,以安全地处理事件有效负载并将其发布到专用流。您将了解如何设置安全端点、管理加密密钥以及使用 OCI IAM 进行细粒度访问控制,以确保只有授权用户和服务才能与流数据交互。
此解决方案可帮助团队实时捕获资源更改,同时保持稳健的安全标准。组织可以使用专用流来保护敏感数据,遵守行业法规,并确保运营工作流与组织安全性和合规性目标保持一致。此方法可增强您的安全态势,并使您的团队实现无缝且安全的事件驱动自动化。
有关使用 OCI Functions 和 OCI 专用流功能的详细信息,请与您的 Oracle 代表联系,或者参见云安全解决方案。
确认
- 作者 — Aneel Kanuri(杰出云架构师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Write Events to an OCI Private Stream using OCI Events Service and OCI Functions
G23242-01
December 2024