Connect, access and analyze Google Cloud BigQuery data from OCI Data Flow using Apache Spark

Introduction

Establish multicloud connectivity with OCI

With the rapidly growing popularity of Oracle Cloud Infrastructure (OCI), many customers want to migrate to OCI or use it as their multicloud solution. Likewise, many customers want to access other cloud data platforms from OCI and use OCI for processing and compute when building big data solutions.

Objective

This tutorial demonstrates how to connect to Google Cloud BigQuery from an OCI Data Flow Spark notebook and perform read operations on a BigQuery table using Spark. It also covers how to write the resulting Spark DataFrame to OCI Object Storage and Autonomous Data Warehouse.

OCI Data Science Notebook Session Details

Solution

This solution leverages Apache Spark capabilities such as parallel processing and distributed in-memory computation. An OCI Data Flow application can also be scheduled and orchestrated through the OCI Data Integration service. In this approach, you develop your Spark script in an interactive notebook that itself runs on an OCI Data Flow Spark cluster. The high-level steps are listed below, followed by a minimal code sketch of the flow:

  1. Connect to Google Cloud BigQuery using the Apache Spark BigQuery connector.
  2. Develop a complete ETL solution.
  3. Extract data from Google Cloud BigQuery.
  4. Transform the data using an Apache Spark cluster on OCI Data Flow.
  5. Ingest the data into OCI Object Storage or Autonomous Data Warehouse.
  6. Use a developer-friendly interactive Spark notebook.
  7. Integrate any supported open source Spark packages.
  8. Orchestrate the script using the OCI Data Integration service.
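
The end-to-end flow can be summarized in a few lines of PySpark. The snippet below is a minimal sketch only: the project ID, table name, credentials file, and bucket path are placeholders, and the full, runnable version is built step by step in Task 1.

    # Minimal end-to-end sketch (placeholders only; see Task 1 for the working version).
    df = (
        spark.read.format("bigquery")
        .option("parentProject", "<gcp-project-id>")
        .option("credentialsFile", "/opt/spark/work-dir/<gcp-key>.json")
        .option("table", "<dataset>.<table>")
        .load()
    )

    df_transformed = df.limit(1000)  # any Spark transformation goes here

    df_transformed.write.format("json").mode("overwrite").save("oci://<bucket>@<namespace>/TargetData/")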

Prerequisites

  1. Active OCI and Google Cloud subscriptions with access to both portals.

  2. Set up OCI Data Flow, an OCI Object Storage bucket, and an OCI Data Science notebook session. For more information, see:

  3. Create and download the Google API JSON key for the Google Cloud project where the BigQuery database resides.

  4. Upload the Google API JSON key file to OCI Object Storage (a Python upload sketch using the OCI SDK follows this list).

    • Sample OCI Object Storage path: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Download the Spark BigQuery connector JAR and upload it to OCI Object Storage.

  6. Collect the following parameters for your Google Cloud BigQuery table:

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    'credentialsFile' : './ocigcp_user_creds.json'
    
  7. Download the Autonomous Data Warehouse wallet from the OCI Console and keep the database user name and password handy.
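
If you prefer to script the uploads in prerequisites 4 and 5, the OCI Python SDK can push both files to Object Storage. The following is a minimal sketch: the namespace, bucket name, and local file names mirror the sample path above and must be replaced with values from your own tenancy.

    # Sketch: upload the Google API JSON key and the Spark BigQuery JAR to OCI Object Storage.
    # "OSnamespace" and "demo-bucketname" are the placeholders from the sample path above.
    import oci

    config = oci.config.from_file()  # reads ~/.oci/config
    object_storage = oci.object_storage.ObjectStorageClient(config)

    uploads = [
        ("./ocigcp_user_creds.json",
         "gcp_utility/BigQuery/ocigcp_user_creds.json"),
        ("./spark-bigquery-with-dependencies_2.12-0.23.2.jar",
         "gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"),
    ]
    for local_path, object_name in uploads:
        with open(local_path, "rb") as f:
            object_storage.put_object("OSnamespace", "demo-bucketname", object_name, f)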

Task 1: Access Google Cloud BigQuery Using OCI Data Science Notebook with OCI Data Flow

  1. Open the OCI Data Science notebook session where you have already created the conda environment for OCI Data Flow (see Prerequisite 2).

  2. Open a new notebook with Data Flow as the kernel.

  3. Create a Livy session for OCI Data Flow and provide the required configuration, including the Google Cloud BigQuery credentials file and connector JAR:

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Sample code to create a SparkSession with a Livy session for OCI Data Flow:

    import json

    command = {
        "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
        "displayName": "Demo_BigQuery2ADW_v1",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "numExecutors": 10,
        "configuration": {
            "spark.archives": "oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
            "spark.files": "oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
            "spark.oracle.datasource.enabled": "true",
            "spark.hadoop.google.cloud.auth.service.account.enable": "true",
            "spark.jars": "oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
        }
    }

    # Wrap the JSON payload in single quotes so it can be passed to the %create_session magic.
    command = f"'{json.dumps(command)}'"
    print("command", command)

    %create_session -l python -c $command
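
    Once the session is created, a quick sanity check in a %%spark cell confirms that the remote Data Flow Spark context is live (a minimal sketch):

    %%spark
    # Confirm the remote Data Flow Spark session is up.
    print("Spark version:", spark.version)
    print("Default parallelism:", spark.sparkContext.defaultParallelism)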
    
    
  4. Import the required modules.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Read Google Cloud BigQuery table.

    Sample Code 1:

    
    %%spark

    # Read from BigQuery at the source: "bitcoin_blockchain.transactions"
    # Number of rows : 340,311,544
    # Total logical bytes : 587.14 GB

    df_bitcoin_blockchain = (
        spark.read.format('bigquery')
        .option('project', 'bigquery-public-data')
        .option('parentProject', 'core-invention-366213')
        .option('credentialsFile', '/opt/spark/work-dir/ocigcp_user_creds.json')
        .option('table', 'bitcoin_blockchain.transactions')
        .load()
    )

    print("Total Records Count bitcoin_blockchain.transactions : ", df_bitcoin_blockchain.count())
    

    Sample Code 2:

    %%spark
    # Read another BigQuery table.
    df_RetailPOS_15min = (
        spark.read.format('bigquery')
        .option('project', 'core-invention-366213')
        .option('parentProject', 'core-invention-366213')
        .option('credentialsFile', '/opt/spark/work-dir/ocigcp_user_creds.json')
        .option('table', 'Retail_Channel.RetailPOS_15min')
        .load()
    )
    df_RetailPOS_15min.show()
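
    The solution also calls for a transform step on the OCI Data Flow cluster before loading. The cell below is an illustrative sketch: it reuses the countDistinct import from step 4, and store_id is a hypothetical column name that should be replaced with a real column from your table's schema.

    %%spark
    # Inspect the schema, then run an illustrative aggregation.
    df_RetailPOS_15min.printSchema()

    # "store_id" is a hypothetical column; substitute a real column from the schema above.
    df_RetailPOS_15min.select(countDistinct("store_id").alias("distinct_stores")).show()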
    
  6. Load data into Object Storage.

    %%spark
    # Write the DataFrame to Object Storage as JSON.

    df_RetailPOS_15min.write.format("json").mode("overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
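
    Optionally, read the files back from Object Storage to verify the write (a minimal sketch that reuses the same target path):

    %%spark
    # Read the JSON files back from Object Storage to verify the write.
    df_check = spark.read.json("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    print("Rows written to Object Storage:", df_check.count())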
    
    
  7. Load the data into Autonomous Data Warehouse using the wallet and database credentials.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Next Steps

Acknowledgments

Author - Kumar Chandragupta (OCI Senior Cloud Engineer)

More Learning Resources

Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.

For product documentation, visit Oracle Help Center.