开发和部署事件中继

创建 Oracle Functions 应用程序,该应用程序使用区块链事件并将它们转发到 Oracle 流式处理服务。

事件中继应用程序需要身份证明才能连接到流式服务。身份证明保存在 Oracle Cloud Infrastructure Vault 中。运行 Terraform 代码时,凭证存储在 Vault 中。进行 OCI 环境的初始配置时会生成凭证。

该功能提供以下服务:

  1. 从 Vault 检索流服务身份证明。
  2. 解密凭证(Vault 存储加密值)。
  3. 从 base64 编码字符串解码。
  4. 使用解码的身份证明使用与 Kafka 兼容的 API 连接到流处理服务。
  5. 从 JSON 事件消息创建并填充名为 event 的 Java 对象。
  6. 使用 Kafka API 发送事件消息。

创建事件中继

您可以使用任何编程语言创建中继,但因为 Java 中记录了 Kafka API,因此最好使用 Java。

EventProducer 类将具有以下方法:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

pom.xml 文件中也需要以下依赖项:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

确保已安装 fn CLI 工具。有关更多信息,请参见 https://github.com/fnproject/fn#quickstart。

  1. 创建 fn 项目并为其指定合理的名称,如 obpeventsfunc
    fn init obpeventsfunc --runtime java
  2. HelloFunction.java 替换为名为 EventProducer.java 的文件。

    所有代码都将位于此单个类中。要构建函数,必须导入以下类:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. 创建一个内部类来保存区块链中的活动消息。

    您需要与事件消息的结构相对应的对象。三个类定义是 EventProducer 类的一部分。此类由 Oracle Functions 用于反序列化来自 Oracle Blockchain Platform 的 JSON 数据。

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. 在全局定义多个字符串变量以保存配置值,这些值在 config 方法中填充。

    由于有 @FnConfiguration 注释,系统会将 RuntimeContext 对象传递给此方法。然后,使用 getConfigurationByKey 填充全局变量。您需要以下每个配置密钥的变量:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    例如:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. 创建 decryptData 方法。

    decryptData 方法将如下所示。kmsEndpointkmsKeyId 的值在 config 方法中填充,对应于 KMS_ENDPOINT 和 KMS_KEY_ID 配置密钥。请注意,纯文本是 Base64 编码的,在返回值之前需要对 Base64 进行解码。设置 Kafka 验证凭证时,handleRequest 方法使用 decryptData 方法。

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. 创建 handleRequest 方法。

    此方法从 Oracle Vault 检索流式处理服务的身份证明,然后将事件消息发送到流式处理服务。它还做了一些“文件工作”:Kafka 配置和身份证明检索和解密。

    handleRequest 方法将事件对象作为其唯一参数。对象由 Oracle Functions 系统自动创建,并使用来自区块链的 JSON 数据进行填充。因此,活动内部类需要直接映射到区块链中产生的 JSON 数据。

    首先需要检查“测试”事件类型。订阅区块链事件时,第一个事件是您不想传递给事件使用者的测试事件。

    1. 检查 "test" 事件类型。
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. 构建流式处理服务 SASL 配置所需的字符串。
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. 创建 java.util.Properties 对象并使用配置填充该对象。
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. 注册配置并将事件消息发送到流式处理服务。
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. 指示系统在发送消息以传递到流服务时调用 handleRequest

    为此,请修改 func.yaml 中的 cmd 属性,以指向 EventProducer 类中的 handleRequest 方法。例如:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

构建事件中继

为事件中继创建 Java 类后,可使用该类构建要推送到 Oracle Cloud Infrastructure Registry 的 Docker 映像。构建映像后,必须使用有关 OCI 区域和对象存储名称空间的信息对其进行标记。

必须先使用 docker tag 命令创建本地源映像的副本作为新映像(新映像实际上只是对现有源映像的引用),然后才能推送映像。作为新映像的名称,请在要推送映像的 Oracle Cloud Infrastructure Registry 中指定目标位置的全限定路径。

