30 Oracleアドバンスト・キューイング
Oracleアドバンスト・キューイング(AQ)では、データベース統合型のメッセージ・キューイング機能が提供されます。これは、Oracle Streamsの上に構築されます。
これによってOracle Databaseの機能が最適化されて、メッセージを永続的に格納し、様々なコンピュータ上およびデータベース上のキュー間で伝播し、Oracle Net Services、HTTPおよびHTTPSを使用して送信することができるようになります。Oracle AQはデータベース表に実装されます。そのため、運用上の利点である高可用性、スケーラビリティ、信頼性のすべてがキュー・データにも適用されます。この章では、Oracle AQへのJavaインタフェースに関する情報を提供します。
ノート:
-
Oracleアドバンスト・キューイング(AQ)は、Oracle JDBC Thinドライバの機能であり、JDBC OCIドライバではサポートされていません。
-
Oracle Database 21cリリースで、JSONキューのサポートが追加されました。Oracle Database 19cリリースまでは、サポートされているペイロード・タイプは
RAW、ADT、ANYDATAおよびXMLTypeでした。Oracle Database 23aiでは、配列エンキューおよびデキュー操作でJSONペイロード・タイプを使用することもできます。
この章の構成は、次のとおりです。
30.1 Oracleアドバンスト・キューイングの機能とフレームワーク
Oracle JDBCパッケージであるoracle.jdbc.aqは、高速JavaインタフェースをAQに提供しています。このパッケージには、次のものが含まれています。
-
クラス
-
AQDequeueOptionsデキュー操作で利用可能なオプションを指定します。
-
AQEnqueueOptionsエンキュー操作で利用可能なオプションを指定します。
-
AQFactoryAQのファクトリ・クラスです。
-
AQNotificationEvent通知を有効にするよう登録したキューに新しいメッセージがエンキューされると作成されます。
-
-
インタフェース
-
AQAgentキューのユーザーや、メッセージのプロデューサまたはコンシューマを、表したり特定したりするために使用します。
-
AQMessageエンキューまたはデキュー対象のメッセージを表します。
-
AQMessageProperties相関、送信者、遅延、有効期限、受信者、優先度、順序付けなどのメッセージ・プロパティが含まれます。
-
AQNotificationListenerAQ通知イベントを受信するためのリスナー・インタフェースです。
-
AQNotificationRegistration特定のキューに新しいメッセージがエンキューされた場合に通知を受けることを表します。
-
これらのクラスとインタフェースを使用して、既存のキューにアクセスしたり、メッセージを作成したり、メッセージをエンキューおよびデキューできます。
ノート:
Oracle JDBCドライバには、キューを作成するためのAPIは一切用意されていません。キューは、DBMS_AQADM PL/SQLパッケージを使用して作成する必要があります。
関連項目:
APIの詳細は、Oracle Database JDBC Java APIリファレンスを参照してください。
30.2 データベースの変更
この章で使用されているコードの抜粋では、ユーザーHRがデータベースに接続しているものとします。そのため、データベース内でHRに次の権限を付与する必要があります。
GRANT EXECUTE ON DBMS_AQ to HR; GRANT EXECUTE ON DBMS_AQADM to HR; GRANT AQ_ADMINISTRATOR_ROLE TO HR; GRANT ADMINISTER DATABASE TRIGGER TO HR;
メッセージをエンキューおよびデキューする前に、データベース内にキューが存在する必要があります。手順は次のとおりです。
-
次のようにキュー表を作成します。
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( QUEUE_TABLE =>'HR.RAW_SINGLE_QUEUE_TABLE', QUEUE_PAYLOAD_TYPE =>'RAW', COMPATIBLE => '10.0'); END; -
次のようにキューを作成します。
BEGIN DBMS_AQADM.CREATE_QUEUE( QUEUE_NAME =>'HR.RAW_SINGLE_QUEUE', QUEUE_TABLE =>'HR.RAW_SINGLE_QUEUE_TABLE', END; -
次のようにキューを起動します。
BEGIN DBMS_AQADM.START_QUEUE( 'HR.RAW_SINGLE_QUEUE', END;
キューの停止やデータベースからのキュー表の削除が必要な場合があります。次の方法で対処できます。
30.3 AQ非同期イベント通知
JDBCアプリケーションでは次のことができます。
-
AQネームスペースを登録し、エンキュー発生時に通知を受け取ります。その方法は次のとおりです。
public AQNotificationRegistration registerForAQEvents( OracleConnection conn, String queueName) throws SQLException { Properties globalOptions = new Properties(); String[] queueNameArr = new String[1]; queueNameArr[0] = queueName; Properties[] opt = new Properties[1]; opt[0] = new Properties(); opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true"); AQNotificationRegistration[] regArr = conn.registerAQNotification(queueNameArr,opt,globalOptions); AQNotificationRegistration reg = regArr[0]; return reg; } -
データベース・イベントにサブスクリプションを登録し、イベントのトリガー時に通知を受け取ります。
登録されたクライアントは、イベントがトリガーされた場合または明示的なAQエンキューの場合に非同期に通知されます(または、通知希望を登録したキューに新規メッセージがエンキューされた場合)。クライアントは、データベースに接続する必要はありません。
次のコードは、データベース・イベントにサブスクライブし、イベントのトリガー時に通知を受け取る方法を示します。
class DemoAQRawQueueListener implements AQNotificationListener { OracleConnection conn; String queueName; String typeName; int eventsCount = 0; public DemoAQRawQueueListener(String _queueName, String _typeName) throws SQLException { queueName = _queueName; typeName = _typeName; conn = (OracleConnection)DriverManager.getConnection (DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD); } public void onAQNotification(AQNotificationEvent e) { try { AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); if(e.getConsumerName() != null) deqopt.setConsumerName(e.getConsumerName()); if((e.getMessageProperties()).getDeliveryMode() == AQMessageProperties.DeliveryMode.BUFFERED) { deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED); deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE); } AQMessage msg = conn.dequeue(queueName,deqopt,typeName); byte[] msgId = msg.getMessageId(); if(msgId != null) { String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20); System.out.println("ID of message dequeued = "+mesgIdStr); } System.out.println(msg.getMessageProperties().toString()); byte[] payload = msg.getPayload(); if(typeName.equals("RAW")) { String payloadStr = new String(payload,0,10); System.out.println("payload.length="+payload.length+", value="+payloadStr); } } catch(SQLException sqlex) { System.out.println(sqlex.getMessage()); } eventsCount++; } public int getEventsCount() { return eventsCount; } public void closeConnection() throws SQLException { conn.close(); } } -
次のようにリスナーに登録します。
AQNotificationRegistration reg = registerForAQEvents(conn,queueName+":BLUE"); DemoAQRawQueueListener demo_li = new DemoAQRawQueueListener(queueName,queueType); reg.addListener(demo_li);
30.4 メッセージの作成について
この項では、次の概念について説明します。
30.4.1 メッセージの作成
メッセージをエンキューするには、まずそのメッセージを作成する必要があります。AQメッセージは、AQMessageインタフェースを実装したクラスのインスタンスによって表されます。各AQメッセージには、一連のプロパティ(メタデータ)と1つのペイロード(データ)が含まれます。AQメッセージを作成するには次の操作を実行します。
30.4.2 AQメッセージのプロパティ
AQメッセージのプロパティは、AQMessagePropertiesインタフェースのインスタンスによって表されます。設定または取得できるメッセージ・プロパティは次のとおりです。
-
デキュー試行回数: メッセージのデキューを試行した回数を示します。このプロパティは、設定できません。
-
相関: メッセージのエンキュー時に、そのメッセージのプロデューサによって提供されるIDです。
-
遅延: メッセージをいつまで
WAITING状態にしておくかを示す秒数です。指定された遅延時間が経過すると、メッセージはREADY状態になり、デキューできるようになります。メッセージID(msgid)を使用してメッセージをデキューした場合、遅延時間の指定はオーバーライドされます。ノート:
バッファ済メッセージではdelayはサポートされません。
-
配信モード: メッセージがバッファ・メッセージであるか永続メッセージであるかを示します。このプロパティは、設定できません。
-
エンキュー時間: メッセージがエンキューされた時刻を示します。この値はシステムにより判断されるため、ユーザーは設定できません。
-
例外キュー: メッセージを正常に処理できない場合にメッセージの移動先となるキューの名前を指定します。メッセージが移動されるのは、次の2つの場合です。
-
デキューに失敗した回数が
max_retriesを超えた場合。 -
メッセージが期限切れになった場合。
-
-
有効期限: メッセージが
READY状態になってから、そのメッセージをデキューできなくなるまでの秒数。期限切れになる前にデキューされない場合、メッセージはEXPIRED状態で例外キューに移されます。 -
メッセージ状態: メッセージがデキューされた時点のメッセージの状態を示します。このプロパティは、設定できません。
-
直前のキューにおけるメッセージID: 現在のメッセージを生成した最後のキューに入っているメッセージのIDです。メッセージがあるキューから別のキューに伝播される際に、この属性はメッセージの最後の伝播元であるキューのIDを識別します。このプロパティは、設定できません。
-
優先度: メッセージの優先順位を指定します。マイナスの整数を含む任意の整数です。小さい値ほど、高い優先度を表します。
-
受信者リスト: 受信者を表す
AQAgentオブジェクトのリストです。デフォルトの受信者はキューのサブスクライバです。このパラメータは、複数コンシューマのキューに対してのみ有効です。 -
送信者: メッセージのエンキュー時にプロデューサによって指定される識別子です。これは
AQAgentのインスタンスです。 -
トランザクション・グループ: キューがトランザクション・グループ対応である場合に、メッセージのトランザクション・グループを示します。このプロパティは、
dequeueArrayメソッドに対するコールが成功した後に設定されます。
30.4.3 AQメッセージのペイロード
AQメッセージのペイロードは、キューの型に応じ、AQMessageインタフェースのsetPayloadメソッドを使用して指定します。次のコードは、ペイロードの設定方法の例を示しています。
... byte[] rawPayload = "Example_Payload".getBytes(); mesg.setPayload(new oracle.sql.RAW(rawPayload)); ...
AQメッセージのペイロードを取得するには、次のようにgetPayloadメソッドまたは適切なgetXXXPayloadメソッドを使用します。
byte[] payload = mesg.getPayload();
これらのメソッドは、AQMessageインタフェースで定義されています。
30.5 例: メッセージの作成およびペイロードの設定
この項では、メッセージを作成しペイロードを設定する方法の例を示します。
例30-1 メッセージの作成およびペイロードの設定
この例では、AQMessagePropertiesのインスタンスの作成、プロパティ属性の設定、AQメッセージの作成およびペイロードの設定の方法を示します。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");
ag.setAddress("MY_SENDER_AGENT_ADDRESS");
msgprop.setSender(ag);
// handle multi consumer case:
if(recipients != null)
msgprop.setRecipientList(recipients);
System.out.println(msgprop.toString());
AQMessage mesg = AQFactory.createAQMessage(msgprop);
byte[] rawPayload = "Example_Payload".getBytes();
mesg.setPayload(new oracle.sql.RAW(rawPayload));
30.6 メッセージのエンキュー
メッセージを作成し、メッセージのプロパティとペイロードを設定したら、OracleConnectionインタフェースのenqueueメソッドを使用してメッセージをエンキューできます。メッセージをエンキューする前には、いくつかのエンキュー・オプションを指定できます。具体的には、AQEnqueueOptionsクラスを使用して、次のエンキュー・オプションを指定できます。
-
配信モード: 配信モードを指定します。配信モードは、永続(
ENQUEUE_PERSISTENT)またはバッファ(ENQUEUE_BUFFERED)に設定できます。 -
メッセージIDの取得: メッセージのエンキュー時にサーバーからメッセージIDを取得するかどうかを指定します。デフォルトでは、メッセージIDは取得されません。
-
変換: エンキューの前にメッセージに適用する変換を指定します。変換ファンクションの戻り型は、キューの型と一致する必要があります。
ノート:
変換は、
DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 -
可視性: エンキュー・リクエストの、トランザクション上の動作を指定します。このオプションのデフォルト値は
ENQUEUE_ON_COMMITです。この設定値は、エンキュー操作が現在のトランザクションの一部であることを示します。ENQUEUE_IMMEDIATEを指定した場合、エンキュー操作は自律型トランザクションとなり、操作の完了時にコミットされます。バッファ・メッセージの場合は、ENQUEUE_IMMEDIATEを使用する必要があります。
次のコードは、エンキュー・オプションを設定してメッセージをエンキューする方法の例を示しています。
... AQEnqueueOptions opt = new AQEnqueueOptions();opt.setRetrieveMessageId(true); conn.enqueue(queueName, opt, mesg); ...
30.7 メッセージのデキュー
エンキューされたメッセージをデキューするには、OracleConnectionインタフェースのdequeueメソッドを使用します。メッセージをデキューする前には、デキュー・オプションを設定する必要があります。具体的には、AQDequeueOptionsクラスを使用して、次のデキュー・オプションを指定できます。
-
条件: メッセージ・プロパティ、メッセージ・データ・プロパティおよびPL/SQLファンクションに基づいて条件式を指定します。デキュー条件は、SQL問合せの
WHERE句に似た構文を使用し、Boolean式として指定します。 -
コンシューマ名: これを指定すると、そのコンシューマ名に一致するメッセージのみがアクセスされます。
ノート:
キューが単一コンシューマ・キューの場合は、このオプションを設定しないでください。
-
相関: デキュー操作に適用する相関基準(検索基準)を指定します。
-
配信フィルタ: デキューするメッセージの型を指定します。指定できるのは、バッファ・メッセージのみ(
DEQUEUE_BUFFERED)、永続メッセージのみ(DEQUEUE_PERSISTENT、デフォルト)またはその両方(DEQUEUE_PERSISTENT_OR_BUFFERED)です。 -
デキューするメッセージのID: デキューするメッセージのメッセージIDを指定します。このオプションは、IDがわかっている一意のメッセージをデキューする場合に使用できます。
-
デキュー・モード: デキュー操作に関連付けるロック動作を指定します。次のいずれかの値を指定できます。
-
DequeueMode.BROWSE: ロックを取得せずにメッセージをデキューします。 -
DequeueMode.LOCKED: トランザクションの完了まで持続する書込みロックをかけてメッセージをデキューします。 -
DequeueMode.REMOVE: (デフォルト)メッセージをデキュー後に削除します。保持プロパティが永続であれば、メッセージはキュー内に保持されます。 -
DequeueMode.REMOVE_NO_DATA: メッセージを、更新済または削除済としてマークします。
-
-
最大バッファ長: メッセージが
RAWキューからデキューされる場合に、メッセージに割り当てることができる最大バイト数を指定します。デフォルトの最大値はDEFAULT_MAX_PAYLOAD_LENGTHですが、これはゼロを除く任意の他の値に変更できます。メッセージ全体を格納できるほどバッファが大きくない場合、超過したバイト数は警告なしに無視されます。 -
ナビゲーション: どの位置のメッセージを取得対象とするかを指定します。次のいずれかの値を指定できます。
-
NavigationOption.FIRST_MESSAGE: 検索基準に一致するメッセージのうち、利用可能な最初のメッセージをデキューします。 -
NavigationOption.NEXT_MESSAGE: (デフォルト)検索基準に一致する、次のデキュー可能なメッセージをデキューします。前のメッセージがメッセージ・グループに属する場合は、そのメッセージ・グループに属するメッセージの中から、検索基準に一致する次に使用可能なメッセージがデキューされます。 -
NavigationOption.NEXT_TRANSACTION: 現在のトランザクション・グループ内のメッセージをスキップし、次のトランザクション・グループの最初のメッセージをデキューします。この設定を使用できるのは、キューでメッセージのグループ化が可能な場合のみです。
-
-
メッセージIDの取得: デキューするメッセージのメッセージIDを取得する必要があるかどうかを指定します。デフォルトでは取得されません。
-
変換: デキューの後にメッセージに適用する変換を指定します。変換のソース・タイプは、キューのタイプと一致させる必要があります。
ノート:
変換は、
DBMS_TRANSFORM.CREATE_TRANSFORMATION(...)を使用して、PL/SQL内で作成する必要があります。 -
可視性: メッセージを、現在のトランザクションの一部としてデキューするかどうかを指定します。次のいずれかの値を指定できます。
-
VisibilityOption.ON_COMMIT: (デフォルト)現在のトランザクションの一部としてデキューします。 -
VisibilityOption.IMMEDIATE: 操作の完了時にコミットされる自律型トランザクションとしてデキューします。
ノート:
デキュー・モードが
DequeueMode.BROWSEの場合、可視性オプションは無視されます。配信フィルタがDEQUEUE_BUFFEREDまたはDEQUEUE_PERSISTENT_OR_BUFFEREDである場合、このオプションはVisibilityOption.IMMEDIATEに設定する必要があります。 -
-
待機: 検索基準に一致するメッセージが1つもない場合にデキュー操作に適用する待機時間を指定します。デフォルト値の
DEQUEUE_WAIT_FOREVERは、デキュー操作を無期限に待機状態にすることを示します。DEQUEUE_NO_WAITに設定した場合、デキュー操作は待機状態になりません。数値を指定した場合、デキュー操作は指定された秒数の間待機状態になります。ノート:
DEQUEUE_WAIT_FOREVERを指定すると、デキュー操作は検索基準に一致するメッセージがキュー内でデキュー可能になるまで待機状態が続きます。ただし、OracleConnectionオブジェクトに対してcancelメソッドをコールすると、デキュー操作を中断できます。
次のコードは、デキュー・オプションを設定してメッセージをデキューする方法の例を示しています。
... AQDequeueOptions deqopt = new AQDequeueOptions(); deqopt.setRetrieveMessageId(true); deqopt.setConsumerName(consumerName); AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
30.8 例: エンキューとデキュー
例30-2 単一メッセージのエンキュー
この例では、キューにアクセスし、メッセージを作成して、メッセージをエンキューする方法を示しています。
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQFactory.createAQAgent();
agent.setName("AGENTNAME");
agent.setAddress("AGENTADDRESS");
msgprop.setSender(agent);
AQMessage mesg = AQFactory.createAQMessage(msgprop);
mesg.setPayload(buffer); // where buffer is a byte array (for a RAW queue)
AQEnqueueOptions options = new AQEnqueueOptions();
conn.enqueue("HR.MY_QUEUE", options, mesg);
例30-3 単一メッセージのデキュー
この例では、キューにアクセスし、デキュー・オプションを設定して、メッセージをデキューする方法を示しています。
AQDequeueOptions options = new AQDequeueOptions();
options.setDeliveryFilter(AQDequeueOptions.DeliveryFilter.BUFFERED);
AQMessage mesg = conn.dequeue("HR.MY_QUEUE", options, "RAW");