Nota
- Questa esercitazione richiede l'accesso a Oracle Cloud. Per iscriverti a un account gratuito, consulta Inizia a utilizzare Oracle Cloud Infrastructure Free Tier.
- Utilizza valori di esempio per le credenziali, la tenancy e i compartimenti di Oracle Cloud Infrastructure. Al termine del laboratorio, sostituisci questi valori con quelli specifici del tuo ambiente cloud.
Crea una pipeline di dati in tempo reale con Oracle Cloud Infrastructure Big Data Service
Introduzione
Nel mondo odierno basato sui dati, le organizzazioni si affidano a pipeline di dati efficienti per acquisire, elaborare e analizzare i dati in tempo reale. Oracle Cloud Infrastructure (OCI) offre un solido ecosistema di servizi che possono essere sfruttati per creare una potente pipeline di dati. In questa esercitazione verrà descritto come creare una pipeline di dati basata su Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS è un servizio basato su cloud fornito da Oracle che consente agli utenti di creare e gestire cluster Hadoop, cluster Spark e altri servizi Big Data. Utilizzeremo una parte del suo ecosistema che include Kafka, Flink, Schema Registry e Trino per costruire questa pipeline.

In questa esercitazione verranno fornite istruzioni dettagliate ed esempi di codice per garantire una distribuzione della pipeline di dati trasparente e sicura.
Obiettivi
-
Imposta risorse OCI: scopri come configurare le risorse OCI, inclusi i cluster BDS, Kafka, Flink, Schema Registry, OCI Object Storage e Trino per facilitare la pipeline dei dati.
-
Creare un producer Avro (Java): creare un'applicazione Java per la produzione di dati Avro nell'argomento Kafka durante l'utilizzo degli schemi Avro dal registro schema.
-
Creare un consumer Flink (Java): impostare un cluster BDS per un consumer Flink che utilizza i dati di Kafka, li elabora e li memorizza nello storage degli oggetti.
-
Distribuisci ed esegui le applicazioni: scopri come distribuire ed eseguire il producer Avro e le applicazioni consumer Flink sui tuoi cluster BDS.
-
Query sui dati esaminati: ottieni la possibilità di accedere ai dati curati in OCI Object Storage utilizzando diversi strumenti OCI, tra cui Trino, BDS Cloud SQL, Query Service, OAC e altro ancora. Utilizzeremo Trino per eseguire query sui dati Avro.
Prerequisiti
-
Un account OCI.
-
Accesso ai servizi OCI, inclusi OCI BDS e OCI Object Storage.
-
Conoscenza di Java e Apache Flink.
-
Strumenti CLI (Command Line Interface) OCI necessari e SDK installati.
-
Comprensione di Avro e Kafka.
Task 1: creare un cluster Oracle Big Data con Kafka/Flink/Schema Registry/Trino abilitato
-
Crea un ambiente BDS (ODH 2.0) su OCI. Per creare un cluster BDS, vedere Introduzione a un cluster Big Data ODH non ad alta disponibilità.
-
Eseguire il login ad Ambari con il seguente URL:
https://<cluster name>un0-hostname>:7183/per aggiungere i servizi Kafka, Flink, Schema Registry e Trino. Immettere il nome utente e la password, quindi fare clic su Connetti.
-
Fare clic sui puntini di sospensione (...) accanto a Servizi, quindi selezionare Aggiungi servizio.

-
Selezionare Kafka, Flink, registro schema e Trino, quindi fare clic su Avanti.

-
Se vengono visualizzate le seguenti configurazioni consigliate, fare clic su PROCED ANYWAY, quindi su DEPLOY.

-
Dopo l'installazione, fare clic su PROSSIMO, quindi su COMPLETO.


-
Riavviare tutti i componenti interessati. Fare clic sui puntini di sospensione (...) accanto a Servizi, selezionare Riavvia tutto necessario e dopo il riavvio fare clic su OK.

