Configurar

O pipeline Morpheus expõe vários parâmetros-chave que você pode ajustar para otimizar o desempenho com base nas características específicas de hardware e dados. Embora seja possível usar os valores padrão para fins de demonstração, você deve entender esses parâmetros para implantação em ambientes de produção. Esses parâmetros são expostos como argumentos de linha de comando no script run.py.

--pipeline_batch_size: Esse parâmetro controla o número de mensagens (como transações) que são agrupadas para processamento à medida que se movem entre estágios no pipeline.

--model_max_batch_size: define o número máximo de registros enviados para a GNN e os modelos XGBoost para inferência em uma única solicitação.

--num_threads: Define o número de threads internos de CPU usados pelo mecanismo de pipeline Morpheus para orquestração e E/S.

--edge_buffer_size: Controla o volume de dados armazenados na memória nas bordas entre os estágios do pipeline. Útil para ajustar a contrapressão e o throughput.

--log_level: Defina como INFO, DEBUG etc. para monitorar o comportamento do pipeline durante o ajuste.

Validar Pipeline de Morpheus

Siga estas etapas para validar e executar o pipeline usando arquivos locais para entrada e saída:

  1. Clone e alinhe o repositório Morpheus usando os seguintes comandos:
    # Navigate to your project directory
    cd $PROJECT_DIR
    # Clone the repository
    git clone https://github.com/nv-morpheus/Morpheus.git
    # 1. Navigate to the mounted code directory 
    cd Morpheus
    # BEST PRACTICE: Check out the version tag matching the Docker container
    git checkout v25.02.00
    # Download large model and data files using Git LFS
    git lfs install
    git lfs pull

    Atenção:

    Use a versão do código Morpheus que corresponde à versão do contêiner do Docker para evitar conflitos de dependência.
  2. Após a conclusão, você poderá verificar se a saída foi criada.
    # --- INSIDE THE CONTAINER ---
    head .tmp/output/gnn_fraud_detection_output.csv
  3. Saia do contêiner.

Configurar o Streaming em Tempo Real com o Kafka

Agora você pode evoluir a arquitetura para um pipeline de streaming no estilo de produção.

Siga estas etapas para configurar o ambiente Kafka e executar os serviços:

  1. Na máquina host, use o seguinte comando para navegar até o diretório raiz do projeto:
    cd $PROJECT_DIR
    # File: <your-project-directory>/docker-compose.yml
  2. Crie um arquivo docker-compose.yml com o conteúdo a seguir para definir os serviços Kafka e Apache Zookeeper.
    version: '3'
    services:
     zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper   
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
     kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        depends_on:
          - zookeeper
        ports:      -
          "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1     
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092     
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1     
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  3. Inicie os serviços do Kafka e crie os tópicos.
    # --- ON THE HOST MACHINE ---# 
    Clean up any old containers from previous attempts 
    docker compose down --volumes--remove-orphans 
    # Start Kafka and Zookeeper in the background docker compose up -d
    # Create the input and output topics
    docker exec kafka kafka-topics --create --topic gnn_fraud_input --bootstrap-server localhost:9092 
    docker exec kafka kafka-topics --create --topic gnn_fraud_output --bootstrap-server localhost:9092

Criar Scripts do Auxiliar do Kafka

Configure o ambiente Python do host para evitar conflitos de pacote Python na sua máquina host.

# --- ON THE HOST MACHINE ---
# Navigate to your project directory (e.g., ~/morpheus_fraud_detection) 
cd $PROJECT_DIR# Create an isolated Python virtual environment 
python3 -m venv kafka_env 
# Activate the environment 
source kafka_env/bin/activate

