Desarrollo y despliegue de la transmisión de eventos

Cree una aplicación de Oracle Functions que consuma eventos de cadena de bloques y los transmita a Oracle Streaming Service.

La aplicación de retransmisión de eventos necesita credenciales para conectarse al servicio de transmisión. Las credenciales se mantienen en Oracle Cloud Infrastructure Vault. Las credenciales se almacenan en Vault al ejecutar el código de Terraform. Las credenciales se generan cuando se realiza la configuración inicial del entorno de OCI.

La función proporciona los siguientes servicios:

  1. Recupere las credenciales del servicio de flujo de Vault.
  2. Descifrar las credenciales (el almacén almacena valores cifrados).
  3. Desechar de cadena codificada en base64.
  4. Utilice las credenciales descodificadas para conectarse al servicio Streaming mediante una API compatible con Kafka.
  5. Cree y rellene un objeto Java denominado event desde el mensaje de evento de JSON.
  6. Utilice la API de Kafka para enviar el mensaje de evento.

Crear Relé de Eventos

Puede crear la transmisión en cualquier lenguaje de programación, pero como las API de Kafka están documentadas en Java, es mejor utilizar Java.

La clase EventProducer tendrá los siguientes métodos:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

Las siguientes dependencias también son necesarias en el archivo pom.xml:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

Asegúrese de tener instalada la herramienta de CLI de fn. Consulte https://github.com/fnproject/fn#quickstart para obtener más información.

  1. Cree un proyecto fn y asígnele un nombre razonable, como obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Sustituya HelloFunction.java por un archivo denominado EventProducer.java.

    Todo el código estará en esta sola clase. Para crear la función, debe importar las siguientes clases:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Cree una clase interna para retener el mensaje de evento de la cadena de bloques.

    Necesita un objeto que corresponda a la estructura del mensaje de evento. Las tres definiciones de clase forman parte de la clase EventProducer. Oracle Functions utiliza esta clase para deserializar los datos JSON que provienen de Oracle Blockchain Platform.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. Defina varias variables de cadena de forma global para contener los valores de configuración, que se rellenan en el método config.

    Debido a la anotación @FnConfiguration, el sistema transfiere un objeto RuntimeContext a este método. A continuación, utilice getConfigurationByKey para rellenar las variables globales. Necesita una variable para cada una de las siguientes claves de configuración:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    Por ejemplo:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Cree el método decryptData.

    El método decryptData se parecerá al siguiente fragmento. Los valores de kmsEndpoint y kmsKeyId se rellenan en el método config y corresponden a las claves de configuración KMS_ENDPOINT y KMS_KEY_ID. Tenga en cuenta que el texto sin formato está codificado en Base64 y debe descodificarse en Base64 antes de devolver el valor. El método decryptData lo utiliza el método handleRequest al definir credenciales de autenticación de Kafka.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Cree el método handleRequest.

    Este método recupera las credenciales para el servicio Streaming de Oracle Vault y, a continuación, envía el mensaje de evento al servicio Streaming. También hace algo de "trabajo en papel": configuración de Kafka y recuperación y descifrado de credenciales.

    El método handleRequest toma un objeto Event como su único parámetro. El sistema Oracle Functions crea automáticamente el objeto y se rellena con datos del JSON que proviene de Blockchain. Por este motivo, la clase interna Event debe asignarse directamente a los datos JSON que provienen de Blockchain.

    Lo primero que debe hacer es comprobar el tipo de evento "test". Cuando se suscribe a eventos de cadena de bloques, el primer evento es un evento de prueba que no desea retransmitir a los consumidores de eventos.

    1. Compruebe el tipo de evento "test".
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Cree la cadena necesaria para la configuración de SASL del servicio Streaming.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. Cree un objeto java.util.Properties y rellénelo con la configuración.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. Registre la configuración y envíe el mensaje de evento al servicio Streaming.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Indique al sistema que llame a handleRequest cada vez que se envíe un mensaje para la retransmisión al servicio de transmisión.

    Para ello, modifique la propiedad cmd en func.yaml para que apunte al método handleRequest en la clase EventProducer. Por ejemplo:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Crear Relé de Eventos

Después de crear la clase Java para la retransmisión de eventos, utilícela para crear la imagen de Docker que enviará a Oracle Cloud Infrastructure Registry. Una vez creada la imagen, debe etiquetarla con información sobre la región de OCI y el espacio de nombres de Object Storage.

Antes de poder transferir una imagen, debe utilizar el comando docker tag para crear una copia de la imagen de origen local como una nueva imagen (la nueva imagen es realmente solo una referencia a la imagen de origen existente). Como nombre de la nueva imagen, especifique la ruta de acceso totalmente cualificada a la ubicación de destino en Oracle Cloud Infrastructure Registry en la que desea transferir la imagen.

