Ereignisrelais entwickeln und bereitstellen

Erstellen Sie eine Oracle Functions-Anwendung, die Blockchain-Ereignisse verbraucht und an Oracle Streaming Service weiterleitet.

Die Ereignisrelais-Anwendung benötigt Zugangsdaten für die Verbindung mit dem Streaming-Service. Die Zugangsdaten werden in Oracle Cloud Infrastructure Vault gespeichert. Die Zugangsdaten werden in Vault gespeichert, wenn Sie den Terraform-Code ausführen. Die Zugangsdaten werden beim Ausführen der anfänglichen Konfiguration der OCI-Umgebung generiert.

Die Funktion stellt die folgenden Services bereit:

  1. Streaming Service-Zugangsdaten aus Vault abrufen.
  2. Entschlüsseln Sie die Zugangsdaten (Vault speichert verschlüsselte Werte).
  3. Decode aus base64-codierter Zeichenfolge.
  4. Verwenden Sie die decodierten Zugangsdaten, um mit der Kafka-kompatiblen API eine Verbindung zum Streaming-Service herzustellen.
  5. Erstellen und füllen Sie ein Java-Objekt mit dem Namen event aus der JSON-Ereignismeldung.
  6. Verwenden Sie die Kafka-API, um die Ereignisnachricht zu senden.

Ereignisverschiebung erstellen

Sie können das Relay in jeder Programmiersprache erstellen. Da Kafka-APIs jedoch in Java dokumentiert sind, ist es am besten, Java zu verwenden.

Die Klasse EventProducer verfügt über die folgenden Methoden:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

Die folgenden Abhängigkeiten sind auch in der Datei pom.xml erforderlich:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .

Stellen Sie sicher, dass das fn CLI-Tool installiert ist. Weitere Informationen finden Sie unter https://github.com/fnproject/fn#quickstart.

  1. Erstellen Sie ein fn-Projekt, und geben Sie ihm einen geeigneten Namen wie obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Ersetzen Sie HelloFunction.java durch eine Datei mit dem Namen EventProducer.java.

    Der gesamte Code befindet sich in dieser einzelnen Klasse. Um die Funktion zu erstellen, müssen Sie die folgenden Klassen importieren:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Erstellen Sie eine innere Klasse für die Ereignisnachricht aus der Blockchain.

    Sie benötigen ein Objekt, das der Struktur der Ereignisnachricht entspricht. Die drei Klassendefinitionen sind Teil der Klasse EventProducer. Diese Klasse wird von Oracle Functions verwendet, um die JSON-Daten aus Oracle Blockchain Platform zu deserialisieren.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
  4. Definieren Sie mehrere Zeichenfolgenvariablen global, um die Konfigurationswerte aufzunehmen, die in der Methode config aufgefüllt werden.

    Aufgrund der Annotation @FnConfiguration übergibt das System ein RuntimeContext-Objekt an diese Methode. Danach verwenden Sie getConfigurationByKey, um die globalen Variablen aufzufüllen. Für jeden der folgenden Konfigurationsschlüssel ist eine Variable erforderlich:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    Beispiel:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Erstellen Sie die Methode decryptData.

    Die Methode decryptData sieht wie das folgende Snippet aus. Die Werte für kmsEndpoint und kmsKeyId werden in der Methode config aufgefüllt und entsprechen den Konfigurationsschlüsseln KMS_ENDPOINT und KMS_KEY_ID. Beachten Sie, dass der Nur-Text Base64-codiert ist und vor der Rückgabe des Wertes Base64 decodiert werden muss. Die Methode decryptData wird von der Methode handleRequest verwendet, wenn Kafka-Authentifizierungszugangsdaten festgelegt werden.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Erstellen Sie die Methode handleRequest.

    Diese Methode ruft die Zugangsdaten für den Streaming-Service aus Oracle Vault ab und sendet die Ereignisnachricht dann an den Streaming-Service. Es macht auch einige "Paperwork": Kafka-Konfiguration und Abruf und Entschlüsselung von Zugangsdaten.

    Die Methode handleRequest akzeptiert ein Ereignisobjekt als einzigen Parameter. Das Objekt wird automatisch vom Oracle Functions-System erstellt und mit Daten aus der JSON aus der Blockchain aufgefüllt. Aus diesem Grund muss die innere Klasse des Ereignisses direkt den JSON-Daten aus Blockchain zugeordnet werden.

    Sie müssen zuerst den Ereignistyp "Test" prüfen. Wenn Sie Blockchain-Ereignisse abonnieren, ist das erste Ereignis ein Testereignis, das Sie nicht an Event-Consumer weiterleiten möchten.

    1. Prüfen Sie den Ereignistyp "Test".
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Erstellen Sie die Zeichenfolge, die für die Streaming Service-SASL-Konfiguration erforderlich ist.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
      
    3. Erstellen Sie ein java.util.Properties-Objekt, und füllen Sie es mit der Konfiguration aus.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
      
    4. Registrieren Sie die Konfiguration, und senden Sie die Ereignisnachricht an Streaming Service.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMessage.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Weisen Sie das System an, handleRequest aufzurufen, wenn eine Nachricht zur Weiterleitung an den Streaming-Service gesendet wird.

    Ändern Sie dazu die Eigenschaft cmd in func.yaml, sodass sie auf die Methode handleRequest in der Klasse EventProducer verweist. Beispiel:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Ereignisverschiebung erstellen

