Hinweis:

Google Cloud BigQuery-Daten aus OCI Data Flow mit Apache Spark verbinden, darauf zugreifen und analysieren

Einführung

Multicloud-Konnektivität mit OCI herstellen

Mit der schnell wachsenden Beliebtheit von Oracle Cloud Infrastructure möchten viele Kunden zu OCI migrieren oder OCI als Multicloud-Lösung verwenden. Ebenso möchten viele Kunden auf andere Cloud-Datenplattformen von OCI zugreifen und OCI für die Verarbeitung/Berechnung im Zusammenhang mit Big-Data-Lösungen verwenden.

Zielsetzung

Dieses Tutorial zeigt, wie Sie Google Cloud BigQuery aus dem OCI Data Flow Spark-Notizbuch verbinden und Lesevorgänge für die Tabelle BigQuery mit Spark ausführen. Außerdem wird beschrieben, wie Sie den resultierenden Spark-Data-aframe in OCI Object Storage und Autonomous Data Warehouse aufschreiben.

OCI Data Science-Notizbuchsessiondetails

Lösung

Bei dieser Lösung wird die Apache Spark-Funktion wie die parallele Verarbeitung und die Verteilung in der Speicherberechnung genutzt. OCI Data Flow-Anwendung kann auch über OCI Data Integration Service geplant/orchestriert werden. In diesem Ansatz können Benutzer ihr Spark-Skript in OCI Data Flow und dem interaktiven Notizbuch entwickeln, das selbst das OCI Data Flow Spark-Cluster nutzt. Die allgemeinen Schritte sind:

  1. Verbindung zu Google Cloud Platform: Google Cloud BigQuery mit Apache Spark BigQuery Connector.
  2. Entwickeln Sie eine vollständige ETL-Lösung.
  3. Extrahieren Sie Daten aus Google Cloud BigQuery.
  4. Transformieren Sie die Daten mit dem Apache Spark-Cluster in OCI Data Flow.
  5. Nehmen Sie Daten in OCI Object Storage oder Autonomous Data Warehouse auf.
  6. Verwenden Sie das benutzerfreundliche interaktive Spark-Notizbuch des Entwicklers.
  7. Integrieren Sie alle unterstützten Open-Source-Spark-Packages.
  8. Orchestrieren Sie Ihr Skript mit OCI Data Integration Service.

Voraussetzungen

  1. Ein aktives OCI- und Google Cloud-Abonnement mit Zugriff auf das Portal.

  2. OCI Data Flow, OCI Object Storage-Bucket und OCI Data Science-Notizbuch einrichten. Weitere Informationen finden Sie unter:

  3. Erstellen und laden Sie die Google API JSON Key Secret-OCID für das Projekt herunter, in dem sich die BigQuery-Datenbank in Google Cloud befindet.

  4. Laden Sie die Google-API-JSON-Schlüssel-Secret-OCID in OCI Object Storage hoch.

    • Beispiel für OCI Object Storage: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Laden Sie die JAR-Datei Spark BigQuery herunter, und laden Sie sie in Object Storage hoch.

  6. Erfassen Sie die folgenden Parameter für die Google Cloud-Tabelle BigQuery.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. Laden Sie das Autonomous Data Warehouse-Wallet aus dem OCI-Portal herunter, und halten Sie die Benutzer-/Kennwortdetails praktisch.

Aufgabe 1: Mit OCI Data Science Notebook mit OCI Data Flow auf Google Cloud BigQuery zugreifen

  1. Öffnen Sie die OCI Data Science-Session, in der Sie bereits eine Conda-Umgebung für OCI Data Flow erstellt haben. Siehe Voraussetzungspunkt 2.

  2. Öffnen Sie das neue Notizbuch mit Data Flow als Kernel.

  3. Erstellen Sie eine Livy-Session für OCI Data Flow, und geben Sie weitere erforderliche Informationen an, einschließlich Google Cloud BigQuery.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Beispielcode zum Erstellen von SparkSession mit Livy-Session für OCI Data Flow:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. Importieren Sie die erforderlichen Module.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Lesen Sie die Google Cloud BigQuery-Tabelle.

    Beispielcode 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    Beispielcode 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. Laden Sie Daten in Object Storage.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Laden Sie Daten mit Wallet-Kennwort in Autonomous Data Warehouse.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Nächste Schritte

Danksagungen

Autor - Kumar Chandragupta (OCI Senior Cloud Engineer)

Weitere Lernressourcen

Sehen Sie sich andere Übungen zu docs.oracle.com/learn an, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube-Kanal zu. Besuchen Sie außerdem die Website education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.

Produktdokumentation finden Sie im Oracle Help Center.