18.8 Create Data Flows
You can create data pipelines using DataFlow APIs. A Data Flow is composed of sources (one or more), data transformation operators and a target.
You can customize the data loading strategies using the LoadOptions available for the specific connector.
Create the file transform_data.py.
from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
import logging

#pylint: disable=all

# Connect to the Data Transforms workbench using the deployment password.
deployment_pswd = "<your deployment pswd from secret store>"
workbench_params = WorkbenchConfig.get_workbench_config(deployment_pswd)
bench = DataTransformsWorkbench()
bench.connect_workbench(workbench_params)

##########################################################
# Create the Target Table for the Data Flow              #
##########################################################
# Column specs as (name, position, type, length, scale);
# dataType and dataTypeCode always carry the same value here.
PRODUCT_SALES_COLUMNS = (
    ("CUST_ID", 2, "NUMBER", 38, None),
    ("TIME_ID", 3, "DATE", None, None),
    ("CHANNEL_ID", 4, "NUMBER", 38, None),
    ("PROMO_ID", 5, "NUMBER", 38, None),
    ("QUANTITY_SOLD", 6, "NUMBER", 10, 2),
    ("AMOUNT_SOLD", 7, "NUMBER", 10, 2),
    ("PROD_ID", 8, "NUMBER", 6, 0),
    ("PROD_NAME", 9, "VARCHAR2", 50, None),
    ("PROD_DESC", 10, "VARCHAR2", 4000, None),
    ("PROD_SUBCATEGORY", 11, "VARCHAR2", 50, None),
    ("PROD_SUBCATEGORY_ID", 12, "NUMBER", 38, None),
    ("PROD_SUBCATEGORY_DESC", 13, "VARCHAR2", 2000, None),
    ("PROD_CATEGORY", 14, "VARCHAR2", 50, None),
    ("PROD_CATEGORY_ID", 15, "NUMBER", 38, None),
    ("PROD_CATEGORY_DESC", 16, "VARCHAR2", 2000, None),
    ("PROD_WEIGHT_CLASS", 17, "NUMBER", 3, 0),
    ("PROD_UNIT_OF_MEASURE", 18, "VARCHAR2", 20, None),
    ("PROD_PACK_SIZE", 19, "VARCHAR2", 30, None),
    ("SUPPLIER_ID", 20, "NUMBER", 6, 0),
    ("PROD_STATUS", 21, "VARCHAR2", 20, None),
    ("PROD_LIST_PRICE", 22, "NUMBER", 8, 2),
    ("PROD_MIN_PRICE", 23, "NUMBER", 8, 2),
    ("PROD_TOTAL", 24, "VARCHAR2", 13, None),
    ("PROD_TOTAL_ID", 25, "NUMBER", 38, None),
    ("PROD_SRC_ID", 26, "NUMBER", 38, None),
    ("PROD_EFF_FROM", 27, "DATE", None, None),
    ("PROD_EFF_TO", 28, "DATE", None, None),
    ("PROD_VALID", 29, "VARCHAR2", 1, None),
)

product_sales_entity = DataEntity().from_connection("Demo Target Data","DEMO_TARGET").entity_name("PRODUCT_SALES")
for col_name, col_pos, col_type, col_len, col_scale in PRODUCT_SALES_COLUMNS:
    product_sales_entity.add_column(name=col_name,position=col_pos,
                                    dataType=col_type,dataTypeCode=col_type,
                                    length=col_len,scale=col_scale)
bench.save_data_entity(product_sales_entity)

####################################
# Create a Data Flow               #
####################################
flow = DataFlow("Transform SH Data","MyProject")

# Declare the tables that participate in the data flow.
sales_entity = flow.use(connection_name="Demo Source Data",schema_name="SH",data_entity_name="SALES")
products_entity = flow.use(connection_name="Demo Source Data",schema_name="SH",data_entity_name="PRODUCTS")
target_entity = flow.use(connection_name="Demo Target Data",schema_name="DEMO_TARGET",data_entity_name="PRODUCT_SALES")

# Source columns map to target columns by matching name by default;
# override the mapping for PROD_NAME to upper-case it on the way in.
manual_column_mappings = {
    "PRODUCT_SALES.PROD_NAME": "UPPER(PRODUCTS.PROD_NAME)"
}
override_column_mappings = {
    "column_mappings": manual_column_mappings
}

# Drop and re-create the target table on each run, appending rows.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE = True
load_options = DataFlowIntegrationType.append()
load_options.update(OracleInsert.options())
load_options.update(override_column_mappings)

