
Perform your analytics on OCI Data Flow using OCI Data Science interactive notebook

Introduction

Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed big data service that lets you run Apache Spark applications at any scale with almost no administration. Spark has become the leading big data processing framework, and OCI Data Flow is the easiest way to run Spark in Oracle Cloud because there’s nothing for developers to install or manage.

Objective

This tutorial walks you through the setup required to access OCI Data Flow sessions from an OCI Data Science notebook session. These sessions let you run interactive Spark workloads on a long-lasting Data Flow cluster through an Apache Livy integration.

Once the OCI Data Flow Spark session is created, the tutorial goes through sample code for performing Spark operations on OCI Object Storage and Oracle Autonomous Data Warehouse.

Task 1: Set up the OCI Data Science notebook with OCI Data Flow

Create and set up OCI Data Flow Conda environment

  1. Create required buckets.

    a. Create a bucket named dataflow-logs in your tenancy.

    b. Create a bucket named dataflow-warehouse in your tenancy.

  2. Create a dynamic group in a specific compartment.

      ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}
      ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'}
      ANY {resource.type='datacatalogmetastore'}
    
    
  3. Create a policy to manage OCI resources from OCI Data Flow and OCI Data Science.

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>', target.bucket.name='dataflow-logs', target.bucket.name='dataflow-warehouse'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>'

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>',target.bucket.name = '<managed-table-location-bucket>',target.bucket.name = '<external-table-location-bucket>'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

  4. Create an OCI Data Science project and notebook session.

    OCI Data Science Notebook Session Details

  5. Open a new OCI Data Science session. From the File menu, select New Launcher and then click Terminal.

    OCI Data Science Session Details

  6. Install and activate the pyspark32_p38_cpu_v1 Conda environment from your terminal.

     odsc conda install -s pyspark32_p38_cpu_v1
     source activate /home/datascience/conda/pyspark32_p38_cpu_v1
    
    
  7. Once the Conda environment is activated, go to the New Launcher tab and click Settings. Enter the details of the Object Storage bucket where the Conda package will be uploaded, and then save.

    Setup Conda Environment for DS Session

  8. Publish Conda environment.

    odsc conda publish -s pyspark3_2anddataflowv1_0

    Note: Publishing takes some time. Once it completes, you can verify that the Conda package has been uploaded to the Object Storage bucket.
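
The policy statements in step 3 are easy to mistype, because every bucket name needs its own balanced pair of single quotes. As a minimal sketch, assuming you would rather generate the statement text than type it by hand, the following Python helper builds an object-access statement for a list of buckets. The function name bucket_policy and its parameters are hypothetical and are not part of any OCI SDK or CLI; the result is plain text to paste into the policy editor.

```python
def bucket_policy(dynamic_group, bucket_names, verb="MANAGE", resource="objects"):
    """Build one IAM policy statement that limits access to the given buckets.

    Hypothetical helper for illustration only; it just formats text.
    """
    if not bucket_names:
        raise ValueError("at least one bucket name is required")
    # Each condition gets its own balanced pair of single quotes.
    conditions = ", ".join(f"target.bucket.name='{name}'" for name in bucket_names)
    return (
        f"ALLOW DYNAMIC-GROUP '{dynamic_group}' TO {verb} {resource} "
        f"IN TENANCY WHERE ANY {{{conditions}}}"
    )


# Placeholder group name; replace it with your own dynamic group.
statement = bucket_policy("<df-dynamic-group>", ["dataflow-logs", "dataflow-warehouse"])
print(statement)
```

Generating the text this way keeps the quoting consistent across statements that differ only in their bucket list.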

Set up and create OCI Data Flow Spark session using Livy Service

From the New Launcher, open a notebook that uses “PySpark and DataFlow” as the kernel, and run the following commands to set up and create an OCI Data Flow Spark session using the Livy service:

  1. Set up authentication using ADS.

    import ads
    ads.set_auth("resource_principal") # Supported values: resource_principal, api_key

  2. Load Extension.

    %load_ext dataflow.magics

  3. Create OCI Data Flow Spark session using Livy service through OCI Data Science notebook.

     import json
     command = {
        "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
        "displayName": "Demo_DataFlow_Spark_v1",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "logsBucketUri": "<oci://bucket@namespace/>",
        "archiveUri": "<oci://bucket@namespace/archive.zip>",
        "configuration":{"spark.archives":"<oci://bucket@namespace/>#conda",
                          "spark.oracle.datasource.enabled":"true"}
     }
    
     command = f'\'{json.dumps(command)}\''
     print("command",command)
    
     # Optional: to enable dynamic allocation, add these entries to "configuration"
     # in the dictionary above before it is converted with json.dumps.
     #    "spark.dynamicAllocation.enabled":"true",
     #    "spark.dynamicAllocation.shuffleTracking.enabled":"true",
     #    "spark.dynamicAllocation.minExecutors":"1",
     #    "spark.dynamicAllocation.maxExecutors":"4",
     #    "spark.dynamicAllocation.executorIdleTimeout":"60",
     #    "spark.dynamicAllocation.schedulerBacklogTimeout":"60",
     #    "spark.dataflow.dynamicAllocation.quotaPolicy":"min"
    
     %create_session -l python -c $command
    
    

    Upload Objects
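
The session request in step 3 is an ordinary Python dictionary that is serialized to JSON and wrapped in single quotes before it is passed to %create_session. As a rough sketch, assuming you want to switch the optional dynamic allocation settings on and off without editing the dictionary by hand, the helper below merges extra Spark settings into "configuration" and returns the quoted string. The names build_session_command and DYNAMIC_ALLOCATION are hypothetical; the setting keys and values are copied from the commented-out lines in step 3, and the payload is shortened for illustration.

```python
import json

# Optional dynamic allocation settings, copied from the commented-out lines in step 3.
DYNAMIC_ALLOCATION = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "4",
    "spark.dynamicAllocation.executorIdleTimeout": "60",
    "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
    "spark.dataflow.dynamicAllocation.quotaPolicy": "min",
}


def build_session_command(payload, extra_configuration=None):
    """Return the single-quoted JSON string that is passed to %create_session -c."""
    merged = dict(payload)
    # Copy the nested dictionary so the caller's payload is not modified.
    configuration = dict(payload.get("configuration", {}))
    configuration.update(extra_configuration or {})
    merged["configuration"] = configuration
    return "'" + json.dumps(merged) + "'"


# Shortened payload with the placeholder values used in step 3.
payload = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
    "displayName": "Demo_DataFlow_Spark_v1",
    "numExecutors": 1,
    "configuration": {"spark.oracle.datasource.enabled": "true"},
}
command = build_session_command(payload, DYNAMIC_ALLOCATION)
print(command)
```

In the notebook, the resulting string would be used exactly like the command variable in step 3, that is, with %create_session -l python -c $command.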

Task 2: Perform Spark operations on OCI Object Storage using the sample code

  1. Import dependent libraries in session.

     %%spark
     #Import required libraries.
    
     import json
     import os
     import sys
     import datetime
     import oci
     import pyspark.sql
     from pyspark.sql.functions import countDistinct
    
     from delta.tables import *
    
    
  2. Perform Spark Read operation on Object Storage. Read Object Storage file using spark.read from Livy Session.

     %%spark -o df_Bronze_Insurance_Data
     #Read Claim Insurance files from OCI Object Storage in Spark Dataframe.
     df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
     .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
    
     print("df_RawZone_Data",df_Bronze_Insurance_Data)
     df_Bronze_Insurance_Data.show(5)
    
    
  3. Perform Spark Write operation on Object Storage.

     %%spark
     # Set the save mode with mode(); option("mode", ...) does not control overwrite behavior.
     df_Bronze_Insurance_Data.write.format("json").mode("overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
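
The read and write steps above address Object Storage with URIs of the form oci://<bucket>@<namespace>/<object path>. As a small sketch, assuming you want to build and check these URIs in one place instead of repeating string literals, the helpers below compose and split such a URI with the Python standard library. The function names oci_uri and split_oci_uri are hypothetical and do not come from the OCI SDK; the bucket and namespace values are the placeholders used in this task.

```python
from urllib.parse import urlparse


def oci_uri(bucket, namespace, object_path=""):
    """Compose an oci:// URI from a bucket, a namespace, and an object path."""
    return f"oci://{bucket}@{namespace}/{object_path.lstrip('/')}"


def split_oci_uri(uri):
    """Split oci://<bucket>@<namespace>/<object path> into its three parts."""
    parsed = urlparse(uri)
    if parsed.scheme != "oci" or "@" not in parsed.netloc:
        raise ValueError(f"not an OCI Object Storage URI: {uri}")
    bucket, namespace = parsed.netloc.split("@", 1)
    return bucket, namespace, parsed.path.lstrip("/")


# Placeholder bucket and namespace taken from the sample code in this task.
source = oci_uri("test-demo", "OSNamespace", "insur_claim/claim.csv")
target = oci_uri("test-demo", "OSNamespace", "insur_claim/claim_curated")
print(source)
print(split_oci_uri(target))
```

The composed strings can be passed to load() and save() in the same way as the literal URIs in steps 2 and 3.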
    
    

Task 3: Perform Read and Write operations on Oracle Autonomous Data Warehouse using the sample code

  1. Define the helper functions used to load data into Oracle Autonomous Data Warehouse with a wallet and a vault secret. Copy the following code as is. For more information, see the GitHub sample.

     %%spark
    
     def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
        """
        Get an authenticated OCI client.
        Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
        """
        import oci
    
        if not in_dataflow():
           # We are running locally, use our API Key.
           if file_location is None:
                 file_location = oci.config.DEFAULT_LOCATION
           if profile_name is None:
                 profile_name = oci.config.DEFAULT_PROFILE
           config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
           authenticated_client = client(config)
        else:
           # We are running in Data Flow, use our Delegation Token.
           with open(token_path) as fd:
                 delegation_token = fd.read()
           signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
                 delegation_token=delegation_token
           )
           authenticated_client = client(config={}, signer=signer)
        return authenticated_client
    
     def get_password_from_secrets(token_path, password_ocid):
        """
        Get a password from the OCI Secrets Service.
        """
        import base64
        import oci
    
        secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
        response = secrets_client.get_secret_bundle(password_ocid)
        base64_secret_content = response.data.secret_bundle_content.content
        base64_secret_bytes = base64_secret_content.encode("ascii")
        base64_message_bytes = base64.b64decode(base64_secret_bytes)
        secret_content = base64_message_bytes.decode("ascii")
        return secret_content
    
     def get_delegation_token_path(spark):
        """
        Get the delegation token path when we're running in Data Flow.
        """
        if not in_dataflow():
           return None
        token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
        token_path = spark.sparkContext.getConf().get(token_key)
        if not token_path:
           raise Exception(f"{token_key} is not set")
        return token_path    
    
     def get_temporary_directory():
        if in_dataflow():
           return "/opt/spark/work-dir/"
        else:
           import tempfile
           return tempfile.gettempdir()
    
     def in_dataflow():
        """
        Determine if we are running in OCI Data Flow by checking the environment.
        """
        if os.environ.get("HOME") == "/home/dataflow":
           return True
        return False
    
     def download_wallet(spark, wallet_path):
        """
        Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use in a Data Flow
        application.
        """
        import oci
        import zipfile
    
        # Get an object store client.
        token_path = get_delegation_token_path(spark)
        object_store_client = get_authenticated_client(
           token_path, oci.object_storage.ObjectStorageClient
        )
    
        # Download the wallet file.
        from urllib.parse import urlparse
        parsed = urlparse(wallet_path)
        bucket_name, namespace = parsed.netloc.split("@")
        file_name = parsed.path[1:]
        response = object_store_client.get_object(namespace, bucket_name, file_name)
        temporary_directory = get_temporary_directory()
        zip_file_path = os.path.join(temporary_directory, "wallet.zip")
        with open(zip_file_path, "wb") as fd:
           for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
                 fd.write(chunk)
    
        # Extract everything locally.
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
           zip_ref.extractall(temporary_directory)
    
        # Distribute all wallet files.
        contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
        spark_context = spark.sparkContext
        for file in contents:
           spark_context.addFile(os.path.join(temporary_directory, file))
    
        return temporary_directory
    
    
  2. Set the following parameters related to Oracle Autonomous Data Warehouse Instance and Wallet.

      %%spark
      PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
      TARGET_TABLE = "ADMIN.TB_NAME"
      TNSNAME = "demolakehouseadw_medium"
      USER = "admin"
      WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"
    
      # Download and distribute our wallet file.
      wallet_path = download_wallet(spark, WALLET_PATH)
      adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
    
    
  3. Get the password using the secret service and define the JDBC connection properties.

      %%spark
      # Get our password using the secret service.
      print("Getting wallet password")
      token_path = get_delegation_token_path(spark)
      password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
      print("Done getting wallet password")
    
      # Define the JDBC connection properties used to read from and write to the database.
      print("Connecting to " + adw_url)
      properties = {
         "driver": "oracle.jdbc.driver.OracleDriver",
         "oracle.net.tns_admin": TNSNAME,
         "password": password,
         "user": USER
      }
    
  4. Read sample table from Oracle Autonomous Data Warehouse.

      %%spark
      SOURCE_TABLE = "ADMIN.RETAILPOS"
      df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
    
    
  5. Write the DataFrame from the previous step to Oracle Autonomous Data Warehouse.

      %%spark
    
      #Load into Oracle Autonomous Data Warehouse:
    
      TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
      print("TARGET_TABLE : ",TARGET_TABLE)
    
      # Write to Oracle Autonomous Data Warehouse.
      print("Write to Oracle Autonomous Data Warehouse : ")
      df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
      print("Writing done to Oracle Autonomous Data Warehouse : ")
    
    

    Write Into ADW
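
The connection setup in steps 2 and 3 comes down to two small pieces of logic: decoding the Base64 content of the secret bundle, and formatting the JDBC URL and properties. The following sketch isolates both so they can be tried without a Data Flow session. The function names decode_secret and build_adw_connection are hypothetical, the secret value is a made-up placeholder that stands in for response.data.secret_bundle_content.content, and the properties simply mirror the ones shown in step 3.

```python
import base64


def decode_secret(base64_content):
    """Decode the Base64 text carried by a secret bundle into a plain string."""
    return base64.b64decode(base64_content.encode("ascii")).decode("ascii")


def build_adw_connection(tns_name, wallet_dir, user, password):
    """Return the JDBC URL and properties in the shape used in steps 2 and 3."""
    url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(tns_name, wallet_dir)
    properties = {
        "driver": "oracle.jdbc.driver.OracleDriver",
        "oracle.net.tns_admin": tns_name,  # mirrors the tutorial's properties
        "password": password,
        "user": user,
    }
    return url, properties


# Placeholder secret: stands in for the Base64 content returned by the vault.
fake_bundle_content = base64.b64encode(b"example-password").decode("ascii")
password = decode_secret(fake_bundle_content)

# Placeholder TNS name and wallet directory taken from the sample code above.
url, properties = build_adw_connection(
    "demolakehouseadw_medium", "/opt/spark/work-dir/", "admin", password
)
print(url)
```

In the Spark session, url and properties would take the place of adw_url and properties in the spark.read.jdbc and write.jdbc calls of steps 4 and 5.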

Next Steps

Acknowledgments

Author - Kumar Chandragupta (OCI Sr. Cloud Engineer)
