Oracleアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Streams上で構築され、Oracleデータベースの機能を最適化して、メッセージを永続的に格納し、異なるコンピュータやデータベース上のキューにメッセージを伝播し、Oracle Netサービス、HTTPおよびHTTPSを使用してメッセージを送信できます。Oracle AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。この章では、Oracle AQへのJavaインタフェースに関する情報を提供します。
|
注意: Oracle Database 11gリリース2(11.2)では、XMLTypeキューのサポートが追加されました。Oracle Database 11gリリース1(11.1)までは、サポートされているキューの型は、RAW、ADTおよびANYDATA型でした。 |
|
関連項目: Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイド |
この章の内容は次のとおりです。
Oracle JDBCパッケージであるoracle.jdbc.aqは、高速JavaインタフェースをAQに提供しています。このパッケージには、次のものが含まれています。
クラス
AQDequeueOptions
デキュー操作で利用可能なオプションを指定します。
AQEnqueueOptions
エンキュー操作で利用可能なオプションを指定します。
AQFactory
AQのファクトリ・クラスです。
AQNotificationEvent
通知を有効にするよう登録したキューに新しいメッセージがエンキューされると作成されます。
インタフェース
AQAgent
キューのユーザーや、メッセージのプロデューサまたはコンシューマを、表したり特定したりするために使用します。
AQMessage
エンキューまたはデキュー対象のメッセージを表します。
AQMessageProperties
相関、送信者、遅延、有効期限、受信者、優先度、順序付けなどのメッセージ・プロパティが含まれます。
AQNotificationListener
AQ通知イベントを受信するためのリスナー・インタフェースです。
AQNotificationRegistration
特定のキューに新しいメッセージがエンキューされた場合に通知を受けることを表します。
これらのクラスとインタフェースを使用して、既存のキューにアクセスしたり、メッセージを作成したり、メッセージをエンキューおよびデキューできます。
|
注意: Oracle JDBCドライバには、キューを作成するためのAPIは一切用意されていません。キューは、DBMS_AQADM PL/SQLパッケージを使用して作成する必要があります。 |
|
関連項目: APIの詳細は、Javadocを参照してください。
|
この章で使用されているコードの抜粋では、ユーザーSCOTTがデータベースに接続しているものとします。そのため、データベース内でSCOTTに次の権限を付与する必要があります。
GRANT EXECUTE ON DBMS_AQ to SCOTT; GRANT EXECUTE ON DBMS_AQADM to SCOTT; GRANT AQ_ADMINISTRATOR_ROLE TO SCOTT; GRANT ADMINISTER DATABASE TRIGGER TO SCOTT;
メッセージをエンキューおよびデキューする前に、データベース内にキューが存在する必要があります。手順は次のとおりです。
次のようにキュー表を作成します。
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE =>'scott.RAW_SINGLE_QUEUE_TABLE',
QUEUE_PAYLOAD_TYPE =>'RAW',
COMPATIBLE => '10.0');
END;
次のようにキューを作成します。
BEGIN
DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME =>'scott.RAW_SINGLE_QUEUE',
QUEUE_TABLE =>'scott.RAW_SINGLE_QUEUE_TABLE',
END;
次のようにキューを起動します。
BEGIN
DBMS_AQADM.START_QUEUE(
'scott.RAW_SINGLE_QUEUE',
END;
キューの停止やデータベースからのキュー表の削除が必要な場合があります。次の方法で対処できます。
次のようにキューを停止します。
BEGIN
DBMS_AQADM.STOP_QUEUE(
scott.RAW_SINGLE_QUEUE',
END;
次のようにデータベースからキュー表を削除します。
BEGIN
DBMS_AQADM.DROP_QUEUE_TABLE(
QUEUE_TABLE =>'scott.RAW_SINGLE_QUEUE_TABLE',
FORCE => TRUE
END;
JDBCアプリケーションでは次のことができます。
AQネームスペースを登録し、エンキュー発生時に通知を受け取ります。その方法は次のとおりです。
public AQNotificationRegistration registerForAQEvents(
OracleConnection conn,
String queueName) throws SQLException
{
Properties globalOptions = new Properties();
String[] queueNameArr = new String[1];
queueNameArr[0] = queueName;
Properties[] opt = new Properties[1];
opt[0] = new Properties();
opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true");
AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions);
AQNotificationRegistration reg = regArr[0];
return reg;
}
データベース・イベントにサブスクリプションを登録し、イベントのトリガー時に通知を受け取ります。
登録されたクライアントは、イベントがトリガーされた場合または明示的なAQエンキューの場合に非同期に通知されます(または、通知希望を登録したキューに新規メッセージがエンキューされた場合)。クライアントはデータベースに接続されている必要はありません。
次のコードは、データベース・イベントにサブスクライブし、イベントのトリガー時に通知を受け取る方法を示します。
class DemoAQRawQueueListener implements AQNotificationListener
{
OracleConnection conn;
String queueName;
String typeName;
int eventsCount = 0;
public DemoAQRawQueueListener(String _queueName, String _typeName)
throws SQLException
{
queueName = _queueName;
typeName = _typeName;
conn = (OracleConnection)DriverManager.getConnection
(DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD);
}
public void onAQNotification(AQNotificationEvent e)
{
try
{
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
if(e.getConsumerName() != null)
deqopt.setConsumerName(e.getConsumerName());
if((e.getMessageProperties()).getDeliveryMode()
== AQMessageProperties.MESSAGE_BUFFERED)
{
deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
}
AQMessage msg = conn.dequeue(queueName,deqopt,typeName);
byte[] msgId = msg.getMessageId();
if(msgId != null)
{
String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20);
System.out.println("ID of message dequeued = "+mesgIdStr);
}
System.out.println(msg.getMessageProperties().toString());
byte[] payload = msg.getPayload();
if(typeName.equals("RAW"))
{
String payloadStr = new String(payload,0,10);
System.out.println("payload.length="+payload.length+", value="+payloadStr);
}
}
catch(SQLException sqlex)
{
System.out.println(sqlex.getMessage());
}
eventsCount++;
}
public int getEventsCount()
{
return eventsCount;
}
public void closeConnection() throws SQLException
{
conn.close();
}
}
次のようにリスナーに登録します。
AQNotificationRegistration reg = registerForAQEvents(conn,queueName+":BLUE"); DemoAQRawQueueListener demo_li = new DemoAQRawQueueListener(queueName,queueType); reg.addListener(demo_li);
メッセージをエンキューするには、まずそのメッセージを作成する必要があります。AQメッセージは、AQMessageインタフェースを実装したクラスのインスタンスによって表されます。各AQメッセージには、一連のプロパティ(メタデータ)と1つのペイロード(データ)が含まれます。AQメッセージを作成するには次の操作を実行します。
次のようにAQMessagePropertiesのインスタンスを作成します。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
次のようにプロパティ属性を設定します。
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
msgprop.setExpiration(0);
msgprop.setPriority(1);
次のようにAQMessagePropertiesオブジェクトを使用してAQメッセージを作成します。
AQMessage mesg = AQFactory.createAQMessage(msgprop);
次のようにペイロードを設定します。
byte[] rawPayload = "Example_Payload".getBytes(); mesg.setPayload(new oracle.sql.RAW(rawPayload));
AQメッセージのプロパティ
AQメッセージのプロパティは、AQMessagePropertiesインタフェースのインスタンスによって表されます。設定または取得できるメッセージ・プロパティは次のとおりです。
デキュー試行回数: メッセージのデキューを試行した回数を示します。このプロパティは、設定できません。
相関: メッセージのエンキュー時に、そのメッセージのプロデューサによって提供されるIDです。
遅延: メッセージをいつまでWAITING状態にしておくかを示す秒数です。指定された遅延時間が経過すると、メッセージはREADY状態になり、デキューできるようになります。メッセージID(msgid)を使用してメッセージをデキューした場合、遅延時間の指定はオーバーライドされます。
|
注意: バッファ・メッセージについては遅延はサポートされません。 |
配信モード: メッセージがバッファ・メッセージであるか永続メッセージであるかを示します。このプロパティは、設定できません。
エンキュー時間: メッセージがエンキューされた時刻を示します。この値はシステムによって決定され、ユーザーが設定することはできません。
例外キュー: メッセージを正常に処理できない場合にメッセージの移動先となるキューの名前を指定します。メッセージが移動されるのは、次の2つの場合です。
デキューに失敗した回数がmax_retriesを超えた場合。
メッセージが期限切れになった場合。
有効期限: メッセージがREADY状態になってから、そのメッセージをデキューできなくなるまでの秒数。デキューされないまま期限切れとなったメッセージは、EXPIRED状態になって例外キューに移されます。
メッセージ状態: メッセージがデキューされた時点のメッセージの状態を示します。このプロパティは、設定できません。
直前のキューにおけるメッセージID: 現在のメッセージを生成した最後のキューに入っているメッセージのIDです。メッセージがあるキューから別のキューに伝播された場合は、この属性により、メッセージの最後の伝播元となったキューのIDが特定されます。このプロパティは、設定できません。
優先度: メッセージの優先順位を指定します。マイナスの整数を含む任意の整数です。小さい値ほど、高い優先度を表します。
受信者リスト: 受信者を表すAQAgentオブジェクトのリストです。デフォルトの受信者は、キューのサブスクライバです。このパラメータは、複数コンシューマのキューに対してのみ有効です。
送信者: メッセージのエンキュー時にプロデューサによって指定される識別子です。これはAQAgentのインスタンスです。
トランザクション・グループ: キューがトランザクション・グループ対応である場合に、メッセージのトランザクション・グループを示します。このプロパティは、dequeueArrayメソッドに対するコールが成功した後に設定されます。
AQメッセージのペイロード
AQメッセージのペイロードは、キューの型に応じ、AQMessageインタフェースのsetPayloadメソッドを使用して指定します。次のコードは、ペイロードの設定方法の例を示しています。
... byte[] rawPayload = "Example_Payload".getBytes(); mesg.setPayload(new oracle.sql.RAW(rawPayload)); ...
AQメッセージのペイロードを取得するには、次のようにgetPayloadメソッドまたは適切なgetXXXPayloadメソッドを使用します。
byte[] payload = mesg.getPayload();
これらのメソッドは、AQMessageインタフェースで定義されています。
この項では、メッセージを作成しペイロードを設定する方法の例を示します。
例25-1 メッセージの作成およびペイロードの設定
この例では、AQMessagePropertiesのインスタンスの作成、プロパティ属性の設定、AQメッセージの作成およびペイロードの設定の方法を示します。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");
ag.setAddress("MY_SENDER_AGENT_ADDRESS");
msgprop.setSender(ag);
// handle multi consumer case:
if(recipients != null)
msgprop.setRecipientList(recipients);
System.out.println(msgprop.toString());
AQMessage mesg = AQFactory.createAQMessage(msgprop);
byte[] rawPayload = "Example_Payload".getBytes();
mesg.setPayload(new oracle.sql.RAW(rawPayload));
メッセージを作成し、メッセージのプロパティとペイロードを設定したら、OracleConnectionインタフェースのenqueueメソッドを使用してメッセージをエンキューできます。メッセージをエンキューする前には、いくつかのエンキュー・オプションを指定できます。具体的には、AQEnqueueOptionsクラスを使用して、次のエンキュー・オプションを指定できます。
配信モード: 配信モードを指定します。配信モードは、永続(ENQUEUE_PERSISTENT)またはバッファ(ENQUEUE_BUFFERED)に設定できます。
メッセージIDの取得: メッセージのエンキュー時にサーバーからメッセージIDを取得するかどうかを指定します。デフォルトでは、メッセージIDは取得されません。
変換: エンキューの前にメッセージに適用する変換を指定します。変換関数の戻り型は、キューの型と一致する必要があります。
|
注意: 変換は、DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 |
可視性: エンキュー・リクエストの、トランザクション上の動作を指定します。このオプションのデフォルト値はENQUEUE_ON_COMMITです。この設定値は、エンキュー操作が現在のトランザクションの一部であることを示します。ENQUEUE_IMMEDIATEを指定した場合、エンキュー操作は自律型トランザクションとなり、操作の完了時にコミットされます。バッファ・メッセージの場合は、ENQUEUE_IMMEDIATEを使用する必要があります。
次のコードは、エンキュー・オプションを設定してメッセージをエンキューする方法の例を示しています。
... AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true); conn.enqueue(queueName, opt, mesg); ...
エンキューされたメッセージをデキューするには、OracleConnectionインタフェースのdequeueメソッドを使用します。メッセージをデキューする前には、デキュー・オプションを設定する必要があります。具体的には、AQDequeueOptionsクラスを使用して、次のデキュー・オプションを指定できます。
条件: メッセージ・プロパティ、メッセージ・データ・プロパティおよびPL/SQLファンクションに基づいて条件式を指定します。デキュー条件は、SQL問合せのWHERE句に似た構文を使用し、Boolean式として指定します。
コンシューマ名: これを指定すると、そのコンシューマ名に一致するメッセージのみがアクセスされます。
|
注意: キューが単一コンシューマ・キューの場合は、このオプションを設定しないでください。 |
相関: デキュー操作に適用する相関基準(検索基準)を指定します。
配信フィルタ: デキューするメッセージの型を指定します。指定できるのは、バッファ・メッセージのみ(DEQEUE_BUFFERED)、永続メッセージのみ(DEQUEUE_PERSISTENT、デフォルト)またはその両方(DEQUEUE_PERSISTENT_OR_BUFFERED)です。
デキューするメッセージのID: デキューするメッセージのメッセージIDを指定します。このオプションは、IDがわかっている一意のメッセージをデキューする場合に使用できます。
デキュー・モード: デキュー操作に関連付けるロック動作を指定します。次のいずれかの値を指定できます。
DequeueMode.BROWSE: ロックを取得せずにメッセージをデキューします。
DequeueMode.LOCKED: トランザクションの完了まで持続する書込みロックをかけてメッセージをデキューします。
DequeueMode.REMOVE: (デフォルト)メッセージをデキュー後に削除します。保持プロパティが永続であれば、メッセージはキュー内に保持されます。
DequeueMode.REMOVE_NO_DATA: メッセージを、更新済または削除済としてマークします。
最大バッファ長: メッセージがRAWキューからデキューされる場合に、メッセージに割り当てることができる最大バイト数を指定します。デフォルトの最大値はDEFAULT_MAX_PAYLOAD_LENGTHですが、これはゼロを除く任意の他の値に変更できます。メッセージ全体を格納できるほどバッファが大きくない場合、超過したバイト数は警告なしに無視されます。
ナビゲーション: どの位置のメッセージを取得対象とするかを指定します。次のいずれかの値を指定できます。
NavigationOption.FIRST_MESSAGE: 検索基準に一致するメッセージのうち、利用可能な最初のメッセージをデキューします。
NavigationOption.NEXT_MESSAGE: (デフォルト)検索基準に一致する、次のデキュー可能なメッセージをデキューします。直前のメッセージがメッセージ・グループに属している場合は、そのメッセージ・グループ内で検索基準に一致する、次のデキュー可能なメッセージをデキューします。
NavigationOption.NEXT_TRANSACTION: 現在のトランザクション・グループ内のメッセージをスキップし、次のトランザクション・グループの最初のメッセージをデキューします。この設定を使用できるのは、キューでメッセージのグループ化が可能な場合のみです。
メッセージIDの取得: デキューするメッセージのメッセージIDを取得する必要があるかどうかを指定します。デフォルトでは取得されません。
変換: デキューの後にメッセージに適用する変換を指定します。変換のソース型は、キューの型と一致する必要があります。
|
注意: 変換は、DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 |
可視性: メッセージを、現在のトランザクションの一部としてデキューするかどうかを指定します。次のいずれかの値を指定できます。
VisibilityOption.ON_COMMIT: (デフォルト)現在のトランザクションの一部としてデキューします。
VisibilityOption.IMMEDIATE: 操作の完了時にコミットされる自律型トランザクションとしてデキューします。
|
注意: デキュー・モードがDequeueMode.BROWSEの場合、可視性オプションは無視されます。配信フィルタがDEQUEUE_BUFFEREDまたはDEQUEUE_PERSISTENT_OR_BUFFEREDである場合、このオプションはVisibilityOption.IMMEDIATEに設定する必要があります。 |
待機: 検索基準に一致するメッセージが1つもない場合にデキュー操作に適用する待機時間を指定します。デフォルト値のDEQUEUE_WAIT_FOREVERは、デキュー操作を無期限に待機状態にすることを示します。DEQUEUE_NO_WAITに設定した場合、デキュー操作は待機状態になりません。数値を指定した場合、デキュー操作は指定された秒数の間待機状態になります。
|
注意: DEQUEUE_WAIT_FOREVERを指定すると、デキュー操作は検索基準に一致するメッセージがキュー内でデキュー可能になるまで待機状態が続きます。ただし、OracleConnectionオブジェクトに対してcancelメソッドをコールすると、デキュー操作を中断できます。 |
次のコードは、デキュー・オプションを設定してメッセージをデキューする方法の例を示しています。
... AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); deqopt.setConsumerName(consumerName); AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
この項では、メッセージのエンキュー方法とデキュー方法を、いくつかの例で示します。
例25-2ではメッセージのエンキュー方法を、例25-3ではメッセージのデキュー方法を示しています。
例25-2 単一メッセージのエンキュー
この例では、キューにアクセスし、メッセージを作成して、メッセージをエンキューする方法を示しています。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQFactory.createAQAgent();
agent.setName("AGENTNAME");
agent.setAddress("AGENTADDRESS");
msgprop.setSender(agent);
AQMessage mesg = AQFactory.createAQMessage(msgprop);
mesg.setPayload(buffer); // where buffer is a byte array (for a RAW queue)
AQEnqueueOptions options = new AQEnqueueOptions();
conn.enqueue("SCOTT.MY_QUEUE", options, mesg);