对存储在云对象存储中的消息使用持久消息传送

DBMS_PIPE 程序包在 Autonomous Database 上扩展了功能,以支持持久性消息传送,其中消息存储在 Cloud Object Store 中。

关于使用 DBMS_PIPE 进行持久性消息传送

通过使用 DBMS_PIPE 的持久性消息传递,一个或多个数据库会话可以在同一区域或具有存储在云对象存储中的消息的区域之间进行通信。

DBMS_PIPE 中的持久消息:

  • 允许您发送和检索非常大的消息。

  • 支持发送大量管道消息。

  • 支持在单个数据库中、跨多个数据库以及跨不同区域的数据库发送和接收消息。

  • 支持使用同一云对象存储位置 URI 的多个管道。

可以使用任何支持的 DBMS_PIPE 类型创建持久性消息传送管道:

  • 隐式管道:使用 DBMS_PIPE.SEND_MESSAGE 函数使用未知的管道名称发送消息时自动创建。
  • 显式管道:使用具有用户指定的管道名称的 DBMS_PIPE.CREATE_PIPE 函数创建。
  • 公共管道:任何对 DBMS_PIPE 软件包具有 EXECUTE 权限的用户均可访问。
  • 专用管道:由与管道创建者具有相同用户的会话访问。

注意:

Oracle 建议您在发送或接收具有持久性消息传送的消息之前创建显式管道。使用 DBMS_PIPE.CREATE_PIPE 创建显式管道可确保使用所需的访问权限(公共或专用)创建管道(通过设置 private 参数)。

下面显示了使用持久性消息传送的 DBMS_PIPE 的常规工作流:

后面是 database-pipe-persistent-messaging.eps 的说明
插图 database-pipe-persistent-messaging.eps 的说明

使用 DBMS_PIPE 的现有应用程序可以继续以最少的更改进行操作。可以使用登录触发器或使用其他一些初始化例程,将 DBMS_PIPE 与身份证明对象和位置 URI 一起使用的现有应用程序配置。设置 DBMS_PIPE 身份证明和位置 URI 后,无需进行其他更改即可使用持久性消息传送。后续使用管道时,会将消息存储在云对象存储中,而不是存储在数据库内存中。这样,您可以将内存中消息的存储方法更改为持久性云对象存储,并且更改极少。

持久消息传送概述和功能

使用持久性消息传送的 DBMS_PIPE 的功能:

  • 可以在同一区域中的多个 Autonomous Database 实例之间或跨区域发送和检索消息。

  • 持久性消息保证只能由一个进程写入或读取。这样可以防止由于并发读取和写入而导致消息内容不一致。DBMS_PIPE 仅允许一个操作,发送消息或接收消息在给定时间处于活动状态,这些操作受锁定机制保护。但是,如果由于正在进行的操作而无法执行某个操作,该进程将定期重试,直到达到 timeout 值。

  • DBMS_PIPE 使用 DBMS_CLOUD 访问云对象存储。消息可以存储在任何受支持的云对象存储中。有关更多信息,请参见 Cloud Object Storage URI Formats

  • DBMS_PIPE 使用 DBMS_CLOUD 访问云对象存储,并且所有支持的身份证明类型都可用:

DBMS_PIPE 权限授权和安全

DBMS_PIPE 过程使用调用者的权限运行。专用管道由当前用户拥有,由用户创建的专用管道只能由同一用户使用。这适用于将消息存储到云对象存储的内存中管道和持久性消息传送管道。在调用者的方案中运行发送和接收消息。

使用将消息存储到云对象存储的专用管道,需要凭证对象才能使用 location_uri 参数标识的云对象存储进行验证。调用用户必须对使用用于访问对象存储的 credential_name 参数指定的身份证明对象具有 EXECUTE 权限。

