注意:

使用来自 Microsoft Entra ID 的访问令牌安全地访问 Oracle Integration

简介

随着 Oracle Integration 客户采用多云策略,他们通常需要跨不同的云提供商连接业务应用和流程。例如,公司可能有一个在 Microsoft Azure 上运行的应用程序,需要从 Oracle Cloud Infrastructure 应用程序访问数据。通常,您可以从 Oracle Cloud Infrastructure Identity and Access Management (OCI IAM) 获取令牌来检索此数据。但是,使用多个云提供商意味着处理多个令牌,这可能很复杂并构成安全风险。

想象一下,如果可以使用一个 OAuth 令牌与不同云提供商的应用集成,会有多方便。本教程介绍如何使用第三方 OAuth 提供程序调用 Oracle Integration 流。

体系结构

让我们可视化解决方案流程:

图像 1

该过程从从从 Microsoft Entra ID 获取 OAuth 令牌的用户或业务应用程序开始。获取后,此令牌用于调用通过 OCI API 网关公开的端点。配置为使用定制授权方 OCI 函数(以前称为 Oracle Functions)的 OCI API 网关首先调用此授权方函数来验证令牌。验证成功后,它将调用实际后端端点,即 Oracle Integration 流。

现在,让我们深入了解实施此过程的细节。为了简单起见,我们将将其分为三个步骤:

为什么要分别使用资源所有者密码身份证明 (ROPC) 和 JSON Web 标记 (JWT) 断言授权类型来从 Microsoft Entra ID 和 OCI IAM 获取访问标记?

同时使用 ROPC 和 JWT 断言授权提供了一种简化的安全方法来处理多云环境中的身份验证和令牌交换。

目标读者

目标

先决条件

任务 1:使用 Microsoft Entra ID 注册应用程序

要使用 Microsoft Entra ID 的 IAM 功能,包括访问受保护资源(图形 API),必须注册应用程序。

  1. 注册应用程序。有关详细信息,请参阅使用 Microsoft 身份平台注册应用程序

  2. 请注意概览部分中的 Application (client) ID 值。

    图像 2

  3. 转到 ManageCertificates & Secrets 并添加客户端密钥。请记下密钥值,因为它将在以后的任务中使用。

    图像 3

任务 2:OCI 身份域中 JWT 用户断言的先决条件步骤

  1. 从此处完成先决条件任务: JWT 用户断言的先决条件

  2. 为所需的范围验证 Oracle Integration 应用程序后,将生成自签名密钥对并配置机密应用程序。记下 scopeprivate_key.pemClient IDClient Secret

    Note: While importing the private key as a trusted partner in the confidential application use the same alias as being used during creation of the self-signed key pairs and note down the alias for later tasks.

  3. 创建动态组以允许特定区间中的资源类型 function 能够从 OCI Vault 服务读取密钥。

    图像 6

任务 3:在 OCI Vault 中创建密钥

使用 OCI Vault 手动生成密钥选项存储从任务 1 和任务 2 收集的密钥。有关详细信息,请参阅在 Vault 中创建密钥

图像 4

创建密钥后,从密钥信息部分复制 OCID 值,并将其存储用于后续任务。

图像 5

任务 4:创建和配置 func.py 文件

