注:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于云环境的那些值。
使用 Oracle Cloud Infrastructure Data Flow 处理 Autonomous Database 和 Kafka 中的大文件
简介
Oracle Cloud Infrastructure (OCI) 数据流是用于运行 Apache Spark TM 应用程序的完全托管服务。数据流用于处理大文件、流处理、数据库操作,您可以通过高度可扩展的处理构建大量应用程序。Apache Spark 可以扩展和使用群集计算机,通过最低配置来并行化作业。
使用 Apache Spark 作为托管服务(数据流),您可以添加许多可伸缩服务以乘以云处理的力量,本教程介绍了如何使用:
- 对象存储:文件存储库成本低且可扩展
- Autonomous Database:作为云中的可扩展数据库
- 流处理:作为可扩展的高 Kafka 托管服务

在本教程中,您可以看到用于处理大文件、查询数据库以及合并/联接数据以在内存中形成另一个表的最常见活动。您可以将此海量数据写入数据库,并在 Kafka 队列中写入高性能、低成本的数据。
目标
- 了解如何使用数据流处理大量数据
- 了解如何集成可扩展服务:文件存储库、数据库和队列
先决条件
-
一个可操作的 Oracle Cloud 租户:您可以在一个月内创建价值为 300.00 美元的免费 Oracle Cloud 账户,以尝试此教程。请参阅创建免费的 Oracle Cloud 账户
-
本地计算机上安装了 OCI CLI (Oracle Cloud 命令行界面):这是用于安装 OCI CLI 的链接。
-
安装在本地计算机中的 Apache Spark 应用程序。查看本地开发 Oracle Cloud Infrastructure Data Flow 应用程序、部署到云,了解如何在本地和数据流中开发。
注:这是要安装的官方页面:Apache Spark 。为每种类型的操作系统 (Linux/Mac OS/Windows) 安装 Apache Spark 有其他过程。
-
已安装 Spark 提交 CLI 。这是用于安装 Spark 提交 CLI 的链接。
-
本地计算机中安装了 Maven 。
-
了解 OCI 概念:
- 划分
- IAM 策略
- 租户
- 资源的 OCID
任务 1:创建对象存储结构
对象存储将用作默认文件存储库。您可以使用其他类型的文件资料档案库,但对象存储是一种简单且低成本的方法来处理具有性能的文件。在本教程中,两个应用程序都将从对象存储加载大型 CSV 文件,展示 Apache Spark 如何快速而智能地处理大量数据。
-
创建区间:区间对于组织和隔离云资源很重要。您可以按 IAM 策略隔离资源。
-
您可以使用此链接了解和设置区间的策略:管理区间
-
创建一个区间来托管本教程中 2 个应用程序的所有资源。创建名为 analytics 的区间。
-
转到 Oracle Cloud 主菜单并搜索:身份与安全性、区间。在“区间”部分中,单击创建区间并输入名称。

注:您需要向一组用户授予访问权限并包括您的用户。
-
单击创建区间以包括您的区间。
-
-
在对象存储中创建存储桶:存储桶是用于存储对象的逻辑容器,因此用于此演示的所有文件都将存储在此存储桶中。
-
转到 Oracle Cloud 主菜单并搜索存储和存储桶。在“Buckets(存储桶)”部分,选择之前创建的区间 (analytics)。

-
单击创建存储桶。创建 4 个存储桶:应用、数据、数据流日志、Wallet

-
输入包含这 4 个存储桶的存储桶名称信息,并使用默认选择维护其他参数。
-
对于每个存储桶,单击创建。您可以看到创建的存储桶。

-
注:查看存储桶的 IAM 策略。如果要在演示应用程序中使用这些存储桶,则必须设置策略。您可以在对象存储概览和 IAM 策略中了解这些概念和设置。
任务 2:创建 Autonomous Database
Oracle Cloud Autonomous Database 是 Oracle Database 的托管服务。对于本教程,应用程序将出于安全原因通过 Wallet 连接到数据库。
-
按如下所述实例化 Autonomous Database: Provision Autonomous Database 。
-
从 Oracle Cloud 主菜单中,选择数据仓库选项,选择 Oracle Database 和 Autonomous Data Warehouse ;选择区间 analytics ,然后按照教程创建数据库实例。
-
将实例命名为已处理日志,选择日志作为数据库名称,您无需更改应用程序中的任何代码。
-
输入 ADMIN 密码并下载 Wallet zip 文件。

