Kafka Related Pipelines

Kafka widgets are available under Stream Analytics. You can create Kafka pipelines by using the Kafka widgets: Kafka Source, Kafka Sink, Kafka Consumer, Kafka Producer, Kafka Transform, Kafka Monitor, and the newly added Kafka Windowing widget.

The following image illustrates the Kafka related pipelines:

Figure 11-79 Kafka related pipelines

The Kafka Consumer, Kafka Transform, Kafka Producer, and Kafka Monitor widgets can be used together in a pipeline in the following sequence:
  1. Kafka Consumer
  2. Kafka Transform
  3. Kafka Producer
  4. Kafka Monitor
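To make the sequence concrete, the following minimal sketch models the data flow with plain Python lists standing in for the input and output topics. The function names (consume, transform, produce, monitor) and the uppercase transformation are hypothetical placeholders, not the widgets' actual implementation, and no Kafka broker is involved.

```python
# Hypothetical, in-memory stand-ins for the four widgets; no Kafka broker
# is involved. Each function mirrors the role of one widget in the sequence.

def consume(input_topic):
    """Kafka Consumer: read all messages currently in the input topic."""
    return list(input_topic)

def transform(messages):
    """Kafka Transform: apply a (placeholder) transformation to each message."""
    return [msg.upper() for msg in messages]

def produce(messages, output_topic):
    """Kafka Producer: append the transformed messages to the output topic."""
    output_topic.extend(messages)

def monitor(output_topic):
    """Kafka Monitor: report how many messages reached the output topic."""
    return len(output_topic)

input_topic = ["txn-1", "txn-2", "txn-3"]  # sample messages (illustrative)
output_topic = []

produce(transform(consume(input_topic)), output_topic)
print(output_topic)           # ['TXN-1', 'TXN-2', 'TXN-3']
print(monitor(output_topic))  # 3
```

Each widget only sees the output of the previous one, which is why the order above matters.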

Create a Pipeline by Using the Kafka Widgets

Note:

The Kafka Input and Kafka Output topics that are created in the Kafka Topics screen must be mapped to the workspace so that users can select them when creating pipelines.

Kafka Source

When you click the Kafka Source widget, the following mandatory parameters must be provided:

Figure 11-80 Kafka Source

Table 11-18 Mandatory Parameters

Parameter                Description
Input Topic              Select the input topic.
Auto Offset Reset        Select one of the following options:
                           • Earliest: Reads from the earliest available data.
                           • Latest: Reads only the latest data.
Enable auto commit       Select either True or False.
Consumer Timeout (ms)    Provide the timeout in milliseconds.
Group ID                 Provide a consumer group ID (any value).
Transformation script    Provide a transformation script. For example, the following script writes the data to a file:

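import pandas as pd

# data_prev is supplied by the widget (assumed: the data received from Kafka)
data_out = {"Msg": data_prev}
# Write the data as tab-separated values (despite the .csv extension)
pd.DataFrame(data_out).to_csv("/scratch/ofsaaapp/kafka/output/data_out.csv", sep='\t')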
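The Kafka Source settings in Table 11-18 resemble standard Kafka consumer options. Assuming they map onto the kafka-python client's KafkaConsumer keyword arguments (an assumption; the widget's internals are not documented here), a hypothetical helper could translate the widget values as follows. The topic name, group ID, and broker address are placeholders.

```python
def build_consumer_config(auto_offset_reset, enable_auto_commit,
                          consumer_timeout_ms, group_id):
    """Map the widget's mandatory parameters to kafka-python KafkaConsumer
    keyword arguments (hypothetical helper; assumed correspondence)."""
    reset = auto_offset_reset.lower()
    if reset not in ("earliest", "latest"):
        raise ValueError("Auto Offset Reset must be Earliest or Latest")
    return {
        "auto_offset_reset": reset,
        # Accept either a bool or the strings "True"/"False"
        "enable_auto_commit": str(enable_auto_commit).lower() == "true",
        "consumer_timeout_ms": int(consumer_timeout_ms),
        "group_id": group_id,
    }

config = build_consumer_config("Earliest", "True", "10000", "demo-group")
print(config)

# With kafka-python installed and a reachable broker (placeholder address),
# the same settings could be passed to a consumer:
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("input-topic", bootstrap_servers="localhost:9092", **config)
#   for message in consumer:  # iteration stops after consumer_timeout_ms with no new messages
#       print(message.value)
```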

To use the Kafka Source in a pipeline, add a Python paragraph after the Kafka Source widget. The following is an example Python script:

%python
import pandas as pd

# Sample data with start, end, avg_amount and count columns
data = {
    'start': [1, 1],
    'end': [11, 10],
    'avg_amount': [100, 124],
    'count': [12, 11],
}

# Create the DataFrame
df = pd.DataFrame(data)
print(df)
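If the Python paragraph that follows the Kafka Source needs the records written by the transformation script in Table 11-18, one option is to read that file back. The sketch below assumes the layout produced by that example script (tab-separated, with the DataFrame index as the first column); the temporary path and sample messages are placeholders standing in for /scratch/ofsaaapp/kafka/output/data_out.csv.

```python
import os
import tempfile

import pandas as pd

# Stand-in for the file the Kafka Source transformation script writes.
# A temporary directory replaces the /scratch path so the example runs anywhere.
out_dir = tempfile.mkdtemp()
path = os.path.join(out_dir, "data_out.csv")
pd.DataFrame({"Msg": ["msg-1", "msg-2"]}).to_csv(path, sep="\t")

# The script writes tab-separated values with the index as the first column,
# so read it back with the same separator and use column 0 as the index.
df = pd.read_csv(path, sep="\t", index_col=0)
print(df)
```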

Kafka Sink

When you click the Kafka Sink widget, the following mandatory parameters must be provided:

Figure 11-81 Kafka Sink

Table 11-19 Mandatory Parameters

Parameter          Description
Output Topic       Select the output topic.
Input Dataframe    Provide the input DataFrame to be written to the output topic.

To use the Kafka Sink in a pipeline, first add a Python paragraph, and then add the Kafka Sink widget.
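How the Kafka Sink encodes DataFrame rows as Kafka messages is not described here. As an illustration only, the sketch below assumes one JSON message per row, a common convention for Kafka payloads, and shows how a preceding Python paragraph could prepare such messages. The helper name dataframe_to_messages is hypothetical; the sample columns match the Kafka Source example.

```python
import json

import pandas as pd

# Example DataFrame, as a preceding Python paragraph might create it.
df = pd.DataFrame({
    "start": [1, 1],
    "end": [11, 10],
    "avg_amount": [100, 124],
    "count": [12, 11],
})

def dataframe_to_messages(frame):
    """Encode each DataFrame row as one UTF-8 JSON message (assumed format).

    to_json(orient="records", lines=True) emits one JSON object per line and
    converts NumPy types to plain JSON numbers."""
    lines = frame.to_json(orient="records", lines=True).splitlines()
    return [line.encode("utf-8") for line in lines if line]

messages = dataframe_to_messages(df)
for message in messages:
    print(message)
```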

Kafka Consumer

When you click the Kafka Consumer widget, the following mandatory parameters must be provided:

Figure 11-82 Kafka Consumer

Table 11-20 Mandatory Parameters

Parameter                Description
Input Topic              Select the input topic.
Auto Offset Reset        Select one of the following options:
                           • Earliest: Reads from the earliest available data.
                           • Latest: Reads only the latest data.
Enable auto commit       Select either True or False.
Consumer Timeout (ms)    Provide the timeout in milliseconds.
Group ID                 Provide a consumer group ID (any value).
Transformation script    Provide a transformation script. For example, the following script writes the data to a file:

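import pandas as pd

# data_prev is supplied by the widget (assumed: the data received from Kafka)
data_out = {"Msg": data_prev}
# Write the data as tab-separated values (despite the .csv extension)
pd.DataFrame(data_out).to_csv("/scratch/ofsaaapp/kafka/output/data_out.csv", sep='\t')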
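The transformation script relies on data_prev, which the widget appears to supply with the consumed data. To try the script outside the widget, the sketch below defines data_prev as a hypothetical sample list and writes to a temporary directory instead of the /scratch path.

```python
import os
import tempfile

import pandas as pd

# Hypothetical sample standing in for the data the widget passes in.
data_prev = ["msg-1", "msg-2"]

# Placeholder output location instead of /scratch/ofsaaapp/kafka/output/.
output_dir = tempfile.mkdtemp()
output_path = os.path.join(output_dir, "data_out.csv")

data_out = {"Msg": data_prev}
# sep='\t' writes tab-separated values; the index is written as the first column.
pd.DataFrame(data_out).to_csv(output_path, sep="\t")

with open(output_path) as f:
    print(f.read())
```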

Kafka Producer

When you click the Kafka Producer widget, the following mandatory parameter must be provided:

Figure 11-83 Kafka Producer

  • Output Topic: Provide the output topic.
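The Kafka Producer writes messages to the selected output topic. The sketch below illustrates that step with a hypothetical publish helper that accepts any object exposing send(topic, value) and flush(), which matches the kafka-python KafkaProducer methods; an in-memory FakeProducer (also hypothetical) lets it run without a broker.

```python
def publish(producer, output_topic, messages):
    """Send each message to the output topic, then flush.

    `producer` is any object with send(topic, value) and flush() methods,
    such as kafka.KafkaProducer from the kafka-python client (assumed)."""
    for message in messages:
        producer.send(output_topic, message)
    producer.flush()  # block until buffered messages are sent
    return len(messages)


class FakeProducer:
    """Hypothetical in-memory producer used to run the example without a broker."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


producer = FakeProducer()
count = publish(producer, "output-topic", [b"msg-1", b"msg-2"])
print(count, producer.sent)

# With a real broker (placeholder address), the same helper could be used as:
#   from kafka import KafkaProducer
#   publish(KafkaProducer(bootstrap_servers="localhost:9092"), "output-topic", [b"msg-1"])
```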

Kafka Transform

When you click the Kafka Transform widget, the following mandatory parameter must be provided:

Figure 11-84 Kafka Transform

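  • Transformation script: Provide a transformation script. For example, the following script writes the data to a file:

    import pandas as pd

    # data_prev is supplied by the widget (assumed: the incoming data)
    data_out = {"Msg": data_prev}
    # Write the data as tab-separated values (despite the .csv extension)
    pd.DataFrame(data_out).to_csv("/scratch/ofsaaapp/kafka/output/data_out.csv", sep='\t')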
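The example script passes data_prev straight into a DataFrame. If data_prev can be a single value rather than a list (the guide does not say which), pandas raises a ValueError for a dict of scalars without an index. A defensive variant, using the hypothetical helper to_frame, could look like this:

```python
import pandas as pd

def to_frame(data_prev):
    """Wrap data_prev in a one-column DataFrame named "Msg".

    pd.DataFrame({"Msg": value}) raises ValueError when value is a single
    scalar (for example, one string), so scalars are wrapped in a list first."""
    if isinstance(data_prev, (str, bytes)) or not hasattr(data_prev, "__iter__"):
        data_prev = [data_prev]
    return pd.DataFrame({"Msg": list(data_prev)})

print(to_frame("single message"))
print(to_frame(["msg-1", "msg-2"]))
```

The resulting DataFrame can then be written with the same to_csv call as in the example script.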

Kafka Monitor

When you click the Kafka Monitor widget, the following mandatory parameters must be provided:

Figure 11-85 Kafka Monitor

Table 11-21 Mandatory Parameters

Parameter                      Description
Time duration                  Provide the duration in milliseconds.
Output partition to monitor    Enter a numeric value (the partition number).
Output variable name           Provide a name for the output variable.
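The Kafka Monitor parameters suggest watching one output partition for a fixed time and storing the result under a chosen name. The sketch below is a hypothetical interpretation of that behavior, not the widget's implementation: records are assumed to be (partition, timestamp in ms, value) tuples, and monitor_partition, start_ms, and the outputs dictionary are illustrative names.

```python
def monitor_partition(records, partition, duration_ms, start_ms):
    """Collect values from one partition whose timestamps fall within
    [start_ms, start_ms + duration_ms). Records are (partition, timestamp_ms, value)."""
    end_ms = start_ms + duration_ms
    return [value for part, ts, value in records
            if part == partition and start_ms <= ts < end_ms]

# Hypothetical records as (partition, timestamp in ms, value).
records = [
    (0, 1_000, "a"),
    (1, 1_200, "b"),  # different partition
    (0, 1_900, "c"),
    (0, 3_500, "d"),  # outside a 2000 ms window starting at 1000 ms
]

outputs = {}
output_variable_name = "monitored_msgs"  # plays the role of "Output variable name"
outputs[output_variable_name] = monitor_partition(
    records, partition=0, duration_ms=2_000, start_ms=1_000)
print(outputs)  # {'monitored_msgs': ['a', 'c']}
```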

Kafka Windowing

The Kafka Windowing widget integrates with the existing Kafka Producer and Kafka Consumer widgets in a PyFlink pipeline. It lets you perform tumbling or sliding windowed computations, with configurable settings, on event streams read from Kafka.

The widget accepts the inputs described below, integrates with producer-consumer pipelines, applies the configured windowing logic, and handles valid and invalid configurations as well as edge cases.
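Tumbling windows are fixed-size and non-overlapping, so each event belongs to exactly one window; sliding windows overlap whenever the slide is smaller than the size, so one event can belong to several windows. The sketch below computes window boundaries for an event timestamp in seconds. It follows the common Flink-style assignment with zero offset (window starts at multiples of the size or slide); the widget's own internals are not documented here, and the function names are hypothetical.

```python
def tumbling_window(ts, size):
    """Return the (start, end) of the single tumbling window containing ts."""
    start = ts - (ts % size)
    return (start, start + size)

def sliding_windows(ts, size, slide):
    """Return every (start, end) sliding window that contains ts,
    with window starts at multiples of `slide`."""
    windows = []
    start = ts - (ts % slide)      # latest window start at or before ts
    while start > ts - size:       # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

print(tumbling_window(7, size=5))            # (5, 10)
print(sliding_windows(7, size=10, slide=5))  # [(0, 10), (5, 15)]
```

With Window Size (sec) = 10 and Window Slide (sec) = 5, an event at second 7 is counted in two overlapping windows, whereas a tumbling window of size 5 places it in one.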

The Kafka Windowing widget accepts the following inputs:

Figure 11-86 Kafka Windowing

Table 11-22 Kafka Windowing Fields

Field                     Type        Values / Rules
Window Assigner           SELECT      tumbling, sliding
Window Size (sec)         TEXT        Positive integer
Window Slide (sec)        TEXT        Positive integer; required for sliding, ignored for tumbling
Window Type               SELECT      event_time, processing_time
Watermark Column          TEXT        Required for event_time windowing; not used otherwise
Allowed Lateness (sec)    TEXT        Integer; maximum allowed lateness for event_time
Key By Column             TEXT        Column name for grouping
Window Function Body      LONGTEXT    PyFlink aggregate function body (Python code)
Note:
    • Required fields must not be blank.
    • For tumbling windows, Window Slide (sec) is optional and ignored.
    • For sliding windows, Window Slide (sec) is required.
    • For event_time windowing, Watermark Column and Allowed Lateness (sec) are required and validated.
    • Negative, zero, or non-integer values are rejected with an error message.
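The validation rules in the Note can be expressed as a small checker. This is a sketch under stated assumptions: the configuration is a dict keyed by the field names in Table 11-22, all TEXT values arrive as strings, Key By Column and Window Function Body are treated as required, and the rules are applied literally, so a zero Allowed Lateness (sec) is also rejected. The function name validate_window_config is hypothetical.

```python
def validate_window_config(cfg):
    """Return a list of error messages for a Kafka Windowing configuration.

    Keys follow the field names in Table 11-22 (assumed representation)."""
    errors = []

    def positive_int(name):
        value = str(cfg.get(name, "")).strip()
        if not value:
            errors.append(f"{name} is required")
        elif not value.isdigit() or int(value) <= 0:  # rejects "-5", "0", "1.5"
            errors.append(f"{name} must be a positive integer")

    assigner = str(cfg.get("Window Assigner", "")).lower()
    if assigner not in ("tumbling", "sliding"):
        errors.append("Window Assigner must be tumbling or sliding")

    positive_int("Window Size (sec)")
    if assigner == "sliding":
        positive_int("Window Slide (sec)")  # checked only for sliding windows

    window_type = cfg.get("Window Type", "")
    if window_type not in ("event_time", "processing_time"):
        errors.append("Window Type must be event_time or processing_time")
    if window_type == "event_time":
        if not str(cfg.get("Watermark Column", "")).strip():
            errors.append("Watermark Column is required")
        positive_int("Allowed Lateness (sec)")

    for name in ("Key By Column", "Window Function Body"):
        if not str(cfg.get(name, "")).strip():
            errors.append(f"{name} is required")
    return errors


cfg = {
    "Window Assigner": "sliding",
    "Window Size (sec)": "10",
    "Window Slide (sec)": "0",  # invalid: zero
    "Window Type": "event_time",
    "Watermark Column": "TimeStamp",
    "Allowed Lateness (sec)": "5",
    "Key By Column": "Account_Id",
    "Window Function Body": "return a",
}
print(validate_window_config(cfg))
```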

The following is an example window function (a reduce function) for the Kafka Windowing widget:

from pyflink.common import Row

def sum_amount_reduce(a, b):
    """
    Reduce function to sum the Amount field between two Row objects,
    keeping other fields from the first Row and using the latest TimeStamp.

    Assumes Row schema:
    (Account_Id, Txn_Origin, Amount, Trxn_Id, Txn_Channel, TimeStamp)
    """
    return Row(
        a[0],             # Account_Id
        a[1],             # Txn_Origin
        a[2] + b[2],      # Amount
        a[3],             # Trxn_Id
        a[4],             # Txn_Channel
        max(a[5], b[5])   # TimeStamp
    )
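In Flink, a reduce function is applied incrementally: each new element in a window is combined with the running result. To check the example function outside a Flink job, the sketch below applies it with functools.reduce to a few hypothetical rows for one Account_Id. If PyFlink is not installed, a plain-tuple stand-in replaces Row; the sample values are illustrative.

```python
from functools import reduce

try:
    from pyflink.common import Row
except ImportError:  # PyFlink not installed: plain tuples stand in for Row
    def Row(*values):
        return tuple(values)

def sum_amount_reduce(a, b):
    """Sum Amount, keep other fields from the first row, keep the latest TimeStamp."""
    return Row(a[0], a[1], a[2] + b[2], a[3], a[4], max(a[5], b[5]))

# Hypothetical rows: (Account_Id, Txn_Origin, Amount, Trxn_Id, Txn_Channel, TimeStamp)
rows = [
    Row("ACC1", "ONLINE", 100, "T1", "WEB", 1000),
    Row("ACC1", "ONLINE", 50, "T2", "WEB", 2500),
    Row("ACC1", "ATM", 25, "T3", "ATM", 1800),
]

total = reduce(sum_amount_reduce, rows)  # same pairwise combination a window applies
print(total)
```

The result keeps Account_Id, Txn_Origin, Trxn_Id, and Txn_Channel from the first row, sums Amount to 175, and keeps the latest TimeStamp (2500).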

Note:

The following features will be implemented in a future release:
    • Tooltips: Informative tooltips will be provided for each input field to guide users.
    • Conditional field visibility:
        • The Window Slide (sec) field will be displayed only when the window assigner is set to sliding.
        • The Watermark Column field will be displayed only when the window type is event_time.