附註:
- 此教學課程需要存取 Oracle Cloud。若要註冊免費帳戶,請參閱開始使用 Oracle Cloud Infrastructure Free Tier 。
- 它使用 Oracle Cloud Infrastructure 憑證、租用戶及區間的範例值。完成實驗室時,請以雲端環境特有的值取代這些值。
使用 Oracle Cloud Infrastructure Big Data Service 建立即時資料管線
簡介
在現今資料導向世界中,組織仰賴高效率的資料管線,即時擷取、處理及分析資料。Oracle Cloud Infrastructure (OCI) 提供健全的服務生態系統,可用於建立強大的資料管線。在本教學課程中,我們將引導您完成建立運用 Oracle Cloud Infrastructure Big Data Service (OCI BDS) 之資料管線的程序。OCI BDS 是 Oracle 提供的雲端服務,可讓使用者建立及管理 Hadoop 叢集、Spark 叢集及其他大數據服務。我們將運用其生態系統的一部分,包括 Kafka、Flink、Schema Registry 以及 Trino,建置此管線。

在本教學課程中,我們將提供詳細的指示和程式碼範例,確保資料管線部署順暢且安全無虞。
目標
-
設定 OCI 資源:瞭解如何設定 OCI 資源,包括 BDS 叢集、Kafka、Flink、綱要登錄、OCI 物件儲存及 Trino,以協助進行資料管線。
-
建立 Avro 產生器 (Java):建立 Java 應用程式,以便在使用「綱要登錄」的 Avro 綱要時,將 Avro 資料產生至 Kafka 主題。
-
建立 Flink 用戶 (Java):為使用 Kafka 資料、處理資料並將其儲存在物件儲存中的 Flink 用戶設定 BDS 叢集。
-
部署並執行應用程式:瞭解如何在 BDS 叢集上部署和執行 Avro 產生器和 Flink 用戶應用程式。
-
查詢策劃資料:使用各種 OCI 工具 (包括 Trino、BDS Cloud SQL、查詢服務、OAC 等) 存取 OCI Object Storage 中策劃的資料。我們將使用 Trino 查詢 Avro 資料。
必要條件
-
OCI 帳戶。
-
存取 OCI 服務,包括 OCI BDS、OCI 物件儲存。
-
Java 和 Apache Flink 的知識。
-
已安裝必要的 OCI 命令行介面 (CLI) 工具和 SDK。
-
瞭解 Avro 和 Kafka。
作業 1:建立啟用 Kafka/Flink/Schema 登錄 /Trino 的 Oracle Big Data 叢集
-
在 OCI 上建立 BDS (ODH 2.0) 環境。若要建立 BDS 叢集,請參閱開始使用非高可用性 ODH 大數據叢集。
-
使用下列 URL 登入 Ambari:
https://<cluster name>un0-hostname>:7183/以新增 Kafka、Flink、綱要登錄及 Trino 服務。請輸入使用者名稱與密碼,然後按一下登入。
-
按一下服務旁邊的省略符號 ( ...),然後選取新增服務。

-
檢查 Kafka 、 Flink 、綱要登錄和 Trino ,然後按一下下一步。

-
如果您看到下列建議的組態,請按一下任何方式,然後按一下部署。

-
安裝之後,按一下下一步,然後按一下完成。


-
重新啟動所有受影響的元件。按一下服務旁邊的省略符號 ( ...),選取重新啟動所有必要項目,重新啟動後,按一下確定。

