设置数据流以处理 Oracle Cloud Infrastructure 中的内部部署日志

简介

在此教程中,您将预配云基础设施并部署 PySpark 代码,以便在云中自动上载、处理和存储内部部署日志数据。这些数据存储在对象存储中,在数据流中处理,并存储在 Autonomous Data Warehouse 中进行分析。

下表显示了体系结构:

插图体系结构 -analyze-logs.png 的说明

您可以创建 PySpark 应用程序,然后使用为此目的设计的 Terraform 脚本预配 Oracle Cloud Infrastructure (OCI)。您可以从 Oracle 的 GitHub 资料档案库下载脚本,并进行少量编辑来更新某些变量值。然后应用 Terraform 脚本。

目标

完成本教程后,您将有一个工作系统从对象存储中提取数据,使用 PySpark 应用在数据流中处理数据,并将结果存储在 Autonomous Data Warehouse 中。

此处提供的 PySpark 应用程序演示了如何验证和保护工作量,以及如何使其做好插入数据库的准备。您需要对其进行定制以满足特定的数据处理需求。

先决条件

在开始本教程之前,您必须完成将数据流 PySpark 应用程序连接到 Oracle Cloud Infrastructure 中的 Autonomous Database 教程。此教程展示如何创建包含本教程所需的 Python 和 Java 库的相关性档案。具体而言,您需要本教程中的 archive.zip 文件。

要完成此教程,您必须有权访问 Oracle Cloud Infrastructure 上的租户并有权创建架构图中所示的构件。

在本教程中,您将下载创建构件的 Terraform 代码,如体系结构图中所述。代码位于 GitHub 上的存储库中。您可以在本地开发计算机上运行 Terraform 代码,也可以从 Oracle Cloud Infrastructure 中的 Cloud Shell 运行代码。Cloud Shell 已安装 Terraform 和 Git。

如果使用的是本地计算机,则适用以下先决条件:

ADW 连接详细信息

要连接到 ADW 数据库,必须在连接参数中传递 wallet。Wallet 包含身份证明和其他详细信息,这些身份证明允许 bearer 连接到数据库。Wallet 的文件名通常采用 wallet_<database-name>.zip 格式,但可能具有不同的名称。Wallet 为您提供了与数据库的连接,但您的数据库权限由数据库用户名和密码确定。

本教程随附的 Terraform 会创建名为日志的始终免费的 Autonomous Data Warehouse 数据库。它将数据库的 wallet 存储在两个位置。一个位于对象存储中名为 Wallet 的存储桶内,另一个位于您运行 Terraform 代码的计算机目录中。在这两种情况下,Wallet 文件的名称都是 wallet_logs.zip

Terraform 在创建数据库时生成 ADMIN 密码并在完成后向您显示密码。

准备 Oracle Cloud Infrastructure 环境

在创建 PySpark 应用程序并运行 Terraform 之前,您需要对 OCI 租户执行一些初步配置。

您需要创建区间和专用于数据流应用程序的用户组。这样,您可以将数据流与租户中的其他用户隔离。

Terraform 可以创建区间和组,但在本教程中,您将自己创建它们。这样,您将练习导航和使用 OCI 控制台。

创建区间

创建一个区间,该区间仅用于包含与管道相关的对象、配置和其他资源。

  1. 以具有管理员权限的用户身份登录到 Oracle Cloud Infrastructure 控制台。
  2. 在控制台导航菜单中,选择 Identity,然后单击 Compartment
  3. 单击创建区间
  4. 输入 Dataflow 作为新区间的名称,输入适当的说明,并确保父区间是根区间。
  5. 单击创建区间
  6. 创建区间后,记录其 OCID。稍后修改 terraform.tfvars 文件时,将需要 OCID。

创建组

您需要创建两个组。一个组包含可以管理数据流的用户,另一个组包含可使用数据流但不允许管理数据流的用户。

  1. 创建组。
    1. 在控制台导航菜单中,选择 Identity 并单击 Groups(组)
    2. 单击创建组
    3. 输入 Dataflow_Admin_Group 作为组名,输入合适的说明,然后单击创建
  2. 重复上一步创建另一个名为 Dataflow_User_Group 的组。
  3. 将用户添加到 Dataflow_Admin_Group。单击 Dataflow_Admin_Group,然后单击将用户添加到组并从列表中选择您的用户。

记录您的对象存储名称空间

  1. 单击您的用户概要信息图标,然后在打开的菜单中单击您的租户名称。
  2. 在打开的页面中,找到对象存储名称空间的值,并创建该值的副本供以后使用。

生成 API 密钥

Terraform 使用 OCI API 在 OCI 中创建和管理基础设施。为此,它必须具有 API 密钥对的公共密钥和指纹。

