Jobs with ADS

Oracle Data Science jobs enable you to define and run repeatable machine learning tasks on fully managed infrastructure, such as data preparation, model training, hyperparameter optimization, and batch inference.

About Jobs

Data Science jobs allow you to run customized tasks outside of a notebook session. You can provision Compute on demand and pay only for the Compute that you need. With jobs, you can run applications that perform tasks such as data preparation, model training, hyperparameter tuning, and batch inference. When the task is complete, the Compute automatically terminates. You can use the Logging service to capture output messages.

Using jobs, you can:

  • Run machine learning (ML) or data science tasks outside of your JupyterLab notebook session.

  • Operationalize discrete data science and machine learning tasks as reusable, runnable operations.

  • Automate your MLOps or CI/CD pipeline.

  • Execute batch workloads or workloads triggered by events or actions.

  • Run batch, mini batch, or distributed batch inference.

  • From a JupyterLab notebook session, launch long-running or compute-intensive tasks in a Data Science job to keep your notebook free so that you can continue your work.

Typically, an ML and data science project is a series of steps including:

  • Access

  • Explore

  • Prepare

  • Model

  • Train

  • Validate

  • Deploy

  • Test

After the steps are completed, you can automate the process of data exploration, model training, deploying, and testing using jobs. A single change in the data preparation or model training, such as an experiment with different hyperparameters, can be run as a job and tested independently.

Jobs consist of a job and a job run.

Job

A job is a template that describes the task. It contains elements like the job artifact, which is immutable and can't be modified after being registered as a Data Science job. A job also contains information about the Compute shape, logging configuration, Block Storage, and other options. You can configure environment variables that are used at run time by the job run, and you can also pass in CLI arguments. This allows a job run to be customized while using the same job as a template, because you can override the environment variables and CLI arguments in individual job runs. Only the job artifact is immutable; the other settings can be changed.

Job Run

A job run is an instantiation of a job. In each job run, you can override some of the job configuration. The most common configurations to change are the environment variables and CLI arguments. You can use the same job as a template and launch multiple simultaneous job runs to parallelize a large task. You can also sequence jobs and keep the state by writing state information to Object Storage.

For example, you could experiment with how different model classes perform on the same training data by using the ADSTuner to perform hyperparameter tuning on each model class. You could do this in parallel by having a different job run for each class of models. For a given job run, you could pass an environment variable that identifies the model class that you want to use. Each model can write its results to the Logging service or Object Storage. Then you can run a final sequential job that uses the best model class and trains the final model on the entire dataset.
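
The following is a minimal sketch of this pattern, assuming that job is a job that has already been created as shown in the sections below and that the job run overrides for the run name and environment variables are used. The MODEL_CLASS variable name, the model class names, and the run names are illustrative placeholders; the job script would read the variable through os.environ["MODEL_CLASS"].

# A sketch only: launch one job run per model class by overriding an
# environment variable at run time. `job` is an existing, created ADS Job,
# and MODEL_CLASS and the class names are illustrative placeholders.
model_classes = ["RandomForestClassifier", "XGBClassifier", "LogisticRegression"]

job_runs = [
    job.run(
        name=f"tuning-{model_class}",
        env_var={"MODEL_CLASS": model_class},  # per-run override of the job template
    )
    for model_class in model_classes
]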

Creating Jobs

A job is created by specifying the default job configuration parameters, and providing the immutable job artifact. The job artifact contains the job’s executable code. This artifact can be Python, Bash/Shell, or a ZIP or tar file containing an entire project written in Python or Java. When creating a job you specify the Compute shape, Block Storage, environment variables, CLI arguments, logging, and network settings.

Setting up Logging

If you have already defined a Logging service log group and log, you can use their OCIDs. The following section assumes that they haven't been configured, so the code snippets create them.

import ads
import os
import random
import string

# Name the resources that are created with a consistent, unique suffix
resource_name = "model_deployment_jobs_" + "".join(
    random.choices(string.ascii_letters + string.digits, k=4))
print(f"Unique ID used in all the resources: {resource_name}")

# Use resource principal authentication when running in a notebook session
if "OCI_RESOURCE_PRINCIPAL_VERSION" in os.environ:
    ads.set_auth("resource_principal")

# Create a Logging service log group and a log for the job runs
from ads.common.oci_logging import OCILogGroup, OCILog
log_group = OCILogGroup(display_name=resource_name).create()
log = log_group.create_log(resource_name)

Working with the ADS Jobs API

Running code from a Python script as a Job

This example shows you how to create a job with a “Hello World” Python script. The Logging service log and log group are defined in the infrastructure. The output of the script appears in the logs.

import tempfile
job_script = tempfile.NamedTemporaryFile(suffix=".py")

# Write a simple 'Hello World' script
with open(job_script.name, mode="w") as f:
    f.write(f"""print("Hello World")""")
# Instantiate a job using the resource name defined previously
from ads.jobs import Job
job = Job(name=resource_name)

Next, you can specify the desired infrastructure to run the job. If you are in a notebook session, ADS can automatically fetch the infrastructure configurations and use them for the job. If you aren’t in a notebook session or you want to customize the infrastructure, you can specify them using the .with_infrastructure() and associated methods as in this example.

from ads.jobs import DataScienceJob
job.with_infrastructure(DataScienceJob() \
       .with_log_id(log.id) \
       .with_log_group_id(log_group.id))

The .with_runtime() method is used to define what is going to be executed in a job run. In this example, it is a Python script so the PythonRuntime() class is used to define the name of the script using the .with_source() method.

from ads.jobs import PythonRuntime
job.with_runtime(
    PythonRuntime().with_source(job_script.name))

Using a configuration file based on a YAML format is often more convenient than writing the configuration in Python. The ADS jobs module allows you to read a configuration from YAML. It also allows you to write an existing configuration into a YAML file.

# Serialize the job and save it to a YAML file
job.to_yaml(uri="<path_to_yaml_file>")
# Load the job back from the YAML file
job = Job.from_yaml(uri="<path_to_yaml_file>")

Equivalent in YAML

It is also possible to initialize a job directly from a YAML string. For example, to create a job identical to the preceding example, you could simply run the following:

job = Job.from_string(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: {resource_name}
  runtime:
    kind: runtime
    spec:
      scriptPathURI: {job_script.name}
    type: python
""")

Finally, you create and run the job, which gives you access to the job_run.id.

job.create()
job_run = job.run()

Additionally, you can acquire the job run using its OCID.

from ads.jobs import DataScienceJobRun
job_run = DataScienceJobRun.from_ocid(job_run.id)

The .watch() method is useful to monitor the progress of the job run.

job_run.watch()

After the job has been created and runs successfully, you can find the output of the script in the logs if you configured logging.

Adding command line arguments

If the Python script that you want to run as a job requires CLI arguments, use the .with_argument() method to pass the arguments to the job. In the Python script, the arguments are available through sys.argv.

job_script = tempfile.NamedTemporaryFile(suffix=".py")

# Example script that uses command-line arguments
with open(job_script.name, mode="w") as f:
    f.write(f"""
import sys
print("Hello " + str(sys.argv[1]) + " and " + str(sys.argv[2]))
""")
job = Job()
job.with_infrastructure(
    DataScienceJob() \
    .with_log_group_id(log_group.id) \
    .with_log_id(log.id))

# The CLI argument can be passed in using `with_argument` when defining the runtime
job.with_runtime(
    PythonRuntime() \
       .with_source(job_script.name) \
       .with_argument("first_argument", "second_argument"))

job.create()
job_run = job.run()

Equivalent in YAML

You can define a job with a YAML string. In order to define a job identical to the preceding job, you could use the following code before running job.create() and job.run():

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      args:
      - first_argument
      - second_argument
      scriptPathURI: {job_script.name}
    type: python
""")

After the job run is created and running, you can use the .watch() method to monitor its progress.

job_run.watch()

Adding environment variables

Similarly, if the script that you want to run requires environment variables, you pass them in using the .with_environment_variable() method. The environment variables are passed as key-value pairs and are accessed in the Python script using the os.environ dictionary.

job_script = tempfile.NamedTemporaryFile(suffix=".py")

with open(job_script.name, mode="w") as f:
    f.write(f"""
import os
import sys
print("Hello " + os.environ["KEY1"] + " and " + os.environ["KEY2"])""")
# Define the environment variable values that are passed in as part of the job.
# The script reads both KEY1 and KEY2, so both must be set.
first_env_var_value = "<your_first_value_here>"
second_env_var_value = "<your_second_value_here>"

job = Job()
job.with_infrastructure(
    DataScienceJob() \
    .with_log_group_id(log_group.id) \
    .with_log_id(log.id))

job.with_runtime(
    PythonRuntime() \
       .with_source(job_script.name) \
       .with_environment_variable(KEY1=first_env_var_value, KEY2=second_env_var_value))
job.create()
job_run = job.run()

Equivalent in YAML

The following code shows the equivalent way to build a job with a YAML string:

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      env:
      - name: KEY1
        value: first_variable_here
      - name: KEY2
        value: second_variable_here
      scriptPathURI: {job_script.name}
    type: python
""")

You can watch the progress of the job run using the .watch() method.

job_run.watch()

Running a Job with a Conda Environment

Using a Data Science Service Conda Environment

The API also provides an option to create a job run that uses a Data Science service conda environment. You pass the slug of the conda environment that you want to use to the .with_service_conda() method. This example uses the tensorflow26_p37_cpu_v1 conda environment:

job_script = tempfile.NamedTemporaryFile(suffix=".py")

with open(job_script.name, mode="w") as f:
    f.write(f"""
import ads
print("ADS version: " + str(ads.__version__))""")
ads_conda_pack = "tensorflow26_p37_cpu_v1"
from ads.jobs import ScriptRuntime
job = Job()
job.with_infrastructure(
    DataScienceJob() \
    .with_log_group_id(log_group.id) \
    .with_log_id(log.id))

job.with_runtime(
    ScriptRuntime() \
       .with_script(job_script.name) \
       .with_environment_variable(KEY1="VALUE1") \
       .with_service_conda(ads_conda_pack))
job.create()
job_run = job.run()

Equivalent in YAML

To generate the same job using a YAML string, use this code:

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      conda:
        slug: tensorflow26_p37_cpu_v1
        type: service
      env:
      - name: KEY1
        value: VALUE1
      scriptPathURI: {job_script.name}
    type: script
""")

The .watch() method allows you to track the progress of the job run.

job_run.watch()

Using a Custom Conda Environment

If you want to use a custom conda environment, the only change you need to make is in the definition of the ScriptRuntime:

ScriptRuntime().with_script("my_script") \
    .with_custom_conda("oci://bucket@namespace/conda_pack/pack_name")

Equivalent in YAML

The full YAML string to generate a job that runs a custom conda environment is:

kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: ocid1.loggroup.oc1.iad.amaaaaaav66vvniaitywjb2b5rfhyvuxqido6t27yc6ko5czjbvc25zgbhsa
      logId: ocid1.log.oc1.iad.amaaaaaav66vvniav7hljx4twkle2mkhujkd4vgeqn6kagtkccycxmexyvnq
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      conda:
        type: published
        uri: oci://bucket@namespace/conda_pack/pack_name
      scriptPathURI: my_script
    type: script

Running code from a JupyterLab Notebook Session as a Job

In some cases, you may want to run an existing JupyterLab notebook as a job. You can do this using a NotebookRuntime() object. Magic commands in notebooks aren't supported. If you have magic commands in your notebook, comment them out or remove them before running the notebook as a job.

To run the following example, you may need to install the aiohttp package from PyPI. Ensure that you have internet access to pull the notebook. This example uses an existing notebook from the TensorFlow documentation:

from ads.jobs import NotebookRuntime
job = Job()
job.with_infrastructure(
    DataScienceJob() \
    .with_log_id(log.id) \
    .with_log_group_id(log_group.id))
path_to_nb = "https://raw.githubusercontent.com/tensorflow/docs/master/site/en/tutorials/customization/basics.ipynb"
job.with_runtime(
    NotebookRuntime() \
    .with_notebook(path=path_to_nb) \
    .with_service_conda(ads_conda_pack))
job.create()
job_run = job.run()

Equivalent in YAML

There is an alternative way to build an identical job using a YAML string:

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      conda:
        slug: tensorflow26_p37_cpu_v1
        type: service
      notebookPathURI: {path_to_nb}
    type: notebook
""")

After the job is created and running, you can use the .watch() API to monitor the job:

job_run.watch()

The API allows you to use exclusion tags, which let you exclude cells from a job run. For example, you could use a notebook to do exploratory data analysis, and then train and evaluate your model. Later, you could use that same notebook to build future models that are trained on a different dataset. With exclusion tags, the job run only executes the cells that are related to training the model, not the exploratory data analysis or model evaluation cells.

You tag the cells, and then use the .with_exclude_tag() method. For example, if you tagged cells with ignore and remove, you pass in a list of tags to the method and those cells are excluded from the code that is executed as part of the job run. To tag cells in a notebook, we recommend using the JupyterLab celltags.

job.with_runtime(NotebookRuntime() \
            .with_notebook("path_to_notebook") \
            .with_exclude_tag(["ignore", "remove"]))

Running code from a ZIP file or folder as a Job

The ScriptRuntime class that is part of ADS allows you to run code from a zipped file or folder:

job = Job(name=resource_name)
job.with_infrastructure(DataScienceJob() \
   .with_log_id(log.id) \
   .with_log_group_id(log_group.id))

Modify the SOURCE_PATH and ENTRYPOINT to indicate where the zipped file or folder that you intend to run as a job is located, and which file inside it is the entry point. You can also pick which conda environment to run it with.
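
For example, the placeholders might be defined as follows; these values are illustrative, so substitute the path to your own zip file or folder and the entry script inside it:

# Illustrative placeholder values; replace with your own paths
SOURCE_PATH = "<path_to_zip_file_or_folder>"
ENTRYPOINT = "<relative_path_to_entry_script>"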

from ads.jobs import ScriptRuntime
job.with_runtime(
        ScriptRuntime() \
        .with_source(SOURCE_PATH, entrypoint=ENTRYPOINT) \
        .with_service_conda("mlcpuv1"))
job.create()
job_run = job.run()

Equivalent in YAML

The following code shows how to build the same job using a YAML string:

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      conda:
        slug: mlcpuv1
        type: service
      entrypoint: {ENTRYPOINT}
      scriptPathURI: {SOURCE_PATH}
    type: script
""")

After the job run is created and running, you can use the .watch() API to track the progress:

job_run.watch()

Running code from Git as a Job

The ADS GitPythonRuntime class allows you to run source code from a Git repository as a Data Science job. The next example shows how to run a PyTorch neural network example that trains a third order polynomial to predict y=sin(x).

To configure the GitPythonRuntime, you must specify the source code URL and the entrypoint path. As with PythonRuntime, you can specify a service conda environment, environment variables, and CLI arguments. In this example, the pytorch19_p37_gpu_v1 service conda environment is used. The infrastructure is the same as in the previous examples.

from ads.jobs import GitPythonRuntime
infrastructure = DataScienceJob().with_log_id(log.id) \
                                 .with_log_group_id(log_group.id)

runtime = (
    GitPythonRuntime() \
    .with_source("https://github.com/pytorch/tutorials.git") \
    .with_entrypoint("beginner_source/examples_nn/polynomial_nn.py") \
    .with_service_conda("pytorch19_p37_gpu_v1")
)

The default branch of the Git repository (for example, the main branch) is used unless you specify a different branch or commit in the .with_source() method.

For a public repository, we recommend the “http://” or “https://” URL. Authentication may be required for the SSH URL even if the repository is public.

To use a private repository, you must first save an SSH key (for example, GitHub Deploy Key) to an OCI Vault as a secret, and provide the secret_ocid to the with_source() method. See Managing Secret with Vault.
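
For example, a runtime that pins a branch or a commit, or that references a Vault secret for a private repository, might look like the following sketch. The branch name, commit hash, and secret OCID are placeholders, and the branch and commit keyword names are assumed to match the .with_source() options described above:

# A sketch of the .with_source() options; the branch, commit, and secret OCID
# values are placeholders.
runtime = (
    GitPythonRuntime()
    .with_source(
        "https://github.com/pytorch/tutorials.git",
        branch="main",                  # check out a specific branch
        # commit="<commit_hash>",       # or pin an exact commit
        # secret_ocid="<secret_ocid>",  # SSH key stored in OCI Vault, for a private repository
    )
    .with_entrypoint("beginner_source/examples_nn/polynomial_nn.py")
    .with_service_conda("pytorch19_p37_gpu_v1")
)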

The entrypoint specifies how the source code is invoked. The .with_entrypoint() method has the following arguments:

  • path: Required. The relative path for the script/module/file to start the job.

  • func: Optional. The function to be called. If this is not specified, the file is run as a Python script in a subprocess.

With the GitPythonRuntime class, you can save the output files from the job run to Object Storage. By default, the source code is cloned to the ~/Code directory. In this example, the files in the examples_nn directory are copied to the Object Storage location specified by the output_uri parameter. The output_uri parameter has the format oci://BUCKET_NAME@BUCKET_NAMESPACE/PREFIX.

OUTPUT_URI = "oci://BUCKET_NAME@BUCKET_NAMESPACE/PREFIX"
runtime.with_output(output_dir="~/Code/tutorials/beginner_source/examples_nn", output_uri=OUTPUT_URI)
job = (
    Job() \
    .with_infrastructure(infrastructure) \
    .with_runtime(runtime) \
).create()

Equivalent in YAML

Again, you could create the job with a YAML file:

job = Job.from_yaml(f"""
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      jobInfrastructureType: STANDALONE
      jobType: DEFAULT
      logGroupId: {log_group.id}
      logId: {log.id}
    type: dataScienceJob
  name: null
  runtime:
    kind: runtime
    spec:
      conda:
        slug: pytorch19_p37_gpu_v1
        type: service
      entrypoint: beginner_source/examples_nn/polynomial_nn.py
      outputDir: ~/Code/tutorials/beginner_source/examples_nn
      outputUri: oci://BUCKET_NAME@BUCKET_NAMESPACE/PREFIX
      url: https://github.com/pytorch/tutorials.git
    type: gitPython
""")

After the job is created, you can run it, and then the job run can be monitored using the .watch() API:

job.run().watch()

The GitPythonRuntime also supports additional configurations:

  • The .with_python_path() method allows you to add additional Python paths to the runtime. The code directory checked out from Git is added to sys.path by default. Additional Python paths are appended before the code directory is appended.

  • The .with_argument() method allows you to pass arguments to invoke the script or function. For running a script, the arguments are passed in as CLI arguments. For running a function, JSON serializable objects (list and dict) are supported and passed into the function.

For example:

runtime = (
    GitPythonRuntime() \
    .with_source("YOUR_GIT_URL") \
    .with_entrypoint(path="YOUR_MODULE_PATH", func="YOUR_FUNCTION") \
    .with_service_conda("pytorch19_p37_gpu_v1") \
    .with_argument("val", ["a", "b"], key=dict(k="v"))
)
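
Similarly, the following sketch adds an extra Python path with .with_python_path() so that, for example, a package inside the repository can be imported by the entry function. The "src" path is an illustrative placeholder:

# A sketch: "src" is an illustrative path appended to sys.path before the entrypoint runs
runtime = (
    GitPythonRuntime() \
    .with_source("YOUR_GIT_URL") \
    .with_python_path("src") \
    .with_entrypoint(path="YOUR_MODULE_PATH", func="YOUR_FUNCTION") \
    .with_service_conda("pytorch19_p37_gpu_v1")
)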

The GitPythonRuntime updates metadata in the freeform tags of the job run after the job run finishes. The following tags are added automatically:

  • repo: The URL of the Git repository.

  • commit: The Git commit ID.

  • module: The entry script/module.

  • method: The entry function/method.

  • outputs: The prefix of the output files in Object Storage.

The new values overwrite any existing tags. If you want to skip the metadata update, set skip_metadata_update to True when initializing the runtime.

runtime = GitPythonRuntime(skip_metadata_update=True)

Running a Data Flow Application as a Job

OCI Data Flow is a service for creating and running Spark applications. The following examples demonstrate how to create and run Data Flow applications using ADS.

To create and run a Data Flow application, you must specify a compartment and, in the same compartment, a bucket for storing logs:

compartment_id = "<compartment_id>"
logs_bucket_uri = "<logs_bucket_uri>"

Ensure that you have the correct policies set up. For instance, for Data Flow to access the logs bucket, use a policy like:

ALLOW SERVICE dataflow TO READ objects IN tenancy WHERE target.bucket.name='dataflow-logs'

For more information, see the Data Flow documentation.

Update oci_profile if not using the default:

oci_profile = "DEFAULT"
ads.set_auth(auth="api_key", profile=oci_profile)

To create a Data Flow application you need two components:

  • DataFlow, a subclass of Infrastructure

  • DataFlowRuntime, a subclass of Runtime

DataFlow stores properties specific to the Data Flow service, such as compartment_id, logs_bucket_uri, and so on. DataFlowRuntime stores properties related to the script to be run, such as the path to the script and CLI arguments. Because service configurations remain mostly unchanged across experiments, a DataFlow object can be reused and combined with various DataFlowRuntime objects to create applications. All properties can be set using the with_{property_name} methods. For a complete list of properties and setter methods, see the ADS classes.
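
As a brief illustration of this reuse pattern, the same DataFlow object can back several applications, each with its own DataFlowRuntime. This is only a sketch; the compartment OCID, bucket paths, and job names are placeholders:

from ads.jobs import DataFlow, DataFlowRuntime, Job

# One shared infrastructure definition (placeholder OCID and URI)
infra = DataFlow() \
    .with_compartment_id("<compartment_id>") \
    .with_logs_bucket_uri("<logs_bucket_uri>")

# Two applications reusing the same Data Flow configuration
etl_job = Job(
    name="etl",
    infrastructure=infra,
    runtime=DataFlowRuntime().with_script_uri("oci://<bucket>@<namespace>/etl.py"),
)
training_job = Job(
    name="training",
    infrastructure=infra,
    runtime=DataFlowRuntime().with_script_uri("oci://<bucket>@<namespace>/train.py"),
)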

In the following example, DataFlow is populated with compartment_id, driver_shape, and executor_shape. DataFlowRuntime is populated with script_uri and script_bucket. The script_uri parameter specifies the path to the script, which can be local or remote (an Object Storage path). If the path is local, then script_bucket must also be specified because Data Flow requires the script to be available in Object Storage. ADS performs the upload step for you, as long as you give the bucket name or the Object Storage path prefix to upload the script to. Either form can be given to script_bucket; for example, both with_script_bucket("<bucket_name>") and with_script_bucket("oci://<bucket_name>@<namespace>/<prefix>") are accepted. In this example, the prefix is given:

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime
from uuid import uuid4

script_prefix = "oci://<bucket>@<namespace>/prefix/path"

with tempfile.TemporaryDirectory() as td:
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write('''
from pyspark.sql import SparkSession

def main():

    # Create a Spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    # Load a csv file from dataflow public storage
    df = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("multiLine", "true") \
        .load("oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv")

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql("""
        SELECT
            city,
            zipcode,
            CONCAT(latitude,',', longitude) AS lat_long
        FROM berlin
    """)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: Writing to the Spark stdout log so that you can retrieve the log later at the end of the notebook.
    print('\\n'.join(query_result_df.toJSON().collect()))

if __name__ == '__main__':
    main()
''')

    name = f"dataflow-app-{str(uuid4())}"
    dataflow_configs = DataFlow()\
        .with_compartment_id(compartment_id)\
        .with_logs_bucket_uri(logs_bucket_uri)\
        .with_driver_shape("VM.Standard2.1") \
        .with_executor_shape("VM.Standard2.1")
    runtime_config = DataFlowRuntime()\
        .with_script_uri(os.path.join(td, "script.py"))\
        .with_script_bucket(script_prefix)
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()
df_run = df.run()

You can save the application specification into a YAML file for future reuse. The JSON format is also available.

print(df.to_yaml("sample-df.yaml"))

You can also load a Data Flow application directly from the YAML file saved in the prior step:

df2 = Job.from_yaml(uri="sample-df.yaml")

Creating a new job and a run:

df_run2 = df2.create().run()

Deleting a job cancels associated runs:

df2.delete()
df_run2.status

You can also load a Data Flow application from an OCID:

df3 = Job.from_dataflow_job(df.id)

Creating a run under the same application:

df_run3 = df3.run()

Now there are 2 runs under application df:

assert len(df.run_list()) == 2

When you run a Data Flow application, a DataFlowRun object is created. You can check the status, wait for a run to finish, check its logs afterwards, or cancel a run in progress. For example:

df_run.status
df_run.wait()

There are three types of logs for a run:

  • application log

  • driver log

  • executor log

Each log consists of stdout and stderr.

For example, to access stdout from the application log, you could use:

df_run.logs.application.stdout

Similarly, you can check the other logs with:

df_run.logs.application.stderr
df_run.logs.executor.stdout
df_run.logs.executor.stderr

A link to the run page in the OCI Console is available through the run_details_link property:

df_run.run_details_link

To list Data Flow applications, a compartment ID must be given, along with any optional filtering criteria. For example, you can filter by the display name of the application:

Job.dataflow_job(compartment_id=compartment_id, display_name=name)

Equivalent in YAML

You can also create a Data Flow job directly from a YAML string by passing it to the Job.from_yaml() function:

kind: job
spec:
  id: <dataflow_app_ocid>
  infrastructure:
    kind: infrastructure
    spec:
      compartmentId: <compartment_id>
      driverShape: VM.Standard2.1
      executorShape: VM.Standard2.1
      id: <dataflow_app_ocid>
      language: PYTHON
      logsBucketUri: <logs_bucket_uri>
      numExecutors: 1
      sparkVersion: 2.4.4
    type: dataFlow
  name: dataflow_app_name
  runtime:
    kind: runtime
    spec:
      scriptBucket: bucket_name
      scriptPathURI: oci://<bucket_name>@<namespace>/<prefix>
    type: dataFlow