開發及建置事件轉送

建立使用區塊鏈事件的 Oracle Functions 應用程式,並將它們轉送至「Oracle 串流服務」。

事件轉送應用程式需要證明資料,才能連線串流服務。證明資料會保留在 Oracle Cloud Infrastructure Vault 中。執行 Terraform 程式碼時,證明資料會儲存在 Vault 中。證明資料會在您執行 OCI 環境的初始組態設定時產生。

函數提供下列服務:

  1. 從保存庫擷取串流服務證明資料。
  2. 解密證明資料 (Vault 會儲存加密值)。
  3. 從 base64 編碼的字串解碼。
  4. 使用與 Kafka-compatible API 連線至串流服務時,請使用解碼的證明資料。
  5. 從 JSON 事件訊息建立並填入名為 event 的 Java 物件。
  6. 使用 Kafka API 傳送事件訊息。

建立事件轉送

您可以使用任何程式設計語言建立轉送,但 Kafka API 是以 Java 記錄的最佳使用 Java。

EventProducer 類別具有下列方法:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

pom.xml 檔案中也需要下列相依性:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

確定您已安裝 fn CLI 工具。如需詳細資訊,請參閱 https://github.com/fnproject/fn#quickstart。

  1. 建立 fn 專案並提供合理的名稱,例如 obpeventsfunc
    fn init obpeventsfunc --runtime java
  2. 以名為 EventProducer.java 的檔案取代 HelloFunction.java

    所有程式碼都將在此單一類別中。若要建構函數,您必須匯入下列類別:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. 建立一個內部類別來保存區塊鏈的事件訊息。

    您需要一個對應事件訊息結構的物件。這三個類別定義是 EventProducer 類別的一部分。Oracle Functions 會使用此類別來還原序列化 Oracle Blockchain Platform 的 JSON 資料。

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. 全域定義數個「字串」變數,以保存填入 config 方法的組態值。

    因為@FnConfiguration 註解的緣故,系統會將 RuntimeContext 物件傳遞至此方法。然後使用 getConfigurationByKey 填入全域變數。下列每個組態索引鍵都需要一個變數:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    例如:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. 建立 decryptData 方法。

    decryptData 方法看起來會像下面的程式碼片段。kmsEndpointkmsKeyId 的值會填入 config 方法中,並對應至 KMS_ENDPOINT 和 KMS_KEY_ID 組態索引鍵。請注意,純文字為 Base64 編碼,需要先以 Base64 解碼才能傳回值。設定 Kafka 認證證明資料時,handleRequest 方法會使用 decryptData 方法。

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. 建立 handleRequest 方法。

    此方法會從 Oracle Vault 擷取「串流」服務的證明資料,然後將事件訊息傳送至「串流」服務。它也會執行一些「paperwork」:Kafka 組態和證明資料擷取及解密。

    handleRequest 方法會使用「事件」物件作為其唯一的參數。物件是由 Oracle Functions 系統自動建立,並植入來自區塊鏈之 JSON 的資料。因此,事件內部類別需要直接對應至來自區塊鏈的 JSON 資料。

    您必須執行的第一項作業是檢查「測試」事件類型。訂閱區塊鏈事件時,第一個事件是您不想要轉送給事件用戶的測試事件。

    1. 檢查「測試」事件類型。
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. 建立串流服務 SASL 組態所需的字串。
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. 建立 java.util。Properties 物件並填入組態。
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. 註冊組態並將事件訊息傳送至「串流服務」。
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. 指示每當傳送訊息以轉送至串流服務時,系統都呼叫 handleRequest

    若要這麼做,請修改 func.yaml 中的 cmd 特性,以指向 EventProducer 類別中的 handleRequest 方法。例如:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

組建事件轉送

建立事件轉送的 Java Class 之後,請使用它來建立要植入 Oracle Cloud Infrastructure Registry 的 Docker 映像檔。建立影像之後,您必須使用 OCI 區域和物件儲存命名空間的相關資訊來標記影像。

您必須先使用 docker tag 命令,將本機來源影像的複本建立為新影像 (新影像實際上只是現有來源影像的參照),才能發送影像。作為新影像的名稱,指定 Oracle Cloud Infrastructure Registry 中您要植入影像之目標位置的完整路徑。

標記需要以下資訊:

  • 您區域的 Docker 登錄端點
  • 您的物件儲存命名空間

