注意:
- 本教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure 免费套餐。
- 它对 Oracle Cloud Infrastructure 身份证明、租户和区间使用示例值。完成实验室后,请使用特定于云环境的那些值替换这些值。
使用第三方 SIEM 的无服务器功能编写 Oracle Cloud Infrastructure 日志
简介
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) 在 OCI 中的服务之间编排数据移动。连接器指定包含要移动的数据的源服务、可选任务以及用于传送数据的目标服务。
当您选择日志记录源以使用连接器从 OCI 日志记录服务传输日志数据时,它将从源读取数据并按“原样”写入目标服务。某些 SIEM 工具可能有编写源 json 有效负载中的元素的要求。它可以是加密的,也可以占用空间,不会为数据转换提供价值。
可选函数任务在连接器中对于此类要求很有用,在日志过滤器中,用于过滤来自源的日志数据,然后写入目标服务。
此解决方案将帮助 SIEM 工具在对重新架构、培训或流程更改进行微调后,快速采用 OCI。此自动化功能支持 OCI 日志记录服务中的审计、服务和定制日志。
目标
- 从 OCI 日志
json payload
中动态重新 acta 加密或敏感元素。
先决条件
Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 管理员应创建:
-
OCI Functions 的动态组和策略用于调用和扩充审计日志。
-
动态组:为函数区间创建动态组。有关详细信息,请参阅创建动态组。
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
开发人员用于创建和部署函数和连接器的区间、组和策略。
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
任务 1:创建和部署用于数据编写的 OCI 函数
-
登录到 OCI 控制台,然后单击 OCI Cloud Shell 。
-
使用 OCI Cloud Shell 中的 Fn Project Command Line Interface (CLI) 创建函数。有关更多信息,请参见 Creating,Deploying,and Invoking a Helloworld Function 。
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
将目录更改为新创建的目录。
-
创建应用程序以部署函数。
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
通过覆盖现有内容,在
func.py
文件中复制并粘贴以下脚本。import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
验证
requirements.txt
文件中的以下软件包或更新。fdk requests oci
-
在部署后更新
func.yaml
中的 exclude 部分中的非必需 JSON 元素或函数配置参数。Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
运行以下命令以部署该函数。
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
任务 2:使用函数创建和部署 OCI Connector Hub 任务
-
转到 OCI 控制台,单击连接器并选择创建连接器。
-
在创建连接器页中,为新连接器输入用户友好的名称和可选的说明。
-
选择要存储新连接器的区间。
-
在配置连接器部分的源中,选择日志记录。
-
在目标中,选择要将日志数据传输到的服务:
-
对象存储:将日志数据发送到存储桶。
-
流处理:将日志数据发送到流。
注:您可以使用更多受支持的目标。我们将按照本教程解决方案介绍 OCI Streaming 或 OCI Object Storage。
-
-
在配置源连接中,选择根和日志组的区间名称(审计)。
-
(可选)在配置函数任务部分中,配置函数任务以使用 OCI 函数服务处理来自源的数据。
-
选择任务:选择一个函数。
-
区间:选择包含该函数的区间。
-
函数应用程序:选择包含函数的函数应用程序的名称。
-
函数:选择要用于处理从源接收的数据的函数的名称。
-
-
如果选择对象存储作为目标,则在配置目标下,配置存储桶以将日志数据发送到该存储桶。
-
区间:选择包含所需存储桶的区间。
-
时段:选择要将数据发送到的存储桶的名称。
-
对象名称前缀:(可选)输入前缀值。
-
(可选)单击显示附加选项,输入批处理大小 (MB) 和批处理时间(毫秒)的值。
或者
如果选择了流处理作为目标,则在配置目标下,配置流以将日志数据发送到该流。
-
区间:选择包含所需流的区间。
-
流:选择要将数据发送到的流的名称。
-
-
单击创建。
任务 3:验证 OCI 审计日志中的编写元素
在本教程中,我们以审计日志为例,但您可以使用服务或定制日志以及 OCI Connector Hub 进行数据编写。验证是否在创建连接器后通过导航到度量部分写入目标的数据。有关度量的详细信息,请参阅连接器中心度量参考。
-
在连接器页中,选择包含要使用的度量的连接器。
-
在 Connector Details(连接器详细信息)页面中,选择 Metrics(度量)。
-
(可选)按错误、延迟、源、目标或任务筛选度量。
您可以在扩充前后查看有效负载的示例。
-
编写之前:
-
编写后;
-
任务 4:按第三方 SIEM 使用 OCI 审计日志
我们在常见的 SIEM 工具(例如 Splunk)上部署了此解决方案。您可以使用支持从 kafka 连接或 OCI 对象存储使用的任何第三方 SIEM。有关更多信息,请参见 Install and Administer Splunk Connect for Kafka 和 Stream OCI logs using OCI Streaming and Kafka Connect to Splunk 。
相关链接
确认
-
作者 — Dipesh Kumar Rathod(基础设施首席云架构师)
-
贡献者 — Balaji Meruva(基础设施首席云架构师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18657-01
November 2024