Nachdem Sie die Java-Klasse für das Ereignisrelais erstellt haben, können Sie damit das Docker-Image erstellen, das Sie an Oracle Cloud Infrastructure Registry übertragen. Nachdem das Image erstellt wurde, müssen Sie es mit Informationen über die OCI-Region und den Object Storage-Namespace taggen.

Bevor Sie ein Image per Push übergeben können, müssen Sie zuerst den Befehl docker tag verwenden, um eine Kopie des lokalen Quellimage als neues Image zu erstellen (das neue Image ist eigentlich nur ein Verweis auf das vorhandene Quellimage). Geben Sie als Namen für das neue Image den vollqualifizierten Pfad zum Zielspeicherort in Oracle Cloud Infrastructure Registry an, in den Sie das Image per Push übergeben möchten.

Sie benötigen die folgenden Informationen für das Tag:

  • Der Docker-Registry-Endpunkt für Ihre Region
  • Ihr Object Storage-Namespace

Um den Endpunkt für Ihre Region abzurufen, suchen Sie in der Tabelle unter Verfügbarkeit nach Region nach Ihrer Region. Beispiel: Der Registry-Endpunkt für US East (Ashburn) ist https://iad.ocir.io.

So suchen Sie den Object Storage-Namespace:

  1. Klicken Sie im Navigationsmenü auf Administration und dann auf Mandantendetails.
  2. Der Obect-Speicher-Namespace befindet sich im Abschnitt "Object Storage-Einstellungen".
  1. Führen Sie im Verzeichnis der obersten Ebene den Befehl "Build" aus. Das Verzeichnis der obersten Ebene ist das Verzeichnis, das Sie erstellt haben, als Sie den Befehl fn init ausgeführt haben.
    fn build
  2. Taggen Sie das Image mit den Informationen zur OCI-Region und zum Object Storage-Namespace.

    Beispiel: Wenn der Endpunkt für Ihre Region https://iad.ocir.io ist und der Object Storage-Namespace yzl1yzrddld7 lautet, lautet der Befehl folgendermaßen: Sie verwenden obpeventsfunc als Namen des Ereignisrelais:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Hinweis:

    Achten Sie darauf, die Protokollinformationen https:// aus dem Befehl auszulassen.

Ereignis-Relay an die Registry übertragen

Nachdem das Ereignisrelais erstellt wurde und über das richtige Tag verfügt, können Sie es in Oracle Cloud Infrastructure Registry übertragen.

Sie benötigen die folgenden Informationen, wenn Sie sich bei Oracle Cloud Infrastructure Registry anmelden:
  • API-Endpunkt für Ihre Region: Beispiel: iad.ocir.io. Dies ist derselbe Wert, den Sie zuvor zum Taggen des Docker-Images verwendet haben.
  • Mandanten-Namespace: Die automatisch generierte Object Storage-Namespace-Zeichenfolge Ihres Mandanten (wie auf der Seite Mandanteninformationen angezeigt).
  • Benutzername: Ihr Benutzername in Oracle Cloud Infrastructure.
  • Authentifizierungstoken: Das Token, das Sie zuvor im Abschnitt "Plan" erstellt haben.
  1. Melden Sie sich bei Oracle Cloud Infrastructure Registry an.
    docker login <region-key>.ocir.io

    Wenn Sie dazu aufgefordert werden, geben Sie Ihren Benutzernamen im Format <tenancy-namespace>/<username> ein. Beispiel: yzl1yzrddld7/jdoe@example.com. Wenn Ihr Mandant mit Oracle Identity Cloud Service föderiert ist, verwenden Sie das Format <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. Geben Sie ggf. das zuvor erstellte Authentifizierungstoken ein.
  3. Senden Sie das Docker-Image von Ihrem Computer an Oracle Cloud Infrastructure Registry.

    Beispiel: Wenn der Endpunkt für Ihre Region https://iad.ocir.io und der Object Storage-Namespace yzl1yzrddld7 lautet, lautet der Befehl wie folgt:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

