使用单例管道缓存消息
Singleton Pipe 是 DBMS_PIPE 软件包的补充,它允许您缓存和检索定制消息,并在多个数据库会话之间共享具有并发读取的消息。
关于使用单例管道缓存消息
DBMS_PIPE 程序包在自治 AI 数据库上扩展了支持单例管道的功能。
DBMS_PIPE 中的单例管道:
-
使用 Singleton Pipe 消息提供定制数据的内存中缓存。
-
支持缓存和检索最多 32,767 字节的自定义消息的功能。
-
支持在多个具有并发读取的数据库会话之间共享缓存的消息。这提供了高吞吐量并支持跨数据库会话并发读取消息。
-
支持只读和读写数据库。
-
支持多种高速缓存失效方法:
-
由用户控制的显式高速缓存失效。
-
用户指定时间间隔后高速缓存失效(以秒为单位)。此失效方法由消息发送者使用
shelflife参数而不是消息读取者控制。这样可以避免由于读取器错误使用高速缓存而导致的常见陷阱。
-
标准管道和单件管道
DBMS_PIPE 程序包允许两个或更多数据库会话使用内存中消息进行通信。管道功能具有多个应用程序,例如外部服务接口、调试、独立事务处理和警报。有关详细信息,请参阅《Oracle Database 19c PL/SQL Packages and Types Reference 》或《Oracle Database 26ai PL/SQL Packages and Types Reference 》中的 DBMS_PIPE 。

插图 database-pipe-messages-singleton-pipes.png 的说明
Singleton Pipe 可以是支持的 DBMS_PIPE 类型之一:
-
隐式管道:使用
DBMS_PIPE.SEND_MESSAGE函数发送带有未知管道名称的消息时自动创建。 -
显式管道:使用用户指定的管道名称的
DBMS_PIPE.CREATE_PIPE函数创建。 -
公共管道:对
DBMS_PIPE软件包具有EXECUTE权限的任何用户均可访问。 -
专用管道:与管道创建者具有相同用户的会话可以访问。
通过单例管道,可以在自治 AI 数据库实例的内存中缓存单个消息。
下面显示了使用单例管道的一般工作流。

