Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Push logs from OCI Logging into mTLS configured Kafka using OCI Functions
Introduction
Logs are the diary of your resources: they record every message from your network systems. As the number of resources in an organization’s cloud infrastructure grows, so does the need to monitor their performance and security. This calls for log collection, the process of gathering log data from the various resources in the organization’s ecosystem and bringing it together at a central point for analysis. This tutorial describes how to set up the resources needed in Oracle Cloud Infrastructure (OCI) to push logs to a Kafka server, configured with mTLS, that is located outside OCI.
Using this configuration, you will have a solution that delivers real-time logs from your OCI tenancy to a central point of analysis. Furthermore, you can connect Logstash to your Kafka server, transform or enrich the data as required, and store it in an Elasticsearch index.
Objectives
- Create and use OCI Service Connector Hub
- Create and use OCI Vault in an OCI Functions Configuration
- Configure mTLS for Kafka
- Get logs from the OCI Logging service and redirect them, using OCI Service Connector Hub and OCI Functions, into an mTLS-configured Kafka server
Prerequisites
- An Oracle Cloud account. If you don’t have an account, you can sign up for an Oracle Cloud Free Tier account.
- A network setup in which you will create the function application. We are assuming you already have a VCN created with a public subnet in which the function application will be created.
Task 1: Configure your Oracle Cloud Infrastructure setup
This section covers the initial steps to create the necessary infrastructure in OCI. You will create the following resources:
- Create Vault with Master Encryption Keys and Secrets
  - Go to Identity & Security menu, select Vault, and then select Create Vault.
  - Enter a Name for your vault and click Create Vault.
  - After the vault is created, click the vault you created and create a Master Encryption Key with the AES algorithm. This key will be used to encrypt the vault secrets later in this tutorial.
- Create Container Registry for the image that will be used by OCI Functions
  - Go to Developer Services, Containers & Artifacts and select Container Registry.
  - Create a private repository named logs_to_kafka.
- Create Functions Application
  - To ingest logs into the Kafka server, you need a Kafka client that publishes to a Kafka topic. Go to Developer Services and find the Functions section.
  - Select Create application, and name the application logs_to_kafka.
  - Select the VCN you created in the Prerequisites section of this tutorial and select the public subnet within the VCN in which to deploy the application. Make sure you have the required security rules and route rules to permit traffic outside the VCN from that public subnet.
Task 2: Install and configure the Kafka Server
- Create a Linux instance on which the Kafka server will be deployed.
  - First, create the VM instance and enable ssh access to install and configure Kafka with mTLS.
  - Access the VM using ssh and follow the steps from the quickstart to install Kafka on that instance.
- Configure mTLS for the Kafka server and create the certificates used for mTLS. Go to the home directory of the instance and run the following commands:
$ mkdir ~/ssl
# This one, ca-cert is the certificate authority you will use in the function
$ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
$ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
$ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
$ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
$ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
$ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
$ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -certreq -file ssl/client-cert-file
$ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
$ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
$ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
$ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
$ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
$ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
# This one, other_client_cert.pem is the client certificate you use in the function
$ keytool -exportcert -alias client -keystore ssl/kafka.client.keystore.jks -rfc -file other_client_cert.pem
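Before storing the CA certificate and the exported client certificate in OCI Vault, it can help to confirm that each file is well-formed PEM. The following is a minimal sketch using only the Python standard library. The helper name `is_wellformed_pem` is hypothetical, and it checks only the PEM header, footer, and Base64 body of a single certificate block, not the certificate contents or its signature.

```python
import base64
import binascii

PEM_HEADER = "-----BEGIN CERTIFICATE-----"
PEM_FOOTER = "-----END CERTIFICATE-----"


def is_wellformed_pem(pem_text: str) -> bool:
    """Return True if pem_text holds one certificate block with a valid Base64 body."""
    text = pem_text.strip()
    if not (text.startswith(PEM_HEADER) and text.endswith(PEM_FOOTER)):
        return False
    # Everything between the header and footer must be Base64 data.
    body = text[len(PEM_HEADER):-len(PEM_FOOTER)]
    if not body.strip():
        return False
    try:
        # Remove line breaks, then reject any non-Base64 character.
        base64.b64decode("".join(body.split()), validate=True)
    except (binascii.Error, ValueError):
        return False
    return True
```

For example, `is_wellformed_pem(open("ssl/ca-cert").read())` could be run from the home directory after the commands above complete.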
- Edit server.properties from the config/ folder of the Kafka server.
  - After the SSL keys and certificates are created, you need to add them to the Kafka configuration. Edit config/server.properties under the Kafka folder and add the following lines:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname>:9093
ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.truststore.location=<path to ssl/kafka.server.truststore.jks>
ssl.truststore.password=<password>
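A quick way to catch a missing or empty setting before starting the broker is to scan the file for the keys listed above. This is a minimal sketch, not a full Java properties parser: it assumes simple `key=value` lines, and the names `parse_properties`, `missing_ssl_keys`, and `REQUIRED_SSL_KEYS` are hypothetical.

```python
REQUIRED_SSL_KEYS = (
    "listeners",
    "advertised.listeners",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_ssl_keys(text: str) -> list:
    """Return the required keys that are absent or empty in the properties text."""
    props = parse_properties(text)
    return [key for key in REQUIRED_SSL_KEYS if not props.get(key)]
```

Calling `missing_ssl_keys(open("config/server.properties").read())` from the Kafka folder returns an empty list when every key in the list has a value.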
- Open the Kafka server ports in the firewall of the instance. First, make sure the firewall inside the VM is configured to permit access on the Kafka ports.
sudo firewall-cmd --add-port=9093/tcp
sudo firewall-cmd --add-port=9092/tcp
- Make sure the network is configured to allow traffic to that instance. After you confirm that the firewall inside the VM is properly configured, configure the security rules, route rules, and any other firewall that filters traffic so that traffic can reach the instance on the Kafka-specific ports.
- Run the following commands to start the Kafka server in the background.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
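Once the broker is running, the firewall and network rules can be checked from a machine outside the VCN by attempting a TCP connection to the SSL listener. The sketch below uses only the Python standard library; the function name `port_is_open` is hypothetical. It confirms only that the port accepts connections and does not perform a TLS handshake or validate certificates.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, `port_is_open("<instance_public_ip>", 9093)` should return `True` when both the VM firewall and the network rules allow the traffic.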
Task 3: Set up OCI Function and Service Connector Hub
Make sure you have policies to create, deploy and manage functions and applications. You must also allow service FAAS access to vault, secrets, and keys.
- Create ca-cert and client_cert.pem secrets in the previously created OCI Vault.
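The function code later in this tutorial (decodeSecret in func.py) reads each secret's content as a Base64 string and decodes it to UTF-8 text. The sketch below shows that round trip so you can check what the function will receive for a given PEM file. The helper names `encode_for_vault` and `decode_from_vault` are hypothetical, and how you supply the content when creating the secret depends on the option you choose in your environment.

```python
import base64


def encode_for_vault(pem_text: str) -> str:
    """Base64-encode PEM text, the form that decodeSecret in func.py expects to decode."""
    return base64.b64encode(pem_text.encode("utf-8")).decode("ascii")


def decode_from_vault(content: str) -> str:
    """Mirror the decoding step in decodeSecret: Base64 string to UTF-8 text."""
    return base64.b64decode(content.encode("utf-8")).decode("utf-8")
```

If `decode_from_vault(encode_for_vault(pem_text)) == pem_text` holds for your certificate file, the text passed to the Kafka client matches the original PEM.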
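- In the OCI Console, go to Function, Configuration and add the following variables, which are the configuration keys that func.py reads: bootstrap_server, ca_cert_secret_name, client_cert_secret_name, security_protocol, topic_name, and vauld_ocid.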
- Set up OCI Functions
  - Go to Developer Services in the Functions section.
  - Open the logs_to_kafka application you created earlier, select Getting started, then select Cloud Shell Setup, and follow the steps up to the docker login command.
  - Generate a logs_to_kafka boilerplate function.
    fn init --runtime python logs_to_kafka
    The fn init command generates a folder called logs_to_kafka with three files inside: func.py, func.yaml, and requirements.txt.
  - Open func.py and replace the contents of the file with the following code:
import io
import json
import logging
import oci
import base64
from confluent_kafka import Producer, KafkaError


def handler(ctx, data: io.BytesIO = None):
    cfg = ctx.Config()
    signer = oci.auth.signers.get_resource_principals_signer()

    # Read the function configuration values.
    try:
        topic_name = str(cfg["topic_name"])
        bootstrap_server = str(cfg["bootstrap_server"])
        security_protocol = str(cfg["security_protocol"])
        secret_name = str(cfg["ca_cert_secret_name"])
        client_cert_secret = str(cfg["client_cert_secret_name"])
        vauld_ocid = str(cfg["vauld_ocid"])
    except Exception:
        logging.error('Some of the function config keys are not set')
        raise

    # Parse the JSON payload passed to the function.
    try:
        body = json.loads(data.getvalue())
    except Exception as ex:
        logging.getLogger().info('error parsing json payload: ' + str(ex))

    # Retrieve the client certificate and the CA certificate from the vault.
    try:
        client_certificate = decodeSecret(
            p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
    except Exception as ex:
        logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
    try:
        decoded_secret = decodeSecret(
            p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
    except Exception as ex:
        logging.getLogger().info('error retrieving the secret: ' + str(ex))

    # Publish the first entry of the payload to the Kafka topic.
    try:
        sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server,
                                    security_protocol=security_protocol,
                                    ca_pem=decoded_secret, client_pem=client_certificate,
                                    record_value=bytes(str(body[0]), encoding='utf-8'))
        logging.info(f'log is sent {sent_logs}')
    except Exception as ex:
        logging.getLogger().info('error in publishing the message: ' + str(ex))


def decodeSecret(p_signer, p_secretName, p_vaultOCID):
    # Fetch the secret bundle by name and decode its Base64 content to text.
    secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
    secret = secretClient.get_secret_bundle_by_name(
        secret_name=p_secretName, vault_id=p_vaultOCID).data
    secret_content = secret.secret_bundle_content.content.encode("utf-8")
    decodedSecret = base64.b64decode(secret_content).decode("utf-8")
    return decodedSecret


def delivery_report(errmsg, msg):
    # Callback invoked by the producer once delivery succeeds or fails.
    if errmsg is not None:
        print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
        return
    print('Message successfully produced to Topic:{} at offset {}'.format(
        msg.topic(), msg.offset()))


def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
    conf = {
        'bootstrap.servers': bootstrap_srv,
        'security.protocol': security_protocol,
        'ssl.certificate.pem': client_pem,
        'ssl.ca.pem': ca_pem
    }
    producer = Producer(conf)
    produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
    producer.flush()
    return produce_log
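The handler above publishes only `body[0]`, the first entry of the parsed payload, and serializes it with `str()`, which yields a Python representation rather than JSON. If an invocation carries more than one log entry, a variant could serialize every entry as JSON instead. The following is a minimal sketch, assuming the payload is a JSON array of log entries (a single JSON object is also accepted). The function name `payload_to_records` is hypothetical and not part of the tutorial's code.

```python
import json
from typing import List


def payload_to_records(raw: bytes) -> List[bytes]:
    """Turn a JSON payload into one UTF-8 encoded JSON record per log entry."""
    body = json.loads(raw)
    # Accept either a list of entries or a single entry.
    entries = body if isinstance(body, list) else [body]
    return [
        json.dumps(entry, separators=(",", ":")).encode("utf-8")
        for entry in entries
    ]
```

Each returned record could then be passed as `record_value` to `publish_message`. Because `publish_message` creates a new producer and flushes on every call, a high-volume variant would likely reuse one producer and flush once per invocation.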
  - Then replace the contents of func.yaml. Complete the fields with the values appropriate for your environment:
schema_version: 20180708
name: logs_to_kafka
version: 0.0.1
runtime: python
build_image: fnproject/python:3.9-dev
run_image: fnproject/python:3.9
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256
config:
  bootstrap_server: <kafka_server_public_ip>:9093
  ca_cert_secret_name: ca_cert_secret_name
  client_cert_secret_name: client_cert_secret_name
  security_protocol: SSL
  topic_name: quickstart-events
  vauld_ocid: <vault_ocid>
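When a configuration key is missing, the handler in func.py logs only that some keys are not set. A small helper can name the missing keys, which makes a misconfigured func.yaml easier to diagnose. This is a minimal sketch that assumes the function configuration behaves like a Python mapping. The names `REQUIRED_CONFIG_KEYS` and `missing_config_keys` are hypothetical, and the key list is copied from the keys that func.py reads.

```python
REQUIRED_CONFIG_KEYS = (
    "topic_name",
    "bootstrap_server",
    "security_protocol",
    "ca_cert_secret_name",
    "client_cert_secret_name",
    "vauld_ocid",
)


def missing_config_keys(cfg: dict) -> list:
    """Return the required keys that are absent or blank in the function config."""
    return [
        key for key in REQUIRED_CONFIG_KEYS
        if not str(cfg.get(key, "")).strip()
    ]
```

Inside the handler, a non-empty result from `missing_config_keys(cfg)` could be written to the log before raising an error.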
  - For requirements.txt, you will need the following:
fdk>=0.1.50
confluent-kafka>=1.4.2
oci>=2.57.0
- Go to Analytics & AI, Service Connector Hub and select Create Service Connector. Complete the fields, using Logging as the source and Functions as the target with the logs_to_kafka application and function.
Task 4: Verify if the logs are sent to the Kafka server
- Run a client on the instance that hosts the Kafka server folder, because the client reuses the SSL certificates and Kafka binaries already present for the server.
  - First, create the following consumer.properties file:
security.protocol=SSL
ssl.truststore.location=<path to kafka.client.truststore.jks>
ssl.truststore.password=<password>
ssl.keystore.location=<path to kafka.client.keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.endpoint.identification.algorithm=
  - Then run the following command in the Kafka folder:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
Acknowledgments
- Author - Cristian Vlad (Principal Cloud Architect)
- Contributor - Gabriel Feodorov (Cloud Engineer)
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
F79041-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.