-
创建数据库后,您可以设置 ADMIN 用户密码并下载 Wallet zip 文件。



-
保存 Wallet zip 文件 (
Wallet_logs.zip) 并注释您的 ADMIN 密码,您需要设置应用程序代码。 -
转至存储、存储桶。转到 analytics 区间,您将看到 Wallet 存储桶。单击它。
-
要上载 Wallet zip 文件,只需单击上载并附加 Wallet_logs.zip 文件。

注:在此处查看用于访问 Autonomous Database 的 IAM 策略:IAM Policy for Autonomous Database
任务 3:上载 CSV 示例文件
为了演示 Apache Spark 的强大功能,应用程序将读取包含 1,000,000 行的 CSV 文件。此数据将仅使用一个命令行插入 Autonomous Data Warehouse 数据库,并在 Kafka 流处理 (Oracle Cloud Streaming) 上发布。所有这些资源都可扩展且非常适合高数据量。
-
下载这 2 个链接并上载到数据存储桶:
-
注:
- organizations.csv 只有 100 行,仅用于测试本地计算机上的应用程序。
- organizations1M.csv 包含 1,000,000 行,用于在数据流实例上运行。
-
从 Oracle Cloud 主菜单中,转至存储和存储桶。单击数据存储桶并从上一步上载 2 个文件。


-
将辅助表上载到 ADW 数据库
-
下载此文件以上载到 ADW 数据库: GDP PER CAPTA COUNTRY.csv
-
从 Oracle Cloud 主菜单中,选择 Oracle Database 和 Autonomous Data Warehouse 。
-
单击已处理日志实例可查看详细信息。
-
单击数据库操作可转到数据库实用程序。

-
为 ADMIN 用户输入身份证明。

-
单击 SQL 选项可转至查询实用程序。

-
单击数据加载。

-
将 GDP PER CAPTA COUNTRY.csv 文件放置到控制台面板中,然后继续将数据导入表中。

-
您可以看到名为 GDPPERCAPTA 的新表已成功导入。

任务 4:为 ADW ADMIN 密码创建密钥储存库
出于安全原因,ADW ADMIN 密码将保存在 Vault 上。Oracle Cloud Vault 可以安全地托管此密码,并且可以通过 OCI 验证在您的应用中访问。
-
按以下文档所述在 Vault 中创建密钥:将数据库管理员密码添加到 Vault
-
在应用程序中创建名为 PASSWORD_SECRET_OCID 的变量并输入 OCID。



注:请在此处查看 OCI Vault 的 IAM 策略: OCI Vault IAM 策略。
任务 5:使用 Oracle Cloud 流处理服务创建 Kafka 流处理
Oracle Cloud Streaming 是一种类似于托管流处理服务的 Kafka。您可以使用 Kafka API 和常用 SDK 开发应用。在本教程中,您将创建流处理实例并配置为在两个应用程序中执行以发布和使用大量数据。
-
从 Oracle Cloud 主菜单中,转至 Analytics & AI 、 Streams 。
-
将区间更改为 analytics 。将在此区间中创建此演示中的每个资源。这样可以更安全、更轻松地控制 IAM。
-
单击创建流。

-
将名称输入为 kafka_like (例如),您可以使用默认值维护所有其他参数。

-
单击创建以初始化实例。
-
等待 Active(活动)状态。现在,您可以使用该实例。
注:在流创建过程中,您可以选择自动创建默认流池选项以自动创建默认池。
-
单击 DefaultPool 链接。

-
查看连接设置。


-
在下一步中根据需要添加此信息注释。
注:请在此处查看 OCI 流处理的 IAM 策略: OCI 流处理的 IAM 策略。
任务 6:生成 AUTH TOKEN 以访问 Kafka
您可以使用与 OCI IAM 上的用户关联的验证令牌来访问 Oracle Cloud 中的 OCI 流处理 (Kafka API) 和其他资源。在 Kafka 连接设置中,SASL 连接字符串有一个名为 password 的参数和一个 AUTH_TOKEN 值,如上一个任务中所述。要启用对 OCI 流处理的访问,您需要转到 OCI 控制台上的用户并创建 AUTH TOKEN。
-
从 Oracle Cloud 主菜单中,转至身份与安全性、用户。
注:请记住,创建 AUTH TOKEN 所需的用户是配置有 OCI CLI 的用户,以及直到现在为止创建的资源的所有 IAM 策略配置。资源包括:
- Oracle Cloud Autonomous Data Warehouse
- Oracle Cloud 流处理
- Oracle Object Storage
- Oracle Data Flow
-
单击您的用户名可查看详细信息。

