注:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于云环境的那些值。
使用 OCI 函数将日志从 OCI 登录到 mTLS 配置的 Kafka
简介
日志表示资源的日志,您可以从网络系统获取每条消息的记录。在组织的云基础设施中拥有大量资源,因此需要维护其性能和安全性。需要进行日志收集,即从组织生态系统内的各种资源收集日志数据并将其整合到一个分析的中心点的过程。本教程介绍了设置 Oracle Cloud Infrastructure (OCI) 中将日志推送到位于 OCI 外部的 mTLS 的 Kafka 服务器设置所需的资源的过程。
使用此配置,您可以将实时日志从 OCI 租户获取到中心分析点。此外,您可以将 Logstash 链接到 Kafka 服务器并根据需要转换/增强数据,并将其置于 ElasticSearch 索引中。
目标
- 创建和使用 OCI 服务连接器中心
- 在 OCI 功能配置中创建和使用 OCI Vault
- 为 Kafka 配置 mTLS
- 从 OCI 日志记录服务获取日志并使用 OCI 服务连接器中心和 OCI 函数将其重定向到 mTLS 配置的 Kafka
先决条件
- Oracle Cloud 账户。如果没有账户,您可以注册 Oracle Cloud 免费套餐账户。
- 在其中创建函数应用程序的网络设置。我们假设您已经创建了一个 VCN,其中包含将在其中创建函数应用程序的公共子网。
任务 1:配置 Oracle Cloud Infrastructure 设置
本部分包含用于在 OCI 中创建所需的基础结构的初始步骤。您将创建以下资源:
-
使用主加密密钥和密钥创建 Vault
-
转到 Identity & Security 菜单,选择 Vault ,然后选择 Create Vault 。
-
输入 Vault 的名称,然后单击创建 Vault 。
-
创建 Vault 之后,单击您创建的 Vault 并使用 AES 算法创建主加密密钥。此密钥将用于在此博客中进一步加密 Vault 密钥。
-
-
为 OCI 函数将使用的映像创建容器注册表
-
转到开发人员服务、容器和构件,然后选择容器注册表。
-
创建名为 logs_to_kafka 的专用系统信息库。
-
-
创建函数应用程序
-
要将日志提取到 Kafka 服务器,需要一个 Kafka 客户机,该客户机将发布到 Kafka 的主题中。转至函数部分中的开发人员服务。
-
选择创建应用程序,并将应用程序命名为 logs_to_kafka 。
-
选择在此博客的先决条件部分中创建的 VCN,然后选择要在其中部署应用程序的 VCN 中的公共子网。请确保您具有必需的安全规则和路由规则,以允许 VCN 外部来自该公共子网的流量。
-
任务 2:安装和配置 Kafka 服务器
-
创建将部署 Kafka 服务器的 Linux 实例。
-
首先,创建 VM 实例并启用
ssh
访问权限以安装和配置具有 mTLS 的 Kafka。 -
使用
ssh
访问 VM,然后按照 quickstart 中的步骤在该实例上安装 Kafka。
-
-
为 Kafka 服务器配置 mTLS 并创建用于 mTLS 的证书。转到实例的起始目录并运行以下命令:
$ mkdir ~/ssl # This one, ca-cert is the certificate authority you will use in the function $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt # This one, other_client_cert.pem is the client certificate you use in the function $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
-
从 Kafka 服务器的 config/ 文件夹中编辑 server.properties 。
-
创建 SSL 密钥和证书后,您需要将其添加到 Kafka 配置中。编辑 Kafka 文件夹下的 config/server.properties 并添加以下行:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093 advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093 ssl.keystore.location=<path to ssl/kafka.server.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> ssl.truststore.password=<password>
-
为 Kafka 服务器端口的实例防火墙添加端口容限。在安全方面,首先,您需要确保 VM 内的防火墙配置为允许访问 Kafka 端口。
sudo firewall-cmd --add-port=9093/tcp sudo firewall-cmd --add-port=9092/tcp
-
确保已将网络配置为允许该实例的流量。确认 VM 内的防火墙具有正确的配置后,必须配置安全规则和路由规则,或者配置任何其他类型的防火墙来过滤流量,以允许在特定于 Kafka 的端口上与该实例通信。
-
运行以下命令在后台运行 Kafka 服务器。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & nohup bin/kafka-server-start.sh config/server.properties &
任务 3:设置 OCI 函数和服务连接器中心
确保您具有策略来创建、部署和管理函数和应用程序。还必须允许服务 FAAS 访问 Vault、密钥和密钥。
-
在以前创建的 OCI Vault 中创建 ca-cert 和 client_cert.pem 密钥。
咖啡屋
client_cert.pem
-
在 OCI 控制台中,转至功能、配置并添加以下变量:
-
设置 OCI 功能
-
转至函数部分中的开发人员服务。
-
转到已创建的应用程序 logs_to_kafka 并选择入门,然后选择 Cloud Shell 设置并执行这些步骤,直到 docker 登录命令为止。
-
生成
logs_to_kafka
模板函数。fn init --runtime python logs_to_kafka
fn init 命令将生成一个名为 logs_to_kafka 的文件夹,其中包含 3 个文件;func.py 、func.yaml 和 requirements.txt 。
-
打开 func.py 并将文件的内容替换为以下代码:
import io import json import logging import oci import base64 from confluent_kafka import Producer, KafkaError def handler(ctx, data: io.BytesIO = None): cfg = ctx.Config() signer = oci.auth.signers.get_resource_principals_signer() try: topic_name = str(cfg["topic_name"]) bootstrap_server = str(cfg["bootstrap_server"]) security_protocol = str(cfg["security_protocol"]) secret_name = str(cfg["ca_cert_secret_name"]) client_cert_secret = str(cfg["client_cert_secret_name"]) vauld_ocid = str(cfg["vauld_ocid"]) except: logging.error('Some of the function config keys are not set') raise try: body = json.loads(data.getvalue()) except (Exception, ValueError) as ex: logging.getLogger().info('error parsing json payload: ' + str(ex)) try: client_certificate = decodeSecret( p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex)) try: decoded_secret = decodeSecret( p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the secret: ' + str(ex)) try: sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol, ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8')) logging.info(f'log is sent {sent_logs}') except (Exception, ValueError) as ex: logging.getLogger().info('error in publishing the message: ' + str(ex)) def decodeSecret(p_signer, p_secretName, p_vaultOCID): secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer) secret = secretClient.get_secret_bundle_by_name( secret_name=p_secretName, vault_id=p_vaultOCID).data secret_content = secret.secret_bundle_content.content.encode("utf-8") decodedSecret = base64.b64decode(secret_content).decode("utf-8") return decodedSecret def delivery_report(errmsg, msg): if errmsg is not None: print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) return print('Message successfully produced to Topic:{} at offset {}'.format( msg.topic(), msg.offset())) def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value): conf = { 'bootstrap.servers': bootstrap_srv, 'security.protocol': security_protocol, 'ssl.certificate.pem': client_pem, 'ssl.ca.pem': ca_pem } producer = Producer(conf) produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report) producer.flush() return produce_log
-
然后替换 func.yaml 的内容。使用相应的信息填写字段:
schema_version: 20180708 name: logs_to_kafka version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: bootstrap_server: <kafka_server_public_ip>:9093 ca_cert_secret_name: ca_cert_secret_name client_cert_secret_name: client_cert_secret_name security_protocol: SSL topic_name: quickstart-events vauld_ocid: <vault_ocid>
-
对于 requirements.txt ,您需要执行以下操作:
fdk>=0.1.50 confluent-kafka>=1.4.2 oci>=2.57.0
-
-
转至 Analytics & AI 、 Service Connector Hub ,然后选择创建服务连接器。填写以下图像中所示的字段。
任务 4:验证日志是否已发送到 Kafka 服务器
-
在具有 Kafka 服务器文件夹的实例上运行客户机,因为所有 SSL 证书和带有二进制文件的 Kafka 文件夹也用于服务器。
-
首先,具有以下 consumer.properties 文件:
security.protocol=SSL ssl.truststore.location=<path to kafka.client.truststore.jks> ssl.truststore.password=<password> ssl.keystore.location=<path to kafka.client.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.endpoint.identification.algorithm=
-
然后在 Kafka 文件夹中运行以下命令:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
相关链接
确认
- 作者 - Cristian Vlad(首席云架构师)
- 贡献者 - Gabriel Feodorov(云工程师)
更多学习资源
探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Push logs from OCI Logging into mTLS configured Kafka using OCI Functions
F79041-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.