ヘッダーをスキップ
Oracle Database JDBC開発者ガイドおよびリファレンス
11gリリース1(11.1)
E05720-02
  目次
目次
索引
索引

戻る
戻る
次へ
次へ
 

25 Oracleアドバンスト・キューイング(AQ)

Oracleアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle AQはOracle Streamsの最上位に構築される機能で、Oracle Databaseの機能を最適化します。これにより、メッセージを永続的に格納したり、異なるコンピュータやデータベース上のキュー間でメッセージを伝播させたり、Oracle Net Services、HTTPおよびHTTPSを使用してメッセージを伝送できる環境が実現します。またOracle AQはデータベース表内に実装されるので、データ操作について得られる高可用性、拡張性および信頼性の利点はすべて、キュー・データについても同様に得られます。この章では、Oracle AQに対するJavaインタフェースについて説明します。


関連項目:

『Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイド』

次の項目が含まれます。

Oracleアドバンスト・キューイングの機能とフレームワーク

Oracle Database 11gリリース1(11.1)では、新しいJavaパッケージoracle.jdbc.aqの導入により、AQに対する高速Javaインタフェースがサポートされています。このパッケージには、次のものが含まれています。

これらのクラスとインタフェースを使用して、既存のキューにアクセスしたり、メッセージを作成したり、メッセージをエンキューおよびデキューできます。


注意:

Oracle JDBCドライバでは、キューを作成するためのAPIを提供していません。キューは、DBMS_AQADM PL/SQLパッケージを使用して作成する必要があります。


関連項目:

APIの詳細は、Javadocを参照してください。

AQ非同期イベント通知

JDBCアプリケーションでは次のことができます。

登録されたクライアントは、イベントがトリガーされた場合または明示的なAQエンキューの場合に非同期に通知されます(または、通知希望を登録したキューに新規メッセージがエンキューされた場合)。クライアントはデータベースに接続されている必要はありません。

例25-1は、リリース11.1のJDBC Thinドライバの新しい機能である新しいJDBC AQインタフェースの説明です。単一コンシューマおよび複数コンシューマのキューからRAW型をエンキューおよびデキューする方法を示しています。また、AQ非同期通知の動作についても示します。この例では、ユーザーSCOTTがデータベースに接続しています。そのため、データベース内でこのユーザーに次の権限を付与する必要があります。

GRANT EXECUTE ON DBMS_AQ to SCOTT;
GRANT EXECUTE ON DBMS_AQADM to SCOTT;
GRANT AQ_ADMINISTRATOR_ROLE TO SCOTT;
GRANT ADMINISTER DATABASE TRIGGER TO SCOTT;

例25-1 AQ非同期イベント通知の例

