18.8 Create Data Flows
You can create data pipelines using the DataFlow APIs. A data flow is composed of one or more sources, data transformation operators, and a target.
You can customize the data loading strategies using the LoadOptions available for the specific connector.
Create the file transform_data.py with the following content:
import logging
import os

from datatransforms.dataentity import DataEntity
from datatransforms.dataflow import DataFlow
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
# Deployment password used to build the workbench connection parameters.
# Supply it through the DATATRANSFORMS_PSWD environment variable (populated
# from your secret store) so the credential never has to be written into this
# file. The placeholder string is only a fallback when the variable is unset.
pswd = os.environ.get("DATATRANSFORMS_PSWD", "<your deployment pswd from secret store>")

# Build the connection parameters from the password and connect the workbench;
# `workbench` is used below to save the target entity.
connect_params = WorkbenchConfig.get_workbench_config(pswd)
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)
##########################################################
# Create the Target Table for the Data Flow #
##########################################################
#pylint: disable=all
# Column layout of DEMO_TARGET.PRODUCT_SALES as (name, data type, length, scale).
# Each column's data type is passed as both dataType and dataTypeCode.
# Positions are assigned in list order, starting at 2.
# NOTE(review): no column is given position 1 — confirm this offset is intended.
product_sales_columns = [
    ("CUST_ID", "NUMBER", 38, None),
    ("TIME_ID", "DATE", None, None),
    ("CHANNEL_ID", "NUMBER", 38, None),
    ("PROMO_ID", "NUMBER", 38, None),
    ("QUANTITY_SOLD", "NUMBER", 10, 2),
    ("AMOUNT_SOLD", "NUMBER", 10, 2),
    ("PROD_ID", "NUMBER", 6, 0),
    ("PROD_NAME", "VARCHAR2", 50, None),
    ("PROD_DESC", "VARCHAR2", 4000, None),
    ("PROD_SUBCATEGORY", "VARCHAR2", 50, None),
    ("PROD_SUBCATEGORY_ID", "NUMBER", 38, None),
    ("PROD_SUBCATEGORY_DESC", "VARCHAR2", 2000, None),
    ("PROD_CATEGORY", "VARCHAR2", 50, None),
    ("PROD_CATEGORY_ID", "NUMBER", 38, None),
    ("PROD_CATEGORY_DESC", "VARCHAR2", 2000, None),
    ("PROD_WEIGHT_CLASS", "NUMBER", 3, 0),
    ("PROD_UNIT_OF_MEASURE", "VARCHAR2", 20, None),
    ("PROD_PACK_SIZE", "VARCHAR2", 30, None),
    ("SUPPLIER_ID", "NUMBER", 6, 0),
    ("PROD_STATUS", "VARCHAR2", 20, None),
    ("PROD_LIST_PRICE", "NUMBER", 8, 2),
    ("PROD_MIN_PRICE", "NUMBER", 8, 2),
    ("PROD_TOTAL", "VARCHAR2", 13, None),
    ("PROD_TOTAL_ID", "NUMBER", 38, None),
    ("PROD_SRC_ID", "NUMBER", 38, None),
    ("PROD_EFF_FROM", "DATE", None, None),
    ("PROD_EFF_TO", "DATE", None, None),
    ("PROD_VALID", "VARCHAR2", 1, None),
]

# Describe the PRODUCT_SALES entity on the "Demo Target Data" connection.
product_sales_entity = (
    DataEntity()
    .from_connection("Demo Target Data", "DEMO_TARGET")
    .entity_name("PRODUCT_SALES")
)
for col_position, (col_name, col_type, col_length, col_scale) in enumerate(
    product_sales_columns, start=2
):
    product_sales_entity.add_column(
        name=col_name,
        position=col_position,
        dataType=col_type,
        dataTypeCode=col_type,
        length=col_length,
        scale=col_scale,
    )

# Persist the entity definition through the connected workbench.
workbench.save_data_entity(product_sales_entity)
####################################
# Create a Data Flow #
####################################
flow = DataFlow("Transform SH Data", "MyProject")

# Register each table the data flow reads from or writes to.
sales_data_entity = flow.use(
    connection_name="Demo Source Data", schema_name="SH", data_entity_name="SALES"
)
products_data_entity = flow.use(
    connection_name="Demo Source Data", schema_name="SH", data_entity_name="PRODUCTS"
)
productsales_data_entity = flow.use(
    connection_name="Demo Target Data",
    schema_name="DEMO_TARGET",
    data_entity_name="PRODUCT_SALES",
)

# Source columns map to same-named target columns by default. This entry
# overrides the PROD_NAME mapping so the target receives the upper-cased name.
manual_column_mappings = {"PRODUCT_SALES.PROD_NAME": "UPPER(PRODUCTS.PROD_NAME)"}
override_column_mappings = {"column_mappings": manual_column_mappings}

# The flag is set before OracleInsert.options() is called; presumably options()
# reads it, so keep this order. NOTE: this assigns a class attribute, so it
# affects every later use of OracleInsert in this process.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE = True

# Start from the append integration type, then layer on the Oracle insert
# options and finally the column-mapping override.
load_options = DataFlowIntegrationType.append()
load_options.update(OracleInsert.options())
load_options.update(override_column_mappings)

# Join SALES to PRODUCTS on PROD_ID, keep rows whose category matches 'Photo'
# (LIKE without wildcards matches the literal value only), and load the result
# into PRODUCT_SALES using the options assembled above.
(
    flow.from_source("SALES", "Demo Source Data.SH.SALES")
    .from_source("PRODUCTS", "Demo Source Data.SH.PRODUCTS")
    .join("Join", "INNER", "SALES.PROD_ID=PRODUCTS.PROD_ID")
    .filter_by("Filter", "PRODUCTS.PROD_CATEGORY like 'Photo'")
    .load("PRODUCT_SALES", "Demo Target Data.DEMO_TARGET.PRODUCT_SALES", load_options)
)

flow.create()
Parent topic: Python API for Oracle Data Transforms