-
单击控制台左侧的 Auth Tokens 选项,然后单击 Generate Token(生成令牌)。
注:标记将仅在此步骤中生成,在完成该步骤后将不可见。因此,请复制值并保存该值。如果丢失令牌值,则必须再次生成验证令牌。


任务 7:设置演示应用程序
本教程包含两个演示应用程序,我们将为其设置所需信息:
-
Java-CSV-DB:此应用程序将读取 1,000,000 行 csv 文件 (organizations1M.csv),并在与数据库 (Oracle Cloud Autonomous Data Warehouse) 和 Kafka 流处理 (Oracle Cloud Streaming) 集成的常见方案中执行一些常规流程。
此演示将说明如何在数据库中将 CSV 数据集与辅助表合并,以及跨类型的表生成内存中的第三个数据集。执行后,数据集将插入 ADW 并发布在 Kafka 流处理上。
-
JavaConsumeKafka:此应用程序将重复第一个应用程序的一些步骤,只是为了获取 CPU 和内存以进行大量处理。区别在于,第一个应用程序发布到 Kafka 流处理,而此应用程序从流处理读取。
-
使用以下链接下载应用程序:
-
在 Oracle Cloud 控制台中找到以下详细信息:
-
租户名称空间


-
密码密钥



-
流处理连接设置

-
验证令牌


-
-
打开下载的 zip 文件(
Java-CSV-DB.zip和JavaConsumeKafka.zip)。转到 /src/main/java/example 文件夹并查找 Example.java 代码。
-
这些变量需要随租户资源值一起更改。
变量名称 资源名称 信息标题 名称空间 租户名称空间 租户 OBJECT_STORAGE_NAMESPACE 租户名称空间 租户 PASSWORD_SECRET_OCID PASSWORD_SECRET_OCID OCID streamPoolId 流处理连接设置 ocid1.streampool.oc1.iad..... SASL 连接字符串中的值 kafkaUsername 流处理连接设置 SASL 连接字符串中 "" 内的用户名的值 kafkaPassword 验证令牌 该值仅显示在创建步骤中
-
注:为此教程创建的所有资源都位于 US-ASHBURN-1 区域中。检查要使用的区域。如果更改区域,则需要在 2 个代码文件中更改以下详细信息:
Example.java :更改 bootstrapServers 变量,将 "us-ashburn-1" 替换为新区域。
OboTokenClientConfigurator.java :使用新区域更改 CANONICAL_REGION_NAME 变量。
任务 8:了解 Java 代码
此教程是用 Java 创建的,此代码也可以移植到 Python。本教程分为两部分:
-
用于发布到 Kafka 流处理的应用 1
-
用于从 Kafka 流处理的应用 2
为了证明效率和可扩展性,开发了这两个应用程序,以在集成过程的常见用例中显示一些可能性。因此,两个应用程序的代码都显示以下示例:
-
读取包含 1,000,000 行的 CSV 文件
-
准备 ADW Wallet 以通过 JDBC 连接进行连接
-
将 1,000,000 行 CSV 数据插入 ADW 数据库
-
执行 SQL 句子查询 ADW 表
-
使用 ADW 数据集表对 CSV 数据集执行 SQL 句子联接
-
执行 CSV 数据集循环以演示数据的迭代
-
通过 Kafka 流处理实现运营
此演示可以在本地计算机中执行,并可以部署到数据流实例中,以作为作业执行运行。
注:对于数据流作业和本地计算机,使用 OCI CLI 配置访问 OCI 资源。在“数据流”端,所有内容都已预先配置,因此无需更改参数。在本地计算机端,安装 OCI CLI 并配置租户、用户和私有密钥来访问您的 OCI 资源。
让我们在以下部分中显示 Example.java 代码:
-
Apache Spark 初始化:此部分代码表示 Spark 初始化。执行执行进程的大多数配置都是自动配置的,因此使用 Spark 引擎非常容易。

-
读取多种格式的大文件:Apache Spark 引擎和 SDK 允许快速加载和写入文件格式。高容量可以在几秒甚至几毫秒内处理。因此,您可以在内存中执行 MERGE、FILTER、JOIN 数据集并处理不同的数据源。

-
阅读 ADW Vault 密钥:此部分代码访问您的 Vault,以获取 ADW 实例的密钥。

