注:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于云环境的那些值。
使用 Apache Spark 从 OCI 数据流连接、访问和分析 Google Cloud BigQuery 数据
简介
与 OCI 建立多云连接
随着 Oracle Cloud Infrastructure 的日益普及,许多客户希望迁移到 OCI 或将 OCI 用作多云解决方案。同样,许多客户希望从 OCI 访问其他云数据平台,并在处理大数据解决方案时使用 OCI 进行处理/计算。
目标
本教程将演示如何从 OCI 数据流 Spark Notebook 连接 Google Cloud BigQuery 以及如何使用 Spark 对 BigQuery 表执行一些读取操作。此外,我们还将介绍如何将结果的 Spark 数据框架写入 OCI 对象存储和 Autonomous Data Warehouse。

解决方案
此解决方案将利用并行处理和分布在内存计算中的 Apache Spark 功能。OCI 数据流应用程序也可以通过 OCI 数据集成服务进行调度/编排。在此方法中,用户可以在 OCI 数据流和交互式记事本上开发 Spark 脚本,该脚本本身利用 OCI 数据流 Spark 集群。高级别步骤包括:
- 使用 Apache Spark BigQuery Connector 与 Google 云平台连接:Google Cloud BigQuery。
- 开发完整的 ETL 解决方案。
- 从 Google Cloud BigQuery 提取数据。
- 使用 OCI 数据流上的 Apache Spark 集群转换数据。
- 在 OCI 对象存储或 Autonomous Data Warehouse 中摄取数据。
- 使用开发人员友好的交互式 Spark 笔记本。
- 集成任何受支持的开源 Spark 软件包。
- 使用 OCI 数据集成服务编排脚本。
先决条件
-
可访问门户的主动 OCI 和 Google 云订阅。
-
设置 OCI 数据流、OCI 对象存储桶和 OCI 数据科学手册。有关详细信息,请参阅:
-
为 BigQuery 数据库驻留在 Google Cloud 上的项目创建和下载 Google API JSON 密钥 OCID 。
-
将 Google API JSON 密钥密钥 OCID 上载到 OCI 对象存储。
- OCI 对象存储样例:
oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
- OCI 对象存储样例:
-
下载 Spark BigQuery Jar 并将其上载到对象存储。
-
示例:
spark-bigquery-with-dependencies_2.12-0.23.2.jar
-
-
为您的 Google Cloud BigQuery 表收集以下参数。
'project' : 'bigquery-public-data' 'parentProject' : 'core-invention-366213' 'table' : 'bitcoin_blockchain.transactions' "credentialsFile" : "./ocigcp_user_creds.json" -
从 OCI 门户下载 Autonomous Data Warehouse Wallet,并保持用户/密码详细信息方便。
任务 1:使用 OCI 数据科学手册与 OCI 数据流访问 Google Cloud BigQuery
-
打开 OCI 数据科学会话,您已在其中为 OCI 数据流创建 Conda 环境。请参见先决条件点 2。
-
打开将数据流作为内核的新记事本。
-
为 OCI 数据流创建 Livy 会话并提供其他必需的信息,包括 Google Cloud BigQuery。
spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar spark.oracle.datasource.enabled : true使用 Livy 会话为 OCI 数据流创建 SparkSession 的示例代码:
import json command = { "compartmentId": "ocid1.compartment.oc1..xxxxxxx", "displayName": "Demo_BigQuery2ADW_v1", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 10, "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda", "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json", "spark.oracle.datasource.enabled":"true", "spark.hadoop.google.cloud.auth.service.account.enable":"true", "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar" } } command = f'\'{json.dumps(command)}\'' print("command",command) #enableHiveSupport() %create_session -l python -c $command -
导入所需的模块。
%%spark #Import required libraries. import json import os import sys import datetime import oci import google.cloud.bigquery as bigquery import google.cloud import pyspark.sql from pyspark.sql.functions import countDistinct -
阅读 Google Cloud BigQuery 表。
示例代码 1 :
%%spark # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery" #Number of rows : 340,311,544 #Total logical bytes : 587.14 GB df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load() print("Total Records Count bitcoin_blockchain.transactions : ",df.count())示例代码 2 :
%%spark #Read another BigQuery Table df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load() df_RetailPOS_15min.show() -
将数据加载到对象存储。
%%spark #Write in Object Storage df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions") -
使用 Wallet 密码将数据加载到 Autonomous Data Warehouse 中。
%%spark print("Set Parameters for ADW connectivity.") USERNAME = "admin" PASSWORD = "xxxxx" connectionId= "demolakehouseadw_medium" walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip" properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri} print("properties:",properties) %%spark #Load into ADW: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to ADW. df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save() print("Writing completed to ADW.....")
后续步骤
-
通过 OCI 数据流,您可以配置 OCI 数据科学记事本,以根据数据流以交互方式运行应用程序。观看教程视频,了解如何将数据科学与数据流 Studio 结合使用。
-
有关集成数据科学和数据流的详细信息,请参阅 Oracle Accelerated Data Science SDK 文档。
-
要立即开始使用,请注册 Oracle Cloud 免费试用或登录您的账户以尝试 OCI 数据流。试用数据流 15 分钟的无安装必备教程,了解 Oracle Cloud Infrastructure 可以轻松进行 Spark 处理。
相关链接
确认
作者 - Kumar Chandragupta(OCI 高级云工程师)
更多学习资源
探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark
F80029-01
April 2023
Copyright © 2023, Oracle and/or its affiliates.