Note:
- Este tutorial requiere acceso a Oracle Cloud. Para registrarse en una cuenta gratuita, consulte Introducción a la cuenta gratuita de Oracle Cloud Infrastructure.
- Utiliza valores de ejemplo para credenciales, arrendamiento y compartimentos de Oracle Cloud Infrastructure. Al finalizar el laboratorio, sustituya estos valores por otros específicos del entorno en la nube.
Cree un pipeline de datos en tiempo real con Oracle Cloud Infrastructure Big Data Service
Introducción
En el mundo actual basado en datos, las organizaciones confían en pipelines de datos eficientes para capturar, procesar y analizar datos en tiempo real. Oracle Cloud Infrastructure (OCI) proporciona un sólido ecosistema de servicios que se pueden aprovechar para crear un potente pipeline de datos. En este tutorial, le guiaremos por el proceso de creación de un pipeline de datos que utilice Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS es un servicio basado en la nube proporcionado por Oracle que permite a los usuarios crear y gestionar clusters de Hadoop, clusters de Spark y otros servicios de big data. Utilizaremos una parte de su ecosistema que incluye Kafka, Flink, Schema Registry y Trino para construir este pipeline.
A lo largo de este tutorial, proporcionaremos instrucciones detalladas y ejemplos de código para garantizar un despliegue de pipeline de datos fluido y seguro.
Objetivos
-
Configurar recursos de OCI: descubra cómo configurar recursos de OCI, incluidos clusters de BDS, Kafka, Flink, Schema Registry, OCI Object Storage y Trino para facilitar el pipeline de datos.
-
Crear un productor de Avro (Java): cree una aplicación Java para producir datos de Avro en el tema de Kafka mientras utiliza esquemas de Avro desde el registro de esquemas.
-
Creación de un consumidor de Flink (Java): configure un cluster de BDS para un consumidor de Flink que consume datos de Kafka, los procesa y los almacena en Object Storage.
-
Despliegue y ejecución de las aplicaciones: descubra cómo desplegar y ejecutar las aplicaciones de productor de Avro y consumidor de Flink en sus clusters de BDS.
-
Consultar datos depurados: obtenga la capacidad de acceder a los datos depurados en OCI Object Storage mediante varias herramientas de OCI, como Trino, BDS Cloud SQL, Query Service, OAC, etc. Utilizaremos Trino para consultar los datos de Avro.
Requisitos
-
Una cuenta de OCI.
-
Acceso a servicios de OCI, incluidos OCI BDS y OCI Object Storage.
-
Conocimiento de Java y Apache Flink.
-
Se han instalado las herramientas y los SDK de la interfaz de línea de comandos (CLI) de OCI necesarios.
-
Comprensión de Avro y Kafka.
Tarea 1: Creación de un cluster de Oracle Big Data con Kafka/Flink/Schema Registry/Trino activado
-
Cree un entorno BDS (ODH 2.0) en OCI. Para crear un cluster de BDS, consulte Get Started with a Non-Highly Available ODH Big Data Cluster.
-
Conéctese a Ambari con la siguiente URL:
https://<cluster name>un0-hostname>:7183/
para agregar servicios de Kafka, Flink, registro de esquemas y Trino. Introduzca el nombre de usuario y la contraseña y, a continuación, haga clic en Conectar. -
Haga clic en los puntos suspensivos (...) junto a Servicios y, a continuación, seleccione Agregar servicio.
-
Marque Kafka, Flink, registro de esquemas y Trino y, a continuación, haga clic en Next.
-
Si ve las siguientes configuraciones recomendadas, haga clic en PROCEED ANYWAY y, a continuación, haga clic en DEPLOY.
-
Después de la instalación, haga clic en SIGUIENTE y, a continuación, haga clic en COMPLETE.
-
Reinicie todos los componentes afectados. Haga clic en los puntos suspensivos (...) junto a Servicios, seleccione Reiniciar todo lo necesario y, después de reiniciar, haga clic en Aceptar.
-
Conéctese al nodo maestro del cluster de Oracle Big Data Service mediante un comando
SSH
o mediante putty con el archivoppk
mediante las credenciales de usuarioopc
. Una vez conectado, eleve los permisos al usuarioroot
. Hemos utilizado putty para iniciar sesión en los nodos.sudo su -
-
Ejecute los siguientes pasos para cargar el archivo jar para poder acceder a Kafka con Flink.
-
Descargue
lib.zip
en cualquier directorio (por ejemplo,/tmp
) y descomprímalo.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
Ejecute el siguiente comando para copiar jar en todos los nodos de BDS.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
Inicie sesión en Trino con los siguientes comandos y cree un esquema y una tabla donde el consumidor de Flink escribirá los datos de Avro.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
Nota: Siga los pasos para configurar Hive Metastore para utilizar la implantación
SerdeStorageSchemaReader
. Es necesario que Trino consulte datos del almacén de objetos de OCI. -
Vaya a Ambari, Hive, Config y Custom hive-site. Defina la siguiente propiedad para configurar Hive Metastore:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
. -
Cree un cubo de OCI Object Storage para almacenar la salida.
Tarea 2: Creación de un proyecto de Java Maven mediante dependencias POM
Para iniciar el desarrollo del pipeline de datos en tiempo real, vamos a configurar un proyecto de Java Maven con las siguientes dependencias necesarias. Puede elegir cualquier IDE o tipo de proyecto que se adapte a sus preferencias.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
Tarea 3: Creación del productor de Java Kafka Avro con código de ejemplo y archivo avsc
Cree un productor de Java Kafka Avro para el pipeline de datos con el código de ejemplo y el esquema de Avro proporcionados. Sustituya marcadores de posición como your_kafka_bootstrap_servers
, your_schema_registry_url
, your_kafka_topic
y adapte el esquema de Avro a sus valores y estructura específicos.
Este código servirá como base para su productor de Avro, permitiéndole producir datos de Avro al tema de Kafka dentro de su pipeline de datos en tiempo real.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
El formato de mensaje Avro definido por el archivo de esquema ItemTransaction.avsc
, como se muestra a continuación, se almacena en la carpeta resources
del proyecto. Es importante resaltar que para la serialización y anulación de la serialización de Avro, se debe generar una clase POJO correspondiente específica de Java Avro denominada ItemTransaction.java
para manejar eficazmente el formato de Avro.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
Tarea 4: Crear un consumidor de Java Flink con código de ejemplo
Construiremos el consumidor de Java Flink para su pipeline de datos utilizando el código de ejemplo proporcionado. Sustituya los marcadores de posición como your_kafka_bootstrap_servers
, your_consumer_group_id
y your_kafka_topic
por los detalles de configuración de Kafka específicos.
Este código de ejemplo configura un consumidor de Kafka de Flink en el entorno de Flink, lo que le permite procesar el flujo de Kafka entrante. Personalice la lógica de procesamiento según sea necesario para el pipeline de datos.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
Tarea 5: Despliegue y ejecución de los archivos jar de aplicaciones
Ha creado el productor de Java Kafka Avro y el consumidor de Flink. Ha llegado el momento de desplegar y ejecutar estas aplicaciones en sus clusters de OCI BDS. Despliegue el jar en el cluster de Hadoop y utilice los siguientes comandos para ejecutar Producer y Consumer.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
Tarea 6: Comprobar si el tema está registrado en el registro de esquema
Antes de continuar, es esencial verificar si el tema de Kafka que está utilizando en el pipeline de datos está registrado en el registro de esquema. Podemos ver que el esquema está registrado con el nombre de tema utilizado en la aplicación del productor.
Tarea 7: Supervisión de Flink a través de la interfaz de usuario web de Flink y comprobación del archivo de salida en OCI Object Storage
El trabajo Flink se está ejecutando en el cluster de BDS, supervisemos su progreso mediante la interfaz de usuario web de Flink y validemos los datos procesados almacenados en Object Storage.
Tarea 8: Consulta de datos de Avro con Trino
Trino, un motor de consultas SQL distribuido de código abierto, se puede utilizar para consultar datos de Avro sin problemas. Asegúrese de que Trino esté instalado y configurado en su entorno. Obtenga los detalles de conexión necesarios, como la URL del coordinador de Trino. Abra un terminal de BDS e inicie Trino CLI con el comando. Utilice la CLI de Trino para ejecutar consultas SQL en los datos de Avro, como se muestra en la siguiente imagen.
Pasos Siguientes
Este tutorial proporciona una guía completa para crear un pipeline de datos en tiempo real en OCI mediante Big Data Service, integrando servicios esenciales para producir, procesar y consultar datos de Avro de forma eficiente. Con objetivos claros, instrucciones y refuerzo positivo, está bien equipado para construir y optimizar su propio pipeline de datos en tiempo real.
Enlaces relacionados
Agradecimientos
- Autores: Pavan Upadhyay (ingeniero principal de la nube), Saket Bihari (ingeniero principal de la nube)
Más recursos de aprendizaje
Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de aprendizaje gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en Oracle Learning Explorer.
Para obtener documentación sobre el producto, visite Oracle Help Center.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.