注:

以 CSV 格式导出自定义时间范围的 OCI 审计日志

简介

OCI 审计日志与 Oracle Cloud Infrastructure Audit 服务发出的事件相关。这些日志可从“OCI 控制台日志记录审计”页获取,也可在“搜索”页和其余日志中搜索。您可以从 Oracle Cloud 控制台以 JSON 格式导出 Oracle Cloud Infrastructure (OCI) 审计日志,以 14 天为限的定制时间范围导出。

在本教程中,您将学习如何部署定制脚本以 CSV 格式导出超过 14 天的审计日志。

:审计日志的最大保留期仅为 365 天。

目标

部署定制 Python 脚本以在任意定制时间范围内(少于 365 天)以 CSV 格式导出 OCI 审计日志。

先决条件

任务 1:部署定制 Python 脚本

  1. 使用命令验证 VM 上是否安装了 Python。

    python3 –-version
    
  2. 使用命令安装 Pandas 库。

    pip3 install pandas
    
  3. 创建新目录并更改为用于部署脚本的目录。

    mkdir <directory-name>
    cd <directory-name>
    
  4. 创建名为 main.py 的文件并使用任何编辑器编辑该文件(此处使用了 vim 编辑器)。

    touch main.py
    vim main.py
    
  5. 复制 - 按如下所示在 main.py 文件中粘贴脚本的内容,并通过键入 :x 并单击 Enter 保存文件内容。

    import pandas as pd
    import csv
    import os
    import datetime
    from datetime import timedelta,date
    import subprocess
    import json
    import sys
    import getopt
    
    def myfunc(argv):
    
        START_DATE = ""
        END_DATE = ""
        COMPARTMENT_ID = ""
        arg_help = "\nUsage:\n\npython3 main.py -s <log_start_date in YYYY-MM-DD Format> -e <log_end_date in YYYY-MM-DD Format> -c <Compartment_ID>\n".format(argv[0])
        try:
            opts, args = getopt.getopt(argv[1:],"hs:e:c:")
        except:
            print("\nenter valid argument values. Try -h option with script to know the usage.\n")
            sys.exit(2)
    
        for opt, arg in opts:
            if opt in ("-h"):
                print(arg_help)  # print the help message
                sys.exit(2)
            elif opt in ("-s"):
                START_DATE = arg
            elif opt in ("-e"):
                END_DATE = arg
            elif opt in ("-c"):
                COMPARTMENT_ID = arg
    
        INC = -1
        year1, month1, day1 = map(int, START_DATE.split('-'))
        DATE1 = datetime.date(year1, month1, day1)
        year2, month2, day2 = map(int, END_DATE.split('-'))
        DATE2 = datetime.date(year2, month2, day2)
        NUM_DAYS = DATE2 - DATE1
        NUM_DAYS = NUM_DAYS.days
    
    # Function for converting JSON to .CSV format
    
        def convert_csv():
    
            with open('out.json', 'r') as file:
                data = json.load(file)
            datetimez = []
            compartmentID = []
            compartmentName = []
            message = []
            tenantId = []
            userAgent = []
            path = []
            ingestedtime = []
            type = []
            id = []
            for ele in data['data']['results']:
                datetimez.append(ele['data']['datetime'])
                compartmentID.append(ele['data']['logContent']['data']['compartmentId'])
                compartmentName.append(ele['data']['logContent']['data']['compartmentName'])
                message.append(ele['data']['logContent']['data']['message'])
                tenantId.append(ele['data']['logContent']['data']['identity']['tenantId'])
                userAgent.append(ele['data']['logContent']['data']['identity']['userAgent'])
                path.append(ele['data']['logContent']['data']['request']['path'])
                ingestedtime.append(ele['data']['logContent']['oracle']['ingestedtime'])
                type.append(ele['data']['logContent']['type'])
                id.append(ele['data']['logContent']['id'])
            finaldate = []
            for ts in datetimez:
                finaldate.append(datetime.datetime.fromtimestamp(int(ts) / 1000).strftime('%Y-%m-%d %H:%M:%S'))
    
            output = zip(finaldate, compartmentID, compartmentName, message, tenantId, userAgent, path, ingestedtime, type, id)
            output = list(output)
            df = pd.DataFrame(output)
            df.to_csv('/tmp/out.csv', header=False , mode='a',index=False)
            return None
    
    # Check and validate the .CSV file in the /tmp directory
    
        os.system("touch /tmp/out.csv" )
        os.remove("/tmp/out.csv")
        header=['Date-Time', 'CompartmentID', 'CompartmentName', 'Message', 'TenantId', 'UserAgent', 'Path', 'Ingested-Time', 'Type', 'ID']
        data = []
        with open('/tmp/out.csv', 'a') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(data)
    
    # Block for saving Audit Logs in JSON format to out.json file    
    
        for i in range(INC, NUM_DAYS):
            print("\ncollecting logs for", DATE1)
            p = subprocess.Popen(''' oci logging-search search-logs --search-query 'search "''' + str(COMPARTMENT_ID) + '''/_Audit" | sort by datetime desc' --time-start ''' + str(DATE1) + '''"T00:00:00Z" --time-end ''' + str(DATE1) + '''"T23:59:00Z" > out.json ''', shell=True, stdout=subprocess.PIPE)
            (output, err) = p.communicate()
            convert_csv()
            PAG = subprocess.check_output(''' oci logging-search search-logs --search-query 'search "''' + str(COMPARTMENT_ID) + '''/_Audit" | sort by datetime desc' --time-start ''' + str(DATE1) + '''"T00:00:00Z" --time-end ''' + str(DATE1) + '''"T23:59:00Z" | grep "opc-next-page" | awk -F":" '{print $2}' | tr -d '"' | tr -d " " | tail -1 ''', shell=True).strip().decode('ascii')
    
            while (PAG != ""):
                p = subprocess.Popen(''' oci logging-search search-logs --search-query 'search "''' + str(COMPARTMENT_ID) + '''/_Audit" | sort by datetime desc' --time-start ''' + str(DATE1) + '''"T00:00:00Z" --time-end ''' + str(DATE1) + '''"T23:59:00Z" --page ''' + str(PAG) + '''  > out.json ''', shell=True, stdout=subprocess.PIPE)
                (output, err) = p.communicate()
                convert_csv()
                PAG = subprocess.check_output(''' oci logging-search search-logs --search-query 'search "''' + str(COMPARTMENT_ID) + '''/_Audit" | sort by datetime desc' --time-start ''' + str(DATE1) + '''"T00:00:00Z" --time-end ''' + str(DATE1) + '''"T23:59:00Z" --page ''' + str(PAG) + ''' | grep "opc-next-page" | awk -F":" '{print $2}' | tr -d '"' | tr -d " " | tail -1 ''', shell=True).strip().decode('ascii')
            print("successfully collected logs for", DATE1)
            DATE1 += timedelta(days=1)
            i = i + 1
    
        print("\nThe .csv file is saved in location /tmp/out.csv")
    
    if __name__ == "__main__":
        myfunc(sys.argv)
    
    

