18.9 Create Workflows

Create the file orchestrate_jobs.py.

from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.workflow import Workflow,DataFlowStep,SqlStep,DataLoadStep

# Connect to the Data Transforms deployment.
# Replace the placeholder below with the deployment password retrieved from
# your secret store; do not commit a real password to source control.
pswd = "<your deployment pswd from secret store>"
connect_params = WorkbenchConfig.get_workbench_config(pswd)

workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

# ----------------------------------------------------------------------
# Create a Workflow
# ----------------------------------------------------------------------

# Declare the workflow steps. Each step is given a display name and is
# built from the class matching its type (data load, data flow, or SQL).
dls = DataLoadStep(
    "Load SH Data",
    "Load SH Data",
    firstStep=True,  # presumably marks the workflow entry point -- confirm in API docs
)
dfs = DataFlowStep(
    "Transform SH Data",
    data_flow_name="Transform SH Data",
    data_flow_project="MyProject",
)
# NOTE(review): the second SqlStep argument looks like the name of the
# connection the statement runs against -- confirm in API docs.
ss_success = SqlStep(
    "Processing Success",
    "Demo Target Data",
    "select SYSDATE from DUAL",
)
ss_failed = SqlStep(
    "Processing Failed",
    "Demo Target Data",
    "select SYSDATE from DUAL",
)

# Wire the execution order: ok(...) links the follow-up step for the
# (presumed) success path, nok(...) the follow-up for the failure path.
# Both the load step and the transform step route to the same failure step.
dls.ok(dfs).ok(ss_success)
dls.nok(ss_failed)
dfs.nok(ss_failed)

# Assemble the workflow in project "MyProject", register every step, and
# create it through the connected workbench session.
wf = Workflow("Process SH Data", project="MyProject")
wf.add_execution_steps(
    [
        dls,
        dfs,
        ss_success,
        ss_failed,
    ]
)
wf.create()