
Stream AVRO Messages using Oracle Cloud Infrastructure Streaming and OCI Data Flow with Micro-Batch processing

Introduction

In today’s data-driven landscape, the ability to process and analyze real-time data streams is crucial for businesses aiming to gain insights and respond rapidly to changing conditions. Streaming data processing technologies have emerged as potent solutions for handling high-volume, continuous data streams. In this tutorial, we will explore an innovative approach to efficiently stream AVRO messages using Oracle Cloud Infrastructure (OCI) Streaming, combined with the micro-batch processing technique, and enhanced with the serverless capabilities of Oracle Functions, which is based on the open source Fn Project.

Introduction to AVRO and Streaming Data

AVRO, a widely adopted data serialization format, is known for its efficiency in representing intricate data structures and its compatibility with various programming languages. When integrated with streaming technologies, AVRO empowers organizations to transmit and process data in near real time, enabling them to extract valuable insights without the latency typically associated with batch processing.

OCI Streaming: Empowering Real-Time Data

Oracle Cloud Infrastructure (OCI) has an array of tools for handling data in the cloud, with OCI Streaming being one such service tailored for high-throughput, real-time data streams. By leveraging OCI Streaming, developers can construct scalable and dependable data pipelines that efficiently ingest, process, and distribute data streams.

OCI Data Flow Managed Spark: A No Lock-In Solution

Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed Apache Spark service that performs processing tasks on extremely large datasets without infrastructure to deploy or manage.

Micro-batch processing

Micro-batch processing involves breaking incoming data streams into compact batches, using time or size as the criterion. Each batch is then processed as a smaller job. Unlike stream processing, which handles records individually and continuously, micro-batch processing introduces a small amount of delay and buffering before processing, which allows more control over what to do with the data. Unlike traditional batch processing, which tackles large data sets at intervals, micro-batch processing delivers results in near real time.
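
To make the idea concrete, here is a minimal, hypothetical PySpark sketch (not part of the tutorial code) showing how Spark Structured Streaming groups a stream into micro-batches with a processing-time trigger. The built-in rate source and the 10-second interval are illustrative assumptions; the tutorial's own job reads from Kafka instead.

    from pyspark.sql import SparkSession

    # Minimal micro-batch illustration with Spark Structured Streaming.
    # The "rate" test source and the 10-second trigger are illustrative assumptions.
    spark = SparkSession.builder.appName("micro-batch-demo").getOrCreate()

    stream_df = (
        spark.readStream
        .format("rate")              # built-in test source that emits rows continuously
        .option("rowsPerSecond", 5)
        .load()
    )

    def process_batch(batch_df, batch_id):
        # Each micro-batch arrives as a regular DataFrame, so ordinary batch logic applies.
        print(f"batch {batch_id}: {batch_df.count()} rows")

    query = (
        stream_df.writeStream
        .foreachBatch(process_batch)            # process each micro-batch as a small job
        .trigger(processingTime="10 seconds")   # micro-batch boundary: every 10 seconds
        .start()
    )
    query.awaitTermination()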

Unlocking Synergy: OCI Streaming, OCI Data Flow, and Oracle Functions

This tutorial delves into the fusion of OCI Streaming, OCI Data Flow managed Spark Streaming, and Oracle Functions. We’ll guide you through the process of setting up an end-to-end streaming data pipeline that ingests AVRO-encoded messages, efficiently processes them using OCI Data Flow managed Spark micro-batch processing capabilities, and introduces serverless event-driven processing with Oracle Functions.

Objective

Use OCI Streaming and OCI Data Flow managed Spark micro-batch processing to create an efficient, real-time data processing pipeline using AVRO format.

IMPORTANT: This tutorial is designed solely for educational and study purposes. It provides an environment for learners to experiment and gain practical experience in a controlled setting. It is crucial to note that the security configurations and practices employed in this lab might not be suitable for real-world scenarios.

Security considerations for real-world applications are often far more complex and dynamic. Therefore, before implementing any of the techniques or configurations demonstrated here in a production environment, it is essential to conduct a comprehensive security assessment and review. This review should encompass all aspects of security, including access control, encryption, monitoring, and compliance, to ensure that the system aligns with the organization’s security policies and standards.

Security should always be a top priority when transitioning from a lab environment to a real-world deployment.

Process flow
T0_1

High level architecture
T0_1

Prerequisites - Oracle Cloud Infrastructure

Prerequisites - Local machine environment

Task 1: Set up Dynamic Groups

  1. Go to your domain, click Dynamic Groups and create the following groups.

    Group Name: MyFunctions

    ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
    

    Group Name: ContainerInstances

    ALL {resource.type='compute-container-instances', resource.compartment.id = 'pasteYourCompartmentID'}
    

    Group Name: DataFlowDynamicGroup

    ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
    

Task 2: Create Policies

Task 3: Create storage buckets and upload AVRO schema

  1. Go to Buckets, click Create Bucket and create a new bucket named avro-schema-bucket to store the AVRO schema file.

    T3_1

  2. Now, select your bucket and TAKE NOTE of the Namespace; we will need it later.

    T3_1

  3. Upload the user.asvc file to the bucket you just created. (For reference, a hedged sketch of what such a schema might contain follows these steps.)

    T3_1
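
For reference, a schema compatible with the sample payloads used later in this tutorial (fields id, name and email) could look like the hypothetical sketch below; the actual user.asvc file shipped with the sample code may differ.

    import avro.schema

    # Hypothetical schema matching the sample payloads used later in the tutorial
    # ({"id": ..., "name": ..., "email": ...}); the real user.asvc may differ.
    USER_SCHEMA_JSON = """
    {
      "type": "record",
      "name": "User",
      "namespace": "example",
      "fields": [
        {"name": "id",    "type": "int"},
        {"name": "name",  "type": "string"},
        {"name": "email", "type": "string"}
      ]
    }
    """

    # Parsing validates that the schema is well-formed AVRO.
    schema = avro.schema.parse(USER_SCHEMA_JSON)
    print(schema.name)  # -> User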

Task 4: Create the Private OCI Streaming Topic

  1. Go to Analytics & AI, then click Streaming and create a new stream called FrontDoorTopic.

    T4_0

  2. Select Stream Pools, click PrivatePool, click the Kafka Connection Settings option and TAKE NOTE of the fields; we will need them later. (A hedged sketch of how these settings map to Kafka client properties follows these steps.)

    T4_0
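
As a reference for how the Kafka Connection Settings you just noted are typically used from code, here is a hedged kafka-python sketch. The bootstrap server, the tenancyName/yourUser/streamPoolOCID username format, and the auth token (created in the next task) are placeholders you must replace with your own values.

    from kafka import KafkaProducer

    # Placeholder values: replace with the Kafka Connection Settings of your stream pool
    # and the auth token you will create in the next task.
    BOOTSTRAP = "cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092"   # example endpoint
    USERNAME = "tenancyName/yourUser/ocid1.streampool.oc1.xxxxx"            # tenancy/user/streamPoolOCID
    AUTH_TOKEN = "YOUR_AUTH_TOKEN"

    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP,
        security_protocol="SASL_SSL",   # OCI Streaming exposes Kafka over SASL_SSL
        sasl_mechanism="PLAIN",
        sasl_plain_username=USERNAME,
        sasl_plain_password=AUTH_TOKEN,
    )
    producer.send("FrontDoorTopic", b"raw AVRO bytes go here")
    producer.flush()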

Task 5: Create the AUTH TOKEN

Create the AUTH TOKEN for your user; it is required to work with the Kafka topic.

  1. Click on the top right user icon and then select the User Settings option.

  2. Click Auth Tokens and then generate a new token and TAKE NOTE of your token.

    T4_1

Task 6: Create container registry

  1. Go to the Developer Services menu, click Container registry and create the following private repositories.

    Repository Name      Type
    api-avro-sample_a    Private
    api-avro-sample_b    Private
    fn-recep-avro        Private
  2. Check the repositories and TAKE NOTE of the Namespace.

    T6_1

  3. Open the terminal shell where you have the OCI CLI and Docker installed, and log in to the registry. Check the correct registry URL for your region. In this tutorial, we’re using Brazil East (Sao Paulo), where the registry URL is gru.ocir.io.

    docker login gru.ocir.io
    Username: <your container namespace>/youruser
    Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
    

    T6_1

Task 7: Create OCI Vault

Create the OCI Vault and provide the needed variables that will be used later in this tutorial.

  1. Go to Identity & Security, click Vault and then click Create Vault.

    T7_1new

  2. Select the new vault, and create Master Encryption Keys for it.

    T7_1new

  3. Create a new secret called AUTH_KEY and paste the auth token that you created earlier.

    T7_1new

  4. Repeat the secret creation process and create the following new secrets:

    Variable Name          Value
    KAFKA_BOOTSTRAPSERVER  "The bootstrap server from your OCI Streaming configuration"
    KAFKA_TOPIC            "FrontDoorTopic"
    KAFKA_USERNAME         "Your username + stream pool ID from your OCI Streaming configuration"
    AUTH_KEY               "The auth token you created in the earlier steps"
  5. Take note of the Secret OCID that was created for each secret and create a new config file.

    • The config.properties file maps application variables to the vault secret OCIDs. The application uses this file to identify which vault secrets need to be fetched at runtime. (A hedged sketch of how an application might resolve these secrets follows these steps.)

    • Create a new file on your local machine where you have access to the OCI CLI.
      Replace the placeholder with your OCID for each secret.
      File Name: config.properties

      kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
      kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
      
  6. Go to Buckets, click Create Bucket and create a new bucket named config to store the config.properties file.

  7. Upload the config.properties file to the storage bucket config

    ls -lrt config.properties
    oci os object put -bn config --file config.properties --force
    

    T7_1new
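
To illustrate how an application might resolve these secret OCIDs at runtime (the tutorial's Spark job does the equivalent in Java), here is a hedged Python sketch using the OCI SDK. The local config.properties path and the use of the default ~/.oci/config profile are assumptions.

    import base64

    import oci

    # Load the variable-name -> secret-OCID mapping from config.properties (local copy assumed).
    def load_properties(path="config.properties"):
        props = {}
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#") and "=" in line:
                    key, value = line.split("=", 1)
                    props[key.strip()] = value.strip()
        return props

    # Fetch and decode one secret from OCI Vault.
    def read_secret(secrets_client, secret_ocid):
        bundle = secrets_client.get_secret_bundle(secret_id=secret_ocid).data
        return base64.b64decode(bundle.secret_bundle_content.content).decode("utf-8")

    props = load_properties()
    secrets_client = oci.secrets.SecretsClient(oci.config.from_file())  # default profile assumed
    kafka_topic = read_secret(secrets_client, props["kafka_topic_vaultOCID"])
    print("Resolved topic:", kafka_topic)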

Task 8: Create a simple AVRO message and save it to a file using the sample python code provided

  1. Open the shell terminal where you have OCI CLI, Docker and Python3 installed and create a new AVRO file that contains a single message based on the same AVRO schema we uploaded in object storage earlier.

    Note: Check your Python version; we are using Python 3.9.16 in this tutorial, and earlier versions may not work.

  2. Get the code from here: Create_avro_sample.zip.

  3. Unzip it to a location of your choice and run the program to generate a sample AVRO message (a hedged sketch of what such a script might look like follows these steps):

    cd ~
    mkdir create_avro_sample
    cd create_avro_sample
    unzip CreateAVRO_SampleFile.zip
    # Check the files are there
    ls -lrt
    # install the python dependencies on requirements.txt
    pip3 install -r requirements.txt
    # Run the program and create an AVRO message file
    python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    

    T8_1
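
The actual script ships in the zip file above; purely as a rough idea of what create_avro_sample.py might do, here is a hedged sketch that serializes one record to raw AVRO binary, assuming the same id/name/email schema sketched earlier.

    import io
    import json
    import sys

    import avro.io
    import avro.schema

    # Hypothetical schema; the script in the zip may embed or download the real user.asvc.
    SCHEMA = avro.schema.parse(json.dumps({
        "type": "record", "name": "User", "namespace": "example",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"},
            {"name": "email", "type": "string"},
        ],
    }))

    def main(out_path, payload_json):
        record = json.loads(payload_json)
        buffer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(buffer)
        avro.io.DatumWriter(SCHEMA).write(record, encoder)  # raw AVRO binary, no container header
        with open(out_path, "wb") as f:
            f.write(buffer.getvalue())

    if __name__ == "__main__":
        # Usage: python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
        main(sys.argv[1], sys.argv[2])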

Task 9: Create the OCI Function to receive the AVRO message and publish to the OCI Streaming topic

  1. Go to Developer Services, under Functions, click Applications and then click Create application.

    T9_1

  2. Go to your terminal shell where you have Docker, the OCI CLI and the Fn CLI installed, and run the following commands to initialize the function.

    Note: If you followed the steps, your Docker login command has already been executed; if not, go back to the Docker login step in the Create container registry task.

    fn create context oci-cloud --provider oracle
    fn use context oci-cloud
    fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
    fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
    fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
    

    Note: In this tutorial, we are using the Brazil East (Sao Paulo) region; if you’re using a different region, change the api-url and registry locations accordingly.

    T9_1

  3. Create a simple Hello-world function to make sure all your settings are correct.

    fn init --runtime python fn-recep-avro
    cd fn-recep-avro
    fn deploy --app MyReceptionApp
    fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

  4. Get the AVRO function sample code from fn-recep-avro.zip and replace the hello-world code we created earlier. You must replace both func.py and requirements.txt for the function to work. (A hedged sketch of such a handler appears after these steps.)

    # Check you have the right code for func.py & requirements.txt (you got from zip file)
    ls -lrt
    

    T9_1

  5. Build the new code and deploy the function

    fn deploy --app MyReceptionApp
    

    T9_1

  6. To invoke the function, we need to pass an AVRO message as a parameter; for that, we will use the sample AVRO message file created in the earlier steps. The first time a function is invoked, it takes a little longer since it needs to be started.

    # Check where you created the sample avro message file
    ls -lrt ../create_avro_sample/
    
    # Invoke the function to check if it's working as expected
    cat ../create_avro_sample/meu_file.bin | fn invoke MyReceptionApp fn-recep-avro
    

    T9_1
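
For orientation only: the real handler comes from fn-recep-avro.zip, but a minimal sketch of the pattern (an Fn Python FDK handler that forwards the raw AVRO bytes to the OCI Streaming Kafka endpoint) could look like the following. The environment-variable names and the kafka-python client are assumptions; the tutorial's function resolves the equivalent values from OCI Vault secrets.

    import io
    import os

    from fdk import response
    from kafka import KafkaProducer

    def handler(ctx, data: io.BytesIO = None):
        # The raw AVRO message arrives as the request body.
        avro_bytes = data.getvalue()

        # Hypothetical environment variables; the real function reads the equivalent
        # values from OCI Vault secrets via the config.properties mapping.
        producer = KafkaProducer(
            bootstrap_servers=os.environ["KAFKA_BOOTSTRAPSERVER"],
            security_protocol="SASL_SSL",
            sasl_mechanism="PLAIN",
            sasl_plain_username=os.environ["KAFKA_USERNAME"],
            sasl_plain_password=os.environ["AUTH_KEY"],
        )
        producer.send(os.environ.get("KAFKA_TOPIC", "FrontDoorTopic"), avro_bytes)
        producer.flush()

        return response.Response(ctx, response_data="published",
                                 headers={"Content-Type": "text/plain"})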

Task 10: Create API Gateway to expose the function

  1. In your console, click Developer Services, then under API Management, click Gateways and then click Create Gateway.

    T10_1

  2. Once created, click on the Deployments option and then click Create deployment.

    Name: RecepFunction
    Path prefix: /

    • In Authentication, choose No Authentication, since this is a simple lab with no API authentication implemented. The main objective here is to demonstrate an HTTPS call passing a binary AVRO message through the API, so we will not implement any authentication method for this lab.
    • Before moving forward to a real life environment, make sure you follow the security best practices for API Gateway.
    • For more details, see Securing API Gateways and Resources.

    Route 1: Path: /

    Method: POST
    Backend Type: Oracle functions
    Application: Select your function

    T9_1

  3. Check for the API gateway endpoint and take note.

    T9_1

  4. Open your Linux shell terminal and call the API Gateway. Replace the API URL with the correct endpoint you got in the earlier step. (A Python alternative to this curl call is sketched after these steps.)

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
         --data-binary "@meu_file.bin" \
         https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T9_1
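
If you prefer Python over curl, the same call can be sketched with the requests library as shown below; the endpoint is the placeholder for your own API Gateway deployment and must be replaced.

    import requests

    # Replace with the API Gateway endpoint you noted earlier.
    URL = "https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/"

    with open("meu_file.bin", "rb") as f:
        resp = requests.post(URL, data=f.read(),
                             headers={"Content-Type": "application/octet-stream"})

    print(resp.status_code, resp.text)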

Checkpoint

T9_1

Task 11: Build container image for API type A

Note: The code for API types A and B is essentially the same; only a header message differs, in order to simulate two different APIs. (A hedged sketch of what such an API might look like appears at the end of this task.)

  1. Get the API type A code from api-avro-sample_a.zip and unzip it in your Linux shell terminal.

  2. Take the container registry namespace you noted in the earlier steps and build the application registry location following the pattern below. The OCIR URL depends on your region, for example gru.ocir.io for Brazil East (Sao Paulo).

    [ocir url]/[your namespace]/api-avro-sample_a:latest

  3. In your Linux shell terminal, build and push the docker image for this API.

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
    

    T10_1
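
The actual API code comes from api-avro-sample_a.zip; purely as a hedged sketch of the pattern (a small HTTP service that accepts a binary AVRO body and decodes it), something like the following Flask app would do. The schema, the port and the "API TYPE A" log line are assumptions.

    import io

    import avro.io
    import avro.schema
    from flask import Flask, request

    app = Flask(__name__)

    # Hypothetical schema; the real sample API ships with its own copy of the user schema.
    SCHEMA = avro.schema.parse(
        '{"type":"record","name":"User","namespace":"example","fields":['
        '{"name":"id","type":"int"},{"name":"name","type":"string"},'
        '{"name":"email","type":"string"}]}'
    )

    @app.route("/", methods=["POST"])
    def receive():
        decoder = avro.io.BinaryDecoder(io.BytesIO(request.get_data()))
        record = avro.io.DatumReader(SCHEMA).read(decoder)
        # "API TYPE A" stands in for the header message the tutorial mentions;
        # the type B API would differ only in this line.
        print("API TYPE A received:", record)
        return "ok", 200

    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8080)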

Task 12: Build container image for API type B

  1. Get the API type B code from api-avro-sample_b.zip and unzip it in your Linux shell terminal.

  2. Take the container registry namespace you noted in the earlier steps and build the application registry location following the pattern below. The OCIR URL depends on your region, for example gru.ocir.io for Brazil East (Sao Paulo).

    [ocir url]/[your namespace]/api-avro-sample_b:latest

  3. In your Linux shell terminal, build and push the docker image for this API.

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
    

    T10_1

  4. Check on your container registry page if the image has been successfully pushed.

    T10_1

Task 13: Deploy the APIs in container service

  1. Go to Developer Services, Container Instances and click Create container instance.

    T13_1

  2. Repeat Step 1 for api-type-b and select the correct image for the TYPE B API.

    1. Go to Developer Services, Container Instances, click Create container instance, and repeat the steps to deploy API type B.

    2. Get the Internal FQDN address from your container instances.

      T14_1

      • Click on the container instance, and take note of each Internal FQDN address.

      T14_1

    3. Go to Identity & Security, click Vault, select your vault and create two new secrets.

      Secret Name     Value
      API_TYPE_A_URL  paste the Internal FQDN private address for API Type A
      API_TYPE_B_URL  paste the Internal FQDN private address for API Type B

      Take note of each secret OCID

      Your vault should look like this now:

      T14_1

    4. Edit the config.properties file you uploaded to the config storage bucket and add new entries for the secret OCIDs.

      ls -lrt config.properties
      vi config.properties
      api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
      api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID
      
      # After save, please upload the new version to Object Storage
      cat config.properties
      oci os object put -bn config --file config.properties --force
      

      Your file should look like this:
      T14_1

      T14_1

Task 14: Test the APIs using create_avro_sample.py

  1. Go to your Linux shell terminal where you saved create_avro_sample.py from Task 8 and create some new messages to test the API calls. We are creating two new AVRO files with different IDs (1010 and 1020) that we will use as a filter inside the Spark Streaming (Data Flow) program.

    ls -lrt
    python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
    
    python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
    
    

    T14_1

  2. Call the APIs, passing the AVRO messages, to test that everything is working. Go to your Container Instances page and get the Internal FQDN address for each of the APIs, api-type-a and api-type-b. Remember to replace the URLs below with the corresponding Internal FQDN addresses of your APIs.

    ls -lrt type*
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       xxx.xx.x.xxx
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_b_message.bin" \
       xxx.xxx.xx.xxx
    
    

    T14_1

Task 15: Set up the Java Spark streaming application

  1. Go to Buckets, click Create Bucket and create two new buckets named dataflow-app-avro and dataflow-logs-avro; these will be used for your Java application and the Data Flow logs.

  2. Double-check your Java and Maven versions.

    Java

    java 11.0.8 2020-07-14 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
    

    Maven

    Apache Maven 3.5.4 (Red Hat 3.5.4-5)
    Maven home: /usr/share/maven
    Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
    Default locale: en_US, platform encoding: ANSI_X3.4-1968
    OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
    
  3. Download the sample code and unzip it on your local environment that has oci-cli, docker, java and maven: spark-consume-avro-message.zip.

    unzip spark-consume-avro-message.zip
    cd spark-consume-avro-message
    ls -lrt
    

    T15_1

    Here is a quick look at the proxy code that calls the type A and type B container instances.

    Check the main program file src/main/java/example/Example.java. (A hedged PySpark sketch of the same consume-and-route logic appears after this task's steps.) T15_1

  4. Since this Java program uses a library to handle spark-avro, we need to package the dependency to pass it to Data Flow. For that, we will use the Data Flow Dependency Packager; if you need more details, see Data Flow Dependency Packager.

    The package org.apache.spark:spark-avro_2.12:3.2.1 is already declared in the packages.txt file; you just need to package it by running:

    docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow  --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
    

    T15_1

  5. Upload the archive.zip file to the storage bucket named dataflow-app-avro using the OCI CLI.

    oci os object put -bn dataflow-app-avro --file archive.zip --force
    
  6. Compile and package the Java application, then upload it to the dataflow-app-avro storage bucket.

    ls -lrt
    mvn clean install
    

    T15_1
    …reduced number of lines of the compiling log… T15_1

    # upload the JAR file to the storage bucket
    oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
    

    T15_1

  7. Check your current dataflow-app-avro storage bucket and make sure it looks like this.

    T15_1

  8. Go to Analytics & AI, then under Data Lake, click Data Flow, select the left menu Private endpoints and click Create private endpoint.

    • The private endpoint is needed because we are using a private subnet for the container instances and the OCI Streaming pool.

    • Make sure you fill in the DNS Zones field with the internal FQDNs of the OCI Container Instances and the OCI Streaming pool, comma delimited.

      T15_1

  9. Go to Analytics & AI, then under Data Lake, click Data Flow, and then click Create application.

    T15_1

    • Once created, select the spark-lab-avro Data Flow application, and then click Run to start the program; it usually takes up to 8 minutes to start.

      T15_1

  10. Check the running Data Flow application and open the Spark UI, which will show the current jobs and confirm the application is working.

    T15_1
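
The application you just deployed is the Java program from spark-consume-avro-message.zip. As a rough, hedged PySpark equivalent of the same idea (read the OCI Streaming Kafka topic, decode the AVRO payload, and route each record to API type A or B inside a foreachBatch micro-batch), a sketch could look like the following. The connection values, schema string, id-based routing rule and API URLs are assumptions standing in for the values the real job reads from OCI Vault.

    import requests
    from pyspark.sql import SparkSession
    from pyspark.sql.avro.functions import from_avro
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("spark-lab-avro-sketch").getOrCreate()

    # Hypothetical values; the tutorial's Java job resolves these from OCI Vault secrets.
    BOOTSTRAP = "cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092"
    JAAS = ('org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="tenancyName/yourUser/streamPoolOCID" password="YOUR_AUTH_TOKEN";')
    USER_SCHEMA = ('{"type":"record","name":"User","namespace":"example","fields":['
                   '{"name":"id","type":"int"},{"name":"name","type":"string"},'
                   '{"name":"email","type":"string"}]}')

    raw = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", BOOTSTRAP)
           .option("subscribe", "FrontDoorTopic")
           .option("kafka.security.protocol", "SASL_SSL")
           .option("kafka.sasl.mechanism", "PLAIN")
           .option("kafka.sasl.jaas.config", JAAS)
           .load())

    users = raw.select(from_avro(col("value"), USER_SCHEMA).alias("user")).select("user.*")

    def route_batch(batch_df, batch_id):
        # Route each decoded record to API type A or B by id (1010 vs. 1020 in the samples);
        # the real routing rule lives in the Java code and may differ.
        for row in batch_df.collect():
            target = "http://API_TYPE_A_FQDN" if row["id"] < 1020 else "http://API_TYPE_B_FQDN"
            requests.post(target, json=row.asDict())

    (users.writeStream
          .foreachBatch(route_batch)
          .trigger(processingTime="10 seconds")
          .start()
          .awaitTermination())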

Task 16: Validate the flow

Call the function and pass a message to check that the whole flow is working as expected.

  1. Open your Linux shell terminal where you created the sample messages type_a_message.bin and type_b_message.bin and send a message. Replace the API URL with the correct endpoint you got from your API Gateway deployment.

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T16_1

  2. Let’s check that API type A was called by looking at the logs on the container instance.

    T16_1

You can repeat the process and send a type_b_message.bin file, and it will call the type B container instance.

T9_1

Acknowledgments

More Learning Resources

Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.

For product documentation, visit Oracle Help Center.