Hinweis:
- Dieses Tutorial erfordert Zugriff auf Oracle Cloud. Informationen zum Registrieren eines kostenlosen Accounts finden Sie unter Erste Schritte mit Oracle Cloud Infrastructure Free Tier.
- Es verwendet Beispielwerte für Oracle Cloud Infrastructure-Zugangsdaten, -Mandanten und -Compartments. Wenn Sie Ihre Übung abgeschlossen haben, ersetzen Sie diese Werte durch spezifische Werte für Ihre Cloud-Umgebung.
Erstellen einer Echtzeit-Datenpipeline mit Oracle Cloud Infrastructure Big Data Service
Einführung
In der heutigen datengesteuerten Welt verlassen sich Unternehmen auf effiziente Datenpipelines, um Daten in Echtzeit zu erfassen, zu verarbeiten und zu analysieren. Oracle Cloud Infrastructure (OCI) bietet ein robustes Ökosystem von Services, die zum Erstellen einer leistungsstarken Datenpipeline genutzt werden können. In diesem Tutorial werden Sie durch den Prozess zum Erstellen einer Datenpipeline geführt, die Oracle Cloud Infrastructure Big Data Service (OCI BDS) nutzt. OCI BDS ist ein cloud-basierter Service von Oracle, mit dem Benutzer Hadoop-Cluster, Spark-Cluster und andere Big Data-Services erstellen und verwalten können. Wir werden einen Teil seines Ökosystems nutzen, zu dem Kafka, Flink, Schema Registry und Trino gehören, um diese Pipeline aufzubauen.
In diesem Tutorial erhalten Sie detaillierte Anweisungen und Codebeispiele, um ein nahtloses und sicheres Deployment der Datenpipeline sicherzustellen.
Ziele
-
OCI-Ressourcen einrichten: Konfigurieren von OCI-Ressourcen, einschließlich BDS-Clustern, Kafka, Flink, Schema-Registry, OCI Object Storage und Trino, um die Datenpipeline zu vereinfachen.
-
Avro Producer (Java) erstellen: Erstellen Sie eine Java-Anwendung zum Erstellen von Avro-Daten für das Kafka-Thema, während Sie Avro-Schemas aus der Schema-Registry verwenden.
-
Flink Consumer (Java) erstellen: Richten Sie ein BDS-Cluster für einen Flink Consumer ein, der Daten aus Kafka konsumiert, verarbeitet und in Object Storage speichert.
-
Anwendungen bereitstellen und ausführen: Erfahren Sie, wie Sie die Avro-Producer- und Flink-Consumeranwendungen auf Ihren BDS-Clustern bereitstellen und ausführen.
-
Kuratierte Daten abfragen: Mit verschiedenen OCI-Tools wie Trino, BDS Cloud SQL, Query Service, OAC und mehr können Sie auf die kuratierten Daten in OCI Object Storage zugreifen. Wir verwenden Trino, um Avro-Daten abzufragen.
Voraussetzungen
-
Ein OCI-Account.
-
Zugriff auf OCI-Services, einschließlich OCI BDS, OCI Object Storage.
-
Kenntnisse über Java und Apache Flink.
-
Die erforderlichen OCI Command Line Interface-(CLI-)Tools und SDKs sind installiert.
-
Verständnis von Avro und Kafka.
Aufgabe 1: Oracle Big Data-Cluster mit aktiviertem Kafka/Flink/Schema Registry/Trino erstellen
-
Erstellen Sie eine BDS-(ODH 2.0-)Umgebung auf OCI. Informationen zum Erstellen eines BDS-Clusters finden Sie unter Erste Schritte mit einem nicht hoch verfügbaren ODH Big Data-Cluster.
-
Melden Sie sich mit der folgenden URL bei Ambari an:
https://<cluster name>un0-hostname>:7183/
, um Kafka-, Flink-, Schema-Registry- und Trino-Services hinzuzufügen. Geben Sie den Benutzernamen und das Kennwort ein, und klicken Sie dann auf Anmelden. -
Klicken Sie neben Services auf Auslassungspunkte (...), und wählen Sie Service hinzufügen aus.
-
Prüfen Sie Kafka, Flink, Schema-Registry und Trino, und klicken Sie dann auf Weiter.
-
Wenn Ihnen die folgenden empfohlenen Konfigurationen angezeigt werden, klicken Sie auf Jederzeit GENEHMIGT und dann auf DEPLOY.
-
Klicken Sie nach der Installation auf Weiter, und klicken Sie dann auf Abschließen.
-
Starten Sie alle betroffenen Komponenten neu. Klicken Sie neben Services auf Auslassungspunkte (...), wählen Sie Alle erforderlichen neu starten aus, und klicken Sie nach dem Neustart auf OK.
-
Melden Sie sich beim Masterknoten des Oracle Big Data Service-Clusters entweder mit einem
SSH
-Befehl an, oder verwenden Sie putty mit der Dateippk
mit denopc
-Benutzerzugangsdaten. Leiten Sie nach der Anmeldung Ihre Berechtigungen an den Benutzerroot
weiter. Wir haben uns mit putty bei den Knoten angemeldet.sudo su -
-
Führen Sie die folgenden Schritte aus, um die JAR-Datei hochzuladen, damit Sie mit Flink auf Kafka zugreifen können.
-
Laden Sie
lib.zip
in ein beliebiges Verzeichnis herunter (z.B./tmp
), und dekomprimieren Sie es.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
Führen Sie den folgenden Befehl aus, um die JAR-Datei auf alle BDS-Knoten zu kopieren.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
Melden Sie sich mit den folgenden Befehlen beim Trino an, und erstellen Sie ein Schema und eine Tabelle, in die der Flink-Consumer die Avro-Daten schreibt.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
Hinweis: Führen Sie die Schritte aus, um Hive Metastore für die Verwendung der
SerdeStorageSchemaReader
-Implementierung zu konfigurieren. Trino muss Daten aus dem OCI-Objektspeicher abfragen. -
Gehen Sie zu Ambari, Hive, Config und Custom hive-site. Legen Sie die folgende Eigenschaft fest, um Hive Metastore zu konfigurieren:
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
. -
Erstellen Sie einen OCI Object Storage-Bucket zum Speichern der Ausgabe.
Aufgabe 2: Java Maven-Projekt mit POM-Abhängigkeiten erstellen
Um die Entwicklung Ihrer Echtzeit-Datenpipeline zu starten, richten wir ein Java Maven-Projekt mit den folgenden erforderlichen Abhängigkeiten ein. Sie können eine beliebige IDE oder einen beliebigen Projekttyp auswählen, der Ihren Wünschen entspricht.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
Aufgabe 3: Java Kafka Avro Producer mit Beispielcode und Datei avsc
erstellen
Erstellen Sie den Java Kafka Avro-Producer für Ihre Datenpipeline mit dem bereitgestellten Beispielcode und dem Avro-Schema. Ersetzen Sie Platzhalter wie your_kafka_bootstrap_servers
, your_schema_registry_url
, your_kafka_topic
, und passen Sie das Avro-Schema an Ihre spezifischen Werte und Struktur an.
Dieser Code dient als Grundlage für Ihren Avro-Produzenten und ermöglicht es ihm, Avro-Daten für das Kafka-Thema in Ihrer Echtzeit-Datenpipeline zu erstellen.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
Das Avro-Nachrichtenformat, das von der Schemadatei ItemTransaction.avsc
definiert wird, wie unten dargestellt, wird im Ordner resources
des Projekts gespeichert. Es ist wichtig zu betonen, dass für die Serialisierung und Deserialisierung von Avro eine entsprechende Java Avro-spezifische POJO-Klasse namens ItemTransaction.java
generiert werden muss, um das Avro-Format effektiv zu verarbeiten.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
Aufgabe 4: Java Flink-Consumer mit Beispielcode erstellen
Wir erstellen den Java Flink-Consumer für Ihre Datenpipeline mit dem bereitgestellten Beispielcode. Ersetzen Sie Platzhalter wie your_kafka_bootstrap_servers
, your_consumer_group_id
und your_kafka_topic
durch Ihre spezifischen Kafka-Setupdetails.
Dieser Beispielcode richtet einen Flink-Kafka-Consumer in der Flink-Umgebung ein, mit dem Sie den eingehenden Kafka-Stream verarbeiten können. Passen Sie die Verarbeitungslogik nach Bedarf für Ihre Datenpipeline an.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
Aufgabe 5: Anwendungs-JARs bereitstellen und ausführen
Sie haben den Java Kafka Avro-Producer und den Flink-Consumer erstellt. Es ist an der Zeit, diese Anwendungen auf Ihren OCI BDS-Clustern bereitzustellen und auszuführen. Stellen Sie die JAR-Datei im Hadoop-Cluster bereit, und führen Sie Producer und Consumer mit den folgenden Befehlen aus.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
Aufgabe 6: Prüfen, ob das Topic bei Schema Registry registriert ist
Bevor Sie fortfahren, müssen Sie unbedingt prüfen, ob das Kafka-Thema, das Sie in der Datenpipeline verwenden, bei der Schema-Registry registriert ist. Das Schema wird mit dem in der Producer-Anwendung verwendeten Themennamen registriert.
Aufgabe 7: Flink über Flink-Web-UI überwachen und Ausgabedatei in OCI Object Storage prüfen
Ihr Flink-Job wird im BDS-Cluster ausgeführt. Überwachen wir seinen Fortschritt über die Web-UI von Flink und validieren die verarbeiteten Daten, die in Object Storage gespeichert sind.
Aufgabe 8: Avro-Daten mit Trino abfragen
Trino, eine verteilte Open-Source-SQL-Abfrage-Engine, kann verwendet werden, um Avro-Daten nahtlos abzufragen. Stellen Sie sicher, dass Trino in Ihrer Umgebung installiert und konfiguriert ist. Rufen Sie die erforderlichen Verbindungsdetails wie die Trino-Koordinator-URL ab. Öffnen Sie ein BDS-Terminal, und starten Sie die Trino-CLI mit dem Befehl. Verwenden Sie die Trino-CLI, um SQL-Abfragen für die Avro-Daten auszuführen, wie in der folgenden Abbildung dargestellt.
Nächste Schritte
Dieses Tutorial bietet eine umfassende Anleitung zum Erstellen einer Echtzeit-Datenpipeline auf OCI mit Big Data Service, bei der wichtige Services zur effizienten Erstellung, Verarbeitung und Abfrage von Avro-Daten integriert werden. Mit klaren Zielen, Anweisungen und positiver Verstärkung sind Sie gut gerüstet, um Ihre eigene Echtzeit-Datenpipeline zu erstellen und zu optimieren.
Verwandte Links
Danksagungen
- Autoren - Pavan Upadhyay (Principal Cloud Engineer), Saket Bihari (Principal Cloud Engineer)
Weitere Lernressourcen
Lernen Sie andere Übungen auf docs.oracle.com/learn kennen, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube Channel zu. Besuchen Sie außerdem education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.
Produktdokumentation finden Sie im Oracle Help Center.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.