注:

使用 OCI 函数将日志从 OCI 登录到 mTLS 配置的 Kafka

简介

日志表示资源的日志,您可以从网络系统获取每条消息的记录。在组织的云基础设施中拥有大量资源,因此需要维护其性能和安全性。需要进行日志收集,即从组织生态系统内的各种资源收集日志数据并将其整合到一个分析的中心点的过程。本教程介绍了设置 Oracle Cloud Infrastructure (OCI) 中将日志推送到位于 OCI 外部的 mTLS 的 Kafka 服务器设置所需的资源的过程。

使用此配置,您可以将实时日志从 OCI 租户获取到中心分析点。此外,您可以将 Logstash 链接到 Kafka 服务器并根据需要转换/增强数据,并将其置于 ElasticSearch 索引中。

解决方案架构

目标

先决条件

任务 1:配置 Oracle Cloud Infrastructure 设置

本部分包含用于在 OCI 中创建所需的基础结构的初始步骤。您将创建以下资源:

  1. 使用主加密密钥和密钥创建 Vault

    1. 转到 Identity & Security 菜单,选择 Vault ,然后选择 Create Vault

    2. 输入 Vault 的名称,然后单击创建 Vault

      创建 Vault 窗口

    3. 创建 Vault 之后,单击您创建的 Vault 并使用 AES 算法创建主加密密钥。此密钥将用于在此博客中进一步加密 Vault 密钥。

  2. 为 OCI 函数将使用的映像创建容器注册表

    1. 转到开发人员服务容器和构件,然后选择容器注册表

    2. 创建名为 logs_to_kafka 的专用系统信息库。

      创建资料档案库窗口

  3. 创建函数应用程序

    1. 要将日志提取到 Kafka 服务器,需要一个 Kafka 客户机,该客户机将发布到 Kafka 的主题中。转至函数部分中的开发人员服务

    2. 选择创建应用程序,并将应用程序命名为 logs_to_kafka

    3. 选择在此博客的先决条件部分中创建的 VCN,然后选择要在其中部署应用程序的 VCN 中的公共子网。请确保您具有必需的安全规则和路由规则,以允许 VCN 外部来自该公共子网的流量。

    创建应用程序窗口

任务 2:安装和配置 Kafka 服务器

  1. 创建将部署 Kafka 服务器的 Linux 实例。

    1. 首先,创建 VM 实例并启用 ssh 访问权限以安装和配置具有 mTLS 的 Kafka。

    2. 使用 ssh 访问 VM,然后按照 quickstart 中的步骤在该实例上安装 Kafka。 

  2. 为 Kafka 服务器配置 mTLS 并创建用于 mTLS 的证书。转到实例的起始目录并运行以下命令:

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. 从 Kafka 服务器的 config/ 文件夹中编辑 server.properties

  4. 创建 SSL 密钥和证书后,您需要将其添加到 Kafka 配置中。编辑 Kafka 文件夹下的 config/server.properties 并添加以下行:

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. 为 Kafka 服务器端口的实例防火墙添加端口容限。在安全方面,首先,您需要确保 VM 内的防火墙配置为允许访问 Kafka 端口。

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. 确保已将网络配置为允许该实例的流量。确认 VM 内的防火墙具有正确的配置后,必须配置安全规则和路由规则,或者配置任何其他类型的防火墙来过滤流量,以允许在特定于 Kafka 的端口上与该实例通信。

  7. 运行以下命令在后台运行 Kafka 服务器。

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

任务 3:设置 OCI 函数和服务连接器中心

确保您具有策略来创建、部署和管理函数和应用程序。还必须允许服务 FAAS 访问 Vault、密钥和密钥。

  1. 在以前创建的 OCI Vault 中创建 ca-certclient_cert.pem 密钥。

    咖啡屋

    创建密钥 CA 证书窗口

    client_cert.pem

    创建密钥 clientcert 窗口

  2. 在 OCI 控制台中,转至功能配置并添加以下变量:

    创建功能配置窗口

  3. 设置 OCI 功能

    1. 转至函数部分中的开发人员服务

    2. 转到已创建的应用程序 logs_to_kafka 并选择入门,然后选择 Cloud Shell 设置并执行这些步骤,直到 docker 登录命令为止。

      函数入门窗口

    3. 生成 logs_to_kafka 模板函数。

      fn init --runtime python logs_to_kafka
      

      fn init 命令将生成一个名为 logs_to_kafka 的文件夹,其中包含 3 个文件;func.pyfunc.yamlrequirements.txt

    4. 打开 func.py 并将文件的内容替换为以下代码:

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. 然后替换 func.yaml 的内容。使用相应的信息填写字段:

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. 对于 requirements.txt ,您需要执行以下操作:

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. 转至 Analytics & AIService Connector Hub ,然后选择创建服务连接器。填写以下图像中所示的字段。

    创建服务连接器窗口

任务 4:验证日志是否已发送到 Kafka 服务器

  1. 在具有 Kafka 服务器文件夹的实例上运行客户机,因为所有 SSL 证书和带有二进制文件的 Kafka 文件夹也用于服务器。 

  2. 首先,具有以下 consumer.properties 文件:

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. 然后在 Kafka 文件夹中运行以下命令: 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

确认

更多学习资源

探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心