Hinweis:
- Dieses Tutorial erfordert Zugriff auf Oracle Cloud. Informationen zum Anmelden für einen kostenlosen Account finden Sie unter Erste Schritte mit Oracle Cloud Infrastructure Free Tier.
- Es verwendet Beispielwerte für Oracle Cloud Infrastructure-Zugangsdaten, -Mandanten und -Compartments. Wenn Sie Ihre Übung abgeschlossen haben, ersetzen Sie diese Werte durch die Werte, die für Ihre Cloud-Umgebung spezifisch sind.
Führen Sie Ihre Analysen zu OCI Data Flow mit dem interaktiven OCI Data Science-Notizbuch aus
Einführung
Oracle Cloud Infrastructure (OCI) Data Flow ist ein vollständig verwalteter Big Data-Service, mit dem Sie Apache Spark-Anwendungen in beliebiger Größe und nahezu ohne Administration ausführen können. Spark ist zum führenden Big Data-Verarbeitungs-Framework geworden, und OCI Data Flow ist die einfachste Möglichkeit, Spark in Oracle Cloud auszuführen, weil Entwickler nichts zu installieren oder zu verwalten haben.
Zielsetzung
In diesem Tutorial werden Sie durch das Setup geführt, das für den Zugriff auf die OCI Data Flow-Sessions über die OCI Data Science-Notizbuchsession erforderlich ist. Mit diesen Sessions können Sie interaktive Spark-Workloads auf einem langlebigen Data Flow-Cluster über eine Apache Livy-Integration ausführen.
Nachdem die OCI Data Flow-Spark-Session erstellt wurde, führen wir einige Beispielcodes für die Ausführung von Spark-Vorgängen in OCI Object Storage und Oracle Autonomous Data Warehouse durch.
Aufgabe 1: OCI Data Science-Notizbuch mit OCI Data Flow einrichten
OCI Data Flow Conda-Umgebung erstellen und einrichten
-
Erstellen Sie erforderliche Buckets.
a. Erstellen Sie einen Bucket mit dem Namen "dataflow-logs" in Ihrem Mandanten.
b. Erstellen Sie einen Bucket namens "dataflow-warehouse" in Ihrem Mandanten.
-
Erstellen Sie eine dynamische Gruppe in einem bestimmten Compartment.
ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'} ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'} Any {resource.type = 'datacatalogmetastore'}
-
Erstellen Sie eine Policy zur Verwaltung von OCI-Ressourcen aus OCI Data Flow und OCI Data Science.
-
ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>',target.bucket.name ='dataflow-logs,target.bucket.name='dataflow-warehouse'}
-
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>'
-
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY
-
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY
-
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>',target.bucket.name = '<managed-table-location-bucket>',target.bucket.name = '<external-table-location-bucket>'}
-
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
-
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
-
-
OCI Data Science-Projekt und -Session erstellen.
-
Öffnen Sie eine neue OCI Data Science-Session. Wählen Sie im Menü Datei die Option Neuer Starter, und klicken Sie auf Terminal.
-
Installieren und aktivieren Sie die Conda-Umgebung
pyspark32_p38_cpu_v1
über das Terminal.odsc conda install -s pyspark32_p38_cpu_v1 source activate /home/datascience/conda/pyspark32_p38_cpu_v1
-
Nachdem Conda aktiviert wurde, wechseln Sie zur Registerkarte Neuer Starter, und klicken Sie auf Einstellungen. Geben Sie die erforderlichen Informationen zum Objektspeicher ein, in den das Conda-Package hochgeladen und gespeichert wird.
-
Conda-Umgebung veröffentlichen
odsc conda publish -s pyspark3_2anddataflowv1_0
Hinweis: Die Veröffentlichung dauert einige Zeit. Nach Abschluss des Vorgangs können Sie beobachten, dass das Conda-Package in den Object Storage-Bucket hochgeladen wird.
OCI Data Flow-Spark-Session mit Livy Service einrichten und erstellen
Öffnen Sie das Notizbuch mit "PySpark und DataFlow" als Kernel aus dem neuen Launcher, und führen Sie die folgenden Befehle aus, um die OCI Data Flow Spark-Session mit Livy Service einzurichten und zu erstellen:
-
Authentifizierung mit ADS einrichten.
import ads
ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
-
Erweiterung laden.
%load_ext dataflow.magics
-
Erstellen Sie eine OCI Data Flow Spark-Session mit dem Livy-Service über das OCI Data Science-Notizbuch.
import json command = { "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx", "displayName": "Demo_DataFlow_Spark_v1", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "logsBucketUri": "<oci://bucket@namespace/>", "archiveUri": "<oci://bucket@namespace/archive.zip>" "configuration":{"spark.archives":"<oci://bucket@namespace/>#conda", "spark.oracle.datasource.enabled":"true"} } command = f'\'{json.dumps(command)}\'' print("command",command) #"configuration":{ # "spark.dynamicAllocation.enabled":"true", # "spark.dynamicAllocation.shuffleTracking.enabled":"true", # "spark.dynamicAllocation.minExecutors":"1", # "spark.dynamicAllocation.maxExecutors":"4", # "spark.dynamicAllocation.executorIdleTimeout":"60", # "spark.dynamicAllocation.schedulerBacklogTimeout":"60", # "spark.dataflow.dynamicAllocation.quotaPolicy":"min" }}' %create_session -l python -c $command
Aufgabe 2: Spark-Vorgänge in OCI Object Storage mit dem Beispielcode ausführen
-
Abhängige Librarys in Session importieren.
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from delta.tables import *
-
Spark-Lesevorgang für Object Storage ausführen. Lesen Sie Object Storage-Datei mit
spark.read
aus Livy Session.%%spark -o df_Bronze_Insurance_Data #Read Claim Insurance files from OCI Object Storage in Spark Dataframe. df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \ .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*") print("df_RawZone_Data",df_Bronze_Insurance_Data) df_Bronze_Insurance_Data.show(5)
-
Spark-Schreibvorgang in Object Storage ausführen.
%%spark df_Bronze_Insurance_Data.write.format("json").option("mode","overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
Aufgabe 3: Lese- und Schreibvorgänge in Oracle Autonomous Data Warehouse mit dem Beispielcode ausführen
-
Laden Sie Daten mit Secret Vault für Wallet in Oracle Autonomous Data Warehouse. Kopieren Sie den folgenden Code, wie er ist. Weitere Informationen finden Sie unter GitHub-Beispiel.
%%spark def get_authenticated_client(token_path, client, file_location=None, profile_name=None): """ Get an an authenticated OCI client. Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient) """ import oci if not in_dataflow(): # We are running locally, use our API Key. if file_location is None: file_location = oci.config.DEFAULT_LOCATION if profile_name is None: profile_name = oci.config.DEFAULT_PROFILE config = oci.config.from_file(file_location=file_location, profile_name=profile_name) authenticated_client = client(config) else: # We are running in Data Flow, use our Delegation Token. with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token ) authenticated_client = client(config={}, signer=signer) return authenticated_client def get_password_from_secrets(token_path, password_ocid): """ Get a password from the OCI Secrets Service. """ import base64 import oci secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient) response = secrets_client.get_secret_bundle(password_ocid) base64_secret_content = response.data.secret_bundle_content.content base64_secret_bytes = base64_secret_content.encode("ascii") base64_message_bytes = base64.b64decode(base64_secret_bytes) secret_content = base64_message_bytes.decode("ascii") return secret_content def get_delegation_token_path(spark): """ Get the delegation token path when we're running in Data Flow. """ if not in_dataflow(): return None token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath" token_path = spark.sparkContext.getConf().get(token_key) if not token_path: raise Exception(f"{token_key} is not set") return token_path def get_temporary_directory(): if in_dataflow(): return "/opt/spark/work-dir/" else: import tempfile return tempfile.gettempdir() def in_dataflow(): """ Determine if we are running in OCI Data Flow by checking the environment. """ if os.environ.get("HOME") == "/home/dataflow": return True return False def download_wallet(spark, wallet_path): """ Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use in a Data Flow application. """ import oci import zipfile # Get an object store client. token_path = get_delegation_token_path(spark) object_store_client = get_authenticated_client( token_path, oci.object_storage.ObjectStorageClient ) # Download the wallet file. from urllib.parse import urlparse parsed = urlparse(wallet_path) bucket_name, namespace = parsed.netloc.split("@") file_name = parsed.path[1:] response = object_store_client.get_object(namespace, bucket_name, file_name) temporary_directory = get_temporary_directory() zip_file_path = os.path.join(temporary_directory, "wallet.zip") with open(zip_file_path, "wb") as fd: for chunk in response.data.raw.stream(1024 * 1024, decode_content=False): fd.write(chunk) # Extract everything locally. with zipfile.ZipFile(zip_file_path, "r") as zip_ref: zip_ref.extractall(temporary_directory) # Distribute all wallet files. contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split() spark_context = spark.sparkContext for file in contents: spark_context.addFile(os.path.join(temporary_directory, file)) return temporary_directory
-
Legen Sie die folgenden Parameter für die Oracle Autonomous Data Warehouse-Instanz und das Wallet fest.
%%spark PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx" TARGET_TABLE = "ADMIN.TB_NAME" TNSNAME = "demolakehouseadw_medium" USER = "admin" WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip" # Download and distribute our wallet file. wallet_path = download_wallet(spark, WALLET_PATH) adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
-
Rufen Sie das Kennwort mit dem Secret Service ab.
%%spark # Get our password using the secret service. print("Getting wallet password") token_path = get_delegation_token_path(spark) password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID) print("Done getting wallet password") # Save the results to the database. print("Saving processed data to " + adw_url) properties = { "driver": "oracle.jdbc.driver.OracleDriver", "oracle.net.tns_admin": TNSNAME, "password": password, "user": USER }
-
Beispieltabelle aus Oracle Autonomous Data Warehouse lesen.
%%spark SOURCE_TABLE = "ADMIN.RETAILPOS" df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
-
Laden Sie den obigen Dataframe in Oracle Autonomous Data Warehouse.
%%spark #Load into Oracle Autonomous Data Warehouse: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to Oracle Autonomous Data Warehouse. print("Write to Oracle Autonomous Data Warehouse : ") df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties) print("Writing done to Oracle Autonomous Data Warehouse : ")
Nächste Schritte
-
Mit OCI Data Flow können Sie OCI Data Science-Notizbücher konfigurieren, um Anwendungen interaktiv mit Data Flow auszuführen. Sehen Sie sich das Tutorialvideo zur Verwendung von Data Science mit Data Flow Studio an.
-
Weitere Informationen zur Integration von Data Science und Data Flow finden Sie in der Oracle Accelerated Data Science-SDK-Dokumentation.
-
Weitere Beispiele finden Sie unter GitHub mit Data Flow-Beispielen und Data Science-Beispielen.
-
Melden Sie sich noch heute für die kostenlose Oracle Cloud-Testversion an, oder melden Sie sich bei Ihrem Account an, um OCI Data Flow zu testen. Testen Sie das 15-minütige Tutorial ohne Installation von Data Flow, um zu erfahren, wie einfach die Spark-Verarbeitung mit Oracle Cloud Infrastructure sein kann.
Zugehörige Links
Bestätigungen
Autor - Kumar Chandragupta (OCI Sr. Cloud Engineer)
Weitere Lernressourcen
Sehen Sie sich andere Übungen zu docs.oracle.com/learn an, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube-Kanal zu. Besuchen Sie außerdem die Website education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.
Produktdokumentation finden Sie im Oracle Help Center.
Perform your analytics on OCI Data Flow using OCI Data Science interactive notebook
F79466-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.