Hinweis:

Führen Sie Ihre Analysen zu OCI Data Flow mit dem interaktiven OCI Data Science-Notizbuch aus

Einführung

Oracle Cloud Infrastructure (OCI) Data Flow ist ein vollständig verwalteter Big Data-Service, mit dem Sie Apache Spark-Anwendungen in beliebiger Größe und nahezu ohne Administration ausführen können. Spark ist zum führenden Big Data-Verarbeitungs-Framework geworden, und OCI Data Flow ist die einfachste Möglichkeit, Spark in Oracle Cloud auszuführen, weil Entwickler nichts zu installieren oder zu verwalten haben.

Zielsetzung

In diesem Tutorial werden Sie durch das Setup geführt, das für den Zugriff auf die OCI Data Flow-Sessions über die OCI Data Science-Notizbuchsession erforderlich ist. Mit diesen Sessions können Sie interaktive Spark-Workloads auf einem langlebigen Data Flow-Cluster über eine Apache Livy-Integration ausführen.

Nachdem die OCI Data Flow-Spark-Session erstellt wurde, führen wir einige Beispielcodes für die Ausführung von Spark-Vorgängen in OCI Object Storage und Oracle Autonomous Data Warehouse durch.

Aufgabe 1: OCI Data Science-Notizbuch mit OCI Data Flow einrichten

OCI Data Flow Conda-Umgebung erstellen und einrichten

  1. Erstellen Sie erforderliche Buckets.

    a. Erstellen Sie einen Bucket mit dem Namen "dataflow-logs" in Ihrem Mandanten.

    b. Erstellen Sie einen Bucket namens "dataflow-warehouse" in Ihrem Mandanten.

  2. Erstellen Sie eine dynamische Gruppe in einem bestimmten Compartment.

      ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}
      ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'}
      Any {resource.type = 'datacatalogmetastore'}
    
    
  3. Erstellen Sie eine Policy zur Verwaltung von OCI-Ressourcen aus OCI Data Flow und OCI Data Science.

    • ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>',target.bucket.name ='dataflow-logs,target.bucket.name='dataflow-warehouse'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>'

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>',target.bucket.name = '<managed-table-location-bucket>',target.bucket.name = '<external-table-location-bucket>'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

  4. OCI Data Science-Projekt und -Session erstellen.

    OCI Data Science-Notizbuchsessiondetails

  5. Öffnen Sie eine neue OCI Data Science-Session. Wählen Sie im Menü Datei die Option Neuer Starter, und klicken Sie auf Terminal.

    OCI Data Science-Sessiondetails

  6. Installieren und aktivieren Sie die Conda-Umgebung pyspark32_p38_cpu_v1 über das Terminal.

     odsc conda install -s pyspark32_p38_cpu_v1
     source activate /home/datascience/conda/pyspark32_p38_cpu_v1
    
    
  7. Nachdem Conda aktiviert wurde, wechseln Sie zur Registerkarte Neuer Starter, und klicken Sie auf Einstellungen. Geben Sie die erforderlichen Informationen zum Objektspeicher ein, in den das Conda-Package hochgeladen und gespeichert wird.

    Conda-Umgebung für DS-Session einrichten

  8. Conda-Umgebung veröffentlichen

    odsc conda publish -s pyspark3_2anddataflowv1_0

    Hinweis: Die Veröffentlichung dauert einige Zeit. Nach Abschluss des Vorgangs können Sie beobachten, dass das Conda-Package in den Object Storage-Bucket hochgeladen wird.

OCI Data Flow-Spark-Session mit Livy Service einrichten und erstellen

Öffnen Sie das Notizbuch mit "PySpark und DataFlow" als Kernel aus dem neuen Launcher, und führen Sie die folgenden Befehle aus, um die OCI Data Flow Spark-Session mit Livy Service einzurichten und zu erstellen:

  1. Authentifizierung mit ADS einrichten.

    import ads ads.set_auth("resource_principal") # Supported values: resource_principal, api_key

  2. Erweiterung laden.

    %load_ext dataflow.magics

  3. Erstellen Sie eine OCI Data Flow Spark-Session mit dem Livy-Service über das OCI Data Science-Notizbuch.

     import json
     command = {
        "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
        "displayName": "Demo_DataFlow_Spark_v1",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "logsBucketUri": "<oci://bucket@namespace/>",
        "archiveUri": "<oci://bucket@namespace/archive.zip>"
        "configuration":{"spark.archives":"<oci://bucket@namespace/>#conda",
                          "spark.oracle.datasource.enabled":"true"}
     }
    
     command = f'\'{json.dumps(command)}\''
     print("command",command)
    
     #"configuration":{
     #    "spark.dynamicAllocation.enabled":"true",
     #    "spark.dynamicAllocation.shuffleTracking.enabled":"true",
     #    "spark.dynamicAllocation.minExecutors":"1",
     #    "spark.dynamicAllocation.maxExecutors":"4",
     #    "spark.dynamicAllocation.executorIdleTimeout":"60",
     #    "spark.dynamicAllocation.schedulerBacklogTimeout":"60",
     #    "spark.dataflow.dynamicAllocation.quotaPolicy":"min" }}'
    
     %create_session -l python -c $command
    
    

    Objekte hochladen

