Configurer

Le pipeline Morpheus expose plusieurs paramètres clés que vous pouvez régler pour optimiser les performances en fonction des caractéristiques spécifiques du matériel et des données. Bien que vous puissiez utiliser les valeurs par défaut à des fins de démonstration, vous devez comprendre ces paramètres pour le déploiement dans des environnements de production. Ces paramètres sont présentés en tant qu'arguments de ligne de commande dans le script run.py.

--pipeline_batch_size : ce paramètre contrôle le nombre de messages (tels que les transactions) qui sont regroupés pour traitement lorsqu'ils se déplacent entre les étapes du pipeline.

--model_max_batch_size : définit le nombre maximum d'enregistrements envoyés aux modèles GNN et XGBoost pour inférence dans une seule demande.

--num_threads : définit le nombre de threads d'UC internes utilisés par le moteur de pipeline Morpheus pour l'orchestration et les E/S.

--edge_buffer_size : contrôle la quantité de données stockées en mémoire dans des arêtes entre les étapes de pipeline. Utile pour régler la contre-pression et le débit.

--log_level : définissez cette option sur INFO, DEBUG, etc. pour surveiller le comportement du pipeline lors du réglage.

Valider le pipeline Morpheus

Pour valider et exécuter le pipeline à l'aide de fichiers locaux pour les entrées et les sorties, procédez comme suit :

  1. Clonez et alignez le référentiel Morpheus à l'aide des commandes suivantes :
    # Navigate to your project directory
    cd $PROJECT_DIR
    # Clone the repository
    git clone https://github.com/nv-morpheus/Morpheus.git
    # 1. Navigate to the mounted code directory 
    cd Morpheus
    # BEST PRACTICE: Check out the version tag matching the Docker container
    git checkout v25.02.00
    # Download large model and data files using Git LFS
    git lfs install
    git lfs pull

    Attention :

    Utilisez la version du code Morpheus qui correspond à la version du conteneur Docker pour éviter les conflits de dépendance.
  2. Une fois l'opération terminée, vous pouvez vérifier que la sortie a été créée.
    # --- INSIDE THE CONTAINER ---
    head .tmp/output/gnn_fraud_detection_output.csv
  3. Quittez le conteneur.

Configuration de Streaming en temps réel avec Kafka

Vous pouvez désormais faire évoluer l'architecture vers un pipeline de diffusion en continu de type production.

Pour configurer l'environnement Kafka et exécuter les services, procédez comme suit :

  1. Sur l'ordinateur hôte, utilisez la commande suivante pour accéder au répertoire racine du projet :
    cd $PROJECT_DIR
    # File: <your-project-directory>/docker-compose.yml
  2. Créez un fichier docker-compose.yml avec le contenu suivant pour définir les services Kafka et Apache Zookeeper.
    version: '3'
    services:
     zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper   
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
     kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        depends_on:
          - zookeeper
        ports:      -
          "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1     
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092     
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1     
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  3. Démarrez les services Kafka et créez les rubriques.
    # --- ON THE HOST MACHINE ---# 
    Clean up any old containers from previous attempts 
    docker compose down --volumes--remove-orphans 
    # Start Kafka and Zookeeper in the background docker compose up -d
    # Create the input and output topics
    docker exec kafka kafka-topics --create --topic gnn_fraud_input --bootstrap-server localhost:9092 
    docker exec kafka kafka-topics --create --topic gnn_fraud_output --bootstrap-server localhost:9092

Création de scripts d'aide Kafka

Configurez l'environnement Python de l'hôte afin d'éviter les conflits de package Python sur votre machine hôte.

# --- ON THE HOST MACHINE ---
# Navigate to your project directory (e.g., ~/morpheus_fraud_detection) 
cd $PROJECT_DIR# Create an isolated Python virtual environment 
python3 -m venv kafka_env 
# Activate the environment 
source kafka_env/bin/activate

