이벤트 릴레이 개발 및 배포

블록체인 이벤트를 소비하고 이를 Oracle Streaming Service에 연계하는 Oracle Functions 애플리케이션을 생성합니다.

이벤트 릴레이 응용 프로그램에는 스트리밍 서비스에 연결하기 위한 자격 증명이 필요합니다. 인증서는 Oracle Cloud Infrastructure Vault에 보관됩니다. 이 인증서는 Terraform 코드를 실행할 때 Vault에 저장됩니다. OCI 환경의 초기 구성을 수행하면 인증서가 생성됩니다.

이 기능은 다음 서비스를 제공합니다.

  1. Vault에서 스트리밍 서비스 인증서를 검색합니다.
  2. 자격 증명을 해독합니다(저장소는 암호화된 값을 저장).
  3. base64 인코딩 문자열에서 디코딩합니다.
  4. 디코딩된 인증서를 사용하여 Kafka 호환 API를 사용하여 스트리밍 서비스에 접속합니다.
  5. JSON 이벤트 메시지에서 event라는 Java 객체를 생성하고 채웁니다.
  6. Kafka API를 사용하여 이벤트 메시지를 전송합니다.

이벤트 릴레이 생성

릴레이를 프로그래밍 언어로 만들 수 있지만, Kafka API가 Java에 문서화되어 있으므로 Java를 사용하는 것이 좋습니다.

EventProducer 클래스에는 다음 메소드가 포함됩니다.

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

pom.xml 파일에서도 다음 종속성이 필요합니다.

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

fn CLI 도구가 설치되어 있는지 확인합니다. 자세한 내용은 https://github.com/fnproject/fn#quickstart을 참조하십시오.

  1. Fn 프로젝트를 만들고 적합한 이름(예: obpeventsfunc)을 지정합니다.
    fn init obpeventsfunc --runtime java
  2. HelloFunction.javaEventProducer.java라는 파일로 바꿉니다.

    모든 코드는 이 단일 클래스에 속합니다. 함수를 구성하려면 다음 클래스를 임포트해야 합니다.

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. 블록체인의 이벤트 메시지를 보관하는 내부 클래스를 생성합니다.

    이벤트 메시지의 구조에 해당하는 객체가 필요합니다. 세 개의 클래스 정의는 EventProducer 클래스의 일부입니다. 이 클래스는 Oracle Functions에서 Oracle Blockchain Platform에서 제공되는 JSON 데이터의 직렬화를 해제하는 데 사용됩니다.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. config 메소드에 채워지는 구성 값을 보유하도록 여러 String 변수를 전역적으로 정의합니다.

    @FnConfiguration 주석으로 인해 시스템은 RuntimeContext 객체를 이 메소드에 전달합니다. 그런 다음 getConfigurationByKey를 사용하여 전역 변수를 채웁니다. 다음 각 구성 키에 대한 변수가 필요합니다.

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    예:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. decryptData 메소드를 생성합니다.

    decryptData 메소드는 다음 코드 조각과 유사합니다. kmsEndpointkmsKeyId에 대한 값은 config 메소드에서 채워지며 KMS_ENDPOINT 및 KMS_KEY_ID 구성 키에 해당합니다. 값을 반환하기 전에 일반 텍스트는 Base64로 인코딩되어 Base64로 디코딩되어야 합니다. decryptData 메소드는 Kafka 인증 인증서를 설정할 때 handleRequest 메소드에서 사용됩니다.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. handleRequest 메소드를 생성합니다.

    이 방법은 Oracle Vault에서 스트리밍 서비스에 대한 인증서를 검색한 다음 이벤트 메시지를 스트리밍 서비스로 전송합니다. 또한 Kafka 구성 및 자격 증명 검색/암호 해독 등의 일부 "paperwork"도 수행합니다.

    handleRequest 메소드는 Event 객체를 유일한 매개변수로 사용합니다. 객체는 Oracle Functions 시스템에 의해 자동으로 생성되어 블록체인에서 제공되는 JSON의 데이터로 채워집니다. 이러한 이유로 이벤트 내부 클래스는 블록체인에서 가져온 JSON 데이터에 직접 매핑해야 합니다.

    먼저 "테스트" 이벤트 유형을 확인해야 합니다. 블록체인 이벤트에 가입하면 첫 번째 이벤트는 이벤트 소비자에게 릴레이하지 않을 테스트 이벤트입니다.

    1. "테스트" 이벤트 유형을 확인하십시오.
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. 스트리밍 서비스 SASL 구성에 필요한 문자열을 작성합니다.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. java.util.Properties 객체를 생성하고 구성으로 채웁니다.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. 구성을 등록하고 이벤트 메시지를 스트리밍 서비스로 전송합니다.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. 스트리밍 서비스로 중계하기 위해 메시지가 전송될 때마다 시스템에서 handleRequest를 호출하도록 지시합니다.

    이렇게 하려면 EventProducer 클래스의 handleRequest 메소드를 가리키도록 func.yamlcmd 등록 정보를 수정합니다. 예:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

