참고:

OCI Data Science 대화식 노트북을 사용하여 OCI 데이터 플로우에서 분석을 수행합니다.

소개

OCI(Oracle Cloud Infrastructure) 데이터 플로우는 거의 관리하지 않고도 모든 규모에서 Apache Spark 애플리케이션을 실행할 수 있는 완전 관리형 빅데이터 서비스입니다. Spark는 최고의 빅데이터 처리 프레임워크가 되었으며, OCI Data Flow는 개발자가 설치하거나 관리할 것이 없기 때문에 Oracle Cloud에서 Spark를 실행하는 가장 쉬운 방법입니다.

목표

이 자습서의 목적은 OCI Data Science 노트북 세션을 통해 OCI 데이터 플로우 세션에 액세스하는 데 필요한 설정을 안내하는 것입니다. 이 세션을 사용하면 Apache Livy 통합을 통해 오래 지속되는 데이터 플로우 클러스터에서 대화식 Spark 워크로드를 실행할 수 있습니다.

또한 OCI Data Flow Spark 세션이 생성되면 OCI Object Storage 및 Oracle Autonomous Data Warehouse에서 Spark 작업을 수행하기 위한 몇 가지 샘플 코드를 살펴보겠습니다.

작업 1: OCI Data Flow를 사용하여 OCI Data Science 노트북 설정

OCI Data Flow Conda 환경 생성 및 설정

  1. 필요한 버킷을 생성합니다.

    a. 테넌시에 dataflow-logs라는 버킷을 생성합니다.

    b. 테넌시에 dataflow-warehouse라는 버킷을 생성합니다.

  2. 특정 구획에 동적 그룹을 생성합니다.

      ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}
      ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'}
      Any {resource.type = 'datacatalogmetastore'}
    
    
  3. OCI Data Flow 및 OCI Data Science에서 OCI 리소스를 관리하는 정책을 생성합니다.

    • ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>',target.bucket.name ='dataflow-logs,target.bucket.name='dataflow-warehouse'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>'

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY

    • ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>',target.bucket.name = '<managed-table-location-bucket>',target.bucket.name = '<external-table-location-bucket>'}

    • ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

    • ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}

  4. OCI Data Science 프로젝트 및 세션을 생성합니다.

    OCI 데이터 과학 노트북 세션 세부정보

  5. 새 OCI Data Science 세션을 엽니다. 파일 메뉴에서 새 실행기를 선택한 다음 터미널을 누릅니다.

    OCI 데이터 과학 세션 세부정보

  6. 터미널에서 pyspark32_p38_cpu_v1 Conda 환경을 설치하고 활성화합니다.

     odsc conda install -s pyspark32_p38_cpu_v1
     source activate /home/datascience/conda/pyspark32_p38_cpu_v1
    
    
  7. Conda가 활성화되면 새 실행기 탭으로 이동하여 설정을 누릅니다. Conda 패키지를 업로드하고 저장할 오브젝트 스토리지에 대한 필수 정보를 입력합니다.

    DS 세션에 대한 Conda 환경 설정

  8. Conda 환경을 게시합니다.

    odsc conda publish -s pyspark3_2anddataflowv1_0

    참고: 게시하는 데 시간이 걸립니다. 완료되면 오브젝트 스토리지 버킷에 Conda 패키지가 업로드되는 것을 확인할 수 있습니다.

Livy Service를 사용하여 OCI Data Flow Spark 세션 설정 및 생성

새 실행기에서 커널로 "PySpark 및 DataFlow"를 사용하여 노트북을 열고 다음 명령을 실행하여 Livy Service를 사용하여 OCI Data Flow Spark 세션을 설정하고 생성합니다.

  1. ADS를 사용하여 인증을 설정합니다.

    import ads ads.set_auth("resource_principal") # Supported values: resource_principal, api_key

  2. 확장 로드.

    %load_ext dataflow.magics

  3. OCI Data Science 노트북을 통해 Livy 서비스를 사용하여 OCI Data Flow Spark 세션을 생성합니다.

     import json
     command = {
        "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
        "displayName": "Demo_DataFlow_Spark_v1",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "logsBucketUri": "<oci://bucket@namespace/>",
        "archiveUri": "<oci://bucket@namespace/archive.zip>"
        "configuration":{"spark.archives":"<oci://bucket@namespace/>#conda",
                          "spark.oracle.datasource.enabled":"true"}
     }
    
     command = f'\'{json.dumps(command)}\''
     print("command",command)
    
     #"configuration":{
     #    "spark.dynamicAllocation.enabled":"true",
     #    "spark.dynamicAllocation.shuffleTracking.enabled":"true",
     #    "spark.dynamicAllocation.minExecutors":"1",
     #    "spark.dynamicAllocation.maxExecutors":"4",
     #    "spark.dynamicAllocation.executorIdleTimeout":"60",
     #    "spark.dynamicAllocation.schedulerBacklogTimeout":"60",
     #    "spark.dataflow.dynamicAllocation.quotaPolicy":"min" }}'
    
     %create_session -l python -c $command
    
    

    객체 업로드

