Nota

Crea una pipeline di dati in tempo reale con Oracle Cloud Infrastructure Big Data Service

Introduzione

Nel mondo odierno basato sui dati, le organizzazioni si affidano a pipeline di dati efficienti per acquisire, elaborare e analizzare i dati in tempo reale. Oracle Cloud Infrastructure (OCI) offre un solido ecosistema di servizi che possono essere sfruttati per creare una potente pipeline di dati. In questa esercitazione verrà descritto come creare una pipeline di dati basata su Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS è un servizio basato su cloud fornito da Oracle che consente agli utenti di creare e gestire cluster Hadoop, cluster Spark e altri servizi Big Data. Utilizzeremo una parte del suo ecosistema che include Kafka, Flink, Schema Registry e Trino per costruire questa pipeline.

Architettura della pipeline

In questa esercitazione verranno fornite istruzioni dettagliate ed esempi di codice per garantire una distribuzione della pipeline di dati trasparente e sicura.

Obiettivi

Prerequisiti

Task 1: creare un cluster Oracle Big Data con Kafka/Flink/Schema Registry/Trino abilitato

  1. Crea un ambiente BDS (ODH 2.0) su OCI. Per creare un cluster BDS, vedere Introduzione a un cluster Big Data ODH non ad alta disponibilità.

  2. Eseguire il login ad Ambari con il seguente URL: https://<cluster name>un0-hostname>:7183/ per aggiungere i servizi Kafka, Flink, Schema Registry e Trino. Immettere il nome utente e la password, quindi fare clic su Connetti.

    Accedi a Ambari

  3. Fare clic sui puntini di sospensione (...) accanto a Servizi, quindi selezionare Aggiungi servizio.

    Aggiunge servizi

  4. Selezionare Kafka, Flink, registro schema e Trino, quindi fare clic su Avanti.

    Scegli servizi

  5. Se vengono visualizzate le seguenti configurazioni consigliate, fare clic su PROCED ANYWAY, quindi su DEPLOY.

    Continuare

  6. Dopo l'installazione, fare clic su PROSSIMO, quindi su COMPLETO.

    Dopo l'installazione

    Dopo l'installazione completata

  7. Riavviare tutti i componenti interessati. Fare clic sui puntini di sospensione (...) accanto a Servizi, selezionare Riavvia tutto necessario e dopo il riavvio fare clic su OK.

    Servizi di avvio Ambari

  8. Eseguire il login al nodo master del cluster di Oracle Big Data Service tramite un comando SSH o utilizzando putty con il file ppk utilizzando le credenziali utente opc. Una volta eseguito il login, elevare le autorizzazioni all'utente root. Abbiamo usato putty per accedere ai nodi.

    sudo su -
    
  9. Eseguire i passi riportati di seguito per caricare il file jar in modo da poter accedere a Kafka con Flink.

    1. Scaricare lib.zip in qualsiasi directory (ad esempio /tmp) e decomprimerlo.

      cd /tmp
      wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip
      #unzip lib
      unzip /tmp/lib.zip
      
    2. Eseguire il comando seguente per copiare il file jar in tutti i nodi BDS.

      dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar  /usr/odh/current/flink/lib"
      su - hdfs
      hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/
      hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/
      hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
      
  10. Eseguire il login a Trino utilizzando i seguenti comandi e creare schema e tabella in cui il consumer Flink scriverà i dati Avro.

    For HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks
    
    For NON HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285
    
    Create Schema
    
    CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION  = '<object store>/<folder>');
    
    Create table on SCHEMA
    
    CREATE TABLE <catalog>.<schema name>.<table name> (
    name varchar,
    lastname varchar,
    age int,
    email varchar,
    timestamp bigint
    )
    WITH (
    format = 'AVRO',
    avro_schema_url = '<object store>/<avro schema file name>'
    )
    
    

    Nota: attenersi alla procedura per configurare il metastore Hive in modo che utilizzi l'implementazione SerdeStorageSchemaReader. È necessario che Trino esegua query sui dati dall'area di memorizzazione degli oggetti OCI.

  11. Accedere a Ambari, Hive, Config e hive-site personalizzato. Impostare la proprietà seguente per configurare il metastore Hive: metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader.

  12. Creare un bucket di storage degli oggetti OCI per memorizzare l'output.

Task 2: creare un progetto Java Maven utilizzando le dipendenze POM

Per avviare lo sviluppo della pipeline di dati in tempo reale, impostare un progetto Java Maven con le seguenti dipendenze richieste. È possibile scegliere qualsiasi IDE o tipo di progetto adatto alle proprie preferenze.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-common</artifactId>
      <version>1.0.0</version>
    </dependency>


    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>jersey-shaded</artifactId>
      <version>0.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>common-auth</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>registry-common-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-guava</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-joda</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-json-org</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>1.15.1-csa1.8.0.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.10</artifactId>
      <version>1.1.5-hadoop1</version>
    </dependency>

    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-redis</artifactId>
      <version>3.26.0</version>
    </dependency>
</dependencies>

Task 3: creare il producer Java Kafka Avro utilizzando il codice di esempio e il file avsc

Creare il producer Java Kafka Avro per la pipeline di dati utilizzando il codice di esempio e lo schema Avro forniti. Sostituire i segnaposto come your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic e adattare lo schema Avro con i valori e la struttura specifici.

Questo codice fungerà da base per il produttore Avro, consentendogli di produrre dati Avro sull'argomento Kafka all'interno della pipeline di dati in tempo reale.

