18.8 Create Data Flows

You can create data pipelines using the DataFlow APIs. A data flow is composed of one or more sources, data transformation operators, and a target.

You can customize the data loading strategies using the LoadOptions available for the specific connector.

Create the file transform_data.py with the following content:

from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
import logging

# Placeholder value: replace it with your deployment password, retrieved from a secret store.
pswd="<your deployment pswd from secret store>"
# Build the connection parameters from the password.
# NOTE(review): presumably the remaining connection settings come from the workbench configuration -- confirm against WorkbenchConfig.
connect_params = WorkbenchConfig.get_workbench_config(pswd)
# Create the workbench client and connect it using the parameters above.
# The `workbench` object is used again further down to save the target data entity.
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

##########################################################
#      Create the Target Table for the Data Flow         #
##########################################################
#pylint: disable=all
# Target table definition: entity PRODUCT_SALES in DEMO_TARGET on the
# "Demo Target Data" connection.
product_sales_entity = (
    DataEntity()
    .from_connection("Demo Target Data", "DEMO_TARGET")
    .entity_name("PRODUCT_SALES")
)

# One row per column, in table order: (name, data type, length, scale).
_PRODUCT_SALES_COLUMNS = (
    ("CUST_ID", "NUMBER", 38, None),
    ("TIME_ID", "DATE", None, None),
    ("CHANNEL_ID", "NUMBER", 38, None),
    ("PROMO_ID", "NUMBER", 38, None),
    ("QUANTITY_SOLD", "NUMBER", 10, 2),
    ("AMOUNT_SOLD", "NUMBER", 10, 2),
    ("PROD_ID", "NUMBER", 6, 0),
    ("PROD_NAME", "VARCHAR2", 50, None),
    ("PROD_DESC", "VARCHAR2", 4000, None),
    ("PROD_SUBCATEGORY", "VARCHAR2", 50, None),
    ("PROD_SUBCATEGORY_ID", "NUMBER", 38, None),
    ("PROD_SUBCATEGORY_DESC", "VARCHAR2", 2000, None),
    ("PROD_CATEGORY", "VARCHAR2", 50, None),
    ("PROD_CATEGORY_ID", "NUMBER", 38, None),
    ("PROD_CATEGORY_DESC", "VARCHAR2", 2000, None),
    ("PROD_WEIGHT_CLASS", "NUMBER", 3, 0),
    ("PROD_UNIT_OF_MEASURE", "VARCHAR2", 20, None),
    ("PROD_PACK_SIZE", "VARCHAR2", 30, None),
    ("SUPPLIER_ID", "NUMBER", 6, 0),
    ("PROD_STATUS", "VARCHAR2", 20, None),
    ("PROD_LIST_PRICE", "NUMBER", 8, 2),
    ("PROD_MIN_PRICE", "NUMBER", 8, 2),
    ("PROD_TOTAL", "VARCHAR2", 13, None),
    ("PROD_TOTAL_ID", "NUMBER", 38, None),
    ("PROD_SRC_ID", "NUMBER", 38, None),
    ("PROD_EFF_FROM", "DATE", None, None),
    ("PROD_EFF_TO", "DATE", None, None),
    ("PROD_VALID", "VARCHAR2", 1, None),
)

# Column positions are consecutive and start at 2; dataType and dataTypeCode
# always receive the same value.
for _position, (_name, _type, _length, _scale) in enumerate(_PRODUCT_SALES_COLUMNS, start=2):
    product_sales_entity.add_column(
        name=_name,
        position=_position,
        dataType=_type,
        dataTypeCode=_type,
        length=_length,
        scale=_scale,
    )

# Save the completed entity definition through the connected workbench.
workbench.save_data_entity(product_sales_entity)
 
 
 
####################################
#      Create a Data Flow          #
####################################
 
flow = DataFlow("Transform SH Data", "MyProject")

# Register each table the data flow works with: the two SH source tables
# and the PRODUCT_SALES target table.
sales_data_entity = flow.use(
    connection_name="Demo Source Data",
    schema_name="SH",
    data_entity_name="SALES",
)
products_data_entity = flow.use(
    connection_name="Demo Source Data",
    schema_name="SH",
    data_entity_name="PRODUCTS",
)
productsales_data_entity = flow.use(
    connection_name="Demo Target Data",
    schema_name="DEMO_TARGET",
    data_entity_name="PRODUCT_SALES",
)
 
 
 
# By default, source columns are mapped to target columns by matching name.
# Entries in this dictionary override the mapping for individual target columns.
manual_column_mappings = {"PRODUCT_SALES.PROD_NAME": "UPPER(PRODUCTS.PROD_NAME)"}
override_column_mappings = dict(column_mappings=manual_column_mappings)

# The flag is set on OracleInsert before OracleInsert.options() is called.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE = True

# Start from the append integration type, then layer the Oracle insert
# options and the column-mapping override on top, in that order.
load_options = DataFlowIntegrationType.append()
for extra_options in (OracleInsert.options(), override_column_mappings):
    load_options.update(extra_options)
 
 
# Pipeline: read SALES and PRODUCTS, inner-join them on PROD_ID, filter on
# the product category, and load the result into PRODUCT_SALES using
# load_options.
(
    flow.from_source("SALES", "Demo Source Data.SH.SALES")
    .from_source("PRODUCTS", "Demo Source Data.SH.PRODUCTS")
    .join("Join", "INNER", "SALES.PROD_ID=PRODUCTS.PROD_ID")
    .filter_by("Filter", "PRODUCTS.PROD_CATEGORY like 'Photo'")
    .load("PRODUCT_SALES", "Demo Target Data.DEMO_TARGET.PRODUCT_SALES", load_options)
)

flow.create()