若要取得區域的端點,請在依區域區分的可用性表格中查詢您的區域。例如,美國東部 (阿什本) 的登錄端點是https://iad.ocir.io

尋找您的物件儲存命名空間:

  1. 在導覽功能表中,按一下管理,然後按一下租用戶詳細資訊。
  2. 物件儲存體設定值段落中有物件儲存體命名空間
  1. 在最上層目錄中,執行 build 命令。最上層目錄是您執行 fn init 命令時所建立的目錄。
    fn build
  2. 使用 OCI 區域和物件儲存命名空間資訊標記影像。

    例如,如果您區域的端點是https://iad.ocir.io,而您的物件儲存命名空間是 yzl1yzrddld7,則命令如下所示,假設您使用 obpeventsfunc 作為事件轉送的名稱:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    備註:

    請務必省略命令中的https://協定資訊。

發送事件轉接至登錄

建立事件轉送並有適當的標記之後,就可以將它推送至 Oracle Cloud Infrastructure Registry

登入 Oracle Cloud Infrastructure Registry 時,您需要下列資訊:
  • 您區域的 API 端點:例如 iad.ocir.io。此值與您之前用來標記 Docker 映像檔的值相同。
  • 租用戶命名空間:您租用戶之自動產生的物件儲存命名空間字串 (如「租用戶資訊」頁面所示)。
  • 使用者名稱:您在 Oracle Cloud Infrastructure 中的使用者名稱。
  • 認證記號:您先前在「計畫」區段中建立的記號。
  1. 登入 Oracle Cloud Infrastructure Registry
    docker login <region-key>.ocir.io

    出現提示時,請以<<tenancy-namespace>/<username> 格式輸入使用者名稱。例如,yzl1yzrddld7/jdoe@example.com。如果您的租用戶與 Oracle Identity Cloud Service 同盟,請使用<<tenancy-namespace>/oracleidentitycloudservice/<username>格式。

  2. 出現提示時,請輸入您先前建立的認證記號。
  3. 將電腦上的 Docker 映像檔推送至 Oracle Cloud Infrastructure Registry

    例如,如果您區域的端點是https://iad.ocir.io,而您的物件儲存命名空間是 yzl1yzrddld7,則命令如下:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

建立 API 金鑰

產生金鑰組並將公開金鑰上傳至您的 Oracle Cloud Infrastructure 使用者設定檔。

您可以透過幾種方式執行此作業,但此處我們將使用 Cloud Shell,因為您可以直接將產生的金鑰安全地上傳至您的帳戶。

  1. 登入您租用戶的 Cloud Shell。
  2. 建立金鑰組的目錄
    mkdir ~/.oci
  3. 產生 2048 位元的私密金鑰,並將其儲存在稱為 oci_api_key.pem 的檔案中。
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. 讓使用者將檔案設為唯讀。
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. 產生公開金鑰並將它儲存在名為 oci_api_key_public.pem 的檔案中。
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. 將公開金鑰上傳至您的使用者設定檔。

    在下列程式碼片段中,將--user-id 值取代為您自己的 OCID,並將--region 值變更為您自己區域的「區域索引鍵」值。

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. 在 JSON 回應中,尋找並記錄指紋的值。您稍後將需要這個值。

套用 Terraform 組態

從 GitHub 儲存區域下載 Terraform 組態、更新 Terraform 變數檔、然後套用組態。

開始之前,請確定您有下列可用資訊。terraform.tvars 檔案需要此資訊。
  • 域-區域的「區域識別碼」。您可以從 Oracle Cloud Infrastructure 主控台的 URL 擷取此項目。例如,如果 URL 是 https://console.us-ashburn-1.oraclecloud.com/compute/instances,您的「區域 ID」就會是 us-ashburn-1。如果您的 URL 是 https://cloud.oracle.com/compute/instances,則必須在 https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm 中查詢您的「區域 ID」
  • 區間 OCID -包含專案資源之區間的 OCID。如果您遵循「計畫」區段中的指示,則需要 OBP_Events 區間的 OCID。
  • 指紋-您先前產生並上傳至設定檔之公用 API 金鑰的指紋。
  • 私密金鑰-您先前產生之私密 API 金鑰的完整路徑和檔案名稱。例如,/home/opc/oci_api_key.pem。請勿在路徑中使用~
  • 使用者 OCID -您可以從「使用者詳細資訊」頁面取得。開啟主控台功能表,移至別,然後按一下使用者。按一下清單中的您的使用者名稱。
  • 租用戶 OCID -您可以從「租用戶詳細資訊」頁面取得此資訊。開啟主控台功能表,移至管理,然後按一下租用戶詳細資訊。