插图 singleton-pipe-workflow.png 的说明
单件管道概述和功能
-
单例消息
-
单例管道可以在管道中缓存一条消息,因此名称为“单例”。
-
单例管道中的消息可以包含多个字段,最多消息大小为 32,767 字节。
-
DBMS_PIPE支持使用DBMS_PIPE.PACK_MESSAGE过程在消息中打包多个属性。 -
对于公共单例管道,对
DBMS_PIPE软件包具有执行权限的任何数据库会话都可以接收消息。 -
对于 Private Singleton Pipe,消息可以由与 Singleton Pipe 创建者具有相同用户的会话接收。
-
-
读取的高消息吞吐量
-
单例管道会在管道中缓存消息,直到消息失效或被清除。数据库会话可以同时从 Singleton Pipe 中读取消息。
-
从单例管道接收消息是非阻塞操作。
-
-
消息高速缓存
-
使用
DBMS_PIPE.SEND_MESSAGE将消息缓存在单例管道中。 -
如果 Singleton Pipe 中存在现有的缓存消息,则
DBMS_PIPE.SEND_MESSAGE将覆盖之前的消息,以在 Singleton Pipe 中仅维护一条消息。
-
-
消息失效
-
显式失效:通过过程
DBMS_PIPE.PURGE或通过使用DBMS_PIPE.SEND_MESSAGE覆盖消息来清除管道。 -
自动失效:在指定的
shelflife时间过后,消息可以自动失效。
-
-
没有从数据库内存中逐出
-
单例管道不会从 Oracle Database 内存中逐出。
-
显式单例管道会一直驻留在数据库内存中,直到使用
DBMS_PIPE.REMOVE_PIPE删除该管道,或者直到数据库重新启动。 -
隐式单例管道将保留在数据库内存中,直到管道中存在一条高速缓存的消息。
-
Singleton 管道操作
| 操作 | DBMS_PIPE 函数或程序 |
|---|---|
| 创建显式单例管道 | CREATE_PIPE 函数 |
| 在单例管道中缓存消息 | Oracle Database 19c PL/SQL Packages and Types Reference 或 Oracle Database 26ai PL/SQL Packages and Types Reference 中的 PACK_MESSAGE Procedures |
| 从 Singleton Pipe 读取缓存的消息 | RECEIVE_MESSAGE Function(RECEIVE_MESSAGE 函数)、 UNPACK_MESSAGE Procedures in Oracle Database 19c PL/SQL Packages and Types Reference 或 Oracle Database 26ai PL/SQL Packages and Types Reference(程序包和类型参考) |
| 在单例管道中删除消息 | Oracle Database 19c PL/SQL Packages and Types Reference 或 Oracle Database 26ai PL/SQL Packages and Types Reference 中的 PURGE Procedure |
| 删除显式单例管道 | Oracle Database 19c PL/SQL Packages and Types Reference 或 Oracle Database 26ai PL/SQL Packages and Types Reference 中的 REMOVE_PIPE Function |
自动刷新缓存函数的缓存消息
DBMS_PIPE 程序包允许您使用用户定义的高速缓存函数自动填充 Singleton Pipe 消息。
缺省情况下,当一条消息因单例管道显式或隐式失效而失效后,后续的 DBMS_PIPE.RECEIVE_MESSAGE 将不会收到任何消息。要向管道添加新消息,必须通过调用 DBMS_PIPE.SEND_MESSAGE 显式高速缓存该消息。为了避免这种情况,当您从单例管道读取时没有消息可用时,可以定义高速缓存函数。定义了高速缓存函数后,当您在以下情况下收到消息时,将自动调用高速缓存函数:
-
当 Singleton Pipe 为空时。
-
单例管道中的消息因
shelflife时间过长而无效时。
要使用高速缓存函数,请定义高速缓存函数并将 cache_func 参数与 DBMS_PIPE.RECEIVE_MESSAGE 一起包含。用户定义的高速缓存函数提供以下各项:
-
使用
DBMS_PIPE.RECEIVE_MESSAGE从单例管道读取消息时,可以指定高速缓存函数。 -
当 Singleton Pipe 中没有消息时,
DBMS_PIPE.RECEIVE_MESSAGE调用高速缓存函数。 -
当消息
shelflife时间过后,数据库会自动在 Singleton Pipe 中填充新消息。
使用高速缓存函数可以简化单例管道的工作。您无需处理从空管道接收消息的失败案例。此外,高速缓存函数可确保从单例管道读取消息时没有高速缓存未命中,从而最大限度地利用高速缓存的消息。

