ノート:
- このチュートリアルでは、Oracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructureの資格証明、テナンシおよびコンパートメントの値の例を使用します。演習を完了するときに、これらの値をクラウド環境に固有の値に置き換えます。
サードパーティSIEMのサーバーレス機能を使用したOracle Cloud Infrastructureログのリダクション
イントロダクション
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub)は、OCIのサービス間のデータ移動を調整します。コネクタは、移動対象のデータを含むソース・サービス、オプションのタスク、およびデータの配信先となるターゲット・サービスを指定します。
コネクタを使用してOCIロギング・サービスからログ・データを転送するロギング・ソースを選択すると、ソースからデータを読み取り、ターゲット・サービス「現状のまま」に書き込みます。一部のSIEMツールには、ソースjsonペイロードの要素をリダクションするための要件がある場合があります。暗号化することも、領域を消費することもでき、データ変換に価値を提供することもできません。
オプションのファンクション・タスクは、ソースからのログ・データをフィルタしてからターゲット・サービスに書き込むログ・フィルタで、この種の要件のコネクタで役立ちます。
このソリューションは、SIEMツールが、再設計、トレーニング、プロセスの変更に関する微調整を最小限に抑えながら、OCIを迅速に導入するのに役立ちます。この自動化は、ロギング・サービスからのOCI監査、サービスおよびカスタム・ログをサポートします。
目的
- OCIログ
json payload
から暗号化された要素または機密要素を動的にリダクタします。
前提条件
Oracle Cloud Infrastructure Identity and Access Management (OCI IAM)管理者は、次を作成する必要があります:
-
監査ログを起動およびエンリッチするOCI関数の動的グループおよびポリシー。
-
動的グループ:ファンクション・コンパートメントの動的グループを作成します。詳細は、「動的グループの作成」を参照してください。
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
開発者がファンクションおよびコネクタを作成およびデプロイするためのコンパートメント、グループおよびポリシー。
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
タスク1: データ・リダクション用のOCIファンクションの作成およびデプロイ
-
OCIコンソールにログインし、「OCI Cloud Shell」をクリックします。
-
OCI Cloud ShellからFn Projectコマンドライン・インタフェース(CLI)を使用してファンクションを作成します。詳細は、Helloworldファンクションの作成、デプロイおよび呼出しを参照してください。
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
ディレクトリを新たに作成されたディレクトリに変更します。
-
ファンクションをデプロイするアプリケーションを作成します。
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
既存のコンテンツを上書きして、
func.py
ファイルに次のスクリプトをコピーして貼り付けます。import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
requirements.txt
ファイルで次のパッケージを確認するか、更新します。fdk requests oci
-
func.yaml
のexcludeセクションまたはデプロイメント後のファンクション構成パラメータの必須でないJSON要素を更新します。Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
次のコマンドを実行して、ファンクションをデプロイします。
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
タスク2: ファンクション・タスクを使用したOCIコネクタ・ハブの作成およびデプロイ
-
OCIコンソールに移動し、「コネクタ」をクリックして「コネクタの作成」を選択します。
-
「コネクタの作成」ページで、新しいコネクタのわかりやすい「名前」およびオプションの「説明」を入力します。
-
新しいコネクタを格納するコンパートメントを選択します。
-
「コネクタの構成」セクションの「ソース」で、「ロギング」を選択します。
-
「ターゲット」で、ログ・データの転送先のサービスを選択します:
-
オブジェクト・ストレージ:ログ・データをバケットに送信します。
-
ストリーミング:ログ・データをストリームに送信します。
ノート:サポートされているターゲットをさらに使用できます。このチュートリアル・ソリューションに従って、OCIストリーミングまたはOCIオブジェクト・ストレージについて説明します。
-
-
「ソース接続の構成」で、ルートおよびログ・グループ(「監査」)のコンパートメント名を選択します。
-
(オプション)「ファンクション・タスクの構成」セクションで、OCIファンクション・サービスを使用してソースからデータを処理するようにファンクション・タスクを構成します。
-
タスクの選択:関数を選択します。
-
コンパートメント:ファンクションを含むコンパートメントを選択します。
-
ファンクション・アプリケーション:ファンクションを含むファンクション・アプリケーションの名前を選択します。
-
ファンクション:ソースから受信したデータの処理に使用するファンクションの名前を選択します。
-
-
「ターゲット」として「オブジェクト・ストレージ」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるバケットを構成します。
-
コンパートメント:必要なバケットを含むコンパートメントを選択します。
-
バケット:データの送信先のバケットの名前を選択します。
-
オブジェクト名接頭辞: (オプション)接頭辞値を入力します。
-
(オプション)「追加オプションの表示」をクリックし、バッチ・サイズ(MB)およびバッチ時間(ミリ秒)の値を入力します。
または
「ターゲット」として「ストリーミング」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるストリームを構成します。
-
コンパートメント:必要なストリームを含むコンパートメントを選択します。
-
ストリーム:データの送信先のストリームの名前を選択します。
-
-
「作成」をクリックします。
タスク3: OCI監査ログのリダクション要素の検証
このチュートリアルでは、監査ログを例として使用していますが、データ・リダクションにOCI Connector Hubだけでなく、サービス・ログまたはカスタム・ログも使用できます。コネクタの作成後に「メトリック」セクションに移動して、データがターゲットに書き込まれたかどうかを確認します。メトリックの詳細は、コネクタ・ハブ・メトリック・リファレンスを参照してください。
-
「コネクタ」ページで、操作するメトリックを含むコネクタを選択します。
-
「コネクタの詳細」ページで、「メトリック」を選択します。
-
(オプション)エラー、レイテンシ、ソース、ターゲットまたはタスクでメトリックをフィルタします。
エンリッチメントの前後のペイロードの例を確認できます。
-
リダクション前:
-
リダクション後
-
タスク4: サードパーティSIEMによるOCI監査ログの使用
このソリューションは、一般的なSIEMツール(Splunkなど)にデプロイされています。kafka接続またはOCI Object Storageからの消費をサポートするサード・パーティSIEMを使用できます。詳細は、Kafka用のSplunk Connectのインストールと管理およびOCIストリーミングおよびKafka Connect to Splunkを使用したストリームOCIログを参照してください。
関連リンク
承認
-
著者 - Dipesh Kumar Rathod (マスター・プリンシパル・クラウド・アーキテクト、インフラストラクチャ)
-
貢献者–Balaji Meruva (インフラストラクチャ、プリンシパル・クラウド・アーキテクト)
その他の学習リソース
docs.oracle.com/learnの他のラボを確認するか、Oracle Learning YouTubeチャネルで無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスしてOracle Learning Explorerになります。
製品ドキュメントは、Oracle Help Centerを参照してください。
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18654-01
November 2024