标记需要以下信息:

  • 您所在区域的 Docker 注册端点
  • 对象存储名称空间

要获取您所在区域的端点,请在按区域列出的可用性的表中查找区域。例如,美国东部(阿什本)的注册表端点是 https://iad.ocir.io

要查找对象存储名称空间,请执行以下操作:

  1. 在导航菜单中,单击管理,然后单击租户详细信息
  2. 对象存储名称空间位于“对象存储设置”部分中。
  1. 在顶级目录中,运行构建命令。顶级目录是运行 fn init 命令时创建的目录。
    fn build
  2. 使用 OCI 区域和对象存储名称空间信息标记图像。

    例如,如果区域的端点为 https://iad.ocir.io,并且您的对象存储名称空间为 yzl1yzrddld7,则假定您使用 obpeventsfunc 作为事件中继的名称,该命令如下所示:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    注意:

    确保省略命令中的 https:// 协议信息。

将事件中继推送到注册表

构建事件中继并具有正确的标记后,您可以将其推送到 Oracle Cloud Infrastructure Registry

登录 Oracle Cloud Infrastructure Registry 时需要以下信息:
  • 您所在区域的 API 端点: 例如, iad.ocir.io。与之前用来标记 Docker 映像的值相同。
  • 租户名称空间:租户的自动生成的对象存储名称空间字符串(如租户信息页上所示)。
  • 用户名:Oracle Cloud Infrastructure 中的用户名。
  • 验证标记:您在“计划”部分中先前创建的标记。
  1. 登录到 Oracle Cloud Infrastructure Registry
    docker login <region-key>.ocir.io

    出现提示时,按 <tenancy-namespace>/<username> 格式输入用户名。例如,yzl1yzrddld7/jdoe@example.com。如果租户与 Oracle Identity Cloud Service 联合,请使用格式 <tenancy-namespace>/oracleidentitycloudservice/<username>

  2. 出现提示时,输入您之前创建的验证标记。
  3. 将 Docker 映像从您的计算机推送到 Oracle Cloud Infrastructure Registry

    例如,如果区域的端点为 https://iad.ocir.io,并且您的对象存储名称空间为 yzl1yzrddld7,则该命令如下所示:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

创建 API 密钥

生成密钥对并将公共密钥上载到 Oracle Cloud Infrastructure 用户概要文件。

您可以通过多种方式执行此操作,但此处我们将使用 Cloud Shell,因为您可以将生成的密钥直接安全上载到您的账户。

  1. 登录租户的云 Shell。
  2. 为键对创建目录
    mkdir ~/.oci
  3. 生成一个 2048 位私有密钥并将其存储在名为 oci_api_key.pem 的文件中。
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. 将文件设为用户只读。
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. 生成公共密钥并将其存储于名为 oci_api_key_public.pem 的文件中。
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. 将公钥上载到您的用户概要信息。

    在以下代码片段中,将 --user-id 值替换为自己的 OCID,并将 --region 值更改为您自己的区域的区域关键字值。

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. 在 JSON 响应中,找到并记录指纹的值。您以后将需要此值。

应用 Terraform 配置

从 GitHub 系统信息库下载 Terraform 配置,更新 Terraform 变量文件,然后应用配置。

开始之前,请确保提供以下信息。terraform.tvars 文件需要此信息。
  • 区域 - 区域的区域标识符。您可以从 Oracle Cloud Infrastructure 控制台的 URL 中提取此项。例如,如果 URL 为 https://console.us-ashburn-1.oraclecloud.com/compute/instances,则区域标识符为 us-ashburn-1。如果您的 URL 为 https://cloud.oracle.com/compute/instances,则必须在 https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm 中查找区域标识符
  • 区间 OCID - 包含项目资源的区间的 OCID。如果您按照“计划”部分中的说明操作,则需要 OBP_Events 区间的 OCID。
  • 指纹- 以前生成并上载到您的概要信息的公共 API 密钥的指纹。
  • 私有密钥- 您以前生成的专用 API 密钥的完整路径和文件名。例如,/home/opc/oci_api_key.pem。请勿在路径中使用 ~
  • 用户 OCID- 可以从用户详细信息页获取此信息。打开“控制台”菜单,获得身份并单击用户。单击列表中的用户名。
  • 租户 OCID - 可以从租户详细信息页获取。打开控制台菜单,获得管理,然后单击租户详细信息