import java.sql.*;
import java.util.Properties;
import oracle.jdbc.*;
import oracle.jdbc.aq.*;
public class DemoAQRawQueue
{
  static final String USERNAME= "scott";
  static final String PASSWORD= "tiger";
  static final String URL = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=oracleserver.mydomain.com)
 (PORT=1521))"+"(CONNECT_DATA=(SERVICE_NAME=mydatabaseinstance)))";

  public static final void main(String[] argv)
  {
    DemoAQRawQueue demo = new DemoAQRawQueue();
    try
    {
      demo.run();
    }catch(SQLException ex)
    {
      ex.printStackTrace();
    }
  }

  void run() throws SQLException
  {
    OracleConnection connection = connect();
    connection.setAutoCommit(false);
    cleanup(connection);
    setup(connection);
    // run the demo for single consumer queue:
    demoSingleConsumerQueue(connection);
    // run the demo for multi consumer queue:
    demoMultipleConsumerQueue(connection);
    connection.close();
  }

  /**
   * Single consumer queue demo:
   * This method enqueues a dummy message into the RAW_SINGLE_QUEUE queue and dequeues it.
   * Then it registers for AQ notification on this same queue and enqueues a
   * message again. The AQ listener will be notified that a new message has arrived
   * in the queue and it will dequeue it.
   */
  void demoSingleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start single consumer queue demo ============= \n");

    String queueType = "RAW";
    String queueName = USERNAME+".RAW_SINGLE_QUEUE";

    enqueueDummyMessage(connection,queueName,null);
    dequeue(connection,queueName,queueType,null);

    AQNotificationRegistration reg = registerForAQEvents(
      connection,queueName);
    DemoAQRawQueueListener single_li = new DemoAQRawQueueListener(
       queueName,queueType);
    reg.addListener(single_li);

    enqueueDummyMessage(connection,queueName,null);
    connection.commit();

    while(single_li.getEventsCount() < 1)
    {
      try
      { Thread.currentThread().sleep(1000); }
      catch (InterruptedException e)
      { }
    }
    single_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }

  /**
   * Multi consumer queue demo:
   * This method first registers for AQ notification upon the agent "BLUE". It
   * then enqueues a message for "RED" and "BLUE"
   */
  void demoMultipleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start multi consumer queue demo ============= \n");
    String queueType = "RAW";
    String queueName = USERNAME+".RAW_MULTIPLE_QUEUE";

    AQNotificationRegistration reg = registerForAQEvents(
      connection,queueName+":BLUE");
    DemoAQRawQueueListener multi_li = new DemoAQRawQueueListener(
       queueName,queueType);
    reg.addListener(multi_li);
    AQAgent[] recipients = new AQAgent[2];
    recipients[0] = AQFactory.createAQAgent();
    recipients[0].setName("BLUE");
    recipients[1] = AQFactory.createAQAgent();
    recipients[1].setName("RED");

    enqueueDummyMessage(connection,queueName,recipients);
    connection.commit();

    while(multi_li.getEventsCount() < 1)
    {
      try
      { Thread.currentThread().sleep(1000); }
      catch (InterruptedException e)
      { }
    }

    dequeue(connection,queueName,queueType,"RED");

    multi_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }


 /**
  * This method enqueues a dummy message in the queue specified by its name.
  */
  public void enqueueDummyMessage(OracleConnection conn,
                      String queueName,
                      AQAgent[] recipients) throws SQLException
  {
    System.out.println("----------- Enqueue start ------------");
    // First create the message properties:
    AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
    msgprop.setCorrelation("mycorrelation");
    msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");

    // Specify an agent as the sender:
    AQAgent ag = AQFactory.createAQAgent();
    ag.setName("MY_SENDER_AGENT_NAME");
    ag.setAddress("MY_SENDER_AGENT_ADDRESS");
    msgprop.setSender(ag);

    // handle multi consumer case:
    if(recipients != null)
      msgprop.setRecipientList(recipients);

    System.out.println(msgprop.toString());

    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for(int i=0;i<rawPayload.length;i++)
      rawPayload[i] = 'b';
    mesg.setPayload(new oracle.sql.RAW(rawPayload));

    // We want to retrieve the message id after enqueue:
    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);

    // execute the actual enqueue operation:
    conn.enqueue(queueName,opt,mesg);

    byte[] mesgId = mesg.getMessageId();

    if(mesgId != null)
    {
      String mesgIdStr = byteBufferToHexString(mesgId,20);
      System.out.println("Message ID from enqueue call: "+mesgIdStr);
    }
    System.out.println("----------- Enqueue done ------------");
  }

  /**
   * This methods dequeues the next available message from the queue specified
   * by "queueName".
   */
  public void dequeue(OracleConnection conn,
                      String queueName,
                      String queueType,
                      String consumerName
    ) throws SQLException
  {
    System.out.println("----------- Dequeue start ------------");
    AQDequeueOptions deqopt = new AQDequeueOptions();
    deqopt.setRetrieveMessageId(true);
    if(consumerName != null)
      deqopt.setConsumerName(consumerName);

    // dequeue operation:
    AQMessage msg = conn.dequeue(queueName,deqopt,queueType);

    // print out the message that has been dequeued:
    byte[] payload = msg.getPayload();
    byte[] msgId = msg.getMessageId();
    if(msgId != null)
    {
      String mesgIdStr = byteBufferToHexString(msgId,20);
      System.out.println("ID of message dequeued = "+mesgIdStr);
    }
    AQMessageProperties msgProp = msg.getMessageProperties();
    System.out.println(msgProp.toString());
    String payloadStr = new String(payload,0,10);
    System.out.println("payload.length="+payload.length+", value="+payloadStr);
    System.out.println("----------- Dequeue done ------------");
  }


  public AQNotificationRegistration registerForAQEvents(
    OracleConnection conn,
    String queueName) throws SQLException
  {
    Properties globalOptions = new Properties();
    String[] queueNameArr = new String[1];
    queueNameArr[0] = queueName;
    Properties[] opt = new Properties[1];
    opt[0] = new Properties();
    opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true");
    AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions);
    AQNotificationRegistration reg = regArr[0];
    return reg;
  }

  /**
   *
   */
  private void setup(Connection conn) throws SQLException
  {

    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
               "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE',  "+
               "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
               "   COMPATIBLE         =>  '10.0'); "+
               "END; ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE( "+
               "    QUEUE_NAME     =>   '"+USERNAME+".RAW_SINGLE_QUEUE', "+
               "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
               "END;  ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
               "END; ");

    // create a multi consumer RAW queue:
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
               "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE',  "+
               "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
               "   MULTIPLE_CONSUMERS =>  TRUE, "+
               "   COMPATIBLE         =>  '10.0'); "+
               "END; ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "DBMS_AQADM.CREATE_QUEUE( "+
               "    QUEUE_NAME     =>   '"+USERNAME+".RAW_MULTIPLE_QUEUE', "+
               "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE'); "+
               "END;  ");
    doUpdateDatabase(conn,
               "BEGIN "+
               "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
               "END; ");
  }

  void cleanup( Connection conn)
  {
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
                "END; ");
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.DROP_QUEUE_TABLE( "+
                "    QUEUE_TABLE         => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
                "    FORCE               => TRUE); "+
                "END; ");

    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
                "END; ");
    tryUpdateDatabase(conn,
                "BEGIN "+
                "  DBMS_AQADM.DROP_QUEUE_TABLE( "+
                "    QUEUE_TABLE         => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
                "    FORCE               => TRUE); "+
                "END; ");
  }


  /**
   * Creates a connection the database.
   */
  OracleConnection connect() throws SQLException
  {
    OracleDriver dr = new OracleDriver();
    Properties prop = new Properties();
    prop.setProperty("user",DemoAQRawQueue.USERNAME);
    prop.setProperty("password",DemoAQRawQueue.PASSWORD);
    return (OracleConnection)dr.connect(DemoAQRawQueue.URL,prop);
  }

  /**
   * Utility method: executes a DML query but doesn't throw any exception if
   * an error occurs.
   */
  private void tryUpdateDatabase(Connection conn,String sql)
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }catch(SQLException sqlex)
    {
      System.out.println("Exception ("+sqlex.getMessage()+") while trying to execute \""+sql+"\"");
    }
    finally
    {
      if(stmt != null)
      {
        try
        {
          stmt.close();
        }catch(SQLException exx){exx.printStackTrace();}
      }
    }
  }

  /**
   * Utility method: executes a DML query and throws an exception if
   * an error occurs.
   */
  private void doUpdateDatabase(Connection conn,String sql) throws SQLException
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }
    finally
    {
      if(stmt != null)
      {
        try
        {
          stmt.close();
        }catch(SQLException exx){}
      }
    }
  }

  static final String byteBufferToHexString(byte[] buffer, int maxNbOfBytes)
  {
    if(buffer == null)
     return null;
    int offset = 0;
    boolean isFirst = true;
    StringBuffer sb = new StringBuffer();
    while(offset < buffer.length && offset < maxNbOfBytes)
    {
      if(!isFirst)
        sb.append(' ');
      else
        isFirst = false;
      String hexrep = Integer.toHexString((int)buffer[offset]&0xFF);
      if(hexrep.length() == 1)
        hexrep = "0"+hexrep;
      sb.append(hexrep);
      offset++;
    }
    String ret= sb.toString();
    return ret;
  }
}

