注意:
- 此教學課程需要存取 Oracle Cloud。若要註冊免費帳戶,請參閱開始使用 Oracle Cloud Infrastructure Free Tier 。
- 它會使用 Oracle Cloud Infrastructure 證明資料、租用戶及區間的範例值。完成實驗室時,請將這些值替代為您雲端環境特定的值。
使用 Apache Spark 從 OCI 資料流程連線、存取及分析 Google Cloud BigQuery 資料
簡介
建立與 OCI 的多雲端連線
隨著 Oracle Cloud Infrastructure 的快速成長,許多客戶想要移轉至 OCI,或使用 OCI 作為其多雲端解決方案。同樣地,許多客戶想要從 OCI 存取其他雲端資料平台,並使用 OCI 處理 / 運算處理大數據解決方案。
目標
本教學課程將示範如何從 OCI 資料流程 Spark Notebook 連線 Google Cloud BigQuery,並使用 Spark 在 BigQuery 表格上執行一些讀取作業。我們還將說明如何將產生的 Spark 資料範圍寫入 OCI Object Storage 和 Autonomous Data Warehouse。

解決方案
此解決方案將利用 Apache Spark 功能 (例如平行處理) 和分散在記憶體計算中。OCI 資料流程應用程式也可以透過 OCI 資料整合服務進行排程 / 協調。在此方法中,使用者可以在 OCI 資料流程和互動式記事本上開發其 Spark 命令檔,以便自行運用 OCI 資料流程 Spark 叢集。高階步驟為:
- 使用 Apache Spark BigQuery Connector 與 Google Cloud Platform 連線:Google Cloud BigQuery。
- 開發完整的 ETL 解決方案。
- 從 Google Cloud BigQuery 擷取資料。
- 在 OCI 資料流程上使用 Apache Spark Cluster 轉換資料。
- 擷取 OCI Object Storage 或 Autonomous Data Warehouse 中的資料。
- 使用開發者適用的互動式 Spark Notebook。
- 整合所有支援的開源 Spark 套裝程式。
- 使用 OCI Data Integration Service 協調命令檔。
必要條件
-
可存取入口網站的作用中 OCI 和 Google Cloud 訂閱。
-
設定 OCI 資料流程、OCI 物件儲存的儲存桶及 OCI 資料科學記事本。如需詳細資訊,請參閱:
-
為 BigQuery 資料庫位於 Google Cloud 的專案建立並下載 Google API JSON 金鑰加密密碼 OCID 。
-
將 Google API JSON 金鑰加密密碼 OCID 上傳至 OCI 物件儲存。
- OCI 物件儲存範例:
oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
- OCI 物件儲存範例:
-
下載 Spark BigQuery Jar 並上傳至物件儲存。
-
範例:
spark-bigquery-with-dependencies_2.12-0.23.2.jar
-
-
為您的 Google Cloud BigQuery 表格收集下列參數。
'project' : 'bigquery-public-data' 'parentProject' : 'core-invention-366213' 'table' : 'bitcoin_blockchain.transactions' "credentialsFile" : "./ocigcp_user_creds.json" -
從 OCI 入口網站下載 Autonomous Data Warehouse 公事包,並將使用者 / 密碼詳細資訊保存在手邊。
工作 1:使用 OCI 資料流程的 OCI 資料科學記事本存取 Google Cloud BigQuery
-
開啟 OCI 資料科學階段作業,您已經為 OCI 資料流程建立 Conda 環境。請參閱先決條件點 2。
-
以核心形式開啟含有資料流程的新記事本。
-
建立 OCI 資料流程的 Livy 階段作業,並且提供其他必要資訊,包括 Google Cloud BigQuery。
spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar spark.oracle.datasource.enabled : true用 Livy Session for OCI Data Flow 建立 SparkSession 的範例程式碼:
import json command = { "compartmentId": "ocid1.compartment.oc1..xxxxxxx", "displayName": "Demo_BigQuery2ADW_v1", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 10, "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda", "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json", "spark.oracle.datasource.enabled":"true", "spark.hadoop.google.cloud.auth.service.account.enable":"true", "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar" } } command = f'\'{json.dumps(command)}\'' print("command",command) #enableHiveSupport() %create_session -l python -c $command -
匯入必要的模組。
%%spark #Import required libraries. import json import os import sys import datetime import oci import google.cloud.bigquery as bigquery import google.cloud import pyspark.sql from pyspark.sql.functions import countDistinct -
閱讀 Google Cloud BigQuery 表格。
範例程式碼 1 :
%%spark # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery" #Number of rows : 340,311,544 #Total logical bytes : 587.14 GB df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load() print("Total Records Count bitcoin_blockchain.transactions : ",df.count())範例程式碼 2 :
%%spark #Read another BigQuery Table df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load() df_RetailPOS_15min.show() -
將資料載入物件儲存體。
%%spark #Write in Object Storage df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions") -
使用公事包密碼將資料載入 Autonomous Data Warehouse。
%%spark print("Set Parameters for ADW connectivity.") USERNAME = "admin" PASSWORD = "xxxxx" connectionId= "demolakehouseadw_medium" walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip" properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri} print("properties:",properties) %%spark #Load into ADW: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to ADW. df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save() print("Writing completed to ADW.....")
接下來的步驟
-
透過 OCI 資料流程,您可以設定 OCI 資料科學記事本,以互動方式針對資料流程執行應用程式。觀看教學影片,瞭解搭配 Data Flow Studio 使用資料科學。
-
如需有關整合資料科學與資料流程的詳細資訊,請參閱 Oracle Accelerated Data Science SDK 文件。
-
若要立即開始使用,請註冊 Oracle Cloud 免費試用或登入您的帳戶以試用 OCI 資料流程。嘗試使用資料流程的不需安裝 15 分鐘教學課程,即可瞭解 Oracle Cloud Infrastructure 如何輕鬆處理 Spark。
相關連結
確認
作者 - Kumar Chandragupta (OCI 資深雲端工程師)
其他學習資源
探索 docs.oracle.com/learn 的其他實驗室,或者存取更多 Oracle Learning YouTube 頻道上的免費學習內容。此外,請瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning 檔案總管。
如需產品文件,請造訪 Oracle Help Center 。
Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark
F80029-01
April 2023
Copyright © 2023, Oracle and/or its affiliates.