Intégration du service de flux de données au service de science des données

Avec le service de flux de données, vous pouvez configurer des carnets du service de science des données pour l'exécution interactive d'applications avec le service de flux de données.

Le service de flux de données utilise des carnets Jupyter entièrement gérés pour permettre aux spécialistes des données et aux ingénieurs de données de créer, de visualiser, de collaborer et de déboguer des applications d'ingénierie des données et de science des données. Vous pouvez écrire ces applications en Python, Scala et PySpark. Vous pouvez également connecter une session de carnet du service de science des données au service de flux de données pour exécuter des applications. Les noyaux et les applications de flux de données s'exécutent sur Oracle Cloud Infrastructure Data Flow .

Apache Spark est un système de calcul distribué conçu pour traiter des données à grande échelle. Il prend en charge les traitements SQL, par lots et de flux de données, ainsi que les tâches d'apprentissage automatique à grande échelle. Spark SQL offre une prise en charge de type base de données. Pour interroger des données structurées, utilisez Spark SQL. Il s'agit d'une mise en oeuvre SQL ANSI standard.

Les sessions du service de flux de données prennent en charge l'ajustement automatique des capacités des grappes du service de flux de données. Pour plus d'informations, voir Ajustement automatique dans la documentation sur le service de flux de données.

Les sessions du service de flux de données prennent en charge l'utilisation d'environnements Conda en tant qu'environnements d'exécution Spark personnalisables.

Un carnet du service de science des données utilise Data Flow Magic pour envoyer des demandes au service de flux de données à l'aide des API NotebookSession pour exécuter du code Spark sur un serveur de flux de données.
Limitations
  • Les sessions du service de flux de données durent jusqu'à 7 jours ou 10 080 minutes (maxDurationInMinutes).

  • Les sessions du service de flux de données ont une valeur de délai d'attente par défaut de 480 minutes (8 heures) (idleTimeoutInMinutes). Vous pouvez configurer une valeur différente.
  • La session du service de flux de données est disponible uniquement par l'intermédiaire d'une session de carnet du service de science des données.
  • Seules les versions 3.5.0 et 3.2.1 de Spark sont prises en charge.
Conseil

Regardez la vidéo du tutoriel sur l'utilisation du service de science des données avec Data Flow Studio. Consultez également la documentation sur la trousse SDK Oracle Accelerated Data Science pour plus d'informations sur l'intégration des services de science des données et de flux de données.

Installation de l'environnement Conda

Suivez les présentes étapes pour utiliser le service de flux de données avec Data Flow Magic.

  1. Créez ou ouvrez une session de carnet dans le service de science des données.
    • La session de carnet doit se trouver dans la région où le service a été activé pour la location.
    • La session de carnet doit se trouver dans le compartiment du groupe dynamique de sessions de carnet.
  2. Installez et activez l'environnement Conda pyspark32_p38_cpu_v3 dans la session de carnet :
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Note

    Nous vous recommandons d'installer l'environnement Conda mis à jour pyspark32_p38_cpu_v3, car il prend en charge le redémarrage automatique des sessions qui se sont arrêtées car elles ont dépassé la durée maximale de temporisation.

    L'environnement Conda stable précédent était pyspark32_p38_cpu_v2.

  3. Activez l'environnement Conda pyspark32_p38_cpu_v3 :
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Utilisation du service de flux de données avec le service de science des données

Suivez ces étapes pour exécuter une application à l'aide du service de flux de données avec le service de science des données.

  • Assurez-vous que les politiques pour utiliser un carnet avec le service de flux de données sont configurées.

  • Assurez-vous que les politiques du service de science des données sont configurées correctement.

  • Pour obtenir la liste de toutes les commandes prises en charge, utilisez la commande %help.
  • Les commandes des étapes suivantes s'appliquent à la fois à Spark 3.5.0 et à Spark 3.2.1. Spark 3.5.0 est utilisé dans les exemples. Définissez la valeur de sparkVersion en fonction de la version de Spark utilisée.
  1. Configurez l'authentification dans ADS.
    • La trousse SDK ADS est utilisée pour contrôler le type d'authentification utilisé dans Data Flow Magic.
    • L'authentification par clé d'API est utilisée par défaut. Pour modifier le type d'authentification, utilisez la commande ads.set_auth("resource_principal") :
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Charger l'extension Data Flow Magic
    %load_ext dataflow.magics
  3. (Facultatif) Créer une session de flux de données à l'aide de la commande magique %create_session :
    Session commune
    Cet exemple montre comment créer une session sur des formes flexibles :
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec URI d'archive
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec environnement Conda personnalisé
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Session avec magasin de métadonnées
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Facultatif) Utilisez une session de flux de données existante :
    Utilisez la commande %use_session. Copiez l'OCID à partir de la console.
    %use_session -s <session_OCID> -r <region_name>
                                
  5. Configurez la session à l'aide de la commande %config_session :
    • Pour voir la configuration courante :
      %config
    • Pour modifier les pilotes et les exécuteurs :
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • Pour appliquer l'ajustement automatique :
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      Vous pouvez également appliquer une politique d'ajustement automatique lors de la création d'une session.
  6. (Facultatif) Pour activer une session existante, utilisez la commande %activate_session :
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Pour arrêter une session, utilisez la commande %stop_session :
    %stop_session