当您拥有该信息时,请下载并应用 Terraform 配置。

  1. 登录租户的云 Shell。
  2. 从 GitHub 克隆 oci-obp-extension 系统信息库。
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. 转到 Terraform 目录。
    cd oci-obp-extension/terraform
  4. 在文本编辑器中打开 terraform.tvars 文件,并将占位符值替换为实际值。
  5. 清除 OCI_AUTHOCI_use_obo_token 环境变量。
    unset OCI_AUTH OCI_use_obo_token
  6. 准备 Terraform 配置的目录。
    terraform init
  7. 查看并验证 Terraform 配置将创建什么。
    terraform plan
  8. 应用 Terraform 配置。
    terraform apply -auto-approve
  9. 记录在 Terraform 配置完成时显示的 API 网关信息。您需要记录以下值:OBP_Event_Subscribe_Callbackuse_to_extract_ssl_certificate

订阅区块链事件

注册回调 URL,以便 API 网关可以接收区块链事件。

注册的 URL 是 Terraform 进程完成时显示的 API 网关回调 URL。格式为 https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback。您还需要 API 网关部署端点,该端点在 Terraform 进程完成时显示为 use_to_extract_ssl_certificate。格式为 joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

您还需要 Oracle Blockchain Platform REST API 端点才能订阅链代码事件。其形式如下:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> 是 Oracle Blockchain Platform 控制台中列出的 REST 代理的 URL。在控制台中,打开“节点”页并查找 REST 代理节点。在“路由”列中,列出了包括端口和 REST 代理编号的 URL。

要订阅区块链事件,请执行以下操作:

  1. 提取 API Gateway 回调的 caCert。
    1. 登录租户的云 Shell。
    2. 运行以下命令。

      将文本 API_GATEWAY 替换为之前在 Terraform 配置步骤结束时记录的 use_to_extract_ssl_certificate 值。

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. 复制显示的证书。确保包括文本 -----BEGIN CERTIFICATE----------END CERTIFICATE-----
  2. 将中继功能注册到 Oracle Blockchain Platform。

    使用区块链 REST API 注册中继函数。使用 cURL、Postman 或其他工具将请求发布到区块链。请求正文具有以下 JSON 格式:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    确保使用适用于您的 callbackURLchaincodeeventNamecaCert 值。

    注意:

    由于构建和部署函数容器所需的时间,在获得成功响应之前可能需要多次尝试。

创建和测试事件使用者

事件使用者使用 Kafka API 来验证和检索来自 Oracle Cloud Infrastructure 流处理的区块链消息。

要创建消费者,您需要以下字符串的信息:

  • USER-NAME:用户名。
  • AUTH-TOKEN:这是您之前生成并复制的标记。
  • TENANCY-NAME:可以在 OCI 控制台的 Tenancy Details 页面上找到此值。要打开租户详细信息页,请在控制台导航菜单中,选择管理,然后单击租户详细信息
  • STREAM-POOL-OCID:这是运行 Terraform 脚本时创建的流池的 OCID。要查找此值,请打开控制台导航菜单,然后转到管理并单击流处理。选择流池,并在打开复制 OCID 的页面中。
  • REGION:"us-ashburn-1"。您所在的区域可能有所不同。
  • EVENT-NAME:这是您之前注册回调 URL 以接收区块链事件时使用的事件名称。

使用以下示例测试并验证系统是否按预期运行。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

启动使用者时,它将消耗已推入流中的所有现有事件。

在示例工作且表明解决方案工作正常之后,您可以创建一个或多个生产级消费者应用。应用程序可以使用 JavaScript、Python 或其他语言编写。