Develop and Deploy the Event Relay

Create an Oracle Functions application that consumes blockchain events and relays them to Oracle Streaming Service.

The event relay application needs credentials to connect to the streaming service. These credentials are generated during the initial configuration of your OCI environment and are stored in Oracle Cloud Infrastructure Vault when you run the Terraform code.

The function performs the following tasks:

  1. Retrieve Streaming Service credentials from Vault.
  2. Decrypt the credentials (Vault stores encrypted values).
  3. Decode the credentials from their base64-encoded form.
  4. Use the decoded credentials to connect to the Streaming Service through its Kafka-compatible API.
  5. Create and populate a Java Event object from the JSON event message.
  6. Use the Kafka API to send the event message.
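Step 3 above, the base64 decode, can be sketched with the JDK's built-in decoder. (The function shown later uses Apache Commons Codec, which behaves the same way for standard base64; the sample string here is made up.)

```java
public class DecodeDemo {
    public static void main(String[] args) {
        // Vault's decrypt operation returns the plaintext as a base64-encoded
        // string; decoding it recovers the original credential value.
        String base64Plaintext = "c2VjcmV0";  // hypothetical sample, decodes to "secret"
        String credential = new String(java.util.Base64.getDecoder().decode(base64Plaintext));
        System.out.println(credential);  // prints "secret"
    }
}
```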

Create the Event Relay

You can create the relay in any supported programming language, but because the Kafka client APIs are documented primarily in Java, this example uses Java.

The EventProducer class will have the following methods:

@FnConfiguration
public void config(RuntimeContext ctx) { ... }

private String decryptData(String cipherText) { ... }

public String handleRequest(Event event) { ... }

The following dependencies are also needed in the pom.xml file:

<dependencies>
    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>com.fnproject.fn</groupId>
        <artifactId>api</artifactId>
        <version>${fdk.version}</version>
    </dependency>
    .
    .
    .
</dependencies>

Make sure that you have the fn CLI tool installed. See https://github.com/fnproject/fn#quickstart for more information.

  1. Create an fn project and give it a reasonable name, such as obpeventsfunc.
    fn init obpeventsfunc --runtime java
  2. Replace HelloFunction.java with a file called EventProducer.java.

    All the code will be in this single class. To construct the function, you must import the following classes:

    com.fnproject.fn.api.FnConfiguration
    com.fnproject.fn.api.RuntimeContext
    com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider
    com.oracle.bmc.keymanagement.KmsCryptoClient
    com.oracle.bmc.keymanagement.model.DecryptDataDetails
    com.oracle.bmc.keymanagement.requests.DecryptRequest
    com.oracle.bmc.keymanagement.responses.DecryptResponse
    org.apache.commons.codec.binary.Base64
    org.apache.kafka.clients.CommonClientConfigs
    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.ProducerConfig
    org.apache.kafka.clients.producer.ProducerRecord
    org.apache.kafka.common.config.SaslConfigs
    org.apache.kafka.common.serialization.StringSerializer
    java.util.Properties
  3. Create inner classes to hold the event message from the blockchain.

    You need objects that correspond to the structure of the event message. The three class definitions that follow are nested in the EventProducer class and are used by Oracle Functions to deserialize the JSON data that comes from Oracle Blockchain Platform.

    public static class Event {
         public String eventType;
         public String subid;
         public String channel;
         public EventMsg eventMsg;
     
         @Override
         public String toString() {
             return eventMsg.payload.data;
         }
     }
     
     public static class EventMsg {
         public String chaincodeId;
         public String txId;
         public String eventName;
         public Payload payload;
     }
     
     public static class Payload {
         public String type;
         public String data;
     }
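As a quick sanity check, the inner classes can be exercised on their own. This standalone sketch (with made-up field values) shows how toString() surfaces the chaincode payload, which is what the relay ultimately sends to the stream:

```java
public class EventDemo {
    // Mirror of the inner classes defined in EventProducer, shown standalone.
    public static class Payload {
        public String type;
        public String data;
    }

    public static class EventMsg {
        public String chaincodeId;
        public String txId;
        public String eventName;
        public Payload payload;
    }

    public static class Event {
        public String eventType;
        public String subid;
        public String channel;
        public EventMsg eventMsg;

        @Override
        public String toString() {
            return eventMsg.payload.data;
        }
    }

    public static void main(String[] args) {
        // Populate the object graph the same way the JSON deserializer would.
        Event event = new Event();
        event.eventType = "chaincodeEvent";            // hypothetical sample values
        event.eventMsg = new EventMsg();
        event.eventMsg.eventName = "VehiclePartCreated";
        event.eventMsg.payload = new Payload();
        event.eventMsg.payload.data = "{\"partId\":\"p1\"}";
        System.out.println(event);  // prints the raw payload data
    }
}
```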
  4. Define String fields at the class level to hold the configuration values, which are populated in the config method.

    Because of the @FnConfiguration annotation, the system passes a RuntimeContext object to this method. You then use getConfigurationByKey to populate the fields. You need a field for each of the following configuration keys:

    BOOT_STRAP_SERVERS
    STREAM_OCID
    TENANT_NAME 
    USER_NAME
    AUTH_TOKEN
    KMS_ENDPOINT
    KMS_KEY_ID
    

    For example:

    @FnConfiguration
    public void config(RuntimeContext ctx) { 
        bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
        streamOCID = ...
        .
        .
        .
    }
  5. Create the decryptData method.

    The decryptData method will look like the following snippet. The values for kmsEndpoint and kmsKeyId are populated in the config method and correspond to the KMS_ENDPOINT and KMS_KEY_ID configuration keys. Note that the plain text is Base64 encoded and needs to be Base64 decoded before returning the value. The decryptData method is used by the handleRequest method when setting Kafka authentication credentials.

    private String decryptData(String cipherText) {
     
        AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();
        KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
     
        DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
        DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
        DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
     
        final String base64String = decryptResponse.getDecryptedData().getPlaintext();
        byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
        String value = new String(byteArray);
     
        return value;
    }
    
  6. Create the handleRequest method.

    This method retrieves the credentials for the Streaming service from Oracle Vault and then sends the event message to the Streaming service. It also does some "paperwork": Kafka configuration and credential retrieval and decryption.

    The handleRequest method takes an Event object as its only parameter. The object is created automatically by the Oracle Functions system and populated with data from the JSON that comes from Blockchain. For this reason, the Event inner class needs to map directly to the JSON data that comes from Blockchain.

    The first thing you need to do is check for the "test" event type. When you subscribe to blockchain events, the first event is a test event that you don't want to relay to event consumers.

    1. Check for the "test" event type.
      public String handleRequest(Event event) {
       
          if(event.eventType.equalsIgnoreCase("test")) {
              return "done";
          }
          .
          .
          .
      }
    2. Build the string that's required for Streaming Service SASL configuration.
          .
          .
          .
      
          String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
          String saslJassConfig = String.format(
              "%s required username=\"%s/%s/%s\" password=\"%s\";",
              loginModule,
              tenancyName,
              decryptData(userName),
              streamOCID,
              decryptData(authToken));
          .
          .
          .
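The formatting itself is plain java.lang.String work and can be checked in isolation. All of the credential values in this sketch are placeholders, not real ones:

```java
public class SaslConfigDemo {
    public static void main(String[] args) {
        String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
        // Streaming expects the username in the form tenancyName/userName/streamOCID,
        // with the auth token as the password. All values below are placeholders.
        String saslJaasConfig = String.format(
            "%s required username=\"%s/%s/%s\" password=\"%s\";",
            loginModule,
            "mytenancy",
            "jdoe@example.com",
            "ocid1.stream.oc1..exampleocid",
            "placeholder-auth-token");
        System.out.println(saslJaasConfig);
    }
}
```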
      
    3. Create a java.util.Properties object and populate it with the configuration.
          .
          .
          .
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
          props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
          props.put(ProducerConfig.RETRIES_CONFIG, 5);
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
          .
          .
          .
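The ProducerConfig, CommonClientConfigs, and SaslConfigs constants used above resolve to literal string keys, so the same configuration can be built with plain java.util.Properties. This sketch uses a placeholder bootstrap endpoint:

```java
import java.util.Properties;

public class ProducerPropsDemo {
    public static void main(String[] args) {
        // Each constant resolves to the literal key shown here, for example
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bootstrap.servers".
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "streaming.example.com:9092"); // placeholder
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("retries", "5");
        props.setProperty("max.request.size", String.valueOf(1024 * 1024));
        System.out.println(props.getProperty("max.request.size")); // prints 1048576
    }
}
```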
      
    4. Register the configuration and send the event message out to Streaming Service.
          .
          .
          .
          KafkaProducer<String,String> producer = new KafkaProducer<>(props);
          ProducerRecord<String,String> record = new ProducerRecord<>(event.eventMsg.eventName, event.toString());
      
          producer.send(record);
          producer.flush();
          producer.close();
      
  7. Instruct the system to call handleRequest whenever a message is sent for relaying to the streaming service.

    To do this, modify the cmd property in func.yaml to point to the handleRequest method in the EventProducer class. For example:

    schema_version: 20180708
    name: obpeventsfunc
    version: 0.0.1
    runtime: java
    build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
    run_image: fnproject/fn-java-fdk:jre11-1.0.105
    cmd: producer.EventProducer::handleRequest

Build the Event Relay

After you create the Java class for the event relay, build the Docker image that you will push to Oracle Cloud Infrastructure Registry. After the image is built, you must tag it with information about the OCI region and the Object Storage namespace.

Before you can push an image, you must first use the docker tag command to create a copy of the local source image as a new image (the new image is actually just a reference to the existing source image). As a name for the new image, specify the fully qualified path to the target location in Oracle Cloud Infrastructure Registry where you want to push the image.

You need the following information for the tag:

  • The Docker registry endpoint for your region
  • Your Object Storage namespace

To get the endpoint for your region, look up your region in the table on Availability by Region. For example, the registry endpoint for US East (Ashburn) is https://iad.ocir.io.

To find your Object Storage namespace:

  1. In the navigation menu, click Administration and then click Tenancy Details.
  2. The Object Storage Namespace is in the Object Storage Settings section.
  1. In the top-level directory, run the build command. The top-level directory is the one you created when you ran the fn init command.
    fn build
  2. Tag the image with the OCI Region and Object Storage Namespace information.

    For example, if the endpoint for your region is https://iad.ocir.io and your Object Storage Namespace is yzl1yzrddld7 then the command is as follows, assuming that you use obpeventsfunc as the name of the event relay:

    docker tag obpeventsfunc:0.0.1 iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

    Note:

    Make sure to omit the https:// protocol information from the command.

Push the Event Relay to the Registry

After the event relay is built and has the proper tag, you can push it to Oracle Cloud Infrastructure Registry.

You need the following information when logging into Oracle Cloud Infrastructure Registry:
  • API endpoint for your region: For example, iad.ocir.io. It's the same value that you used earlier to tag the Docker image.
  • Tenancy Namespace: the auto-generated Object Storage namespace string of your tenancy (as shown on the Tenancy Information page).
  • Username: Your user name in Oracle Cloud Infrastructure.
  • Auth Token: The token that you created earlier in the Plan section.
  1. Log into Oracle Cloud Infrastructure Registry.
    docker login <region-key>.ocir.io

    When prompted, enter your user name in the format <tenancy-namespace>/<username>. For example, yzl1yzrddld7/jdoe@example.com. If your tenancy is federated with Oracle Identity Cloud Service, use the format <tenancy-namespace>/oracleidentitycloudservice/<username>.

  2. When prompted, enter the auth token that you created earlier.
  3. Push the Docker image from your computer to Oracle Cloud Infrastructure Registry.

    For example, if the endpoint for your region is https://iad.ocir.io and your Object Storage Namespace is yzl1yzrddld7 then the command is as follows:

    docker push iad.ocir.io/yzl1yzrddld7/obpeventsfunc:0.0.1

Create an API Key

Generate a key pair and upload the public key to your Oracle Cloud Infrastructure user profile.

You can do this in a couple of ways, but here we'll use the Cloud Shell because you can securely upload the generated key directly to your account.

  1. Log into the Cloud Shell for your tenancy.
  2. Create a directory for the key pair.
    mkdir ~/.oci
  3. Generate a 2048-bit private key and store it in a file called oci_api_key.pem.
    openssl genrsa -out ~/.oci/oci_api_key.pem 2048
  4. Make the file read-only by your user.
    chmod u=r,go= ~/.oci/oci_api_key.pem
  5. Generate a public key and store it in a file called oci_api_key_public.pem.
    openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
  6. Upload the public key to your user profile.

    In the following snippet, replace the --user-id value with your own OCID and change the --region value to the Region Key value for your own region.

    oci iam user api-key upload \
    --user-id ocid1.user.oc1..aaaaa ... z3goa \
    --key-file ~/.oci/oci_api_key_public.pem \
    --region IAD
  7. In the JSON response, locate and record the value for the fingerprint. You will need this value later.

Apply the Terraform Configuration

Download the Terraform configuration from the GitHub repository, update the Terraform variables file, then apply the configuration.

Before you begin, make sure you have the following information available. The information is required for the terraform.tfvars file.
  • Region - the Region Identifier for your region. You can extract this from the URL of the Oracle Cloud Infrastructure Console. For example, if the URL is https://console.us-ashburn-1.oraclecloud.com/compute/instances, then your Region Identifier is us-ashburn-1. If your URL is https://cloud.oracle.com/compute/instances, look up your Region Identifier at https://docs.oracle.com/iaas/Content/General/Concepts/regions.htm
  • Compartment OCID - the OCID of the compartment that contains the resources for the project. If you followed the instructions in the Plan section, you need the OCID for the OBP_Events compartment.
  • Fingerprint - the fingerprint of the public API Key that you generated and uploaded to your profile previously.
  • Private key - the full path and filename of the private API Key that you generated previously. For example, /home/opc/oci_api_key.pem. Do not use ~ in the path.
  • User OCID - You can obtain this from the User Details page. Open the Console menu, go to Identity, and click Users. Click your user name in the list.
  • Tenancy OCID - You can obtain this from the Tenancy Details page. Open the Console menu, go to Administration, and click Tenancy Details.

When you have the information, download and apply the Terraform configuration.

  1. Log into the Cloud Shell for your tenancy.
  2. Clone the oci-obp-extension repository from GitHub.
    git clone https://github.com/oracle-quickstart/oci-obp-extension.git
  3. Change into the Terraform directory.
    cd oci-obp-extension/terraform
  4. Open the terraform.tfvars file in a text editor and replace the placeholder values with the actual values.
  5. Clear the OCI_AUTH and OCI_use_obo_token environment variables.
    unset OCI_AUTH OCI_use_obo_token
  6. Prepare the directory for Terraform configuration.
    terraform init
  7. View and verify what the Terraform configuration will create.
    terraform plan
  8. Apply the Terraform configuration.
    terraform apply -auto-approve
  9. Record the API Gateway information that is displayed when the Terraform configuration completes. You need to make a record of the following values: OBP_Event_Subscribe_Callback and use_to_extract_ssl_certificate.

Subscribe to Blockchain Events

Register a callback URL so that API Gateway can receive blockchain events.

The URL that you register is the API Gateway callback URL that was displayed when the Terraform process completed. It's of the form https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback. You'll also need the API Gateway deployment endpoint, which was displayed as use_to_extract_ssl_certificate when the Terraform process completed. It's of the form joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com:443

You also need the Oracle Blockchain Platform REST API endpoint for subscribing to chaincode events. It has the following form:

https://<rest_server_url:port/restproxy#>/bcsgw/rest/v1/event/subscribe

<rest_server_url:port/restproxy#> is the URL for the REST proxy, which is listed in the Oracle Blockchain Platform console. In the console, open the Nodes page and look for the REST proxy node. The Route column lists a URL that includes the port and a REST proxy number.
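Putting the pieces together, a hypothetical Route value yields a subscribe endpoint like the one in this sketch (the host and proxy number are made up):

```java
public class SubscribeUrlDemo {
    public static void main(String[] args) {
        // Hypothetical Route value copied from the REST proxy node in the console.
        String restProxyRoute = "myinstance-example.blockchain.ocp.example.com:443/restproxy1";
        String subscribeEndpoint = "https://" + restProxyRoute + "/bcsgw/rest/v1/event/subscribe";
        System.out.println(subscribeEndpoint);
    }
}
```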

To subscribe to blockchain events:

  1. Extract the caCert for the API Gateway callback.
    1. Log into the Cloud Shell for your tenancy.
    2. Run the following command.

      Replace the text API_GATEWAY with the value of use_to_extract_ssl_certificate that you recorded earlier at the end of the Terraform configuration step.

      echo | openssl s_client -showcerts \
      -connect API_GATEWAY 2>/dev/null | openssl x509 \
      -inform pem -out cert.pem;echo;echo;awk 'NF {sub(/\r/, ""); \
      printf "%s\\n",$0;}'  cert.pem;rm cert.pem;echo;echo
    3. Copy the certificate that is displayed. Make sure to include the text -----BEGIN CERTIFICATE----- and -----END CERTIFICATE-----.
  2. Register the relay function with Oracle Blockchain Platform.

    Use the blockchain REST API to register the relay function. Use cURL, Postman, or another tool to POST the request to the blockchain. The body of the request has the following JSON format:

    {
      "requests": [
        {
          "eventType": "chaincodeEvent",
          "callbackURL":"https://joh5rb...fuxy.apigateway.us-ashburn-1.oci.customer-oci.com/obpevents/callback",
          "expires": "10m",
          "chaincode": "obcs-cardealer",
          "eventName": "VehiclePartCreated",
          "callbackTlsCerts": {
              "caCert": "-----BEGIN CERTIFICATE-----\n ... \n-----END CERTIFICATE-----\n" 
          }
        }
      ]
    }

    Make sure to use the callbackURL, chaincode, eventName, and caCert values that apply to you.

    Note:

    You might need several attempts before you get a success response because of the time required to build and deploy the function container.

Create and Test the Event Consumer

The event consumer uses Kafka APIs to authenticate and retrieve blockchain messages from Oracle Cloud Infrastructure Streaming.

To create a consumer, you need information for the following Strings:

  • USER-NAME: Your user name.
  • AUTH-TOKEN: This is the token that you generated and copied earlier.
  • TENANCY-NAME: You can find this value on the Tenancy Details page of the OCI console. To open the Tenancy Details page, in the Console navigation menu select Administration then click Tenancy Details.
  • STREAM-POOL-OCID: This is the OCID of the stream pool that was created when you ran the Terraform script. To find this value, open the Console navigation menu and go to Administration and click Streaming. Select the stream pool and in the page that opens copy the OCID.
  • REGION: "us-ashburn-1". Your region might be different.
  • EVENT-NAME: This is the same event name that you used earlier when you registered the callback URL to receive blockchain events.

Use the following example to test and validate that the system is working as expected.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Consumer {

    public static void main(String[] args) throws Exception {
       
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming.REGION.oci.oraclecloud.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TENANCY-NAME/USER-NAME/STREAM-POOL-OCID\" password=\"AUTH-TOKEN\";");
        props.put("group.id", "group-0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a consumer and subscribe it to the event topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("EVENT-NAME"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", 
                    record.offset(), record.key(), record.value());
        }     
    } 
}

When you start the consumer, it consumes any existing events that were already pushed into the stream.

After the example works and demonstrates the solution end to end, you're ready to create one or more production-level consumer apps. The apps can be written in JavaScript, Python, or another language.