Note:

Cree un pipeline de datos en tiempo real con Oracle Cloud Infrastructure Big Data Service

Introducción

En el mundo actual basado en datos, las organizaciones confían en pipelines de datos eficientes para capturar, procesar y analizar datos en tiempo real. Oracle Cloud Infrastructure (OCI) proporciona un sólido ecosistema de servicios que se pueden aprovechar para crear un potente pipeline de datos. En este tutorial, le guiaremos por el proceso de creación de un pipeline de datos que utilice Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS es un servicio basado en la nube proporcionado por Oracle que permite a los usuarios crear y gestionar clusters de Hadoop, clusters de Spark y otros servicios de big data. Utilizaremos una parte de su ecosistema que incluye Kafka, Flink, Schema Registry y Trino para construir este pipeline.

Arquitectura del pipeline

A lo largo de este tutorial, proporcionaremos instrucciones detalladas y ejemplos de código para garantizar un despliegue de pipeline de datos fluido y seguro.

Objetivos

Requisitos

Tarea 1: Creación de un cluster de Oracle Big Data con Kafka/Flink/Schema Registry/Trino activado

  1. Cree un entorno BDS (ODH 2.0) en OCI. Para crear un cluster de BDS, consulte Get Started with a Non-Highly Available ODH Big Data Cluster.

  2. Conéctese a Ambari con la siguiente URL: https://<cluster name>un0-hostname>:7183/ para agregar servicios de Kafka, Flink, registro de esquemas y Trino. Introduzca el nombre de usuario y la contraseña y, a continuación, haga clic en Conectar.

    Iniciar sesión en Ambari

  3. Haga clic en los puntos suspensivos (...) junto a Servicios y, a continuación, seleccione Agregar servicio.

    Agregar servicios

  4. Marque Kafka, Flink, registro de esquemas y Trino y, a continuación, haga clic en Next.

    Seleccionar Servicios

  5. Si ve las siguientes configuraciones recomendadas, haga clic en PROCEED ANYWAY y, a continuación, haga clic en DEPLOY.

    ¿Desea continuar de todos modos

  6. Después de la instalación, haga clic en SIGUIENTE y, a continuación, haga clic en COMPLETE.

    Después de la instalación siguiente

    Después de finalizar la instalación

  7. Reinicie todos los componentes afectados. Haga clic en los puntos suspensivos (...) junto a Servicios, seleccione Reiniciar todo lo necesario y, después de reiniciar, haga clic en Aceptar.

    Servicios de inicio de Ambari

  8. Conéctese al nodo maestro del cluster de Oracle Big Data Service mediante un comando SSH o mediante putty con el archivo ppk mediante las credenciales de usuario opc. Una vez conectado, eleve los permisos al usuario root. Hemos utilizado putty para iniciar sesión en los nodos.

    sudo su -
    
  9. Ejecute los siguientes pasos para cargar el archivo jar para poder acceder a Kafka con Flink.

    1. Descargue lib.zip en cualquier directorio (por ejemplo, /tmp) y descomprímalo.

      cd /tmp
      wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip
      #unzip lib
      unzip /tmp/lib.zip
      
    2. Ejecute el siguiente comando para copiar jar en todos los nodos de BDS.

      dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar  /usr/odh/current/flink/lib"
      su - hdfs
      hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/
      hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/
      hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
      
  10. Inicie sesión en Trino con los siguientes comandos y cree un esquema y una tabla donde el consumidor de Flink escribirá los datos de Avro.

    For HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks
    
    For NON HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285
    
    Create Schema
    
    CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION  = '<object store>/<folder>');
    
    Create table on SCHEMA
    
    CREATE TABLE <catalog>.<schema name>.<table name> (
    name varchar,
    lastname varchar,
    age int,
    email varchar,
    timestamp bigint
    )
    WITH (
    format = 'AVRO',
    avro_schema_url = '<object store>/<avro schema file name>'
    )
    
    

    Nota: Siga los pasos para configurar Hive Metastore para utilizar la implantación SerdeStorageSchemaReader. Es necesario que Trino consulte datos del almacén de objetos de OCI.

  11. Vaya a Ambari, Hive, Config y Custom hive-site. Defina la siguiente propiedad para configurar Hive Metastore: metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader.

  12. Cree un cubo de OCI Object Storage para almacenar la salida.

Tarea 2: Creación de un proyecto de Java Maven mediante dependencias POM

Para iniciar el desarrollo del pipeline de datos en tiempo real, vamos a configurar un proyecto de Java Maven con las siguientes dependencias necesarias. Puede elegir cualquier IDE o tipo de proyecto que se adapte a sus preferencias.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-common</artifactId>
      <version>1.0.0</version>
    </dependency>


    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>jersey-shaded</artifactId>
      <version>0.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>common-auth</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>registry-common-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-guava</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-joda</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-json-org</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>1.15.1-csa1.8.0.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.10</artifactId>
      <version>1.1.5-hadoop1</version>
    </dependency>

    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-redis</artifactId>
      <version>3.26.0</version>
    </dependency>
</dependencies>

Tarea 3: Creación del productor de Java Kafka Avro con código de ejemplo y archivo avsc

Cree un productor de Java Kafka Avro para el pipeline de datos con el código de ejemplo y el esquema de Avro proporcionados. Sustituya marcadores de posición como your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic y adapte el esquema de Avro a sus valores y estructura específicos.

Este código servirá como base para su productor de Avro, permitiéndole producir datos de Avro al tema de Kafka dentro de su pipeline de datos en tiempo real.