Necesita la siguiente información para la etiqueta:

  • El punto final del registro de Docker para su región
  • Su espacio de nombres de Object Storage

Para obtener el punto final de su región, consulte su región en la tabla de Disponibilidad por región. Por ejemplo, el punto final de registro para Este de EE. UU. (Ashburn) es https://iad.ocir.io.

Para buscar el espacio de nombres de Object Storage:

  1. En el menú de navegación, haga clic en Administración y, a continuación, en Detalles de arrendamiento.
  2. El espacio de nombres de almacenamiento de objetos está en la sección Object Storage Settings.
  1. En el directorio de nivel superior, ejecute el comando build. El directorio de nivel superior es el que creó cuando ejecutó el comando fn init.
    fn build
  2. Etiquete la imagen con la información de región de OCI y espacio de nombres de Object Storage.

    Por ejemplo, si el punto final de su región es https://iad.ocir.io y el espacio de nombres de Object Storage es yzl1yzrddld7, el comando es el siguiente, suponiendo que utiliza obpeventsfunc como nombre de la retransmisión de eventos:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Nota:

    Asegúrese de omitir la información del protocolo https:// del comando.

Enviar Relé de Evento al Registro

Una vez creada la retransmisión del evento y con la etiqueta adecuada, puede enviarla a Oracle Cloud Infrastructure Registry.

Necesita la siguiente información al conectarse a Oracle Cloud Infrastructure Registry:
  • Punto final de API para su región: por ejemplo, iad.ocir.io. Es el mismo valor que utilizó anteriormente para etiquetar la imagen de Docker.
  • Espacio de nombres de arrendamiento: cadena de espacio de nombres de almacenamiento de objetos generada automáticamente de su arrendamiento (como se muestra en la página Información de arrendamiento).
  • Nombre de usuario: su nombre de usuario en Oracle Cloud Infrastructure.
  • Token de autenticación: token que creó anteriormente en la sección Plan.
  1. Conéctese a Oracle Cloud Infrastructure Registry.
    docker login <region-key>.ocir.io

    Cuando se le solicite, introduzca su nombre de usuario con el formato <tenancy-namespace>/<username>. Por ejemplo, yzl1yzrddld7/jdoe@example.com. Si su arrendamiento está federado con Oracle Identity Cloud Service, utilice el formato <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. Cuando se le solicite, introduzca el token de autenticación que ha creado antes.
  3. Envíe la imagen de Docker de la computadora a Oracle Cloud Infrastructure Registry.

    Por ejemplo, si el punto final de su región es https://iad.ocir.io y el espacio de nombres de Object Storage es yzl1yzrddld7, el comando es el siguiente:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

Crear una clave de API

Genere un par de claves y cargue la clave pública en su perfil de usuario de Oracle Cloud Infrastructure.

Puede hacerlo de dos formas, pero aquí usaremos Cloud Shell porque puede cargar de forma segura la clave generada directamente en su cuenta.

  1. Conéctese a Cloud Shell para su arrendamiento.
  2. Crear un directorio para el par de claves
    mkdir ~/.oci
  3. Genere una clave privada de 2048 bits y almacénela en un archivo denominado oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Convierta el archivo en de sólo lectura por parte del usuario.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Genere una clave pública y almacénela en un archivo denominado oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Cargue la clave pública en su perfil de usuario.

    En el siguiente fragmento, sustituya el valor --user-id por su propio OCID y cambie el valor --region al valor Region Key para su propia región.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. En la respuesta de JSON, localice y registre el valor para la huella. Necesitará este valor más adelante.

Aplicación de la configuración de Terraform

Descargue la configuración de Terraform desde el repositorio de GitHub, actualice el archivo de variables de Terraform y, a continuación, aplique la configuración.

Antes de empezar, asegúrese de tener la siguiente información disponible. La información es necesaria para el archivo terraform.tvars.
  • Región: el identificador de región de la región. Puede extraer esto de la URL de la consola de Oracle Cloud Infrastructure. Por ejemplo, si la URL es https://console.us-ashburn-1.oraclecloud.com/compute/instances, el identificador de región es us-ashburn-1. Si la URL es https://cloud.oracle.com/compute/instances, debe buscar el identificador de región en https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm.
  • OCID de compartimento: el OCID del compartimento que contiene los recursos del proyecto. Si ha seguido las instrucciones de la sección Plan, necesita el OCID para el compartimento OBP_Events.
  • Huella: huella de la clave de API pública que ha generado y cargado en el perfil anteriormente.
  • Clave privada: la ruta completa y el nombre de archivo de la clave de API privada que ha generado anteriormente. Por ejemplo, /home/opc/oci_api_key.pem. No utilice ~ en la ruta de acceso.
  • OCID de usuario: puede obtenerlo de la página Detalles de usuario. Abra el menú de la consola, vaya a Identidad y haga clic en Usuarios. Haga clic en su nombre de usuario en la lista.
  • OCID de arrendamiento: puede obtenerlo de la página Detalles de arrendamiento. Abra el menú de la consola, vaya a Administración y haga clic en Detalles de arrendamiento.

