Observação:

Conecte, acesse e analise dados do Google Cloud BigQuery do OCI Data Flow usando o Apache Spark

Introdução

Estabeleça conectividade multicloud com a OCI

Com a crescente popularidade do Oracle Cloud Infrastructure, muitos clientes querem migrar para a OCI ou usar a OCI como sua solução multinuvem. Da mesma forma, muitos clientes desejam acessar outras plataformas de Dados na Nuvem da OCI e usar a OCI para processamento/computação ao lidar com soluções de Big Data.

Objetivo

Este tutorial demonstrará como conectar o Google Cloud BigQuery do OCI Data Flow Spark Notebook e executar alguma operação de leitura na tabela BigQuery usando o Spark. Também abordaremos como anotar o dataframe Spark resultante no OCI Object Storage e no Autonomous Data Warehouse.

Detalhes da Sessão de Notebook do OCI Data Science

Solução

Esta solução utilizará o recurso Apache Spark, como processamento paralelo e distribuído no cálculo da memória. O aplicativo OCI Data Flow também pode ser programado/orquestrado por meio do OCI Data Integration Service. Nesta abordagem, o usuário pode desenvolver seu Script Spark no OCI Data Flow e no Notebook Interativo que ele próprio aproveita o cluster Spark do OCI Data Flow. As etapas de alto nível são:

  1. Conecte-se com o Google Cloud Platform: Google Cloud BigQuery usando o Apache Spark BigQuery Connector.
  2. Desenvolva uma Solução ETL completa.
  3. Extraia Dados do Google Cloud BigQuery.
  4. Transforme os dados usando o Cluster Apache Spark no OCI Data Flow.
  5. Ingestão de dados no OCI Object Storage ou Autonomous Data Warehouse.
  6. Use o Notebook Interativo do Spark amigável ao Desenvolvedor.
  7. Integre quaisquer pacotes Spark de código-fonte aberto suportados.
  8. Orquestre seu script usando o OCI Data Integration Service.

Pré-requisitos

  1. Uma assinatura ativa do OCI e do Google Cloud com acesso ao portal.

  2. Configure o OCI Data Flow, o OCI Object Storage Bucket e o OCI Data Science Notebook. Para obter mais informações, consulte:

  3. Crie e faça download do OCID do segredo da Chave JSON da API do Google para o projeto em que o banco de dados BigQuery reside no Google Cloud.

  4. Faça upload do OCID do segredo da Chave JSON da API do Google para o OCI Object Storage.

    • Armazenamento de Objetos do OCI de Amostra: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Faça download do Spark BigQuery Jar e faça upload dele no Object Storage.

  6. Colete os seguintes parâmetros para sua tabela do Google Cloud BigQuery.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. Faça download da Wallet do Autonomous Data Warehouse no Portal do OCI e mantenha os detalhes do Usuário/Senha acessíveis.

Tarefa 1: Acessar o Google Cloud BigQuery Usando o Notebook do OCI Data Science com o OCI Data Flow

  1. Abra a Sessão do OCI Data Science, na qual você já criou o ambiente Conda para o OCI Data Flow. Consulte o Ponto 2 do Pré-requisito.

  2. Abra o Novo Notebook com o Data Flow como Kernel.

  3. Crie uma sessão Livy para o OCI Data Flow e forneça outras informações necessárias, incluindo o Google Cloud BigQuery.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Código de Amostra para criar SparkSession com Sessão Livy para o OCI Data Flow:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. Importe os módulos obrigatórios.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Leia a tabela BigQuery do Google Cloud.

    Código de Amostra 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    Código de Amostra 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. Carregue dados no Object Storage.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Carregue dados no Autonomous Data Warehouse usando a senha da Wallet.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Próximas Etapas

Aquisições

Autor - Kumar Chandragupta (Engenheiro de Nuvem Sênior do OCI)

Mais Recursos de Aprendizagem

Explore outros laboratórios no site docs.oracle.com/learn ou acesse mais conteúdo de aprendizado gratuito no canal YouTube do Oracle Learning. Além disso, visite education.oracle.com/learning-explorer para se tornar um Oracle Learning Explorer.

Para obter a documentação do produto, visite o Oracle Help Center.