Use Persistent Messaging with Messages Stored in Cloud Object Store

The DBMS_PIPE package has extended functionality on Autonomous Database to support persistent messaging, where messages are stored in Cloud Object Store.

About Persistent Messaging with DBMS_PIPE

Persistent messaging with DBMS_PIPE allows one or more database sessions to communicate in the same region or across regions with messages that are stored in Cloud Object Store.

Persistent messages in DBMS_PIPE:

  • Allow you to send and retrieve very large messages.

  • Support a sending a large number of pipe messages.

  • Support sending and receiving messages within a single database, across multiple databases and across databases in different regions.

  • Support multiple pipes using the same Cloud Object Store location URI.

Persistent messaging pipes can be created in any of the supported DBMS_PIPE types:

  • Implicit Pipe: Automatically created when a message is sent with an unknown pipe name using the DBMS_PIPE.SEND_MESSAGE function.
  • Explicit Pipe: Created using the DBMS_PIPE.CREATE_PIPE function with a user specified pipe name.
  • Public Pipe: Accessible by any user with EXECUTE permission on DBMS_PIPE package.
  • Private Pipe: Accessible by sessions with the same user as the pipe creator.

Note:

Oracle recommends creating an explicit pipe before you send or receive messages with persistent messaging. Creating an explicit pipe with DBMS_PIPE.CREATE_PIPE ensures that the pipe is created with the access permissions you want, either public or private (by setting the private parameter).

The following shows the general workflow for DBMS_PIPE with persistent messaging:

Description of database-pipe-persistent-messaging.eps follows
Description of the illustration database-pipe-persistent-messaging.eps

Existing applications using DBMS_PIPE can continue to operate with minimal changes. You can configure existing applications that use DBMS_PIPE with a credential object and location URI using a logon trigger or using some other initialization routine. After setting the DBMS_PIPE credential and location URI, no other changes are needed to use persistent messaging. All subsequent use of the pipe stores the messages in Cloud Object Store instead of in database memory. This allows you to change the storage method for messages from in-memory to persistent Cloud Object Storage, with minimal changes.

Persistent Messaging Overview and Features

Features of DBMS_PIPE using persistent messaging:

  • Messages can be sent and retrieved across multiple Autonomous Database instances in the same region or across regions.

  • Persistent messages are guaranteed to either be written or read by exactly one process. This prevents message content inconsistency due to concurrent writes and reads. Using a persistent messaging pipe, DBMS_PIPE allows only one operation, sending a message or a receiving message to be active at a given time. However, if an operation is not possible due to an ongoing operation, the process retries periodically until the timeout value is reached.

  • DBMS_PIPE uses DBMS_CLOUD to access Cloud Object Store. Messages can be stored in any of the supported Cloud Object Stores. See DBMS_CLOUD Package File URI Formats for more information.

  • DBMS_PIPE uses DBMS_CLOUD to access Cloud Object Store and all supported credential types are available:

DBMS_PIPE Privileges Authorization and Security

The DBMS_PIPE procedures run with invoker's rights. Private pipes are owned by the current user and a private pipe that is created by a user can only be used by the same user. This applies to both in-memory pipes and persistent messaging pipes where messages are stored to Cloud Object Store. Sending and receiving messages run in the invoker's schema.

Using private pipes, where messages are stored to Cloud Object Store, a credential object is required for authentication with the Cloud Object store identified by the location_uri parameter. The invoking user must have EXECUTE privilege on the credential object specified with the credential_name parameter that is used to access the Object Store.

