Runtime Implementation of Triggers

This topic describes how you can design logic that executes runtime implementation of a trigger by tranforming the data in the inbound webhook.

Runtime Implementation

Certain business scenarios may contain data in a raw form. To make it easy for the integration designer to understand the data, the trigger can implement logic that uses webhook to transform an inbound message into a integration mapper-friendly form. The integration mapper-friendly form is defined by the request logic that references a flow.

The following code sample shows a trigger definition that defines a request structure that's different than the inbound webhook. The execute property transforms the message structure of the raw data into a more refined form as modeled in the request.

"GCPPubSubNotificationTrigger": {
  "displayName": "Subscribe to a topic",
  "description": "This trigger detects when a message is publised to a subscribed topic.",
  "type": "webhook",
  "execute": "flow:mapNotificationFlow",
  "httpMethod": "POST",
  "request": "flow:DynamicInputFlowTrigger",
  "webhook": {
    "headers": [
      {
        "name": "from",
        "type": "string"
      },
      {
        "name": "content-type",
        "type": "string"
      },
      {
        "name": "host",
        "type": "string"
      },
      {
        "name": "content-length",
        "type": "string"
      },
      {
        "name": "user-agent",
        "type": "string"
      },
      {
        "name": "authorization",
        "type": "string"
      },
      {
        "name": "x-span-id",
        "type": "string"
      },
      {
        "name": "transfer-encoding",
        "type": "string"
      },
      {
        "name": "connection",
        "type": "string"
      },
      {
        "name": "accept-encoding",
        "type": "string"
      },
      {
        "name": "accept",
        "type": "string"
      }
    ],
    "body": {
      "schemaType": "application/schema+json",
      "schema": {
        "$ref": "#/schemas/NotificationSchemaGCP"
      }
    }
  },
  "subscription": {
    "register": "flow:CreateSubscriptionFlow",
    "deregister": "flow:DeleteSubscriptionFlow"
  },
  "configuration": [
    {
      "name": "TopicName",
      "displayName": "Select topic",
      "description": "",
      "type": "COMBO_BOX",
      "options": "flow:ListTopicsUIFlow",
      "required": true
    },
    {
      "name": "SchemaName",
      "displayName": "Schema type associated with topic",
      "description": "Schema type associated with topic",
      "type": "TEXT_BOX",
      "default": "flow:GetSchemaNameFlow",
      "required": true,
      "dependencies": {
        "TopicName": {
          "values": []
        }
      }
    },
    {
      "name": "SchemaSupport",
      "displayName": "Schema not supported",
      "description": "Schema not supported",
      "type": "TEXT_BOX",
      "default": "flow:SchemaNotSuportedFlow",
      "required": true,
      "dependencies": {
        "SchemaName": {
          "values": [
            "PROTOCOL_BUFFER",
            "AVRO"
          ]
        }
      }
    },
    {
      "name": "InputField",
      "displayName": "Provide JSON Sample",
      "description": "Provide JSON sample for data",
      "type": "TEXT_AREA",
      "required": true,
      "validation": "flow:ValidateJsonFlow",
      "dependencies": {
        "SchemaName": {
          "values": [
            "No schema"
          ]
        }
      }
    },
    {
      "name": "filter",
      "displayName": "Subscription filter",
      "description": "Max 256 characters. Example: attributes:k OR (attributes.k1=\"v\" AND NOT hasPrefix (attributes.k2,\"v\"))",
      "type": "TEXT_AREA",
      "required": false,
      "validation": "flow:ValidateFilterLengthFlow"
    },
    {
      "name": "ackDeadline",
      "displayName": "Acknowledgement deadline",
      "description": "Deadline time is from 10 seconds to 600 seconds.",
      "type": "TEXT_BOX",
      "required": true,
      "default": "10",
      "validation": "flow:ValidateAckDeadlineFlow"
    },
    {
      "name": "enableDeadLettering",
      "displayName": "Enable dead lettering",
      "description": "Subscriptions may configure a maximum number of delivery attempts. When a message cannot be delivered, it is republished to the specified dead-letter topic.",
      "default": "false",
      "type": "CHECK_BOX",
      "required": false
    },
    {
      "name": "deadLetterTopic",
      "displayName": "Select a dead-letter topic",
      "description": "Topic should have a subscription associated to it.",
      "type": "COMBO_BOX",
      "options": "flow:ListTopicsUIFlow",
      "required": true,
      "dependencies": {
        "enableDeadLettering": {
          "values": [
            "true"
          ]
        }
      }
    },
    {
      "name": "maxDeliveryAttempts",
      "displayName": "Maximun delivery attempts",
      "description": "Maximum delivery attempts is from 5 to 100.",
      "type": "TEXT_BOX",
      "required": true,
      "default": "5",
      "validation": "flow:ValidateMaxDeliveryAttemptsFlow",
      "dependencies": {
        "enableDeadLettering": {
          "values": [
            "true"
          ]
        }
      }
    },
    {
      "name": "enableMessageOrdering",
      "displayName": "Order messages with an ordering key",
      "description": "When enabled, messages tagged with the same ordering key will be received in the order that they are published. This option cannot be changed later.",
      "default": "false",
      "type": "CHECK_BOX",
      "required": false
    },
    {
      "name": "retainAckedMessages",
      "displayName": "Retain acknowledged messages",
      "description": "When enabled, acknowledged messages are retained for the message retention duration specified above. This increases message storage fees.",
      "default": "false",
      "type": "CHECK_BOX",
      "required": false
    },
    {
      "name": "messageRetentionDuration",
      "displayName": "Specify message retention duration",
      "description": "Duration is from 10 minutes to 7 days",
      "default": "false",
      "type": "CHECK_BOX",
      "required": false
    },
    {
      "name": "days",
      "displayName": "Days",
      "description": "",
      "type": "COMBO_BOX",
      "options": "flow:DaysFlow",
      "required": true,
      "dependencies": {
        "messageRetentionDuration": {
          "values": [
            "true"
          ]
        }
      }
    },
    {
      "name": "hours",
      "displayName": "Hours",
      "description": "",
      "type": "COMBO_BOX",
      "options": "flow:HoursFlow",
      "required": true,
      "dependencies": {
        "days": {
          "values": [
            "0",
            "1",
            "2",
            "3",
            "4",
            "5",
            "6"
          ]
        }
      }
    },
    {
      "name": "minutes",
      "displayName": "Minutes",
      "description": "",
      "type": "COMBO_BOX",
      "options": "flow:MinutesFlow",
      "required": true,
      "dependencies": {
        "days": {
          "values": [
            "0",
            "1",
            "2",
            "3",
            "4",
            "5",
            "6"
          ]
        }
      }
    }
  ]
}

The following sample code for the flows logic shows a data transformation that does not need to call a third party or an external API. Additionally, the sample code illustrates:

  • How jq expressions and out-of-box functions provided by the Rapid Adapter Builder platform can execute transformation of data into the required format.
  • Part of the webhook inbound message sent by the external application is encoded in base64.
  • Flow logic extracts base64 encoded portion of the message.
  • Flow logic decodes and transforms the data into a form that is presented in the integration mapper.
"mapNotificationFlow": {
  "id": "mapNotificationFlow",
  "version": "0.1",
  "start": "startState",
  "specVersion": "0.8",
  "functions": [
    {
      "name": "mapNotification",
      "operation": ".input|(if .message.attributes != null then (.message.attributes | to_entries | map({key: .key, value: .value})) as $attributes | .message.attributes = $attributes else . end | .message.data |= (if . then @base64d | fromjson else empty end))",
      "type": "expression"
    }
  ],
  "states": [
    {
      "actions": [
        {
          "functionRef": "mapNotification",
          "actionDataFilter": {
            "toStateData": "${ .output }"
          }
        }
      ],
      "name": "startState",
      "type": "operation",
      "end": true
    }
  ]
}