Sviluppa e distribuisci il Relay dell'evento

Crea un'applicazione Oracle Functions che utilizza eventi blockchain e li indirizza a Oracle Streaming Service.

L'applicazione di relay degli eventi richiede credenziali per la connessione al servizio di streaming. Le credenziali vengono conservate in Oracle Cloud Infrastructure Vault. Le credenziali vengono memorizzate nel vault quando si esegue il codice Terraform. Le credenziali vengono generate quando si esegue la configurazione iniziale dell'ambiente OCI.

La funzione fornisce i servizi seguenti:

  1. Recuperare le credenziali del servizio di streaming dal vault.
  2. Decifrare le credenziali (il Vault memorizza i valori cifrati).
  3. Decodifica dalla stringa con codifica base64.
  4. Utilizzare le credenziali decodificate per connettersi al servizio di streaming utilizzando l'API compatibile con Kafka.
  5. Creare e popolare un oggetto Java denominato event dal messaggio evento JSON.
  6. Utilizzare l'API Kafka per inviare il messaggio di evento.

Crea il Relay dell'evento

Puoi creare la relay in qualsiasi linguaggio di programmazione, ma perché le API Kafka sono documentate in Java, è meglio usare Java.

La classe EventProducer avrà i seguenti metodi:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

Nel file pom.xml sono necessarie anche le dipendenze seguenti:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

Assicurarsi di disporre dello strumento CLI fn installato. Per ulteriori informazioni, vedere https://github.com/fnproject/fn#quickstart.

  1. Creare un progetto fn e assegnargli un nome ragionevole, ad esempio obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Sostituire HelloFunction.java con un file denominato EventProducer.java.

    Tutto il codice sarà in questa singola classe. Per creare la funzione, è necessario importare le seguenti classi:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Crea una classe interna in cui memorizzare il messaggio dell'evento dalla blockchain.

    È necessario un oggetto che corrisponda alla struttura del messaggio evento. Le tre definizioni di classe fanno parte della classe EventProducer. Questa classe viene utilizzata da Oracle Functions per deserializzare i dati JSON forniti da Oracle Blockchain Platform.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. Definire più variabili stringa a livello globale per contenere i valori di configurazione, che vengono popolati nel metodo config.

    A causa dell'annotazione @FnConfiguration, il sistema passa un oggetto RuntimeContext a questo metodo. Utilizzare quindi getConfigurationByKey per popolare le variabili globali. È necessaria una variabile per ciascuna delle chiavi di configurazione seguenti:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    Ad esempio:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Creare il metodo decryptData.

    Il metodo decryptData avrà l'aspetto dello snippet seguente. I valori per kmsEndpoint e kmsKeyId vengono popolati nel metodo config e corrispondono alle chiavi di configurazione KMS_ENDPOINT e KMS_KEY_ID. Tenere presente che il testo normale è codificato in Base64 e deve essere decodificato in Base64 prima di restituire il valore. Il metodo decryptData viene utilizzato dal metodo handleRequest durante l'impostazione delle credenziali di autenticazione Kafka.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Creare il metodo handleRequest.

    Questo metodo recupera le credenziali per il servizio di streaming da Oracle Vault, quindi invia il messaggio dell'evento al servizio di streaming. Fa anche alcune "carta di lavoro": configurazione Kafka e recupero e decifrazione delle credenziali.

    Il metodo handleRequest utilizza un oggetto Event come unico parametro. L'oggetto viene creato automaticamente dal sistema Oracle Functions e popolato con i dati della notazione JSON proveniente dalla blockchain. Per questo motivo, la classe interna dell'evento deve essere mappata direttamente ai dati JSON forniti dalla blockchain.

    La prima cosa da fare è controllare il tipo di evento "test". Quando esegui la sottoscrizione agli eventi di blockchain, il primo evento è un evento di test che non desideri trasmettere ai consumatori di eventi.

    1. Controllare il tipo di evento "test".
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Creare la stringa richiesta per la configurazione SASL del servizio di streaming.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. Creare un oggetto java.util.Properties e popolarlo con la configurazione.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. Registrare la configurazione e inviare il messaggio evento al servizio di streaming.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Indica al sistema di chiamare handleRequest ogni volta che viene inviato un messaggio per la relay al servizio di streaming.

    A tale scopo, modificare la proprietà cmd in func.yaml per puntare al metodo handleRequest nella classe EventProducer. Ad esempio:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Build the Event Relay

Dopo aver creato la classe Java per il relay di eventi, puoi utilizzarla per creare l'immagine Docker che esegui il PUSH su Oracle Cloud Infrastructure Registry. Una volta creata l'immagine, devi contrassegnarla con le informazioni sull'area OCI e sullo spazio di nomi dello storage degli oggetti.

