Nota:
- Questa esercitazione richiede l'accesso a Oracle Cloud. Per iscriversi a un account gratuito, consulta Inizia a utilizzare Oracle Cloud Infrastructure Free Tier.
- Utilizza valori di esempio per le credenziali, la tenancy e i compartimenti di Oracle Cloud Infrastructure. Al termine del laboratorio, sostituisci questi valori con quelli specifici del tuo ambiente cloud.
Eseguire il push dei log da Log OCI in Kafka configurato mTLS utilizzando le funzioni OCI
Introduzione
I log rappresentano il diario per le risorse e vengono visualizzati record su ogni messaggio proveniente dai sistemi di rete. La disponibilità di molte risorse all'interno dell'infrastruttura cloud di un'organizzazione prevede la necessità di garantirne le prestazioni e la sicurezza. È necessario raccogliere i dati di registro da varie risorse all'interno dell'ecosistema dell'organizzazione e riunirli in un punto centrale per l'analisi. Questa esercitazione descrive il processo di impostazione delle risorse necessarie in Oracle Cloud Infrastructure (OCI) per eseguire il push dei log in un'impostazione del server Kafka con mTLS situato al di fuori di OCI.
Utilizzando questa configurazione, avrai una soluzione che ottiene log in tempo reale dalla tua tenancy OCI in un punto centrale di analisi. Inoltre, puoi collegare un Logstash al tuo server Kafka e trasformare/migliorare i dati in base alle tue esigenze e inserirlo nell'indice ElasticSearch.

Obiettivi
- Creare e utilizzare l'hub connettore servizio OCI
- Creare e utilizzare un vault OCI in una configurazione di funzioni OCI
- Configura mTLS per Kafka
- Recuperare i log dal servizio di log OCI e reindirizzarli, utilizzando l'hub connettore servizio OCI e la funzione OCI, in un Kafka configurato mTLS
Prerequisiti
- Un account Oracle Cloud. Se non disponi di un account, puoi iscriverti a un account Oracle Cloud Free Tier.
- impostazione di rete in cui verrà creata l'applicazione funzione. Si suppone che si disponga già di una VCN creata con una subnet pubblica in cui verrà creata l'applicazione di funzione.
Task 1: configurare l'impostazione di Oracle Cloud Infrastructure
Questa sezione include i passi iniziali che verranno utilizzati per creare l'infrastruttura necessaria in OCI. Si creeranno le risorse riportate di seguito.
-
Crea vault con chiavi di cifratura master e segreti
-
Andare al menu Identità e sicurezza, selezionare Errore, quindi selezionare Crea vault.
-
Immettere un nome per il vault e fare clic su Crea vault.

-
Dopo aver creato il vault, fare clic sul vault creato e creare una chiave di cifratura master con l'algoritmo AES. Questa chiave verrà utilizzata per cifrare ulteriormente i segreti del vault in questo blog.
-
-
Crea registro contenitore per l'immagine che verrà utilizzata dalla funzione OCI
-
Andare a Servizio sviluppatore, Container e artifact e selezionare Container Registry.
-
Creare un repository privato denominato logs_to_kafka.

-
-
Crea applicazione funzioni
-
Per includere i log nel server Kafka sarà necessario un client Kafka che verrà pubblicato nell'argomento di Kafka. Andare a Servizi per sviluppatori nella sezione Funzioni.
-
Selezionare Crea applicazione e assegnare un nome all'applicazione logs_to_kafka.
-
Selezionare la VCN creata nella sezione Prerequisiti di questo blog e selezionare la subnet pubblica all'interno della VCN in cui distribuire l'applicazione. Assicurarsi di disporre delle regole di sicurezza e di instradamento necessarie per consentire il traffico esterno alla VCN da tale subnet pubblica.

-
Task 2: installare e configurare il server Kafka
-
Creare un'istanza Linux su cui verrà distribuito Kafka Server.
-
In primo luogo, creare l'istanza VM e abilitare l'accesso
sshper installare e configurare Kafka con mTLS. -
Accedi alla VM utilizzando
sshe segui i passi di quickstart per installare Kafka su tale istanza.
-
-
Configurare mTLS per il server Kafka e creare i certificati utilizzati per mTLS. Andare alla directory home dell'istanza ed eseguire i comandi seguenti:
$ mkdir ~/ssl # This one, ca-cert is the certificate authority you will use in the function $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt # This one, other_client_cert.pem is the client certificate you use in the function $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem -
Modificare server.properties dalla cartella config/ del server Kafka.
-
Dopo aver creato le chiavi e i certificati SSL, sarà necessario aggiungerli alla configurazione Kafka. Modificare config/server.properties nella cartella Kafka e aggiungere le righe seguenti:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093 advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093 ssl.keystore.location=<path to ssl/kafka.server.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> ssl.truststore.password=<password> -
Aggiungere il limite delle porte al firewall dell'istanza per le porte del server Kafka. In termini di sicurezza, in primo luogo, sarà necessario assicurarsi che il firewall all'interno della VM sia configurato in modo da consentire l'accesso alle porte Kafka.
sudo firewall-cmd --add-port=9093/tcp sudo firewall-cmd --add-port=9092/tcp -
Assicurarsi di disporre della rete configurata per consentire il traffico verso tale istanza. Dopo aver confermato che il firewall all'interno della VM ha la configurazione appropriata, è necessario configurare le regole di sicurezza e di instradamento o qualsiasi altro tipo di firewall che filtra il traffico per consentire il traffico verso tale istanza sulle porte specifiche di Kafka.
-
Eseguire i comandi seguenti per eseguire il server Kafka in background.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & nohup bin/kafka-server-start.sh config/server.properties &
Task 3: impostare l'hub connettore servizio e funzione OCI
Assicurarsi di disporre di criteri per creare, distribuire e gestire funzioni e applicazioni. Inoltre, devi consentire all'agente FAAS del servizio l'accesso a vault, segreti e chiavi.
-
Creare segreti ca-cert e client_cert.pem nel ult OCI creato in precedenza.
ca-cert

