附註:

使用適用於第三方 SIEM 的無伺服器功能隱匿 Oracle Cloud Infrastructure 日誌

簡介

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) 協調 OCI 中服務之間的資料移動。連線器指定來源服務,其中包含要移動的資料、選擇性工作,以及要傳遞資料的目標服務。

當您選取使用連線器從 OCI 日誌記錄服務傳輸日誌資料的日誌來源時,會從來源讀取資料,並且依原狀寫入目標服務。某些 SIEM 工具可能需要隱匿來源 json 有效負載中的元素。它可以是加密或佔用的空間,而且不提供資料轉換的值。

選擇性函數作業在連線器中對於此類需求很有用,在日誌篩選中篩選日誌資料,然後寫入目標服務。

此解決方案可協助 SIEM 工具在重新架構、訓練或流程變更方面,以最少的角度快速採用 OCI。此自動化支援日誌記錄服務的 OCI 稽核、服務以及自訂日誌。

目標

必要條件

Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 管理員應建立:

作業 1:建立及部署資料隱匿的 OCI 函數

  1. 登入 OCI 主控台,然後按一下 OCI Cloud Shell

  2. 使用 OCI Cloud Shell 的 Fn 專案命令行介面 (CLI) 建立函數。如需詳細資訊,請參閱建立、部署及呼叫 Helloworld 函數

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. 將目錄變更為新建立的目錄。

  4. 建立應用程式以部署函數。

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. 藉由覆寫現有內容,複製並貼上 func.py 檔案中的下列命令檔。

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. 驗證 requirements.txt 檔案中的下列套裝軟體或更新。

    fdk
    requests
    oci
    
  7. 更新 func.yaml排除區段或函數組態參數後續部署中的非必要 JSON 元素。

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. 執行下列命令以部署函數。

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

作業 2:建立並部署 OCI 連線器中心與功能作業

  1. 移至 OCI 主控台,按一下連線器並選取建立連線器

  2. 建立連線器頁面中,輸入新連線器的易記名稱和選擇性的描述

  3. 選取要在其中儲存新連線器的區間

  4. 設定連線器區段的來源中,選取記錄日誌

  5. 目標中,選取您要將日誌資料傳輸至下列位置的服務:

    • 物件儲存:將日誌資料傳送至儲存桶。

    • 串流:將日誌資料傳送至串流。

    注意:您可以使用更多支援的目標。我們根據此教學課程解決方案描述 OCI Streaming 或 OCI Object Storage。

  6. 設定來源連線中,選取根和日誌群組的區間名稱 ( 稽核 )。

  7. ( 選擇性 ) 在設定函數任務區段中,設定函數任務以使用 OCI 函數服務從來源處理資料。

    • 選取任務:選取函數。

    • 區間: 選取包含此函數的區間。

    • 函數應用程式:選取包含函數的函數應用程式名稱。

    • 函數:選取您要用來處理從來源接收之資料的函數名稱。

  8. 如果您選取物件儲存作為目標,請在設定目標底下設定要傳送日誌資料的儲存桶。

    • 區間: 選取包含所要儲存桶的區間。

    • 儲存桶:選取您要傳送資料的儲存桶名稱。

    • 物件名稱首碼: ( 選擇性 ) 輸入首碼值。

    • ( 選擇性 ) 按一下顯示其他選項,輸入批次大小 (MB) 和批次時間 (毫秒) 的值。

    如果您選取串流作為目標,請在設定目標底下設定要傳送日誌資料的串流。

    • 區間: 選取包含所要串流的區間。

    • 串流:選取您要將資料傳送到其中的串流名稱。

  9. 按一下建立

作業 3:驗證 OCI 稽核日誌中的隱匿元素

在本教學課程中,我們將使用稽核日誌作為範例,但您也可以在 OCI Connector Hub 中使用服務或自訂日誌進行資料隱匿。建立連線器之後,瀏覽至度量區段來驗證資料是否寫入目標。如需詳細資訊,請參閱連線器中心度量參照

  1. 連線器頁面中,選取包含您要使用之測量結果的連線器。

  2. 連線器詳細資訊頁面中,選取測量結果

  3. ( 選擇性 ) 依錯誤、延遲、來源、目標或工作篩選測量結果。

    您可以查看擴充前後的有效負載範例。

    • 隱匿之前:

      移除前的 json 有效負載

    • 隱匿後;

      移除後清除 json 有效負載

作業 4:依第三方 SIEM 使用 OCI 稽核日誌

我們已將此解決方案部署在常用的 SIEM 工具上,例如 Splunk。您可以使用任何支援從 kafka 連線或 OCI 物件儲存體使用的第三方 SIEM。如需詳細資訊,請參閱 Install and Administer Splunk Connect for KafkaStream OCI 日誌使用 OCI Streaming and Kafka Connect to Splunk

認可

其他學習資源

探索 docs.oracle.com/learn 上的其他實驗室,或存取 Oracle Learning YouTube 頻道上的更多免費學習內容。此外,請造訪 education.oracle.com/learning-explorer 以成為 Oracle Learning Explorer。

如需產品文件,請造訪 Oracle Help Center