Intégration de Data Flow à Data Science

Avec Data Flow, vous pouvez configurer des blocs-notes Data Science pour exécuter des applications de manière interactive sur Data Flow.

Data Flow utilise des blocs-notes Jupyter entièrement gérés pour permettre aux analystes de données et aux ingénieurs de données de créer, de visualiser et de déboguer des applications d'ingénierie et de science des données, ainsi que de collaborer sur celles-ci. Vous pouvez écrire ces applications en Python, en Scala et en PySpark. Vous pouvez également connecter une session de bloc-notes Data Science à Data Flow pour exécuter des applications. Les applications et noyaux Data Flow sont exécutés sur Oracle Cloud Infrastructure Data Flow.

Apache Spark est un système de calcul distribué conçu pour traiter les données à l'échelle. Il prend en charge les traitements de flux de données, batch et SQL, ainsi que les tâches d'apprentissage automatique à grande échelle. Spark SQL fournit une prise en charge de type base de données. Pour interroger des données structurées, utilisez Spark SQL. Il s'agit d'une implémentation SQL de norme ANSI.

Data Flow Sessions prend en charge le redimensionnement automatique des fonctionnalités de cluster de Data Flow. Pour plus d'informations, reportez-vous à Redimensionnement automatique dans la documentation Data Flow.

Les sessions Data Flow prennent en charge l'utilisation d'environnements conda en tant qu'environnements d'exécution Spark personnalisables.

Un bloc-notes Data Science utilise Data Flow Magic pour envoyer des demandes à Data Flow à l'aide des API NotebookSession afin d'exécuter du code Spark sur un serveur Data Flow.
Limites
  • Les sessions Data Flow peuvent durer jusqu'à 7 jours ou 10 080 minutes (maxDurationInMinutes).

  • La valeur de délai d'inactivité par défaut des sessions Data Flow est de 8 heures (480) (idleTimeoutInMinutes). Vous pouvez configurer une autre valeur.
  • La session Data Flow est uniquement disponible via une session de bloc-notes Data Science.
  • Seules les versions 3.5.0 et 3.2.1 de Spark sont prises en charge.
Conseil

Regardez le tutoriel vidéo sur l'utilisation de Data Science avec le studio Data Flow. Pour plus d'informations sur l'intégration de Data Science et Data Flow, reportez-vous également à la documentation du kit SDK Oracle Accelerated Data Science.

Installation de l'environnement conda

Suivez ces étapes pour utiliser Data Flow avec Data Flow Magic.

  1. Créer ou ouvrir une session de bloc-notes dans Data Science.
    • La session de bloc-notes doit se trouver dans la région dans laquelle le service a été activé pour la location.
    • La session de bloc-notes doit se trouver dans le compartiment du groupe dynamique de sessions de bloc-notes.
  2. Installez et activez l'environnement conda pyspark32_p38_cpu_v3 dans la session de bloc-notes :
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Remarque

    Nous vous recommandons d'installer l'environnement conda mis à jour pyspark32_p38_cpu_v3, car il inclut la prise en charge du redémarrage automatique des sessions qui se sont arrêtées en raison du dépassement du délai d'expiration maximal.

    L'environnement conda stable précédent était pyspark32_p38_cpu_v2.

  3. Activez l'environnement conda pyspark32_p38_cpu_v3 :
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Utilisation de Data Flow avec Data Science

Procédez comme suit pour exécuter une application en utilisant Data Flow avec Data Science.

  • Assurez-vous que les stratégies permettant d'utiliser un bloc-notes avec Data Flow sont configurées.

  • Assurez-vous que les stratégies Data Science sont correctement configurées.

  • Pour obtenir la liste de toutes les commandes prises en charge, utilisez la commande %help.
  • Les commandes des étapes suivantes s'appliquent à Spark 3.5.0 et à Spark 3.2.1. Spark 3.5.0 est utilisé dans les exemples. Définissez la valeur de sparkVersion en fonction de la version de Spark utilisée.
  1. Configurez l'authentification dans ADS.
    • Le kit SDK ADS est utilisé pour contrôler le type d'authentification utilisé dans Data Flow Magic.
    • L'authentification par clé d'API est utilisée par défaut. Pour modifier le type d'authentification, utilisez la commande ads.set_auth("resource_principal") :
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Chargement de l'extension Data Flow Magic
    %load_ext dataflow.magics
  3. (Facultatif) Créez une session Data Flow à l'aide de la commande magic %create_session :
    Session commune
    L'exemple suivant montre comment créer une session sur des formes flexibles :
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec un URI d'archive
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec un environnement conda personnalisé
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec un metastore
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Facultatif) Utilisez une session Data Flow existante :
    Utilisez la commande %use_session. Copiez l'OCID à partir de la console.
    %use_session -s <session_OCID> -r <region_name>
  5. Configurez la session à l'aide de la commande %config_session :
    • Pour afficher la configuration en cours, exécutez la commande suivante :
      %config
    • Pour modifier les pilotes et les exécuteurs, exécutez la commande suivante :
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • Pour appliquer le redimensionnement automatique, exécutez la commande suivante :
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      Vous pouvez également appliquer une stratégie de redimensionnement automatique lorsque vous créez une session.
  6. (Facultatif) Pour activer une session existante, utilisez la commande %activate_session :
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Pour arrêter une session, utilisez la commande %stop_session :
    %stop_session

