Desenvolva e Implemente o Relay do Evento

Crie um aplicativo do Oracle Functions que consuma eventos de blockchain e os retransmita ao Oracle Streaming Service.

O aplicativo de retransmissão de eventos precisa de credenciais para estabelecer conexão com o serviço de streaming. As credenciais são mantidas no Oracle Cloud Infrastructure Vault. As credenciais são armazenadas no Vault quando você executa o código do Terraform. As credenciais são geradas quando você faz a configuração inicial do seu ambiente OCI.

A Função fornece os seguintes serviços:

  1. Recuperar credenciais do Serviço de Streaming do Vault.
  2. Descriptografar as credenciais (O Vault armazena valores criptografados).
  3. Decodificar da string codificada em base64.
  4. Use as credenciais decodificadas para estabelecer conexão com o Serviço de Streaming usando a API compatível com o Kafka.
  5. Crie e preencha um objeto Java chamado event na mensagem de evento JSON.
  6. Use a API do Kafka para enviar a mensagem de evento.

Criar o Relay do Evento

Você pode criar o retransmissão em qualquer linguagem de programação, mas, como as APIs Kafka são documentadas no Java, é melhor usar o Java.

A classe EventProducer terá os seguintes métodos:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

As seguintes dependências também são necessárias no arquivo pom.xml:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

Certifique-se de ter a ferramenta CLI fn instalada. Consulte https://github.com/fnproject/fn#quickstart para obter mais informações.

  1. Crie um projeto fn e forneça-lhe um nome razoável como obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Substitua HelloFunction.java por um arquivo chamado EventProducer.java.

    Todo o código estará nesta classe única. Para construir a função, você deve importar as seguintes classes:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Crie uma classe interna para conter a mensagem de evento do blockchain.

    Você precisa de um objeto que corresponda à estrutura da mensagem de evento. As três definições de classe fazem parte da classe EventProducer. Essa classe é usada pelo Oracle Functions para desserializar os dados JSON que vêm do Oracle Blockchain Platform.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. Defina várias variáveis de String globalmente para conter os valores de configuração, que são preenchidas no método config.

    Devido à anotação @FnConfiguration, o sistema passa um objeto RuntimeContext para esse método. Em seguida, use getConfigurationByKey para preencher as variáveis globais. Você precisa de uma variável para cada uma das seguintes chaves de configuração:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    Por exemplo:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Crie o método decryptData.

    O método decryptData será semelhante ao trecho de código a seguir. Os valores para kmsEndpoint e kmsKeyId são preenchidos no método config e correspondem às chaves de configuração KMS_ENDPOINT e KMS_KEY_ID. Observe que o texto simples é codificado em Base64 e precisa ser decodificado em Base64 antes de retornar o valor. O método decryptData é usado pelo método handleRequest ao definir as credenciais de autenticação do Kafka.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Crie o método handleRequest.

    Este método recupera as credenciais do serviço Streaming do Oracle Vault e envia a mensagem de evento para o serviço Streaming. Ele também faz algumas "papel": Configuração Kafka e recuperação de credenciais e decriptografia.

    O método handleRequest toma um objeto Evento como seu único parâmetro. O objeto é criado automaticamente pelo sistema Oracle Functions e preenchido com dados do JSON que vem do Blockchain. Por esse motivo, a classe interna Evento precisa mapear diretamente para os dados JSON provenientes do Blockchain.

    A primeira coisa que você precisa fazer é verificar o tipo de evento "teste". Quando você se inscreve em eventos de blockchain, o primeiro evento é um evento de teste que você não deseja retransmitir para consumidores de eventos.

    1. Verifique o tipo de evento "teste".
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Crie a string necessária para a configuração SASL do Serviço de Streaming.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. Crie um objeto java.util.Properties e preencha-o com a configuração.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. Registre a configuração e envie a mensagem de evento para o Serviço de Streaming.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Instrua o sistema a chamar handleRequest sempre que uma mensagem for enviada para retransmissão para o serviço de streaming.

    Para fazer isso, modifique a propriedade cmd em func.yaml para apontar para o método handleRequest na classe EventProducer. Por exemplo:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Crie o Relay do Evento

Depois de criar a classe Java para a retransmissão do evento, use-a para criar a imagem do Docker que você enviará para o Oracle Cloud Infrastructure Registry. Após a criação da imagem, você deverá marcá-la com informações sobre a Região do OCI e o namespace do Object Storage.

Antes de enviar uma imagem, você deve primeiro usar o comando docker tag para criar uma cópia da imagem de origem local como uma nova imagem (a nova imagem é, na verdade, apenas uma referência para a imagem de origem existente). Como um nome para a nova imagem, especifique o caminho totalmente qualificado para o local de destino no Oracle Cloud Infrastructure Registry onde deseja enviar a imagem.

