Data Flow-Integration mit Data Science

Mit Data Flow können Sie Data Science-Notizbücher konfigurieren, um Anwendungen interaktiv für Data Flow auszuführen.

Data Flow verwendet vollständig verwaltete Jupyter-Notizbücher, damit Data Scientists und Data Engineers Data Science-Anwendungen erstellen, visualisieren und debuggen sowie zusammen daran arbeiten können. Sie können diese Anwendungen in Python, Scala und PySpark schreiben. Sie können auch eine Data Science-Notebook-Session zur Ausführung von Anwendungen mit Data Flow verbinden. Die Datenflusskernels und -anwendungen, die auf Oracle Cloud Infrastructure Data Flow ausgeführt werden.

Apache Spark ist ein verteiltes Compute-System für die skalierbare Datenverarbeitung. Es unterstützt umfangreiche SQL-, Batch- und Streamverarbeitungs- und ML-Aufgaben. Spark SQL bietet datenbankähnliche Unterstützung. Um strukturierte Daten abzufragen, verwenden Sie Spark SQL. Es ist eine ANSI-Standard-SQL-Implementierung.

Data Flow-Sessions unterstützen das Autoscaling von Data Flow-Clusterfunktionen. Weitere Informationen finden Sie in der Data Flow-Dokumentation unter Autoscaling.

Data Flow-Sessions unterstützen die Verwendung von Conda-Umgebungen als anpassbare Spark-Laufzeitumgebungen.

Ein Data Science-Notebook sendet mit Data Flow Magic Anforderungen mit den NotebookSession-APIs an Data Flow, um Spark-Code auf einem Data Flow-Server auszuführen.
Einschränkungen
  • Data Flow-Sessions dauern bis zu 7 Tage oder 10.080 Minuten (maxDurationInMinutes).

  • Data Flow-Sessions haben einen standardmäßigen inaktiven Timeout von 480 Minuten (8 Stunden) (idleTimeoutInMinutes). Sie können einen anderen Wert konfigurieren.
  • Die Data Flow-Session ist nur über eine Data Science-Notebooksession verfügbar.
  • Es werden nur Spark-Versionen 3.5.0 und 3.2.1 unterstützt.
Tipp

Sehen Sie sich das Tutorialvideo an, um Data Science mit Data Flow Studio zu verwenden. Weitere Informationen zur Integration von Data Science und Data Flow finden Sie in der Dokumentation zum Oracle Accelerated Data Science-SDK.

Conda-Umgebung installieren

Führen Sie die folgenden Schritte aus, um Data Flow mit Data Flow Magic zu verwenden.

  1. Erstellen oder öffnen Sie eine Notebook-Session in Data Science.
    • Die Notizbuchsession muss sich in der Region befinden, in der der Service für den Mandanten aktiviert wurde.
    • Die Notizbuchsession muss sich im Compartment der dynamischen Gruppe von Notizbuchsessions befinden.
  2. Installieren und aktivieren Sie die Conda-Umgebung pyspark32_p38_cpu_v3 in der Notizbuchsession:
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Hinweis

    Es wird empfohlen, die aktualisierte Conda-Umgebung pyspark32_p38_cpu_v3 zu installieren, da sie die Unterstützung für das automatische Neustarten von Sessions enthält, die gestoppt wurden, weil sie die maximale Timeoutdauer überschritten haben.

    Die vorherige stabile Conda-Umgebung war pyspark32_p38_cpu_v2.

  3. Aktivieren Sie die Conda-Umgebung pyspark32_p38_cpu_v3:
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Data Flow mit Data Science verwenden

