ノート:
- このチュートリアルではOracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructure資格証明、テナンシおよびコンパートメントの値の例を使用します。演習を完了する場合は、これらの値をクラウド環境に固有の値に置き換えてください。
OCIファンクションを使用して、OCIロギングからmTLS構成済のKafkaにログをプッシュします
イントロダクション
ログはリソースの日記を表し、ネットワーク・システムからすべてのメッセージに関するレコードを取得します。組織のクラウド・インフラストラクチャ内に多くのリソースがある場合は、パフォーマンスとセキュリティを維持する必要があります。ログ収集、組織のエコシステム内の様々なリソースからログ・データを収集し、それらを一元的な分析ポイントにまとめるプロセスが必要です。このチュートリアルでは、OCIの外部にあるmTLSを使用してKafkaサーバー設定にログをプッシュするためにOracle Cloud Infrastructure (OCI)で必要なリソースを設定するプロセスについて説明します。
この構成を使用すると、OCIテナンシから一元的な分析ポイントにリアルタイムのログを取得するソリューションが提供されます。さらに、LogstashをKafkaサーバーにリンクして、要件に従ってデータを変換/拡張し、ElasticSearch索引に配置できます。
目的
- OCIサービス・コネクタ・ハブの作成および使用
- OCIファンクション構成でのOCI Vaultの作成および使用
- KafkaのmTLSを構成します
- OCIロギング・サービスからログを取得し、OCI Service Connector HubとOCI Functionを使用して、mTLSで構成されたKafkaにリダイレクトします
前提条件
- Oracle Cloudアカウント。アカウントがない場合は、Oracle Cloud Free Tierアカウントのサインアップできます。
- ファンクション・アプリケーションを作成するネットワーク設定。ファンクション・アプリケーションが作成されるパブリック・サブネットでVCNがすでに作成されていることを前提としています。
タスク1: Oracle Cloud Infrastructure設定の構成
この項では、OCIに必要なインフラストラクチャの作成に使用する最初のステップについて説明します。次のリソースを作成します:
-
マスター暗号化キーおよびシークレットを使用したVaultの作成
-
「アイデンティティとセキュリティ」メニューに移動し、「Vault」、「Vaultの作成」の順に選択します。
-
ボールトの名前を入力し、「Vaultの作成」をクリックします。
-
ボールトの作成後、作成したボールトをクリックし、AESアルゴリズムを使用してマスター暗号化キーを作成します。このキーは、このブログのボールト・シークレットをさらに暗号化するために使用されます。
-
-
OCIファンクションで使用されるイメージのコンテナ・レジストリの作成
-
「開発者サービス」、「コンテナおよびアーティファクト」に移動して、「コンテナ・レジストリ」を選択します。
-
logs_to_kafkaという名前のプライベート・リポジトリを作成します。
-
-
Functionsアプリケーションの作成
-
Kafkaサーバーにログを収集するには、Kafkaのトピックに公開するKafkaクライアントが必要です。「ファンクション」セクションの「開発者サービス」に移動します。
-
「アプリケーションの作成」を選択し、アプリケーションにlogs_to_kafkaという名前を付けます。
-
このブログの「前提条件」セクションで作成したVCNを選択し、アプリケーションをデプロイするVCN内のパブリック・サブネットを選択します。そのパブリック・サブネットからのVCN外部のトラフィックを許可するために必要なセキュリティ・ルールおよびルート・ルールがあることを確認してください。
-
タスク2: Kafkaサーバーのインストールと構成
-
KafkaサーバーがデプロイされるLinuxインスタンスを作成します。
-
まず、VMインスタンスを作成し、
ssh
アクセスを有効にして、mTLSを使用してKafkaをインストールおよび構成します。 -
ssh
を使用してVMにアクセスし、クイックスタートのステップに従って、そのインスタンスにKafkaをインストールします。
-
-
Kafka ServerのmTLSを構成し、mTLSに使用される証明書を作成します。インスタンスのホーム・ディレクトリに移動し、次のコマンドを実行します。
$ mkdir ~/ssl # This one, ca-cert is the certificate authority you will use in the function $ openssl req -new -x509 -keyout ssl/ca-key -out ssl/ca-cert -days 365 -nodes $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA -storepass <password> -keypass <password> $ keytool -keystore ssl/kafka.server.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.truststore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -certreq -file ssl/server-cert-file $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass<password> -certreq -file ssl/client-cert-file $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/server-cert-file -out ssl/server-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ openssl x509 -req -CA ssl/ca-cert -CAkey ssl/ca-key -in ssl/client-cert-file -out ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:<password> $ keytool -keystore ssl/kafka.server.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.server.keystore.jks -alias server -storepass <password> -importcert -file ssl/server-cert-signed -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias CARoot -storepass <password> -importcert -file ssl/ca-cert -noprompt $ keytool -keystore ssl/kafka.client.keystore.jks -alias client -storepass <password> -importcert -file ssl/client-cert-signed -noprompt # This one, other_client_cert.pem is the client certificate you use in the function $ keytool -exportcert -alias client -keystore kafka.client.keystore.jks -rfc -file other_client_cert.pem
-
Kafkaサーバーのconfig/フォルダからserver.propertiesを編集します。
-
SSLキーおよび証明書が作成されたら、それらをKafka構成に追加する必要があります。Kafkaフォルダの下のconfig/server.propertiesを編集し、次の行を追加します:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093 advertised.listeners=PLAINTEXT://<hostname>:9092,SSL://<public ip of hostname:9093 ssl.keystore.location=<path to ssl/kafka.server.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.truststore.location=<path to ssl/kafka.server.truststore.jks> ssl.truststore.password=<password>
-
Kafkaサーバー・ポートのインスタンスのファイアウォールにポート許容量を追加します。セキュリティ上の観点から、まず、VM内のファイアウォールがKafkaポートへのアクセスを許可するように構成されている必要があります。
sudo firewall-cmd --add-port=9093/tcp sudo firewall-cmd --add-port=9092/tcp
-
そのインスタンスへのトラフィックを許可するようにネットワークが構成されていることを確認します。VM内のファイアウォールが適切な構成であることを確認したら、セキュリティ・ルールとルート・ルール、またはKafka固有のポート上のそのインスタンスへのトラフィックを許可するためにトラフィックをフィルタするその他のタイプのファイアウォールを構成する必要があります。
-
次のコマンドを実行して、Kafkaサーバーをバックグラウンドで実行します。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & nohup bin/kafka-server-start.sh config/server.properties &
タスク3: OCIファンクションおよびサービス・コネクタ・ハブの設定
ファンクションおよびアプリケーションを作成、デプロイおよび管理するポリシーがあることを確認します。また、サービスFAASによるボールト、シークレットおよびキーへのアクセスも許可する必要があります。
-
以前に作成したOCI Vaultにca-certおよびclient_cert.pemシークレットを作成します。
協奏曲
client_cert.pem
-
OCIコンソールで、「関数」→「構成」に移動し、次の変数を追加します:
-
OCIファンクションの設定
-
「ファンクション」セクションの「開発者サービス」に移動します。
-
すでに作成されたアプリケーションlogs_to_kafkaに移動し、「開始」を選択してから、「クラウド・シェル設定」を選択し、dockerログイン・コマンドまでステップを実行します。
-
logs_to_kafka
ボイラープレート関数を生成します。fn init --runtime python logs_to_kafka
fn initコマンドは、logs_to_kafkaというフォルダを生成し、内部に3つのファイル(func.py、func.yamlおよびrequirements.txt)を含めます。
-
func.pyを開き、ファイルの内容を次のコードに置き換えます。
import io import json import logging import oci import base64 from confluent_kafka import Producer, KafkaError def handler(ctx, data: io.BytesIO = None): cfg = ctx.Config() signer = oci.auth.signers.get_resource_principals_signer() try: topic_name = str(cfg["topic_name"]) bootstrap_server = str(cfg["bootstrap_server"]) security_protocol = str(cfg["security_protocol"]) secret_name = str(cfg["ca_cert_secret_name"]) client_cert_secret = str(cfg["client_cert_secret_name"]) vauld_ocid = str(cfg["vauld_ocid"]) except: logging.error('Some of the function config keys are not set') raise try: body = json.loads(data.getvalue()) except (Exception, ValueError) as ex: logging.getLogger().info('error parsing json payload: ' + str(ex)) try: client_certificate = decodeSecret( p_signer=signer, p_secretName=client_cert_secret, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the client certificate from vault: ' + str(ex)) try: decoded_secret = decodeSecret( p_signer=signer, p_secretName=secret_name, p_vaultOCID=vauld_ocid) except (Exception, ValueError) as ex: logging.getLogger().info('error retrieving the secret: ' + str(ex)) try: sent_logs = publish_message(topic=topic_name, bootstrap_srv=bootstrap_server, security_protocol=security_protocol, ca_pem=decoded_secret, client_pem=client_certificate, record_value=bytes(str(body[0]), encoding='utf-8')) logging.info(f'log is sent {sent_logs}') except (Exception, ValueError) as ex: logging.getLogger().info('error in publishing the message: ' + str(ex)) def decodeSecret(p_signer, p_secretName, p_vaultOCID): secretClient = oci.secrets.SecretsClient(config={}, signer=p_signer) secret = secretClient.get_secret_bundle_by_name( secret_name=p_secretName, vault_id=p_vaultOCID).data secret_content = secret.secret_bundle_content.content.encode("utf-8") decodedSecret = base64.b64decode(secret_content).decode("utf-8") return decodedSecret def delivery_report(errmsg, msg): if errmsg is not None: print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) return print('Message successfully produced to Topic:{} at offset {}'.format( msg.topic(), msg.offset())) def publish_message(topic, bootstrap_srv, security_protocol, ca_pem, client_pem, record_value): conf = { 'bootstrap.servers': bootstrap_srv, 'security.protocol': security_protocol, 'ssl.certificate.pem': client_pem, 'ssl.ca.pem': ca_pem } producer = Producer(conf) produce_log = producer.produce(topic, key=None, value=record_value, on_delivery=delivery_report) producer.flush() return produce_log
-
次に、func.yamlの内容を置き換えます。フィールドに適切な情報を入力します。
schema_version: 20180708 name: logs_to_kafka version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: bootstrap_server: <kafka_server_public_ip>:9093 ca_cert_secret_name: ca_cert_secret_name client_cert_secret_name: client_cert_secret_name security_protocol: SSL topic_name: quickstart-events vauld_ocid: <vault_ocid>
-
requirements.txtには、次のものが必要です。
fdk>=0.1.50 confluent-kafka>=1.4.2 oci>=2.57.0
-
-
「分析およびAI」、「サービス・コネクタ・ハブ」に移動して、「サービス・コネクタの作成」を選択します。次の図に示すように、フィールドに入力します。
タスク4: ログがKafkaサーバーに送信されているかどうかの確認
-
すべてのSSL証明書およびバイナリを含むKafkaフォルダがサーバーにも使用されるため、Kafkaサーバーのフォルダがあるインスタンスでクライアントを実行します。
-
最初に、次の consumer.propertiesファイルがあります。
security.protocol=SSL ssl.truststore.location=<path to kafka.client.truststore.jks> ssl.truststore.password=<password> ssl.keystore.location=<path to kafka.client.keystore.jks> ssl.keystore.password=<password> ssl.key.password=<password> ssl.endpoint.identification.algorithm=
-
次に、Kafkaフォルダで次のコマンドを実行します:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <instance_public_ip>:9093 --consumer.config <path_to_consumer.properties>
関連リンク
承認
- 作成者 - Cristian Vlad (プリンシパル・クラウド・アーキテクト)
- コントリビュータ - Gabriel Feodorov (クラウド・エンジニア)
その他の学習リソース
docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。
製品ドキュメントについては、Oracle Help Centerを参照してください。
Push logs from OCI Logging into mTLS configured Kafka using OCI Functions
F79041-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.