Cuando tenga la información, descargue y aplique la configuración de Terraform.

  1. Conéctese a Cloud Shell para su arrendamiento.
  2. Clone el repositorio oci-obp-extension de GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Cambie al directorio de Terraform.
    cd oci-obp-extension/terraform
  4. Abra el archivo terraform.tvars en un editor de texto y sustituya los valores de marcador de posición por los valores reales.
  5. Borre las variables de entorno OCI_AUTH y OCI_use_obo_token.
    unset OCI_AUTH OCI_use_obo_token
  6. Prepare el directorio para la configuración de Terraform.
    terraform init
  7. Vea y verifique lo que creará la configuración de Terraform.
    terraform plan
  8. Aplique la configuración de Terraform.
    terraform apply -auto-approve
  9. Registre la información de gateway de API que se muestra cuando finaliza la configuración de Terraform. Debe realizar un registro de los siguientes valores: OBP_Event_Subscribe_Callback y use_to_extract_ssl_certificate.

Suscribirse a eventos de cadena de bloques

Registre una URL de devolución de llamada para que el gateway de API pueda recibir eventos de cadena de bloques.

La URL que se registra es la URL de devolución de llamada de gateway de API que se mostró cuando se completó el proceso de Terraform. Tiene el formato https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. También necesitará el punto final de despliegue de gateway de API, que se mostró como use_to_extract_ssl_certificate cuando se completó el proceso de Terraform. Tiene el formato joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

También necesita el punto final de la API de REST de Oracle Blockchain Platform para suscribirse a eventos de chaincode. Tiene el siguiente formato:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> es la URL del proxy REST que se muestra en la consola de Oracle Blockchain Platform. En la consola, abra la página Nodes y busque el nodo de proxy REST. En la columna Ruta, se muestra una URL que incluye el puerto y un número de proxy REST.

Para suscribirse a eventos de cadena de bloques:

  1. Extraiga caCert para la devolución de llamada de gateway de API.
    1. Conéctese a Cloud Shell para su arrendamiento.
    2. Ejecute el siguiente comando.

      Sustituya el texto API_GATEWAY por el valor use_to_extract_ssl_certificate registrado anteriormente al final del paso de configuración de Terraform.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Copie el certificado que se muestra. Asegúrese de incluir el texto -----BEGIN CERTIFICATE----- y -----END CERTIFICATE-----.
  2. Registre la función de relé con Oracle Blockchain Platform.

    Utilice la API de REST de cadena de bloques para registrar la función de relé. Utilice cURL, Postman u otra herramienta para POST la solicitud a la cadena de bloques. El cuerpo de la solicitud tiene el siguiente formato JSON:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    Asegúrese de utilizar los valores callbackURL, chaincode, eventName y caCert que se le aplican.

    Nota:

    Es posible que necesite varios intentos antes de obtener una respuesta correcta debido al tiempo necesario para crear y desplegar el contenedor de funciones.

Creación y Prueba del Consumidor de Eventos

El consumidor de eventos utiliza API de Kafka para autenticar y recuperar mensajes de cadena de bloques de la transmisión de Oracle Cloud Infrastructure.

Para crear un consumidor, necesita información para las siguientes cadenas:

  • USER-NAME: nombre de usuario.
  • AUTH-TOKEN: es el token que ha generado y copiado anteriormente.
  • TENANCY-NAME: puede encontrar este valor en la página Detalles de arrendamiento de la consola de OCI. Para abrir la página Detalles de arrendamiento, en el menú de navegación de la consola, seleccione Administración y, a continuación, haga clic en Detalles de arrendamiento.
  • STREAM-POOL-OCID: es el OCID del pool de flujos creado al ejecutar el script de Terraform. Para buscar este valor, abra el menú de navegación de la consola y vaya a Administración y haga clic en Flujo. Seleccione el pool de flujos y en la página que abre copie el OCID.
  • REGION: "us-ashburn-1". Su región puede ser diferente.
  • EVENT-NAME: es el mismo nombre de evento que utilizó anteriormente al registrar la URL de devolución de llamada para recibir eventos de cadena de bloques.

Utilice el siguiente ejemplo para probar y validar que el sistema funciona como se esperaba.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

Al iniciar el consumidor, consume todos los eventos existentes que ya se han introducido en el flujo.

Después de que el ejemplo funcione y demuestre que la solución funciona, está listo para crear una o más aplicaciones de consumidor a nivel de producción. Las aplicaciones se pueden escribir en JavaScript, Python u otro lenguaje.