6 Transactional Event QueuesおよびAdvanced Queuingに対応するJava Message Service
この章の内容は次のとおりです。
Oracle Transactional Event QueuesおよびAdvanced Queuingに対応するJava Messaging Serviceインタフェース
次のトピックでは、Oracle Database Advanced Queuing (AQ)へのOracle Java Message Service (JMS)インタフェースについて説明します。
JMSおよびOracle JMSの一般的な機能
この項の内容は次のとおりです。
JMSコネクションおよびセッション
この項の内容は次のとおりです。
ConnectionFactoryオブジェクト
ConnectionFactoryは、管理者によって定義された接続構成パラメータの集合をカプセル化します。クライアントはこれを使用してJMSプロバイダとの接続を確立します。この場合、Oracle Databaseの一部であるOracle JMSがJMSプロバイダです。
ConnectionFactoryオブジェクトには、次の3種類があります。
-
ConnectionFactory -
QueueConnectionFactory -
TopicConnectionFactory
AQjmsFactoryを使用したConnectionFactoryオブジェクトの取得
AQjmsFactoryクラスを使用して、ConnectionFactory、QueueConnectionFactoryまたはTopicConnectionFactoryオブジェクトに対するハンドルを取得できます。
Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートしているConnectionFactoryを取得するには、AQjmsFactory.getConnectionFactory()を使用します。QueueConnectionFactoryを取得するには、AQjmsFactory.getQueueConnectionFactory()を使用します。TopicConnectionFactoryを取得するには、AQjmsFactory.getTopicConnectionFactory()を使用します。
ConnectionFactory、QueueConnectionFactoryまたはTopicConnectionFactoryは、ホスト名、ポート番号、SIDドライバまたはJDBC URLおよびプロパティを使用して作成できます。
JNDIを使用したConnectionFactoryオブジェクトの検索
JMS管理者は、Lightweight Directory Access Protocol(LDAP)サーバーにConnectionFactoryオブジェクトを登録できます。JMSでJava Naming and Directory Interface(JNDI)の検索を使用可能にするには、次の設定が必要です。
JMSコネクション
JMSコネクションは、クライアントとJMSプロバイダの間のアクティブなコネクションを表します。JMSコネクションによって、次に示すいくつかの重要なサービスが実行されます。
-
JMSプロバイダとのオープン・コネクションまたはコネクション・プールのいずれかをカプセル化します。
-
クライアントとプロバイダのサービス・デーモン間のオープンなTCP/IPソケットを表します。
-
コネクション確立時に、クライアントを認証するための構造を提供します。
-
Sessionsを作成します -
コネクション・メタデータを提供します
-
オプションの
ExceptionListenerをサポートします。
データベースへのJMSコネクションは、createConnection()、createQueueConnection()またはcreateTopicConnection()を起動し、ConnectionFactoryオブジェクト、QueueConnectionFactoryオブジェクトまたはTopicConnectionFactoryオブジェクトにそれぞれパラメータusernameおよびpasswordを渡して作成できます。
次に、Connectionオブジェクトでサポートされているメソッドのいくつかを示します。
-
start()このメソッドは、着信メッセージの配信を開始または再開します。
-
stop()このメソッドは、着信メッセージの配信を一時停止します。
Connectionオブジェクトが停止すると、すべてのメッセージ・コンシューマへの配信は禁止されます。また、同期受信のブロックおよびメッセージは、メッセージ・リスナーに配信されません。 -
close()このメソッドは、JMSセッションをクローズして、関連付けられたすべてのリソースを解放します。
-
createSession(true, 0)このメソッドは、JMS
Connectionインスタンスを使用してJMSSessionを作成します。 -
createQueueSession(true,0)このメソッドは
QueueSessionを作成します。 -
createTopicSession(true,0)このメソッドは
TopicSessionを作成します。 -
setExceptionListener(ExceptionListener)このメソッドは、
Connectionの例外リスナーを設定します。問題が非同期でクライアントに通知されます。メッセージを処理するのみのConnectionの場合、その失敗を確認する他の方法はありません。 -
getExceptionListener()このメソッドは、この
ConnectionのExceptionListenerを取得します。
JMSクライアントは、通常1つのConnection、1つのSession、複数のMessageProducerおよびMessageConsumerオブジェクトを作成します。現在のバージョンでは、次の場合を除いて、1つのConnectionにつき1つのオープンSessionのみが許可されています。
-
JDBC OCI8ドライバを使用してJMSコネクションを作成する場合
-
ユーザーが、JMSコネクションの作成時に
OracleOCIConnectionPoolインスタンスを提供する場合
Connectionは停止モードで作成されます。この状態では、メッセージを受信できません。通常、設定が完了するまで、Connectionを停止モードにしておきます。その時点でConnection start()メソッドがコールされ、メッセージがConnectionコンシューマに配信され始めます。このような設定規則を確立することにより、クライアントが設定されている最中の非同期メッセージ配信の結果、発生するクライアントの混乱を最小限に止められます。
Connectionを起動し、続けて設定を実行できます。これを実行するクライアントは、設定処理中に非同期メッセージの配信処理の準備をする必要があります。MessageProducerは、Connectionが停止中でもメッセージを送信できます。
JMSセッション
JMS Sessionは、メッセージの作成および処理用の単一スレッドのコンテキストです。これは、Java Virtual Machine(JVM)の外でプロバイダ・リソースを割り当てますが、軽量なJMSオブジェクトとみなされます。
Sessionには、次の役割があります。
-
MessageProducerおよびMessageConsumerオブジェクトのファクトリを構成します。 -
宛先オブジェクト(キュー/トピック)に対するハンドルの取得方法を提供します。
-
プロバイダが最適化したメッセージ・ファクトリを提供します。
-
セッションの
MessageProducerおよびMessageConsumerオブジェクトにまたがる作業を組み合せる一連のトランザクションをサポートし、これらを単位に構成します。 -
セッションが処理および作成するメッセージのシリアル順序を定義します。
-
セッションに登録された
MessageListenerオブジェクトの実行をシリアル化します。
Oracle Database 20cでは、JDBC thinまたはJDBC thick(OCI)ドライバのいずれかを使用すると、単一のJMS Connectionを使用して、リソースが許すかぎり多くのJMS Sessionを作成できます。
プロバイダが、JVM外のSessionのためのリソースを割り当てることができるため、クライアントはリソースが必要ないときはこれらを閉じる必要があります。ガベージ・コレクションによる最終的なリソースの解放を待つ必要はありません。Sessionが作成したMessageProducerおよびMessageConsumerオブジェクトについても同様です。
Sessionオブジェクトのメソッドには、次のものが含まれます。
-
commit()このメソッドは、トランザクションで実行されるすべてのメッセージをコミットして、現在保持されているロックを解放します。
-
rollback()このメソッドは、トランザクションで実行されたすべてのメッセージをロールバックして、現在保持されているロックを解放します。
-
close()このメソッドは
Sessionをクローズします。 -
getDBConnection()このメソッドは、基礎となるJDBCコネクションに対するハンドルを取得します。このハンドルを使用して、他のSQL DML操作を同じ
Sessionの一部として実行できます。このメソッドは、Oracle JMSに固有です。 -
acknowledge()このメソッドは、非トランザクション・セッションでメッセージの受信を承認します。
-
recover()このメソッドは、非トランザクション・セッションでメッセージの配信を再開します。セッション中に配信された一連のメッセージは、最後に承認されたメッセージの後でリセットされます。
Oracle JMSの拡張例は、次のとおりです。
-
createQueueTable()このメソッドはキュー表を作成します。
-
getQueueTable()このメソッドは既存のキュー表に対するハンドルを取得します。
-
createQueue()このメソッドはキューを作成します。
-
getQueue()このメソッドは既存のキューに対するハンドルを取得します。
-
createTopic()このメソッドはトピックを作成します。
-
getTopic()このメソッドは既存のトピックに対するハンドルを取得します。
どの拡張機能を使用する場合も、SessionオブジェクトをAQjmsSessionにキャストする必要があります。
ノート:
JMS仕様では、開始されていないJMS Connectionインスタンスで受信が完了したとき、プロバイダによりNULLメッセージが返されることを要求します。
javax.jms.Connectionインスタンスの作成後、start()メソッドをそのインスタンスにコールしてメッセージを受信できるようにする必要があります。コネクション確立後で実際に受信する前に、t_conn.start();などの行を追加するとメッセージを受信できます。
JMS宛先
Destinationは、クライアントがメッセージの送信先および受信元の指定に使用するオブジェクトです。DestinationオブジェクトはQueueまたはTopicに指定できます。Oracle Database Advanced Queuingでは、特定のデータベースのschema.queueにマップされます。Queueは単一コンシューマ・キューに、Topicはマルチ・コンシューマ・キューにマップします。
JMSセッションを使用したDestinationオブジェクトの取得
Destinationオブジェクトは、Sessionオブジェクトからドメイン固有の次のSessionメソッドを使用して作成されます。
-
AQjmsSession.getQueue(queue_owner, queue_name)このメソッドはJMSキューに対するハンドルを取得します。
-
AQjmsSession.getTopic(topic_owner, topic_name)このメソッドはJMSトピックに対するハンドルを取得します。
JNDIを使用したDestinationオブジェクトの検索
LDAPサーバーにスキーマ・オブジェクトを登録するようにデータベースを設定できます。データベースがLDAPを使用できるように設定され、GLOBAL_TOPIC_ENABLEDパラメータがTRUEに設定されている場合、すべてのJMSキューおよびトピックは、作成時に自動的にLDAPサーバーに登録されます。管理者は、LDAPに登録されたキューおよびトピックの別名を作成することもできます。LDAPに登録されたキューおよびトピックは、JNDIを介してキューまたはトピックの名前または別名を使用してルックアップできます。
関連項目:
JMS宛先メソッド
Destinationオブジェクトのメソッドには、次のものが含まれます。
-
alter()このメソッドは
キューまたはトピックを変更します。 -
schedulePropagation()このメソッドは、ソースから宛先への伝播をスケジューリングします。
-
unschedulePropagation()このメソッドは、スケジュール済伝播のスケジュールを解除します。
-
enablePropagationSchedule()このメソッドは、伝播スケジュールを有効化します。
-
disablePropagationSchedule()このメソッドは、伝播スケジュールを無効化します。
-
start()このメソッドは
キューまたはトピックを開始します。キューは、エンキューまたはデキューを可能にするために開始されます。トピックは、パブリッシュまたはサブスクライブを可能にするために開始できます。 -
stop()このメソッドは
キューまたはトピックを停止します。キューは、エンキューまたはデキューを実行不可にするために停止できます。トピックは、パブリッシュまたはサブスクライブを実行不可にするために停止できます。 -
drop()このメソッドは
キューまたはトピックを削除します。
JMSでのシステム・レベルのアクセス制御
Oracle8i以上では、すべてのキューイング操作に対してシステム・レベルのアクセス制御をサポートします。この機能によって、アプリケーション設計者またはDBAは、ユーザーをキュー管理者にできます。キュー管理者は、データベースのどのキューに対してもJMSインタフェース(管理および操作)を起動できます。これによって、データベース上のキュー全体に対するすべての管理スクリプトを1つのスキーマで管理できるため、管理作業が容易になります。
メッセージが宛先キューに届くと、ソース・キューのスキーマ名に基づいたセッションによって、新しく届いたメッセージが宛先キューにエンキューされます。つまり、ソース・キューのスキーマに宛先キューに対するエンキュー権限を付与する必要があります。
リモートの宛先キューに伝播するために、エージェント構造体のアドレス・フィールドのデータベース・リンクに指定されたログイン・ユーザーには、ENQUEUE_ANY権限を付与するか、または宛先キューに対するエンキュー権限を付与する必要があります。ただし、データベース・リンクのログイン・ユーザーが宛先のキュー表を所有している場合はどのような明示的な権限も付与する必要はありません。
JMSでの宛先レベルのアクセス制御
Oracle8i以上では、エンキューおよびデキュー操作に対してキュー・レベルまたはトピック・レベルのアクセス制御をサポートしています。この機能によって、アプリケーション設計者は、あるスキーマに作成されたキューおよびトピックを他のスキーマで実行中のアプリケーションから保護できます。そのキューまたはトピックが属するスキーマの外で実行しているアプリケーションには、最小限のアクセス権限のみを付与できます。キューまたはトピックに対するアクセス権限として、ENQUEUE、DEQUEUEおよびALLがサポートされています。
JMSでの保存およびメッセージ履歴
メッセージは、相互に関連していることがよくあります。たとえば、あるメッセージを処理した結果として他のメッセージが生成された場合、両者は関連付けられています。アプリケーション設計者としては、そのような関連を追跡することが必要な場合があります。Oracle Database Advanced Queuingでは、ユーザーがメッセージをキュー表に保持し、分析のためにSQLで問合せできます。
保存機能およびメッセージ識別子とともに、メッセージ・ジャーナルがOracle Database Advanced Queuingによって自動作成され、追跡ジャーナルまたはイベント・ジャーナルをコールできます。保存、メッセージ識別子およびSQL問合せの協調によって、強力なメッセージ・ウェアハウスを構築できます。
JMSでのOracle Real Application Clustersのサポート
トランザクション・イベント・キュー(TxEventQ)とは、システム管理のパーティション化を使用して、独立した複数の物理キューに分割される1つの論理キューです。TxEventQは、Oracle RACインスタンス全体で使用されるキュー、エンキュー率やデキュー率の高いキュー、サブスクライバの多いキューに適したJMSキューです。詳細は、「Transactional Event QueuesとOracle Real Application Clusters (Oracle RAC)」を参照してください。
AQキューの場合は、Oracle Real Application Clusters (Oracle RAC)を使用して、各インスタンスを別々のキューで管理できるようにすることで、Oracle Database Advanced Queuingのパフォーマンスを向上できます。このためには、キューを格納するキュー表に様々なインスタンス・アフィニティ(作業環境)を指定します。これによって、様々なキュー/トピックに対するキュー操作(エンキューまたはデキュー)またはトピック操作(パブリッシュ・サブスクライブ)を並行して行うことができるようになります。
Oracle Database Advanced Queuingのキュー・モニター・プロセスは、キュー表のインスタンス・アフィニティを継続的に監視します。キュー・モニターは指定されたプライマリ・インスタンスが使用可能な場合はそれにキュー表の所有権を割り当て、失敗した場合は指定されたセカンダリ・インスタンスに割り当てます。
キュー表を所有しているインスタンスが終了すると、キュー・モニターはセカンダリ・インスタンスなどの適切なインスタンスに所有者を変更します。
Oracle Database Advanced Queuingの伝播はOracle Real Application Clustersでも使用できますが、これはユーザーに対して透過的です。伝播スケジュールのジョブ・アフィニティは、それぞれのキュー表のアフィニティと同じ値に設定されます。このように、キュー表を所有するインスタンスに対応付けられたjob_queue_processは、そのキュー表に格納されているキューからの伝播を処理し、ping操作を最小限に抑えます。
JMSでの統計ビューのサポート
各インスタンスは、それぞれのOracle Database Advanced Queuing統計情報をシステム・グローバル領域(SGA)に所有し、他のインスタンスによって収集された統計については認識しません。そのため、インスタンスがGV$AQビューを問い合せると、その他のすべてのインスタンスからそれぞれのその時点の統計情報が問合せ元のインスタンスに集まります。
GV$AQビューは、待機中、準備完了または期限切れの各メッセージ数を必要なときにいつでも問い合せることができます。また、メッセージが処理されるまでの平均待機秒数も表示します。
JMSでの構造化ペイロード/メッセージの型
JMSメッセージは、ヘッダー、プロパティおよび本体で構成されています。
ヘッダーは、クライアントおよびプロバイダの両方でメッセージの識別およびルーティングに使用される値が含まれるヘッダー・フィールドで構成されています。すべてのメッセージは、同じ一連のヘッダー・フィールドをサポートしています。
プロパティは、オプションのヘッダー・フィールドです。JMSによって定義された標準プロパティの他に、プロバイダ固有およびアプリケーション固有のプロパティを含めることもできます。
本体はメッセージ・ペイロードです。JMSは、様々な型のメッセージ・ペイロードおよびJMSで指定されたメッセージ型のJMSメッセージを格納できる型を定義します。
この項の内容は次のとおりです。
JMSメッセージ・ヘッダー
JMSメッセージ・ヘッダーには、次のフィールドが含まれます。
-
JMSDestinationこのフィールドには、メッセージの送信先が含まれます。Oracle Database Advanced Queuingでは、これは送信先キューまたはトピックに相当します。
Sendメソッドの完了後にJMSによって設定されるDestination型です。 -
JMSDeliveryModeこのフィールドは、メッセージのログが記録されるかどうかを決定します。JMSでは、
PERSISTENT送信(メッセージが決まった記憶域にロギングされる)およびNONPERSISTENT送信(メッセージがロギングされない)をサポートしています。Sendメソッドの完了後にJMSによって設定されるINTEGERです。JMSでは、クライアントが指定したJMSDeliveryModeの値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSMessageIDこのフィールドは、プロバイダ内のメッセージを一意に識別します。すべてのメッセージIDは先頭に文字列
ID:を使用する必要があります。Sendメソッドの完了後にJMSによって設定されるString型です。 -
JMSTimeStampこのフィールドには、送信先のプロバイダにメッセージが渡された時間が含まれます。Oracle Database Advanced Queuingのメッセージのエンキュー時間にマッピングされます。
Sendメソッドの完了後にJMSによって設定されるLong型です。 -
JMSCorrelationIDこのフィールドは、クライアントがメッセージを別のメッセージにリンクするために使用できます。JMSクライアントによって設定される
String型です。 -
JMSReplyToこのフィールドには、メッセージ送信時にクライアントが指定する
Destination型が含まれます。クライアントはoracle.jms.AQjmsAgent、javax.jms.Queueまたはjavax.jms.Topicを使用できます。 -
JMSTypeこのフィールドには、送信時にクライアントが指定するメッセージ・タイプの識別子が含まれます。これは
String型です。移植性を高めるために、JMSTypeに記号値を使用することをお薦めします。 -
JMSExpirationこのフィールドは、非Java EE準拠モードでは、エンキュー時間と
TimeToLiveの合計です。準拠モードでは、デキューされたメッセージのJMSExpirationヘッダー値は、メッセージのエンキュー時のJMSTimeStamp(ミリ秒単位のグリニッジ標準時)とTimeToLive(ミリ秒単位)の合計です。Sendメソッドの完了後にJMSによって設定されるLong型です。JMSでは、クライアントが指定したJMSExpirationの値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSPriorityこのフィールドには、メッセージの優先度が含まれます。
Sendメソッドの完了後にJMSによって設定されるINTEGERです。Java EE準拠モードでは、優先度に使用できる値は0から9で、9の優先度が最も高く、4がデフォルトで、Sun Microsystem JMS 1.1標準に準拠しています。デフォルトは非準拠モードです。JMSでは、クライアントが指定したJMSPriorityの値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSRedeliveredこのフィールドは、JMSプロバイダによって設定されたブールです。
関連項目:
JMSメッセージ・プロパティ
JMSプロパティは、クライアントによって明示的に設定されるか、またはJMSプロバイダによって自動的に設定されます(通常、これらは読取り専用です)。JMSプロパティには、SendおよびReceive操作に指定されたパラメータを使用して設定されるものもあります。
プロパティで、メッセージにオプションのヘッダー・フィールドを追加できます。プロパティによって、クライアントは、messageSelectorを使用して、クライアントのかわりにJMSプロバイダにアプリケーション固有基準を使用してメッセージを選択させることができます。プロパティ名は文字列で、値はBoolean、byte、short、int、long、float、doubleおよびstringです。
JMS定義のプロパティは、すべてJMSXで始まります。次のものがあります。
-
JMSXUserIDこのフィールドは、メッセージの送信元ユーザーの識別情報です。
Sendメソッドの完了後にJMSによって設定されるString型です。 -
JMSXAppIDこのフィールドは、メッセージの送信元アプリケーションの識別情報です。
Sendメソッドの完了後にJMSによって設定されるString型です。 -
JMSXDeliveryCountこのフィールドは、メッセージの送信試行回数です。
Sendメソッドの完了後にJMSによって設定されるIntegerです。 -
JMSXGroupidこのフィールドは、メッセージが属するメッセージ・グループの識別情報です。JMSクライアントによって設定される
String型です。 -
JMSXGroupSeqこのフィールドは、グループ内のメッセージの順序番号です。JMSクライアントによって設定される
Integer型です。 -
JMSXRcvTimeStampこのフィールドは、コンシューマにメッセージが配信された時間(デキュー時間)です。
Receiveメソッドの完了後にJMSによって設定されるString型です。 -
JMSXStateこのフィールドは、プロバイダによって設定されたメッセージ状態です。メッセージ状態は、
WAITING、READY、EXPIREDまたはRETAINEDです。
Oracle固有のJMSプロパティは、すべてJMS_Oracleで始まります。次のものがあります。
-
JMS_OracleExcpQこのフィールドは、元の宛先に配信できないメッセージを送信するキュー名です。JMSクライアントによって設定される
String型です。EXCEPTIONタイプの宛先のみをJMS_OracleExcpQプロパティで指定できます。 -
JMS_OracleDelayこのフィールドは、メッセージの配信を遅延する時間(秒単位)です。JMSクライアントによって設定される
Integer型です。これは、メッセージの配信順序に影響を与える可能性があります。 -
JMS_OracleOriginalMessageIdこのフィールドは、メッセージがある宛先から別の宛先に伝播された場合に、ソースのメッセージのメッセージ識別子に設定されます。JMSプロバイダによって設定される
String型です。メッセージが伝播されていない場合、このプロパティはJMSMessageIdと同じ値です。
クライアントは、プロパティを定義することによって、メッセージにヘッダー・フィールドを追加できます。これらのプロパティをmessageSelectorで使用して特定のメッセージを選択できます。
JMSメッセージ本体
JMSでは、次の5つのフォーマットのメッセージ本体が提供されます。
StreamMessage
StreamMessageオブジェクトは、Javaプリミティブのストリームの送信に使用されます。格納と読取りは順次的に行われます。これはMessageから継承し、StreamMessage本文を追加します。メソッドは主にjava.io.DataInputStreamとjava.io.DataOutputStreamのメソッドに基づいています。
基本データ型の値は、それぞれの型のメソッドを使用して、明示的に読込みまたは書込みできます。抽象的なオブジェクトとして読込みまたは書込みすることもできます。StreamMessageオブジェクトを使用するには、SYS.AQ$_JMS_STREAM_MESSAGEまたはAQ$_JMS_MESSAGEペイロード型を持つキュー表を作成します。
StreamMessageオブジェクトは、表6-1に示す変換をサポートします。行の型として書き込まれる値は、列の型として読み込むことができます。
表6-1 StreamMessageの変換
| 入力 | Boolean | byte | short | char | int | long | float | double | String | byte[] |
|---|---|---|---|---|---|---|---|---|---|---|
|
|
X |
- |
- |
- |
- |
- |
- |
- |
X |
- |
|
|
- |
X |
X |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
X |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
X |
- |
- |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
- |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
X |
X |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
- |
X |
X |
- |
|
|
X |
X |
X |
X |
X |
X |
X |
X |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
- |
- |
- |
X |
BytesMessage
BytesMessageオブジェクトは、1つの未解釈バイトのストリームを含むメッセージを送信する場合に使用します。これはMessageから継承し、BytesMessage本文を追加します。メッセージの受信者が、そのバイト列を解析します。メソッドは主にjava.io.DataInputStreamとjava.io.DataOutputStreamのメソッドに基づいています。
これは、クライアントが既存のメッセージ・フォーマットをコード化するためのメッセージ型です。可能であれば、かわりに他の自己定義メッセージ型を使用してください。
基本データ型の値は、それぞれの型のメソッドを使用して、明示的に書込みができます。また、抽象的なオブジェクトとして書き込むこともできます。BytesMessageオブジェクトを使用するには、SYS.AQ$_JMS_BYTES_MESSAGEまたはAQ$_JMS_MESSAGEペイロード型を持つキュー表を作成します。
MapMessage
MapMessageオブジェクトは、名前がString型で値がJava基本データ型である名前/値ペアの集合を送信する場合に使用します。このエントリには、名前を指定して順次的またはランダムにアクセスできます。エントリの順序は定義されません。これはMessageから継承し、MapMessage本文を追加します。基本データ型の値は、それぞれの型のメソッドを使用して、明示的に読込みまたは書込みできます。抽象的なオブジェクトとして読込みまたは書込みすることもできます。
MapMessageオブジェクトを使用するには、SYS.AQ$_JMS_MAP_MESSAGEまたはAQ$_JMS_MESSAGEペイロード型を持つキュー表を作成します。MapMessageオブジェクトは、表6-2に示す変換をサポートします。この表の「X」は、行の型として書き込まれる値を列の型として読み取ることができることを意味します。
表6-2 MapMessageの変換
| 入力 | Boolean | byte | short | char | int | long | float | double | String | byte[] |
|---|---|---|---|---|---|---|---|---|---|---|
|
|
X |
- |
- |
- |
- |
- |
- |
- |
X |
- |
|
|
- |
X |
X |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
X |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
X |
- |
- |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
X |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
- |
X |
- |
- |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
X |
X |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
- |
X |
X |
- |
|
|
X |
X |
X |
X |
X |
X |
X |
X |
X |
- |
|
|
- |
- |
- |
- |
- |
- |
- |
- |
- |
X |
TextMessage
TextMessageオブジェクトは、java.lang.StringBufferを含むメッセージを送信する場合に使用します。これはMessageから継承し、TextMessage本文を追加します。テキスト情報は、getText()およびsetText(...)メソッドを使用して読込みまたは書込みができます。TextMessageオブジェクトを使用するには、SYS.AQ$_JMS_TEXT_MESSAGEまたはAQ$_JMS_MESSAGEペイロード型を持つキュー表を作成します。
ObjectMessage
ObjectMessageオブジェクトは、シリアライズ可能なJavaオブジェクトを含むメッセージを送信する場合に使用します。これはMessageから継承し、単一のJava参照を含む本文を追加します。シリアライズ可能なJavaオブジェクトのみ使用できます。Javaオブジェクトのコレクションを送信する必要がある場合、JDK 1.4で提供されているコレクション・クラスのいずれかを使用できます。オブジェクトは、getObject()メソッドおよびsetObject(...)メソッドを使用して読込みまたは書込みできます。ObjectMessageオブジェクトを使用する場合、SYS.AQ$_JMS_OBJECT_MESSAGEまたはAQ$_JMS_MESSAGEペイロード型でキュー表を作成します。
AdtMessage
AdtMessageオブジェクトは、Oracleのオブジェクト型に対応するJavaオブジェクトを含むメッセージを送信する場合に使用します。これらのオブジェクトはMessageから継承され、CustomDatumまたはORADataインタフェースを実装するJavaオブジェクトを含む本体を追加します。
AdtMessageオブジェクトを使用するには、Oracleのオブジェクト型としてのペイロード型を持つキュー表を作成します。AdtMessageのペイロードは、getAdtPayloadおよびsetAdtPayloadメソッドを使用して、読取りおよび書込みができます。
AdtMessageオブジェクトを使用してSYS.XMLType型のキューへメッセージを送信することもできます。oracle.xdb.XMLTypeクラスを使用してメッセージを作成する必要があります。
AdtMessageオブジェクトの場合、クライアントが次を取得できます。
-
JMSXDeliveryCount -
JMSXRecvTimeStamp -
JMSXState -
JMS_OracleExcpQ -
JMS_OracleDelay
関連項目:
CustomDatumインタフェースおよびORADataインタフェースの詳細は、『Oracle Database Java開発者ガイド』を参照してください。
異なるメッセージ型でのメッセージ・プロパティの使用
次のメッセージ・プロパティは、クライアントがsetPropertyコールを使用して設定できます。StreamMessage、BytesMessage、ObjectMessage、TextMessageおよびMapMessageオブジェクトの場合、クライアントによって次の設定ができます。
-
JMSXAppID -
JMSXGroupID -
JMSXGroupSeq -
JMS_OracleExcpQ -
JMS_OracleDelay
AdtMessageオブジェクトの場合、クライアントによって次の設定ができます。
-
JMS_OracleExcpQ -
JMS_OracleDelay
次のメッセージ・プロパティは、クライアントがgetPropertyコールを使用して取得できます。StreamMessage、BytesMessage、ObjectMessage、TextMessageおよびMapMessageオブジェクトの場合、クライアントが次を取得できます。
-
JMSXuserID -
JMSXAppID -
JMSXDeliveryCount -
JMSXGroupID -
JMSXGroupSeq -
JMSXRecvTimeStamp -
JMSXState -
JMS_OracleExcpQ -
JMS_OracleDelay -
JMS_OracleOriginalMessageID
Oracle JMSを使用したバッファ済メッセージ
メッセージの送信時にdeliveryModeとしてNON_PERSISTENTを指定すると、非永続JMSメッセージを送信できます。JMS非永続メッセージは決まった記憶域に記録する必要がないため、JMSシステム障害が発生すると消失する可能性があります。JMS非永続メッセージはOracle Database Advanced Queuingで使用可能なバッファ済メッセージに類似していますが、両者には重要な相違点も存在します。
ノート:
Oracle JMSの非永続メッセージを、Oracle Database 10gリリース2(10.2)で非推奨になったOracle Database Advanced Queuingの非永続キューと混同しないでください。
関連項目:
トランザクションのコミットとクライアントの確認
JMSのdeliveryModeは、メッセージのトランザクション属性と直交します。JMS非永続メッセージは、処理済セッションまたは非処理済セッションで送受信できます。JMS非永続メッセージを処理済セッションで送受信する場合、JMS操作の効果は処理済セッションのコミット後にのみ参照可能です。CLIENT_ACKNOWLEDGE確認モードの非処理済セッションで受信する場合、このメッセージの受信による効果は、クライアントがメッセージを確認した後にのみ参照可能です。確認されない場合、メッセージは削除されず、クライアントがSession.recoverをコールすると再配信されます。
これに対して、Oracle Database Advanced Queuingのバッファ済メッセージでは、このようなトランザクションまたは確認の概念はサポートされません。バッファ済メッセージの送受信は、どちらもIMMEDIATE可視性モードで実行する必要があります。したがって、セッションがコミットされたかどうかやメッセージが確認されたかどうかは関係なく、ユーザーは送受信操作の効果を即時に参照できます。
各種API
通常のJMS送信およびパブリッシュ・メソッドで送信されるメッセージは、Oracle Database Advanced Queuingでは永続メッセージとして扱われます。通常のJMS受信メソッドは、AQ永続メッセージのみを受信します。バッファ済メッセージを送受信するには、Oracle拡張APIであるbufferSend、bufferPublishおよびbufferReceiveを使用する必要があります。
関連項目:
bufferSend、bufferPublishおよびbufferReceiveの詳細は、Oracle Databaseアドバンスト・キューイングJava APIリファレンスを参照してください
ペイロード制限
バッファ済メッセージのOracle Database Advanced Queuing実装では、LOB属性はサポートされていません。このため、5種類の標準JMSメッセージのペイロードには次の制限が適用されます。
-
JMS
TextMessageのペイロードは4000バイト以内です。Oracle JMSの文字セット変換時には、テキスト・ペイロードをデータベースに格納するために
VARCHARのかわりにCLOBを使用するように控え目な選択が必要になる場合があるため、データベース文字セットによってはこの上限値がさらに低くなる場合があります。 -
JMS
BytesMessageのペイロードは2000バイト以内です。 -
JavaでシリアライズされたJMS
ObjectMessage、StreamMessageおよびMapMessageデータは、2000バイト以内である必要があります。 -
他のすべてのOracle JMS ADTメッセージの場合、対応するOracleデータベースADTに
LOB属性を含めることはできません。
各種定数
表6-3に示すように、Oracle Database Advanced QueuingとOracle JMSのAPIでは、バッファ済メッセージと永続メッセージの指定に使用される数値が異なります。
表6-3 Oracle Database AQとOracle JMSのバッファ済メッセージ定数
| API | 永続メッセージ | バッファ済メッセージ |
|---|---|---|
|
Oracle Databaseアドバンスト・キューイング |
|
|
|
Oracle JMS |
|
|
JMSのバッファ済メッセージ
バッファ済メッセージでは、JMSメッセージ標準が完全にサポートされています。Oracle JMSでは、それらの標準がいくつかの方法で拡張されています。
関連項目:
JMSバッファ済メッセージのエンキュー
Oracle JMSでは、永続メッセージおよびバッファ済メッセージを同じJMSキュー/トピックにエンキューできるように、個別のメッセージにJMSDeliveryModeを設定することによって、アプリケーションはバッファ済メッセージを送信できます。
Oracle JMSのバッファ済メッセージは、エンキュー時刻、優先順位またはその両方で順序付けできます。順序付けは異なるメッセージ・タイプ間では適用されません。このため、たとえば、後に送信された永続メッセージが前に送信されたバッファ済メッセージより前に配信されることがあります。Oracle JMSのバッファ済メッセージでは、期限切れもサポートされます。
関連項目:
JMSバッファ済メッセージのデキュー
JMSでは、JMSサブスクライバが両方のメッセージ・タイプを対象にできるように、サブスクライバが永続メッセージのみまたはバッファ済メッセージのみを対象とするように宣言することを要求しません。
Oracle JMSは、JMSMessageIDによる高速で効率的なメッセージのデキュー、メッセージ・ヘッダーのセレクタ、およびメッセージ・プロパティのセレクタをサポートしています。Oracle JMSデキュー・コールは、永続メッセージとバッファ済メッセージの両方をチェックします。
ノート:
Oracle JMS永続メッセージには一意のメッセージ識別子があります。Oracle JMSバッファ済メッセージ識別子は、キュー/トピック内でのみ一意です。
同時デキュー・プロセスが同じサブスクライバの同じキューからデキューしている場合は、他のプロセスによってロックされているメッセージがスキップされます。
関連項目:
トランザクションのサポート
バッファ済メッセージがトランザクション・セッションでエンキューされる場合、JMSではそれらに対するトランザクション・サポートが必要となります。Oracle JMSでは、バッファ済メッセージに関連するトランザクション・セッションで次の標準が満たされることが保証されます。
-
原子性
Oracle JMSトランザクション内の永続メッセージおよびバッファ済メッセージは、アトミックにコミットまたはロールバックされます。バッファ済メッセージがディスクに書き込まれた場合(LOBに関連するメッセージの場合など)でも、ロールバックによってそれらが削除されます。
-
一貫性
永続メッセージ操作およびバッファ済メッセージ操作がトランザクションでインターリーブされる場合、すべてのOracle JMSユーザーは影響を受けるキュー/トピックの一貫性のあるビューを共有します。トランザクションによってエンキューされたすべての永続メッセージおよびバッファ済メッセージは、コミット時に表示できるようになります。プロセスがトランザクションの途中で終了した場合は、永続メッセージおよびバッファ済メッセージの両方が元に戻されます。Oracle JMSユーザーには、トランザクションのすべての永続メッセージおよびバッファ済メッセージが表示されるか、それらのいずれも表示されません。
-
分離
トランザクションのバッファ済エンキュー操作は、トランザクションがコミットされるまではオーナー・トランザクションにのみ表示されます。トランザクションがコミットされると、すべてのコンシューマに表示されます。
デキュー・トランザクションによってロックされているメッセージはブラウズできます。
確認メッセージの受信
非トランザクション・セッションの確認メッセージ受信のためのack_modeパラメータには、3つの値が定義されています。
-
DUPS_OK_ACKNOWLEDGEこのモードでは、重複メッセージが許可されます。
-
AUTO_ACKNOWLEDGEこのモードでは、セッションはメッセージを自動的に確認します。
-
CLIENT_ACKNOWLEDGEこのモードでは、メッセージ・プロデューサ確認メソッドをコールすることによって、クライアントはメッセージを明示的に確認します。メッセージを確認すると、以前に処理されたすべてのメッセージが確認されます。
関連項目:
「セッションの作成」
バッファ済メッセージのサービス品質
JMSは、伝播されていないバッファ済メッセージを最大1回配信することをサポートするようにプロバイダに要求しています。バッファ済メッセージのリカバリが無効にされている場合、Oracle JMSはこの標準を満たしています。
現在のメッセージ伝播の実装では、メッセージの重複配信が行われることがあります。ただし、メッセージの伝播はOracle JMSによって提供される拡張であるため、これはJMS標準に違反しません。
関連項目:
バッファ済メッセージが重複配信される原因については、バッファ済メッセージの伝播を参照してください。
バッファ済メッセージでサポートされるJMSタイプ
Oracle JMSは、JMSで定義されているタイプをOracleのユーザー定義タイプにマップし、JMSメッセージを格納するためにそれらのユーザー定義タイプのキューを作成します。これらのタイプの一部にはLOB属性があり、メッセージが永続またはバッファ済のいずれであってもOracle JMSはディスクに書き込みます。
たとえば、JMSタイプJMSTextMessageのユーザー定義タイプSYS.AQ$_JMS_TEXT_MESSAGEでは、4kより小さいテキスト文字列がVARCHAR2列に格納されます。ただし、4kより大きいテキスト文字列を格納するためのCLOB属性があります。
JMSメッセージは4kより大きい場合がよくあるため、Oracle JMSは大きいメッセージをメモリーに格納できる新しいADTを提供しています。ADTのディスク表示は変わりませんが、いくつかのVARCHAR2/RAW属性では最大100kのサイズのJMSメッセージをメモリーに格納できます。100kより大きいメッセージもバッファ済メッセージとしてパブリッシュできますが、ディスクに書き込まれます。
関連項目:
JMS Point-to-Pointモデル機能
Point-to-Pointモデルでは、クライアントは、1つのポイントから別のポイントへメッセージを交換します。メッセージのプロデューサとコンシューマは、シングル・コンシューマ・キューを使用してメッセージを送受信します。管理者は、AQjmsSessionのcreateQueueメソッドを使用して、シングル・コンシューマ・キューを作成します。キューを使用する前に、AQjmsDestinationでstartコールを使用して、キューをエンキュー/デキューに対して有効にする必要があります。クライアントは、AQjmsSessionのgetQueueメソッドを使用して事前に作成されたキューに対するハンドルを取得します。
シングル・コンシューマ・キューの場合、メッセージは1つのコンシューマが1度のみ処理できます。同じキューから同時にデキューするプロセスまたはオペレーティング・システム・スレッドが複数存在する場合、各プロセスはキューの先頭にあるロックされていない最初のメッセージをデキューします。ロックを作成したプロセス以外のプロセスは、ロックされたメッセージをデキューできません。
処理が済むと、キューの保存期間が0(ゼロ)の場合はそのメッセージは削除され、そうでない場合は指定された期間保存されます。メッセージが保存されている間は、キュー表ビューに対してSQLを使用して問い合せたり、QueueBrowserで処理済メッセージのメッセージ識別子を指定してデキューできます。
QueueSender
クライアントは、QueueSenderを使用して、キューにメッセージを送信します。これは、クライアントのSessionでキューをcreateSenderメソッドに渡すことによって作成されます。また、クライアントにはキューを指定しないで、QueueSenderを作成するオプションがあります。この場合、キューを送信操作のたびに指定する必要があります。
クライアントは、QueueSenderによって送信されたすべてのメッセージのデフォルト配信モード、優先順位およびTimeToLiveを指定できます。または、クライアントはこれらのオプションを各メッセージに対して定義できます。
QueueReceiver
クライアントはQueueReceiverを使用してキューからメッセージを受信します。これは、クライアントのSessionでcreateQueueReceiverを使用して作成されます。messageSelectorがあってもなくても作成できます。
QueueBrowser
QueueBrowserを使用すると、クライアントはメッセージを削除しないで、キュー上でメッセージを参照できます。このブラウズ用メソッドは、キュー内のメッセージをスキャンするために使用されるjava.util.Enumerationを戻します。nextElementに対する最初のコールが、キューのスナップショットを取得します。QueueBrowserは、messageSelectorを使用して作成しても、使用せずに作成してもかまいません。
QueueBrowserも、メッセージをスキャン中にオプションでロックできます。これは、メッセージに対するSELECT...for UPDATEコマンドの場合と似ています。これによって、他のコンシューマがスキャン中のメッセージを削除することはなくなります。
MessageSelector
messageSelectorによって、クライアントは、コンシューマに配信されるメッセージをmessageSelector式と一致するメッセージに制限できるようになります。TextMessage型、StreamMessage型、BytesMessage型、ObjectMessage型またはMapMessage型のペイロードを含むキューのmessageSelectorには、次の1つ以上を持つ任意の式を含めることができます。
-
接頭辞「ID:」が付いたJMSメッセージ識別子
JMSMessageID ='ID:23452345'
-
JMSメッセージ・ヘッダー・フィールドまたはプロパティ
JMSPriority < 3 AND JMSCorrelationID = 'Fiction' JMSCorrelationID LIKE 'RE%'
-
ユーザー定義のメッセージ・プロパティ
color IN ('RED', BLUE', 'GREEN') AND price < 30000
AdtMessage型のペイロードを含むキューのmessageSelectorには、次の1つ以上を持つ任意の式を含めることができます。
-
接頭辞「ID:」なしのメッセージ識別子
msgid = '23434556566767676'
-
優先順位または相関識別子、あるいはその両方
priority < 3 AND corrid = 'Fiction'
-
メッセージ・ペイロード
tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
JMSパブリッシュ・サブスクライブ・モデル機能
この項の内容は次のとおりです。
JMSパブリッシュ・サブスクライブの概要
JMSでは、パブリッシャ(出版者)の機能を持つアプリケーションとサブスクライバ(購読者)の役割を果すアプリケーションとの間の柔軟で動的な通信が可能です。アプリケーションが結合されることはなく、メッセージとその内容に基づいて相互に作用します。
メッセージの配信では、パブリッシャ・アプリケーションが明示的にメッセージ受信者を処理または管理する必要はありません。このため、パブリッシャ・アプリケーションの論理を変更しなくても、新しいサブスクライバ・アプリケーションを動的に追加できます。
同様にサブスクライバ・アプリケーションは、メッセージを送信しているパブリッシャ・アプリケーションに関係なく、メッセージの内容に基づいてメッセージを受信します。このため、サブスクライバ・アプリケーションの論理を変更しなくても、新しいパブリッシャ・アプリケーションを動的に追加できます。
サブスクライバ・アプリケーションは、メッセージ・プロパティまたはトピックのメッセージ内容に対してルールベースのサブスクリプション(予約購読)を定義することで、どのようなメッセージに関心があるのかを指定できます。システムは、ルールベースのサブスクリプションを使用して、パブリッシュされたメッセージの受信者を計算し、自動的にルーティングします。
パブリッシュ・サブスクライブ・モデルでは、メッセージはトピックに対してパブリッシュされ、トピックから受信されます。トピックは、AQjmsSessionのCreateTopic()メソッドを使用して作成されます。クライアントは、AQjmsSessionのgetTopic()メソッドを使用して事前に作成されたトピックに対するハンドルを取得できます。
DurableSubscriber
クライアントはDurableSubscriberを、クライアントのSession内でcreateDurableSubscriber()メソッドを使用して作成します。messageSelectorがあってもなくても作成できます。
messageSelectorによって、クライアントは、サブスクライバに配信されるメッセージをセレクタにマッチするメッセージに制限できます。セレクタ構文の詳細は、Oracle Databaseアドバンスト・キューイングJava APIリファレンスのcreateDurableSubscriberに関する項を参照してください。
永続サブスクライバが同じ名前を使用するときのアクションは、実行時にOracle Java Message Service (Oracle JMS)クライアントに設定されるJava EE準拠モードによって異なります。
非準拠モードでは、同じ名前の2つの永続的なTopicSubscriberオブジェクトが、2つの異なるトピックに対してアクティブになることができます。準拠モードでは、複数の永続サブスクライバが同じ名前を持つことは許可されません。同じトピックに対して作成された2つのサブスクライバが同じ名前を使用する場合、各サブスクライバに使用されるセレクタが異なると、DBMS_AQJMS.ALTER_SUBSCRIBER()の内部コールを使用して、基になるOracle Database Advanced Queuingサブスクリプションが変更されます。
2つのサブスクライバが同じ名前を使用し、2つの異なるトピックに対して作成される場合、同じサブスクリプション名を使用するクライアントがそれぞれサブスクリプション名を作成すると、既存のサブスクリプションは削除され新しいサブスクリプションが作成されます。
2つのサブスクライバが同じ名前を使用し、2つの異なるトピックに対して作成される場合、別のクライアント(最初にそのサブスクリプション名を作成したクライアントではない)が既存のサブスクリプション名を使用すると、サブスクリプションは削除されずにエラーが発生します。サブスクリプションがJMSまたはPL/SQLのどちらで作成されたかが不明であるため、その他のトピックについてのサブスクリプションは削除しないでください。
関連項目:
RemoteSubscriber
リモート・サブスクライバは、createRemoteSubscriberコールを使用して定義されます。リモート・サブスクライバは、リモート・トピックでの特定のコンシューマ、またはリモート・トピックでのすべてのサブスクライバになることができます。
リモート・サブスクライバは、AQjmsAgent構造を使用して定義されます。AQjmsAgentは、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_nameを参照します。アドレスは、次のようにしてリモート・トピックを参照します。
schema.topic_name[@dblink]
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_nameが、AQjmsAgentの名前フィールドに指定されている必要があります。リモート・トピックは、AQjmsAgentのaddressフィールドに指定される必要があります。
リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgentのnameフィールドをNULLに設定する必要があります。リモート・トピックは、AQjmsAgentのaddressフィールドに指定される必要があります。
TopicPublisher
メッセージはTopicPublisherを使用してパブリッシュされます。これは、TopicをcreatePublisherメソッドに渡すことによって作成されます。また、クライアントにはTopicを指定しないで、TopicPublisherを作成するオプションがあります。この場合、Topicをパブリッシュ操作のたびに指定する必要があります。クライアントは、TopicPublisherによって送信されるすべてのメッセージのデフォルト配信モード、優先順位およびTimeToLiveを指定できます。また、各メッセージごとに、これらのオプションを指定することもできます。
Recipient Lists
JMSパブリッシュ・サブスクライブ・モデルでは、クライアントは、トピックのすべてのサブスクライバにメッセージを送信するのではなく、明示的な受信者リストを指定できます。これらの受信者は、トピックの既存のサブスクライバである場合もあれば、そうでない場合もあります。受信者リストは、このメッセージのトピックのサブスクリプション・リストをオーバーライドします。受信者リスト機能は、JMSに対するOracleの拡張です。
TopicReceiver
受信者名が受信者リストに明示的に指定されていても、その受信者がキューのサブスクライバではない場合、その受信者に送信されるメッセージは、TopicReceiverを作成することによって受信できます。サブスクライバ名が指定されていない場合、クライアントがメッセージを受信するには、リモート・サイトで永続サブスクライバを使用する必要があります。TopicReceiverは、JMSに対するOracleの拡張です。
TopicReceiverは、messageSelectorを使用して作成できます。これによって、クライアントは、受信者に配信されるメッセージをセレクタにマッチするメッセージに制限できます。
関連項目:
TopicBrowser
TopicBrowserを使用すると、クライアントはメッセージを削除しないでトピック上でメッセージを参照できます。このブラウズ用メソッドは、トピック・メッセージをスキャンするために使用されるjava.util.Enumerationを戻します。TopicBrowserを作成できるのは、永続サブスクライバのみです。nextElementに対する最初のコールが、トピックのスナップショットを取得します。
TopicBrowserは、メッセージをスキャン中にオプションでロックできます。これは、メッセージに対するSELECT...for UPDATEコマンドの場合と類似しています。これによって、他のコンシューマがスキャン中のメッセージを削除することはなくなります。
TopicBrowserは、messageSelectorを使用して作成できます。これによって、クライアントは、コンシューマに配信されるメッセージをセレクタにマッチするメッセージに制限できるようになります。
TopicBrowserは、パージ機能をサポートしています。これによって、TopicBrowserを使用するクライアントは、トピックの現行のブラウズ操作中に参照されたすべてのメッセージを廃棄できます。パージとは、参照済のすべてのメッセージを破壊的に受信することと同じです(TopicSubscriberを使用して削除した場合と似ています)。
パージでは、メッセージがTopicBrowserのjava.lang.EnumerationのnextElement()操作へのコールを使用してクライアントに戻された場合、そのメッセージは参照済とみなされます。クライアントがまだ参照していないメッセージは、パージ中には廃棄されません。パージ操作は、同じTopicBrowserに対して何度も実行できます。
TopicBrowserの作成に使用したJMS Sessionがコミットされると、パージは正常に実行されます。セッションに対する操作がロールバックされた場合、パージ操作も取り消されます。
JMSメッセージ・プロデューサの機能
メッセージの優先順位および順序付け
メッセージの順序付けは、メッセージがキューまたはトピックから受信される順序を決定します。キューまたはトピックのキュー表の作成時に順序づけの方式が指定されます。現在は、Oracle Database Advanced Queuingでは、メッセージの優先順位とエンキュー時間に基づいた順序づけをサポートしており、これによって順序づけの方法が4種類あります。
-
先入れ先出し(FIFO)
エンキュー時刻が順序付け基準として選択されると、メッセージはエンキュー時刻の順序で受信されます。エンキュー時刻は、メッセージのパブリッシュ/送信時にOracle Database Advanced Queuingによってメッセージに割り当てられます。これはデフォルトの順序付けです。
-
優先順位による順序付け
優先順位による順序付けが選択されると、各メッセージに優先順位が割り当てられます。優先順位は、パブリッシュ/送信時に
メッセージ・プロデューサによってメッセージ・プロパティとして指定できます。メッセージは、割り当てられた優先順位の順序で受信されます。 -
FIFO優先順位
FIFO優先順位による順序付けが選択されると、トピックまたはキューは優先順位による順序付けの場合と同様に機能します。2つのメッセージに同じ優先順位が割り当てられた場合、両者はエンキュー時刻の順に受信されます。
-
エンキュー時刻に続く優先順位による順序付け
同じエンキュー時刻のメッセージは、そのメッセージの優先順位に従って受信されます。2つのメッセージの順序付け基準が同じ場合、受信される順序は予想できません。ただし、1つのセッション中に特定の順序付け基準で生成されたメッセージは、送信された順序で受信されることがOracle Database Advanced Queuingにより保証されます。
永続メッセージに使用できる順序付けスキームはすべて、バッファ済メッセージにも使用できますが、使用できるのは各メッセージ・クラス内のみです。同じセッション内でエンキューまたはパブリッシュされた永続メッセージとバッファ済メッセージ間の順序付けは、現在はサポートされていません。
メッセージ遅延の指定
メッセージをキュー/トピックに対して送信/パブリッシュするときに、遅延を指定できます。遅延は、そのメッセージがメッセージ・コンシューマに対して使用可能になるまでの時間を表します。遅延指定されたメッセージは、遅延の期限が切れるまで待機状態になります。遅延指定は、メッセージ識別子による受信でオーバーライドされます。
遅延は、JMSメッセージ・プロパティに対するOracle Database Advanced Queuingの拡張機能です。Oracle Database Advanced Queuingのバックグラウンド・プロセスのキュー・モニターが起動される必要があります。
メッセージ期限切れの指定
メッセージのプロデューサは、メッセージの期限切れまたはTimeToLiveを指定できます。これによって、そのメッセージがメッセージ・コンシューマに対して使用可能な期間が定義されます。
TimeToLiveは、送信/パブリッシュ時に指定するか、MessageProducerの設定済TimeToLiveメソッドを使用して指定することが可能で、前者が後者よりも優先されます。TimeToLiveを実装するには、Oracle Database Advanced Queuingのバックグラウンド・プロセスのキュー・モニターが実行されている必要があります。
メッセージのグループ化
1つのキューまたはトピックに属するメッセージをグループ化してセットを形成し、一度に1人のユーザーのみによって消費されるように設定できます。そのためには、トランザクション処理によるメッセージのグループ化が可能なキュー表にキューまたはトピックを作成する必要があります。同じグループに属するメッセージはすべて同じトランザクション内に作成する必要があり、同じトランザクション内に作成されたメッセージはすべて同じグループに属します。
メッセージのグループ化は、JMS仕様に対するOracle Database Advanced Queuingの拡張機能です。
この機能を使用すると、複雑なメッセージを、リンクされた一連の単純なメッセージに分割できます。たとえば、請求書キュー宛ての請求書は、ヘッダーのメッセージ、詳細情報の複数のメッセージ、フッターのメッセージの順に分割できます。
小さいオブジェクトに分割できるイメージやビデオなどの複合ラージ・オブジェクトがメッセージ・ペイロードにある場合は、メッセージのグループ化が非常に有効です。
グループに含まれるメッセージの優先順位、遅延および期限切れの各プロパティは、単にグループの最初のメッセージ(ヘッダー)のプロパティによってのみ判断されます。グループの他のメッセージのプロパティは無視されます。
メッセージ・グループは、伝播中も保持されます。宛先トピックは、トランザクション処理のグループ化に対して使用可能にしておく必要があります。
関連項目:
トランザクション処理でグループ化可能なキューからメッセージをデキューするときにメッセージ・グループを保持する場合、注意する必要がある制限については、「デキュー機能」を参照してください。
JMSメッセージ・コンシューマ機能
この項の内容は次のとおりです。
メッセージの受信
JMSアプリケーションは、メッセージ・コンシューマを作成することによって、メッセージを受信できます。メッセージは、receiveコールを使用して同期的に受信するか、メッセージ・リスナーを使用して非同期的に受信できます。
受信モードには次の3つがあります。
-
メッセージがコンシューマに届くまでブロック
-
指定最大時間までブロック
-
非ブロック
受信におけるメッセージのナビゲーション
コンシューマがナビゲーション・モードを指定しない場合、セッションの最初のreceiveはキューまたはトピックの最初のメッセージを取り出し、第2のreceiveは次のメッセージを取得します。優先順位が高いメッセージがコンシューマに届く場合、すでに届いているメッセージをクリアするまで、コンシューマはこのメッセージを受信しません。
コンシューマが、メッセージに対してより効率的にキューのナビゲーションを制御できるように、Oracle Database Advanced QueuingにはJMS拡張機能として複数のナビゲーション・モードが用意されています。これらのモードは、TopicSubscriber、QueueReceiverまたはTopicReceiverで設定できます。
グループ化されていないメッセージには、次の2つのモードを使用できます。
-
FIRST_MESSAGEこのモードでは、位置がキューの先頭にリセットされます。コンシューマがキューの最上位にあるメッセージを削除できるため、優先順位に基づくキューで役立ちます。
-
NEXT_MESSAGEこのモードは、コンシューマの確立された位置の後のメッセージを取得します。たとえば、4番目のメッセージの位置で適用された
NEXT_MESSAGEは、そのキューの5番目のメッセージを取得します。これはデフォルト・アクションです。
グループ化メッセージには、次の3つのモードを使用できます。
-
FIRST_MESSAGEこのモードでは、位置がキューの先頭にリセットされます。
-
NEXT_MESSAGEこのモードでは、位置が同一トランザクションの次のメッセージに設定されます。
-
NEXT_TRANSACTIONこのモードでは、位置が次のトランザクションの最初のメッセージに設定されます。
ノート:
トランザクション・イベント・キューは、前述の3つのモードをサポートしていません。
次の方法でメッセージが受信される場合、グループ化トランザクションのプロパティを無効にできます。
-
セレクタに相関識別子を指定して受信
-
セレクタにメッセージ識別子を指定して受信
-
トランザクション・グループのメッセージがすべて受信される前にコミット
NEXT_MESSAGEまたはNEXT_TRANSACTIONオプションの使用中にコンシューマがキューの最後に到達したとします。ブロッキングreceive()を指定していた場合は、ナビゲート位置は自動的にそのキューの先頭に変更されます。
デフォルトでは、QueueReceiver、TopicReceiverまたはTopicSubscriberは、最初のreceiveコールにFIRST_MESSAGEを、以降のreceive()コールにNEXT_MESSAGEを使用します。
メッセージのブラウズ
デキューするクライアントがキューからメッセージを削除できる通常のreceiveの他に、JMSでは、JMSクライアントがキューで自身のメッセージをブラウズできるようにするインタフェースを提供しています。QueueBrowserは、QueueSessionからcreateBrowserメソッドを使用して作成できます。
メッセージが参照されると、そのメッセージは引き続き処理できます。メッセージがブラウズされた後は、同時セッションからreceiveコールがそのメッセージを削除する場合があるため、JMSセッションに引き続き使用できるとはかぎりません。
一度参照したメッセージが同時JMSクライアントによって削除されないようにするために、ロック・モードでメッセージを参照できます。そのためには、JMSインタフェースに対するOracle Database Advanced Queuingの拡張機能を使用して、ロック・モードを持つQueueBrowserを作成する必要があります。メッセージのロックは、セッションがコミットまたはロールバックを実行すると解放されます。
QueueBrowserによって参照されたメッセージを削除するには、セッションがQueueReceiverを作成し、JMSmesssageIDをセレクタとして使用する必要があります。
取出しを伴わないメッセージの削除
コンシューマは、receiveNoDataコールを使用してキューまたはトピックからメッセージを取得することなく、メッセージを削除できます。アプリケーションがQueueBrowserなどを使用してメッセージをすでに確認した場合に役立ちます。このモードを使用すると、JMSクライアントがペイロードをデータベースから取得するオーバーヘッドを回避できます(メッセージが大きい場合、オーバーヘッドが顕著である可能性があります)。
遅延間隔をおいた後の再試行
キュー/トピックからメッセージを受信するトランザクションが失敗した場合、そのメッセージを削除する試行に失敗したとみなされます。Oracle Database Advanced Queuingは、メッセージ削除の試行に失敗した回数をメッセージ履歴に記録します。
アプリケーションでは、メッセージに対する再試行の最大回数をキュー/トピック・レベルで指定できます。メッセージ削除がこの数より多く失敗した場合、メッセージは例外キューに移動されます。
Oracle Database Advanced Queuingでは、ユーザーはmax_retriesとともにretry_delayも指定できます。これは、受信の試行に失敗したメッセージを、retry_delay間隔後に引き続きキューで参照し、デキューできることを意味します。それまでこのメッセージはWAITING状態になります。Oracle Database Advanced Queuingのバックグラウンド・プロセスのタイム・マネージャは、再試行遅延プロパティを強制的に適用します。
再試行の最大回数および再試行の遅延は、キュー/トピックのプロパティです。このプロパティは、キュー/トピックの作成時、またはキュー/トピックに対する変更メソッドを使用して設定できます。MAX_RETRIESのデフォルト値は5です。
ノート:
トランザクション・イベント・キューは、再試行の遅延をサポートしていません。
MessageListenerを使用したメッセージの非同期受信
JMSクライアントは、setMessageListenerメソッドを使用してMessageListenerを設定することによって、メッセージを非同期的に受信できます。
コンシューマにメッセージが届いた場合、メッセージ・リスナーのonMessageメソッドがそのメッセージで起動されます。メッセージ・リスナーは、メッセージの受信をコミットまたは異常終了できます。メッセージ・リスナーは、JMS Connectionが停止されている場合、メッセージを受信しません。一度メッセージ・リスナーがコンシューマに対して設定されると、メッセージの受信にreceiveコールを使用することはできません。
JMSクライアントは、セッションでMessageListenerを設定することによって、そのセッションのすべてのコンシューマに対してメッセージを非同期的に受信できます。一度メッセージ・リスナーが設定されると、そのセッションでは、その他のメッセージ受信モードを使用できません。
例外キュー
例外キューは、すべての期限切れのメッセージまたは使用できないメッセージが格納されるリポジトリです。アプリケーションがメッセージを例外キューに直接エンキューすることはできません。ただし、期限切れまたは処理できないメッセージを処理するアプリケーションは、例外キューからこれらのメッセージを受信または削除できます。
例外キューからメッセージを取り出すには、JMSクライアントはPoint-to-Pointインタフェースを使用する必要があります。トピック用のメッセージの例外キューは、使用可能な複数のコンシューマでキュー表に作成する必要があります。他のキューと同様に、例外キューもAQOracleQueueクラスでstartメソッドを使用して、メッセージを受信できる必要があります。例外キューをエンキュー可能に設定しようとすると、例外が発生します。
トランザクション・イベント・キュー(TxEventQ)は、DBMS_AQADM.CREATE_EQ_EXCEPTION_QUEUE APIによって例外キューをサポートしています。
PROCEDURE CREATE_EQ_EXCEPTION_QUEUE(
queue_name IN VARCHAR2,
exception_queue_name IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
storage_clause IN VARCHAR2 DEFAULT NULL,
sort_list IN VARCHAR DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL
);例外キューは、"JMS_OracleExcpQ"と呼ばれるOracle固有のメッセージ・プロパティで、メッセージの送信/パブリッシュ前に設定できます。例外キューが指定されていないと、デフォルトの例外キューが使用されます。AQキューの場合、デフォルトの例外キューは、キュー表の作成時にAQ$_queue_table_name_Eという名前で自動的に作成されます。デフォルトでは、TxEventQには例外キューは作成されません。
メッセージは、次の条件が成立するときに例外キューに移されます。
-
そのメッセージが、指定された
timeToLive内にデキューされなかった場合。複数のサブスクライバを指定したメッセージの場合、指定された
timeToLive内にそのメッセージをデキューできない受信者が1つ以上あると、メッセージは例外キューに移されます。 -
メッセージが正常に受信されたが、メッセージ処理中のエラーのためアプリケーションが
receiveを実行したトランザクションを異常終了した場合。そのメッセージはキュー/トピックに戻され、メッセージ受信のために待機中のどのアプリケーションでも使用可能になります。アプリケーションがトランザクション全体を異常終了するか、
receive前のセーブポイントまでロールバックしたときは、receiveがロールバックまたはUNDOされたとみなされます。これは失敗したメッセージ受信の試行であるため、その再試行回数は更新されます。メッセージの再試行回数が、メッセージが常駐するキュー/トピックに指定された最大値を超える場合、そのメッセージは例外キューに移されます。
メッセージが複数のサブスクライバを持つ場合、そのメッセージは、すべての受信者が再試行制限を超えたときにのみ、例外キューに移されます。
ノート:
サーバー・プロセスがインスタンスで停止した(ALTER SYSTEM KILL SESSIONまたはSHUTDOWN ABORTなど)ためにデキュー・トランザクションが失敗した場合、RETRY_COUNTは増分されません。
JMS伝播
この項の内容は次のとおりです。
ノート:
TxEventQキューは、RemoteSubscriber、伝播のスケジュール、拡張伝播スケジュール機能および伝播中の例外処理をサポートしていません。
RemoteSubscriber
Oracle Database Advanced Queuingによって、他のデータベースにあるサブスクライバをトピックにサブスクライブできます。トピックに対してパブリッシュされたメッセージがリモート・サブスクライバの基準を満たしている場合は、リモート・サブスクライバに指定されているリモート・データベースにあるキュー/トピックに自動的に伝播します。伝播は、データベース・リンクおよびOracle Net Servicesを使用して実行されます。これによって、同じデータベースに接続しなくても、アプリケーション同士が互いに通信できます。
リモート・サブスクライバを実装するには、次の2つの方法があります。
-
createRemoteSubscriberメソッドは、トピック上またはトピックに対してリモート・サブスクライバを作成するために使用します。このリモート・サブスクライバは、クラスAQjmsAgentのインスタンスとして指定されます。 -
AQjmsAgentには、名前およびアドレスがあります。アドレスは、キュー/トピックおよびサブスクライバのデータベースへのデータベース・リンクで構成されています。
リモート・サブスクライバには、次の2種類があります。
-
リモート・サブスクライバがトピックである場合。
これは、
AQjmsAgentオブジェクトのリモート・サブスクライバに名前が指定されず、アドレスがトピックである場合に発生します。サブスクライバのサブスクリプションを満たすメッセージが、リモート・トピックに伝播されます。伝播されたメッセージは、それが満たすリモート・トピックのすべてのサブスクリプションに対して使用可能になります。 -
メッセージに対して特定のリモート受信者を指定する場合。
リモート・サブスクリプションは、リモート・データベースにある特定のコンシューマに対して指定できます。リモート受信者の名前が(
AQjmsAgentオブジェクトに)指定される場合、サブスクリプションを満たすメッセージが、その受信者専用のリモート・データベースに伝播されます。リモート・データベースにある受信者は、TopicReceiverインタフェースを使用してメッセージを取り出します。リモート・サブスクリプションは、Point-to-Pointキューに対して指定することもできます。
伝播のスケジュール
伝播は、メッセージがターゲットの接続先データベースに伝播されるすべてのトピックについて、schedule_propagationメソッドを使用してスケジューリングされる必要があります。
スケジュールは時間の枠を示し、メッセージはその枠内でソース・トピックから伝播されます。この時間枠は、ネットワーク通信量、ソース・データベースの負荷、接続先データベースの負荷などの複数の要因に左右されます。したがって、スケジュールは特定のソースおよび宛先にあわせて調整する必要があります。スケジュールが作成されると、ジョブは自動的にjob_queue機能に発行され、伝播が処理されます。
伝播スケジュールのための運用管理コールによって、スケジュール管理を柔軟に行うことができます。あるスケジュールの存続時間または伝播枠パラメータによって、伝播が開始される時間枠が指定されます。存続時間が指定されない場合、時間枠は無制限の単一枠になります。枠を定期的に繰り返す必要がある場合、連続する枠の間の周期的間隔を定義するnext_time機能を使用して有限の存続時間を指定します。
あるキューに定義された伝播スケジュールは、そのキューの有効期間中いつでも変更または削除できます。さらに、(スケジュールを削除するかわりに)一時的に使用不可にするコール、および使用不可のスケジュールを使用可能にするコールがあります。メッセージがスケジュール内で伝播されているとき、そのスケジュールはアクティブです。すべての管理コールは、スケジュールがアクティブかどうかに関係なく実行されます。スケジュールがアクティブの場合、コールが実行されるまでに数秒かかります。
伝播が開始されるには、ジョブ・キュー・プロセスを起動する必要があります。少なくとも2つのジョブ・キュー・プロセスを起動する必要があります。接続先データベースへのデータベース・リンクも有効にする必要があります。伝播のソースおよび宛先トピックは、同じメッセージ型である必要があります。リモート・トピックは、エンキューできる必要があります。データベース・リンクのユーザーもリモート・トピックに対するエンキュー権限を持つ必要があります。
関連項目:
拡張伝播スケジュール機能
伝播のために定義されたカタログ・ビューは、アクティブ・スケジュールに関する次の情報を提供します。
-
そのスケジュールを処理しているバックグラウンド・プロセスの名前
-
伝播を処理しているセッションのSID(セッションおよびシリアル番号)
-
スケジュールを処理しているインスタンス(Oracle RACを使用している場合)
-
先行して正常に実行されたスケジュール
-
次に実行予定のスケジュール
スケジュールごとに次の伝播統計が保持され、キュー管理者がスケジュール調整に役立てることができます。
-
スケジュールの中で伝播されたメッセージ合計数
-
スケジュールの中で伝播されたバイト合計数
-
伝播枠の中で伝播されたメッセージの最大数
-
伝播枠の中で伝播されたバイトの最大値
-
伝播枠の中で伝播されたメッセージの平均数
-
伝播済メッセージの平均サイズ
-
伝播済メッセージの平均時間
伝播機能には、障害対処およびエラー・レポートが組み込まれています。たとえば、指定されたデータベース・リンクが無効な場合、リモート・データベースが使用できない場合、またはリモート・トピック/キューにエンキューできない場合、適切なエラー・メッセージがレポートされます。伝播は指数バックオフ・スキームを使用して、障害が発生したスケジュールからの伝播を再試行します。あるスケジュールで続けて障害が発生したときは、最初の再試行は30秒後、次の再試行は60秒後、3回目の再試行は120秒後、というように続きます。再試行時間が現行の伝播枠の期限切れ時刻を超える場合は、次の再試行は、次の伝播枠の開始時刻に行われます。最大16回の再試行が行われた後、そのスケジュールは自動的に使用不可能になります。
ノート:
再試行が次の伝播ウィンドウに移動されると、常に移動されるようになり、指数バックオフ・スキームは再試行のスケジュールを管理しません。DBMS_AQADM.SCHEDULE_PROPAGATION()のnext_timeパラメータで指定された日付関数の結果、ウィンドウ間の間隔が短くなると、再試行の失敗数はすぐに16に達し、スケジュールが無効になります。
障害のためにスケジュールが自動的に使用不可になると、関連情報がアラート・ログに書き込まれます。スケジュールで失敗が発生したか、その場合は何回連続して失敗が発生したか、失敗の原因を示すエラー・メッセージ、および最後の失敗が発生した時刻を、いつでも確認できます。この情報を調べることで、管理者は障害を回復し、スケジュールを使用可能にできます。
再試行の間に伝播が成功したときは、障害の数は0(ゼロ)にリセットされます。
伝播機能にはOracle Real Application Clustersサポートが組み込まれていますが、ユーザーおよび管理者には透過的です。伝播を処理するジョブは、ソース・トピックが常駐しているキュー表の所有者と同じインスタンスに送られます。あるインスタンスに障害が発生してトピックを保存しているキュー表が他のインスタンスに移される場合は、伝播ジョブも必ず自動的に新しいインスタンスに移行されます。これによって、インスタンス間のping操作は最小限に抑えられ、パフォーマンスが向上します。伝播は、同時スケジュールをいくつでも処理できるように設計されています。
job_queue_processesの最大数は1000で、その一部は伝播に関連しないジョブの処理に使用できます。このために、伝播にはマルチタスキングおよびロード・バランシングのサポートが組み込まれています。伝播アルゴリズムは、複数スケジュールが単一スナップショット(job_queue)のプロセスによって処理できるように設計されています。job_queueプロセスに対する伝播の負荷は、異なるソース・トピックからのメッセージ到着の割合に基づいて偏りが発生する場合があります。あるプロセスが数個のアクティブ・スケジュールによって過負荷になっている一方で、別のプロセスは受動的なスケジュールが多いために余力があるというとき、伝播はプロセス間で負荷が均等になるようにスケジュールを自動的に再分配します。
伝播中の例外処理
ネットワーク障害のようなシステム・エラーが発生した場合、Oracle Database Advanced Queuingは指数バックオフ・アルゴリズムを使用してメッセージを伝播する試みを継続します。キューからデータベース・リンクへの伝播中のアプリケーション・エラーを示している状況では、Oracle Database Advanced QueuingはそのメッセージにUNDELIVERABLEというマークを付けてalert.logに記録します。このようなエラーは、リモート・キューが存在しない場合、またはソース・キューおよびリモート・キューの型が一致しない場合に発生します。background_dump_destディレクトリのトレース・ファイルには、そのエラーに関する追加情報があります。
新規のジョブ・キュー・プロセスが開始すると、型を再検証できるように型の不一致エラーをクリアします。ジョブ・キュー・プロセス数に上限を設定して、伝播のビジー状態が続く場合、ジョブ・キュー・プロセスが終了して再開するまで待つ必要はありません。キューの型は、必要に応じてDBMS_AQADM.VERIFY_QUEUE_TYPESを使用して再検証できます。
ノート:
キューからキューへの伝播中に型の不一致が検出されると、伝播は停止してエラーが発生します。このような場合は、DBA_SCHEDULESビューを問い合せて、特定の宛先への伝播中に発生した最後のエラーを判断する必要があります。このメッセージには、UNDELIVERABLEマークは付いていません。
JMS AQのメッセージ変換
あるフォーマットのメッセージを別のフォーマットのメッセージにマップするために変換を定義できます。変換は、同一の情報を異なるフォーマットで表現するアプリケーションを統合する必要がある場合に有効です。変換はSQL式およびPL/SQLファンクションです。メッセージ変換は、標準JMSインタフェースに対するOracle Database Advanced Queuingの拡張機能です。
変換は、DBMS_TRANSFORM.create_transformationプロシージャを使用して作成できます。変換は、次の操作を行う場合に指定できます。
-
キューまたはトピックへのメッセージの送信
-
キューまたはトピックからのメッセージの受信。
-
TopicSubscriberの作成 -
RemoteSubscriberの作成。これによって、異なるフォーマットのトピック間でメッセージを伝播できます。
ノート:
TxEventQは、メッセージ変換をサポートしていません。
JMSストリーミング
AQ JMSは、大量のメッセージ・データまたはペイロードを送受信するアプリケーション用に、AQjmsBytesMessageおよびAQjmsStreamMessageによるTxEventQのエンキューおよびデキューでのストリーミングをサポートします。
JMSストリーミングは、大きい連続したバイトの配列を送受信するのではなく、メッセージ・ペイロードを小さいチャンクに分割することにより、大きいメッセージを扱うときのメモリー要件を緩和します。JMS標準にストリーミング・メカニズムは含まれないため、AQ JMSはAQストリーミング・エンキューおよびデキュー機能を示すための固有のインタフェースを提供します。これにより、ユーザーは既存のjava入出力ストリームを使用してメッセージ・データまたはペイロードを簡単に送受信できます。
データベースのRDBMS 12.2へのアップグレード時に既存のアプリケーションが変更なしで動作できるように、デフォルトではストリーミングAPIは無効化されます。
クライアント・アプリケーションはシステム・プロパティoracle.jms.useJmsStreamingをtrueに設定して使用することにより、JMSストリーミングを有効化できます。
ノート:
JMSストリーミングはThinドライバの場合のみサポートされます。
エンキューでのJMSストリーミング
AQ JMSではAQjmsBytesMessageおよびAQjmsStreamMessageの新しいAPI setInputStream(java.io.InputStream)が提供され、メッセージ・データ用の入力ストリームを設定します。
/**
* @param inputStream - InputStream to read the message payload
* @throws JMSException - if the JMS provided fails to read the payload due to
* some internal error
*/
public void setInputStream(InputStream inputStream) throws JMSException
次のコード・スニペットでは、AQjmsBytesMessageタイプのメッセージが作成され、メッセージ・データ用のFileInputStreamが設定されます。
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("queueName");
MessageProducer producer = session.createProducer(destination);
AQjmsBytesMessage bytesMessage = (AQjmsBytesMessage)session.createBytesMessage();
InputStream input = new FileInputStream("somefile.data");
bytesMessage.setInputStream(input);
producer.send(bytesMessage);
ノート:
-
BytesMessageおよびStreamMessageのメソッドはjava.io.DataInputStreamおよびjava.io.DataOutputStreamで検出されるメソッドに基づいているため、様々なread*()およびwrite*()メソッドの意味のある変換はストリームでは不可能です。次の使用例では例外が発生します。-
bytesMessage.setInputStream(input);bytesMessage.writeInt(99); -
bytesMessage.writeInt(99);bytesMessage.setInputStream(input);
-
-
通常のエンキュー操作と同様に、ストリーミングを使用したエンキューは同期操作となり、エンキューが完了した後でのみ、制御がクライアントに戻されます。
-
これらのAPIが明示的にクライアントにより使用される場合にのみ、ストリーミングがエンキューで使用されます。AQ JMSはメッセージ・データのサイズと関係なく、通常のエンキューではストリーミングを使用しません。
デキューでのJMSストリーミング
ストリーミングを使用したデキュー操作は、2つのステップで実行されます。サーバーでは、メッセージ本文のサイズに基づき、メッセージ本文をストリーミングするかどうかが決定されます。デフォルトのしきい値制限は10MBです。そのため、メッセージ本文が10MBよりも大きく、システム・プロパティoracle.jms.useJmsStreamingを使用して、ストリーミングがクライアントで有効な場合、サーバーはデキューでストリーミングを使用します。
-
これはクライアントが
receive()メソッドを呼び出す場合の通常のデキュー・プロセスです。Destination destination = session.createQueue ("queueName"); AQjmsConsumer consumer = (AQjmsConsumer) session.createConsumer(destination); Message message = consumer.receive(10000); -
クライアントがペイロードなしのメッセージを受信する場合、クライアントは受信メッセージでの
isLargeBody()の呼び出しにより、ストリーミングがデキューで使用されるかどうかを特定します。/** * This method can be used by the client applications to check whether the message * contains large messaege body and hence requires streaming with dequeue. * * @return true when the message body is large and server decides to stream * the payload with dequeue */ public boolean isLargeBody()
isLargeBody()により戻されたtrueの値は、デキューでのストリーミングを示します。デキューでストリーミングを使用する場合、AQ JMSはAQjmsStreamMessageとAQjmsBytesMessageに対して正確にメッセージ本文の長さを移入します。そのため、クライアント・アプリケーションはメッセージでgetBodyLength()を呼び出し、ペイロードのサイズを判断できます。public long getBodyLength()
クライアントがデキューでのストリーミングについて理解したら、受信メッセージで次のAPIのいずれかを使用してメッセージ・データをフェッチできます。
クライアント・アプリケーションは、AQjmsBytesMessageおよびAQjmsStreamMessageで使用可能な次のAPIを使用して、メッセージ・データを受信できます。
/**
* Writes the message body to the OutputStream specified.
*
* @param outputStream - the OutputStream to which message body can be written
* @return the OutputStream containing the message body.
* @throws JMSException - if the JMS provided fails to receive the message body
* due to some internal error
*/
public OutputStream getBody(OutputStream outputStream) throws JMSException
/**
* Writes the message body to the OutputStream specified, with chunkSize bytes
* written at a time.
*
* @param outputStream - the OutputStream to which message body can be written
* @param chunkSize - the number of bytes to be written at a time, default value
* 8192 (ie. 8KB)
* @return the OutputStream containing the message body.
* @throws JMSException - if the JMS provided fails to receive the message body
* due to some internal error
*/
public OutputStream getBody(OutputStream outputStream, int chunkSize)throws JMSException
/**
* Writes the message body to the OutputStream specified. This method waits until
* the message body is written completely to the OutputStream or the timeout expires.
*
* A timeout of zero never expires, and a timeout of negative value is ignored.
*
* @param outputStream - the OutputStream to which message body can be written
* @param timeout - the timeout value (in milliseconds)
* @return the OutputStream containing the message body.
* @throws JMSException - if the JMS provided fails to receive the message body
* due to some internal error
*/
public OutputStream getBody(OutputStream outputStream, long timeout) throws JMSException
/**
* Writes the message body to the OutputStream specified, chunkSize bytes at a time.
* This method waits until the message body is written completely to the OutputStream
* or the timeout expires.
*
* A timeout of zero never expires, and a timeout of negative value is ignored.
*
* @param outputStream - the OutputStream to which message body can be written
* @param chunkSize - the number of bytes to be written at a time,
* default value 8192 (ie. 8KB)
* @param timeout - the timeout value (in milliseconds)
* @return the OutputStream containing the message body.
* @throws JMSException - if the JMS provided fails to receive the message body
* due to some internal error
*/
public OutputStream getBody(OutputStream outputStream, int chunkSize, long timeout) throws JMSException
次のコード・スニペットは、ストリーミングがデキューで使用され、受信したペイロードがFileOutputStreamに書き込まれるかどうかをチェックします。
if (message instanceof BytesMessage && (AQjmsBytesMessage)message.isLargeBody()){
// optional : check the size of the payload and take appropriate action before
// receiving the payload.
(AQjmsBytesMessage) message.getBody(new FileOutputStream(new File("…")));
} else {
// normal dequeue
}
通常、両方のステップが完了すると、メッセージは完全に消費されたとみなされます。AQサーバーにより、ステップ1の後でメッセージでのロックは保持され、ステップ2の後でのみ解除されます。
メッセージ・コンシューマによりメッセージが部分的に消費されることにより発生する可能性のある問題を考慮して、確認モードCLIENT_ACKNOWLEDGEおよびSESSION_TRANSACTEDでのセッション用のストリーミングAPIは制限されました。
そのため、部分的に消費されたメッセージを含む、すべてのメッセージは、次の時点で完全に消費されたとみなされます。
-
message.acknowledge()はCLIENT_ACKNOWLEDGEセッションで呼び出されます。 -
セッションの
commit()はトランザクション・セッションで呼び出されます。
通常の場合、セッションrollback()は、そのセッションで受信したメッセージをロールバックします。
JMSストリーミングは使用可能ですが、次の制限があります。
-
ストリーミングはデフォルトで無効化されており、システム・プロパティ
oracle.jms.useJmsStreamingを使用してクライアント・アプリケーションにより有効化できる -
メッセージ・データのサイズがしきい値よりも大きい場合に、デキューでストリーミングが使用されます。デフォルトのしきい値は10MBです。
-
ストリーミング・サポートは
AQjmsBytesMessageおよびAQjmsStreamMessageで使用可能 -
ストリーミング・サポートはTxEventQキューでのみ使用可能
-
ストリーミング・サポートはThinドライバでのみ使用可能
-
ストリーミング・サポートは、メッセージ・プロデューサがメッセージ配信モードに
NON_PERSISTENTを使用しているときは使用できない -
ストリーミングはメッセージ・リスナーではサポートされません。そのため、MessageConsumerにメッセージ・リスナー・セットがある場合、およびメッセージ・データがしきい値制限を超えている場合は、内部的に通常のデキューが使用されます。
-
ストリーミング・サポートは、確認モード
CLIENT_ACKNOWLEDGEおよびSESSION_TRANSACTEDを使用してセッションで使用可能です。
Java EEの準拠
Oracle JMSは、Oracle Sun Microsystems JMS 1.1標準に準拠しています。実行時に、Oracle Java Message Service (Oracle JMS)クライアントに対してJava EE準拠モードを定義できます。準拠モードにするには、コマンドライン・オプションとしてJavaプロパティ oracle.jms.j2eeCompliantをTRUEに設定します。非準拠モードにする場合は何もしません。FALSEがデフォルト値です。
Java EE準拠をサポートし、非準拠モードでも使用できるOracle Database Advanced Queuingの機能は次のとおりです。
-
非トランザクション・セッション
-
永続サブスクライバ
-
一時キューおよびトピック
-
非永続配信モード
-
AQ$_JMS_MESSAGE型のOracle Database Advanced Queuingキューを使用した単一のJMSキューまたはトピックに対する複数のJMSメッセージ・タイプ -
永続サブスクライバに対する
noLocalオプション -
TxEventQはネイティブにJMSをサポートしていて、Java EEコンプライアンスに準拠しています
関連項目:
-
『Java Message Service Specification』、バージョン1.1、2002年3月18日、Sun Microsystems, Inc.
-
JMSPriorityおよびJMSExpirationに対するJavaプロパティ
oracle.jms.j2eeCompliantの影響の詳細は、「JMSメッセージ・ヘッダー」を参照してください。 -
永続サブスクライバに対するJavaプロパティ
oracle.jms.j2eeCompliantの影響の詳細は、「DurableSubscriber」を参照してください。
Oracle Java Message Serviceの基本操作
次のトピックでは、Oracle Database Advanced Queuing (AQ)の基本操作のためのJava Message Service (JMS)管理インタフェースについて説明します。
DBMS_AQINに対するEXECUTE権限
ユーザーがDBMS_AQINパッケージ内のメソッドを直接コールすることはありませんが、DBMS_AQINに対するEXECUTE権限が必要です。次の構文を使用して実行します。
GRANT EXECUTE ON DBMS_AQIN to user; ConnectionFactoryの登録
ConnectionFactoryは、次の4つの方法で登録できます。
データベースを介した登録: JDBCコネクション・パラメータの使用
public static int registerConnectionFactory(java.sql.Connection connection,
java.lang.String conn_name,
java.lang.String hostname,
java.lang.String oracle_sid,
int portno,
java.lang.String driver,
java.lang.String type)
throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、データベースを介してQueueConnectionFactoryまたはTopicConnectionFactoryをLightweight Directory Access Protocol(LDAP)サーバーに登録します。このメソッドは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
登録に使用されるJDBCコネクション |
|
|
登録されるコネクションの名前 |
|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
|
Oracleシステム識別子 |
|
|
ポート番号 |
|
|
JDBCドライバの型 |
|
|
コネクション・ファクトリのタイプ( |
registerConnectionFactoryに渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLEを付与する必要があります。登録後は、Java Naming and Directory Interface(JNDI)を使用してコネクション・ファクトリを検索できます。
例6-1 データベースを介した登録: JDBCコネクション・パラメータの使用
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.registerConnectionFactory( db_conn, "queue_conn1", "sun-123", "db1", 1521, "thin", "queue");
データベースを介した登録: JDBC URLの使用
public static int registerConnectionFactory(java.sql.Connection connection,
java.lang.String conn_name,
java.lang.String jdbc_url,
java.util.Properties info,
java.lang.String type)
throws JMSException
このメソッドは、JDBC URLを使用し、データベースを介してQueueConnectionFactoryまたはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
登録に使用されるJDBCコネクション |
|
|
登録されるコネクションの名前 |
|
|
接続先のURL |
|
|
プロパティの情報 |
|
|
ポート番号 |
|
|
コネクション・ファクトリのタイプ( |
registerConnectionFactoryに渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLEを付与する必要があります。登録後、JNDIを使用してコネクション・ファクトリを検索できます。
例6-2 データベースを介した登録: JDBC URLの使用
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.registerConnectionFactory( db_conn, "topic_conn1", url, null, "topic");
LDAPを介した登録: JDBCコネクション・パラメータの使用
public static int registerConnectionFactory(java.util.Hashtable env,
java.lang.String conn_name,
java.lang.String hostname,
java.lang.String oracle_sid,
int portno,
java.lang.String driver,
java.lang.String type)
throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、LDAPを介してQueueConnectionFactoryまたはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
LDAPコネクションの環境 |
|
|
登録されるコネクションの名前 |
|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
|
Oracleシステム識別子 |
|
|
ポート番号 |
|
|
JDBCドライバの型 |
|
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory()に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLEが付与されている必要があります)。登録後、JNDIを使用してコネクション・ファクトリを検索します。
例6-3 LDAPを介した登録: JDBCコネクション・パラメータの使用
Hashtable env = new Hashtable(5, 0.75f);
/* the following statements set in hashtable env:
* service provider package
* the URL of the ldap server
* the distinguished name of the database server
* the authentication method (simple)
* the LDAP username
* the LDAP user password
*/
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL, "ldap://sun-456:389");
env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com");
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com");
env.put(Context.SECURITY_CREDENTIALS, "welcome");
AQjmsFactory.registerConnectionFactory(env,
"queue_conn1",
"sun-123",
"db1",
1521,
"thin",
"queue");LDAPを介した登録: JDBC URLの使用
public static int registerConnectionFactory(java.util.Hashtable env,
java.lang.String conn_name,
java.lang.String jdbc_url,
java.util.Properties info,
java.lang.String type)
throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、LDAPを介してQueueConnectionFactoryまたはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
LDAPコネクションの環境 |
|
|
登録されるコネクションの名前 |
|
|
接続先のURL |
|
info |
プロパティの情報 |
|
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory()に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLEが付与されている必要があります)。登録後、JNDIを使用してコネクション・ファクトリを検索します。
例6-4 LDAPを介した登録: JDBC URLの使用
String url;
Hashtable env = new Hashtable(5, 0.75f);
/* the following statements set in hashtable env:
* service provider package
* the URL of the ldap server
* the distinguished name of the database server
* the authentication method (simple)
* the LDAP username
* the LDAP user password
*/
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL, "ldap://sun-456:389");
env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com");
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com");
env.put(Context.SECURITY_CREDENTIALS, "welcome");
url = "jdbc:oracle:thin:@sun-123:1521:db1";
AQjmsFactory.registerConnectionFactory(env, "topic_conn1", url, null, "topic");キュー/トピックConnectionFactoryの登録解除
LDAPのキュー/トピックConnectionFactoryを登録解除するには、2つの方法があります。
データベースを介した登録解除
public static int unregisterConnectionFactory(java.sql.Connection connection,
java.lang.String conn_name)
throws JMSException
このメソッドは、LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの登録を解除します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
登録に使用されるJDBCコネクション |
|
|
登録されるコネクションの名前 |
unregisterConnectionFactory()に渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLEを付与する必要があります。
例6-5 データベースを介した登録解除
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.unregisterConnectionFactory(db_conn, "topic_conn1");
LDAPを介した登録解除
public static int unregisterConnectionFactory(java.util.Hashtable env,
java.lang.String conn_name)
throws JMSException
このメソッドは、LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの登録を解除します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
LDAPコネクションの環境 |
|
|
登録されるコネクションの名前 |
unregisterConnectionFactory()に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLEが付与されている必要があります)。
例6-6 LDAPを介した登録解除
Hashtable env = new Hashtable(5, 0.75f);
/* the following statements set in hashtable env:
* service provider package
* the distinguished name of the database server
* the authentication method (simple)
* the LDAP username
* the LDAP user password
*/
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL, "ldap://sun-456:389");
env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com");
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com");
env.put(Context.SECURITY_CREDENTIALS, "welcome");
url = "jdbc:oracle:thin:@sun-123:1521:db1";
AQjmsFactory.unregisterConnectionFactory(env, "queue_conn1");QueueConnectionFactoryまたはTopicConnectionFactoryの取得
この項の内容は次のとおりです。
QueueConnectionFactoryの取得: JDBC URLの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory(
java.lang.String jdbc_url,
java.util.Properties info)
throws JMSException
このメソッドは、JDBC URLを使用してQueueConnectionFactoryを取得します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
接続先のURL |
|
info |
プロパティの情報 |
例6-7 QueueConnectionFactoryの取得: JDBC URLの使用
String url = "jdbc:oracle:oci10:internal/oracle"
Properties info = new Properties();
QueueConnectionFactory qc_fact;
info.put("internal_logon", "sysdba");
qc_fact = AQjmsFactory.getQueueConnectionFactory(url, info);QueueConnectionFactoryの取得: JDBCコネクション・パラメータの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory(
java.lang.String hostname,
java.lang.String oracle_sid,
int portno,
java.lang.String driver)
throws JMSException
このメソッドは、JDBC接続パラメータを使用してQueueConnectionFactoryを取得します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
|
Oracleシステム識別子 |
|
|
ポート番号 |
|
|
JDBCドライバの型 |
例6-8 QueueConnectionFactoryの取得: JDBCコネクション・パラメータの使用
String host = "dlsun"; String ora_sid = "rdbms10i" String driver = "thin"; int port = 5521; QueueConnectionFactory qc_fact; qc_fact = AQjmsFactory.getQueueConnectionFactory(host, ora_sid, port, driver);
TopicConnectionFactoryの取得: JDBC URLの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory(
java.lang.String jdbc_url,
java.util.Properties info)
throws JMSException
このメソッドは、JDBC URLを使用してTopicConnectionFactoryを取得します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
接続先のURL |
|
info |
プロパティの情報 |
例6-9 TopicConnectionFactoryの取得: JDBC URLの使用
String url = "jdbc:oracle:oci10:internal/oracle"
Properties info = new Properties();
TopicConnectionFactory tc_fact;
info.put("internal_logon", "sysdba");
tc_fact = AQjmsFactory.getTopicConnectionFactory(url, info);TopicConnectionFactoryの取得: JDBCコネクション・パラメータの使用
public static javax.jms.TopicConnectionFactory getTopicConnectionFactory(
java.lang.String hostname,
java.lang.String oracle_sid,
int portno,
java.lang.String driver)
throws JMSException
このメソッドは、JDBC接続パラメータを使用してTopicConnectionFactoryを取得します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
|
Oracleシステム識別子 |
|
|
ポート番号 |
|
|
JDBCドライバの型 |
例6-10 TopicConnectionFactoryの取得: JDBCコネクション・パラメータの使用
String host = "dlsun"; String ora_sid = "rdbms10i" String driver = "thin"; int port = 5521; TopicConnectionFactory tc_fact; tc_fact = AQjmsFactory.getTopicConnectionFactory(host, ora_sid, port, driver);
LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの取得
このメソッドは、LDAPからQueueConnectionFactoryまたはTopicConnectionFactoryを取得します。
例6-11 LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの取得
Hashtable env = new Hashtable(5, 0.75f);
DirContext ctx;
queueConnectionFactory qc_fact;
/* the following statements set in hashtable env:
* service provider package
* the URL of the ldap server
* the distinguished name of the database server
* the authentication method (simple)
* the LDAP username
* the LDAP user password
*/
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL, "ldap://sun-456:389");
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "cn=db1aquser1,cn=acme,cn=com");
env.put(Context.SECURITY_CREDENTIALS, "welcome");
ctx = new InitialDirContext(env);
ctx = (DirContext)ctx.lookup("cn=OracleDBConnections,cn=db1,cn=Oraclecontext,cn=acme,cn=com");
qc_fact = (queueConnectionFactory)ctx.lookup("cn=queue_conn1");LDAP内のキューまたはトピックの取得
このメソッドは、LDAPからキューまたはトピックを取得します。
例6-12 LDAP内のキューまたはトピックの取得
Hashtable env = new Hashtable(5, 0.75f);
DirContext ctx;
topic topic_1;
/* the following statements set in hashtable env:
* service provider package
* the URL of the ldap server
* the distinguished name of the database server
* the authentication method (simple)
* the LDAP username
* the LDAP user password
*/
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL, "ldap://sun-456:389");
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "cn=db1aquser1,cn=acme,cn=com");
env.put(Context.SECURITY_CREDENTIALS, "welcome");
ctx = new InitialDirContext(env);
ctx = (DirContext)ctx.lookup("cn=OracleDBQueues,cn=db1,cn=Oraclecontext,cn=acme,cn=com");
topic_1 = (topic)ctx.lookup("cn=topic_1");AQキュー表の作成
public oracle.AQ.AQQueueTable createQueueTable(
java.lang.String owner,
java.lang.String name,
oracle.AQ.AQQueueTableProperty property)
throws JMSException
このメソッドはキュー表を作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー表の所有者(スキーマ) |
|
name |
キュー表名 |
|
property |
キュー表のプロパティ |
キューを保持するためにキュー表を使用する場合、キュー表はマルチコンシューマに対して使用可能にすることはできません(デフォルト)。トピックを保持するためにキュー表を使用する場合、キュー表はマルチコンシューマに対して使用可能にする必要があります。
CLOB、BLOBおよびBFILEオブジェクトはOracle Database Advanced Queuingオブジェクト型のロードで有効な属性です。ただし、CLOBおよびBLOBのみ、Oracle8i以降のOracle Database Advanced Queuing 伝播を使用して伝播できます。
ノート:
現在、TxEventQキューは、DBMS_AQADM PL/SQL APIのみで作成および削除できます。
例6-13 キュー表の作成
QueueSession q_sess = null;
AQQueueTable q_table = null;
AQQueueTableProperty qt_prop = null;
qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
q_table = ((AQjmsSession)q_sess).createQueueTable(
"boluser", "bol_ship_queue_table", qt_prop);キューの作成
この項の内容は次のとおりです。
Point-to-Pointキューの作成
public javax.jms.Queue createQueue(
oracle.AQ.AQQueueTable q_table,
java.lang.String queue_name,
oracle.jms.AQjmsDestinationProperty dest_property)
throws JMSException
このメソッドは、指定したキュー表にキューを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キューが作成されるキュー表。キュー表はシングル・コンシューマ・キュー表である必要があります。 |
|
|
作成されるキューの名前。 |
|
|
キューのプロパティ |
このメソッドは、Oracle JMSに固有です。このメソッドでは標準のJava javax.jms.Sessionオブジェクトは使用できません。かわりに、標準の型をOracle JMSの具体的なクラスoracle.jms.AQjmsSessionにキャストする必要があります。
例6-14 Point-to-Pointキューの作成
QueueSession q_sess; AQQueueTable q_table; AqjmsDestinationProperty dest_prop; Queue queue; queue = ((AQjmsSession)q_sess).createQueue(q_table, "jms_q1", dest_prop);
パブリッシュ・サブスクライブ・トピックの作成
public javax.jms.Topic createTopic(
oracle.AQ.AQQueueTable q_table,
java.lang.String topic_name,
oracle.jms.AQjmsDestinationProperty dest_property)
throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルにトピックを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キューが作成されるキュー表。キュー表はマルチ・コンシューマ・キュー表である必要があります。 |
|
|
作成されるキューの名前。 |
|
|
キューのプロパティ |
このメソッドは、Oracle JMSに固有です。このメソッドでは標準のJava javax.jms.Sessionオブジェクトは使用できません。かわりに、標準の型をOracle JMSの具体的なクラスoracle.jms.AQjmsSessionにキャストする必要があります。
例6-16では、在庫不足のために受注を満たせない場合は、注文を処理しているトランザクションが終了されます。bookedordersトピックは、max_retries = 4、retry_delay = 12時間で設定されています。したがって、注文が2日で満たされない場合は、例外キューに移動されます。
例6-15 パブリッシュ・サブスクライブ・トピックの作成
TopicSession t_sess; AQQueueTable q_table; AqjmsDestinationProperty dest_prop; Topic topic; topic = ((AQjmsSessa)t_sess).createTopic(q_table, "jms_t1", dest_prop);
例6-16 メッセージの最大試行回数と最大遅延の指定
public BolOrder process_booked_order(TopicSession jms_session)
{
Topic topic;
TopicSubscriber tsubs;
ObjectMessage obj_message;
BolCustomer customer;
BolOrder booked_order = null;
String country;
int i = 0;
try
{
/* get a handle to the OE_bookedorders_topic */
topic = ((AQjmsSession)jms_session).getTopic("WS",
"WS_bookedorders_topic");
/* Create local subscriber - to track messages for Western Region */
tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
"Region = 'Western' ",
false);
/* wait for a message to show up in the topic */
obj_message = (ObjectMessage)tsubs.receive(10);
booked_order = (BolOrder)obj_message.getObject();
customer = booked_order.getCustomer();
country = customer.getCountry();
if (country == "US")
{
jms_session.commit();
}
else
{
jms_session.rollback();
booked_order = null;
}
}catch (JMSException ex)
{ System.out.println("Exception " + ex) ;}
return booked_order;
}Point-to-Pointキューおよびパブリッシュ・サブスクライブ・トピックのためのTxEventQキューの作成
AQ JMSには、TxEventQを作成および削除するための新しいAPIが定義されています。JMSにはキュー変更APIはありません。署名は次のとおりです。
/**
* Create a TxEventQ queue. It also internally creates the related queue
* objects (table, indexes) based on this name.
*
* @param queueName name of the queue to be created, format is schema.queueName
* (where the schema. is optional
* @param isMultipleConsumer flag to indicate whether the queue is a
* multi-consumer or single-consumer queue
* @return javax.jms.Destination
* @throws JMSException if the queue could not be created
*/
public synchronized javax.jms.Destination createJMSTransactionalEventQueue(String queueName,
boolean isMultipleConsumer) throws JMSException {
return createJMSTransactionalEventQueue(queueName, isMultipleConsumer, null, 0, null);
}
/**
* Create a TxEventQ queue. It also internally creates the related queue
* objects (table, indexes) based on this name.
*
* @param queueName name of the queue to be created, format is schema.queueName
* (where the schema. is optional
* @param isMultipleConsumer flag to indicate whether the queue is a
* multi-consumer or single-consumer queue
* @param storageClause additional storage clause
* @param maxRetries retry count before skip the message while dequeue
* @param comment comment for the queue
* @return javax.jms.Destination
* @throws JMSException if the queue could not be created
*/
public Destination createJMSTransactionalEventQueue(java.lang.String queueName,
boolean isMultipleConsumer,
java.lang.String storageClause,
int maxRetries,
java.lang.String comment)
throws JMSExceptionAQキュー表の取得
public oracle.AQ.AQQueueTable getQueueTable(java.lang.String owner,
java.lang.String name)
throws JMSException
このメソッドでは、AQキューのキュー表を取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー表の所有者(スキーマ) |
|
name |
キュー表名 |
コネクションをオープンしたコール側がキュー表の所有者ではない場合、コール側にはキュー表内のキュー/トピックに対するOracle Database Advanced Queuingエンキュー/デキュー権限が必要です。この権限がない場合、キュー表は取得できません。
例6-17 キュー表の取得
QueueSession q_sess; AQQueueTable q_table; q_table = ((AQjmsSession)q_sess).getQueueTable( "boluser", "bol_ship_queue_table");
権限の付与および取消し
この項の内容は次のとおりです。
Oracle Database Advanced Queuingシステム権限の付与
public void grantSystemPrivilege(java.lang.String privilege,
java.lang.String grantee,
boolean admin_option)
throws JMSException
このメソッドは、ユーザーおよびロールにOracle Database Advanced Queuingシステム権限を付与します。
| パラメータ | 説明 |
|---|---|
|
|
|
|
|
権限受領者(ユーザー、ロールまたは |
|
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
最初は、SYSおよびSYSTEMのみがこのプロシージャを正常に使用できます。ENQUEUE_ANY権限を付与されたユーザーは、データベース内の任意のキューにメッセージをエンキューできます。DEQUEUE_ANY権限を付与されたユーザーは、データベース内の任意のキューからメッセージをデキューできます。MANAGE_ANY権限を付与されたユーザーは、データベースのすべてのスキーマに対してDBMS_AQADMコールを実行できます。
例6-18 Oracle Database Advanced Queuingシステム権限の付与
TopicSession t_sess;
((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "scott", false);Oracle Database Advanced Queuingシステム権限の取消し
public void revokeSystemPrivilege(java.lang.String privilege,
java.lang.String grantee)
throws JMSException
このメソッドは、ユーザーまたはロールからOracle Database Advanced Queuingシステム権限を取り消します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
|
|
|
権限受領者(ユーザー、ロールまたは |
ENQUEUE_ANY権限を付与されたユーザーは、データベース内の任意のキューにメッセージをエンキューできます。DEQUEUE_ANY権限を付与されたユーザーは、データベース内の任意のキューからメッセージをデキューできます。MANAGE_ANY権限を付与されたユーザーは、データベースのすべてのスキーマに対してDBMS_AQADMコールを実行できます。
例6-19 Oracle Database Advanced Queuingシステム権限の取消し
TopicSession t_sess;
((AQjmsSession)t_sess).revokeSystemPrivilege("ENQUEUE_ANY", "scott");パブリッシュ・サブスクライブ・トピック権限の付与
public void grantTopicPrivilege(javax.jms.Session session,
java.lang.String privilege,
java.lang.String grantee,
boolean grant_option)
throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルでトピック権限を付与します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのトピックの権限を付与できます。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
|
|
|
|
|
|
権限受領者(ユーザー、ロールまたは |
|
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
例6-20 パブリッシュ・サブスクライブ・トピック権限の付与
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).grantTopicPrivilege( t_sess, "ENQUEUE", "scott", false);
パブリッシュ・サブスクライブ・トピック権限の取消し
public void revokeTopicPrivilege(javax.jms.Session session,
java.lang.String privilege,
java.lang.String grantee)
throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルでトピック権限を取り消します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
|
|
|
権限が取り消される権限受領者(ユーザー、ロールまたは |
例6-21 パブリッシュ・サブスクライブ・トピック権限の取消し
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).revokeTopicPrivilege(t_sess, "ENQUEUE", "scott");
Point-to-Pointキュー権限の付与
public void grantQueuePrivilege(javax.jms.Session session,
java.lang.String privilege,
java.lang.String grantee,
boolean grant_option)
throws JMSException
このメソッドは、Point-to-Pointモデルでキュー権限を付与します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのキューの権限を付与できます。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
|
|
|
権限受領者(ユーザー、ロールまたは |
|
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
例6-22 Point-to-Pointキュー権限の付与
QueueSession q_sess; Queue queue; ((AQjmsDestination)queue).grantQueuePrivilege( q_sess, "ENQUEUE", "scott", false);
Point-to-Pointキュー権限の取消し
public void revokeQueuePrivilege(javax.jms.Session session,
java.lang.String privilege,
java.lang.String grantee)
throws JMSException
このメソッドは、Point-to-Pointモデルでキュー権限を取り消します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのキューの権限を付与できます。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
|
|
|
権限が取り消される権限受領者(ユーザー、ロールまたは |
権限を取り消すには、取消し実行者がその権限の付与者である必要があります。また、GRANTオプションによって伝播された権限は、伝播させた付与者の権限が取り消されたときに取り消されます。
例6-23 Point-to-Pointキュー権限の取消し
QueueSession q_sess; Queue queue; ((AQjmsDestination)queue).revokeQueuePrivilege(q_sess, "ENQUEUE", "scott");
宛先の管理
宛先の開始
public void start(javax.jms.Session session,
boolean enqueue,
boolean dequeue)
throws JMSException
このメソッドは、宛先を開始します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
|
|
デキュー |
|
例6-24 宛先の開始
TopicSession t_sess; QueueSession q_sess; Topic topic; Queue queue; (AQjmsDestination)topic.start(t_sess, true, true); (AQjmsDestination)queue.start(q_sess, true, true);
宛先の停止
public void stop(javax.jms.Session session,
boolean enqueue,
boolean dequeue,
boolean wait)
throws JMSException
このメソッドは、宛先を停止します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
|
|
デキュー |
|
|
|
TRUEに設定すると、キュー/トピックの保留トランザクションは、宛先が停止する前に完了できます。 |
例6-25 宛先の停止
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).stop(t_sess, true, false);
宛先の変更
public void alter(javax.jms.Session session,
oracle.jms.AQjmsDestinationProperty dest_property)
throws JMSException
このメソッドは、宛先を変更します。これには、次のプロパティがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
dest_property |
キューまたはトピックの新規プロパティ |
例6-26 宛先の変更
QueueSession q_sess; Queue queue; TopicSession t_sess; Topic topic; AQjmsDestionationProperty dest_prop1, dest_prop2; ((AQjmsDestination)queue).alter(dest_prop1); ((AQjmsDestination)topic).alter(dest_prop2);
伝播スケジュール
この項の内容は次のとおりです。
ノート:
現在、TxEventQは、DBMS_AQADM PL/SQL APIでのみ管理され、伝播をサポートしていません。
伝播のスケジューリング
public void schedulePropagation(javax.jms.Session session,
java.lang.String destination,
java.util.Date start_time,
java.lang.Double duration,
java.lang.String next_time,
java.lang.Double latency)
throws JMSException
このメソッドは、伝播をスケジュールします。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
伝播がスケジュールされているリモート・データベースのデータベース・リンク。文字列NULLは、伝播がトピックのデータベース内のすべてのサブスクライバに対してスケジュールされていることを示します。 |
|
|
伝播開始時刻 |
|
|
伝播継続時間 |
|
|
次回の伝播開始時刻。 |
|
|
許容可能な待機時間(秒単位)。待機時間は、メッセージがエンキューされた時間と伝播された時間の差異です。 |
メッセージ受信者が、同一または異なるキュー内の同じ宛先に複数存在する場合、メッセージはすべての受信者に同時に伝播されます。
例6-28 伝播のスケジューリング
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).schedulePropagation( t_sess, null, null, null, null, new Double(0));
伝播スケジュールの有効化
public void enablePropagationSchedule(javax.jms.Session session,
java.lang.String destination)
throws JMSException
このメソッドは、伝播スケジュールを有効化します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-29 伝播スケジュールの有効化
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).enablePropagationSchedule(t_sess, "dbs1");
伝播スケジュールの変更
public void alterPropagationSchedule(javax.jms.Session session,
java.lang.String destination,
java.lang.Double duration,
java.lang.String next_time,
java.lang.Double latency)
throws JMSException
このメソッドは、伝播スケジュールを変更します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
伝播がスケジュールされているリモート・データベースのデータベース・リンク。文字列NULLは、伝播がトピックのデータベース内のすべてのサブスクライバに対してスケジュールされていることを示します。 |
|
|
伝播継続時間 |
|
|
次回の伝播開始時刻。 |
|
|
許容可能な待機時間(秒単位)。待機時間は、メッセージがエンキューされた時間と伝播された時間の差異です。 |
例6-30 伝播スケジュールの変更
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).alterPropagationSchedule( t_sess, null, 30, null, new Double(30));
伝播スケジュールの無効化
public void disablePropagationSchedule(javax.jms.Session session,
java.lang.String destination)
throws JMSException
このメソッドは、伝播スケジュールを無効化します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-31 伝播スケジュールの無効化
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).disablePropagationSchedule(t_sess, "dbs1");
伝播スケジュールの解除
public void unschedulePropagation(javax.jms.Session session,
java.lang.String destination)
throws JMSException
このメソッドは、スケジュール済伝播のスケジュールを解除します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JMSセッション |
|
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-32 伝播スケジュールの解除
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).unschedulePropagation(t_sess, "dbs1");
Oracle Java Message ServiceのPoint-to-Point
次のトピックでは、Oracle Database Advanced Queuing (AQ)のJava Message Service (JMS)操作インタフェースのうち、Point-to-Point操作固有のコンポーネントについて説明します。Point-to-Pointおよびパブリッシュ/サブスクライブで共有されるコンポーネントについては、「Oracle Java Message Serviceの共有インタフェース」を参照してください。
ユーザー名/パスワードが設定されたConnectionの作成
public javax.jms.Connection createConnection(
java.lang.String username,
java.lang.String password)
throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用して、Point-to-Point操作とパブリッシュ/サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー用にデータベースに接続するユーザー名 |
|
|
サーバーへの接続を作成するためのパスワード |
コネクションの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.Connection createConnection()
throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータを使用して、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。ConnectionFactoryプロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
ユーザー名/パスワードが設定されたQueueConnectionの作成
public javax.jms.QueueConnection createQueueConnection(
java.lang.String username,
java.lang.String password)
throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用してキュー・コネクションを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー用にデータベースに接続するユーザー名 |
|
|
サーバーへの接続を作成するためのパスワード |
例6-33 ユーザー名/パスワードが設定されたQueueConnectionの作成
QueueConnectionFactory qc_fact = AQjmsFactory.getQueueConnectionFactory(
"sun123", "oratest", 5521, "thin");
QueueConnection qc_conn = qc_fact.createQueueConnection("jmsuser", "jmsuser");QueueConnectionの確立: オープンしているJDBCコネクションの使用
public static javax.jms.QueueConnection createQueueConnection( java.sql.Connection jdbc_connection) throws JMSException
このメソッドは、オープンしているJDBCコネクションを使用してキュー・コネクションを作成します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
データベースへの有効なオープン・コネクション |
ユーザーがJMS操作に対して既存の(たとえばコネクション・プールの)JDBCコネクションを使用するときに、例6-34のメソッドを使用できます。この場合、JMSは新しいコネクションをオープンせずに、提供されたJDBCコネクションを使用してJMS QueueConnectionオブジェクトを作成します。
例6-35のメソッドは、データベース(JDBCサーバー・ドライバ)内のJavaストアド・プロシージャからJMSを使用する場合に、JMS QueueConnectionを確立する唯一の方法です。
例6-34 QueueConnectionの確立: オープンしているJDBCコネクションの使用
Connection db_conn; /* previously opened JDBC connection */
QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection(
db_conn);
例6-35 データベース内のJavaプロシージャからのQueueConnectionの作成
OracleDriver ora = new OracleDriver(); QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection(ora.defaultConnection());
QueueConnectionの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.QueueConnection createQueueConnection()
throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータによってキュー・コネクションを確立します。QueueConnectionFactoryのプロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
QueueConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
public static javax.jms.QueueConnection createQueueConnection(
oracle.jdbc.pool.OracleOCIConnectionPool cpool)
throws JMSException
このメソッドは、オープンしているOracleOCIConnectionPoolを使用してキュー・コネクションを作成します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
データベースに対してオープンなOCIコネクション・プール |
ユーザーがJMS操作に対して既存のOracleOCIConnectionPoolインスタンスを使用するとき、例6-36のメソッドを使用できます。この場合、JMSは新しいOracleOCIConnectionPoolインスタンスをオープンせずに、提供されたOracleOCIConnectionPoolインスタンスを使用してJMS QueueConnectionオブジェクトを作成します。
例6-36 QueueConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
OracleOCIConnectionPool cpool; /* previously created OracleOCIConnectionPool */ QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection(cpool);
セッションの作成
public javax.jms.Session createSession(boolean transacted,
int ack_mode)
throws JMSException
このメソッドは、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするSessionを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。トランザクションおよび非トランザクション・セッションがサポートされています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
QueueSessionの作成
public javax.jms.QueueSession createQueueSession(
boolean transacted, int ack_mode)
throws JMSException
このメソッドはQueueSessionを作成します。トランザクションおよび非トランザクション・セッションがサポートされています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
例6-37 トランザクションQueueSessionの作成
QueueConnection qc_conn; QueueSession q_sess = qc_conn.createQueueSession(true, 0);
QueueSenderの作成
public javax.jms.QueueSender createSender(javax.jms.Queue queue)
throws JMSException
このメソッドはQueueSenderを作成します。送信者の作成時にデフォルト・キューが設定されていない場合は、すべての送信操作で宛先キューを指定する必要があります。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
宛先キューの名前 |
メッセージの送信: デフォルトの送信オプションのQueueSenderの使用
public void send(javax.jms.Queue queue,
javax.jms.Message message)
throws JMSException
このメソッドは、メッセージをデフォルトの送信オプションでQueueSenderを使用して送信します。この操作では、メッセージのpriority(1)とtimeToLive(infinite)のデフォルト値を使用します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
このメッセージを送信するためのキュー |
|
|
送信するメッセージ |
QueueSenderがデフォルト・キューで作成されている場合、必ずしもsend()コールにqueueパラメータを指定する必要はありません。キューがsend()操作で指定されている場合、この値はQueueSenderのデフォルト・キューをオーバーライドします。
QueueSenderがデフォルト・キューを使用しないで作成されている場合、send()コールごとにqueueパラメータを指定する必要があります。
例6-38 任意のキューにメッセージを送信するセンダーの作成
/* Create a sender to send messages to any queue */ QueueSession jms_sess; QueueSender sender1; TextMessage message; sender1 = jms_sess.createSender(null); sender1.send(queue, message);
例6-39 特定のキューにメッセージを送信するセンダーの作成
/* Create a sender to send messages to a specific queue */ QueueSession jms_sess; QueueSender sender2; Queue billed_orders_que; TextMessage message; sender2 = jms_sess.createSender(billed_orders_que); sender2.send(queue, message);
メッセージの送信: 送信オプションを指定したQueueSenderの使用
public void send(javax.jms.Queue queue,
javax.jms.Message message,
int deliveryMode,
int priority,
long timeToLive)
throws JMSException
このメソッドは、送信オプションを指定してQueueSenderを使用してメッセージを送信します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
このメッセージを送信するためのキュー |
|
|
送信するメッセージ |
|
|
使用する配信モード |
|
|
このメッセージの優先順位 |
|
|
ミリ秒で指定されるメッセージの保存時間(ゼロは無制限) |
QueueSenderがデフォルト・キューで作成されている場合、必ずしもsend()コールにqueueパラメータを指定する必要はありません。キューがsend()操作で指定されている場合、この値はQueueSenderのデフォルト・キューをオーバーライドします。
QueueSenderがデフォルト・キューを使用しないで作成されている場合、send()コールごとにqueueパラメータを指定する必要があります。
例6-40 メッセージの送信: 送信オプションを指定したQueueSenderの使用1
/* Create a sender to send messages to any queue */ /* Send a message to new_orders_que with priority 2 and timetoLive 100000 milliseconds */ QueueSession jms_sess; QueueSender sender1; TextMessage mesg; Queue new_orders_que sender1 = jms_sess.createSender(null); sender1.send(new_orders_que, mesg, DeliveryMode.PERSISTENT, 2, 100000);
例6-41 メッセージの送信: 送信オプションを指定したQueueSenderの使用2
/* Create a sender to send messages to a specific queue */ /* Send a message with priority 1 and timetoLive 400000 milliseconds */ QueueSession jms_sess; QueueSender sender2; Queue billed_orders_que; TextMessage mesg; sender2 = jms_sess.createSender(billed_orders_que); sender2.send(mesg, DeliveryMode.PERSISTENT, 1, 400000);
QueueBrowserの作成: 標準JMS型メッセージ
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue,
java.lang.String messageSelector)
throws JMSException
このメソッドは、Text、Stream、Objects、BytesまたはMapMessageメッセージ本体を使用するキューに対するQueueBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
java.util.Enumeration内のメソッドを使用して、メッセージのリストを参照してください。
関連項目:
例6-42 QueueBrowserの作成: セレクタの指定なし
/* Create a browser without a selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; browser = jms_session.createBrowser(queue);
例6-43 QueueBrowserの作成: セレクタの指定あり
/* Create a browser for queues with a specified selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; /* create a Browser to look at messages with correlationID = RUSH */ browser = jms_session.createBrowser(queue, "JMSCorrelationID = 'RUSH'");
QueueBrowserの作成: 標準JMS型メッセージ、メッセージのロック
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue,
java.lang.String messageSelector,
boolean locked)
throws JMSException
このメソッドは、TextMessage、StreamMessage、ObjectMessage、BytesMessageまたはMapMessageメッセージ本体を使用するキューに対して、ブラウズ中にメッセージをロックするQueueBrowserを作成します。ロックされたメッセージは、ブラウズ・セッションによってトランザクションが終了されるまで、他のコンシューマによって削除できません。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
例6-44 QueueBrowserの作成: セレクタの指定なし、メッセージをロック
/* Create a browser without a selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; browser = jms_session.createBrowser(queue, null, true);
例6-45 QueueBrowserの作成: セレクタの指定あり、メッセージをロック
/* Create a browser for queues with a specified selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; /* create a Browser to look at messages with correlationID = RUSH in lock mode */ browser = jms_session.createBrowser(queue, "JMSCorrelationID = 'RUSH'", true);
QueueBrowserの作成: Oracleオブジェクト型メッセージ
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue,
java.lang.String messageSelector,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対するQueueBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactoryは、静的メソッドgetFactoryを使用して取得できます。
ノート:
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。
キューtest_queueは、SCOTT.EMPLOYEE型のペイロードを持ち、このOracleオブジェクト型に対してJPublisherによって生成されるJavaクラスがEmployeeであると仮定します。Employeeクラスは、CustomDatumインタフェースを実装します。このクラスに対するCustomDatumFactoryは、Employee.getFactory()メソッドを使用して取得できます。
ノート:
TEQは、オブジェクト型メッセージをサポートしていません
関連項目:
例6-46 ADTMessageに対するQueueBrowserの作成
/* Create a browser for a Queue with AdtMessage messages of type EMPLOYEE*/
QueueSession jms_session
QueueBrowser browser;
Queue test_queue;
browser = ((AQjmsSession)jms_session).createBrowser(test_queue,
"corrid='EXPRESS'",
Employee.getFactory());QueueBrowserの作成: Oracleオブジェクト型メッセージ、メッセージのロック
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue,
java.lang.String messageSelector,
java.lang.Object payload_factory,
boolean locked)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対して、ブラウズ中にメッセージをロックするQueueBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
ノート:
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。
ノート:
TxEventQキューは、オブジェクト型メッセージをサポートしていません
例6-47 AdtMessageキューに対するQueueBrowserの作成、メッセージをロック
/* Create a browser for a Queue with AdtMessage messagess of type EMPLOYEE* in lock mode/
QueueSession jms_session
QueueBrowser browser;
Queue test_queue;
browser = ((AQjmsSession)jms_session).createBrowser(test_queue,
null,
Employee.getFactory(),
true);QueueReceiverの作成: 標準JMS型メッセージ
public javax.jms.QueueReceiver createReceiver(javax.jms.Queue queue,
java.lang.String messageSelector)
throws JMSException
このメソッドは、標準JMS型メッセージ・キューに対するQueueReceiverを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
関連項目:
例6-48 QueueReceiverの作成: セレクタの指定なし
/* Create a receiver without a selector */ QueueSession jms_session QueueReceiver receiver; Queue queue; receiver = jms_session.createReceiver(queue);
例6-49 QueueReceiverの作成: セレクタの指定あり
/* Create a receiver for queues with a specified selector */ QueueSession jms_session; QueueReceiver receiver; Queue queue; /* create Receiver to receive messages with correlationID starting with EXP */ browser = jms_session.createReceiver(queue, "JMSCorrelationID LIKE 'EXP%'");
QueueReceiverの作成: Oracleオブジェクト型メッセージ
public javax.jms.QueueReceiver createReceiver(javax.jms.Queue queue,
java.lang.String messageSelector,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対するQueueReceiverを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセス対象のキュー |
|
|
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactoryは、静的メソッドgetFactoryを使用して取得できます。
ノート:
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。
キューtest_queueは、SCOTT.EMPLOYEE型のペイロードを持ち、このOracleオブジェクト型に対してJPublisherによって生成されるJavaクラスがEmployeeであると仮定します。Employeeクラスは、CustomDatumインタフェースを実装します。このクラスに対するORADataFactoryは、Employee.getFactory()メソッドを使用して取得できます。
ノート:
TxEventQキューは、オブジェクト型メッセージをサポートしていません
関連項目:
例6-50 QueueReceiverの作成: AdtMessageメッセージ
/* Create a receiver for a Queue with AdtMessage messages of type EMPLOYEE*/
QueueSession jms_session
QueueReceiver receiver;
Queue test_queue;
browser = ((AQjmsSession)jms_session).createReceiver(
test_queue,
"JMSCorrelationID = 'MANAGER',
Employee.getFactory());Oracle Java Message Serviceのパブリッシュ/サブスクライブ
次のトピックでは、Oracle Database Advanced Queuing (AQ)のJava Message Service (JMS)操作インタフェースのうち、パブリッシュ・サブスクライブ操作固有のコンポーネントについて説明します。Point-to-Pointおよびパブリッシュ/サブスクライブで共有されるコンポーネントについては、「Oracle Java Message Serviceの共有インタフェース」を参照してください。
ユーザー名/パスワードが設定されたConnectionの作成
public javax.jms.Connection createConnection(
java.lang.String username,
java.lang.String password)
throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用して、Point-to-Point操作とパブリッシュ/サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー用にデータベースに接続するユーザー名 |
|
|
サーバーへの接続を作成するためのパスワード |
コネクションの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.Connection createConnection()
throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータを使用して、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。ConnectionFactoryプロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
ユーザー名/パスワードが設定されたTopicConnectionの作成
public javax.jms.TopicConnection createTopicConnection(
java.lang.String username,
java.lang.String password)
throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用してTopicConnectionを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
キュー用にデータベースに接続するユーザー名 |
|
|
サーバーへの接続を作成するためのパスワード |
例6-51 ユーザー名/パスワードが設定されたTopicConnectionの作成
TopicConnectionFactory tc_fact = AQjmsFactory.getTopicConnectionFactory("sun123", "oratest", 5521, "thin");
/* Create a TopicConnection using a username/password */
TopicConnection tc_conn = tc_fact.createTopicConnection("jmsuser", "jmsuser");TopicConnectionの確立: オープンしているJDBCコネクションの使用
public static javax.jms.TopicConnection createTopicConnection(
java.sql.Connection jdbc_connection)
throws JMSException
このメソッドは、オープンしているJDBCコネクションを使用してTopicConnectionを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
データベースへの有効なオープン・コネクション |
例6-52 TopicConnectionの確立: オープンしているJDBCコネクションの使用
Connection db_conn; /*previously opened JDBC connection */ TopicConnection tc_conn = AQjmsTopicConnectionFactory createTopicConnection(db_conn);
例6-53 TopicConnectionの確立: 新規JDBCコネクションの使用
OracleDriver ora = new OracleDriver(); TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(ora.defaultConnection());
TopicConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
public static javax.jms.TopicConnection createTopicConnection(
oracle.jdbc.pool.OracleOCIConnectionPool cpool)
throws JMSException
このメソッドは、オープンしているOracleOCIConnectionPoolを使用してTopicConnectionを作成します。これは静的であり、次のパラメータを取ります。
| パラメータ | 説明 |
|---|---|
|
|
データベースに対してオープンなOCIコネクション・プール |
例6-54 TopicConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
OracleOCIConnectionPool cpool; /* previously created OracleOCIConnectionPool */ TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(cpool);
セッションの作成
public javax.jms.Session createSession(boolean transacted,
int ack_mode)
throws JMSException
このメソッドは、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするSessionを作成します。これは新規で、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
TopicSessionの作成
public javax.jms.TopicSession createTopicSession(boolean transacted,
int ack_mode)
throws JMSException
このメソッドはTopicSessionを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
例6-55 TopicSessionの作成
TopicConnection tc_conn; TopicSession t_sess = tc_conn.createTopicSession(true,0);
TopicPublisherの作成
public javax.jms.TopicPublisher createPublisher(javax.jms.Topic topic)
throws JMSException
このメソッドはTopicPublisherを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
パブリッシュ対象のトピック。識別されていないプロデューサの場合は、NULL。 |
メッセージのパブリッシュ: 最小限の指定
public void publish(javax.jms.Message message)
throws JMSException
このメソッドは、最小限の指定でメッセージをパブリッシュします。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
送信するメッセージ |
TopicPublisherでは、メッセージのpriority(1)とtimeToLive(infinite)のデフォルト値を使用します。
例6-56 トピック指定なしのパブリッシュ
/* Publish without specifying topic */
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicPublisher publisher1;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME",
"MYSID",
myport,
"oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
/* create TopicSession */
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
/* get shipped orders topic */
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE",
"Shipped_Orders_Topic");
publisher1 = jms_sess.createPublisher(shipped_orders);
/* create TextMessage */
TextMessage jms_sess.createTextMessage();
/* publish without specifying the topic */
publisher1.publish(text_message);例6-57 相関と遅延の指定によるパブリッシュ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicPublisher publisher1;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME",
"MYSID",
myport,
"oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE",
"Shipped_Orders_Topic");
publisher1 = jms_sess.createPublisher(shipped_orders);
/* Create TextMessage */
TextMessage jms_sess.createTextMessage();
/* Set correlation and delay */
/* Set correlation */
jms_sess.setJMSCorrelationID("FOO");
/* Set delay of 30 seconds */
jms_sess.setLongProperty("JMS_OracleDelay", 30);
/* Publish */
publisher1.publish(text_message);トピック指定によるメッセージのパブリッシュ
public void publish(javax.jms.Topic topic, javax.jms.Message message)
throws JMSException
このメソッドは、トピックを指定してメッセージをパブリッシュします。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
パブリッシュするトピック |
|
|
送信するメッセージ |
TopicPublisherがデフォルトのトピックで作成されている場合、publish()コールにtopicパラメータを指定することはできません。トピックが指定されている場合、その値はTopicPublisherのデフォルトのトピックをオーバーライドします。TopicPublisherがデフォルトのトピックを使用しないで作成されている場合、publish()コールごとにトピックを指定する必要があります。
例6-58 トピック指定によるパブリッシュ
/* Publish specifying topic */
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicPublisher publisher1;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
'MYHOSTNAME', 'MYSID', myport, 'oci8');
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
/* create TopicPublisher */
publisher1 = jms_sess.createPublisher(null);
/* get topic object */
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
'WS', 'Shipped_Orders_Topic');
/* create text message */
TextMessage jms_sess.createTextMessage();
/* publish specifying the topic */
publisher1.publish(shipped_orders, text_message);
メッセージのパブリッシュ: 配信モード、優先順位およびTimeToLiveの指定
public void publish(javax.jms.Topic topic,
javax.jms.Message message,
oracle.jms.AQjmsAgent[] recipient_list,
int deliveryMode,
int priority,
long timeToLive)
throws JMSException
このメソッドは、配信モード、優先順位およびTimeToLiveを指定してメッセージをパブリッシュします。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
メッセージのパブリッシュ先のトピック( |
|
|
パブリッシュするメッセージ |
|
|
メッセージがパブリッシュされる受信者のリスト。受信者の型は |
|
|
|
|
|
このメッセージの優先順位 |
|
|
ミリ秒で指定されるメッセージの保存時間(ゼロは無制限) |
例6-59 優先順位とTimeToLiveの指定によるパブリッシュ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicPublisher publisher1;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
publisher1 = jms_sess.createPublisher(shipped_orders);
/* Create TextMessage */
TextMessage jms_sess.createTextMessage();
/* Publish message with priority 1 and time to live 200 seconds */
publisher1.publish(text_message, DeliveryMode.PERSISTENT, 1, 200000);メッセージのパブリッシュ: 受信者リストの指定
public void publish(javax.jms.Message message,
oracle.jms.AQjmsAgent[] recipient_list)
throws JMSException
このメソッドは、トピック・サブスクライバをオーバーライドする受信者リストを指定して、メッセージをパブリッシュします。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
パブリッシュするメッセージ |
|
|
メッセージがパブリッシュされる受信者のリスト。受信者の型は |
例6-60 トピック・サブスクライバをオーバーライドする受信者リストの指定によるパブリッシュ
/* Publish specifying priority and timeToLive */
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicPublisher publisher1;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent[] recipList;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
publisher1 = jms_sess.createPublisher(shipped_orders);
/* create TextMessage */
TextMessage jms_sess.createTextMessage();
/* create two receivers */
recipList = new AQjmsAgent[2];
recipList[0] = new AQjmsAgent(
"ES", "ES.shipped_orders_topic", AQAgent.DEFAULT_AGENT_PROTOCOL);
recipList[1] = new AQjmsAgent(
"WS", "WS.shipped_orders_topic", AQAgent.DEFAULT_AGENT_PROTOCOL);
/* publish message specifying a recipient list */
publisher1.publish(text_message, recipList);
JMSトピックに対するDurableSubscriberの作成: セレクタの指定なし
public javax.jms.TopicSubscriber createDurableSubscriber(
javax.jms.Topic topic,
java.lang.String subs_name)
throws JMSException
このメソッドは、セレクタ指定なしのJMSトピックにDurableSubscriberを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先の非一時トピック |
|
|
このサブスクリプションの識別に使用される名前 |
トピックへの排他アクセス
CreateDurableSubscriber()およびUnsubscribe()は、どちらもターゲット・トピックに排他的にアクセスする必要があります。これらのコールが適用されるときに同じトピックに対する保留中のJMS send()、publish()またはreceive()操作がある場合、例外ORA-4020が発生します。この問題を解決するには、次の2つの方法があります。
-
設定またはクリーン・アップのフェーズでの
createDurableSubscriber()およびUnsubscribe()のコールを、トピックに対して保留中の他のJMS操作がない場合に制限します。これによって、必要なリソースが他のJMS操作のコールによって保持されないようにできます。 -
createDurableSubscriber()またはUnsubscribe()をコールする前に、TopicSession.commitをコールします。
例6-61 JMSトピックに対する永続サブスクライバの作成: セレクタの指定なし
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent[] recipList;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME",
"MYSID",
myport,
"oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE",
"Shipped_Orders_Topic");
/* create a durable subscriber on the shipped_orders topic*/
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders,
'WesternShipping');JMSトピックに対するDurableSubscriberの作成: セレクタの指定あり
public javax.jms.TopicSubscriber createDurableSubscriber(
javax.jms.Topic topic,
java.lang.String subs_name,
java.lang.String messageSelector,
boolean noLocal)
throws JMSException
このメソッドは、セレクタを指定して、JMSトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先の非一時トピック |
|
|
このサブスクリプションの識別に使用される名前 |
|
|
|
|
|
TRUEに設定すると、独自のコネクションによってパブリッシュされたメッセージの配信が禁止されます。 |
クライアントは、同一の名前および異なるmessageSelectorで永続トピック・サブスクライバを作成することによって、既存の永続サブスクリプションを変更できます。トピックのサブスクリプションを終了するには、unsubscribeコールが必要です。
関連項目:
例6-62 JMSトピックに対する永続サブスクライバの作成: セレクタの指定あり
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent[] recipList;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
/* create a subscriber */
/* with condition on JMSPriority and user property 'Region' */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, 'WesternShipping',
"JMSPriority > 2 and Region like 'Western%'", false);Oracleオブジェクト型トピックに対するDurableSubscriberの作成: セレクタの指定なし
public javax.jms.TopicSubscriber createDurableSubscriber(
javax.jms.Topic topic,
java.lang.String subs_name,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、セレクタを指定せずに、Oracleオブジェクト型のトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先の非一時トピック |
|
|
このサブスクリプションの識別に使用される名前 |
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-63 Oracleオブジェクト型トピックに対する永続サブスクライバの作成: セレクタの指定なし
/* Subscribe to an ADT queue */
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int my[port = 5521;
AQjmsAgent[] recipList;
/* the java mapping of the oracle object type created by J Publisher */
ADTMessage message;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
/* create a subscriber, specifying the correct CustomDatumFactory */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, 'WesternShipping', AQjmsAgent.getFactory());Oracleオブジェクト型トピックに対するDurableSubscriberの作成: セレクタの指定あり
public javax.jms.TopicSubscriber createDurableSubscriber(
javax.jms.Topic topic,
java.lang.String subs_name,
java.lang.String messageSelector,
boolean noLocal,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、セレクタを指定して、Oracleオブジェクト型のトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先の非一時トピック |
|
|
このサブスクリプションの識別に使用される名前 |
|
|
|
|
|
TRUEに設定すると、独自のコネクションによってパブリッシュされたメッセージの配信が禁止されます。 |
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-64 Oracleオブジェクト型トピックに対する永続サブスクライバの作成: セレクタの指定あり
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent[] recipList;
/* the java mapping of the oracle object type created by J Publisher */
ADTMessage message;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
/* create a subscriber, specifying correct CustomDatumFactory and selector */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, "WesternShipping",
"priority > 1 and tab.user_data.region like 'WESTERN %'", false,
ADTMessage.getFactory());トピック・サブスクライバ作成時の変換の指定
キュー/トピックにメッセージを送信/パブリッシュするときに変換を適用できます。メッセージを変換してからキュー/トピックに送信します。
変換は、AQjmsQueueSenderおよびAQjmsTopicPublisherのsetTransformationインタフェースを使用して指定できます。
CreateDurableSubscriber()コールを使用してトピック・サブスクライバを作成する場合も、変換を指定できます。取り出されたメッセージを変換してから、サブスクライバに戻します。指定されたサブスクライバがすでにCreateDurableSubscriber()コールに存在する場合、その変換は指定した変換に設定されます。
例6-65 変換による宛先へのメッセージの送信
注文入力アプリケーションによって処理された注文情報をWS_bookedorders_topicにパブリッシュする必要があると仮定します。正しいフォーマットでトピックにメッセージを挿入するには、前の項で定義した変換OE2WSを使用します。
public void ship_bookedorders(
TopicSession jms_session,
AQjmsADTMessage adt_message)
{
TopicPublisher publisher;
Topic topic;
try
{
/* get a handle to the WS_bookedorders_topic */
topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic");
publisher = jms_session.createPublisher(topic);
/* set the transformation in the publisher */
((AQjmsTopicPublisher)publisher).setTransformation("OE2WS");
publisher.publish(topic, adt_message);
}
catch (JMSException ex)
{
System.out.println("Exception :" ex);
}
}
例6-66 トピック・サブスクライバ作成時の変換の指定
西部地域向け出荷処理アプリケーションは、変換OE2WSとともにOE_bookedorders_topicにサブスクライブします。この変換はメッセージに適用され、Oracleオブジェクト型WS.WS_ordersのメッセージが戻されます。
WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_orderにマップされると仮定します。:
public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session)
{
TopicSubscriber subscriber;
Topic topic;
AQjmsAdtMessage msg = null;
try
{
/* get a handle to the OE_bookedorders_topic */
topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic");
/* create a subscriber with the transformation OE2WS */
subs = ((AQjmsSession)jms_session).createDurableSubscriber(
topic, 'WShip', null, false, WSOrder.getFactory(), "OE2WS");
msg = subscriber.receive(10);
}
catch (JMSException ex)
{
System.out.println("Exception :" ex);
}
return (AQjmsAdtMessage)msg;
}リモート・サブスクライバの作成: JMSメッセージ
public void createRemoteSubscriber(javax.jms.Topic topic,
oracle.jms.AQjmsAgent remote_subscriber,
java.lang.String messageSelector)
throws JMSException
このメソッドは、JMSメッセージ・トピックに対するリモート・サブスクライバを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先のトピック。 |
|
|
リモート・サブスクライバを表す |
|
|
|
Oracle Database Advanced Queuingでは、トピックがリモート・サブスクライバ(同一または異なるデータベース内の他のトピックのサブスクライバ)を持つことができます。リモート・サブスクライバを使用するには、ローカル・トピックとリモート・トピック間の伝播を設定する必要があります。
リモート・サブスクライバは、リモート・トピックの特定のコンシューマまたはリモート・トピックのすべてのサブスクライバにできます。リモート・サブスクライバは、AQjmsAgent構造を使用して定義されます。AQjmsAgentは、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_nameを参照します。アドレスはリモート・トピックを参照します。構文は、schema.topic_name[@dblink]です。
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_nameをAQjmsAgentのnameフィールドに指定し、リモート・トピックをaddressフィールドに指定する必要があります。リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgentのnameフィールドをNULLに設定する必要があります。
ノート:
TxEventQキューは、リモート・サブスクライバをサポートしていません。
関連項目:
例6-67 リモート・サブスクライバの作成: JMS型メッセージ・トピック
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int my[port = 5521;
AQjmsAgent remoteAgent;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
remoteAgent = new AQjmsAgent("WesternRegion", "WS.shipped_orders_topic", null);
/* create a remote subscriber (selector is null )*/
subscriber1 = ((AQjmsSession)jms_sess).createRemoteSubscriber(
shipped_orders, remoteAgent, null);リモート・サブスクライバの作成: Oracleオブジェクト型メッセージ
public void createRemoteSubscriber(javax.jms.Topic topic,
oracle.jms.AQjmsAgent remote_subscriber,
java.lang.String messageSelector,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対するリモート・サブスクライバを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ先のトピック。 |
|
|
リモート・サブスクライバを表す |
|
|
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、リモート・サブスクライバまたはオブジェクト型メッセージをサポートしていません。
Oracle Database Advanced Queuingでは、トピックがリモート・サブスクライバ(同一または異なるデータベース内の他のトピックのサブスクライバ)を持つことができます。リモート・サブスクライバを使用するには、ローカル・トピックとリモート・トピック間の伝播を設定する必要があります。
リモート・サブスクライバは、リモート・トピックの特定のコンシューマまたはリモート・トピックのすべてのサブスクライバにできます。リモート・サブスクライバは、AQjmsAgent構造を使用して定義されます。AQjmsAgentは、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_nameを参照します。アドレスはリモート・トピックを参照します。構文は、schema.topic_name[@dblink]です。
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_nameをAQjmsAgentのnameフィールドに指定し、リモート・トピックをaddressフィールドに指定する必要があります。リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgentのnameフィールドをNULLに設定する必要があります。
ノート:
AQは、宛先が同じである複数のdblinkの使用をサポートしていません。この問題を解決するには、各宛先に対し1つのデータベース・リンクを使用します。
関連項目:
例6-68 リモート・サブスクライバの作成: Oracleオブジェクト型メッセージ・トピック
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int my[port = 5521;
AQjmsAgent remoteAgent;
ADTMessage message;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
/* create TopicSession */
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
/* get the Shipped order topic */
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
/* create a remote agent */
remoteAgent = new AQjmsAgent("WesternRegion", "WS.shipped_orders_topic", null);
/* create a remote subscriber with null selector*/
subscriber1 = ((AQjmsSession)jms_sess).createRemoteSubscriber(
shipped_orders, remoteAgent, null, message.getFactory);リモート・サブスクライバ作成時の変換の指定
Oracle Database Advanced Queuingによって、リモート・サブスクライバ(他のデータベースにあるサブスクライバ)をトピックにサブスクライブできます。
createRemoteSubscriber()コールを使用してリモート・サブスクライバを作成する場合、変換を指定できます。これによって、異なるフォーマットのトピック間でメッセージを伝播できます。トピックに対してパブリッシュされたメッセージがリモート・サブスクライバの基準を満たしている場合、Oracle Database Advanced Queuingはリモート・サブスクライバに指定されているリモート・データベースにあるキュー/トピックにメッセージを自動的に伝播します。変換が指定される場合も、Oracle Database Advanced Queuingはその変換をメッセージに適用してからリモート・データベース上のキュー/トピックに伝播します。
ノート:
TxEventQキューは、リモート・サブスクライバをサポートしていません。
例6-69 リモート・サブスクライバ作成時の変換の指定
メッセージが自動的にWS.WS_bookedorders_topicに伝播されるようにリモート・サブスクライバをOE.OE_bookedorders_topicに作成します。変換OE2WSは、WS_bookedorders_topicに到達したメッセージが正しいフォーマットとなるように、リモート・サブスクライバを作成するときに指定します。
WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_orderにマップされると仮定します。
public void create_remote_sub(TopicSession jms_session)
{
AQjmsAgent subscriber;
Topic topic;
try
{
/* get a handle to the OE_bookedorders_topic */
topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic");
subscriber = new AQjmsAgent("WShip", "WS.WS_bookedorders_topic");
((AQjmsSession )jms_session).createRemoteSubscriber(
topic, subscriber, null, WSOrder.getFactory(),"OE2WS");
}
catch (JMSException ex)
{
System.out.println("Exception :" ex);
}
}永続サブスクリプションのサブスクライブの解除: ローカル・サブスクライバ
public void unsubscribe(javax.jms.Topic topic,
java.lang.String subs_name)
throws JMSException
このメソッドは、ローカル・サブスクライバについて永続サブスクリプションのサブスクライブを解除します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ解除先の非一時トピック |
|
|
このサブスクリプションの識別に使用される名前 |
関連項目:
例6-70 永続サブスクリプションのサブスクライブの解除: ローカル・サブスクライバ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession jms_sess;
TopicSubscriber subscriber1;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent[] recipList;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
/* unsusbcribe "WesternShipping" from shipped_orders */
jms_sess.unsubscribe(shipped_orders, "WesternShipping");永続サブスクリプションのサブスクライブの解除: リモート・サブスクライバ
public void unsubscribe(javax.jms.Topic topic,
oracle.jms.AQjmsAgent remote_subscriber)
throws JMSException
このメソッドは、リモート・サブスクライバについて永続サブスクリプションのサブスクライブを解除します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
サブスクライブ解除先の非一時トピック |
|
|
リモート・サブスクライバを表す |
ノート:
TEQキューは、リモート・サブスクライバをサポートしていません。
関連項目:
例6-71 永続サブスクリプションのサブスクライブの解除: リモート・サブスクライバ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
AQjmsAgent remoteAgent;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"OE", "Shipped_Orders_Topic");
remoteAgent = new AQjmsAgent("WS", "WS.Shipped_Orders_Topic", null);
/* unsubscribe the remote agent from shipped_orders */
((AQjmsSession)jms_sess).unsubscribe(shipped_orders, remoteAgent);TopicReceiverの作成: 標準JMS型メッセージ・トピック
public oracle.jms.AQjmsTopicReceiver createTopicReceiver(
javax.jms.Topic topic,
java.lang.String receiver_name,
java.lang.String messageSelector)
throws JMSException
このメソッドは、標準JMS型メッセージ・トピックに対するTopicReceiverを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
メッセージ・レシーバの名前。 |
|
|
|
Oracle Database Advanced Queuingでは、特定の受信者にメッセージを送信できます。これらの受信者は、トピックのサブスクライバである場合とそうでない場合があります。受信者がトピックに対するサブスクライバでない場合、明示的にアドレス指定されているメッセージのみを受信します。このメソッドは、永続サブスクライバ以外のコンシューマに対するTopicReceiverオブジェクトの作成に使用する必要があります。
関連項目:
例6-72 TopicReceiverの作成: 標準JMS型メッセージ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = ull;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
TopicReceiver receiver;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"WS", "Shipped_Orders_Topic");
receiver = ((AQjmsSession)jms_sess).createTopicReceiver(
shipped_orders, "WesternRegion", null); TopicReceiverの作成: Oracleオブジェクト型メッセージ・トピック
public oracle.jms.AQjmsTopicReceiver createTopicReceiver(
javax.jms.Topic topic,
java.lang.String receiver_name,
java.lang.String messageSelector,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、セレクタで、Oracleオブジェクト型メッセージ・トピックに対するTopicReceiverを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
メッセージ・レシーバの名前。 |
|
|
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
Oracle Database Advanced Queuingでは、トピックのすべてのサブスクライバ、または特定の受信者にメッセージを送信できます。これらの受信者は、トピックのサブスクライバである場合とそうでない場合があります。受信者がトピックに対するサブスクライバでない場合、明示的にアドレス指定されているメッセージのみを受信します。このメソッドは、永続サブスクライバ以外のコンシューマに対するTopicReceiverオブジェクトの作成に使用する必要があります。
関連項目:
例6-73 TopicReceiverの作成: Oracleオブジェクト型メッセージ
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
TopicReceiver receiver;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"WS", "Shipped_Orders_Topic");
receiver = ((AQjmsSession)jms_sess).createTopicReceiver(
shipped_orders, "WesternRegion", null);TopicBrowserの作成: 標準JMSメッセージ
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic,
java.lang.String cons_name,
java.lang.String messageSelector)
throws JMSException
このメソッドは、TextMessage、StreamMessage、ObjectMessage、BytesMessageまたはMapMessageメッセージ本体を使用するトピックに対するTopicBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
関連項目:
例6-74 TopicBrowserの作成: セレクタの指定なし
/* Create a browser without a selector */ TopicSession jms_session; TopicBrowser browser; Topic topic; browser = ((AQjmsSession) jms_session).createBrowser(topic, "SUBS1");
例6-75 TopicBrowserの作成: セレクタの指定あり
/* Create a browser for topics with a specified selector */
TopicSession jms_session;
TopicBrowser browser;
Topic topic;
/* create a Browser to look at messages with correlationID = RUSH */
browser = ((AQjmsSession) jms_session).createBrowser(
topic, "SUBS1", "JMSCorrelationID = 'RUSH'");TopicBrowserの作成: 標準JMSメッセージ、メッセージのロック
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic,
java.lang.String cons_name,
java.lang.String messageSelector,
boolean locked)
throws JMSException
このメソッドは、Text、Stream、Objects、Bytesまたはマップ・メッセージを使用するトピックに対して、ブラウズ中にメッセージをロックするTopicBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
例6-76 TopicBrowserの作成: セレクタの指定なし、ブラウズ中にメッセージをロック
/* Create a browser without a selector */
TopicSession jms_session;
TopicBrowser browser;
Topic topic;
browser = ((AQjmsSession) jms_session).createBrowser(
topic, "SUBS1", true);例6-77 TopicBrowserの作成: セレクタの指定あり、メッセージをロック
/* Create a browser for topics with a specified selector */
TopicSession jms_session;
TopicBrowser browser;
Topic topic;
/* create a Browser to look at messages with correlationID = RUSH in
lock mode */
browser = ((AQjmsSession) jms_session).createBrowser(
topic, "SUBS1", "JMSCorrelationID = 'RUSH'", true);TopicBrowserの作成: Oracleオブジェクト型メッセージ
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic,
java.lang.String cons_name,
java.lang.String messageSelector,
java.lang.Object payload_factory)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対するTopicBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactoryは、静的メソッドgetFactoryを使用して取得できます。トピック test_topicは、SCOTT.EMPLOYEE型のペイロードを持ち、このOracleオブジェクト型に対してJpublisherによって生成されるJavaクラスがEmployeeであると仮定します。Employeeクラスは、CustomDatumインタフェースを実装します。このクラスに対するCustomDatumFactoryは、Employee.getFactory()メソッドを使用して取得できます。
関連項目:
例6-78 TopicBrowserの作成: AdtMessageメッセージ
/* Create a browser for a Topic with AdtMessage messages of type EMPLOYEE*/
TopicSession jms_session
TopicBrowser browser;
Topic test_topic;
browser = ((AQjmsSession) jms_session).createBrowser(
test_topic, "SUBS1", Employee.getFactory());TopicBrowserの作成: Oracleオブジェクト型メッセージ、メッセージのロック
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic,
java.lang.String cons_name,
java.lang.String messageSelector,
java.lang.Object payload_factory,
boolean locked)
throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対して、ブラウズ中にメッセージをロックするTopicBrowserを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
アクセスするトピック |
|
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
ノート:
-
CustomDatumは今後のリリースではサポートされなくなります。かわりに、ORADataFactoryペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-79 TopicBrowserの作成: AdtMessageメッセージ、メッセージのロック
/* Create a browser for a Topic with AdtMessage messages of type EMPLOYEE* in
lock mode/
TopicSession jms_session
TopicBrowser browser;
Topic test_topic;
browser = ((AQjmsSession) jms_session).createBrowser(
test_topic, "SUBS1", Employee.getFactory(), true);TopicBrowserを使用したメッセージのブラウズ
public void purgeSeen()
throws JMSException
このメソッドは、TopicBrowserを使用してメッセージをブラウズします。java.util.Enumeration内のメソッドを使用して、メッセージのリストを参照してください。現行のブラウズ中に参照したメッセージを削除するには、TopicBrowser内のpurgeSeenメソッドを使用してください。
例6-80 TopicBrowserの作成: セレクタの指定あり
/* Create a browser for topics with a specified selector */
public void browse_rush_orders(TopicSession jms_session)
TopicBrowser browser;
Topic topic;
ObjectMessage obj_message
BolOrder new_order;
Enumeration messages;
/* get a handle to the new_orders topic */
topic = ((AQjmsSession) jms_session).getTopic("OE", "OE_bookedorders_topic");
/* create a Browser to look at RUSH orders */
browser = ((AQjmsSession) jms_session).createBrowser(
topic, "SUBS1", "JMSCorrelationID = 'RUSH'");
/* Browse through the messages */
for (messages = browser.elements() ; message.hasMoreElements() ;)
{obj_message = (ObjectMessage)message.nextElement();}
/* Purge messages seen during this browse */
browser.purgeSeen() Oracle Java Message Serviceの共有インタフェース
次のトピックでは、Oracle Database Advanced Queuing (AQ)に対するJava Message Service (JMS)操作インタフェース(共有インタフェース)を説明します。
Oracle Database Advanced Queuing JMS操作インタフェース: 共有インタフェース
次のトピックでは、JMS操作のためのOracle Database Advanced Queuing共有インタフェースを説明します。
JMSコネクションの起動
public void start()
throws JMSException
AQjmsConnection.start()は、メッセージ受信用のJMSコネクションを起動します。
JMSコネクションの取得
public oracle.jms.AQjmsConnection getJmsConnection()
throws JMSException
AQjmsSession.getJmsConnection()は、セッションからJMSコネクションを取得します。
セッションにおけるすべての操作のコミット
public void commit()
throws JMSException
AQjmsSession.commit()は、セッションで実行されているすべてのJMSおよびSQL操作をコミットします。
セッションにおけるすべての操作のロールバック
public void rollback()
throws JMSException
AQjmsSession.rollback()は、セッションで実行されているすべてのJMSおよびSQL操作を終了します。
セッションからのJDBCコネクションの取得
public java.sql.Connection getDBConnection()
throws JMSException
AQjmsSession.getDBConnection()は、JMSセッションから基礎となるJDBCコネクションを取得します。JDBCコネクションは、JMS操作が実行される同一のトランザクションの一部としてSQL操作を実行するために使用される場合があります。
例6-81 JMSセッションからの基礎となるJDBCコネクションの取得
java.sql.Connection db_conn; QueueSession jms_sess; db_conn = ((AQjmsSession)jms_sess).getDBConnection();
JMSコネクションからのOracleOCIConnectionPoolの取得
public oracle.jdbc.pool.OracleOCIConnectionPool getOCIConnectionPool()
AQjmsConnection.getOCIConnectionPool()は、JMSコネクションから基礎となるOracleOCIConnectionPoolを取得します。OracleOCIConnectionPoolインスタンスの設定は、コネクションの使用状況(ユーザーが既存のコネクションを使用して確立するセッション数など)に応じてユーザーがチューニングできます。ただし、JMSコネクションが使用しているOracleOCIConnectionPoolインスタンスはクローズしないでください。
例6-82 JMSコネクションからの基礎となるOracleOCIConnectionPoolの取得
oracle.jdbc.pool.OracleOCIConnectionPool cpool; QueueConnection jms_conn; cpool = ((AQjmsConnection)jms_conn).getOCIConnectionPool();
MapMessageの作成
public javax.jms.MapMessage createMapMessage()
throws JMSException
AQjmsSession.createMapMessage()は、Mapメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_MAP_MESSAGEまたはAQ$_JMS_MESSAGEで作成された場合のみです。
StreamMessageの作成
public javax.jms.StreamMessage createStreamMessage()
throws JMSException
AQjmsSession.createStreamMessage()は、StreamMessageを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_STREAM_MESSAGEまたはAQ$_JMS_MESSAGEで作成された場合のみです。
ObjectMessageの作成
public javax.jms.ObjectMessage createObjectMessage(java.io.Serializable object)
throws JMSException
AQjmsSession.createObjectMessage()は、Objectメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_OBJECT_MESSAGEまたはAQ$_JMS_MESSAGEで作成された場合のみです。
TextMessageの作成
public javax.jms.TextMessage createTextMessage()
throws JMSException
AQjmsSession.createTextMessage()は、Textメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_TEXT_MESSAGEまたはAQ$_JMS_MESSAGEで作成された場合のみです。
JMSメッセージの作成
public javax.jms.Message createMessage()
throws JMSException
AQjmsSession.createMessage()は、JMSメッセージを作成します。AQ$_JMS_MESSAGEメッセージを使用して、様々な型のメッセージを構成できます。メッセージ型は、次のいずれかである必要があります。
-
DBMS_AQ.JMS_TEXT_MESSAGE -
DBMS_AQ.JMS_OBJECT_MESSAGE -
DBMS_AQ.JMS_MAP_MESSAGE -
DBMS_AQ.JMS_BYTES_MESSAGE -
DBMS_AQ.JMS_STREAM_MESSAGE
このユーザー定義型を使用して、ヘッダーのみのJMSメッセージを作成することもできます。
AdtMessageの作成
public oracle.jms.AdtMessage createAdtMessage()
throws JMSException
AQjmsSession.createAdtMessage()は、AdtMessageを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型Oracle ADTで作成された場合のみです。AdtMessageには、CustomDatumインタフェースを実装するオブジェクトが移入される必要があります。このオブジェクトは、キュー/トピックのペイロードとして定義されたSQL ADTのJavaマッピングである必要があります。SQL ADT型に対応するJavaクラスは、Jpublisherツールを使用して生成できます。
JMSメッセージ・プロパティの指定
JMSで始まるプロパティ名は、プロバイダ固有です。ユーザー定義のプロパティは、JMSで始めることができません。
次のプロバイダのプロパティは、Text、Stream、Object、BytesまたはMapMessageを使用するクライアントが設定できます。
-
JMSXAppID (string) -
JMSXGroupID (string) -
JMSXGroupSeq (int) -
JMS_OracleExcpQ (string)このメッセージ・プロパティでは例外キューを指定します。
-
JMS_OracleDelay (int)このメッセージ・プロパティではメッセージの遅延秒数を指定します。
次のプロパティは、AdtMessageに対して設定できます。
-
JMS_OracleExcpQ (String)このメッセージ・プロパティでは、例外キューを
schema.queue_nameとして指定します。 -
JMS_OracleDelay (int)このメッセージ・プロパティではメッセージの遅延秒数を指定します。
この項の内容は次のとおりです。
Booleanメッセージ・プロパティの設定
public void setBooleanProperty(java.lang.String name,
boolean value)
throws JMSException
AQjmsMessage.setBooleanProperty()は、メッセージ・プロパティをBooleanとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Booleanプロパティの名前 |
|
|
メッセージに設定するBooleanプロパティの値 |
Stringメッセージ・プロパティの設定
public void setStringProperty(java.lang.String name,
java.lang.String value)
throws JMSException
AQjmsMessage.setStringProperty()は、メッセージ・プロパティをStringとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Stringプロパティの名前 |
|
|
メッセージに設定するStringプロパティの値 |
Integerメッセージ・プロパティの設定
public void setIntProperty(java.lang.String name,
int value)
throws JMSException
AQjmsMessage.setIntProperty()は、メッセージ・プロパティをIntegerとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Integerプロパティの名前 |
|
|
メッセージに設定するIntegerプロパティの値 |
Doubleメッセージ・プロパティの設定
public void setDoubleProperty(java.lang.String name,
double value)
throws JMSException
AQjmsMessage.setDoubleProperty()は、メッセージ・プロパティをDoubleとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Doubleプロパティの名前 |
|
|
メッセージに設定するDoubleプロパティの値 |
Floatメッセージ・プロパティの設定
public void setFloatProperty(java.lang.String name,
float value)
throws JMSException
AQjmsMessage.setFloatProperty()は、メッセージ・プロパティをFloatとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Floatプロパティの名前 |
|
|
メッセージに設定するFloatプロパティの値 |
Byteメッセージ・プロパティの設定
public void setByteProperty(java.lang.String name,
byte value)
throws JMSException
AQjmsMessage.setByteProperty()は、メッセージ・プロパティをByteとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Byteプロパティの名前 |
|
|
メッセージに設定するByteプロパティの値 |
Longメッセージ・プロパティの設定
public void setLongProperty(java.lang.String name,
long value)
throws JMSException
AQjmsMessage.setLongProperty()は、メッセージ・プロパティをLongとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Longプロパティの名前 |
|
|
メッセージに設定するLongプロパティの値 |
Shortメッセージ・プロパティの設定
public void setShortProperty(java.lang.String name,
short value)
throws JMSException
AQjmsMessage.setShortProperty()は、メッセージ・プロパティをShortとして指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Shortプロパティの名前 |
|
|
メッセージに設定するShortプロパティの値 |
Objectメッセージ・プロパティの設定
public void setObjectProperty(java.lang.String name,
java.lang.Object value)
throws JMSException
AQjmsMessage.setObjectProperty()は、メッセージ・プロパティをObjectとして指定します。オブジェクト化されたプリミティブ値(Boolean、Byte、Short、Integer、Long、Float、DoubleおよびString)のみサポートされています。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
JavaのObjectプロパティの名前 |
|
|
メッセージに設定するJavaのObjectプロパティの値 |
MessageProducerが送信するすべてのメッセージに対するデフォルトのTimeToLiveの設定
public void setTimeToLive(long timeToLive)
throws JMSException
このメソッドは、MessageProducerが送信するすべてのメッセージに対するデフォルトのTimeToLiveを設定します。メッセージの遅延が発生した後に計算されます。このメソッドには、次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
ミリ秒で指定されるメッセージのTime-To-Live(ゼロは無制限) |
例6-83 MessageProducerが送信するすべてのメッセージに対するデフォルトのTimeToLiveの設定
/* Set default timeToLive value to 100000 milliseconds for all messages sent by the QueueSender*/ QueueSender sender; sender.setTimeToLive(100000);
MessageProducerが送信するすべてのメッセージに対するデフォルトの優先順位の設定
public void setPriority(int priority)
throws JMSException
このメソッドは、MessageProducerが送信するすべてのメッセージに対するデフォルトのPriorityを設定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
このMessageProducerのメッセージの優先順位。デフォルトは4です。 |
優先順位には、どの整数値でも指定できます。数値が小さいほど高い優先度を示します。優先順位がsend()操作時に明示的に指定されている場合は、このメソッドで設定されたデフォルト値をオーバーライドします。
例6-84 QueueSenderが送信するすべてのメッセージに対するデフォルトの優先順位の設定
/* Set default priority value to 2 for all messages sent by the QueueSender*/ QueueSender sender; sender.setPriority(2);
例6-85 TopicPublisherが送信するすべてのメッセージに対するデフォルトの優先順位の設定
/* Set default priority value to 2 for all messages sent by the TopicPublisher*/ TopicPublisher publisher; publisher.setPriority(1);
AQjmsエージェントの作成
public void createAQAgent(java.lang.String agent_name,
boolean enable_http,
throws JMSException
このメソッドはAQjmsAgentを作成します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
AQエージェントの名前。 |
|
|
TRUEに設定すると、このエージェントはHTTP経由でAQにアクセスできます。 |
メッセージの同期受信
タイムアウトを指定するか、待機なしで、メッセージを同期受信できます。また、変換を使用してメッセージを受信することもできます。
タイムアウト指定でのメッセージ・コンシューマの使用
public javax.jms.Message receive(long timeout)
throws JMSException
このメソッドは、タイムアウトを指定し、メッセージ・コンシューマを使用してメッセージを受信します。
| パラメータ | 説明 |
|---|---|
|
|
タイムアウト値(ミリ秒単位) |
例6-86 タイムアウト指定でのメッセージ・コンシューマの使用
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"WS", "Shipped_Orders_Topic");
/* create a subscriber, specifying the correct CustomDatumFactory and
selector */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, 'WesternShipping',
" priority > 1 and tab.user_data.region like 'WESTERN %'",
false, AQjmsAgent.getFactory());
/* receive, blocking for 30 seconds if there were no messages */
Message = subscriber.receive(30000);例6-87 JMS: メッセージが届くまでブロック
public BolOrder get_new_order1(QueueSession jms_session)
{
Queue queue;
QueueReceiver qrec;
ObjectMessage obj_message;
BolCustomer customer;
BolOrder new_order = null;
String state;
try
{
/* get a handle to the new_orders queue */
queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
qrec = jms_session.createReceiver(queue);
/* wait for a message to show up in the queue */
obj_message = (ObjectMessage)qrec.receive();
new_order = (BolOrder)obj_message.getObject();
customer = new_order.getCustomer();
state = customer.getState();
System.out.println("Order: for customer " + customer.getName());
}
catch (JMSException ex)
{
System.out.println("Exception: " + ex);
}
return new_order;
}待機なしでのメッセージ・コンシューマの使用
public javax.jms.Message receiveNoWait()
throws JMSException
このメソッドは、待機なしで、メッセージ・コンシューマを使用してメッセージを受信します。
例6-88 JMS: メッセージを非ブロック
public BolOrder poll_new_order3(QueueSession jms_session)
{
Queue queue;
QueueReceiver qrec;
ObjectMessage obj_message;
BolCustomer customer;
BolOrder new_order = null;
String state;
try
{
/* get a handle to the new_orders queue */
queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
qrec = jms_session.createReceiver(queue);
/* check for a message to show in the queue */
obj_message = (ObjectMessage)qrec.receiveNoWait();
new_order = (BolOrder)obj_message.getObject();
customer = new_order.getCustomer();
state = customer.getState();
System.out.println("Order: for customer " + customer.getName());
}
catch (JMSException ex)
{
System.out.println("Exception: " + ex);
}
return new_order;
}変換による宛先からのメッセージの受信
キューまたはトピックからメッセージを受信するときに変換を適用できます。メッセージを変換してから、JMSアプリケーションに戻します。
変換は、AQjmsQueueReceiver、AQjmsTopicSubscriberまたはAQjmsTopicReceiverのsetTransformation()インタフェースを使用して指定できます。
例6-89 JMS: 変換による宛先からのメッセージの受信
Western Shippingアプリケーションが、OE_bookedorders_topicからメッセージを取得すると仮定します。これは、変換OE2WSがメッセージをOracleオブジェクト型WS_orderとして取得するよう指定します。WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_orderにマップされると仮定します。
public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session)
AQjmsTopicReceiver receiver;
Topic topic;
Message msg = null;
try
{
/* get a handle to the OE_bookedorders_topic */
topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic");
/* Create a receiver for WShip */
receiver = ((AQjmsSession)jms_session).createTopicReceiver(
topic, "WShip, null, WSOrder.getFactory());
/* set the transformation in the publisher */
receiver.setTransformation("OE2WS");
msg = receiver.receive(10);
}
catch (JMSException ex)
{
System.out.println("Exception :", ex);
}
return (AQjmsAdtMessage)msg;
}メッセージの受信に対するナビゲーション・モードの指定
public void setNavigationMode(int mode)
throws JMSException
このメソッドは、メッセージの受信に対するナビゲーション・モードを指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
ナビゲーション・モードの新規値 |
例6-90 メッセージの受信に対するナビゲーション・モードの指定
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic("WS", "Shipped_Orders_Topic");
/* create a subscriber, specifying the correct CustomDatumFactory and selector */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, 'WesternShipping',
"priority > 1 and tab.user_data.region like 'WESTERN %'", false,
AQjmsAgent.getFactory());
subscriber1.setNavigationMode(AQjmsConstants.NAVIGATION_FIRST_MESSAGE);
/* get message for the subscriber, returning immediately if there was nomessage */
Message = subscriber.receive();メッセージの非同期受信
メッセージを非同期に受信するには、次の2つの方法があります。
メッセージ・コンシューマに対するメッセージ・リスナーの指定
public void setMessageListener(javax.jms.MessageListener myListener)
throws JMSException
このメソッドは、メッセージ・コンシューマに対してメッセージ・リスナーを指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
コンシューマに対してメッセージ・リスナーを設定します。 |
例6-91 メッセージ・コンシューマに対するメッセージ・リスナーの指定
TopicConnectionFactory tc_fact = null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
TopicSession jms_sess;
Topic shipped_orders;
int myport = 5521;
MessageListener mLis = null;
/* create connection and session */
tc_fact = AQjmsFactory.getTopicConnectionFactory(
"MYHOSTNAME", "MYSID", myport, "oci8");
t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic");
jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
shipped_orders = ((AQjmsSession )jms_sess).getTopic(
"WS", "Shipped_Orders_Topic");
/* create a subscriber, specifying the correct CustomDatumFactory and selector */
subscriber1 = jms_sess.createDurableSubscriber(
shipped_orders, 'WesternShipping',
"priority > 1 and tab.user_data.region like 'WESTERN %'",
false, AQjmsAgent.getFactory());
mLis = new myListener(jms_sess, "foo");
/* get message for the subscriber, returning immediately if there was nomessage */
subscriber.setMessageListener(mLis);
The definition of the myListener class
import oracle.AQ.*;
import oracle.jms.*;
import javax.jms.*;
import java.lang.*;
import java.util.*;
public class myListener implements MessageListener
{
TopicSession mySess;
String myName;
/* constructor */
myListener(TopicSession t_sess, String t_name)
{
mySess = t_sess;
myName = t_name;
}
public onMessage(Message m)
{
System.out.println("Retrieved message with correlation: " ||
m.getJMSCorrelationID());
try{
/* commit the dequeue */
mySession.commit();
} catch (java.sql.SQLException e)
{System.out.println("SQL Exception on commit"); }
}
}メッセージIDの取得
この項の内容は次のとおりです。
相関識別子の取得
public java.lang.String getJMSCorrelationID()
throws JMSException
AQjmsMessage.getJMSCorrelationID()は、メッセージの相関識別子を取得します。
JMSメッセージ・プロパティの取得
この項の内容は次のとおりです。
Booleanメッセージ・プロパティの取得
public boolean getBooleanProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getBooleanProperty()は、メッセージ・プロパティをBooleanとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Booleanプロパティの名前 |
Stringメッセージ・プロパティの取得
public java.lang.String getStringProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getStringProperty()は、メッセージ・プロパティをStringとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Stringプロパティの名前 |
Integerメッセージ・プロパティの取得
public int getIntProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getIntProperty()は、メッセージ・プロパティをIntegerとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Integerプロパティの名前 |
Doubleメッセージ・プロパティの取得
public double getDoubleProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getDoubleProperty()は、メッセージ・プロパティをDoubleとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Doubleプロパティの名前 |
Floatメッセージ・プロパティの取得
public float getFloatProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getFloatProperty()は、メッセージ・プロパティをFloatとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Floatプロパティの名前 |
Byteメッセージ・プロパティの取得
public byte getByteProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getByteProperty()は、メッセージ・プロパティをByteとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Byteプロパティの名前 |
Longメッセージ・プロパティの取得
public long getLongProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getLongProperty()は、メッセージ・プロパティをLongとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Longプロパティの名前 |
Shortメッセージ・プロパティの取得
public short getShortProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getShortProperty()は、メッセージ・プロパティをShortとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Shortプロパティの名前 |
Objectメッセージ・プロパティの取得
public java.lang.Object getObjectProperty(java.lang.String name)
throws JMSException
AQjmsMessage.getObjectProperty()は、メッセージ・プロパティをObjectとして取得します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
Objectプロパティの名前 |
例6-92 Objectメッセージ・プロパティの取得
TextMessage message;
message.getObjectProperty("empid", new Integer(1000);クローズおよびシャットダウン
この項の内容は次のとおりです。
MessageProducerのクローズ
public void close()
throws JMSException
AQjmsProducer.close()はMessageProducerをクローズします。
メッセージ・コンシューマのクローズ
public void close()
throws JMSException
AQjmsConsumer.close()は、メッセージ・コンシューマをクローズします。
トラブルシューティング
この項の内容は次のとおりです。
JMSエラー・コードの取得
public java.lang.String getErrorCode()
AQjmsException.getErrorCode()は、JMS例外のエラー・コードを取得します。
JMSエラー番号の取得
public int getErrorNumber()
AQjmsException.getErrorNumber()は、JMS例外のエラー番号を取得します。
ノート:
このメソッドは今後のリリースでは使用されなくなります。かわりに、getErrorCode()を使用してください。
JMS例外にリンクされた例外の取得
public java.lang.String getLinkString()
AQjmsException.getLinkString()は、JMS例外にリンクされた例外を取得します。一般に、この例外にはデータベースで発行されるSQL例外が含まれます。
JMS例外のスタック・トレースの出力
public void printStackTrace(java.io.PrintStream s)
AQjmsException.printStackTrace()は、JMS例外のスタック・トレースを出力します。
例外リスナーの設定
public void setExceptionListener(javax.jms.ExceptionListener listener)
throws JMSException
AQjmsConnection.setExceptionListener()は、コネクション用の例外リスナーを指定します。次のパラメータがあります。
| パラメータ | 説明 |
|---|---|
|
|
例外リスナー |
例外リスナーの登録が完了している場合は、コネクションに重大な問題が検出されると通知されます。これは、リスナーのonException()メソッドをコールして、問題を説明するJMS例外を渡すことによって実行されます。これによって、問題がJMSクライアントに非同期に通知されます。メッセージを処理するのみのコネクションもあるため、コネクションが失敗したことを知るための他の方法がありません。
例6-93 コネクション用の例外リスナーの指定
//register an exception listener
Connection jms_connection;
jms_connection.setExceptionListener(
new ExceptionListener() {
public void onException (JMSException jmsException) {
System.out.println("JMS-EXCEPTION: " + jmsException.toString());
}
};
);例外リスナーの取得
public javax.jms.ExceptionListener getExceptionListener()
throws JMSException
AQjmsConnection.getExceptionListener()は、コネクション用の例外リスナーを取得します。
例6-94に、ExceptionListenerとMessageListenerの組合せ使用の方法を示します。次の条件が満たされていることを確認してください。
-
ユーザー
jmsuser(パスワードjmsuser)が、適切な権限を所有してデータベースに作成されます。 -
キュー
demoQueueが作成され起動されます。
この例は、コネクション再起動があり、例外リスナーがJMSオブジェクトを再作成する場合に、MessageListenerが非同期にメッセージを受信する方法を示します。
例6-94 ExceptionListenerとMessageListenerの組合せ使用
import java.util.Enumeration;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.jms.AQjmsConnectionFactory;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class JMSDemo {
static String queueName = "demoQueue";
static String queueOwner = "jmsuser";
static String queueOwnerPassword = "jmsuser";
static Connection connection = null;
static int numberOfMessages = 25000;
static int messageCount = 0;
static String jdbcURL = "";
public static void main(String args[]) {
try {
jdbcURL = System.getProperty("JDBC_URL");
if (jdbcURL == null)
System.out
.println("The system property JDBC_URL has not been set, " +
"usage:java -DJDBC_URL=xxx filename ");
else {
JMSDemo demo = new JMSDemo();
demo.performJmsOperations();
}
} catch (Exception exception) {
System.out.println("Exception : " + exception);
exception.printStackTrace();
} finally {
try {
if (connection != null)
connection.close();
} catch (Exception exc) {
exc.printStackTrace();
}
}
System.out.println("\nEnd of Demo aqjmsdemo11.");
}
public void performJmsOperations() {
try {
connection = getConnection(jdbcURL);
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
// remove the messages from the Queue
drainQueue(queueName, queueOwner, jdbcURL, true);
// set the exception listener on the Connection
connection.setExceptionListener(new DemoExceptionListener());
MessageProducer producer = session.createProducer(queue);
TextMessage textMessage = null;
System.out.println("Sending " + numberOfMessages + " messages to queue "
+ queueName);
for (int i = 0; i < numberOfMessages; i++) {
textMessage = session.createTextMessage();
textMessage.setText("Sample message text");
producer.send(textMessage);
}
MessageConsumer consumer = session.createConsumer(queue);
System.out.println("Setting the message listener ...");
consumer.setMessageListener(new DemoMessageListener());
connection.start();
// Introduce a long wait to allow the listener to receive all the messages
while (messageCount < numberOfMessages) {
try {
Thread.sleep(5000);
} catch (InterruptedException interruptedException) {
}
}
} catch (JMSException jmsException) {
jmsException.printStackTrace();
}
}
// Sample message listener
static class DemoMessageListener implements javax.jms.MessageListener {
public void onMessage(Message message) {
try {
System.out.println("Message listener received message with JMSMessageID "
+ message.getJMSMessageID());
messageCount++;
} catch (JMSException jmsException) {
System.out.println("JMSException " + jmsException.getMessage());
}
}
}
// sample exception listener
static class DemoExceptionListener implements javax.jms.ExceptionListener {
public void onException(JMSException jmsException) {
try {
// As a first step close the connection
if (connection != null)
connection.close();
} catch (JMSException exception) {}
try {
System.out.println("Re-create the necessary JMS objects ...");
connection = getConnection(jdbcURL);
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new DemoMessageListener());
} catch (JMSException newJmsException) {
newJmsException.printStackTrace();
}
}
}
// Utility method to get a connection
static Connection getConnection(String jdbcUrl) throws JMSException {
Properties prop = new Properties();
prop.put("user", queueOwner);
prop.put("password", queueOwnerPassword);
AQjmsConnectionFactory fact = (AQjmsConnectionFactory) AQjmsFactory
.getConnectionFactory(jdbcUrl, prop);
Connection conn = fact.createConnection();
return conn;
}
// Utility method to remove the messages from the queue
static void drainQueue(String queueName, String queueOwner, String jdbcUrl,
boolean debugInfo) {
Connection connection = null;
Session session = null;
long timeout = 10000;
int count = 0;
Message message = null;
try {
connection = getConnection(jdbcUrl);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = ((AQjmsSession) session).getQueue(queueOwner, queueName);
MessageConsumer messageConsumer = session.createConsumer(queue);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
if (enumeration.hasMoreElements()) {
while ((message = messageConsumer.receive(timeout)) != null) {
if (debugInfo) {
count++;
}
}
}
messageConsumer.close();
if (debugInfo) {
System.out.println("Removed " + count + " messages from the queue : "
+ queueName);
}
} catch (JMSException jmsException) {
jmsException.printStackTrace();
} finally {
try {
if (session != null)
session.close();
if (connection != null)
connection.close();
} catch (Exception exception) {
}
}
}
}
例6-95 コネクション用の例外リスナーの取得
//Get the exception listener Connection jms_connection; ExceptionListener el = jms_connection.getExceptionListener();
Oracle Java Message Service型の例
Oracle Database Advanced QueuingのJMS型の設定方法の例
例6-96 JMS型の例を実行する環境の設定
connect sys;
enter password: password
Rem
Rem Create the JMS user: jmsuser
Rem
DROP USER jmsuser CASCADE;
CREATE USER jmsuser IDENTIFIED BY jmsuser;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
set echo offset verify offconnect sysDROP USER jmsuser CASCADE;ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDECREATE USER jmsuser IDENTIFIED BY &password;GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;GRANT EXECUTE ON DBMS_AQADM TO jmsuser;GRANT EXECUTE ON DBMS_AQ TO jmsuser;GRANT EXECUTE ON DBMS_LOB TO jmsuser;GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;connect jmsuser/&password
Rem
Rem Creating five AQ queue tables and five queues for five payloads:
Rem SYS.AQ$_JMS_TEXT_MESSAGE
Rem SYS.AQ$_JMS_BYTES_MESSAGE
Rem SYS.AQ$_JMS_STREAM_MESSAG
Rem SYS.AQ$_JMS_MAP_MESSAGE
Rem SYS.AQ$_JMS_MESSAGE
Rem
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_text',
Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_bytes',
Queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_stream',
Queue_payload_type => 'SYS.AQ$_JMS_STREAM_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_map',
Queue_payload_type => 'SYS.AQ$_JMS_MAP_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_general',
Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_text_que',
Queue_table => 'jmsuser.jms_qtt_text');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_bytes_que',
Queue_table => 'jmsuser.jms_qtt_bytes');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_stream_que',
Queue_table => 'jmsuser.jms_qtt_stream');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_map_que',
Queue_table => 'jmsuser.jms_qtt_map');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_general_que',
Queue_table => 'jmsuser.jms_qtt_general');
Rem
Rem Starting the queues and enable both enqueue and dequeue
Rem
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_text_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_bytes_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_stream_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_map_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_general_que');
Rem The supporting utility used in the example to help display results in SQLPLUS enviroment
Rem
Rem Display a RAW data in SQLPLUS
Rem
create or replace procedure display_raw(rdata raw)
IS
pos pls_integer;
length pls_integer;
BEGIN
pos := 1;
length := UTL_RAW.LENGTH(rdata);
WHILE pos <= length LOOP
IF pos+20 > length+1 THEN
dbms_output.put_line(UTL_RAW.SUBSTR(rdata, pos, length-pos+1));
ELSE
dbms_output.put_line(UTL_RAW.SUBSTR(rdata, pos, 20));
END IF;
pos := pos+20;
END LOOP;
END display_raw;
/
show errors;
Rem
Rem Display a BLOB data in SQLPLUS
Rem
create or replace procedure display_blob(bdata blob)
IS
pos pls_integer;
length pls_integer;
BEGIN
length := dbms_lob.getlength(bdata);
pos := 1;
WHILE pos <= length LOOP
display_raw(DBMS_LOB.SUBSTR(bdata, 2000, pos));
pos := pos+2000;
END LOOP;
END display_blob;
/
show errors;
Rem
Rem Display a VARCHAR data in SQLPLUS
Rem
create or replace procedure display_varchar(vdata varchar)
IS
pos pls_integer;
text_len pls_integer;
BEGIN
text_len := length(vdata);
pos := 1;
WHILE pos <= text_len LOOP
IF pos+20 > text_len+1 THEN
dbms_output.put_line(SUBSTR(vdata, pos, text_len-pos+1));
ELSE
dbms_output.put_line(SUBSTR(vdata, pos, 20));
END IF;
pos := pos+20;
END LOOP;
END display_varchar;
/
show errors;
Rem
Rem Display a CLOB data in SQLPLUS
Rem
create or replace procedure display_clob(cdata clob)
IS
pos pls_integer;
length pls_integer;
BEGIN
length := dbms_lob.getlength(cdata);
pos := 1;
WHILE pos <= length LOOP
display_varchar(DBMS_LOB.SUBSTR(cdata, 2000, pos));
pos := pos+2000;
END LOOP;
END display_clob;
/
show errors;
Rem
Rem Display a SYS.AQ$_JMS_EXCEPTION data in SQLPLUS
Rem
Rem When application receives an ORA-24197 error, It means the JAVA stored
Rem procedures has thrown some exceptions that could not be catergorized. The
Rem user can use GET_EXCEPTION procedure of SYS.AQ$_JMS_BYTES_MESSAGE,
Rem SYS.AQ$_JMS_STREAM_MESSAG or SYS.AQ$_JMS_MAP_MESSAGE
Rem to retrieve a SYS.AQ$_JMS_EXCEPTION object which contains more detailed
Rem information on this JAVA exception including the exception name, JAVA error
Rem message and stack trace.
Rem
Rem This utility function is to help display the SYS.AQ$_JMS_EXCEPTION object in
Rem SQLPLUS
Rem
create or replace procedure display_exp(exp SYS.AQ$_JMS_EXCEPTION)
IS
pos1 pls_integer;
pos2 pls_integer;
text_data varchar(2000);
BEGIN
dbms_output.put_line('exception:'||exp.exp_name);
dbms_output.put_line('err_msg:'||exp.err_msg);
dbms_output.put_line('stack:'||length(exp.stack));
pos1 := 1;
LOOP
pos2 := INSTR(exp.stack, chr(10), pos1);
IF pos2 = 0 THEN
pos2 := length(exp.stack)+1;
END IF;
dbms_output.put_line(SUBSTR(exp.stack, pos1, pos2-pos1));
IF pos2 > length(exp.stack) THEN
EXIT;
END IF;
pos1 := pos2+1;
END LOOP;
END display_exp;
/
show errors;
EXIT;例6-97 例の設定
例6-96では、JMS型の例に必要な設定を実行します。この例をコピーし、setup.sqlとして保存します。
JMS BytesMessageの例
この項では、JMS BytesMessageのエンキューおよびデキューの説明の例を示します。
例6-98では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_bytes_message型として表示されるJMS BytesMessageを移入およびエンキューする方法を紹介します。このメッセージは、後でJAVA Oracle Java Message Service (Oracle JMS)クライアントによってデキューできます。
例6-99では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_bytes_message型として表示されるJMS BytesMessageからデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-98 BytesMessageの移入とエンキュー
set echo offset verify offconnect sysDROP USER jmsuser CASCADE;ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDECREATE USER jmsuser IDENTIFIED BY &password;GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;GRANT EXECUTE ON DBMS_AQADM TO jmsuser;GRANT EXECUTE ON DBMS_AQ TO jmsuser;GRANT EXECUTE ON DBMS_LOB TO jmsuser;GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;connect jmsuser/&password
SET ECHO ON
set serveroutput on
DECLARE
id pls_integer;
agent sys.aq$_agent := sys.aq$_agent(' ', null, 0);
message sys.aq$_jms_bytes_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
-- Consturct a empty BytesMessage object
message := sys.aq$_jms_bytes_message.construct;
-- Shows how to set the JMS header
message.set_replyto(agent);
message.set_type('tkaqpet1');
message.set_userid('jmsuser');
message.set_appid('plsql_enq');
message.set_groupid('st');
message.set_groupseq(1);
-- Shows how to set JMS user properties
message.set_string_property('color', 'RED');
message.set_int_property('year', 1999);
message.set_float_property('price', 16999.99);
message.set_long_property('mileage', 300000);
message.set_boolean_property('import', True);
message.set_byte_property('password', -127);
-- Shows how to populate the message payload of aq$_jms_bytes_message
-- Passing -1 reserve a new slot within the message store of sys.aq$_jms_bytes_message.
-- The maximum number of sys.aq$_jms_bytes_message type of messges to be operated at
-- the same time within a session is 20. Calling clean_body function with parameter -1
-- might result a ORA-24199 error if the messages currently operated is already 20.
-- The user is responsible to call clean or clean_all function to clean up message store.
id := message.clear_body(-1);
-- Write data into the BytesMessage paylaod. These functions are analogy of JMS JAVA api's.
-- See the document for detail.
-- Write a byte to the BytesMessage payload
message.write_byte(id, 10);
-- Write a RAW data as byte array to the BytesMessage payload
message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')));
-- Write a portion of the RAW data as byte array to BytesMessage payload
-- Note the offset follows JAVA convention, starting from 0
message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16);
-- Write a char to the BytesMessage payload
message.write_char(id, 'A');
-- Write a double to the BytesMessage payload
message.write_double(id, 9999.99);
-- Write a float to the BytesMessage payload
message.write_float(id, 99.99);
-- Write a int to the BytesMessage payload
message.write_int(id, 12345);
-- Write a long to the BytesMessage payload
message.write_long(id, 1234567);
-- Write a short to the BytesMessage payload
message.write_short(id, 123);
-- Write a String to the BytesMessage payload,
-- the String is encoded in UTF8 in the message payload
message.write_utf(id, 'Hello World!');
-- Flush the data from JAVA stored procedure (JServ) to PL/SQL side
-- Without doing this, the PL/SQL message is still empty.
message.flush(id);
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod population on this message anymore
sys.aq$_jms_bytes_message.clean_all();
--message.clean(id);
-- Enqueue this message into AQ queue using DBMS_AQ package
dbms_aq.enqueue(queue_name => 'jmsuser.jms_bytes_que',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
EXCEPTION
WHEN java_exp THEN
dbms_output.put_line('exception information:');
display_exp(sys.aq$_jms_stream_message.get_exception());
END;
/
commit;
例6-99 JMS BytesMessageデータのデキューおよび取出し
set echo off
set verify off
DROP USER jmsuser CASCADE;
ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE
CREATE USER jmsuser IDENTIFIED BY &password;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
connect jmsuser/&password
set echo on
set serveroutput on size 20000
DECLARE
id pls_integer;
blob_data blob;
clob_data clob;
blob_len pls_integer;
message sys.aq$_jms_bytes_message;
agent sys.aq$_agent;
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
gdata sys.aq$_jms_value;
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
DBMS_OUTPUT.ENABLE (20000);
-- Dequeue this message from AQ queue using DBMS_AQ package
dbms_aq.dequeue(queue_name => 'jmsuser.jms_bytes_que',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
-- Retrieve the header
agent := message.get_replyto;
dbms_output.put_line('Type: ' || message.get_type ||
' UserId: ' || message.get_userid ||
' AppId: ' || message.get_appid ||
' GroupId: ' || message.get_groupid ||
' GroupSeq: ' || message.get_groupseq);
-- Retrieve the user properties
dbms_output.put_line('price: ' || message.get_float_property('price'));
dbms_output.put_line('color: ' || message.get_string_property('color'));
IF message.get_boolean_property('import') = TRUE THEN
dbms_output.put_line('import: Yes' );
ELSIF message.get_boolean_property('import') = FALSE THEN
dbms_output.put_line('import: No' );
END IF;
dbms_output.put_line('year: ' || message.get_int_property('year'));
dbms_output.put_line('mileage: ' || message.get_long_property('mileage'));
dbms_output.put_line('password: ' || message.get_byte_property('password'));
-- Shows how to retrieve the message payload of aq$_jms_bytes_message
-- Prepare call, send the content in the PL/SQL aq$_jms_bytes_message object to
-- Java stored procedure(Jserv) in the form of a byte array.
-- Passing -1 reserves a new slot in msg store of sys.aq$_jms_bytes_message.
-- Max number of sys.aq$_jms_bytes_message type of messges to be operated at
-- the same time in a session is 20. Call clean_body fn. with parameter -1
-- might result in ORA-24199 error if messages operated on are already 20.
-- You must call clean or clean_all function to clean up message store.
id := message.prepare(-1);
-- Read data from BytesMessage paylaod. These fns. are analogy of JMS Java
-- API's. See the JMS Types chapter for detail.
dbms_output.put_line('Payload:');
-- read a byte from the BytesMessage payload
dbms_output.put_line('read_byte:' || message.read_byte(id));
-- read a byte array into a blob object from the BytesMessage payload
dbms_output.put_line('read_bytes:');
blob_len := message.read_bytes(id, blob_data, 272);
display_blob(blob_data);
-- read a char from the BytesMessage payload
dbms_output.put_line('read_char:'|| message.read_char(id));
-- read a double from the BytesMessage payload
dbms_output.put_line('read_double:'|| message.read_double(id));
-- read a float from the BytesMessage payload
dbms_output.put_line('read_float:'|| message.read_float(id));
-- read a int from the BytesMessage payload
dbms_output.put_line('read_int:'|| message.read_int(id));
-- read a long from the BytesMessage payload
dbms_output.put_line('read_long:'|| message.read_long(id));
-- read a short from the BytesMessage payload
dbms_output.put_line('read_short:'|| message.read_short(id));
-- read a String from the BytesMessage payload.
-- the String is in UTF8 encoding in the message payload
dbms_output.put_line('read_utf:');
message.read_utf(id, clob_data);
display_clob(clob_data);
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod retrieving on this message anymore
message.clean(id);
-- sys.aq$_jms_bytes_message.clean_all();
EXCEPTION
WHEN java_exp THEN
dbms_output.put_line('exception information:');
display_exp(sys.aq$_jms_bytes_message.get_exception());
END;
/
commit;JMS StreamMessageの例
この項では、JMS StreamMessageのエンキューおよびデキューの説明の例を示します。
例6-100では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_stream_message型として表示されるJMS StreamMessageを移入およびエンキューする方法を紹介します。このメッセージは、後でOracle JMSクライアントによってデキューできます。
例6-101では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_stream_message型として表示されるJMS StreamMessageからデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-100 JMS StreamMessageの移入とエンキュー
set echo off
set verify off
DROP USER jmsuser CASCADE;
ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE
CREATE USER jmsuser IDENTIFIED BY &password;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
connect jmsuser/&password
SET ECHO ON
set serveroutput on
DECLARE
id pls_integer;
agent sys.aq$_agent := sys.aq$_agent(' ', null, 0);
message sys.aq$_jms_stream_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
-- Consturct a empty StreamMessage object
message := sys.aq$_jms_stream_message.construct;
-- Shows how to set the JMS header
message.set_replyto(agent);
message.set_type('tkaqpet1');
message.set_userid('jmsuser');
message.set_appid('plsql_enq');
message.set_groupid('st');
message.set_groupseq(1);
-- Shows how to set JMS user properties
message.set_string_property('color', 'RED');
message.set_int_property('year', 1999);
message.set_float_property('price', 16999.99);
message.set_long_property('mileage', 300000);
message.set_boolean_property('import', True);
message.set_byte_property('password', -127);
-- Shows how to populate the message payload of aq$_jms_stream_message
-- Passing -1 reserve a new slot within the message store of sys.aq$_jms_stream_message.
-- The maximum number of sys.aq$_jms_stream_message type of messges to be operated at
-- the same time within a session is 20. Calling clean_body function with parameter -1
-- might result a ORA-24199 error if the messages currently operated is already 20.
-- The user is responsible to call clean or clean_all function to clean up message store.
id := message.clear_body(-1);
-- Write data into the message paylaod. These functions are analogy of JMS JAVA api's.
-- See the document for detail.
-- Write a byte to the StreamMessage payload
message.write_byte(id, 10);
-- Write a RAW data as byte array to the StreamMessage payload
message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')));
-- Write a portion of the RAW data as byte array to the StreamMessage payload
-- Note the offset follows JAVA convention, starting from 0
message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16);
-- Write a char to the StreamMessage payload
message.write_char(id, 'A');
-- Write a double to the StreamMessage payload
message.write_double(id, 9999.99);
-- Write a float to the StreamMessage payload
message.write_float(id, 99.99);
-- Write a int to the StreamMessage payload
message.write_int(id, 12345);
-- Write a long to the StreamMessage payload
message.write_long(id, 1234567);
-- Write a short to the StreamMessage payload
message.write_short(id, 123);
-- Write a String to the StreamMessage payload
message.write_string(id, 'Hello World!');
-- Flush the data from JAVA stored procedure (JServ) to PL/SQL side
-- Without doing this, the PL/SQL message is still empty.
message.flush(id);
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod population on this message anymore
sys.aq$_jms_stream_message.clean_all();
--message.clean(id);
-- Enqueue this message into AQ queue using DBMS_AQ package
dbms_aq.enqueue(queue_name => 'jmsuser.jms_stream_que',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
EXCEPTION
WHEN java_exp THEN
dbms_output.put_line('exception information:');
display_exp(sys.aq$_jms_stream_message.get_exception());
END;
/
commit;
例6-101 JMS StreamMessageからのデータのデキューおよび取出し
set echo off
set verify off
DROP USER jmsuser CASCADE;
ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE
CREATE USER jmsuser IDENTIFIED BY &password;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
connect jmsuser/&password
set echo on
set serveroutput on
DECLARE
id pls_integer;
blob_data blob;
clob_data clob;
message sys.aq$_jms_stream_message;
agent sys.aq$_agent;
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
gdata sys.aq$_jms_value;
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
DBMS_OUTPUT.ENABLE (20000);
-- Dequeue this message from AQ queue using DBMS_AQ package
dbms_aq.dequeue(queue_name => 'jmsuser.jms_stream_que',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
-- Retrieve the header
agent := message.get_replyto;
dbms_output.put_line('Type: ' || message.get_type ||
' UserId: ' || message.get_userid ||
' AppId: ' || message.get_appid ||
' GroupId: ' || message.get_groupid ||
' GroupSeq: ' || message.get_groupseq);
-- Retrieve the user properties
dbms_output.put_line('price: ' || message.get_float_property('price'));
dbms_output.put_line('color: ' || message.get_string_property('color'));
IF message.get_boolean_property('import') = TRUE THEN
dbms_output.put_line('import: Yes' );
ELSIF message.get_boolean_property('import') = FALSE THEN
dbms_output.put_line('import: No' );
END IF;
dbms_output.put_line('year: ' || message.get_int_property('year'));
dbms_output.put_line('mileage: ' || message.get_long_property('mileage'));
dbms_output.put_line('password: ' || message.get_byte_property('password'));
-- Shows how to retrieve the message payload of aq$_jms_stream_message
-- The prepare call send the content in the PL/SQL aq$_jms_stream_message object to
-- JAVA stored procedure(Jserv) in the form of byte array.
-- Passing -1 reserve a new slot within the message store of sys.aq$_jms_stream_message.
-- The maximum number of sys.aq$_jms_stream_message type of messges to be operated at
-- the same time within a session is 20. Calling clean_body function with parameter -1
-- might result a ORA-24199 error if the messages currently operated is already 20.
-- The user is responsible to call clean or clean_all function to clean up message store.
id := message.prepare(-1);
-- Assume the users know the types of data in the StreamMessage payload.
-- The user can use the specific read function corresponding with the data type.
-- These functions are analogy of JMS JAVA api's. See the document for detail.
dbms_output.put_line('Retrieve payload by Type:');
-- Read a byte from the StreamMessage payload
dbms_output.put_line('read_byte:' || message.read_byte(id));
-- Read a byte array into a blob object from the StreamMessage payload
dbms_output.put_line('read_bytes:');
message.read_bytes(id, blob_data);
display_blob(blob_data);
-- Read another byte array into a blob object from the StreamMessage payload
dbms_output.put_line('read_bytes:');
message.read_bytes(id, blob_data);
display_blob(blob_data);
-- Read a char from the StreamMessage payload
dbms_output.put_line('read_char:'|| message.read_char(id));
-- Read a double from the StreamMessage payload
dbms_output.put_line('read_double:'|| message.read_double(id));
-- Read a float from the StreamMessage payload
dbms_output.put_line('read_float:'|| message.read_float(id));
-- Read a int from the StreamMessage payload
dbms_output.put_line('read_int:'|| message.read_int(id));
-- Read a long from the StreamMessage payload
dbms_output.put_line('read_long:'|| message.read_long(id));
-- Read a short from the StreamMessage payload
dbms_output.put_line('read_short:'|| message.read_short(id));
-- Read a String into a clob data from the StreamMessage payload
dbms_output.put_line('read_string:');
message.read_string(id, clob_data);
display_clob(clob_data);
-- Assume the users do not know the types of data in the StreamMessage payload.
-- The user can use read_object method to read the data into a sys.aq$_jms_value object
-- These functions are analogy of JMS JAVA api's. See the document for detail.
-- Reset the stream pointer to the begining of the message so that we can read throught
-- the message payload again.
message.reset(id);
LOOP
message.read_object(id, gdata);
IF gdata IS NULL THEN
EXIT;
END IF;
CASE gdata.type
WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTE THEN
dbms_output.put_line('read_object/byte:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_SHORT THEN
dbms_output.put_line('read_object/short:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_INTEGER THEN
dbms_output.put_line('read_object/int:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_LONG THEN
dbms_output.put_line('read_object/long:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_FLOAT THEN
dbms_output.put_line('read_object/float:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_DOUBLE THEN
dbms_output.put_line('read_object/double:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_BOOLEAN THEN
dbms_output.put_line('read_object/boolean:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_CHARACTER THEN
dbms_output.put_line('read_object/char:' || gdata.char_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_STRING THEN
dbms_output.put_line('read_object/string:');
display_clob(gdata.text_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTES THEN
dbms_output.put_line('read_object/bytes:');
display_blob(gdata.bytes_val);
ELSE dbms_output.put_line('No such data type');
END CASE;
END LOOP;
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod retrieving on this message anymore
message.clean(id);
-- sys.aq$_jms_stream_message.clean_all();
EXCEPTION
WHEN java_exp THEN
dbms_output.put_line('exception information:');
display_exp(sys.aq$_jms_stream_message.get_exception());
END;
/
commit;JMS MapMessageの例
この項では、JMS MapMessageのエンキューおよびデキューの説明の例を示します。
例6-102では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_map_message型として表示されるJMS MapMessageを移入およびエンキューする方法を紹介します。このメッセージは、後でOracle JMSクライアントによってデキューできます。
例6-103では、JMS型メンバー関数とDBMS_AQ関数を使用して、データベース内でsys.aq$_jms_map_message型として表示されるJMS MapMessageからデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-102 JMS MapMessageの移入とエンキュー
set echo off
set verify off
DROP USER jmsuser CASCADE;
ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE
CREATE USER jmsuser IDENTIFIED BY &password;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
connect jmsuser/&password
SET ECHO ON
set serveroutput on
DECLARE
id pls_integer;
agent sys.aq$_agent := sys.aq$_agent(' ', null, 0);
message sys.aq$_jms_map_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
-- Consturct a empty map message object
message := sys.aq$_jms_map_message.construct;
-- Shows how to set the JMS header
message.set_replyto(agent);
message.set_type('tkaqpet1');
message.set_userid('jmsuser');
message.set_appid('plsql_enq');
message.set_groupid('st');
message.set_groupseq(1);
-- Shows how to set JMS user properties
message.set_string_property('color', 'RED');
message.set_int_property('year', 1999);
message.set_float_property('price', 16999.99);
message.set_long_property('mileage', 300000);
message.set_boolean_property('import', True);
message.set_byte_property('password', -127);
-- Shows how to populate the message payload of aq$_jms_map_message
-- Passing -1 reserve a new slot within the message store of sys.aq$_jms_map_message.
-- The maximum number of sys.aq$_jms_map_message type of messges to be operated at
-- the same time within a session is 20. Calling clean_body function with parameter -1
-- might result a ORA-24199 error if the messages currently operated is already 20.
-- The user is responsible to call clean or clean_all function to clean up message store.
id := message.clear_body(-1);
-- Write data into the message paylaod. These functions are analogy of JMS JAVA api's.
-- See the document for detail.
-- Set a byte entry in map message payload
message.set_byte(id, 'BYTE', 10);
-- Set a byte array entry using RAW data in map message payload
message.set_bytes(id, 'BYTES', UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')));
-- Set a byte array entry using only a portion of the RAW data in map message payload
-- Note the offset follows JAVA convention, starting from 0
message.set_bytes(id, 'BYTES_PART', UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16);
-- Set a char entry in map message payload
message.set_char(id, 'CHAR', 'A');
-- Set a double entry in map message payload
message.set_double(id, 'DOUBLE', 9999.99);
-- Set a float entry in map message payload
message.set_float(id, 'FLOAT', 99.99);
-- Set a int entry in map message payload
message.set_int(id, 'INT', 12345);
-- Set a long entry in map message payload
message.set_long(id, 'LONG', 1234567);
-- Set a short entry in map message payload
message.set_short(id, 'SHORT', 123);
-- Set a String entry in map message payload
message.set_string(id, 'STRING', 'Hello World!');
-- Flush the data from JAVA stored procedure (JServ) to PL/SQL side
-- Without doing this, the PL/SQL message is still empty.
message.flush(id);
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod population on this message anymore
sys.aq$_jms_map_message.clean_all();
--message.clean(id);
-- Enqueue this message into AQ queue using DBMS_AQ package
dbms_aq.enqueue(queue_name => 'jmsuser.jms_map_que',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
END;
/
commit;
例6-103 JMS MapMessageからのデータのデキューおよび取出し
set echo off
set verify off
DROP USER jmsuser CASCADE;
ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE
CREATE USER jmsuser IDENTIFIED BY &password;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
connect jmsuser/&password
set echo on
set serveroutput on
DECLARE
id pls_integer;
blob_data blob;
clob_data clob;
message sys.aq$_jms_map_message;
agent sys.aq$_agent;
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
name_arr sys.aq$_jms_namearray;
gdata sys.aq$_jms_value;
java_exp exception;
pragma EXCEPTION_INIT(java_exp, -24197);
BEGIN
DBMS_OUTPUT.ENABLE (20000);
-- Dequeue this message from AQ queue using DBMS_AQ package
dbms_aq.dequeue(queue_name => 'jmsuser.jms_map_que',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
-- Retrieve the header
agent := message.get_replyto;
dbms_output.put_line('Type: ' || message.get_type ||
' UserId: ' || message.get_userid ||
' AppId: ' || message.get_appid ||
' GroupId: ' || message.get_groupid ||
' GroupSeq: ' || message.get_groupseq);
-- Retrieve the user properties
dbms_output.put_line('price: ' || message.get_float_property('price'));
dbms_output.put_line('color: ' || message.get_string_property('color'));
IF message.get_boolean_property('import') = TRUE THEN
dbms_output.put_line('import: Yes' );
ELSIF message.get_boolean_property('import') = FALSE THEN
dbms_output.put_line('import: No' );
END IF;
dbms_output.put_line('year: ' || message.get_int_property('year'));
dbms_output.put_line('mileage: ' || message.get_long_property('mileage'));
dbms_output.put_line('password: ' || message.get_byte_property('password'));
-- Shows how to retrieve the message payload of aq$_jms_map_message
-- 'Prepare' sends the content in the PL/SQL aq$_jms_map_message object to
-- Java stored procedure(Jserv) in the form of byte array.
-- Passing -1 reserve a new slot within the message store of
-- sys.aq$_jms_map_message. The maximum number of sys.aq$_jms_map_message
-- type of messges to be operated at the same time within a session is 20.
-- Calling clean_body function with parameter -1
-- might result a ORA-24199 error if the messages currently operated is
-- already 20. The user is responsible to call clean or clean_all function
-- to clean up message store.
id := message.prepare(-1);
-- Assume the users know the names and types in the map message payload.
-- The user can use names to get the corresponsing values.
-- These functions are analogous to JMS Java API's. See JMS Types chapter
-- for detail.
dbms_output.put_line('Retrieve payload by Name:');
-- Get a byte entry from the map message payload
dbms_output.put_line('get_byte:' || message.get_byte(id, 'BYTE'));
-- Get a byte array entry from the map message payload
dbms_output.put_line('get_bytes:');
message.get_bytes(id, 'BYTES', blob_data);
display_blob(blob_data);
-- Get another byte array entry from the map message payload
dbms_output.put_line('get_bytes:');
message.get_bytes(id, 'BYTES_PART', blob_data);
display_blob(blob_data);
-- Get a char entry from the map message payload
dbms_output.put_line('get_char:'|| message.get_char(id, 'CHAR'));
-- get a double entry from the map message payload
dbms_output.put_line('get_double:'|| message.get_double(id, 'DOUBLE'));
-- Get a float entry from the map message payload
dbms_output.put_line('get_float:'|| message.get_float(id, 'FLOAT'));
-- Get a int entry from the map message payload
dbms_output.put_line('get_int:'|| message.get_int(id, 'INT'));
-- Get a long entry from the map message payload
dbms_output.put_line('get_long:'|| message.get_long(id, 'LONG'));
-- Get a short entry from the map message payload
dbms_output.put_line('get_short:'|| message.get_short(id, 'SHORT'));
-- Get a String entry from the map message payload
dbms_output.put_line('get_string:');
message.get_string(id, 'STRING', clob_data);
display_clob(clob_data);
-- Assume users do not know names and types in map message payload.
-- User can first retrieve the name array containing all names in the
-- payload and iterate through the name list and get the corresponding
-- value. These functions are analogous to JMS Java API's.
-- See JMS Type chapter for detail.
dbms_output.put_line('Retrieve payload by iteration:');
-- Get the name array from the map message payload
name_arr := message.get_names(id);
-- Iterate through the name array to retrieve the value for each of the name.
FOR i IN name_arr.FIRST..name_arr.LAST LOOP
-- Test if a name exist in the map message payload
-- (It is not necessary in this case, just a demostration on how to use it)
IF message.item_exists(id, name_arr(i)) THEN
dbms_output.put_line('item exists:'||name_arr(i));
-- Because we do not know the type of entry, we must use sys.aq$_jms_value
-- type object for the data returned
message.get_object(id, name_arr(i), gdata);
IF gdata IS NOT NULL THEN
CASE gdata.type
WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTE
THEN dbms_output.put_line('get_object/byte:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_SHORT
THEN dbms_output.put_line('get_object/short:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_INTEGER
THEN dbms_output.put_line('get_object/int:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_LONG
THEN dbms_output.put_line('get_object/long:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_FLOAT
THEN dbms_output.put_line('get_object/float:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_DOUBLE
THEN dbms_output.put_line('get_object/double:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_BOOLEAN
THEN dbms_output.put_line('get_object/boolean:' || gdata.num_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_CHARACTER
THEN dbms_output.put_line('get_object/char:' || gdata.char_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_STRING
THEN dbms_output.put_line('get_object/string:');
display_clob(gdata.text_val);
WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTES
THEN
dbms_output.put_line('get_object/bytes:');
display_blob(gdata.bytes_val);
ELSE dbms_output.put_line('No such data type');
END CASE;
END IF;
ELSE
dbms_output.put_line('item not exists:'||name_arr(i));
END IF;
END LOOP;
-- Use either clean_all or clean to clean up the message store when the user
-- do not plan to do paylaod population on this message anymore
message.clean(id);
-- sys.aq$_jms_map_message.clean_all();
EXCEPTION
WHEN java_exp THEN
dbms_output.put_line('exception information:');
display_exp(sys.aq$_jms_stream_message.get_exception());
END;
/
commit;その他のOracle Database Advanced Queuing JMSの例
例6-104のサンプル・プログラムでは、JMS TEXTメッセージを保持するために、Oracle JMS管理インタフェースによって作成されるOracle Database Advanced Queuingキュー内に、大きなTextMessageを(JMSユーザー・プロパティとともに)エンキューします。この例でエンキューされるTextMessageおよびBytesMessageは、どちらもOracle JMSクライアントでデキューできます。
例6-105のサンプル・プログラムでは、大きなBytesMessageをエンキューします。
例6-104 大きなTextMessageのエンキュー
DECLARE
text varchar2(32767);
agent sys.aq$_agent := sys.aq$_agent(' ', null, 0);
message sys.aq$_jms_text_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
BEGIN
message := sys.aq$_jms_text_message.construct;
message.set_replyto(agent);
message.set_type('tkaqpet2');
message.set_userid('jmsuser');
message.set_appid('plsql_enq');
message.set_groupid('st');
message.set_groupseq(1);
message.set_boolean_property('import', True);
message.set_string_property('color', 'RED');
message.set_short_property('year', 1999);
message.set_long_property('mileage', 300000);
message.set_double_property('price', 16999.99);
message.set_byte_property('password', 127);
FOR i IN 1..500 LOOP
text := CONCAT (text, '1234567890');
END LOOP;
message.set_text(text);
dbms_aq.enqueue(queue_name => 'jmsuser.jms_text_t1',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
END;
例6-105 大きなBytesMessageのエンキュー
DECLARE
text VARCHAR2(32767);
bytes RAW(32767);
agent sys.aq$_agent := sys.aq$_agent(' ', null, 0);
message sys.aq$_jms_bytes_message;
body BLOB;
position INT;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
BEGIN
message := sys.aq$_jms_bytes_message.construct;
message.set_replyto(agent);
message.set_type('tkaqper4');
message.set_userid('jmsuser');
message.set_appid('plsql_enq_raw');
message.set_groupid('st');
message.set_groupseq(1);
message.set_boolean_property('import', True);
message.set_string_property('color', 'RED');
message.set_short_property('year', 1999);
message.set_long_property('mileage', 300000);
message.set_double_property('price', 16999.99);
-- prepare a huge payload into a blob
FOR i IN 1..1000 LOOP
text := CONCAT (text, '0123456789ABCDEF');
END LOOP;
bytes := HEXTORAW(text);
dbms_lob.createtemporary(lob_loc => body, cache => TRUE);
dbms_lob.open (body, DBMS_LOB.LOB_READWRITE);
position := 1 ;
FOR i IN 1..10 LOOP
dbms_lob.write ( lob_loc => body,
amount => FLOOR((LENGTH(bytes)+1)/2),
offset => position,
buffer => bytes);
position := position + FLOOR((LENGTH(bytes)+1)/2) ;
END LOOP;
-- end of the preparation
message.set_bytes(body);
dbms_aq.enqueue(queue_name => 'jmsuser.jms_bytes_t1',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
dbms_lob.freetemporary(lob_loc => body);
END;
