Set up Data Flow to process your on-premises logs in Oracle Cloud Infrastructure

Introduction

In this tutorial you provision the cloud infrastructure and deploy the PySpark code for automated uploading, processing, and storing of on-premises log data in the cloud. The data is staged in Object Storage, processed in Data Flow, and stored in Autonomous Data Warehouse for analysis.

The following diagram shows the architecture:

You create the PySpark app and then provision Oracle Cloud Infrastructure (OCI) using a Terraform script that’s designed for this purpose. You download the script from Oracle’s GitHub repository and make minor edits to update some variable values. You then apply the Terraform script.

Objectives

When you complete this tutorial, you’ll have a working system that ingests data from Object Storage, processes it in Data Flow using a PySpark app, and stores the results in Autonomous Data Warehouse.

The PySpark app that’s provided here demonstrates how to authenticate and secure your workloads, and how to prepare the log data for insertion into the database. You’ll need to customize it to meet your specific data processing needs.

Prerequisites

Before starting this tutorial, you must complete the Connect Data Flow PySpark apps to Autonomous Database in Oracle Cloud Infrastructure tutorial. That tutorial shows you how to create a dependency archive that contains the Python and Java libraries that are required for this tutorial. Specifically, you need the archive.zip file from that tutorial.

To complete this tutorial, you must have access to a tenancy on Oracle Cloud Infrastructure with permissions to create the artifacts that are shown in the architectural diagram.

In this tutorial, you download the Terraform code that creates the artifacts as described in the architecture diagram. The code is in a repository on GitHub. You can run the Terraform code on your local development machine, or you can run it from the Cloud Shell in Oracle Cloud Infrastructure. The Cloud Shell has Terraform and Git installed.

If you’re using your local machine, you must have Git and Terraform installed on it.

ADW connection details

To connect to an ADW database, you must pass in a wallet as part of the connection parameters. The wallet contains credentials and other details that allow the bearer to connect to the database. The file name of the wallet is generally in the form of wallet_<database-name>.zip but might have a different name. The wallet gives you a connection to the database, but your database permissions are determined by your database user name and password.

The Terraform code that accompanies this tutorial creates an Always Free Autonomous Data Warehouse database called logs. It stores the wallet for the database in two places: in an Object Storage bucket called Wallet, and in the directory on your computer from which you ran the Terraform code. In both cases, the name of the wallet file is wallet_logs.zip.

Terraform generates the ADMIN password when it creates the database and displays it to you when it completes.
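
Once Terraform has written wallet_logs.zip to your working directory, you might want to confirm that the archive is complete before relying on it. The following is a minimal sketch, not part of the tutorial's code: the helper name missing_wallet_files is hypothetical, and the list of expected files is taken from the wallet contents that the PySpark app later in this tutorial adds to the Spark context.

```python
import zipfile

# File names the PySpark app in this tutorial expects to find in the wallet.
EXPECTED_WALLET_FILES = (
    "cwallet.sso",
    "ewallet.p12",
    "keystore.jks",
    "ojdbc.properties",
    "sqlnet.ora",
    "tnsnames.ora",
    "truststore.jks",
)


def missing_wallet_files(wallet_zip_path):
    """Return the expected wallet files that are absent from the archive.

    Hypothetical helper for illustration; extra files in the archive are ignored.
    """
    with zipfile.ZipFile(wallet_zip_path, "r") as zip_ref:
        present = set(zip_ref.namelist())
    return [name for name in EXPECTED_WALLET_FILES if name not in present]
```

Calling missing_wallet_files("wallet_logs.zip") returns an empty list when every expected file is present.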

Prepare your Oracle Cloud Infrastructure environment

Before creating the PySpark app and running Terraform, you need to do some preliminary configuration of your OCI tenancy.

You need to create a compartment and user groups that are dedicated to your Data Flow app. This allows you to isolate Data Flow from other users in the tenancy.

Terraform can create the compartment and the groups, but in this tutorial you’ll create them yourself. This is to give you practice navigating and using the OCI console.

Create a compartment

Create a compartment that is used solely for containing the artifacts, configuration, and other resources that are related to the pipeline.

  1. Log in to the Oracle Cloud Infrastructure Console as a user that has Administrator privileges.
  2. In the Console navigation menu, select Identity and click Compartments.
  3. Click Create Compartment.
  4. Enter Dataflow as the name of the new compartment, enter a suitable description, and make sure that Parent Compartment is your root compartment.
  5. Click Create Compartment.
  6. After the compartment is created, make a record of its OCID. You will need the OCID later when you set the environment variables for Terraform.

Create groups

You need to create two groups. One group contains users who can manage Data Flow, and the other group contains users who can use Data Flow but are not allowed to manage it.

  1. Create a Group.
    1. In the Console navigation menu, select Identity and click Groups.
    2. Click Create Group.
    3. Enter Dataflow_Admin_Group as the name of the Group, enter a suitable description, and then click Create.
  2. Repeat the previous step to create another group called Dataflow_User_Group.
  3. Add your user to Dataflow_Admin_Group. Click Dataflow_Admin_Group and then click Add User to Group and select your user from the list.

Record your Object Storage namespace

  1. Click your user profile icon, and in the menu that opens click your tenancy name.
  2. In the page that opens, locate the value for Object Storage Namespace and make a copy of it for use later.

Generate API keys

Terraform uses the OCI API to create and manage infrastructure in OCI. To do that, it needs the private key and the fingerprint of an API key pair whose public key is registered for your user.

To generate your own public/private key pair:

  1. Click your user profile icon, and in the menu that opens click User Settings.
  2. On your User Details page, look in the Resources section and click API Keys.
  3. Click Add API Key.
  4. In the Add API Key panel that opens, click Download Private Key.
  5. In the Configuration File Preview, copy the contents of the text box and save them in a convenient location. You’ll need this information later when setting up Terraform.
  6. Click Close.
  7. Rename the private key file that was downloaded. Use a simple name such as oci_api_key.pem.
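
The values in the Configuration File Preview are the same ones that you later paste into the Terraform environment variable scripts. The following is a minimal sketch of that mapping, assuming the preview uses the usual OCI configuration file layout (a [DEFAULT] section with user, fingerprint, tenancy, and region keys). The function name tf_vars_from_config_preview and all sample values are hypothetical; the TF_VAR_ names are the ones used in env-vars.sh and env-vars.bat later in this tutorial.

```python
import configparser

# Assumed mapping from config-preview keys to the Terraform variables
# that appear in env-vars.sh / env-vars.bat in this tutorial.
KEY_TO_TF_VAR = {
    "tenancy": "TF_VAR_tenancy_ocid",
    "user": "TF_VAR_user_ocid",
    "fingerprint": "TF_VAR_fingerprint",
    "region": "TF_VAR_region",
}

# Hypothetical sample; paste your own Configuration File Preview instead.
SAMPLE_PREVIEW = """
[DEFAULT]
user=ocid1.user.oc1..exampleuser
fingerprint=00:11:22:33
tenancy=ocid1.tenancy.oc1..exampletenancy
region=ca-toronto-1
"""


def tf_vars_from_config_preview(preview_text, profile="DEFAULT"):
    """Translate config-preview entries into TF_VAR_* name/value pairs."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(preview_text)
    section = parser[profile]
    return {
        tf_var: section[key]
        for key, tf_var in KEY_TO_TF_VAR.items()
        if key in section
    }


if __name__ == "__main__":
    for name, value in tf_vars_from_config_preview(SAMPLE_PREVIEW).items():
        print("export {}={}".format(name, value))
```

The compartment OCID and the private key path are not part of the preview, so you still set TF_VAR_compartment_ocid and TF_VAR_private_key_path by hand.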

Add the database admin password to Vault

You must create a vault secret for the database ADMIN password and record the OCID. You need this for the PASSWORD_SECRET_OCID constant in the PySpark app that you create later.

For now, enter a fake password into the vault secret, because you won’t know the real password until Terraform completes. However, the PySpark app that Terraform uploads to Object Storage needs the OCID of the secret. So you first create the secret and record its OCID, and then, after Terraform completes, you update the secret with the real password.

  1. In the Console navigation menu, select Security and click Vault.
  2. In the List Scope section, make sure you’re in the Dataflow compartment.
  3. Click Create Vault.
  4. In the panel that opens, enter Dataflow in the Name field.
  5. Click Create Vault.
  6. When the vault state becomes Active, click Dataflow to open the Vault Details page.
  7. In the Master Encryption section, click Create Key.
  8. In the Create Key panel, enter Dataflow in the Name field.
  9. Click Create Key.
  10. In the Resources section, click Secrets.
  11. Click Create Secret.
  12. In the Create Secret dialog box, choose the Dataflow compartment from the Create in Compartment list.
  13. In the Name field, enter a name to identify the secret. Avoid entering confidential information.
  14. In the Description field, enter a brief description of the secret to help identify it. Avoid entering any confidential information.
  15. In the Encryption Key field, select Dataflow.
  16. In the Secret Type Template field, select Plain-Text.
  17. In the Secret Contents field, enter a fake password.
  18. Click Create Secret.
  19. When the panel closes, click the name of the secret that you created to open the details page, and copy the OCID.

Create the PySpark app

The PySpark app creates a Spark session that reads the log data from Object Storage, transforms it into a dataframe, and then stores the dataframe in a table in ADW.

Before creating the Spark session, you need to make sure some important modules are imported, and set some constants for use later in the app.

  1. Use the following statements to import the relevant modules.

     import os
     import oci
     import base64
     import zipfile
     from urllib.parse import urlparse
    
     from pyspark import SparkConf
     from pyspark.sql import SparkSession
     from pyspark.sql.functions import regexp_extract
    
  2. Set the following constants in the main() function.

    Make sure you enter your own values for OBJECT_STORAGE_NAMESPACE and PASSWORD_SECRET_OCID.

     OBJECT_STORAGE_NAMESPACE = "YOUR-OBJECT-STORAGE-NAMESPACE"
     INPUT_PATH = "oci://data@{}/sample_logs.log".format(OBJECT_STORAGE_NAMESPACE)
     DATABASE_NAME = "logs"
     PASSWORD_SECRET_OCID = "ocid1.vaultsecret..... "
     WALLET_PATH = "oci://Wallet@{}/wallet_{}.zip".format(OBJECT_STORAGE_NAMESPACE,DATABASE_NAME)
     TNS_NAME = "{}_high".format(DATABASE_NAME)
     USER="ADMIN"
     TARGET_DATABASE_TABLE="processed_logs"
    
  3. Set up the Spark session and load the data from Object Storage. The data is in the form of a flat text file, which you load into a Spark dataframe.

     spark_session = SparkSession.builder.appName("Dataflow").getOrCreate()
     input_df = spark_session.read.text(INPUT_PATH)
     input_df.show(5, truncate=False) # Some output for the Data Flow log file
    

    The dataframe should look similar to the following sample:

     +----------------------------------------------------------------------------------+
     |value                                                                             |
     +----------------------------------------------------------------------------------+
     |10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 |
     |10.0.0.2 - user2 [10/Mar/2021:14:55:36 -0700] "GET /file1.html HTTP/1.0" 200 9889 |
     |10.0.0.3 - user3 [10/Mar/2021:14:55:37 -0700] "GET /file2.html HTTP/1.0" 200 4242 |
     |10.0.0.4 - user4 [10/Mar/2021:14:56:36 -0700] "GET /file3.html HTTP/1.0" 200 10267|
     |10.0.0.1 - user1 [10/Mar/2021:15:05:36 -0700] "GET /file4.html HTTP/1.0" 200 15678|
     +----------------------------------------------------------------------------------+
    

    As you can see, the data consists of a single column called value that contains each log entry as a single string. Before you can transfer the data to the database, the dataframe must have several columns, one for each field in the log file.

  4. Split the dataframe into columns.

    In the following example, the user-identifier field (usually “-”) is not included in the new dataframe.

     hostname = r'(^\S+\.[\S+\.]+\S+)\s'
     user = r'\s+.+\s+(.+)\s+\['
     timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
     method_uri_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"'
     status = r'\s(\d{3})\s'
     content_size = r'\s(\d+)$'
    
     logdata_df = input_df.select(
         regexp_extract('value', hostname, 1).alias('hostname'),
         regexp_extract('value', user, 1).alias('user'),
         regexp_extract('value', timestamp, 1).alias('timestamp'),
         regexp_extract('value', method_uri_protocol, 1).alias('method'),
         regexp_extract('value', method_uri_protocol, 2).alias('URI'),
         regexp_extract('value', method_uri_protocol, 3).alias('protocol'),
         regexp_extract('value', status, 1).cast('integer').alias('status'),
         regexp_extract('value', content_size, 1).cast('integer').alias('content_size'))
    

    This is also the place where you can apply one or more machine learning models. For example, you might want to run an intrusion detection workload before storing the logs in the database.
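
The regular expressions in step 4 can be tried on a single log line without a Spark cluster. The following is a minimal sketch that uses Python's built-in re module in place of regexp_extract. Spark evaluates these patterns with Java's regular expression engine, so treat the behavior on unusual log lines as an assumption to verify in Data Flow. The helper names extract and parse_log_line are illustrative and are not part of the tutorial's app.

```python
import re

# Same patterns as in step 4.
HOSTNAME = r'(^\S+\.[\S+\.]+\S+)\s'
USER = r'\s+.+\s+(.+)\s+\['
TIMESTAMP = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
METHOD_URI_PROTOCOL = r'\"(\S+)\s(\S+)\s*(\S*)\"'
STATUS = r'\s(\d{3})\s'
CONTENT_SIZE = r'\s(\d+)$'

SAMPLE_LINE = ('10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] '
               '"GET /index.html HTTP/1.0" 200 2326')


def extract(pattern, line, group=1):
    """Mimic regexp_extract: return the group, or '' when nothing matches."""
    match = re.search(pattern, line)
    return match.group(group) if match else ""


def to_int(text):
    """Mimic cast('integer') on an empty match by returning None."""
    return int(text) if text else None


def parse_log_line(line):
    """Split one log entry into the columns used by logdata_df."""
    return {
        "hostname": extract(HOSTNAME, line),
        "user": extract(USER, line),
        "timestamp": extract(TIMESTAMP, line),
        "method": extract(METHOD_URI_PROTOCOL, line, 1),
        "URI": extract(METHOD_URI_PROTOCOL, line, 2),
        "protocol": extract(METHOD_URI_PROTOCOL, line, 3),
        "status": to_int(extract(STATUS, line)),
        "content_size": to_int(extract(CONTENT_SIZE, line)),
    }


if __name__ == "__main__":
    print(parse_log_line(SAMPLE_LINE))
```

A quick local check like this makes it easier to adjust the patterns when your own logs use a different layout.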

  5. Retrieve the credentials for your Autonomous Data Warehouse database.

    The Terraform code that accompanies this tutorial creates the Autonomous Data Warehouse and stores the wallet file in Object Storage. The PySpark app needs to retrieve the wallet and use its information for connecting to ADW.

     # Get an Object Store client
     token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
     token_path = spark_session.sparkContext.getConf().get(token_key)
     with open(token_path) as fd:
         delegation_token = fd.read()
     signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
         delegation_token=delegation_token)
     object_store_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
        
     # Extract the wallet file location info
     split_url = urlparse(WALLET_PATH)
     bucket_name, namespace = split_url.netloc.split("@")
     file_name = split_url.path[1:]
    
     # Get the wallet from Object Storage.
     # The response contains the wallet and some metadata
     response = object_store_client.get_object(namespace, bucket_name, file_name)
    
     # Extract the wallet from response and store it in the Spark work-dir
     wallet_path = "/opt/spark/work-dir/"
     zip_file_path = os.path.join(wallet_path, "temp.zip")
     with open(zip_file_path, "wb") as fd:
         for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
             fd.write(chunk)
     with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
         zip_ref.extractall(wallet_path)
        
     # Extract the wallet contents and add the files to the Spark context
     contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
     for file in contents:
         spark_session.sparkContext.addFile(os.path.join(wallet_path, file))
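
The wallet location in step 5 is split into bucket, namespace, and object name with urlparse. The following is a minimal sketch that isolates that parsing so you can check it locally; the function name split_oci_uri and the namespace value in the usage note are hypothetical.

```python
from urllib.parse import urlparse


def split_oci_uri(uri):
    """Split 'oci://<bucket>@<namespace>/<object>' into its three parts.

    Illustrative helper that mirrors the parsing done in step 5.
    """
    split_url = urlparse(uri)
    bucket_name, namespace = split_url.netloc.split("@")
    object_name = split_url.path[1:]  # drop the leading "/"
    return bucket_name, namespace, object_name
```

For example, split_oci_uri("oci://Wallet@examplenamespace/wallet_logs.zip") yields the bucket Wallet, the namespace examplenamespace, and the object name wallet_logs.zip, which are the three arguments that get_object expects, in the order namespace, bucket, object.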
    
    
  6. Store the data in your Autonomous Data Warehouse database.

    The following snippet shows how to connect to ADW and write the dataframe to the target table. If the table doesn’t exist, it’s created. If the table exists, it’s dropped and recreated, due to the “overwrite” option in the mode parameter.

    The password is retrieved from Oracle Cloud Infrastructure Vault. You place the real password in the vault after you run the Terraform code, as described later in this tutorial.

     adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path)
    
     # Retrieve the database password for user 'ADMIN' from OCI Vault
     # The password is stored as base64 text, so it must be decoded
     with open(token_path) as fd:
         delegation_token = fd.read()
     signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
         delegation_token=delegation_token)
     secrets_client = oci.secrets.SecretsClient(config={}, signer=signer)
     response = secrets_client.get_secret_bundle(PASSWORD_SECRET_OCID)
    
     base64_password = response.data.secret_bundle_content.content
     base64_secret_bytes = base64_password.encode("ascii")
     base64_message_bytes = base64.b64decode(base64_secret_bytes)
     password = base64_message_bytes.decode("ascii")
    
     # Set up some properties for connecting to the database
     properties = {
         "driver": "oracle.jdbc.driver.OracleDriver",
         "oracle.net.tns_admin": wallet_path,  # directory that contains tnsnames.ora
         "password": password,
         "user": USER,
     }
    
     # Write the dataframe to the database (adw_url is defined above)
     logdata_df.write.jdbc(url=adw_url, table=TARGET_DATABASE_TABLE,
         mode="Overwrite", properties=properties)
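
Two small pieces of step 6 can be checked locally without Vault or a database: decoding the base64 secret content and building the JDBC URL. The following is a minimal sketch under that assumption; the function names decode_secret_content and build_adw_url, and the sample password in the usage note, are hypothetical.

```python
import base64


def decode_secret_content(base64_content):
    """Decode secret content that is delivered as base64 text.

    Mirrors the decoding in step 6 and assumes an ASCII password.
    """
    return base64.b64decode(base64_content.encode("ascii")).decode("ascii")


def build_adw_url(tns_name, wallet_dir):
    """Build the JDBC URL in the same format as adw_url in step 6."""
    return "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(tns_name, wallet_dir)
```

For example, decode_secret_content("RXhhbXBsZQ==") returns the text Example, and build_adw_url("logs_high", "/opt/spark/work-dir/") produces the URL that the tutorial's constants resolve to.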
    

Save the PySpark app with the name dataflow-app.py.

Download and apply the Terraform code

The Terraform code is available in the oracle-quickstart organization on GitHub. The code provisions the components that are shown in the architecture diagram.

To download and apply the code:

  1. Change to a convenient directory and clone the repository.

    git clone https://github.com/oracle-quickstart/oci-arch-dataflow-store-analyze-data

  2. Copy the dependency file archive.zip into the oci-arch-dataflow-store-analyze-data directory.

    Note: If you don’t have the archive.zip file, see the Prerequisites section to learn how to obtain it.

  3. Change into the oci-arch-dataflow-store-analyze-data directory.

    cd oci-arch-dataflow-store-analyze-data

  4. Locate the PySpark app file dataflow-app.py, which is a stub file, and replace it with the one that you wrote.

  5. Open the terraform.tfvars file in a text editor.

  6. Update the value for object_storage_namespace in terraform.tfvars. Use the value that you recorded earlier.

  7. Modify either env-vars.bat or env-vars.sh depending on the operating system of the computer that you’re using. These scripts are used for setting environment variables that Terraform uses.

    On Windows, the env-vars.bat file should be similar to the following sample. Make sure you paste in the values for your own tenancy.

    @echo off
    set TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    set TF_VAR_user_ocid=ocid1.user.oc1..
    set TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    set TF_VAR_private_key_path=%HOMEPATH%\.ssh\oci_api_key.pem
    set TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    set TF_VAR_region=ca-toronto-1
    

    On Linux and Mac, the env-vars.sh file should be similar to the following sample. Make sure you paste in the values for your own tenancy.

    export TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    export TF_VAR_user_ocid=ocid1.user.oc1..
    export TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    export TF_VAR_private_key_path=~/.ssh/oci_api_key.pem
    export TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    export TF_VAR_region=ca-toronto-1
    

    After you update the files with your own tenancy and user information, make sure the environment variables are set.

    On Windows, run the following command:

    env-vars.bat
    

    On Linux or Mac, run the following command:

    source env-vars.sh
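
Before you run Terraform, it can help to confirm that the environment variables from env-vars.sh or env-vars.bat are actually set in the current shell. The following is a minimal sketch, not part of the downloaded repository; the function name missing_tf_vars is hypothetical, and the variable names are the ones shown in the sample scripts above.

```python
import os

# Variable names taken from the env-vars samples in this tutorial.
REQUIRED_TF_VARS = (
    "TF_VAR_tenancy_ocid",
    "TF_VAR_user_ocid",
    "TF_VAR_compartment_ocid",
    "TF_VAR_private_key_path",
    "TF_VAR_fingerprint",
    "TF_VAR_region",
)


def missing_tf_vars(environ=None):
    """Return the required TF_VAR_* names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_TF_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_tf_vars()
    print("Missing:", ", ".join(missing) if missing else "none")
```

Run the script from the same terminal session in which you set the variables, because environment variables set in one shell are not visible in another.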
    
  8. In a terminal, initialize Terraform in the current directory.

    terraform init

  9. Provision the resources to Oracle Cloud Infrastructure.

    terraform apply

    When complete, the Terraform app displays the ADMIN password for the database that it created.

  10. Copy the password that the Terraform app displays.

  11. Verify that the resources were provisioned.

    1. Go to your OCI console and open the navigation menu.
    2. Scroll down to the Governance and Administration section, expand Governance then click Tenancy Explorer.
    3. Select the Dataflow compartment that you created earlier.

    After a few seconds, the resources are displayed in a table that you can filter and sort.

Update the ADMIN password in the Vault

Previously, you added a fake password to OCI Vault in order to obtain its OCID for use in the PySpark app that runs in Data Flow. Now you need to update that password to the real value. The real value was displayed in your terminal when the Terraform apply command completed.

  1. In the Console navigation menu, select Security and click Vault.
  2. Click the name of the vault that you created earlier.
  3. In the Vault Details page that opens, look in the Resources section and click Secrets.
  4. Click the name of the secret that you created earlier.
  5. Click Create Secret Version.
  6. Make sure the Secret Type Template is set to Plain-Text and paste the database ADMIN password into the Secret Contents box.
  7. Click Create Secret Version.

Run the Data Flow application

You can now run the PySpark app.

  1. In the Console navigation menu, select Data Flow and click Applications.
  2. Click Analyze On-prem Logs to open the Application Details page.
  3. Click Run to open the Run Python Application panel.
  4. Click Run.

The Data Flow Runs page opens where you can monitor the progress of the run. It takes a few minutes to allocate the resources and spin everything up. When the run completes, click the name of the run to open the Run Details page.

In the Run Details page, you can download and examine the logs that were produced. If the run failed for any reason, you can examine the logs to determine the source of the problem.
