注意:
- 此教程需要访问 Oracle Cloud。要注册免费账户,请参阅开始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 身份证明、租户和区间示例值。完成实验室时,请将这些值替换为特定于云环境的值。
使用 Oracle Cloud Infrastructure 大数据服务构建实时数据管道
简介
在当今数据驱动的世界中,企业依靠高效的数据管道来实时捕获、处理和分析数据。Oracle Cloud Infrastructure (OCI) 提供了一个强大的服务生态系统,可用于构建强大的数据管道。在本教程中,我们将指导您创建利用 Oracle Cloud Infrastructure 大数据服务 (OCI BDS) 的数据管道。OCI BDS 是 Oracle 提供的基于云的服务,支持用户创建和管理 Hadoop 集群、Spark 集群和其他大数据服务。我们将利用其生态系统的一部分,包括 Kafka,Flink,Schema Registry 和 Trino 来构建此管道。
在本教程中,我们将提供详细的说明和代码示例,以确保数据管道部署无缝且安全。
目标
-
设置 OCI 资源:了解如何配置 OCI 资源,包括 BDS 集群、Kafka、Flink、模式注册表、OCI 对象存储和 Trino,从而简化数据管道。
-
构建 Avro 生成器 (Java):创建 Java 应用程序,以便在使用方案注册表中的 Avro 方案时将 Avro 数据生成到 Kafka 主题。
-
构建 Flink 使用者 (Java):为 Flink 使用者设置 BDS 集群,该使用者使用来自 Kafka 的数据,对其进行处理并将其存储在对象存储中。
-
部署和运行应用程序:了解如何在 BDS 集群上部署和运行 Avro 生产者和 Flink 使用者应用程序。
-
查询精选数据:使用各种 OCI 工具(包括 Trino、BDS Cloud SQL、Query Service、OAC 等)访问 OCI 对象存储中的精选数据。我们将使用 Trino 查询 Avro 数据。
先决条件
-
OCI 账户。
-
访问 OCI 服务,包括 OCI BDS 和 OCI 对象存储。
-
了解 Java 和 Apache Flink。
-
已安装必要的 OCI 命令行界面 (Command Line Interface,CLI) 工具和 SDK。
-
对 Avro 和 Kafka 的理解
任务 1:在启用 Kafka/Flink/方案注册表/Trino 的情况下创建 Oracle Big Data 集群
-
在 OCI 上创建 BDS (ODH 2.0) 环境。要创建 BDS 集群,请参见 Get Started with a Non-Highly Available ODH Big Data Cluster 。
-
使用以下 URL 登录到 Ambari:
https://<cluster name>un0-hostname>:7183/
以添加 Kafka、Flink、模式注册表和 Trino 服务。请输入用户名和口令,然后单击登录。 -
单击服务旁边的省略号 ( ... ),然后选择添加服务。
-
选中 Kafka 、 Flink 、 schema registry 和 Trino ,然后单击 Next 。
-
如果看到以下建议的配置,请单击 PROCEED ANYWAY ,然后单击 DEPLOY 。
-
安装后,单击下一步,然后单击完成。
-
重新启动所有受影响的组件。单击服务旁边的省略号 ( ... ),选择重新启动所有必需项,然后在重新启动后单击确定。
-
通过
SSH
命令或使用opc
用户凭证将 putty 与ppk
文件一起登录到 Oracle Big Data Service 集群主节点。登录后,将您的权限提升到root
用户。我们使用 putty 登录到节点。sudo su -
-
执行以下步骤上载 jar 文件,以便使用 Flink 访问 Kafka。
-
将
lib.zip
下载到任何目录(例如/tmp
)并解压缩它。cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
执行以下命令将 jar 复制到所有 BDS 节点。
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
使用以下命令登录到 Trino 并创建 Flink 使用者将写入 Avro 数据的方案和表。
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
注:按照步骤配置 Hive Metastore 以使用
SerdeStorageSchemaReader
实现。Trino 需要从 OCI 对象存储中查询数据。 -
转至 Ambari 、Hive 、 Config 和 Custom hive-site 。设置以下属性以配置 Hive Metastore:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
。 -
创建 OCI 对象存储桶以存储输出。
任务 2:使用 POM 相关项构建 Java Maven 项目
要开始开发实时数据管道,让我们设置一个具有以下必需依赖项的 Java Maven 项目。您可以选择任何适合您的首选项的 IDE 或项目类型。
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
任务 3:使用示例代码和 avsc
文件构建 Java Kafka Avro 生成器
使用提供的示例代码和 Avro 方案,为您的数据管道构建 Java Kafka Avro 生成器。替换占位符(如 your_kafka_bootstrap_servers
、your_schema_registry_url
、your_kafka_topic
)并使用您的特定值和结构调整 Avro 方案。
此代码将作为 Avro 生产商的基础,使其能够将 Avro 数据生成到实时数据管道中的 Kafka 主题。
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
架构文件 ItemTransaction.avsc
定义的 Avro 消息格式如下所示,存储在项目的 resources
文件夹中。需要强调的是,对于 Avro 序列化和反序列化,需要生成相应的 Java Avro 特定 POJO 类,名为 ItemTransaction.java
,以便有效处理 Avro 格式。
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
任务 4:使用示例代码构建 Java Flink 使用者
我们将使用提供的示例代码为您的数据管道构建 Java Flink 使用者。将 your_kafka_bootstrap_servers
、your_consumer_group_id
和 your_kafka_topic
等占位符替换为特定的 Kafka 设置详细信息。
此示例代码在 Flink 环境中设置 Flink Kafka 使用者,允许您处理传入的 Kafka 流。根据数据管道的需要定制处理逻辑。
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
任务 5:部署和运行应用程序 jar
您构建了 Java Kafka Avro 生产者和 Flink 使用者,是时候在 OCI BDS 集群上部署和运行这些应用了。在 Hadoop 集群中部署 jar,并使用以下命令运行生成器和使用者。
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
任务 6:检查主题是否已注册到方案注册表
在继续操作之前,必须验证您在数据管道中使用的 Kafka 主题是否已注册到方案注册表。我们可以看到,该方案已注册到生成器应用程序中使用的主题名称。
任务 7:通过 Flink Web UI 监视闪烁并检查 OCI 对象存储中的输出文件
您的 Flink 作业正在 BDS 集群上运行,让我们通过 Flink Web UI 监视其进度,并验证存储在对象存储中的已处理数据。
任务 8:使用 Trino 查询 Avro 数据
Trino 是一个开源的分布式 SQL 查询引擎,可用于无缝查询 Avro 数据。确保在您的环境中安装和配置了 Trino。获取必要的连接详细信息,例如 Trino 协调器 URL。打开 BDS 终端并使用该命令启动 Trino CLI。使用 Trino CLI 对 Avro 数据运行 SQL 查询,如下图所示。
后续步骤
本教程提供了使用大数据服务在 OCI 上构建实时数据管道的综合指南,并集成了有效生成、处理和查询 Avro 数据的基本服务。凭借明确的目标、说明和积极强化,您有能力构建和优化自己的实时数据管道。
相关链接
确认
- Authors -Pavan Upadhyay(首席云工程师),Saket Bihari(首席云工程师)
更多学习资源
浏览 docs.oracle.com/learn 上的其他实验室,或者通过 Oracle Learning YouTube 频道访问更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 以成为 Oracle Learning Explorer。
有关产品文档,请访问 Oracle 帮助中心。
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.