Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Write Events to an OCI Private Stream using OCI Events Service and OCI Functions
Introduction
Oracle Cloud Infrastructure (OCI) Events Service rules track resource changes in OCI using events that comply with the Cloud Native Computing Foundation (CNCF) CloudEvents standard. This enables developers to respond to real-time changes by triggering code with OCI Functions, writing data to OCI Streaming, or sending alerts using OCI Notifications.
While sending events to public streams is natively supported, writing to a private stream requires a custom approach using OCI Functions or custom code. This tutorial demonstrates using OCI Functions to capture event payloads from an OCI Events Service rule and write them to a private stream.
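To make the event payload concrete, here is a minimal sketch that parses a sample event envelope and extracts the fields a downstream consumer usually needs. The sample values (bucket name, compartment OCID, event ID) are placeholders, and the helper name `summarize_event` is hypothetical. The field names follow the envelope that OCI services emit, but verify them against a real event from your tenancy.

```python
import json

# Placeholder event shaped like the envelope OCI services emit.
# All values are illustrative, not taken from a real tenancy.
SAMPLE_EVENT = """
{
  "eventType": "com.oraclecloud.objectstorage.createbucket",
  "cloudEventsVersion": "0.1",
  "eventTypeVersion": "2.0",
  "source": "ObjectStorage",
  "eventTime": "2024-12-01T10:15:30Z",
  "contentType": "application/json",
  "data": {
    "compartmentId": "ocid1.compartment.oc1..example",
    "compartmentName": "example-compartment",
    "resourceName": "example-bucket",
    "resourceId": "/n/example-namespace/b/example-bucket/"
  },
  "eventID": "00000000-0000-0000-0000-000000000000"
}
"""


def summarize_event(raw: str) -> dict:
    """Return the handful of fields most consumers filter or route on."""
    event = json.loads(raw)
    data = event.get("data", {})
    return {
        "type": event.get("eventType"),
        "time": event.get("eventTime"),
        "resource": data.get("resourceName"),
        "compartment": data.get("compartmentId"),
    }


if __name__ == "__main__":
    print(summarize_event(SAMPLE_EVENT))
```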
Objective
- Automate setup for processing OCI events and sending them to an OCI private stream in a scalable manner for secure, customized processing.
Prerequisites
- Access to an OCI tenancy.
- Privileges to manage OCI Events Service rules, OCI Functions applications, and OCI Streaming resources.
Task 1: Set Up the Required Policies and Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) Permissions
Each component of this solution must have access to the OCI resources it interacts with. To follow this tutorial, the developer must have the following permissions.
- User Policies: Manage OCI Events Service rules, OCI Notifications, OCI Functions, and OCI streams, and use OCI Vault (optional) and the network family.
- Service Policy: Grant the function permission to write messages to the OCI private stream.
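As an illustration of what these policies can look like, the sketch below renders a dynamic group matching rule for functions, a service policy statement that lets that dynamic group publish to streams, and a set of user policy statements. The group, dynamic group, and compartment names are hypothetical, and the helper functions are not part of any OCI SDK. Treat the statements as a starting point and tighten them to your own security requirements.

```python
from typing import List


def function_dynamic_group_rule(compartment_ocid: str) -> str:
    """Matching rule that selects every function in one compartment."""
    return (
        "ALL {resource.type = 'fnfunc', "
        f"resource.compartment.id = '{compartment_ocid}'}}"
    )


def service_policy(dynamic_group: str, compartment: str) -> str:
    """Statement that allows the function to write messages to streams."""
    return (
        f"Allow dynamic-group {dynamic_group} "
        f"to use stream-push in compartment {compartment}"
    )


def user_policies(group: str, compartment: str) -> List[str]:
    """Statements for the developer who follows this tutorial (assumed set)."""
    grants = [
        ("manage", "cloudevents-rules"),
        ("manage", "ons-family"),
        ("manage", "functions-family"),
        ("manage", "stream-family"),
        ("use", "virtual-network-family"),
    ]
    return [
        f"Allow group {group} to {verb} {resource} in compartment {compartment}"
        for verb, resource in grants
    ]


if __name__ == "__main__":
    # Hypothetical names; replace them with values from your tenancy.
    print(function_dynamic_group_rule("ocid1.compartment.oc1..example"))
    print(service_policy("events-fn-dg", "demo-compartment"))
    for statement in user_policies("stream-developers", "demo-compartment"):
        print(statement)
```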
For the detailed policy statements, refer to the OCI IAM policy reference for each service.
Task 2: Create a Private Stream
OCI Streaming is a fully managed OCI service in which data is encrypted at rest and in transit, ensuring the integrity and security of messages. For enhanced security, you can use the OCI Vault service to store and manage your own encryption keys, meeting specific compliance or security requirements. You can further secure your streams by configuring private endpoints within a Virtual Cloud Network (VCN), which associates a private IP address with the stream pool. This ensures that OCI Streaming traffic stays within the VCN and avoids the internet entirely. However, streams that use private endpoints are not accessible from the internet, which also means that you cannot view their latest messages in the console. To consume messages from a private stream, the consumer must have both a network route and access to the network where the private stream is hosted.
Create a stream and a stream pool. Enter a Stream Name and select Create New Stream Pool. In the Configure Stream Pool section, enter a Stream Pool Name, select Private Endpoint, and enter the VCN, subnet, and other network details. Although it is optional, we recommend providing a network security group (NSG) with an ingress rule that allows all traffic within that NSG. For more information, see Create Stream and Create Stream Pool.
You can use your own encryption keys, gaining greater control over the key’s lifecycle. You have an option to adjust the retention of messages within the stream. The default is 1 day, and the maximum is 7 days.
Note down the stream OCID and the messages endpoint. We need to pass this information to the function.
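Before passing these values to the function, a quick sanity check can catch copy-and-paste mistakes. The following is a minimal sketch, not an OCI API: the helper name `check_stream_settings` and the specific checks are assumptions for illustration. The retention bounds mirror the 1-day default and 7-day maximum mentioned above, and the sketch assumes that retention cannot be set below the default.

```python
from typing import List
from urllib.parse import urlparse

DEFAULT_RETENTION_HOURS = 24      # 1 day, the default noted above
MAX_RETENTION_HOURS = 7 * 24      # 7 days, the maximum noted above


def check_stream_settings(
    stream_ocid: str,
    messages_endpoint: str,
    retention_hours: int = DEFAULT_RETENTION_HOURS,
) -> List[str]:
    """Return a list of human-readable problems; empty means all checks passed."""
    problems = []
    if not stream_ocid.startswith("ocid1.stream."):
        problems.append("stream OCID should start with 'ocid1.stream.'")
    parsed = urlparse(messages_endpoint)
    if parsed.scheme != "https" or not parsed.netloc:
        problems.append("messages endpoint should be a full https:// URL")
    # Assumption: the default is also the lower bound for retention.
    if not DEFAULT_RETENTION_HOURS <= retention_hours <= MAX_RETENTION_HOURS:
        problems.append(
            f"retention must be between {DEFAULT_RETENTION_HOURS} "
            f"and {MAX_RETENTION_HOURS} hours"
        )
    return problems


if __name__ == "__main__":
    # Placeholder values in the same style as the func.yaml used in Task 4.
    print(check_stream_settings(
        "ocid1.stream.123445",
        "https://xyz.us-ashburn-1.oci.oraclecloud.com",
    ))
```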
Task 3: Create OCI Notifications Topic and OCI Events Service Rule
As a refresher, here is how an event works. OCI services emit events for changes to resources or data. We create rules, each of which includes a filter that specifies the events of interest. When the filter finds a matching event, the rule triggers an action. The action can be an OCI Notifications topic, an OCI stream, or an OCI function.
- Approach 1: If the requirement is to send the event payload to a public stream, there is no need for a function, because the flow can be configured natively.
- Approach 2: If the requirement is to send only a specific event payload to a private stream, the function can be provided directly as the rule action.
- Approach 3: If the requirement is to send payloads from multiple events to a private stream, the ideal way is to send the event payloads to an OCI Notifications topic and subscribe the function to that topic.
In this tutorial, we will explore Approach 3, which is the most complex.
- Create an OCI Notifications topic. For more information, see Creating a Topic.
- Create an event rule. To justify this approach, we will create two event rules, each with the OCI Notifications topic as its action. For more information, see Creating an Events Rule.
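The filtering step described in this task can be sketched as follows. This is a simplified model for illustration, not how the OCI Events Service is implemented: it matches on `eventType` only, whereas real rule conditions also support attribute filters. The two rule names and the chosen event types are hypothetical examples.

```python
from typing import Dict, List

# Two hypothetical rules, each with a condition on eventType.
# In this tutorial's flow, both rules would send to the same topic.
RULES: Dict[str, dict] = {
    "bucket-created": {
        "eventType": ["com.oraclecloud.objectstorage.createbucket"],
    },
    "bucket-deleted": {
        "eventType": ["com.oraclecloud.objectstorage.deletebucket"],
    },
}


def matching_rules(event: dict, rules: Dict[str, dict] = RULES) -> List[str]:
    """Return the names of the rules whose eventType list contains the event's type."""
    event_type = event.get("eventType")
    return [
        name
        for name, condition in rules.items()
        if event_type in condition.get("eventType", [])
    ]


if __name__ == "__main__":
    created = {"eventType": "com.oraclecloud.objectstorage.createbucket"}
    updated = {"eventType": "com.oraclecloud.objectstorage.updatebucket"}
    print(matching_rules(created))  # the create rule matches
    print(matching_rules(updated))  # no rule matches
```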
Task 4: Develop and Deploy the Function
This function encodes the event payload and publishes it to the provided messages endpoint and stream. For more information, see Creating functions.
- func.py:

    import json
    import logging

    import oci
    from base64 import b64encode


    def handler(ctx, data):
        try:
            # Parse the incoming event payload
            body = json.loads(data.getvalue())

            # Read the target stream from the function configuration
            cfg = ctx.Config()
            stream_ocid = cfg["stream_ocid"]
            stream_endpoint = cfg["stream_endpoint"]
            logging.getLogger().info(f'Function invoked for event: {body}')

            # Publish the event to the stream
            publish_to_stream(stream_ocid, stream_endpoint, body)
            return {"status": 200, "message": "Successfully processed event"}
        except Exception as ex:
            logging.getLogger().error(f'Error processing event payload: {str(ex)}')
            return {"status": 500, "message": "Internal Server Error"}


    def publish_to_stream(stream_ocid, stream_endpoint, event_data):
        # Authenticate as the function itself (resource principal)
        signer = oci.auth.signers.get_resource_principals_signer()
        stream_client = oci.streaming.StreamClient(
            config={}, signer=signer, service_endpoint=stream_endpoint
        )

        # Convert event data to a JSON string and encode it
        event_payload = json.dumps(event_data).encode('utf-8')

        # Build the message list; key and value must be base64-encoded
        message_list = [
            oci.streaming.models.PutMessagesDetailsEntry(
                key=b64encode("partition-key-1".encode()).decode(),
                value=b64encode(event_payload).decode()
            ),
        ]

        try:
            logging.getLogger().info(
                f"Publishing {len(message_list)} messages to stream {stream_ocid}"
            )
            put_message_details = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = stream_client.put_messages(stream_ocid, put_message_details)

            # Log publishing results
            for entry in put_message_result.data.entries:
                if entry.error:
                    logging.getLogger().error(f"Error publishing message: {entry.error_message}")
                else:
                    logging.getLogger().info(
                        f"Published message to partition {entry.partition}, offset {entry.offset}"
                    )
        except Exception as ex:
            logging.getLogger().error(f"Failed to publish messages to stream: {str(ex)}")
            raise
- func.yaml:

    schema_version: 20180708
    name: events-to-pvt-stream
    version: 0.0.1
    runtime: python
    build_image: fnproject/python:3.9-dev
    run_image: fnproject/python:3.9
    entrypoint: /python/bin/fdk /function/func.py handler
    memory: 256
    config:
      stream_ocid: ocid1.stream.123445
      stream_endpoint: https://xyz.us-ashburn-1.oci.oraclecloud.com
- requirements.txt:

    fdk
    oci
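The final step is to tell the function where the private stream is. Set the stream_ocid and stream_endpoint configuration parameters to the stream OCID and messages endpoint noted in Task 2. Because the function reads these values from its configuration, you can reuse it in another tenancy without changing the code.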
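The encoding that the function applies before publishing can be tried locally without the OCI SDK. The sketch below mirrors that step: it serializes the event to JSON, base64-encodes the key and value, and then reverses the process the way a consumer would. The helper names `encode_message` and `decode_message` are hypothetical, and plain dictionaries stand in for the SDK message models.

```python
import json
from base64 import b64decode, b64encode
from typing import Tuple


def encode_message(event: dict, key: str = "partition-key-1") -> dict:
    """Build the base64-encoded key/value pair that the function publishes."""
    payload = json.dumps(event).encode("utf-8")
    return {
        "key": b64encode(key.encode("utf-8")).decode("ascii"),
        "value": b64encode(payload).decode("ascii"),
    }


def decode_message(message: dict) -> Tuple[str, dict]:
    """Reverse encode_message, as a stream consumer would."""
    key = b64decode(message["key"]).decode("utf-8")
    event = json.loads(b64decode(message["value"]))
    return key, event


if __name__ == "__main__":
    sample = {"eventType": "com.oraclecloud.objectstorage.createbucket"}
    message = encode_message(sample)
    print(message)
    print(decode_message(message))
```

Because the function uses a constant key, every message lands in the same partition. If ordering across all events is not required, deriving the key from a field in the event is one way to spread the load across partitions.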
Task 5: Subscribe the Function to the Notifications Topic
In this task, subscribe the deployed function to the OCI Notifications topic. Whenever the topic receives a message, it triggers the function, and the function writes the event payload to the private stream.
You can remove or enrich fields in the payload by altering the function code to meet your requirements. Confirmation is not required for function subscriptions. For more information, see Creating a Function Subscription.
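As one possible way to adjust the payload before publishing, the sketch below removes selected fields from the event's `data` section and attaches extra metadata. The helper name `trim_event`, the default field to drop, and the top-level `enrichment` key are all hypothetical choices for illustration. In the function, you would call such a helper on `body` before passing it to `publish_to_stream`.

```python
import copy
from typing import Iterable, Optional


def trim_event(
    event: dict,
    drop_data_fields: Iterable[str] = ("additionalDetails",),
    extra: Optional[dict] = None,
) -> dict:
    """Return a copy of the event with some data fields removed and metadata added."""
    trimmed = copy.deepcopy(event)  # never mutate the caller's event
    data = trimmed.get("data")
    if isinstance(data, dict):
        for field in drop_data_fields:
            data.pop(field, None)
    if extra:
        # Hypothetical key; choose a name your consumers agree on.
        trimmed["enrichment"] = dict(extra)
    return trimmed


if __name__ == "__main__":
    original = {
        "eventType": "com.oraclecloud.objectstorage.createbucket",
        "data": {"resourceName": "example-bucket", "additionalDetails": {"k": "v"}},
    }
    print(trim_event(original, extra={"environment": "dev"}))
```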
Verification
There are multiple places where the data flow can be verified.
- Check the OCI Events Service rule metrics to confirm that events match the provided rule. The failure graph must be empty.
- Check the OCI Notifications topic metrics. Ensure that all messages are delivered and that there is no data in the failed messages charts.
- Check the function invocation metrics. Ensure that there are no errors and that the function is not being throttled.
- Check that the data is being ingested into the private stream.
If data is absent from any of the preceding charts, stop there and enable logs for that service. The logs will explain why a specific resource is failing to perform its task.
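Because the console cannot display messages from a private stream, one way to confirm ingestion is to read a few messages with the OCI Python SDK from a host that has a network route to the private endpoint. The following is a minimal sketch under several assumptions: the host has a default profile in `~/.oci/config`, the stream has a partition `"0"`, and the identity used is allowed to consume from the stream. The helper names are hypothetical.

```python
import json
from base64 import b64decode
from typing import List


def decode_value(value_b64: str) -> dict:
    """Decode one stream message value back into the original event."""
    return json.loads(b64decode(value_b64))


def read_from_start(
    stream_ocid: str,
    messages_endpoint: str,
    partition: str = "0",
    limit: int = 10,
) -> List[dict]:
    """Read up to `limit` of the oldest retained messages from one partition."""
    import oci  # imported lazily so decode_value works without the SDK installed

    config = oci.config.from_file()  # assumes the default profile in ~/.oci/config
    client = oci.streaming.StreamClient(config, service_endpoint=messages_endpoint)

    # TRIM_HORIZON starts at the oldest message still within retention.
    cursor_details = oci.streaming.models.CreateCursorDetails(
        partition=partition,
        type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON,
    )
    cursor = client.create_cursor(stream_ocid, cursor_details).data.value
    messages = client.get_messages(stream_ocid, cursor, limit=limit).data
    return [decode_value(message.value) for message in messages]


if __name__ == "__main__":
    # Placeholder values; use the OCID and endpoint noted in Task 2.
    for event in read_from_start(
        "ocid1.stream.123445",
        "https://xyz.us-ashburn-1.oci.oraclecloud.com",
    ):
        print(event.get("eventType"), event.get("eventTime"))
```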
Next Steps
In this tutorial, you learned how to integrate the OCI Events Service, OCI Notifications, OCI Functions, and OCI Streaming to securely process and publish event payloads to private streams. You explored setting up secure endpoints, managing encryption keys, and using OCI IAM for fine-grained access control to ensure that only authorized users and services can interact with your streaming data.
This solution empowers teams to capture resource changes in real-time while maintaining robust security standards. Organizations can use private streams to safeguard sensitive data, comply with industry regulations, and ensure that operational workflows align with organizational security and compliance goals. This approach strengthens your security posture and enables your team to achieve seamless and secure event-driven automation.
For more information about using OCI Functions and OCI private stream capabilities, contact your Oracle representative or see Cloud Security Solutions.
Acknowledgments
- Author - Aneel Kanuri (Distinguished Cloud Architect)
G23048-01
December 2024