Configure

The Morpheus pipeline exposes several key parameters that you can tune to optimize performance for your specific hardware and data characteristics. The default values are sufficient for demonstration purposes, but you should understand these parameters before deploying in production environments. They are exposed as command-line arguments in the run.py script.

--pipeline_batch_size: This parameter controls the number of messages (such as transactions) that are grouped together for processing as they move between stages in the pipeline.

--model_max_batch_size: This defines the maximum number of records sent to the GNN and XGBoost models for inference in a single request.

--num_threads: This sets the number of internal CPU threads used by the Morpheus pipeline engine for orchestration and I/O.

--edge_buffer_size: This controls how much data is buffered in memory at the edges between pipeline stages, which is useful for tuning backpressure and throughput.

--log_level: This sets the logging verbosity (for example, INFO or DEBUG) so you can monitor pipeline behavior during tuning.
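
As a concrete starting point, these flags can be passed directly when launching the pipeline. The following invocation is an illustrative sketch, not a tuned recommendation; the exact flag set depends on your run.py version (for example, the Kafka-enabled script later in this guide sets edge_buffer_size in code rather than as a flag), so verify what is available with python run.py --help.

    # Illustrative values; adjust to your GPU memory and message rate
    python run.py \
        --num_threads 8 \
        --pipeline_batch_size 2048 \
        --model_max_batch_size 64 \
        --log_level INFO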

Validate Morpheus Pipeline

Follow these steps to validate and run the pipeline using local files for input and output:

  1. Clone the Morpheus repository and check out the version matching your Docker container using the following commands:
    # Navigate to your project directory
    cd $PROJECT_DIR
    # Clone the repository
    git clone https://github.com/nv-morpheus/Morpheus.git
    # Navigate into the cloned repository
    cd Morpheus
    # BEST PRACTICE: Check out the version tag matching the Docker container
    git checkout v25.02.00
    # Download large model and data files using Git LFS
    git lfs install
    git lfs pull

    Caution:

    Use the version of the Morpheus code that matches the version of the Docker container to prevent dependency conflicts.
  2. After the pipeline run completes, verify that the output file was created (an example run command appears after these steps):
    # --- INSIDE THE CONTAINER ---
    head .tmp/output/gnn_fraud_detection_output.csv
  3. Exit the container.
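
The verification in step 2 assumes the pipeline has already been run. As a rough sketch, assuming the file-based run.py that ships with the example and its default input and output paths, the run from inside the container looks like this (check python run.py --help for the flags your version exposes):

    # --- INSIDE THE CONTAINER (sketch; defaults assumed) ---
    cd examples/gnn_fraud_detection_pipeline
    python run.py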

Set Up Real-Time Streaming with Kafka

You can now evolve the architecture into a production-style streaming pipeline.

Follow these steps to set up the Kafka environment and run the services:

  1. On your host machine, use the following command to navigate to your project root directory:
    cd $PROJECT_DIR
  2. Create a docker-compose.yml file with the following contents to define the Kafka and Apache ZooKeeper services.
    # File: <your-project-directory>/docker-compose.yml
    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  3. Start the Kafka services and create the topics (a verification command follows these steps).
    # --- ON THE HOST MACHINE ---
    # Clean up any old containers from previous attempts
    docker compose down --volumes --remove-orphans
    # Start Kafka and Zookeeper in the background
    docker compose up -d
    # Create the input and output topics
    docker exec kafka kafka-topics --create --topic gnn_fraud_input --bootstrap-server localhost:9092
    docker exec kafka kafka-topics --create --topic gnn_fraud_output --bootstrap-server localhost:9092
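
Before streaming any data, confirm that the broker is reachable and that both topics exist; the kafka-topics tool bundled in the Confluent image can list them:

    # --- ON THE HOST MACHINE ---
    # Expect gnn_fraud_input and gnn_fraud_output in the output
    docker exec kafka kafka-topics --list --bootstrap-server localhost:9092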

Create Kafka Helper Scripts

Set up an isolated Python environment to prevent package conflicts on your host machine.

# --- ON THE HOST MACHINE ---
# Navigate to your project directory (e.g., ~/morpheus_fraud_detection)
cd $PROJECT_DIR
# Create an isolated Python virtual environment
python3 -m venv kafka_env
# Activate the environment
source kafka_env/bin/activate

Your terminal prompt will now be prefixed with (kafka_env).
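
To confirm the environment is active before installing anything, check which interpreter resolves first:

# Should print a path ending in kafka_env/bin/python
which python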

  1. Install the required Python library on your host (inside the virtual environment):
    # Ensure the (kafka_env) is active
    pip install kafka-python
  2. On your host machine, create two Python scripts in your project directory to produce data and consume the results.
  3. Create the producer file using the following script:
    # File: $PROJECT_DIR/producer.py
    import csv
    import time

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: v.encode('utf-8')
    )
    csv_file_path = './Morpheus/examples/gnn_fraud_detection_pipeline/validation.csv'
    input_topic = 'gnn_fraud_input'
    print(f"Streaming data from {csv_file_path} to Kafka topic '{input_topic}'...")
    with open(csv_file_path, 'r') as file:
        header = next(file)
        reader = csv.reader(file)
        for row in reader:
            # Prepend the CSV header so every message is self-describing
            message = header.strip() + '\n' + ','.join(row).strip()
            producer.send(input_topic, value=message)
            print(f"Sent transaction index: {row[0]}")
            time.sleep(0.2)  # Throttle to simulate a live stream
    producer.flush()
    print("Finished streaming data.")
  4. Create the consumer file using the following script:
    # File: $PROJECT_DIR/consumer.py
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'gnn_fraud_output',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id='fraud-demo-consumer',
        value_deserializer=lambda x: x.decode('utf-8')
    )
    print("Listening for fraud detection results on topic 'gnn_fraud_output'...")
    for message in consumer:
        print("\n--- Real-Time Fraud Alert ---")
        print(message.value)
        print("----------------------------")
  5. Replace the contents of the Morpheus pipeline script located in $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py with the following Kafka-enabled version (an end-to-end usage sketch follows these steps):
    # File: $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py
    # Copyright (c) 2021-2025, NVIDIA CORPORATION.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    import logging
    import os

    import click
    # pylint: disable=no-name-in-module
    from stages.classification_stage import ClassificationStage
    from stages.graph_construction_stage import FraudGraphConstructionStage
    from stages.graph_sage_stage import GraphSAGEStage

    from morpheus.config import Config
    from morpheus.config import PipelineModes
    from morpheus.pipeline.linear_pipeline import LinearPipeline
    from morpheus.stages.general.monitor_stage import MonitorStage
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
    from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage
    from morpheus.stages.postprocess.serialize_stage import SerializeStage
    from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
    from morpheus.utils.logger import configure_logging

    CUR_DIR = os.path.dirname(__file__)


    @click.command()
    @click.option(
        "--num_threads",
        default=len(os.sched_getaffinity(0)),
        type=click.IntRange(min=1),
        help="Number of internal pipeline threads to use.",
    )
    @click.option(
        "--pipeline_batch_size",
        default=1024,
        type=click.IntRange(min=1),
        help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "
              "Also used for Kafka consumers."),
    )
    @click.option(
        "--model_max_batch_size",
        default=32,
        type=click.IntRange(min=1),
        help="Max batch size to use for the model.",
    )
    @click.option(
        "--model_fea_length",
        default=70,
        type=click.IntRange(min=1),
        help="Features length to use for the model.",
    )
    @click.option(
        "--bootstrap_server",
        default="localhost:9092",
        help="Kafka bootstrap server address.",
    )
    @click.option(
        "--input_topic",
        default="gnn_fraud_input",
        help="Kafka topic to listen for input messages.",
    )
    @click.option(
        "--output_topic",
        default="gnn_fraud_output",
        help="Kafka topic to publish output messages to.",
    )
    @click.option(
        "--training_file",
        type=click.Path(exists=True, readable=True, dir_okay=False),
        default=os.path.join(CUR_DIR, "training.csv"),
        required=True,
        help="Training data filepath (used for graph context).",
    )
    @click.option(
        "--model_dir",
        type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),
        default=os.path.join(CUR_DIR, "model"),
        required=True,
        help="Path to trained HinSAGE & XGBoost models.",
    )
    def run_pipeline(num_threads,
                     pipeline_batch_size,
                     model_max_batch_size,
                     model_fea_length,
                     bootstrap_server,
                     input_topic,
                     output_topic,
                     training_file,
                     model_dir):
        # Enable the default logger.
        configure_logging(log_level=logging.INFO)

        # It's necessary to get the global config object and configure it.
        config = Config()
        config.mode = PipelineModes.OTHER

        # The properties below are specified on the command line.
        config.num_threads = num_threads
        config.pipeline_batch_size = pipeline_batch_size
        config.model_max_batch_size = model_max_batch_size
        config.feature_length = model_fea_length

        config.class_labels = ["probs"]
        config.edge_buffer_size = 4

        # Create a linear pipeline object.
        pipeline = LinearPipeline(config)

        # Set the source stage to KafkaSourceStage.
        # This stage reads messages from a Kafka topic.
        pipeline.set_source(
            KafkaSourceStage(config,
                             bootstrap_servers=bootstrap_server,
                             input_topic=input_topic,
                             auto_offset_reset="earliest",
                             poll_interval="1 seconds",
                             stop_after=0))  # stop_after=0 runs indefinitely

        # Add a deserialize stage.
        # At this stage, messages are logically partitioned based on 'pipeline_batch_size'.
        pipeline.add_stage(DeserializeStage(config))

        # Add the graph construction stage.
        pipeline.add_stage(FraudGraphConstructionStage(config, training_file))
        pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))

        # Add a GraphSAGE inference stage.
        pipeline.add_stage(GraphSAGEStage(config, model_dir))
        pipeline.add_stage(MonitorStage(config, description="Inference rate"))

        # Add a classification stage.
        # This stage adds detected classifications to each message.
        pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))
        pipeline.add_stage(MonitorStage(config, description="Add classification rate"))

        # Add a serialize stage.
        # This stage includes and excludes columns from messages.
        pipeline.add_stage(SerializeStage(config))
        pipeline.add_stage(MonitorStage(config, description="Serialize rate"))

        # Add a write-to-Kafka stage.
        # This stage writes all messages to a Kafka topic.
        pipeline.add_stage(
            WriteToKafkaStage(config,
                              bootstrap_servers=bootstrap_server,
                              output_topic=output_topic))

        # Run the pipeline.
        pipeline.run()


    if __name__ == "__main__":
        run_pipeline()
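
With the helper scripts and the Kafka-enabled run.py in place, an end-to-end test typically uses three terminals. The sketch below assumes the Morpheus container can reach the broker at localhost:9092 (for example, when started with host networking) and that the kafka_env virtual environment is active on the host:

    # Terminal 1 --- INSIDE THE CONTAINER: start the streaming pipeline
    python examples/gnn_fraud_detection_pipeline/run.py

    # Terminal 2 --- ON THE HOST: listen for fraud detection results
    python consumer.py

    # Terminal 3 --- ON THE HOST: stream transactions into the input topic
    python producer.py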