# Wire sources -> join -> filter -> target and create the flow.
flow.from_source("SALES","Demo Source Data.SH.SALES").\
    from_source("PRODUCTS","Demo Source Data.SH.PRODUCTS").\
    join("Join","INNER","SALES.PROD_ID=PRODUCTS.PROD_ID").\
    filter_by("Filter","PRODUCTS.PROD_CATEGORY like 'Photo'").\
    load("PRODUCT_SALES","Demo Target Data.DEMO_TARGET.PRODUCT_SALES",load_options)
flow.create()
Parent topic: Python API for Oracle Data Transforms
18.8.1 Using the Aggregate Operator
Understanding the aggregate function
The aggregate function enables data flow to perform data aggregation. The snippet below
explains the function and its
usage.
def aggregate(
aggregate_name: Any,
having_condition: Any,
manual_groupby: Any,
aggrregate_attributes: Any,
column_map: Any,
retain_projected_cols: bool = False
)
Adds aggregate operator in dataflow
Arguments
aggregate_name -- operator name
having_condition -- for aggregate operation
manual_groupby -- group by column for aggregate operation
aggrregate_attributes -- list of aggregate columns
column_map -- dictionary of column map with aggregate expression
retain_projected_cols -- Default is false, when set to True the operator will
receive all the projected column(s) from previous step.
Returns: the current dataflow object
Preparing aggregate attributes
The SDK provides AggregateAttribute class, which helps creating Aggregate column
definitions. The aggregate column(s) are added to an array and passed as
aggrregate_attributes for the aggregate step.
aggrregate_attributes=[] # prepare an empty array
aggrregate_attributes.append(
AggregateAttribute(attribute_name="INVOICE_ID",
data_type="NUMBER",length=38,
expression="MY_INVOICE.INVOICE_ID",
is_group_by="YES")) # create an aggregate column and add to the array
Example Data Flow involving Aggregate operation
# Example data flow using the Aggregate operator: joins MY_INVOICE with
# MY_INVOICE_DETAIL, sums detail line amounts per invoice and loads the
# result into MY_INVOICE_FACT.
from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
from datatransforms.dataflow import DataFlow,Project,AggregateAttribute
import logging
import sys

# Connect to the workbench using the deployment password.
pswd="<deployment-password>"
connect_params = WorkbenchConfig.get_workbench_config(pswd)
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

flow = DataFlow("load_invoice_fact","FactLoader")

# Define the tables that will be used in the data flow.
# (Fix: the original rebound one variable for both sources, discarding
# the first handle.)
invoice_table=flow.use(connection_name="MY_ADB",schema_name="QTEAM2",data_entity_name="MY_INVOICE")
invoice_detail_table=flow.use(connection_name="MY_ADB",schema_name="QTEAM2",data_entity_name="MY_INVOICE_DETAIL")
target_table=flow.use(connection_name="MY_ADB",schema_name="QTEAM2",data_entity_name="MY_INVOICE_FACT")

# Drop and re-create the target table, appending rows.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE=True
load_options=DataFlowIntegrationType.append()
# Fix: merge the Oracle insert options so DROP_AND_CREATE_TARGET_TABLE
# actually takes effect (setting the flag alone does nothing unless
# OracleInsert.options() is applied to load_options).
load_options.update(OracleInsert.options())

# Prepare aggregate column(s): group by INVOICE_ID, sum the detail amounts.
aggregate_attributes=[]
aggregate_attributes.append(AggregateAttribute("INVOICE_ID","NUMBER",38,"MY_INVOICE.INVOICE_ID", is_group_by="YES"))
aggregate_attributes.append(AggregateAttribute("INVOICE_LINE_AMOUNT_TOTAL","NUMBER",38,"SUM(MY_INVOICE_DETAIL.INVOICE_LINE_ITEM_AMOUNT)", is_group_by="NO"))
aggregate_attributes.append(AggregateAttribute("INVOICE_LINE_AMOUNT","NUMBER",38,"MY_INVOICE.INVOICE_AMOUNT", is_group_by="NO"))

# Add custom column definitions if required for Aggregate.
custom_aggregate_mapping={}

# By default, source columns are mapped to target columns by matching name.
# Override the mapping for the aggregated total column.
manual_column_mappings={
"MY_INVOICE_FACT.INVOICE_LINE_AMOUNT_TOTAL": "SUM_LINE_ITEM.INVOICE_LINE_ITEM_AMOUNT"
}
override_column_mappings = {
"column_mappings" : manual_column_mappings
}
load_options.update(override_column_mappings)