要生成您自己的公共 / 私有密钥对:

  1. 单击您的用户概要信息图标,然后在打开的菜单中单击用户设置
  2. 在“用户详细信息”页面上,查看“资源”部分,然后单击 API 密钥
  3. 单击添加 API 密钥
  4. 在打开的 "Add API Key"(添加 API 密钥)面板中,单击 Download Private Key(下载私有密钥)
  5. 在“配置文件预览”中,复制文本框的内容并将其保存在方便的位置。稍后设置 Terraform 时需要此信息。
  6. 单击关闭
  7. 重命名下载的私有密钥文件。使用简单名称,例如 oci_api_key.pem

将数据库管理员密码添加到 Vault

必须为数据库 ADMIN 密码创建 Vault 密钥并记录 OCID。对于以后创建的 PySpark 应用程序中的 PASSWORD_SECRET_OCID 常量,需要使用此项。

目前,在 Vault 密钥中输入伪密码。这是因为您还不知道密码是什么。在 Terraform 完成后才知道密码。但您需要 Terraform 上载到对象存储的 PySpark 应用的密钥 OCID。因此,首先创建一个密钥,记录其 OCID,然后在 Terraform 完成后,您使用实际密码更新密钥。

  1. 转到 Security 并单击 Vault
  2. 在“List Scope(列表范围)”部分中,确保位于数据流区间中。
  3. 单击 Create Vault
  4. 在打开的面板中,在名称字段中输入数据流
  5. 单击 Create Vault
  6. 当 Vault 状态变为活动时,单击数据流以打开“ Vault 详细信息”页。
  7. 在 "Master Encryption"(主加密)部分中,单击 Create Key(创建密钥)
  8. 在 "Create Key"(创建密钥)面板中,在 Name(名称)字段中输入 Dataflow(数据流)
  9. 单击创建键
  10. 在“资源”部分中,单击机密
  11. 单击 Create Secret
  12. 在“创建密钥”对话框中,从“在区间中创建”列表中选择区间。
  13. 名称字段中,输入名称以标识密钥。请避免输入机密信息。
  14. 说明字段中,输入密钥的简要说明以帮助识别该密钥。请避免输入任何机密信息。
  15. Encryption Key(加密密钥)字段中,选择 Dataflow(数据流)
  16. 秘密类型模板字段中,选择纯文本
  17. 机密内容字段中,输入伪密码。
  18. 单击 Create Secret
  19. 面板关闭时,单击数据流密钥以打开详细信息页并复制 OCID。

创建 PySpark 应用程序

PySpark 应用程序创建一个 Spark 会话,从对象存储读取日志数据,将其转换为数据框架,然后将数据框架存储在 ADW 中的表中。