작업 2: 샘플 코드를 사용하여 OCI Object Storage에서 Spark 작업 수행

  1. 세션에서 종속 라이브러리를 가져옵니다.

     %%spark
     #Import required libraries.
    
     import json
     import os
     import sys
     import datetime
     import oci
     import pyspark.sql
     from pyspark.sql.functions import countDistinct
    
     from delta.tables import *
    
    
  2. 오브젝트 스토리지에서 Spark 읽기 작업을 수행합니다. Livy 세션에서 spark.read를 사용하여 오브젝트 스토리지 파일을 읽습니다.

     %%spark -o df_Bronze_Insurance_Data
     #Read Claim Insurance files from OCI Object Storage in Spark Dataframe.
     df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
     .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
    
     print("df_RawZone_Data",df_Bronze_Insurance_Data)
     df_Bronze_Insurance_Data.show(5)
    
    
  3. 오브젝트 스토리지에 대해 Spark 쓰기 작업을 수행합니다.

     %%spark
     df_Bronze_Insurance_Data.write.format("json").option("mode","overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
    
    

작업 3: 샘플 코드를 사용하여 Oracle Autonomous Data Warehouse에서 읽기 및 쓰기 작업 수행

  1. Wallet용 Secret Vault를 사용하여 Oracle Autonomous Data Warehouse로 데이터를 로드합니다. 다음 코드를 있는 그대로 복사합니다. 자세한 내용은 GitHub sample를 참조하십시오.

     %%spark
    
     def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
        """
        Get an an authenticated OCI client.
        Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
        """
        import oci
    
        if not in_dataflow():
           # We are running locally, use our API Key.
           if file_location is None:
                 file_location = oci.config.DEFAULT_LOCATION
           if profile_name is None:
                 profile_name = oci.config.DEFAULT_PROFILE
           config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
           authenticated_client = client(config)
        else:
           # We are running in Data Flow, use our Delegation Token.
           with open(token_path) as fd:
                 delegation_token = fd.read()
           signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
                 delegation_token=delegation_token
           )
           authenticated_client = client(config={}, signer=signer)
        return authenticated_client
    
     def get_password_from_secrets(token_path, password_ocid):
        """
        Get a password from the OCI Secrets Service.
        """
        import base64
        import oci
    
        secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
        response = secrets_client.get_secret_bundle(password_ocid)
        base64_secret_content = response.data.secret_bundle_content.content
        base64_secret_bytes = base64_secret_content.encode("ascii")
        base64_message_bytes = base64.b64decode(base64_secret_bytes)
        secret_content = base64_message_bytes.decode("ascii")
        return secret_content
    
     def get_delegation_token_path(spark):
        """
        Get the delegation token path when we're running in Data Flow.
        """
        if not in_dataflow():
           return None
        token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
        token_path = spark.sparkContext.getConf().get(token_key)
        if not token_path:
           raise Exception(f"{token_key} is not set")
        return token_path    
    
     def get_temporary_directory():
        if in_dataflow():
           return "/opt/spark/work-dir/"
        else:
           import tempfile
           return tempfile.gettempdir()
    
     def in_dataflow():
        """
        Determine if we are running in OCI Data Flow by checking the environment.
        """
        if os.environ.get("HOME") == "/home/dataflow":
           return True
        return False
    
     def download_wallet(spark, wallet_path):
        """
        Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use in a Data Flow
        application.
        """
        import oci
        import zipfile
    
        # Get an object store client.
        token_path = get_delegation_token_path(spark)
        object_store_client = get_authenticated_client(
           token_path, oci.object_storage.ObjectStorageClient
        )
    
        # Download the wallet file.
        from urllib.parse import urlparse
        parsed = urlparse(wallet_path)
        bucket_name, namespace = parsed.netloc.split("@")
        file_name = parsed.path[1:]
        response = object_store_client.get_object(namespace, bucket_name, file_name)
        temporary_directory = get_temporary_directory()
        zip_file_path = os.path.join(temporary_directory, "wallet.zip")
        with open(zip_file_path, "wb") as fd:
           for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
                 fd.write(chunk)
    
        # Extract everything locally.
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
           zip_ref.extractall(temporary_directory)
    
        # Distribute all wallet files.
        contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
        spark_context = spark.sparkContext
        for file in contents:
           spark_context.addFile(os.path.join(temporary_directory, file))
    
        return temporary_directory
    
    
  2. Oracle Autonomous Data Warehouse 인스턴스 및 전자 지갑과 관련된 다음 매개변수를 설정합니다.

      %%spark
      PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
      TARGET_TABLE = "ADMIN.TB_NAME"
      TNSNAME = "demolakehouseadw_medium"
      USER = "admin"
      WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"
    
      # Download and distribute our wallet file.
      wallet_path = download_wallet(spark, WALLET_PATH)
      adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
    
    
  3. 암호 서비스를 사용하여 암호를 가져옵니다.

      %%spark
      # Get our password using the secret service.
      print("Getting wallet password")
      token_path = get_delegation_token_path(spark)
      password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
      print("Done getting wallet password")
    
      # Save the results to the database.
      print("Saving processed data to " + adw_url)
      properties = {
         "driver": "oracle.jdbc.driver.OracleDriver",
         "oracle.net.tns_admin": TNSNAME,
         "password": password,
         "user": USER
      }
    
  4. Oracle Autonomous Data Warehouse의 샘플 테이블을 읽어보십시오.

      %%spark
      SOURCE_TABLE = "ADMIN.RETAILPOS"
      df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
    
    
  5. 위의 데이터 프레임을 Oracle Autonomous Data Warehouse로 로드하십시오.

      %%spark
    
      #Load into Oracle Autonomous Data Warehouse:
    
      TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
      print("TARGET_TABLE : ",TARGET_TABLE)
    
      # Write to Oracle Autonomous Data Warehouse.
      print("Write to Oracle Autonomous Data Warehouse : ")
      df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
      print("Writing done to Oracle Autonomous Data Warehouse : ")
    
    

    ADW에 기록

다음 단계

수락

작성자 - Kumar Chandragupta(OCI Sr. Cloud Engineer)

추가 학습 자원

docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 Oracle Learning Explorer가 되려면 education.oracle.com/learning-explorer을 방문하십시오.

제품 설명서는 Oracle Help Center를 참조하십시오.