참고:

OCI 기능을 사용하여 OCI 로깅에서 mTLS 구성 Kafka로 로그 푸시

소개

로그는 리소스의 일기를 나타내며 네트워크 시스템에서 모든 메시지에 대한 레코드를 가져옵니다. 조직의 클라우드 인프라 내에 많은 리소스가 있으면 성능과 보안을 유지할 필요가 있습니다. 로그 수집은 조직의 에코시스템 내 다양한 리소스에서 로그 데이터를 수집하여 분석을 위한 중앙 지점에 통합하는 프로세스입니다. 이 사용지침서에서는 OCI 외부에 있는 mTLS를 사용하여 Kafka 서버 설정으로 로그를 푸시하기 위해 OCI(Oracle Cloud Infrastructure)에 필요한 리소스를 설정하는 프로세스를 설명합니다.

이 구성을 사용하면 OCI 테넌시에서 중앙 분석 지점으로 실시간 로그를 가져오는 솔루션이 제공됩니다. 또한 Logstash를 Kafka 서버에 연결하고 요구사항에 따라 데이터를 변환/향상한 다음 ElasticSearch 인덱스에 배치할 수 있습니다.

솔루션 아키텍처

목표

필요 조건

작업 1: Oracle Cloud Infrastructure 설정 구성

이 섹션에는 OCI에 필요한 인프라를 생성하는 데 사용될 초기 단계가 포함되어 있습니다. 아래 리소스를 생성합니다.

  1. 마스터 암호화 키 및 암호로 저장소 생성

    1. ID 및 보안 메뉴로 이동하여 저장소를 선택한 다음 저장소 생성을 선택합니다.

    2. 저장소에 대한 이름을 입력하고 저장소 생성을 누릅니다.

      저장소 창 생성

    3. 저장소가 생성된 후 생성한 저장소를 누르고 AES 알고리즘을 사용하여 마스터 암호화 키를 생성합니다. 이 키는 이 블로그에서 저장소 암호를 추가로 암호화하는 데 사용됩니다.

  2. OCI 함수에서 사용할 이미지에 대한 컨테이너 레지스트리 생성

    1. 개발자 서비스, 컨테이너 및 아티팩트로 이동하고 컨테이너 레지스트리를 선택합니다.

    2. 이름이 logs_to_kafka인 개인 저장소를 만듭니다.

      저장소 창 생성

  3. 함수 애플리케이션 생성

    1. Kafka 서버에 로그를 수집하려면 Kafka 토픽에 게시할 Kafka 클라이언트가 필요합니다. 함수 섹션에서 개발자 서비스로 이동합니다.

    2. 애플리케이션 생성을 선택하고 애플리케이션 이름을 logs_to_kafka로 지정합니다.

    3. 이 블로그의 사전 요구사항 섹션에서 생성한 VCN을 선택하고 애플리케이션을 배포할 VCN 내의 공용 서브넷을 선택합니다. 해당 공용 서브넷에서 VCN 외부의 트래픽을 허용하는 데 필요한 보안 규칙 및 경로 규칙이 있는지 확인하십시오.

    애플리케이션 창 생성

작업 2: Kafka 서버 설치 및 구성

  1. Kafka 서버가 배치될 Linux 인스턴스를 생성합니다.

    1. 먼저 VM 인스턴스를 생성하고 ssh 액세스를 사용으로 설정하여 mTLS를 사용하여 Kafka를 설치 및 구성합니다.

    2. ssh를 사용하여 VM에 액세스하고 quickstart의 단계에 따라 해당 인스턴스에 Kafka를 설치합니다. 

  2. Kafka 서버에 대한 mTLS를 구성하고 mTLS에 사용되는 인증서를 만듭니다. 인스턴스의 홈 디렉토리로 이동하여 다음 명령을 실행합니다.

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. Kafka 서버의 config/ 폴더에서 server.properties를 편집합니다.

  4. SSL 키 및 인증서를 만든 후에는 Kafka 구성에 추가해야 합니다. Kafka 폴더 아래에서 config/server.properties를 편집하고 다음 라인을 추가합니다.

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. Kafka 서버 포트에 대한 인스턴스의 방화벽에 포트 허용량을 추가합니다. 보안 측면에서 먼저 VM 내부의 방화벽이 Kafka 포트에서 액세스를 허용하도록 구성되었는지 확인해야 합니다.

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. 해당 인스턴스에 대한 트래픽을 허용하도록 네트워크가 구성되어 있는지 확인합니다. VM 내의 방화벽에 적절한 구성이 있는지 확인한 후 Kafka 특정 포트에서 해당 인스턴스에 대한 트래픽을 허용하도록 트래픽을 필터링하는 보안 규칙 및 경로 규칙 또는 기타 유형의 방화벽을 구성해야 합니다.

  7. 다음 명령을 실행하여 백그라운드에서 Kafka 서버를 실행합니다.

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

작업 3: OCI 기능 및 서비스 커넥터 허브 설정

함수와 애플리케이션을 생성, 배포 및 관리하는 정책이 있는지 확인하십시오. 저장소, 암호 및 키에 대한 서비스 FAAS 액세스를 허용해야 합니다.

  1. 이전에 만든 OCI Vault에서 ca-certclient_cert.pem 암호를 만듭니다.

    ca-cert

    보안 CA 인증서 생성 창

    client_cert.pem

    암호 클라이언트 인증서 생성 창

  2. OCI 콘솔에서 함수, 구성으로 이동하여 다음 변수를 추가합니다.

    함수 구성 생성 창

  3. OCI 기능 설정

    1. 함수 섹션에서 개발자 서비스로 이동합니다.

    2. 이미 생성된 애플리케이션 logs_to_kafka에서 시작하기를 선택하고 Cloud Shell 설정을 선택한 다음 도커 로그인 명령까지 단계를 수행합니다.

      함수 시작 창

    3. logs_to_kafka 보일러 플레이트 함수를 생성합니다.

      fn init --runtime python logs_to_kafka
      

      fn init 명령은 func.py, func.yamlrequirements.txt의 3개 파일을 포함하는 logs_to_kafka라는 폴더를 생성합니다.

    4. func.py를 열고 파일 내용을 다음 코드로 바꿉니다.

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. 그런 다음 func.yaml의 내용을 바꿉니다. 적절한 정보로 필드에 정보를 입력합니다.

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. requirements.txt의 경우 다음이 필요합니다.

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. Analytics & AI, Service Connector Hub로 이동하고 서비스 커넥터 생성을 선택합니다. 다음 그림과 같이 필드에 정보를 입력합니다.

    서비스 커넥터 생성 창

작업 4: 로그가 Kafka 서버로 전송되었는지 확인합니다.

  1. 이진이 있는 모든 SSL 인증서와 Kafka 폴더가 서버에도 사용되므로 Kafka 서버에 대한 폴더가 있는 인스턴스에서 클라이언트를 실행합니다. 

  2. 먼저 다음 consumer.properties 파일을 준비합니다.

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. 그런 다음 Kafka 폴더에서 다음 명령을 실행합니다. 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

승인

추가 학습 자원

docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 Oracle Learning Explorer가 되려면 education.oracle.com/learning-explorer을 방문하십시오.

제품 설명서는 Oracle Help Center를 참조하십시오.