Você precisa das seguintes informações para a tag:

  • O ponto final do registro do Docker da sua região
  • Seu namespace do serviço Object Storage

Para obter o ponto final da sua região, procure sua região na tabela em Disponibilidade por Região. Por exemplo, o ponto final do registro para Leste dos EUA (Ashburn) é https://iad.ocir.io.

Para localizar seu namespace do serviço Object Storage:

  1. No menu de navegação, clique em Administração e, em seguida, clique em Detalhes da Tenancy.
  2. O Namespace de Armazenamento de Obseto está na seção Definições de Armazenamento de Objetos
  1. No diretório de nível superior, execute o comando build. O diretório de nível superior é aquele que você criou quando executou o comando fn init.
    fn build
  2. Marque a imagem com as informações de Região do OCI e Namespace do Object Storage.

    Por exemplo, se o ponto final da sua região for https://iad.ocir.io e seu Namespace de Armazenamento de Objetos for yzl1yzrddld7, o comando será o seguinte, pressupondo que você use obpeventsfunc como o nome do retransmissão do evento:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Observação:

    Certifique-se de omitir as informações do protocolo https:// do comando.

Enviar o Relay do Evento para o Registro

Depois que o retransmissão de eventos for criado e tiver a tag adequada, você poderá enviá-la para o Oracle Cloud Infrastructure Registry.

Você precisa das seguintes informações ao fazer log-in no Oracle Cloud Infrastructure Registry:
  • Ponto final da API para sua região: Por exemplo, iad.ocir.io. É o mesmo valor que você usou anteriormente para marcar a imagem do Docker.
  • Namespace da Tenancy: a string de namespace do Object Storage gerada automaticamente de sua tenancy (conforme mostrado na página Informações da Tenancy).
  • Nome do Usuário: Seu nome de usuário no Oracle Cloud Infrastructure.
  • Token de Autenticação: o token criado anteriormente na seção Plano.
  1. Acesse o Oracle Cloud Infrastructure Registry.
    docker login <region-key>.ocir.io

    Quando solicitado, informe seu nome de usuário no formato <tenancy-namespace>/<username>. Por exemplo, yzl1yzrddld7/jdoe@example.com. Se sua tenancy for federada com o Oracle Identity Cloud Service, use o formato <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. Quando solicitado, informe o token de autenticação criado anteriormente.
  3. Envie a imagem do Docker do seu computador para o Oracle Cloud Infrastructure Registry.

    Por exemplo, se o ponto final da sua região for https://iad.ocir.io e seu Namespace de Armazenamento de Objetos for yzl1yzrddld7, o comando será o seguinte:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

Criar uma Chave de API

Gere um par de chaves e faça upload da chave pública para o perfil de usuário do Oracle Cloud Infrastructure.

Você pode fazer isso de duas maneiras, mas aqui usaremos o Cloud Shell porque você pode fazer upload seguro da chave gerada diretamente para sua conta.

  1. Faça log-in no Cloud Shell da sua tenancy.
  2. Criar um diretório para o par de chaves
    mkdir ~/.oci
  3. Gere uma chave privada de 2048 bits e armazene-a em um arquivo chamado oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Torne o arquivo somente leitura pelo usuário.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Gere uma chave pública e armazene-a em um arquivo chamado oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Faça upload da chave pública para seu perfil de usuário.

    No trecho a seguir, substitua o valor --user-id pelo seu próprio OCID e altere o valor --region para o valor da Chave da Região da sua própria Região.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. Na resposta JSON, localize e registre o valor da impressão digital. Esse valor será necessário mais tarde.

Aplicar a Configuração do Terraform

Faça download da configuração do Terraform no repositório do GitHub, atualize o arquivo de variáveis do Terraform e aplique a configuração.

Antes de começar, verifique se as informações a seguir estão disponíveis. As informações são necessárias para o arquivo terraform.tvars.
  • Região - o Identificador da Região da região. Você pode extrair isso do URL da Console do Oracle Cloud Infrastructure. Por exemplo, se o URL for https://console.us-ashburn-1.oraclecloud.com/compute/instances, o Identificador de Região será us-ashburn-1. Se o URL for https://cloud.oracle.com/compute/instances, você precisará pesquisar o Identificador da Região em https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm
  • OCID do Compartimento - o OCID do compartimento que contém os recursos do projeto. Se você seguiu as instruções na seção Plano, precisará do OCID do compartimento OBP_Events.
  • Impressão Digital - a impressão digital da Chave de API pública que você gerou e fez upload para seu perfil anteriormente.
  • Chave privada - o caminho completo e o nome do arquivo da Chave de API privada que você gerou anteriormente. Por exemplo, /home/opc/oci_api_key.pem. Não use ~ no caminho.
  • OCID do Usuário - Você pode obter isso na página Detalhes do Usuário. Abra o menu da Console, vá para Identidade e clique em Usuários. Clique em seu nome de usuário na lista.
  • OCID da Tenancy - Você pode obter isso na página Detalhes da Tenancy. Abra o menu Console, vá para Administração e clique em Detalhes da Tenancy.

