Nota:

Transferir logs desde OCI Logging a Kafka configurado de mTLS mediante funciones de OCI

Introducción

Los logs representan el diario para los recursos y allí obtiene registros sobre cada mensaje de los sistemas de red. Tener muchos recursos en la infraestructura en la nube de una organización implica la necesidad de mantener su rendimiento y seguridad. Es necesario recopilar los registros, el proceso de recopilar los datos de los registros de diversos recursos dentro del ecosistema de la organización y reunirlos en un punto central de análisis. En este tutorial se describe el proceso de configuración de los recursos necesarios en Oracle Cloud Infrastructure (OCI) para transferir logs a una configuración de servidor de Kafka con mTLS ubicada fuera de OCI.

Con esta configuración, tendrá una solución que obtiene logs en tiempo real de su arrendamiento de OCI en un punto central de análisis. Además, puede enlazar una Logstash al servidor de Kafka y transformar/mejorar los datos según sus requisitos y colocarlos en el índice ElasticSearch.

Arquitectura de la solución

Objetivos

Requisitos

Tarea 1: Configurar la configuración de Oracle Cloud Infrastructure

Esta sección incluye los pasos iniciales que se utilizarán para crear la infraestructura necesaria en OCI. Creará los siguientes recursos:

  1. Crear almacén con secretos y claves de cifrado maestras

    1. Vaya al menú Identidad y seguridad, seleccione Almacén y, a continuación, seleccione Crear almacén.

    2. Introduzca un nombre para el almacén y haga clic en Crear almacén.

      Ventana Crear almacén

    3. Después de crear el almacén, haga clic en el almacén que ha creado y cree una clave de cifrado maestra con el algoritmo AES. Esta clave se utilizará para cifrar aún más los secretos del almacén en este blog.

  2. Crear registro de contenedor para la imagen que utilizará la función de OCI

    1. Vaya a Servicio para desarrolladores, Contenedor y artefactos y seleccione Registro de contenedor.

    2. Cree un repositorio privado denominado logs_to_kafka.

      Ventana Crear Repositorio

  3. Creación de la aplicación de Functions

    1. Para ingerir logs en el servidor de Kafka, se necesitará un cliente de Kafka que se publique en el tema de Kafka. Vaya a Servicios para desarrolladores en la sección Funciones.

    2. Seleccione Crear aplicación y asígnele el nombre logs_to_kafka.

    3. Seleccione la VCN que ha creado en la sección Requisitos previos de este blog y seleccione la subred pública en la VCN en la que desea desplegar la aplicación. Asegúrese de que tiene las reglas de seguridad y de ruta necesarias para permitir el tráfico fuera de la VCN de esa subred pública.

    Crear ventana de aplicación

Tarea 2: Instalación y configuración del servidor de Kafka

  1. Cree una instancia de Linux en la que se desplegará el servidor de Kafka.

    1. En primer lugar, cree la instancia de VM y active el acceso ssh para instalar y configurar Kafka con mTLS.

    2. Acceda a la máquina virtual mediante ssh y siga los pasos de quickstart para instalar Kafka en esa instancia. 

  2. Configure mTLS para el servidor de Kafka y cree los certificados utilizados para mTLS. Vaya al directorio raíz de la instancia y ejecute los siguientes comandos:

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. Edite server.properties desde la carpeta config/ del servidor de Kafka.

  4. Después de crear las claves y los certificados SSL, deberá agregarlos a la configuración de Kafka. Edite config/server.properties en la carpeta Kafka y agregue las siguientes líneas:

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. Agregue permisos de puertos al firewall de la instancia para los puertos del servidor de Kafka. En términos de seguridad, en primer lugar, deberá asegurarse de que el firewall dentro de la máquina virtual esté configurado para permitir el acceso en los puertos de Kafka.

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. Asegúrese de que tiene la red configurada para permitir el tráfico a esa instancia. Después de confirmar que el firewall dentro de la máquina virtual tiene la configuración adecuada, debe configurar reglas de seguridad y reglas de ruta o cualquier otro tipo de firewall que filtre el tráfico para permitir el tráfico a esa instancia en puertos específicos de Kafka.

  7. Ejecute los siguientes comandos para ejecutar el servidor de Kafka en segundo plano.

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

Tarea 3: Configuración de OCI Function y Service Connector Hub

Asegúrese de tener políticas para crear, desplegar y gestionar funciones y aplicaciones. También debe permitir el acceso de FAAS de servicio al almacén, los secretos y las claves.

  1. Cree secretos ca-cert y client_cert.pem en el almacén de OCI creado anteriormente.

    ca-cert

    Crear ventana de certificación de CA secreta

    client_cert.pem

    Crear ventana de cliente secreto

  2. En la consola de OCI, vaya a Función, Configuración y agregue las siguientes variables:

    Ventana Crear configuración de función

  3. Configurar funciones de OCI

    1. Vaya a Servicios para desarrolladores en la sección Funciones.

    2. Vaya a la aplicación ya creada logs_to_kafka y seleccione Introducción; a continuación, seleccione Configuración de Cloud Shell y realice los pasos hasta que aparezca el comando de conexión de docker.

      Ventana de introducción de función

    3. Genere una función de texto fijo logs_to_kafka.

      fn init --runtime python logs_to_kafka
      

      El comando fn init generará una carpeta denominada logs_to_kafka con 3 archivos dentro; func.py, func.yaml y requirements.txt.

    4. Abra func.py y reemplace el contenido del archivo por el siguiente código:

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. A continuación, reemplace el contenido de func.yaml. Rellene los campos con la información adecuada:

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. Para requirements.txt deberá tener lo siguiente:

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. Vaya a Análisis e IA, Hub de conector de servicio y seleccione Crear conector de servicio. Complete los campos como se muestra en la siguiente imagen.

    Ventana Crear Conector de Servicio

Tarea 4: Verificar si los logs se envían al servidor de Kafka

  1. Ejecute un cliente en la instancia en la que tiene la carpeta del servidor de Kafka, porque todos los certificados SSL y la carpeta de Kafka con binarios también se utilizan para el servidor. 

  2. En primer lugar, tenga el siguiente archivo consumer.properties:

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. A continuación, ejecute el siguiente comando en la carpeta Kafka: 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

Confirmaciones

Más recursos de aprendizaje

Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de aprendizaje gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en un explorador de Oracle Learning.

Para obtener documentación sobre los productos, visite Oracle Help Center.