使用单例管道高速缓存消息

Singleton Pipe 是 DBMS_PIPE 软件包的补充,可用于缓存和检索定制消息,并在多个数据库会话之间使用并发读取共享消息。

关于使用单例管道缓存消息

DBMS_PIPE 程序包在 Autonomous Database 上扩展了支持单例管道的功能。

DBMS_PIPE 中的单例管道:

  • 使用单例管道消息提供定制数据的内存中高速缓存。

  • 支持缓存和检索最多 32,767 字节的定制消息。

  • 支持使用并发读取在多个数据库会话之间共享缓存的消息。这可以提供高吞吐量,并支持在数据库会话之间并发读取消息。

  • 支持只读和读写数据库。

  • 支持多种高速缓存失效方法:

    • 由用户控制的显式高速缓存失效。
    • 在用户指定的时间间隔后(以秒为单位)高速缓存失效。此失效方法由消息发送器(使用 shelflife 参数)而不是消息读取器控制。这样可以避免由于读取器错误使用高速缓存而导致的常见陷阱。

关于标准管道和单例管道

DBMS_PIPE 软件包允许两个或多个数据库会话使用内存中消息进行通信。管道功能具有多个应用程序,例如外部服务接口、调试、独立事务处理和警报。有关更多信息,请参见 Oracle Database 19c PL/SQL Packages and Types ReferenceOracle Database 23ai PL/SQL Packages and Types Reference 中的 DBMS_PIPE

下面是 database-pipe-messages-singleton-pipes.eps 的说明
插图 database-pipe-messages-singleton-pipes.eps 的说明

单例管道可以是任何受支持的 DBMS_PIPE 类型之一:

  • 隐式管道:使用 DBMS_PIPE.SEND_MESSAGE 函数使用未知的管道名称发送消息时自动创建。
  • 显式管道:使用具有用户指定的管道名称的 DBMS_PIPE.CREATE_PIPE 函数创建。
  • 公共管道:任何对 DBMS_PIPE 软件包具有 EXECUTE 权限的用户均可访问。
  • 专用管道:由与管道创建者具有相同用户的会话访问。

通过单例管道,可以在 Autonomous Database 实例的内存中缓存一条消息。

下面显示了使用单件管道的一般工作流。

下面是单例管道 workflow.eps 的说明
插图 Singleton-pipe-workflow.eps 的说明

单例管道概述和功能

  • 单例消息

    • 单例管道可以在管道中缓存一条消息,因此名称为“单例”。
    • 单例管道中的消息可以由多个字段组成,最大消息总大小为 32,767 字节。
    • DBMS_PIPE 支持使用 DBMS_PIPE.PACK_MESSAGE 过程将多个属性打包到消息中。
    • 对于公共单例管道,任何对 DBMS_PIPE 程序包具有执行权限的数据库会话都可以接收该消息。
    • 对于 Private Singleton Pipe,与 Singleton Pipe 的创建者具有相同用户的会话可以接收消息。
  • 读取的高消息吞吐量
    • 单例管道在管道中缓存消息,直到该消息失效或被清除。数据库会话可以同时读取来自单例管道的消息。
    • 从单例管道接收消息是一种非阻塞操作。
  • 留言高速缓存
    • 使用 DBMS_PIPE.SEND_MESSAGE 在单例管道中缓存消息。
    • 如果单例管道中存在已高速缓存的消息,则 DBMS_PIPE.SEND_MESSAGE 会覆盖上一条消息以在单例管道中仅维护一条消息。
  • 消息失效
    • 显式失效:使用过程 DBMS_PIPE.PURGE 或使用 DBMS_PIPE.SEND_MESSAGE 覆盖消息来清除管道。
    • 自动失效:可以在指定的 shelflife 时间过后自动失效消息。
  • 不逐出数据库内存
    • 单例管道不会从 Oracle Database 内存中移除。
    • 显式单例管道将继续驻留在数据库内存中,直到使用 DBMS_PIPE.REMOVE_PIPE 删除该管道,或者直到数据库重新启动。
    • 隐式单例管道将保留在数据库内存中,直到管道中存在一条缓存的消息。

