주:
- 이 자습서에서는 Oracle Cloud에 액세스해야 합니다. 무료 계정에 등록하려면 Oracle Cloud Infrastructure Free Tier 시작하기를 참조하십시오.
- Oracle Cloud Infrastructure 자격 증명, 테넌시 및 구획에 예제 값을 사용합니다. 실습을 완료했으면 이러한 값을 자신의 클라우드 환경과 관련된 값으로 대체하십시오.
타사 SIEM용 Serverless Function을 사용하여 Oracle Cloud Infrastructure 로그 수정
소개
Oracle Cloud Infrastructure Connector Hub(OCI Connector Hub)는 OCI의 서비스 간 데이터 이동을 통합관리합니다. 커넥터는 이동할 데이터, 선택적 작업 및 데이터 전달을 위한 대상 서비스가 포함된 소스 서비스를 지정합니다.
커넥터를 사용하여 OCI 로깅 서비스에서 로그 데이터를 전송할 로깅 소스를 선택하면 소스에서 데이터를 읽고 대상 서비스 '있는 그대로'에 기록합니다. 일부 SIEM 도구에는 소스 JSON 페이로드의 요소를 수정하기 위한 요구사항이 있을 수 있습니다. 암호화(encrypted)되거나 공간을 사용할 수 있으며 데이터 변형에 가치를 제공하지 않습니다.
선택적 함수 작업은 이러한 종류의 요구사항에 대한 커넥터에서 소스에서 로그 데이터를 필터링한 다음 대상 서비스에 쓰기 위한 로그 필터에 유용합니다.
이 솔루션은 SIEM 툴이 아키텍처 변경, 교육 또는 프로세스 변경과 관련하여 최소한의 조정만으로 OCI를 신속하게 채택할 수 있도록 지원합니다. 이 자동화는 로깅 서비스의 OCI 감사, 서비스 및 사용자정의 로그를 지원합니다.
목표
- OCI 로그
json payload
에서 암호화되거나 민감한 요소를 동적으로 개정합니다.
필요 조건
Oracle Cloud Infrastructure Identity and Access Management(OCI IAM) 관리자는 다음을 생성해야 합니다.
-
감사 로그를 호출 및 보강하기 위한 OCI 함수의 동적 그룹 및 정책
-
동적 그룹: 함수 컴파트먼트에 대한 동적 그룹을 생성합니다. 자세한 내용은 동적 그룹 생성을 참조하십시오.
# Replace OCID for function compartment All {resource.type = 'fnfunc', resource.compartment.id = '<function-compartment>'} Example: All {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..aaaaaaaanovmfmmnonjjyxeq4jyghszj2eczlrkgj5svnxrt...'}
-
-
개발자가 함수 및 커넥터를 생성하고 배치할 수 있는 컴파트먼트, 그룹 및 정책입니다.
# Replace group-name as per your tenancy Allow group <group-name> to manage repos in compartment <function-compartment> allow group <group-name> to use cloud-shell in tenancy allow group <group-name> to manage functions-family in compartment <function-compartment> allow group <group-name> to use virtual-network-family in compartment <network-compartment> Allow group <group-name> to use objectstorage-namespaces in tenancy Allow group <group-name> to use stream-family in tenancy
작업 1: Data Redaction용 OCI 함수 생성 및 배치
-
OCI 콘솔에 로그인하고 OCI Cloud Shell을 누릅니다.
-
OCI Cloud Shell에서 Fn Project CLI(명령행 인터페이스)를 사용하여 함수를 생성합니다. 자세한 내용은 Helloworld 함수 생성, 배치 및 호출을 참조하십시오.
fn init --runtime python <function-name> Example: fn init --runtime python payload-trans-log-func
-
디렉토리를 새로 생성된 디렉토리로 변경합니다.
-
함수를 배치할 응용 프로그램을 생성합니다.
# Specify the OCID of subnet fn create app <function-app-name> --annotation oracle.com/oci/subnetIds='["<subnet OCID>"]' Example: fn create app payload-trans-log-app --annotation oracle.com/oci/subnetIds='["ocid1.subnet.oc1.ap-mumbai-1.aaaaaaaabitp32dkxxxxxxxxxxxxxk3evl2nxivwb....."]'
-
기존 콘텐츠를 겹쳐써서 다음 스크립트를 복사하여
func.py
파일에 붙여 넣습니다.import io import os import oci import json from fdk import response import requests import logging def handler(ctx, data: io.BytesIO = None): logger = logging.getLogger() try: exclude_list = [] # Reading json element from function config parameter for 'exclude' exclude_json = json.loads(os.environ["exclude"]) for pkey, ckey in exclude_json.items(): for lkey in ckey: exclude_list.append(pkey + "." + lkey ) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise try: payload_bytes = data.getvalue() if payload_bytes == b'': raise KeyError('No keys in payload') # Reading source log json payload logs = json.loads(data.getvalue()) logDict1 = [] totalrecs = 0 for auditlogrec in logs: logDict1.append(auditlogrec) totalrecs+= 1 strlogDict = audit_log_parse1(logDict1) # Calling log parse function for data redaction as specified in exclude parameter by user auditlogDict = audit_log_parse(strlogDict, exclude_list) except Exception as ex: print('ERROR: Missing configuration key', ex, flush=True) raise return response.Response( ctx, status_code=200, response_data=json.dumps(auditlogDict), headers={"Content-Type": "application/json"},) # validating number of lines in source to verify after redaction def audit_log_parse1(auditrecs): logger = logging.getLogger() strDict = [] mcount = 0 skip_line = 0 for line in auditrecs: mcount += 1 try: mline = json.dumps(line).replace('\\n',"") strDict.append(mline) except Exception as ex: print("ERROR for line number-" + str(mcount), ex, flush=True) skip_line += 1 pass logger.info('Total lines-' + str(mcount) + ' Skipped lines-' + str(skip_line)) return strDict # log parse function for data redaction as specified in exclude parameter by user def audit_log_parse(rawText, exclude_list): logger = logging.getLogger() logDict = [] mcount = 0 insertcount = 0 skip_line = 0 for line in rawText: if skip_line == 1: line = line_part + line skip_line = 0 mcount += 1 try: audit_data = json.loads(line) except Exception as ex: logger.info('ERROR for line number-' + str(mcount)) skip_line = 1 line_part = line.rstrip() pass if skip_line == 0: for key, value in audit_data.items(): if key == "data": if (isinstance(value, dict)): # Extracting and replacing json payload for nested keys for key1, value1 in value.copy().items(): json_path_l1 = "data" + "." + key1 delete_flag = json_filter(value, exclude_list, json_path_l1) if (isinstance(value1, dict)) and delete_flag == 0: for key2, value2 in value1.copy().items(): json_path_l2 = json_path_l1 + "." + key2 # Extracting and replacing json payload delete_flag = json_filter(value, exclude_list, json_path_l2) if (isinstance(value2, dict)) and delete_flag == 0: for key3, value3 in value2.copy().items(): json_path_l3 = json_path_l2 + "." + key3 delete_flag = json_filter(value, exclude_list, json_path_l3) if (isinstance(value3, dict)) and delete_flag == 0: for key4, value4 in value3.copy().items(): json_path_l4 = json_path_l3 + "." + key4 delete_flag = json_filter(value, exclude_list, json_path_l4) if (isinstance(value4, dict)) and delete_flag == 0: for key5, value5 in value4.copy().items(): json_path_l5 = json_path_l4 + "." + key5 delete_flag = json_filter(value, exclude_list, json_path_l5) if skip_line == 0: logDict.append(audit_data) insertcount += 1 # Number of records written as per source in a batch logger.info('Total count ->' + str(mcount) + ' Insert count ->' + str(insertcount) + ' logDict count->' + str(len(logDict))) return logDict def json_filter(value, exclude_list, json_path): try: mjsonlist = [] for ejson_path in exclude_list: if ejson_path == json_path: jsonlist = ejson_path.split('.') for x in jsonlist: mjsonlist.append(x) if len(jsonlist) == 2: del value[mjsonlist[1]] return 1 if len(jsonlist) == 3: del value[mjsonlist[1]][mjsonlist[2]] return 1 if len(jsonlist) == 4: del value[mjsonlist[1]][mjsonlist[2]][mjsonlist[3]] return 1 if len(jsonlist) == 5: del value[mjsonlist[1][mjsonlist[2]][mjsonlist[3]]][mjsonlist[4]] return 1 return 0 except Exception as ex: print("ERROR for redacting elements from payload", ex, flush=True) return 1 pass
-
requirements.txt
파일에서 다음 패키지를 확인하거나 업데이트합니다.fdk requests oci
-
func.yaml
의 exclude 섹션 또는 함수 구성 매개변수 사후 배치에서 필요하지 않은 JSON 요소를 업데이트합니다.Config: exclude: {} Example: config: exclude: '{ "data.identity": [ "credentials"], "data.request.headers": [ "authorization", "Authorization", "X-OCI-LB-PrivateAccessMetadata", "opc-principal" ] }'
-
다음 명령을 실행하여 함수를 배치합니다.
fn -v deploy --app <app-name> Example: fn -v deploy --app payload-trans-log-app
작업 2: 함수 작업으로 OCI Connector Hub 생성 및 배치
-
OCI 콘솔로 이동하여 커넥터를 누르고 커넥터 생성을 선택합니다.
-
커넥터 생성 페이지에서 새 커넥터에 대해 사용자에게 친숙한 이름과 선택적 설명을 입력합니다.
-
새 커넥터를 저장할 구획을 선택합니다.
-
커넥터 구성 섹션에서 소스에 대해 로깅을 선택합니다.
-
대상에서 로그 데이터를 전송할 서비스를 선택합니다.
-
오브젝트 스토리지: 버킷에 로그 데이터를 보냅니다.
-
스트리밍: 스트림에 로그 데이터를 전송합니다.
주: 지원되는 대상을 더 많이 사용할 수 있습니다. 이 자습서 솔루션에 따라 OCI Streaming 또는 OCI Object Storage에 대해 설명합니다.
-
-
소스 접속 구성에서 루트 및 로그 그룹에 대한 컴파트먼트 이름(감사)을 선택합니다.
-
(선택사항) 함수 작업 구성 섹션에서 OCI 함수 서비스를 사용하여 소스의 데이터를 처리하도록 함수 작업을 구성합니다.
-
태스크 선택: 함수를 선택합니다.
-
컴파트먼트: 함수가 포함된 컴파트먼트를 선택합니다.
-
함수 응용 프로그램: 함수를 포함하는 함수 응용 프로그램의 이름을 선택합니다.
-
함수: 소스에서 수신된 데이터를 처리하는 데 사용할 함수의 이름을 선택합니다.
-
-
Object Storage를 대상으로 선택한 경우 대상 구성에서 로그 데이터를 전송할 버킷을 구성합니다.
-
구획: 원하는 버킷을 포함하는 구획을 선택합니다.
-
버킷: 데이터를 전송할 버킷의 이름을 선택합니다.
-
객체 이름 접두어:(선택사항) 접두어 값을 입력합니다.
-
(선택 사항) 추가 옵션 표시를 누르고 일괄 처리 크기(MB) 및 일괄 처리 시간(밀리초)에 대한 값을 입력합니다.
또는
스트리밍을 대상으로 선택한 경우 대상 구성에서 로그 데이터를 전송할 스트림을 구성합니다.
-
구획: 원하는 스트림을 포함하는 구획을 선택합니다.
-
스트림: 데이터를 전송할 스트림의 이름을 선택합니다.
-
-
Create를 누릅니다.
작업 3: OCI 감사 로그에서 개정 요소 검증
이 사용지침서에서는 감사 로그를 예로 사용하지만 OCI Connector Hub에서 데이터 편집을 위해 서비스 또는 사용자정의 로그를 사용할 수 있습니다. 커넥터를 생성한 후 측정항목 섹션으로 이동하여 대상에 데이터가 기록되었는지 확인합니다. 측정 단위에 대한 자세한 내용은 Connector Hub 측정 단위 참조를 참조하십시오.
-
커넥터 페이지에서 작업할 측정항목이 포함된 커넥터를 선택합니다.
-
커넥터 세부정보 페이지에서 측정항목을 선택합니다.
-
(선택사항) 오류, 대기 시간, 소스, 대상 또는 작업별로 측정항목을 필터링합니다.
보강 전후에 페이로드 예를 볼 수 있습니다.
-
개정 전:
-
수정 후;
-
작업 4: 타사 SIEM에서 OCI 감사 로그 소비
이 솔루션은 일반적인 SIEM 도구(예: Splunk)에 배포했습니다. Kafka Connect 또는 OCI Object Storage에서 사용할 수 있도록 지원하는 타사 SIEM을 사용할 수 있습니다. 자세한 내용은 Install and Administer Splunk Connect for Kafka 및 OCI Streaming 및 Kafka Connect to Splunk를 사용한 스트림 OCI 로그를 참조하십시오.
관련 링크
확인
-
작성자 - Dipesh Kumar Rathod(Master Principal Cloud Architect, Infrastructure)
-
제공자 – Balaji Meruva(마스터 수석 클라우드 아키텍트, 인프라)
추가 학습 자원
docs.oracle.com/learn에서 다른 실습을 탐색하거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하세요. 또한 Oracle Learning Explorer가 되려면 education.oracle.com/learning-explorer을 방문하십시오.
제품 설명서는 Oracle Help Center를 참조하십시오.
Redact Oracle Cloud Infrastructure Logs using Serverless Function for Third-Party SIEMs
G18655-01
November 2024