ノート:

OCIファンクションを使用して、OCIロギングからmTLS構成済のKafkaにログをプッシュします

イントロダクション

ログはリソースの日記を表し、ネットワーク・システムからすべてのメッセージに関するレコードを取得します。組織のクラウド・インフラストラクチャ内に多くのリソースがある場合は、パフォーマンスとセキュリティを維持する必要があります。ログ収集、組織のエコシステム内の様々なリソースからログ・データを収集し、それらを一元的な分析ポイントにまとめるプロセスが必要です。このチュートリアルでは、OCIの外部にあるmTLSを使用してKafkaサーバー設定にログをプッシュするためにOracle Cloud Infrastructure (OCI)で必要なリソースを設定するプロセスについて説明します。

この構成を使用すると、OCIテナンシから一元的な分析ポイントにリアルタイムのログを取得するソリューションが提供されます。さらに、LogstashをKafkaサーバーにリンクして、要件に従ってデータを変換/拡張し、ElasticSearch索引に配置できます。

ソリューション・アーキテクチャ

目的

前提条件

タスク1: Oracle Cloud Infrastructure設定の構成

この項では、OCIに必要なインフラストラクチャの作成に使用する最初のステップについて説明します。次のリソースを作成します:

  1. マスター暗号化キーおよびシークレットを使用したVaultの作成

    1. 「アイデンティティとセキュリティ」メニューに移動し、「Vault」「Vaultの作成」の順に選択します。

    2. ボールトの名前を入力し、「Vaultの作成」をクリックします。

      Vaultウィンドウの作成

    3. ボールトの作成後、作成したボールトをクリックし、AESアルゴリズムを使用してマスター暗号化キーを作成します。このキーは、このブログのボールト・シークレットをさらに暗号化するために使用されます。

  2. OCIファンクションで使用されるイメージのコンテナ・レジストリの作成

    1. 「開発者サービス」「コンテナおよびアーティファクト」に移動して、「コンテナ・レジストリ」を選択します。

    2. logs_to_kafkaという名前のプライベート・リポジトリを作成します。

      「リポジトリの作成」ウィンドウ

  3. Functionsアプリケーションの作成

    1. Kafkaサーバーにログを収集するには、Kafkaのトピックに公開するKafkaクライアントが必要です。「ファンクション」セクションの「開発者サービス」に移動します。

    2. 「アプリケーションの作成」を選択し、アプリケーションにlogs_to_kafkaという名前を付けます。

    3. このブログの「前提条件」セクションで作成したVCNを選択し、アプリケーションをデプロイするVCN内のパブリック・サブネットを選択します。そのパブリック・サブネットからのVCN外部のトラフィックを許可するために必要なセキュリティ・ルールおよびルート・ルールがあることを確認してください。

    アプリケーション・ウィンドウの作成

タスク2: Kafkaサーバーのインストールと構成

  1. KafkaサーバーがデプロイされるLinuxインスタンスを作成します。

    1. まず、VMインスタンスを作成し、sshアクセスを有効にして、mTLSを使用してKafkaをインストールおよび構成します。

    2. sshを使用してVMにアクセスし、クイックスタートのステップに従って、そのインスタンスにKafkaをインストールします。

  2. Kafka ServerのmTLSを構成し、mTLSに使用される証明書を作成します。インスタンスのホーム・ディレクトリに移動し、次のコマンドを実行します。

    $ mkdir ~/ssl
    # This one, ca-cert is the certificate authority you will use in the function
    $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password>
    $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password>
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt
    $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt
    # This one, other_client_cert.pem is the client certificate you use in the function
    $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
    
  3. Kafkaサーバーのconfig/フォルダからserver.propertiesを編集します。

  4. SSLキーおよび証明書が作成されたら、それらをKafka構成に追加する必要があります。Kafkaフォルダの下のconfig/server.propertiesを編集し、次の行を追加します:

    listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
    advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093
    ssl.keystore.location=<path to ssl/kafka.server.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> 
    ssl.truststore.password=<password>
    
  5. Kafkaサーバー・ポートのインスタンスのファイアウォールにポート許容量を追加します。セキュリティ上の観点から、まず、VM内のファイアウォールがKafkaポートへのアクセスを許可するように構成されている必要があります。

    sudo firewall-cmd --add-port=9093/tcp
    sudo firewall-cmd --add-port=9092/tcp
    
  6. そのインスタンスへのトラフィックを許可するようにネットワークが構成されていることを確認します。VM内のファイアウォールが適切な構成であることを確認したら、セキュリティ・ルールとルート・ルール、またはKafka固有のポート上のそのインスタンスへのトラフィックを許可するためにトラフィックをフィルタするその他のタイプのファイアウォールを構成する必要があります。

  7. 次のコマンドを実行して、Kafkaサーバーをバックグラウンドで実行します。

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    

タスク3: OCIファンクションおよびサービス・コネクタ・ハブの設定

