Hinweis:

Erstellen einer Echtzeit-Datenpipeline mit Oracle Cloud Infrastructure Big Data Service

Einführung

In der heutigen datengesteuerten Welt verlassen sich Unternehmen auf effiziente Datenpipelines, um Daten in Echtzeit zu erfassen, zu verarbeiten und zu analysieren. Oracle Cloud Infrastructure (OCI) bietet ein robustes Ökosystem von Services, die zum Erstellen einer leistungsstarken Datenpipeline genutzt werden können. In diesem Tutorial werden Sie durch den Prozess zum Erstellen einer Datenpipeline geführt, die Oracle Cloud Infrastructure Big Data Service (OCI BDS) nutzt. OCI BDS ist ein cloud-basierter Service von Oracle, mit dem Benutzer Hadoop-Cluster, Spark-Cluster und andere Big Data-Services erstellen und verwalten können. Wir werden einen Teil seines Ökosystems nutzen, zu dem Kafka, Flink, Schema Registry und Trino gehören, um diese Pipeline aufzubauen.

Pipelinearchitektur

In diesem Tutorial erhalten Sie detaillierte Anweisungen und Codebeispiele, um ein nahtloses und sicheres Deployment der Datenpipeline sicherzustellen.

Ziele

Voraussetzungen

Aufgabe 1: Oracle Big Data-Cluster mit aktiviertem Kafka/Flink/Schema Registry/Trino erstellen

  1. Erstellen Sie eine BDS-(ODH 2.0-)Umgebung auf OCI. Informationen zum Erstellen eines BDS-Clusters finden Sie unter Erste Schritte mit einem nicht hoch verfügbaren ODH Big Data-Cluster.

  2. Melden Sie sich mit der folgenden URL bei Ambari an: https://<cluster name>un0-hostname>:7183/, um Kafka-, Flink-, Schema-Registry- und Trino-Services hinzuzufügen. Geben Sie den Benutzernamen und das Kennwort ein, und klicken Sie dann auf Anmelden.

    Anmelden bei Ambari

  3. Klicken Sie neben Services auf Auslassungspunkte (...), und wählen Sie Service hinzufügen aus.

    Services hinzufügen

  4. Prüfen Sie Kafka, Flink, Schema-Registry und Trino, und klicken Sie dann auf Weiter.

    Dienste auswählen

  5. Wenn Ihnen die folgenden empfohlenen Konfigurationen angezeigt werden, klicken Sie auf Jederzeit GENEHMIGT und dann auf DEPLOY.

    Trotzdem fortfahren

  6. Klicken Sie nach der Installation auf Weiter, und klicken Sie dann auf Abschließen.

    Nach der nächsten Installation

    Nach Abschluss der Installation

  7. Starten Sie alle betroffenen Komponenten neu. Klicken Sie neben Services auf Auslassungspunkte (...), wählen Sie Alle erforderlichen neu starten aus, und klicken Sie nach dem Neustart auf OK.

    Ambari Start Dienstleistungen

  8. Melden Sie sich beim Masterknoten des Oracle Big Data Service-Clusters entweder mit einem SSH-Befehl an, oder verwenden Sie putty mit der Datei ppk mit den opc-Benutzerzugangsdaten. Leiten Sie nach der Anmeldung Ihre Berechtigungen an den Benutzer root weiter. Wir haben uns mit putty bei den Knoten angemeldet.

    sudo su -
    
  9. Führen Sie die folgenden Schritte aus, um die JAR-Datei hochzuladen, damit Sie mit Flink auf Kafka zugreifen können.

    1. Laden Sie lib.zip in ein beliebiges Verzeichnis herunter (z.B. /tmp), und dekomprimieren Sie es.

      cd /tmp
      wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip
      #unzip lib
      unzip /tmp/lib.zip
      
    2. Führen Sie den folgenden Befehl aus, um die JAR-Datei auf alle BDS-Knoten zu kopieren.

      dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar  /usr/odh/current/flink/lib"
      su - hdfs
      hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/
      hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/
      hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
      
  10. Melden Sie sich mit den folgenden Befehlen beim Trino an, und erstellen Sie ein Schema und eine Tabelle, in die der Flink-Consumer die Avro-Daten schreibt.

    For HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks
    
    For NON HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285
    
    Create Schema
    
    CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION  = '<object store>/<folder>');
    
    Create table on SCHEMA
    
    CREATE TABLE <catalog>.<schema name>.<table name> (
    name varchar,
    lastname varchar,
    age int,
    email varchar,
    timestamp bigint
    )
    WITH (
    format = 'AVRO',
    avro_schema_url = '<object store>/<avro schema file name>'
    )
    
    

    Hinweis: Führen Sie die Schritte aus, um Hive Metastore für die Verwendung der SerdeStorageSchemaReader-Implementierung zu konfigurieren. Trino muss Daten aus dem OCI-Objektspeicher abfragen.

  11. Gehen Sie zu Ambari, Hive, Config und Custom hive-site. Legen Sie die folgende Eigenschaft fest, um Hive Metastore zu konfigurieren: metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader.

  12. Erstellen Sie einen OCI Object Storage-Bucket zum Speichern der Ausgabe.