이벤트 릴레이 빌드

이벤트 릴레이에 대한 Java 클래스를 생성한 후 이 클래스를 사용하여 Oracle Cloud Infrastructure Registry로 푸시할 Docker 이미지를 작성합니다. 이미지가 생성된 후에는 OCI 지역 및 오브젝트 스토리지 네임스페이스에 대한 정보로 태그를 지정해야 합니다.

이미지를 푸시하려면 먼저 docker tag 명령을 사용하여 로컬 소스 이미지의 복사본을 새 이미지로 만들어야 합니다(새 이미지는 실제로 기존 소스 이미지에 대한 참조일 뿐임). 새 이미지의 이름으로 Oracle Cloud Infrastructure Registry에서 이미지를 푸시할 대상 위치에 대한 전체 경로를 지정합니다.

태그에 대해 다음 정보가 필요합니다.

  • 해당 지역의 Docker 레지스트리 엔드포인트
  • 오브젝트 스토리지 네임스페이스

지역에 대한 끝점을 가져오려면 지역별 가용성의 테이블에서 지역을 조회합니다. 예를 들어, 미국 동부(애슈번)의 레지스트리 끝점은 https://iad.ocir.io입니다.

오브젝트 스토리지 네임스페이스를 찾으려면 다음과 같이 하십시오.

  1. 탐색 메뉴에서 관리를 누른 다음 테넌시 세부정보를 누릅니다.
  2. 스토리지 네임스페이스가 오브젝트 스토리지 설정 섹션에 있습니다.
  1. 최상위 디렉토리에서 build 명령을 실행합니다. 최상위 디렉토리는 fn init 명령을 실행할 때 만든 디렉토리입니다.
    fn build
  2. 이미지에 OCI 지역 및 오브젝트 스토리지 네임스페이스 정보를 태그 지정합니다.

    예를 들어, 해당 지역의 끝점이 https://iad.ocir.io이고 오브젝트 스토리지 네임스페이스가 yzl1yzrddld7인 경우 obpeventsfunc를 이벤트 릴레이 이름으로 사용한다고 가정하여 명령은 다음과 같습니다.

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    참고:

    명령에서 https:// 프로토콜 정보를 생략해야 합니다.

이벤트 릴레이를 레지스트리로 푸시

이벤트 릴레이를 구성하고 적절한 태그를 지정한 후 Oracle Cloud Infrastructure Registry에 푸시할 수 있습니다.

Oracle Cloud Infrastructure Registry에 로그인할 때는 다음 정보가 필요합니다.
  • 해당 지역의 API 끝점(예: iad.ocir.io)입니다. 이전에 Docker 이미지에 태그를 지정하는 데 사용한 값과 동일합니다.
  • 테넌시 네임스페이스: 테넌시의 자동 생성된 객체 스토리지 네임스페이스 문자열(테넌시 정보 페이지에 나와 있음)입니다.
  • 사용자 이름: Oracle Cloud Infrastructure의 사용자 이름입니다.
  • 인증 토큰: 계획 섹션에서 이전에 생성한 토큰입니다.
  1. Oracle Cloud Infrastructure Registry에 로그인합니다.
    docker login <region-key>.ocir.io

    프롬프트가 표시되면 사용자 이름을 <tenancy-namespace>/<username> 형식으로 입력합니다. 예: yzl1yzrddld7/jdoe@example.com. 테넌시가 Oracle Identity Cloud Service와 통합되는 경우 <tenancy-namespace>/oracleidentitycloudservice/<username> 형식을 사용합니다.

  2. 메시지가 표시되면 이전에 생성한 인증 토큰을 입력합니다.
  3. 컴퓨터에서 Docker 이미지를 Oracle Cloud Infrastructure Registry로 푸시합니다.

    예를 들어, 해당 지역의 끝점이 https://iad.ocir.io이고 오브젝트 스토리지 네임스페이스가 yzl1yzrddld7인 경우 명령은 다음과 같습니다.

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

