Remarques :
- Ce tutoriel nécessite un accès à Oracle Cloud. Pour vous inscrire à un compte gratuit, reportez-vous à Introduction au niveau gratuit d'Oracle Cloud Infrastructure.
- Il utilise des exemples de valeur pour les informations d'identification, la location et les compartiments Oracle Cloud Infrastructure. A la fin de l'exercice, remplacez ces valeurs par des valeurs propres à votre environnement cloud.
Création d'un pipeline de données en temps réel avec Oracle Cloud Infrastructure Big Data Service
Introduction
Dans le monde actuel axé sur les données, les entreprises s'appuient sur des pipelines de données efficaces pour capturer, traiter et analyser les données en temps réel. Oracle Cloud Infrastructure (OCI) fournit un écosystème robuste de services pouvant être exploités pour créer un pipeline de données puissant. Dans ce tutoriel, nous vous guiderons tout au long du processus de création d'un pipeline de données qui utilise Oracle Cloud Infrastructure Big Data Service (OCI BDS). OCI BDS est un service cloud fourni par Oracle qui permet aux utilisateurs de créer et de gérer des clusters Hadoop, des clusters Spark et d'autres services Big Data. Nous utiliserons une partie de son écosystème qui comprend Kafka, Flink, Schema Registry et Trino pour construire ce pipeline.
Tout au long de ce tutoriel, nous fournirons des instructions détaillées et des exemples de code pour garantir un déploiement de pipeline de données transparent et sécurisé.
Objectifs
-
Configurer les ressources OCI : découvrez comment configurer les ressources OCI, y compris les clusters BDS, Kafka, Flink, Schema Registry, OCI Object Storage et Trino, pour faciliter le pipeline de données.
-
Créer un fournisseur Avro (Java) : créez une application Java pour produire des données Avro vers la rubrique Kafka tout en utilisant des schémas Avro à partir du registre de schémas.
-
Créer un consommateur Flink (Java) : configurez un cluster BDS pour un consommateur Flink qui utilise les données de Kafka, les traite et les stocke dans Object Storage.
-
Déployer et exécuter les applications : Découvrez comment déployer et exécuter les applications de fournisseur Avro et Flink sur vos clusters BDS.
-
Données organisées par requête : accédez aux données organisées dans OCI Object Storage à l'aide de divers outils OCI, notamment Trino, BDS Cloud SQL, Query Service, OAC, etc. Nous allons utiliser Trino pour interroger les données Avro.
Prérequis
-
Un compte OCI.
-
Accès aux services OCI, y compris OCI BDS et OCI Object Storage.
-
Connaissance de Java et Apache Flink.
-
Les outils et kits SDK nécessaires de l'interface de ligne de commande (CLI) OCI sont installés.
-
Compréhension d'Avro et de Kafka.
Tâche 1 : créer un cluster Oracle Big Data avec Kafka/Flink/Schema Registry/Trino activé
-
Créez un environnement BDS (ODH 2.0) sur OCI. Pour créer un cluster BDS, reportez-vous à Introduction à un cluster Big Data ODH qui n'est pas hautement disponible.
-
Connectez-vous à Ambari avec l'URL suivante :
https://<cluster name>un0-hostname>:7183/
pour ajouter des services Kafka, Flink, schema registry et Trino. Entrez le nom utilisateur et le mot de passe, puis cliquez sur SIGN IN. -
Cliquez sur les points de suspension (...) en regard de Services, puis sélectionnez Ajouter un service.
-
Cochez Kafka, Flink, registre de schéma et Trino, puis cliquez sur Suivant.
-
Si les configurations recommandées suivantes s'affichent, cliquez sur PROCEED ANYWAY, puis sur DEPLOY.
-
Après l'installation, cliquez sur NEXT, puis sur COMPLETE.
-
Redémarrez tous les composants concernés. Cliquez sur les points de suspension (...) en regard de Services, sélectionnez Redémarrer tout requis et, après le redémarrage, cliquez sur OK.
-
Connectez-vous au noeud maître du cluster Oracle Big Data Service via une commande
SSH
ou en utilisant putty avec le fichierppk
à l'aide des informations d'identification utilisateuropc
. Une fois connecté, transférez vos droits d'accès à l'utilisateurroot
. Nous avons utilisé putty pour nous connecter aux noeuds.sudo su -
-
Exécutez les étapes suivantes pour télécharger le fichier JAR afin de pouvoir accéder à Kafka avec Flink.
-
Téléchargez
lib.zip
dans n'importe quel répertoire (par exemple,/tmp
) et décompressez-le.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
Exécutez la commande suivante pour copier le fichier JAR sur tous les noeuds BDS.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
Connectez-vous à Trino à l'aide des commandes suivantes et créez un schéma et une table où le consommateur Flink écrira les données Avro.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
Remarque : suivez les étapes de configuration du metastore Hive pour utiliser l'implémentation
SerdeStorageSchemaReader
. Trino doit interroger les données de la banque d'objets OCI. -
Accédez à Ambari, Hive, Config et Custom hive-site. Définissez la propriété suivante pour configurer le metastore Hive :
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
. -
Créez un bucket OCI Object Storage pour stocker la sortie.
Tâche 2 : créer un projet Java Maven à l'aide de dépendances POM
Pour démarrer le développement de votre pipeline de données en temps réel, configurons un projet Java Maven avec les dépendances requises suivantes. Vous pouvez choisir n'importe quel IDE ou type de projet qui correspond à vos préférences.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
Tâche 3 : créer un producteur Java Kafka Avro à l'aide d'un exemple de code et d'un fichier avsc
Créez un producteur Java Kafka Avro pour votre pipeline de données à l'aide de l'exemple de code et du schéma Avro fournis. Remplacez les espaces réservés tels que your_kafka_bootstrap_servers
, your_schema_registry_url
, your_kafka_topic
et adaptez le schéma Avro avec vos valeurs et votre structure spécifiques.
Ce code servira de base à votre producteur Avro, ce qui lui permettra de produire des données Avro vers la rubrique Kafka dans votre pipeline de données en temps réel.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
Le format de message Avro défini par le fichier de schéma ItemTransaction.avsc
, comme indiqué ci-dessous, est stocké dans le dossier resources
du projet. Il est important de souligner que pour la sérialisation et la désérialisation Avro, une classe POJO spécifique à Java Avro nommée ItemTransaction.java
doit être générée pour gérer efficacement le format Avro.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
Tâche 4 : créer un consommateur Java Flink à l'aide d'un exemple de code
Nous construirons le consommateur Java Flink pour votre pipeline de données à l'aide de l'exemple de code fourni. Remplacez les espaces réservés tels que your_kafka_bootstrap_servers
, your_consumer_group_id
et your_kafka_topic
par vos détails de configuration Kafka spécifiques.
Cet exemple de code configure un destinataire Kafka Flink dans l'environnement Flink, ce qui vous permet de traiter le flux Kafka entrant. Personnalisez la logique de traitement selon vos besoins pour votre pipeline de données.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
Tâche 5 : déployer et exécuter les fichiers JAR Applications
Vous avez créé le producteur Java Kafka Avro et le consommateur Flink. Il est temps de déployer et d'exécuter ces applications sur vos clusters OCI BDS. Déployez le fichier JAR dans le cluster Hadoop et utilisez les commandes suivantes pour exécuter Producer et Consumer.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
Tâche 6 : vérifier si le sujet est inscrit auprès du registre de schémas
Avant de continuer, il est essentiel de vérifier si la rubrique Kafka que vous utilisez dans votre pipeline de données est inscrite auprès du registre de schémas. Nous voyons que le schéma est inscrit avec le nom de rubrique utilisé dans l'application de fournisseur.
Tâche 7 : surveiller Flink via l'interface utilisateur Web Flink et vérifier le fichier de sortie dans OCI Object Storage
Votre travail Flink est en cours d'exécution sur le cluster BDS, surveillons sa progression via l'interface utilisateur Web Flink et validons les données traitées stockées dans Object Storage.
Tâche 8 : interroger les données Avro à l'aide de Trino
Trino, un moteur de requête SQL distribué open source, peut être utilisé pour interroger les données Avro de manière transparente. Assurez-vous que Trino est installé et configuré dans votre environnement. Obtenez les détails de connexion nécessaires, tels que l'URL du coordinateur Trino. Ouvrez un terminal BDS et démarrez la CLI Trino à l'aide de la commande. Utilisez l'interface de ligne de commande Trino pour exécuter des requêtes SQL sur les données Avro, comme indiqué dans l'image suivante.
Etapes suivantes
Ce tutoriel fournit un guide complet pour créer un pipeline de données en temps réel sur OCI à l'aide de Big Data Service, intégrant des services essentiels pour produire, traiter et interroger efficacement les données Avro. Avec des objectifs clairs, des instructions et un renforcement positif, vous êtes bien équipé pour construire et optimiser votre propre pipeline de données en temps réel.
Liens connexes
Remerciements
- Auteurs - Pavan Upadhyay (ingénieur cloud principal), Saket Bihari (ingénieur cloud principal)
Ressources de formation supplémentaires
Parcourez d'autres ateliers sur docs.oracle.com/learn ou accédez à davantage de contenus de formation gratuits sur le canal Oracle Learning YouTube. De plus, rendez-vous sur education.oracle.com/learning-explorer pour devenir un explorateur Oracle Learning.
Pour obtenir de la documentation sur le produit, visitez Oracle Help Center.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.