Prima di poter eseguire il push di un'immagine, è necessario utilizzare il comando docker tag per creare una copia dell'immagine di origine locale come nuova immagine (la nuova immagine in realtà è solo un riferimento all'immagine di origine esistente). Come nome della nuova immagine, specificare il percorso completamente qualificato della posizione di destinazione in Oracle Cloud Infrastructure Registry, dove si desidera eseguire il push dell'immagine.

Per la tag sono necessarie le seguenti informazioni:

  • Endpoint del registro Docker per la tua area geografica
  • Lo spazio di nomi dello storage degli oggetti

Per ottenere l'endpoint per la propria area, cercare la propria area nella tabella in Disponibilità per area. Ad esempio, l'endpoint del registro per US East (Ashburn) è https://iad.ocir.io.

Per trovare lo spazio di nomi dello storage degli oggetti:

  1. Nel menu di navigazione, fare clic su Amministrazione, quindi su Dettagli tenancy.
  2. Lo spazio di nomi di storage degli oggetti si trova nella sezione Impostazioni di storage degli oggetti
  1. Nella directory di livello superiore eseguire il comando build. La directory di livello superiore è quella creata quando si esegue il comando fn init.
    fn build
  2. Contrassegnare l'immagine con le informazioni sull'area geografica e sullo spazio di nomi dello storage degli oggetti OCI.

    Ad esempio, se l'endpoint per la tua area è https://iad.ocir.io e lo spazio di nomi dello storage degli oggetti è yzl1yzrddld7, il comando è il seguente, supponendo di utilizzare obpeventsfunc come nome del relay dell'evento:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Nota:

    Accertarsi di omettere le informazioni sul protocollo https:// dal comando.

Invia il resoconto dell'evento al registro

Dopo aver creato la relay di eventi e averla associata alla tag appropriata, puoi spostarla in Oracle Cloud Infrastructure Registry.

Quando si esegue il login a Oracle Cloud Infrastructure Registry, sono necessarie le informazioni riportate di seguito.
  • Endpoint API per l'area, ad esempio iad.ocir.io. È lo stesso valore utilizzato in precedenza per contrassegnare l'immagine Docker.
  • Spazio di nomi della tenancy: la stringa dello spazio di nomi dello storage degli oggetti generata automaticamente della tenancy (come mostrato nella pagina Informazioni sulla tenancy).
  • Nome utente: il nome utente in Oracle Cloud Infrastructure.
  • Token di autenticazione: il token creato in precedenza nella sezione Piano.
  1. Eseguire il login a Oracle Cloud Infrastructure Registry.
    docker login <region-key>.ocir.io

    Quando richiesto, immettere il nome utente nel formato <tenancy-namespace>/<username>. Ad esempio, yzl1yzrddld7/jdoe@example.com. Se la tenancy è federata con Oracle Identity Cloud Service, utilizzare il formato <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. Quando richiesto, immettere il token di autenticazione creato in precedenza.
  3. Premere l'immagine Docker dal computer a Oracle Cloud Infrastructure Registry.

    Ad esempio, se l'endpoint per la tua area è https://iad.ocir.io e lo spazio di nomi dello storage degli oggetti è yzl1yzrddld7, il comando è il seguente:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

Creare una chiave API

Generare una coppia di chiavi e caricare la chiave pubblica nel profilo utente di Oracle Cloud Infrastructure.

Questa operazione può essere eseguita in due modi, ma in questa sezione verrà utilizzato Cloud Shell poiché è possibile caricare in modo sicuro la chiave generata direttamente nell'account.

  1. Eseguire il login a Cloud Shell per la tenancy.
  2. Creare una directory per la coppia di chiavi
    mkdir ~/.oci
  3. Generare una chiave privata a 2048 bit e memorizzarla in un file denominato oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Rendere il file di sola lettura da parte dell'utente.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Generare una chiave pubblica e memorizzarla in un file denominato oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Caricare la chiave pubblica nel profilo utente.

    Nello snippit seguente, sostituire il valore --user-id con il proprio OCID e modificare il valore --region con il valore Region Key per la propria area.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. Nella risposta JSON individuare e registrare il valore dell'impronta digitale. Questo valore sarà necessario in seguito.

Applicare la configurazione di Terraform

Scaricare la configurazione Terraform dal repository GitHub, aggiornare il file delle variabili Terraform, quindi applicare la configurazione.

Prima di iniziare, verificare di disporre delle seguenti informazioni. Le informazioni sono richieste per il file terraform.tvars.
  • Area: l'identificativo dell'area. È possibile estrarlo dall'URL della console di Oracle Cloud Infrastructure. Ad esempio, se l'URL è https://console.us-ashburn-1.oraclecloud.com/compute/instances, l'identificativo area è us-ashburn-1. Se l'URL è https://cloud.oracle.com/compute/instances, è necessario cercare l'identificativo area in https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm
  • OCID compartimento: l'OCID del compartimento che contiene le risorse per il progetto. Se hai seguito le istruzioni della sezione Plan, hai bisogno dell'OCID per il compartimento OBP_Events.
  • Impronta: l'impronta digitale della chiave API pubblica generata e caricata nel profilo in precedenza.
  • Chiave privata: il percorso completo e il nome file della chiave API privata generati in precedenza. Ad esempio, /home/opc/oci_api_key.pem. Non utilizzare ~ nel percorso.
  • OCID utente: è possibile ottenerlo dalla pagina Dettagli utente. Aprire il menu della console, andare a Identità e fare clic su Utenti. Fare clic sul proprio nome utente nell'elenco.
  • OCID tenancy: è possibile ottenerlo dalla pagina Dettagli tenancy. Aprire il menu della console, andare a Amministrazione e fare clic su Dettagli tenancy.

Una volta acquisite le informazioni, scaricare e applicare la configurazione Terraform.

  1. Eseguire il login a Cloud Shell per la tenancy.
  2. Duplicare il repository oci-obp-extension da GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Spostarsi nella directory Terraform.
    cd oci-obp-extension/terraform
  4. Aprire il file terraform.tvars in un editor di testo e sostituire i valori segnaposto con i valori effettivi.
  5. Cancellare le variabili d'ambiente OCI_AUTH e OCI_use_obo_token.
    unset OCI_AUTH OCI_use_obo_token
  6. Preparare la directory per la configurazione Terraform.
    terraform init
  7. Visualizzare e verificare cosa creerà la configurazione Terraform.
    terraform plan
  8. Applicare la configurazione Terraform.
    terraform apply -auto-approve
  9. Registrare le informazioni sul gateway API visualizzate al termine della configurazione di Terraform. È necessario creare un record dei seguenti valori: OBP_Event_Subscribe_Callback e use_to_extract_ssl_certificate.

Iscriviti a eventi blockchain

Registra un URL di callback in modo che API Gateway possa ricevere eventi di blockchain.

L'URL registrato è l'URL di callback del gateway API visualizzato al termine del processo Terraform. Il formato è https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. Inoltre, sarà necessario l'endpoint di distribuzione del gateway API, visualizzato come use_to_extract_ssl_certificate al termine del processo Terraform. Il formato è joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

Inoltre, hai bisogno dell'endpoint API REST di Oracle Blockchain Platform per effettuare la sottoscrizione agli eventi relativi al codice concatenato. Ha il seguente modulo:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> è l'URL per il proxy REST elencato nella console di Oracle Blockchain Platform. Nella console aprire la pagina Nodi e cercare il nodo proxy REST. Nella colonna Instradamento è elencato un URL che include la porta e un numero di proxy REST.

Per sottoscrivere eventi blockchain:

  1. Estrarre il file caCert per il callback del gateway API.
    1. Eseguire il login a Cloud Shell per la tenancy.
    2. Eseguire il comando seguente.

      Sostituire il testo API_GATEWAY con il valore use_to_extract_ssl_certificate registrato in precedenza alla fine del passo di configurazione di Terraform.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Copiare il certificato visualizzato. Accertarsi di includere il testo -----BEGIN CERTIFICATE----- e -----END CERTIFICATE-----.
  2. Registra la funzione di relay con Oracle Blockchain Platform.

    Utilizza l'API REST della blockchain per registrare la funzione di relay. Utilizza cURL, Postman o un altro strumento per inviare di nuovo la richiesta alla blockchain. Il corpo della richiesta ha il formato JSON seguente:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    Assicurarsi di utilizzare i valori callbackURL, chaincode, eventName e caCert che si applicano all'utente.

    Nota:

    Potrebbero essere necessari diversi tentativi prima di ottenere una risposta positiva a causa del tempo necessario per creare e distribuire il contenitore delle funzioni.

Creazione e test del consumer di eventi

Il consumer di eventi utilizza le API Kafka per autenticare e recuperare i messaggi di blockchain dal servizio di streaming di Oracle Cloud Infrastructure.

Per creare un consumatore, sono necessarie informazioni per le seguenti stringhe:

  • USER-NAME: nome utente personale.
  • AUTH-TOKEN: il token generato e copiato in precedenza.
  • TENANCY-NAME: è possibile trovare questo valore nella pagina Dettagli tenancy della console OCI. Per aprire la pagina Dettagli tenancy, nel menu di navigazione della console selezionare Amministrazione, quindi fare clic su Dettagli tenancy.
  • STREAM-POOL-OCID: l'OCID del pool di flussi creato durante l'esecuzione dello script Terraform. Per trovare questo valore, aprire il menu di navigazione della console, andare a Amministrazione e fare clic su Streaming. Selezionare il pool di flussi e nella pagina che apre la copia dell'OCID.
  • REGION: "us-ashburn-1". La regione potrebbe essere diversa.
  • EVENT-NAME: questo è lo stesso nome di evento utilizzato in precedenza quando hai registrato l'URL di callback per ricevere gli eventi blockchain.

Utilizzare l'esempio seguente per verificare e verificare che il sistema funzioni come previsto.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

Quando avvii il consumatore, consuma tutti gli eventi esistenti già inseriti nel flusso.

Dopo che l'esempio funziona e dimostra che la soluzione funziona, è possibile creare una o più applicazioni consumer a livello di produzione. Le applicazioni possono essere scritte in linguaggio JavaScript, Python o altro.