Kafka Related Pipelines
Kafka widgets are available under Stream Analytics. You can create Kafka pipelines by using Kafka widgets such as Kafka Source, Kafka Sink, Kafka Consumer, Kafka Producer, Kafka Monitor, and the newly added Kafka Windowing widget.
Figure 11-79 Kafka related pipelines

- Kafka Consumer
- Kafka Transform
- Kafka Producer
- Kafka Monitor
Create a Pipeline by Using the Kafka Widgets
Note:
The Kafka Input and Kafka Output topics that are created on the Kafka Topics screen must be mapped to the workspace so that users can select them when creating pipelines.
Kafka Source
Figure 11-80 Kafka Source

Table 11-18 Mandatory Parameters
| Parameter | Description |
|---|---|
| Input Topic | Select the input topic. |
| Auto Offset Reset | Select one of the two available options. |
| Enable auto commit | Select either True or False. |
| Consumer Timeout(ms) | Provide the timeout in milliseconds. |
| Group ID | Provide a group ID (any value). |
| Transformation script | Provide a script, for example, a script that writes the data to a file. |
To use the Kafka Source in a pipeline, add a Python paragraph after the Kafka Source widget. The following is an example Python script:
%python
import pandas as pd
# Sample data with start, end, avg_amount, and count columns
data = {
    'start': [1, 1],
    'end': [11, 10],
    'avg_amount': [100, 124],
    'count': [12, 11]
}
# Create the DataFrame
df = pd.DataFrame(data)
print(df)
Kafka Sink
Figure 11-81 Kafka Sink

Table 11-19 Mandatory Parameters
| Parameter | Description |
|---|---|
| Output Topic | Select the output topic. |
| Input Dataframe | Provide a value for this parameter. |
To use the Kafka Sink in a pipeline, add a Python paragraph first, and then add the Kafka Sink widget.
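As a minimal sketch of such a Python paragraph, the following builds a DataFrame that a Kafka Sink widget could then publish. The variable names `df_out` and `messages` are hypothetical, the columns simply reuse those from the Kafka Source example, and the assumption that the Input Dataframe parameter refers to a DataFrame prepared this way is not confirmed by this section. The JSON step only illustrates what one message per row might look like; the widget's actual serialization is not described here. In a notebook paragraph, the code would typically follow the `%python` directive.

```python
import json
import pandas as pd

# Hypothetical result set to publish; column names follow the Kafka Source example.
df_out = pd.DataFrame({
    "start": [1, 1],
    "end": [11, 10],
    "avg_amount": [100, 124],
    "count": [12, 11],
})

# Illustration only: render each row as one JSON document,
# a common shape for messages written to a Kafka topic.
messages = df_out.to_json(orient="records", lines=True).splitlines()

for message in messages:
    print(json.loads(message))
```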
Kafka Consumer
Figure 11-82 Kafka Consumer

Table 11-20 Mandatory Parameters
| Parameter | Description |
|---|---|
| Input Topic | Select the input topic. |
| Auto Offset Reset | Select one of the two available options. |
| Enable auto commit | Select either True or False. |
| Consumer Timeout(ms) | Provide the timeout in milliseconds. |
| Group ID | Provide a group ID (any value). |
| Transformation script | Provide a script, for example, a script that writes the data to a file. |
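
As a rough illustration of how the parameters in the table above fit together, the following sketch collects them into a dictionary keyed by consumer property names. The function name `build_consumer_settings`, the topic and group names, and the keys `topic` and `consumer.timeout.ms` are hypothetical. `auto.offset.reset`, `enable.auto.commit`, and `group.id` are standard Apache Kafka consumer properties, and `earliest` is a standard Kafka value used only as a sample, because the options offered by the widget are not listed in this section. How the widget maps its fields internally is not documented here.

```python
def build_consumer_settings(input_topic, auto_offset_reset, enable_auto_commit,
                            consumer_timeout_ms, group_id):
    """Collect Kafka Consumer widget values into a plain dict (illustrative only)."""
    timeout = int(consumer_timeout_ms)  # the widget takes the timeout in milliseconds
    if timeout < 0:
        raise ValueError("Consumer Timeout(ms) must not be negative")
    if not str(group_id).strip():
        raise ValueError("Group ID must not be empty")
    return {
        "topic": input_topic,                        # hypothetical key
        "auto.offset.reset": auto_offset_reset,      # standard Kafka property
        # Accept either a boolean or the text "True"/"False" from a form field.
        "enable.auto.commit": str(enable_auto_commit).strip().lower() == "true",
        "consumer.timeout.ms": timeout,              # hypothetical key
        "group.id": str(group_id),                   # standard Kafka property
    }


# Sample values; none of these names come from the product.
settings = build_consumer_settings("txn_input", "earliest", "True", "5000", "txn-readers")
print(settings)
```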
Kafka Producer
Figure 11-83 Kafka Producer

- Output Topic: Provide the output topic.
Kafka Transform
Figure 11-84 Kafka Transform

- Transformation script: Provide a script. For example, the following script writes the data to a file:
import pandas as pd
# Wrap the incoming data (data_prev) in a single "Msg" column
data_out = {"Msg": data_prev}
# Write the result as a tab-separated file
pd.DataFrame(data_out).to_csv("/scratch/ofsaaapp/kafka/output/data_out.csv", sep='\t')
Kafka Monitor
Figure 11-85 Kafka Monitor

Table 11-21 Mandatory Parameters
| Parameter | Description |
|---|---|
| Time duration | Provide the time in milliseconds. |
| Output partition to monitor | Enter any numeric value. |
| Output variable name | Provide any name for the output variable. |
Kafka Windowing
The windowing widget integrates with the existing Kafka Producer and Kafka Consumer widgets in a PyFlink pipeline. This widget enables you to perform windowed computations, tumbling or sliding, with various configurations on event streams flowing from Kafka.
The widget accepts inputs as specified in the following table, integrates with producer-consumer pipelines, applies the windowing logic, and handles valid and invalid configurations and edge cases.
Figure 11-86 Kafka Windowing

Table 11-22 Kafka Windowing Fields
| Field | Type | Values / Rules |
|---|---|---|
| Window Assigner | SELECT | Tumbling and sliding |
| Window Size (sec) | TEXT | Positive integer |
| Window Slide (sec) | TEXT | Positive integer (sliding only, optional) |
| Window Type | SELECT | event_time, processing_time |
| Watermark Column | TEXT | Optional; for event_time windowing |
| Allowed Lateness (sec) | TEXT | Integer; max allowed lateness for event_time |
| Key By Column | TEXT | Column name for grouping |
| Window Function Body | LONGTEXT | PyFlink aggregate function body (Python code) |
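
To make the difference between the two window assigners concrete, the following plain-Python sketch computes which windows a single event falls into for a given Window Size and Window Slide. It is not the widget's or PyFlink's code: the helper names are hypothetical, timestamps are plain seconds, and windows are assumed to be aligned to time zero with no offset.

```python
def parse_positive_int(text, field):
    """Parse a TEXT field that must hold a positive integer."""
    value = int(text)
    if value <= 0:
        raise ValueError(f"{field} must be a positive integer")
    return value


def tumbling_window(ts, size):
    """Return the single [start, end) window that contains timestamp ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every [start, end) window of length size, spaced slide apart, that contains ts."""
    start = ts - (ts % slide)        # latest window start at or before ts
    windows = []
    while start > ts - size:         # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


size = parse_positive_int("10", "Window Size (sec)")
slide = parse_positive_int("5", "Window Slide (sec)")

print(tumbling_window(27, size))         # (20, 30)
print(sliding_windows(27, size, slide))  # [(20, 30), (25, 35)]
```

With a tumbling assigner each event belongs to exactly one window, whereas with a sliding assigner an event can belong to several overlapping windows when the slide is smaller than the size.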
The following is an example configuration of the window function for the Kafka Windowing feature:
from pyflink.common import Row

def sum_amount_reduce(a, b):
    """
    Reduce function to sum the Amount field between two Row objects,
    keeping other fields from the first Row and using the latest
    TimeStamp.
    Assumes Row schema:
    (Account_Id, Txn_Origin, Amount, Trxn_Id, Txn_Channel, TimeStamp)
    """
    return Row(
        a[0],             # Account_Id
        a[1],             # Txn_Origin
        a[2] + b[2],      # Amount
        a[3],             # Trxn_Id
        a[4],             # Txn_Channel
        max(a[5], b[5])   # TimeStamp
    )
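To see what the reduce function above produces, the following sketch folds it over a few sample rows, as would happen for the events of one key within one window. So that it runs without Flink installed, it replaces `pyflink.common.Row` with a hypothetical tuple-based stand-in; the sample account, transaction values, and timestamps are invented for illustration.

```python
from functools import reduce


def Row(*values):
    """Stand-in for pyflink.common.Row (assumption: positional access only)."""
    return tuple(values)


def sum_amount_reduce(a, b):
    """Sum Amount, keep the other fields of the first row, keep the latest TimeStamp."""
    return Row(a[0], a[1], a[2] + b[2], a[3], a[4], max(a[5], b[5]))


# Hypothetical events for one Account_Id that fall into the same window.
# Schema: (Account_Id, Txn_Origin, Amount, Trxn_Id, Txn_Channel, TimeStamp)
events = [
    Row("ACC1", "US", 100.0, "T1", "ATM", 1000),
    Row("ACC1", "US", 50.0, "T2", "WEB", 1007),
    Row("ACC1", "US", 25.0, "T3", "POS", 1003),
]

result = reduce(sum_amount_reduce, events)
print(result)  # ('ACC1', 'US', 175.0, 'T1', 'ATM', 1007)
```

Note that Trxn_Id and Txn_Channel in the result come from the first event only, so the output row describes the window total rather than any single transaction.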
Note:
The following features will be implemented in a future release:
- Tooltips: Informative tooltips for each input field will be provided for user guidance.
- Conditional Field Visibility: The Window Slide (sec) field will be displayed only when the window assigner is set to sliding, and the Watermark Column field will be displayed only when the window type is event_time.