주:

Oracle Cloud Infrastructure Big Data Service로 실시간 데이터 파이프라인 구축

소개

오늘날의 데이터 중심 세계에서 조직은 효율적인 데이터 파이프라인을 사용하여 데이터를 실시간으로 캡처, 처리 및 분석합니다. Oracle Cloud Infrastructure(OCI)는 강력한 데이터 파이프라인 구축을 위해 활용할 수 있는 강력한 서비스 에코시스템을 제공합니다. 이 사용지침서에서는 Oracle Cloud Infrastructure Big Data Service(OCI BDS)를 활용하는 데이터 파이프라인 생성 과정을 안내합니다. OCI BDS는 사용자가 Hadoop 클러스터, Spark 클러스터 및 기타 빅데이터 서비스를 생성하고 관리할 수 있도록 지원하는 Oracle에서 제공하는 클라우드 기반 서비스입니다. 우리는 Kafka, Flink, Schema Registry 및 Trino를 포함한 생태계의 일부를 활용하여이 파이프라인을 구축 할 것입니다.

파이프라인 구조

이 자습서 전체에서 원활하고 안전한 데이터 파이프라인 배포를 보장하기 위한 자세한 지침과 코드 예를 제공합니다.

목표

필요 조건

작업 1: Kafka/Flink/스키마 레지스트리/Trino가 사용으로 설정된 Oracle Big Data 클러스터 생성

  1. OCI에서 BDS(ODH 2.0) 환경을 생성합니다. BDS 클러스터를 생성하려면 고가용성 ODH 빅데이터 클러스터 시작하기를 참조하십시오.

  2. https://<cluster name>un0-hostname>:7183/ URL로 Ambari에 로그인하여 Kafka, Flink, 스키마 레지스트리 및 Trino 서비스를 추가합니다. 사용자 이름과 비밀번호를 입력한 다음 로그인을 누르십시오.

    Ambari에 로그인

  3. 서비스 옆에 있는 줄임표(...)를 누른 다음 서비스 추가를 선택합니다.

    서비스 추가

  4. Kafka, Flink, 스키마 레지스트리, Trino를 선택한 후 Next를 누릅니다.

    서비스 선택

  5. 다음과 같은 권장 구성이 표시되면 PROCEED ANYWAY를 누른 다음 DEPLOY를 누릅니다.

    그래도 계속하세요

  6. 설치 후 다음, 완료 순으로 누릅니다.

    다음 설치 후

    설치 완료 후

  7. 영향을 받는 모든 구성 요소를 다시 시작합니다. 서비스 옆에 있는 줄임표(...)를 누르고 필수 항목 모두 재시작을 선택한 다음 재시작 후 확인을 누릅니다.

    Ambari Start 서비스

  8. SSH 명령을 통해 또는 opc 사용자 인증서를 사용하여 ppk 파일에서 putty를 사용하여 Oracle Big Data Service 클러스터 마스터 노드에 로그인합니다. 로그인한 후 root 사용자에게 권한을 올립니다. putty를 사용하여 노드에 로그인했습니다.

    sudo su -
    
  9. 다음 단계를 실행하여 Flink를 사용하여 Kafka에 액세스할 수 있도록 jar 파일을 업로드합니다.

    1. lib.zip을 모든 디렉토리(예: /tmp)에 다운로드하고 압축을 풉니다.

      cd /tmp
      wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip
      #unzip lib
      unzip /tmp/lib.zip
      
    2. 다음 명령을 실행하여 jar을 모든 BDS 노드에 복사합니다.

      dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar  /usr/odh/current/flink/lib"
      dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar  /usr/odh/current/flink/lib"
      su - hdfs
      hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/
      hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/
      hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
      
  10. 다음 명령을 사용하여 Trino에 로그인하고 Flink 소비자가 Avro 데이터를 쓸 스키마와 테이블을 만듭니다.

    For HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks
    
    For NON HA Cluster
    
    /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285
    
    Create Schema
    
    CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION  = '<object store>/<folder>');
    
    Create table on SCHEMA
    
    CREATE TABLE <catalog>.<schema name>.<table name> (
    name varchar,
    lastname varchar,
    age int,
    email varchar,
    timestamp bigint
    )
    WITH (
    format = 'AVRO',
    avro_schema_url = '<object store>/<avro schema file name>'
    )
    
    

    : 단계에 따라 SerdeStorageSchemaReader 구현을 사용하도록 Hive 메타 저장소를 구성합니다. Trino는 OCI 객체 저장소에서 데이터를 쿼리해야 합니다.

  11. Ambari, Hive, ConfigCustom hive-site로 이동합니다. Hive 메타 저장소를 구성하려면 metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader 속성을 설정합니다.

  12. 출력을 저장할 OCI 오브젝트 스토리지 버킷을 생성합니다.

