구성

Morpheus 파이프라인은 특정 하드웨어 및 데이터 특성에 따라 성능을 최적화하기 위해 조정할 수 있는 몇 가지 주요 매개변수를 노출합니다. 데모용으로 기본값을 사용할 수 있지만 운용 환경에 배치하기 위해 이러한 매개변수를 이해해야 합니다. 이러한 매개변수는 run.py 스크립트에서 명령행 인수로 표시됩니다.

--pipeline_batch_size: 이 매개변수는 파이프라인의 단계 간에 이동할 때 처리를 위해 함께 그룹화된 메시지 수(예: 트랜잭션)를 제어합니다.

--model_max_batch_size: 단일 요청에서 추론을 위해 GNN 및 XGBoost 모델로 전송되는 최대 레코드 수를 정의합니다.

--num_threads: 조정 및 I/O에 대해 Morpheus 파이프라인 엔진이 사용하는 내부 CPU 스레드 수를 설정합니다.

--edge_buffer_size: 파이프라인 단계 간 가장자리에 메모리에 저장되는 데이터의 양을 제어합니다. 역압 및 처리량을 조정하는 데 유용합니다.

--log_level: 조정 중 파이프라인 동작을 모니터하려면 INFO, DEBUG 등으로 설정합니다.

Morpheus 파이프라인 검증

다음 단계에 따라 입력 및 출력에 대해 로컬 파일을 사용하여 파이프라인을 검증하고 실행합니다.

  1. 다음 명령을 사용하여 Morpheus 저장소를 복제하고 정렬합니다.
    # Navigate to your project directory
    cd $PROJECT_DIR
    # Clone the repository
    git clone https://github.com/nv-morpheus/Morpheus.git
    # 1. Navigate to the mounted code directory 
    cd Morpheus
    # BEST PRACTICE: Check out the version tag matching the Docker container
    git checkout v25.02.00
    # Download large model and data files using Git LFS
    git lfs install
    git lfs pull

    주의:

    종속성 충돌을 방지하려면 Docker 컨테이너의 버전과 일치하는 Morpheus 코드 버전을 사용합니다.
  2. 완료 후 출력이 생성되었는지 확인할 수 있습니다.
    # --- INSIDE THE CONTAINER ---
    head .tmp/output/gnn_fraud_detection_output.csv
  3. 컨테이너를 종료합니다.

Kafka를 통해 실시간 스트리밍 설정

이제 아키텍처를 프로덕션 스타일의 스트리밍 파이프라인으로 발전시킬 수 있습니다.

다음 단계에 따라 Kafka 환경을 설정하고 서비스를 실행합니다.

  1. 호스트 시스템에서 다음 명령을 사용하여 프로젝트 루트 디렉토리로 이동합니다.
    cd $PROJECT_DIR
    # File: <your-project-directory>/docker-compose.yml
  2. 다음 콘텐츠로 docker-compose.yml 파일을 생성하여 KafkaApache Zookeeper 서비스를 정의합니다.
    version: '3'
    services:
     zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper   
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
     kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        depends_on:
          - zookeeper
        ports:      -
          "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1     
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092     
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1     
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  3. Kafka 서비스를 시작하고 토픽을 생성합니다.
    # --- ON THE HOST MACHINE ---# 
    Clean up any old containers from previous attempts 
    docker compose down --volumes--remove-orphans 
    # Start Kafka and Zookeeper in the background docker compose up -d
    # Create the input and output topics
    docker exec kafka kafka-topics --create --topic gnn_fraud_input --bootstrap-server localhost:9092 
    docker exec kafka kafka-topics --create --topic gnn_fraud_output --bootstrap-server localhost:9092

Kafka 도우미 스크립트 생성

호스트 시스템에서 Python 패키지 충돌이 발생하지 않도록 호스트 Python 환경을 설정합니다.

# --- ON THE HOST MACHINE ---
# Navigate to your project directory (e.g., ~/morpheus_fraud_detection) 
cd $PROJECT_DIR# Create an isolated Python virtual environment 
python3 -m venv kafka_env 
# Activate the environment 
source kafka_env/bin/activate

