イベントリレーの開発と展開

ブロックチェーン・イベントを消費し、Oracle Streaming ServiceにリレーするOracle Functionsアプリケーションを作成します。

イベント・リレー・アプリケーションには、ストリーミング・サービスに接続するための資格証明が必要です。資格証明はOracle Cloud Infrastructure Vaultに保持されます。資格証明は、Terraformコードの実行時にボールトに格納されます。資格証明は、OCI環境の初期構成を行うときに生成されます。

この関数には次のサービスがあります。

  1. Vaultからストリーミング・サービス資格証明を取得します。
  2. 資格証明を復号化します(Vaultでは暗号化された値が格納されます)。
  3. base64エンコード文字列からデコードします。
  4. 暗号化された資格証明を使用して、Kafka互換APIを使用してストリーミング・サービスに接続します。
  5. JSONイベント・メッセージからeventというJavaオブジェクトを作成して移入します。
  6. Kafka APIを使用してイベント・メッセージを送信します。

イベントリレーの作成

リレーは任意のプログラミング言語で作成できますが、Kafka APIはJavaに文書化されているため、Javaを使用することをお薦めします。

EventProducerクラスには次のメソッドがあります。

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

pom.xmlファイルには、次の依存性も必要です。

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

fn CLIツールがインストールされていることを確認します。詳細は、https://github.com/fnproject/fn#quickstartを参照してください。

  1. fnプロジェクトを作成し、obpeventsfuncなどの妥当な名前を付けます。
    fn init obpeventsfunc --runtime java
  2. HelloFunction.javaEventProducer.javaというファイルに置き換えます。

    すべてのコードは、この1つのクラスに格納されます。この関数を作成するには、次のクラスをインポートする必要があります。

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. ブロックチェーンからイベント・メッセージを保持する内部クラスを作成します。

    イベント・メッセージの構造に対応するオブジェクトが必要です。3つのクラス定義は、EventProducerクラスの一部です。このクラスは、Oracle FunctionsでOracle Blockchain Platformが提供するJSONデータのデシリアライズに使用されます。

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. configメソッドに移入される構成値を保持するために、複数の文字列変数をグローバルに定義します。

    @FnConfiguration注釈のため、システムはこのメソッドにRuntimeContextオブジェクトを渡します。次に、getConfigurationByKeyを使用してグローバル変数を移入します。次の構成キーごとに変数が必要です。

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    例:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. decryptDataメソッドを作成します。

    decryptDataメソッドは、次のスニペットのようになります。kmsEndpointおよびkmsKeyIdの値は、configメソッドに移入され、KMS_ENDPOINTおよびKMS_KEY_ID構成キーに対応します。プレーン・テキストはBase64でエンコードされるため、値を返す前にBase64をデコードする必要があります。decryptDataメソッドは、Kafka認証資格証明を設定するときにhandleRequestメソッドで使用されます。

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. handleRequestメソッドを作成します。

    このメソッドは、Oracle Vaultからストリーミング・サービスの資格証明を取得し、イベント・メッセージをストリーミング・サービスに送信します。また、Kafka構成および資格証明の取得および復号化も行われています。

    handleRequestメソッドは、唯一のパラメータとしてイベント・オブジェクトを取得します。オブジェクトはOracle Functionsシステムによって自動的に作成され、ブロックチェーンに由来するJSONからのデータが移入されます。このため、イベント内部クラスは、ブロックチェーンから得られたJSONデータに直接マップする必要があります。

    最初に実行する必要があるのは、testイベント・タイプを確認することです。ブロックチェーン・イベントをサブスクライブした場合、最初のイベントは、イベント・コンシューマにリレーしないテスト・イベントです。

    1. 「test」イベント・タイプを確認してください。
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. ストリーミングサービスSASL構成に必要な文字列を作成します。
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. java.util.Propertiesオブジェクトを作成し、構成を入力します。
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. 構成を登録し、イベント・メッセージをストリーミング・サービスに送信します。
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. ストリーミング・サービスにリレーするためのメッセージが送信されるたびにhandleRequestをコールするようにシステムに指示します。

    これを行うには、EventProducerクラスのhandleRequestメソッドを指すようにfunc.yamlcmdプロパティを変更します。例:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

イベントリレーの作成

イベント・リレーのJavaクラスを作成した後、これを使用して、Oracle Cloud Infrastructure RegistryにプッシュするDockerイメージを作成します。イメージの作成後、OCIリージョンとオブジェクト・ストレージ・ネームスペースに関する情報でタグ付けする必要があります。

イメージをプッシュする前に、最初にdocker tagコマンドを使用して、ローカル・ソース・イメージのコピーを新規イメージとして作成する必要があります(新規イメージは実際には既存のソース・イメージへの参照です)。新しいイメージの名前として、イメージをプッシュするOracle Cloud Infrastructure Registry内のターゲットの場所の完全修飾パスを指定します。

