Remarques :

Création d'un pipeline de données en temps réel avec Oracle Cloud Infrastructure Big Data Service

Introduction

Dans le monde actuel axé sur les données, les entreprises s'appuient sur des pipelines de données efficaces pour capturer, traiter et analyser les données en temps réel. Oracle Cloud Infrastructure (OCI) fournit un écosystème robuste de services pouvant être exploités pour créer un pipeline de données puissant. Dans ce tutoriel, nous vous guiderons tout au long du processus de création d'un pipeline de données qui utilise Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS est un service cloud fourni par Oracle qui permet aux utilisateurs de créer et de gérer des clusters Hadoop, des clusters Spark et d'autres services Big Data. Nous utiliserons une partie de son écosystème qui comprend Kafka, Flink, Schema Registry et Trino pour construire ce pipeline.

Architecture de pipeline

Tout au long de ce tutoriel, nous fournirons des instructions détaillées et des exemples de code pour garantir un déploiement de pipeline de données transparent et sécurisé.

Objectifs

Prérequis

Tâche 1 : créer un cluster Oracle Big Data avec Kafka/Flink/Schema Registry/Trino activé

  1. Créez un environnement BDS (ODH 2.0) sur OCI. Pour créer un cluster BDS, reportez-vous à Introduction à un cluster Big Data ODH qui n'est pas hautement disponible.

  2. Connectez-vous à Ambari avec l'URL suivante : https://<cluster name>un0-hostname>:7183/ pour ajouter des services Kafka, Flink, schema registry et Trino. Entrez le nom utilisateur et le mot de passe, puis cliquez sur SIGN IN.

    Se connecter à Ambari

  3. Cliquez sur les points de suspension (...) en regard de Services, puis sélectionnez Ajouter un service.

    Ajouter des services

  4. Cochez Kafka, Flink, registre de schéma et Trino, puis cliquez sur Suivant.

    Sélectionner des services

  5. Si les configurations recommandées suivantes s'affichent, cliquez sur PROCEED ANYWAY, puis sur DEPLOY.

    Continuer quand même

  6. Après l'installation, cliquez sur NEXT, puis sur COMPLETE.

    Après l'installation suivante

    Après l'installation terminée

  7. Redémarrez tous les composants concernés. Cliquez sur les points de suspension (...) en regard de Services, sélectionnez Redémarrer tout requis et, après le redémarrage, cliquez sur OK.

    Services de démarrage Ambari

  8. Connectez-vous au noeud maître du cluster Oracle Big Data Service via une commande SSH ou en utilisant putty avec le fichier ppk à l'aide des informations d'identification utilisateur opc. Une fois connecté, transférez vos droits d'accès à l'utilisateur root. Nous avons utilisé putty pour nous connecter aux noeuds.

    sudo su -
    
  9. Exécutez les étapes suivantes pour télécharger le fichier JAR afin de pouvoir accéder à Kafka avec Flink.

    1. Téléchargez lib.zip dans n'importe quel répertoire (par exemple, /tmp) et décompressez-le.

      cd /tmp
      wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip
      #unzip lib
      unzip /tmp/lib.zip
      
    2. Exécutez la commande suivante pour copier le fichier JAR sur tous les noeuds BDS.

      dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar  /usr/odh/current/flink/lib"
      su - hdfs
      hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/
      hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/
      hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
      
  10. Connectez-vous à Trino à l'aide des commandes suivantes et créez un schéma et une table où le consommateur Flink écrira les données Avro.

    For HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks
    
    For NON HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285
    
    Create Schema
    
    CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION  = '<object store>/<folder>');
    
    Create table on SCHEMA
    
    CREATE TABLE <catalog>.<schema name>.<table name> (
    name varchar,
    lastname varchar,
    age int,
    email varchar,
    timestamp bigint
    )
    WITH (
    format = 'AVRO',
    avro_schema_url = '<object store>/<avro schema file name>'
    )
    
    

    Remarque : suivez les étapes de configuration du metastore Hive pour utiliser l'implémentation SerdeStorageSchemaReader. Trino doit interroger les données de la banque d'objets OCI.

  11. Accédez à Ambari, Hive, Config et Custom hive-site. Définissez la propriété suivante pour configurer le metastore Hive : metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader.

  12. Créez un bucket OCI Object Storage pour stocker la sortie.