To use a public pipe, the user, database session, must have execute privilege on DBMS_PIPE. For a public pipe using persistent messaging and storing messages to Cloud Object Store, the user, database session, must have execute privilege on DBMS_CLOUD and execute privilege on the credential object (or you can create a credential object that is allowed to access the location URI that contains the message.

Create an Explicit Persistent Pipe and Send a Message

Describes the steps to create a persistent pipe with a specified pipe name (Explicit Pipe).

  1. Store your object store credentials using the procedure DBMS_CLOUD.CREATE_CREDENTIAL. For example:
    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'my_persistent_pipe_cred',
        username => 'adb_user@example.com',
        password => 'password'
      );
    END;
    /

    This operation stores the credentials in the database in an encrypted format. You can use any name for the credential name. Note that this step is required only once unless your object store credentials change. After you store the credentials you can then use the same credential name to access Cloud Object Store to send and receive messages with DBMS_PIPE.

    For detailed information about the parameters, see CREATE_CREDENTIAL Procedure. For Oracle Cloud Infrastructure Object Storage, it is required that the credential uses native Oracle Cloud Infrastructure authentication.

    Creating a credential to access Oracle Cloud Infrastructure Object Store is not required if you enable resource principal credentials. See Use Resource Principal to Access Oracle Cloud Infrastructure Resources for more information.

    Note:

    Some tools like SQL*Plus and SQL Developer use the ampersand character (&) as a special character. If you have the ampersand character in your password use the SET DEFINE OFF command in those tools as shown in the example to disable the special character and get the credential created properly.
  2. Create an explicit pipe to send and retrieve messages. For example, create a pipe named ORDER_PIPE.
    DECLARE
      r_status INTEGER;
    BEGIN
        r_status := DBMS_PIPE.CREATE_PIPE(pipename => 'ORDER_PIPE');
    END;
    /

    See CREATE_PIPE Function for more information.

  3. Verify that the pipe is created.
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 
  4. Use DBMS_PIPE procedures to set the default access credential and location URI to store persistent messages to Cloud Object Store.
    BEGIN
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
    END;
    /

    These procedures set the default credential name and the default location URI for use with DBMS_PIPE procedures.

    If you use Oracle Cloud Infrastructure Object Storage to store messages, you can use Oracle Cloud Infrastructure Native URIs or Swift URIs. However, the location URI and the credential must match in type as follows:

    • If you use a native URI format to access Oracle Cloud Infrastructure Object Storage, you must use Native Oracle Cloud Infrastructure Signing Keys authentication in the credential object.

    • If you use Swift URI format to access Oracle Cloud Infrastructure Object Storage, you must use an auth token authentication in the credential object.

    See SET_CREDENTIAL_NAME Procedure and SET_LOCATION_URI Procedure for more information.

  5. Pack and send a message on the pipe.
    DECLARE
      l_result INTEGER;
      l_date   DATE;
    BEGIN
        l_date := sysdate;
        DBMS_PIPE.PACK_MESSAGE(l_date);         -- date of order
        DBMS_PIPE.PACK_MESSAGE('C123');         -- order number
        DBMS_PIPE.PACK_MESSAGE(5);              -- number of items in order
        DBMS_PIPE.PACK_MESSAGE('Printers');     -- type of item in order
    
     
        l_result := DBMS_PIPE.SEND_MESSAGE(
                        pipename => 'ORDER_PIPE',
                        credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                        location_uri => DBMS_PIPE.GET_LOCATION_URI);
         
        IF l_result = 0 THEN
            DBMS_OUTPUT.put_line('DBMS_PIPE sent order successfully');
        END IF;
     
    END;
    /

    See PACK_MESSAGE Procedures and SEND_MESSAGE Function for more information.

Retrieve a Persistent Message on Same Database

Describes the steps to retrieve a persistent message from an explicit pipe on the same Autonomous Database instance (the instance where the message was sent).

On an Autonomous Database instance you can receive messages sent to a pipe from a different session. The DBMS_PIPE procedures are invoker's rights procedures and run as the current invoked user.

Private pipes are owned by the current user that creates the pipe. Private pipes can only be accessed by the same user that created the pipe. This applies to pipes using in-memory messages and to pipes using persistent messaging with messages stored in Cloud Object Store.

Public pipes can be accessed by any database session having execute privilege on DBMS_PIPE. This applies to pipes using in-memory messages and to pipes using persistent messaging with messages stored in Cloud Object Store.

  1. Verify that the pipe is created.
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 

    When you are on the same Autonomous Database instance and the pipe exists, you do not need to run DBMS_PIPE.CREATE_PIPE before you receive a message. This applies when the pipe was created on the same instance, as shown in Create an Explicit Persistent Pipe and Send a Message.

  2. Receive a message from the pipe.
    DECLARE
        message1  DATE;
        message2  VARCHAR2(100);
        message3  INTEGER;
        message4  VARCHAR2(100);
        l_result  INTEGER;
    
    BEGIN
    
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
        l_result := DBMS_PIPE.RECEIVE_MESSAGE (
                      pipename => 'ORDER_PIPE',
                      timeout  => DBMS_PIPE.MAXWAIT,
                      credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                      location_uri => DBMS_PIPE.GET_LOCATION_URI);
     
        IF l_result = 0 THEN
            DBMS_PIPE.unpack_message(message1);
            DBMS_PIPE.unpack_message(message2);
            DBMS_PIPE.unpack_message(message3);
            DBMS_PIPE.unpack_message(message4);
     
            DBMS_OUTPUT.put_line('Order Received Successfully On: ' || TO_CHAR(sysdate, 'dd-mm-yyyy hh24:mi:ss'));
            DBMS_OUTPUT.put_line('Date of Order: ' || message1);
            DBMS_OUTPUT.put_line('Order Number: ' || message2);
            DBMS_OUTPUT.put_line('Number of Items In Order: ' || message3);
            DBMS_OUTPUT.put_line('Item Type in Order: ' || message4);
        END IF;
     
    END;
    /

    When you are on the same Autonomous Database instance, the credential already exists and you do not need to run DBMS_CLOUD.CREATE_CREDENTIAL to receive a message. This applies when the pipe was created on the same instance, as shown in Create an Explicit Persistent Pipe and Send a Message.

See SET_CREDENTIAL_NAME Procedure and SET_LOCATION_URI Procedure for more information.

See RECEIVE_MESSAGE Function for more information.

Retrieve a Persistent Message by Creating a Pipe on a Different Database