Aufgabe 2: Spark-Vorgänge in OCI Object Storage mit dem Beispielcode ausführen

  1. Abhängige Librarys in Session importieren.

     %%spark
     #Import required libraries.
    
     import json
     import os
     import sys
     import datetime
     import oci
     import pyspark.sql
     from pyspark.sql.functions import countDistinct
    
     from delta.tables import *
    
    
  2. Spark-Lesevorgang für Object Storage ausführen. Lesen Sie Object Storage-Datei mit spark.read aus Livy Session.

     %%spark -o df_Bronze_Insurance_Data
     #Read Claim Insurance files from OCI Object Storage in Spark Dataframe.
     df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
     .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
    
     print("df_RawZone_Data",df_Bronze_Insurance_Data)
     df_Bronze_Insurance_Data.show(5)
    
    
  3. Spark-Schreibvorgang in Object Storage ausführen.

     %%spark
     df_Bronze_Insurance_Data.write.format("json").option("mode","overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
    
    

Aufgabe 3: Lese- und Schreibvorgänge in Oracle Autonomous Data Warehouse mit dem Beispielcode ausführen

  1. Laden Sie Daten mit Secret Vault für Wallet in Oracle Autonomous Data Warehouse. Kopieren Sie den folgenden Code, wie er ist. Weitere Informationen finden Sie unter GitHub-Beispiel.

     %%spark
    
     def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
        """
        Get an an authenticated OCI client.
        Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
        """
        import oci
    
        if not in_dataflow():
           # We are running locally, use our API Key.
           if file_location is None:
                 file_location = oci.config.DEFAULT_LOCATION
           if profile_name is None:
                 profile_name = oci.config.DEFAULT_PROFILE
           config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
           authenticated_client = client(config)
        else:
           # We are running in Data Flow, use our Delegation Token.
           with open(token_path) as fd:
                 delegation_token = fd.read()
           signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
                 delegation_token=delegation_token
           )
           authenticated_client = client(config={}, signer=signer)
        return authenticated_client
    
     def get_password_from_secrets(token_path, password_ocid):
        """
        Get a password from the OCI Secrets Service.
        """
        import base64
        import oci
    
        secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
        response = secrets_client.get_secret_bundle(password_ocid)
        base64_secret_content = response.data.secret_bundle_content.content
        base64_secret_bytes = base64_secret_content.encode("ascii")
        base64_message_bytes = base64.b64decode(base64_secret_bytes)
        secret_content = base64_message_bytes.decode("ascii")
        return secret_content
    
     def get_delegation_token_path(spark):
        """
        Get the delegation token path when we're running in Data Flow.
        """
        if not in_dataflow():
           return None
        token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
        token_path = spark.sparkContext.getConf().get(token_key)
        if not token_path:
           raise Exception(f"{token_key} is not set")
        return token_path    
    
     def get_temporary_directory():
        if in_dataflow():
           return "/opt/spark/work-dir/"
        else:
           import tempfile
           return tempfile.gettempdir()
    
     def in_dataflow():
        """
        Determine if we are running in OCI Data Flow by checking the environment.
        """
        if os.environ.get("HOME") == "/home/dataflow":
           return True
        return False
    
     def download_wallet(spark, wallet_path):
        """
        Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use in a Data Flow
        application.
        """
        import oci
        import zipfile
    
        # Get an object store client.
        token_path = get_delegation_token_path(spark)
        object_store_client = get_authenticated_client(
           token_path, oci.object_storage.ObjectStorageClient
        )
    
        # Download the wallet file.
        from urllib.parse import urlparse
        parsed = urlparse(wallet_path)
        bucket_name, namespace = parsed.netloc.split("@")
        file_name = parsed.path[1:]
        response = object_store_client.get_object(namespace, bucket_name, file_name)
        temporary_directory = get_temporary_directory()
        zip_file_path = os.path.join(temporary_directory, "wallet.zip")
        with open(zip_file_path, "wb") as fd:
           for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
                 fd.write(chunk)
    
        # Extract everything locally.
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
           zip_ref.extractall(temporary_directory)
    
        # Distribute all wallet files.
        contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
        spark_context = spark.sparkContext
        for file in contents:
           spark_context.addFile(os.path.join(temporary_directory, file))
    
        return temporary_directory
    
    
  2. Legen Sie die folgenden Parameter für die Oracle Autonomous Data Warehouse-Instanz und das Wallet fest.

      %%spark
      PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
      TARGET_TABLE = "ADMIN.TB_NAME"
      TNSNAME = "demolakehouseadw_medium"
      USER = "admin"
      WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"
    
      # Download and distribute our wallet file.
      wallet_path = download_wallet(spark, WALLET_PATH)
      adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
    
    
  3. Rufen Sie das Kennwort mit dem Secret Service ab.

      %%spark
      # Get our password using the secret service.
      print("Getting wallet password")
      token_path = get_delegation_token_path(spark)
      password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
      print("Done getting wallet password")
    
      # Save the results to the database.
      print("Saving processed data to " + adw_url)
      properties = {
         "driver": "oracle.jdbc.driver.OracleDriver",
         "oracle.net.tns_admin": TNSNAME,
         "password": password,
         "user": USER
      }
    
  4. Beispieltabelle aus Oracle Autonomous Data Warehouse lesen.

      %%spark
      SOURCE_TABLE = "ADMIN.RETAILPOS"
      df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
    
    
  5. Laden Sie den obigen Dataframe in Oracle Autonomous Data Warehouse.

      %%spark
    
      #Load into Oracle Autonomous Data Warehouse:
    
      TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
      print("TARGET_TABLE : ",TARGET_TABLE)
    
      # Write to Oracle Autonomous Data Warehouse.
      print("Write to Oracle Autonomous Data Warehouse : ")
      df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
      print("Writing done to Oracle Autonomous Data Warehouse : ")
    
    

    In ADW schreiben

Nächste Schritte

Bestätigungen

Autor - Kumar Chandragupta (OCI Sr. Cloud Engineer)

Weitere Lernressourcen

Sehen Sie sich andere Übungen zu docs.oracle.com/learn an, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube-Kanal zu. Besuchen Sie außerdem die Website education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.

Produktdokumentation finden Sie im Oracle Help Center.