Développer et déployer le relais d'événement

Créez une application Oracle Functions qui consomme des événements de chaîne de blocs et les relaie dans Oracle Streaming Service.

L'application de relais d'événements a besoin d'informations d'identification pour se connecter au service de transmission en continu. Les informations d'identification sont conservées dans Oracle Cloud Infrastructure Vault. Les informations d'identification sont stockées dans Vault lorsque vous exécutez le code Terraform. Les informations d'identification sont générées lorsque vous effectuez la configuration initiale de votre environnement OCI.

La fonction fournit les services suivants :

  1. Extrayez les informations d'identification du service Streaming à partir de Vault.
  2. Déchiffrez les informations d'identification (Vault stocke les valeurs cryptées).
  3. Décoder à partir de la chaîne encodée base64.
  4. Utilisez les informations d'identification décodées pour vous connecter au service Streaming à l'aide de l'API compatible Kafka.
  5. Créez et remplissez un objet Java appelé event à partir du message d'événement JSON.
  6. Utilisez l'API Kafka pour envoyer le message d'événement.

Créer le relais d'événement

Vous pouvez créer le relais dans n'importe quel langage de programmation, mais comme les API Kafka sont documentées dans Java, il est préférable d'utiliser Java.

La classe EventProducer aura les méthodes suivantes :

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

Les dépendances suivantes sont également nécessaires dans le fichier pom.xml :

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

Assurez-vous que l'outil CLI fn est installé. Pour plus d'informations, reportez-vous à https://github.com/fnproject/fn#quickstart.

  1. Créez un projet fn et attribuez-lui un nom raisonnable, tel que obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Remplacez HelloFunction.java par un fichier nommé EventProducer.java.

    Tout le code se trouve dans cette classe unique. Pour construire la fonction, vous devez importer les classes suivantes :

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Créez une classe interne pour contenir le message d'événement de la chaîne de blocs.

    Vous avez besoin d'un objet qui correspond à la structure du message d'événement. Les trois définitions de classe font partie de la classe EventProducer. Cette classe est utilisée par Oracle Functions pour désérialiser les données JSON provenant d'Oracle Blockchain Platform.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. Définissez globalement plusieurs variables String afin de conserver les valeurs de configuration, qui sont renseignées dans la méthode config.

    En raison de l'annotation @FnConfiguration, le système transmet un objet RuntimeContext à cette méthode. Utilisez ensuite getConfigurationByKey pour remplir les variables globales. Vous avez besoin d'une variable pour chacune des clés de configuration suivantes :

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    Par exemple :

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Créez la méthode decryptData.

    La méthode decryptData ressemble au fragment de code suivant. Les valeurs pour kmsEndpoint et kmsKeyId sont renseignées dans la méthode config et correspondent aux clés de configuration KMS_ENDPOINT et KMS_KEY_ID. Notez que le texte brut est encodé en Base64 et qu'il doit être décodé en Base64 avant de renvoyer la valeur. La méthode decryptData est utilisée par la méthode handleRequest lors de la définition des informations d'identification d'authentification Kafka.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Créez la méthode handleRequest.

    Cette méthode extrait les informations d'identification du service Streaming à partir d'Oracle Vault, puis envoie le message d'événement au service Streaming. Il effectue également une opération sur papier : configuration Kafka et extraction et décryptage des informations d'identification.

    La méthode handleRequest prend un objet Event comme seul paramètre. L'objet est créé automatiquement par le système Oracle Functions et alimenté avec les données du JSON provenant de Blockchain. Pour cette raison, la classe interne Evénement doit être mise en correspondance directement avec les données JSON provenant de Blockchain.

    La première chose à faire est de vérifier le type d'événement "test". Lorsque vous vous abonnez à des événements de blockchain, le premier événement est un événement de test que vous ne souhaitez pas relayer aux consommateurs d’événements.

    1. Recherchez le type d'événement "test".
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Créez la chaîne requise pour la configuration SASL de service de transmission en continu.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. Créez un objet java.util.Properties et remplissez-le avec la configuration.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. Enregistrez la configuration et envoyez le message d'événement au service Streaming.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Demandez au système d'appeler handleRequest chaque fois qu'un message est envoyé pour le relais vers le service de transmission en continu.

    Pour ce faire, modifiez la propriété cmd dans func.yaml pour qu'elle pointe vers la méthode handleRequest dans la classe EventProducer. Par exemple :

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Créer le relais d'événement

Une fois que vous avez créé la classe Java pour le relais d'événements, utilisez-la pour créer l'image Docker que vous propagez vers Oracle Cloud Infrastructure Registry. Une fois l'image créée, vous devez la baliser avec des informations sur la région OCI et l'espace de noms Object Storage.

