참고:
- 이 사용지침서에서는 Oracle Cloud에 접근해야 합니다. 무료 계정에 등록하려면 Oracle Cloud Infrastructure Free Tier 시작하기를 참조하십시오.
- Oracle Cloud Infrastructure 자격 증명, 테넌시 및 구획에 대한 예제 값을 사용합니다. 실습을 마치면 해당 값을 클라우드 환경과 관련된 값으로 대체합니다.
OCI 기능을 사용하여 OCI 로깅에서 mTLS 구성 Kafka로 로그 푸시
소개
로그는 리소스의 일기를 나타내며 네트워크 시스템에서 모든 메시지에 대한 레코드를 가져옵니다. 조직의 클라우드 인프라 내에 많은 리소스가 있으면 성능과 보안을 유지할 필요가 있습니다. 로그 수집은 조직의 에코시스템 내 다양한 리소스에서 로그 데이터를 수집하여 분석을 위한 중앙 지점에 통합하는 프로세스입니다. 이 사용지침서에서는 OCI 외부에 있는 mTLS를 사용하여 Kafka 서버 설정으로 로그를 푸시하기 위해 OCI(Oracle Cloud Infrastructure)에 필요한 리소스를 설정하는 프로세스를 설명합니다.
이 구성을 사용하면 OCI 테넌시에서 중앙 분석 지점으로 실시간 로그를 가져오는 솔루션이 제공됩니다. 또한 Logstash를 Kafka 서버에 연결하고 요구사항에 따라 데이터를 변환/향상한 다음 ElasticSearch 인덱스에 배치할 수 있습니다.

목표
- OCI Service Connector Hub 생성 및 사용
- OCI 함수 구성에서 OCI Vault 생성 및 사용
- Kafka에 대한 mTLS 구성
- OCI Logging 서비스에서 로그를 가져와 OCI 서비스 커넥터 허브 및 OCI 기능을 사용하여 mTLS로 구성된 Kafka로 리디렉션
필요 조건
- Oracle Cloud 계정입니다. 계정이 없는 경우 Oracle Cloud Free Tier 계정에 가입할 수 있습니다.
- 함수 응용 프로그램을 만들 네트워크 설정입니다. 함수 애플리케이션이 생성될 공용 서브넷으로 생성된 VCN이 있다고 가정합니다.
작업 1: Oracle Cloud Infrastructure 설정 구성
이 섹션에는 OCI에 필요한 인프라를 생성하는 데 사용될 초기 단계가 포함되어 있습니다. 아래 리소스를 생성합니다.
-
마스터 암호화 키 및 암호로 저장소 생성
-
ID 및 보안 메뉴로 이동하여 저장소를 선택한 다음 저장소 생성을 선택합니다.
-
저장소에 대한 이름을 입력하고 저장소 생성을 누릅니다.

-
저장소가 생성된 후 생성한 저장소를 누르고 AES 알고리즘을 사용하여 마스터 암호화 키를 생성합니다. 이 키는 이 블로그에서 저장소 암호를 추가로 암호화하는 데 사용됩니다.
-
-
OCI 함수에서 사용할 이미지에 대한 컨테이너 레지스트리 생성
-
개발자 서비스, 컨테이너 및 아티팩트로 이동하고 컨테이너 레지스트리를 선택합니다.
-
이름이 logs_to_kafka인 개인 저장소를 만듭니다.

-
-
함수 애플리케이션 생성
-
Kafka 서버에 로그를 수집하려면 Kafka 토픽에 게시할 Kafka 클라이언트가 필요합니다. 함수 섹션에서 개발자 서비스로 이동합니다.
-
애플리케이션 생성을 선택하고 애플리케이션 이름을 logs_to_kafka로 지정합니다.
-
이 블로그의 사전 요구사항 섹션에서 생성한 VCN을 선택하고 애플리케이션을 배포할 VCN 내의 공용 서브넷을 선택합니다. 해당 공용 서브넷에서 VCN 외부의 트래픽을 허용하는 데 필요한 보안 규칙 및 경로 규칙이 있는지 확인하십시오.

-
작업 2: Kafka 서버 설치 및 구성
-
Kafka 서버가 배치될 Linux 인스턴스를 생성합니다.
-
먼저 VM 인스턴스를 생성하고
ssh액세스를 사용으로 설정하여 mTLS를 사용하여 Kafka를 설치 및 구성합니다. -
ssh를 사용하여 VM에 액세스하고 quickstart의 단계에 따라 해당 인스턴스에 Kafka를 설치합니다.
-
-
Kafka 서버에 대한 mTLS를 구성하고 mTLS에 사용되는 인증서를 만듭니다. 인스턴스의 홈 디렉토리로 이동하여 다음 명령을 실행합니다.
$ mkdir ~/ssl # This one, ca-cert is the certificate authority you will use in the function $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt # This one, other_client_cert.pem is the client certificate you use in the function $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem -
Kafka 서버의 config/ 폴더에서 server.properties를 편집합니다.
-
SSL 키 및 인증서를 만든 후에는 Kafka 구성에 추가해야 합니다. Kafka 폴더 아래에서 config/server.properties를 편집하고 다음 라인을 추가합니다.
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093 advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093 ssl.keystore.location=<path to ssl/kafka.server.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> ssl.truststore.password=<password> -
Kafka 서버 포트에 대한 인스턴스의 방화벽에 포트 허용량을 추가합니다. 보안 측면에서 먼저 VM 내부의 방화벽이 Kafka 포트에서 액세스를 허용하도록 구성되었는지 확인해야 합니다.
sudo firewall-cmd --add-port=9093/tcp sudo firewall-cmd --add-port=9092/tcp -
해당 인스턴스에 대한 트래픽을 허용하도록 네트워크가 구성되어 있는지 확인합니다. VM 내의 방화벽에 적절한 구성이 있는지 확인한 후 Kafka 특정 포트에서 해당 인스턴스에 대한 트래픽을 허용하도록 트래픽을 필터링하는 보안 규칙 및 경로 규칙 또는 기타 유형의 방화벽을 구성해야 합니다.
-
다음 명령을 실행하여 백그라운드에서 Kafka 서버를 실행합니다.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & nohup bin/kafka-server-start.sh config/server.properties &
작업 3: OCI 기능 및 서비스 커넥터 허브 설정
함수와 애플리케이션을 생성, 배포 및 관리하는 정책이 있는지 확인하십시오. 저장소, 암호 및 키에 대한 서비스 FAAS 액세스를 허용해야 합니다.
-
이전에 만든 OCI Vault에서 ca-cert 및 client_cert.pem 암호를 만듭니다.
ca-cert

