Skip Headers
Oracle® Database JDBC Developer's Guide and Reference,
11g Release 1 (11.1)

B31224-04
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
PDF · Mobi · ePub

25 Oracle Advanced Queuing

Oracle Advanced Queuing (AQ) provides database-integrated message queuing functionality. It is built on top of Oracle Streams and optimizes the functions of Oracle Database so that messages can be stored persistently, propagated between queues on different computers and databases, and transmitted using Oracle Net Services, HTTP, and HTTPS. Because Oracle AQ is implemented in database tables, all operational benefits of high availability, scalability, and reliability are also applicable to queue data. This chapter provides information about the Java interface to Oracle AQ.

See Also:

Oracle Streams Advanced Queuing User's Guide

This chapters covers the following topics:

Functionality and Framework of Oracle Advanced Queuing

Oracle Database 11g Release 1 (11.1) provides a fast Java interface to AQ by introducing a new Java package, oracle.jdbc.aq. This package contains the following:

These classes and interfaces enable you to access an existing queue, create messages, and enqueue and dequeue messages.

Note:

Oracle JDBC drivers do not provide any API to create a queue. Queues must be created through the DBMS_AQADM PL/SQL package.

See Also:

For more information about the APIs, refer to the Javadoc.

AQ Asynchronous Event Notification

A JDBC application can do the following:

Registered clients are notified asynchronously when events are triggered or on an explicit AQ enqueue (or a new message is enqueued in a queue for which you have registered your interest). Clients do not need to be connected to a database.

Example

Example 25-1 illustrates the use of the new JDBC AQ interface which is new feature of the JDBC Thin driver in release 11.1. It shows how to enqueue and dequeue from a RAW type single-consumer and multiple-consumer queue. It also shows how AQ asynchronous notification works. In this example, the SCOTT user is connecting to the database. Therefore, in the database, you must grant the following privileges to the user:

GRANT EXECUTE ON DBMS_AQ to SCOTT;
GRANT EXECUTE ON DBMS_AQADM to SCOTT;
GRANT AQ_ADMINISTRATOR_ROLE TO SCOTT;
GRANT ADMINISTER DATABASE TRIGGER TO SCOTT;

Example 25-1 AQ Asynchronous Event Notification Example

