注:

使用 Apache Spark 从 OCI 数据流连接、访问和分析 Google Cloud BigQuery 数据

简介

与 OCI 建立多云连接

随着 Oracle Cloud Infrastructure 的日益普及,许多客户希望迁移到 OCI 或将 OCI 用作多云解决方案。同样,许多客户希望从 OCI 访问其他云数据平台,并在处理大数据解决方案时使用 OCI 进行处理/计算。

目标

本教程将演示如何从 OCI 数据流 Spark Notebook 连接 Google Cloud BigQuery 以及如何使用 Spark 对 BigQuery 表执行一些读取操作。此外,我们还将介绍如何将结果的 Spark 数据框架写入 OCI 对象存储和 Autonomous Data Warehouse。

OCI 数据科学记事本会话详细信息

解决方案

此解决方案将利用并行处理和分布在内存计算中的 Apache Spark 功能。OCI 数据流应用程序也可以通过 OCI 数据集成服务进行调度/编排。在此方法中,用户可以在 OCI 数据流和交互式记事本上开发 Spark 脚本,该脚本本身利用 OCI 数据流 Spark 集群。高级别步骤包括:

  1. 使用 Apache Spark BigQuery Connector 与 Google 云平台连接:Google Cloud BigQuery。
  2. 开发完整的 ETL 解决方案。
  3. 从 Google Cloud BigQuery 提取数据。
  4. 使用 OCI 数据流上的 Apache Spark 集群转换数据。
  5. 在 OCI 对象存储或 Autonomous Data Warehouse 中摄取数据。
  6. 使用开发人员友好的交互式 Spark 笔记本。
  7. 集成任何受支持的开源 Spark 软件包。
  8. 使用 OCI 数据集成服务编排脚本。

先决条件

  1. 可访问门户的主动 OCI 和 Google 云订阅。

  2. 设置 OCI 数据流、OCI 对象存储桶和 OCI 数据科学手册。有关详细信息,请参阅:

  3. 为 BigQuery 数据库驻留在 Google Cloud 上的项目创建和下载 Google API JSON 密钥 OCID

  4. Google API JSON 密钥密钥 OCID 上载到 OCI 对象存储。

    • OCI 对象存储样例:oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. 下载 Spark BigQuery Jar 并将其上载到对象存储。

  6. 为您的 Google Cloud BigQuery 表收集以下参数。

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. 从 OCI 门户下载 Autonomous Data Warehouse Wallet,并保持用户/密码详细信息方便。

任务 1:使用 OCI 数据科学手册与 OCI 数据流访问 Google Cloud BigQuery

  1. 打开 OCI 数据科学会话,您已在其中为 OCI 数据流创建 Conda 环境。请参见先决条件点 2。

  2. 打开将数据流作为内核的新记事本。

  3. 为 OCI 数据流创建 Livy 会话并提供其他必需的信息,包括 Google Cloud BigQuery。

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    使用 Livy 会话为 OCI 数据流创建 SparkSession 的示例代码

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. 导入所需的模块。

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. 阅读 Google Cloud BigQuery 表。

    示例代码 1

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    示例代码 2

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. 将数据加载到对象存储。

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. 使用 Wallet 密码将数据加载到 Autonomous Data Warehouse 中。

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

后续步骤

确认

作者 - Kumar Chandragupta(OCI 高级云工程师)

更多学习资源

探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心