package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroProducer {
    public static void main(String[] args) {

        String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
        String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name

        try {
            //Map<String, Object> config = new HashMap<>();

            // Load producer properties
            Properties producerConfig = new Properties();
            //Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
            producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

            // Create a Kafka producer
            Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
            // Load Avro schema from resources
            Schema schema = loadAvroSchemaFromResources(schemaFileName);
            // Create a sample Avro record
            while (true) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("name", "My Name");
                avroRecord.put("lastname", "My Last Name");
                avroRecord.put("age", 10);
                avroRecord.put("email", "a@mail.com");
                avroRecord.put("timestamp", System.currentTimeMillis());

                // Create a Kafka record with a topic and Avro payload
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);

                // Send the message
                producer.send(record);
                // Sleep for 3 seconds before sending the next message
                Thread.sleep(3000);
            }
            //producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
        try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
            }
            return new Schema.Parser().parse(inputStream);
        }
    }
}

Il formato messaggio Avro definito dal file di schema ItemTransaction.avsc, come mostrato di seguito, viene memorizzato nella cartella resources del progetto. È importante sottolineare che per la serializzazione e la deserializzazione di Avro, è necessario generare una classe POJO specifica di Avro Java denominata ItemTransaction.java per gestire in modo efficace il formato Avro.

{
   "type" : "record",
   "name" : "ItemTransaction",
   "namespace": "org.example",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "lastname" , "type" : "string" },
      { "name" : "age" , "type"  : "int" },
      { "name" : "email" , "type" : "string" },
      { "name" : "timestamp" , "type" : "long" }
   ]
}

Verrà creato il consumer Java Flink per la pipeline di dati utilizzando il codice di esempio fornito. Sostituire i segnaposto come your_kafka_bootstrap_servers, your_consumer_group_id e your_kafka_topic con i dettagli di impostazione Kafka specifici.

Questo codice di esempio imposta un consumer Flink Kafka all'interno dell'ambiente Flink, consentendo di elaborare il flusso Kafka in entrata. Personalizzare la logica di elaborazione in base alle esigenze per la pipeline di dati.

package org.example;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroFlinkConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
        consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a Flink Kafka consumer with the custom Avro deserialization schema
        FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
                topicName,
                ClouderaRegistryKafkaDeserializationSchema
                        .builder(ItemTransaction.class)
                        .setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
                        .build(),
                consumerConfig
        );

        // Add the Kafka consumer to the Flink environment
        DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);

        KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());

        // Process and write output using DynamicOutputPathMapper
        DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<ItemTransaction>() {
                    @Override
                    public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
                        return item1; // You can modify the reduce logic as needed
                    }
                })
                .map(new DynamicOutputPathMapper())
                .keyBy(0)
                .sum(1);

        windowedCounts.print();

        // Print Avro records for processing
        avroStream.print();

        env.execute("KafkaAvroFlinkConsumer");
    }

    public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
        @Override
        public Tuple1<String> getKey(ItemTransaction item) {
            return new Tuple1<>(item.getName().toString());
        }
    }

    // Custom Avro deserialization schema

    public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
        private long windowCount = 0;
        private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void close() throws Exception {
            if (avroOutputFormat != null) {
                avroOutputFormat.close();
                avroOutputFormat = null;
            }
        }

        @Override
        public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
            // Increment the window count
            windowCount++;

            // Generate a dynamic output path based on the window count and subtask index
            String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";

            // Initialize or update the AvroOutputFormat
            if (avroOutputFormat == null) {
                avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
                avroOutputFormat.configure(null);
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            } else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
                avroOutputFormat.close();
                avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            }

            // Write the item using AvroOutputFormat
            avroOutputFormat.writeRecord(item);

            return new Tuple2<>("Window Count", 1L);
        }
    }
}

Task 5: Distribuzione ed esecuzione dei file jar delle applicazioni

Hai creato il producer Java Kafka Avro e il consumer Flink. È giunto il momento di distribuire ed eseguire queste applicazioni sui cluster BDS OCI. Distribuire il file jar nel cluster Hadoop e utilizzare i comandi riportati di seguito per eseguire Producer e Consumer.

sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>

Task 6: verificare se l'argomento è registrato nel registro schema

Prima di procedere ulteriormente, è essenziale verificare se l'argomento Kafka in uso nella pipeline di dati è registrato nel registro schema. Lo schema viene registrato con il nome argomento utilizzato nell'applicazione producer.

Registro schema

Il job Flink è in esecuzione sul cluster BDS, monitoriamo lo stato di avanzamento tramite l'interfaccia utente Web Flink e convalidiamo i dati elaborati memorizzati nello storage degli oggetti.

Sito web Flink

Output storage degli oggetti

Task 8: interrogare i dati Avro utilizzando Trino

Trino, un motore di query SQL distribuito open source, può essere utilizzato per eseguire query sui dati Avro senza problemi. Assicurarsi che Trino sia installato e configurato nell'ambiente in uso. Ottenere i dettagli di connessione necessari, ad esempio l'URL del coordinatore Trino. Aprire un terminale BDS e avviare la CLI Trino utilizzando il comando. Utilizzare l'interfaccia CLI Trino per eseguire query SQL sui dati Avro come mostrato nell'immagine riportata di seguito.

Dati Avro query Trino

Passi successivi

Questa esercitazione fornisce una guida completa alla creazione di una pipeline di dati in tempo reale su OCI utilizzando il servizio Big Data, integrando servizi essenziali per produrre, elaborare ed eseguire query sui dati Avro in modo efficiente. Con obiettivi chiari, istruzioni e rinforzi positivi, sei ben attrezzato per costruire e ottimizzare la tua pipeline di dati in tempo reale.

Conferme

Altre risorse di apprendimento

Esplora altri laboratori su docs.oracle.com/learn o accedi a più contenuti gratuiti sulla formazione su Oracle Learning YouTube channel. Inoltre, visita education.oracle.com/learning-explorer per diventare Oracle Learning Explorer.

Per la documentazione del prodotto, visitare Oracle Help Center.