ファンクションおよびアプリケーションを作成、デプロイおよび管理するポリシーがあることを確認します。また、サービスFAASによるボールト、シークレットおよびキーへのアクセスも許可する必要があります。

  1. 以前に作成したOCI Vaultca-certおよびclient_cert.pemシークレットを作成します。

    協奏曲

    シークレットCA証明書ウィンドウの作成

    client_cert.pem

    シークレット・クライアント・コンサート・ウィンドウの作成

  2. OCIコンソールで、「関数」「構成」に移動し、次の変数を追加します:

    機能構成ウィンドウの作成

  3. OCIファンクションの設定

    1. 「ファンクション」セクションの「開発者サービス」に移動します。

    2. すでに作成されたアプリケーションlogs_to_kafkaに移動し、「開始」を選択してから、「クラウド・シェル設定」を選択し、dockerログイン・コマンドまでステップを実行します。

      機能スタート・ウィンドウ

    3. logs_to_kafkaボイラープレート関数を生成します。

      fn init --runtime python logs_to_kafka
      

      fn initコマンドは、logs_to_kafkaというフォルダを生成し、内部に3つのファイル(func.pyfunc.yamlおよびrequirements.txt)を含めます。

    4. func.pyを開き、ファイルの内容を次のコードに置き換えます。

      import io
      import json
      import logging
      import oci
      import base64
      from confluent_kafka import Producer, KafkaError
      
      def handler(ctx, data: io.BytesIO = None):
          cfg = ctx.Config()
          signer = oci.auth.signers.get_resource_principals_signer()
          try:
              topic_name = str(cfg["topic_name"])
              bootstrap_server = str(cfg["bootstrap_server"])
              security_protocol = str(cfg["security_protocol"])
              secret_name = str(cfg["ca_cert_secret_name"])
              client_cert_secret = str(cfg["client_cert_secret_name"])
              vauld_ocid = str(cfg["vauld_ocid"])
          except:
              logging.error('Some of the function config keys are not set')
              raise
          try:
              body = json.loads(data.getvalue())
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error parsing json payload: ' + str(ex))
          try:
              client_certificate = decodeSecret(
              p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex))
          try:
              decoded_secret = decodeSecret(
                  p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid)
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error retrieving the secret: ' + str(ex))
          try:
              sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol,
                               ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8'))
              logging.info(f'log is sent {sent_logs}')
          except (Exception, ValueError) as ex:
              logging.getLogger().info('error in publishing the message: ' + str(ex))
      
      def decodeSecret(p_signer, p_secretName, p_vaultOCID):
          secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer)
          secret = secretClient.get_secret_bundle_by_name(
                                      secret_name=p_secretName, vault_id=p_vaultOCID).data
          secret_content = secret.secret_bundle_content.content.encode("utf-8")
          decodedSecret = base64.b64decode(secret_content).decode("utf-8")
          return decodedSecret
      
      def delivery_report(errmsg, msg):
          if errmsg is not None:
              print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
              return
          print('Message successfully produced to Topic:{} at offset {}'.format(
                  msg.topic(), msg.offset()))
      
      def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value):
          conf = {
              'bootstrap.servers': bootstrap_srv,
              'security.protocol': security_protocol,
              'ssl.certificate.pem': client_pem,
              'ssl.ca.pem': ca_pem
          }
          producer = Producer(conf)
          produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report)
          producer.flush()
          return produce_log
      
    5. 次に、func.yamlの内容を置き換えます。フィールドに適切な情報を入力します。

      schema_version: 20180708
      name: logs_to_kafka
      version: 0.0.1
      runtime: python
      build_image: fnproject/python:3.9-dev
      run_image: fnproject/python:3.9
      entrypoint: /python/bin/fdk /function/func.py handler
      memory: 256
      config:
          bootstrap_server: <kafka_server_public_ip>:9093
          ca_cert_secret_name: ca_cert_secret_name
          client_cert_secret_name: client_cert_secret_name
          security_protocol: SSL
          topic_name: quickstart-events
          vauld_ocid: <vault_ocid>
      
    6. requirements.txtには、次のものが必要です。

      fdk>=0.1.50
      confluent-kafka>=1.4.2
      oci>=2.57.0
      
  4. 「分析およびAI」「サービス・コネクタ・ハブ」に移動して、「サービス・コネクタの作成」を選択します。次の図に示すように、フィールドに入力します。

    「サービス・コネクタの作成」ウィンドウ

タスク4: ログがKafkaサーバーに送信されているかどうかの確認

  1. すべてのSSL証明書およびバイナリを含むKafkaフォルダがサーバーにも使用されるため、Kafkaサーバーのフォルダがあるインスタンスでクライアントを実行します。

  2. 最初に、次の consumer.propertiesファイルがあります。

    security.protocol=SSL
    ssl.truststore.location=<path to kafka.client.truststore.jks>
    ssl.truststore.password=<password>
    ssl.keystore.location=<path to kafka.client.keystore.jks>
    ssl.keystore.password=<password>
    ssl.key.password=<password>
    ssl.endpoint.identification.algorithm=
    
  3. 次に、Kafkaフォルダで次のコマンドを実行します:

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
    

承認

その他の学習リソース

docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。

製品ドキュメントについては、Oracle Help Centerを参照してください。