Note :

Effectuer l'expurgation des journaux Oracle Cloud Infrastructure à l'aide d'une fonction sans serveur pour les fichiers SIEM de tierce partie

Présentation

Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestre le déplacement des données entre les services dans OCI. Un connecteur indique le service source qui contient les données à déplacer, les tâches facultatives et le service cible pour la transmission des données.

Lorsque vous sélectionnez une source de journalisation pour transférer des données de journal à partir du service de journalisation OCI à l'aide d'un connecteur, il lit les données de la source et écrit dans le service cible "TEL QUEL". Certains outils SIEM peuvent avoir des exigences pour occulter les éléments dans les données utiles json sources. Il peut être chiffré ou consommer de l'espace et ne fournit pas de valeur à la transformation des données.

Une tâche de fonction facultative sera utile dans le connecteur pour ce type d'exigences, dans le filtre de journaux pour filtrer les données de journal de la source, puis écrire dans le service cible.

Cette solution aidera les outils SIEM à adopter rapidement OCI avec un minimum de modifications concernant la restructuration, la formation ou les modifications de processus. Cette automatisation prend en charge la vérification OCI, le service et les journaux personnalisés à partir du service de journalisation.

Objectifs

Préalables

L'administrateur d'Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) doit créer :

Tâche 1 : Créer et déployer des fonctions OCI pour la protection par occultation de données

  1. Connectez-vous à la console OCI et cliquez sur Cloud Shell pour OCI.

  2. Créez une fonction à l'aide de l'interface de ligne de commande Fn Project à partir de Cloud Shell pour OCI. Pour plus d'informations, voir Création, déploiement et appel d'une fonction Helloworld.

    fn init --runtime python <function-name>
    
    Example: fn init --runtime python payload-trans-log-func
    
  3. Remplacez le répertoire par le nouveau répertoire créé.

  4. Créez une application pour déployer la fonction.

    # Specify the OCID of subnet
    fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]'
    
    Example:  fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
    
  5. Copiez et collez le script suivant dans le fichier func.py en remplaçant le contenu existant.

    import io
    import os
    import oci
    import json
    from fdk import response
    import requests
    import logging
    
    def handler(ctx, data: io.BytesIO = None):
        logger = logging.getLogger()
        try:
          exclude_list = []
          # Reading json element from function config parameter for 'exclude' 
          exclude_json = json.loads(os.environ["exclude"])
          for pkey, ckey in exclude_json.items():
                for lkey in ckey:
                   exclude_list.append(pkey + "." + lkey )
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
       try:
          payload_bytes = data.getvalue()
          if payload_bytes == b'':
                raise KeyError('No keys in payload')
          # Reading source log json payload
          logs = json.loads(data.getvalue())
          logDict1 = []
          totalrecs = 0
          for auditlogrec in logs:
                logDict1.append(auditlogrec)
                totalrecs+= 1
          strlogDict = audit_log_parse1(logDict1)
          # Calling log parse function for data redaction as specified in exclude parameter by user
          auditlogDict = audit_log_parse(strlogDict, exclude_list)
       except Exception as ex:
          print('ERROR: Missing configuration key', ex, flush=True)
          raise
    
    return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},)
    
    # validating number of lines in source to verify after redaction
    def audit_log_parse1(auditrecs):
       logger = logging.getLogger()
       strDict = []
       mcount = 0
       skip_line = 0
       for line in auditrecs:
          mcount += 1
          try:
                mline = json.dumps(line).replace('\\n',"")
                strDict.append(mline)
          except Exception as ex:
                print("ERROR for line number-" + str(mcount), ex, flush=True)
                skip_line += 1
                pass
       logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line))
       return strDict
    
    # log parse function for data redaction as specified in exclude parameter by user
    def audit_log_parse(rawText, exclude_list):
       logger = logging.getLogger()
       logDict = []
       mcount = 0
       insertcount = 0
       skip_line = 0
       for line in rawText:
          if skip_line == 1:
                line = line_part + line
          skip_line = 0
          mcount += 1
          try:
                audit_data = json.loads(line)
          except Exception as ex:
                logger.info('ERROR for line number-' + str(mcount))
                skip_line = 1
                line_part = line.rstrip()
                pass
    
          if skip_line == 0:
                for key, value in audit_data.items():
                   if key == "data":
                      if (isinstance(value, dict)):
                            # Extracting and replacing json payload for nested keys
                            for key1, value1 in value.copy().items():
                               json_path_l1 = "data" + "." + key1
                               delete_flag = json_filter(value, exclude_list, json_path_l1)
                               if (isinstance(value1, dict)) and delete_flag == 0:
                                  for key2, value2 in value1.copy().items():
                                        json_path_l2 = json_path_l1 + "." + key2
                                        # Extracting and replacing json payload 
                                        delete_flag = json_filter(value, exclude_list, json_path_l2)
                                        if (isinstance(value2, dict)) and delete_flag == 0:
                                           for key3, value3 in value2.copy().items():
                                              json_path_l3 = json_path_l2 + "." + key3
                                              delete_flag = json_filter(value, exclude_list, json_path_l3)
                                              if (isinstance(value3, dict)) and delete_flag == 0:
                                                    for key4, value4 in value3.copy().items():
                                                       json_path_l4 = json_path_l3 + "." + key4
                                                       delete_flag = json_filter(value, exclude_list, json_path_l4)
                                                       if (isinstance(value4, dict)) and delete_flag == 0:
                                                          for key5, value5 in value4.copy().items():
                                                                json_path_l5 = json_path_l4 + "." + key5
                                                                delete_flag = json_filter(value, exclude_list, json_path_l5)
    
          if skip_line == 0:
                logDict.append(audit_data)
                insertcount += 1
       # Number of records written as per source in a batch
       logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict)))
       return logDict
    
    
    def json_filter(value, exclude_list, json_path):
       try:
          mjsonlist = []
          for ejson_path in exclude_list:
                if ejson_path == json_path:
                   jsonlist = ejson_path.split('.')
                   for x in jsonlist:
                      mjsonlist.append(x)
                   if len(jsonlist) == 2:
                      del value[mjsonlist[1]]
                      return 1
                   if len(jsonlist) == 3:
                      del value[mjsonlist[1]][mjsonlist[2]]
                      return 1
                   if len(jsonlist) == 4:
                      del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]]
                      return 1
                   if len(jsonlist) == 5:
                      del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]]
                      return 1
          return 0
       except Exception as ex:
          print("ERROR for redacting elements from payload", ex, flush=True)
          return 1
          pass
    
  6. Vérifiez les ensembles suivants dans le fichier requirements.txt ou mettez-les à jour.

    fdk
    requests
    oci
    
  7. Mettez à jour l'élément JSON non requis dans la section Exclure de func.yaml ou le paramètre de configuration de fonction après le déploiement.

    Config:
    exclude: {}
    
    Example:
    config:
    exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
    
  8. Exécutez la commande suivante pour déployer la fonction.

    fn -v deploy --app <app-name>
    Example: fn -v deploy --app payload-trans-log-app
    