我们将使用 OCI Functions 作为自定义授权者来验证 Microsoft Entra ID 访问令牌,并将 OCI IAM 访问令牌生成为 back_end_token

  1. 要启动,请创建一个应用程序。在 OCI Functions 中,应用程序是函数的逻辑分组。为应用程序指定的属性确定该应用程序中所有函数的资源分配和配置。有关详细信息,请参阅创建应用程序

  2. 创建应用程序后,将配置添加到应用程序。我们将从函数代码中提取以下项目,使其更便于移植和可配置,而无需修改代码。输入 Key(键)Value(值),然后单击 +

    添加来自 Microsoft Entra ID、OCI 身份域的客户端 ID、在任务 3 中收集的密钥的 OCID、从任务 2 收集的别名、范围以及将根据其验证微软 Entra ID 令牌的图形端点 https://graph.microsoft.com/v1.0/me

    图像 7

  3. 要创建函数,请转到入门并单击启动 OCI Cloud Shell 以在浏览器中打开交互式 Linux 样式的云 Shell。加载 OCI Cloud Shell 后,您可以立即从 OCI Cloud Shell 创建、开发和部署定制授权者 Oracle 功能。

  4. 要使用 Fn 项目命令行界面 (Command Line Interface,CLI) 创建函数,请为 Python 函数 fn init --runtime python MyCustomAuthorizer 输入以下命令,然后单击 Enter

    图像 8

  5. 创建函数的样板,现在可以相应地对其进行编辑,以包括定制授权者逻辑。将目录更改为函数文件夹并编辑 func.py 文件。复制并粘贴以下代码片段。

    图像 9

    import io
    import json
    import logging
    import jwt
    import datetime
    from datetime import timedelta
    import time
    import base64
    
    from fdk import response
    import requests
    from requests.auth import HTTPBasicAuth
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    import ociVault
    
    oauth_apps = {}
    
    def initContext(context):
        # This method takes elements from the Application Context and from OCI Vault to create the OAuth App Clients object.
        if (len(oauth_apps) < 2):
            try:
                logging.getLogger().info("initContext: Initializing context")
    
                oauth_apps['idcs'] = {'introspection_endpoint': context['idcs_token_endpoint'],
                                    'client_id': context['idcs_app_client_id'],
                                    'scope':context['idcs_oauth_scope'],
                                    'alias':context['alias'],
                                    'client_secret': ociVault.getSecret(context['idcs_client_secret_ocid'])}
                oauth_apps['AD'] = {'token_endpoint': context['ad_endpoint'],
                                    'client_id': context['ad_app_client_id'],
                                    'client_secret': ociVault.getSecret(context['ad_client_secret_ocid'])}
    
            except Exception as ex:
                logging.getLogger().error("initContext: Failed to get config or secrets" + str(ex))
                raise
    
    
    def getAuthContext(token, client_apps):
        # This method populates the Auth Context that will be returned to the gateway.
        auth_context = {}
        access_token = token[len('Bearer '):]
        jwtToken = json.loads(json.dumps(jwt.decode(access_token, options={"verify_signature": False})))
        # Calling MSFT to validate the token
        try:
        logging.getLogger().info("getAuthContext: Calling Token Introspection function") 
        respIntrospectToken = introspectToken(access_token, client_apps['AD']['token_endpoint'], client_apps['AD']['client_id'], client_apps['AD']['client_secret'])
        except Exception as ex:
                logging.getLogger().error("getAuthContext: Failed to introspect token" + str(ex))
                raise
    
        # If AD confirmed the token valid and active, we can proceed to populate the auth context
        if (respIntrospectToken.status_code == 200):
            auth_context['active'] = True
            auth_context['principal'] = jwtToken['upn']
            auth_context['scope'] = 'https://graph.microsoft.com/.default'
            # Retrieving the back-end Token
            backend_token = getBackEndAuthToken(client_apps['idcs']['introspection_endpoint'], client_apps['idcs']['client_id'], client_apps['idcs']['client_secret'],client_apps['idcs']['scope'],client_apps['idcs']['alias'],auth_context['principal'])
    
            # The maximum TTL for this auth is the lesser of the API Client Auth (Entra ID) and the Gateway Client Auth (OCI IAM)
            if (datetime.datetime.fromtimestamp(jwtToken['exp']) < (datetime.datetime.utcnow() + timedelta(seconds=backend_token['expires_in']))):
                auth_context['expiresAt'] = (datetime.datetime.fromtimestamp(jwtToken['exp'])).replace(tzinfo=datetime.timezone.utc).astimezone().replace(microsecond=0).isoformat()
            else:
                auth_context['expiresAt'] = (datetime.datetime.utcnow() + timedelta(seconds=backend_token['expires_in'])).replace(tzinfo=datetime.timezone.utc).astimezone().replace(microsecond=0).isoformat()
            # Storing the back_end_token in the context of the auth decision so we can map it to Authorization header using the request/response transformation policy
            auth_context['context'] = {'back_end_token': ('Bearer ' + str(backend_token['access_token']))}
    
        else:
            # API Client token is not active, so we will go ahead and respond with the wwwAuthenticate header
            auth_context['active'] = False
            auth_context['wwwAuthenticate'] = 'Bearer realm=\"identity.oraclecloud.com\"'
    
        return(auth_context)
    
    def introspectToken(access_token, introspection_endpoint, client_id, client_secret):
        # This method simply invokes the introspection api as configured in the configuration screen.  
        # The real validation happens in the getAuthContext function.  
        #payload = {'token': access_token}
        headers = {'Accept': 'application/json',
                'Authorization':'Bearer '+access_token}
        try:
            logging.getLogger().info("introspectToken: Introspecting Token") 
            resp = requests.get(introspection_endpoint,
                                headers=headers)
            print(resp)
    
        except Exception as ex:
            logging.getLogger().error("introspectToken: Failed to introspect token" + str(ex))
            raise
    
        return resp
    
    def getBackEndAuthToken(token_endpoint, client_id, client_secret, scope, alias, principal):
        # This method gets the token from the back-end system (ORDS in this case)
        try:
            logging.getLogger().info("getBackEndAuthToken: Getting Backend Token") 
            print("Sub is " + principal)
    
            with open("private_key.pem", "rb") as key_file:
                private_key = serialization.load_pem_private_key(
                    key_file.read(),
                    password=None,
                    backend=default_backend()
                )
    
            headers = {
                "alg": "RS256",
                "typ": "JWT",
                "kid": "abc"
            }
    
            claims = {
                "sub": principal,
                "aud": "https://identity.oraclecloud.com/",
                "iss": client_id,
                "iat": int(time.time()),
                "exp": int(time.time()) + 3600,  # 1 hour expiration
                "jti": "8c7df446-bfae-40be-be09-0ab55c655436"  # random number
            }
    
            logging.getLogger().info("Claims : ")
            logging.getLogger().info(claims) 
    
            jwt_assertion = jwt.encode(
                payload=claims,
                key=private_key,
                algorithm="RS256",
                headers=headers
            )
            logging.getLogger().info("Assertion is :") 
            logging.getLogger().info(jwt_assertion) 
    
            encoded = client_id + ":" + client_secret
            baseencoded = base64.urlsafe_b64encode(encoded.encode('UTF-8')).decode('ascii')
            payload = {'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
                    'scope':scope, 'assertion':jwt_assertion}
            headers = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', 'Authorization': 'Basic %s' % baseencoded, 'Accept': '*/*'}
            backend_token = json.loads(requests.post(token_endpoint,
                                                    data=payload,
                                                    headers=headers).text)
            logging.getLogger().info("Backend token in generated :") 
            logging.getLogger().info(backend_token) 
    
        except Exception as ex:
            logging.getLogger().error("getBackEndAuthToken: Failed to get ORDS token" + str(ex))
            raise
    
        return backend_token
    
    
    
    def handler(ctx, data: io.BytesIO = None):
        initContext(dict(ctx.Config()))
        logging.getLogger().info(oauth_apps)
    
        auth_context = {}
        try:
            logging.getLogger().info("handler: Started Function Execution") 
            gateway_auth = json.loads(data.getvalue())
            auth_context = getAuthContext(gateway_auth['token'], oauth_apps)
            if (auth_context['active']):
                logging.getLogger().info('Authorizer returning 200...')
                return response.Response(
                    ctx,
                    response_data=json.dumps(auth_context),
                    status_code = 200,
                    headers={"Content-Type": "application/json"}
                    )
            else:
                logging.getLogger().info('Authorizer returning 401...')
                return response.Response(
                    ctx,
                    response_data=json.dumps(str(auth_context)),
                    status_code = 401,
                    headers={"Content-Type": "application/json"}
                    )
    
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))
    
            return response.Response(
                ctx,
                response_data=json.dumps(str(auth_context)),
                status_code = 500,
                headers={"Content-Type": "application/json"}
                )
    
    
    • 导入

      • iojsonloggingdatetimetimebase64用于处理 I/O、JSON 数据、日志记录、日期和时间操作以及 base64 编码的标准 Python 库。
      • jwt用于对 JSON Web 标记 (JWT) 进行编码和解码的库。
      • requests用于发出 HTTP 请求的库。
      • HTTPBasicAuth用于处理 HTTP 基本验证的类。
      • serializationdefault_backend从加密库中,用于处理加密操作。
      • ociVault用于与 OCI Vault 交互的定制模块。
    • 全局变量

      • oauth_apps: 字典,用于存储应用程序配置。
    • 函数

      • initContext(context): 此函数的用途是使用 OCI Vault 中的上下文数据和密钥初始化应用程序配置。它接收上下文字典对象,该对象作为主处理程序方法中的第一项被调用,并使用任务 5 中解释的 getSecret() 函数从 OCI Vault 检索密钥。

      • getAuthContext(token, client_apps): 填充并返回 OCI API 网关的验证上下文。提取并解码访问令牌。调用 introspectToken() 函数以验证具有 Entra ID 的标记。如果令牌有效,请设置验证上下文,调用 getBackEndAuthToken() 函数以从 OCI IAM 检索后端令牌并设置到期时间。如果令牌无效,请设置 wwwAuthenticate 标头以指示验证错误。

      • introspectToken(access_token, introspection_endpoint, client_id, client_secret): 使用提供的 introspection_endpoint 验证令牌。使用令牌向自测端点发出 GET 请求。返回自测或验证端点的响应。由于 Microsoft Entra ID 没有 OAuth 自测 API 端点,因此我们将使用收到的标记作为输入来调用配置的端点。

      • getBackEndAuthToken(token_endpoint, client_id, client_secret, scope, alias, principal): 从 PEM 文件装入私钥。创建 JWT 声明并将其编码为 JWT 声明。为令牌请求准备有效负载和标头。对令牌端点发出 POST 请求以获取后端令牌并将后端令牌返回到 getAuthContext() 函数。

      • handler(ctx, data: io.BytesIO = None): 用于处理函数执行的主函数。使用 initContext() 函数初始化 OAuth 上下文,并调用 getAuthContext() 函数以获取验证上下文。如果令牌有效,则返回 200 响应,否则返回 401 响应。在出现错误时记录并返回 500 响应。

任务 5:创建和配置 ociVault.py 文件

在同一文件夹中创建 ociVault.py 文件并粘贴以下代码片段。此实用程序函数从 OCI Vault 服务读取密钥。

# Utility Function to get secrets from OCI Vault
import logging
import oci
import base64

def getSecret(ocid):
    signer = oci.auth.signers.get_resource_principals_signer()
    try:
        client = oci.secrets.SecretsClient({}, signer=signer)
        secret_content = client.get_secret_bundle(ocid).data.secret_bundle_content.content.encode('utf-8')
        decrypted_secret_content = base64.b64decode(secret_content).decode('utf-8')
    except Exception as ex:
        logging.getLogger().error("getSecret: Failed to get Secret" + ex)
        print("Error [getSecret]: failed to retrieve", ex, flush=True)
        raise
    return decrypted_secret_content

注:将任务 2 中的 private_key.pem 文件保存在同一文件夹中。

图像 10

任务 5:测试函数

为了测试函数,我们需要部署函数,然后通过传递 Microsoft Entra ID 令牌作为输入来调用它。

  1. 导航到函数文件夹并运行以下命令 fn -v deploy --app MyCustomAuthorizer 来部署它。Fn Project CLI 命令将在 OCI Functions 应用程序上构建函数并部署相同的函数。

    图像 11

    注意:在部署函数应用程序之前,在 requirements.txt 文件中包括 fdk>=0.1.74requestsocipyjwtserialization

  2. 使用 Postman 客户端通过 OAuth 2.0 ROPC 流从 Microsoft Entra ID 生成访问令牌。

    图像 12

  3. 请注意访问令牌以生成 payload.json,该令牌将作为输入传递以测试 OCI 函数。将 JSON 文件保存在同一个函数目录中。

    图像 13

  4. 保存有效负载后,您可以运行以下命令来模拟函数的执行,因为它将通过 OCI API 网关 cat payload.json | fn invoke <AppName> <function name> 调用,如下图中所示。

    图像 14

    如果 Microsoft Entra ID 令牌有效,您将看到如下图中所示的响应,您将在上下文结构的 back_end_token 值中看到 OCI IAM 令牌值。

任务 6:配置 OCI API 网关

OCI API Gateway 是一个完全托管、可扩展的云原生 API 管理平台,提供从快速 API 部署到生命周期管理和后端服务集成的一整套服务。我们将利用 API 网关使用外部身份提供者(如 Microsoft Entra ID)来调解 Oracle Integration 的授权。

首先创建新的 API 网关,然后在 API 网关上创建新部署。

  1. 导航到 Developer ServicesAPI ManagementGateways 。输入以下信息,然后单击创建网关

    图像 15

    图像 16

  2. 网关详细信息页中,单击创建部署,然后为 API 部署输入以下必需信息。

    • 名称:输入一个名称。
    • 路径前缀:定义路径。
    • 区间:为 API 部署选择适当的区间。

    图像 17

  3. 添加验证策略详细信息。您可以在此处配置要作为定制授权者调用的 OCI 函数。选择在任务 4 中创建的函数。

    图像 18

  4. 路由页中,配置到后端服务的 API 路由。在本教程中,我们将定义到 Oracle Integration 端点的路由。

    图像 19

  5. 单击显示路由请求策略,用户将在其中对请求的验证标头执行从 OCI Functions 响应的验证标记交换。

    图像 20

    此步骤涉及基于后端身份提供者为后端服务设置验证令牌。在我们的方案中,我们将设置从定制授权者 OCI Functions 收到的 OCI IAM 的持有者令牌。在此处,我们将授权标头配置为使用值 ${request.auth[back_end_token]} 覆盖。请注意,back_end_token 是 Oracle 函数响应结构中上下文的一部分。确保在定制授权者 OCI 函数完成后,此表达式计算成功。

  6. 成功复查配置后,单击保存更改以保存部署,并等待部署状态更改为活动

    图像 21

    激活 API 部署后,从部署信息部分复制端点(基本 URL)。此 URL 用作部署的端点,您的业务流程或应用程序将使用 Microsoft Entra ID 持有者标记调用 Oracle Integration 端点。我们将在下一个任务中使用基本 URL。

    图像 22

任务 7:测试 API

首先,使用 Postman 客户端从 Microsoft Entra ID 获取访问令牌。我们将使用 ROPC 流来确保访问令牌包含必要的身份信息。

  1. 复制访问令牌,因为从 API 网关调用 API 时将使用相同的访问令牌。

    图像 12

  2. 创建新的 REST 请求,合并从 API 网关和 Oracle Integration 端点复制在任务 6 中的基本端点 URL,如下图所示。在请求标头中使用 Bearer 标记。

    图像 23

  3. 单击发送以调用 API 请求,它将运行 Oracle Integration,并应成功输出。

    图像 24

后续步骤

我们使用来自 Microsoft Entra ID 的 OAuth 标记在 Oracle API Gateway 上成功调用 API,并从 Oracle Integration REST 触发器流接收响应。这种集成对于客户在不同云供应商之间连接数字服务至关重要。

确认

更多学习资源

浏览 docs.oracle.com/learn 上的其他实验室,或者访问 Oracle Learning YouTube 渠道上的更多免费学习内容。此外,请访问 education.oracle.com/learning-explorer 成为 Oracle Learning Explorer。

有关产品文档,请访问 Oracle 帮助中心