Note:
- Este tutorial requiere acceso a Oracle Cloud. Para registrarse para obtener una cuenta gratuita, consulte Introducción a la cuenta gratuita de Oracle Cloud Infrastructure.
- Utiliza valores de ejemplo para credenciales, arrendamiento y compartimentos de Oracle Cloud Infrastructure. Al finalizar la práctica, sustituya estos valores por otros específicos de su entorno en la nube.
Redacción de logs de Oracle Cloud Infrastructure mediante la función sin servidor para SIEM de terceros
Introducción
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) organiza el movimiento de datos entre servicios en OCI. Un conector especifica el servicio de origen que contiene los datos que se van a mover, las tareas opcionales y el servicio de destino para la entrega de datos.
Al seleccionar un origen de registro para transferir datos de log del servicio OCI Logging mediante el conector, lee los datos del origen y escribe en el servicio de destino 'TAL CUAL'. Algunas herramientas SIEM pueden tener requisitos para redactar los elementos de la carga útil json de origen. Podría estar cifrado o consumir espacio, y no proporciona valor a la transformación de datos.
Una tarea de función opcional será útil en el conector para este tipo de requisitos, en el filtro de log para filtrar los datos de log del origen y, a continuación, escribir en el servicio de destino.
Esta solución ayudará a las herramientas SIEM a adoptar rápidamente OCI con ajustes mínimos en cuanto a reestructuración, formación o cambios en los procesos. Esta automatización soporta la auditoría, el servicio y los logs personalizados de OCI desde el servicio de registro.
Objetivos
- Redacta dinámicamente elementos cifrados o confidenciales del log de OCI
json payload
.
Requisitos
El administrador de Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) debe crear:
-
Grupo dinámico y políticas para OCI Functions para llamar y enriquecer el log de auditoría.
-
Grupo dinámico: cree un grupo dinámico para el compartimento de funciones. Para obtener más información, consulte Creación de un grupo dinámico.
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
Compartimento, grupo y políticas para que el desarrollador cree y despliegue la función y el conector.
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
Tarea 1: Creación y despliegue de OCI Functions para la ocultación de datos
-
Conéctese a la consola de OCI y haga clic en OCI Cloud Shell.
-
Cree una función mediante la interfaz de línea de comandos (CLI) de Fn Project desde OCI Cloud Shell. Para obtener más información, consulte Creación, despliegue y llamada de una función de mundo auxiliar.
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
Cambie el directorio al directorio creado recientemente.
-
Cree una aplicación para desplegar la función.
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
Copie y pegue el siguiente script en el archivo
func.py
sobrescribiendo el contenido existente.import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
Verifique los siguientes paquetes en el archivo
requirements.txt
o actualícelos.fdk requests oci
-
Actualice el elemento JSON no necesario en la sección exclude de
func.yaml
o el parámetro de configuración de función posterior al despliegue.Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
Ejecute el siguiente comando para desplegar la función.
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
Tarea 2: Crear y desplegar OCI Connector Hub con tarea de función
-
Vaya a la consola de OCI, haga clic en Conectores y seleccione Crear conector.
-
En la página Crear conector, introduzca un nombre fácil de recordar para el nuevo conector y una descripción opcional.
-
Seleccione el compartimento en el que desea almacenar el nuevo conector.
-
En la sección Configurar conector, en Origen, seleccione Registro.
-
En Destino, seleccione el servicio al que desea transferir los datos de log:
-
Object Storage: envía datos de log a un cubo.
-
Streaming: envía datos de log a un flujo.
Nota: Puede utilizar un destino más soportado. Estamos describiendo OCI Streaming u OCI Object Storage según esta solución de tutorial.
-
-
En Configurar conexión de origen, seleccione el nombre de compartimento para la raíz y el grupo de logs (Auditoría).
-
(Opcional) En la sección Configurar tarea de función, configure una tarea de función para procesar datos del origen mediante el servicio OCI Functions.
-
Seleccionar tarea: seleccione una función.
-
Compartimento: seleccione el compartimento que contiene la función.
-
Aplicación de función: seleccione el nombre de la aplicación de función que incluye la función.
-
Función: seleccione el nombre de la función que desea utilizar para procesar los datos recibidos del origen.
-
-
Si ha seleccionado Object Storage como Destino, en Configurar destino, configure el cubo al que enviar los datos de log.
-
Compartimento: seleccione el compartimento que contiene el cubo que desea.
-
Cubo: seleccione el nombre del cubo al que desea enviar los datos.
-
Prefijo de nombre de objeto: (Opcional) introduzca un valor de prefijo.
-
(Opcional) haga clic en Mostrar opciones adicionales, introduzca valores para el tamaño de lote (MB) y el tiempo de lote (milisegundos).
O bien:
Si ha seleccionado Flujo como Destino, en Configurar Destino, configure el flujo al que enviar los datos de log.
-
Compartimento: seleccione el compartimento que contiene el flujo que desea.
-
Flujo: seleccione el nombre del flujo al que desea enviar los datos.
-
-
Haga clic en Crear.
Tarea 3: Validación del elemento de ocultación en el log de auditoría de OCI
En este tutorial, estamos utilizando el log de auditoría como ejemplo, pero puede utilizar logs de servicio o personalizados también en OCI Connector Hub para la ocultación de datos. Verifique si los datos se escriben en el destino. Para ello, vaya a la sección Métrica después de crear el conector. Para obtener más información sobre las métricas, consulte Referencia de métricas de Connector Hub.
-
En la página Conectores, seleccione el conector que contiene las métricas con las que desea trabajar.
-
En la página Detalles de conector, seleccione Métricas.
-
(Opcional) Filtre las métricas por error, latencia, origen, destino o tarea.
Puede ver un ejemplo de carga útil antes y después del enriquecimiento.
-
Antes de la redacción:
-
Después de la redacción;
-
Tarea 4: Consumo de logs de auditoría de OCI por SIEM de terceros
Hemos implementado esta solución en herramientas SIEM comunes, por ejemplo, Splunk. Podría utilizar cualquier SIEM de terceros que soporte el uso de kafka connect u OCI Object Storage. Para obtener más información, consulte Instalación y administración de Splunk Connect for Kafka y Logs de OCI Streaming mediante OCI Streaming y Kafka Connect to Splunk.
Enlaces relacionados
Agradecimientos
-
Autor: Dipesh Kumar Rathod (arquitecto principal de infraestructura en la nube)
-
Colaborador – Balaji Meruva (arquitecto principal maestro en la nube, infraestructura)
Más recursos de aprendizaje
Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de formación gratuita en el canal YouTube de Oracle Learning. Además, visita education.oracle.com/learning-explorer para convertirte en un Oracle Learning Explorer.
Para obtener documentación sobre el producto, visite Oracle Help Center.
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18650-01
November 2024