-
Eseguire il login al nodo master del cluster di Oracle Big Data Service tramite un comando
SSHo utilizzando putty con il fileppkutilizzando le credenziali utenteopc. Una volta eseguito il login, elevare le autorizzazioni all'utenteroot. Abbiamo usato putty per accedere ai nodi.sudo su - -
Eseguire i passi riportati di seguito per caricare il file jar in modo da poter accedere a Kafka con Flink.
-
Scaricare
lib.zipin qualsiasi directory (ad esempio/tmp) e decomprimerlo.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip -
Eseguire il comando seguente per copiare il file jar in tutti i nodi BDS.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
Eseguire il login a Trino utilizzando i seguenti comandi e creare schema e tabella in cui il consumer Flink scriverà i dati Avro.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )Nota: attenersi alla procedura per configurare il metastore Hive in modo che utilizzi l'implementazione
SerdeStorageSchemaReader. È necessario che Trino esegua query sui dati dall'area di memorizzazione degli oggetti OCI. -
Accedere a Ambari, Hive, Config e hive-site personalizzato. Impostare la proprietà seguente per configurare il metastore Hive:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader. -
Creare un bucket di storage degli oggetti OCI per memorizzare l'output.
Task 2: creare un progetto Java Maven utilizzando le dipendenze POM
Per avviare lo sviluppo della pipeline di dati in tempo reale, impostare un progetto Java Maven con le seguenti dipendenze richieste. È possibile scegliere qualsiasi IDE o tipo di progetto adatto alle proprie preferenze.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
Task 3: creare il producer Java Kafka Avro utilizzando il codice di esempio e il file avsc
Creare il producer Java Kafka Avro per la pipeline di dati utilizzando il codice di esempio e lo schema Avro forniti. Sostituire i segnaposto come your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic e adattare lo schema Avro con i valori e la struttura specifici.
Questo codice fungerà da base per il produttore Avro, consentendogli di produrre dati Avro sull'argomento Kafka all'interno della pipeline di dati in tempo reale.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
Il formato messaggio Avro definito dal file di schema ItemTransaction.avsc, come mostrato di seguito, viene memorizzato nella cartella resources del progetto. È importante sottolineare che per la serializzazione e la deserializzazione di Avro, è necessario generare una classe POJO specifica di Avro Java denominata ItemTransaction.java per gestire in modo efficace il formato Avro.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
Task 4: creare un consumer Java Flink utilizzando un codice di esempio
Verrà creato il consumer Java Flink per la pipeline di dati utilizzando il codice di esempio fornito. Sostituire i segnaposto come your_kafka_bootstrap_servers, your_consumer_group_id e your_kafka_topic con i dettagli di impostazione Kafka specifici.
Questo codice di esempio imposta un consumer Flink Kafka all'interno dell'ambiente Flink, consentendo di elaborare il flusso Kafka in entrata. Personalizzare la logica di elaborazione in base alle esigenze per la pipeline di dati.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
Task 5: Distribuzione ed esecuzione dei file jar delle applicazioni
Hai creato il producer Java Kafka Avro e il consumer Flink. È giunto il momento di distribuire ed eseguire queste applicazioni sui cluster BDS OCI. Distribuire il file jar nel cluster Hadoop e utilizzare i comandi riportati di seguito per eseguire Producer e Consumer.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
Task 6: verificare se l'argomento è registrato nel registro schema
Prima di procedere ulteriormente, è essenziale verificare se l'argomento Kafka in uso nella pipeline di dati è registrato nel registro schema. Lo schema viene registrato con il nome argomento utilizzato nell'applicazione producer.

Task 7: Monitora il collegamento tramite l'interfaccia utente Web Flink e controlla il file di output in OCI Object Storage
Il job Flink è in esecuzione sul cluster BDS, monitoriamo lo stato di avanzamento tramite l'interfaccia utente Web Flink e convalidiamo i dati elaborati memorizzati nello storage degli oggetti.


Task 8: interrogare i dati Avro utilizzando Trino
Trino, un motore di query SQL distribuito open source, può essere utilizzato per eseguire query sui dati Avro senza problemi. Assicurarsi che Trino sia installato e configurato nell'ambiente in uso. Ottenere i dettagli di connessione necessari, ad esempio l'URL del coordinatore Trino. Aprire un terminale BDS e avviare la CLI Trino utilizzando il comando. Utilizzare l'interfaccia CLI Trino per eseguire query SQL sui dati Avro come mostrato nell'immagine riportata di seguito.

Passi successivi
Questa esercitazione fornisce una guida completa alla creazione di una pipeline di dati in tempo reale su OCI utilizzando il servizio Big Data, integrando servizi essenziali per produrre, elaborare ed eseguire query sui dati Avro in modo efficiente. Con obiettivi chiari, istruzioni e rinforzi positivi, sei ben attrezzato per costruire e ottimizzare la tua pipeline di dati in tempo reale.
Collegamenti correlati
Conferme
- Autori - Pavan Upadhyay (Principal Cloud Engineer), Saket Bihari (Principal Cloud Engineer)
Altre risorse di apprendimento
Esplora altri laboratori su docs.oracle.com/learn o accedi a più contenuti gratuiti sulla formazione su Oracle Learning YouTube channel. Inoltre, visita education.oracle.com/learning-explorer per diventare Oracle Learning Explorer.
Per la documentazione del prodotto, visitare Oracle Help Center.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.