API-Schlüssel erstellen

Generieren Sie ein Schlüsselpaar, und laden Sie den Public Key in Ihr Oracle Cloud Infrastructure-Benutzerprofil hoch.

Dies können Sie auf unterschiedliche Weise tun, aber hier verwenden wir die Cloud-Shell, weil Sie den generierten Schlüssel sicher direkt in Ihren Account hochladen können.

  1. Melden Sie sich bei der Cloud-Shell für Ihren Mandanten an.
  2. Verzeichnis für das Schlüsselpaar erstellen
    mkdir ~/.oci
  3. Generieren Sie einen Private Key mit 2048 Bit, und speichern Sie ihn in einer Datei mit dem Namen oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Machen Sie die Datei für Ihren Benutzer schreibgeschützt.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Generieren Sie einen Public Key, und speichern Sie ihn in einer Datei mit dem Namen oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Laden Sie den Public Key in Ihr Benutzerprofil hoch.

    Ersetzen Sie im folgenden Snippit den Wert "--user-id" durch Ihre eigene OCID, und ändern Sie den Wert "--region" in den Wert "Region Key" für Ihre eigene Region.

    oci iam user api-key upload \
    --user-id oci.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. Suchen und erfassen Sie in der JSON-Antwort den Wert für den Fingerprint. Sie benötigen diesen Wert später.

Terraform-Konfiguration anwenden

Laden Sie die Terraform-Konfiguration aus dem GitHub-Repository herunter, aktualisieren Sie die Terraform-Variablendatei, und wenden Sie dann die Konfiguration an.

Bevor Sie beginnen, stellen Sie sicher, dass die folgenden Informationen verfügbar sind. Die Informationen sind für die Datei terraform.tvars erforderlich.
  • Region - Die Regions-ID für die Region. Sie können diesen aus der URL der Oracle Cloud Infrastructure-Konsole extrahieren. Beispiel: Wenn die URL https://console.us-ashburn-1.oraclecloud.com/compute/instances lautet, lautet die Regions-ID us-ashburn-1. Wenn die URL https://cloud.oracle.com/compute/instances lautet, müssen Sie die Regions-ID in https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm suchen.
  • Compartment-OCID: Die OCID des Compartments, das die Ressourcen für das Projekt enthält. Wenn Sie die Anweisungen im Abschnitt "Plan" befolgt haben, benötigen Sie die OCID für das Compartment OBP_Events.
  • Fingerprint: Der Fingerprint des öffentlichen API-Schlüssels, den Sie zuvor generiert und in Ihr Profil hochgeladen haben.
  • Private Key - Der vollständige Pfad und Dateiname des zuvor generierten privaten API-Schlüssels. Beispiel: /home/opc/oci_api_key.pem. Verwenden Sie ~ nicht im Pfad.
  • Benutzer-OCID - Sie können diese auf der Seite Benutzerdetails abrufen. Öffnen Sie das Menü "Konsole", navigieren Sie zu Identität, und klicken Sie auf Benutzer. Klicken Sie in der Liste auf Ihren Benutzernamen.
  • Mandanten-OCID - Sie können diese auf der Seite Mandantendetails abrufen. Öffnen Sie das Menü "Konsole", navigieren Sie zu Administration, und klicken Sie auf Mandantendetails.

Wenn Sie die Informationen haben, laden Sie die Terraform-Konfiguration herunter und wenden sie an.

  1. Melden Sie sich bei der Cloud-Shell für Ihren Mandanten an.
  2. Klonen Sie das oci-obp-extension-Repository aus GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Wechseln Sie in das Terraform-Verzeichnis.
    cd oci-obp-extension/terraform
  4. Öffnen Sie die Datei terraform.tvars in einem Texteditor, und ersetzen Sie die Platzhalterwerte durch die tatsächlichen Werte.
  5. Löschen Sie die Umgebungsvariablen OCI_AUTH und OCI_use_obo_token.
    unset OCI_AUTH OCI_use_obo_token
  6. Bereiten Sie das Verzeichnis auf die Terraform-Konfiguration vor.
    terraform init
  7. Zeigen Sie die Terraform-Konfiguration an, und prüfen Sie sie.
    terraform plan
  8. Wenden Sie die Terraform-Konfiguration an.
    terraform apply -auto-approve
  9. Aufzeichnen Sie die API-Gateway-Informationen, die angezeigt werden, wenn die Terraform-Konfiguration abgeschlossen ist. Sie müssen einen Datensatz mit den folgenden Werten erstellen: OBP_Event_Subscribe_Callback und use_to_extract_ssl_certificate.