要使用公共管道,用户数据库会话必须具有对 DBMS_PIPE 的执行权限。对于使用持久性消息传递并将消息存储到云对象存储的公共管道,用户数据库会话必须具有对 DBMS_CLOUD 的执行权限并对身份证明对象执行权限(或者您可以创建允许访问包含消息的位置 URI 的身份证明对象)。

DBMS_PIPE 限制

DBMS_PIPE 软件包不支持在使用不同字符集的数据库之间发送消息。例如,如果有一个使用 AL32UTF8 的 Autonomous Database 实例和一个使用 WE8MSWIN1252 的实例,则不能在这两个数据库之间发送带有 DBMS_PIPE 的消息。在这种情况下,如果尝试在这两个数据库之间发送带有 DBMS_PIPE 的消息,系统将引发错误 ORA-12704

有关详细信息,请参阅Autonomous Database 的字符集选择

创建显式持久性管道并发送消息

介绍创建具有指定管道名称(显式管道)的持久性管道的步骤。

  1. 使用过程 DBMS_CLOUD.CREATE_CREDENTIAL 存储对象存储身份证明。例如:
    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'my_persistent_pipe_cred',
        username => 'adb_user@example.com',
        password => 'password'
      );
    END;
    /

    此操作以加密格式将身份证明存储在数据库中。您可以使用身份证明名称的任何名称。请注意,除非对象存储身份证明发生更改,否则此步骤仅需要一次。存储身份证明后,可以使用相同的身份证明名称访问云对象存储以通过 DBMS_PIPE 发送和接收消息。

    有关参数的详细信息,请参见CREATE_CREDENTIAL Procedure 。对于 Oracle Cloud Infrastructure Object Storage ,凭证必须使用原生 Oracle Cloud Infrastructure 验证。

    注意:

    某些工具(如 SQL*Plus 和 SQL Developer)使用“和”号字符 (&) 作为特殊字符。如果密码中有 & 符号,请使用这些工具中的 SET DEFINE OFF 命令(如示例中所示)禁用特殊字符并正确创建凭证。
  2. 创建用于发送和检索消息的显式管道。例如,创建一个名为 ORDER_PIPE 的管道。
    DECLARE
      r_status INTEGER;
    BEGIN
        r_status := DBMS_PIPE.CREATE_PIPE(pipename => 'ORDER_PIPE');
    END;
    /

    有关更多信息,请参见 CREATE_PIPE Function

  3. 验证是否已创建管道。
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 
  4. 使用 DBMS_PIPE 过程设置默认访问身份证明和位置 URI 以将持久性消息存储到云对象存储。
    BEGIN
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
    END;
    /

    这些过程设置用于 DBMS_PIPE 过程的缺省凭证名称和缺省位置 URI。

    如果您使用 Oracle Cloud Infrastructure Object Storage 存储消息,则可以使用 Oracle Cloud Infrastructure 本机 URI 或 Swift URI。但是,位置 URI 和身份证明的类型必须匹配,如下所示:

    • 如果使用本机 URI 格式访问 Oracle Cloud Infrastructure Object Storage ,则必须在身份证明对象中使用本机 Oracle Cloud Infrastructure 签名密钥验证。

    • 如果使用 Swift URI 格式访问 Oracle Cloud Infrastructure Object Storage ,则必须在身份证明对象中使用验证令牌验证。

    有关更多信息,请参见 SET_CREDENTIAL_NAME ProcedureGET_LOCATION_URI Function

  5. 在管道上打包并发送消息。
    DECLARE
      l_result INTEGER;
      l_date   DATE;
    BEGIN
        l_date := sysdate;
        DBMS_PIPE.PACK_MESSAGE(l_date);         -- date of order
        DBMS_PIPE.PACK_MESSAGE('C123');         -- order number
        DBMS_PIPE.PACK_MESSAGE(5);              -- number of items in order
        DBMS_PIPE.PACK_MESSAGE('Printers');     -- type of item in order
    
     
        l_result := DBMS_PIPE.SEND_MESSAGE(
                        pipename => 'ORDER_PIPE',
                        credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                        location_uri => DBMS_PIPE.GET_LOCATION_URI);
         
        IF l_result = 0 THEN
            DBMS_OUTPUT.put_line('DBMS_PIPE sent order successfully');
        END IF;
     
    END;
    /

