Data Flow¶
Data Flow is an Oracle Cloud Infrastructure service for creating and running Spark applications. ADS can be used to create and run PySpark Data Flow applications directly from a notebook session.
Getting started with Data Flow¶
Note
We recommend that you use the pyspark conda environment for Data Flow code development.
Before running applications in the Data Flow service, two storage buckets are required in Object Storage: Data Flow requires a bucket to store the logs, and a data warehouse bucket for Spark SQL applications. See set up storage.
Data Flow also requires policies to be set in IAM so it can access resources in order to manage and run applications. See policy set up.
Full Data Flow Documentation: Data Flow documentation
Create a Data Flow instance¶
You first need to create a DataFlow
object instance.
You can specify a default path where all the Data Flow artifacts will be stored using the dataflow_base_folder optional argument. By default, all Data Flow artifacts are stored under /home/datascience/dataflow.
This directory dataflow_base_folder
contains multiple subdirectories, each one corresponding to a different application. The name of the subdirectory corresponds to the application name to which a random string is added as a suffix.
In each application directory, artifacts generated by separate data flow runs are stored in different folders. Each folder is identified by the run display name and the run creation time. All the run specific artifacts including the script, the run configuration and the run logs are saved in the corresponding run folder.
Also, you can choose to use a specific compartment by providing the optional argument compartment_id
when creating the dataflow instance. Otherwise, it will use the same compartment as your notebook session to create the instance.
from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow(
    compartment_id="<compartmentA_OCID>",
    dataflow_base_folder="<my_dataflow_dir>"
)
Generate a script using template¶
We provide simple pyspark or sparksql templates for you to get started with Data Flow. You can use data_flow.template()
to generate a pre-written template.
We support two types of templates:
standard_pyspark - the template for standard pyspark jobs
sparksql - the template for sparksql jobs
from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
data_flow.template(job_type='standard_pyspark')
data_flow.template()
will return the local path to the script you have generated.
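For example, a minimal sketch capturing the returned path (assuming the sparksql template is requested through the same job_type argument):

# capture the local path of the generated template script
script_path = data_flow.template(job_type='sparksql')
print(script_path)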
Create a Data Flow Application¶
The application creation process has two stages: preparation and creation. In the preparation stage, you prepare a configuration object necessary to create the Data Flow application. You need to provide values for these three parameters:
display_name - the name you give your application
script_bucket - the bucket used to read/write the pyspark script in Object Storage
pyspark_file_path - the local path to your pyspark script
ADS checks that the bucket exists and that you can write to it from your notebook session. Optionally, you can change values for the following parameters:
compartment_id - the OCID of the compartment in which to create the Data Flow application; if not provided, the same compartment as your dataflow object is used by default
logs_bucket - the bucket used to store run logs in Object Storage; the default value is "dataflow-logs"
driver_shape - the driver shape used to create the application; the default value is "VM.Standard2.4"
executor_shape - the executor shape used to create the application; the default value is "VM.Standard2.4"
num_executors - the number of executor VMs requested; the default value is 1
Note
If you want to use a private bucket as the logs_bucket, make sure to add a corresponding Data Flow service policy using Data Flow Identity: Policy Set Up.
Then you can use prepare_app()
to create the configuration object necessary to create the application.
from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
    display_name="<app-display-name>",
    script_bucket="<your-script-bucket>",
    pyspark_file_path="<your-script-path>"
)
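You can also override the optional parameters described above; this is a sketch assuming they are accepted as keyword arguments by prepare_app():

app_config = data_flow.prepare_app(
    display_name="<app-display-name>",
    script_bucket="<your-script-bucket>",
    pyspark_file_path="<your-script-path>",
    logs_bucket="<your-logs-bucket>",      # default is "dataflow-logs"
    driver_shape="VM.Standard2.4",         # default driver shape
    executor_shape="VM.Standard2.4",       # default executor shape
    num_executors=2,
    compartment_id="<compartment_OCID>",   # defaults to the dataflow object's compartment
)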
Once you have the application configured, you can create a Data Flow application using create_app
:
app = data_flow.create_app(app_config)
Note
Your local script is uploaded to the script bucket in this application creation step. Object Storage supports file versioning that creates an object version when the content changes or the object is deleted. You can enable Object Versioning in your bucket in the Oracle Cloud Infrastructure Console to prevent overwriting of existing files in Object Storage.

You can create an application with a script file that exists in Object Storage by setting overwrite_script=True in create_app. Similarly, you can set overwrite_archive=True to create an application with an archive file that exists in Object Storage. By default, the overwrite_script and overwrite_archive options are set to False.
app = data_flow.create_app(app_config, overwrite_script=True, overwrite_archive=True)
You can explore a few attributes of the DataFlowApp
object.
First you can look at the configuration of the application.
app.config
You can also get a url link to the OCI console Application Details page.
app.oci_link
Load an Existing Data Flow application¶
As an alternative to creating applications in ADS, you can load existing applications created elsewhere. These Data Flow applications must be Python applications. To load an existing application, you need the application's OCID.
existing_app = data_flow.load_app(app_id, target_folder)
You can find the app_id
either directly in the Oracle Cloud Infrastructure Console or by listing existing applications.
Optionally, you could assign a value to the parameter target_folder
. This parameter is the directory in which you want to store the local artifacts of this application. If target_folder
is not provided, then by default the local artifacts of this application are stored under the dataflow_base_folder
defined by the dataflow object instance.
Listing Data Flow Applications¶
From ADS you can list applications. They are returned as a list of dicts, with a function to provide the data as a Pandas dataframe. The default sort order is the most recent run first.
For example, to list the most recent five applications use:
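A minimal sketch, assuming a list_apps() method on the DataFlow object and a to_dataframe() conversion as described above:

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
# assumed API: list applications, convert to a Pandas dataframe, and show the five most recent
data_flow.list_apps().to_dataframe().head(5)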
Create a Data Flow run¶
After an application is created or loaded in your notebook session, the next logical step is to execute a run of that application. The process of executing (aka creating) a run is similar to creating an application.
The first step is to configure the run. This can be done via the prepare_run()
method of the DataFlowApp
object. You only need to provide a value for the name of your run via run_display_name
.
run_config = app.prepare_run(run_display_name="<run-display-name>")
Note
You could use a compartment different from your application to create a run by specifying the compartment_id in prepare_run. By default, it will use the same compartment as your Data Flow application to create the run.

Optionally, you can specify the logs_bucket to store the logs of your run. By default, the run will inherit the logs_bucket from the parent application, but you can overwrite that option.

Every time the Data Flow application launches a run, a local folder representing this Data Flow run is created. This folder stores all the information including the script, the run configuration, and any logs that are stored in the logs bucket.
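For example, a sketch assuming compartment_id and logs_bucket are accepted as keyword arguments by prepare_run():

run_config = app.prepare_run(
    run_display_name="<run-display-name>",
    compartment_id="<compartmentB_OCID>",  # assumption: create the run in a different compartment
    logs_bucket="<your-logs-bucket>",      # assumption: override the logs bucket inherited from the application
)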
Then, you can create a Data Flow run using the run_config
generated in the preparation stage. During this process, you can monitor the Data Flow run while the job is running. You can also pull logs to your local directories by setting save_log_to_local=True
.
run = app.run(run_config, save_log_to_local=True)
The DataFlowRun
object has some useful attributes similar to the DataFlowApp
object.
You can check the status of the run by
run.status
You can get the config file that created this run. The run configuration and the pyspark script used in this run are also saved in the corresponding run directory in your notebook environment.
run.config
You can get the run directory where the artifacts are stored in your notebook environment with:
run.local_dir
Similarly, you can get a clickable link to the Oracle Cloud Infrastructure Console Run Details page with:
run.oci_link
Fetching Logs¶
After a Data Flow run has completed, you can examine the logs using ADS. There are two types of logs: stdout and stderr.
run.log_stdout.head()  # show first rows of stdout
run.log_stdout.tail()  # show last lines of stdout

# where the logs are stored on OCI Storage
run.log_stdout.oci_path

# the path to the saved logs in the notebook environment if save_log_to_local was True when you created this run
run.log_stdout.local_path
If save_log_to_local
is set to False
during app.run(...)
, you can fetch logs by calling the fetch_log(...).save()
method on the DataFlowRun
object with the correct log type.
run.fetch_log("stdout").save() run.fetch_log("stderr").save()
Note
Due to a limitation of pyspark
(specifically Python applications in Spark) both stdout
and stderr
are merged into the stdout
stream.
Edit and sync pyspark script¶
Data Flow integration with ADS supports the edit-run-edit cycle: the local pyspark script can be edited and is automatically synchronized to Object Storage each time the application is run.
Data Flow sources the pyspark script from object storage
so the local files in the notebook session are not visible to Data Flow. The
app.run(...)
method will compare the content hash of the local file with the remote copy
on object storage, and if any change is detected, the new local version will be copied over to the remote. For
the first run, the sync creates the remote file and generates a fully qualified URL with the namespace required by Data Flow.
Syncing is the default setting in app.run(...)
. If you do not want the app to sync with the locally modified files, include sync=False as an argument in app.run(...).
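For example, to run the application against the script already on Object Storage without syncing local edits:

run = app.run(run_config, sync=False)  # skip syncing the locally modified script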
Arguments and Parameters¶
Passing arguments to pyspark scripts is done with the arguments
value in prepare_app
. In addition to arguments, Data Flow supports a parameter dictionary that may be used to interpolate arguments. To simply pass arguments, the script_parameters section may be ignored; however, any key/value pair defined in script_parameters may be referenced in arguments using the ${key} syntax, and the value of that key will be passed as the argument value.
from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
    display_name,
    script_bucket,
    pyspark_file_path,
    arguments=['${foo}', 'bar', '-d', '--file', '${filename}'],
    script_parameters={
        'foo': 'val1 val2',
        'filename': 'file1',
    }
)
app = data_flow.create_app(app_config)
run_config = app.prepare_run(run_display_name="test-run")
run = app.run(run_config)
Note
The arguments in the format of ${arg}
will be replaced by the value provided in script parameters when passed in, while arguments not in this format are passed into the script verbatim.
You can override the values of some or all script parameters in each run by passing different values to prepare_run()
.
run_config = app.prepare_run(run_display_name="test-run", foo='val3')
run = app.run(run_config)
Add Third-Party Libraries¶
Your PySpark applications might have custom dependencies in the form of Python wheels or virtual environments. Data Flow provides instructions on how to create the archive file here: Adding Third-Party Libraries to Data Flow Applications.
Pass the archive file to your dataflow applications with archive_path
and archive_bucket
values in prepare_app
.
archive_path - the local path to the archive file
archive_bucket - the bucket used to read/write the archive file in Object Storage; if not provided, archive_bucket defaults to the bucket used for the pyspark script
Use prepare_app()
to create the configuration object necessary to create the application.
from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
    display_name="<app-display-name>",
    script_bucket="<your-script-bucket>",
    pyspark_file_path="<your-script-path>",
    archive_path="<your-archive-path>",
    archive_bucket="<your-archive-bucket>"
)
The behavior of the archive file is very similar to that of the pyspark script:

When creating an app, the local archive file is uploaded to the specified Object Storage bucket.
When creating a run, the latest local archive file is synchronized to the remote file on Object Storage. The sync parameter controls this behavior.
When loading an existing app created with archive_uri, the archive file is pulled from Object Storage and saved in the local directory.
Fetching pyspark Output¶
After the application has run and any stdout has been captured in the log file, the pyspark script will most likely have produced some form of output. Usually a pyspark script batch processes something, for example, sampling data, aggregating data, or preprocessing data. The resulting output can be loaded as an ADS dataset with DatasetFactory.open() using the ocis:// protocol handler.

The only ways to get output from pyspark back into the notebook are to create files on OCI Storage that are read into the notebook, or to use the stdout stream. Here we show a simple example where the pyspark script prints its output in a portable format; we use JSON-L here, but CSV works too. While convenient as an example, this method is not recommended for large data.
from pyspark.sql import SparkSession

def main():
    # create a spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    # load an example csv file from dataflow public storage into a DataFrame
    original_df = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("multiLine", "true") \
        .load("oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv")

    # register the dataframe as a sql view so we can perform SQL on it
    original_df.createOrReplaceTempView("berlin")

    query_result_df = spark.sql("""
        SELECT
            city,
            zipcode,
            number_of_reviews,
            CONCAT(latitude, ',', longitude) AS lat_long
        FROM
            berlin"""
    )

    # convert the filtered Spark DataFrame into JSON format
    # note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook
    print('\n'.join(query_result_df.toJSON().collect()))

if __name__ == '__main__':
    main()
Once the above has run, the stdout stream (which contains JSON-L formatted data) can be interpreted as a string using Pandas.
import io
import pandas as pd
from ads.dataset.factory import DatasetFactory
# the pyspark script wrote to the log as JSON-L, and we read the log back as an ADS dataset
ds = DatasetFactory.open(pd.read_json(str(run.log_stdout), lines=True))
ds.head()