Register and Deregister a Subscription

This topic describes how to register and deregister a subscription.

Subscription Registration

Typically, an application or service administrator configures the webhooks. A webhook includes a URI that indicates the intended destination to the application or service.

The webhook configuration may include:

  • A ping from the application to ensure that the endpoint can be reached.
  • Explicit acknowledgment of the ping with a specific message.
  • Validation information.

    For example, transmission of an event specific key.

An application or a service may also programatically setup the webhook.

In Oracle Integration, the registration of a webhook corresponds to the activation of an integration and indicates that Oracle Integration is ready to receive a message from an event. A deregistration corresponds to the deactivation of an integration and indicates that the integration is inactive and does not receive events.

In the adapter definition document, the adapter developer can register and deregister by referencing flows as shown in the following example code:

"subscription": {
  "register": "flow:CreateSubscriptionFlow",
  "deregister": "flow:DeleteSubscriptionFlow"
},

Flow to Register a Subscription

The flow to register a webhook consists of the following logic:

  • Lists the subscriptions for the logged in account.

    Executes an API call to the external application or service to retrieve the list.

  • Determines if the subscription exists.

    Parses the results and executes a jq expression and a CNCF function.

  • Updates an existing subscription.

    Executes an API call to the external application or service.

  • Creates a new subscription.

    Executes an API call to the external application or service.

Note:

In the following example code, the Oracle Integration endpoint is represented by .integrationProperties.INTEGRATION_FLOW_URL and helps find the details of the subscription registration.
"CreateSubscriptionFlow": {
  "id": "CreateSubscriptionFlow",
  "description": "CreateSubscriptionFlow",
  "version": "0.1",
  "start": "startState",
  "specVersion": "0.8",
  "functions": [
    {
      "name": "ResolveSubscriptionName",
      "operation": "(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )",
      "type": "expression"
    },
    {
      "name": "generalRestFunc",
      "operation": "connectivity::rest",
      "type": "custom"
    },
    {
      "name": "TransformSubName",
      "operation": "[.integrationProperties.INTEGRATION_FLOW_URL==.subscriptionList.body.subscriptions[].pushConfig.pushEndpoint]|any",
      "type": "expression"
    }
  ],
  "states": [
    {
      "actions": [
        {
          "functionRef": "ResolveSubscriptionName",
          "actionDataFilter": {
            "toStateData": "${ .subscriptionName }"
          }
        },
        {
          "name": "ListSubscriptionFunction",
          "functionRef": {
            "refName": "generalRestFunc",
            "arguments": {
              "uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions\"}",
              "method": "GET",
              "parameters": {
                "pageSize": 1000
              }
            }
          },
          "actionDataFilter": {
            "results": "${{ body: .body, headers: .headers }}",
            "toStateData": "${ .subscriptionList }"
          }
        },
        {
          "functionRef": "TransformSubName",
          "actionDataFilter": {
            "toStateData": "${ .subscriptionExists }"
          }
        },
        {
          "name": "UpdateSubscriptionFunction",
          "functionRef": {
            "refName": "generalRestFunc",
            "arguments": {
              "uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
              "method": "PATCH",
              "body": "${{subscription:{ackDeadlineSeconds: .configuration.ackDeadline,pushConfig:{oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}},deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+  (.hours|tonumber * 60*60) +  (.minutes|tonumber * 60) | tostring +\"s\")  else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) }| with_entries(select(.value != null)),updateMask:(.configuration | \"retainAckedMessages,ackDeadlineSeconds,pushConfig.oidcToken.serviceAccountEmail\" + (if .enableDeadLettering == \"true\" then \",deadLetterPolicy.deadLetterTopic,deadLetterPolicy.maxDeliveryAttempts\" else \"\" end) + (if .messageRetentionDuration == \"true\" then \",messageRetentionDuration\" else \"\" end))}}"
            }
          },
          "actionDataFilter": {
            "results": "${ { body: .body, headers: .headers } }",
            "toStateData": "${ .output }"
          },
          "condition": "${.subscriptionExists==true}"
        },
        {
          "name": "CreateSubscriptionFunction",
          "functionRef": {
            "refName": "generalRestFunc",
            "arguments": {
              "uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
              "method": "PUT",
              "body": "${{topic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.TopicName),retryPolicy:{minimumBackoff: \"30s\",maximumBackoff: \"300s\"},ackDeadlineSeconds: .configuration.ackDeadline,expirationPolicy:{}, pushConfig: {pushEndpoint: .integrationProperties.INTEGRATION_FLOW_URL,oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}}, deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end), filter: (if .configuration.filter then .configuration.filter else null end), enableMessageOrdering: (if .configuration.enableMessageOrdering == \"true\" then .configuration.enableMessageOrdering else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+  (.hours|tonumber * 60*60) +  (.minutes|tonumber * 60) | tostring +\"s\")  else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) } | with_entries(select(.value != null))}"
            }
          },
          "actionDataFilter": {
            "results": "${ { body: .body, headers: .headers } }",
            "toStateData": "${ .output }"
          },
          "condition": "${.subscriptionExists==false}"
        }
      ],
      "name": "startState",
      "type": "operation",
      "end": true
    }
  ]
},
"DeleteSubscriptionFlow": {
  "id": "DeleteSubscriptionFlow",
  "version": "0.1",
  "start": "startState",
  "specVersion": "0.8",
  "functions": [
    {
      "name": "generalRestFunc",
      "operation": "connectivity::rest",
      "type": "custom"
    }
  ],
  "states": [
    {
      "actions": [
        {
          "name": "DeleteSubscriptionFunction",
          "functionRef": {
            "refName": "generalRestFunc",
            "arguments": {
              "uri": "https://{hostName}/v1/projects/{projectID}/subscriptions/{name}",
              "method": "DELETE",
              "parameters": {
                "hostName": "${.connectionProperties.hostName}",
                "name": "${(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\")| .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )}",
                "projectID": "${.connectionProperties.projectID}"
              }
            }
          },
          "actionDataFilter": {
            "results": "${ { body: .body, headers: .headers } }",
            "toStateData": "${ .output }"
          }
        }
      ],
      "name": "startState",
      "type": "operation",
      "end": true
    }
  ]
}

Flow to Deregister a Subscription

The flow to deregister a subscription executes an API call to delete the subscription. The API call does not check if the subscription exists.

Note:

To delete or search for an existing subscription, the flow must identify the subscription that was created during registration.