开发和部署事件中继
创建 Oracle Functions 应用程序,该应用程序使用区块链事件并将它们转发到 Oracle 流式处理服务。
事件中继应用程序需要身份证明才能连接到流式服务。身份证明保存在 Oracle Cloud Infrastructure Vault 中。运行 Terraform 代码时,凭证存储在 Vault 中。进行 OCI 环境的初始配置时会生成凭证。
该功能提供以下服务:
- 从 Vault 检索流服务身份证明。
- 解密凭证(Vault 存储加密值)。
- 从 base64 编码字符串解码。
- 使用解码的身份证明使用与 Kafka 兼容的 API 连接到流处理服务。
- 从 JSON 事件消息创建并填充名为 event 的 Java 对象。
- 使用 Kafka API 发送事件消息。
创建事件中继
您可以使用任何编程语言创建中继,但因为 Java 中记录了 Kafka API,因此最好使用 Java。
EventProducer 类将具有以下方法:
@FnConfiguration
public void config(RuntimeContext ctx) { ... }
private String decryptData(String cipherText) { ... }
public String handleRequest(Event event) { ... }pom.xml 文件中也需要以下依赖项:
<dependencies>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-keymanagement</artifactId>
<version>1.12.5</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>com.fnproject.fn</groupId>
<artifactId>api</artifactId>
<version>${fdk.version}</version>
</dependency>
.
.
.
确保已安装 fn CLI 工具。有关更多信息,请参见 https://github.com/fnproject/fn#quickstart。
构建事件中继
必须先使用 docker tag 命令创建本地源映像的副本作为新映像(新映像实际上只是对现有源映像的引用),然后才能推送映像。作为新映像的名称,请在要推送映像的 Oracle Cloud Infrastructure Registry 中指定目标位置的全限定路径。
标记需要以下信息:
- 您所在区域的 Docker 注册端点
- 对象存储名称空间
要获取您所在区域的端点,请在按区域列出的可用性的表中查找区域。例如,美国东部(阿什本)的注册表端点是 https://iad.ocir.io。
要查找对象存储名称空间,请执行以下操作:
- 在导航菜单中,单击管理,然后单击租户详细信息。
- 对象存储名称空间位于“对象存储设置”部分中。
将事件中继推送到注册表
构建事件中继并具有正确的标记后,您可以将其推送到 Oracle Cloud Infrastructure Registry。
- 您所在区域的 API 端点: 例如,
iad.ocir.io。与之前用来标记 Docker 映像的值相同。 - 租户名称空间:租户的自动生成的对象存储名称空间字符串(如租户信息页上所示)。
- 用户名:Oracle Cloud Infrastructure 中的用户名。
- 验证标记:您在“计划”部分中先前创建的标记。
创建 API 密钥
生成密钥对并将公共密钥上载到 Oracle Cloud Infrastructure 用户概要文件。
您可以通过多种方式执行此操作,但此处我们将使用 Cloud Shell,因为您可以将生成的密钥直接安全上载到您的账户。
应用 Terraform 配置
从 GitHub 系统信息库下载 Terraform 配置,更新 Terraform 变量文件,然后应用配置。
terraform.tvars 文件需要此信息。
- 区域 - 区域的区域标识符。您可以从 Oracle Cloud Infrastructure 控制台的 URL 中提取此项。例如,如果 URL 为
https://console.us-ashburn-1.oraclecloud.com/compute/instances,则区域标识符为 us-ashburn-1。如果您的 URL 为https://cloud.oracle.com/compute/instances,则必须在 https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm 中查找区域标识符 - 区间 OCID - 包含项目资源的区间的 OCID。如果您按照“计划”部分中的说明操作,则需要 OBP_Events 区间的 OCID。
- 指纹- 以前生成并上载到您的概要信息的公共 API 密钥的指纹。
- 私有密钥- 您以前生成的专用 API 密钥的完整路径和文件名。例如,
/home/opc/oci_api_key.pem。请勿在路径中使用~。 - 用户 OCID- 可以从用户详细信息页获取此信息。打开“控制台”菜单,获得身份并单击用户。单击列表中的用户名。
- 租户 OCID - 可以从租户详细信息页获取。打开控制台菜单,获得管理,然后单击租户详细信息。
当您拥有该信息时,请下载并应用 Terraform 配置。
订阅区块链事件
注册回调 URL,以便 API 网关可以接收区块链事件。
https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback。您还需要 API 网关部署端点,该端点在 Terraform 进程完成时显示为 use_to_extract_ssl_certificate。格式为 joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443您还需要 Oracle Blockchain Platform REST API 端点才能订阅链代码事件。其形式如下:
https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe<rest_server_url:port/restproxy#> 是 Oracle Blockchain Platform 控制台中列出的 REST 代理的 URL。在控制台中,打开“节点”页并查找 REST 代理节点。在“路由”列中,列出了包括端口和 REST 代理编号的 URL。
要订阅区块链事件,请执行以下操作:
创建和测试事件使用者
事件使用者使用 Kafka API 来验证和检索来自 Oracle Cloud Infrastructure 流处理的区块链消息。
要创建消费者,您需要以下字符串的信息:
- USER-NAME:用户名。
- AUTH-TOKEN:这是您之前生成并复制的标记。
- TENANCY-NAME:可以在 OCI 控制台的 Tenancy Details 页面上找到此值。要打开租户详细信息页,请在控制台导航菜单中,选择管理,然后单击租户详细信息。
- STREAM-POOL-OCID:这是运行 Terraform 脚本时创建的流池的 OCID。要查找此值,请打开控制台导航菜单,然后转到管理并单击流处理。选择流池,并在打开复制 OCID 的页面中。
- REGION:"us-ashburn-1"。您所在的区域可能有所不同。
- EVENT-NAME:这是您之前注册回调 URL 以接收区块链事件时使用的事件名称。
使用以下示例测试并验证系统是否按预期运行。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class Consumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
props.put("group.id", "group-0");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create a consumer and subscribe to it
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("EVENT-NAME"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}启动使用者时,它将消耗已推入流中的所有现有事件。
在示例工作且表明解决方案工作正常之后,您可以创建一个或多个生产级消费者应用。应用程序可以使用 JavaScript、Python 或其他语言编写。