Hinweis:

Logs aus OCI Logging mit OCI Functions in mTLS konfiguriertes Kafka pushen

Einführung

Logs stellen das Tagebuch für Ressourcen dar, und Sie erhalten Datensätze zu jeder Nachricht von den Netzwerksystemen. Wenn viele Ressourcen in der Cloud-Infrastruktur eines Unternehmens vorhanden sind, müssen Performance und Sicherheit aufrechterhalten werden. Es ist eine Logerfassung erforderlich, der Prozess zum Erfassen von Logdaten aus verschiedenen Ressourcen im Ökosystem des Unternehmens und zum Zusammenführen von Logdaten in einem zentralen Punkt für die Analyse. In diesem Tutorial wird beschrieben, wie Sie in Oracle Cloud Infrastructure (OCI) erforderliche Ressourcen einrichten, um Logs in ein Kafka-Serversetup mit mTLS außerhalb von OCI zu übertragen.

Mit dieser Konfiguration erhalten Sie eine Lösung, mit der Echtzeitlogs von Ihrem OCI-Mandanten zu einem zentralen Analysepunkt abgerufen werden. Darüber hinaus können Sie einen Logstash mit Ihrem Kafka-Server verknüpfen und die Daten entsprechend Ihrer Anforderung transformieren/optimieren und im ElasticSearch-Index speichern.

Lösungsarchitektur

Ziele

Voraussetzungen

Aufgabe 1: Oracle Cloud Infrastructure-Setup konfigurieren

Dieser Abschnitt enthält die ersten Schritte zum Erstellen der Infrastruktur, die in OCI erforderlich ist. Sie erstellen die folgenden Ressourcen:

  1. Vault mit Masterverschlüsselungsschlüsseln und Secrets erstellen

    1. Gehen Sie zum Menü Identität und Sicherheit, wählen Sie Vault aus, und wählen Sie Vault erstellen aus.

    2. Geben Sie einen Namen für den Vault ein, und klicken Sie auf Vault erstellen.

      Fenster "Vault erstellen"

    3. Nachdem der Vault erstellt wurde, klicken Sie auf den erstellten Vault, und erstellen Sie einen Masterverschlüsselungsschlüssel mit einem AES-Algorithmus. Dieser Schlüssel wird verwendet, um die Vault Secrets weiter in diesem Blog zu verschlüsseln.

  2. Container Registry für das Image erstellen, das von der OCI-Funktion verwendet wird

    1. Gehen Sie zu Entwicklerservice, Container und Artefakte, und wählen Sie Container Registry aus.

    2. Erstellen Sie ein privates Repository namens logs_to_kafka.

      Fenster "Repository erstellen"

  3. Functions-Anwendung erstellen

    1. Um Logs auf dem Kafka-Server aufzunehmen, benötigen Sie einen Kafka-Client, der in Kafkas Thema veröffentlicht wird. Gehen Sie zu Entwicklerservices im Abschnitt Funktionen.

    2. Wählen Sie Anwendung erstellen aus, und benennen Sie die Anwendung logs_to_kafka.

    3. Wählen Sie das VCN aus, das Sie im Abschnitt Voraussetzungen dieses Blogs erstellt haben, und wählen Sie das öffentliche Subnetz im VCN aus, in dem die Anwendung bereitgestellt werden soll. Stellen Sie sicher, dass Sie über die erforderlichen Sicherheitsregeln und Routingregeln verfügen, um Traffic außerhalb des VCN von diesem öffentlichen Subnetz zuzulassen.

    Anwendungsfenster erstellen