import java.sql.*;
import java.util.Properties;
import oracle.jdbc.*;
import oracle.jdbc.aq.*;
public class DemoAQRawQueue
{
  static final String USERNAME= "scott";
  static final String PASSWORD= "tiger";
  static final String URL = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=oracleserver.mydomain.com)
 (PORT=1521))"+"(CONNECT_DATA=(SERVICE_NAME=mydatabaseinstance)))";
  
  public static final void main(String[] argv)
  {
    DemoAQRawQueue demo = new DemoAQRawQueue();
    try
    {
      demo.run();
    }catch(SQLException ex)
    {
      ex.printStackTrace();
    }
  }
 
  void run() throws SQLException
  {
    OracleConnection connection = connect();
    connection.setAutoCommit(false);
    cleanup(connection);
    setup(connection);
    // run the demo for single consumer queue:
    demoSingleConsumerQueue(connection);
    // run the demo for multi consumer queue:
    demoMultipleConsumerQueue(connection);
    connection.close();
  }
  
  /**
   * Single consumer queue demo:
   * This method enqueues a dummy message into the RAW_SINGLE_QUEUE queue and dequeues it.
   * Then it registers for AQ notification on this same queue and enqueues a
   * message again. The AQ listener will be notified that a new message has arrived
   * in the queue and it will dequeue it.
   */
  void demoSingleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start single consumer queue demo ============= \n");
 
    String queueType = "RAW";
    String queueName = USERNAME+".RAW_SINGLE_QUEUE";
    
    enqueueDummyMessage(connection,queueName,null);
    dequeue(connection,queueName,queueType,null);
 
    AQNotificationRegistration reg = registerForAQEvents(
      connection,queueName);    
    DemoAQRawQueueListener single_li = new DemoAQRawQueueListener(
       queueName,queueType);
    reg.addListener(single_li);
    
    enqueueDummyMessage(connection,queueName,null);
    connection.commit();
    
    while(single_li.getEventsCount() < 1)
    {
      try
      { Thread.currentThread().sleep(1000); }
      catch (InterruptedException e)
      { }
    }
    single_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }
 
  /**
   * Multi consumer queue demo:
   * This method first registers for AQ notification upon the agent "BLUE". It
   * then enqueues a message for "RED" and "BLUE"
   */
  void demoMultipleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start multi consumer queue demo ============= \n");
    String queueType = "RAW";
    String queueName = USERNAME+".RAW_MULTIPLE_QUEUE";
 
    AQNotificationRegistration reg = registerForAQEvents(
      connection,queueName+":BLUE");    
    DemoAQRawQueueListener multi_li = new DemoAQRawQueueListener(
       queueName,queueType);
    reg.addListener(multi_li);
    AQAgent[] recipients = new AQAgent[2];
    recipients[0] = AQFactory.createAQAgent();
    recipients[0].setName("BLUE");
    recipients[1] = AQFactory.createAQAgent();
    recipients[1].setName("RED");
    
    enqueueDummyMessage(connection,queueName,recipients);
    connection.commit();
    
    while(multi_li.getEventsCount() < 1)
    {
      try
      { Thread.currentThread().sleep(1000); }
      catch (InterruptedException e)
      { }
    }
    
    dequeue(connection,queueName,queueType,"RED");
    
    multi_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }
 
 
 /**
  * This method enqueues a dummy message in the queue specified by its name.
  */
  public void enqueueDummyMessage(OracleConnection conn,
                      String queueName,
                      AQAgent[] recipients) throws SQLException
  {
    System.out.println("----------- Enqueue start ------------");
    // First create the message properties:
    AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
    msgprop.setCorrelation("mycorrelation");
    msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
    
    // Specify an agent as the sender:
    AQAgent ag = AQFactory.createAQAgent();
    ag.setName("MY_SENDER_AGENT_NAME");
    ag.setAddress("MY_SENDER_AGENT_ADDRESS");
    msgprop.setSender(ag);
    
    // handle multi consumer case:
    if(recipients != null)
      msgprop.setRecipientList(recipients);
    
    System.out.println(msgprop.toString());
    
    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for(int i=0;i<rawPayload.length;i++)
      rawPayload[i] = 'b';
    mesg.setPayload(new oracle.sql.RAW(rawPayload));
   
    // We want to retrieve the message id after enqueue:
    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);
    
    // execute the actual enqueue operation:
    conn.enqueue(queueName,opt,mesg);
    
    byte[] mesgId = mesg.getMessageId();
    
    if(mesgId != null)
    {
      String mesgIdStr = byteBufferToHexString(mesgId,20);
      System.out.println("Message ID from enqueue call: "+mesgIdStr);
    }
    System.out.println("----------- Enqueue done ------------");
  }
  
  /**
   * This methods dequeues the next available message from the queue specified
   * by "queueName".
   */
  public void dequeue(OracleConnection conn,
                      String queueName,
                      String queueType,
                      String consumerName
    ) throws SQLException
  {
    System.out.println("----------- Dequeue start ------------");
    AQDequeueOptions deqopt = new AQDequeueOptions();
    deqopt.setRetrieveMessageId(true);
    if(consumerName != null)
      deqopt.setConsumerName(consumerName);
      
    // dequeue operation:
    AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
    
    // print out the message that has been dequeued:
    byte[] payload = msg.getPayload();
    byte[] msgId = msg.getMessageId();
    if(msgId != null)
    {
      String mesgIdStr = byteBufferToHexString(msgId,20);
      System.out.println("ID of message dequeued = "+mesgIdStr);
    }
    AQMessageProperties msgProp = msg.getMessageProperties();
    System.out.println(msgProp.toString());
    String payloadStr = new String(payload,0,10);
    System.out.println("payload.length="+payload.length+", value="+payloadStr);
    System.out.println("----------- Dequeue done ------------");
  }
 
 
  public AQNotificationRegistration registerForAQEvents(
    OracleConnection conn,
    String queueName) throws SQLException
  {
    Properties globalOptions = new Properties();
    String[] queueNameArr = new String[1];
    queueNameArr[0] = queueName;
    Properties[] opt = new Properties[1];
    opt[0] = new Properties();
    opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true");
    AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions);
    AQNotificationRegistration reg = regArr[0];
    return reg;
  }
 
  /**
   * 
   */
  private void setup(Connection conn) throws SQLException
  {
    
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
               "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE',  "+
               "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
               "   COMPATIBLE         =>  '10.0'); "+
               "END; ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE( "+
               "    QUEUE_NAME     =>   '"+USERNAME+".RAW_SINGLE_QUEUE', "+
               "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
               "END;  ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
               "END; ");
 
    // create a multi consumer RAW queue:
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
               "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE',  "+
               "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
               "   MULTIPLE_CONSUMERS =>  TRUE, "+
               "   COMPATIBLE         =>  '10.0'); "+
               "END; ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE( "+
               "    QUEUE_NAME     =>   '"+USERNAME+".RAW_MULTIPLE_QUEUE', "+
               "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE'); "+
               "END;  ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
               "END; ");
  }
 
  void cleanup( Connection conn)
  {
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
                "END; "); 
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.DROP_QUEUE_TABLE( "+
                "    QUEUE_TABLE         => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
                "    FORCE               => TRUE); "+
                "END; "); 
                
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
                "END; "); 
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.DROP_QUEUE_TABLE( "+
                "    QUEUE_TABLE         => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
                "    FORCE               => TRUE); "+
                "END; "); 
  }
 
 
  /**
   * Creates a connection the database.
   */
  OracleConnection connect() throws SQLException
  {
    OracleDriver dr = new OracleDriver();
    Properties prop = new Properties();
    prop.setProperty("user",DemoAQRawQueue.USERNAME);
    prop.setProperty("password",DemoAQRawQueue.PASSWORD);
    return (OracleConnection)dr.connect(DemoAQRawQueue.URL,prop);
  }
  
  /**
   * Utility method: executes a DML query but doesn't throw any exception if
   * an error occurs.
   */
  private void tryUpdateDatabase(Connection conn,String sql)
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }catch(SQLException sqlex)
    {
      System.out.println("Exception ("+sqlex.getMessage()+") while trying to execute \""+sql+"\"");
    }
    finally
    {
      if(stmt != null)
      {
        try
        {
          stmt.close();
        }catch(SQLException exx){exx.printStackTrace();}
      }
    }
  }
 
  /**
   * Utility method: executes a DML query and throws an exception if
   * an error occurs.
   */
  private void doUpdateDatabase(Connection conn,String sql) throws SQLException
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }
    finally
    {
      if(stmt != null)
      {
        try
        {
          stmt.close();
        }catch(SQLException exx){}
      }
    }
  }
  
  static final String byteBufferToHexString(byte[] buffer, int maxNbOfBytes)
  {
    if(buffer == null)
     return null;
    int offset = 0;
    boolean isFirst = true;
    StringBuffer sb = new StringBuffer();
    while(offset < buffer.length && offset < maxNbOfBytes)
    {
      if(!isFirst)
        sb.append(' ');
      else
        isFirst = false;
      String hexrep = Integer.toHexString((int)buffer[offset]&0xFF);
      if(hexrep.length() == 1)
        hexrep = "0"+hexrep;
      sb.append(hexrep);
      offset++;
    }
    String ret= sb.toString();
    return ret;
  }
}
 
