Nota:

Connettere, accedere e analizzare i dati di Google Cloud BigQuery da Flusso dati OCI utilizzando Apache Spark

Introduzione

Stabilisci la connettività multicloud con OCI

Con la popolarità in rapida crescita di Oracle Cloud Infrastructure, molti clienti desiderano migrare a OCI o utilizzare OCI come soluzione multicloud. Allo stesso modo, molti clienti desiderano accedere ad altre piattaforme di dati cloud di OCI e utilizzare OCI per l'elaborazione/il calcolo quando si tratta di soluzioni di Big Data.

Obiettivo

Questa esercitazione spiega come connettere Google Cloud BigQuery dal notebook Spark di Flusso dati OCI ed eseguire alcune operazioni di lettura sulla tabella BigQuery utilizzando Spark. Inoltre, ti spiegheremo come scrivere il dataframe Spark risultante nello storage degli oggetti OCI e in Autonomous Data Warehouse.

Dettagli sessione notebook OCI Data Science

Soluzione

Questa soluzione sfrutterà la funzionalità Apache Spark come l'elaborazione parallela e la distribuzione nel calcolo della memoria. L'applicazione di Flusso dati OCI può essere inoltre pianificata o orchestrata tramite il servizio Integrazione dati OCI. In questo approccio, l'utente può sviluppare il proprio script Spark su Flusso dati OCI e Blocco appunti interattivo che a sua volta utilizza il cluster Spark di Flusso dati OCI. I passi di alto livello sono:

  1. Connettiti a Google Cloud Platform: Google Cloud BigQuery utilizzando Apache Spark BigQuery Connector.
  2. Sviluppa una soluzione ETL completa.
  3. Estrarre dati da Google Cloud BigQuery.
  4. Trasforma i dati utilizzando il cluster Apache Spark su Flusso dati OCI.
  5. Includi i dati nello storage degli oggetti OCI o in Autonomous Data Warehouse.
  6. Utilizza il notebook interattivo Spark di facile uso da parte degli sviluppatori.
  7. Integra tutti i package Spark open source supportati.
  8. Orchestra lo script utilizzando il servizio OCI Data Integration.

Prerequisiti

  1. Una sottoscrizione attiva a OCI e Google Cloud con accesso al portale.

  2. Impostare Flusso dati OCI, Bucket di storage degli oggetti OCI e Notebook OCI Data Science. Per ulteriori informazioni, vedere:

  3. Creare e scaricare OCID segreto chiave JSON API Google per il progetto in cui risiede il database BigQuery in Google Cloud.

  4. Caricare l'OCID segreto chiave JSON API Google nello storage degli oggetti OCI.

    • Storage degli oggetti OCI di esempio: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Scarica il file JAR Spark BigQuery e caricalo nello storage degli oggetti.

  6. Raccogliere i seguenti parametri per la tabella BigQuery di Google Cloud.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. Scaricare il wallet di Autonomous Data Warehouse dal portale OCI e tenere a portata di mano i dettagli utente/password.

Task 1: accedere a Google Cloud BigQuery mediante il notebook OCI Data Science con Flusso dati OCI

  1. Aprire la sessione di Data Science OCI, in cui è già stato creato l'ambiente Conda per il flusso dati OCI. Cfr. punto prerequisito 2.

  2. Aprire il nuovo notebook con Flusso dati come kernel.

  3. Creare una sessione Livy per Flusso dati OCI e fornire altre informazioni richieste, tra cui Google Cloud BigQuery.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Codice di esempio per creare SparkSession con sessione Livy per il flusso dati OCI:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. Importare i moduli richiesti.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Leggi la tabella BigQuery di Google Cloud.

    Codice di esempio 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    Codice di esempio 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. Caricare i dati nello storage degli oggetti.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Caricare i dati in Autonomous Data Warehouse utilizzando la password del wallet.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Passi successivi

Approvazioni

Autore - Kumar Chandragupta ( Senior Cloud Engineer OCI)

Altre risorse di apprendimento

Esplora altri laboratori su docs.oracle.com/learn o accedi a contenuti di formazione gratuiti sul canale YouTube di Oracle Learning. Inoltre, visitare education.oracle.com/learning-explorer per diventare Explorer di Oracle Learning.

Per la documentazione sul prodotto, visitare il sito Oracle Help Center.