单例管道操作

使用高速缓存函数自动刷新高速缓存的消息

DBMS_PIPE 软件包允许您使用用户定义的高速缓存函数自动填充单例管道消息。

默认情况下,在单例管道显式或隐式失效时使消息失效后,后续的 DBMS_PIPE.RECEIVE_MESSAGE 将导致不接收任何消息。要向管道中添加新消息,必须通过调用 DBMS_PIPE.SEND_MESSAGE 显式缓存该消息。为了避免这种情况,当您从单例管道读取时没有消息可用时,您可以定义高速缓存函数。定义了高速缓存函数后,在以下情况下收到消息时,会自动调用高速缓存函数:

  • 单例管道为空时。
  • 单例管道中的消息由于经过了 shelflife 时间而无效。

要使用高速缓存函数,请定义高速缓存函数并将 cache_func 参数与 DBMS_PIPE.RECEIVE_MESSAGE 一起使用。用户定义的高速缓存函数提供以下功能:

  • 使用 DBMS_PIPE.RECEIVE_MESSAGE 从单例管道读取消息时,可以指定高速缓存函数。
  • 当单例管道中没有消息时,DBMS_PIPE.RECEIVE_MESSAGE 会调用高速缓存函数。
  • 当消息 shelflife 时间已过时,数据库会自动在单例管道中填充新消息。

使用高速缓存函数可简化使用单例管道的工作。您无需处理从空管道接收消息的失败案例。此外,高速缓存函数可确保从单例管道读取消息时不会缺少高速缓存,从而最大程度地利用高速缓存的消息。

下面是自动高速缓存 - 刷新 - 高速缓存 -function.eps 的说明
插图 auto-cache-refresh-cache-function.eps 的说明

定义高速缓存函数时,函数名称必须使用所有者方案完全限定:

  • OWNER.FUNCTION_NAME
  • OWNER.PACKAGE.FUNCTION_NAME

使用以下签名定义高速缓存函数:

CREATE OR REPLACE FUNCTION cache_function_name(
       pipename  IN VARCHAR2
) RETURN INTEGER;

高速缓存函数中的典型操作包括:

  • 使用 DBMS_PIPE.CREATE_PIPE 为显式管道创建单例管道。
  • 在单例管道中创建要高速缓存的消息。
  • 将消息发送到高速缓存函数中指定的管道,可以选择为隐式消息指定 shelflife

要使用高速缓存函数,调用 DBMS_PIPE.RECEIVE_MESSAGE 的当前会话用户必须具有执行高速缓存函数所需的权限。

有关定义高速缓存函数的更多信息,请参见 RECIEVE_MESSAGE Function

创建显式单例管道

介绍创建具有指定管道名称的单例管道(显式单例管道)的步骤。

首先,在本例中,创建 receive_message 助手函数以反复调用 DBMS_PIPE.RECEIVE_MESSAGE。这允许您测试单件管道功能。

CREATE OR REPLACE FUNCTION msg_types AS
       TYPE t_rcv_row IS RECORD (c1 VARCHAR2(32767), c2 NUMBER);
       TYPE t_rcv_tab IS TABLE OF t_rcv_row;
END;