L'invite de terminal est désormais précédée de (kafka_env).

  1. Installez la bibliothèque Python requise sur l'hôte (dans l'environnement virtuel) :
    # Ensure the (kafka_env) is active
    pip install kafka-python
  2. Sur l'ordinateur hôte, créez deux scripts Python dans le répertoire de projet pour produire des données et utiliser les résultats.
  3. Créez le fichier du producteur à l'aide du script suivant :
    # File: $PROJECT_DIR/producer.py\
    import csv\
    from kafka import KafkaProducer\
    import time\
    \
    producer = KafkaProducer(\
        bootstrap_servers='localhost:9092',\
        value_serializer=lambda v: v.encode('utf-8')\
    )\
    csv_file_path = './Morpheus/examples/gnn_fraud_detection_pipeline/validation.csv'\
    input_topic = 'gnn_fraud_input'\
    print(f"Streaming data from \{csv_file_path\} to Kafka topic '\{input_topic\}'...")\
    with open(csv_file_path, 'r') as file:\
        header = next(file)\
        reader = csv.reader(file)\
        for row in reader:\
            message = header.strip() + '\\n' + ','.join(row).strip()\
            producer.send(input_topic, value=message)\
            print(f"Sent transaction index: \{row[0]\}")\
            time.sleep(0.2)\
    producer.flush()\
    print("Finished streaming data.")}
  4. Créez le fichier de consommateur à l'aide du script suivant :
    # File: $PROJECT_DIR/consumer.py\
    from kafka import KafkaConsumer\
    \
    consumer = KafkaConsumer(\
        'gnn_fraud_output',\
        bootstrap_servers='localhost:9092',\
        auto_offset_reset='earliest',\
        group_id='fraud-demo-consumer',\
        value_deserializer=lambda x: x.decode('utf-8')\
    )\
    print("Listening for fraud detection results on topic 'gnn_fraud_output'...")\
    for message in consumer:\
        print("\\n--- Real-Time Fraud Alert ---")\
        print(message.value)\
        print("----------------------------")}
  5. Remplacez le contenu du script de pipeline Morpheus situé dans $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py par la version activée pour Kafka comme suit :
    # File: $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py
    # Copyright (c) 2021-2025, NVIDIA CORPORATION.\
    #\
    # Licensed under the Apache License, Version 2.0 (the "License");\
    # you may not use this file except in compliance with the License.\
    # You may obtain a copy of the License at\
    #\
    #     http://www.apache.org/licenses/LICENSE-2.0\
    #\
    # Unless required by applicable law or agreed to in writing, software\
    # distributed under the License is distributed on an "AS IS" BASIS,\
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\
    # See the License for the specific language governing permissions and\
    # limitations under the License.\
    \
    import logging\
    import os\
    \
    import click\
    # pylint: disable=no-name-in-module\
    from stages.classification_stage import ClassificationStage\
    from stages.graph_construction_stage import FraudGraphConstructionStage\
    from stages.graph_sage_stage import GraphSAGEStage\
    \
    from morpheus.config import Config\
    from morpheus.config import PipelineModes\
    from morpheus.pipeline.linear_pipeline import LinearPipeline\
    from morpheus.stages.general.monitor_stage import MonitorStage\
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage\
    from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage\
    from morpheus.stages.postprocess.serialize_stage import SerializeStage\
    from morpheus.stages.preprocess.deserialize_stage import DeserializeStage\
    from morpheus.utils.logger import configure_logging\
    \
    CUR_DIR = os.path.dirname(__file__)\
    \
    \
    @click.command()\
    @click.option(\
        "--num_threads",\
        default=len(os.sched_getaffinity(0)),\
        type=click.IntRange(min=1),\
        help="Number of internal pipeline threads to use.",\
    )\
    @click.option(\
        "--pipeline_batch_size",\
        default=1024,\
        type=click.IntRange(min=1),\
        help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "\
              "Also used for Kafka consumers."),\
    )\
    @click.option(\
        "--model_max_batch_size",\
        default=32,\
        type=click.IntRange(min=1),\
        help="Max batch size to use for the model.",\
    )\
    @click.option(\
        "--model_fea_length",\
        default=70,\
        type=click.IntRange(min=1),\
        help="Features length to use for the model.",\
    )\
    @click.option(\
        "--bootstrap_server",\
        default="localhost:9092",\
        help="Kafka bootstrap server address.",\
    )\
    @click.option(\
        "--input_topic",\
        default="gnn_fraud_input",\
        help="Kafka topic to listen for input messages.",\
    )\
    @click.option(\
        "--output_topic",\
        default="gnn_fraud_output",\
        help="Kafka topic to publish output messages to.",\
    )\
    @click.option(\
        "--training_file",\
        type=click.Path(exists=True, readable=True, dir_okay=False),\
        default=os.path.join(CUR_DIR, "training.csv"),\
        required=True,\
        help="Training data filepath (used for graph context).",\
    )\
    @click.option(\
        "--model_dir",\
        type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),\
        default=os.path.join(CUR_DIR, "model"),\
        required=True,\
        help="Path to trained Hinsage & XGB models.",\
    )\
    def run_pipeline(num_threads,\
                     pipeline_batch_size,\
                     model_max_batch_size,\
                     model_fea_length,\
                     bootstrap_server,\
                     input_topic,\
                     output_topic,\
                     training_file,\
                     model_dir):\
        # Enable the default logger.\
        configure_logging(log_level=logging.INFO)\
    \
        # Its necessary to get the global config object and configure it.\
        config = Config()\
        config.mode = PipelineModes.OTHER\
    \
        # Below properties are specified by the command line.\
        config.num_threads = num_threads\
        config.pipeline_batch_size = pipeline_batch_size\
        config.model_max_batch_size = model_max_batch_size\
        config.feature_length = model_fea_length\
    \
        config.class_labels = ["probs"]\
        config.edge_buffer_size = 4\
    \
        # Create a linear pipeline object.\
        pipeline = LinearPipeline(config)\
    \
        # Set source stage to KafkaSourceStage\
        # This stage reads messages from a Kafka topic.\
        pipeline.set_source(\
            KafkaSourceStage(config,\
                             bootstrap_servers=bootstrap_server,\
                             input_topic=input_topic,\
                             auto_offset_reset="earliest",\
                             poll_interval="1 seconds",\
                             stop_after=0)) # stop_after=0 runs indefinitely\
    \
        # Add a deserialize stage.\
        # At this stage, messages were logically partitioned based on the 'pipeline_batch_size'.\
        pipeline.add_stage(DeserializeStage(config))\
    \
        # Add the graph construction stage.\
        pipeline.add_stage(FraudGraphConstructionStage(config, training_file))\
        pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))\
    \
        # Add a sage inference stage.\
        pipeline.add_stage(GraphSAGEStage(config, model_dir))\
        pipeline.add_stage(MonitorStage(config, description="Inference rate"))\
    \
        # Add classification stage.\
        # This stage adds detected classifications to each message.\
        pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))\
        pipeline.add_stage(MonitorStage(config, description="Add classification rate"))\
    \
        # Add a serialize stage.\
        # This stage includes & excludes columns from messages.\
        pipeline.add_stage(SerializeStage(config))\
        pipeline.add_stage(MonitorStage(config, description="Serialize rate"))\
    \
        # Add a write to Kafka stage.\
        # This stage writes all messages to a Kafka topic.\
        pipeline.add_stage(\
            WriteToKafkaStage(config,\
                              bootstrap_servers=bootstrap_server,\
                              output_topic=output_topic))\
    \
        # Run the pipeline.\
        pipeline.run()\
    \
    \
    if __name__ == "__main__":\
        run_pipeline()}