Remarques :
- Ce tutoriel nécessite un accès à Oracle Cloud. Pour vous inscrire à un compte gratuit, reportez-vous à Introduction au niveau gratuit d'Oracle Cloud Infrastructure.
- Il utilise des exemples de valeurs pour les informations d'identification, la location et les compartiments Oracle Cloud Infrastructure. Lorsque vous terminez votre atelier, remplacez ces valeurs par celles propres à votre environnement cloud.
Protéger les journaux Oracle Cloud Infrastructure par occultation à l'aide de la fonction sans serveur pour les SIEM tiers
Introduction
Oracle Cloud Infrastructure Connector Hub (OCI Connector Hub) orchestre les déplacements de données entre des services OCI. Un connecteur indique le service source qui contient les données à déplacer, les tâches facultatives et le service cible pour la livraison de données.
Lorsque vous sélectionnez une source de journalisation pour transférer les données de journal à partir du service OCI Logging à l'aide du connecteur, elle lit les données à partir de la source et écrit vers le service cible "tel quel". Certains outils SIEM peuvent avoir des exigences pour occulter les éléments dans la charge utile json source. Il peut être crypté ou consommer de l'espace et n'apporte aucune valeur à la transformation des données.
Une tâche de fonction facultative sera utile dans le connecteur pour ce type d'exigences, dans le filtre de journal pour filtrer les données de journal à partir de la source, puis écrire dans le service cible.
Cette solution aidera les outils SIEM à adopter rapidement OCI avec un minimum de modifications concernant la réarchitecture, la formation ou les changements de processus. Cette automatisation prend en charge les journaux d'audit, de service et personnalisés OCI à partir du service de journalisation.
Objectifs
- Protégez dynamiquement les éléments cryptés ou sensibles à partir du journal OCI
json payload
.
Prérequis
L'administrateur Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) doit créer les éléments suivants :
-
Groupe dynamique et stratégies pour OCI Functions afin d'appeler et d'enrichir le journal d'audit.
-
Groupe dynamique : créez un groupe dynamique pour le compartiment de fonction. Pour plus d'informations, reportez-vous à Création d'un groupe dynamique.
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
Compartiment, groupe et stratégies permettant au développeur de créer et de déployer la fonction et le connecteur.
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
Tâche 1 : création et déploiement de fonctions OCI pour la protection par occultation des données
-
Connectez-vous à la console OCI et cliquez sur OCI Cloud Shell.
-
Créez une fonction à l'aide de l'interface de ligne de commande du projet Fn à partir d'OCI Cloud Shell. Pour plus d'informations, reportez-vous à Création, déploiement et appel d'une fonction HelloWorld.
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
Passez au répertoire que vous venez de créer.
-
Créez une application pour déployer la fonction.
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
Copiez et collez le script suivant dans le fichier
func.py
en écrasant le contenu existant.import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
Vérifiez les packages suivants dans le fichier
requirements.txt
ou mettez-les à jour.fdk requests oci
-
Mettez à jour l'élément JSON non requis dans la section exclude dans
func.yaml
ou le paramètre de configuration de fonction après le déploiement.Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
Exécutez la commande suivante pour déployer la fonction.
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
Tâche 2 : création et déploiement d'OCI Connector Hub avec la tâche de fonction
-
Accédez à la console OCI, cliquez sur Connecteurs et sélectionnez Créer un connecteur.
-
Sur la page Créer un connecteur, entrez un nom convivial pour le nouveau connecteur et une description facultative.
-
Sélectionnez le compartiment dans lequel stocker le nouveau connecteur.
-
Dans la section Configurer le connecteur, pour Source, sélectionnez Journalisation.
-
Dans Cible, sélectionnez le service vers lequel transférer les données de journal :
-
Object Storage : envoyez les données de journal à un bucket.
-
Streaming : envoyez les données de journal à un flux de données.
Remarque : vous pouvez utiliser une cible plus prise en charge. Nous décrivons OCI Streaming ou OCI Object Storage comme indiqué dans ce tutoriel.
-
-
Dans Configuration d'une connexion source, sélectionnez le nom du compartiment vers la racine et le groupe de journaux (Audit).
-
(Facultatif) Dans la section Configurer la tâche de fonction, configurez une tâche de fonction pour traiter les données de la source à l'aide du service OCI Functions.
-
Sélectionner une tâche : sélectionnez une fonction.
-
Compartiment : sélectionnez le compartiment contenant la fonction.
-
Application de fonction : sélectionnez le nom de l'application de fonction qui inclut la fonction.
-
Fonction : sélectionnez le nom de la fonction à utiliser pour traiter les données reçues de la source.
-
-
Si vous avez sélectionné Object Storage en tant que cible, sous Configurer la cible, configurez le bucket vers lequel envoyer les données de journal.
-
Compartiment : sélectionnez le compartiment qui contient le bucket de votre choix.
-
Regroupement : sélectionnez le nom du bucket auquel envoyer les données.
-
Préfixe de nom d'objet : (facultatif) saisissez une valeur de préfixe.
-
(Facultatif) Cliquez sur Afficher les options supplémentaires, entrez des valeurs pour la taille de batch (Mo) et le temps de batch (millisecondes).
Ou
Si vous avez sélectionné Transmission en continu en tant que cible, sous Configurer la cible, configurez le flux de données vers lequel envoyer les données de journal.
-
Compartiment : sélectionnez le compartiment qui contient le flux de données de votre choix.
-
Flux de données : sélectionnez le nom du flux de données auquel envoyer les données.
-
-
Cliquez sur Créer.
Tâche 3 : valider l'élément de protection par occultation dans le journal d'audit OCI
Dans ce tutoriel, nous utilisons le journal d'audit comme exemple, mais vous pouvez utiliser des journaux de service ou personnalisés ainsi que OCI Connector Hub pour la protection par occultation des données. Vérifiez si les données sont écrites sur la cible en accédant à la section Mesure après avoir créé le connecteur. Pour plus d'informations sur les mesures, reportez-vous à Référence des mesures de Connector Hub.
-
Sur la page Connecteurs, sélectionnez le connecteur contenant les mesures à utiliser.
-
Sur la page Détails du connecteur, sélectionnez Mesures.
-
(Facultatif) Filtrez les mesures par erreur, latence, source, cible ou tâche.
Vous pouvez voir des exemples de charge utile avant et après l'enrichissement.
-
Avant la protection par occultation :
-
Après la protection par occultation ;
-
Tâche 4 : utilisation des journaux d'audit OCI par un SIEM tiers
Nous avons déployé cette solution sur des outils SIEM courants, par exemple, Splunk. Vous pouvez utiliser n'importe quel SIEM tiers qui prend en charge l'utilisation à partir de kafka connect ou d'OCI Object Storage. Pour plus d'informations, reportez-vous à Installation et administration de Splunk Connect pour Kafka et à Flux de journaux OCI à l'aide d'OCI Streaming et de Kafka Connect pour Splunk.
Liens connexes
Remerciements
-
Auteur : Dipesh Kumar Rathod (architecte cloud principal, infrastructure)
-
Contributeur – Balaji Meruva (architecte cloud principal, infrastructure)
Ressources de formation supplémentaires
Explorez d'autres ateliers sur docs.oracle.com/learn ou accédez à d'autres contenus de formation gratuits sur le canal Oracle Learning YouTube. De plus, visitez le site education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.
Pour obtenir la documentation produit, consultez le site Oracle Help Center.
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18652-01
November 2024