CREATE OR REPLACE FUNCTION receive_message(
      pipename    IN VARCHAR2,
      rcv_count   IN NUMBER DEFAULT 1,
      cache_func  IN VARCHAR2 DEFAULT NULL)
   RETURN msg_types.t_rcv_tab pipelined
    AS
       l_msg    VARCHAR2(32767);
       l_status NUMBER;
 BEGIN
      FOR i IN 1..rcv_count LOOP
           l_status := DBMS_PIPE.RECEIVE_MESSAGE(
            pipename   => pipename,
            cache_func => cache_func,
            timeout    => 1);
         IF l_status != 0 THEN
              raise_application_error(-20000,
             'Message not received for attempt: ' || to_char(i) || ' status: ' ||
            l_status);
         END IF;

         DBMS_PIPE.UNPACK_MESSAGE(l_msg);
             pipe row(msg_types.t_rcv_row(l_msg));
     END LOOP;
 RETURN;
 END;
  1. 创建名为 PIPE_TEST 的显式单例管道,并将 shelflife 参数设置为 3600(秒)。
    DECLARE
      l_status INTEGER;
    BEGIN
      l_status := DBMS_PIPE.CREATE_PIPE(
                    pipename => 'MY_PIPE1',
                    private => TRUE,
                    singleton => TRUE,
                    shelflife => 3600);
    END;
    /

    有关更多信息,请参见 CREATE_PIPE 函数

  2. 验证是否已创建单例管道。
    SELECT name, singleton, type
         FROM v$db_pipes WHERE name= '&pipename' ORDER BY 1;
    
    NAME                 SINGLETON  TYPE
    -------------------- ---------- -------
    PIPE_TEST            YES        PRIVATE
  3. 在单件管道上打包并发送消息。
    
    EXEC DBMS_PIPE.PACK_MESSAGE('This is a real message that you can get multiple times');
    
    SELECT DBMS_PIPE.SEND_MESSAGE(pipename => '&pipename') status FROM DUAL;
    
    STATUS
    ----------
    0
  4. 从单件管道接收消息。
    SELECT * FROM receive_message(
        pipename => '&pipename',
        rcv_count => 2);
    
    MESSAGE
    --------------------------------------------------------------------------------
    This is a real message that you can get multiple times
    This is a real message that you can get multiple times

    receive_message 函数是调用 DBMS_PIPE.RECEIVE_MESSAGE 的辅助函数。

  5. 清除消息并删除管道。
    EXEC DBMS_PIPE.PURGE('&pipename');
    SELECT DBMS_PIPE.REMOVE_PIPE('&pipename') status FROM DUAL;

使用高速缓存函数创建显式单例管道

介绍创建具有指定管道名称的单例管道(显式单例管道)并提供高速缓存函数的步骤。高速缓存函数允许您自动在单件管道中填充消息。

  1. 为单例管道创建高速缓存函数 test_cache_message
    CREATE OR REPLACE FUNCTION test_cache_message(
         pipename IN VARCHAR2) return NUMBER 
    AS 
       l_status NUMBER;
       l_data VARCHAR2(4000);
    BEGIN
       l_status := DBMS_PIPE.CREATE_PIPE(
              pipename => pipename,
              private => TRUE,
              singleton => true,
              shelflife => 600);
       IF l_status != 0 THEN RETURN l_status;
       END IF;
     
       DBMS_PIPE.PACK_MESSAGE('This is a placeholder cache message for an empty pipe');
       l_status := DBMS_PIPE.SEND_MESSAGE(pipename => pipename);
       RETURN l_status;
     END;
    /

    注意:

    调用 DBMS_PIPE.RECEIVE_MESSAGE 的当前会话用户必须具有执行高速缓存函数所需的权限。
  2. 使用高速缓存函数接收并确认消息已填充到管道中。管道必须作为在高速缓存函数中创建的专用管道存在。
    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 1,
         cache_func => 'TEST_CACHE_MESSAGE');
    
    MESSAGE
    ---------------
    This is a placeholder cache message for an empty pipe
    

    receive_message 函数是调用 DBMS_PIPE.RECEIVE_MESSAGE 的辅助函数。有关 receive_message 定义,请参见 Create an Explicit Singleton Pipe

    有关更多信息,请参见 CREATE_PIPE 函数

  3. 接收时不使用缓存功能来确认消息仍然存在于管道中。
    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 2);
    
    MESSAGE
    ---------------
    This is a placeholder cache message for an empty pipe
    This is a placeholder cache message for an empty pipe

    receive_message 函数是调用 DBMS_PIPE.RECEIVE_MESSAGE 的辅助函数。有关 receive_message 定义,请参见 Create an Explicit Singleton Pipe

    有关更多信息,请参见 CREATE_PIPE 函数