API 키 생성

키 쌍을 생성하고 공용 키를 Oracle Cloud Infrastructure 사용자 프로파일에 업로드합니다.

이 작업은 두 가지 방법으로 수행할 수 있지만 생성된 키를 계정에 직접 안전하게 업로드할 수 있으므로 Cloud Shell을 사용합니다.

  1. 테넌시에 대한 Cloud Shell에 로그인합니다.
  2. 키 쌍에 대한 디렉토리 생성
    mkdir ~/.oci
  3. 2048비트 개인 키를 생성하고 oci_api_key.pem라는 파일에 저장합니다.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. 사용자가 파일을 읽기 전용으로 설정합니다.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. 공용 키를 생성하고 oci_api_key_public.pem라는 파일에 저장합니다.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. 공용 키를 사용자 프로파일에 업로드합니다.

    다음 코드 조각에서 --user-id 값을 자신의 OCID로 바꾸고 --region 값을 자신의 지역에 대한 Region Key 값으로 변경합니다.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. JSON 응답에서 지문 값을 찾아 기록합니다. 나중에 이 값이 필요합니다.

Terraform 구성 적용

GitHub 저장소에서 Terraform 구성을 다운로드하고 Terraform 변수 파일을 업데이트한 후 구성을 적용합니다.

시작하기 전에 다음 정보를 사용할 수 있는지 확인합니다. terraform.tvars 파일에 대한 정보가 필요합니다.
  • 영역 - 영역의 영역 식별자입니다. Oracle Cloud Infrastructure 콘솔의 URL에서 추출할 수 있습니다. 예를 들어, URL이 https://console.us-ashburn-1.oraclecloud.com/compute/instances인 경우 지역 식별자는 us-ashburn-1입니다. URL이 https://cloud.oracle.com/compute/instances인 경우 https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm에서 지역 식별자를 조회해야 합니다.
  • 구획 OCID - 프로젝트에 대한 리소스가 포함된 구획의 OCID입니다. [계획] 섹션의 지침을 따른 경우 OBP_Events 구획에 대한 OCID가 필요합니다.
  • 지문 - 이전에 생성하고 프로파일에 업로드한 공용 API 키의 지문입니다.
  • 개인 키 - 이전에 생성한 전용 API 키의 전체 경로 및 파일 이름입니다. 예: /home/opc/oci_api_key.pem. 경로에 ~를 사용하지 마십시오.
  • 사용자 OCID - 사용자 세부정보 페이지에서 확인할 수 있습니다. [콘솔] 메뉴를 열고 ID로 이동하고 사용자를 누릅니다. 목록에서 사용자 이름을 누릅니다.
  • 테넌시 OCID - 테넌시 세부정보 페이지에서 가져올 수 있습니다. [콘솔] 메뉴를 열고 관리로 이동하고 테넌시 세부정보를 누릅니다.

정보가 있으면 Terraform 구성을 다운로드하여 적용합니다.

  1. 테넌시에 대한 Cloud Shell에 로그인합니다.
  2. GitHub에서 oci-obp-extension 저장소를 복제합니다.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Terraform 디렉토리로 변경합니다.
    cd oci-obp-extension/terraform
  4. 텍스트 편집기에서 terraform.tvars 파일을 열고 자리 표시자 값을 실제 값으로 바꿉니다.
  5. OCI_AUTHOCI_use_obo_token 환경 변수를 지웁니다.
    unset OCI_AUTH OCI_use_obo_token
  6. Terraform 구성을 위한 디렉토리를 준비합니다.
    terraform init
  7. Terraform 구성이 생성할 내용을 보고 확인합니다.
    terraform plan
  8. Terraform 구성을 적용합니다.
    terraform apply -auto-approve
  9. Terraform 구성이 완료될 때 표시되는 API 게이트웨이 정보를 기록합니다. OBP_Event_Subscribe_Callbackuse_to_extract_ssl_certificate 값을 기록해야 합니다.

