Observação:

Reduzir Logs do Oracle Cloud Infrastructure usando Função sem Servidor para SIEMs de Terceiros

Introdução

O Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orquestra a movimentação de dados entre serviços no OCI. Um conector especifica o serviço de origem que contém os dados a serem movidos, as tarefas opcionais e o serviço de destino para entrega de dados.

Quando você seleciona uma origem de log para transferir dados de log do serviço OCI Logging usando o conector, ele lê dados do serviço de origem e grava no serviço de destino 'AS IS'. Algumas ferramentas SIEM podem ter requisitos para redigir os elementos no payload json de origem. Ele pode ser criptografado ou consumir espaço e não fornece valor para a transformação de dados.

Uma tarefa de função opcional será útil no conector para esse tipo de requisitos, no filtro de log para filtrar dados de log da origem e, em seguida, gravar no serviço de destino.

Essa solução ajudará as ferramentas SIEM a adotar rapidamente a OCI com ajustes mínimos em relação à rearquitetura, treinamento ou alterações de processo. Essa automação suporta auditoria, serviço e logs personalizados do OCI do serviço de registro em log.

Objetivos

Pré-requisitos

O administrador do Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) deve criar:

Tarefa 1: Criar e Implantar Funções do OCI para Redação de Dados

  1. Faça log-in na Console do OCI e clique em OCI Cloud Shell.

  2. Crie a função usando a CLI (Interface de Linha de Comando) do Fn Project no OCI Cloud Shell. Para obter mais informações, consulte Criando, Implantando e Chamando uma Função Helloworld.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Altere o diretório para o diretório recém-criado.

  4. Crie um aplicativo para implantar a função.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Copie e cole o script a seguir no arquivo func.py substituindo o conteúdo existente.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Verifique os pacotes a seguir no arquivo requirements.txt ou atualize.

    fdk
    requests
    oci
    
  7. Atualize o elemento JSON não obrigatório na seção exclude em func.yaml ou no parâmetro de configuração de função pós-implantação.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Execute o comando a seguir para implantar a função.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Tarefa 2: Criar e Implantar o OCI Connector Hub com a Tarefa de Função

  1. Vá para a Console do OCI, clique em Conectores e selecione Criar conector.

  2. Na página Criar conector, digite um Nome amigável para o novo conector e uma Descrição opcional.

  3. Selecione o compartimento no qual você deseja armazenar o novo conector.

  4. Na seção Configurar conector, para Origem, selecione Log.

  5. Em Destino, selecione o serviço para o qual você deseja transferir os dados de log:

    • Object Storage: Envie dados de log para um bucket.

    • Streaming: Envie dados de log para um stream.

    Observação: Você pode usar mais destinos suportados. Estamos descrevendo o OCI Streaming ou o OCI Object Storage de acordo com esta solução de tutorial.

  6. Em Configurar conexão de origem, selecione o nome do compartimento para raiz e o grupo de logs (Auditoria).

  7. (Opcional) Na seção Configurar tarefa de função, configure uma tarefa de função para processar dados da origem usando o serviço OCI Functions.

    • Selecionar tarefa: Selecione uma função.

    • Compartimento: selecione o compartimento que contém a função.

    • Aplicativo de função: Selecione o nome do aplicativo de função que inclui a função.

    • Função: Selecione o nome da função que você deseja usar para processar os dados recebidos da origem.

  8. Se você tiver selecionado o serviço Object Storage como o Destino, em Configurar Destino, configure o bucket para o qual enviar os dados de log.

    • Compartimento: Selecione o compartimento que contém o bucket desejado.

    • Bucket: Selecione o nome do bucket para que você deseja enviar os dados.

    • Prefixo de Nome de Objeto: (Opcional) informe um valor de prefixo.

    • (Opcional) clique em Mostrar opções adicionais, insira valores para o tamanho do batch (MBs) e o tempo do batch (milissegundos).

    Ou

    Se você selecionou Streaming como o Destino, em Configurar Destino, configure o stream para o qual enviar os dados de log.

    • Compartimento: Selecione o compartimento que contém o stream desejado.

    • Stream: Selecione o nome do stream para o qual você deseja enviar os dados.

  9. Clique em Criar.

Tarefa 3: Validar Elemento de Redação no Log de Auditoria do OCI

Neste tutorial, estamos usando o log de auditoria como exemplo, mas você pode usar logs de serviço ou personalizados também no OCI Connector Hub para redação de dados. Verifique se os dados gravados no destino navegando até a seção Métrica depois de criar o conector. Para obter mais informações sobre métricas, consulte Referência de Métricas do Connector Hub.

  1. Na página Conectores, selecione o conector que contém as métricas com as quais você deseja trabalhar.

  2. Na página Detalhes do Conector, selecione Métricas.

  3. (Opcional) Filtre métricas por erro, latência, origem, destino ou tarefa.

    Você pode ver um exemplo de payload antes e depois do enriquecimento.

    • Antes da Ocultação:

      payload json antes da remoção

    • Após a redação;

      limpar payload json após remoção

Tarefa 4: Consumir Logs de Auditoria do OCI por SIEM de Terceiros

Implantamos essa solução em ferramentas SIEM comuns, por exemplo, Splunk. Você pode usar qualquer SIEM de terceiros que suporte para consumir do kafka connect ou do OCI Object Storage. Para obter mais informações, consulte Instalar e Administrar o Splunk Connect para Kafka e Transmitir logs do OCI usando o OCI Streaming e o Kafka Connect para o Splunk.

Confirmações

Mais Recursos de Aprendizagem

Explore outros laboratórios em docs.oracle.com/learn ou acesse mais conteúdo de aprendizado gratuito no canal Oracle Learning YouTube. Além disso, visite education.oracle.com/learning-explorer para se tornar um Oracle Learning Explorer.

Para obter a documentação do produto, visite o Oracle Help Center.