Nota

Proteggi i log di Oracle Cloud Infrastructure utilizzando la funzione serverless per i SIEM di terze parti

Introduzione

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestra lo spostamento dei dati tra i servizi in OCI. Un connettore specifica il servizio di origine che contiene i dati da spostare, i task facoltativi e il servizio di destinazione per la consegna dei dati.

Quando si seleziona un'origine log per trasferire i dati di log dal servizio OCI Logging utilizzando il connettore, legge i dati dall'origine e scrive nel servizio di destinazione 'AS IS'. Alcuni strumenti SIEM potrebbero avere requisiti per redigere gli elementi nel payload json di origine. Potrebbe essere cifrato o occupare spazio e non fornire valore alla trasformazione dei dati.

Un task funzione facoltativo sarà utile nel connettore per questo tipo di requisiti, nel filtro di log per filtrare i dati di log dall'origine e quindi scrivere nel servizio di destinazione.

Questa soluzione aiuterà gli strumenti SIEM ad adottare rapidamente OCI con modifiche minime a livello di ristrutturazione, formazione o processi. Questa automazione supporta l'audit, il servizio e i log personalizzati OCI dal servizio di log.

Obiettivi

Prerequisiti

L'amministratore di Oracle Cloud Infrastructure Identity and Access Management (IAM OCI) deve creare:

Task 1: Creare e distribuire le funzioni OCI per la protezione dati sensibili

  1. Eseguire il login a OCI Console e fare clic su OCI Cloud Shell.

  2. Creare una funzione utilizzando l'interfaccia CLI (Command Line Interface) del progetto Fn da OCI Cloud Shell. Per ulteriori informazioni, vedere Creazione, distribuzione e richiamo di una funzione Helloworld.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Spostarsi nella directory appena creata.

  4. Creare un'applicazione per distribuire la funzione.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Copiare e incollare il seguente script nel file func.py sovrascrivendo il contenuto esistente.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Verificare i seguenti pacchetti nel file requirements.txt o eseguire l'aggiornamento.

    fdk
    requests
    oci
    
  7. Aggiornare l'elemento JSON non obbligatorio nella sezione escludi in func.yaml o il parametro di configurazione della funzione dopo la distribuzione.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Eseguire il comando riportato di seguito per distribuire la funzione.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Task 2: Creare e distribuire OCI Connector Hub con task funzione

  1. Andare a OCI Console, fare clic su Connettori e selezionare Crea connettore.

  2. Nella pagina Crea connettore, immettere un nome intuitivo per il nuovo connettore e una descrizione facoltativa.

  3. Selezionare il compartimento in cui memorizzare il nuovo connettore.

  4. Nella sezione Configura connettore, in Origine, selezionare Log.

  5. In Destinazione, selezionare il servizio in cui si desidera trasferire i dati di log:

    • Storage degli oggetti: invia i dati di log a un bucket.

    • Streaming: invia i dati di log a un flusso.

    Nota: è possibile utilizzare una destinazione più supportata. Stiamo descrivendo lo streaming OCI o lo storage degli oggetti OCI in base a questa soluzione di esercitazione.

  6. In Configura connessione di origine, selezionare il nome del compartimento su radice e gruppo di log (Audit).

  7. (Facoltativo) Nella sezione Configura task funzione, configurare un task funzione per elaborare i dati dall'origine utilizzando il servizio OCI Functions.

    • Seleziona task: selezionare una funzione.

    • Compartimento: selezionare il compartimento che contiene la funzione.

    • Applicazione funzione: selezionare il nome dell'applicazione funzione che include la funzione.

    • Funzione: selezionare il nome della funzione che si desidera utilizzare per elaborare i dati ricevuti dall'origine.

  8. Se è stato selezionato Storage degli oggetti come Destinazione, in Configura destinazione configurare il bucket a cui inviare i dati di log.

    • Compartimento: selezionare il compartimento contenente il bucket desiderato.

    • Bucket: selezionare il nome del bucket a cui si desidera inviare i dati.

    • Prefisso nome oggetto: (Facoltativo) immettere un valore di prefisso.

    • (Facoltativo) fare clic su Mostra opzioni aggiuntive, immettere i valori per la dimensione batch (MB) e il tempo batch (millisecondi).

    Oppure

    Se è stata selezionata l'opzione Streaming come Destinazione, in Configura destinazione configurare il flusso a cui inviare i dati di log.

    • Compartimento: selezionare il compartimento contenente il flusso desiderato.

    • Stream: selezionare il nome del flusso a cui si desidera inviare i dati.

  9. Fare clic su Crea.

Task 3: Convalida elemento protezione dati sensibili nel log di audit OCI

In questa esercitazione viene utilizzato il log di audit come esempio, ma è possibile utilizzare i log di servizio o personalizzati nonché OCI Connector Hub per la protezione dati sensibili. Verificare se i dati sono stati scritti nella destinazione passando alla sezione Metrica dopo aver creato il connettore. Per ulteriori informazioni sulle metriche, vedere Riferimento alle metriche dell'hub del connettore.

  1. Nella pagina Connettori selezionare il connettore che contiene le metriche da utilizzare.

  2. Nella pagina Dettagli connettore, selezionare Metriche.

  3. (Facoltativo) Filtrare le metriche in base a errore, latenza, origine, destinazione o task.

    È possibile visualizzare un esempio di payload prima e dopo l'arricchimento.

    • Prima della protezione dati sensibili:

      payload Json prima della rimozione

    • Dopo la redazione;

      cleanup del payload json dopo la rimozione

Task 4: Utilizzo dei log di audit OCI da parte di SIEM di terze parti

Abbiamo implementato questa soluzione su strumenti SIEM comuni, ad esempio Splunk. Puoi utilizzare qualsiasi SIEM di terze parti che supporta l'utilizzo dalla connessione kafka o dallo storage degli oggetti OCI. Per ulteriori informazioni, vedere Installa e amministra Splunk Connect for Kafka e Estrai i log OCI utilizzando OCI Streaming e Kafka Connect to Splunk.

Conferme

Altre risorse di apprendimento

Esplora altri laboratori su docs.oracle.com/learn o accedi a più contenuti gratuiti sulla formazione su Oracle Learning YouTube channel. Inoltre, visita education.oracle.com/learning-explorer per diventare un Oracle Learning Explorer.

Per la documentazione del prodotto, visita l'Oracle Help Center.