So führen Sie eine Anwendung mit Data Flow mit Data Science aus:

  • Stellen Sie sicher, dass die Policys für die Verwendung eines Notizbuchs mit Data Flow eingerichtet sind.

  • Stellen Sie sicher, dass die Data Science-Policys korrekt eingerichtet sind.

  • Eine Liste aller unterstützten Befehle erhalten Sie mit dem Befehl %help.
  • Die Befehle in den folgenden Schritten gelten sowohl für Spark 3.5.0 als auch für Spark 3.2.1. In den Beispielen wird Spark 3.5.0 verwendet. Legen Sie den Wert von sparkVersion entsprechend der verwendeten Spark-Version fest.
  1. Richten Sie die Authentifizierung in ADS ein.
    • Mit dem ADS-SDK wird der Authentifizierungstyp gesteuert, der in Data Flow Magic verwendet wird.
    • Die API-KEY-Authentifizierung wird standardmäßig verwendet. Um den Authentifizierungstyp zu ändern, verwenden Sie den Befehl ads.set_auth("resource_principal"):
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Data Flow Magic-Erweiterung laden
    %load_ext dataflow.magics
  3. (Optional) Erstellen Sie eine Data Flow-Session mit dem Magic-Befehl %create_session:
    Allgemeine Session
    In diesem Beispiel wird gezeigt, wie Sie eine neue Session mit flexiblen Ausprägungen erstellen:
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session mit Archiv-URI
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session mit einer benutzerdefinierten Conda-Umgebung
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session mit Metastore
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Optional) Eine vorhandene Data Flow-Session verwenden:
    Verwenden Sie den Befehl %use_session. Kopieren Sie die OCID in der Konsole.
    %use_session -s <session_OCID> -r <region_name>
  5. Konfigurieren Sie die Session mit dem Befehl %config_session:
    • So zeigen Sie die aktuelle Konfiguration an:
      %config
    • So ändern Sie die Treiber und Executors:
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • So wenden Sie Autoscaling an:
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      Sie können beim Erstellen einer Session auch eine Autoscaling Policy anwenden.
  6. (Optional) Um eine vorhandene Session zu aktivieren, verwenden Sie den Befehl %activate_session:
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Um eine Session zu stoppen, verwenden Sie den Befehl %stop_session:
    %stop_session

Dataflow-Spark-Umgebung mit einer Conda-Umgebung anpassen

Sie können eine veröffentlichte Conda-Umgebung als Laufzeitumgebung verwenden.

  1. Installieren Sie Spark 3.5.0 und Data Flow in der Notizbuchsession:
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Installieren Sie die Librarys mit Conda.
  3. Veröffentlichen Sie die Conda-Umgebung:
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Starten Sie das Data Flow-Cluster. Beispiel:
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Hinweis

    Die Sessionkonfiguration muss die folgenden Parameter enthalten:
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • "configuration" mit einem spark.archives-Pfad zur Conda-Umgebung im Objektspeicher.

spark-nlp bei Datenfluss ausführen

Führen Sie diese Schritte aus, um Spark-nlp zu installieren und in Data Flow auszuführen.

Sie müssen die Schritte 1 und 2 unter Data Flow-Spark-Umgebung mit einer Conda-Umgebung anpassen ausgeführt haben. Die spark-nlp-Library ist in der Conda-Umgebung pyspark32_p38_cpu_v2 vorinstalliert.

  1. Installieren Sie die vortrainierten spark-nlp-Modelle und -Pipelines.

    Wenn Sie vortrainierte spark-nlp-Modelle benötigen, laden Sie diese herunter, und dekomprimieren Sie sie im Conda-Umgebungsordner. Data Flow unterstützt den Egress-Traffic zum öffentlichen Internet noch nicht. Sie können vortrainierte Modelle nicht dynamisch aus AWS S3 in Data Flow dynamisch herunterladen.

    Sie können vortrainierte Modelle als ZIP-Archive vom Modell-Hub herunterladen. Dekomprimieren Sie das Modell im Conda-Umgebungsordner. Das Beispielmodell lautet https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html:
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Veröffentlichen Sie die Conda-Umgebung. Informationen hierzu finden Sie in Schritt 3 unter Data Flow-Spark-Umgebung mit einer Conda-Umgebung anpassen.
  3. Starten Sie das Data Flow-Cluster.

    Prüfen Sie in einer Notizbuchzelle, die im Notizbuchsession-Kernel pyspark30_p37_cpu_v5 ausgeführt wird, den

    Parameter spark.jars.packages. Er spiegelt die Version von spark-nlp wider, die Sie installiert haben.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Testen Sie es mit einem Code-Snippet aus dem spark-nlp-Repository GitHub:
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    Die folgende Ausgabe befindet sich in der Notizbuchzelle:
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Beispiele