# Wire sources -> join -> filter -> aggregate -> target and create the flow.
flow.from_source("MY_INVOICE","MY_ADB.QTEAM2.MY_INVOICE").\
    from_source("MY_INVOICE_DETAIL","MY_ADB.QTEAM2.MY_INVOICE_DETAIL").\
    join("Join","INNER","MY_INVOICE.INVOICE_ID=MY_INVOICE_DETAIL.INVOICE_ID").\
    filter_by("Filter","MY_INVOICE.INVOICE_DATE > #last_load_date").\
    aggregate("SUM_LINE_ITEM", "", "",aggregate_attributes,custom_aggregate_mapping).\
    load("MY_INVOICE_FACT","MY_ADB.QTEAM2.MY_INVOICE_FACT",load_options)
flow.create()
Parent topic: Create Data Flows
18.8.2 Using Expression operator
Understanding the expression function
The expression function enables a data flow to add computed columns through expressions. The snippet below explains the function and its usage.
def expression(
expression_name: Any,
expression_attributes: Any,
retain_projected_cols: bool = False
) -> DataFlow
Adds expression operation in dataflow
Arguments:
expression_name -- operator name
expression_attributes -- List of expression attributes
retain_projected_cols - default value is False, when set to True - the expression step
will receive all the projected column(s) from the previous step(s)
Returns: the current dataflow object
Preparing expression columns
The SDK provides ColumnDefinition class, which helps creating column definition in
an Expression step. The column definitions are added to an array and passed as
expression_columns for expression
step.
expression_columns=[] # prepare an empty array
# create a column for expression and add to expression_columns array
annual_pkg_column = ColumnDefinition(name="ANNUAL_PACKAGE",
data_type="NUMERIC",length=8,scale=2,
position=11,globalId="",
expression=custom_column_expression_str)
expression_columns.append(annual_pkg_column)
Example Data Flow involving Expression
operation
# Example data flow using the Expression operator: adds a computed
# ANNUAL_PACKAGE column in an Expression step and maps it onto the
# target SALARY column.
import sys
from datatransforms.dataflow import DataFlow,ExpressionAttributes,ColumnDefinition
from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig

# Connect to the workbench.
# Fix: the original placeholder string was missing its closing ">".
connect_params = WorkbenchConfig.get_workbench_config("<deployment-password>")
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

# Prepare the data flow: data flow name and project code (not the project
# name). The optional parent folder defaults to "DefaultFolder".
flow = DataFlow("DF_With_Expression","DFDemo")

# Internally each data entity is queried (REST) so column definitions are available.
source_data_entity=flow.use(connection_name="src",schema_name="SRC_USER",data_entity_name="EMPLOYEE",alias_name="EMPLOYEES1")
target_data_entity=flow.use(connection_name="tgt",schema_name="TGT_USER",data_entity_name="EMPLOYEE")

# Custom expression string for the new ANNUAL_PACKAGE column.
custom_column_expression_str="case when EMPLOYEES1.COMMISSION_PCT is null then EMPLOYEES1.SALARY *12 else (EMPLOYEES1.SALARY *12* EMPLOYEES1.COMMISSION_PCT)/100 end"

# Column definitions handed to the Expression step.
expression_columns = []
annual_pkg_column = ColumnDefinition(name="ANNUAL_PACKAGE",
data_type="NUMERIC",length=8,scale=2,
position=11,globalId="",
expression=custom_column_expression_str)
expression_columns.append(annual_pkg_column)

# Map the expression output onto the target column.
store_column_mappings={}
store_column_mappings["EMPLOYEESTGT.SALARY"]="EXPRESSION.ANNUAL_PACKAGE"

load_options = {
"column_mappings" : store_column_mappings,
"integrationType" : "append",
}

# NOTE(review): the filter condition "ACC.EX > 1" does not reference the
# EMPLOYEES1 alias declared above -- confirm the intended filter columns.
flow.from_source("EMPLOYEES1","src.SRC_USER.EMPLOYEE").\
    filter_by("myFileter","ACC.EX > 1").\
    sort_by("Sorter","Here any condition can come").\
    expression("EXPRESSION",expression_columns).\
    load("EMPLOYEESTGT","tgt.TGT_USER.EMPLOYEE",load_options=load_options)
flow.create()
Parent topic: Create Data Flows