18.8 Create Data Flows

You can create data pipelines using DataFlow APIs. A Data Flow is composed of sources (one or more), data transformation operators and a target.

You can customize the data loading strategies using the LoadOptions available for the specific connector.

Create the file transform_data.py.

from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
import logging

# Deployment password for the Data Transforms instance; replace the
# placeholder with the value from your secret store before running.
pswd="<your deployment pswd from secret store>"
# Build connection parameters from the password and open a session
# against the Data Transforms workbench; `workbench` is reused below
# to persist the data entity.
connect_params = WorkbenchConfig.get_workbench_config(pswd)
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

##########################################################
#      Create the Target Table for the Data Flow         #
##########################################################
#pylint: disable=all

# PRODUCT_SALES column specifications, one tuple per column:
# (name, position, type, length, scale). The same type string is used
# for both dataType and dataTypeCode; DATE columns carry no length/scale.
_PRODUCT_SALES_COLUMNS = [
    ("CUST_ID",               2,  "NUMBER",   38,   None),
    ("TIME_ID",               3,  "DATE",     None, None),
    ("CHANNEL_ID",            4,  "NUMBER",   38,   None),
    ("PROMO_ID",              5,  "NUMBER",   38,   None),
    ("QUANTITY_SOLD",         6,  "NUMBER",   10,   2),
    ("AMOUNT_SOLD",           7,  "NUMBER",   10,   2),
    ("PROD_ID",               8,  "NUMBER",   6,    0),
    ("PROD_NAME",             9,  "VARCHAR2", 50,   None),
    ("PROD_DESC",             10, "VARCHAR2", 4000, None),
    ("PROD_SUBCATEGORY",      11, "VARCHAR2", 50,   None),
    ("PROD_SUBCATEGORY_ID",   12, "NUMBER",   38,   None),
    ("PROD_SUBCATEGORY_DESC", 13, "VARCHAR2", 2000, None),
    ("PROD_CATEGORY",         14, "VARCHAR2", 50,   None),
    ("PROD_CATEGORY_ID",      15, "NUMBER",   38,   None),
    ("PROD_CATEGORY_DESC",    16, "VARCHAR2", 2000, None),
    ("PROD_WEIGHT_CLASS",     17, "NUMBER",   3,    0),
    ("PROD_UNIT_OF_MEASURE",  18, "VARCHAR2", 20,   None),
    ("PROD_PACK_SIZE",        19, "VARCHAR2", 30,   None),
    ("SUPPLIER_ID",           20, "NUMBER",   6,    0),
    ("PROD_STATUS",           21, "VARCHAR2", 20,   None),
    ("PROD_LIST_PRICE",       22, "NUMBER",   8,    2),
    ("PROD_MIN_PRICE",        23, "NUMBER",   8,    2),
    ("PROD_TOTAL",            24, "VARCHAR2", 13,   None),
    ("PROD_TOTAL_ID",         25, "NUMBER",   38,   None),
    ("PROD_SRC_ID",           26, "NUMBER",   38,   None),
    ("PROD_EFF_FROM",         27, "DATE",     None, None),
    ("PROD_EFF_TO",           28, "DATE",     None, None),
    ("PROD_VALID",            29, "VARCHAR2", 1,    None),
]

# Define the PRODUCT_SALES entity under the "Demo Target Data"
# connection / DEMO_TARGET schema, add every column in order, then
# persist the definition through the workbench.
product_sales_entity = DataEntity().from_connection("Demo Target Data","DEMO_TARGET").entity_name("PRODUCT_SALES")
for col_name, col_pos, col_type, col_len, col_scale in _PRODUCT_SALES_COLUMNS:
    product_sales_entity.add_column(name=col_name, position=col_pos,
                                    dataType=col_type, dataTypeCode=col_type,
                                    length=col_len, scale=col_scale)
workbench.save_data_entity(product_sales_entity)
 
 
 
####################################
#      Create a Data Flow          #
####################################

# Data flow "Transform SH Data" under project "MyProject".
flow = DataFlow("Transform SH Data","MyProject")

# Register the tables used by the flow: two sources and one target.
sales_data_entity = flow.use(connection_name="Demo Source Data",
                             schema_name="SH", data_entity_name="SALES")
products_data_entity = flow.use(connection_name="Demo Source Data",
                                schema_name="SH", data_entity_name="PRODUCTS")
productsales_data_entity = flow.use(connection_name="Demo Target Data",
                                    schema_name="DEMO_TARGET",
                                    data_entity_name="PRODUCT_SALES")

# Source columns map onto target columns by matching name by default;
# override only PROD_NAME so it is upper-cased on the way in.
override_column_mappings = {
    "column_mappings": {
        "PRODUCT_SALES.PROD_NAME": "UPPER(PRODUCTS.PROD_NAME)",
    }
}

