Push logs from OCI Logging into mTLS configured Kafka using OCI Functions

Introduction

Logs are the diary of your resources: they record every event generated by the systems in your network. As an organization's cloud infrastructure grows, maintaining its performance and security requires log collection, the process of gathering log data from the resources across the organization's ecosystem and bringing it together in a central point for analysis. This tutorial describes how to set up the resources needed in Oracle Cloud Infrastructure (OCI) to push logs into a Kafka server configured with mTLS and located outside OCI.

Using this configuration, you get a solution that delivers real-time logs from your OCI tenancy to a central point of analysis. Furthermore, you can connect Logstash to your Kafka server, transform or enrich the data as required, and store it in an Elasticsearch index.

Solution architecture

Objectives

Prerequisites

Task 1: Configure your Oracle Cloud Infrastructure setup

This task covers the initial steps to create the necessary infrastructure in OCI. You will create the following resources:

  1. Create Vault with Master Encryption Keys and Secrets

    1. Go to the Identity & Security menu, select Vault, and then select Create Vault.

    2. Enter a Name for your vault and click Create Vault.

      Create Vault Window

    3. After the vault is created, click the vault you created and create a Master Encryption Key with the AES algorithm. This key will be used to encrypt the vault secrets later in this tutorial. Alternatively, you can create the vault and key with the OCI CLI, as sketched below.
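
      A minimal CLI sketch, assuming the OCI CLI is already configured and that the compartment OCID, display names, and management endpoint are placeholders you replace:

      # Create the vault (a DEFAULT, that is virtual private, vault is assumed here)
      oci kms management vault create --compartment-id <compartment_ocid> \
          --display-name logs_to_kafka_vault --vault-type DEFAULT
      # Once the vault is ACTIVE, take its management endpoint from the output
      # and create an AES Master Encryption Key in it
      oci kms management key create --compartment-id <compartment_ocid> \
          --display-name logs_to_kafka_key \
          --key-shape '{"algorithm": "AES", "length": 32}' \
          --endpoint <vault_management_endpoint>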

  2. Create Container Registry for the image that will be used by OCI Function

    1. Go to Developer Services, Containers & Artifacts and select Container Registry.

    2. Create a private repository named logs_to_kafka.

      Create Repository Window

  3. Create Functions Application

    1. To ingest logs into the Kafka server, you need a Kafka client that publishes to a Kafka topic. Go to Developer Services and select Functions.

    2. Select Create application, and name the application logs_to_kafka.

    3. Select the VCN you created in the Prerequisites section of this tutorial and select the public subnet within that VCN to deploy the application in. Make sure the required security rules and route rules are in place to permit traffic from that public subnet to leave the VCN.

    Create Application Window

Task 2: Install and configure the Kafka Server

  1. Create a Linux instance on which the Kafka server will be deployed.

    1. First, create the VM instance and enable SSH access, so you can install and configure Kafka with mTLS.

    2. Access the VM using SSH and follow the steps from the Apache Kafka quickstart to install Kafka on that instance; the typical flow is sketched below.
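
      A minimal sketch of that installation, assuming a Java runtime is required and taking the exact Kafka version and download URL from the quickstart page:

      # Kafka needs a Java runtime
      sudo yum install -y java-11-openjdk
      # Download and extract the Kafka release referenced by the quickstart
      curl -O https://downloads.apache.org/kafka/<version>/kafka_2.13-<version>.tgz
      tar -xzf kafka_2.13-<version>.tgz
      cd kafka_2.13-<version>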

  2. Configure mTLS for the Kafka server and create the certificates used for mTLS. Go to the home directory of the instance and run the following commands (an optional verification step follows the block):

    $ mkdir ~/ssl
    # ca-cert is the certificate authority certificate that you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # other_client_cert.pem is the client certificate that you will use in the function
    $ keytool -exportcert -alias client -keystore ssl/kafka.client.keystore.jks -rfc -file other_client_cert.pem
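
    Optionally, before wiring the certificates into Kafka, you can check that the signed certificates chain back to the CA:

    $ openssl verify -CAfile ssl/ca-cert ssl/server-cert-signed
    $ openssl verify -CAfile ssl/ca-cert ssl/client-cert-signed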
    
  3. After the SSL keys and certificates are created, add them to the Kafka configuration. Edit server.properties in the config/ folder of the Kafka installation and add the following lines (an optional hardening step follows the block):

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public_ip_of_host>:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
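
    The configuration above enables an SSL listener but does not force clients to authenticate. As an optional hardening step, and an assumption beyond the original setup, you can make the listener strictly mutual-TLS; note that the producer must then also present its private key (for example, through the ssl.key.pem property in the confluent-kafka configuration):

    # Optional: require clients to present a certificate signed by the CA
    echo "ssl.client.auth=required" >> config/server.properties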
    
  4. Open the Kafka server ports in the firewall of the instance. First, make sure the firewall inside the VM is configured to permit access on the Kafka ports:

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    # Persist the runtime rules so they survive a reboot
    sudo firewall-cmd --runtime-to-permanent
    
  5. Make sure the network is configured to allow traffic to the instance. After you confirm that the firewall inside the VM is properly configured, update the security rules, route rules, or any other firewall that filters traffic so that the instance is reachable on the Kafka-specific ports.

  6. Run the following commands to start the Kafka server in the background:

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
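
    If the quickstart-events topic used later in this tutorial does not exist yet, create it now; a sketch, assuming the single-broker defaults:

    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092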
    

Task 3: Set up OCI Function and Service Connector Hub

Make sure you have policies that allow you to create, deploy, and manage functions and applications. You must also allow the FaaS service and the function's resource principal to access the vault, secrets, and keys; example statements are sketched below.
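
A sketch of the vault-access part, assuming the function runs with a resource principal matched by a hypothetical dynamic group named functions-dg (for example, with the matching rule ALL {resource.type = 'fnfunc', resource.compartment.id = '<compartment_ocid>'}); adjust names and compartments to your tenancy:

    Allow dynamic-group functions-dg to read secret-family in compartment <compartment_name>
    Allow dynamic-group functions-dg to use keys in compartment <compartment_name>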

  1. Create the ca-cert and client_cert.pem secrets in the previously created OCI Vault, using the contents of the ssl/ca-cert and other_client_cert.pem files generated in Task 2. You can also create them from the OCI CLI, as sketched after the screenshots.

    ca-cert

    Create Secret CA Cert Window

    client_cert.pem

    Create Secret clientcert Window
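
    A minimal CLI sketch for the same step, assuming the two PEM files were copied from the Kafka instance and that the OCIDs are placeholders:

    # Secret content must be supplied base64-encoded
    oci vault secret create-base64 --compartment-id <compartment_ocid> \
        --vault-id <vault_ocid> --key-id <master_key_ocid> \
        --secret-name ca-cert \
        --secret-content-content "$(base64 -w0 ssl/ca-cert)"
    oci vault secret create-base64 --compartment-id <compartment_ocid> \
        --vault-id <vault_ocid> --key-id <master_key_ocid> \
        --secret-name client_cert.pem \
        --secret-content-content "$(base64 -w0 other_client_cert.pem)"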

  2. In the OCI Console, go to the function's Configuration section and add the following variables (these can also be set through the config block in func.yaml, shown later):

    Create Function Configuration Window

  3. Set up OCI Functions

    1. Go to Developer Services and select Functions.

    2. Open the previously created application logs_to_kafka and select Getting started, then select Cloud Shell Setup and work through the steps up to and including the docker login command; those steps are sketched below.

      Function Getting Started  Window
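
      A sketch of those Cloud Shell setup steps, assuming placeholders for the region context, compartment, tenancy namespace, and username; the docker login password is an auth token generated for your user:

      fn use context <region-context>
      fn update context oracle.compartment-id <compartment_ocid>
      fn update context registry <region-key>.ocir.io/<tenancy-namespace>/logs_to_kafka
      docker login -u '<tenancy-namespace>/<username>' <region-key>.ocir.io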

    3. Generate a logs_to_kafka boilerplate function.

      fn init --runtime python logs_to_kafka
      

      The fn init command generates a folder called logs_to_kafka with three files inside: func.py, func.yaml, and requirements.txt.

    4. Open func.py and replace the contents of the file with the following code:

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer

      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          # Authenticate to OCI with the function's resource principal
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vault_ocid = str(cfg["vault_ocid"])
          except KeyError:
              logging.getLogger().error('Some of the function config keys are not set')
              raise
          try:
              # Service Connector Hub delivers a JSON array of log entries
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().error('error parsing json payload: ' + str(ex))
              raise
          try:
              # The client certificate (PEM) stored in the vault
              client_certificate = decodeSecret(
                  p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vault_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().error('error retrieving the client certificate from vault: ' + str(ex))
              raise
          try:
              # The CA certificate (PEM) stored in the vault
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vault_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().error('error retrieving the secret: ' + str(ex))
              raise
          try:
              publish_message(topic=topic_name, bootstrap_srv=bootstrap_server,
                              security_protocol=security_protocol, ca_pem=decoded_secret,
                              client_pem=client_certificate, records=body)
              logging.getLogger().info('logs sent to Kafka')
          except (Exception, ValueError) as ex:
              logging.getLogger().error('error in publishing the message: ' + str(ex))

      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          # Fetch the secret bundle by name and base64-decode its content
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret

      def delivery_report(errmsg, msg):
          # Delivery callback, invoked once per produced message
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))

      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, records):
          # TLS client configuration: the broker is verified against the CA
          # certificate and the client presents its own certificate
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          # Publish every log entry in the payload, not just the first one
          for record in records:
              producer.produce(topic, key=None,
                               value=bytes(str(record), encoding='utf-8'),
                               on_delivery=delivery_report)
          producer.flush()
      
    5. Then replace the contents of func.yaml, completing the fields with your own values:

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca-cert
          client_cert_secret_name: client_cert.pem
          security_protocol: SSL
          topic_name: quickstart-events
          vault_ocid: <vault_ocid>
      
    6. For requirements.txt, you need the following:

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
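
    7. Deploy the function and, optionally, invoke it once as a smoke test. A sketch of the standard Fn CLI commands, run from the folder that contains func.yaml; the sample payload mimics the JSON array of log entries the function expects:

      fn -v deploy --app logs_to_kafka
      echo '[{"source": "test", "message": "hello"}]' | fn invoke logs_to_kafka logs_to_kafka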
      
  4. Go to Analytics & AI, select Service Connector Hub, and then select Create Service Connector. Configure the source as Logging and the target as the logs_to_kafka function, completing the fields as shown in the following image.

    Create Service Connector Window

Task 4: Verify if the logs are sent to the Kafka server

  1. Run a consumer on the instance that hosts the Kafka server, since the SSL certificates and the Kafka binaries used by the server are already available there.

  2. First, create the following consumer.properties file:

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    # Left empty intentionally, to disable hostname verification, since the certificates were not issued for the broker's public address
    ssl.endpoint.identification.algorithm=
    
  3. Then run the following command from the Kafka folder:

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
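
    To verify the mTLS channel independently of the function, you can also produce a test message over the SSL listener, reusing the same properties file:

    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server <instance_public_ip>:9093 --producer.config <path_to_consumer.properties>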
    

Acknowledgments

More Learning Resources

Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.

For product documentation, visit Oracle Help Center.