任务 2:执行定制 Python 脚本

  1. 使用命令为 main.py 文件提供执行权限。

    chmod +x main.py
    
  2. 需要使用日志开始日期、日志结束日期和区间 ID 参数运行脚本。要检查脚本的准确用法,请使用“-h”选项运行脚本,如下所示。

    python3 main.py -h
    

    这将显示该脚本的正确语法。

    脚本帮助

  3. 运行脚本后,运行脚本的日期将显示在屏幕上并显示成功消息,后跟说明输出 .csv 文件位置的消息。

    脚本输出

nohup python3 main.py -s <log_start_date in YYYY-MM-DD Format> -e <log_end_date in YYYY-MM-DD Format> -c ><Compartment_ID> &

CSV 文件将生成并存储在以下文件夹中:/tmp/out.csv

任务 3:计划审计日志的收集(可选)

如果要安排审计日志的每月/每周集合,您可以通过设置 cronjobs,使较旧的 .csv 文件复制到其他文件,并且新日志保存在 out.csv 文件中。

  1. 键入以下命令以编辑 crontab。

    crontab -e
    
  2. 添加要调度的以下 cronjob。确保将 **** 参数替换为区间/租户的区间 ID。

    0 1 1 * * export DATE=$(date +'%Y-%m-%d-%T') && cp /tmp/out.csv out-$DATE.csv
    15 1 1 * * export DATE1=$(date +'%Y-%m-%d') && export DATE2=$(date -d "$DATE1 -1 month" +%Y-%m-%d) && python3 main.py -s DATE1 -e DATE2 -c <Compartment_ID>
    

    以上职务分别在每月 1 日上午 1 点和上午 1.15 点运行。您可以根据需要更改时间。

    :您可以安排作业在每月的第 30 或 31 天运行,以便脚本不会错过一天或返回错误。

  3. 使用以下命令保存文件并查看活动的 cronjob。

    crontab -l
    

确认

更多学习资源

探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心