작업 2: POM 종속성을 사용하여 Java Maven 프로젝트 빌드

실시간 데이터 파이프라인 개발을 시작하려면 다음 필수 종속성을 사용하여 Java Maven 프로젝트를 설정해 보겠습니다. 원하는 IDE 또는 프로젝트 유형을 선택할 수 있습니다.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-serdes</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>schema-registry-common</artifactId>
      <version>1.0.0</version>
    </dependency>


    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>jersey-shaded</artifactId>
      <version>0.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>common-auth</artifactId>
      <version>1.0.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
    <dependency>
      <groupId>com.hortonworks.registries</groupId>
      <artifactId>registry-common-client</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-guava</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-joda</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.14.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-json-org</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.14.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>1.15.1-csa1.8.0.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.10</artifactId>
      <version>1.1.5-hadoop1</version>
    </dependency>

    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-redis</artifactId>
      <version>3.26.0</version>
    </dependency>
</dependencies>

작업 3: 샘플 코드 및 avsc 파일을 사용하여 Java Kafka Avro 생산자 빌드

제공된 샘플 코드 및 Avro 스키마를 사용하여 데이터 파이프라인에 대한 Java Kafka Avro 생산자를 구축하십시오. your_kafka_bootstrap_servers, your_schema_registry_url, your_kafka_topic와 같은 자리 표시자를 바꾸고 Avro 스키마를 특정 값 및 구조로 조정합니다.

이 코드는 Avro 생산자를 위한 기반이 되어 실시간 데이터 파이프라인 내에서 Kafka 주제에 Avro 데이터를 생성할 수 있습니다.

package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroProducer {
    public static void main(String[] args) {

        String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
        String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name

        try {
            //Map<String, Object> config = new HashMap<>();

            // Load producer properties
            Properties producerConfig = new Properties();
            //Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
            producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

            // Create a Kafka producer
            Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
            // Load Avro schema from resources
            Schema schema = loadAvroSchemaFromResources(schemaFileName);
            // Create a sample Avro record
            while (true) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("name", "My Name");
                avroRecord.put("lastname", "My Last Name");
                avroRecord.put("age", 10);
                avroRecord.put("email", "a@mail.com");
                avroRecord.put("timestamp", System.currentTimeMillis());

                // Create a Kafka record with a topic and Avro payload
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);

                // Send the message
                producer.send(record);
                // Sleep for 3 seconds before sending the next message
                Thread.sleep(3000);
            }
            //producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
        try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
            }
            return new Schema.Parser().parse(inputStream);
        }
    }
}

아래와 같이 스키마 파일 ItemTransaction.avsc로 정의된 Avro 메시지 형식은 프로젝트의 resources 폴더에 저장됩니다. Avro 직렬화 및 직렬화 해제를 위해서는 Avro 형식을 효과적으로 처리하기 위해 ItemTransaction.java이라는 해당 Java Avro 특정 POJO 클래스를 생성해야 합니다.

{
   "type" : "record",
   "name" : "ItemTransaction",
   "namespace": "org.example",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "lastname" , "type" : "string" },
      { "name" : "age" , "type"  : "int" },
      { "name" : "email" , "type" : "string" },
      { "name" : "timestamp" , "type" : "long" }
   ]
}

제공된 샘플 코드를 사용하여 데이터 파이프라인에 대한 Java Flink 소비자를 구성합니다. your_kafka_bootstrap_servers, your_consumer_group_id, your_kafka_topic 등의 위치 표시자를 특정 Kafka 설정 세부정보로 바꿉니다.

이 샘플 코드는 수신 Kafka 스트림을 처리할 수 있도록 Flink 환경 내에서 Flink Kafka 소비자를 설정합니다. 필요에 따라 데이터 파이프라인에 대한 처리 논리를 사용자정의합니다.