Tâche 2 : créer un projet Java Maven à l'aide de dépendances POM

Pour démarrer le développement de votre pipeline de données en temps réel, configurons un projet Java Maven avec les dépendances requises suivantes. Vous pouvez choisir n'importe quel IDE ou type de projet qui correspond à vos préférences.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-common</artifactId>
      <version>1.0.0</version>
    </dependency>


    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>jersey-shaded</artifactId>
      <version>0.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>common-auth</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>registry-common-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-guava</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-joda</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-json-org</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>1.15.1-csa1.8.0.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.10</artifactId>
      <version>1.1.5-hadoop1</version>
    </dependency>

    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-redis</artifactId>
      <version>3.26.0</version>
    </dependency>
</dependencies>

Tâche 3 : créer un producteur Java Kafka Avro à l'aide d'un exemple de code et d'un fichier avsc

Créez un producteur Java Kafka Avro pour votre pipeline de données à l'aide de l'exemple de code et du schéma Avro fournis. Remplacez les espaces réservés tels que your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic et adaptez le schéma Avro avec vos valeurs et votre structure spécifiques.

Ce code servira de base à votre producteur Avro, ce qui lui permettra de produire des données Avro vers la rubrique Kafka dans votre pipeline de données en temps réel.

package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroProducer {
    public static void main(String[] args) {

        String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
        String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name

        try {
            //Map<String, Object> config = new HashMap<>();

            // Load producer properties
            Properties producerConfig = new Properties();
            //Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
            producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

            // Create a Kafka producer
            Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
            // Load Avro schema from resources
            Schema schema = loadAvroSchemaFromResources(schemaFileName);
            // Create a sample Avro record
            while (true) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("name", "My Name");
                avroRecord.put("lastname", "My Last Name");
                avroRecord.put("age", 10);
                avroRecord.put("email", "a@mail.com");
                avroRecord.put("timestamp", System.currentTimeMillis());

                // Create a Kafka record with a topic and Avro payload
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);

                // Send the message
                producer.send(record);
                // Sleep for 3 seconds before sending the next message
                Thread.sleep(3000);
            }
            //producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
        try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
            }
            return new Schema.Parser().parse(inputStream);
        }
    }
}

Le format de message Avro défini par le fichier de schéma ItemTransaction.avsc, comme indiqué ci-dessous, est stocké dans le dossier resources du projet. Il est important de souligner que pour la sérialisation et la désérialisation Avro, une classe POJO spécifique à Java Avro nommée ItemTransaction.java doit être générée pour gérer efficacement le format Avro.

{
   "type" : "record",
   "name" : "ItemTransaction",
   "namespace": "org.example",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "lastname" , "type" : "string" },
      { "name" : "age" , "type"  : "int" },
      { "name" : "email" , "type" : "string" },
      { "name" : "timestamp" , "type" : "long" }
   ]
}

Nous construirons le consommateur Java Flink pour votre pipeline de données à l'aide de l'exemple de code fourni. Remplacez les espaces réservés tels que your_kafka_bootstrap_servers, your_consumer_group_id et your_kafka_topic par vos détails de configuration Kafka spécifiques.

Cet exemple de code configure un destinataire Kafka Flink dans l'environnement Flink, ce qui vous permet de traiter le flux Kafka entrant. Personnalisez la logique de traitement selon vos besoins pour votre pipeline de données.