Personnalisation d'un environnement Spark Data Flow avec un environnement conda

Vous pouvez utiliser un environnement conda publié comme environnement d'exécution.

  1. Installez Spark 3.5.0 et Data Flow dans la session de bloc-notes :
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Installez les bibliothèques à l'aide de conda.
  3. publier l'environnement conda:
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Démarrez le cluster Data Flow, par exemple :
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Remarque

    La configuration de la session doit inclure les paramètres suivants :
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • "configuration" avec un chemin spark.archives vers l'environnement conda sur le stockage d'objet.

Exécution de spark-nlp sur Data Flow

Suivez ces étapes pour installer Spark-nlp et l'exécuter sur Data Flow.

Vous devez avoir effectué les étapes 1 et 2 décrites dans Personnalisation d'un environnement Spark Data Flow avec un environnement conda. La bibliothèque spark-nlp est préinstallée dans l'environnement conda pyspark32_p38_cpu_v2.

  1. Installez les pipelines et les modèles spark-nlp préentraînés.

    Si vous avez besoin de modèles spark-nlp préentraînés, téléchargez-les et décompressez-les dans le dossier de l'environnement conda. Data Flow ne prend pas encore en charge la sortie vers le réseau Internet public. Vous ne pouvez pas télécharger de manière dynamique des modèles préentraînés à partir d'AWS S3 dans Data Flow.

    Vous pouvez télécharger des modèles préentraînés à partir du hub de modèles sous forme d'archives ZIP. Décompressez le modèle dans le dossier de l'environnement conda. L'exemple de modèle est disponible sur la page https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html :
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Publiez l'environnement conda. Reportez-vous à l'étape 3 décrite dans Personnalisation d'un environnement Spark Data Flow avec un environnement conda.
  3. Démarrez le cluster Data Flow.

    Dans une cellule de bloc-notes exécutée dans le noyau pyspark30_p37_cpu_v5 de la session de bloc-notes, vérifiez à nouveau

    le paramètre spark.jars.packages. Il reflète la version de spark-nlp que vous avez installée.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Testez-le avec un fragment de code provenant du référentiel GitHub spark-nlp :
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    La sortie suivante se trouve dans la cellule du bloc-notes :
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Exemples

Voici des exemples d'utilisation des données FlowMagic.

PySpark

La variable sc représente Spark et est disponible lorsque la commande magique %%spark est utilisée. La cellule suivante est un exemple d'utilisation de sc dans une cellule de données FlowMagic. La cellule appelle la méthode .parallelize() qui crée un RDD (Resilient Distributed Dataset), numbers, à partir d'une liste de numéros. Les informations sur le RDD sont affichées. La méthode .toDebugString() renvoie la description du RDD.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

L'option -c sql permet d'exécuter des commandes Spark SQL dans une cellule. Dans cette section, l'ensemble de données de Citi Bike est utilisé. La cellule suivante lit l'ensemble de données dans une trame de données Spark et l'enregistre sous forme de table. Cet exemple est utilisé pour afficher Spark SQL.

L'ensemble de données de Citi Bike est téléchargé vers le stockage d'objet. Vous devrez peut-être faire de même dans votre domaine.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

L'exemple suivant utilise l'option -c sql pour indiquer aux données FlowMagic que le contenu de la cellule est SparkSQL. L'option -o <variable> prend les résultats de l'opération Spark SQL et les stocke dans la variable définie. Dans ce cas,

df_bike_trips est une trame de données Pandas pouvant être utilisée dans le bloc-notes.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Imprimez les premières lignes de données :
df_bike_trips.head()
De même, vous pouvez utiliser sqlContext pour interroger la table :
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Enfin, vous pouvez décrire la table :
%%spark -c sql
SHOW TABLES

Widget de visualisation automatique

Les données FlowMagic sont fournies avec autovizwidget, qui permet la visualisation des trames de données Pandas. La fonction display_dataframe() prend une trame de données Pandas en tant que paramètre et génère une interface utilisateur graphique interactive dans le bloc-notes. Elle comporte des onglets qui affichent la visualisation des données dans différents formulaires, tels que des tableaux, des graphiques à secteurs, des diagrammes en nuage de points et des graphiques en zone et à barres.

La cellule suivante appelle display_dataframe() avec la trame de données df_people qui a été créée dans la section Spark SQL du bloc-notes :
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Une tâche courante des analystes de données consiste à visualiser leurs données. Avec des ensembles de données volumineux, il n'est généralement pas possible ni recommandé d'extraire les données du cluster Spark Data Flow vers la session de bloc-notes. Cet exemple montre comment utiliser les ressources côté serveur pour générer un tracé et l'inclure dans le bloc-notes.

La trame de données df_bike_trips est définie dans la session et réutilisée. Pour générer une instance Matplotlib, incluez les bibliothèques requises et générez le tracé. Utilisez la commande magic %matplot plt pour afficher le graphique dans le bloc-notes, même s'il est affiché côté serveur :
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Autres exemples

D'autres exemples sont disponibles à partir de GitHub avec des échantillons Data Flow et des échantillons Data Science.