ノート:

Apache Sparkを使用したOCIデータ・フローからのGoogle CloudのBigQueryデータの接続、アクセスおよび分析

イントロダクション

OCIでマルチクラウド接続を確立

Oracle Cloud Infrastructureの人気が急速に拡大する中、多くの顧客はOCIに移行するか、OCIをマルチクラウド・ソリューションとして使用したいと考えています。同様に、多くのお客様は、OCIから他のクラウド・データ・プラットフォームにアクセスし、ビッグ・データ・ソリューションを処理する際の処理/計算にOCIを使用することを希望しています。

目標

このチュートリアルでは、OCIデータ・フローSparkノートブックからGoogle Cloud BigQueryを接続し、Sparkを使用してBigQuery表でいくつかの読取り操作を実行する方法を示します。結果のSparkデータフレームをOCI Object StorageおよびAutonomous Data Warehouseに書き込む方法についても説明します。

OCIデータ・サイエンス・ノートブック・セッション詳細

ソリューション

このソリューションは、並列処理やメモリー計算での分散などのApache Spark機能を利用します。OCIデータ・フロー・アプリケーションは、OCIデータ統合サービスを介してスケジュール/調整することもできます。このアプローチでは、ユーザーはOCIデータ・フローおよびそれ自体がOCIデータ・フローSparkクラスタを利用する対話型ノートブックでSparkスクリプトを開発できます。上位レベルのステップは次のとおりです。

  1. Google Cloud Platform: Apache Spark BigQuery Connectorを使用してGoogle Cloud BigQueryに接続します。
  2. 完全なETLソリューションを開発します。
  3. Google Cloud BigQueryからデータを抽出します。
  4. OCIデータ・フローでApache Sparkクラスタを使用してデータを変換します。
  5. OCI Object StorageまたはAutonomous Data Warehouseにデータを取り込みます。
  6. Developerの使いやすい対話型Sparkノートブックを使用します。
  7. サポートされている任意のオープン・ソースSparkパッケージを統合します。
  8. OCI Data Integration Serviceを使用してスクリプトを調整します。

前提条件

  1. ポータルにアクセスできるアクティブなOCIおよびGoogle Cloudサブスクリプション。

  2. OCIデータ・フロー、OCIオブジェクト・ストレージ・バケットおよびOCIデータ・サイエンス・ノートブックを設定します。詳細は次を参照してください。

  3. BigQueryデータベースがGoogle Cloudに存在するプロジェクトのGoogle API JSONキー・シークレットOCIDを作成してダウンロードします。

  4. Google API JSONキー・シークレットOCIDをOCI Object Storageにアップロードします。

    • サンプルのOCIオブジェクト・ストレージ: OCI://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Spark BigQuery Jarをダウンロードし、オブジェクト・ストレージにアップロードします。

  6. Google Cloud BigQuery表の次のパラメータを収集します。

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. OCIポータルからAutonomous Data Warehouse Walletをダウンロードし、ユーザー/パスワードの詳細を簡単に保持します。

タスク1: OCIデータ・フローでのOCIデータ・サイエンス・ノートブックを使用したGoogle Cloud BigQueryへのアクセス

  1. OCIデータ・サイエンス・セッションを開きます。このセッションでは、OCIデータ・フロー用のConda環境がすでに作成されています。前提条件ポイント2を参照してください。

  2. データ・フローがカーネルである新規ノートブックを開きます。

  3. OCIデータ・フローのLivyセッションを作成し、Google Cloud BigQueryなどの他の必要な情報を指定します。

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    OCIデータ・フローのLivyセッションを使用してSparkSessionを作成するためのサンプル・コード:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. 必要なモジュールをインポートします。

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Google CloudのBigQuery表を参照してください。

    サンプル・コード 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    サンプル・コード 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. データをObject Storageにロードします。

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Walletパスワードを使用してAutonomous Data Warehouseにデータをロードします。

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

次のステップ

謝辞

作成者 - Kumar Chandragupta (OCIシニア・クラウド・エンジニア)

その他の学習リソース

docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。

製品ドキュメントについては、Oracle Help Centerを参照してください。