-
阅读
Wallet.zip文件以通过 JDBC 进行连接:本部分说明如何从对象存储加载Wallet.zip文件并配置 JDBC 驱动程序。

-
将 1,000,000 行 CSV 数据集插入 ADW 数据库:从 CSV 数据集中,可以直接将数据插入到 ADW 数据库中。Apache Spark 可利用集群、CPU 和内存的所有计算机功能优化执行,以获得最佳性能。

-
数据转换:想象加载许多 CSV 文件,在数据集中查询数据库中的某些表、JOIN、筛选器、消除列、计算和许多其他操作(仅限几分之一时间),并执行任何格式的写入操作。在此示例中,从 CSV 数据集和 ADW 数据库数据集创建了一个名为 oracleDF2 的新数据集。

-
在循环中使用数据集进行迭代:这是 CSV 数据集(1,000,000 行)的循环迭代示例。row 对象包含 CSV 字段结构的映射。因此,您可以获取每行的数据,并执行 API 调用和许多其他操作。

-
Kafka 操作:这是为使用 Kafka API 连接到 OCI 流处理的准备。
注意:Oracle Cloud 流处理与大多数 Kafka API 兼容。

-
配置连接参数后,代码将显示如何生成和使用流处理。


任务 9:使用 Maven 为您的应用打包
在 Apache Spark 中执行作业之前,必须使用 Maven 打包应用程序。Maven 是使用库和插件打包应用程序的最知名的实用程序之一。
注:
您可以执行快速测试,更改仅包含 100 行的 CSV 文件。为此,只需在 Example.java 文件中找到以下代码: private static String INPUT_PATH = "oci://data@" + OBJECT_STORAGE_NAMESPACE + "/organizations1M.csv";
将
organizations1M.csv替换为organizations.csv,执行速度将明显加快。
-
Java-CSV-DB 程序包
-
转到 /Java-CSV-DB 文件夹,然后执行以下命令:
mvn package -
您可以看到 Maven 正在开始打包。

-
如果一切正确,可以查看成功消息。

-
要在本地 Apache Spark 计算机中测试应用程序,请执行以下命令:
spark-submit --class example.Example target/loadadw-1.0-SNAPSHOT.jar
-
-
JavaConsumeKafka 程序包
-
转到 /JavaConsumeKafka 文件夹并执行以下命令:
mvn package -
您可以看到 Maven 正在开始打包。

-
如果一切正确,可以查看成功消息。

-
要在本地 Apache Spark 计算机中测试应用程序,请执行以下命令:
spark-submit --class example.Example target/loadkafka-1.0-SNAPSHOT.jar
-
任务 10:验证执行
-
确认 ADW 插入
-
转到 Oracle Cloud 主菜单,选择 Oracle Database 和 Autonomous Data Warehouse 。
-
单击已处理日志实例可查看详细信息。
-
单击数据库操作可转到数据库实用程序。

-
为 ADMIN 用户输入身份证明。

-
单击 SQL 选项可转至查询实用程序。

-
执行查询以查看表中的 1,000,000 行。

-
-
确认执行日志
-
如果作业可以访问和加载数据集,则可以在执行日志中查看。

-
任务 11:创建和执行数据流作业
现在,在本地 Apache Spark 计算机中成功运行两个应用后,可以将它们部署到租户中的 Oracle Cloud 数据流中。
-
从 Oracle Cloud 主菜单中,转至分析和 AI 和数据流。
-
在创建数据流应用程序之前,请确保选择分析区间。
-
单击创建应用程序。

-
如下图所示完成参数:

-
单击创建。
-
创建后,单击缩放演示链接可查看详细信息。
-
单击运行以执行作业。
-
确认参数,然后再次单击运行。

-
查看作业的状态,等待状态更改为成功,您可以看到结果。


后续步骤
第一个应用程序将数据发布到 Kafka 流处理。第二个应用程序从 Kafka 使用这些数据。
-
创建第一个数据流应用程序时,使用相同的步骤创建另一个数据流应用程序。
-
必须更改应用程序的名称,并将软件包从 loadadw-1.0-SNAPSHOT.jar 更改为 loadkafka-1.0-SNAPSHOT.jar 。
-
可以保留其他参数,使其与第一个数据流应用程序相同,然后运行作业。
相关链接
确认
- 作者 - Cristiano Hoshikawa(LAD A-Team 解决方案工程师)
更多学习资源
探索 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 频道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Process large files in Autonomous Database and Kafka with Oracle Cloud Infrastructure Data Flow
F79141-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.