在创建 Spark 会话之前,您需要确保导入一些重要的模块,并将一些常量设置为稍后在应用程序中使用。

  1. 使用以下语句导入相关模块。

    import os
    import oci
    import base64
    import zipfile
    from urllib.parse import urlparse
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import regexp_extract
    
  2. 这些是需要在 main() 函数中设置的常量。

    确保为 OBJECT_STORAGE_NAMESPACE 和 PASSWORD_SECRET_OCID 输入您自己的值。

    OBJECT_STORAGE_NAMESPACE = "YOUR-OBJECT-STORAGE-NAMESPACE"
    INPUT_PATH = "oci://data@{}/sample_logs.log".format(OBJECT_STORAGE_NAMESPACE)
    DATABASE_NAME = "logs"
    PASSWORD_SECRET_OCID = "ocid1.vaultsecret..... "
    WALLET_PATH = "oci://Wallet@{}/wallet_{}.zip".format(OBJECT_STORAGE_NAMESPACE,DATABASE_NAME)
    TNS_NAME = "{}_high".format(DATABASE_NAME)
    USER="ADMIN"
    TARGET_DATABASE_TABLE="processed_logs"
    
  3. 设置 Spark 会话并从对象存储加载数据。数据采用平面文本文件的形式,可将该文件加载到 Spark 数据框架中。

    spark_session = SparkSession.builder.appName("Dataflow").getOrCreate()
    input_df = spark_session.read.text(INPUT_PATH)
    input_df.show(5, truncate=False) # Some output for the Data Flow log file
    

    数据框架看起来应与以下样例类似:

    +----------------------------------------------------------------------------------+
    |value                                                                             |
    +----------------------------------------------------------------------------------+
    |10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 |
    |10.0.0.2 - user2 [10/Mar/2021:14:55:36 -0700] "GET /file1.html HTTP/1.0" 200 9889 |
    |10.0.0.3 - user3 [10/Mar/2021:14:55:37 -0700] "GET /file2.html HTTP/1.0" 200 4242 |
    |10.0.0.4 - user4 [10/Mar/2021:14:56:36 -0700] "GET /file3.html HTTP/1.0" 200 10267|
    |10.0.0.1 - user1 [10/Mar/2021:15:05:36 -0700] "GET /file4.html HTTP/1.0" 200 15678|
    +----------------------------------------------------------------------------------+
    

    如您所见,数据由名为 value 的单个列组成,其中包含每个日志条目作为单个字符串。在将数据传输到数据库之前,数据框架必须具有多个列,日志文件中的每个字段各对应一列。

  4. 将数据框架拆分为多个列。

    在以下示例中,新数据帧中不包括 user-identifier 字段(通常为 "-")。

    hostname = r'(^\S+\.[\S+\.]+\S+)\s'
    user = r'\s+.+\s+(.+)\s+\['
    timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
    method_uri_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"'
    status = r'\s(\d{3})\s'
    content_size = r'\s(\d+)$'
    
    logdata_df = input_df.select(
        regexp_extract('value', hostname, 1).alias('hostname'),
        regexp_extract('value', user, 1).alias('user'),
        regexp_extract('value', timestamp, 1).alias('timestamp'),
        regexp_extract('value', method_uri_protocol, 1).alias('method'),
        regexp_extract('value', method_uri_protocol, 2).alias('URI'),
        regexp_extract('value', method_uri_protocol, 3).alias('protocol'),
        regexp_extract('value', status, 1).cast('integer').alias('status'),
        regexp_extract('value', content_size, 1).cast('integer').alias('content_size'))
    

    您可以在其中应用一个或多个机器学习模型。例如,您可能需要先运行入侵检测工作量,然后再将日志存储在数据库中。

  5. 检索 Autonomous Data Warehouse 数据库的身份证明。

    本教程随附的 Terraform 代码将创建自治数据仓库并将 Wallet 文件存储在对象存储中。PySpark 应用程序需要检索 Wallet 并使用其信息连接到 ADW。

    # Get an Object Store client
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark_session.sparkContext.getConf().get(token_key)
    with open(token_path) as fd:
        delegation_token = fd.read()
    signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
        delegation_token=delegation_token)
    object_store_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
    
    # Extract the wallet file location info
    split_url = urlparse(WALLET_PATH)
    bucket_name, namespace = split_url.netloc.split("@")
    file_name = split_url.path[1:]
    
    # Get the wallet from Object Storage.
    # The response contains the wallet and some metadata
    response = object_store_client.get_object(namespace, bucket_name, file_name)
    
    # Extract the wallet from response and store it in the Spark work-dir
    wallet_path = "/opt/spark/work-dir/"
    zip_file_path = os.path.join(wallet_path, "temp.zip")
    with open(zip_file_path, "wb") as fd:
        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
            fd.write(chunk)
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(wallet_path)
    
    # Extract the wallet contents and add the files to the Spark context
    contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    for file in contents:
        spark.sparkContext.addFile(os.path.join(wallet_path, file))
    
    
  6. 将数据存储在 Autonomous Data Warehouse 数据库中。

    以下代码片段显示了如何连接到 ADW 并将数据框架写入目标表。如果表不存在,则会创建表。如果该表存在,则会由于 mode 参数中的 "overwrite" 选项而将其删除并重新创建。

    从 Oracle Cloud Infrastructure Vault 检索密码。运行 Terraform 代码后,将密码放在 Vault 中。在运行 Terraform 后面的步骤中介绍了在 Vault 中放置密码的内容。

    adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path)
    
    # Retrieve the database password for user 'ADMIN' from OCI Vault
    # The password is stored as base64 text, so it must be decoded
    with open(token_path) as fd:
        delegation_token = fd.read()
    signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
        delegation_token=delegation_token)
    secrets_client = oci.secrets.SecretsClient(config={}, signer=signer)
    response = secrets_client.get_secret_bundle(PASSWORD_SECRET_OCID)
    
    base64_password = response.data.secret_bundle_content.content
    base64_secret_bytes = base64_password.encode("ascii")
    base64_message_bytes = base64.b64decode(base64_secret_bytes)
    password = base64_message_bytes.decode("ascii")
    
    # Set up some properties for connecting to the database
    properties = {
        "driver": "oracle.jdbc.driver.OracleDriver",
        "oracle.net.tns_admin": TNS_NAME,
        "password": password,
        "user": USER,
    }
    
    # Write the dataframe to the database
    adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path)
    logdata_df.write.jdbc(url=adw_url, table=TARGET_DATABASE_TABLE, 
        mode="Overwrite", properties=properties)
    

