Remarque :

Connexion, accès et analyse des données BigQuery de Google Cloud à partir d'OCI Data Flow à l'aide d'Apache Spark

Introduction

Etablir une connectivité multicloud avec OCI

Avec la popularité croissante d'Oracle Cloud Infrastructure, de nombreux clients souhaitent migrer vers OCI ou utiliser OCI comme solution multicloud. De même, de nombreux clients souhaitent accéder à d'autres plates-formes de données cloud à partir d'OCI et utiliser OCI pour le traitement/calcul lorsqu'ils traitent des solutions Big Data.

Objectif

Ce tutoriel montre comment connecter Google Cloud BigQuery à partir du bloc-notes Spark OCI Data Flow et effectuer une opération de lecture sur la table BigQuery à l'aide de Spark. Nous verrons également comment écrire la structure de données Spark résultante dans OCI Object Storage et Autonomous Data Warehouse.

Détails de la session de bloc-notes OCI Data Science

Solution

Cette solution tirera parti des fonctionnalités d'Apache Spark telles que le traitement parallèle et le calcul en mémoire. L'application OCI Data Flow peut également être programmée/orchestrée via OCI Data Integration Service. Dans cette approche, l'utilisateur peut développer son script Spark sur OCI Data Flow et son bloc-notes interactif qui exploite lui-même le cluster Spark OCI Data Flow. Les étapes générales sont les suivantes :

  1. Connectez-vous à Google Cloud Platform : Google Cloud BigQuery à l'aide d'Apache Spark BigQuery Connector.
  2. Développer une solution ETL complète.
  3. Extrayez les données de Google Cloud BigQuery.
  4. Transformez les données à l'aide du cluster Apache Spark sur OCI Data Flow.
  5. Assimilation des données dans OCI Object Storage ou Autonomous Data Warehouse.
  6. Utiliser le bloc-notes interactif Spark convivial du développeur.
  7. Intégrer tous les packages Spark open source pris en charge.
  8. Orchestrez votre script à l'aide du service OCI Data Integration.

Prérequis

  1. Abonnement OCI et Google Cloud actif avec accès au portail.

  2. Configurez OCI Data Flow, OCI Object Storage Bucket et OCI Data Science Notebook. Pour plus d'informations, voir :

  3. Créez et téléchargez l'OCID de clé secrète JSON de l'API Google pour le projet dans lequel la base de données BigQuery réside sur Google Cloud.

  4. Téléchargez l'OCID de clé secrète JSON de l'API Google vers OCI Object Storage.

    • Exemple de stockage d'objet OCI : oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Téléchargez le fichier JAR BigQuery Spark et téléchargez-le vers Object Storage.

  6. Collectez les paramètres suivants pour votre table Google Cloud BigQuery.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. Téléchargez le portefeuille Autonomous Data Warehouse à partir du portail OCI et conservez les détails de l'utilisateur/du mot de passe à portée de main.

Tâche 1 : accéder à Google Cloud BigQuery à l'aide du bloc-notes OCI Data Science avec OCI Data Flow

  1. Ouvrez la session OCI Data Science, où vous avez déjà créé un environnement Conda pour OCI Data Flow. Voir Prérequis Point 2.

  2. Ouvrez le nouveau bloc-notes avec Data Flow en tant que noyau.

  3. Créez une session Livy pour OCI Data Flow et fournissez d'autres informations requises, notamment Google Cloud BigQuery.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    Exemple de code permettant de créer SparkSession avec une session Livy pour OCI Data Flow :

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. Importez les modules requis.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Lisez la table BigQuery de Google Cloud.

    Exemple de code 1 :

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    Exemple de code 2 :

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. Chargez des données dans Object Storage.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. Chargez les données dans Autonomous Data Warehouse à l'aide du mot de passe de portefeuille.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

Etapes suivantes

Remerciements

Auteur - Kumar Chandragupta (ingénieur cloud senior OCI)

Ressources de formation supplémentaires

Explorez d'autres ateliers sur docs.oracle.com/learn ou accédez à davantage de contenu de formation gratuit sur le canal Oracle Learning YouTube. En outre, accédez à education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.

Pour consulter la documentation produit, consultez Oracle Help Center.