# Build the load options: append-style integration, Oracle insert with
# drop-and-create of the target table, plus the mapping override.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE = True
load_options = DataFlowIntegrationType.append()
load_options.update(OracleInsert.options())
load_options.update(override_column_mappings)

# Each fluent call returns the dataflow, so the pipeline can be
# assembled one step at a time: sources -> join -> filter -> load.
step = flow.from_source("SALES","Demo Source Data.SH.SALES")
step = step.from_source("PRODUCTS","Demo Source Data.SH.PRODUCTS")
step = step.join("Join","INNER","SALES.PROD_ID=PRODUCTS.PROD_ID")
step = step.filter_by("Filter","PRODUCTS.PROD_CATEGORY like 'Photo'")
step.load("PRODUCT_SALES","Demo Target Data.DEMO_TARGET.PRODUCT_SALES",load_options)

# Persist the data flow on the server.
flow.create()

18.8.1 Using the Aggregate Operator

Understanding the aggregate function

The aggregate function enables data flow to perform data aggregation. The snippet below explains the function and its usage.
def aggregate(
    aggregate_name: Any,
    having_condition: Any,
    manual_groupby: Any,
    aggrregate_attributes: Any,
    column_map: Any,
    retain_projected_cols: bool = False
)
Adds aggregate operator in dataflow

Arguments

aggregate_name -- operator name
having_condition -- for aggregate operation
manual_groupby -- group by column for aggregate operation
aggrregate_attributes -- list of aggregate columns
column_map -- dictionary of column map with aggregate expression
retain_projected_cols -- Default is false, when set to True the operator will
receive all the projected column(s) from previous step.

Returns: the current dataflow object

Preparing aggregate attributes

The SDK provides AggregateAttribute class, which helps creating Aggregate column definitions. The aggregate column(s) are added to an array and passed as aggrregate_attributes for aggregate step.
aggrregate_attributes=[] # prepare an empty array

aggrregate_attributes.append(
    AggregateAttribute(attribute_name="INVOICE_ID",
                        data_type="NUMBER",length=38,
                        expression="MY_INVOICE.INVOICE_ID",
                        is_group_by="YES")) # create an aggregate column and add to array

Example Data Flow involving Aggregate operation

from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType
from datatransforms.dataflow import DataFlow,Project,AggregateAttribute
import logging

# Connect to the Data Transforms workbench. Replace the placeholder
# with your deployment password before running.
# BUG FIX: removed leftover debug code (`import sys` / `print(sys.path)`)
# that only dumped the interpreter path and served no purpose here.
pswd="<deployment-password>"
connect_params = WorkbenchConfig.get_workbench_config(pswd)
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)


# Data flow "load_invoice_fact" in project "FactLoader": joins invoice
# headers to their detail lines, aggregates line amounts per invoice,
# and loads the result into MY_INVOICE_FACT.
flow = DataFlow("load_invoice_fact","FactLoader")

# Register the tables used in the data flow.
# BUG FIX: the original assigned both source entities to the same
# variable (`source_table`), so the second assignment clobbered the
# first handle; use distinct names for each registered entity.
invoice_entity = flow.use(connection_name="MY_ADB", schema_name="QTEAM2",
                          data_entity_name="MY_INVOICE")
invoice_detail_entity = flow.use(connection_name="MY_ADB", schema_name="QTEAM2",
                                 data_entity_name="MY_INVOICE_DETAIL")
target_table = flow.use(connection_name="MY_ADB", schema_name="QTEAM2",
                        data_entity_name="MY_INVOICE_FACT")

# Append-style load options.
# NOTE(review): DROP_AND_CREATE_TARGET_TABLE is set but
# OracleInsert.options() is never merged into load_options (unlike the
# earlier example) — confirm whether the flag takes effect without it.
OracleInsert.DROP_AND_CREATE_TARGET_TABLE = True
load_options = DataFlowIntegrationType.append()

# Aggregate columns for the SUM_LINE_ITEM step: INVOICE_ID is the
# group-by key, INVOICE_LINE_AMOUNT_TOTAL is the SUM over detail lines,
# and INVOICE_LINE_AMOUNT is projected through unaggregated.
aggregate_attributes = [
    AggregateAttribute("INVOICE_ID","NUMBER",38,
                       "MY_INVOICE.INVOICE_ID", is_group_by="YES"),
    AggregateAttribute("INVOICE_LINE_AMOUNT_TOTAL","NUMBER",38,
                       "SUM(MY_INVOICE_DETAIL.INVOICE_LINE_ITEM_AMOUNT)",
                       is_group_by="NO"),
    AggregateAttribute("INVOICE_LINE_AMOUNT","NUMBER",38,
                       "MY_INVOICE.INVOICE_AMOUNT", is_group_by="NO"),
]

# No custom column definitions are needed for the aggregate step.
custom_aggregate_mapping = {}