이제 터미널 프롬프트에 (kafka_env) 접두어가 붙습니다.

  1. 가상 환경 내 호스트에 필요한 Python 라이브러리를 설치합니다.
    # Ensure the (kafka_env) is active
    pip install kafka-python
  2. 호스트 시스템에서 프로젝트 디렉토리에 두 개의 Python 스크립트를 생성하여 데이터를 생성하고 결과를 소비합니다.
  3. 다음 스크립트를 사용하여 생산자 파일을 생성합니다.
    # File: $PROJECT_DIR/producer.py\
    import csv\
    from kafka import KafkaProducer\
    import time\
    \
    producer = KafkaProducer(\
        bootstrap_servers='localhost:9092',\
        value_serializer=lambda v: v.encode('utf-8')\
    )\
    csv_file_path = './Morpheus/examples/gnn_fraud_detection_pipeline/validation.csv'\
    input_topic = 'gnn_fraud_input'\
    print(f"Streaming data from \{csv_file_path\} to Kafka topic '\{input_topic\}'...")\
    with open(csv_file_path, 'r') as file:\
        header = next(file)\
        reader = csv.reader(file)\
        for row in reader:\
            message = header.strip() + '\\n' + ','.join(row).strip()\
            producer.send(input_topic, value=message)\
            print(f"Sent transaction index: \{row[0]\}")\
            time.sleep(0.2)\
    producer.flush()\
    print("Finished streaming data.")}
  4. 다음 스크립트를 사용하여 소비자 파일을 생성합니다.
    # File: $PROJECT_DIR/consumer.py\
    from kafka import KafkaConsumer\
    \
    consumer = KafkaConsumer(\
        'gnn_fraud_output',\
        bootstrap_servers='localhost:9092',\
        auto_offset_reset='earliest',\
        group_id='fraud-demo-consumer',\
        value_deserializer=lambda x: x.decode('utf-8')\
    )\
    print("Listening for fraud detection results on topic 'gnn_fraud_output'...")\
    for message in consumer:\
        print("\\n--- Real-Time Fraud Alert ---")\
        print(message.value)\
        print("----------------------------")}
  5. $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py에 있는 Morpheus 파이프라인 스크립트의 콘텐츠를 다음과 같이 Kafka 사용 버전으로 바꿉니다.
    # File: $PROJECT_DIR/Morpheus/examples/gnn_fraud_detection_pipeline/run.py
    # Copyright (c) 2021-2025, NVIDIA CORPORATION.\
    #\
    # Licensed under the Apache License, Version 2.0 (the "License");\
    # you may not use this file except in compliance with the License.\
    # You may obtain a copy of the License at\
    #\
    #     http://www.apache.org/licenses/LICENSE-2.0\
    #\
    # Unless required by applicable law or agreed to in writing, software\
    # distributed under the License is distributed on an "AS IS" BASIS,\
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\
    # See the License for the specific language governing permissions and\
    # limitations under the License.\
    \
    import logging\
    import os\
    \
    import click\
    # pylint: disable=no-name-in-module\
    from stages.classification_stage import ClassificationStage\
    from stages.graph_construction_stage import FraudGraphConstructionStage\
    from stages.graph_sage_stage import GraphSAGEStage\
    \
    from morpheus.config import Config\
    from morpheus.config import PipelineModes\
    from morpheus.pipeline.linear_pipeline import LinearPipeline\
    from morpheus.stages.general.monitor_stage import MonitorStage\
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage\
    from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage\
    from morpheus.stages.postprocess.serialize_stage import SerializeStage\
    from morpheus.stages.preprocess.deserialize_stage import DeserializeStage\
    from morpheus.utils.logger import configure_logging\
    \
    CUR_DIR = os.path.dirname(__file__)\
    \
    \
    @click.command()\
    @click.option(\
        "--num_threads",\
        default=len(os.sched_getaffinity(0)),\
        type=click.IntRange(min=1),\
        help="Number of internal pipeline threads to use.",\
    )\
    @click.option(\
        "--pipeline_batch_size",\
        default=1024,\
        type=click.IntRange(min=1),\
        help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "\
              "Also used for Kafka consumers."),\
    )\
    @click.option(\
        "--model_max_batch_size",\
        default=32,\
        type=click.IntRange(min=1),\
        help="Max batch size to use for the model.",\
    )\
    @click.option(\
        "--model_fea_length",\
        default=70,\
        type=click.IntRange(min=1),\
        help="Features length to use for the model.",\
    )\
    @click.option(\
        "--bootstrap_server",\
        default="localhost:9092",\
        help="Kafka bootstrap server address.",\
    )\
    @click.option(\
        "--input_topic",\
        default="gnn_fraud_input",\
        help="Kafka topic to listen for input messages.",\
    )\
    @click.option(\
        "--output_topic",\
        default="gnn_fraud_output",\
        help="Kafka topic to publish output messages to.",\
    )\
    @click.option(\
        "--training_file",\
        type=click.Path(exists=True, readable=True, dir_okay=False),\
        default=os.path.join(CUR_DIR, "training.csv"),\
        required=True,\
        help="Training data filepath (used for graph context).",\
    )\
    @click.option(\
        "--model_dir",\
        type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),\
        default=os.path.join(CUR_DIR, "model"),\
        required=True,\
        help="Path to trained Hinsage & XGB models.",\
    )\
    def run_pipeline(num_threads,\
                     pipeline_batch_size,\
                     model_max_batch_size,\
                     model_fea_length,\
                     bootstrap_server,\
                     input_topic,\
                     output_topic,\
                     training_file,\
                     model_dir):\
        # Enable the default logger.\
        configure_logging(log_level=logging.INFO)\
    \
        # Its necessary to get the global config object and configure it.\
        config = Config()\
        config.mode = PipelineModes.OTHER\
    \
        # Below properties are specified by the command line.\
        config.num_threads = num_threads\
        config.pipeline_batch_size = pipeline_batch_size\
        config.model_max_batch_size = model_max_batch_size\
        config.feature_length = model_fea_length\
    \
        config.class_labels = ["probs"]\
        config.edge_buffer_size = 4\
    \
        # Create a linear pipeline object.\
        pipeline = LinearPipeline(config)\
    \
        # Set source stage to KafkaSourceStage\
        # This stage reads messages from a Kafka topic.\
        pipeline.set_source(\
            KafkaSourceStage(config,\
                             bootstrap_servers=bootstrap_server,\
                             input_topic=input_topic,\
                             auto_offset_reset="earliest",\
                             poll_interval="1 seconds",\
                             stop_after=0)) # stop_after=0 runs indefinitely\
    \
        # Add a deserialize stage.\
        # At this stage, messages were logically partitioned based on the 'pipeline_batch_size'.\
        pipeline.add_stage(DeserializeStage(config))\
    \
        # Add the graph construction stage.\
        pipeline.add_stage(FraudGraphConstructionStage(config, training_file))\
        pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))\
    \
        # Add a sage inference stage.\
        pipeline.add_stage(GraphSAGEStage(config, model_dir))\
        pipeline.add_stage(MonitorStage(config, description="Inference rate"))\
    \
        # Add classification stage.\
        # This stage adds detected classifications to each message.\
        pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))\
        pipeline.add_stage(MonitorStage(config, description="Add classification rate"))\
    \
        # Add a serialize stage.\
        # This stage includes & excludes columns from messages.\
        pipeline.add_stage(SerializeStage(config))\
        pipeline.add_stage(MonitorStage(config, description="Serialize rate"))\
    \
        # Add a write to Kafka stage.\
        # This stage writes all messages to a Kafka topic.\
        pipeline.add_stage(\
            WriteToKafkaStage(config,\
                              bootstrap_servers=bootstrap_server,\
                              output_topic=output_topic))\
    \
        # Run the pipeline.\
        pipeline.run()\
    \
    \
    if __name__ == "__main__":\
        run_pipeline()}