使用名称 dataflow-app.py 保存 PySpark 应用程序。

下载并应用 Terraform 代码

Terraform 代码在 GitHub 上的 Oracle quickstart 系统信息库上可用。该代码将预配体系结构图中显示的组件。

要下载并应用代码,请执行以下操作:

  1. 转到方便的目录并克隆系统信息库。

    git clone https://github.com/oracle-quickstart/oci-arch-dataflow-store-analyze-data

  2. 将依赖性文件 archive.zip 复制到 oci-arch-dataflow-store-analyze-data 目录中。

    注意:如果您没有 archive.zip 文件,请参见先决条件部分以查看如何获取该文件。

  3. 转到 oci-arch-dataflow-store-analyze-data 目录。

    cd oci-arch-dataflow-store-analyze-data

  4. 找到 PySpark 应用程序文件 dataflow-app.py(这是一个存根文件),然后将其替换为您编写的文件。

  5. 在文本编辑器中打开 terraform.tfvars 文件。

  6. 更新 terraform.tfvarsobject_storage_namespace 的值。使用之前记录的值。

  7. 根据所使用的计算机的操作系统修改 env-vars.batenv-vars.sh。这些脚本用于设置 Terraform 使用的环境变量。

    在 Windows 上,env-vars.bat 文件应与以下样例类似。确保粘贴您自己的租户的值。

    @echo off
    set TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    set TF_VAR_user_ocid=ocid1.user.oc1..
    set TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    set TF_VAR_private_key_path=%HOMEPATH%\.ssh\oci_api_key.pem
    set TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    set TF_VAR_region=ca-toronto-1
    

    在 Linux 和 Mac 上,env-vars.sh 文件应与以下样例类似。确保粘贴您自己的租户的值。

    export TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    export TF_VAR_user_ocid=ocid1.user.oc1..
    export TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    export TF_VAR_private_key_path=~/.ssh/oci_api_key.pem
    export TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    export TF_VAR_region=ca-toronto-1
    

    使用您自己的租户和用户信息更新文件后,请确保设置了环境变量。

    在 Windows 上,运行以下命令:

    env-vars.bat
    

    在 Linux 或 Mac 上,运行以下命令:

    source env-vars.sh
    
  8. 在终端中,运行 init 命令

    terraform init

  9. 为 Oracle 基础设施云预配资源。

    terraform apply

    完成后,Terraform 应用程序将显示所创建数据库的 ADMIN 密码。

  10. 复制 Terraform 应用程序显示的密码。

  11. 验证是否已预配资源。

    1. 转到 OCI 控制台并打开导航菜单。
    2. 向下滚动到监管和管理部分,展开监管,然后单击租户浏览器
    3. 选择之前创建的数据流区间。

    几秒钟后,资源将显示在表中,您可以对其进行筛选和排序。

在 Vault 中更新 ADMIN 密码

以前,您向 OCI Vault 添加了伪密码,以获得其 OCID,以便在数据流中运行的 PySpark 应用程序中使用。现在,您需要将该口令更新为实际值。实际值在 Terraform 应用命令完成时显示在终端中。

  1. 在控制台导航菜单中,选择 Security,然后单击 Vault
  2. 单击先前创建的 Vault 的名称。
  3. 在打开的“ Vault 详细信息”页中,查看“资源”部分,然后单击秘密
  4. 单击之前创建的密钥的名称。
  5. 单击创建密钥版本
  6. 确保将秘密类型模板设置为纯文本,并将数据库 ADMIN 密码粘贴到秘密内容框中。
  7. 单击创建密钥版本

运行数据流应用程序

现在是运行 PySpark 应用程序的时候了。

  1. 在控制台导航菜单中,选择 Data Flow(数据流),然后单击 Applications(应用程序)
  2. 单击分析内部部署日志以打开“应用程序详细信息”页。
  3. 单击 Run(运行)以打开 "Run Python Application"(运行 Python 应用程序)面板。
  4. 单击运行

此时将打开“数据流运行”页面,您可以在其中监控运行进度。资源分配和启动一切所需的时间需要几分钟。运行完成时,单击运行的名称以打开“运行详细信息”页。

在 "Run Details"(运行详细信息)页面中,可以下载并检查生成的日志。如果由于任何原因运行失败,您可以检查日志以确定问题的来源。

致谢

了解更多

docs.oracle.com/learn 上浏览其他实验室,或者在 Oracle Learning YouTube 渠道上访问更多免费学习内容。此外,访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心

更多学习资源

docs.oracle.com/learn 上浏览其他实验室,或者在 Oracle Learning YouTube 渠道上访问更多免费学习内容。此外,访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心