在同一数据库上检索持久性消息

介绍从同一 Autonomous Database 实例(消息发送到的实例)上的显式管道中检索持久性消息的步骤。

Autonomous Database 实例上,您可以接收从其他会话发送到管道的消息。DBMS_PIPE 过程是调用者的权限过程,并以当前调用的用户身份运行。

专用管道由创建管道的当前用户拥有。专用管道只能由创建该管道的同一用户访问。这适用于使用内存中消息的管道,以及使用持久消息传送和存储在云对象存储中的消息的管道。

DBMS_PIPE 具有执行权限的任何数据库会话都可以访问公共管道。这适用于使用内存中消息的管道,以及使用持久消息传送和存储在云对象存储中的消息的管道。

  1. 验证是否已创建管道。
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 

    当您在同一 Autonomous Database 实例上并且管道存在时,您无需在收到消息之前运行 DBMS_PIPE.CREATE_PIPE。这在同一实例上创建管道时适用,如 Create an Explicit Persistent Pipe and Send a Message 中所示。

  2. 从管道接收消息。
    DECLARE
        message1  DATE;
        message2  VARCHAR2(100);
        message3  INTEGER;
        message4  VARCHAR2(100);
        l_result  INTEGER;
    
    BEGIN
    
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
        l_result := DBMS_PIPE.RECEIVE_MESSAGE (
                      pipename => 'ORDER_PIPE',
                      timeout  => DBMS_PIPE.MAXWAIT,
                      credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                      location_uri => DBMS_PIPE.GET_LOCATION_URI);
     
        IF l_result = 0 THEN
            DBMS_PIPE.unpack_message(message1);
            DBMS_PIPE.unpack_message(message2);
            DBMS_PIPE.unpack_message(message3);
            DBMS_PIPE.unpack_message(message4);
     
            DBMS_OUTPUT.put_line('Order Received Successfully On: ' || TO_CHAR(sysdate, 'dd-mm-yyyy hh24:mi:ss'));
            DBMS_OUTPUT.put_line('Date of Order: ' || message1);
            DBMS_OUTPUT.put_line('Order Number: ' || message2);
            DBMS_OUTPUT.put_line('Number of Items In Order: ' || message3);
            DBMS_OUTPUT.put_line('Item Type in Order: ' || message4);
        END IF;
     
    END;
    /

    在同一 Autonomous Database 实例上时,身份证明已存在,您无需运行 DBMS_CLOUD.CREATE_CREDENTIAL 即可接收消息。这在同一实例上创建管道时适用,如 Create an Explicit Persistent Pipe and Send a Message 中所示。

有关更多信息,请参见 SET_CREDENTIAL_NAME ProcedureGET_LOCATION_URI Function

有关更多信息,请参见 RECEIVE_MESSAGE Function

通过在其他数据库上创建管道来检索持久性消息

