ノート:
- このチュートリアルではOracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructure資格証明、テナンシおよびコンパートメントの値の例を使用します。演習を完了する場合は、これらの値をクラウド環境に固有の値に置き換えてください。
Apache Sparkを使用したOCIデータ・フローからのGoogle CloudのBigQueryデータの接続、アクセスおよび分析
イントロダクション
OCIでマルチクラウド接続を確立
Oracle Cloud Infrastructureの人気が急速に拡大する中、多くの顧客はOCIに移行するか、OCIをマルチクラウド・ソリューションとして使用したいと考えています。同様に、多くのお客様は、OCIから他のクラウド・データ・プラットフォームにアクセスし、ビッグ・データ・ソリューションを処理する際の処理/計算にOCIを使用することを希望しています。
目標
このチュートリアルでは、OCIデータ・フローSparkノートブックからGoogle Cloud BigQueryを接続し、Sparkを使用してBigQuery表でいくつかの読取り操作を実行する方法を示します。結果のSparkデータフレームをOCI Object StorageおよびAutonomous Data Warehouseに書き込む方法についても説明します。
ソリューション
このソリューションは、並列処理やメモリー計算での分散などのApache Spark機能を利用します。OCIデータ・フロー・アプリケーションは、OCIデータ統合サービスを介してスケジュール/調整することもできます。このアプローチでは、ユーザーはOCIデータ・フローおよびそれ自体がOCIデータ・フローSparkクラスタを利用する対話型ノートブックでSparkスクリプトを開発できます。上位レベルのステップは次のとおりです。
- Google Cloud Platform: Apache Spark BigQuery Connectorを使用してGoogle Cloud BigQueryに接続します。
- 完全なETLソリューションを開発します。
- Google Cloud BigQueryからデータを抽出します。
- OCIデータ・フローでApache Sparkクラスタを使用してデータを変換します。
- OCI Object StorageまたはAutonomous Data Warehouseにデータを取り込みます。
- Developerの使いやすい対話型Sparkノートブックを使用します。
- サポートされている任意のオープン・ソースSparkパッケージを統合します。
- OCI Data Integration Serviceを使用してスクリプトを調整します。
前提条件
-
ポータルにアクセスできるアクティブなOCIおよびGoogle Cloudサブスクリプション。
-
OCIデータ・フロー、OCIオブジェクト・ストレージ・バケットおよびOCIデータ・サイエンス・ノートブックを設定します。詳細は次を参照してください。
-
BigQueryデータベースがGoogle Cloudに存在するプロジェクトのGoogle API JSONキー・シークレットOCIDを作成してダウンロードします。
-
Google API JSONキー・シークレットOCIDをOCI Object Storageにアップロードします。
- サンプルのOCIオブジェクト・ストレージ:
OCI://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
- サンプルのOCIオブジェクト・ストレージ:
-
Spark BigQuery Jarをダウンロードし、オブジェクト・ストレージにアップロードします。
-
サンプル:
spark-bigquery-with-dependencies_2.12-0.23.2.jar
-
-
Google Cloud BigQuery表の次のパラメータを収集します。
'project' : 'bigquery-public-data' 'parentProject' : 'core-invention-366213' 'table' : 'bitcoin_blockchain.transactions' "credentialsFile" : "./ocigcp_user_creds.json"
-
OCIポータルからAutonomous Data Warehouse Walletをダウンロードし、ユーザー/パスワードの詳細を簡単に保持します。
タスク1: OCIデータ・フローでのOCIデータ・サイエンス・ノートブックを使用したGoogle Cloud BigQueryへのアクセス
-
OCIデータ・サイエンス・セッションを開きます。このセッションでは、OCIデータ・フロー用のConda環境がすでに作成されています。前提条件ポイント2を参照してください。
-
データ・フローがカーネルである新規ノートブックを開きます。
-
OCIデータ・フローのLivyセッションを作成し、Google Cloud BigQueryなどの他の必要な情報を指定します。
spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar spark.oracle.datasource.enabled : true
OCIデータ・フローのLivyセッションを使用してSparkSessionを作成するためのサンプル・コード:
import json command = { "compartmentId": "ocid1.compartment.oc1..xxxxxxx", "displayName": "Demo_BigQuery2ADW_v1", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 10, "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda", "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json", "spark.oracle.datasource.enabled":"true", "spark.hadoop.google.cloud.auth.service.account.enable":"true", "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar" } } command = f'\'{json.dumps(command)}\'' print("command",command) #enableHiveSupport() %create_session -l python -c $command
-
必要なモジュールをインポートします。
%%spark #Import required libraries. import json import os import sys import datetime import oci import google.cloud.bigquery as bigquery import google.cloud import pyspark.sql from pyspark.sql.functions import countDistinct
-
Google CloudのBigQuery表を参照してください。
サンプル・コード 1:
%%spark # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery" #Number of rows : 340,311,544 #Total logical bytes : 587.14 GB df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load() print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
サンプル・コード 2:
%%spark #Read another BigQuery Table df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load() df_RetailPOS_15min.show()
-
データをObject Storageにロードします。
%%spark #Write in Object Storage df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
-
Walletパスワードを使用してAutonomous Data Warehouseにデータをロードします。
%%spark print("Set Parameters for ADW connectivity.") USERNAME = "admin" PASSWORD = "xxxxx" connectionId= "demolakehouseadw_medium" walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip" properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri} print("properties:",properties) %%spark #Load into ADW: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to ADW. df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save() print("Writing completed to ADW.....")
次のステップ
-
OCIデータ・フローを使用すると、データ・フローに対して対話形式でアプリケーションを実行するようにOCIデータ・サイエンス・ノートブックを構成できます。Data Flow Studioでのデータ・サイエンスの使用に関するチュートリアル・ビデオをご覧ください。
-
データ・サイエンスとデータ・フローの統合の詳細は、Oracle Accelerated Data Science SDKドキュメンテーションを参照してください。
-
その他の例は、GitHubからデータ・フロー・サンプルおよびデータ・サイエンス・サンプルとともに使用できます。
-
今日から開始するには、Oracle Cloud無料トライアルにサインアップするか、アカウントにサインインしてOCIデータ・フローを試してください。データ・フローのインストール不要の15分間のチュートリアルを試して、Oracle Cloud InfrastructureでSparkの処理がどの程度簡単であるかを確認してください。
関連リンク
謝辞
作成者 - Kumar Chandragupta (OCIシニア・クラウド・エンジニア)
その他の学習リソース
docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。
製品ドキュメントについては、Oracle Help Centerを参照してください。
Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark
F80029-01
April 2023
Copyright © 2023, Oracle and/or its affiliates.