package org.example;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroFlinkConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
        consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a Flink Kafka consumer with the custom Avro deserialization schema
        FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
                topicName,
                ClouderaRegistryKafkaDeserializationSchema
                        .builder(ItemTransaction.class)
                        .setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
                        .build(),
                consumerConfig
        );

        // Add the Kafka consumer to the Flink environment
        DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);

        KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());

        // Process and write output using DynamicOutputPathMapper
        DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<ItemTransaction>() {
                    @Override
                    public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
                        return item1; // You can modify the reduce logic as needed
                    }
                })
                .map(new DynamicOutputPathMapper())
                .keyBy(0)
                .sum(1);

        windowedCounts.print();

        // Print Avro records for processing
        avroStream.print();

        env.execute("KafkaAvroFlinkConsumer");
    }

    public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
        @Override
        public Tuple1<String> getKey(ItemTransaction item) {
            return new Tuple1<>(item.getName().toString());
        }
    }

    // Custom Avro deserialization schema

    public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
        private long windowCount = 0;
        private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void close() throws Exception {
            if (avroOutputFormat != null) {
                avroOutputFormat.close();
                avroOutputFormat = null;
            }
        }

        @Override
        public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
            // Increment the window count
            windowCount++;

            // Generate a dynamic output path based on the window count and subtask index
            String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";

            // Initialize or update the AvroOutputFormat
            if (avroOutputFormat == null) {
                avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
                avroOutputFormat.configure(null);
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            } else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
                avroOutputFormat.close();
                avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
                avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
            }

            // Write the item using AvroOutputFormat
            avroOutputFormat.writeRecord(item);

            return new Tuple2<>("Window Count", 1L);
        }
    }
}

작업 5: 응용 프로그램 jar 배치 및 실행

Java Kafka Avro 프로듀서 및 Flink 소비자를 구축했습니다. 이제 OCI BDS 클러스터에서 이러한 애플리케이션을 배포하고 실행할 때입니다. Hadoop 클러스터에 jar를 배치하고 다음 명령을 사용하여 생산자 및 소비자를 실행합니다.

sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink  /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>

작업 6: 토픽이 스키마 레지스트리에 등록되었는지 여부 확인

계속하기 전에 데이터 파이프라인에서 사용 중인 Kafka 토픽이 스키마 레지스트리에 등록되었는지 여부를 확인해야 합니다. 스키마가 생산자 애플리케이션에 사용된 토픽 이름에 등록되어 있음을 확인할 수 있습니다.

스키마 재시도

Flink 작업이 BDS 클러스터에서 실행 중입니다. Flink 웹 UI를 통해 진행 상황을 모니터링하고 Object Storage에 저장된 처리된 데이터를 검증해 보겠습니다.

Flink web ui

오브젝트 스토리지 출력

작업 8: Trino를 사용하여 Avro 데이터 쿼리

오픈 소스 분산 SQL 쿼리 엔진인 Trino는 Avro 데이터를 원활하게 쿼리하는 데 사용할 수 있습니다. Trino가 환경에 설치되고 구성되어 있는지 확인합니다. Trino 조정자 URL과 같은 필요한 연결 세부정보를 가져옵니다. BDS 터미널을 열고 명령을 사용하여 Trino CLI를 시작합니다. 다음 그림과 같이 Trino CLI를 사용하여 Avro 데이터에 대한 SQL 쿼리를 실행합니다.

Trino 쿼리 Avro 데이터

다음 단계

이 사용지침서에서는 빅데이터 서비스를 사용하여 OCI에서 실시간 데이터 파이프라인을 구축하고 Avro 데이터를 효율적으로 생성, 처리 및 쿼리하기 위한 필수 서비스를 통합하는 포괄적인 가이드를 제공합니다. 명확한 목표, 지침 및 긍정적인 강화를 통해 자체 실시간 데이터 파이프라인을 구성하고 최적화할 수 있습니다.

확인

추가 학습 자원

docs.oracle.com/learn에서 다른 실습을 살펴보거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 education.oracle.com/learning-explorer를 방문하여 Oracle Learning Explorer가 되십시오.

제품 설명서는 Oracle Help Center를 참조하십시오.