设置数据流以处理 Oracle Cloud Infrastructure 中的内部部署日志
简介
在此教程中,您将预配云基础设施并部署 PySpark 代码,以便在云中自动上载、处理和存储内部部署日志数据。这些数据存储在对象存储中,在数据流中处理,并存储在 Autonomous Data Warehouse 中进行分析。
下表显示了体系结构:

您可以创建 PySpark 应用程序,然后使用为此目的设计的 Terraform 脚本预配 Oracle Cloud Infrastructure (OCI)。您可以从 Oracle 的 GitHub 资料档案库下载脚本,并进行少量编辑来更新某些变量值。然后应用 Terraform 脚本。
目标
完成本教程后,您将有一个工作系统从对象存储中提取数据,使用 PySpark 应用在数据流中处理数据,并将结果存储在 Autonomous Data Warehouse 中。
此处提供的 PySpark 应用程序演示了如何验证和保护工作量,以及如何使其做好插入数据库的准备。您需要对其进行定制以满足特定的数据处理需求。
先决条件
在开始本教程之前,您必须完成将数据流 PySpark 应用程序连接到 Oracle Cloud Infrastructure 中的 Autonomous Database 教程。此教程展示如何创建包含本教程所需的 Python 和 Java 库的相关性档案。具体而言,您需要本教程中的 archive.zip 文件。
要完成此教程,您必须有权访问 Oracle Cloud Infrastructure 上的租户并有权创建架构图中所示的构件。
在本教程中,您将下载创建构件的 Terraform 代码,如体系结构图中所述。代码位于 GitHub 上的存储库中。您可以在本地开发计算机上运行 Terraform 代码,也可以从 Oracle Cloud Infrastructure 中的 Cloud Shell 运行代码。Cloud Shell 已安装 Terraform 和 Git。
如果使用的是本地计算机,则适用以下先决条件:
- Git 安装在开发计算机上。这用于克隆 Terraform 代码资料档案库。
- 您的开发计算机上安装了 Terraform。这用于在 OCI 上创建基础设施。
- 与 OCI 租户的网络连接。
ADW 连接详细信息
要连接到 ADW 数据库,必须在连接参数中传递 wallet。Wallet 包含身份证明和其他详细信息,这些身份证明允许 bearer 连接到数据库。Wallet 的文件名通常采用 wallet_<database-name>.zip 格式,但可能具有不同的名称。Wallet 为您提供了与数据库的连接,但您的数据库权限由数据库用户名和密码确定。
本教程随附的 Terraform 会创建名为日志的始终免费的 Autonomous Data Warehouse 数据库。它将数据库的 wallet 存储在两个位置。一个位于对象存储中名为 Wallet 的存储桶内,另一个位于您运行 Terraform 代码的计算机目录中。在这两种情况下,Wallet 文件的名称都是 wallet_logs.zip。
Terraform 在创建数据库时生成 ADMIN 密码并在完成后向您显示密码。
准备 Oracle Cloud Infrastructure 环境
在创建 PySpark 应用程序并运行 Terraform 之前,您需要对 OCI 租户执行一些初步配置。
您需要创建区间和专用于数据流应用程序的用户组。这样,您可以将数据流与租户中的其他用户隔离。
Terraform 可以创建区间和组,但在本教程中,您将自己创建它们。这样,您将练习导航和使用 OCI 控制台。
创建区间
创建一个区间,该区间仅用于包含与管道相关的对象、配置和其他资源。
- 以具有管理员权限的用户身份登录到 Oracle Cloud Infrastructure 控制台。
- 在控制台导航菜单中,选择 Identity,然后单击 Compartment。
- 单击创建区间。
- 输入
Dataflow作为新区间的名称,输入适当的说明,并确保父区间是根区间。 - 单击创建区间。
- 创建区间后,记录其 OCID。稍后修改
terraform.tfvars文件时,将需要 OCID。
创建组
您需要创建两个组。一个组包含可以管理数据流的用户,另一个组包含可使用数据流但不允许管理数据流的用户。
- 创建组。
- 在控制台导航菜单中,选择 Identity 并单击 Groups(组)。
- 单击创建组。
- 输入
Dataflow_Admin_Group作为组名,输入合适的说明,然后单击创建。
- 重复上一步创建另一个名为
Dataflow_User_Group的组。 - 将用户添加到 Dataflow_Admin_Group。单击 Dataflow_Admin_Group,然后单击将用户添加到组并从列表中选择您的用户。
记录您的对象存储名称空间
- 单击您的用户概要信息图标,然后在打开的菜单中单击您的租户名称。
- 在打开的页面中,找到对象存储名称空间的值,并创建该值的副本供以后使用。
生成 API 密钥
Terraform 使用 OCI API 在 OCI 中创建和管理基础设施。为此,它必须具有 API 密钥对的公共密钥和指纹。
要生成您自己的公共 / 私有密钥对:
- 单击您的用户概要信息图标,然后在打开的菜单中单击用户设置。
- 在“用户详细信息”页面上,查看“资源”部分,然后单击 API 密钥。
- 单击添加 API 密钥。
- 在打开的 "Add API Key"(添加 API 密钥)面板中,单击 Download Private Key(下载私有密钥)。
- 在“配置文件预览”中,复制文本框的内容并将其保存在方便的位置。稍后设置 Terraform 时需要此信息。
- 单击关闭。
- 重命名下载的私有密钥文件。使用简单名称,例如
oci_api_key.pem。
将数据库管理员密码添加到 Vault
必须为数据库 ADMIN 密码创建 Vault 密钥并记录 OCID。对于以后创建的 PySpark 应用程序中的 PASSWORD_SECRET_OCID 常量,需要使用此项。
目前,在 Vault 密钥中输入伪密码。这是因为您还不知道密码是什么。在 Terraform 完成后才知道密码。但您需要 Terraform 上载到对象存储的 PySpark 应用的密钥 OCID。因此,首先创建一个密钥,记录其 OCID,然后在 Terraform 完成后,您使用实际密码更新密钥。
- 转到 Security 并单击 Vault。
- 在“List Scope(列表范围)”部分中,确保位于数据流区间中。
- 单击 Create Vault。
- 在打开的面板中,在名称字段中输入数据流。
- 单击 Create Vault。
- 当 Vault 状态变为活动时,单击数据流以打开“ Vault 详细信息”页。
- 在 "Master Encryption"(主加密)部分中,单击 Create Key(创建密钥)。
- 在 "Create Key"(创建密钥)面板中,在 Name(名称)字段中输入 Dataflow(数据流)。
- 单击创建键。
- 在“资源”部分中,单击机密。
- 单击 Create Secret。
- 在“创建密钥”对话框中,从“在区间中创建”列表中选择区间。
- 在名称字段中,输入名称以标识密钥。请避免输入机密信息。
- 在说明字段中,输入密钥的简要说明以帮助识别该密钥。请避免输入任何机密信息。
- 在 Encryption Key(加密密钥)字段中,选择 Dataflow(数据流)。
- 在秘密类型模板字段中,选择纯文本。
- 在机密内容字段中,输入伪密码。
- 单击 Create Secret。
- 面板关闭时,单击数据流密钥以打开详细信息页并复制 OCID。
创建 PySpark 应用程序
PySpark 应用程序创建一个 Spark 会话,从对象存储读取日志数据,将其转换为数据框架,然后将数据框架存储在 ADW 中的表中。
在创建 Spark 会话之前,您需要确保导入一些重要的模块,并将一些常量设置为稍后在应用程序中使用。
-
使用以下语句导入相关模块。
import os import oci import base64 import zipfile from urllib.parse import urlparse from pyspark import SparkConf from pyspark.sql import SparkSession from pyspark.sql.functions import regexp_extract -
这些是需要在
main()函数中设置的常量。确保为 OBJECT_STORAGE_NAMESPACE 和 PASSWORD_SECRET_OCID 输入您自己的值。
OBJECT_STORAGE_NAMESPACE = "YOUR-OBJECT-STORAGE-NAMESPACE" INPUT_PATH = "oci://data@{}/sample_logs.log".format(OBJECT_STORAGE_NAMESPACE) DATABASE_NAME = "logs" PASSWORD_SECRET_OCID = "ocid1.vaultsecret..... " WALLET_PATH = "oci://Wallet@{}/wallet_{}.zip".format(OBJECT_STORAGE_NAMESPACE,DATABASE_NAME) TNS_NAME = "{}_high".format(DATABASE_NAME) USER="ADMIN" TARGET_DATABASE_TABLE="processed_logs" -
设置 Spark 会话并从对象存储加载数据。数据采用平面文本文件的形式,可将该文件加载到 Spark 数据框架中。
spark_session = SparkSession.builder.appName("Dataflow").getOrCreate() input_df = spark_session.read.text(INPUT_PATH) input_df.show(5, truncate=False) # Some output for the Data Flow log file数据框架看起来应与以下样例类似:
+----------------------------------------------------------------------------------+ |value | +----------------------------------------------------------------------------------+ |10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 | |10.0.0.2 - user2 [10/Mar/2021:14:55:36 -0700] "GET /file1.html HTTP/1.0" 200 9889 | |10.0.0.3 - user3 [10/Mar/2021:14:55:37 -0700] "GET /file2.html HTTP/1.0" 200 4242 | |10.0.0.4 - user4 [10/Mar/2021:14:56:36 -0700] "GET /file3.html HTTP/1.0" 200 10267| |10.0.0.1 - user1 [10/Mar/2021:15:05:36 -0700] "GET /file4.html HTTP/1.0" 200 15678| +----------------------------------------------------------------------------------+如您所见,数据由名为
value的单个列组成,其中包含每个日志条目作为单个字符串。在将数据传输到数据库之前,数据框架必须具有多个列,日志文件中的每个字段各对应一列。 -
将数据框架拆分为多个列。
在以下示例中,新数据帧中不包括 user-identifier 字段(通常为 "-")。
hostname = r'(^\S+\.[\S+\.]+\S+)\s' user = r'\s+.+\s+(.+)\s+\[' timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]' method_uri_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"' status = r'\s(\d{3})\s' content_size = r'\s(\d+)$' logdata_df = input_df.select( regexp_extract('value', hostname, 1).alias('hostname'), regexp_extract('value', user, 1).alias('user'), regexp_extract('value', timestamp, 1).alias('timestamp'), regexp_extract('value', method_uri_protocol, 1).alias('method'), regexp_extract('value', method_uri_protocol, 2).alias('URI'), regexp_extract('value', method_uri_protocol, 3).alias('protocol'), regexp_extract('value', status, 1).cast('integer').alias('status'), regexp_extract('value', content_size, 1).cast('integer').alias('content_size'))您可以在其中应用一个或多个机器学习模型。例如,您可能需要先运行入侵检测工作量,然后再将日志存储在数据库中。
-
检索 Autonomous Data Warehouse 数据库的身份证明。
本教程随附的 Terraform 代码将创建自治数据仓库并将 Wallet 文件存储在对象存储中。PySpark 应用程序需要检索 Wallet 并使用其信息连接到 ADW。
# Get an Object Store client token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath" token_path = spark_session.sparkContext.getConf().get(token_key) with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token) object_store_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer) # Extract the wallet file location info split_url = urlparse(WALLET_PATH) bucket_name, namespace = split_url.netloc.split("@") file_name = split_url.path[1:] # Get the wallet from Object Storage. # The response contains the wallet and some metadata response = object_store_client.get_object(namespace, bucket_name, file_name) # Extract the wallet from response and store it in the Spark work-dir wallet_path = "/opt/spark/work-dir/" zip_file_path = os.path.join(wallet_path, "temp.zip") with open(zip_file_path, "wb") as fd: for chunk in response.data.raw.stream(1024 * 1024, decode_content=False): fd.write(chunk) with zipfile.ZipFile(zip_file_path, "r") as zip_ref: zip_ref.extractall(wallet_path) # Extract the wallet contents and add the files to the Spark context contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split() for file in contents: spark.sparkContext.addFile(os.path.join(wallet_path, file)) -
将数据存储在 Autonomous Data Warehouse 数据库中。
以下代码片段显示了如何连接到 ADW 并将数据框架写入目标表。如果表不存在,则会创建表。如果该表存在,则会由于 mode 参数中的 "overwrite" 选项而将其删除并重新创建。
从 Oracle Cloud Infrastructure Vault 检索密码。运行 Terraform 代码后,将密码放在 Vault 中。在运行 Terraform 后面的步骤中介绍了在 Vault 中放置密码的内容。
adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path) # Retrieve the database password for user 'ADMIN' from OCI Vault # The password is stored as base64 text, so it must be decoded with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token) secrets_client = oci.secrets.SecretsClient(config={}, signer=signer) response = secrets_client.get_secret_bundle(PASSWORD_SECRET_OCID) base64_password = response.data.secret_bundle_content.content base64_secret_bytes = base64_password.encode("ascii") base64_message_bytes = base64.b64decode(base64_secret_bytes) password = base64_message_bytes.decode("ascii") # Set up some properties for connecting to the database properties = { "driver": "oracle.jdbc.driver.OracleDriver", "oracle.net.tns_admin": TNS_NAME, "password": password, "user": USER, } # Write the dataframe to the database adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path) logdata_df.write.jdbc(url=adw_url, table=TARGET_DATABASE_TABLE, mode="Overwrite", properties=properties)
使用名称 dataflow-app.py 保存 PySpark 应用程序。
下载并应用 Terraform 代码
Terraform 代码在 GitHub 上的 Oracle quickstart 系统信息库上可用。该代码将预配体系结构图中显示的组件。
要下载并应用代码,请执行以下操作:
-
转到方便的目录并克隆系统信息库。
git clone https://github.com/oracle-quickstart/oci-arch-dataflow-store-analyze-data -
将依赖性文件
archive.zip复制到oci-arch-dataflow-store-analyze-data目录中。注意:如果您没有
archive.zip文件,请参见先决条件部分以查看如何获取该文件。 -
转到
oci-arch-dataflow-store-analyze-data目录。cd oci-arch-dataflow-store-analyze-data -
找到 PySpark 应用程序文件
dataflow-app.py(这是一个存根文件),然后将其替换为您编写的文件。 -
在文本编辑器中打开
terraform.tfvars文件。 -
更新
terraform.tfvars中 object_storage_namespace 的值。使用之前记录的值。 -
根据所使用的计算机的操作系统修改
env-vars.bat或env-vars.sh。这些脚本用于设置 Terraform 使用的环境变量。在 Windows 上,
env-vars.bat文件应与以下样例类似。确保粘贴您自己的租户的值。@echo off set TF_VAR_tenancy_ocid=ocid1.tenancy.oc1.. set TF_VAR_user_ocid=ocid1.user.oc1.. set TF_VAR_compartment_ocid=ocid1.compartment.oc1.. set TF_VAR_private_key_path=%HOMEPATH%\.ssh\oci_api_key.pem set TF_VAR_fingerprint=2a:b4:ef:9c:f1... set TF_VAR_region=ca-toronto-1在 Linux 和 Mac 上,
env-vars.sh文件应与以下样例类似。确保粘贴您自己的租户的值。export TF_VAR_tenancy_ocid=ocid1.tenancy.oc1.. export TF_VAR_user_ocid=ocid1.user.oc1.. export TF_VAR_compartment_ocid=ocid1.compartment.oc1.. export TF_VAR_private_key_path=~/.ssh/oci_api_key.pem export TF_VAR_fingerprint=2a:b4:ef:9c:f1... export TF_VAR_region=ca-toronto-1使用您自己的租户和用户信息更新文件后,请确保设置了环境变量。
在 Windows 上,运行以下命令:
env-vars.bat在 Linux 或 Mac 上,运行以下命令:
source env-vars.sh -
在终端中,运行 init 命令
terraform init -
为 Oracle 基础设施云预配资源。
terraform apply完成后,Terraform 应用程序将显示所创建数据库的 ADMIN 密码。
-
复制 Terraform 应用程序显示的密码。
-
验证是否已预配资源。
- 转到 OCI 控制台并打开导航菜单。
- 向下滚动到监管和管理部分,展开监管,然后单击租户浏览器。
- 选择之前创建的数据流区间。
几秒钟后,资源将显示在表中,您可以对其进行筛选和排序。
在 Vault 中更新 ADMIN 密码
以前,您向 OCI Vault 添加了伪密码,以获得其 OCID,以便在数据流中运行的 PySpark 应用程序中使用。现在,您需要将该口令更新为实际值。实际值在 Terraform 应用命令完成时显示在终端中。
- 在控制台导航菜单中,选择 Security,然后单击 Vault。
- 单击先前创建的 Vault 的名称。
- 在打开的“ Vault 详细信息”页中,查看“资源”部分,然后单击秘密。
- 单击之前创建的密钥的名称。
- 单击创建密钥版本。
- 确保将秘密类型模板设置为纯文本,并将数据库 ADMIN 密码粘贴到秘密内容框中。
- 单击创建密钥版本。
运行数据流应用程序
现在是运行 PySpark 应用程序的时候了。
- 在控制台导航菜单中,选择 Data Flow(数据流),然后单击 Applications(应用程序)。
- 单击分析内部部署日志以打开“应用程序详细信息”页。
- 单击 Run(运行)以打开 "Run Python Application"(运行 Python 应用程序)面板。
- 单击运行。
此时将打开“数据流运行”页面,您可以在其中监控运行进度。资源分配和启动一切所需的时间需要几分钟。运行完成时,单击运行的名称以打开“运行详细信息”页。
在 "Run Details"(运行详细信息)页面中,可以下载并检查生成的日志。如果由于任何原因运行失败,您可以检查日志以确定问题的来源。
致谢
- 作者 - Jeff Schering(用户帮助开发人员)
- 内容提供者 - Prashant Jha(产品管理总监、OCI 大数据开发)
了解更多
在 docs.oracle.com/learn 上浏览其他实验室,或者在 Oracle Learning YouTube 渠道上访问更多免费学习内容。此外,访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
更多学习资源
在 docs.oracle.com/learn 上浏览其他实验室,或者在 Oracle Learning YouTube 渠道上访问更多免费学习内容。此外,访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Store and analyze your on-premises logs in Oracle Cloud Infrastructure
F52099-01
January 2022
Copyright © 2022, Oracle and/or its affiliates.