Integrazione di Data Flow e Data Science

Con Data Flow, puoi configurare i notebook Data Science per eseguire le applicazioni in modo interattivo su Data Flow.

Data Flow utilizza i notebook Jupyter completamente gestiti per consentire ai data scientist e ai data engineer di creare, visualizzare, collaborare ed eseguire il debug di applicazioni di data engineering e data science. È possibile scrivere queste applicazioni in Python, Scala e PySpark. È inoltre possibile connettere una sessione notebook Data Science a Data Flow per eseguire le applicazioni. I kernel e le applicazioni Data Flow vengono eseguiti su Oracle Cloud Infrastructure Data Flow .

Apache Spark è un sistema di calcolo distribuito progettato per elaborare i dati su larga scala. Supporta l'elaborazione SQL, batch e stream su larga scala e task di Machine Learning. Spark SQL fornisce supporto di tipo database. Per eseguire query sui dati strutturati, utilizzare Spark SQL. Si tratta di un'implementazione SQL standard ANSI.

Le sessioni di Data Flow supportano il ridimensionamento automatico delle funzionalità del cluster di Data Flow. Per ulteriori informazioni, vedere Ridimensionamento automatico nella documentazione di Data Flow.

Le sessioni di flusso di dati supportano l'uso di ambienti conda come ambienti runtime Spark personalizzabili.

Un notebook Data Science utilizza Data Flow Magic per inviare i requet a Data Flow utilizzando le API NotebookSession per eseguire il codice Spark su un server Data Flow.
Limitazioni
  • Le sessioni di flusso dati durano fino a 7 giorni o 10.080 minuti (maxDurationInMinutes).

  • Le sessioni di flusso dati hanno un valore di timeout inattività predefinito di 480 minuti (8 ore) (idleTimeoutInMinutes). È possibile configurare un valore diverso.
  • La sessione di flusso dati è disponibile solo tramite una sessione notebook di Data Science.
  • Sono supportati solo Spark versione 3.5.0 e 3.2.1.
Suggerimento

Guarda il video dell'esercitazione sull'utilizzo di Data Science con Data Flow Studio. Per ulteriori informazioni sull'integrazione di Data Science e Data Flow, consulta anche la documentazione di Oracle Accelerated Data Science SDK.

Installazione dell'ambiente Conda

Attenersi alla procedura descritta di seguito per utilizzare il flusso di dati con Magic.

  1. Creare o aprire una sessione notebook in Data Science.
    • La sessione notebook deve trovarsi nell'area in cui il servizio è stato abilitato per la tenancy.
    • La sessione notebook deve trovarsi nel compartimento del gruppo dinamico di sessioni notebook.
  2. Installare e attivare l'ambiente Conda pyspark32_p38_cpu_v3 nella sessione notebook:
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Nota

    Si consiglia di installare l'ambiente Conda aggiornato pyspark32_p38_cpu_v3, poiché include il supporto per il riavvio automatico delle sessioni arrestate perché hanno superato la durata massima del timeout.

    Il precedente ambiente Conda stabile era pyspark32_p38_cpu_v2.

  3. Attivare l'ambiente conda pyspark32_p38_cpu_v3:
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Uso del flusso di dati con Data Science

Attenersi alla procedura riportata di seguito per eseguire un'applicazione utilizzando Data Flow con Data Science.

  • Assicurarsi di disporre dei criteri impostati per utilizzare un notebook con Data Flow.

  • Assicurarsi di avere impostato correttamente i criteri di Data Science.

  • Per un elenco di tutti i comandi supportati, utilizzare il comando %help.
  • I comandi riportati di seguito si applicano sia a Spark 3.5.0 che a Spark 3.2.1. Negli esempi viene utilizzato Spark 3.5.0. Impostare il valore di sparkVersion in base alla versione di Spark utilizzata.
  1. Impostare l'autenticazione in ADS.
    • Il kit SDK ADS viene utilizzato per controllare il tipo di autenticazione utilizzato in Data Flow Magic.
    • L'autenticazione API KEY viene utilizzata per impostazione predefinita. Per modificare il tipo di autenticazione, utilizzare il comando ads.set_auth("resource_principal"):
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Carica l'estensione Magic di Data Flow
    %load_ext dataflow.magics
  3. (Facoltativo) Creare una sessione di Data Flow utilizzando il comando magic %create_session:
    Sessione comune
    In questo esempio viene illustrato come creare una nuova sessione su forme flessibili.
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessione con URI archivio
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessione con un ambiente Conda personalizzato
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessione con metastore
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Facoltativo) Utilizzare una sessione di Data Flow esistente:
    Utilizzare il comando %use_session. Copiare l'OCID dalla console.
    %use_session -s <session_OCID> -r <region_name>
                                
  5. Configurare la sessione utilizzando il comando %config_session:
    • Per visualizzare la configurazione corrente:
      %config
    • Per modificare i driver e gli esecutori:
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • Per applicare la ridimensionamento automatico:
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      Puoi anche applicare un criterio di ridimensionamento automatico quando crei una sessione.
  6. (Facoltativo) Per attivare una sessione esistente, utilizzare il comando %activate_session:
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Per arrestare una sessione, utilizzare il comando %stop_session:
    %stop_session

