Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Perform your analytics on OCI Data Flow using OCI Data Science interactive notebook
Introduction
Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed big data service that lets you run Apache Spark applications at any scale with almost no administration. Spark has become the leading big data processing framework and OCI Data Flow is the easiest way to run Spark in Oracle Cloud because there’s nothing for developers to install or manage.
Objective
The purpose of this tutorial is to walk you through the setup required to access OCI Data Flow sessions from an OCI Data Science notebook session. These sessions let you run interactive Spark workloads on a long-lasting Data Flow cluster through an Apache Livy integration.
Once the OCI Data Flow Spark session is created, we will also walk through sample code for performing Spark operations on OCI Object Storage and Oracle Autonomous Data Warehouse.
Task 1: Set up the OCI Data Science notebook with OCI Data Flow
Create and set up OCI Data Flow Conda environment
- Create the required buckets.
a. Create a bucket named dataflow-logs in your tenancy.
b. Create a bucket named dataflow-warehouse in your tenancy.
(An optional OCI Python SDK sketch for creating these buckets appears after these setup steps.)
- Create a dynamic group in a specific compartment with the following matching rules.
ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}
ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'}
ANY {resource.type='datacatalogmetastore'}
- Create policies with the following statements to manage OCI resources from OCI Data Flow and OCI Data Science.
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>', target.bucket.name='dataflow-logs', target.bucket.name='dataflow-warehouse'}
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family IN compartment '<your-compartment-name>'
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>', target.bucket.name='<managed-table-location-bucket>', target.bucket.name='<external-table-location-bucket>'}
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
- Create an OCI Data Science project and notebook session.
- Open the new OCI Data Science notebook session. From the File menu, select New Launcher and then click Terminal.
- Install and activate the pyspark32_p38_cpu_v1 Conda environment from your terminal.
odsc conda install -s pyspark32_p38_cpu_v1
source activate /home/datascience/conda/pyspark32_p38_cpu_v1
- Once the Conda environment is activated, go to the New Launcher tab and click Settings. Enter the required information about the Object Storage bucket where the Conda package will be uploaded, and save.
- Publish the Conda environment.
odsc conda publish -s pyspark3_2anddataflowv1_0
Note: Publishing will take some time. Once it is completed, you can see that the Conda package has been uploaded to the Object Storage bucket.
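As referenced in the bucket-creation step above, the two buckets can also be created programmatically instead of through the Console. The following is a minimal sketch using the OCI Python SDK; it assumes the notebook's resource principal is authorized to create buckets, and <compartment_ocid> is a placeholder you must replace.
import oci

# Authenticate with the notebook session's resource principal (assumption: the
# dynamic group policies above grant it permission to manage buckets).
signer = oci.auth.signers.get_resource_principals_signer()
object_storage = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
namespace = object_storage.get_namespace().data

# Create the two buckets used by OCI Data Flow for logs and the SQL warehouse.
for bucket_name in ("dataflow-logs", "dataflow-warehouse"):
    details = oci.object_storage.models.CreateBucketDetails(
        name=bucket_name, compartment_id="<compartment_ocid>"
    )
    object_storage.create_bucket(namespace, details)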
Set up and create OCI Data Flow Spark session using Livy Service
From the New Launcher, open a notebook using “PySpark and DataFlow” as the kernel, and run the following commands to set up and create an OCI Data Flow Spark session using the Livy service:
- Set up authentication using ADS.
import ads
ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
- Load the extension.
%load_ext dataflow.magics
- Create the OCI Data Flow Spark session using the Livy service through the OCI Data Science notebook.
import json

command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
    "displayName": "Demo_DataFlow_Spark_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "numExecutors": 1,
    "logsBucketUri": "<oci://bucket@namespace/>",
    "archiveUri": "<oci://bucket@namespace/archive.zip>",
    "configuration": {
        "spark.archives": "<oci://bucket@namespace/>#conda",
        "spark.oracle.datasource.enabled": "true"
    }
}
command = f'\'{json.dumps(command)}\''
print("command", command)

# Optional dynamic allocation settings for the "configuration" block:
# "configuration": {
#     "spark.dynamicAllocation.enabled": "true",
#     "spark.dynamicAllocation.shuffleTracking.enabled": "true",
#     "spark.dynamicAllocation.minExecutors": "1",
#     "spark.dynamicAllocation.maxExecutors": "4",
#     "spark.dynamicAllocation.executorIdleTimeout": "60",
#     "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
#     "spark.dataflow.dynamicAllocation.quotaPolicy": "min"
# }

%create_session -l python -c $command
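Once the session is created, you can run a quick sanity check through the %%spark magic to confirm that the remote Data Flow Spark session is reachable. This is a minimal sketch, not part of the original setup; it only assumes the session created above is active.
%%spark
# Confirm the remote Data Flow session responds and print its Spark version.
print(spark.version)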
Task 2: Perform Spark operations on OCI Object Storage using the sample code
- Import the dependent libraries into the session.
%%spark
# Import required libraries.
import json
import os
import sys
import datetime
import oci
import pyspark.sql
from pyspark.sql.functions import countDistinct
from delta.tables import *
- Perform a Spark read operation on Object Storage. Read the Object Storage file using spark.read from the Livy session. (A sample transformation on this DataFrame is sketched after these steps.)
%%spark -o df_Bronze_Insurance_Data
# Read claim insurance files from OCI Object Storage into a Spark DataFrame.
df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
    .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
print("df_RawZone_Data", df_Bronze_Insurance_Data)
df_Bronze_Insurance_Data.show(5)
- Perform a Spark write operation on Object Storage.
%%spark
df_Bronze_Insurance_Data.write.format("json").mode("overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
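As referenced in the read step above, the following is a minimal sketch of a transformation you could apply to the Bronze DataFrame before writing it out. The column names Region and Claim_ID are assumptions for illustration and may not match the sample claim data.
%%spark
# Hypothetical aggregation: count distinct claims per region (column names are assumed).
df_claims_by_region = df_Bronze_Insurance_Data.groupBy("Region") \
    .agg(countDistinct("Claim_ID").alias("distinct_claims"))
df_claims_by_region.show(5)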
Task 3: Perform Read and Write operations on Oracle Autonomous Data Warehouse using the sample code
- Load data into Oracle Autonomous Data Warehouse using the Secret Vault for the wallet. Copy the following code as it is. For more information, see the GitHub sample.
%%spark
def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
    """
    Get an authenticated OCI client.
    Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
    """
    import oci
    if not in_dataflow():
        # We are running locally, use our API Key.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE
        config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
        authenticated_client = client(config)
    else:
        # We are running in Data Flow, use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client

def get_password_from_secrets(token_path, password_ocid):
    """
    Get a password from the OCI Secrets Service.
    """
    import base64
    import oci
    secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
    response = secrets_client.get_secret_bundle(password_ocid)
    base64_secret_content = response.data.secret_bundle_content.content
    base64_secret_bytes = base64_secret_content.encode("ascii")
    base64_message_bytes = base64.b64decode(base64_secret_bytes)
    secret_content = base64_message_bytes.decode("ascii")
    return secret_content

def get_delegation_token_path(spark):
    """
    Get the delegation token path when we're running in Data Flow.
    """
    if not in_dataflow():
        return None
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    if not token_path:
        raise Exception(f"{token_key} is not set")
    return token_path

def get_temporary_directory():
    if in_dataflow():
        return "/opt/spark/work-dir/"
    else:
        import tempfile
        return tempfile.gettempdir()

def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False

def download_wallet(spark, wallet_path):
    """
    Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use
    in a Data Flow application.
    """
    import oci
    import zipfile

    # Get an object store client.
    token_path = get_delegation_token_path(spark)
    object_store_client = get_authenticated_client(
        token_path, oci.object_storage.ObjectStorageClient
    )

    # Download the wallet file.
    from urllib.parse import urlparse
    parsed = urlparse(wallet_path)
    bucket_name, namespace = parsed.netloc.split("@")
    file_name = parsed.path[1:]
    response = object_store_client.get_object(namespace, bucket_name, file_name)
    temporary_directory = get_temporary_directory()
    zip_file_path = os.path.join(temporary_directory, "wallet.zip")
    with open(zip_file_path, "wb") as fd:
        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
            fd.write(chunk)

    # Extract everything locally.
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(temporary_directory)

    # Distribute all wallet files.
    contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    spark_context = spark.sparkContext
    for file in contents:
        spark_context.addFile(os.path.join(temporary_directory, file))

    return temporary_directory
- Set the following parameters related to the Oracle Autonomous Data Warehouse instance and wallet.
%%spark
PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
TARGET_TABLE = "ADMIN.TB_NAME"
TNSNAME = "demolakehouseadw_medium"
USER = "admin"
WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"

# Download and distribute our wallet file.
wallet_path = download_wallet(spark, WALLET_PATH)
adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
- Get the password using the secret service.
%%spark
# Get our password using the secret service.
print("Getting wallet password")
token_path = get_delegation_token_path(spark)
password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
print("Done getting wallet password")

# Save the results to the database.
print("Saving processed data to " + adw_url)
properties = {
    "driver": "oracle.jdbc.driver.OracleDriver",
    "oracle.net.tns_admin": TNSNAME,
    "password": password,
    "user": USER
}
- Read a sample table from Oracle Autonomous Data Warehouse.
%%spark
SOURCE_TABLE = "ADMIN.RETAILPOS"
df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
- Load the above DataFrame into Oracle Autonomous Data Warehouse.
%%spark
# Load into Oracle Autonomous Data Warehouse.
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
print("TARGET_TABLE : ", TARGET_TABLE)

# Write to Oracle Autonomous Data Warehouse.
print("Write to Oracle Autonomous Data Warehouse : ")
df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
print("Writing done to Oracle Autonomous Data Warehouse : ")
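As an optional verification step (not part of the original tutorial), you can read the row count of the target table back from Oracle Autonomous Data Warehouse to confirm the write succeeded. This sketch reuses the adw_url and properties defined above.
%%spark
# Optional check: read back the row count of the table that was just written.
df_verify = spark.read.format("jdbc") \
    .option("url", adw_url) \
    .option("driver", properties["driver"]) \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("query", "SELECT COUNT(*) AS ROW_COUNT FROM ADMIN.RETAILPOS_15MINUTES") \
    .load()
df_verify.show()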
Next Steps
- With OCI Data Flow, you can configure OCI Data Science notebooks to run applications interactively against Data Flow. Watch the tutorial video on using Data Science with Data Flow Studio.
- Also see the Oracle Accelerated Data Science SDK documentation for more information on integrating Data Science and Data Flow.
- More examples are available from GitHub with Data Flow samples and Data Science samples.
- To get started today, sign up for the Oracle Cloud Free Trial or sign in to your account to try OCI Data Flow. Try Data Flow's 15-minute no-installation-required tutorial to see just how easy Spark processing can be with Oracle Cloud Infrastructure.
Acknowledgments
Author - Kumar Chandragupta (OCI Sr. Cloud Engineer)
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Perform your analytics on OCI Data Flow using OCI Data Science interactive notebook
F79466-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.