注意:

使用 Apache Spark 從 OCI 資料流程連線、存取及分析 Google Cloud BigQuery 資料

簡介

建立與 OCI 的多雲端連線

隨著 Oracle Cloud Infrastructure 的快速成長,許多客戶想要移轉至 OCI,或使用 OCI 作為其多雲端解決方案。同樣地,許多客戶想要從 OCI 存取其他雲端資料平台,並使用 OCI 處理 / 運算處理大數據解決方案。

目標

本教學課程將示範如何從 OCI 資料流程 Spark Notebook 連線 Google Cloud BigQuery,並使用 Spark 在 BigQuery 表格上執行一些讀取作業。我們還將說明如何將產生的 Spark 資料範圍寫入 OCI Object Storage 和 Autonomous Data Warehouse。

OCI 資料科學記事本階段作業詳細資訊

解決方案

此解決方案將利用 Apache Spark 功能 (例如平行處理) 和分散在記憶體計算中。OCI 資料流程應用程式也可以透過 OCI 資料整合服務進行排程 / 協調。在此方法中,使用者可以在 OCI 資料流程和互動式記事本上開發其 Spark 命令檔,以便自行運用 OCI 資料流程 Spark 叢集。高階步驟為:

  1. 使用 Apache Spark BigQuery Connector 與 Google Cloud Platform 連線:Google Cloud BigQuery。
  2. 開發完整的 ETL 解決方案。
  3. 從 Google Cloud BigQuery 擷取資料。
  4. 在 OCI 資料流程上使用 Apache Spark Cluster 轉換資料。
  5. 擷取 OCI Object Storage 或 Autonomous Data Warehouse 中的資料。
  6. 使用開發者適用的互動式 Spark Notebook。
  7. 整合所有支援的開源 Spark 套裝程式。
  8. 使用 OCI Data Integration Service 協調命令檔。

必要條件

  1. 可存取入口網站的作用中 OCI 和 Google Cloud 訂閱。

  2. 設定 OCI 資料流程、OCI 物件儲存的儲存桶及 OCI 資料科學記事本。如需詳細資訊,請參閱:

  3. 為 BigQuery 資料庫位於 Google Cloud 的專案建立並下載 Google API JSON 金鑰加密密碼 OCID

  4. Google API JSON 金鑰加密密碼 OCID 上傳至 OCI 物件儲存。

    • OCI 物件儲存範例:oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. 下載 Spark BigQuery Jar 並上傳至物件儲存。

  6. 為您的 Google Cloud BigQuery 表格收集下列參數。

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. 從 OCI 入口網站下載 Autonomous Data Warehouse 公事包,並將使用者 / 密碼詳細資訊保存在手邊。

工作 1:使用 OCI 資料流程的 OCI 資料科學記事本存取 Google Cloud BigQuery

  1. 開啟 OCI 資料科學階段作業,您已經為 OCI 資料流程建立 Conda 環境。請參閱先決條件點 2。

  2. 以核心形式開啟含有資料流程的新記事本。

  3. 建立 OCI 資料流程的 Livy 階段作業,並且提供其他必要資訊,包括 Google Cloud BigQuery。

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    用 Livy Session for OCI Data Flow 建立 SparkSession 的範例程式碼

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. 匯入必要的模組。

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. 閱讀 Google Cloud BigQuery 表格。

    範例程式碼 1

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    範例程式碼 2

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. 將資料載入物件儲存體。

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. 使用公事包密碼將資料載入 Autonomous Data Warehouse。

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

接下來的步驟

確認

作者 - Kumar Chandragupta (OCI 資深雲端工程師)

其他學習資源

探索 docs.oracle.com/learn 的其他實驗室,或者存取更多 Oracle Learning YouTube 頻道上的免費學習內容。此外,請瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning 檔案總管。

如需產品文件,請造訪 Oracle Help Center