Blockchain-Ereignisse abonnieren

Registrieren Sie eine Callback-URL, damit API Gateway Blockchain-Ereignisse empfangen kann.

Die URL, die Sie registrieren, ist die API-Gateway-Callback-URL, die angezeigt wurde, als der Terraform-Prozess abgeschlossen wurde. Sie hat das Format https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. Sie benötigen außerdem den API-Gateway-Deployment-Endpunkt, der beim Abschluss des Terraform-Prozesses als use_to_extract_ssl_certificate angezeigt wurde. Es hat die Form joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

Sie benötigen außerdem den REST-API-Endpunkt von Oracle Blockchain Platform, um Chaincode-Ereignisse abonnieren zu können. Sie hat folgende Form:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> ist die URL für den REST-Proxy, der in der Oracle Blockchain Platform-Konsole aufgeführt wird. Öffnen Sie in der Konsole die Seite "Knoten", und suchen Sie nach dem REST-Proxyknoten. In der Spalte "Route" wird eine URL mit Port und einer REST-Proxynummer aufgeführt.

So abonnieren Sie Blockchain-Ereignisse:

  1. Extrahieren Sie caCert für den API-Gateway-Callback.
    1. Melden Sie sich bei der Cloud-Shell für Ihren Mandanten an.
    2. Führen Sie den folgenden Befehl aus.

      Ersetzen Sie den Text API_GATEWAY durch den Wert use_to_extract_ssl_certificate, den Sie zuvor am Ende des Terraform-Konfigurationsschritts aufgezeichnet haben.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Kopieren Sie das angezeigte Zertifikat. Achten Sie darauf, den Text -----BEGIN CERTIFICATE----- und -----END CERTIFICATE----- aufzunehmen.
  2. Registrieren Sie die Relay-Funktion bei Oracle Blockchain Platform.

    Verwenden Sie die Blockchain-REST-API, um die Relay-Funktion zu registrieren. Verwenden Sie cURL, Postman oder ein anderes Tool, um die Anforderung an die Blockchain zu POST. Der Text der Anforderung hat das folgende JSON-Format:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
      

    Stellen Sie sicher, dass Sie die Werte callbackURL, chaincode, eventName und caCert verwenden, die für Sie gelten.

    Hinweis:

    Möglicherweise benötigen Sie mehrere Versuche, bevor Sie eine Erfolgsantwort erhalten, da der Funktionscontainer erstellt und bereitgestellt werden muss.

Ereignis-Consumer erstellen und testen

Der Event Consumer verwendet Kafka-APIs zur Authentifizierung und zum Abrufen von Blockchain-Nachrichten aus Oracle Cloud Infrastructure Streaming.

Um einen Consumer zu erstellen, benötigen Sie Informationen für die folgenden Zeichenfolgen:

  • USER-NAME: Ihr Benutzername.
  • AUTH-TOKEN: Dies ist das Token, das Sie zuvor generiert und kopiert haben.
  • TENANCY-NAME: Sie finden diesen Wert auf der Seite Mandantendetails der OCI-Konsole. Um die Seite Mandantendetails zu öffnen, wählen Sie im Navigationsmenü der Konsole die Option Administration und klicken dann auf Mandantendetails.
  • STREAM-POOL-OCID: Dies ist die OCID des Streampools, der bei der Ausführung des Terraform-Skripts erstellt wurde. Um diesen Wert zu ermitteln, öffnen Sie das Navigationsmenü der Konsole, gehen Sie zu Administration, und klicken Sie auf Streaming. Wählen Sie den Streampool und auf der Seite, die geöffnet wird, die OCID kopieren.
  • REGION: "us-ashburn-1". Ihre Region könnte anders sein.
  • EVENT-NAME: Dies ist derselbe Ereignisname, den Sie zuvor bei der Registrierung der Callback-URL für den Empfang von Blockchain-Ereignissen verwendet haben.

Im folgenden Beispiel können Sie testen und prüfen, ob das System wie erwartet funktioniert.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe to it
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

Wenn Sie den Consumer starten, werden alle vorhandenen Ereignisse verbraucht, die bereits in den Stream übertragen wurden.

Nachdem das Beispiel funktioniert und die Lösung funktioniert, können Sie eine oder mehrere Consumer-Apps auf Produktionsebene erstellen. Die Apps können in JavaScript, Python oder einer anderen Sprache geschrieben werden.