Oracleアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle AQはOracle Streamsの最上位に構築される機能で、Oracle Databaseの機能を最適化します。これにより、メッセージを永続的に格納したり、異なるコンピュータやデータベース上のキュー間でメッセージを伝播させたり、Oracle Net Services、HTTPおよびHTTPSを使用してメッセージを伝送できる環境が実現します。またOracle AQはデータベース表内に実装されるので、データ操作について得られる高可用性、拡張性および信頼性の利点はすべて、キュー・データについても同様に得られます。この章では、Oracle AQに対するJavaインタフェースについて説明します。
|
関連項目: 『Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイド』 |
次の項目が含まれます。
Oracle Database 11gリリース1(11.1)では、新しいJavaパッケージoracle.jdbc.aqの導入により、AQに対する高速Javaインタフェースがサポートされています。このパッケージには、次のものが含まれています。
クラス
AQDequeueOptions
デキュー操作で利用可能なオプションを指定します。
AQEnqueueOptions
エンキュー操作で利用可能なオプションを指定します。
AQFactory
AQのファクトリ・クラスです。
AQNotificationEvent
通知を有効にするよう登録したキューに新しいメッセージがエンキューされると作成されます。
インタフェース
AQAgent
キューのユーザーや、メッセージのプロデューサまたはコンシューマを、表したり特定したりするために使用します。
AQMessage
エンキューまたはデキュー対象のメッセージを表します。
AQMessageProperties
相関、送信者、遅延、有効期限、受信者、優先度、順序付けなどのメッセージ・プロパティが含まれます。
AQNotificationListener
AQ通知イベントを受信するためのリスナー・インタフェースです。
AQNotificationRegistration
特定のキューに新しいメッセージがエンキューされた場合に通知を受けることを表します。
これらのクラスとインタフェースを使用して、既存のキューにアクセスしたり、メッセージを作成したり、メッセージをエンキューおよびデキューできます。
|
注意: Oracle JDBCドライバでは、キューを作成するためのAPIを提供していません。キューは、DBMS_AQADM PL/SQLパッケージを使用して作成する必要があります。 |
|
関連項目: APIの詳細は、Javadocを参照してください。 |
JDBCアプリケーションでは次のことができます。
AQネームスペースに通知希望項目を登録し、エンキュー発生時に通知を受け取る
データベース・イベントにサブスクリプションの希望項目を登録し、イベントのトリガー時に通知を受け取る
登録されたクライアントは、イベントがトリガーされた場合または明示的なAQエンキューの場合に非同期に通知されます(または、通知希望を登録したキューに新規メッセージがエンキューされた場合)。クライアントはデータベースに接続されている必要はありません。
例
例25-1は、リリース11.1のJDBC Thinドライバの新しい機能である新しいJDBC AQインタフェースの説明です。単一コンシューマおよび複数コンシューマのキューからRAW型をエンキューおよびデキューする方法を示しています。また、AQ非同期通知の動作についても示します。この例では、ユーザーSCOTTがデータベースに接続しています。そのため、データベース内でこのユーザーに次の権限を付与する必要があります。
GRANT EXECUTE ON DBMS_AQ to SCOTT; GRANT EXECUTE ON DBMS_AQADM to SCOTT; GRANT AQ_ADMINISTRATOR_ROLE TO SCOTT; GRANT ADMINISTER DATABASE TRIGGER TO SCOTT;
例25-1 AQ非同期イベント通知の例
import java.sql.*;
import java.util.Properties;
import oracle.jdbc.*;
import oracle.jdbc.aq.*;
public class DemoAQRawQueue
{
static final String USERNAME= "scott";
static final String PASSWORD= "tiger";
static final String URL = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=oracleserver.mydomain.com)
(PORT=1521))"+"(CONNECT_DATA=(SERVICE_NAME=mydatabaseinstance)))";
public static final void main(String[] argv)
{
DemoAQRawQueue demo = new DemoAQRawQueue();
try
{
demo.run();
}catch(SQLException ex)
{
ex.printStackTrace();
}
}
void run() throws SQLException
{
OracleConnection connection = connect();
connection.setAutoCommit(false);
cleanup(connection);
setup(connection);
// run the demo for single consumer queue:
demoSingleConsumerQueue(connection);
// run the demo for multi consumer queue:
demoMultipleConsumerQueue(connection);
connection.close();
}
/**
* Single consumer queue demo:
* This method enqueues a dummy message into the RAW_SINGLE_QUEUE queue and dequeues it.
* Then it registers for AQ notification on this same queue and enqueues a
* message again. The AQ listener will be notified that a new message has arrived
* in the queue and it will dequeue it.
*/
void demoSingleConsumerQueue(OracleConnection connection) throws SQLException
{
System.out.println("\n ============= Start single consumer queue demo ============= \n");
String queueType = "RAW";
String queueName = USERNAME+".RAW_SINGLE_QUEUE";
enqueueDummyMessage(connection,queueName,null);
dequeue(connection,queueName,queueType,null);
AQNotificationRegistration reg = registerForAQEvents(
connection,queueName);
DemoAQRawQueueListener single_li = new DemoAQRawQueueListener(
queueName,queueType);
reg.addListener(single_li);
enqueueDummyMessage(connection,queueName,null);
connection.commit();
while(single_li.getEventsCount() < 1)
{
try
{ Thread.currentThread().sleep(1000); }
catch (InterruptedException e)
{ }
}
single_li.closeConnection();
connection.unregisterAQNotification(reg);
}
/**
* Multi consumer queue demo:
* This method first registers for AQ notification upon the agent "BLUE". It
* then enqueues a message for "RED" and "BLUE"
*/
void demoMultipleConsumerQueue(OracleConnection connection) throws SQLException
{
System.out.println("\n ============= Start multi consumer queue demo ============= \n");
String queueType = "RAW";
String queueName = USERNAME+".RAW_MULTIPLE_QUEUE";
AQNotificationRegistration reg = registerForAQEvents(
connection,queueName+":BLUE");
DemoAQRawQueueListener multi_li = new DemoAQRawQueueListener(
queueName,queueType);
reg.addListener(multi_li);
AQAgent[] recipients = new AQAgent[2];
recipients[0] = AQFactory.createAQAgent();
recipients[0].setName("BLUE");
recipients[1] = AQFactory.createAQAgent();
recipients[1].setName("RED");
enqueueDummyMessage(connection,queueName,recipients);
connection.commit();
while(multi_li.getEventsCount() < 1)
{
try
{ Thread.currentThread().sleep(1000); }
catch (InterruptedException e)
{ }
}
dequeue(connection,queueName,queueType,"RED");
multi_li.closeConnection();
connection.unregisterAQNotification(reg);
}
/**
* This method enqueues a dummy message in the queue specified by its name.
*/
public void enqueueDummyMessage(OracleConnection conn,
String queueName,
AQAgent[] recipients) throws SQLException
{
System.out.println("----------- Enqueue start ------------");
// First create the message properties:
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
// Specify an agent as the sender:
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");
ag.setAddress("MY_SENDER_AGENT_ADDRESS");
msgprop.setSender(ag);
// handle multi consumer case:
if(recipients != null)
msgprop.setRecipientList(recipients);
System.out.println(msgprop.toString());
// Create the actual AQMessage instance:
AQMessage mesg = AQFactory.createAQMessage(msgprop);
// and add a payload:
byte[] rawPayload = new byte[500];
for(int i=0;i<rawPayload.length;i++)
rawPayload[i] = 'b';
mesg.setPayload(new oracle.sql.RAW(rawPayload));
// We want to retrieve the message id after enqueue:
AQEnqueueOptions opt = new AQEnqueueOptions();
opt.setRetrieveMessageId(true);
// execute the actual enqueue operation:
conn.enqueue(queueName,opt,mesg);
byte[] mesgId = mesg.getMessageId();
if(mesgId != null)
{
String mesgIdStr = byteBufferToHexString(mesgId,20);
System.out.println("Message ID from enqueue call: "+mesgIdStr);
}
System.out.println("----------- Enqueue done ------------");
}
/**
* This methods dequeues the next available message from the queue specified
* by "queueName".
*/
public void dequeue(OracleConnection conn,
String queueName,
String queueType,
String consumerName
) throws SQLException
{
System.out.println("----------- Dequeue start ------------");
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
if(consumerName != null)
deqopt.setConsumerName(consumerName);
// dequeue operation:
AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
// print out the message that has been dequeued:
byte[] payload = msg.getPayload();
byte[] msgId = msg.getMessageId();
if(msgId != null)
{
String mesgIdStr = byteBufferToHexString(msgId,20);
System.out.println("ID of message dequeued = "+mesgIdStr);
}
AQMessageProperties msgProp = msg.getMessageProperties();
System.out.println(msgProp.toString());
String payloadStr = new String(payload,0,10);
System.out.println("payload.length="+payload.length+", value="+payloadStr);
System.out.println("----------- Dequeue done ------------");
}
public AQNotificationRegistration registerForAQEvents(
OracleConnection conn,
String queueName) throws SQLException
{
Properties globalOptions = new Properties();
String[] queueNameArr = new String[1];
queueNameArr[0] = queueName;
Properties[] opt = new Properties[1];
opt[0] = new Properties();
opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true");
AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions);
AQNotificationRegistration reg = regArr[0];
return reg;
}
/**
*
*/
private void setup(Connection conn) throws SQLException
{
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
" QUEUE_PAYLOAD_TYPE => 'RAW', "+
" COMPATIBLE => '10.0'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE( "+
" QUEUE_NAME => '"+USERNAME+".RAW_SINGLE_QUEUE', "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
"END; ");
// create a multi consumer RAW queue:
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
" QUEUE_PAYLOAD_TYPE => 'RAW', "+
" MULTIPLE_CONSUMERS => TRUE, "+
" COMPATIBLE => '10.0'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE( "+
" QUEUE_NAME => '"+USERNAME+".RAW_MULTIPLE_QUEUE', "+
" QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
"END; ");
}
void cleanup( Connection conn)
{
tryUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
"END; ");
tryUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.DROP_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
" FORCE => TRUE); "+
"END; ");
tryUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
"END; ");
tryUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.DROP_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
" FORCE => TRUE); "+
"END; ");
}
/**
* Creates a connection the database.
*/
OracleConnection connect() throws SQLException
{
OracleDriver dr = new OracleDriver();
Properties prop = new Properties();
prop.setProperty("user",DemoAQRawQueue.USERNAME);
prop.setProperty("password",DemoAQRawQueue.PASSWORD);
return (OracleConnection)dr.connect(DemoAQRawQueue.URL,prop);
}
/**
* Utility method: executes a DML query but doesn't throw any exception if
* an error occurs.
*/
private void tryUpdateDatabase(Connection conn,String sql)
{
Statement stmt = null;
try
{
stmt = conn.createStatement();
stmt.executeUpdate(sql);
}catch(SQLException sqlex)
{
System.out.println("Exception ("+sqlex.getMessage()+") while trying to execute \""+sql+"\"");
}
finally
{
if(stmt != null)
{
try
{
stmt.close();
}catch(SQLException exx){exx.printStackTrace();}
}
}
}
/**
* Utility method: executes a DML query and throws an exception if
* an error occurs.
*/
private void doUpdateDatabase(Connection conn,String sql) throws SQLException
{
Statement stmt = null;
try
{
stmt = conn.createStatement();
stmt.executeUpdate(sql);
}
finally
{
if(stmt != null)
{
try
{
stmt.close();
}catch(SQLException exx){}
}
}
}
static final String byteBufferToHexString(byte[] buffer, int maxNbOfBytes)
{
if(buffer == null)
return null;
int offset = 0;
boolean isFirst = true;
StringBuffer sb = new StringBuffer();
while(offset < buffer.length && offset < maxNbOfBytes)
{
if(!isFirst)
sb.append(' ');
else
isFirst = false;
String hexrep = Integer.toHexString((int)buffer[offset]&0xFF);
if(hexrep.length() == 1)
hexrep = "0"+hexrep;
sb.append(hexrep);
offset++;
}
String ret= sb.toString();
return ret;
}
}
class DemoAQRawQueueListener implements AQNotificationListener
{
OracleConnection conn;
String queueName;
String typeName;
int eventsCount = 0;
public DemoAQRawQueueListener(String _queueName, String _typeName)
throws SQLException
{
queueName = _queueName;
typeName = _typeName;
conn = (OracleConnection)DriverManager.getConnection
(DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD);
}
public void onAQNotification(AQNotificationEvent e)
{
System.out.println("\n----------- DemoAQRawQueueListener: got an event ----------- ");
System.out.println(e.toString());
System.out.println("------------------------------------------------------");
System.out.println("----------- DemoAQRawQueueListener: Dequeue start -----------");
try
{
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
if(e.getConsumerName() != null)
deqopt.setConsumerName(e.getConsumerName());
if((e.getMessageProperties()).getDeliveryMode()
== AQMessageProperties.MESSAGE_BUFFERED)
{
deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
}
AQMessage msg = conn.dequeue(queueName,deqopt,typeName);
byte[] msgId = msg.getMessageId();
if(msgId != null)
{
String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20);
System.out.println("ID of message dequeued = "+mesgIdStr);
}
System.out.println(msg.getMessageProperties().toString());
byte[] payload = msg.getPayload();
if(typeName.equals("RAW"))
{
String payloadStr = new String(payload,0,10);
System.out.println("payload.length="+payload.length+", value="+payloadStr);
}
System.out.println("----------- DemoAQRawQueueListener: Dequeue done -----------");
}
catch(SQLException sqlex)
{
System.out.println(sqlex.getMessage());
}
eventsCount++;
}
public int getEventsCount()
{
return eventsCount;
}
public void closeConnection() throws SQLException
{
conn.close();
}
}
メッセージをエンキューするには、まずそのメッセージを作成する必要があります。AQメッセージは、AQMessageインタフェースを実装したクラスのインスタンスによって表されます。各AQメッセージには、一連のプロパティ(メタデータ)と1つのペイロード(データ)が含まれます。AQメッセージを作成するには次の操作を実行します。
AQMessagePropertiesのインスタンスを作成します。
プロパティ属性を設定します。
AQMessagePropertiesオブジェクトを使用してAQメッセージを作成します。
ペイロードを設定します。
AQメッセージのプロパティ
AQメッセージのプロパティは、AQMessagePropertiesインタフェースのインスタンスによって表されます。設定または取得できるメッセージ・プロパティは次のとおりです。
デキュー試行回数: メッセージのデキューを試行した回数を示します。このプロパティは、設定できません。
相関: メッセージのエンキュー時に、そのメッセージのプロデューサによって提供されるIDです。
遅延: メッセージをいつまでWAITING状態にしておくかを示す秒数です。指定された遅延時間が経過すると、メッセージはREADY状態になり、デキューできるようになります。メッセージID(msgid)を使用してメッセージをデキューした場合、遅延時間の指定はオーバーライドされます。
|
注意: バッファ・メッセージについては遅延はサポートされません。 |
配信モード: メッセージがバッファ・メッセージであるか永続メッセージであるかを示します。このプロパティは、設定できません。
エンキュー時間: メッセージがエンキューされた時刻を示します。この値はシステムによって決定され、ユーザーが設定することはできません。
例外キュー: メッセージを正常に処理できない場合にメッセージの移動先となるキューの名前を指定します。メッセージが移動されるのは、次の2つの場合です。
デキューに失敗した回数がmax_retriesを超えた場合。
メッセージが期限切れになった場合。
有効期限: メッセージがREADY状態になってから、そのメッセージをデキューできなくなるまでの秒数。デキューされないまま期限切れとなったメッセージは、EXPIRED状態になって例外キューに移動されます。
メッセージ状態: メッセージがデキューされた時点のメッセージの状態を示します。このプロパティは、設定できません。
直前のキューにおけるメッセージID: 現在のメッセージを生成したキューで、最も最近入っていたキューでのメッセージのIDです。メッセージがあるキューから別のキューに伝播された場合は、この属性により、メッセージの最後の伝播元となったキューのIDが特定されます。このプロパティは、設定できません。
優先度: メッセージの優先度を指定します。負の整数を含め、すべての整数を指定できます。値が小さいほど、優先度は高くなります。
受信者リスト: 受信者を表すAQAgentオブジェクトのリストです。デフォルトの受信者は、キューのサブスクライバです。このパラメータは、複数コンシューマのキューに対してのみ有効です。
送信者: メッセージのエンキュー時にプロデューサによって指定される識別子です。これはAQAgentのインスタンスです。
トランザクション・グループ: キューがトランザクション・グループ対応である場合に、メッセージのトランザクション・グループを示します。このプロパティは、dequeueArrayメソッドに対するコールが成功した後に設定されます。
次のコードは、AQMessagePropertiesオブジェクトを作成し、それを使用してAQメッセージを作成する方法の例を示しています。
...
// Create the message properties object
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
// Set some properties (optional)
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
msgprop.setExpiration(0);
msgprop.setPriority(1);
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");ag.setAddress("MY_SENDER_AGENT_ADDRESS");msgprop.setSender(ag);
msgprop.setRecipientList(recipients);
// Create the message
AQMessage mesg = AQFactory.createAQMessage(msgprop);
...
AQメッセージのペイロード
AQメッセージのペイロードは、キューの型に応じ、AQMessageインタフェースのsetPayloadメソッドを使用して指定します。次のコードは、ペイロードの設定方法の例を示しています。
... byte[] rawPayload = new byte[500];for(int i=0;i<rawPayload.length;i++) rawPayload[i] = 'b'; mesg.setPayload(new oracle.sql.RAW(rawPayload)); ...
AQメッセージのペイロードを取得するには、getPayloadメソッドまたは適切なgetXXXPayloadメソッドを使用します。これらのメソッドは、AQMessageインタフェースで定義されています。
メッセージを作成し、メッセージのプロパティとペイロードを設定したら、OracleConnectionインタフェースのenqueueメソッドを使用してメッセージをエンキューできます。メッセージをエンキューする前には、いくつかのエンキュー・オプションを指定できます。具体的には、AQEnqueueOptionsクラスを使用して、次のエンキュー・オプションを指定できます。
配信モード: 配信モードを指定します。配信モードは、永続(ENQUEUE_PERSISTENT)またはバッファ(ENQUEUE_BUFFERED)に設定できます。
メッセージIDの取得: メッセージのエンキュー時にサーバーからメッセージIDを取得するかどうかを指定します。デフォルトでは、メッセージIDは取得されません。
変換: エンキューの前にメッセージに適用する変換を指定します。変換関数の戻り型は、キューの型と一致する必要があります。
|
注意: 変換は、DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 |
可視性: エンキュー・リクエストの、トランザクション上の動作を指定します。このオプションのデフォルト値はENQUEUE_ON_COMMITです。この設定値は、エンキュー操作が現在のトランザクションの一部であることを示します。ENQUEUE_IMMEDIATEを指定した場合、エンキュー操作は自律型トランザクションとなり、操作の完了時にコミットされます。バッファ・メッセージの場合は、ENQUEUE_IMMEDIATEを使用する必要があります。
次のコードは、エンキュー・オプションを設定してメッセージをエンキューする方法の例を示しています。
... // Set the enqueue options AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true); // Enqueue the message // conn is an instance of OracleConnection // queueName is a String // mesg is an instance of AQMessage conn.enqueue(queueName, opt, mesg); ...
エンキューされたメッセージをデキューするには、OracleConnectionインタフェースのdequeueメソッドを使用します。メッセージをデキューする前には、デキュー・オプションを設定する必要があります。具体的には、AQDequeueOptionsクラスを使用して、次のデキュー・オプションを指定できます。
条件: メッセージ・プロパティ、メッセージ・データ・プロパティおよびPL/SQLファンクションに基づいて条件式を指定します。デキュー条件は、SQL問合せのWHERE句に似た構文を使用し、Boolean式として指定します。
コンシューマ名: これを指定すると、そのコンシューマ名に一致するメッセージのみがアクセスされます。
|
注意: キューがシングル・コンシューマ・キューの場合、このオプションは設定しないでください。 |
相関: デキュー操作に適用する相関基準(検索基準)を指定します。
配信フィルタ: デキューするメッセージの型を指定します。指定できるのは、バッファ・メッセージのみ(DEQEUE_BUFFERED)、永続メッセージのみ(DEQUEUE_PERSISTENT、デフォルト)またはその両方(DEQUEUE_PERSISTENT_OR_BUFFERED)です。
デキューするメッセージのID: デキューするメッセージのメッセージIDを指定します。このオプションは、IDがわかっている一意のメッセージをデキューする場合に使用できます。
デキュー・モード: デキュー操作に関連付けるロック動作を指定します。次のいずれかの値を指定できます。
DequeueMode.BROWSE: ロックを取得せずにメッセージをデキューします。
DequeueMode.LOCKED: トランザクションの完了まで持続する書込みロックをかけてメッセージをデキューします。
DequeueMode.REMOVE: (デフォルト)メッセージをデキュー後に削除します。保持プロパティが永続であれば、メッセージはキュー内に保持されます。
DequeueMode.REMOVE_NO_DATA: メッセージを、更新済または削除済としてマークします。
最大バッファ長: メッセージがRAWキューからデキューされる場合に、メッセージに割り当てることができる最大バイト数を指定します。デフォルトの最大値はDEFAULT_MAX_PAYLOAD_LENGTHですが、これはゼロを除く任意の他の値に変更できます。メッセージ全体を格納できるほどバッファが大きくない場合、超過したバイト数は警告なしに無視されます。
ナビゲーション: どの位置のメッセージを取得対象とするかを指定します。次のいずれかの値を指定できます。
NavigationOption.FIRST_MESSAGE: 検索基準に一致するメッセージのうち、利用可能な最初のメッセージをデキューします。
NavigationOption.NEXT_MESSAGE: (デフォルト)検索基準に一致する、次のデキュー可能なメッセージをデキューします。直前のメッセージがメッセージ・グループに属している場合は、そのメッセージ・グループ内で検索基準に一致する、次のデキュー可能なメッセージをデキューします。
NavigationOption.NEXT_TRANSACTION: 現在のトランザクション・グループ内のメッセージをスキップし、次のトランザクション・グループの最初のメッセージをデキューします。この設定は、メッセージ・グループに対応したキューでのみ使用できます。
メッセージIDの取得: デキューするメッセージのメッセージIDを取得する必要があるかどうかを指定します。デフォルトでは取得されません。
変換: デキューの後にメッセージに適用する変換を指定します。変換のソース型は、キューの型と一致する必要があります。
|
注意: 変換は、DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 |
可視性: メッセージを、現在のトランザクションの一部としてデキューするかどうかを指定します。次のいずれかの値を指定できます。
VisibilityOption.ON_COMMIT: (デフォルト)現在のトランザクションの一部としてデキューします。
VisibilityOption.IMMEDIATE: 操作の完了時にコミットされる自律型トランザクションとしてデキューします。
|
注意: デキュー・モードがDequeueMode.BROWSEの場合、可視性オプションは無視されます。配信フィルタがDEQUEUE_BUFFEREDまたはDEQUEUE_PERSISTENT_OR_BUFFEREDの場合、このオプションはVisibilityOption.IMMEDIATEに設定する必要があります。 |
待機: 検索基準に一致するメッセージが1つもない場合にデキュー操作に適用する待機時間を指定します。デフォルト値のDEQUEUE_WAIT_FOREVERは、デキュー操作を無期限に待機状態にすることを示します。DEQUEUE_NO_WAITに設定した場合、デキュー操作は待機をしません。数値を指定した場合、デキュー操作は指定された秒数の間待機します。
|
注意: DEQUEUE_WAIT_FOREVERを指定すると、デキュー操作は検索基準に一致するメッセージがキュー内でデキュー可能になるまで待機し続けます。ただし、OracleConnectionオブジェクトに対してcancelメソッドをコールすると、デキュー操作を中断できます。 |
次のコードは、デキュー・オプションを設定してメッセージをデキューする方法の例を示しています。
... // Set the dequeue options AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); deqopt.setConsumerName(consumerName); // Dequeue the message // conn is an OracleConnection object // queueName is a String identifying the queue // queueType is a String specifying the type of the queue, such as RAW AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
この項では、メッセージのエンキュー方法とデキュー方法を、いくつかの例で示します。
例25-2ではメッセージのエンキュー方法を、例25-3ではメッセージのデキュー方法を示します。
例25-2 単一メッセージのエンキュー
この例では、キューにアクセスし、メッセージを作成して、メッセージをエンキューする方法を示します。
// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue("SCOTT.MY_QUEUE","RAW");
// Create the message properties object
oracle.jdbc.aq.AQMessageProperties msgprop =
oracle.jdbc.aq.AQMessageProperties.createAQMessageProperties();
// Set some properties (optional)
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQAgent.createAQAgent("AGENTNAME");
ag2.setAddress("AGENTADDRESS");
msgprop.setSender(agent);
// Create the message
AQMessage mesg = AQMessage.createAQMessage(msgprop);
// Set the payload
// where buffer contains the RAW payload:
mesg.setRawPayload(new AQRawPayload(buffer));
// Set the enqueue options
AQEnqueueOptions options = new AQEnqueueOptions();
//Set the enqueue options using the setXXX methods of AQEnqueueOptions.
// Enqueue the message
queue.enqueue(options, mesg);
例25-3 単一メッセージのデキュー
この例では、キューにアクセスし、デキュー・オプションを設定して、メッセージをデキューする方法を示します。
// Get access to the queue
// conn is an instance of oracle.jdbc.OracleConnection
oracle.jdbc.aq.AQQueue queue = conn.getAQQueue ("SCOTT.MY_QUEUE","RAW");
// Set the dequeue options
AQDequeueOptions options = new AQDequeueOptions();
options.setDeliveryMode(AQDequeueOptions.BUFFERED);
// Dequeue the message
AQMessage mesg = queue.dequeue(options);