Personnalisation d'un environnement Spark de flux de données avec un environnement Conda

Vous pouvez utiliser un environnement Conda publié en tant qu'environnement d'exécution.

  1. Installez Spark 3.5.0 et le service de flux de données dans la session de carnet :
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Installez les bibliothèques à l'aide de Conda.
  3. Publiez l'environnement Conda :
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Démarrez la grappe du service de flux de données, par exemple :
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Note

    La configuration de la session doit inclure les paramètres suivants :
    • "sparkVersion":"3.5.0"
    • "language" :"PYTHON"
    • "configuration" avec un chemin spark.archives vers l'environnement Conda du stockage d'objets.

Exécution de spark-nlp dans le service de flux de données

Suivez les étapes ci-dessous pour installer Spark-nlp et l'exécuter dans le service de flux de données.

Vous devez avoir terminé les étapes 1 et 2 de la rubrique Personnalisation d'un environnement Spark de flux de données avec un environnement Conda. La bibliothèque spark-nlp est préinstallée dans l'environnement Conda pyspark32_p38_cpu_v2.

  1. Installez les modèles et pipelines spark-nlp préentraînés.

    Si vous avez besoin de modèles spark-nlp préentraînés, téléchargez-les et décompressez-les dans le dossier de l'environnement Conda. Le service de flux de données ne prend pas encore en charge le trafic sortant vers le réseau Internet public. Vous ne pouvez pas télécharger dynamiquement des modèles préentraînés à partir d'AWS S3 dans le service de flux de données.

    Vous pouvez télécharger des modèles préentraînés à partir du centre de modèles sous forme d'archives zip. Décompressez le modèle dans le dossier de l'environnement Conda. L'exemple de modèle est https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html :
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Publiez l'environnement Conda. Voir l'étape 3 sous Personnalisation d'un environnement Spark de flux de données avec un environnement Conda.
  3. Démarrez la grappe du service de flux de données.

    Dans une cellule de carnet s'exécutant dans le noyau pyspark30_p37_cpu_v5 de la session de carnet, vérifiez

    le paramètre spark.jars.packages. Il reflète la version de spark-nlp que vous avez installée.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Testez-le avec un extrait de code du référentiel spark-nlp GitHub :
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    La sortie suivante se trouve dans la cellule de carnet :
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Exemples

Voici quelques exemples d'utilisation de Data FlowMagic.

PySpark

La variable sc représente Spark et est disponible lorsque la commande magique %%spark est utilisée. La cellule suivante montre un exemple simple d'utiliser sc dans une cellule Data FlowMagic. La cellule appelle la méthode .parallelize() , qui crée un jeu de données RDD, numbers, à partir d'une liste de nombres. Les informations relatives au RDD sont affichées. La méthode .toDebugString() retourne une description du RDD.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

L'utilisation de l'option -c sql vous permet d'exécuter des commandes Spark SQL dans une cellule. Dans cette section, le jeu de données citi bike est utilisé. La cellule suivante lit le jeu de données dans un structure de données Spark et l'enregistre sous forme de table. Cet exemple est utilisé pour afficher Spark SQL.

Le jeu de données citibike est chargé dans le stockage d'objets. Vous devrez peut-être faire de même dans votre domaine.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

L'exemple suivant utilise l'option -c sql pour indiquer à Data FlowMagic que la cellule contient du contenu SparkSQL. L'option -o <variable> stocke les résultats de l'opération Spark SQL dans la variable définie. Dans ce cas,

df_bike_trips est une structure de données Pandas disponible pour être utilisée dans le carnet.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Afficher les premières rangées de données :
df_bike_trips.head()
De même, vous pouvez utiliser sqlContext pour interroger la table :
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Enfin, vous pouvez décrire le tableau :
%%spark -c sql
SHOW TABLES

Widget de visualisation automatique

Les données FlowMagic sont fournies avec autovizwidget qui permet la visualisation des structures de données Pandas. La fonction display_dataframe() utilise une structure de données Pandas comme paramètre et génère une interface graphique interactive dans le carnet. Des onglets présentent la visualisation des données sous différentes formes, telles que des tableaux, des graphiques à secteurs, des graphiques en nuage de points et des graphiques en aires et à barres.

La cellule suivante appelle display_dataframe() avec la structure de données df_people créée dans la section Spark SQL du carnet :
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Une tâche commune effectuée par les experts en science des données consiste à visualiser leurs données. Dans le cas de jeux de données volumineux, cela n'est généralement pas possible et il est presque toujours préférable d'extraire les données de la grappe Spark du service de flux de données dans la session de carnet. Cet exemple montre comment utiliser des ressources côté serveur pour générer un tracé et l'inclure dans le carnet.

La structure de données df_bike_trips est définie dans la session et réutilisée. Pour produire une bibliothèque Matplotlib, ajoutez les bibliothèques requises et générez le tracé. Utilisez la commande magique %matplot plt pour afficher le tracé dans le carnet, même s'il est rendu côté serveur :
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Autres exemples

D'autres exemples sont disponibles dans GitHub avec des échantillons de flux de données et des échantillons de science des données.