注意:
- 本教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure 免费套餐。
- 它对 Oracle Cloud Infrastructure 身份证明、租户和区间使用示例值。完成实验室后,请使用特定于云环境的那些值替换这些值。
使用 Oracle Cloud Infrastructure Functions 将日志写入 Oracle Cloud Infrastructure Private Stream
简介
日志是现代云运营的基石,可提供有关系统活动、性能和安全性的重要洞察。对于处理敏感数据的企业来说,安全高效地管理日志至关重要。本教程将指导您使用 Oracle Cloud Infrastructure (OCI) 为日志管理构建安全、可扩展且事件驱动的架构。
该解决方案的核心是 OCI Functions,可确保在不暴露于公共端点的情况下安全地传输和处理日志。该架构由通过 OCI Connector Hub 收集并压缩到 OCI Object Storage 中的日志中的事件驱动。每次创建日志文件时,它会触发 OCI Functions,解压缩它,将其分解为可管理的块,并将日志发布到专用流以实现安全存储或进一步处理。
这种事件驱动的方法可确保无缝的自动化管道,使组件对变化做出动态反应,最大限度地减少延迟并最大限度地提高效率。通过利用 OCI 的原生服务,您将获得一个安全、可扩展的框架来实时处理大量日志处理。
目标
- 实施弹性、安全的日志管理工作流,专为希望保护敏感日志数据并优化云环境中的运营的企业而设计。
先决条件
-
访问 OCI 租户。
-
管理 OCI Object Storage、OCI 日志、OCI Connector Hub、OCI Event Service 规则、Oracle Applications 和 OCI Streaming 服务的权限。
任务 1:设置必需的策略和 Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 权限
此解决方案的每个组件都必须有权访问与之交互的 OCI 资源。要遵循本教程,需要以下权限。
-
用户策略:管理 OCI Object Storage、OCI Connector Hub、OCI Event Service 规则、OCI 日志、OCI Functions 和 OCI 流。在 OCI Vault(可选)和网络系列上使用。
-
服务策略:授予从 OCI 对象存储存储桶(读取对象)读取消息以及写入流(使用流推送)的功能权限。需要动态组。
详细的政策可以在这里找到:
任务 2:创建专用流
OCI 流处理是一项完全托管的 OCI 服务,其中对静态数据和传输中的数据进行加密,从而确保消息的完整性和安全性。为了增强安全性,您可以使用 OCI Vault 服务来存储和管理您自己的加密密钥,以满足特定的合规性或安全要求。可以在虚拟云网络 (Virtual Cloud Network,VCN) 中配置专用端点,以进一步保护流,将专用 IP 地址与流池关联。这可确保 OCI Streaming 流量保持在 VCN 内,从而完全避免互联网。但是,请注意,使用专用端点的流无法从 Internet 访问,从而限制了通过控制台查看其最新消息的能力。要使用来自专用流的消息,使用方必须对托管专用流的网络具有路由和访问权。
创建流和流池。输入 Stream Name(流名称),然后选择 Create New Stream Pool(创建新流池)以创建流池。在配置流池部分中,输入流池名称,选择专用端点,然后相应地输入 VCN 和子网以及网络详细信息。尽管是可选的,但我们建议为该 NSG 内的所有流量提供一个包含入站规则的网络安全组。有关更多信息,请参阅创建流和创建流池。
您可以使用自己的加密密钥,更好地控制密钥的生命周期。您可以选择调整流中消息的保留。默认值为 1 天,最大值为 7 天。
记下流 OCID 和消息端点。我们需要将此信息传递给函数。
任务 3:创建和配置 OCI Connector Hub
OCI Connector Hub 充当安全消息总线,有助于在源和目标之间实现无缝、可靠的数据传输。在此架构中,源为 OCI 日志记录,目标为 OCI 对象存储,在这些存储中对这些日志进行压缩并进行进一步处理。OCI Connector Hub 充当中介,可确保高效的数据流,同时维护传输消息的安全性和完整性。
本教程假定在子网上启用了流日志,并且 OCI 对象存储存储桶可用。有关启用流日志和创建存储桶的更多信息,请参见启用流日志和创建对象存储存储桶。
创建存储桶时,请确保选择发出对象事件。这是我们事件驱动架构的关键。
配置 OCI Connector 中心,在 OCI 日志记录服务和 OCI 对象存储存储桶之间创建数据流。有关更多信息,请参见 Creating a Connector with a Logging Source 。
通过调整批结转详细信息,您可以配置日志写入 OCI 对象存储存储桶的频率。默认值为 100 MB 或 7 分钟。
任务 4:开发和部署函数
此函数将读取 OCI 对象存储中的对象并将消息写入流。为此,它将在以下两者之间执行操作:
- 从存储桶读取对象。
- 解压缩对象。
- 检查对象大小并根据需要创建 1 MB 的块。OCI 流处理服务限制生成器可以发布到流的唯一消息的最大大小为 1 MB。
- 对消息进行编码。
- 发布到流。
有关详细信息,请参阅创建函数。
-
func.py
:import io import json import logging import oci import gzip from base64 import b64encode def handler(ctx, data: io.BytesIO = None): try: # Parse the incoming data cfg = ctx.Config() body = json.loads(data.getvalue()) bucket_name = body["data"]["additionalDetails"]["bucketName"] object_name = body["data"]["resourceName"] stream_ocid = cfg["stream_ocid"] stream_endpoint = cfg["stream_endpoint"] logging.getLogger().info(f'Function invoked for bucket upload: {bucket_name}') except (Exception, ValueError) as ex: logging.getLogger().error(f'Error parsing JSON payload: {str(ex)}') return {"status": 400, "message": "Bad Request"} try: # Get the object data from Object Storage object_content = get_object(bucket_name, object_name) # Check if the object is a .gz file and decompress it if object_name.endswith('.gz'): logging.getLogger().info(f'Decompressing object: {object_name}') object_content = gzip.decompress(object_content) logging.getLogger().info(f'Object Content: {object_content.decode("utf-8")[:100]}...') # Split content into message chunks ensuring no message is split messages = split_content_into_messages(object_content, max_size=1024*1024) # Publish messages to the stream for message in messages: publish_to_stream(stream_ocid, stream_endpoint, data.getvalue().decode('utf-8'), message) return {"status": 200, "message": "Successfully processed object update"} except Exception as ex: logging.getLogger().error(f'Error processing object: {str(ex)}') return {"status": 500, "message": "Internal Server Error"} def get_object(bucket_name, object_name): signer = oci.auth.signers.get_resource_principals_signer() object_storage_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer) namespace = object_storage_client.get_namespace().data try: logging.getLogger().info(f'Searching for bucket: {bucket_name}, object: {object_name}') obj = object_storage_client.get_object(namespace, bucket_name, object_name) logging.getLogger().info(f'Found object: {object_name}') return obj.data.content except Exception as ex: logging.getLogger().error(f'Failed to retrieve object: {str(ex)}') raise def publish_to_stream(stream_ocid, stream_endpoint, event_data, object_content): signer = oci.auth.signers.get_resource_principals_signer() stream_client = oci.streaming.StreamClient(config={}, signer=signer, service_endpoint=stream_endpoint) # Build the message list message_list = [ oci.streaming.models.PutMessagesDetailsEntry( key=b64encode("partition-key-1".encode()).decode(), value=b64encode(object_content).decode() ), ] try: logging.getLogger().info(f"Publishing {len(message_list)} messages to stream {stream_ocid}") put_message_details = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = stream_client.put_messages(stream_ocid, put_message_details) # Log publishing results for entry in put_message_result.data.entries: if entry.error: logging.getLogger().error(f"Error publishing message: {entry.error_message}") else: logging.getLogger().info(f"Published message to partition {entry.partition}, offset {entry.offset}") except Exception as ex: logging.getLogger().error(f"Failed to publish messages to stream: {str(ex)}") raise def split_content_into_messages(content, max_size): """ Splits content into messages of specified max size while ensuring no message is split. Attempts to split based on newline characters for text content. """ messages = [] current_chunk = [] current_size = 0 for line in content.decode('utf-8').splitlines(keepends=True): line_size = len(line.encode('utf-8')) if current_size + line_size > max_size: messages.append(''.join(current_chunk).encode('utf-8')) current_chunk = [line] current_size = line_size else: current_chunk.append(line) current_size += line_size if current_chunk: messages.append(''.join(current_chunk).encode('utf-8')) return messages
-
func.yaml
:schema_version: 20180708 name: logs-to-pvt-stream version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: stream_ocid: ocid1.stream.123445 stream_endpoint: https://xyz.us-ashburn-1.oci.oraclecloud.com
-
requirements.txt
:fdk oci
最后一步是告诉函数专用流的位置。此函数使用配置参数,如果要在其他租户中部署,则使其可重用。
任务 5:创建事件并订阅函数
在此任务中,将函数订阅到对象上载事件。将事件类型创建规则作为对象 - 创建,将存储桶名称作为条件属性。有关更多信息,请参阅创建事件规则。
验证
数据流可以在多个位置进行验证。
-
验证日志组度量以检查是否摄取了流日志。
-
下一个跃点是连接器集线器度量。OCI Connector Hub 会收集日志并将其发送到 OCI 对象存储。确保源和目标上没有错误。
-
下一个跃点是 OCI Object Storage。确保对象计数递增。如果需要,请启用读写日志以进一步调试。
-
下一个跃点是 OCI 事件服务。复核度量以确保没有传送失败。
-
下一步是检查函数 Invocation 度量。确保没有错误,并且函数没有限制。
-
检查数据是否正在摄取到专用流中。
如果以下任一图表中缺少数据,请在此处停止并为该服务启用日志。日志将解释特定资源执行任务失败的原因。
后续步骤
祝贺您在 OCI 中成功实施安全且事件驱动的日志管理解决方案!通过结合 OCI Logging、OCI Connector Hub、OCI Object Storage 和 OCI 专用流的强大功能,您创建了一个强大的架构,可确保以近乎实时的方式安全地收集、处理和发布日志。
此解决方案通过专用流保护敏感日志数据,并展示事件驱动自动化的效率。随着系统扩展,此架构将无缝适应,使您能够以最少的手动干预来处理大量日志。
借助该框架,您可以确保安全、高效的日志处理,同时符合隐私要求。通过此架构,您可以灵活地构建符合企业需求的定制处理管道。通过其他分析或预警机制扩展此设置可以更深入地了解系统事件,并增强主动检测和响应异常的能力。
有关使用 OCI Functions 和 OCI 专用流功能的详细信息,请与您的 Oracle 代表联系,或者参见云安全解决方案。
确认
- 作者 — Aneel Kanuri(杰出云架构师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Write Logs to Oracle Cloud Infrastructure Private Stream using Oracle Cloud Infrastructure Functions
G23254-01
December 2024