참고:
- 이 사용지침서에서는 Oracle Cloud에 접근해야 합니다. 무료 계정에 등록하려면 Oracle Cloud Infrastructure Free Tier 시작하기를 참조하십시오.
- Oracle Cloud Infrastructure 자격 증명, 테넌시 및 구획에 대한 예제 값을 사용합니다. 실습을 마치면 해당 값을 클라우드 환경과 관련된 값으로 대체합니다.
OCI Data Science 대화식 노트북을 사용하여 OCI 데이터 플로우에서 분석을 수행합니다.
소개
OCI(Oracle Cloud Infrastructure) 데이터 플로우는 거의 관리하지 않고도 모든 규모에서 Apache Spark 애플리케이션을 실행할 수 있는 완전 관리형 빅데이터 서비스입니다. Spark는 최고의 빅데이터 처리 프레임워크가 되었으며, OCI Data Flow는 개발자가 설치하거나 관리할 것이 없기 때문에 Oracle Cloud에서 Spark를 실행하는 가장 쉬운 방법입니다.
목표
이 자습서의 목적은 OCI Data Science 노트북 세션을 통해 OCI 데이터 플로우 세션에 액세스하는 데 필요한 설정을 안내하는 것입니다. 이 세션을 사용하면 Apache Livy 통합을 통해 오래 지속되는 데이터 플로우 클러스터에서 대화식 Spark 워크로드를 실행할 수 있습니다.
또한 OCI Data Flow Spark 세션이 생성되면 OCI Object Storage 및 Oracle Autonomous Data Warehouse에서 Spark 작업을 수행하기 위한 몇 가지 샘플 코드를 살펴보겠습니다.
작업 1: OCI Data Flow를 사용하여 OCI Data Science 노트북 설정
OCI Data Flow Conda 환경 생성 및 설정
-
필요한 버킷을 생성합니다.
a. 테넌시에 dataflow-logs라는 버킷을 생성합니다.
b. 테넌시에 dataflow-warehouse라는 버킷을 생성합니다.
-
특정 구획에 동적 그룹을 생성합니다.
ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'} ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'} Any {resource.type = 'datacatalogmetastore'} -
OCI Data Flow 및 OCI Data Science에서 OCI 리소스를 관리하는 정책을 생성합니다.
-
ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>',target.bucket.name ='dataflow-logs,target.bucket.name='dataflow-warehouse'} -
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>' -
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY -
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY -
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>',target.bucket.name = '<managed-table-location-bucket>',target.bucket.name = '<external-table-location-bucket>'} -
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'} -
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
-
-
OCI Data Science 프로젝트 및 세션을 생성합니다.

-
새 OCI Data Science 세션을 엽니다. 파일 메뉴에서 새 실행기를 선택한 다음 터미널을 누릅니다.

-
터미널에서
pyspark32_p38_cpu_v1Conda 환경을 설치하고 활성화합니다.odsc conda install -s pyspark32_p38_cpu_v1 source activate /home/datascience/conda/pyspark32_p38_cpu_v1 -
Conda가 활성화되면 새 실행기 탭으로 이동하여 설정을 누릅니다. Conda 패키지를 업로드하고 저장할 오브젝트 스토리지에 대한 필수 정보를 입력합니다.