Seu prompt de terminal agora será prefixado com (kafka_env).

  1. Instale a biblioteca Python necessária no seu host (dentro do ambiente virtual):
    # Ensure the (kafka_env) is active
    pip install kafka-python
  2. Na sua máquina host, crie dois scripts Python no diretório de projeto para produzir dados e consumir os resultados.
  3. Crie o arquivo do produtor usando o seguinte script:
    # File: $PROJECT_DIR/producer.py\
    import csv\
    from kafka import KafkaProducer\
    import time\
    \
    producer = KafkaProducer(\
        bootstrap_servers='localhost:9092',\
        value_serializer=lambda v: v.encode('utf-8')\
    )\
    csv_file_path = './Morpheus/examples/gnn_fraud_detection_pipeline/validation.csv'\
    input_topic = 'gnn_fraud_input'\
    print(f"Streaming data from \{csv_file_path\} to Kafka topic '\{input_topic\}'...")\
    with open(csv_file_path, 'r') as file:\
        header = next(file)\
        reader = csv.reader(file)\
        for row in reader:\
            message = header.strip() + '\\n' + ','.join(row).strip()\
            producer.send(input_topic, value=message)\
            print(f"Sent transaction index: \{row[0]\}")\
            time.sleep(0.2)\
    producer.flush()\
    print("Finished streaming data.")}
  4. Crie o arquivo do consumidor usando o seguinte script:
    # File: $PROJECT_DIR/consumer.py\
    from kafka import KafkaConsumer\
    \
    consumer = KafkaConsumer(\
        'gnn_fraud_output',\
        bootstrap_servers='localhost:9092',\
        auto_offset_reset='earliest',\
        group_id='fraud-demo-consumer',\
        value_deserializer=lambda x: x.decode('utf-8')\
    )\
    print("Listening for fraud detection results on topic 'gnn_fraud_output'...")\
    for message in consumer:\
        print("\\n--- Real-Time Fraud Alert ---")\
        print(message.value)\
        print("----------------------------")}
  5. Substitua o conteúdo do script do pipeline Morpheus localizado em $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py pela versão habilitada para Kafka da seguinte forma:
    # File: $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py
    # Copyright (c) 2021-2025, NVIDIA CORPORATION.\
    #\
    # Licensed under the Apache License, Version 2.0 (the "License");\
    # you may not use this file except in compliance with the License.\
    # You may obtain a copy of the License at\
    #\
    #     http://www.apache.org/licenses/LICENSE-2.0\
    #\
    # Unless required by applicable law or agreed to in writing, software\
    # distributed under the License is distributed on an "AS IS" BASIS,\
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\
    # See the License for the specific language governing permissions and\
    # limitations under the License.\
    \
    import logging\
    import os\
    \
    import click\
    # pylint: disable=no-name-in-module\
    from stages.classification_stage import ClassificationStage\
    from stages.graph_construction_stage import FraudGraphConstructionStage\
    from stages.graph_sage_stage import GraphSAGEStage\
    \
    from morpheus.config import Config\
    from morpheus.config import PipelineModes\
    from morpheus.pipeline.linear_pipeline import LinearPipeline\
    from morpheus.stages.general.monitor_stage import MonitorStage\
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage\
    from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage\
    from morpheus.stages.postprocess.serialize_stage import SerializeStage\
    from morpheus.stages.preprocess.deserialize_stage import DeserializeStage\
    from morpheus.utils.logger import configure_logging\
    \
    CUR_DIR = os.path.dirname(__file__)\
    \
    \
    @click.command()\
    @click.option(\
        "--num_threads",\
        default=len(os.sched_getaffinity(0)),\
        type=click.IntRange(min=1),\
        help="Number of internal pipeline threads to use.",\
    )\
    @click.option(\
        "--pipeline_batch_size",\
        default=1024,\
        type=click.IntRange(min=1),\
        help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "\
              "Also used for Kafka consumers."),\
    )\
    @click.option(\
        "--model_max_batch_size",\
        default=32,\
        type=click.IntRange(min=1),\
        help="Max batch size to use for the model.",\
    )\
    @click.option(\
        "--model_fea_length",\
        default=70,\
        type=click.IntRange(min=1),\
        help="Features length to use for the model.",\
    )\
    @click.option(\
        "--bootstrap_server",\
        default="localhost:9092",\
        help="Kafka bootstrap server address.",\
    )\
    @click.option(\
        "--input_topic",\
        default="gnn_fraud_input",\
        help="Kafka topic to listen for input messages.",\
    )\
    @click.option(\
        "--output_topic",\
        default="gnn_fraud_output",\
        help="Kafka topic to publish output messages to.",\
    )\
    @click.option(\
        "--training_file",\
        type=click.Path(exists=True, readable=True, dir_okay=False),\
        default=os.path.join(CUR_DIR, "training.csv"),\
        required=True,\
        help="Training data filepath (used for graph context).",\
    )\
    @click.option(\
        "--model_dir",\
        type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),\
        default=os.path.join(CUR_DIR, "model"),\
        required=True,\
        help="Path to trained Hinsage & XGB models.",\
    )\
    def run_pipeline(num_threads,\
                     pipeline_batch_size,\
                     model_max_batch_size,\
                     model_fea_length,\
                     bootstrap_server,\
                     input_topic,\
                     output_topic,\
                     training_file,\
                     model_dir):\
        # Enable the default logger.\
        configure_logging(log_level=logging.INFO)\
    \
        # Its necessary to get the global config object and configure it.\
        config = Config()\
        config.mode = PipelineModes.OTHER\
    \
        # Below properties are specified by the command line.\
        config.num_threads = num_threads\
        config.pipeline_batch_size = pipeline_batch_size\
        config.model_max_batch_size = model_max_batch_size\
        config.feature_length = model_fea_length\
    \
        config.class_labels = ["probs"]\
        config.edge_buffer_size = 4\
    \
        # Create a linear pipeline object.\
        pipeline = LinearPipeline(config)\
    \
        # Set source stage to KafkaSourceStage\
        # This stage reads messages from a Kafka topic.\
        pipeline.set_source(\
            KafkaSourceStage(config,\
                             bootstrap_servers=bootstrap_server,\
                             input_topic=input_topic,\
                             auto_offset_reset="earliest",\
                             poll_interval="1 seconds",\
                             stop_after=0)) # stop_after=0 runs indefinitely\
    \
        # Add a deserialize stage.\
        # At this stage, messages were logically partitioned based on the 'pipeline_batch_size'.\
        pipeline.add_stage(DeserializeStage(config))\
    \
        # Add the graph construction stage.\
        pipeline.add_stage(FraudGraphConstructionStage(config, training_file))\
        pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))\
    \
        # Add a sage inference stage.\
        pipeline.add_stage(GraphSAGEStage(config, model_dir))\
        pipeline.add_stage(MonitorStage(config, description="Inference rate"))\
    \
        # Add classification stage.\
        # This stage adds detected classifications to each message.\
        pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))\
        pipeline.add_stage(MonitorStage(config, description="Add classification rate"))\
    \
        # Add a serialize stage.\
        # This stage includes & excludes columns from messages.\
        pipeline.add_stage(SerializeStage(config))\
        pipeline.add_stage(MonitorStage(config, description="Serialize rate"))\
    \
        # Add a write to Kafka stage.\
        # This stage writes all messages to a Kafka topic.\
        pipeline.add_stage(\
            WriteToKafkaStage(config,\
                              bootstrap_servers=bootstrap_server,\
                              output_topic=output_topic))\
    \
        # Run the pipeline.\
        pipeline.run()\
    \
    \
    if __name__ == "__main__":\
        run_pipeline()}