Quando você tiver as informações, faça download e aplique a configuração do Terraform.

  1. Faça log-in no Cloud Shell da sua tenancy.
  2. Clone o repositório oci-obp-extension do GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Altere o diretório do Terraform.
    cd oci-obp-extension/terraform
  4. Abra o arquivo terraform.tvars em um editor de texto e substitua os valores do espaço reservado pelos valores reais.
  5. Limpe as variáveis de ambiente OCI_AUTH e OCI_use_obo_token.
    unset OCI_AUTH OCI_use_obo_token
  6. Prepare o diretório para a configuração do Terraform.
    terraform init
  7. Exiba e verifique qual configuração do Terraform será criada.
    terraform plan
  8. Aplique a configuração do Terraform.
    terraform apply -auto-approve
  9. Registre as informações do Gateway de API que são exibidas quando a configuração do Terraform é concluída. Você precisa fazer um registro dos seguintes valores: OBP_Event_Subscribe_Callback e use_to_extract_ssl_certificate.

Assinar Eventos do Blockchain

Registre um URL de callback para que o API Gateway possa receber eventos de blockchain.

O URL registrado é o URL de callback do Gateway de API que foi exibido quando o processo do Terraform foi concluído. É do formato https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. Você também precisará do ponto final de implantação do API Gateway, que foi exibido como use_to_extract_ssl_certificate quando o processo do Terraform for concluído. É no formato joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

Você também precisa do ponto final da API REST do Oracle Blockchain Platform para assinar eventos de chaincode. Ele tem o seguinte formato:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> é o URL do proxy REST que é listado na console do Oracle Blockchain Platform. Na console, abra a página Nós e procure o nó proxy REST. Na coluna Rota, um URL, incluindo porta e um número de proxy REST, é listado.

Para assinar eventos de blockchain:

  1. Extraia o caCert do callback do Gateway de API.
    1. Faça log-in no Cloud Shell da sua tenancy.
    2. Execute o seguinte comando.

      Substitua o texto API_GATEWAY pelo valor de use_to_extract_ssl_certificate que você gravou anteriormente no final da etapa de configuração do Terraform.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Copie o certificado que é exibido. Certifique-se de incluir o texto -----BEGIN CERTIFICATE----- e -----END CERTIFICATE-----.
  2. Registre a função de retransmissão com o Oracle Blockchain Platform.

    Use a API REST do blockchain para registrar a função de retransmissão. Use cURL, Postman ou outra ferramenta para POST da solicitação para o blockchain. O corpo da solicitação tem o seguinte formato JSON:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    Certifique-se de usar os valores callbackURL, chaincode, eventName e caCert que se aplicam a você.

    Observação:

    Talvez você precise de várias tentativas para obter uma resposta de sucesso devido ao tempo necessário para criar e implantar o contêiner de funções.

Criar e Testar o Consumidor de Eventos

O consumidor de eventos usa APIs Kafka para autenticar e recuperar mensagens de blockchain do Streaming do Oracle Cloud Infrastructure.

Para criar um consumidor, você precisa de informações para as seguintes Strings:

  • USER-NAME: Seu nome de usuário.
  • AUTH-TOKEN: Este é o token gerado e copiado anteriormente.
  • TENANCY-NAME: Você pode encontrar esse valor na página Detalhes da Tenancy da console do OCI. Para abrir a página Detalhes da Tenancy, no menu de navegação da Console, selecione Administração e clique em Detalhes da Tenancy.
  • STREAM-POOL-OCID: Este é o OCID do pool de fluxos que foi criado quando você executou o script Terraform. Para localizar esse valor, abra o menu de navegação da Console e vá para Administração e clique em Streaming. Selecione o pool de fluxos e na página que abre a cópia do OCID.
  • REGION: "us-ashburn-1". Sua região pode ser diferente.
  • EVENT-NAME: Este é o mesmo nome de evento usado anteriormente quando você registrou o URL de callback para receber eventos de blockchain.

Use o exemplo a seguir para testar e validar se o sistema está funcionando conforme o esperado.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

Quando você inicia o consumidor, ele consome todos os eventos existentes que já foram enviados para o fluxo.

Depois que o exemplo estiver funcionando e demonstrar que a solução funciona, você está pronto para criar um ou mais aplicativos de consumidor de nível de produção. Os aplicativos podem ser criados em JavaScript, Python ou outra linguagem.