package org.example;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroFlinkConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
        consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a Flink Kafka consumer with the custom Avro deserialization schema
        FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
                topicName,
                ClouderaRegistryKafkaDeserializationSchema
                        .builder(ItemTransaction.class)
                        .setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
                        .build(),
                consumerConfig
        );

        // Add the Kafka consumer to the Flink environment
        DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);

        KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());

        // Process and write output using DynamicOutputPathMapper
        DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<ItemTransaction>() {
                    @Override
                    public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
                        return item1; // You can modify the reduce logic as needed
                    }
                })
                .map(new DynamicOutputPathMapper())
                .keyBy(0)
                .sum(1);

        windowedCounts.print();

        // Print Avro records for processing
        avroStream.print();

        env.execute("KafkaAvroFlinkConsumer");
    }

    public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
        @Override
        public Tuple1<String> getKey(ItemTransaction item) {
            return new Tuple1<>(item.getName().toString());
        }
    }

    // Custom Avro deserialization schema

    public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
        private long windowCount = 0;
        private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void close() throws Exception {
            if (avroOutputFormat != null) {
                avroOutputFormat.close();
                avroOutputFormat = null;
            }
        }

        @Override
        public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
            // Increment the window count
            windowCount++;

            // Generate a dynamic output path based on the window count and subtask index
            String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";

            // Initialize or update the AvroOutputFormat
            if (avroOutputFormat == null) {
                avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
                avroOutputFormat.configure(null);
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            } else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
                avroOutputFormat.close();
                avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            }

            // Write the item using AvroOutputFormat
            avroOutputFormat.writeRecord(item);

            return new Tuple2<>("Window Count", 1L);
        }
    }
}

Tâche 5 : déployer et exécuter les fichiers JAR Applications

Vous avez créé le producteur Java Kafka Avro et le consommateur Flink. Il est temps de déployer et d'exécuter ces applications sur vos clusters OCI BDS. Déployez le fichier JAR dans le cluster Hadoop et utilisez les commandes suivantes pour exécuter Producer et Consumer.

sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>

Tâche 6 : vérifier si le sujet est inscrit auprès du registre de schémas

Avant de continuer, il est essentiel de vérifier si la rubrique Kafka que vous utilisez dans votre pipeline de données est inscrite auprès du registre de schémas. Nous voyons que le schéma est inscrit avec le nom de rubrique utilisé dans l'application de fournisseur.

Régitry de schéma

Votre travail Flink est en cours d'exécution sur le cluster BDS, surveillons sa progression via l'interface utilisateur Web Flink et validons les données traitées stockées dans Object Storage.

Interface utilisateur Web Flink

Sortie Object Storage

Tâche 8 : interroger les données Avro à l'aide de Trino

Trino, un moteur de requête SQL distribué open source, peut être utilisé pour interroger les données Avro de manière transparente. Assurez-vous que Trino est installé et configuré dans votre environnement. Obtenez les détails de connexion nécessaires, tels que l'URL du coordinateur Trino. Ouvrez un terminal BDS et démarrez la CLI Trino à l'aide de la commande. Utilisez l'interface de ligne de commande Trino pour exécuter des requêtes SQL sur les données Avro, comme indiqué dans l'image suivante.

Données Avro de requête Trino

Etapes suivantes

Ce tutoriel fournit un guide complet pour créer un pipeline de données en temps réel sur OCI à l'aide de Big Data Service, intégrant des services essentiels pour produire, traiter et interroger efficacement les données Avro. Avec des objectifs clairs, des instructions et un renforcement positif, vous êtes bien équipé pour construire et optimiser votre propre pipeline de données en temps réel.

Remerciements

Ressources de formation supplémentaires

Parcourez d'autres ateliers sur docs.oracle.com/learn ou accédez à davantage de contenus de formation gratuits sur le canal Oracle Learning YouTube. De plus, rendez-vous sur education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.

Pour obtenir de la documentation sur le produit, visitez Oracle Help Center.