참고:

Apache Spark를 사용하여 OCI Data Flow에서 Google Cloud BigQuery 데이터 연결, 액세스 및 분석

소개

OCI를 통한 멀티클라우드 연결 설정

Oracle Cloud Infrastructure가 급속히 인기가 높아짐에 따라 많은 고객이 OCI로 마이그레이션하거나 OCI를 멀티클라우드 솔루션으로 사용하고자 합니다. 마찬가지로 많은 고객이 OCI의 다른 클라우드 데이터 플랫폼에 액세스하고 빅데이터 솔루션을 처리할 때 처리/계산에 OCI를 사용하고자 합니다.

목표

이 사용지침서에서는 OCI 데이터 플로우 Spark 노트북에서 Google Cloud BigQuery을 연결하고 Spark를 사용하여 BigQuery 테이블에서 몇 가지 읽기 작업을 수행하는 방법을 보여줍니다. 또한 결과 Spark 데이터 프레임을 OCI Object Storage 및 Autonomous Data Warehouse에 기록하는 방법을 다룹니다.

OCI 데이터 과학 노트북 세션 세부정보

솔루션

이 솔루션은 병렬 처리 및 메모리 내 분산 계산과 같은 Apache Spark 기능을 활용합니다. OCI 데이터 플로우 애플리케이션은 OCI 데이터 통합 서비스를 통해 일정을 잡거나 조정할 수도 있습니다. 이 접근 방식에서 사용자는 OCI Data Flow 및 Interactive Notebook에서 OCI Data Flow Spark 클러스터를 활용하는 Spark 스크립트를 개발할 수 있습니다. 상위 레벨 단계는 다음과 같습니다.

  1. Apache Spark BigQuery Connector를 사용하여 Google Cloud Platform: Google Cloud BigQuery에 연결합니다.
  2. 완벽한 ETL 솔루션 개발
  3. Google Cloud BigQuery에서 데이터를 추출합니다.
  4. OCI 데이터 플로우에서 Apache Spark 클러스터를 사용하여 데이터를 변환합니다.
  5. OCI Object Storage 또는 Autonomous Data Warehouse에서 데이터를 수집합니다.
  6. 개발자의 친숙한 Interactive Spark 노트북을 사용합니다.
  7. 지원되는 모든 오픈 소스 Spark 패키지를 통합합니다.
  8. OCI Data Integration Service를 사용하여 스크립트를 통합관리합니다.

필요 조건

  1. 포털에 액세스할 수 있는 활성 OCI 및 Google Cloud 구독입니다.

  2. OCI 데이터 플로우, OCI 오브젝트 스토리지 버킷 및 OCI 데이터 과학 노트북을 설정합니다. 자세한 내용은 다음을 참조하십시오.

  3. BigQuery 데이터베이스가 Google Cloud에 상주하는 프로젝트에 대한 Google API JSON 키 암호 OCID를 생성하고 다운로드합니다.

  4. Google API JSON 키 암호 OCID를 OCI Object Storage에 업로드합니다.

    • 샘플 OCI 오브젝트 스토리지: oci://demo-bucketname@OSnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json
  5. Spark BigQuery Jar를 다운로드하여 오브젝트 스토리지에 업로드합니다.

  6. Google Cloud BigQuery 테이블에 대해 다음 매개변수를 수집합니다.

    'project' : 'bigquery-public-data'
    'parentProject' : 'core-invention-366213'
    'table' : 'bitcoin_blockchain.transactions'
    "credentialsFile" : "./ocigcp_user_creds.json"
    
  7. OCI 포털에서 Autonomous Data Warehouse Wallet을 다운로드하고 사용자/비밀번호 세부정보를 편리하게 유지합니다.

작업 1: OCI 데이터 플로우와 함께 OCI 데이터 과학 노트북을 사용하여 Google Cloud BigQuery에 액세스

  1. OCI 데이터 플로우용 Conda 환경을 이미 생성한 OCI 데이터 과학 세션을 엽니다. 필요 조건 점 2를 참조하십시오.

  2. Data Flow를 Kernel로 사용하여 새 노트북을 엽니다.

  3. OCI Data Flow에 대한 Livy 세션을 생성하고 Google Cloud BigQuery를 비롯한 기타 필수 정보를 제공합니다.

    spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
    spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
    spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
    spark.oracle.datasource.enabled : true
    

    OCI 데이터 플로우용 Livy 세션이 있는 SparkSession를 생성하는 샘플 코드:

    import json
    command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxx",
    "displayName": "Demo_BigQuery2ADW_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 10,
    "configuration":{"spark.archives":"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda",
    "spark.files":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json",
    "spark.oracle.datasource.enabled":"true",
    "spark.hadoop.google.cloud.auth.service.account.enable":"true",
    "spark.jars":"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar"
    }
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
    #enableHiveSupport()
    %create_session -l python -c $command
    
    
  4. 필요한 모듈을 임포트합니다.

    %%spark
    
    #Import required libraries.
    import json
    import os
    import sys
    import datetime
    import oci
    
    import google.cloud.bigquery as bigquery
    import google.cloud
    
    import pyspark.sql
    from pyspark.sql.functions import countDistinct
    
    
  5. Google Cloud BigQuery 테이블을 읽습니다.

    샘플 코드 1:

    
    %%spark
    
    # Read from BigQuery : "bitcoin_blockchain.transactions". i.e. At Source "BigQuery"
    #Number of rows : 340,311,544
    #Total logical bytes : 587.14 GB
    
    df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','bitcoin_blockchain.transactions').load()
    
    print("Total Records Count bitcoin_blockchain.transactions : ",df.count())
    

    샘플 코드 2:

    %%spark
    #Read another BigQuery Table
    
    df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option("credentialsFile","/opt/spark/work-dir/ocigcp_user_creds.json").option('table','Retail_Channel.RetailPOS_15min').load()
    
    df_RetailPOS_15min.show()
    
  6. 데이터를 오브젝트 스토리지로 로드합니다.

    %%spark
    #Write in Object Storage
    
    df_RetailPOS_15min.write.format("json").option("mode","overwrite").save("oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions")
    
    
  7. 전자 지갑 비밀번호를 사용하여 Autonomous Data Warehouse로 데이터를 로드합니다.

    %%spark
    
    print("Set Parameters for ADW connectivity.")
    
    USERNAME = "admin"
    PASSWORD = "xxxxx"
    connectionId= "demolakehouseadw_medium"
    walletUri = "oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip"
    
    properties = {"connectionId": connectionId,"user" : USERNAME,"password": PASSWORD,"walletUri": walletUri}
    print("properties:",properties)
    
    
    %%spark
    #Load into ADW:
    TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
    print("TARGET_TABLE : ",TARGET_TABLE)
    
    # Write to ADW.
    df_RetailPOS_15min.write.format("oracle").mode("append").option("dbtable",TARGET_TABLE).options(**properties).save()
    
    print("Writing completed to ADW.....")
    

다음 단계

승인

작성자 - Kumar Chandragupta(OCI 선임 클라우드 엔지니어)

추가 학습 자원

docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스할 수 있습니다. 또한 education.oracle.com/learning-explorer을 방문하여 Oracle Learning Explorer가 됩니다.

제품 설명서는 Oracle Help Center를 참조하십시오.