Personalizzazione di un ambiente Spark di flusso di dati con un ambiente Conda

È possibile utilizzare un ambiente Conda pubblicato come ambiente runtime.

  1. Installare Spark 3.5.0 e Data Flow nella sessione notebook:
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Installare le librerie utilizzando conda.
  3. Pubblicare l'ambiente Conda:
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Avviare il cluster Data Flow, ad esempio:
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Nota

    La configurazione della sessione deve includere i seguenti parametri:
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • "configuration" con un percorso spark.archives all'ambiente Conda nello storage degli oggetti.

Esecuzione di spark-nlp nel flusso di dati

Attenersi alla procedura riportata di seguito per installare Spark-nlp ed eseguirlo in Data Flow.

È necessario aver completato i passi 1 e 2 in Personalizzazione di un ambiente Spark di un flusso di dati con un ambiente Conda. La libreria spark-nlp è preinstallata nell'ambiente conda pyspark32_p38_cpu_v2.

  1. Installare i modelli e le pipeline spark-nlp pre-addestrati.

    Se sono necessari modelli spark-nlp pre-addestrati, scaricarli e decomprimerli nella cartella dell'ambiente Conda. Data Flow non supporta ancora l'uscita alla rete Internet pubblica. Non è possibile scaricare in modo dinamico modelli pre-addestrati da AWS S3 in Data Flow.

    È possibile scaricare i modelli pre-addestrati dall'hub dei modelli come archivi zip Estrarre il modello nella cartella dell'ambiente Conda. Il modello di esempio è https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html:
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Pubblicare l'ambiente Conda, vedere il passo 3 in Personalizzazione di un ambiente Spark di Data Flow con un ambiente Conda.
  3. Avviare il cluster Data Flow.

    In una cella notebook in esecuzione nel kernel pyspark30_p37_cpu_v5 della sessione notebook, controllare due volte

    Parametro spark.jars.packages. Indica la versione di spark-nlp installata.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Testarlo con uno snippet di codice dal repository GitHub spark-nlp:
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    Il seguente output si trova nella cella del notebook:
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Esempi

Di seguito sono riportati alcuni esempi di utilizzo dei dati FlowMagic.

PySpark

La variabile sc rappresenta il Spark ed è disponibile quando si utilizza il comando magic %%spark. La cella seguente è un esempio giocattolo di come utilizzare sc in una cella Data FlowMagic. La cella chiama il metodo .parallelize() che crea un RDD, numbers, da un elenco di numeri. Vengono stampate le informazioni sull'RDD. Il metodo .toDebugString() restituisce una descrizione dell'RDD.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

L'uso dell'opzione -c sql consente di eseguire i comandi SQL Spark in una cella. In questa sezione viene utilizzato il set di dati citi bike. La cella seguente legge il set di dati in un dataframe Spark e lo salva come tabella. Questo esempio viene utilizzato per mostrare Spark SQL.

Il data set Citibike viene caricato nello storage degli oggetti. Potrebbe essere necessario fare lo stesso nel tuo regno.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

L'esempio seguente utilizza l'opzione -c sql per indicare a Data FlowMagic che il contenuto della cella è SparkSQL. L'opzione -o <variable> acquisisce i risultati dell'operazione SQL Spark e li memorizza nella variabile definita. In questo caso, la

df_bike_trips è un dataframe Pandas disponibile per l'uso nel notebook.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Stampare le prime righe di dati:
df_bike_trips.head()
Analogamente, è possibile utilizzare sqlContext per eseguire una query sulla tabella:
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Infine, è possibile descrivere la tabella:
%%spark -c sql
SHOW TABLES

Widget di visualizzazione automatica

Data FlowMagic viene fornito con autovizwidget che consente la visualizzazione dei dataframe Pandas. La funzione display_dataframe() utilizza un dataframe Pandas come parametro e genera una GUI interattiva nel notebook. Sono disponibili schede che mostrano la visualizzazione dei dati in varie forme, ad esempio tabelle, grafici a torta, grafici a dispersione e grafici ad aree e a barre.

La cella seguente chiama display_dataframe() con il dataframe df_people creato nella sezione Spark SQL del notebook:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Un compito comune che i data scientist devono eseguire è quello di visualizzare i propri dati. Con set di dati di grandi dimensioni, di solito non è possibile ed è quasi sempre preferibile estrarre i dati dal cluster Spark di Data Flow nella sessione notebook. Questo esempio mostra come utilizzare le risorse lato server per generare un grafico e includerlo nel notebook.

Il dataframe df_bike_trips viene definito nella sessione e viene riutilizzato. Per produrre un Matplotlib, includere le librerie necessarie e generare la trama. Utilizzare il comando magic %matplot plt per visualizzare la trama nel notebook, anche se viene visualizzata sul lato server:
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Ulteriori esempi

Sono disponibili altri esempi da GitHub con Esempi di flusso dati e Esempi di data science.