.. _data-flow-8:

=========
Data Flow
=========

Data Flow is an Oracle Cloud Infrastructure service for creating and running Spark applications. ADS can be used to create and run PySpark Data Flow applications directly from a notebook session.

- `Getting started with Data Flow`_
- `Create a Data Flow instance`_
- `Generate a script using template`_
- `Create a Data Flow Application`_
- `Load an Existing Data Flow application`_
- `Listing Data Flow Applications`_
- `Create a Data Flow run`_
- `Fetching Logs`_
- `Edit and sync pyspark script`_
- `Arguments and Parameters`_
- `Add Third-Party Libraries`_
- `Fetching pyspark Output`_

Getting started with Data Flow
------------------------------

.. note:: We recommend that you use the ``pyspark`` conda environment for Data Flow code development.

- Before running applications in the Data Flow service, two storage buckets are required in Object Storage: a bucket for Data Flow to store the run logs, and a data warehouse bucket for Spark SQL applications. See `set up storage `_.
- Data Flow requires policies to be set up in IAM so it can access resources to manage and run applications. See `policy set up `_.
- Full Data Flow documentation: `Data Flow documentation `_.

Create a Data Flow instance
---------------------------

You first need to create a ``DataFlow`` object instance. You can specify a default path where all Data Flow artifacts are stored using the optional ``dataflow_base_folder`` argument. By default, all Data Flow artifacts are stored under ``/home/datascience/dataflow``. The ``dataflow_base_folder`` directory contains multiple subdirectories, each one corresponding to a different application. The name of each subdirectory is the application name with a random string added as a suffix. In each application directory, artifacts generated by separate Data Flow runs are stored in different folders. Each folder is identified by the run display name and the run creation time. All run-specific artifacts, including the script, the run configuration, and the run logs, are saved in the corresponding run folder.

Also, you can choose to use a specific compartment by providing the optional ``compartment_id`` argument when creating the ``DataFlow`` instance. Otherwise, the **same** compartment as **your notebook session** is used to create the instance.

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow(
        compartment_id="<compartment_id>",
        dataflow_base_folder="<dataflow_base_folder>"
    )

Generate a script using template
--------------------------------

We provide simple pyspark and sparksql templates for you to get started with Data Flow. You can use ``data_flow.template()`` to generate a pre-written template.

We support two types of templates:

1. The ``standard_pyspark`` template, for standard pyspark jobs.
2. The ``sparksql`` template, for sparksql jobs.

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow()
    data_flow.template(job_type='standard_pyspark')

``data_flow.template()`` returns the local path to the script it generated.
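The ``sparksql`` template can be generated the same way. This is a minimal sketch, assuming ``job_type='sparksql'`` selects the second template listed above:

.. code-block:: python3

    # generate the sparksql starter script and keep the returned local path
    sql_script_path = data_flow.template(job_type='sparksql')
    print(sql_script_path)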
Create a Data Flow Application
------------------------------

The application creation process has two stages: a preparation stage and a creation stage.

In the preparation stage, you prepare a configuration object necessary to create the Data Flow application. You need to provide values for these three parameters:

- ``display_name`` - the name you give your application
- ``script_bucket`` - the bucket used to read/write the ``pyspark`` script in Object Storage
- ``pyspark_file_path`` - the local path to your ``pyspark`` script

ADS checks that the bucket exists and that you can write to it from your notebook session. Optionally, you can change values for the following parameters:

- ``compartment_id`` - the OCID of the compartment in which to create the Data Flow application. If not provided, the **same** compartment as **your dataflow object** is used by default.
- ``logs_bucket`` - the bucket used to store run logs in Object Storage, the default value is ``"dataflow-logs"``
- ``driver_shape`` - the driver shape used to create the application, the default value is ``"VM.Standard2.4"``
- ``executor_shape`` - the executor shape used to create the application, the default value is ``"VM.Standard2.4"``
- ``num_executors`` - the number of executor VMs requested, the default value is ``1``

.. note::

  (1) If you want to use a **private** bucket as the ``logs_bucket``, make sure to add a corresponding Data Flow service policy using `Data Flow Identity: Policy Set Up`_.

Then you can use ``prepare_app()`` to create the configuration object necessary to create the application.

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow()
    app_config = data_flow.prepare_app(
        display_name="<display_name>",
        script_bucket="<script_bucket>",
        pyspark_file_path="<pyspark_file_path>"
    )

Once you have the application configured, you can create a Data Flow application using ``create_app``:

.. code-block:: python3

    app = data_flow.create_app(app_config)

.. note::

  (1) Your local script is uploaded to the script bucket in this application creation step. Object Storage supports file versioning that creates an object version when the content changes or the object is deleted. You can enable ``Object Versioning`` in your bucket in the Oracle Cloud Infrastructure Console to prevent overwriting of existing files in Object Storage.

  (2) You can create an application with a script file that exists in Object Storage by setting ``overwrite_script=True`` in ``create_app``. Similarly, you can set ``overwrite_archive=True`` to create an application with an archive file that exists in Object Storage. By default, the ``overwrite_script`` and ``overwrite_archive`` options are set to ``False``.

.. code-block:: python3

    app = data_flow.create_app(app_config, overwrite_script=True, overwrite_archive=True)

You can explore a few attributes of the ``DataFlowApp`` object.

First you can look at the configuration of the application.

.. code-block:: python3

    app.config

You can also get a URL link to the Oracle Cloud Infrastructure Console Application Details page.

.. code-block:: python3

    app.oci_link

Load an Existing Data Flow application
--------------------------------------

As an alternative to creating applications in ADS, you can load existing applications created elsewhere. These Data Flow applications must be Python applications. To load an existing application, you need the application's OCID.

.. code-block:: python3

    existing_app = data_flow.load_app(app_id, target_folder)

You can find the ``app_id`` either directly in the Oracle Cloud Infrastructure Console or by listing existing applications.

Optionally, you can assign a value to the ``target_folder`` parameter. This parameter is the directory in which you want to store the local artifacts of this application. If ``target_folder`` is not provided, then **by default** the local artifacts of this application are stored under the ``dataflow_base_folder`` defined by the ``DataFlow`` object instance.

Listing Data Flow Applications
------------------------------

From ADS you can list applications, which are returned as a list of dictionaries, with a method to provide the data as a Pandas dataframe. The default sort order is the most recent run first.

For example, to list the most recent five applications use:

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow()
    data_flow.list_apps().to_dataframe().head(5)

.. image:: images/list_apps.png
  :height: 200
  :alt: Listing of data flow apps

Create a Data Flow run
----------------------

After an application is created or loaded in your notebook session, the next logical step is to execute a run of that application. The process of executing (that is, creating) a run is similar to creating an application.

The first step is to configure the run. This can be done with the ``prepare_run()`` method of the ``DataFlowApp`` object. You only need to provide a value for the name of your run using ``run_display_name``:

.. code-block:: python3

    run_config = app.prepare_run(run_display_name="<run_display_name>")

.. note::

  (1) You can use a compartment **different** from your application to create a run by specifying the ``compartment_id`` in ``prepare_run``. By default, the **same** compartment as **your dataflow application** is used to create the run.

  (2) Optionally, you can specify the ``logs_bucket`` used to store the logs of your run. By default, the run inherits the ``logs_bucket`` from the parent application, but you can overwrite that option.

  (3) Every time the Data Flow application launches a run, a local folder representing this Data Flow run is created. This folder stores all the information including the script, the run configuration, and any logs that are stored in the logs bucket.
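As an illustration of the first two points in the note, here is a hedged sketch of ``prepare_run`` with those overrides. The values are placeholders, and passing ``logs_bucket`` directly to ``prepare_run`` is an assumption based on point (2):

.. code-block:: python3

    run_config = app.prepare_run(
        run_display_name="<run_display_name>",
        compartment_id="<other_compartment_ocid>",  # run in a different compartment (note point 1)
        logs_bucket="<logs_bucket>"                 # override the inherited logs bucket (assumed keyword, note point 2)
    )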
Then, you can create a Data Flow run using the ``run_config`` generated in the preparation stage. During this process, you can monitor the Data Flow run while the job is running. You can also pull logs into your local directories by setting ``save_log_to_local=True``.

.. code-block:: python3

    run = app.run(run_config, save_log_to_local=True)

The ``DataFlowRun`` object has some useful attributes similar to the ``DataFlowApp`` object.

You can check the status of the run with:

.. code-block:: python3

    run.status

You can get the config file that created this run. The run configuration and the pyspark script used in this run are also saved in the corresponding run directory in your notebook environment.

.. code-block:: python3

    run.config

You can get the run directory where the artifacts are stored in your notebook environment with:

.. code-block:: python3

    run.local_dir

Similarly, you can get a clickable link to the Oracle Cloud Infrastructure Console Run Details page with:

.. code-block:: python3

    run.oci_link

Fetching Logs
-------------

After a Data Flow run has completed, you can examine the logs using ADS. There are two types of logs, ``stdout`` and ``stderr``.

.. code-block:: python3

    run.log_stdout.head()   # show first lines of stdout
    run.log_stdout.tail()   # show last lines of stdout

    # where the logs are stored on Object Storage
    run.log_stdout.oci_path

    # the path to the saved logs in the notebook environment if save_log_to_local was True when you created this run
    run.log_stdout.local_path

If ``save_log_to_local`` is set to ``False`` during ``app.run(...)``, you can fetch logs by calling the ``fetch_log(...).save()`` method on the ``DataFlowRun`` object with the correct log type.

.. code-block:: python3

    run.fetch_log("stdout").save()
    run.fetch_log("stderr").save()

.. note:: Due to a limitation of ``pyspark`` (specifically Python applications in Spark), both ``stdout`` and ``stderr`` are merged into the ``stdout`` stream.

Edit and sync pyspark script
----------------------------

The Data Flow integration with ADS supports the edit-run-edit cycle. The local pyspark script can be edited, and is automatically synced to Object Storage *each time the application is run*.

Data Flow sources the pyspark script from Object Storage, so the local files in the notebook session are not visible to Data Flow. The ``app.run(...)`` method compares the content hash of the local file with the remote copy on Object Storage, and if any change is detected, the new local version is copied to the remote. For the first run, the sync creates the remote file and generates the fully qualified URL with namespace that Data Flow requires.

Syncing is the default behavior in ``app.run(...)``. If you *do not* want the application to sync with the locally modified files, you need to pass ``sync=False`` as an argument to ``app.run(...)``.
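A minimal sketch of turning the sync off, using the same ``run_config`` from earlier:

.. code-block:: python3

    # run against the copy of the script already in Object Storage,
    # ignoring any local edits made since the last sync
    run = app.run(run_config, sync=False)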
Arguments and Parameters
------------------------

Passing arguments to pyspark scripts is done with the ``arguments`` value in ``prepare_app``. In addition to arguments, Data Flow supports a parameter dictionary that can be used to interpolate arguments. To simply pass arguments, the ``script_parameters`` section can be ignored. However, any key/value pair defined in ``script_parameters`` can be referenced in ``arguments`` using the ``${key}`` syntax, and the value of that key is passed as the argument value.

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow()

    # display_name, script_bucket, and pyspark_file_path are defined as in the earlier examples
    app_config = data_flow.prepare_app(
        display_name,
        script_bucket,
        pyspark_file_path,
        arguments = ['${foo}', 'bar', '-d', '--file', '${filename}'],
        script_parameters={
            'foo': 'val1 val2',
            'filename': 'file1',
        }
    )
    app = data_flow.create_app(app_config)

    run_config = app.prepare_run(run_display_name="test-run")
    run = app.run(run_config)

.. note:: Arguments in the ``${arg}`` format are replaced by the value provided in the script parameters, while arguments not in this format are passed into the script verbatim.

You can override the values of some or all script parameters in each run by passing different values to ``prepare_run()``.

.. code-block:: python3

    run_config = app.prepare_run(run_display_name="test-run", foo='val3')
    run = app.run(run_config)

Add Third-Party Libraries
-------------------------

Your PySpark applications might have custom dependencies in the form of Python wheels or virtual environments. Data Flow provides instructions on how to create the archive file: `Adding Third-Party Libraries to Data Flow Applications `_.

Pass the archive file to your Data Flow applications with the ``archive_path`` and ``archive_bucket`` values in ``prepare_app``:

- ``archive_path`` - the local path to the archive file
- ``archive_bucket`` - the bucket used to read/write the archive file in Object Storage; if not provided, ``archive_bucket`` defaults to the bucket used for the pyspark script

Use ``prepare_app()`` to create the configuration object necessary to create the application.

.. code-block:: python3

    from ads.dataflow.dataflow import DataFlow

    data_flow = DataFlow()
    app_config = data_flow.prepare_app(
        display_name="<display_name>",
        script_bucket="<script_bucket>",
        pyspark_file_path="<pyspark_file_path>",
        archive_path="<archive_path>",
        archive_bucket="<archive_bucket>"
    )

The behavior of the archive file is very similar to that of the pyspark script:

- When creating an application, the local archive file is uploaded to the specified Object Storage bucket.
- When creating a run, the latest local archive file is synchronized to the remote file in Object Storage. The ``sync`` parameter controls this behavior.
- When loading an existing application created with ``archive_uri``, the archive file is pulled from Object Storage and saved to the local directory.
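Once the configuration includes the archive, creating and running the application follows the same flow shown earlier. This is a minimal sketch, assuming the placeholders above have been filled in:

.. code-block:: python3

    app = data_flow.create_app(app_config)

    run_config = app.prepare_run(run_display_name="<run_display_name>")
    # sync=True (the default) pushes the latest local script and archive to Object Storage before the run
    run = app.run(run_config, save_log_to_local=True)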
Fetching pyspark Output
-----------------------

After the application has run, and any ``stdout`` has been captured in the log file, the pyspark script will most likely have produced some form of output. Usually, a pyspark script batch processes something, for example by sampling, aggregating, or preprocessing data. The resulting output can be loaded with ``ADSDataset.open()`` using the ``ocis://`` protocol handler.

The only way to get output from pyspark back into the notebook is to either create files in Object Storage that are read into the notebook, or to use the ``stdout`` stream.

Here is a simple example where the pyspark script produces output and prints it in a portable format. JSON-L is shown here, but CSV works too. While convenient as an example, this method is not recommended for large data.

.. code-block:: python3

    from pyspark.sql import SparkSession

    def main():

        # create a spark session
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .getOrCreate()

        # load an example csv file from dataflow public storage into a DataFrame
        original_df = spark\
            .read\
            .format("csv")\
            .option("header", "true")\
            .option("multiLine", "true")\
            .load("oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv")

        # register the dataframe as a sql view so we can perform SQL on it
        original_df.createOrReplaceTempView("berlin")

        query_result_df = spark.sql("""
            SELECT
                city,
                zipcode,
                number_of_reviews,
                CONCAT(latitude, ',', longitude) AS lat_long
            FROM berlin
        """)

        # convert the filtered Spark DataFrame into JSON format
        # note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook
        print('\n'\
            .join(query_result_df\
                .toJSON()\
                .collect()))

    if __name__ == '__main__':
        main()

Once the above has run, the ``stdout`` stream (which contains the JSON-L formatted data) can be interpreted as a string using ``Pandas``.

.. code-block:: python3

    import pandas as pd

    from ads.dataset.factory import DatasetFactory

    # the pyspark script wrote to the log as JSON-L, and we read the log back as an ADS dataset
    ds = DatasetFactory.open(pd.read_json(str(run.log_stdout), lines=True))
    ds.head()
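The text above also mentions the alternative of writing results to Object Storage rather than the ``stdout`` stream. The following is only a hedged sketch of that route: the bucket and namespace are placeholders, and whether ``DatasetFactory.open`` reads the resulting part files from this exact ``oci://`` path without extra options is an assumption that depends on your environment.

.. code-block:: python3

    # inside the pyspark script: persist the result set to Object Storage as CSV
    # (bucket and namespace are placeholders)
    query_result_df.write.csv(
        "oci://<bucket>@<namespace>/berlin_summary",
        header=True,
        mode="overwrite"
    )

    # back in the notebook: open the written files as an ADS dataset
    # (assumes DatasetFactory.open can read this oci:// path directly)
    from ads.dataset.factory import DatasetFactory
    ds = DatasetFactory.open("oci://<bucket>@<namespace>/berlin_summary", format="csv")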