Nota:

Conecte, acceda y analice los datos de Google Cloud BigQuery de OCI Data Flow mediante Apache Spark

Introducción

Establezca conectividad multinube con OCI

Con la creciente popularidad de Oracle Cloud Infrastructure, muchos clientes desean migrar a OCI o utilizar OCI como su solución multinube. Asimismo, muchos clientes desean acceder a otras plataformas de datos en la nube desde OCI y utilizar OCI para el procesamiento y la computación al tratar con soluciones de big data.

Objetivo

En este tutorial se mostrará cómo conectar Google Cloud BigQuery desde OCI Data Flow Spark Notebook y realizar alguna operación de lectura en la tabla BigQuery mediante Spark. También trataremos cómo anotar el marco de datos de Spark resultante en OCI Object Storage y Autonomous Data Warehouse.

Detalles de sesión de bloc de notas de OCI Data Science

Solución

Esta solución utilizará la capacidad de Apache Spark como el procesamiento paralelo y se distribuirá en el cálculo de memoria. La aplicación OCI Data Flow también se puede programar/orquestar mediante OCI Data Integration Service. En este enfoque, el usuario puede desarrollar su script de Spark en OCI Data Flow y el bloc de notas interactivo, que utiliza el cluster de Spark de OCI Data Flow. Los pasos de nivel superior son:

  1. Conéctese con Google Cloud Platform: Google Cloud BigQuery con el conector BigQuery de Apache Spark.
  2. Desarrolle una solución ETL completa.
  3. Extraiga datos de Google Cloud BigQuery.
  4. Transforme los datos mediante el cluster de Apache Spark en OCI Data Flow.
  5. Ingiera datos en OCI Object Storage o Autonomous Data Warehouse.
  6. Utilice el cuaderno interactivo de Spark fácil de recordar del desarrollador.
  7. Integre los paquetes de Spark de código abierto soportados.
  8. Organice el script con OCI Data Integration Service.

Requisitos

  1. Una suscripción activa a OCI y Google Cloud con acceso al portal.

  2. Configurar OCI Data Flow, OCI Object Storage Bucket y OCI Data Science Notebook. Para obtener más información, consulte:

  3. Cree y descargue OCID secreto de clave JSON de API de Google para el proyecto en el que reside la base de datos BigQuery en Google Cloud.

  4. Cargue el OCID secreto de clave JSON de API de Google en OCI Object Storage.

    • Almacenamiento de objetos de OCI de ejemplo: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Descargue el archivo Jar de Spark BigQuery y cárguelo en Object Storage.

  6. Recopile los siguientes parámetros para la tabla BigQuery de Google Cloud.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. Descargue la cartera de Autonomous Data Warehouse desde el portal de OCI y mantenga a mano los detalles de usuario/contraseña.

Tarea 1: Acceso a Google Cloud BigQuery mediante el bloc de notas de OCI Data Science con OCI Data Flow

  1. Abra la sesión de OCI Data Science, donde ya ha creado el entorno Conda para OCI Data Flow. Consulte el punto de requisito previo 2.

  2. Abra el nuevo bloc de notas con Data Flow como núcleo.

  3. Cree una sesión de Livy para OCI Data Flow y proporcione otra información necesaria, incluida Google Cloud BigQuery.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Código de ejemplo para crear SparkSession con sesión privada para OCI Data Flow:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. Importe los módulos necesarios.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Lee la tabla BigQuery de Google Cloud.

    Código de ejemplo 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    Código de ejemplo 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. Cargue datos en el almacenamiento de objetos.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Cargue datos en Autonomous Data Warehouse mediante la contraseña de cartera.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Pasos Siguientes

Agradecimientos

Autor: Kumar Chandragupta (ingeniero sénior de la nube de OCI)

Más recursos de aprendizaje

Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de aprendizaje gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en un explorador de Oracle Learning.

Para obtener documentación sobre los productos, visite Oracle Help Center.