插图 auto-cache-refresh-cache-function.png 的说明
定义高速缓存函数时,必须使用所有者方案对函数名称进行完全限定:
-
OWNER.FUNCTION_NAME -
OWNER.PACKAGE.FUNCTION_NAME
使用以下签名定义高速缓存函数:
CREATE OR REPLACE FUNCTION *cache_function_name*(
pipename IN VARCHAR2
) RETURN INTEGER;
高速缓存函数中的典型操作包括:
-
使用
DBMS_PIPE.CREATE_PIPE为显式管道创建单例管道。 -
创建消息以在单例管道中缓存。
-
将消息发送到高速缓存函数中指定的管道,可以选择为隐式消息指定
shelflife。
要使用高速缓存函数,调用 DBMS_PIPE.RECEIVE_MESSAGE 的当前会话用户必须具有执行高速缓存函数所需的特权。
有关定义高速缓存函数的更多信息,请参见 RECIEVE_MESSAGE Function 。
创建显式单例管道
介绍创建具有指定管道名称(显式单例管道)的单例管道的步骤。
首先,对于本示例,创建 receive_message 帮助程序函数以重复调用 DBMS_PIPE.RECEIVE_MESSAGE。这允许您测试单吨管功能。
CREATE OR REPLACE FUNCTION msg_types AS
TYPE t_rcv_row IS RECORD (c1 VARCHAR2(32767), c2 NUMBER);
TYPE t_rcv_tab IS TABLE OF t_rcv_row;
END;
CREATE OR REPLACE FUNCTION receive_message(
pipename IN VARCHAR2,
rcv_count IN NUMBER DEFAULT 1,
cache_func IN VARCHAR2 DEFAULT NULL)
RETURN msg_types.t_rcv_tab pipelined
AS
l_msg VARCHAR2(32767);
l_status NUMBER;
BEGIN
FOR i IN 1..rcv_count LOOP
l_status := DBMS_PIPE.RECEIVE_MESSAGE(
pipename => pipename,
cache_func => cache_func,
timeout => 1);
IF l_status != 0 THEN
raise_application_error(-20000,
'Message not received for attempt: ' || to_char(i) || ' status: ' ||
l_status);
END IF;
DBMS_PIPE.UNPACK_MESSAGE(l_msg);
pipe row(msg_types.t_rcv_row(l_msg));
END LOOP;
RETURN;
END;
-
创建名为
PIPE_TEST的显式单例管道,并将shelflife参数设置为 3600(秒)。DECLARE l_status INTEGER; BEGIN l_status := DBMS_PIPE.CREATE_PIPE( pipename => 'MY_PIPE1', private => TRUE, singleton => TRUE, shelflife => 3600); END; /有关更多信息,请参见 CREATE_PIPE Function 。
-
验证是否已创建单例管道。
SELECT name, singleton, type FROM v$db_pipes WHERE name= '&pipename' ORDER BY 1;NAME SINGLETON TYPE -------------------- ---------- ------- PIPE_TEST YES PRIVATE -
在单例管道上打包并发送消息。
EXEC DBMS_PIPE.PACK_MESSAGE('This is a real message that you can get multiple times'); SELECT DBMS_PIPE.SEND_MESSAGE(pipename => '&pipename') status FROM DUAL;STATUS ---------- 0有关详细信息,请参阅 Oracle Database 19c PL/SQL Packages and Types Reference 或 Oracle Database 26ai PL/SQL Packages and Types Reference 和 SEND_MESSAGE Function 中的 PACK_MESSAGE Procedures 。
-
从单例管道接收消息。
SELECT * FROM receive_message( pipename => '&pipename', rcv_count => 2);MESSAGE -------------------------------------------------------------------------------- This is a real message that you can get multiple times This is a real message that you can get multiple timesreceive_message函数是调用DBMS_PIPE.RECEIVE_MESSAGE的帮助程序函数。 -
清除消息并删除管道。
EXEC DBMS_PIPE.PURGE('&pipename'); SELECT DBMS_PIPE.REMOVE_PIPE('&pipename') status FROM DUAL;
使用高速缓存函数创建显式单例管道
介绍创建具有指定管道名称的单例管道和显式单例管道并提供高速缓存功能的步骤。高速缓存功能允许您自动在单例管道中填充消息。
-
为单例管道创建高速缓存函数
test_cache_message。CREATE OR REPLACE FUNCTION test_cache_message( pipename IN VARCHAR2) return NUMBER AS l_status NUMBER; l_data VARCHAR2(4000); BEGIN l_status := DBMS_PIPE.CREATE_PIPE( pipename => pipename, private => TRUE, singleton => true, shelflife => 600); IF l_status != 0 THEN RETURN l_status; END IF; DBMS_PIPE.PACK_MESSAGE('This is a placeholder cache message for an empty pipe'); l_status := DBMS_PIPE.SEND_MESSAGE(pipename => pipename); RETURN l_status; END; /注:调用
DBMS_PIPE.RECEIVE_MESSAGE的当前会话用户必须具有执行高速缓存函数所需的权限。 -
使用缓存函数接收并确认消息填充到管道中。管道必须作为高速缓存函数中创建的专用管道存在。
SELECT * FROM receive_message( pipename => '&pipename', rcv_count => 1, cache_func => 'TEST_CACHE_MESSAGE');MESSAGE --------------- This is a placeholder cache message for an empty pipereceive_message函数是调用DBMS_PIPE.RECEIVE_MESSAGE的帮助程序函数。有关receive_message定义,请参见 Create an Explicit Singleton Pipe 。有关更多信息,请参见 CREATE_PIPE Function 。
-
在没有高速缓存函数的情况下接收,以确认消息仍然存在于管道中。
SELECT * FROM receive_message( pipename => '&pipename', rcv_count => 2);MESSAGE --------------- This is a placeholder cache message for an empty pipe This is a placeholder cache message for an empty pipereceive_message函数是调用DBMS_PIPE.RECEIVE_MESSAGE的帮助程序函数。有关receive_message定义,请参见 Create an Explicit Singleton Pipe 。有关更多信息,请参见 CREATE_PIPE Function 。