Observação:
- Este tutorial requer acesso ao Oracle Cloud. Para se inscrever em uma conta gratuita, consulte Conceitos básicos do Oracle Cloud Infrastructure Free Tier.
- Ele usa valores de exemplo para credenciais, tenancy e compartimentos do Oracle Cloud Infrastructure. Ao concluir seu laboratório, substitua esses valores por valores específicos do seu ambiente de nuvem.
Crie um Pipeline de Dados em Tempo Real com o Oracle Cloud Infrastructure Big Data Service
Introdução
No mundo atual orientado por dados, as organizações confiam em pipelines de dados eficientes para capturar, processar e analisar dados em tempo real. A Oracle Cloud Infrastructure (OCI) fornece um ecossistema robusto de serviços que podem ser aproveitados para criar um pipeline de dados avançado. Neste tutorial, orientaremos você no processo de criação de um pipeline de dados que aproveite o Oracle Cloud Infrastructure Big Data Service (OCI BDS). O OCI BDS é um serviço baseado em nuvem fornecido pela Oracle que permite aos usuários criar e gerenciar clusters Hadoop, clusters Spark e outros serviços de big data. Utilizaremos uma parte de seu ecossistema que inclui Kafka, Flink, Schema Registry e Trino para construir esse pipeline.
Ao longo deste tutorial, forneceremos instruções detalhadas e exemplos de código para garantir uma implantação de pipeline de dados perfeita e segura.
Objetivos
-
Configurar Recursos do OCI: Entenda como configurar recursos do OCI, incluindo clusters BDS, Kafka, Flink, Schema Registry, OCI Object Storage e Trino para facilitar o pipeline de dados.
-
Crie um Produtor Avro (Java): Crie um aplicativo Java para produzir dados Avro para o tópico Kafka ao usar esquemas Avro do Registro de Esquema.
-
Crie um consumidor Flink (Java): Configure um cluster BDS para um consumidor Flink que consome dados do Kafka, processa-os e os armazena no Object Storage.
-
Implantar e Executar os Aplicativos: Saiba como implantar e executar o produtor Avro e os aplicativos consumidores Flink em seus clusters BDS.
-
Consultar Dados Selecionados: Obtenha a capacidade de acessar os dados selecionados no OCI Object Storage usando várias ferramentas da OCI, incluindo Trino, BDS Cloud SQL, Query Service, OAC e muito mais. Usaremos o Trino para consultar dados da Avro.
Pré-requisitos
-
Uma conta do OCI.
-
Acesso aos serviços do OCI, incluindo OCI BDS, OCI Object Storage.
-
Conhecimento de Java e Apache Flink.
-
As ferramentas e SDKs necessários da CLI (Interface de Linha de Comando) do OCI instalados.
-
Compreensão de Avro e Kafka.
Tarefa 1: Criar um Cluster do Oracle Big Data com Kafka/Flink/Schema Registry/Trino ativado
-
Crie um ambiente BDS (ODH 2.0) no OCI. Para criar o cluster do BDS, consulte Conceitos Básicos de um Cluster do Big Data ODH Não Altamente Disponível.
-
Faça login no Ambari com o seguinte URL:
https://<cluster name>un0-hostname>:7183/
para adicionar serviços Kafka, Flink, registro de esquema e Trino. Informe o nome de usuário e a senha e clique em INSCREVA-SE. -
Clique em reticências (...) ao lado de Serviços e selecione Adicionar Serviço.
-
Verifique Kafka, Flink, registro de esquema e Trino e clique em Próximo.
-
Se você vir as configurações recomendadas a seguir, clique em PROCEED ANYWAY e, em seguida, clique em DEPLOY.
-
Após a instalação, clique em PRÓXIMO e, em seguida, clique em CONCLUIR.
-
Reinicie todos os componentes afetados. Clique em reticências (...) ao lado de Serviços, selecione Reiniciar Tudo o Necessário e, após reiniciar, clique em OK.
-
Faça log-in no nó mestre do cluster do Oracle Big Data Service por meio de um comando
SSH
ou utilizando putty com o arquivoppk
usando as credenciais do usuárioopc
. Depois de fazer log-in, eleve suas permissões para o usuárioroot
. Usamos putty para fazer login nos nós.sudo su -
-
Execute as etapas a seguir para fazer upload do arquivo jar para que você possa acessar o Kafka com o Flink.
-
Faça download de
lib.zip
em qualquer diretório (por exemplo,/tmp
) e descompacte-o.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
Execute o comando a seguir para copiar o jar para todos os nós do BDS.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
Faça login no Trino usando os comandos a seguir e crie um esquema e uma tabela em que o consumidor do Flink gravará os dados do Avro.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
Observação: siga as etapas para configurar o Hive Metastore para usar a implementação
SerdeStorageSchemaReader
. É necessário que a Trino consulte dados do armazenamento de objetos do OCI. -
Vá para Ambari, Hive, Configuração e Hive-site personalizado. Defina a seguinte propriedade para configurar o Hive Metastore:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
. -
Crie o bucket do OCI Object Storage para armazenar a saída.
Tarefa 2: Criar um projeto Java Maven usando dependências de POM
Para iniciar o desenvolvimento do pipeline de dados em tempo real, vamos configurar um projeto Java Maven com as dependências necessárias a seguir. Você pode escolher qualquer IDE ou tipo de projeto que se adapte à sua preferência.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
Tarefa 3: Criar produtor Java Kafka Avro usando código de amostra e arquivo avsc
Crie o produtor Avro do Java Kafka para seu pipeline de dados usando o código de amostra fornecido e o esquema Avro. Substitua placeholders como your_kafka_bootstrap_servers
, your_schema_registry_url
, your_kafka_topic
e adapte o esquema Avro aos seus valores e estrutura específicos.
Este código servirá como base para o seu produtor Avro, permitindo-lhe produzir dados Avro para o tópico Kafka dentro do seu pipeline de dados em tempo real.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
O formato de mensagem Avro definido pelo arquivo de esquema ItemTransaction.avsc
, conforme mostrado abaixo, é armazenado na pasta resources
do projeto. É importante destacar que, para serialização e desserialização do Avro, uma classe POJO específica do Java Avro chamada ItemTransaction.java
precisa ser gerada para lidar efetivamente com o formato Avro.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
Tarefa 4: Criar consumidor do Java Flink usando código de amostra
Construiremos o consumidor do Java Flink para seu pipeline de dados usando o código de amostra fornecido. Substitua placeholders como your_kafka_bootstrap_servers
, your_consumer_group_id
e your_kafka_topic
pelos detalhes específicos da configuração do Kafka.
Este exemplo de código configura um consumidor do Flink Kafka dentro do ambiente do Flink, permitindo que você processe o fluxo de entrada do Kafka. Personalize a lógica de processamento conforme necessário para o pipeline de dados.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
Tarefa 5: Implantar e Executar os jars de Aplicativos
Você criou o produtor do Java Kafka Avro e o consumidor do Flink. É hora de implantar e executar esses aplicativos em seus clusters do OCI BDS. Implante o jar no cluster do Hadoop e use os comandos a seguir para executar o Produtor e o Consumidor.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
Tarefa 6: Verificar se o tópico está registrado no Schema Registry
Antes de continuar, é essencial verificar se o tópico do Kafka que você está usando em seu pipeline de dados está registrado no Schema Registry. Podemos ver que o esquema está registrado com o nome do tópico usado no aplicativo produtor.
Tarefa 7: Monitorar o Flink por meio da IU da Web do Flink e verificar o arquivo de saída no OCI Object Storage
Seu job do Flink está sendo executado no cluster do BDS, vamos monitorar seu progresso por meio da interface do usuário Web do Flink e validar os dados processados armazenados no Object Storage.
Tarefa 8: Consultar Dados Avro usando Trino
O Trino, um mecanismo de consulta SQL distribuído de código aberto, pode ser usado para consultar dados Avro perfeitamente. Certifique-se de que o Trino esteja instalado e configurado em seu ambiente. Obtenha os detalhes de conexão necessários, como o URL do coordenador do Trino. Abra um terminal do BDS e inicie a CLI do Trino usando o comando. Use a CLI do Trino para executar consultas SQL nos dados Avro, conforme mostrado na imagem a seguir.
Próximas Etapas
Este tutorial fornece um guia abrangente para criar um pipeline de dados em tempo real na OCI usando o Big Data Service, integrando serviços essenciais para produzir, processar e consultar dados da Avro com eficiência. Com objetivos claros, instruções e reforço positivo, você está bem equipado para construir e otimizar seu próprio pipeline de dados em tempo real.
Links Relacionados
Agradecimentos
- Autores - Pavan Upadhyay (Engenheiro Principal de Nuvem), Saket Bihari (Engenheiro Principal de Nuvem)
Mais Recursos de Aprendizagem
Explore outros laboratórios em docs.oracle.com/learn ou acesse mais conteúdo de aprendizado gratuito no canal Oracle Learning YouTube. Além disso, visite education.oracle.com/learning-explorer para se tornar um Oracle Learning Explorer.
Para obter a documentação do produto, visite o Oracle Help Center.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.