class DemoAQRawQueueListener implements AQNotificationListener
{
  OracleConnection conn;
  String queueName;
  String typeName;
  int eventsCount = 0;
  
  public DemoAQRawQueueListener(String _queueName, String _typeName)
   throws SQLException
  {
   queueName = _queueName;
   typeName = _typeName;
   conn = (OracleConnection)DriverManager.getConnection 
     (DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD);
  }
  
  public void onAQNotification(AQNotificationEvent e)
  {
    System.out.println("\n----------- DemoAQRawQueueListener: got an event ----------- ");
    System.out.println(e.toString());
    System.out.println("------------------------------------------------------");
    
    System.out.println("----------- DemoAQRawQueueListener: Dequeue start -----------");
    try
    {
     AQDequeueOptions deqopt = new AQDequeueOptions();
     deqopt.setRetrieveMessageId(true);
     if(e.getConsumerName() != null)
       deqopt.setConsumerName(e.getConsumerName());
     if((e.getMessageProperties()).getDeliveryMode() 
        == AQMessageProperties.MESSAGE_BUFFERED)
     {
       deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
       deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
     }
     AQMessage msg = conn.dequeue(queueName,deqopt,typeName);
     byte[] msgId = msg.getMessageId();
     if(msgId != null)
     {
       String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20);
       System.out.println("ID of message dequeued = "+mesgIdStr);
     }
     System.out.println(msg.getMessageProperties().toString());
     byte[] payload = msg.getPayload();
     if(typeName.equals("RAW"))
     {
       String payloadStr = new String(payload,0,10);
       System.out.println("payload.length="+payload.length+", value="+payloadStr);
     }
     System.out.println("----------- DemoAQRawQueueListener: Dequeue done -----------");
    }
    catch(SQLException sqlex)
    {
     System.out.println(sqlex.getMessage());
    }
    eventsCount++; 
  }
  public int getEventsCount()
  {
    return eventsCount;
  }
  public void closeConnection() throws SQLException
  {
    conn.close();
  }
}