class DemoAQRawQueueListener implements AQNotificationListener
{
  OracleConnection conn;
  String queueName;
  String typeName;
  int eventsCount = 0;

  public DemoAQRawQueueListener(String _queueName, String _typeName)
   throws SQLException
  {
   queueName = _queueName;
   typeName = _typeName;
   conn = (OracleConnection)DriverManager.getConnection
     (DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD);
  }

  public void onAQNotification(AQNotificationEvent e)
  {
    System.out.println("\n----------- DemoAQRawQueueListener: got an event ----------- ");
    System.out.println(e.toString());
    System.out.println("------------------------------------------------------");

    System.out.println("----------- DemoAQRawQueueListener: Dequeue start -----------");
    try
    {
     AQDequeueOptions deqopt = new AQDequeueOptions();
     deqopt.setRetrieveMessageId(true);
     if(e.getConsumerName() != null)
       deqopt.setConsumerName(e.getConsumerName());
     if((e.getMessageProperties()).getDeliveryMode()
        == AQMessageProperties.MESSAGE_BUFFERED)
     {
       deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
       deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
     }
     AQMessage msg = conn.dequeue(queueName,deqopt,typeName);
     byte[] msgId = msg.getMessageId();
     if(msgId != null)
     {
       String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20);
       System.out.println("ID of message dequeued = "+mesgIdStr);
     }
     System.out.println(msg.getMessageProperties().toString());
     byte[] payload = msg.getPayload();
     if(typeName.equals("RAW"))
     {
       String payloadStr = new String(payload,0,10);
       System.out.println("payload.length="+payload.length+", value="+payloadStr);
     }
     System.out.println("----------- DemoAQRawQueueListener: Dequeue done -----------");
    }
    catch(SQLException sqlex)
    {
     System.out.println(sqlex.getMessage());
    }
    eventsCount++;
  }
  public int getEventsCount()
  {
    return eventsCount;
  }
  public void closeConnection() throws SQLException
  {
    conn.close();
  }
}

メッセージの作成

