26 Oracleアドバンスト・キューイング
Oracleアドバンスト・キューイング(AQ)は、データベース統合型のメッセージ・キューイング機能を提供します。Oracle Streams上で構築され、Oracle Databaseの機能を最適化して、メッセージを永続的に格納し、異なるコンピュータやデータベース上のキューにメッセージを伝播し、Oracle Netサービス、HTTPおよびHTTPSを使用してメッセージを送信できます。Oracle AQはデータベース表に実装されるため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。この章では、Oracle AQへのJavaインタフェースに関する情報を提供します。
ノート:
-
Oracleアドバンスト・キューイング(AQ)は、Oracle JDBC Thinドライバの機能であり、JDBC OCIドライバではサポートされていません。
-
Oracle Database 12cリリース1 (12.1)では、
XMLType
キューのサポートが追加されました。Oracle Database 11gリリース1までは、サポートされているキューの型は、RAW
、ADT
およびANYDATA
型でした。
この章の内容は次のとおりです。
26.1 Oracleアドバンスト・キューイングの機能とフレームワーク
Oracle JDBCパッケージであるoracle.jdbc.aq
は、高速JavaインタフェースをAQに提供しています。このパッケージには、次のものが含まれています。
-
クラス
-
AQDequeueOptions
デキュー操作で利用可能なオプションを指定します。
-
AQEnqueueOptions
エンキュー操作で利用可能なオプションを指定します。
-
AQFactory
AQのファクトリ・クラスです。
-
AQNotificationEvent
通知を有効にするよう登録したキューに新しいメッセージがエンキューされると作成されます。
-
-
インタフェース
-
AQAgent
キューのユーザーや、メッセージのプロデューサまたはコンシューマを、表したり特定したりするために使用します。
-
AQMessage
エンキューまたはデキュー対象のメッセージを表します。
-
AQMessageProperties
相関、送信者、遅延、有効期限、受信者、優先度、順序付けなどのメッセージ・プロパティが含まれます。
-
AQNotificationListener
AQ通知イベントを受信するためのリスナー・インタフェースです。
-
AQNotificationRegistration
特定のキューに新しいメッセージがエンキューされた場合に通知を受けることを表します。
-
これらのクラスとインタフェースを使用して、既存のキューにアクセスしたり、メッセージを作成したり、メッセージをエンキューおよびデキューできます。
ノート:
Oracle JDBCドライバには、キューを作成するためのAPIは一切用意されていません。キューは、DBMS_AQADM
PL/SQLパッケージを使用して作成する必要があります。
関連項目:
APIの詳細は、Oracle Database JDBC Java APIリファレンスを参照してください。
26.2 データベースの変更
この章で使用されているコードの抜粋では、ユーザーHR
がデータベースに接続しているものとします。そのため、データベース内でHR
に次の権限を付与する必要があります。
GRANT EXECUTE ON DBMS_AQ to HR; GRANT EXECUTE ON DBMS_AQADM to HR; GRANT AQ_ADMINISTRATOR_ROLE TO HR; GRANT ADMINISTER DATABASE TRIGGER TO HR;
メッセージをエンキューおよびデキューする前に、データベース内にキューが存在する必要があります。手順は次のとおりです。
-
次のようにキュー表を作成します。
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( QUEUE_TABLE =>'HR.RAW_SINGLE_QUEUE_TABLE', QUEUE_PAYLOAD_TYPE =>'RAW', COMPATIBLE => '10.0'); END;
-
次のようにキューを作成します。
BEGIN DBMS_AQADM.CREATE_QUEUE( QUEUE_NAME =>'HR.RAW_SINGLE_QUEUE', QUEUE_TABLE =>'HR.RAW_SINGLE_QUEUE_TABLE', END;
-
次のようにキューを起動します。
BEGIN DBMS_AQADM.START_QUEUE( 'HR.RAW_SINGLE_QUEUE', END;
キューの停止やデータベースからのキュー表の削除が必要な場合があります。次の方法で対処できます。
26.3 AQ非同期イベント通知
JDBCアプリケーションでは次のことができます。
-
AQネームスペースを登録し、エンキュー発生時に通知を受け取ります。その方法は次のとおりです。
public AQNotificationRegistration registerForAQEvents( OracleConnection conn, String queueName) throws SQLException { Properties globalOptions = new Properties(); String[] queueNameArr = new String[1]; queueNameArr[0] = queueName; Properties[] opt = new Properties[1]; opt[0] = new Properties(); opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true"); AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions); AQNotificationRegistration reg = regArr[0]; return reg; }
-
データベース・イベントにサブスクリプションを登録し、イベントのトリガー時に通知を受け取ります。
登録されたクライアントは、イベントがトリガーされた場合または明示的なAQエンキューの場合に非同期に通知されます(または、通知希望を登録したキューに新規メッセージがエンキューされた場合)。クライアントは、データベースに接続する必要はありません。
次のコードは、データベース・イベントにサブスクライブし、イベントのトリガー時に通知を受け取る方法を示します。
class DemoAQRawQueueListener implements AQNotificationListener { OracleConnection conn; String queueName; String typeName; int eventsCount = 0; public DemoAQRawQueueListener(String _queueName, String _typeName) throws SQLException { queueName = _queueName; typeName = _typeName; conn = (OracleConnection)DriverManager.getConnection (DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD); } public void onAQNotification(AQNotificationEvent e) { try { AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); if(e.getConsumerName() != null) deqopt.setConsumerName(e.getConsumerName()); if((e.getMessageProperties()).getDeliveryMode() == AQMessageProperties.DeliveryMode.BUFFERED) { deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED); deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE); } AQMessage msg = conn.dequeue(queueName,deqopt,typeName); byte[] msgId = msg.getMessageId(); if(msgId != null) { String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20); System.out.println("ID of message dequeued = "+mesgIdStr); } System.out.println(msg.getMessageProperties().toString()); byte[] payload = msg.getPayload(); if(typeName.equals("RAW")) { String payloadStr = new String(payload,0,10); System.out.println("payload.length="+payload.length+", value="+payloadStr); } } catch(SQLException sqlex) { System.out.println(sqlex.getMessage()); } eventsCount++; } public int getEventsCount() { return eventsCount; } public void closeConnection() throws SQLException { conn.close(); } }
-
次のようにリスナーに登録します。
AQNotificationRegistration reg = registerForAQEvents(conn,queueName+":BLUE"); DemoAQRawQueueListener demo_li = new DemoAQRawQueueListener(queueName,queueType); reg.addListener(demo_li);
26.4 メッセージの作成について
この項では、次の概念について説明します。
26.4.1 メッセージの作成
メッセージをエンキューするには、まずそのメッセージを作成する必要があります。AQメッセージは、AQMessage
インタフェースを実装したクラスのインスタンスによって表されます。各AQメッセージには、一連のプロパティ(メタデータ)と1つのペイロード(データ)が含まれます。AQメッセージを作成するには次の操作を実行します。
26.4.2 AQメッセージのプロパティ
AQメッセージのプロパティは、AQMessageProperties
インタフェースのインスタンスによって表されます。設定または取得できるメッセージ・プロパティは次のとおりです。
-
デキュー試行回数: メッセージのデキューを試行した回数を示します。このプロパティは、設定できません。
-
相関: メッセージのエンキュー時に、そのメッセージのプロデューサによって提供されるIDです。
-
遅延: メッセージをいつまで
WAITING
状態にしておくかを示す秒数です。指定された遅延時間が経過すると、メッセージはREADY
状態になり、デキューできるようになります。メッセージID(msgid)を使用してメッセージをデキューした場合、遅延時間の指定はオーバーライドされます。ノート:
バッファ済メッセージではdelayはサポートされません。
-
配信モード: メッセージがバッファ・メッセージであるか永続メッセージであるかを示します。このプロパティは、設定できません。
-
エンキュー時間: メッセージがエンキューされた時刻を示します。この値はシステムにより判断されるため、ユーザーは設定できません。
-
例外キュー: メッセージを正常に処理できない場合にメッセージの移動先となるキューの名前を指定します。メッセージが移動されるのは、次の2つの場合です。
-
デキューに失敗した回数が
max_retries
を超えた場合。 -
メッセージが期限切れになった場合。
-
-
有効期限: メッセージが
READY
状態になってから、そのメッセージをデキューできなくなるまでの秒数。期限切れになる前にデキューされない場合、メッセージはEXPIRED
状態で例外キューに移されます。 -
メッセージ状態: メッセージがデキューされた時点のメッセージの状態を示します。このプロパティは、設定できません。
-
直前のキューにおけるメッセージID: 現在のメッセージを生成した最後のキューに入っているメッセージのIDです。メッセージがあるキューから別のキューに伝播される際に、この属性はメッセージの最後の伝播元であるキューのIDを識別します。このプロパティは、設定できません。
-
優先度: メッセージの優先順位を指定します。マイナスの整数を含む任意の整数です。小さい値ほど、高い優先度を表します。
-
受信者リスト: 受信者を表す
AQAgent
オブジェクトのリストです。デフォルトの受信者はキューのサブスクライバです。このパラメータは、複数コンシューマのキューに対してのみ有効です。 -
送信者: メッセージのエンキュー時にプロデューサによって指定される識別子です。これは
AQAgent
のインスタンスです。 -
トランザクション・グループ: キューがトランザクション・グループ対応である場合に、メッセージのトランザクション・グループを示します。このプロパティは、
dequeueArray
メソッドに対するコールが成功した後に設定されます。
26.4.3 AQメッセージのペイロード
AQメッセージのペイロードは、キューの型に応じ、AQMessage
インタフェースのsetPayload
メソッドを使用して指定します。次のコードは、ペイロードの設定方法の例を示しています。
... byte[] rawPayload = "Example_Payload".getBytes(); mesg.setPayload(new oracle.sql.RAW(rawPayload)); ...
AQメッセージのペイロードを取得するには、次のようにgetPayload
メソッドまたは適切なget
XXX
Payload
メソッドを使用します。
byte[] payload = mesg.getPayload();
これらのメソッドは、AQMessage
インタフェースで定義されています。
26.5 例: メッセージの作成およびペイロードの設定
この項では、メッセージを作成しペイロードを設定する方法の例を示します。
例26-1 メッセージの作成およびペイロードの設定
この例では、AQMessageProperties
のインスタンスの作成、プロパティ属性の設定、AQメッセージの作成およびペイロードの設定の方法を示します。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");
ag.setAddress("MY_SENDER_AGENT_ADDRESS");
msgprop.setSender(ag);
// handle multi consumer case:
if(recipients != null)
msgprop.setRecipientList(recipients);
System.out.println(msgprop.toString());
AQMessage mesg = AQFactory.createAQMessage(msgprop);
byte[] rawPayload = "Example_Payload".getBytes();
mesg.setPayload(new oracle.sql.RAW(rawPayload));
26.6 メッセージのエンキュー
メッセージを作成し、メッセージのプロパティとペイロードを設定したら、OracleConnection
インタフェースのenqueue
メソッドを使用してメッセージをエンキューできます。メッセージをエンキューする前には、いくつかのエンキュー・オプションを指定できます。具体的には、AQEnqueueOptions
クラスを使用して、次のエンキュー・オプションを指定できます。
-
配信モード: 配信モードを指定します。配信モードは、永続(
ENQUEUE_PERSISTENT
)またはバッファ(ENQUEUE_BUFFERED
)に設定できます。 -
メッセージIDの取得: メッセージのエンキュー時にサーバーからメッセージIDを取得するかどうかを指定します。デフォルトでは、メッセージIDは取得されません。
-
変換: エンキューの前にメッセージに適用する変換を指定します。変換ファンクションの戻り型は、キューの型と一致する必要があります。
ノート:
変換は、
DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)
を使用して、PL/SQL内で作成する必要があります。 -
可視性: エンキュー・リクエストの、トランザクション上の動作を指定します。このオプションのデフォルト値は
ENQUEUE_ON_COMMIT
です。この設定値は、エンキュー操作が現在のトランザクションの一部であることを示します。ENQUEUE_IMMEDIATE
を指定した場合、エンキュー操作は自律型トランザクションとなり、操作の完了時にコミットされます。バッファ・メッセージの場合は、ENQUEUE_IMMEDIATE
を使用する必要があります。
次のコードは、エンキュー・オプションを設定してメッセージをエンキューする方法の例を示しています。
... AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true); conn.enqueue(queueName, opt, mesg); ...
26.7 メッセージのデキュー
エンキューされたメッセージをデキューするには、OracleConnection
インタフェースのdequeue
メソッドを使用します。メッセージをデキューする前には、デキュー・オプションを設定する必要があります。具体的には、AQDequeueOptions
クラスを使用して、次のデキュー・オプションを指定できます。
-
条件: メッセージ・プロパティ、メッセージ・データ・プロパティおよびPL/SQLファンクションに基づいて条件式を指定します。デキュー条件は、SQL問合せの
WHERE
句に似た構文を使用し、Boolean
式として指定します。 -
コンシューマ名: これを指定すると、そのコンシューマ名に一致するメッセージのみがアクセスされます。
ノート:
キューが単一コンシューマ・キューの場合は、このオプションを設定しないでください。
-
相関: デキュー操作に適用する相関基準(検索基準)を指定します。
-
配信フィルタ: デキューするメッセージの型を指定します。指定できるのは、バッファ・メッセージのみ(
DEQUEUE_BUFFERED
)、永続メッセージのみ(DEQUEUE_PERSISTENT
、デフォルト)またはその両方(DEQUEUE_PERSISTENT_OR_BUFFERED
)です。 -
デキューするメッセージのID: デキューするメッセージのメッセージIDを指定します。このオプションは、IDがわかっている一意のメッセージをデキューする場合に使用できます。
-
デキュー・モード: デキュー操作に関連付けるロック動作を指定します。次のいずれかの値を指定できます。
-
DequeueMode.BROWSE
: ロックを取得せずにメッセージをデキューします。 -
DequeueMode.LOCKED
: トランザクションの完了まで持続する書込みロックをかけてメッセージをデキューします。 -
DequeueMode.REMOVE
: (デフォルト)メッセージをデキュー後に削除します。保持プロパティが永続であれば、メッセージはキュー内に保持されます。 -
DequeueMode.REMOVE_NO_DATA
: メッセージを、更新済または削除済としてマークします。
-
-
最大バッファ長: メッセージが
RAW
キューからデキューされる場合に、メッセージに割り当てることができる最大バイト数を指定します。デフォルトの最大値はDEFAULT_MAX_PAYLOAD_LENGTH
ですが、これはゼロを除く任意の他の値に変更できます。メッセージ全体を格納できるほどバッファが大きくない場合、超過したバイト数は警告なしに無視されます。 -
ナビゲーション: どの位置のメッセージを取得対象とするかを指定します。次のいずれかの値を指定できます。
-
NavigationOption.FIRST_MESSAGE
: 検索基準に一致するメッセージのうち、利用可能な最初のメッセージをデキューします。 -
NavigationOption.NEXT_MESSAGE
: (デフォルト)検索基準に一致する、次のデキュー可能なメッセージをデキューします。前のメッセージがメッセージ・グループに属する場合は、そのメッセージ・グループに属するメッセージの中から、検索基準に一致する次に使用可能なメッセージがデキューされます。 -
NavigationOption.NEXT_TRANSACTION
: 現在のトランザクション・グループ内のメッセージをスキップし、次のトランザクション・グループの最初のメッセージをデキューします。この設定を使用できるのは、キューでメッセージのグループ化が可能な場合のみです。
-
-
メッセージIDの取得: デキューするメッセージのメッセージIDを取得する必要があるかどうかを指定します。デフォルトでは取得されません。
-
変換: デキューの後にメッセージに適用する変換を指定します。変換のソース・タイプは、キューのタイプと一致させる必要があります。
ノート:
変換は、
DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)
を使用して、PL/SQL内で作成する必要があります。 -
可視性: メッセージを、現在のトランザクションの一部としてデキューするかどうかを指定します。次のいずれかの値を指定できます。
-
VisibilityOption.ON_COMMIT
: (デフォルト)現在のトランザクションの一部としてデキューします。 -
VisibilityOption.IMMEDIATE
: 操作の完了時にコミットされる自律型トランザクションとしてデキューします。
ノート:
デキュー・モードが
DequeueMode.BROWSE
の場合、可視性オプションは無視されます。配信フィルタがDEQUEUE_BUFFERED
またはDEQUEUE_PERSISTENT_OR_BUFFERED
である場合、このオプションはVisibilityOption.IMMEDIATE
に設定する必要があります。 -
-
待機: 検索基準に一致するメッセージが1つもない場合にデキュー操作に適用する待機時間を指定します。デフォルト値の
DEQUEUE_WAIT_FOREVER
は、デキュー操作を無期限に待機状態にすることを示します。DEQUEUE_NO_WAIT
に設定した場合、デキュー操作は待機状態になりません。数値を指定した場合、デキュー操作は指定された秒数の間待機状態になります。ノート:
DEQUEUE_WAIT_FOREVER
を指定すると、デキュー操作は検索基準に一致するメッセージがキュー内でデキュー可能になるまで待機状態が続きます。ただし、OracleConnection
オブジェクトに対してcancel
メソッドをコールすると、デキュー操作を中断できます。
次のコードは、デキュー・オプションを設定してメッセージをデキューする方法の例を示しています。
... AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); deqopt.setConsumerName(consumerName); AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
26.8 例: エンキューとデキュー
例26-2 単一メッセージのエンキュー
この例では、キューにアクセスし、メッセージを作成して、メッセージをエンキューする方法を示しています。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties(); msgprop.setPriority(1); msgprop.setExceptionQueue("EXCEPTION_QUEUE"); msgprop.setExpiration(0); AQAgent agent = AQFactory.createAQAgent(); agent.setName("AGENTNAME"); agent.setAddress("AGENTADDRESS"); msgprop.setSender(agent); AQMessage mesg = AQFactory.createAQMessage(msgprop); mesg.setPayload(buffer); // where buffer is a byte array (for a RAW queue) AQEnqueueOptions options = new AQEnqueueOptions(); conn.enqueue("HR.MY_QUEUE", options, mesg);
例26-3 単一メッセージのデキュー
この例では、キューにアクセスし、デキュー・オプションを設定して、メッセージをデキューする方法を示しています。
AQDequeueOptions options = new AQDequeueOptions(); options.setDeliveryFilter(AQDequeueOptions.DeliveryFilter.BUFFERED); AQMessage mesg = conn.dequeue("HR.MY_QUEUE", options, "RAW");