附註:
- 此教學課程需要存取 Oracle Cloud。若要註冊免費帳戶,請參閱 Oracle Cloud Infrastructure Free Tier 入門。
- 它使用 Oracle Cloud Infrastructure 證明資料、租用戶及區間的範例值。完成實驗室時,請將這些值取代為您雲端環境特定的值。
使用適用於第三方 SIEM 的無伺服器功能隱匿 Oracle Cloud Infrastructure 日誌
簡介
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) 協調 OCI 中服務之間的資料移動。連線器指定來源服務,其中包含要移動的資料、選擇性工作,以及要傳遞資料的目標服務。
當您選取使用連線器從 OCI 日誌記錄服務傳輸日誌資料的日誌來源時,會從來源讀取資料,並且依原狀寫入目標服務。某些 SIEM 工具可能需要隱匿來源 json 有效負載中的元素。它可以是加密或佔用的空間,而且不提供資料轉換的值。
選擇性函數作業在連線器中對於此類需求很有用,在日誌篩選中篩選日誌資料,然後寫入目標服務。
此解決方案可協助 SIEM 工具在重新架構、訓練或流程變更方面,以最少的角度快速採用 OCI。此自動化支援日誌記錄服務的 OCI 稽核、服務以及自訂日誌。
目標
- 從 OCI 日誌
json payload
動態隱匿加密或機密元素。
必要條件
Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 管理員應建立:
-
OCI 函數的動態群組和原則,以呼叫並強化稽核日誌。
-
動態群組: 建立函數區間的動態群組。如需詳細資訊,請參閱建立動態群組。
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
開發人員建立及部署函數和連線器的區間、群組以及原則。
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
作業 1:建立及部署資料隱匿的 OCI 函數
-
登入 OCI 主控台,然後按一下 OCI Cloud Shell 。
-
使用 OCI Cloud Shell 的 Fn 專案命令行介面 (CLI) 建立函數。如需詳細資訊,請參閱建立、部署及呼叫 Helloworld 函數。
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
將目錄變更為新建立的目錄。
-
建立應用程式以部署函數。
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
藉由覆寫現有內容,複製並貼上
func.py
檔案中的下列命令檔。import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
驗證
requirements.txt
檔案中的下列套裝軟體或更新。fdk requests oci
-
更新
func.yaml
中排除區段或函數組態參數後續部署中的非必要 JSON 元素。Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
執行下列命令以部署函數。
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
作業 2:建立並部署 OCI 連線器中心與功能作業
-
移至 OCI 主控台,按一下連線器並選取建立連線器。
-
在建立連線器頁面中,輸入新連線器的易記名稱和選擇性的描述。
-
選取要在其中儲存新連線器的區間。
-
在設定連線器區段的來源中,選取記錄日誌。
-
在目標中,選取您要將日誌資料傳輸至下列位置的服務:
-
物件儲存:將日誌資料傳送至儲存桶。
-
串流:將日誌資料傳送至串流。
注意:您可以使用更多支援的目標。我們根據此教學課程解決方案描述 OCI Streaming 或 OCI Object Storage。
-
-
在設定來源連線中,選取根和日誌群組的區間名稱 ( 稽核 )。
-
( 選擇性 ) 在設定函數任務區段中,設定函數任務以使用 OCI 函數服務從來源處理資料。
-
選取任務:選取函數。
-
區間: 選取包含此函數的區間。
-
函數應用程式:選取包含函數的函數應用程式名稱。
-
函數:選取您要用來處理從來源接收之資料的函數名稱。
-
-
如果您選取物件儲存作為目標,請在設定目標底下設定要傳送日誌資料的儲存桶。
-
區間: 選取包含所要儲存桶的區間。
-
儲存桶:選取您要傳送資料的儲存桶名稱。
-
物件名稱首碼: ( 選擇性 ) 輸入首碼值。
-
( 選擇性 ) 按一下顯示其他選項,輸入批次大小 (MB) 和批次時間 (毫秒) 的值。
或
如果您選取串流作為目標,請在設定目標底下設定要傳送日誌資料的串流。
-
區間: 選取包含所要串流的區間。
-
串流:選取您要將資料傳送到其中的串流名稱。
-
-
按一下建立。
作業 3:驗證 OCI 稽核日誌中的隱匿元素
在本教學課程中,我們將使用稽核日誌作為範例,但您也可以在 OCI Connector Hub 中使用服務或自訂日誌進行資料隱匿。建立連線器之後,瀏覽至度量區段來驗證資料是否寫入目標。如需詳細資訊,請參閱連線器中心度量參照。
-
在連線器頁面中,選取包含您要使用之測量結果的連線器。
-
在連線器詳細資訊頁面中,選取測量結果。
-
( 選擇性 ) 依錯誤、延遲、來源、目標或工作篩選測量結果。
您可以查看擴充前後的有效負載範例。
-
隱匿之前:
-
隱匿後;
-
作業 4:依第三方 SIEM 使用 OCI 稽核日誌
我們已將此解決方案部署在常用的 SIEM 工具上,例如 Splunk。您可以使用任何支援從 kafka 連線或 OCI 物件儲存體使用的第三方 SIEM。如需詳細資訊,請參閱 Install and Administer Splunk Connect for Kafka 和 Stream OCI 日誌使用 OCI Streaming and Kafka Connect to Splunk 。
相關連結
認可
-
作者 - Dipesh Kumar Rathod (基礎架構首席雲端架構師)
-
貢獻者 – Balaji Meruva (基礎架構首席雲端架構師)
其他學習資源
探索 docs.oracle.com/learn 上的其他實驗室,或存取 Oracle Learning YouTube 頻道上的更多免費學習內容。此外,請造訪 education.oracle.com/learning-explorer 以成為 Oracle Learning Explorer。
如需產品文件,請造訪 Oracle Help Center 。
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18658-01
November 2024