メッセージをエンキューするには、まずそのメッセージを作成する必要があります。AQメッセージは、AQMessageインタフェースを実装したクラスのインスタンスによって表されます。各AQメッセージには、一連のプロパティ(メタデータ)と1つのペイロード(データ)が含まれます。AQメッセージを作成するには次の操作を実行します。

  1. AQMessagePropertiesのインスタンスを作成します。

  2. プロパティ属性を設定します。

  3. AQMessagePropertiesオブジェクトを使用してAQメッセージを作成します。

  4. ペイロードを設定します。

AQメッセージのプロパティ

AQメッセージのプロパティは、AQMessagePropertiesインタフェースのインスタンスによって表されます。設定または取得できるメッセージ・プロパティは次のとおりです。

次のコードは、AQMessagePropertiesオブジェクトを作成し、それを使用してAQメッセージを作成する方法の例を示しています。

...
// Create the message properties object
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
// Set some properties (optional)
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
msgprop.setExpiration(0);
msgprop.setPriority(1);
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");ag.setAddress("MY_SENDER_AGENT_ADDRESS");msgprop.setSender(ag);
msgprop.setRecipientList(recipients);

// Create the message
AQMessage mesg = AQFactory.createAQMessage(msgprop);
...

AQメッセージのペイロード

AQメッセージのペイロードは、キューの型に応じ、AQMessageインタフェースのsetPayloadメソッドを使用して指定します。次のコードは、ペイロードの設定方法の例を示しています。

...
byte[] rawPayload = new byte[500];for(int i=0;i<rawPayload.length;i++)  rawPayload[i] = 'b';
mesg.setPayload(new oracle.sql.RAW(rawPayload));
...

AQメッセージのペイロードを取得するには、getPayloadメソッドまたは適切なgetXXXPayloadメソッドを使用します。これらのメソッドは、AQMessageインタフェースで定義されています。

メッセージのエンキュー

メッセージを作成し、メッセージのプロパティとペイロードを設定したら、OracleConnectionインタフェースのenqueueメソッドを使用してメッセージをエンキューできます。メッセージをエンキューする前には、いくつかのエンキュー・オプションを指定できます。具体的には、AQEnqueueOptionsクラスを使用して、次のエンキュー・オプションを指定できます。

次のコードは、エンキュー・オプションを設定してメッセージをエンキューする方法の例を示しています。

...
// Set the enqueue options
AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true);

// Enqueue the message
// conn is an instance of OracleConnection
// queueName is a String
// mesg is an instance of AQMessage
conn.enqueue(queueName, opt, mesg);
...

メッセージのデキュー

エンキューされたメッセージをデキューするには、OracleConnectionインタフェースのdequeueメソッドを使用します。メッセージをデキューする前には、デキュー・オプションを設定する必要があります。具体的には、AQDequeueOptionsクラスを使用して、次のデキュー・オプションを指定できます。

次のコードは、デキュー・オプションを設定してメッセージをデキューする方法の例を示しています。

...
// Set the dequeue options
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
deqopt.setConsumerName(consumerName);
// Dequeue the message
// conn is an OracleConnection object
// queueName is a String identifying the queue
// queueType is a String specifying the type of the queue, such as RAW
AQMessage msg = conn.dequeue(queueName,deqopt,queueType);

例: エンキューとデキュー

この項では、メッセージのエンキュー方法とデキュー方法を、いくつかの例で示します。

例25-2ではメッセージのエンキュー方法を、例25-3ではメッセージのデキュー方法を示します。

例25-2 単一メッセージのエンキュー

この例では、キューにアクセスし、メッセージを作成して、メッセージをエンキューする方法を示します。

// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue("SCOTT.MY_QUEUE","RAW");

// Create the message properties object
oracle.jdbc.aq.AQMessageProperties msgprop =
oracle.jdbc.aq.AQMessageProperties.createAQMessageProperties();
// Set some properties (optional)
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQAgent.createAQAgent("AGENTNAME");
ag2.setAddress("AGENTADDRESS");
msgprop.setSender(agent);

// Create the message
AQMessage mesg = AQMessage.createAQMessage(msgprop);

// Set the payload
// where buffer contains the RAW payload:
mesg.setRawPayload(new AQRawPayload(buffer));

// Set the enqueue options
AQEnqueueOptions options = new AQEnqueueOptions();

//Set the enqueue options using the setXXX methods of AQEnqueueOptions.
// Enqueue the message
queue.enqueue(options, mesg);

例25-3 単一メッセージのデキュー

この例では、キューにアクセスし、デキュー・オプションを設定して、メッセージをデキューする方法を示します。

// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue ("SCOTT.MY_QUEUE","RAW");

// Set the dequeue options
AQDequeueOptions options = new AQDequeueOptions();
options.setDeliveryMode(AQDequeueOptions.BUFFERED);

// Dequeue the message
AQMessage mesg = queue.dequeue(options);