client_cert.pem

-
Nella console OCI, andare a Funzione, Configurazione e aggiungere le variabili riportate di seguito.

-
Impostazione delle funzioni OCI
-
Andare a Servizi per sviluppatori nella sezione Funzioni.
-
Accedere all'applicazione già creata logs_to_kafka e selezionare Introduzione, quindi selezionare Impostazione di Cloud Shell e procedere fino al comando di login di Docker.

-
Generare una funzione boilerplate
logs_to_kafka.fn init --runtime python logs_to_kafkaIl comando fn init genererà una cartella denominata logs_to_kafka con 3 file all'interno; func.py, func.yaml e requirements.txt.
-
Aprire func.py e sostituire il contenuto del file con il codice seguente:
import io import json import logging import oci import base64 from confluent_kafka import Producer, KafkaError def handler(ctx, data: io.BytesIO = None): cfg = ctx.Config() signer = oci.auth.signers.get_resource_principals_signer() try: topic_name = str(cfg["topic_name"]) bootstrap_server = str(cfg["bootstrap_server"]) security_protocol = str(cfg["security_protocol"]) secret_name = str(cfg["ca_cert_secret_name"]) client_cert_secret = str(cfg["client_cert_secret_name"]) vauld_ocid = str(cfg["vauld_ocid"]) except: logging.error('Some of the function config keys are not set') raise try: body = json.loads(data.getvalue()) except (Exception, ValueError) as ex: logging.getLogger().info('error parsing json payload: ' + str(ex)) try: client_certificate = decodeSecret( p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex)) try: decoded_secret = decodeSecret( p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the secret: ' + str(ex)) try: sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol, ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8')) logging.info(f'log is sent {sent_logs}') except (Exception, ValueError) as ex: logging.getLogger().info('error in publishing the message: ' + str(ex)) def decodeSecret(p_signer, p_secretName, p_vaultOCID): secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer) secret = secretClient.get_secret_bundle_by_name( secret_name=p_secretName, vault_id=p_vaultOCID).data secret_content = secret.secret_bundle_content.content.encode("utf-8") decodedSecret = base64.b64decode(secret_content).decode("utf-8") return decodedSecret def delivery_report(errmsg, msg): if errmsg is not None: print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) return print('Message successfully produced to Topic:{} at offset {}'.format( msg.topic(), msg.offset())) def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value): conf = { 'bootstrap.servers': bootstrap_srv, 'security.protocol': security_protocol, 'ssl.certificate.pem': client_pem, 'ssl.ca.pem': ca_pem } producer = Producer(conf) produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report) producer.flush() return produce_log -
Sostituire quindi il contenuto di func.yaml. Completare i campi con le informazioni appropriate:
schema_version: 20180708 name: logs_to_kafka version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: bootstrap_server: <kafka_server_public_ip>:9093 ca_cert_secret_name: ca_cert_secret_name client_cert_secret_name: client_cert_secret_name security_protocol: SSL topic_name: quickstart-events vauld_ocid: <vault_ocid> -
Per requirements.txt è necessario disporre dei seguenti elementi:
fdk>=0.1.50 confluent-kafka>=1.4.2 oci>=2.57.0
-
-
Andare a Analytics e AI, Hub connettore servizio e selezionare Crea connettore servizio. Completare i campi come mostrato nella seguente immagine.

Task 4: verificare se i log vengono inviati al server Kafka
-
Eseguire un client nell'istanza in cui si dispone della cartella per il server Kafka, perché per il server vengono utilizzati anche tutti i certificati SSL e la cartella Kafka con file binari.
-
In primo luogo, usare il seguente file consumer.properties:
security.protocol=SSL ssl.truststore.location=<path to kafka.client.truststore.jks> ssl.truststore.password=<password> ssl.keystore.location=<path to kafka.client.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.endpoint.identification.algorithm= -
Quindi eseguire il comando seguente nella cartella Kafka:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
Collegamenti correlati
Conferme
- Autore - Cristian Vlad (architetto del cloud principale)
- Collaboratore - Gabriel Feodorov (Cloud Engineer)
Altre risorse di apprendimento
Esplora altri laboratori su docs.oracle.com/learn o accedi a contenuti di formazione gratuiti sul canale YouTube di Oracle Learning. Inoltre, visitare education.oracle.com/learning-explorer per diventare Explorer di Oracle Learning.
Per la documentazione sul prodotto, visitare il sito Oracle Help Center.
Push logs from OCI Logging into mTLS configured Kafka using OCI Functions
F79041-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.