Im Folgenden finden Sie einige Beispiele für die Verwendung von Daten FlowMagic.

PySpark

Die Variable sc stellt den Spark dar und ist verfügbar, wenn der Magic-Befehl %%spark verwendet wird. Die folgende Zelle ist ein spielerisches Beispiel zur Verwendung von sc in einer Datenzelle FlowMagic. Die Zelle ruft die Methode .parallelize() auf, mit der ein RDD, numbers, aus einer Liste von Zahlen erstellt wird. Informationen über das RDD werden ausgegeben. Die Methode .toDebugString() gibt eine Beschreibung des RDD zurück.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

Mit der Option -c sql können Sie Spark SQL-Befehle in einer Zelle ausführen. In diesem Abschnitt wird das Dataset Citi Bike verwendet. Die folgende Zelle liest das Dataset in einen Spark-Dataframe und speichert es als Tabelle. Dieses Beispiel zeigt Spark SQL an.

Das Citi Bike-Dataset wird in den Objektspeicher hochgeladen. Möglicherweise müssen Sie dasselbe in Ihrer Realm tun.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

Im folgenden Beispiel wird die Option -c sql verwendet, um Daten FlowMagic mitzuteilen, dass der Inhalt der Zelle SparkSQL ist. Mit der Option -o <variable> werden die Ergebnisse des Spark SQL-Vorgangs übernommen und in der definierten Variablen gespeichert. In diesem Fall sind

df_bike_trips ein Pandas-Dataframe, der im Notizbuch verwendet werden kann.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Drucken Sie die ersten Datenzeilen:
df_bike_trips.head()
Ebenso können Sie sqlContext verwenden, um die Tabelle abzufragen:
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Schließlich können Sie die Tabelle beschreiben:
%%spark -c sql
SHOW TABLES

Widget für automatische Visualisierung

Die Daten FlowMagic enthalten autovizwidget, das die Visualisierung von Pandas-Dataframes ermöglicht. Die Funktion display_dataframe() verwendet einen Pandas-Dataframe als Parameter und generiert eine interaktive GUI im Notizbuch. Sie verfügt über Registerkarten, mit denen die Daten in verschiedenen Formen visualisiert werden können, wie z.B. tabellarisch, Kreisdiagramme, Streudiagramme sowie Flächen- und Balkendiagramme.

Die folgende Zelle ruft display_dataframe() mit dem df_people-Dataframe auf, der im Spark SQL-Abschnitt des Notizbuchs erstellt wurde:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Data Scientists müssen häufig ihre Daten visualisieren. Bei großen Datasets ist es in der Regel nicht möglich und fast nie ratsam, die Daten aus dem Data Flow-Spark-Cluster in die Notebook-Session abzurufen. In diesem Beispiel wird gezeigt, wie Sie serverseitige Ressourcen verwenden, um ein Plot zu generieren und in das Notizbuch aufzunehmen.

Der Dataframe df_bike_trips wird in der Session definiert und wiederverwendet. Um eine Matplotlib-Bibliothek zu erzeugen, nehmen Sie die erforderlichen Bibliotheken auf, und generieren Sie das Diagramm. Verwenden Sie den Magic-Befehl %matplot plt, um das Diagramm im Notizbuch anzuzeigen, obwohl sie serverseitig angezeigt wird:
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Weitere Beispiele

Weitere Beispiele finden Sie unter GitHub unter Data Flow-Beispiele und Data Science-Beispiele.