Tâche 2 : Créer et déployer le centre de connecteurs OCI avec la tâche Fonction

  1. Allez à la console OCI, cliquez sur Connecteurs et sélectionnez Créer un connecteur.

  2. Dans la page Créer un connecteur, entrez un nom convivial pour le nouveau connecteur et une description facultative.

  3. Sélectionnez le compartiment dans lequel vous voulez stocker le nouveau connecteur.

  4. Dans la section Configurer le connecteur, pour Source, sélectionnez Journalisation.

  5. Dans Cible, sélectionnez le service vers lequel transférer les données de journal :

    • Stockage d'objets : Envoyer des données de journal à un seau.

    • Diffusion en continu : Envoyer des données de journal vers un flux.

    Note : Vous pouvez utiliser une cible plus prise en charge. Nous décrivons le service de diffusion en continu pour OCI ou le service de stockage d'objets pour OCI conformément à ce tutoriel.

  6. Dans Configurer la connexion source, sélectionnez le nom du compartiment en tant que racine et groupe de journaux (Vérification).

  7. (Facultatif) Dans la section Configurer une tâche de fonction, configurez une tâche de fonction pour traiter les données de la source à l'aide du service des fonctions pour OCI.

    • Sélectionner une tâche : Sélectionnez une fonction.

    • Compartiment : Sélectionnez le compartiment qui contient la fonction.

    • Application de fonction : Sélectionnez le nom de l'application de fonction qui inclut la fonction.

    • Fonction : Sélectionnez le nom de la fonction à utiliser pour traiter les données reçues de la source.

  8. Si vous avez sélectionné Stockage d'objets comme cible, sous Configurer la cible, configurez le seau auquel envoyer les données de journal.

    • Compartiment : Sélectionnez le compartiment qui contient le seau que vous voulez.

    • Seau : : Sélectionnez le nom du seau auquel vous voulez envoyer les données.

    • Préfixe de nom d'objet : (Facultatif) entrez une valeur de préfixe.

    • (Facultatif) Cliquez sur Afficher les options supplémentaires, entrez des valeurs pour la taille de lot (Mo) et le temps de lot (en millisecondes).

    Ou

    Si vous avez sélectionné Diffusion en continu comme cible, sous Configurer la cible, configurez le flux pour envoyer les données de journal.

    • Compartiment : Sélectionnez le compartiment qui contient le flux souhaité.

    • Flux : : Sélectionnez le nom du flux vers lequel vous voulez envoyer les données.

  9. Cliquez sur Créer.

Tâche 3 : Valider l'élément d'occultation dans le journal de vérification OCI

Dans ce tutoriel, nous utilisons le journal de vérification comme exemple, mais vous pouvez utiliser des journaux de service ou personnalisés ainsi que le centre de connecteurs OCI pour la protection par occultation des données. Vérifiez si les données sont écrites dans la cible en accédant à la section Mesure après avoir créé le connecteur. Pour plus d'informations sur les mesures, voir Informations de référence sur les mesures du centre de connecteurs.

  1. Dans la page Connecteurs, sélectionnez le connecteur qui contient les mesures que vous voulez utiliser.

  2. Dans la page Détails du connecteur, sélectionnez Mesures.

  3. (Facultatif) Filtrez les mesures par erreur, latence, source, cible ou tâche.

    Vous pouvez voir un exemple de données utiles avant et après l'enrichissement.

    • Avant l'expurgation :

      données utiles json avant suppression

    • Après la rédaction;

      nettoyer la charge utile json après la suppression

Tâche 4 : Consommer les journaux de vérification OCI par SIEM de tierce partie

Nous avons déployé cette solution sur des outils SIEM communs, par exemple, Splunk. Vous pouvez utiliser des solutions SIEM de tierce partie qui prennent en charge la consommation à partir de la connexion kafka ou du stockage d'objets OCI. Pour plus d'informations, voir Installer et administrer Splunk Connect pour Kafka et Fluxer les journaux OCI à l'aide du service de diffusion en continu pour OCI et de Kafka Connect pour Splunk.

Confirmation

Autres ressources d'apprentissage

Explorez d'autres laboratoires sur la page docs.oracle.com/learn ou accédez à plus de contenu d'apprentissage gratuit sur le canal YouTube d'Oracle Learning. De plus, visitez education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.

Pour obtenir de la documentation sur le produit, visitez Oracle Help Center.