Note:

Redacción de logs de Oracle Cloud Infrastructure mediante la función sin servidor para SIEM de terceros

Introducción

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) organiza el movimiento de datos entre servicios en OCI. Un conector especifica el servicio de origen que contiene los datos que se van a mover, las tareas opcionales y el servicio de destino para la entrega de datos.

Al seleccionar un origen de registro para transferir datos de log del servicio OCI Logging mediante el conector, lee los datos del origen y escribe en el servicio de destino 'TAL CUAL'. Algunas herramientas SIEM pueden tener requisitos para redactar los elementos de la carga útil json de origen. Podría estar cifrado o consumir espacio, y no proporciona valor a la transformación de datos.

Una tarea de función opcional será útil en el conector para este tipo de requisitos, en el filtro de log para filtrar los datos de log del origen y, a continuación, escribir en el servicio de destino.

Esta solución ayudará a las herramientas SIEM a adoptar rápidamente OCI con ajustes mínimos en cuanto a reestructuración, formación o cambios en los procesos. Esta automatización soporta la auditoría, el servicio y los logs personalizados de OCI desde el servicio de registro.

Objetivos

Requisitos

El administrador de Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) debe crear:

Tarea 1: Creación y despliegue de OCI Functions para la ocultación de datos

  1. Conéctese a la consola de OCI y haga clic en OCI Cloud Shell.

  2. Cree una función mediante la interfaz de línea de comandos (CLI) de Fn Project desde OCI Cloud Shell. Para obtener más información, consulte Creación, despliegue y llamada de una función de mundo auxiliar.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Cambie el directorio al directorio creado recientemente.

  4. Cree una aplicación para desplegar la función.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Copie y pegue el siguiente script en el archivo func.py sobrescribiendo el contenido existente.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Verifique los siguientes paquetes en el archivo requirements.txt o actualícelos.

    fdk
    requests
    oci
    
  7. Actualice el elemento JSON no necesario en la sección exclude de func.yaml o el parámetro de configuración de función posterior al despliegue.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Ejecute el siguiente comando para desplegar la función.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Tarea 2: Crear y desplegar OCI Connector Hub con tarea de función

  1. Vaya a la consola de OCI, haga clic en Conectores y seleccione Crear conector.

  2. En la página Crear conector, introduzca un nombre fácil de recordar para el nuevo conector y una descripción opcional.

  3. Seleccione el compartimento en el que desea almacenar el nuevo conector.

  4. En la sección Configurar conector, en Origen, seleccione Registro.

  5. En Destino, seleccione el servicio al que desea transferir los datos de log:

    • Object Storage: envía datos de log a un cubo.

    • Streaming: envía datos de log a un flujo.

    Nota: Puede utilizar un destino más soportado. Estamos describiendo OCI Streaming u OCI Object Storage según esta solución de tutorial.

  6. En Configurar conexión de origen, seleccione el nombre de compartimento para la raíz y el grupo de logs (Auditoría).

  7. (Opcional) En la sección Configurar tarea de función, configure una tarea de función para procesar datos del origen mediante el servicio OCI Functions.

    • Seleccionar tarea: seleccione una función.

    • Compartimento: seleccione el compartimento que contiene la función.

    • Aplicación de función: seleccione el nombre de la aplicación de función que incluye la función.

    • Función: seleccione el nombre de la función que desea utilizar para procesar los datos recibidos del origen.

  8. Si ha seleccionado Object Storage como Destino, en Configurar destino, configure el cubo al que enviar los datos de log.

    • Compartimento: seleccione el compartimento que contiene el cubo que desea.

    • Cubo: seleccione el nombre del cubo al que desea enviar los datos.

    • Prefijo de nombre de objeto: (Opcional) introduzca un valor de prefijo.

    • (Opcional) haga clic en Mostrar opciones adicionales, introduzca valores para el tamaño de lote (MB) y el tiempo de lote (milisegundos).

    O bien:

    Si ha seleccionado Flujo como Destino, en Configurar Destino, configure el flujo al que enviar los datos de log.

    • Compartimento: seleccione el compartimento que contiene el flujo que desea.

    • Flujo: seleccione el nombre del flujo al que desea enviar los datos.

  9. Haga clic en Crear.

Tarea 3: Validación del elemento de ocultación en el log de auditoría de OCI

En este tutorial, estamos utilizando el log de auditoría como ejemplo, pero puede utilizar logs de servicio o personalizados también en OCI Connector Hub para la ocultación de datos. Verifique si los datos se escriben en el destino. Para ello, vaya a la sección Métrica después de crear el conector. Para obtener más información sobre las métricas, consulte Referencia de métricas de Connector Hub.

  1. En la página Conectores, seleccione el conector que contiene las métricas con las que desea trabajar.

  2. En la página Detalles de conector, seleccione Métricas.

  3. (Opcional) Filtre las métricas por error, latencia, origen, destino o tarea.

    Puede ver un ejemplo de carga útil antes y después del enriquecimiento.

    • Antes de la redacción:

      carga útil de json antes de la eliminación

    • Después de la redacción;

      Limpiar carga útil de json después de la eliminación

Tarea 4: Consumo de logs de auditoría de OCI por SIEM de terceros

Hemos implementado esta solución en herramientas SIEM comunes, por ejemplo, Splunk. Podría utilizar cualquier SIEM de terceros que soporte el uso de kafka connect u OCI Object Storage. Para obtener más información, consulte Instalación y administración de Splunk Connect for Kafka y Logs de OCI Streaming mediante OCI Streaming y Kafka Connect to Splunk.

Agradecimientos

Más recursos de aprendizaje

Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de formación gratuita en el canal YouTube de Oracle Learning. Además, visita education.oracle.com/learning-explorer para convertirte en un Oracle Learning Explorer.

Para obtener documentación sobre el producto, visite Oracle Help Center.