有資訊時,請下載並套用 Terraform 組態。

  1. 登入您租用戶的 Cloud Shell。
  2. 從 GitHub 複製 oci-obp-extension 儲存庫。
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. 變更至 Terraform 目錄。
    cd oci-obp-extension/terraform
  4. 在文字編輯器中開啟 terraform.tvars 檔案,然後以實際值取代預留位置值。
  5. 清除 OCI_AUTHOCI_use_obo_token 環境變數。
    unset OCI_AUTH OCI_use_obo_token
  6. 準備 Terraform 組態的目錄。
    terraform init
  7. 檢視並驗證 Terraform 組態要建立的項目。
    terraform plan
  8. 套用 Terraform 組態。
    terraform apply -auto-approve
  9. 記錄 Terraform 組態完成時顯示的 API 閘道資訊。您必須記錄下列值:OBP_Event_Subscribe_Callbackuse_to_extract_ssl_certificate

訂閱區塊鏈事件

註冊回呼 URL,讓 API 閘道能夠接收區塊鏈事件。

您註冊的 URL 是 Terraform 處理作業完成時所顯示的 API 閘道回呼 URL。它的格式為 https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback。您還需要 API 閘道部署端點,此端點會在 Terraform 處理作業完成時顯示為 use_to_extract_ssl_certificate。它的格式為 joh5rb...fuxy.apigateway.us-ashburn-1。oci.customer-oci.com:443

您也需要 Oracle Blockchain Platform REST API 端點,才能訂閱鏈碼事件。它的形式如下:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<<rest_server_url:port/restproxy#>是 Oracle Blockchain Platform 主控台中所列 REST 代理主機的 URL。在主控台中,開啟「節點」頁面並尋找 REST 代理主機節點。路由資料欄中會列出一個 URL,包括連接埠和 REST 代理主機號碼。

訂閱區塊鏈事件:

  1. 擷取 API 閘道回呼的 caCert。
    1. 登入您租用戶的 Cloud Shell。
    2. 請執行下列命令。

      將文字 API_GATEWAY 取代為您在 Terraform 組態步驟結束時先前記錄的 use_to_extract_ssl_certificate 值。

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. 複製顯示的憑證。請務必包括-----BEGIN CERTIFICATE----------END CERTIFICATE-----字。
  2. 在 Oracle Blockchain Platform 註冊轉送函數。

    使用區塊鏈 REST API 註冊轉送函數。使用 cURL、Postman 或其他工具將要求張貼至區塊鏈。要求內文的 JSON 格式如下:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    請務必使用您適用的 callbackURLchaincodeeventNamecaCert 值。

    備註:

    因為建立與部署函數容器所需的時間,您可能需要幾次嘗試,才能取得成功回應。

建立並測試事件用戶

事件用戶使用 Kafka API 從 Oracle Cloud Infrastructure 串流處理認證和擷取區塊鏈訊息。

若要建立用戶,您需要下列字串的資訊:

  • USER-NAME:您的使用者名稱。
  • AUTH-TOKEN:這是您先前產生並複製的記號。
  • TENANCY-NAME:您可以在 OCI 主控台的「租用戶詳細資訊」頁面中找到這個值。若要開啟「租用戶詳細資訊」頁面,請由主控台導覽功能表中選取管理,然後按一下租用戶詳細資訊。
  • STREAM-POOL-OCID:這是執行 Terraform 命令檔時所建立之串流集區的 OCID。若要尋找這個值,請開啟主控台導覽功能表,移至管理,然後按一下串流。選取串流集區,然後在開啟的頁面中複製 OCID。
  • 域:"us-ashburn-1"。您的區域可能不同。
  • EVENT-NAME:這與您先前註冊接收區塊鏈事件的回呼 URL 時所使用的事件名稱相同。

使用下列範例來測試並驗證系統是否正常運作。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

當您啟動用戶時,它會使用任何已植入串流中的現有事件。

範例運作並示範解決方案運作之後,您就可以建立一或多個實際環境執行層次用戶應用程式。您可以使用 JavaScript、Python 或其他語言撰寫應用程式。