Hinweis:

Oracle Cloud Infrastructure-Logs mit serverloser Funktion für SIEMs von Drittanbietern verdecken

Einführung

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestriert das Verschieben von Daten zwischen Services in OCI. Ein Connector gibt den Quellservice an, der die zu verschiebenden Daten enthält, optionale Aufgaben sowie den Zielservice für die Bereitstellung von Daten.

Wenn Sie eine Loggingquelle auswählen, um Logdaten mit dem Connector aus dem OCI Logging-Service zu übertragen, liest sie Daten aus der Quelle und schreibt in den Zielservice "AS IS". Einige SIEM-Tools müssen möglicherweise die Elemente in der Quell-Json-Payload verdecken. Es kann entweder verschlüsselt sein oder Speicherplatz belegen und bietet keinen Wert für die Datentransformation.

Eine optionale Funktionsaufgabe ist im Connector für diese Art von Anforderungen nützlich, im Logfilter, um Logdaten aus der Quelle zu filtern und dann in den Zielservice zu schreiben.

Diese Lösung unterstützt SIEM-Tools bei der schnellen Einführung von OCI mit minimalen Änderungen in Bezug auf Architektur-, Schulungs- oder Prozessänderungen. Diese Automatisierung unterstützt OCI-Audit-, Service- und benutzerdefinierte Logs aus dem Logging-Service.

Ziele

Voraussetzungen

Der Oracle Cloud Infrastructure Identity and Access Management-(OCI IAM-)Administrator muss Folgendes erstellen:

Aufgabe 1: OCI Functions für die Datenverdeckung erstellen und bereitstellen

  1. Melden Sie sich bei der OCI-Konsole an, und klicken Sie auf OCI Cloud Shell.

  2. Funktion mit der Befehlszeilenschnittstelle (CLI) von Fn Project aus OCI Cloud Shell erstellen. Weitere Informationen finden Sie unter Helloworld-Funktion erstellen, bereitstellen und aufrufen.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Wechseln Sie in das neu erstellte Verzeichnis.

  4. Erstellen Sie eine Anwendung, um die Funktion bereitzustellen.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Kopieren Sie das folgende Skript, und fügen Sie es in die Datei func.py ein, indem Sie den vorhandenen Inhalt überschreiben.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Prüfen Sie die folgenden Packages in der Datei requirements.txt, oder aktualisieren Sie sie.

    fdk
    requests
    oci
    
  7. Aktualisieren Sie das nicht erforderliche JSON-Element im Abschnitt exclude in func.yaml oder den Konfigurationsparameter der Funktion nach dem Deployment.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Führen Sie den folgenden Befehl aus, um die Funktion bereitzustellen.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Aufgabe 2: OCI Connector Hub mit Funktionsaufgabe erstellen und bereitstellen

  1. Gehen Sie zur OCI-Konsole, klicken Sie auf Connectors, und wählen Sie Connector erstellen aus.

  2. Geben Sie auf der Seite Connector erstellen einen benutzerfreundlichen Namen für den neuen Connector und eine optionale Beschreibung ein.

  3. Wählen Sie das Compartment aus, in dem Sie den neuen Connector speichern möchten.

  4. Wählen Sie im Abschnitt Connector konfigurieren unter Quelle die Option Logging aus.

  5. Wählen Sie unter Ziel den Service aus, an den Sie die Logdaten übertragen möchten:

    • Object Storage: Senden Sie Logdaten an einen Bucket.

    • Streaming: Senden Sie Logdaten an einen Stream.

    Hinweis: Sie können ein stärker unterstütztes Ziel verwenden. Wir beschreiben OCI Streaming oder OCI Object Storage gemäß dieser Tutoriallösung.

  6. Wählen Sie unter Quellverbindung konfigurieren den Compartment-Namen für Root und Loggruppe (Audit) aus.

  7. (Optional) Konfigurieren Sie im Abschnitt Funktionsaufgabe konfigurieren eine Funktionsaufgabe, um Daten aus der Quelle mit dem OCI Functions-Service zu verarbeiten.

    • Aufgabe auswählen: Wählen Sie eine Funktion aus.

    • Compartment: Wählen Sie das Compartment aus, das die Funktion enthält.

    • Funktionsanwendung: Wählen Sie den Namen der Funktionsanwendung aus, die diese Funktion enthält.

    • Funktion: Wählen Sie den Namen der Funktion aus, mit der Sie die von der Quelle empfangenen Daten verarbeiten möchten.

  8. Wenn Sie Object Storage als Ziel ausgewählt haben, konfigurieren Sie unter Ziel konfigurieren den Bucket, an den die Logdaten gesendet werden sollen.

    • Compartment: Wählen Sie das Compartment mit dem gewünschten Bucket aus.

    • Bucket: Wählen Sie den Namen des Buckets aus, an den Sie die Daten senden möchten.

    • Objektnamenpräfix: (Optional) Geben Sie einen Präfixwert ein.

    • (Optional) Klicken Sie auf Zusätzliche Optionen anzeigen, und geben Sie Werte für Batchgröße (MBs) und Batchzeit (Millisekunden) ein.

    Oder

    Wenn Sie Streaming als Ziel ausgewählt haben, konfigurieren Sie unter Ziel konfigurieren den Stream, an den die Logdaten gesendet werden sollen.

    • Compartment: Wählen Sie das Compartment mit dem gewünschten Stream aus.

    • Stream: Wählen Sie den Namen des Streams aus, an den Sie die Daten senden möchten.

  9. Klicken Sie auf Erstellen.

Aufgabe 3: Verdeckungselement im OCI-Auditlog validieren

In diesem Tutorial verwenden wir das Auditlog als Beispiel. Sie können jedoch Service- oder benutzerdefinierte Logs sowie OCI Connector Hub zur Datenverdeckung verwenden. Prüfen Sie, ob Daten in das Ziel geschrieben wurden, indem Sie nach dem Erstellen des Connectors zum Abschnitt Metrik navigieren. Weitere Informationen über Metriken finden Sie in der Connector Hub-Metrikreferenz

  1. Wählen Sie auf der Seite Connectors den Connector aus, der die Metriken enthält, mit denen Sie arbeiten möchten.

  2. Wählen Sie auf der Seite Connector-Details die Option Metriken aus.

  3. (Optional) Filtern Sie Metriken nach Fehler, Latenz, Quelle, Ziel oder Aufgabe.

    Beispiel für Payload vor und nach der Anreicherung.

    • Vor der Verdeckung:

      json-Payload vor dem Entfernen

    • Nach der Verdeckung;

      json-Nutzlast nach dem Entfernen bereinigen

Aufgabe 4: OCI-Auditlogs nach Drittanbieter-SIEM konsumieren

Wir haben diese Lösung für gängige SIEM-Tools eingesetzt, zum Beispiel Splunk. Sie können alle SIEM-Komponenten von Drittanbietern verwenden, die Kafka Connect oder OCI Object Storage unterstützen. Weitere Informationen finden Sie unter Splunk Connect für Kafka installieren und verwalten und OCI-Logs mit OCI Streaming und Kafka Connect zu Splunk streamen.

Danksagungen

Weitere Lernressourcen

Sehen Sie sich andere Übungen zu docs.oracle.com/learn an, oder greifen Sie im Oracle Learning YouTube-Channel auf weitere kostenlose Lerninhalte zu. Besuchen Sie außerdem education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.

Die Produktdokumentation finden Sie im Oracle Help Center.