Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Stream AVRO Messages using Oracle Cloud Infrastructure Streaming and OCI Data Flow with Micro-Batch processing
Introduction
In today’s data-driven landscape, the ability to process and analyze real-time data streams is crucial for businesses aiming to gain insights and respond rapidly to changing conditions. Streaming data processing technologies have emerged as potent solutions for handling high-volume, continuous data streams. In this tutorial, we will explore an innovative approach to efficiently stream AVRO messages using Oracle Cloud Infrastructure (OCI) Streaming, combined with the micro-batch processing technique, and enhanced with the serverless capabilities of Oracle Functions based on the open-source FN Project.
Introduction to AVRO and Streaming Data
AVRO, a widely adopted data serialization format, is known for its efficiency in representing intricate data structures and its compatibility with various programming languages. When integrated with streaming technologies, AVRO empowers organizations to transmit and process data nearly in real-time, enabling them to extract valuable insights without the latency typically associated with batch processing.
OCI Streaming: Empowering Real-Time Data
Oracle Cloud Infrastructure (OCI) has an array of tools for handling data in the cloud, with OCI Streaming being one such service tailored for high-throughput, real-time data streams. By leveraging OCI Streaming, developers can construct scalable and dependable data pipelines that efficiently ingest, process, and distribute data streams.
OCI Data Flow Managed Spark: A No Lock-In Solution
Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed Apache Spark service that performs processing tasks on extremely large datasets without infrastructure to deploy or manage.
Micro-Batch Processing
Micro-batch processing involves breaking down incoming data streams into compact batches, using time or size as the criteria. These batches are then processed as smaller jobs. Unlike stream processing, which handles each record individually as it arrives, micro-batch processing introduces a small delay and buffers the data before processing, which allows more control over what to do with the data. Unlike traditional batch processing, which tackles big data sets at intervals, micro-batch processing provides nearly real-time processing and delivery of outcomes.
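To make the micro-batch model concrete, the sketch below shows how Spark Structured Streaming groups a continuous stream into small batches on a time trigger. This is a minimal PySpark illustration only; the Data Flow application built later in this tutorial is written in Java, and the endpoint and topic values here are placeholders.

# Minimal PySpark sketch of micro-batch processing (illustrative only; the
# tutorial's actual Data Flow application is written in Java).
# Assumes the spark-sql-kafka package is available; endpoint/topic are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("micro-batch-example").getOrCreate()

# Read a continuous stream of records from a Kafka-compatible source.
stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "cell-1.streaming.<region>.oci.oraclecloud.com:9092")  # placeholder
             .option("subscribe", "FrontDoorTopic")
             .load())

def process_batch(batch_df, batch_id):
    # Each call receives one micro-batch as a regular DataFrame.
    print(f"Processing micro-batch {batch_id} with {batch_df.count()} records")

# trigger(processingTime=...) groups the stream into micro-batches every 10 seconds.
query = (stream_df.writeStream
         .foreachBatch(process_batch)
         .trigger(processingTime="10 seconds")
         .start())
query.awaitTermination()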
Unlocking Synergy: OCI Streaming, OCI Data Flow, and Oracle Functions
This tutorial delves into the fusion of OCI Streaming, OCI Data Flow managed Spark Streaming, and Oracle Functions. We’ll guide you through the process of setting up an end-to-end streaming data pipeline that ingests AVRO-encoded messages, efficiently processes them using OCI Data Flow managed Spark micro-batch processing capabilities, and introduces serverless event-driven processing with Oracle Functions.
Objective
Use OCI Streaming and OCI Data Flow managed Spark micro-batch processing to create an efficient, real-time data processing pipeline using AVRO format.
IMPORTANT: This tutorial is designed solely for educational and study purposes. It provides an environment for learners to experiment and gain practical experience in a controlled setting. It is crucial to note that the security configurations and practices employed in this lab might not be suitable for real-world scenarios.
Security considerations for real-world applications are often far more complex and dynamic. Therefore, before implementing any of the techniques or configurations demonstrated here in a production environment, it is essential to conduct a comprehensive security assessment and review. This review should encompass all aspects of security, including access control, encryption, monitoring, and compliance, to ensure that the system aligns with the organization’s security policies and standards.
Security should always be a top priority when transitioning from a lab environment to a real-world deployment.
Process flow
High level architecture
Prerequisites - Oracle Cloud Infrastructure
- Oracle account with admin level access permissions
- A compartment to create your resources: note the COMPARTMENT OCID
- VCN with two subnets (Private and Public), see Creating Network tutorial
- Make sure your subnets have proper ingress rules for the Service Gateway and for ports 443 and 80, for both the private and public subnets
Prerequisites - Local machine environment
- An Oracle Linux compute instance on the PRIVATE subnet. This is important for accessing resources on the PRIVATE subnet, such as the OCI Streaming, Functions and Container Instances resources that will be deployed during this tutorial.
- A Bastion Host to connect to the Oracle Linux compute instance and perform the tasks in this tutorial. For more details, see Bastion Overview.
- Local OCI-CLI setup. For more details, see the Installing the CLI tutorial.
- Local Docker to be able to build images. If you are using Oracle Linux, see Install Docker on Oracle Linux.
- Local Python version 3.9.16 (at least) installed for testing.
- Local Java JDK 11.0.8.
- Local Maven 3.5.4.
- Local FN CLI to be able to deploy your function to OCI: Installing FN CLI.
Task 1: Set up Dynamic Groups
- Go to your domain, click Dynamic Groups and create the following groups.
Group Name: MyFunctions
ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
Group Name: ContainerIntances
ALL {resource.type='compute-container-instances', resource.compartment.id = 'pasteYourCompartmentID'}
Group Name: DataFlowDynamicGroup
ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
Task 2: Create Policies
- Go to Policies and create the following policies.
Policy Name: FunctionsPolicies
Allow dynamic-group MyFunctions to manage objects in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group ContainerIntances to manage objects in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group MyFunctions to read secret-bundles in compartment YOUR-COMPARTMENT-NAME
Policy Name: StreamTopicPolicies
Allow dynamic-group ContainerIntances to read objects in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group ContainerIntances to use stream-push in compartment YOUR-COMPARTMENT-NAME
Policy Name: ContainerInstancesPolicy
Allow dynamic-group ContainerIntances to read repos in compartment YOUR-COMPARTMENT-NAME
Policy Name: DataFlowPolicies
Allow dynamic-group DataFlowDynamicGroup to manage objects in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group DataFlowDynamicGroup to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group DataFlowDynamicGroup to manage data-catalog-metastores in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group DataFlowDynamicGroup to manage object-family in compartment YOUR-COMPARTMENT-NAME
Allow dynamic-group DataFlowDynamicGroup to read secret-bundles in compartment YOUR-COMPARTMENT-NAME
Allow service dataflow to read objects in compartment YOUR-COMPARTMENT-NAME
Task 3: Create storage buckets and upload AVRO schema
- Go to Buckets, click Create Bucket and create a new bucket named avro-schema-bucket to store the AVRO schema file.
- Now, select your bucket and TAKE NOTE of the Namespace; we will need it later.
- Upload the file user.asvc into the created bucket.
Task 4: Create the Private OCI Streaming Topic
- Go to Analytics & AI, then click Streaming and create a new stream called FrontDoorTopic.
- Select Stream Pools, click PrivatePool, then click the Kafka Connection Settings option and TAKE NOTE of the fields; we will need them later.
Task 5: Create the AUTH TOKEN
Create the AUTH TOKEN for your user; this is required to work with the Kafka topic.

- Click the user icon in the top right corner and then select the User Settings option.
- Click Auth Tokens, generate a new token and TAKE NOTE of your token.
Task 6: Create container registry
- Go to the Developer Services menu, click Container Registry and create the following private repositories.

Repository Name      Type
api-avro-sample_a    Private
api-avro-sample_b    Private
fn-recep-avro        Private

- Check the repositories and TAKE NOTE of the Namespace.
- Open the terminal shell where you have OCI CLI and Docker installed and log in to the registry. Check the correct URL for your REGION. In this tutorial, we are using Brazil East (Sao Paulo), where the registry URL is gru.ocir.io.

docker login gru.ocir.io
Username: <your container namespace>/youruser
Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
Task 7: Create OCI Vault
Create the OCI Vault and provide the needed variables that will be used later in this tutorial.
- Go to Identity & Security, click Vault and then click Create Vault.
- Select the new vault and create Master Encryption Keys for it.
- Create a new secret called AUTH_KEY and paste the auth token that you created earlier.
- Repeat the secret creation process and create the following new secrets:

Variable Name           Value
KAFKA_BOOTSTRAPSERVER   The bootstrap server from your OCI Streaming configuration
KAFKA_TOPIC             FrontDoorTopic
KAFKA_USERNAME          Your username + stream pool ID from your OCI Streaming configuration
AUTH_KEY                The AUTH Token you created in the earlier steps

- Take note of the Secret OCID that was created for each secret and create a new config file.
- The config.properties file maps the application variables to the vault secret OCIDs. The application will use this file to identify which vault secrets need to be gathered at runtime (see the sketch at the end of this task).
- Create a new file on your local machine where you have access to OCI-CLI. Replace each value with your own Secret OCID.

File Name: config.properties

kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
- Go to Buckets, click Create Bucket and create a new bucket named config to store the config.properties file.
- Upload the config.properties file to the config storage bucket.

ls -lrt config.properties
oci os object put -bn config --file config.properties --force
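For reference, the sketch below shows one way an application could resolve these config.properties entries into secret values at runtime, using the OCI Python SDK with a resource principal signer. It is a minimal illustration under assumed names, not the code shipped in the function or Data Flow application used by this tutorial.

# Minimal sketch (not the tutorial's shipped code): read config.properties and
# fetch each secret value from OCI Vault using a resource principal signer.
import base64
import oci

def load_properties(path):
    # Parse simple key=value lines from config.properties.
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, value = line.split("=", 1)
                props[key.strip()] = value.strip()
    return props

def read_secret(secrets_client, secret_ocid):
    # Secret bundle content is returned base64-encoded.
    bundle = secrets_client.get_secret_bundle(secret_id=secret_ocid)
    content = bundle.data.secret_bundle_content.content
    return base64.b64decode(content).decode("utf-8")

if __name__ == "__main__":
    props = load_properties("config.properties")
    # Resource principals only work inside OCI resources (Functions, Data Flow, and so on).
    signer = oci.auth.signers.get_resource_principals_signer()
    secrets_client = oci.secrets.SecretsClient(config={}, signer=signer)
    kafka_topic = read_secret(secrets_client, props["kafka_topic_vaultOCID"])
    print("Resolved Kafka topic:", kafka_topic)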
Task 8: Create a simple AVRO message and save it to a file using the sample python code provided
- Open the shell terminal where you have OCI CLI, Docker and Python3 installed and create a new AVRO file that contains a single message based on the same AVRO schema we uploaded to object storage earlier.
  Note: You must check the Python version; we are using Python 3.9.16 in this tutorial, and earlier versions may not work.
- Get the code from here: Create_avro_sample.zip.
- Unzip it to a location of your choice and then run the program to generate a sample AVRO message (a sketch of what the script does is shown after the commands):

cd ~
mkdir create_avro_sample
cd create_avro_sample
unzip CreateAVRO_SampleFile.zip

# Check the files are there
ls -lrt

# Install the python dependencies on requirements.txt
pip3 install -r requirements.txt

# Run the program and create an AVRO message file
python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
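The exact contents of create_avro_sample.py come from the downloaded zip file; the sketch below only illustrates the idea, serializing a JSON record into a binary AVRO file with fastavro. The schema fields are inferred from the sample message; the user.asvc file uploaded earlier is authoritative.

# Illustrative sketch only; the real script is provided in Create_avro_sample.zip.
# Serializes one JSON record into a schemaless AVRO binary file using fastavro.
import json
import sys
from fastavro import parse_schema, schemaless_writer

# Schema fields inferred from the sample message; the tutorial's user.asvc is authoritative.
USER_SCHEMA = parse_schema({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
    ],
})

def main():
    output_file, json_record = sys.argv[1], sys.argv[2]
    record = json.loads(json_record)
    with open(output_file, "wb") as out:
        schemaless_writer(out, USER_SCHEMA, record)
    print(f"Wrote AVRO message to {output_file}")

if __name__ == "__main__":
    # Example: python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    main()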
Task 9: Create the OCI Function to receive the AVRO message and publish to the OCI Streaming topic
- Go to Developer Services, under Functions, click Applications and then click Create application.
- Go to your terminal shell where you have Docker, OCI CLI and FN CLI installed and run the following commands to initialize the function.
  Note: If you followed the steps, your Docker login command has already been executed by now; if not, proceed with the Docker login step in the Create container registry task.

fn create context oci-cloud --provider oracle
fn use context oci-cloud
fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
Note: In this tutorial, we are using the Brazil East (Sao Paulo) region; if you are using a different region, you need to change the api-url and registry locations accordingly.
- Create a simple Hello World function to make sure all your settings are correct.

fn init --runtime python fn-recep-avro
cd fn-recep-avro
fn deploy --app MyReceptionApp
fn invoke MyReceptionApp fn-recep-avro
- Get the AVRO function sample code from the file fn-recep-avro.zip and replace the Hello World code we created earlier. You must get both files, func.py and requirements.txt, for the function to work (a sketch of the function's logic is shown at the end of this task).

# Check you have the right code for func.py & requirements.txt (you got from the zip file)
ls -lrt
- Build the new code and deploy the function.

fn deploy --app MyReceptionApp
- To invoke the function, we need to pass an AVRO message as a parameter; for that, we will use the sample AVRO message file created in the earlier steps. The first time a function is invoked it takes a little longer, since it needs to be started.

# Check where you created the sample avro message file
ls -lrt ../create_avro_sample/

# Invoke the function to check if it's working as expected
echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
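As a reference for what fn-recep-avro does conceptually, the sketch below shows a minimal Python function that receives the AVRO payload and publishes the raw bytes to the FrontDoorTopic stream over the Kafka-compatible endpoint. It is an assumed illustration using the FDK and kafka-python libraries with placeholder credentials, not the exact code from fn-recep-avro.zip (which resolves these values from the Vault secrets in config.properties).

# Minimal sketch only; the actual implementation ships in fn-recep-avro.zip.
# Receives the binary AVRO payload over HTTP and publishes it to OCI Streaming
# through the Kafka-compatible endpoint.
import io
import logging

from fdk import response
from kafka import KafkaProducer  # kafka-python, assumed to be listed in requirements.txt


def publish_to_stream(payload: bytes, bootstrap: str, topic: str, username: str, auth_token: str):
    # OCI Streaming Kafka compatibility uses SASL_SSL with the PLAIN mechanism.
    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username=username,    # tenancy/user/streampool-OCID format
        sasl_plain_password=auth_token,  # the auth token created earlier
    )
    producer.send(topic, value=payload)
    producer.flush()


def handler(ctx, data: io.BytesIO = None):
    body = data.getvalue()  # raw AVRO bytes posted to the function
    logging.getLogger().info("Received %d bytes", len(body))

    # In the real function these values come from the Vault secrets referenced
    # in config.properties; hard-coded placeholders are used here.
    publish_to_stream(
        body,
        bootstrap="cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092",  # placeholder
        topic="FrontDoorTopic",
        username="<tenancy>/<user>/<streampool-ocid>",  # placeholder
        auth_token="<AUTH_TOKEN>",  # placeholder
    )
    return response.Response(ctx, response_data="AVRO message published")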
Task 10: Create API Gateway to expose the function
- In your console, click Developer Services, then under API Management, click Gateways and then click Create Gateway.
- Once created, click the Deployments option and then click Create deployment.

Name: RecepFunction
Path prefix: /

  - In Authentication, choose No Authentication, since this is a simple lab and no API authentication is implemented. The main objective here is to demonstrate an HTTPS call passing a binary AVRO message via the API, so we will not implement any authentication method for this simple lab.
  - Before moving forward to a real-life environment, make sure you follow the security best practices for API Gateway.
  - For more details, see Securing API Gateways and Resources.

Route 1:
Path: /
Method: POST
Backend Type: Oracle Functions
Application: Select your function

- Check the API Gateway endpoint and take note of it.
- Open your Linux shell terminal and call the API Gateway. Replace the API URL with the correct endpoint you got in the earlier step.

cd create_avro_sample/
ls -lrt
curl -X POST -H "Content-Type: application/octet-stream" \
     -d "$(echo -n -e "$(cat meu_file.bin)")" \
     https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
Checkpoint
Task 11: Build container image for API type A
Note: The code for API type A and API type B is basically the same; only the header message differs, in order to simulate two different APIs. A minimal illustrative sketch follows this note.
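To give an idea of what these container APIs do, here is a minimal, assumed sketch of an HTTP endpoint that accepts the binary AVRO payload and decodes it. The real code comes from the api-avro-sample_a.zip and api-avro-sample_b.zip files and may differ; the framework, port and schema shown here are assumptions.

# Assumed minimal sketch of an API like api-avro-sample_a / api-avro-sample_b;
# the shipped code in the zip files is authoritative.
# Accepts a binary AVRO payload and decodes it with the inferred User schema.
import io

from flask import Flask, request, jsonify
from fastavro import parse_schema, schemaless_reader

app = Flask(__name__)

# Same inferred schema used for the sample messages (user.asvc is authoritative).
USER_SCHEMA = parse_schema({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
    ],
})

API_TYPE = "TYPE A"  # per the note above, the header message is the only real difference


@app.route("/", methods=["POST"])
def receive_avro():
    # Decode the raw AVRO bytes posted in the request body.
    record = schemaless_reader(io.BytesIO(request.get_data()), USER_SCHEMA)
    print(f"[{API_TYPE}] received record: {record}")
    return jsonify({"api": API_TYPE, "received": record})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)  # port is an assumption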
- Get the code for API type A from api-avro-sample_a.zip and unzip it in your Linux shell terminal.
- Get the container registry namespace that you noted in the earlier steps and build the application registry location, following the pattern below. The OCIR URL is based on your region, e.g. gru.ocir.io for Brazil East (Sao Paulo).

[ocir url]/[your namespace]/api-avro-sample_a:latest

- In your Linux shell terminal, build and push the Docker image for this API.

ls -lrt
docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
Task 12: Build container image for API type B
- Get the code for API type B from api-avro-sample_b.zip and unzip it in your Linux shell terminal.
- Get the container registry namespace that you noted in the earlier steps and build the application registry location, following the pattern below. The OCIR URL is based on your region, e.g. gru.ocir.io for Brazil East (Sao Paulo).

[ocir url]/[your namespace]/api-avro-sample_b:latest

- In your Linux shell terminal, build and push the Docker image for this API.

ls -lrt
docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest

- Check on your container registry page that the image has been pushed successfully.
Task 13: Deploy the APIs in container service
- Go to Developer Services, Container Instances and click Create container instance to deploy the API type A, selecting the api-avro-sample_a image you pushed earlier.
- Repeat the same steps to create a container instance for the API type B, selecting the correct image for the TYPE B API (api-avro-sample_b).
- Get the Internal FQDN address from your container instances: click each container instance and take note of its Internal FQDN address.
- Go to Identity & Security, click Vault, select your VAULT and create two new secrets.

Secret Name       Value
API_TYPE_A_URL    paste the Internal FQDN private address for API Type A
API_TYPE_B_URL    paste the Internal FQDN private address for API Type B

  Take note of each Secret OCID.
Your vault should look like this now:
- Edit the config.properties file you uploaded to the config storage bucket and add new entries for the secret OCIDs.

ls -lrt config.properties
vi config.properties

api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID

# After saving, upload the new version to Object Storage
cat config.properties
oci os object put -bn config --file config.properties --force
Your file should look like this:
Task 14: Test the APIs using create_avro_sample.py
- Go to your Linux shell terminal where you saved create_avro_sample.py from Task 8 and create some new messages to test the API calls. We are creating two new AVRO files with different IDs (1010 and 1020) that we will use as a filter inside the Spark Streaming (Data Flow) program.

ls -lrt
python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
- Call the APIs, passing the AVRO message, to test that they are working fine. Go to your Container Instances page and get the Internal FQDN address for each of the APIs, api-type-a and api-type-b. Remember to replace the URLs below with the corresponding Internal FQDN addresses from your APIs.

ls -lrt type*

curl -i -X POST -H "Content-Type: application/octet-stream" \
     --data-binary "@type_a_message.bin" \
     xxx.xx.x.xxx

curl -i -X POST -H "Content-Type: application/octet-stream" \
     --data-binary "@type_b_message.bin" \
     xxx.xxx.xx.xxx
Task 15: Set up the Java Spark streaming application
- Go to Buckets, click Create Bucket and create two new buckets called dataflow-app-avro and dataflow-logs-avro; they will be used to upload your Java application.
- Double-check your Java environment versions.

Java:

java 11.0.8 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)

Maven:

Apache Maven 3.5.4 (Red Hat 3.5.4-5)
Maven home: /usr/share/maven
Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
- Download the sample code spark-consume-avro-message.zip and unzip it on your local environment that has oci-cli, docker, java and maven installed.

unzip spark-consume-avro-message.zip
cd spark-consume-avro-message
ls -lrt
A quick dive into the proxy code that calls the container instances of types A and B: check the main program file ./src/main/java/example/Example.java. A simplified, illustrative sketch is shown below.
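The actual Example.java ships in the zip file above. As a rough, assumed outline of the same idea, the PySpark sketch below reads the AVRO messages from the stream's Kafka endpoint, decodes them, and routes each micro-batch record to one of the two APIs based on the id filter (1010 for type A, 1020 for type B, matching the sample messages created earlier). Endpoints, credentials and ports are placeholders.

# Rough PySpark outline of what the Java consumer does; the Java code in
# spark-consume-avro-message.zip is authoritative. Requires the spark-avro and
# spark-sql-kafka packages; endpoints and credentials are placeholders.
import requests
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

spark = SparkSession.builder.appName("consume-avro-messages").getOrCreate()

# Schema fields inferred from the sample messages; user.asvc is authoritative.
USER_SCHEMA = """{"type":"record","name":"User","fields":[
  {"name":"id","type":"long"},{"name":"name","type":"string"},{"name":"email","type":"string"}]}"""

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092")  # placeholder
       .option("subscribe", "FrontDoorTopic")
       .option("kafka.security.protocol", "SASL_SSL")
       .option("kafka.sasl.mechanism", "PLAIN")
       .option("kafka.sasl.jaas.config",
               'org.apache.kafka.common.security.plain.PlainLoginModule required '
               'username="<tenancy>/<user>/<streampool-ocid>" password="<AUTH_TOKEN>";')  # placeholders
       .load())

# Decode the binary AVRO value of each Kafka record into columns.
decoded = raw.select(from_avro(raw.value, USER_SCHEMA).alias("user")).select("user.*")

def route_batch(batch_df, batch_id):
    # Micro-batch routing: id 1010 goes to API type A, id 1020 to API type B.
    for row in batch_df.collect():
        target = "http://<api-type-a-fqdn>:8080/" if row["id"] == 1010 else "http://<api-type-b-fqdn>:8080/"
        requests.post(target, json=row.asDict())  # simplified; see Example.java for the actual payload handling

(decoded.writeStream
 .foreachBatch(route_batch)
 .trigger(processingTime="10 seconds")
 .start()
 .awaitTermination())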
- Since this Java program uses a library to handle spark-avro, we need to package the dependency and pass it to Data Flow. For that, we will use the Data Flow Dependency Packager; for more details, see Data Flow Dependency Packager.
  The package org.apache.spark:spark-avro_2.12:3.2.1 is already declared in the packages.txt file; you just need to package it by running:
docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
- Upload the archive.zip file to the storage bucket called dataflow-app-avro using the oci-cli.

oci os object put -bn dataflow-app-avro --file archive.zip --force
- Compile and package the Java application and upload it to the storage bucket dataflow-app-avro.

ls -lrt
mvn clean install

...reduced number of lines of the compiling log...

# Upload the JAR file to the storage bucket
oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
- Check your current dataflow-app-avro storage bucket and make sure it looks like this.
- Go to Analytics & AI, then under Data Lake, click Data Flow, select Private endpoints from the left menu and click Create private endpoint.
- The private endpoint is needed since we are using a PRIVATE subnet for the container instances and the OCI Streaming pool.
- Make sure you fill the DNS Zones field with the Internal FQDNs from the OCI Container Instances and the OCI Streaming pool, comma-delimited.
- Go to Analytics & AI, then under Data Lake, click Data Flow and then click Create application.
- Once created, select the spark-lab-avro Data Flow application and click Run to start the program; it usually takes up to 8 minutes to start.
- Check the running Data Flow application and open the Spark UI, which will show the current jobs and confirm that the application is working.
Task 16: Validate the flow
Call the function and pass a message to check that the whole flow is working as expected.
- Open the Linux shell terminal where you created the sample messages type_a_message.bin and type_b_message.bin and send the message. Replace the API URL with the correct endpoint from your API Gateway creation.

cd create_avro_sample/
ls -lrt
curl -X POST -H "Content-Type: application/octet-stream" \
     -d "$(echo -n -e "$(cat type_a_message.bin)")" \
     https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
- Check that the API type A has been called by checking the logs on the container instance.
  You can repeat the process and send a type_b_message.bin file, and it will call the type B container instance.
Related Links
Acknowledgments
- Author - Joao Tarla (Oracle LAD A-Team Solution Engineer)
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Stream AVRO Messages with Oracle Cloud Infrastructure Streaming and OCI Data Flow with Micro-Batch processing
F86501-01
September 2023
Copyright © 2023, Oracle and/or its affiliates.