Note:
- This tutorial requires access to Oracle Cloud. To sign up for a free account, see Get started with Oracle Cloud Infrastructure Free Tier.
- It uses example values for Oracle Cloud Infrastructure credentials, tenancy, and compartments. When completing your lab, substitute these values with ones specific to your cloud environment.
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
Introduction
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestrates data movement between services in OCI. A connector specifies the source service that contains the data to be moved, optional tasks, and the target service for delivery of data.
When you select a logging source to transfer log data from the OCI Logging service using connector, it reads data from the source and write to target service ‘AS IS’. Some SIEM tools may have requirements to redact the elements in source json payload. It could be either encrypted or consuming space, and does not provide value to data transformation.
An optional function task will be useful in connector for this kind of requirements, in log filter to filter log data from the source and then write to target service.
This solution will help SIEM tools to quickly adopt OCI with minimal tweaks regarding rearchitecture, training, or process changes. This automation supports OCI audit, service and custom logs from logging service.
Objectives
- Dynamically redacta encrypted or sensitive elements from OCI log
json payload
.
Prerequisites
Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) administrator should create:
-
Dynamic group and policies for OCI Functions to invoke and enrich audit log.
-
Dynamic Group: Create dynamic group for function compartment. For more information, see Creating a Dynamic Group.
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
Compartment, group and policies for developer to create and deploy the function and connector.
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
Task 1: Create and Deploy OCI Functions for Data Redaction
-
Log in to the OCI Console and click OCI Cloud Shell.
-
Create function using Fn Project Command Line Interface (CLI) from OCI Cloud Shell. For more information, see Creating, Deploying, and Invoking a Helloworld Function.
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
Change directory to the newly created directory.
-
Create application to deploy the function.
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
Copy and paste the following script in the
func.py
file by overwriting the existing content.import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
Verify the following packages in
requirements.txt
file or update.fdk requests oci
-
Update the non-required JSON element in exclude section in
func.yaml
or function configuration parameter post deployment.Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
Run the following command to deploy the function.
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
Task 2: Create and Deploy OCI Connector Hub with Function Task
-
Go to the OCI Console, click Connectors and select Create connector.
-
In the Create connector page, enter a user-friendly Name for the new connector and an optional Description.
-
Select the compartment where you want to store the new connector.
-
In the Configure connector section, for Source, select Logging.
-
In Target, select the service that you want to transfer the log data to:
-
Object Storage: Send log data to a bucket.
-
Streaming: Send log data to a stream.
Note: You could use more supported target. We are describing OCI Streaming or OCI Object Storage as per this tutorial solution.
-
-
In Configure source connection, select the compartment name to root and log group (Audit).
-
(Optional) In the Configure function task section, configure a function task to process data from the source using the OCI Functions service.
-
Select task: Select a function.
-
Compartment: Select the compartment that contains the function.
-
Function application: Select the name of the function application that includes the function.
-
Function: Select the name of the function that you want to use to process the data received from the source.
-
-
If you selected Object Storage as the Target, under Configure Target, configure the bucket to send the log data to.
-
Compartment: Select the compartment that contains the bucket that you want.
-
Bucket: Select the name of the bucket that you want to send the data to.
-
Object Name Prefix: (Optional) enter a prefix value.
-
(Optional) click Show additional options, enter values for batch size (MBs) and batch time (milliseconds).
Or
If you selected Streaming as the Target, under Configure Target, configure the stream to send the log data to.
-
Compartment: Select the compartment that contains the stream that you want.
-
Stream: Select the name of the stream that you want to send the data to.
-
-
Click Create.
Task 3: Validate Redaction Element in OCI Audit Log
In this tutorial, we are using audit log as example but you could use service or custom logs as wellin OCI Connector Hub for data redaction. Verify, if data written to target by navigating to Metric section after you create the connector. For more information about metrics, see Connector Hub Metrics Reference.
-
In the Connectors page, select the connector that contains the metrics that you want to work with.
-
In the Connector Details page, select Metrics.
-
(Optional) Filter metrics by error, latency, source, target, or task.
You can see example of payload before and after enrichment.
-
Before Redaction:
-
After Redaction;
-
Task 4: Consume OCI Audit Logs by Third-Party SIEM
We have deployed this solution on common SIEM tools, for example, Splunk. You could use any third party SIEM which support to consume from kafka connect or OCI Object Storage. For more information, see Install and Administer Splunk Connect for Kafka and Stream OCI logs using OCI Streaming and Kafka Connect to Splunk.
Related Links
Acknowledgments
-
Author - Dipesh Kumar Rathod (Master Principal Cloud Architect, Infrastructure)
-
Contributor – Balaji Meruva (Master Principal Cloud Architect, Infrastructure)
More Learning Resources
Explore other labs on docs.oracle.com/learn or access more free learning content on the Oracle Learning YouTube channel. Additionally, visit education.oracle.com/learning-explorer to become an Oracle Learning Explorer.
For product documentation, visit Oracle Help Center.
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18462-01
November 2024