Aufgabe 2: Java Maven-Projekt mit POM-Abhängigkeiten erstellen

Um die Entwicklung Ihrer Echtzeit-Datenpipeline zu starten, richten wir ein Java Maven-Projekt mit den folgenden erforderlichen Abhängigkeiten ein. Sie können eine beliebige IDE oder einen beliebigen Projekttyp auswählen, der Ihren Wünschen entspricht.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-common</artifactId>
      <version>1.0.0</version>
    </dependency>


    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>jersey-shaded</artifactId>
      <version>0.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>common-auth</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>registry-common-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-guava</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-joda</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-json-org</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>1.15.1-csa1.8.0.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.10</artifactId>
      <version>1.1.5-hadoop1</version>
    </dependency>

    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-redis</artifactId>
      <version>3.26.0</version>
    </dependency>
</dependencies>

Aufgabe 3: Java Kafka Avro Producer mit Beispielcode und Datei avsc erstellen

Erstellen Sie den Java Kafka Avro-Producer für Ihre Datenpipeline mit dem bereitgestellten Beispielcode und dem Avro-Schema. Ersetzen Sie Platzhalter wie your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic, und passen Sie das Avro-Schema an Ihre spezifischen Werte und Struktur an.

Dieser Code dient als Grundlage für Ihren Avro-Produzenten und ermöglicht es ihm, Avro-Daten für das Kafka-Thema in Ihrer Echtzeit-Datenpipeline zu erstellen.

package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroProducer {
    public static void main(String[] args) {

        String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
        String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name

        try {
            //Map<String, Object> config = new HashMap<>();

            // Load producer properties
            Properties producerConfig = new Properties();
            //Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
            producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

            // Create a Kafka producer
            Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
            // Load Avro schema from resources
            Schema schema = loadAvroSchemaFromResources(schemaFileName);
            // Create a sample Avro record
            while (true) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("name", "My Name");
                avroRecord.put("lastname", "My Last Name");
                avroRecord.put("age", 10);
                avroRecord.put("email", "a@mail.com");
                avroRecord.put("timestamp", System.currentTimeMillis());

                // Create a Kafka record with a topic and Avro payload
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);

                // Send the message
                producer.send(record);
                // Sleep for 3 seconds before sending the next message
                Thread.sleep(3000);
            }
            //producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
        try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
            }
            return new Schema.Parser().parse(inputStream);
        }
    }
}

Das Avro-Nachrichtenformat, das von der Schemadatei ItemTransaction.avsc definiert wird, wie unten dargestellt, wird im Ordner resources des Projekts gespeichert. Es ist wichtig zu betonen, dass für die Serialisierung und Deserialisierung von Avro eine entsprechende Java Avro-spezifische POJO-Klasse namens ItemTransaction.java generiert werden muss, um das Avro-Format effektiv zu verarbeiten.

{
   "type" : "record",
   "name" : "ItemTransaction",
   "namespace": "org.example",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "lastname" , "type" : "string" },
      { "name" : "age" , "type"  : "int" },
      { "name" : "email" , "type" : "string" },
      { "name" : "timestamp" , "type" : "long" }
   ]
}

Wir erstellen den Java Flink-Consumer für Ihre Datenpipeline mit dem bereitgestellten Beispielcode. Ersetzen Sie Platzhalter wie your_kafka_bootstrap_servers, your_consumer_group_id und your_kafka_topic durch Ihre spezifischen Kafka-Setupdetails.

Dieser Beispielcode richtet einen Flink-Kafka-Consumer in der Flink-Umgebung ein, mit dem Sie den eingehenden Kafka-Stream verarbeiten können. Passen Sie die Verarbeitungslogik nach Bedarf für Ihre Datenpipeline an.

