주:
- 이 사용지침서에서는 Oracle Cloud에 액세스해야 합니다. 무료 계정에 등록하려면 Oracle Cloud Infrastructure Free Tier 시작하기를 참조하십시오.
- Oracle Cloud Infrastructure 인증서, 테넌시 및 구획에 대한 예제 값을 사용합니다. 실습을 마치면 이러한 값을 자신의 클라우드 환경과 관련된 값으로 대체합니다.
Oracle Cloud Infrastructure Big Data Service로 실시간 데이터 파이프라인 구축
소개
오늘날의 데이터 중심 세계에서 조직은 효율적인 데이터 파이프라인을 사용하여 데이터를 실시간으로 캡처, 처리 및 분석합니다. Oracle Cloud Infrastructure(OCI)는 강력한 데이터 파이프라인 구축을 위해 활용할 수 있는 강력한 서비스 에코시스템을 제공합니다. 이 사용지침서에서는 Oracle Cloud Infrastructure Big Data Service(OCI BDS)를 활용하는 데이터 파이프라인 생성 과정을 안내합니다. OCI BDS는 사용자가 Hadoop 클러스터, Spark 클러스터 및 기타 빅데이터 서비스를 생성하고 관리할 수 있도록 지원하는 Oracle에서 제공하는 클라우드 기반 서비스입니다. 우리는 Kafka, Flink, Schema Registry 및 Trino를 포함한 생태계의 일부를 활용하여이 파이프라인을 구축 할 것입니다.
이 자습서 전체에서 원활하고 안전한 데이터 파이프라인 배포를 보장하기 위한 자세한 지침과 코드 예를 제공합니다.
목표
-
OCI 리소스 설정: 원활한 데이터 파이프라인을 위해 BDS 클러스터, Kafka, Flink, 스키마 레지스트리, OCI Object Storage, Trino 등 OCI 리소스를 구성하는 방법을 알아봅니다.
-
Java(Avro Producer): 스키마 레지스트리에서 Avro 스키마를 사용하는 동안 Kafka 항목으로 Avro 데이터를 생성하기 위한 Java 애플리케이션을 생성합니다.
-
Java(Flink Consumer): Kafka의 데이터를 소비하고 처리한 다음 Object Storage에 저장하는 Flink 소비자에 대한 BDS 클러스터를 설정합니다.
-
응용 프로그램 배치 및 실행: Avro 생산자와 Flink 소비자 응용 프로그램을 BDS 클러스터에서 배치하고 실행하는 방법을 알아봅니다.
-
선별된 데이터 쿼리: Trino, BDS Cloud SQL, 쿼리 서비스, OAC 등 다양한 OCI 툴을 사용하여 OCI Object Storage에서 선별된 데이터에 접근할 수 있습니다. Trino를 사용하여 Avro 데이터를 쿼리합니다.
필요 조건
-
OCI 계정입니다.
-
OCI BDS, OCI Object Storage를 포함한 OCI 서비스에 액세스합니다.
-
Java와 Apache Flink에 대한 지식
-
필요한 OCI CLI(명령행 인터페이스) 도구 및 SDK가 설치되어 있습니다.
-
Avro와 Kafka에 대한 이해
작업 1: Kafka/Flink/스키마 레지스트리/Trino가 사용으로 설정된 Oracle Big Data 클러스터 생성
-
OCI에서 BDS(ODH 2.0) 환경을 생성합니다. BDS 클러스터를 생성하려면 고가용성 ODH 빅데이터 클러스터 시작하기를 참조하십시오.
-
https://<cluster name>un0-hostname>:7183/
URL로 Ambari에 로그인하여 Kafka, Flink, 스키마 레지스트리 및 Trino 서비스를 추가합니다. 사용자 이름과 비밀번호를 입력한 다음 로그인을 누르십시오. -
서비스 옆에 있는 줄임표(...)를 누른 다음 서비스 추가를 선택합니다.
-
Kafka, Flink, 스키마 레지스트리, Trino를 선택한 후 Next를 누릅니다.
-
다음과 같은 권장 구성이 표시되면 PROCEED ANYWAY를 누른 다음 DEPLOY를 누릅니다.
-
설치 후 다음, 완료 순으로 누릅니다.
-
영향을 받는 모든 구성 요소를 다시 시작합니다. 서비스 옆에 있는 줄임표(...)를 누르고 필수 항목 모두 재시작을 선택한 다음 재시작 후 확인을 누릅니다.
-
SSH
명령을 통해 또는opc
사용자 인증서를 사용하여ppk
파일에서 putty를 사용하여 Oracle Big Data Service 클러스터 마스터 노드에 로그인합니다. 로그인한 후root
사용자에게 권한을 올립니다. putty를 사용하여 노드에 로그인했습니다.sudo su -
-
다음 단계를 실행하여 Flink를 사용하여 Kafka에 액세스할 수 있도록 jar 파일을 업로드합니다.
-
lib.zip
을 모든 디렉토리(예:/tmp
)에 다운로드하고 압축을 풉니다.cd /tmp wget https://objectstorage.ap-tokyo-1.oraclecloud.com/p/bhsTSKQYGjbeGwm5JW63Re3_o9o2JOrKiQVi_-6hrQfrB7lGSvA1z5RPyDLy3lpU/n/sehubjapacprod/b/live-lab/o/download/lib.zip #unzip lib unzip /tmp/lib.zip
-
다음 명령을 실행하여 jar을 모든 BDS 노드에 복사합니다.
dcli -C "cp /usr/odh/current/flink/connectors/*kafka*.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/lib/flink/connectors/flink-connector-jdbc-1.15.2.jar /usr/odh/current/flink/lib" dcli -C "cp /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /usr/odh/current/flink/lib" su - hdfs hadoop fs -put /usr/odh/current/flink/connectors/*.jar /flink/lib/flink-libs/ hadoop fs -put /usr/odh/current/kafka-broker/libs/kafka-clients-3.2.0.jar /flink/lib/flink-libs/ hadoop fs -put /tmp/lib/mysql-connector-java-8.0.28.jar /flink/lib/flink-libs/
-
-
다음 명령을 사용하여 Trino에 로그인하고 Flink 소비자가 Avro 데이터를 쓸 스키마와 테이블을 만듭니다.
For HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:7778 --krb5-principal <username> --krb5-keytab-path <path to key tab> trino.service.keytab --krb5-remote-service-name trino --user <username> --truststore-path=/etc/security/serverKeys/truststore.jks For NON HA Cluster /usr/lib/trino/bin/trino-cli --server <cordinator dns>:8285 Create Schema CREATE SCHEMA <catalog>.<schema name> WITH ( LOCATION = '<object store>/<folder>'); Create table on SCHEMA CREATE TABLE <catalog>.<schema name>.<table name> ( name varchar, lastname varchar, age int, email varchar, timestamp bigint ) WITH ( format = 'AVRO', avro_schema_url = '<object store>/<avro schema file name>' )
주: 단계에 따라
SerdeStorageSchemaReader
구현을 사용하도록 Hive 메타 저장소를 구성합니다. Trino는 OCI 객체 저장소에서 데이터를 쿼리해야 합니다. -
Ambari, Hive, Config 및 Custom hive-site로 이동합니다. Hive 메타 저장소를 구성하려면
metastore.storage.schema.reader.impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
속성을 설정합니다. -
출력을 저장할 OCI 오브젝트 스토리지 버킷을 생성합니다.
작업 2: POM 종속성을 사용하여 Java Maven 프로젝트 빌드
실시간 데이터 파이프라인 개발을 시작하려면 다음 필수 종속성을 사용하여 Java Maven 프로젝트를 설정해 보겠습니다. 원하는 IDE 또는 프로젝트 유형을 선택할 수 있습니다.
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.15.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-serdes -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-serdes</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/schema-registry-common -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/jersey-shaded -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>jersey-shaded</artifactId>
<version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/common-auth -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>common-auth</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.hortonworks.registries/registry-common-client -->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>registry-common-client</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-guava -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-joda -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-parameter-names -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-cloudera-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-cloudera-registry</artifactId>
<version>1.15.1-csa1.8.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5-hadoop1</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-redis</artifactId>
<version>3.26.0</version>
</dependency>
</dependencies>
작업 3: 샘플 코드 및 avsc
파일을 사용하여 Java Kafka Avro 생산자 빌드
제공된 샘플 코드 및 Avro 스키마를 사용하여 데이터 파이프라인에 대한 Java Kafka Avro 생산자를 구축하십시오. your_kafka_bootstrap_servers
, your_schema_registry_url
, your_kafka_topic
와 같은 자리 표시자를 바꾸고 Avro 스키마를 특정 값 및 구조로 조정합니다.
이 코드는 Avro 생산자를 위한 기반이 되어 실시간 데이터 파이프라인 내에서 Kafka 주제에 Avro 데이터를 생성할 수 있습니다.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroProducer {
public static void main(String[] args) {
String schemaFileName = "ItemTransaction.avsc"; // Provide the path to your Avro schema file
String topicName = "kafka-flink-avro-1"; // Provide the Kafka topic name
try {
//Map<String, Object> config = new HashMap<>();
// Load producer properties
Properties producerConfig = new Properties();
//Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server IP/URL>:<Port>");
producerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "http://<Schema Registry IP/URL>:<Port>/api/v1"));
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Create a Kafka producer
Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfig);
// Load Avro schema from resources
Schema schema = loadAvroSchemaFromResources(schemaFileName);
// Create a sample Avro record
while (true) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "My Name");
avroRecord.put("lastname", "My Last Name");
avroRecord.put("age", 10);
avroRecord.put("email", "a@mail.com");
avroRecord.put("timestamp", System.currentTimeMillis());
// Create a Kafka record with a topic and Avro payload
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topicName, avroRecord);
// Send the message
producer.send(record);
// Sleep for 3 seconds before sending the next message
Thread.sleep(3000);
}
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static Schema loadAvroSchemaFromResources(String schemaFileName) throws IOException {
try (InputStream inputStream = KafkaAvroProducer.class.getClassLoader().getResourceAsStream(schemaFileName)) {
if (inputStream == null) {
throw new IllegalArgumentException("Schema file not found in resources: " + schemaFileName);
}
return new Schema.Parser().parse(inputStream);
}
}
}
아래와 같이 스키마 파일 ItemTransaction.avsc
로 정의된 Avro 메시지 형식은 프로젝트의 resources
폴더에 저장됩니다. Avro 직렬화 및 직렬화 해제를 위해서는 Avro 형식을 효과적으로 처리하기 위해 ItemTransaction.java
이라는 해당 Java Avro 특정 POJO 클래스를 생성해야 합니다.
{
"type" : "record",
"name" : "ItemTransaction",
"namespace": "org.example",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "lastname" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "email" , "type" : "string" },
{ "name" : "timestamp" , "type" : "long" }
]
}
작업 4: 예제 코드를 사용하여 Java Flink consumer 작성
제공된 샘플 코드를 사용하여 데이터 파이프라인에 대한 Java Flink 소비자를 구성합니다. your_kafka_bootstrap_servers
, your_consumer_group_id
, your_kafka_topic
등의 위치 표시자를 특정 Kafka 설정 세부정보로 바꿉니다.
이 샘플 코드는 수신 Kafka 스트림을 처리할 수 있도록 Flink 환경 내에서 Flink Kafka 소비자를 설정합니다. 필요에 따라 데이터 파이프라인에 대한 처리 논리를 사용자정의합니다.
package org.example;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryKafkaDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroFlinkConsumer {
public static void main(String[] args) throws Exception {
String topicName = "kafka-flink-avro-1"; // Same Kafka topic as the producer
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-group-id");
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka server Ip/Url>:<Port>");
consumerConfig.putAll(Collections.singletonMap(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), "<Schema Regitry Ip/Utl>:<Port>/api/v1"));
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a Flink Kafka consumer with the custom Avro deserialization schema
FlinkKafkaConsumer<ItemTransaction> kafkaConsumer = new FlinkKafkaConsumer<>(
topicName,
ClouderaRegistryKafkaDeserializationSchema
.builder(ItemTransaction.class)
.setRegistryAddress("http://<Schema Registry Ip/url>:<Port>/api/v1")
.build(),
consumerConfig
);
// Add the Kafka consumer to the Flink environment
DataStream<ItemTransaction> avroStream = env.addSource(kafkaConsumer);
KeyedStream<ItemTransaction, Tuple1<String>> keyedStream = avroStream.keyBy(new KeyExtractor());
// Process and write output using DynamicOutputPathMapper
DataStream<Tuple2<String, Long>> windowedCounts = keyedStream
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<ItemTransaction>() {
@Override
public ItemTransaction reduce(ItemTransaction item1, ItemTransaction item2) throws Exception {
return item1; // You can modify the reduce logic as needed
}
})
.map(new DynamicOutputPathMapper())
.keyBy(0)
.sum(1);
windowedCounts.print();
// Print Avro records for processing
avroStream.print();
env.execute("KafkaAvroFlinkConsumer");
}
public static class KeyExtractor implements KeySelector<ItemTransaction, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ItemTransaction item) {
return new Tuple1<>(item.getName().toString());
}
}
// Custom Avro deserialization schema
public static class DynamicOutputPathMapper extends RichMapFunction<ItemTransaction, Tuple2<String, Long>> {
private long windowCount = 0;
private transient AvroOutputFormat<ItemTransaction> avroOutputFormat;
private transient int subtaskIndex;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void close() throws Exception {
if (avroOutputFormat != null) {
avroOutputFormat.close();
avroOutputFormat = null;
}
}
@Override
public Tuple2<String, Long> map(ItemTransaction item) throws Exception {
// Increment the window count
windowCount++;
// Generate a dynamic output path based on the window count and subtask index
String dynamicOutputPath = "oci://flink-output@orasenatdctocloudcorp01/output" + windowCount + "_" + subtaskIndex + ".txt";
// Initialize or update the AvroOutputFormat
if (avroOutputFormat == null) {
avroOutputFormat = new AvroOutputFormat<>(new Path(dynamicOutputPath), ItemTransaction.class);
avroOutputFormat.configure(null);
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
} else if (!dynamicOutputPath.equals(avroOutputFormat.getOutputFilePath())) {
avroOutputFormat.close();
avroOutputFormat.setOutputFilePath(new Path(dynamicOutputPath));
avroOutputFormat.open(subtaskIndex, getRuntimeContext().getNumberOfParallelSubtasks());
}
// Write the item using AvroOutputFormat
avroOutputFormat.writeRecord(item);
return new Tuple2<>("Window Count", 1L);
}
}
}
작업 5: 응용 프로그램 jar 배치 및 실행
Java Kafka Avro 프로듀서 및 Flink 소비자를 구축했습니다. 이제 OCI BDS 클러스터에서 이러한 애플리케이션을 배포하고 실행할 때입니다. Hadoop 클러스터에 jar를 배치하고 다음 명령을 사용하여 생산자 및 소비자를 실행합니다.
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
sudo -u flink /usr/odh/current/flink/bin//flink run -m yarn-cluster -yD classloader.check-leaked-classloader=false -c <Main Class Name> <Path/To/Jar>/<Jar Name>
작업 6: 토픽이 스키마 레지스트리에 등록되었는지 여부 확인
계속하기 전에 데이터 파이프라인에서 사용 중인 Kafka 토픽이 스키마 레지스트리에 등록되었는지 여부를 확인해야 합니다. 스키마가 생산자 애플리케이션에 사용된 토픽 이름에 등록되어 있음을 확인할 수 있습니다.
작업 7: Flink Web UI를 통해 Flink 모니터링 및 OCI Object Storage의 출력 파일 확인
Flink 작업이 BDS 클러스터에서 실행 중입니다. Flink 웹 UI를 통해 진행 상황을 모니터링하고 Object Storage에 저장된 처리된 데이터를 검증해 보겠습니다.
작업 8: Trino를 사용하여 Avro 데이터 쿼리
오픈 소스 분산 SQL 쿼리 엔진인 Trino는 Avro 데이터를 원활하게 쿼리하는 데 사용할 수 있습니다. Trino가 환경에 설치되고 구성되어 있는지 확인합니다. Trino 조정자 URL과 같은 필요한 연결 세부정보를 가져옵니다. BDS 터미널을 열고 명령을 사용하여 Trino CLI를 시작합니다. 다음 그림과 같이 Trino CLI를 사용하여 Avro 데이터에 대한 SQL 쿼리를 실행합니다.
다음 단계
이 사용지침서에서는 빅데이터 서비스를 사용하여 OCI에서 실시간 데이터 파이프라인을 구축하고 Avro 데이터를 효율적으로 생성, 처리 및 쿼리하기 위한 필수 서비스를 통합하는 포괄적인 가이드를 제공합니다. 명확한 목표, 지침 및 긍정적인 강화를 통해 자체 실시간 데이터 파이프라인을 구성하고 최적화할 수 있습니다.
관련 링크
확인
- Authors - Pavan Upadhyay(주요 클라우드 엔지니어), Saket Bihari(주요 클라우드 엔지니어)
추가 학습 자원
docs.oracle.com/learn에서 다른 실습을 살펴보거나 Oracle Learning YouTube 채널에서 더 많은 무료 학습 콘텐츠에 액세스하십시오. 또한 education.oracle.com/learning-explorer를 방문하여 Oracle Learning Explorer가 되십시오.
제품 설명서는 Oracle Help Center를 참조하십시오.
Build a Real-Time Data Pipeline with Oracle Cloud Infrastructure Big Data Service
F89771-01
November 2023
Copyright © 2023, Oracle and/or its affiliates.