Configuration

The Morpheus pipeline exposes several key parameters that you can tune for your specific hardware and data characteristics to optimize performance. Although the default values are fine for demonstration purposes, you should understand these parameters before deploying to production. They appear as command-line arguments on the run.py script.

--pipeline_batch_size: Controls how many messages (for example, transactions) are grouped together for processing as they move between stages in the pipeline.

--model_max_batch_size: Defines the maximum number of records sent to the GNN and XGBoost models for inference in a single request.

--num_threads: Sets the number of internal CPU threads the Morpheus pipeline engine uses for orchestration and I/O.

--edge_buffer_size: Controls how much data is buffered in memory at the edges between pipeline stages. Useful for tuning backpressure and throughput.

--log_level: Set this to INFO, DEBUG, and so on to monitor pipeline behavior while you tune.
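
These command-line flags ultimately populate the shared Morpheus Config object that every stage reads, as the run.py listing later in this section shows. The following is a minimal sketch with illustrative values only (not tuned recommendations); note that log_level maps to the configure_logging call rather than a Config field:

    import logging

    from morpheus.config import Config, PipelineModes
    from morpheus.utils.logger import configure_logging

    # Illustrative values only -- tune them for your hardware and data volume.
    configure_logging(log_level=logging.INFO)   # --log_level
    config = Config()
    config.mode = PipelineModes.OTHER           # generic (non-NLP/FIL) mode used by this example
    config.num_threads = 8                      # --num_threads
    config.pipeline_batch_size = 1024           # --pipeline_batch_size
    config.model_max_batch_size = 32            # --model_max_batch_size
    config.edge_buffer_size = 4                 # --edge_buffer_size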

Validating the Morpheus Pipeline

Follow these steps to run the pipeline using local files for input and output validation:

  1. Clone and prepare the Morpheus repository with the following commands:
    # Navigate to your project directory
    cd $PROJECT_DIR
    # Clone the repository
    git clone https://github.com/nv-morpheus/Morpheus.git
    # Navigate to the mounted code directory
    cd Morpheus
    # BEST PRACTICE: Check out the version tag matching the Docker container
    git checkout v25.02.00
    # Download large model and data files using Git LFS
    git lfs install
    git lfs pull

    Warning:

    Use the Morpheus code version that matches the Docker container version to avoid dependency conflicts.
  2. After the pipeline run completes, confirm that the output was created (a quick numeric check is sketched after this list):
    # --- INSIDE THE CONTAINER ---
    head .tmp/output/gnn_fraud_detection_output.csv
  3. Exit the container.
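
Beyond head, you can sanity-check the output numerically. The following is a hedged sketch, run inside the container (or against a copy of the CSV on the host, assuming pandas is available): it assumes the output keeps the original transaction columns plus the pipeline's class label column probs, so adjust the path and column name to what your file actually contains.

    import pandas as pd

    # Assumed path and column name -- verify against your own output file.
    df = pd.read_csv(".tmp/output/gnn_fraud_detection_output.csv")
    print(df.shape)
    print(df.columns.tolist())

    score_col = "probs"  # class label configured in the example pipeline
    if score_col in df.columns:
        print(df[score_col].describe())
        print(f"rows with {score_col} > 0.5: {int((df[score_col] > 0.5).sum())}")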

Setting Up Real-Time Streaming with Kafka

Now you can evolve the architecture into a production-style streaming pipeline.

Follow these steps to set up the Kafka environment and run the services:

  1. On your host machine, navigate to your project root directory with the following command:
    cd $PROJECT_DIR
  2. Create a docker-compose.yml file with the following contents to define the Kafka and Apache Zookeeper services.
    # File: <your-project-directory>/docker-compose.yml
    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  3. Start the Kafka services and create the topics (a quick connectivity check is sketched after this list).
    # --- ON THE HOST MACHINE ---
    # Clean up any old containers from previous attempts
    docker compose down --volumes --remove-orphans
    # Start Kafka and Zookeeper in the background
    docker compose up -d
    # Create the input and output topics
    docker exec kafka kafka-topics --create --topic gnn_fraud_input --bootstrap-server localhost:9092
    docker exec kafka kafka-topics --create --topic gnn_fraud_output --bootstrap-server localhost:9092
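
Before pointing Morpheus at the broker, it can be worth confirming that Kafka is reachable from the host and that both topics exist. A minimal sketch using the kafka-python library (installed in the next section); the broker address and topic names match the commands above:

    from kafka import KafkaConsumer

    # Connect to the broker started by docker compose and list the topics it knows about.
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    available = consumer.topics()
    consumer.close()

    for topic in ("gnn_fraud_input", "gnn_fraud_output"):
        print(f"{topic}: {'OK' if topic in available else 'MISSING'}")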

Creating the Kafka Producer and Consumer Scripts

Set up an isolated host Python environment to avoid Python package conflicts on the host machine.

# --- ON THE HOST MACHINE ---
# Navigate to your project directory (e.g., ~/morpheus_fraud_detection) 
cd $PROJECT_DIR
# Create an isolated Python virtual environment 
python3 -m venv kafka_env 
# Activate the environment 
source kafka_env/bin/activate

Your terminal prompt is now prefixed with (kafka_env).

  1. On the host (with the virtual environment active), install the required Python library:
    # Ensure the (kafka_env) is active
    pip install kafka-python
  2. On your host machine, create two Python scripts in your project directory: one to produce data and one to consume the results.
  3. Create the producer file with the following script:
    # File: $PROJECT_DIR/producer.py
    import csv
    from kafka import KafkaProducer
    import time

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: v.encode('utf-8')
    )
    csv_file_path = './Morpheus/examples/gnn_fraud_detection_pipeline/validation.csv'
    input_topic = 'gnn_fraud_input'
    print(f"Streaming data from {csv_file_path} to Kafka topic '{input_topic}'...")
    with open(csv_file_path, 'r') as file:
        header = next(file)
        reader = csv.reader(file)
        for row in reader:
            message = header.strip() + '\n' + ','.join(row).strip()
            producer.send(input_topic, value=message)
            print(f"Sent transaction index: {row[0]}")
            time.sleep(0.2)
    producer.flush()
    print("Finished streaming data.")
  4. Create the consumer file with the following script:
    # File: $PROJECT_DIR/consumer.py
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'gnn_fraud_output',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id='fraud-demo-consumer',
        value_deserializer=lambda x: x.decode('utf-8')
    )
    print("Listening for fraud detection results on topic 'gnn_fraud_output'...")
    for message in consumer:
        print("\n--- Real-Time Fraud Alert ---")
        print(message.value)
        print("----------------------------")
  5. Replace the contents of the Morpheus pipeline script at $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py with the Kafka-enabled version below (a small script for inspecting the resulting output stream follows this list):
    # File: $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py
    # Copyright (c) 2021-2025, NVIDIA CORPORATION.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    import logging
    import os

    import click
    # pylint: disable=no-name-in-module
    from stages.classification_stage import ClassificationStage
    from stages.graph_construction_stage import FraudGraphConstructionStage
    from stages.graph_sage_stage import GraphSAGEStage

    from morpheus.config import Config
    from morpheus.config import PipelineModes
    from morpheus.pipeline.linear_pipeline import LinearPipeline
    from morpheus.stages.general.monitor_stage import MonitorStage
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
    from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage
    from morpheus.stages.postprocess.serialize_stage import SerializeStage
    from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
    from morpheus.utils.logger import configure_logging

    CUR_DIR = os.path.dirname(__file__)


    @click.command()
    @click.option(
        "--num_threads",
        default=len(os.sched_getaffinity(0)),
        type=click.IntRange(min=1),
        help="Number of internal pipeline threads to use.",
    )
    @click.option(
        "--pipeline_batch_size",
        default=1024,
        type=click.IntRange(min=1),
        help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "
              "Also used for Kafka consumers."),
    )
    @click.option(
        "--model_max_batch_size",
        default=32,
        type=click.IntRange(min=1),
        help="Max batch size to use for the model.",
    )
    @click.option(
        "--model_fea_length",
        default=70,
        type=click.IntRange(min=1),
        help="Features length to use for the model.",
    )
    @click.option(
        "--bootstrap_server",
        default="localhost:9092",
        help="Kafka bootstrap server address.",
    )
    @click.option(
        "--input_topic",
        default="gnn_fraud_input",
        help="Kafka topic to listen for input messages.",
    )
    @click.option(
        "--output_topic",
        default="gnn_fraud_output",
        help="Kafka topic to publish output messages to.",
    )
    @click.option(
        "--training_file",
        type=click.Path(exists=True, readable=True, dir_okay=False),
        default=os.path.join(CUR_DIR, "training.csv"),
        required=True,
        help="Training data filepath (used for graph context).",
    )
    @click.option(
        "--model_dir",
        type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),
        default=os.path.join(CUR_DIR, "model"),
        required=True,
        help="Path to trained Hinsage & XGB models.",
    )
    def run_pipeline(num_threads,
                     pipeline_batch_size,
                     model_max_batch_size,
                     model_fea_length,
                     bootstrap_server,
                     input_topic,
                     output_topic,
                     training_file,
                     model_dir):
        # Enable the default logger.
        configure_logging(log_level=logging.INFO)

        # It's necessary to get the global config object and configure it.
        config = Config()
        config.mode = PipelineModes.OTHER

        # Below properties are specified by the command line.
        config.num_threads = num_threads
        config.pipeline_batch_size = pipeline_batch_size
        config.model_max_batch_size = model_max_batch_size
        config.feature_length = model_fea_length

        config.class_labels = ["probs"]
        config.edge_buffer_size = 4

        # Create a linear pipeline object.
        pipeline = LinearPipeline(config)

        # Set source stage to KafkaSourceStage
        # This stage reads messages from a Kafka topic.
        pipeline.set_source(
            KafkaSourceStage(config,
                             bootstrap_servers=bootstrap_server,
                             input_topic=input_topic,
                             auto_offset_reset="earliest",
                             poll_interval="1 seconds",
                             stop_after=0))  # stop_after=0 runs indefinitely

        # Add a deserialize stage.
        # At this stage, messages were logically partitioned based on the 'pipeline_batch_size'.
        pipeline.add_stage(DeserializeStage(config))

        # Add the graph construction stage.
        pipeline.add_stage(FraudGraphConstructionStage(config, training_file))
        pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))

        # Add a sage inference stage.
        pipeline.add_stage(GraphSAGEStage(config, model_dir))
        pipeline.add_stage(MonitorStage(config, description="Inference rate"))

        # Add classification stage.
        # This stage adds detected classifications to each message.
        pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))
        pipeline.add_stage(MonitorStage(config, description="Add classification rate"))

        # Add a serialize stage.
        # This stage includes & excludes columns from messages.
        pipeline.add_stage(SerializeStage(config))
        pipeline.add_stage(MonitorStage(config, description="Serialize rate"))

        # Add a write to Kafka stage.
        # This stage writes all messages to a Kafka topic.
        pipeline.add_stage(
            WriteToKafkaStage(config,
                              bootstrap_servers=bootstrap_server,
                              output_topic=output_topic))

        # Run the pipeline.
        pipeline.run()


    if __name__ == "__main__":
        run_pipeline()
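
With the producer, consumer, and Kafka-enabled run.py in place, you can stream validation.csv into gnn_fraud_input and watch alerts arrive on gnn_fraud_output. If you want to post-process alerts instead of just printing them, the consumer loop can be extended. The sketch below assumes (but does not guarantee) that WriteToKafkaStage emits one JSON-serialized record per message containing the probs score; if a payload is not JSON, it falls back to printing the raw value.

    import json

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'gnn_fraud_output',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        value_deserializer=lambda x: x.decode('utf-8')
    )

    for message in consumer:
        try:
            record = json.loads(message.value)  # assumed: one JSON object per message
        except ValueError:
            print(message.value)                # not JSON -- show the raw payload
            continue
        score = record.get("probs") if isinstance(record, dict) else None
        if isinstance(score, (int, float)) and score > 0.9:  # illustrative threshold
            print(f"High-risk transaction: {record}")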