Nota:

Eseguire il push dei log da Log OCI in Kafka configurato mTLS utilizzando le funzioni OCI

Introduzione

I log rappresentano il diario per le risorse e vengono visualizzati record su ogni messaggio proveniente dai sistemi di rete. La disponibilità di molte risorse all'interno dell'infrastruttura cloud di un'organizzazione prevede la necessità di garantirne le prestazioni e la sicurezza. È necessario raccogliere i dati di registro da varie risorse all'interno dell'ecosistema dell'organizzazione e riunirli in un punto centrale per l'analisi. Questa esercitazione descrive il processo di impostazione delle risorse necessarie in Oracle Cloud Infrastructure (OCI) per eseguire il push dei log in un'impostazione del server Kafka con mTLS situato al di fuori di OCI.

Utilizzando questa configurazione, avrai una soluzione che ottiene log in tempo reale dalla tua tenancy OCI in un punto centrale di analisi. Inoltre, puoi collegare un Logstash al tuo server Kafka e trasformare/migliorare i dati in base alle tue esigenze e inserirlo nell'indice ElasticSearch.

Architettura della soluzione

Obiettivi

Prerequisiti

Task 1: configurare l'impostazione di Oracle Cloud Infrastructure

Questa sezione include i passi iniziali che verranno utilizzati per creare l'infrastruttura necessaria in OCI. Si creeranno le risorse riportate di seguito.

  1. Crea vault con chiavi di cifratura master e segreti

    1. Andare al menu Identità e sicurezza, selezionare Errore, quindi selezionare Crea vault.

    2. Immettere un nome per il vault e fare clic su Crea vault.

      Crea finestra Vault

    3. Dopo aver creato il vault, fare clic sul vault creato e creare una chiave di cifratura master con l'algoritmo AES. Questa chiave verrà utilizzata per cifrare ulteriormente i segreti del vault in questo blog.

  2. Crea registro contenitore per l'immagine che verrà utilizzata dalla funzione OCI

    1. Andare a Servizio sviluppatore, Container e artifact e selezionare Container Registry.

    2. Creare un repository privato denominato logs_to_kafka.

      Finestra Crea repository

  3. Crea applicazione funzioni

    1. Per includere i log nel server Kafka sarà necessario un client Kafka che verrà pubblicato nell'argomento di Kafka. Andare a Servizi per sviluppatori nella sezione Funzioni.

    2. Selezionare Crea applicazione e assegnare un nome all'applicazione logs_to_kafka.

    3. Selezionare la VCN creata nella sezione Prerequisiti di questo blog e selezionare la subnet pubblica all'interno della VCN in cui distribuire l'applicazione. Assicurarsi di disporre delle regole di sicurezza e di instradamento necessarie per consentire il traffico esterno alla VCN da tale subnet pubblica.

    Crea finestra applicazione

Task 2: installare e configurare il server Kafka

  1. Creare un'istanza Linux su cui verrà distribuito Kafka Server.

    1. In primo luogo, creare l'istanza VM e abilitare l'accesso ssh per installare e configurare Kafka con mTLS.

    2. Accedi alla VM utilizzando ssh e segui i passi di quickstart per installare Kafka su tale istanza. 

  2. Configurare mTLS per il server Kafka e creare i certificati utilizzati per mTLS. Andare alla directory home dell'istanza ed eseguire i comandi seguenti:

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. Modificare server.properties dalla cartella config/ del server Kafka.

  4. Dopo aver creato le chiavi e i certificati SSL, sarà necessario aggiungerli alla configurazione Kafka. Modificare config/server.properties nella cartella Kafka e aggiungere le righe seguenti:

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. Aggiungere il limite delle porte al firewall dell'istanza per le porte del server Kafka. In termini di sicurezza, in primo luogo, sarà necessario assicurarsi che il firewall all'interno della VM sia configurato in modo da consentire l'accesso alle porte Kafka.

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. Assicurarsi di disporre della rete configurata per consentire il traffico verso tale istanza. Dopo aver confermato che il firewall all'interno della VM ha la configurazione appropriata, è necessario configurare le regole di sicurezza e di instradamento o qualsiasi altro tipo di firewall che filtra il traffico per consentire il traffico verso tale istanza sulle porte specifiche di Kafka.

  7. Eseguire i comandi seguenti per eseguire il server Kafka in background.

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

Task 3: impostare l'hub connettore servizio e funzione OCI

Assicurarsi di disporre di criteri per creare, distribuire e gestire funzioni e applicazioni. Inoltre, devi consentire all'agente FAAS del servizio l'accesso a vault, segreti e chiavi.

  1. Creare segreti ca-cert e client_cert.pem nel ult OCI creato in precedenza.

    ca-cert

    Crea finestra Cert CA segreta

    client_cert.pem

    Crea finestra clientcert segreto

  2. Nella console OCI, andare a Funzione, Configurazione e aggiungere le variabili riportate di seguito.

    Finestra Crea configurazione funzioni

  3. Impostazione delle funzioni OCI

    1. Andare a Servizi per sviluppatori nella sezione Funzioni.

    2. Accedere all'applicazione già creata logs_to_kafka e selezionare Introduzione, quindi selezionare Impostazione di Cloud Shell e procedere fino al comando di login di Docker.

      Finestra Introduzione funzione

    3. Generare una funzione boilerplate logs_to_kafka.

      fn init --runtime python logs_to_kafka
      

      Il comando fn init genererà una cartella denominata logs_to_kafka con 3 file all'interno; func.py, func.yaml e requirements.txt.

    4. Aprire func.py e sostituire il contenuto del file con il codice seguente:

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. Sostituire quindi il contenuto di func.yaml. Completare i campi con le informazioni appropriate:

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. Per requirements.txt è necessario disporre dei seguenti elementi:

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. Andare a Analytics e AI, Hub connettore servizio e selezionare Crea connettore servizio. Completare i campi come mostrato nella seguente immagine.

    Finestra Crea connettore servizio

Task 4: verificare se i log vengono inviati al server Kafka

  1. Eseguire un client nell'istanza in cui si dispone della cartella per il server Kafka, perché per il server vengono utilizzati anche tutti i certificati SSL e la cartella Kafka con file binari. 

  2. In primo luogo, usare il seguente file consumer.properties:

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. Quindi eseguire il comando seguente nella cartella Kafka: 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

Conferme

Altre risorse di apprendimento

Esplora altri laboratori su docs.oracle.com/learn o accedi a contenuti di formazione gratuiti sul canale YouTube di Oracle Learning. Inoltre, visitare education.oracle.com/learning-explorer per diventare Explorer di Oracle Learning.

Per la documentazione sul prodotto, visitare il sito Oracle Help Center.