イベント・サブスクリプションの理解
webフック・トリガーの場合、外部アプリケーションからイベントを受信するには、それらのイベントをサブスクライブする必要があります。
webフック構成プロセスでは、イベントにサブスクライブし、外部アプリケーションがイベント・データを送信する宛先URIを指定します。 イベントのサブスクリプションは、次のいずれかの方法で作成できます:
-
手動: 外部アプリケーションの管理ページを使用してwebフック・サブスクリプションを作成します。
-
プログラム: 外部アプリケーションへのAPIコールを実行します。
サブスクリプションの設定の一環として、外部アプリケーションがpingを発行して、エンドポイントに到達できることを確認できます。 また、pingの明示的な確認が必要になる場合もあります。 「Webフック登録時の検証」を参照してください。
同様の方法で、必要に応じてイベントのサブスクライブを解除できます。
Oracle Integrationでは、webフックの登録は、Oracle Integrationがイベントを受信する準備ができていることを示す統合フローのアクティブ化に対応しています。 登録解除は、統合フローがイベントを受信していないことを示す統合フローの非アクティブ化に対応します。
Rapid Adapter Builderのアダプタ定義ドキュメントでは、Register構造体とDeregister構造体の両方がサポートされています。 次の例に示すように、これらの構成は両方ともフローを取ります。 各フローの目的は、1つ以上のAPIコールを実行して、外部サービス上のwebフックにプログラムでサブスクライブすることです。
"subscription": {
"register": "flow:CreateSubscriptionFlow",
"deregister": "flow:DeleteSubscriptionFlow"
},サブスクリプションの登録および登録解除のフロー
RegisterおよびDeregisterコンストラクトのフローの例を次に示します。
Registerロジックでは、次のアクションが実行されます:
- 外部アプリケーションへのAPIコールを実行して、ログイン・アカウントのサブスクリプションをリストします。
- 結果を解析してjq式を実行して、サブスクリプションがすでに存在するかどうかを決定します。 これはCNCF機能としてモデル化されています。
- サブスクリプションが存在する場合、APIコールを介してサブスクリプションが更新されます。
- サブスクリプションが存在しない場合は、APIコールを介して新しいサブスクリプションが作成されます。
フローのDeregisterロジックは、サブスクリプションを削除するAPIをコールするのみですが、(追加のAPIコールを回避するために)サブスクリプションが存在するかどうかはチェックしません。
ノート:
次のコード例では、Oracle Integrationエンドポイント(.integrationProperties.INTEGRATION_FLOW_URLと表記)は、サブスクリプション登録の詳細の検索に役立ちます。
"CreateSubscriptionFlow": {
"id": "CreateSubscriptionFlow",
"description": "CreateSubscriptionFlow",
"version": "0.1",
"start": "startState",
"specVersion": "0.8",
"functions": [
{
"name": "ResolveSubscriptionName",
"operation": "(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )",
"type": "expression"
},
{
"name": "generalRestFunc",
"operation": "connectivity::rest",
"type": "custom"
},
{
"name": "TransformSubName",
"operation": "[.integrationProperties.INTEGRATION_FLOW_URL==.subscriptionList.body.subscriptions[].pushConfig.pushEndpoint]|any",
"type": "expression"
}
],
"states": [
{
"actions": [
{
"functionRef": "ResolveSubscriptionName",
"actionDataFilter": {
"toStateData": "${ .subscriptionName }"
}
},
{
"name": "ListSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions\"}",
"method": "GET",
"parameters": {
"pageSize": 1000
}
}
},
"actionDataFilter": {
"results": "${{ body: .body, headers: .headers }}",
"toStateData": "${ .subscriptionList }"
}
},
{
"functionRef": "TransformSubName",
"actionDataFilter": {
"toStateData": "${ .subscriptionExists }"
}
},
{
"name": "UpdateSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
"method": "PATCH",
"body": "${{subscription:{ackDeadlineSeconds: .configuration.ackDeadline,pushConfig:{oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}},deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+ (.hours|tonumber * 60*60) + (.minutes|tonumber * 60) | tostring +\"s\") else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) }| with_entries(select(.value != null)),updateMask:(.configuration | \"retainAckedMessages,ackDeadlineSeconds,pushConfig.oidcToken.serviceAccountEmail\" + (if .enableDeadLettering == \"true\" then \",deadLetterPolicy.deadLetterTopic,deadLetterPolicy.maxDeliveryAttempts\" else \"\" end) + (if .messageRetentionDuration == \"true\" then \",messageRetentionDuration\" else \"\" end))}}"
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
},
"condition": "${.subscriptionExists==true}"
},
{
"name": "CreateSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
"method": "PUT",
"body": "${{topic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.TopicName),retryPolicy:{minimumBackoff: \"30s\",maximumBackoff: \"300s\"},ackDeadlineSeconds: .configuration.ackDeadline,expirationPolicy:{}, pushConfig: {pushEndpoint: .integrationProperties.INTEGRATION_FLOW_URL,oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}}, deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end), filter: (if .configuration.filter then .configuration.filter else null end), enableMessageOrdering: (if .configuration.enableMessageOrdering == \"true\" then .configuration.enableMessageOrdering else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+ (.hours|tonumber * 60*60) + (.minutes|tonumber * 60) | tostring +\"s\") else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) } | with_entries(select(.value != null))}"
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
},
"condition": "${.subscriptionExists==false}"
}
],
"name": "startState",
"type": "operation",
"end": true
}
]
},
"DeleteSubscriptionFlow": {
"id": "DeleteSubscriptionFlow",
"version": "0.1",
"start": "startState",
"specVersion": "0.8",
"functions": [
{
"name": "generalRestFunc",
"operation": "connectivity::rest",
"type": "custom"
}
],
"states": [
{
"actions": [
{
"name": "DeleteSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "https://{hostName}/v1/projects/{projectID}/subscriptions/{name}",
"method": "DELETE",
"parameters": {
"hostName": "${.connectionProperties.hostName}",
"name": "${(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\")| .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )}",
"projectID": "${.connectionProperties.projectID}"
}
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
}
}
],
"name": "startState",
"type": "operation",
"end": true
}
]
}