Creating Messages

Before you enqueue a message, you must create the message. An instance of a class implementing the AQMessage interface represents an AQ message. An AQ message contains properties (metadata) and a payload (data). Perform the following to create an AQ message:

  1. Create an instance of AQMessageProperties.

  2. Set the property attributes.

  3. Create the AQ message using the AQMessageProperties object.

  4. Set the payload.

AQ Message Properties

The properties of the AQ message are represented by an instance of the AQMessageProperties interface. You can set or get the following message properties:

The following code snippet illustrates how to create an AQMessageProperties object and create an AQ message using it:

...
// Create the message properties object
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
// Set some properties (optional)
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
msgprop.setExpiration(0);
msgprop.setPriority(1);
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");ag.setAddress("MY_SENDER_AGENT_ADDRESS");msgprop.setSender(ag);
msgprop.setRecipientList(recipients);

// Create the message
AQMessage mesg = AQFactory.createAQMessage(msgprop);
...

AQ Message Payload

Depending on the type of the queue, the payload of the AQ message can be specified using the setPayload method of the AQMessage interface. The following code snippet illustrates how to set the payload:

...
byte[] rawPayload = new byte[500];for(int i=0;i<rawPayload.length;i++)  rawPayload[i] = 'b';
mesg.setPayload(new oracle.sql.RAW(rawPayload));
...

You can retrieve the payload of an AQ message using the getPayload method or the appropriate getXXXPayload method. These methods are defined in the AQMessage interface.

Enqueuing Messages

After you create a message and set the message properties and payload, you can enqueue the message using the enqueue method of the OracleConnection interface. Before you enqueue the message, you can specify some enqueue options. The AQEnqueueOptions class enables you to specify the following enqueue options:

The following code snippet illustrates how to set the enqueue options and enqueue the message:

...
// Set the enqueue options
AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true);

// Enqueue the message
// conn is an instance of OracleConnection
// queueName is a String
// mesg is an instance of AQMessage
conn.enqueue(queueName, opt, mesg);
...

Dequeuing Messages

Enqueued messages can be dequeued using the dequeue method of the OracleConnection interface. Before you dequeue a message you must set the dequeue options. The AQDequeueOptions class enables you to specify the following dequeue options:

The following code snippet illustrates how to set the dequeue options and dequeue the message:

...
// Set the dequeue options
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
deqopt.setConsumerName(consumerName);
// Dequeue the message
// conn is an OracleConnection object
// queueName is a String identifying the queue
// queueType is a String specifying the type of the queue, such as RAW
AQMessage msg = conn.dequeue(queueName,deqopt,queueType);

Examples: Enqueuing and Dequeuing

This section provides a few examples that illustrate how to enqueue and dequeue messages.

Example 25-2 illustrates how to enqueue a message, and Example 25-3 illustrates how to dequeue a message.

Example 25-2 Enqueuing a Single Message

This example illustrates how to obtain access to a queue, create a message, and enqueue it.

// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue("SCOTT.MY_QUEUE","RAW");

// Create the message properties object
oracle.jdbc.aq.AQMessageProperties msgprop =
oracle.jdbc.aq.AQMessageProperties.createAQMessageProperties();
// Set some properties (optional)
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQAgent.createAQAgent("AGENTNAME");
ag2.setAddress("AGENTADDRESS");
msgprop.setSender(agent);

// Create the message
AQMessage mesg = AQMessage.createAQMessage(msgprop);

// Set the payload
// where buffer contains the RAW payload:
mesg.setRawPayload(new AQRawPayload(buffer));

// Set the enqueue options
AQEnqueueOptions options = new AQEnqueueOptions();

//Set the enqueue options using the setXXX methods of AQEnqueueOptions.
// Enqueue the message
queue.enqueue(options, mesg);

Example 25-3 Dequeuing a Single Message

This example illustrates how to obtain access to a queue, set the dequeue options, and dequeue the message.

// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue ("SCOTT.MY_QUEUE","RAW");

// Set the dequeue options
AQDequeueOptions options = new AQDequeueOptions();
options.setDeliveryMode(AQDequeueOptions.BUFFERED);

// Dequeue the message
AQMessage mesg = queue.dequeue(options);