Describes the steps to retrieve a persistent message stored in Cloud Object Store with an explicit pipe on an Autonomous Database instance that is different than the instance that sent the message.

  1. Store your object store credentials using the procedure DBMS_CLOUD.CREATE_CREDENTIAL. For example:
    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'my_persistent_pipe_cred',
        username => 'adb_user@example.com',
        password => 'password'
      );
    END;
    /

    This operation stores the credentials in the database in an encrypted format. You can use any name for the credential name. Note that this step is required only once unless your object store credentials change. Once you store the credentials you can then use the same credential name to access the Cloud Object Store to send and receive messages with DBMS_PIPE.

    For detailed information about the parameters, see CREATE_CREDENTIAL Procedure.

    Creating a credential to access Oracle Cloud Infrastructure Object Store is not required if you enable resource principal credentials. See Use Resource Principal to Access Oracle Cloud Infrastructure Resources for more information.

    Note:

    Some tools like SQL*Plus and SQL Developer use the ampersand character (&) as a special character. If you have the ampersand character in your password use the SET DEFINE OFF command in those tools as shown in the example to disable the special character and get the credential created properly.
  2. Create an explicit pipe with the same name as the pipe that sent the message. For example, create a pipe named ORDER_PIPE.
    DECLARE
      r_status INTEGER;
    BEGIN
        r_status := DBMS_PIPE.CREATE_PIPE(pipename => 'ORDER_PIPE');
    END;
    /
  3. Verify that the pipe is created.
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    OWNERID NAME       TYPE    
    ------- ---------- ------- 
         80 ORDER_PIPE PRIVATE 
  4. Use DBMS_PIPE procedures to set the default access credential and location URI for Object Store so that DBMS_PIPE can access the persistent message.
    BEGIN
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
    END;
    /

    These procedures set the default credential name and the default location URI for use with DBMS_PIPE procedures.

    If you use Oracle Cloud Infrastructure Object Storage to store messages, you can use Oracle Cloud Infrastructure Native URIs or Swift URIs. However, the location URI and the credential must match in type as follows:

    • If you use a native URI format to access Oracle Cloud Infrastructure Object Storage, you must use Native Oracle Cloud Infrastructure Signing Keys authentication in the credential object.

    • If you use Swift URI format to access Oracle Cloud Infrastructure Object Storage, you must use an auth token authentication in the credential object.

    See SET_CREDENTIAL_NAME Procedure and SET_LOCATION_URI Procedure for more information.

  5. Receive a message from the persistent pipe.
    DECLARE
        message1  DATE;
        message2  VARCHAR2(100);
        message3  INTEGER;
        message4  VARCHAR2(100);
        l_result  INTEGER;
    
    BEGIN
    
        DBMS_PIPE.SET_CREDENTIAL_NAME('my_persistent_pipe_cred');
        DBMS_PIPE.SET_LOCATION_URI('https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname1/'); 
        l_result := DBMS_PIPE.RECEIVE_MESSAGE (
                      pipename => 'ORDER_PIPE',
                      timeout  => DBMS_PIPE.MAXWAIT,
                      credential_name => DBMS_PIPE.GET_CREDENTIAL_NAME,
                      location_uri => DBMS_PIPE.GET_LOCATION_URI);
     
        IF l_result = 0 THEN
            DBMS_PIPE.unpack_message(message1);
            DBMS_PIPE.unpack_message(message2);
            DBMS_PIPE.unpack_message(message3);
            DBMS_PIPE.unpack_message(message4);
     
            DBMS_OUTPUT.put_line('Order Received Successfully On: ' || TO_CHAR(sysdate, 'dd-mm-yyyy hh24:mi:ss'));
            DBMS_OUTPUT.put_line('Date of Order: ' || message1);
            DBMS_OUTPUT.put_line('Order Number: ' || message2);
            DBMS_OUTPUT.put_line('Number of Items In Order: ' || message3);
            DBMS_OUTPUT.put_line('Item Type in Order: ' || message4);
        END IF;
     
    END;
    /

    See RECEIVE_MESSAGE Function for more information.

Remove a Persistent Pipe

Describes the steps to remove a persistent pipe.

Persistent pipes send and receive messages by storing messages in Cloud Object Store. Use DBMS_PIPE.REMOVE_PIPE to remove a persistent pipe on an Autonomous Database instance.

  1. Call the DBMS_PIPE.REMOVE_PIPE function to remove a pipe.
    DECLARE
       l_result  INTEGER;
    BEGIN
         l_result := DBMS_PIPE.REMOVE_PIPE('ORDER_PIPE');
    END;
    /

    The REMOVE_PIPE function removes the pipe from the Autonomous Database instance where it runs, however REMOVE_PIPE does not affect other Autonomous Database instances with a pipe with the same name that uses the same location URI.

  2. On the Autonomous Database instance were you run DBMS_PIPE.REMOVE_PIPE, verify that the pipe is removed.
    SELECT ownerid, name, type FROM v$db_pipes 
           WHERE name = 'ORDER_PIPE';
    
    No rows selected