ノート:
- このチュートリアルでは、Oracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructureの資格証明、テナンシおよびコンパートメントの値の例を使用します。演習を完了するときに、これらの値をクラウド環境に固有の値に置き換えます。
Oracle Cloud Infrastructure Functionsを使用したOracle Cloud Infrastructureプライベート・ストリームへのログの書込み
イントロダクション
ログは最新のクラウド運用の基礎であり、システム・アクティビティ、パフォーマンス、セキュリティに関する重要なインサイトを提供します。機密データを処理する企業にとって、ログを安全かつ効率的に管理することが最も重要です。このチュートリアルでは、Oracle Cloud Infrastructure (OCI)を使用したログ管理のための、セキュアでスケーラブルなイベントドリブンなアーキテクチャを構築する方法について説明します。
このソリューションの中核はOCI Functionsです。これにより、パブリック・エンドポイントにさらされることなく、ログを安全に転送および処理できます。アーキテクチャは、OCI Connector Hubを介してOCI Object Storageに収集および圧縮されたログからのイベントによって駆動されます。ログ・ファイルが作成されるたびに、OCI Functionsがトリガーされ、OCI Functionsは圧縮解除され、管理可能なチャンクに分割され、セキュアなストレージまたはさらなる処理のためにログをプライベート・ストリームに公開します。
このイベントドリブンのアプローチにより、コンポーネントが変化に動的に反応し、レイテンシを最小限に抑え、効率を最大化する、シームレスで自動化されたパイプラインが保証されます。OCIのネイティブ・サービスを活用することで、大量のログ処理をリアルタイムで処理するためのセキュアでスケーラブルなフレームワークを実現できます。
目標
- 機密ログ・データの保護とクラウド環境での運用の最適化を目指す企業向けに設計された、回復性とセキュアなログ管理ワークフローを実装します。
前提条件
-
OCIテナンシへのアクセス。
-
OCI Object Storage、OCIログ、OCI Connector Hub、OCI Event Serviceルール、Oracle ApplicationsおよびOCI Streamingサービスを管理する権限。
タスク1: 必要なポリシーおよびOracle Cloud Infrastructure Identity and Access Management (OCI IAM)権限の設定
このソリューションの各コンポーネントは、相互作用するOCIリソースにアクセスできる必要があります。このチュートリアルに従うには、次の権限が必要です。
-
ユーザー・ポリシー: OCI Object Storage、OCI Connector Hub、OCI Event Serviceルール、OCIログ、OCI FunctionsおよびOCIストリームを管理します。OCI Vault (オプション)およびネットワーク・ファミリで使用します。
-
サービス・ポリシー: OCIオブジェクト・ストレージ・バケット(読取りオブジェクト)からメッセージを読み取り、ストリームに書き込む(ストリーム・プッシュを使用)ためのファンクション権限を付与します。動的グループが必要です。
詳細なポリシーはここにあります。
タスク2: プライベート・ストリームの作成
OCI Streamingは、保存中および転送中にデータが暗号化されるフルマネージドOCIサービスであり、メッセージの整合性とセキュリティを確保します。セキュリティを強化するために、OCI Vaultサービスを使用して、特定のコンプライアンスまたはセキュリティ要件を満たす独自の暗号化キーを格納および管理できます。プライベート・エンドポイントは、Virtual Cloud Network (VCN)内で構成して、ストリームをさらに保護し、プライベートIPアドレスをストリーム・プールに関連付けることができます。これにより、OCIストリーミング・トラフィックがVCN内にとどまり、インターネット全体が回避されます。ただし、プライベート・エンドポイントを使用するストリームにはインターネットからアクセスできず、コンソールを介して最新のメッセージを表示する機能が制限されることに注意してください。プライベート・ストリームからのメッセージを消費するには、コンシューマには、プライベート・ストリームがホストされているネットワークへのルートとアクセスの両方が必要です。
ストリームおよびストリーム・プールを作成します。「ストリーム名」と入力し、「新規ストリーム・プールの作成」を選択してストリーム・プールを作成します。「ストリーム・プールの構成」セクションで、「ストリーム・プール名」を入力し、「プライベート・エンドポイント」を選択して、それに応じてVCNおよび「サブネット」およびネットワークの詳細を入力します。オプションですが、ネットワーク・セキュリティ・グループに、そのNSG内のすべてのトラフィックに対するイングレス・ルールを提供することをお薦めします。詳細は、ストリームの作成およびストリーム・プールの作成を参照してください。
独自の暗号化キーを使用することで、キーのライフサイクルをより詳細に制御できます。ストリーム内のメッセージの保存を調整するオプションがあります。デフォルトは1日で、最大は7日です。
ストリームOCIDおよびメッセージ・エンドポイントを書き留めます。この情報を関数に渡す必要があります。
タスク3: OCI Connector Hubの作成および構成
OCI Connector Hubはセキュアなメッセージ・バスとして機能し、ソースと宛先間のシームレスで信頼性の高いデータ転送を促進します。このアーキテクチャでは、ソースはOCIロギングで、宛先はOCI Object Storageです。このOCI Object Storageでは、これらのログは圧縮され、さらに処理するために格納されます。OCI Connector Hubは、仲介役として機能することで、転送されたメッセージのセキュリティと整合性を維持しながら、効率的なデータ・フローを保証します。
このチュートリアルでは、フロー・ログがサブネットで有効になり、OCIオブジェクト・ストレージ・バケットが使用可能であることを前提としています。フロー・ログの有効化およびバケットの作成の詳細は、フロー・ログの有効化およびオブジェクト・ストレージ・バケットの作成を参照してください。
バケットの作成時に、「オブジェクト・イベントの出力」を選択する必要があります。これは、イベント主導型のアーキテクチャの鍵です。
OCIコネクタ・ハブを構成し、OCIロギング・サービスとOCIオブジェクト・ストレージ・バケットの間にデータ・フローを作成します。詳細は、ロギング・ソースを使用したコネクタの作成を参照してください。
バッチ・ロールオーバー詳細を調整することで、ログがOCIオブジェクト・ストレージ・バケットに書き込まれる頻度を構成できます。デフォルトは100MBまたは7分です。
タスク4: ファンクションの開発およびデプロイ
この関数は、OCIオブジェクト・ストレージからオブジェクトを読み取り、メッセージをストリームに書き込みます。そのためには、次の操作を次の間に実行します。
- オブジェクトをバケットから読み取ります。
- オブジェクトを圧縮解除します。
- オブジェクト・サイズを確認し、必要に応じて1MBのチャンクを作成します。OCIストリーミング・サービスは、プロデューサがストリームに公開できる一意のメッセージの最大サイズを1MBに制限します。
- メッセージをエンコードします。
- ストリームに公開します。
詳細は、ファンクションの作成を参照してください。
-
func.py
:import io import json import logging import oci import gzip from base64 import b64encode def handler(ctx, data: io.BytesIO = None): try: # Parse the incoming data cfg = ctx.Config() body = json.loads(data.getvalue()) bucket_name = body["data"]["additionalDetails"]["bucketName"] object_name = body["data"]["resourceName"] stream_ocid = cfg["stream_ocid"] stream_endpoint = cfg["stream_endpoint"] logging.getLogger().info(f'Function invoked for bucket upload: {bucket_name}') except (Exception, ValueError) as ex: logging.getLogger().error(f'Error parsing JSON payload: {str(ex)}') return {"status": 400, "message": "Bad Request"} try: # Get the object data from Object Storage object_content = get_object(bucket_name, object_name) # Check if the object is a .gz file and decompress it if object_name.endswith('.gz'): logging.getLogger().info(f'Decompressing object: {object_name}') object_content = gzip.decompress(object_content) logging.getLogger().info(f'Object Content: {object_content.decode("utf-8")[:100]}...') # Split content into message chunks ensuring no message is split messages = split_content_into_messages(object_content, max_size=1024*1024) # Publish messages to the stream for message in messages: publish_to_stream(stream_ocid, stream_endpoint, data.getvalue().decode('utf-8'), message) return {"status": 200, "message": "Successfully processed object update"} except Exception as ex: logging.getLogger().error(f'Error processing object: {str(ex)}') return {"status": 500, "message": "Internal Server Error"} def get_object(bucket_name, object_name): signer = oci.auth.signers.get_resource_principals_signer() object_storage_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer) namespace = object_storage_client.get_namespace().data try: logging.getLogger().info(f'Searching for bucket: {bucket_name}, object: {object_name}') obj = object_storage_client.get_object(namespace, bucket_name, object_name) logging.getLogger().info(f'Found object: {object_name}') return obj.data.content except Exception as ex: logging.getLogger().error(f'Failed to retrieve object: {str(ex)}') raise def publish_to_stream(stream_ocid, stream_endpoint, event_data, object_content): signer = oci.auth.signers.get_resource_principals_signer() stream_client = oci.streaming.StreamClient(config={}, signer=signer, service_endpoint=stream_endpoint) # Build the message list message_list = [ oci.streaming.models.PutMessagesDetailsEntry( key=b64encode("partition-key-1".encode()).decode(), value=b64encode(object_content).decode() ), ] try: logging.getLogger().info(f"Publishing {len(message_list)} messages to stream {stream_ocid}") put_message_details = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = stream_client.put_messages(stream_ocid, put_message_details) # Log publishing results for entry in put_message_result.data.entries: if entry.error: logging.getLogger().error(f"Error publishing message: {entry.error_message}") else: logging.getLogger().info(f"Published message to partition {entry.partition}, offset {entry.offset}") except Exception as ex: logging.getLogger().error(f"Failed to publish messages to stream: {str(ex)}") raise def split_content_into_messages(content, max_size): """ Splits content into messages of specified max size while ensuring no message is split. Attempts to split based on newline characters for text content. """ messages = [] current_chunk = [] current_size = 0 for line in content.decode('utf-8').splitlines(keepends=True): line_size = len(line.encode('utf-8')) if current_size + line_size > max_size: messages.append(''.join(current_chunk).encode('utf-8')) current_chunk = [line] current_size = line_size else: current_chunk.append(line) current_size += line_size if current_chunk: messages.append(''.join(current_chunk).encode('utf-8')) return messages
-
func.yaml
:schema_version: 20180708 name: logs-to-pvt-stream version: 0.0.1 runtime: python build_image: fnproject/python:3.9-dev run_image: fnproject/python:3.9 entrypoint: /python/bin/fdk /function/func.py handler memory: 256 config: stream_ocid: ocid1.stream.123445 stream_endpoint: https://xyz.us-ashburn-1.oci.oraclecloud.com
-
requirements.txt
:fdk oci
最後のステップは、プライベート・ストリームがどこにあるかを関数に伝えることです。このファンクションは、構成パラメータを使用して、別のテナンシにデプロイする場合は再利用可能にします。
タスク5: イベントの作成および関数のサブスクライブ
このタスクでは、関数をオブジェクト・アップロード・イベントにサブスクライブします。「イベント・タイプ」のルールを、バケット名を条件属性として「オブジェクト- 作成」として作成します。詳細は、イベント・ルールの作成を参照してください。
検証
データ・フローを検証できる場所は複数あります。
-
ログ・グループ・メトリックを検証して、フロー・ログが収集されているかどうかを確認します。
-
次のホップは、コネクタ・ハブ・メトリックです。OCI Connector Hubはログを収集し、OCI Object Storageに送信します。ソースおよびターゲットにエラーがないことを確認します。
-
次のホップはOCI Object Storageです。オブジェクト数が増加していることを確認します。必要に応じて、読取りおよび書込みログを有効にしてさらにデバッグします。
-
次のホップはOCI Events Serviceです。メトリックをレビューして、配信の失敗がないことを確認します。
-
次のステップでは、ファンクションの起動メトリックを確認します。エラーがなく、関数がスロットルされていないことを確認します。
-
データがプライベート・ストリームに取り込まれていることを確認します。
次のいずれかのチャートにデータがない場合は、そこで停止し、そのサービスのログを有効にします。ログは、特定のリソースがタスクの実行に失敗する理由を説明します。
次のステップ
OCIでセキュアでイベントドリブンのログ管理ソリューションを正常に実装できました。OCI Logging、OCI Connector Hub、OCI Object Storage、OCIプライベート・ストリームの機能を組み合わせることで、ログをほぼリアルタイムで安全に収集、処理、公開できる堅牢なアーキテクチャを構築できました。
このソリューションは、プライベート・ストリームを通じて機密ログ・データを保護し、イベント主導型の自動化の効率性を実証します。システムのスケーリングに伴い、このアーキテクチャはシームレスに適応するため、手作業を最小限に抑えながら大量のログを処理できます。
このフレームワークを導入することで、プライバシー要件へのコンプライアンスを維持しながら、安全で効率的なログ処理を実現できます。このアーキテクチャは、企業のニーズに合せたカスタム処理パイプラインを構築するための柔軟性を提供します。追加の分析またはアラート・メカニズムを使用してこの設定を拡張すると、システム・イベントに関するより深いインサイトが得られ、異常をプロアクティブに検出して対応できるようになります。
OCI FunctionsおよびOCIプライベート・ストリーム機能の使用の詳細は、Oracle担当者に連絡するか、クラウド・セキュリティ・ソリューションを参照してください。
承認
- 作成者 - Aneel Kanuri (Distinguished Cloud Architect)
その他の学習リソース
docs.oracle.com/learnの他のラボを確認するか、Oracle Learning YouTubeチャネルで無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスしてOracle Learning Explorerになります。
製品ドキュメントは、Oracle Help Centerを参照してください。
Write Logs to Oracle Cloud Infrastructure Private Stream using Oracle Cloud Infrastructure Functions
G23253-01
December 2024