# Columns map source->target by matching name; override the total so it
# is fed from the aggregate step's output column.
override_column_mappings = {
    "column_mappings": {
        "MY_INVOICE_FACT.INVOICE_LINE_AMOUNT_TOTAL":
            "SUM_LINE_ITEM.INVOICE_LINE_ITEM_AMOUNT",
    }
}
load_options.update(override_column_mappings)

# Pipeline: sources -> join on INVOICE_ID -> incremental-date filter ->
# aggregate -> load into the fact table.
flow.from_source("MY_INVOICE","MY_ADB.QTEAM2.MY_INVOICE").\
     from_source("MY_INVOICE_DETAIL","MY_ADB.QTEAM2.MY_INVOICE_DETAIL").\
     join("Join","INNER","MY_INVOICE.INVOICE_ID=MY_INVOICE_DETAIL.INVOICE_ID").\
     filter_by("Filter","MY_INVOICE.INVOICE_DATE > #last_load_date").\
     aggregate("SUM_LINE_ITEM", "", "",aggregate_attributes,custom_aggregate_mapping).\
     load("MY_INVOICE_FACT","MY_ADB.QTEAM2.MY_INVOICE_FACT",load_options)

# Persist the data flow on the server.
flow.create()

18.8.2 Using Expression operator

Understanding the expression function

The expression function enables a data flow to add derived (computed) columns using SQL expressions. The snippet below explains the function and its usage.
    def expression(
    expression_name: Any,
    expression_attributes: Any,
    retain_projected_cols: bool = False
) -> DataFlow
Adds expression operation in dataflow

Arguments:

expression_name -- operator name
expression_attributes -- List of expression attributes
retain_projected_cols - default value is False, when set to True - the expression step
will receive all the projected column(s) from the previous step(s)
Returns: the current dataflow object

Preparing expression columns

The SDK provides ColumnDefinition class, which helps creating column definition in an Expression step. The column definitions are added to an array and passed as expression_columns for expression step.
expression_columns=[] # prepare an empty array
# create a column for expression and add to expression_columns array
annual_pkg_column = ColumnDefinition(name="ANNUAL_PACKAGE",
                                 data_type="NUMERIC",length=8,scale=2,
                                 position=11,globalId="",
                                 expression=custom_column_expression_str)
expression_columns.append(annual_pkg_column)
Example Data Flow involving Expression operation
import sys
       
from datatransforms.dataflow import DataFlow,ExpressionAttributes,ColumnDefinition

from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig 

#connect to workbench
# BUG FIX: the password placeholder was missing its closing ">"
# ("<deployment-password" in the original). Replace the placeholder
# with your deployment password before running.
connect_params = WorkbenchConfig.get_workbench_config("<deployment-password>")
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)
# Data flow "DF_With_Expression" under project code "DFDemo" (the
# project code, not the display name). The parent folder defaults to
# "DefaultFolder" when none is supplied.
flow = DataFlow("DF_With_Expression","DFDemo")

# Registering an entity queries it over REST so its column definitions
# become available to the flow.
source_data_entity = flow.use(connection_name="src", schema_name="SRC_USER",
                              data_entity_name="EMPLOYEE", alias_name="EMPLOYEES1")
target_data_entity = flow.use(connection_name="tgt", schema_name="TGT_USER",
                              data_entity_name="EMPLOYEE")

# SQL expression for the derived ANNUAL_PACKAGE column: 12 months of
# salary, adjusted by commission percentage when one is present.
annual_package_expr = "case when EMPLOYEES1.COMMISSION_PCT  is null then EMPLOYEES1.SALARY *12 else (EMPLOYEES1.SALARY *12* EMPLOYEES1.COMMISSION_PCT)/100 end"

# The expression step emits this single derived column.
expression_columns = [
    ColumnDefinition(name="ANNUAL_PACKAGE",
                     data_type="NUMERIC", length=8, scale=2,
                     position=11, globalId="",
                     expression=annual_package_expr),
]

# Map the expression output onto the target SALARY column; all other
# columns map source->target by matching name. Integration is append.
target_overrides = {"EMPLOYEESTGT.SALARY": "EXPRESSION.ANNUAL_PACKAGE"}
load_options = {
    "column_mappings" : target_overrides,
    "integrationType" : "append",
}

# Assemble the pipeline step by step (each call returns the dataflow):
# source -> filter -> sort -> expression -> load.
# NOTE(review): the filter and sort conditions look like placeholders
# ("ACC.EX > 1" references no registered alias) — confirm before reuse.
pipeline = flow.from_source("EMPLOYEES1","src.SRC_USER.EMPLOYEE")
pipeline = pipeline.filter_by("myFileter","ACC.EX > 1")
pipeline = pipeline.sort_by("Sorter","Here any condition can come")
pipeline = pipeline.expression("EXPRESSION",expression_columns)
pipeline.load("EMPLOYEESTGT","tgt.TGT_USER.EMPLOYEE",load_options=load_options)

# Persist the data flow on the server.
flow.create()