Pour pouvoir propager une image, vous devez d'abord utiliser la commande docker tag afin de créer une copie de l'image source locale en tant que nouvelle image (la nouvelle image est en fait simplement une référence à l'image source existante). En tant que nom de la nouvelle image, indiquez le chemin qualifié complet de l'emplacement cible dans Oracle Cloud Infrastructure Registry vers lequel propager l'image.

Pour la balise, vous devez disposer des informations suivantes :

  • Adresse de registre Docker pour votre région
  • Votre espace de noms Object Storage

Pour obtenir l'adresse de votre région, recherchez la région dans la table Disponibilité par région. Par exemple, l'adresse de registre pour Est des Etats-Unis (Ashburn) est https://iad.ocir.io.

Pour trouver votre espace de noms Object Storage, procédez comme suit :

  1. Dans le menu de navigation, cliquez sur Administration, puis sur Informations sur la location.
  2. L'espace de noms Obect Storage figure dans la section Paramètres Object Storage
  1. Dans le répertoire de niveau supérieur, exécutez la commande build. Le répertoire de niveau supérieur est celui que vous avez créé lors de l'exécution de la commande fn init.
    fn build
  2. Baliser l'image à l'aide des informations de région OCI et d'espace de noms Object Storage.

    Par exemple, si l'adresse de votre région est https://iad.ocir.io et que votre espace de noms Object Storage est yzl1yzrddld7, la commande est la suivante, en supposant que vous utilisez obpeventsfunc comme nom du relais d'événement :

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Remarque :

    Veillez à omettre les informations du protocole https:// de la commande.

Transmettre le relais d'événement au registre

Une fois le relais d'événement créé et doté de la balise appropriée, vous pouvez le propager vers Oracle Cloud Infrastructure Registry.

Vous avez besoin des informations suivantes lors de la connexion à Oracle Cloud Infrastructure Registry :
  • Adresse d'API pour votre région : par exemple, iad.ocir.io. Il s'agit de la même valeur que celle que vous avez utilisée précédemment pour baliser l'image Docker.
  • Espace de noms de location : chaîne d'espace de noms Object Storage générée automatiquement pour votre location (comme indiqué sur la page Informations sur la location).
  • Nom utilisateur : nom utilisateur dans Oracle Cloud Infrastructure.
  • Jeton d'authentification : jeton que vous avez créé précédemment dans la section Plan.
  1. Connectez-vous à Oracle Cloud Infrastructure Registry.
    docker login <region-key>.ocir.io

    Lorsque vous y êtes invité, entrez votre nom utilisateur au format <tenancy-namespace>/<username>. Par exemple : yzl1yzrddld7/jdoe@example.com. Si votre location est fédérée avec Oracle Identity Cloud Service, utilisez le format <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. Entrez le jeton d'authentification que vous avez créé précédemment.
  3. Propagez l'image Docker de votre ordinateur vers Oracle Cloud Infrastructure Registry.

    Par exemple, si l'adresse de votre région est https://iad.ocir.io et que votre espace de noms Object Storage est yzl1yzrddld7, la commande est la suivante :

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

Création d'une clé d'API

Générez une paire de clés et téléchargez la clé publique vers votre profil utilisateur Oracle Cloud Infrastructure.

Vous pouvez effectuer cette opération de deux façons, mais ici, nous allons utiliser Cloud Shell car vous pouvez télécharger la clé générée directement sur votre compte en toute sécurité.

  1. Connectez-vous à Cloud Shell pour votre location.
  2. Créer un répertoire pour la paire de clés
    mkdir ~/.oci
  3. Générez une clé privée de 2048 bits et stockez-la dans un fichier appelé oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Mettez le fichier en lecture seule par votre utilisateur.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Générez une clé publique et enregistrez-la dans un fichier nommé oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Téléchargez la clé publique vers votre profil utilisateur.

    Dans le fragment de code suivant, remplacez la valeur --user-id par votre propre OCID et remplacez la valeur --region par la valeur Region Key pour votre propre région.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. Dans la réponse JSON, localisez et enregistrez la valeur de l'empreinte. Vous aurez besoin de cette valeur ultérieurement.

Application de la configuration Terraform

Téléchargez la configuration Terraform à partir du référentiel GitHub, mettez à jour le fichier de variables Terraform, puis appliquez la configuration.

Avant de commencer, assurez-vous que les informations suivantes sont disponibles. Les informations sont requises pour le fichier terraform.tvars.
  • Région : identificateur de région. Vous pouvez l'extraire à partir de l'URL de la console Oracle Cloud Infrastructure. Par exemple, si l'URL est https://console.us-ashburn-1.oraclecloud.com/compute/instances, l'identificateur de votre région est us-ashburn-1. Si votre URL est https://cloud.oracle.com/compute/instances, vous devez rechercher votre identifiant de région dans https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm.
  • OCID de compartiment : OCID du compartiment contenant les ressources du projet. Si vous avez suivi les instructions de la section Plan, vous avez besoin de l'OCID pour le compartiment OBP_Events.
  • Empreinte - empreinte de la clé d'API publique que vous avez générée et téléchargée précédemment dans votre profil.
  • Clé privée : chemin complet et nom de fichier de la clé d'API privée que vous avez générée précédemment. Par exemple, /home/opc/oci_api_key.pem. N'utilisez pas ~ dans le chemin.
  • OCID utilisateur : vous pouvez l'obtenir à partir de la page Détails de l'utilisateur. Ouvrez le menu de la console, accédez à Identité et cliquez sur Utilisateurs. Cliquez sur votre nom utilisateur dans la liste.
  • OCID de location : vous pouvez l'obtenir à partir de la page Détails de location. Ouvrez le menu de la console, accédez à Administration et cliquez sur Détails de location.

Lorsque vous disposez des informations, téléchargez et appliquez la configuration Terraform.

  1. Connectez-vous à Cloud Shell pour votre location.
  2. Clonez le référentiel oci-obp-extension à partir de GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Accédez au répertoire Terraform.
    cd oci-obp-extension/terraform
  4. Ouvrez le fichier terraform.tvars dans un éditeur de texte et remplacez les valeurs de réserve par les valeurs réelles.
  5. Effacez les variables d'environnement OCI_AUTH et OCI_use_obo_token.
    unset OCI_AUTH OCI_use_obo_token
  6. Préparez le répertoire pour la configuration Terraform.
    terraform init
  7. Affichez et vérifiez la création de la configuration Terraform.
    terraform plan
  8. Appliquez la configuration Terraform.
    terraform apply -auto-approve
  9. Enregistrez les informations sur la passerelle d'API qui s'affichent lorsque la configuration Terraform est terminée. Vous devez enregistrer les valeurs suivantes : OBP_Event_Subscribe_Callback et use_to_extract_ssl_certificate.

S’abonner aux événements Blockchain

Enregistrez une URL de rappel afin qu'API Gateway puisse recevoir des événements de chaîne de blocs.

L'URL que vous enregistrez est l'URL de rappel de la passerelle d'API qui s'affiche lorsque le processus Terraform est terminé. Elle se présente sous la forme https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. Vous aurez également besoin de l'adresse de déploiement de passerelle d'API, affichée sous la forme use_to_extract_ssl_certificate lorsque le processus Terraform sera terminé. Elle se présente sous la forme joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

Vous avez également besoin de l'adresse d'API REST Oracle Blockchain Platform pour vous abonner à des événements de code chaîne. Il se présente comme suit :

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> est l'URL du proxy REST répertoriée dans la console Oracle Blockchain Platform. Dans la console, ouvrez la page Noeuds et recherchez le noeud proxy REST. Dans la colonne Route, une URL incluant le port et un numéro de proxy REST est répertoriée.

Pour vous abonner aux événements blockchain :

  1. Extrayez la valeur caCert pour le rappel de passerelle d'API.
    1. Connectez-vous à Cloud Shell pour votre location.
    2. Exécutez la commande suivante.

      Remplacez le texte API_GATEWAY par la valeur use_to_extract_ssl_certificate que vous avez enregistrée précédemment à la fin de l'étape de configuration Terraform.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Copiez le certificat affiché. Veillez à inclure le texte -----BEGIN CERTIFICATE----- et -----END CERTIFICATE-----.
  2. Enregistrez la fonction relais auprès d'Oracle Blockchain Platform.

    Utilisez l'API REST Blockchain pour enregistrer la fonction de relais. Utilisez cURL, Postman ou un autre outil pour POST la demande sur la blockchain. Le corps de la demande présente le format JSON suivant :

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    Veillez à utiliser les valeurs callbackURL, chaincode, eventName et caCert qui vous sont applicables.

    Remarque :

    Vous pouvez avoir besoin de plusieurs tentatives pour obtenir une réponse de réussite en raison du temps nécessaire à la création et au déploiement du conteneur de fonctions.

Création et test du consommateur d'événements

Le consommateur d'événements utilise des API Kafka pour authentifier et extraire des messages de chaîne de blocs à partir d'Oracle Cloud Infrastructure Streaming.

Pour créer un destinataire, vous devez fournir des informations sur les chaînes suivantes :

  • USER-NAME : nom utilisateur.
  • AUTH-TOKEN : jeton que vous avez généré et copié précédemment.
  • TENANCY-NAME : Cette valeur est disponible sur la page Détails de location de la console OCI. Pour ouvrir la page Détails de location, dans le menu de navigation de la console, sélectionnez Administration, puis cliquez sur Détails de location.
  • STREAM-POOL-OCID : OCID du pool de flux de données créé lors de l'exécution du script Terraform. Pour trouver cette valeur, ouvrez le menu de navigation de la console, accédez à Administration et cliquez sur Traitement en continu. Sélectionnez le pool de flux de données et, sur la page qui s'ouvre, copiez l'OCID.
  • REGION : "us-ashburn-1". Votre région peut être différente.
  • EVENT-NAME : Il s'agit du même nom d'événement que celui que vous avez utilisé précédemment lorsque vous avez inscrit l'URL de rappel pour recevoir des événements de chaîne de blocs.

Utilisez l'exemple suivant pour tester et valider le fonctionnement attendu du système.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

Lorsque vous démarrez le consommateur, il consomme tous les événements existants qui ont déjà été propagés dans le flux.

Une fois que l'exemple fonctionne et montre que la solution fonctionne, vous êtes prêt à créer une ou plusieurs applications de consommation au niveau de la production. Les applications peuvent être écrites en JavaScript, Python ou dans un autre langage.