블록체인 이벤트 구독

API 게이트웨이가 블록체인 이벤트를 수신할 수 있도록 콜백 URL을 등록합니다.

등록한 URL은 Terraform 프로세스가 완료될 때 표시된 API 게이트웨이 콜백 URL입니다. https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback 형식입니다. 또한 Terraform 프로세스가 완료될 때 use_to_extract_ssl_certificate로 표시된 API 게이트웨이 배포 엔드포인트가 필요합니다. joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443 형식입니다.

또한 체인코드 이벤트에 가입하려면 Oracle Blockchain Platform REST API 엔드포인트가 필요합니다. 형식은 다음과 같습니다.

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#>은 Oracle Blockchain Platform 콘솔에 나열된 REST 프록시의 URL입니다. 콘솔에서 [노드] 페이지를 열고 REST 프록시 노드를 찾습니다. 경로 열에는 포트 및 REST 프록시 번호를 포함한 URL이 나열됩니다.

블록체인 이벤트에 가입하려면 다음과 같이 하십시오.

  1. API 게이트웨이 콜백에 대한 caCert의 압축을 풉니다.
    1. 테넌시에 대한 Cloud Shell에 로그인합니다.
    2. 다음 명령을 실행합니다.

      API_GATEWAY 텍스트를 Terraform 구성 단계의 끝에서 이전에 기록한 use_to_extract_ssl_certificate 값으로 바꿉니다.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. 표시된 인증서를 복사합니다. -----BEGIN CERTIFICATE----------END CERTIFICATE----- 텍스트를 포함해야 합니다.
  2. Oracle Blockchain Platform에 릴레이 함수를 등록합니다.

    블록체인 REST API를 사용하여 릴레이 함수를 등록합니다. cURL, Postman 또는 다른 툴을 사용하여 블록체인에 요청을 POST할 수 있습니다. 요청 본문은 다음 JSON 형식을 사용합니다.

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    적용되는 callbackURL, chaincode, eventNamecaCert 값을 사용해야 합니다.

    참고:

    함수 컨테이너를 구축 및 배포하는 데 필요한 시간 때문에 성공 응답을 받기 전에 여러 번 시도해야 할 수 있습니다.

이벤트 소비자 생성 및 테스트

이벤트 소비자는 Kafka API를 사용하여 Oracle Cloud Infrastructure 스트리밍에서 블록체인 메시지를 인증하고 검색합니다.

소비자를 생성하려면 다음 문자열에 대한 정보가 필요합니다.

  • USER-NAME: 사용자 이름입니다.
  • AUTH-TOKEN: 이전에 생성하고 복사한 토큰입니다.
  • TENANCY-NAME: OCI 콘솔의 테넌시 세부정보 페이지에서 이 값을 찾을 수 있습니다. 테넌시 세부정보 페이지를 열려면 콘솔 탐색 메뉴에서 관리를 선택하고 테넌시 세부정보를 누릅니다.
  • STREAM-POOL-OCID: Terraform 스크립트를 실행할 때 생성된 스트림 풀의 OCID입니다. 이 값을 찾으려면 콘솔 탐색 메뉴를 열고 관리로 이동하여 스트리밍을 누릅니다. 스트림 풀과 OCID 복사를 여는 페이지를 선택합니다.
  • REGION: "us-ashburn-1" 지역이 다를 수도 있습니다.
  • EVENT-NAME: 블록체인 이벤트를 수신하기 위해 콜백 URL을 등록할 때 이전에 사용한 것과 동일한 이벤트 이름입니다.

다음 예를 사용하여 시스템이 예상대로 작동하는지 테스트하고 검증합니다.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

소비자를 시작하면 이미 스트림으로 푸시된 기존 이벤트가 소비됩니다.

예제가 작동하고 솔루션 작동에 대해 설명하면 운용 레벨 소비자 앱을 하나 이상 생성할 준비가 된 것입니다. 이 앱은 JavaScript, Python 또는 다른 언어로 작성할 수 있습니다.