client_cert.pem

-
OCI 콘솔에서 함수, 구성으로 이동하여 다음 변수를 추가합니다.

-
OCI 기능 설정
-
함수 섹션에서 개발자 서비스로 이동합니다.
-
이미 생성된 애플리케이션 logs_to_kafka에서 시작하기를 선택하고 Cloud Shell 설정을 선택한 다음 도커 로그인 명령까지 단계를 수행합니다.

-
logs_to_kafka보일러 플레이트 함수를 생성합니다.fn init --runtime python logs_to_kafkafn init 명령은 func.py, func.yaml 및 requirements.txt의 3개 파일을 포함하는 logs_to_kafka라는 폴더를 생성합니다.
-
func.py를 열고 파일 내용을 다음 코드로 바꿉니다.
import io import json import logging import oci import base64 from confluent_kafka import Producer, KafkaError def handler(ctx, data: io.BytesIO = None): cfg = ctx.Config() signer = oci.auth.signers.get_resource_principals_signer() try: topic_name = str(cfg["topic_name"]) bootstrap_server = str(cfg["bootstrap_server"]) security_protocol = str(cfg["security_protocol"]) secret_name = str(cfg["ca_cert_secret_name"]) client_cert_secret = str(cfg["client_cert_secret_name"]) vauld_ocid = str(cfg["vauld_ocid"]) except: logging.error('Some of the function config keys are not set') raise try: body = json.loads(data.getvalue()) except (Exception, ValueError) as ex: logging.getLogger().info('error parsing json payload: ' + str(ex)) try: client_certificate = decodeSecret( p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex)) try: decoded_secret = decodeSecret( p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the secret: ' + str(ex)) try: sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol, ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8')) logging.info(f'log is sent {sent_logs}') except (Exception, ValueError) as ex: logging.getLogger().info('error in publishing the message: ' + str(ex)) def decodeSecret(p_signer, p_secretName, p_vaultOCID): secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer) secret = secretClient.get_secret_bundle_by_name( secret_name=p_secretName, vault_id=p_vaultOCID).data secret_content = secret.secret_bundle_content.content.encode("utf-8") decodedSecret = base64.b64decode(secret_content).decode("utf-8") return decodedSecret def delivery_report(errmsg, msg): if errmsg is not None: print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) return print('Message successfully produced to Topic:{} at offset {}'.format( msg.topic(), msg.offset())) def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value): conf = { 'bootstrap.servers': bootstrap_srv, 'security.protocol': security_protocol, 'ssl.certificate.pem': client_pem, 'ssl.ca.pem': ca_pem } producer = Producer(conf) produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report) producer.flush() return produce_log -
그런 다음 func.yaml의 내용을 바꿉니다. 적절한 정보로 필드에 정보를 입력합니다.
schema_version: 20180708 name: logs_to_kafka version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: bootstrap_server: <kafka_server_public_ip>:9093 ca_cert_secret_name: ca_cert_secret_name client_cert_secret_name: client_cert_secret_name security_protocol: SSL topic_name: quickstart-events vauld_ocid: <vault_ocid> -
requirements.txt의 경우 다음이 필요합니다.
fdk>=0.1.50 confluent-kafka>=1.4.2 oci>=2.57.0
-
-
Analytics & AI, Service Connector Hub로 이동하고 서비스 커넥터 생성을 선택합니다. 다음 그림과 같이 필드에 정보를 입력합니다.

작업 4: 로그가 Kafka 서버로 전송되었는지 확인합니다.
-
이진이 있는 모든 SSL 인증서와 Kafka 폴더가 서버에도 사용되므로 Kafka 서버에 대한 폴더가 있는 인스턴스에서 클라이언트를 실행합니다.
-
먼저 다음 consumer.properties 파일을 준비합니다.
security.protocol=SSL ssl.truststore.location=<path to kafka.client.truststore.jks> ssl.truststore.password=<password> ssl.keystore.location=<path to kafka.client.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.endpoint.identification.algorithm= -
그런 다음 Kafka 폴더에서 다음 명령을 실행합니다.
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
관련 링크
승인
- 작성자 - Cristian Vlad(Principal Cloud Architect)
- 제공자 - Gabriel Feodorov(클라우드 엔지니어)
추가 학습 자원
docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 Oracle Learning Explorer가 되려면 education.oracle.com/learning-explorer을 방문하십시오.
제품 설명서는 Oracle Help Center를 참조하십시오.
Push logs from OCI Logging into mTLS configured Kafka using OCI Functions
F79041-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.