注意:

使用第三方 SIEM 的无服务器功能编写 Oracle Cloud Infrastructure 日志

简介

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) 在 OCI 中的服务之间编排数据移动。连接器指定包含要移动的数据的源服务、可选任务以及用于传送数据的目标服务。

当您选择日志记录源以使用连接器从 OCI 日志记录服务传输日志数据时,它将从源读取数据并按“原样”写入目标服务。某些 SIEM 工具可能有编写源 json 有效负载中的元素的要求。它可以是加密的,也可以占用空间,不会为数据转换提供价值。

可选函数任务在连接器中对于此类要求很有用,在日志过滤器中,用于过滤来自源的日志数据,然后写入目标服务。

此解决方案将帮助 SIEM 工具在对重新架构、培训或流程更改进行微调后,快速采用 OCI。此自动化功能支持 OCI 日志记录服务中的审计、服务和定制日志。

目标

先决条件

Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 管理员应创建:

任务 1:创建和部署用于数据编写的 OCI 函数

  1. 登录到 OCI 控制台,然后单击 OCI Cloud Shell

  2. 使用 OCI Cloud Shell 中的 Fn Project Command Line Interface (CLI) 创建函数。有关更多信息,请参见 Creating,Deploying,and Invoking a Helloworld Function

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. 将目录更改为新创建的目录。

  4. 创建应用程序以部署函数。

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. 通过覆盖现有内容,在 func.py 文件中复制并粘贴以下脚本。

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. 验证 requirements.txt 文件中的以下软件包或更新。

    fdk
    requests
    oci
    
  7. 在部署后更新 func.yaml 中的 exclude 部分中的非必需 JSON 元素或函数配置参数。

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. 运行以下命令以部署该函数。

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

任务 2:使用函数创建和部署 OCI Connector Hub 任务

  1. 转到 OCI 控制台,单击连接器并选择创建连接器

  2. 创建连接器页中,为新连接器输入用户友好的名称和可选的说明

  3. 选择要存储新连接器的区间

  4. 配置连接器部分的中,选择日志记录

  5. 目标中,选择要将日志数据传输到的服务:

    • 对象存储:将日志数据发送到存储桶。

    • 流处理:将日志数据发送到流。

    注:您可以使用更多受支持的目标。我们将按照本教程解决方案介绍 OCI Streaming 或 OCI Object Storage。

  6. 配置源连接中,选择根和日志组的区间名称(审计)。

  7. 可选)在配置函数任务部分中,配置函数任务以使用 OCI 函数服务处理来自源的数据。

    • 选择任务:选择一个函数。

    • 区间:选择包含该函数的区间。

    • 函数应用程序:选择包含函数的函数应用程序的名称。

    • 函数:选择要用于处理从源接收的数据的函数的名称。

  8. 如果选择对象存储作为目标,则在配置目标下,配置存储桶以将日志数据发送到该存储桶。

    • 区间:选择包含所需存储桶的区间。

    • 时段:选择要将数据发送到的存储桶的名称。

    • 对象名称前缀:可选)输入前缀值。

    • 可选)单击显示附加选项,输入批处理大小 (MB) 和批处理时间(毫秒)的值。

    或者

    如果选择了流处理作为目标,则在配置目标下,配置流以将日志数据发送到该流。

    • 区间:选择包含所需流的区间。

    • 流:选择要将数据发送到的流的名称。

  9. 单击创建

任务 3:验证 OCI 审计日志中的编写元素

在本教程中,我们以审计日志为例,但您可以使用服务或定制日志以及 OCI Connector Hub 进行数据编写。验证是否在创建连接器后通过导航到度量部分写入目标的数据。有关度量的详细信息,请参阅连接器中心度量参考

  1. 连接器页中,选择包含要使用的度量的连接器。

  2. Connector Details(连接器详细信息)页面中,选择 Metrics(度量)

  3. 可选)按错误、延迟、源、目标或任务筛选度量。

    您可以在扩充前后查看有效负载的示例。

    • 编写之前:

      删除前 json 有效负载

    • 编写后;

      删除后清除 json 有效负载

任务 4:按第三方 SIEM 使用 OCI 审计日志

我们在常见的 SIEM 工具(例如 Splunk)上部署了此解决方案。您可以使用支持从 kafka 连接或 OCI 对象存储使用的任何第三方 SIEM。有关更多信息,请参见 Install and Administer Splunk Connect for KafkaStream OCI logs using OCI Streaming and Kafka Connect to Splunk

确认

更多学习资源

浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心