介绍在 Autonomous Database 实例上使用显式管道检索存储在 Cloud Object Store 中的持久性消息的步骤,该实例与发送消息的实例不同。

  1. 使用过程 DBMS_CLOUD.CREATE_CREDENTIAL 存储对象存储身份证明。例如:
    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'my_persistent_pipe_cred',
        username => 'adb_user@example.com',
        password => 'password'
      );
    END;
    /

    此操作以加密格式将身份证明存储在数据库中。您可以使用身份证明名称的任何名称。请注意,除非对象存储身份证明发生更改,否则此步骤仅需要一次。存储身份证明后,可以使用相同的身份证明名称访问云对象存储以使用 DBMS_PIPE 发送和接收消息。

    有关参数的详细信息,请参见CREATE_CREDENTIAL Procedure

    注意:

    某些工具(如 SQL*Plus 和 SQL Developer)使用“和”号字符 (&) 作为特殊字符。如果密码中有 & 符号,请使用这些工具中的 SET DEFINE OFF 命令(如示例中所示)禁用特殊字符并正确创建凭证。
  2. 创建与发送消息的管道同名的显式管道。例如,创建一个名为 ORDER_PIPE 的管道。
    DECLARE
      r_status INTEGER;
    BEGIN
        r_status := DBMS_PIPE.CREATE_PIPE(pipename => 'ORDER_PIPE');
    END;
    /

    请参见 CREATE_PIPE Function

  3. 验证是否已创建管道。
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 
  4. 使用 DBMS_PIPE 过程为对象存储设置默认访问身份证明和位置 URI,以便 DBMS_PIPE 可以访问持久性消息。
    BEGIN
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
    END;
    /

    这些过程设置用于 DBMS_PIPE 过程的缺省凭证名称和缺省位置 URI。

    如果您使用 Oracle Cloud Infrastructure Object Storage 存储消息,则可以使用 Oracle Cloud Infrastructure 本机 URI 或 Swift URI。但是,位置 URI 和身份证明的类型必须匹配,如下所示:

    • 如果使用本机 URI 格式访问 Oracle Cloud Infrastructure Object Storage ,则必须在身份证明对象中使用本机 Oracle Cloud Infrastructure 签名密钥验证。

    • 如果使用 Swift URI 格式访问 Oracle Cloud Infrastructure Object Storage ,则必须在身份证明对象中使用验证令牌验证。

    有关更多信息,请参见 SET_CREDENTIAL_NAME ProcedureGET_LOCATION_URI Function

  5. 从持久性管道接收消息。
    DECLARE
        message1  DATE;
        message2  VARCHAR2(100);
        message3  INTEGER;
        message4  VARCHAR2(100);
        l_result  INTEGER;
    
    BEGIN
    
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
        l_result := DBMS_PIPE.RECEIVE_MESSAGE (
                      pipename => 'ORDER_PIPE',
                      timeout  => DBMS_PIPE.MAXWAIT,
                      credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                      location_uri => DBMS_PIPE.GET_LOCATION_URI);
     
        IF l_result = 0 THEN
            DBMS_PIPE.unpack_message(message1);
            DBMS_PIPE.unpack_message(message2);
            DBMS_PIPE.unpack_message(message3);
            DBMS_PIPE.unpack_message(message4);
     
            DBMS_OUTPUT.put_line('Order Received Successfully On: ' || TO_CHAR(sysdate, 'dd-mm-yyyy hh24:mi:ss'));
            DBMS_OUTPUT.put_line('Date of Order: ' || message1);
            DBMS_OUTPUT.put_line('Order Number: ' || message2);
            DBMS_OUTPUT.put_line('Number of Items In Order: ' || message3);
            DBMS_OUTPUT.put_line('Item Type in Order: ' || message4);
        END IF;
     
    END;
    /

    有关更多信息,请参见 RECEIVE_MESSAGE Function

删除持久性管道

介绍删除持久性管道的步骤。

持久管道通过将消息存储在云对象存储中来发送和接收消息。使用 DBMS_PIPE.REMOVE_PIPE 删除 Autonomous Database 实例上的持久性管道。

  1. 调用 DBMS_PIPE.REMOVE_PIPE 函数以删除管道。
    DECLARE
       l_result  INTEGER;
    BEGIN
         l_result := DBMS_PIPE.REMOVE_PIPE('ORDER_PIPE');
    END;
    /

    REMOVE_PIPE 函数从运行该管道的 Autonomous Database 实例中删除管道,但是 REMOVE_PIPE 不会影响使用相同位置 URI 的管道名称相同的其他 Autonomous Database 实例。

  2. 在运行 DBMS_PIPE.REMOVE_PIPEAutonomous Database 实例上,验证管道是否已删除。
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    No rows selected