タグには、次の情報が必要です。

  • リージョンのDockerレジストリ・エンドポイント
  • オブジェクト・ストレージのネームスペース

リージョンのエンドポイントを取得するには、リージョン別の可用性の表でリージョンを検索します。たとえば、米国東部(アッシュバーン)のレジストリ・エンドポイントはhttps://iad.ocir.ioです。

オブジェクト・ストレージ・ネームスペースを検索するには:

  1. ナビゲーション・メニューで、「管理」「テナンシ詳細」の順にクリックします。
  2. 「オブベクト・ストレージ・ネームスペース」は「オブジェクト・ストレージ設定」セクションにあります
  1. 最上位ディレクトリで、buildコマンドを実行します。最上位ディレクトリは、fn initコマンドを実行したときに作成したディレクトリです。
    fn build
  2. OCIリージョンおよびオブジェクト・ストレージ・ネームスペース情報でイメージをタグ付けします。

    たとえば、リージョンのエンドポイントがhttps://iad.ocir.ioで、オブジェクト・ストレージ・ネームスペースがyzl1yzrddld7の場合、obpeventsfuncをイベント・リレーの名前として使用すると、コマンドは次のようになります。

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    ノート:

    コマンドのhttps://プロトコル情報は必ず省略してください。

レジストリへのイベントリレーのプッシュ

イベント・リレーが作成され、適切なタグが付いていたら、それをOracle Cloud Infrastructure Registryにプッシュできます。

Oracle Cloud Infrastructure Registryにログインするには、次の情報が必要です:
  • リージョンのAPIエンドポイント:たとえば、iad.ocir.ioです。これは、Dockerイメージをタグ付けするために以前に使用した値と同じです。
  • テナンシ・ネームスペース:テナンシの自動生成されたオブジェクト・ストレージ・ネームスペース文字列(「テナンシ情報」ページに表示)。
  • ユーザー名: Oracle Cloud Infrastructureでのユーザー名。
  • Auth Token:「Plan(プラン)」セクションで前に作成したトークン。
  1. Oracle Cloud Infrastructure Registryにログインします。
    docker login <region-key>.ocir.io

    プロンプトが表示されたら、ユーザー名を<tenancy-namespace>/<username>の書式で入力します。たとえば、yzl1yzrddld7/jdoe@example.comです。テナンシがOracle Identity Cloud Serviceとフェデレートされている場合は、<tenancy-namespace>/oracleidentitycloudservice/<username>の書式を使用します。

  2. 要求された場合は、前に作成した認証トークンを入力します。
  3. コンピュータからOracle Cloud Infrastructure RegistryにDockerイメージをプッシュします。

    たとえば、リージョンのエンドポイントがhttps://iad.ocir.ioで、オブジェクト・ストレージ・ネームスペースがyzl1yzrddld7の場合、コマンドは次のようになります。

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

APIキーの作成

キー・ペアを生成し、公開キーをOracle Cloud Infrastructureユーザー・プロファイルにアップロードします。

これはいくつかの方法で行うことができますが、ここでは、生成されたキーをアカウントに直接アップロードできるため、クラウド・シェルを使用します。

  1. テナンシのクラウド・シェルにログインします。
  2. キー・ペアのディレクトリを作成します
    mkdir ~/.oci
  3. 2048ビット秘密キーを生成し、oci_api_key.pemというファイルに格納します。
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. ユーザーがファイルを読取り専用にします。
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. 公開キーを生成し、oci_api_key_public.pemというファイルに格納します。
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. 公開鍵をユーザー・プロファイルにアップロードします。

    次のスニペットで、--user-id値を独自のOCIDに置き換え、--region値を自分のリージョンのリージョン・キー値に変更します。

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. JSONレスポンスで、フィンガープリントの値を見つけて記録します。この値は後で必要になります。

Terraform構成の適用

GitHubリポジトリからTerraform構成をダウンロードし、Terraform変数ファイルを更新してから、構成を適用します。

作業を開始する前に、次の情報があることを確認してください。terraform.tvarsファイルには情報が必須です。
  • リージョン -リージョンのリージョン識別子。これは、Oracle Cloud InfrastructureコンソールのURLから抽出できます。たとえば、URLがhttps://console.us-ashburn-1.oraclecloud.com/compute/instancesの場合、リージョン識別子はus-ashburn-1です。URLがhttps://cloud.oracle.com/compute/instancesの場合、https://docs.oracle.com/iaas/Content/General/Concepts/regions.htmでリージョン識別子を検索する必要があります
  • コンパートメントOCID -プロジェクトのリソースを含むコンパートメントのOCID。「プラン」セクションの指示に従った場合、OBP_EventsコンパートメントのOCIDが必要です。
  • フィンガープリント -以前に生成してプロファイルにアップロードしたパブリックAPIキーのフィンガープリント。
  • 秘密キー -以前に生成したプライベートAPIキーのフルパスとファイル名。たとえば、/home/opc/oci_api_key.pemです。パスに~を使用しないでください。
  • ユーザーOCID -これは、「ユーザー詳細」ページから取得できます。コンソール・メニューを開き、「アイデンティティ」に移動して「ユーザー」をクリックします。リストでユーザー名をクリックします。
  • テナンシOCID -これは、「テナンシ詳細」ページから取得できます。コンソール・メニューを開き、「管理」に移動して「テナンシ詳細」をクリックします。