package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroProducer {
    public static void main(String[] args) {

        String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
        String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name

        try {
            //Map<String, Object> config = new HashMap<>();

            // Load producer properties
            Properties producerConfig = new Properties();
            //Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
            producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

            // Create a Kafka producer
            Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
            // Load Avro schema from resources
            Schema schema = loadAvroSchemaFromResources(schemaFileName);
            // Create a sample Avro record
            while (true) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("name", "My Name");
                avroRecord.put("lastname", "My Last Name");
                avroRecord.put("age", 10);
                avroRecord.put("email", "a@mail.com");
                avroRecord.put("timestamp", System.currentTimeMillis());

                // Create a Kafka record with a topic and Avro payload
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);

                // Send the message
                producer.send(record);
                // Sleep for 3 seconds before sending the next message
                Thread.sleep(3000);
            }
            //producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
        try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
            }
            return new Schema.Parser().parse(inputStream);
        }
    }
}

El formato de mensaje Avro definido por el archivo de esquema ItemTransaction.avsc, como se muestra a continuación, se almacena en la carpeta resources del proyecto. Es importante resaltar que para la serialización y anulación de la serialización de Avro, se debe generar una clase POJO correspondiente específica de Java Avro denominada ItemTransaction.java para manejar eficazmente el formato de Avro.

{
   "type" : "record",
   "name" : "ItemTransaction",
   "namespace": "org.example",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "lastname" , "type" : "string" },
      { "name" : "age" , "type"  : "int" },
      { "name" : "email" , "type" : "string" },
      { "name" : "timestamp" , "type" : "long" }
   ]
}

Construiremos el consumidor de Java Flink para su pipeline de datos utilizando el código de ejemplo proporcionado. Sustituya los marcadores de posición como your_kafka_bootstrap_servers, your_consumer_group_id y your_kafka_topic por los detalles de configuración de Kafka específicos.

Este código de ejemplo configura un consumidor de Kafka de Flink en el entorno de Flink, lo que le permite procesar el flujo de Kafka entrante. Personalice la lógica de procesamiento según sea necesario para el pipeline de datos.

package org.example;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroFlinkConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
        consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a Flink Kafka consumer with the custom Avro deserialization schema
        FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
                topicName,
                ClouderaRegistryKafkaDeserializationSchema
                        .builder(ItemTransaction.class)
                        .setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
                        .build(),
                consumerConfig
        );

        // Add the Kafka consumer to the Flink environment
        DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);

        KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());

        // Process and write output using DynamicOutputPathMapper
        DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<ItemTransaction>() {
                    @Override
                    public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
                        return item1; // You can modify the reduce logic as needed
                    }
                })
                .map(new DynamicOutputPathMapper())
                .keyBy(0)
                .sum(1);

        windowedCounts.print();

        // Print Avro records for processing
        avroStream.print();

        env.execute("KafkaAvroFlinkConsumer");
    }

    public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
        @Override
        public Tuple1<String> getKey(ItemTransaction item) {
            return new Tuple1<>(item.getName().toString());
        }
    }

    // Custom Avro deserialization schema

    public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
        private long windowCount = 0;
        private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void close() throws Exception {
            if (avroOutputFormat != null) {
                avroOutputFormat.close();
                avroOutputFormat = null;
            }
        }

        @Override
        public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
            // Increment the window count
            windowCount++;

            // Generate a dynamic output path based on the window count and subtask index
            String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";

            // Initialize or update the AvroOutputFormat
            if (avroOutputFormat == null) {
                avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
                avroOutputFormat.configure(null);
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            } else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
                avroOutputFormat.close();
                avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            }

            // Write the item using AvroOutputFormat
            avroOutputFormat.writeRecord(item);

            return new Tuple2<>("Window Count", 1L);
        }
    }
}

Tarea 5: Despliegue y ejecución de los archivos jar de aplicaciones

Ha creado el productor de Java Kafka Avro y el consumidor de Flink. Ha llegado el momento de desplegar y ejecutar estas aplicaciones en sus clusters de OCI BDS. Despliegue el jar en el cluster de Hadoop y utilice los siguientes comandos para ejecutar Producer y Consumer.

sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>

Tarea 6: Comprobar si el tema está registrado en el registro de esquema

Antes de continuar, es esencial verificar si el tema de Kafka que está utilizando en el pipeline de datos está registrado en el registro de esquema. Podemos ver que el esquema está registrado con el nombre de tema utilizado en la aplicación del productor.

Regulación de esquema

El trabajo Flink se está ejecutando en el cluster de BDS, supervisemos su progreso mediante la interfaz de usuario web de Flink y validemos los datos procesados almacenados en Object Storage.

Flink web ui

Salida de almacenamiento de objetos

Tarea 8: Consulta de datos de Avro con Trino

Trino, un motor de consultas SQL distribuido de código abierto, se puede utilizar para consultar datos de Avro sin problemas. Asegúrese de que Trino esté instalado y configurado en su entorno. Obtenga los detalles de conexión necesarios, como la URL del coordinador de Trino. Abra un terminal de BDS e inicie Trino CLI con el comando. Utilice la CLI de Trino para ejecutar consultas SQL en los datos de Avro, como se muestra en la siguiente imagen.

Trino Consultar datos de Avro

Pasos Siguientes

Este tutorial proporciona una guía completa para crear un pipeline de datos en tiempo real en OCI mediante Big Data Service, integrando servicios esenciales para producir, procesar y consultar datos de Avro de forma eficiente. Con objetivos claros, instrucciones y refuerzo positivo, está bien equipado para construir y optimizar su propio pipeline de datos en tiempo real.

Agradecimientos

Más recursos de aprendizaje

Explore otros laboratorios en docs.oracle.com/learn o acceda a más contenido de aprendizaje gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en Oracle Learning Explorer.

Para obtener documentación sobre el producto, visite Oracle Help Center.