package org.example;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroFlinkConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
        consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a Flink Kafka consumer with the custom Avro deserialization schema
        FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
                topicName,
                ClouderaRegistryKafkaDeserializationSchema
                        .builder(ItemTransaction.class)
                        .setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
                        .build(),
                consumerConfig
        );

        // Add the Kafka consumer to the Flink environment
        DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);

        KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());

        // Process and write output using DynamicOutputPathMapper
        DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<ItemTransaction>() {
                    @Override
                    public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
                        return item1; // You can modify the reduce logic as needed
                    }
                })
                .map(new DynamicOutputPathMapper())
                .keyBy(0)
                .sum(1);

        windowedCounts.print();

        // Print Avro records for processing
        avroStream.print();

        env.execute("KafkaAvroFlinkConsumer");
    }

    public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
        @Override
        public Tuple1<String> getKey(ItemTransaction item) {
            return new Tuple1<>(item.getName().toString());
        }
    }

    // Custom Avro deserialization schema

    public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
        private long windowCount = 0;
        private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void close() throws Exception {
            if (avroOutputFormat != null) {
                avroOutputFormat.close();
                avroOutputFormat = null;
            }
        }

        @Override
        public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
            // Increment the window count
            windowCount++;

            // Generate a dynamic output path based on the window count and subtask index
            String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";

            // Initialize or update the AvroOutputFormat
            if (avroOutputFormat == null) {
                avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
                avroOutputFormat.configure(null);
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            } else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
                avroOutputFormat.close();
                avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            }

            // Write the item using AvroOutputFormat
            avroOutputFormat.writeRecord(item);

            return new Tuple2<>("Window Count", 1L);
        }
    }
}

Aufgabe 5: Anwendungs-JARs bereitstellen und ausführen

Sie haben den Java Kafka Avro-Producer und den Flink-Consumer erstellt. Es ist an der Zeit, diese Anwendungen auf Ihren OCI BDS-Clustern bereitzustellen und auszuführen. Stellen Sie die JAR-Datei im Hadoop-Cluster bereit, und führen Sie Producer und Consumer mit den folgenden Befehlen aus.

sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>

Aufgabe 6: Prüfen, ob das Topic bei Schema Registry registriert ist

Bevor Sie fortfahren, müssen Sie unbedingt prüfen, ob das Kafka-Thema, das Sie in der Datenpipeline verwenden, bei der Schema-Registry registriert ist. Das Schema wird mit dem in der Producer-Anwendung verwendeten Themennamen registriert.

Schema-Regitry

Ihr Flink-Job wird im BDS-Cluster ausgeführt. Überwachen wir seinen Fortschritt über die Web-UI von Flink und validieren die verarbeiteten Daten, die in Object Storage gespeichert sind.

Web-UI verknüpfen

Object Storage-Ausgabe

Aufgabe 8: Avro-Daten mit Trino abfragen

Trino, eine verteilte Open-Source-SQL-Abfrage-Engine, kann verwendet werden, um Avro-Daten nahtlos abzufragen. Stellen Sie sicher, dass Trino in Ihrer Umgebung installiert und konfiguriert ist. Rufen Sie die erforderlichen Verbindungsdetails wie die Trino-Koordinator-URL ab. Öffnen Sie ein BDS-Terminal, und starten Sie die Trino-CLI mit dem Befehl. Verwenden Sie die Trino-CLI, um SQL-Abfragen für die Avro-Daten auszuführen, wie in der folgenden Abbildung dargestellt.

Trino Abfrage Avro Daten

Nächste Schritte

Dieses Tutorial bietet eine umfassende Anleitung zum Erstellen einer Echtzeit-Datenpipeline auf OCI mit Big Data Service, bei der wichtige Services zur effizienten Erstellung, Verarbeitung und Abfrage von Avro-Daten integriert werden. Mit klaren Zielen, Anweisungen und positiver Verstärkung sind Sie gut gerüstet, um Ihre eigene Echtzeit-Datenpipeline zu erstellen und zu optimieren.

Danksagungen

Weitere Lernressourcen

Lernen Sie andere Übungen auf docs.oracle.com/learn kennen, oder greifen Sie auf weitere kostenlose Lerninhalte im Oracle Learning YouTube Channel zu. Besuchen Sie außerdem education.oracle.com/learning-explorer, um Oracle Learning Explorer zu werden.

Produktdokumentation finden Sie im Oracle Help Center.