情報がある場合は、Terraform構成をダウンロードして適用します。

  1. テナンシのクラウド・シェルにログインします。
  2. GitHubからoci-obp-extensionリポジトリをクローニングします。
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Terraformディレクトリに移動します。
    cd oci-obp-extension/terraform
  4. テキスト・エディタでterraform.tvarsファイルを開き、プレースホルダ値を実際の値に置き換えます。
  5. OCI_AUTHおよびOCI_use_obo_token環境変数をクリアします。
    unset OCI_AUTH OCI_use_obo_token
  6. Terraform構成のディレクトリを準備します。
    terraform init
  7. Terraform構成によって作成される内容を表示して確認します。
    terraform plan
  8. Terraform構成を適用します。
    terraform apply -auto-approve
  9. Terraform構成が完了すると表示されるAPIゲートウェイ情報を記録します。次の値のレコードを作成する必要があります: OBP_Event_Subscribe_Callbackおよびuse_to_extract_ssl_certificate

ブロックチェーン・イベントのサブスクライブ

APIゲートウェイがブロックチェーン・イベントを受信できるようにコールバックURLを登録します。

登録するURLは、Terraformプロセスの完了時に表示されるAPIゲートウェイ・コールバックURLです。これはhttps://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callbackの形式です。また、Terraformプロセスの完了時にuse_to_extract_ssl_certificateとして表示されるAPIゲートウェイ・デプロイメント・エンドポイントも必要です。joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443の形式です

チェーンコード・イベントをサブスクライブするには、Oracle Blockchain Platform REST APIエンドポイントも必要です。形式:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#>は、Oracle Blockchain PlatformコンソールにリストされるRESTプロキシのURLです。コンソールで、ノード・ページを開いてRESTプロキシ・ノードを探します。ルート列に、ポート番号とRESTプロキシ番号を含むURLがリストされます。

ブロックチェーン・イベントをサブスクライブするには:

  1. APIゲートウェイ・コールバックのcaCertを抽出します。
    1. テナンシのクラウド・シェルにログインします。
    2. 次のコマンドを実行します。

      テキストAPI_GATEWAYを、Terraform構成ステップの最後に前に記録したuse_to_extract_ssl_certificateの値に置き換えます。

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. 表示される証明書をコピーします。テキスト-----BEGIN CERTIFICATE-----および-----END CERTIFICATE-----を必ず含めます。
  2. リレー機能をOracle Blockchain Platformに登録します。

    ブロックチェーンREST APIを使用して、リレー機能を登録します。cURL、Postmanまたはその他のツールを使用して、ブロックチェーンへのリクエストをPOSTします。リクエストの本文には次のJSON形式があります。

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    該当するcallbackURLchaincodeeventNameおよびcaCertの値を使用してください。

    ノート:

    関数コンテナの構築とデプロイに必要な時間のために、成功レスポンスを得る前に数回試行する必要があります。

イベント・コンシューマの作成およびテスト

イベント・コンシューマはKafka APIを使用して、Oracle Cloud Infrastructureストリーミングからブロックチェーン・メッセージを認証および取得します。

コンシューマを作成するには、次の文字列の情報が必要です。

  • USER-NAME:ユーザー名。
  • AUTH-TOKEN:以前に生成およびコピーしたトークンです。
  • TENANCY-NAME:この値はOCIコンソールのテナンシ詳細ページに表示されます。テナンシ詳細ページを開くには、コンソール・ナビゲーション・メニューで「管理」を選択し、「テナンシ詳細」をクリックします。
  • STREAM-POOL-OCID: Terraformスクリプトの実行時に作成されたストリーム・プールのOCIDです。この値を確認するには、コンソールのナビゲーション・メニューを開き、「管理」に移動して「ストリーミング」をクリックします。ストリーム・プールを選択し、それを開くページでOCIDをコピーします。
  • REGION: "us-ashburn-1"。リージョンが異なる場合があります。
  • EVENT-NAME:これは、ブロックチェーン・イベントを受信するためにコールバックURLを登録したときに以前に使用したイベント名と同じです。

次の例を使用して、システムが期待どおりに動作していることをテストし、検証します。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

コンシューマを起動すると、ストリームにすでにプッシュされている既存のイベントがすべて消費されます。

例が機能し、ソリューションが機能することを示した後、1つ以上の本番レベルのコンシューマ・アプリケーションを作成できます。アプリケーションは、JavaScript、Pythonまたはその他の言語で記述できます。