-
透過
SSH命令登入 Oracle Big Data Service 叢集主節點,或使用opc使用者證明資料搭配ppk檔案使用 putty。登入後,會將您的權限提升至root使用者。我們已使用 putty 登入節點。sudo su - -
執行下列步驟以上傳 Jar 檔案,以便使用 Flink 存取 Kafka。
-
將
lib.zip下載至任何目錄 (例如/tmp) 並解壓縮。cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip -
執行下列命令,將 jar 複製到所有 BDS 節點。
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
使用下列命令登入 Trino,並建立 Flink 用戶將寫入 Avro 資料的綱要和表格。
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )注意:請依照步驟設定讓 Hive 描述資料存放區使用
SerdeStorageSchemaReader實行。Trino 必須從 OCI 物件存放區查詢資料。 -
前往 Ambari 、Hive 、 Config 以及 Custom hive-site 。設定下列特性以設定 Hive 描述資料存放區:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader。 -
建立 OCI 物件儲存的儲存桶以儲存輸出。
作業 2:使用 POM 相依性建置 Java Maven 專案
若要開始開發即時資料管線,請設定包含下列必要相依性的 Java Maven 專案。您可以選擇任何符合您偏好的 IDE 或專案類型。
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
工作 3:使用範例程式碼和 avsc 檔案建置 Java Kafka Avro 產生器
使用提供的範例程式碼和 Avro 綱要,為您的資料管線建置 Java Kafka Avro 產生器。取代 your_kafka_bootstrap_servers、your_schema_registry_url、your_kafka_topic 等預留位置,並使用您的特定值和結構來調整 Avro 綱要。
此程式碼將作為 Avro 產生器的基礎,讓它能夠在您的即時資料管線中產生 Kafka 主題的 Avro 資料。
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
綱要檔案 ItemTransaction.avsc 所定義的 Avro 訊息格式 (如下所示) 儲存在專案的 resources 資料夾中。請務必強調,對於 Avro 序列化和還原序列化,必須產生名為 ItemTransaction.java 的對應 Java Avro 特定 POJO 類別,才能有效處理 Avro 格式。
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
作業 4:使用範例程式碼建置 Java Flink 用戶
我們將使用提供的範例程式碼,為您的資料管線建構 Java Flink 用戶。將定位子 (例如 your_kafka_bootstrap_servers、your_consumer_group_id 和 your_kafka_topic) 取代為您特定的 Kafka 設定詳細資訊。
此範例程式碼會在 Flink 環境中設定 Flink Kafka 用戶,可讓您處理內送的 Kafka 串流。視需要自訂資料管線的處理邏輯。
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
作業 5:部署和執行應用程式 Jar
您已建置 Java Kafka Avro 產生器和 Flink 用戶,即可在您的 OCI BDS 叢集上部署和執行這些應用程式。在 Hadoop 叢集中部署 jar,然後使用下列命令執行「產生器」和「用戶」。
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
工作 6:檢查主題是否已向「綱要登錄」註冊
進一步繼續進行之前,請務必確認資料管線中使用的 Kafka 主題是否已向「綱要登錄」註冊。我們可以看到綱要是以產生器應用程式中使用的主題名稱註冊。

作業 7:透過 Flink Web UI 監督閃爍,並檢查 OCI 物件儲存中的輸出檔
您的 Flink 工作正在 BDS 叢集上執行,讓我們透過 Flink Web UI 監督其進度,並驗證儲存在物件儲存中的已處理資料。


作業 8:使用 Trino 查詢 Avro 資料
開放原始碼分散式 SQL 查詢引擎 Trino 可用於無縫查詢 Avro 資料。確定您的環境中已安裝並設定 Trino。取得必要的連線詳細資訊,例如 Trino 協調器 URL。開啟 BDS 終端機,然後使用命令啟動 Trino CLI。使用 Trino CLI 對 Avro 資料執行 SQL 查詢,如下圖所示。

接下來的步驟
本教學課程提供使用大數據服務在 OCI 上建置即時資料管線的全方位指南,整合重要服務以有效率地產生、處理及查詢 Avro 資料。有了明確的目標、指示和正面的強化功能,您就可以建構並最佳化自己的即時資料管線。
相關連結
認可
- 作者 - Pavan Upadhyay (主要雲端工程師)、Saket Bihari (主要雲端工程師)
其他學習資源
瀏覽 docs.oracle.com/learn 的其他實驗室,或前往 Oracle Learning YouTube 頻道存取更多免費學習內容。此外,請造訪 education.oracle.com/learning-explorer 以成為 Oracle Learning Explorer。
如需產品文件,請造訪 Oracle Help Center 。
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.