-
Conda 환경을 게시합니다.
odsc conda publish -s pyspark3_2anddataflowv1_0참고: 게시하는 데 시간이 걸립니다. 완료되면 오브젝트 스토리지 버킷에 Conda 패키지가 업로드되는 것을 확인할 수 있습니다.
Livy Service를 사용하여 OCI Data Flow Spark 세션 설정 및 생성
새 실행기에서 커널로 "PySpark 및 DataFlow"를 사용하여 노트북을 열고 다음 명령을 실행하여 Livy Service를 사용하여 OCI Data Flow Spark 세션을 설정하고 생성합니다.
-
ADS를 사용하여 인증을 설정합니다.
import adsads.set_auth("resource_principal") # Supported values: resource_principal, api_key -
확장 로드.
%load_ext dataflow.magics -
OCI Data Science 노트북을 통해 Livy 서비스를 사용하여 OCI Data Flow Spark 세션을 생성합니다.
import json command = { "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx", "displayName": "Demo_DataFlow_Spark_v1", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "logsBucketUri": "<oci://bucket@namespace/>", "archiveUri": "<oci://bucket@namespace/archive.zip>" "configuration":{"spark.archives":"<oci://bucket@namespace/>#conda", "spark.oracle.datasource.enabled":"true"} } command = f'\'{json.dumps(command)}\'' print("command",command) #"configuration":{ # "spark.dynamicAllocation.enabled":"true", # "spark.dynamicAllocation.shuffleTracking.enabled":"true", # "spark.dynamicAllocation.minExecutors":"1", # "spark.dynamicAllocation.maxExecutors":"4", # "spark.dynamicAllocation.executorIdleTimeout":"60", # "spark.dynamicAllocation.schedulerBacklogTimeout":"60", # "spark.dataflow.dynamicAllocation.quotaPolicy":"min" }}' %create_session -l python -c $command
작업 2: 샘플 코드를 사용하여 OCI Object Storage에서 Spark 작업 수행
-
세션에서 종속 라이브러리를 가져옵니다.
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from delta.tables import * -
오브젝트 스토리지에서 Spark 읽기 작업을 수행합니다. Livy 세션에서
spark.read를 사용하여 오브젝트 스토리지 파일을 읽습니다.%%spark -o df_Bronze_Insurance_Data #Read Claim Insurance files from OCI Object Storage in Spark Dataframe. df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \ .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*") print("df_RawZone_Data",df_Bronze_Insurance_Data) df_Bronze_Insurance_Data.show(5) -
오브젝트 스토리지에 대해 Spark 쓰기 작업을 수행합니다.
%%spark df_Bronze_Insurance_Data.write.format("json").option("mode","overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
작업 3: 샘플 코드를 사용하여 Oracle Autonomous Data Warehouse에서 읽기 및 쓰기 작업 수행
-
Wallet용 Secret Vault를 사용하여 Oracle Autonomous Data Warehouse로 데이터를 로드합니다. 다음 코드를 있는 그대로 복사합니다. 자세한 내용은 GitHub sample를 참조하십시오.
%%spark def get_authenticated_client(token_path, client, file_location=None, profile_name=None): """ Get an an authenticated OCI client. Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient) """ import oci if not in_dataflow(): # We are running locally, use our API Key. if file_location is None: file_location = oci.config.DEFAULT_LOCATION if profile_name is None: profile_name = oci.config.DEFAULT_PROFILE config = oci.config.from_file(file_location=file_location, profile_name=profile_name) authenticated_client = client(config) else: # We are running in Data Flow, use our Delegation Token. with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token ) authenticated_client = client(config={}, signer=signer) return authenticated_client def get_password_from_secrets(token_path, password_ocid): """ Get a password from the OCI Secrets Service. """ import base64 import oci secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient) response = secrets_client.get_secret_bundle(password_ocid) base64_secret_content = response.data.secret_bundle_content.content base64_secret_bytes = base64_secret_content.encode("ascii") base64_message_bytes = base64.b64decode(base64_secret_bytes) secret_content = base64_message_bytes.decode("ascii") return secret_content def get_delegation_token_path(spark): """ Get the delegation token path when we're running in Data Flow. """ if not in_dataflow(): return None token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath" token_path = spark.sparkContext.getConf().get(token_key) if not token_path: raise Exception(f"{token_key} is not set") return token_path def get_temporary_directory(): if in_dataflow(): return "/opt/spark/work-dir/" else: import tempfile return tempfile.gettempdir() def in_dataflow(): """ Determine if we are running in OCI Data Flow by checking the environment. """ if os.environ.get("HOME") == "/home/dataflow": return True return False def download_wallet(spark, wallet_path): """ Download an Oracle Autonomous Data Warehouse/ATP wallet file and prepare it for use in a Data Flow application. """ import oci import zipfile # Get an object store client. token_path = get_delegation_token_path(spark) object_store_client = get_authenticated_client( token_path, oci.object_storage.ObjectStorageClient ) # Download the wallet file. from urllib.parse import urlparse parsed = urlparse(wallet_path) bucket_name, namespace = parsed.netloc.split("@") file_name = parsed.path[1:] response = object_store_client.get_object(namespace, bucket_name, file_name) temporary_directory = get_temporary_directory() zip_file_path = os.path.join(temporary_directory, "wallet.zip") with open(zip_file_path, "wb") as fd: for chunk in response.data.raw.stream(1024 * 1024, decode_content=False): fd.write(chunk) # Extract everything locally. with zipfile.ZipFile(zip_file_path, "r") as zip_ref: zip_ref.extractall(temporary_directory) # Distribute all wallet files. contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split() spark_context = spark.sparkContext for file in contents: spark_context.addFile(os.path.join(temporary_directory, file)) return temporary_directory -
Oracle Autonomous Data Warehouse 인스턴스 및 전자 지갑과 관련된 다음 매개변수를 설정합니다.
%%spark PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx" TARGET_TABLE = "ADMIN.TB_NAME" TNSNAME = "demolakehouseadw_medium" USER = "admin" WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip" # Download and distribute our wallet file. wallet_path = download_wallet(spark, WALLET_PATH) adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path) -
암호 서비스를 사용하여 암호를 가져옵니다.
%%spark # Get our password using the secret service. print("Getting wallet password") token_path = get_delegation_token_path(spark) password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID) print("Done getting wallet password") # Save the results to the database. print("Saving processed data to " + adw_url) properties = { "driver": "oracle.jdbc.driver.OracleDriver", "oracle.net.tns_admin": TNSNAME, "password": password, "user": USER } -
Oracle Autonomous Data Warehouse의 샘플 테이블을 읽어보십시오.
%%spark SOURCE_TABLE = "ADMIN.RETAILPOS" df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties) -
위의 데이터 프레임을 Oracle Autonomous Data Warehouse로 로드하십시오.
%%spark #Load into Oracle Autonomous Data Warehouse: TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES" print("TARGET_TABLE : ",TARGET_TABLE) # Write to Oracle Autonomous Data Warehouse. print("Write to Oracle Autonomous Data Warehouse : ") df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties) print("Writing done to Oracle Autonomous Data Warehouse : ")
다음 단계
-
OCI Data Flow를 사용하면 Data Flow에 대해 대화식으로 애플리케이션을 실행하도록 OCI Data Science 노트북을 구성할 수 있습니다. Data Flow Studio에서 Data Science를 사용하는 방법에 대한 자습서 비디오를 시청하십시오.
-
데이터 과학 및 데이터 플로우 통합에 대한 자세한 내용은 Oracle Accelerated Data Science SDK 설명서를 참조하십시오.
-
GitHub에서 데이터 플로우 샘플 및 데이터 과학 샘플과 함께 더 많은 예를 사용할 수 있습니다.
-
지금 시작하려면 Oracle Cloud 무료 체험판에 등록하거나 계정에 사인인하여 OCI Data Flow를 시도하십시오. Data Flow의 설치 없는 15분 튜토리얼을 사용해 Oracle Cloud Infrastructure를 통해 Spark 처리가 얼마나 쉬운지 알아보십시오.
관련 링크
수락
작성자 - Kumar Chandragupta(OCI Sr. Cloud Engineer)
추가 학습 자원
docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 Oracle Learning Explorer가 되려면 education.oracle.com/learning-explorer을 방문하십시오.
제품 설명서는 Oracle Help Center를 참조하십시오.
Perform your analytics on OCI Data Flow using OCI Data Science interactive notebook
F79466-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.