Aufgabe 2: Kafka-Server installieren und konfigurieren

  1. Erstellen Sie eine Linux-Instanz, auf der Kafka-Server bereitgestellt wird.

    1. Erstellen Sie zunächst die VM-Instanz, und aktivieren Sie ssh, um Kafka mit mTLS zu installieren und zu konfigurieren.

    2. Rufen Sie die VM mit ssh auf, und befolgen Sie die Schritte von quickstart, um Kafka auf dieser Instanz zu installieren. 

  2. Konfigurieren Sie mTLS für den Kafka-Server, und erstellen Sie die Zertifikate für mTLS. Gehen Sie zum Home-Verzeichnis der Instanz, und führen Sie die folgenden Befehle aus:

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. Bearbeiten Sie server.properties aus dem Ordner config/ des Kafka-Servers.

  4. Nachdem die SSL-Schlüssel und -Zertifikate erstellt wurden, müssen Sie sie der Kafka-Konfiguration hinzufügen. Bearbeiten Sie config/server.properties im Kafka-Ordner, und fügen Sie die folgenden Zeilen hinzu:

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. Fügen Sie der Firewall der Instanz für die Kafka-Serverports eine Portzulage hinzu. Aus Sicherheitsgründen müssen Sie zunächst sicherstellen, dass die Firewall innerhalb der VM für den Zugriff auf Kafka-Ports konfiguriert ist.

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. Stellen Sie sicher, dass das Netzwerk so konfiguriert ist, dass Traffic zu dieser Instanz zugelassen wird. Nachdem Sie bestätigt haben, dass die Firewall innerhalb der VM die richtige Konfiguration aufweist, müssen Sie Sicherheitsregeln und Routingregeln oder einen anderen Firewalltyp konfigurieren, der Traffic filtert, um Traffic zu dieser Instanz auf Kafka-spezifischen Ports zuzulassen.

  7. Führen Sie die folgenden Befehle aus, um den Kafka-Server im Hintergrund auszuführen.

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

Aufgabe 3: OCI-Funktion und Service Connector Hub einrichten

Stellen Sie sicher, dass Sie über Policys verfügen, um Funktionen und Anwendungen zu erstellen, bereitzustellen und zu verwalten. Außerdem müssen Sie Service-FAAS-Zugriff auf Vault, Secrets und Schlüssel zulassen.

  1. Erstellen Sie die Secrets ca-cert und client_cert.pem im zuvor erstellten OCI Vault.

    ca.

    Secret-CA-Zertifikatsfenster erstellen

    client_cert.pem

    Secret-Clientcert-Fenster erstellen

  2. Gehen Sie in der OCI-Konsole zu Funktion, Konfiguration, und fügen Sie die folgenden Variablen hinzu:

    Fenster "Funktionskonfiguration erstellen"

  3. OCI-Funktionen einrichten

    1. Gehen Sie zu Entwicklerservices im Abschnitt Funktionen.

    2. Gehen Sie zur bereits erstellten Anwendung logs_to_kafka, und wählen Sie Erste Schritte aus. Wählen Sie dann Cloud Shell-Setup aus, und führen Sie die Schritte aus, bis der Docker-Anmeldebefehl ausgeführt wird.

      Funktion - Fenster "Erste Schritte"

    3. Generieren Sie eine logs_to_kafka-Boilerplate-Funktion.

      fn init --runtime python logs_to_kafka
      

      Der fn-Initialisierungsbefehl generiert einen Ordner mit dem Namen logs_to_kafka und 3 darin enthaltenen Dateien: func.py, func.yaml und requirements.txt.

    4. Öffnen Sie func.py, und ersetzen Sie den Inhalt der Datei durch den folgenden Code:

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. Ersetzen Sie dann den Inhalt von func.yaml. Füllen Sie die Felder mit den entsprechenden Informationen aus:

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. Für requirements.txt benötigen Sie Folgendes:

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. Gehen Sie zu Analysen und KI, Service Connector Hub, und wählen Sie Service-Connector erstellen aus. Füllen Sie die Felder aus, wie in der folgenden Abbildung gezeigt.

    Fenster "Service-Connector erstellen"

Aufgabe 4: Prüfen, ob die Logs an den Kafka-Server gesendet werden

  1. Führen Sie einen Client auf der Instanz aus, auf der Sie den Ordner für den Kafka-Server haben, da alle SSL-Zertifikate und der Kafka-Ordner mit Binärdateien auch für den Server verwendet werden. 

  2. Verwenden Sie zunächst die folgende consumer.properties-Datei:

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. Führen Sie dann den folgenden Befehl im Kafka-Ordner aus: 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

Danksagungen

Weitere Lernressourcen

Sehen Sie sich andere Übungen zu docs.oracle.com/learn an, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube-Kanal zu. Besuchen Sie außerdem die Website education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.

Produktdokumentation finden Sie im Oracle Help Center.