Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark
Introduction
Establish multicloud connectivity with OCI
With the rapidly growing adoption of Oracle Cloud Infrastructure (OCI), many customers want to migrate to OCI or use it as part of their multicloud strategy. Likewise, many customers need to access data platforms in other clouds from OCI and use OCI compute for processing when building big data solutions.
Objective
This tutorial demonstrates how to connect to Google Cloud BigQuery from an OCI Data Flow Spark notebook and read a BigQuery table using Spark. It also covers how to write the resulting Spark DataFrame to OCI Object Storage and Oracle Autonomous Data Warehouse.
Solution
This solution leverages Apache Spark capabilities such as parallel processing and distributed in-memory computation. The OCI Data Flow application can also be scheduled and orchestrated through the OCI Data Integration service. In this approach, you develop your Spark script in an interactive notebook that itself runs on an OCI Data Flow Spark cluster. The high-level steps are:
- Connect to Google Cloud BigQuery using the Apache Spark BigQuery connector.
- Develop a complete ETL solution.
- Extract data from Google Cloud BigQuery.
- Transform the data using an Apache Spark cluster on OCI Data Flow.
- Ingest the data into OCI Object Storage or Autonomous Data Warehouse.
- Use a developer-friendly interactive Spark notebook.
- Integrate any supported open source Spark packages.
- Orchestrate your script using the OCI Data Integration service.
Prerequisites
- An active OCI subscription and a Google Cloud subscription, with access to both portals.
- Set up OCI Data Flow, an OCI Object Storage bucket, and an OCI Data Science notebook session. For more information, see:
- Create and download the Google API JSON key file for the Google Cloud project where the BigQuery database resides.
- Upload the Google API JSON key file to OCI Object Storage (a sample upload snippet is shown after this list).
  - Sample OCI Object Storage path: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
- Download the Spark BigQuery connector JAR and upload it to OCI Object Storage (see the sample upload snippet after this list).
  - Sample: spark-bigquery-with-dependencies_2.12-0.23.2.jar
- Collect the following parameters for your Google Cloud BigQuery table.

  'project' : 'bigquery-public-data'
  'parentProject' : 'core-invention-366213'
  'table' : 'bitcoin_blockchain.transactions'
  "credentialsFile" : "./ocigcp_user_creds.json"
- Download the Autonomous Data Warehouse wallet from the OCI Console and keep the user name and password details handy.
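As a convenience, the following is a minimal sketch (not part of the original tutorial) showing one way to upload the credentials file and the connector JAR to Object Storage with the OCI Python SDK. The bucket name (demo-bucketname), object paths, and local file names are the sample values used above; substitute your own.

import oci

# Load the default OCI config (~/.oci/config) and create an Object Storage client.
config = oci.config.from_file()
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data

# Sample bucket/object names from this tutorial; replace them with your own values.
uploads = [
    ("./ocigcp_user_creds.json", "gcp_utility/BigQuery/ocigcp_user_creds.json"),
    ("./spark-bigquery-with-dependencies_2.12-0.23.2.jar",
     "gcp_utility/BigQuery/spark-bigquery-with-dependencies_2.12-0.23.2.jar"),
]
for local_path, object_name in uploads:
    with open(local_path, "rb") as f:
        object_storage.put_object(namespace, "demo-bucketname", object_name, f)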
Task 1: Access Google Cloud BigQuery Using OCI Data Science Notebook with OCI Data Flow
- Open the OCI Data Science notebook session where you have already created the conda environment for OCI Data Flow. See prerequisite 2.
- Open a new notebook with Data Flow as the kernel.
- Create a Livy session for OCI Data Flow and provide the other required information, including the Google Cloud credentials file and the Spark BigQuery connector JAR.

spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
spark.oracle.datasource.enabled : true
Sample code to create a SparkSession with a Livy session for OCI Data Flow:
import json

command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "numExecutors": 10,
    "configuration": {
        "spark.archives": "oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
        "spark.files": "oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
        "spark.oracle.datasource.enabled": "true",
        "spark.hadoop.google.cloud.auth.service.account.enable": "true",
        "spark.jars": "oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
}
command = f'\'{json.dumps(command)}\''
print("command", command)

#enableHiveSupport()
%create_session -l python -c $command
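As an optional sanity check (not part of the original tutorial), you can run a trivial cell against the remote session to confirm that it started:

%%spark
# Confirm the remote Data Flow Spark session is live by printing its version.
print("Spark version:", spark.version)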
- Import the required modules.

%%spark
# Import required libraries.
import json
import os
import sys
import datetime
import oci
import google.cloud.bigquery as bigquery
import google.cloud
import pyspark.sql
from pyspark.sql.functions import countDistinct
- Read the Google Cloud BigQuery table.

Sample Code 1:

%%spark
# Read from BigQuery table "bitcoin_blockchain.transactions" at the source ("BigQuery").
# Number of rows: 340,311,544
# Total logical bytes: 587.14 GB
df_bitcoin_blockchain = spark.read.format('bigquery') \
    .option('project', 'bigquery-public-data') \
    .option('parentProject', 'core-invention-366213') \
    .option('credentialsFile', '/opt/spark/work-dir/ocigcp_user_creds.json') \
    .option('table', 'bitcoin_blockchain.transactions') \
    .load()

print("Total Records Count bitcoin_blockchain.transactions : ", df_bitcoin_blockchain.count())
Sample Code 2:
%%spark
# Read another BigQuery table.
df_RetailPOS_15min = spark.read.format('bigquery') \
    .option('project', 'core-invention-366213') \
    .option('parentProject', 'core-invention-366213') \
    .option('credentialsFile', '/opt/spark/work-dir/ocigcp_user_creds.json') \
    .option('table', 'Retail_Channel.RetailPOS_15min') \
    .load()

df_RetailPOS_15min.show()
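The high-level steps above include a transform stage before loading. The following cell is a minimal sketch of such a transformation and is not part of the original tutorial; the column name transaction_id is hypothetical, so replace it with a column that exists in your table.

%%spark
# Example transformation: count the distinct values of one column.
# "transaction_id" is a hypothetical column name used only for illustration.
df_transformed = df_bitcoin_blockchain.agg(
    countDistinct("transaction_id").alias("distinct_transactions")
)
df_transformed.show()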
- Load the data into OCI Object Storage.

%%spark
# Write the DataFrame to Object Storage as JSON.
df_RetailPOS_15min.write \
    .format("json") \
    .mode("overwrite") \
    .save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
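Optionally, you can verify the write by reading the JSON output back from Object Storage. This check is not in the original tutorial and reuses the same sample path:

%%spark
# Read the JSON output back from Object Storage and confirm the row count.
df_check = spark.read.format("json").load("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
print("Rows written to Object Storage:", df_check.count())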
- Load the data into Autonomous Data Warehouse using the wallet and password.
%%spark print("Set Parameters for ADW connectivity.") USERNAME = "admin" PASSWORD = "xxxxx" connectionId= "demolakehouseadw_medium" walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip" properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri} print("properties:",properties) %%spark #Load into ADW: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to ADW. df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save() print("Writing completed to ADW.....")
Next Steps
- With OCI Data Flow, you can configure OCI Data Science notebooks to run applications interactively against Data Flow. Watch the tutorial video on using Data Science with Data Flow Studio.
- For more information about integrating Data Science and Data Flow, see the Oracle Accelerated Data Science SDK documentation.
- More examples are available from GitHub with Data Flow samples and Data Science samples.
- To get started today, sign up for the Oracle Cloud Free Trial or sign in to your account to try OCI Data Flow. Try Data Flow’s 15-minute no-installation-required tutorial to see just how easy Spark processing can be with Oracle Cloud Infrastructure.
Related links
Acknowledgments
Author - Kumar Chandragupta (OCI Senior Cloud Engineer)
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark
F80029-01
April 2023
Copyright © 2023, Oracle and/or its affiliates.