Remarques :

Protéger les journaux Oracle Cloud Infrastructure par occultation à l'aide de la fonction sans serveur pour les SIEM tiers

Introduction

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestre les déplacements de données entre des services OCI. Un connecteur indique le service source qui contient les données à déplacer, les tâches facultatives et le service cible pour la livraison de données.

Lorsque vous sélectionnez une source de journalisation pour transférer les données de journal à partir du service OCI Logging à l'aide du connecteur, elle lit les données à partir de la source et écrit vers le service cible "tel quel". Certains outils SIEM peuvent avoir des exigences pour occulter les éléments dans la charge utile json source. Il peut être crypté ou consommer de l'espace et n'apporte aucune valeur à la transformation des données.

Une tâche de fonction facultative sera utile dans le connecteur pour ce type d'exigences, dans le filtre de journal pour filtrer les données de journal à partir de la source, puis écrire dans le service cible.

Cette solution aidera les outils SIEM à adopter rapidement OCI avec un minimum de modifications concernant la réarchitecture, la formation ou les changements de processus. Cette automatisation prend en charge les journaux d'audit, de service et personnalisés OCI à partir du service de journalisation.

Objectifs

Prérequis

L'administrateur Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) doit créer les éléments suivants :

Tâche 1 : création et déploiement de fonctions OCI pour la protection par occultation des données

  1. Connectez-vous à la console OCI et cliquez sur OCI Cloud Shell.

  2. Créez une fonction à l'aide de l'interface de ligne de commande du projet Fn à partir d'OCI Cloud Shell. Pour plus d'informations, reportez-vous à Création, déploiement et appel d'une fonction HelloWorld.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Passez au répertoire que vous venez de créer.

  4. Créez une application pour déployer la fonction.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Copiez et collez le script suivant dans le fichier func.py en écrasant le contenu existant.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Vérifiez les packages suivants dans le fichier requirements.txt ou mettez-les à jour.

    fdk
    requests
    oci
    
  7. Mettez à jour l'élément JSON non requis dans la section exclude dans func.yaml ou le paramètre de configuration de fonction après le déploiement.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Exécutez la commande suivante pour déployer la fonction.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Tâche 2 : création et déploiement d'OCI Connector Hub avec la tâche de fonction

  1. Accédez à la console OCI, cliquez sur Connecteurs et sélectionnez Créer un connecteur.

  2. Sur la page Créer un connecteur, entrez un nom convivial pour le nouveau connecteur et une description facultative.

  3. Sélectionnez le compartiment dans lequel stocker le nouveau connecteur.

  4. Dans la section Configurer le connecteur, pour Source, sélectionnez Journalisation.

  5. Dans Cible, sélectionnez le service vers lequel transférer les données de journal :

    • Object Storage : envoyez les données de journal à un bucket.

    • Streaming : envoyez les données de journal à un flux de données.

    Remarque : vous pouvez utiliser une cible plus prise en charge. Nous décrivons OCI Streaming ou OCI Object Storage comme indiqué dans ce tutoriel.

  6. Dans Configuration d'une connexion source, sélectionnez le nom du compartiment vers la racine et le groupe de journaux (Audit).

  7. (Facultatif) Dans la section Configurer la tâche de fonction, configurez une tâche de fonction pour traiter les données de la source à l'aide du service OCI Functions.

    • Sélectionner une tâche : sélectionnez une fonction.

    • Compartiment : sélectionnez le compartiment contenant la fonction.

    • Application de fonction : sélectionnez le nom de l'application de fonction qui inclut la fonction.

    • Fonction : sélectionnez le nom de la fonction à utiliser pour traiter les données reçues de la source.

  8. Si vous avez sélectionné Object Storage en tant que cible, sous Configurer la cible, configurez le bucket vers lequel envoyer les données de journal.

    • Compartiment : sélectionnez le compartiment qui contient le bucket de votre choix.

    • Regroupement : sélectionnez le nom du bucket auquel envoyer les données.

    • Préfixe de nom d'objet : (facultatif) saisissez une valeur de préfixe.

    • (Facultatif) Cliquez sur Afficher les options supplémentaires, entrez des valeurs pour la taille de batch (Mo) et le temps de batch (millisecondes).

    Ou

    Si vous avez sélectionné Transmission en continu en tant que cible, sous Configurer la cible, configurez le flux de données vers lequel envoyer les données de journal.

    • Compartiment : sélectionnez le compartiment qui contient le flux de données de votre choix.

    • Flux de données : sélectionnez le nom du flux de données auquel envoyer les données.

  9. Cliquez sur Créer.

Tâche 3 : valider l'élément de protection par occultation dans le journal d'audit OCI

Dans ce tutoriel, nous utilisons le journal d'audit comme exemple, mais vous pouvez utiliser des journaux de service ou personnalisés ainsi que OCI Connector Hub pour la protection par occultation des données. Vérifiez si les données sont écrites sur la cible en accédant à la section Mesure après avoir créé le connecteur. Pour plus d'informations sur les mesures, reportez-vous à Référence des mesures de Connector Hub.

  1. Sur la page Connecteurs, sélectionnez le connecteur contenant les mesures à utiliser.

  2. Sur la page Détails du connecteur, sélectionnez Mesures.

  3. (Facultatif) Filtrez les mesures par erreur, latence, source, cible ou tâche.

    Vous pouvez voir des exemples de charge utile avant et après l'enrichissement.

    • Avant la protection par occultation :

      Charge utile json avant suppression

    • Après la protection par occultation ;

      nettoyer la charge utile json après sa suppression

Tâche 4 : utilisation des journaux d'audit OCI par un SIEM tiers

Nous avons déployé cette solution sur des outils SIEM courants, par exemple, Splunk. Vous pouvez utiliser n'importe quel SIEM tiers qui prend en charge l'utilisation à partir de kafka connect ou d'OCI Object Storage. Pour plus d'informations, reportez-vous à Installation et administration de Splunk Connect pour Kafka et à Flux de journaux OCI à l'aide d'OCI Streaming et de Kafka Connect pour Splunk.

Remerciements

Ressources de formation supplémentaires

Explorez d'autres ateliers sur docs.oracle.com/learn ou accédez à d'autres contenus de formation gratuits sur le canal Oracle Learning YouTube. De plus, visitez le site education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.

Pour obtenir la documentation produit, consultez le site Oracle Help Center.