6 Transactional Event QueuesおよびAdvanced Queuingに対応するJava Message Service
この章の内容は次のとおりです。
Oracle Transactional Event QueuesおよびAdvanced Queuingに対応するJava Messaging Serviceインタフェース
次のトピックでは、Oracle Database Advanced Queuing (AQ)へのOracle Java Message Service (JMS)インタフェースについて説明します。
JMSおよびOracle JMSの一般的な機能
この項の内容は次のとおりです。
JMSコネクションおよびセッション
この項の内容は次のとおりです。
ConnectionFactoryオブジェクト
ConnectionFactory
は、管理者によって定義された接続構成パラメータの集合をカプセル化します。クライアントはこれを使用してJMSプロバイダとの接続を確立します。この場合、Oracle Databaseの一部であるOracle JMSがJMSプロバイダです。
ConnectionFactory
オブジェクトには、次の3種類があります。
-
ConnectionFactory
-
QueueConnectionFactory
-
TopicConnectionFactory
AQjmsFactoryを使用したConnectionFactoryオブジェクトの取得
AQjmsFactory
クラスを使用して、ConnectionFactory
、QueueConnectionFactory
またはTopicConnectionFactory
オブジェクトに対するハンドルを取得できます。
Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートしているConnectionFactory
を取得するには、AQjmsFactory.getConnectionFactory()
を使用します。QueueConnectionFactory
を取得するには、AQjmsFactory.getQueueConnectionFactory()
を使用します。TopicConnectionFactory
を取得するには、AQjmsFactory.getTopicConnectionFactory()
を使用します。
ConnectionFactory
、QueueConnectionFactory
またはTopicConnectionFactory
は、ホスト名、ポート番号、SIDドライバまたはJDBC URLおよびプロパティを使用して作成できます。
JNDIを使用したConnectionFactoryオブジェクトの検索
JMS管理者は、Lightweight Directory Access Protocol(LDAP)サーバーにConnectionFactory
オブジェクトを登録できます。JMSでJava Naming and Directory Interface(JNDI)の検索を使用可能にするには、次の設定が必要です。
JMSコネクション
JMSコネクション
は、クライアントとJMSプロバイダの間のアクティブなコネクションを表します。JMSコネクション
によって、次に示すいくつかの重要なサービスが実行されます。
-
JMSプロバイダとのオープン・コネクションまたはコネクション・プールのいずれかをカプセル化します。
-
クライアントとプロバイダのサービス・デーモン間のオープンなTCP/IPソケットを表します。
-
コネクション確立時に、クライアントを認証するための構造を提供します。
-
Sessions
を作成します -
コネクション・メタデータを提供します
-
オプションの
ExceptionListener
をサポートします。
データベースへのJMSコネクション
は、createConnection()
、createQueueConnection()
またはcreateTopicConnection()
を起動し、ConnectionFactory
オブジェクト、QueueConnectionFactory
オブジェクトまたはTopicConnectionFactory
オブジェクトにそれぞれパラメータusername
およびpassword
を渡して作成できます。
次に、Connection
オブジェクトでサポートされているメソッドのいくつかを示します。
-
start()
このメソッドは、着信メッセージの配信を開始または再開します。
-
stop()
このメソッドは、着信メッセージの配信を一時停止します。
Connection
オブジェクトが停止すると、すべてのメッセージ・コンシューマへの配信は禁止されます。また、同期受信のブロックおよびメッセージは、メッセージ・リスナーに配信されません。 -
close()
このメソッドは、JMSセッションをクローズして、関連付けられたすべてのリソースを解放します。
-
createSession(true, 0)
このメソッドは、JMS
Connection
インスタンスを使用してJMSSession
を作成します。 -
createQueueSession(true,
0)
このメソッドは
QueueSession
を作成します。 -
createTopicSession(true,
0
)このメソッドは
TopicSession
を作成します。 -
setExceptionListener(ExceptionListener)
このメソッドは、
Connection
の例外リスナーを設定します。問題が非同期でクライアントに通知されます。メッセージを処理するのみのConnection
の場合、その失敗を確認する他の方法はありません。 -
getExceptionListener()
このメソッドは、この
Connection
のExceptionListener
を取得します。
JMSクライアントは、通常1つのConnection
、1つのSession
、複数のMessageProducer
およびMessageConsumer
オブジェクトを作成します。現在のバージョンでは、次の場合を除いて、1つのConnection
につき1つのオープンSession
のみが許可されています。
-
JDBC OCI8ドライバを使用してJMSコネクションを作成する場合
-
ユーザーが、JMSコネクションの作成時に
OracleOCIConnectionPool
インスタンスを提供する場合
Connection
は停止モードで作成されます。この状態では、メッセージを受信できません。通常、設定が完了するまで、Connection
を停止モードにしておきます。その時点でConnection
start()
メソッドがコールされ、メッセージがConnection
コンシューマに配信され始めます。このような設定規則を確立することにより、クライアントが設定されている最中の非同期メッセージ配信の結果、発生するクライアントの混乱を最小限に止められます。
Connection
を起動し、続けて設定を実行できます。これを実行するクライアントは、設定処理中に非同期メッセージの配信処理の準備をする必要があります。MessageProducer
は、Connection
が停止中でもメッセージを送信できます。
JMSセッション
JMS Session
は、メッセージの作成および処理用の単一スレッドのコンテキストです。これは、Java Virtual Machine(JVM)の外でプロバイダ・リソースを割り当てますが、軽量なJMSオブジェクトとみなされます。
Session
には、次の役割があります。
-
MessageProducer
およびMessageConsumer
オブジェクトのファクトリを構成します。 -
宛先オブジェクト(キュー/トピック)に対するハンドルの取得方法を提供します。
-
プロバイダが最適化したメッセージ・ファクトリを提供します。
-
セッションの
MessageProducer
およびMessageConsumer
オブジェクトにまたがる作業を組み合せる一連のトランザクションをサポートし、これらを単位に構成します。 -
セッションが処理および作成するメッセージのシリアル順序を定義します。
-
セッションに登録された
MessageListener
オブジェクトの実行をシリアル化します。
Oracle Database 20cでは、JDBC thinまたはJDBC thick(OCI)ドライバのいずれかを使用すると、単一のJMS Connection
を使用して、リソースが許すかぎり多くのJMS Session
を作成できます。
プロバイダが、JVM外のSession
のためのリソースを割り当てることができるため、クライアントはリソースが必要ないときはこれらを閉じる必要があります。ガベージ・コレクションによる最終的なリソースの解放を待つ必要はありません。Session
が作成したMessageProducer
およびMessageConsumer
オブジェクトについても同様です。
Session
オブジェクトのメソッドには、次のものが含まれます。
-
commit()
このメソッドは、トランザクションで実行されるすべてのメッセージをコミットして、現在保持されているロックを解放します。
-
rollback()
このメソッドは、トランザクションで実行されたすべてのメッセージをロールバックして、現在保持されているロックを解放します。
-
close()
このメソッドは
Session
をクローズします。 -
getDBConnection()
このメソッドは、基礎となるJDBCコネクションに対するハンドルを取得します。このハンドルを使用して、他のSQL DML操作を同じ
Session
の一部として実行できます。このメソッドは、Oracle JMSに固有です。 -
acknowledge()
このメソッドは、非トランザクション・セッションでメッセージの受信を承認します。
-
recover()
このメソッドは、非トランザクション・セッションでメッセージの配信を再開します。セッション中に配信された一連のメッセージは、最後に承認されたメッセージの後でリセットされます。
Oracle JMSの拡張例は、次のとおりです。
-
createQueueTable()
このメソッドはキュー表を作成します。
-
getQueueTable()
このメソッドは既存のキュー表に対するハンドルを取得します。
-
createQueue()
このメソッドはキューを作成します。
-
getQueue()
このメソッドは既存のキューに対するハンドルを取得します。
-
createTopic()
このメソッドはトピックを作成します。
-
getTopic()
このメソッドは既存のトピックに対するハンドルを取得します。
どの拡張機能を使用する場合も、Session
オブジェクトをAQjmsSession
にキャストする必要があります。
ノート:
JMS仕様では、開始されていないJMS Connection
インスタンスで受信が完了したとき、プロバイダによりNULLメッセージが返されることを要求します。
javax.jms.Connection
インスタンスの作成後、start()
メソッドをそのインスタンスにコールしてメッセージを受信できるようにする必要があります。コネクション確立後で実際に受信する前に、t_conn.start();
などの行を追加するとメッセージを受信できます。
JMS宛先
Destination
は、クライアントがメッセージの送信先および受信元の指定に使用するオブジェクトです。Destination
オブジェクトはQueue
またはTopic
に指定できます。Oracle Database Advanced Queuingでは、特定のデータベースのschema.queue
にマップされます。Queue
は単一コンシューマ・キューに、Topic
はマルチ・コンシューマ・キューにマップします。
JMSセッションを使用したDestinationオブジェクトの取得
Destination
オブジェクトは、Session
オブジェクトからドメイン固有の次のSession
メソッドを使用して作成されます。
-
AQjmsSession.getQueue(queue_owner, queue_name)
このメソッドはJMSキューに対するハンドルを取得します。
-
AQjmsSession.getTopic(topic_owner, topic_name)
このメソッドはJMSトピックに対するハンドルを取得します。
JNDIを使用したDestinationオブジェクトの検索
LDAPサーバーにスキーマ・オブジェクトを登録するようにデータベースを設定できます。データベースがLDAPを使用できるように設定され、GLOBAL_TOPIC_ENABLEDパラメータがTRUEに設定されている場合、すべてのJMSキューおよびトピックは、作成時に自動的にLDAPサーバーに登録されます。管理者は、LDAPに登録されたキューおよびトピックの別名を作成することもできます。LDAPに登録されたキューおよびトピックは、JNDIを介してキューまたはトピックの名前または別名を使用してルックアップできます。
関連項目:
JMS宛先メソッド
Destination
オブジェクトのメソッドには、次のものが含まれます。
-
alter()
このメソッドは
キュー
またはトピック
を変更します。 -
schedulePropagation()
このメソッドは、ソースから宛先への伝播をスケジューリングします。
-
unschedulePropagation()
このメソッドは、スケジュール済伝播のスケジュールを解除します。
-
enablePropagationSchedule()
このメソッドは、伝播スケジュールを有効化します。
-
disablePropagationSchedule()
このメソッドは、伝播スケジュールを無効化します。
-
start()
このメソッドは
キュー
またはトピック
を開始します。キューは、エンキューまたはデキューを可能にするために開始されます。トピックは、パブリッシュまたはサブスクライブを可能にするために開始できます。 -
stop()
このメソッドは
キュー
またはトピック
を停止します。キューは、エンキューまたはデキューを実行不可にするために停止できます。トピックは、パブリッシュまたはサブスクライブを実行不可にするために停止できます。 -
drop()
このメソッドは
キュー
またはトピック
を削除します。
JMSでのシステム・レベルのアクセス制御
Oracle8i以上では、すべてのキューイング操作に対してシステム・レベルのアクセス制御をサポートします。この機能によって、アプリケーション設計者またはDBAは、ユーザーをキュー管理者にできます。キュー管理者は、データベースのどのキューに対してもJMSインタフェース(管理および操作)を起動できます。これによって、データベース上のキュー全体に対するすべての管理スクリプトを1つのスキーマで管理できるため、管理作業が容易になります。
メッセージが宛先キューに届くと、ソース・キューのスキーマ名に基づいたセッションによって、新しく届いたメッセージが宛先キューにエンキューされます。つまり、ソース・キューのスキーマに宛先キューに対するエンキュー権限を付与する必要があります。
リモートの宛先キューに伝播するために、エージェント構造体のアドレス・フィールドのデータベース・リンクに指定されたログイン・ユーザーには、ENQUEUE_ANY
権限を付与するか、または宛先キューに対するエンキュー権限を付与する必要があります。ただし、データベース・リンクのログイン・ユーザーが宛先のキュー表を所有している場合はどのような明示的な権限も付与する必要はありません。
JMSでの宛先レベルのアクセス制御
Oracle8i以上では、エンキューおよびデキュー操作に対してキュー・レベルまたはトピック・レベルのアクセス制御をサポートしています。この機能によって、アプリケーション設計者は、あるスキーマに作成されたキューおよびトピックを他のスキーマで実行中のアプリケーションから保護できます。そのキューまたはトピックが属するスキーマの外で実行しているアプリケーションには、最小限のアクセス権限のみを付与できます。キューまたはトピックに対するアクセス権限として、ENQUEUE
、DEQUEUE
およびALL
がサポートされています。
JMSでの保存およびメッセージ履歴
メッセージは、相互に関連していることがよくあります。たとえば、あるメッセージを処理した結果として他のメッセージが生成された場合、両者は関連付けられています。アプリケーション設計者としては、そのような関連を追跡することが必要な場合があります。Oracle Database Advanced Queuingでは、ユーザーがメッセージをキュー表に保持し、分析のためにSQLで問合せできます。
保存機能およびメッセージ識別子とともに、メッセージ・ジャーナルがOracle Database Advanced Queuingによって自動作成され、追跡ジャーナルまたはイベント・ジャーナルをコールできます。保存、メッセージ識別子およびSQL問合せの協調によって、強力なメッセージ・ウェアハウスを構築できます。
JMSでのOracle Real Application Clustersのサポート
トランザクション・イベント・キュー(TxEventQ)とは、システム管理のパーティション化を使用して、独立した複数の物理キューに分割される1つの論理キューです。TxEventQは、Oracle RACインスタンス全体で使用されるキュー、エンキュー率やデキュー率の高いキュー、サブスクライバの多いキューに適したJMSキューです。詳細は、「Transactional Event QueuesとOracle Real Application Clusters (Oracle RAC)」を参照してください。
AQキューの場合は、Oracle Real Application Clusters (Oracle RAC)を使用して、各インスタンスを別々のキューで管理できるようにすることで、Oracle Database Advanced Queuingのパフォーマンスを向上できます。このためには、キューを格納するキュー表に様々なインスタンス・アフィニティ(作業環境)を指定します。これによって、様々なキュー/トピックに対するキュー操作(エンキューまたはデキュー)またはトピック操作(パブリッシュ・サブスクライブ)を並行して行うことができるようになります。
Oracle Database Advanced Queuingのキュー・モニター・プロセスは、キュー表のインスタンス・アフィニティを継続的に監視します。キュー・モニターは指定されたプライマリ・インスタンスが使用可能な場合はそれにキュー表の所有権を割り当て、失敗した場合は指定されたセカンダリ・インスタンスに割り当てます。
キュー表を所有しているインスタンスが終了すると、キュー・モニターはセカンダリ・インスタンスなどの適切なインスタンスに所有者を変更します。
Oracle Database Advanced Queuingの伝播はOracle Real Application Clustersでも使用できますが、これはユーザーに対して透過的です。伝播スケジュールのジョブ・アフィニティは、それぞれのキュー表のアフィニティと同じ値に設定されます。このように、キュー表を所有するインスタンスに対応付けられたjob_queue_process
は、そのキュー表に格納されているキューからの伝播を処理し、ping操作を最小限に抑えます。
JMSでの統計ビューのサポート
各インスタンスは、それぞれのOracle Database Advanced Queuing統計情報をシステム・グローバル領域(SGA)に所有し、他のインスタンスによって収集された統計については認識しません。そのため、インスタンスがGV$AQ
ビューを問い合せると、その他のすべてのインスタンスからそれぞれのその時点の統計情報が問合せ元のインスタンスに集まります。
GV$AQ
ビューは、待機中、準備完了または期限切れの各メッセージ数を必要なときにいつでも問い合せることができます。また、メッセージが処理されるまでの平均待機秒数も表示します。
JMSでの構造化ペイロード/メッセージの型
JMSメッセージは、ヘッダー、プロパティおよび本体で構成されています。
ヘッダーは、クライアントおよびプロバイダの両方でメッセージの識別およびルーティングに使用される値が含まれるヘッダー・フィールドで構成されています。すべてのメッセージは、同じ一連のヘッダー・フィールドをサポートしています。
プロパティは、オプションのヘッダー・フィールドです。JMSによって定義された標準プロパティの他に、プロバイダ固有およびアプリケーション固有のプロパティを含めることもできます。
本体はメッセージ・ペイロードです。JMSは、様々な型のメッセージ・ペイロードおよびJMSで指定されたメッセージ型のJMSメッセージを格納できる型を定義します。
この項の内容は次のとおりです。
JMSメッセージ・ヘッダー
JMSメッセージ・ヘッダーには、次のフィールドが含まれます。
-
JMSDestination
このフィールドには、メッセージの送信先が含まれます。Oracle Database Advanced Queuingでは、これは送信先キューまたはトピックに相当します。
Send
メソッドの完了後にJMSによって設定されるDestination
型です。 -
JMSDeliveryMode
このフィールドは、メッセージのログが記録されるかどうかを決定します。JMSでは、
PERSISTENT
送信(メッセージが決まった記憶域にロギングされる)およびNONPERSISTENT
送信(メッセージがロギングされない)をサポートしています。Send
メソッドの完了後にJMSによって設定されるINTEGER
です。JMSでは、クライアントが指定したJMSDeliveryMode
の値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSMessageID
このフィールドは、プロバイダ内のメッセージを一意に識別します。すべてのメッセージIDは先頭に文字列
ID:
を使用する必要があります。Send
メソッドの完了後にJMSによって設定されるString
型です。 -
JMSTimeStamp
このフィールドには、送信先のプロバイダにメッセージが渡された時間が含まれます。Oracle Database Advanced Queuingのメッセージのエンキュー時間にマッピングされます。
Send
メソッドの完了後にJMSによって設定されるLong
型です。 -
JMSCorrelationID
このフィールドは、クライアントがメッセージを別のメッセージにリンクするために使用できます。JMSクライアントによって設定される
String
型です。 -
JMSReplyTo
このフィールドには、メッセージ送信時にクライアントが指定する
Destination
型が含まれます。クライアントはoracle.jms.AQjmsAgent
、javax.jms.Queue
またはjavax.jms.Topic
を使用できます。 -
JMSType
このフィールドには、送信時にクライアントが指定するメッセージ・タイプの識別子が含まれます。これは
String
型です。移植性を高めるために、JMSType
に記号値を使用することをお薦めします。 -
JMSExpiration
このフィールドは、非Java EE準拠モードでは、エンキュー時間と
TimeToLive
の合計です。準拠モードでは、デキューされたメッセージのJMSExpiration
ヘッダー値は、メッセージのエンキュー時のJMSTimeStamp
(ミリ秒単位のグリニッジ標準時)とTimeToLive
(ミリ秒単位)の合計です。Send
メソッドの完了後にJMSによって設定されるLong
型です。JMSでは、クライアントが指定したJMSExpiration
の値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSPriority
このフィールドには、メッセージの優先度が含まれます。
Send
メソッドの完了後にJMSによって設定されるINTEGER
です。Java EE準拠モードでは、優先度に使用できる値は0
から9
で、9
の優先度が最も高く、4
がデフォルトで、Sun Microsystem JMS 1.1標準に準拠しています。デフォルトは非準拠モードです。JMSでは、クライアントが指定したJMSPriority
の値をオーバーライドするよう管理者がJMSを構成することが可能です。 -
JMSRedelivered
このフィールドは、JMSプロバイダによって設定されたブールです。
関連項目:
JMSメッセージ・プロパティ
JMSプロパティは、クライアントによって明示的に設定されるか、またはJMSプロバイダによって自動的に設定されます(通常、これらは読取り専用です)。JMSプロパティには、Send
およびReceive
操作に指定されたパラメータを使用して設定されるものもあります。
プロパティで、メッセージにオプションのヘッダー・フィールドを追加できます。プロパティによって、クライアントは、messageSelector
を使用して、クライアントのかわりにJMSプロバイダにアプリケーション固有基準を使用してメッセージを選択させることができます。プロパティ名は文字列で、値はBoolean
、byte
、short
、int
、long
、float
、double
およびstring
です。
JMS定義のプロパティは、すべてJMSX
で始まります。次のものがあります。
-
JMSXUserID
このフィールドは、メッセージの送信元ユーザーの識別情報です。
Send
メソッドの完了後にJMSによって設定されるString
型です。 -
JMSXAppID
このフィールドは、メッセージの送信元アプリケーションの識別情報です。
Send
メソッドの完了後にJMSによって設定されるString
型です。 -
JMSXDeliveryCount
このフィールドは、メッセージの送信試行回数です。
Send
メソッドの完了後にJMSによって設定されるInteger
です。 -
JMSXGroupid
このフィールドは、メッセージが属するメッセージ・グループの識別情報です。JMSクライアントによって設定される
String
型です。 -
JMSXGroupSeq
このフィールドは、グループ内のメッセージの順序番号です。JMSクライアントによって設定される
Integer
型です。 -
JMSXRcvTimeStamp
このフィールドは、コンシューマにメッセージが配信された時間(デキュー時間)です。
Receive
メソッドの完了後にJMSによって設定されるString
型です。 -
JMSXState
このフィールドは、プロバイダによって設定されたメッセージ状態です。メッセージ状態は、
WAITING
、READY
、EXPIRED
またはRETAINED
です。
Oracle固有のJMSプロパティは、すべてJMS_Oracle
で始まります。次のものがあります。
-
JMS_OracleExcpQ
このフィールドは、元の宛先に配信できないメッセージを送信するキュー名です。JMSクライアントによって設定される
String
型です。EXCEPTION
タイプの宛先のみをJMS_OracleExcpQ
プロパティで指定できます。 -
JMS_OracleDelay
このフィールドは、メッセージの配信を遅延する時間(秒単位)です。JMSクライアントによって設定される
Integer
型です。これは、メッセージの配信順序に影響を与える可能性があります。 -
JMS_OracleOriginalMessageId
このフィールドは、メッセージがある宛先から別の宛先に伝播された場合に、ソースのメッセージのメッセージ識別子に設定されます。JMSプロバイダによって設定される
String
型です。メッセージが伝播されていない場合、このプロパティはJMSMessageId
と同じ値です。
クライアントは、プロパティを定義することによって、メッセージにヘッダー・フィールドを追加できます。これらのプロパティをmessageSelector
で使用して特定のメッセージを選択できます。
JMSメッセージ本体
JMSでは、次の5つのフォーマットのメッセージ本体が提供されます。
StreamMessage
StreamMessage
オブジェクトは、Javaプリミティブのストリームの送信に使用されます。格納と読取りは順次的に行われます。これはMessage
から継承し、StreamMessage
本文を追加します。メソッドは主にjava.io.DataInputStream
とjava.io.DataOutputStream
のメソッドに基づいています。
基本データ型の値は、それぞれの型のメソッドを使用して、明示的に読込みまたは書込みできます。抽象的なオブジェクトとして読込みまたは書込みすることもできます。StreamMessage
オブジェクトを使用するには、SYS.AQ$_JMS_STREAM_MESSAGE
またはAQ$_JMS_MESSAGE
ペイロード型を持つキュー表を作成します。
StreamMessage
オブジェクトは、表6-1に示す変換をサポートします。行の型として書き込まれる値は、列の型として読み込むことができます。
表6-1 StreamMessageの変換
入力 | Boolean | byte | short | char | int | long | float | double | String | byte[] |
---|---|---|---|---|---|---|---|---|---|---|
|
X |
- |
- |
- |
- |
- |
- |
- |
X |
- |
|
- |
X |
X |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
X |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
- |
X |
- |
- |
- |
- |
X |
- |
|
- |
- |
- |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
- |
- |
- |
X |
- |
- |
X |
- |
|
- |
- |
- |
- |
- |
- |
X |
X |
X |
- |
|
- |
- |
- |
- |
- |
- |
- |
X |
X |
- |
|
X |
X |
X |
X |
X |
X |
X |
X |
X |
- |
|
- |
- |
- |
- |
- |
- |
- |
- |
- |
X |
BytesMessage
BytesMessage
オブジェクトは、1つの未解釈バイトのストリームを含むメッセージを送信する場合に使用します。これはMessage
から継承し、BytesMessage
本文を追加します。メッセージの受信者が、そのバイト列を解析します。メソッドは主にjava.io.DataInputStream
とjava.io.DataOutputStream
のメソッドに基づいています。
これは、クライアントが既存のメッセージ・フォーマットをコード化するためのメッセージ型です。可能であれば、かわりに他の自己定義メッセージ型を使用してください。
基本データ型の値は、それぞれの型のメソッドを使用して、明示的に書込みができます。また、抽象的なオブジェクトとして書き込むこともできます。BytesMessage
オブジェクトを使用するには、SYS.AQ$_JMS_BYTES_MESSAGE
またはAQ$_JMS_MESSAGE
ペイロード型を持つキュー表を作成します。
MapMessage
MapMessage
オブジェクトは、名前がString
型で値がJava基本データ型である名前/値ペアの集合を送信する場合に使用します。このエントリには、名前を指定して順次的またはランダムにアクセスできます。エントリの順序は定義されません。これはMessage
から継承し、MapMessage
本文を追加します。基本データ型の値は、それぞれの型のメソッドを使用して、明示的に読込みまたは書込みできます。抽象的なオブジェクトとして読込みまたは書込みすることもできます。
MapMessage
オブジェクトを使用するには、SYS.AQ$_JMS_MAP_MESSAGE
またはAQ$_JMS_MESSAGE
ペイロード型を持つキュー表を作成します。MapMessage
オブジェクトは、表6-2に示す変換をサポートします。この表の「X」は、行の型として書き込まれる値を列の型として読み取ることができることを意味します。
表6-2 MapMessageの変換
入力 | Boolean | byte | short | char | int | long | float | double | String | byte[] |
---|---|---|---|---|---|---|---|---|---|---|
|
X |
- |
- |
- |
- |
- |
- |
- |
X |
- |
|
- |
X |
X |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
X |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
- |
X |
- |
- |
- |
- |
X |
- |
|
- |
- |
- |
- |
X |
X |
- |
- |
X |
- |
|
- |
- |
- |
- |
- |
X |
- |
- |
X |
- |
|
- |
- |
- |
- |
- |
- |
X |
X |
X |
- |
|
- |
- |
- |
- |
- |
- |
- |
X |
X |
- |
|
X |
X |
X |
X |
X |
X |
X |
X |
X |
- |
|
- |
- |
- |
- |
- |
- |
- |
- |
- |
X |
TextMessage
TextMessage
オブジェクトは、java.lang.StringBuffer
を含むメッセージを送信する場合に使用します。これはMessage
から継承し、TextMessage
本文を追加します。テキスト情報は、getText()
およびsetText(
...)
メソッドを使用して読込みまたは書込みができます。TextMessage
オブジェクトを使用するには、SYS.AQ$_JMS_TEXT_MESSAGE
またはAQ$_JMS_MESSAGE
ペイロード型を持つキュー表を作成します。
ObjectMessage
ObjectMessage
オブジェクトは、シリアライズ可能なJavaオブジェクトを含むメッセージを送信する場合に使用します。これはMessageから継承し、単一のJava参照を含む本文を追加します。シリアライズ可能なJavaオブジェクトのみ使用できます。Javaオブジェクトのコレクションを送信する必要がある場合、JDK 1.4で提供されているコレクション・クラスのいずれかを使用できます。オブジェクトは、getObject()
メソッドおよびsetObject(
...)
メソッドを使用して読込みまたは書込みできます。ObjectMessage
オブジェクトを使用する場合、SYS.AQ$_JMS_OBJECT_MESSAGE
またはAQ$_JMS_MESSAGE
ペイロード型でキュー表を作成します。
AdtMessage
AdtMessage
オブジェクトは、Oracleのオブジェクト型に対応するJavaオブジェクトを含むメッセージを送信する場合に使用します。これらのオブジェクトはMessage
から継承され、CustomDatum
またはORAData
インタフェースを実装するJavaオブジェクトを含む本体を追加します。
AdtMessage
オブジェクトを使用するには、Oracleのオブジェクト型としてのペイロード型を持つキュー表を作成します。AdtMessage
のペイロードは、getAdtPayload
およびsetAdtPayload
メソッドを使用して、読取りおよび書込みができます。
AdtMessage
オブジェクトを使用してSYS.XMLType
型のキューへメッセージを送信することもできます。oracle.xdb.XMLType
クラスを使用してメッセージを作成する必要があります。
AdtMessage
オブジェクトの場合、クライアントが次を取得できます。
-
JMSXDeliveryCount
-
JMSXRecvTimeStamp
-
JMSXState
-
JMS_OracleExcpQ
-
JMS_OracleDelay
関連項目:
CustomDatum
インタフェースおよびORAData
インタフェースの詳細は、『Oracle Database Java開発者ガイド』を参照してください。
異なるメッセージ型でのメッセージ・プロパティの使用
次のメッセージ・プロパティは、クライアントがsetProperty
コールを使用して設定できます。StreamMessage
、BytesMessage
、ObjectMessage
、TextMessage
およびMapMessage
オブジェクトの場合、クライアントによって次の設定ができます。
-
JMSXAppID
-
JMSXGroupID
-
JMSXGroupSeq
-
JMS_OracleExcpQ
-
JMS_OracleDelay
AdtMessage
オブジェクトの場合、クライアントによって次の設定ができます。
-
JMS_OracleExcpQ
-
JMS_OracleDelay
次のメッセージ・プロパティは、クライアントがgetProperty
コールを使用して取得できます。StreamMessage
、BytesMessage
、ObjectMessage
、TextMessage
およびMapMessage
オブジェクトの場合、クライアントが次を取得できます。
-
JMSXuserID
-
JMSXAppID
-
JMSXDeliveryCount
-
JMSXGroupID
-
JMSXGroupSeq
-
JMSXRecvTimeStamp
-
JMSXState
-
JMS_OracleExcpQ
-
JMS_OracleDelay
-
JMS_OracleOriginalMessageID
Oracle JMSを使用したバッファ済メッセージ
メッセージの送信時にdeliveryMode
としてNON_PERSISTENT
を指定すると、非永続JMSメッセージを送信できます。JMS非永続メッセージは決まった記憶域に記録する必要がないため、JMSシステム障害が発生すると消失する可能性があります。JMS非永続メッセージはOracle Database Advanced Queuingで使用可能なバッファ済メッセージに類似していますが、両者には重要な相違点も存在します。
ノート:
Oracle JMSの非永続メッセージを、Oracle Database 10gリリース2(10.2)で非推奨になったOracle Database Advanced Queuingの非永続キューと混同しないでください。
関連項目:
トランザクションのコミットとクライアントの確認
JMSのdeliveryMode
は、メッセージのトランザクション属性と直交します。JMS非永続メッセージは、処理済セッションまたは非処理済セッションで送受信できます。JMS非永続メッセージを処理済セッションで送受信する場合、JMS操作の効果は処理済セッションのコミット後にのみ参照可能です。CLIENT_ACKNOWLEDGE
確認モードの非処理済セッションで受信する場合、このメッセージの受信による効果は、クライアントがメッセージを確認した後にのみ参照可能です。確認されない場合、メッセージは削除されず、クライアントがSession.recover
をコールすると再配信されます。
これに対して、Oracle Database Advanced Queuingのバッファ済メッセージでは、このようなトランザクションまたは確認の概念はサポートされません。バッファ済メッセージの送受信は、どちらもIMMEDIATE
可視性モードで実行する必要があります。したがって、セッションがコミットされたかどうかやメッセージが確認されたかどうかは関係なく、ユーザーは送受信操作の効果を即時に参照できます。
各種API
通常のJMS送信およびパブリッシュ・メソッドで送信されるメッセージは、Oracle Database Advanced Queuingでは永続メッセージとして扱われます。通常のJMS受信メソッドは、AQ永続メッセージのみを受信します。バッファ済メッセージを送受信するには、Oracle拡張APIであるbufferSend
、bufferPublish
およびbufferReceive
を使用する必要があります。
関連項目:
bufferSend
、bufferPublish
およびbufferReceive
の詳細は、Oracle Databaseアドバンスト・キューイングJava APIリファレンスを参照してください
ペイロード制限
バッファ済メッセージのOracle Database Advanced Queuing実装では、LOB
属性はサポートされていません。このため、5種類の標準JMSメッセージのペイロードには次の制限が適用されます。
-
JMS
TextMessage
のペイロードは4000バイト以内です。Oracle JMSの文字セット変換時には、テキスト・ペイロードをデータベースに格納するために
VARCHAR
のかわりにCLOB
を使用するように控え目な選択が必要になる場合があるため、データベース文字セットによってはこの上限値がさらに低くなる場合があります。 -
JMS
BytesMessage
のペイロードは2000バイト以内です。 -
JavaでシリアライズされたJMS
ObjectMessage
、StreamMessage
およびMapMessage
データは、2000バイト以内である必要があります。 -
他のすべてのOracle JMS ADTメッセージの場合、対応するOracleデータベースADTに
LOB
属性を含めることはできません。
各種定数
表6-3に示すように、Oracle Database Advanced QueuingとOracle JMSのAPIでは、バッファ済メッセージと永続メッセージの指定に使用される数値が異なります。
表6-3 Oracle Database AQとOracle JMSのバッファ済メッセージ定数
API | 永続メッセージ | バッファ済メッセージ |
---|---|---|
Oracle Databaseアドバンスト・キューイング |
|
|
Oracle JMS |
|
|
JMSのバッファ済メッセージ
バッファ済メッセージでは、JMSメッセージ標準が完全にサポートされています。Oracle JMSでは、それらの標準がいくつかの方法で拡張されています。
関連項目:
JMSバッファ済メッセージのエンキュー
Oracle JMSでは、永続メッセージおよびバッファ済メッセージを同じJMSキュー/トピックにエンキューできるように、個別のメッセージにJMSDeliveryMode
を設定することによって、アプリケーションはバッファ済メッセージを送信できます。
Oracle JMSのバッファ済メッセージは、エンキュー時刻、優先順位またはその両方で順序付けできます。順序付けは異なるメッセージ・タイプ間では適用されません。このため、たとえば、後に送信された永続メッセージが前に送信されたバッファ済メッセージより前に配信されることがあります。Oracle JMSのバッファ済メッセージでは、期限切れもサポートされます。
関連項目:
JMSバッファ済メッセージのデキュー
JMSでは、JMSサブスクライバが両方のメッセージ・タイプを対象にできるように、サブスクライバが永続メッセージのみまたはバッファ済メッセージのみを対象とするように宣言することを要求しません。
Oracle JMSは、JMSMessageID
による高速で効率的なメッセージのデキュー、メッセージ・ヘッダーのセレクタ、およびメッセージ・プロパティのセレクタをサポートしています。Oracle JMSデキュー・コールは、永続メッセージとバッファ済メッセージの両方をチェックします。
ノート:
Oracle JMS永続メッセージには一意のメッセージ識別子があります。Oracle JMSバッファ済メッセージ識別子は、キュー/トピック内でのみ一意です。
同時デキュー・プロセスが同じサブスクライバの同じキューからデキューしている場合は、他のプロセスによってロックされているメッセージがスキップされます。
関連項目:
トランザクションのサポート
バッファ済メッセージがトランザクション・セッションでエンキューされる場合、JMSではそれらに対するトランザクション・サポートが必要となります。Oracle JMSでは、バッファ済メッセージに関連するトランザクション・セッションで次の標準が満たされることが保証されます。
-
原子性
Oracle JMSトランザクション内の永続メッセージおよびバッファ済メッセージは、アトミックにコミットまたはロールバックされます。バッファ済メッセージがディスクに書き込まれた場合(LOBに関連するメッセージの場合など)でも、ロールバックによってそれらが削除されます。
-
一貫性
永続メッセージ操作およびバッファ済メッセージ操作がトランザクションでインターリーブされる場合、すべてのOracle JMSユーザーは影響を受けるキュー/トピックの一貫性のあるビューを共有します。トランザクションによってエンキューされたすべての永続メッセージおよびバッファ済メッセージは、コミット時に表示できるようになります。プロセスがトランザクションの途中で終了した場合は、永続メッセージおよびバッファ済メッセージの両方が元に戻されます。Oracle JMSユーザーには、トランザクションのすべての永続メッセージおよびバッファ済メッセージが表示されるか、それらのいずれも表示されません。
-
分離
トランザクションのバッファ済エンキュー操作は、トランザクションがコミットされるまではオーナー・トランザクションにのみ表示されます。トランザクションがコミットされると、すべてのコンシューマに表示されます。
デキュー・トランザクションによってロックされているメッセージはブラウズできます。
確認メッセージの受信
非トランザクション・セッションの確認メッセージ受信のためのack_mode
パラメータには、3つの値が定義されています。
-
DUPS_OK_ACKNOWLEDGE
このモードでは、重複メッセージが許可されます。
-
AUTO_ACKNOWLEDGE
このモードでは、セッションはメッセージを自動的に確認します。
-
CLIENT_ACKNOWLEDGE
このモードでは、メッセージ・プロデューサ確認メソッドをコールすることによって、クライアントはメッセージを明示的に確認します。メッセージを確認すると、以前に処理されたすべてのメッセージが確認されます。
関連項目:
「セッションの作成」
バッファ済メッセージのサービス品質
JMSは、伝播されていないバッファ済メッセージを最大1回配信することをサポートするようにプロバイダに要求しています。バッファ済メッセージのリカバリが無効にされている場合、Oracle JMSはこの標準を満たしています。
現在のメッセージ伝播の実装では、メッセージの重複配信が行われることがあります。ただし、メッセージの伝播はOracle JMSによって提供される拡張であるため、これはJMS標準に違反しません。
関連項目:
バッファ済メッセージが重複配信される原因については、バッファ済メッセージの伝播を参照してください。
バッファ済メッセージでサポートされるJMSタイプ
Oracle JMSは、JMSで定義されているタイプをOracleのユーザー定義タイプにマップし、JMSメッセージを格納するためにそれらのユーザー定義タイプのキューを作成します。これらのタイプの一部にはLOB属性があり、メッセージが永続またはバッファ済のいずれであってもOracle JMSはディスクに書き込みます。
たとえば、JMSタイプJMSTextMessage
のユーザー定義タイプSYS.AQ$_JMS_TEXT_MESSAGE
では、4kより小さいテキスト文字列がVARCHAR2
列に格納されます。ただし、4kより大きいテキスト文字列を格納するためのCLOB属性があります。
JMSメッセージは4kより大きい場合がよくあるため、Oracle JMSは大きいメッセージをメモリーに格納できる新しいADTを提供しています。ADTのディスク表示は変わりませんが、いくつかのVARCHAR2
/RAW
属性では最大100kのサイズのJMSメッセージをメモリーに格納できます。100kより大きいメッセージもバッファ済メッセージとしてパブリッシュできますが、ディスクに書き込まれます。
関連項目:
JMS Point-to-Pointモデル機能
Point-to-Pointモデルでは、クライアントは、1つのポイントから別のポイントへメッセージを交換します。メッセージのプロデューサとコンシューマは、シングル・コンシューマ・キューを使用してメッセージを送受信します。管理者は、AQjmsSession
のcreateQueue
メソッドを使用して、シングル・コンシューマ・キューを作成します。キューを使用する前に、AQjmsDestination
でstart
コールを使用して、キューをエンキュー/デキューに対して有効にする必要があります。クライアントは、AQjmsSession
のgetQueue
メソッドを使用して事前に作成されたキューに対するハンドルを取得します。
シングル・コンシューマ・キューの場合、メッセージは1つのコンシューマが1度のみ処理できます。同じキューから同時にデキューするプロセスまたはオペレーティング・システム・スレッドが複数存在する場合、各プロセスはキューの先頭にあるロックされていない最初のメッセージをデキューします。ロックを作成したプロセス以外のプロセスは、ロックされたメッセージをデキューできません。
処理が済むと、キューの保存期間が0(ゼロ)の場合はそのメッセージは削除され、そうでない場合は指定された期間保存されます。メッセージが保存されている間は、キュー表ビューに対してSQLを使用して問い合せたり、QueueBrowser
で処理済メッセージのメッセージ識別子を指定してデキューできます。
QueueSender
クライアントは、QueueSender
を使用して、キューにメッセージを送信します。これは、クライアントのSession
でキューをcreateSender
メソッドに渡すことによって作成されます。また、クライアントにはキューを指定しないで、QueueSender
を作成するオプションがあります。この場合、キューを送信操作のたびに指定する必要があります。
クライアントは、QueueSender
によって送信されたすべてのメッセージのデフォルト配信モード、優先順位およびTimeToLive
を指定できます。または、クライアントはこれらのオプションを各メッセージに対して定義できます。
QueueReceiver
クライアントはQueueReceiver
を使用してキューからメッセージを受信します。これは、クライアントのSession
でcreateQueueReceiver
を使用して作成されます。messageSelector
があってもなくても作成できます。
QueueBrowser
QueueBrowser
を使用すると、クライアントはメッセージを削除しないで、キュー上でメッセージを参照できます。このブラウズ用メソッドは、キュー内のメッセージをスキャンするために使用されるjava
.util
.Enumeration
を戻します。nextElement
に対する最初のコールが、キューのスナップショットを取得します。QueueBrowser
は、messageSelector
を使用して作成しても、使用せずに作成してもかまいません。
QueueBrowser
も、メッセージをスキャン中にオプションでロックできます。これは、メッセージに対するSELECT
...for
UPDATE
コマンドの場合と似ています。これによって、他のコンシューマがスキャン中のメッセージを削除することはなくなります。
MessageSelector
messageSelector
によって、クライアントは、コンシューマに配信されるメッセージをmessageSelector
式と一致するメッセージに制限できるようになります。TextMessage
型、StreamMessage
型、BytesMessage
型、ObjectMessage
型またはMapMessage
型のペイロードを含むキューのmessageSelector
には、次の1つ以上を持つ任意の式を含めることができます。
-
接頭辞「ID:」が付いたJMSメッセージ識別子
JMSMessageID ='ID:23452345'
-
JMSメッセージ・ヘッダー・フィールドまたはプロパティ
JMSPriority < 3 AND JMSCorrelationID = 'Fiction' JMSCorrelationID LIKE 'RE%'
-
ユーザー定義のメッセージ・プロパティ
color IN ('RED', BLUE', 'GREEN') AND price < 30000
AdtMessage
型のペイロードを含むキューのmessageSelector
には、次の1つ以上を持つ任意の式を含めることができます。
-
接頭辞「ID:」なしのメッセージ識別子
msgid = '23434556566767676'
-
優先順位または相関識別子、あるいはその両方
priority < 3 AND corrid = 'Fiction'
-
メッセージ・ペイロード
tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
JMSパブリッシュ・サブスクライブ・モデル機能
この項の内容は次のとおりです。
JMSパブリッシュ・サブスクライブの概要
JMSでは、パブリッシャ(出版者)の機能を持つアプリケーションとサブスクライバ(購読者)の役割を果すアプリケーションとの間の柔軟で動的な通信が可能です。アプリケーションが結合されることはなく、メッセージとその内容に基づいて相互に作用します。
メッセージの配信では、パブリッシャ・アプリケーションが明示的にメッセージ受信者を処理または管理する必要はありません。このため、パブリッシャ・アプリケーションの論理を変更しなくても、新しいサブスクライバ・アプリケーションを動的に追加できます。
同様にサブスクライバ・アプリケーションは、メッセージを送信しているパブリッシャ・アプリケーションに関係なく、メッセージの内容に基づいてメッセージを受信します。このため、サブスクライバ・アプリケーションの論理を変更しなくても、新しいパブリッシャ・アプリケーションを動的に追加できます。
サブスクライバ・アプリケーションは、メッセージ・プロパティまたはトピックのメッセージ内容に対してルールベースのサブスクリプション(予約購読)を定義することで、どのようなメッセージに関心があるのかを指定できます。システムは、ルールベースのサブスクリプションを使用して、パブリッシュされたメッセージの受信者を計算し、自動的にルーティングします。
パブリッシュ・サブスクライブ・モデルでは、メッセージはトピックに対してパブリッシュされ、トピックから受信されます。トピックは、AQjmsSession
のCreateTopic()
メソッドを使用して作成されます。クライアントは、AQjmsSession
のgetTopic()
メソッドを使用して事前に作成されたトピックに対するハンドルを取得できます。
DurableSubscriber
クライアントはDurableSubscriber
を、クライアントのSession
内でcreateDurableSubscriber()
メソッドを使用して作成します。messageSelector
があってもなくても作成できます。
messageSelector
によって、クライアントは、サブスクライバに配信されるメッセージをセレクタにマッチするメッセージに制限できます。セレクタ構文の詳細は、Oracle Databaseアドバンスト・キューイングJava APIリファレンスのcreateDurableSubscriber
に関する項を参照してください。
永続サブスクライバが同じ名前を使用するときのアクションは、実行時にOracle Java Message Service (Oracle JMS)クライアントに設定されるJava EE準拠モードによって異なります。
非準拠モードでは、同じ名前の2つの永続的なTopicSubscriber
オブジェクトが、2つの異なるトピックに対してアクティブになることができます。準拠モードでは、複数の永続サブスクライバが同じ名前を持つことは許可されません。同じトピックに対して作成された2つのサブスクライバが同じ名前を使用する場合、各サブスクライバに使用されるセレクタが異なると、DBMS_AQJMS.ALTER_SUBSCRIBER()
の内部コールを使用して、基になるOracle Database Advanced Queuingサブスクリプションが変更されます。
2つのサブスクライバが同じ名前を使用し、2つの異なるトピックに対して作成される場合、同じサブスクリプション名を使用するクライアントがそれぞれサブスクリプション名を作成すると、既存のサブスクリプションは削除され新しいサブスクリプションが作成されます。
2つのサブスクライバが同じ名前を使用し、2つの異なるトピックに対して作成される場合、別のクライアント(最初にそのサブスクリプション名を作成したクライアントではない)が既存のサブスクリプション名を使用すると、サブスクリプションは削除されずにエラーが発生します。サブスクリプションがJMSまたはPL/SQLのどちらで作成されたかが不明であるため、その他のトピックについてのサブスクリプションは削除しないでください。
関連項目:
RemoteSubscriber
リモート・サブスクライバは、createRemoteSubscriber
コールを使用して定義されます。リモート・サブスクライバは、リモート・トピックでの特定のコンシューマ、またはリモート・トピックでのすべてのサブスクライバになることができます。
リモート・サブスクライバは、AQjmsAgent
構造を使用して定義されます。AQjmsAgent
は、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_name
を参照します。アドレスは、次のようにしてリモート・トピックを参照します。
schema.topic_name[@dblink]
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_name
が、AQjmsAgent
の名前フィールドに指定されている必要があります。リモート・トピックは、AQjmsAgent
のaddressフィールドに指定される必要があります。
リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgent
のnameフィールドをNULLに設定する必要があります。リモート・トピックは、AQjmsAgent
のaddressフィールドに指定される必要があります。
TopicPublisher
メッセージはTopicPublisher
を使用してパブリッシュされます。これは、Topic
をcreatePublisher
メソッドに渡すことによって作成されます。また、クライアントにはTopic
を指定しないで、TopicPublisher
を作成するオプションがあります。この場合、Topic
をパブリッシュ操作のたびに指定する必要があります。クライアントは、TopicPublisher
によって送信されるすべてのメッセージのデフォルト配信モード、優先順位およびTimeToLive
を指定できます。また、各メッセージごとに、これらのオプションを指定することもできます。
Recipient Lists
JMSパブリッシュ・サブスクライブ・モデルでは、クライアントは、トピックのすべてのサブスクライバにメッセージを送信するのではなく、明示的な受信者リストを指定できます。これらの受信者は、トピックの既存のサブスクライバである場合もあれば、そうでない場合もあります。受信者リストは、このメッセージのトピックのサブスクリプション・リストをオーバーライドします。受信者リスト機能は、JMSに対するOracleの拡張です。
TopicReceiver
受信者名が受信者リストに明示的に指定されていても、その受信者がキューのサブスクライバではない場合、その受信者に送信されるメッセージは、TopicReceiver
を作成することによって受信できます。サブスクライバ名が指定されていない場合、クライアントがメッセージを受信するには、リモート・サイトで永続サブスクライバを使用する必要があります。TopicReceiver
は、JMSに対するOracleの拡張です。
TopicReceiver
は、messageSelector
を使用して作成できます。これによって、クライアントは、受信者に配信されるメッセージをセレクタにマッチするメッセージに制限できます。
関連項目:
TopicBrowser
TopicBrowser
を使用すると、クライアントはメッセージを削除しないでトピック上でメッセージを参照できます。このブラウズ用メソッドは、トピック・メッセージをスキャンするために使用されるjava.util.Enumeration
を戻します。TopicBrowser
を作成できるのは、永続サブスクライバのみです。nextElement
に対する最初のコールが、トピックのスナップショットを取得します。
TopicBrowser
は、メッセージをスキャン中にオプションでロックできます。これは、メッセージに対するSELECT
...for
UPDATE
コマンドの場合と類似しています。これによって、他のコンシューマがスキャン中のメッセージを削除することはなくなります。
TopicBrowser
は、messageSelector
を使用して作成できます。これによって、クライアントは、コンシューマに配信されるメッセージをセレクタにマッチするメッセージに制限できるようになります。
TopicBrowser
は、パージ機能をサポートしています。これによって、TopicBrowser
を使用するクライアントは、トピックの現行のブラウズ操作中に参照されたすべてのメッセージを廃棄できます。パージとは、参照済のすべてのメッセージを破壊的に受信することと同じです(TopicSubscriber
を使用して削除した場合と似ています)。
パージでは、メッセージがTopicBrowser
のjava.lang.Enumeration
のnextElement()
操作へのコールを使用してクライアントに戻された場合、そのメッセージは参照済とみなされます。クライアントがまだ参照していないメッセージは、パージ中には廃棄されません。パージ操作は、同じTopicBrowser
に対して何度も実行できます。
TopicBrowser
の作成に使用したJMS Session
がコミットされると、パージは正常に実行されます。セッションに対する操作がロールバックされた場合、パージ操作も取り消されます。
JMSメッセージ・プロデューサの機能
メッセージの優先順位および順序付け
メッセージの順序付けは、メッセージがキューまたはトピックから受信される順序を決定します。キューまたはトピックのキュー表の作成時に順序づけの方式が指定されます。現在は、Oracle Database Advanced Queuingでは、メッセージの優先順位とエンキュー時間に基づいた順序づけをサポートしており、これによって順序づけの方法が4種類あります。
-
先入れ先出し(FIFO)
エンキュー時刻が順序付け基準として選択されると、メッセージはエンキュー時刻の順序で受信されます。エンキュー時刻は、メッセージのパブリッシュ/送信時にOracle Database Advanced Queuingによってメッセージに割り当てられます。これはデフォルトの順序付けです。
-
優先順位による順序付け
優先順位による順序付けが選択されると、各メッセージに優先順位が割り当てられます。優先順位は、パブリッシュ/送信時に
メッセージ・プロデューサ
によってメッセージ・プロパティとして指定できます。メッセージは、割り当てられた優先順位の順序で受信されます。 -
FIFO優先順位
FIFO優先順位による順序付けが選択されると、トピックまたはキューは優先順位による順序付けの場合と同様に機能します。2つのメッセージに同じ優先順位が割り当てられた場合、両者はエンキュー時刻の順に受信されます。
-
エンキュー時刻に続く優先順位による順序付け
同じエンキュー時刻のメッセージは、そのメッセージの優先順位に従って受信されます。2つのメッセージの順序付け基準が同じ場合、受信される順序は予想できません。ただし、1つのセッション中に特定の順序付け基準で生成されたメッセージは、送信された順序で受信されることがOracle Database Advanced Queuingにより保証されます。
永続メッセージに使用できる順序付けスキームはすべて、バッファ済メッセージにも使用できますが、使用できるのは各メッセージ・クラス内のみです。同じセッション内でエンキューまたはパブリッシュされた永続メッセージとバッファ済メッセージ間の順序付けは、現在はサポートされていません。
メッセージ遅延の指定
メッセージをキュー/トピックに対して送信/パブリッシュするときに、遅延を指定できます。遅延は、そのメッセージがメッセージ・コンシューマに対して使用可能になるまでの時間を表します。遅延指定されたメッセージは、遅延の期限が切れるまで待機状態になります。遅延指定は、メッセージ識別子による受信でオーバーライドされます。
遅延は、JMSメッセージ・プロパティに対するOracle Database Advanced Queuingの拡張機能です。Oracle Database Advanced Queuingのバックグラウンド・プロセスのキュー・モニターが起動される必要があります。
メッセージ期限切れの指定
メッセージのプロデューサは、メッセージの期限切れまたはTimeToLive
を指定できます。これによって、そのメッセージがメッセージ・コンシューマに対して使用可能な期間が定義されます。
TimeToLive
は、送信/パブリッシュ時に指定するか、MessageProducer
の設定済TimeToLive
メソッドを使用して指定することが可能で、前者が後者よりも優先されます。TimeToLive
を実装するには、Oracle Database Advanced Queuingのバックグラウンド・プロセスのキュー・モニターが実行されている必要があります。
メッセージのグループ化
1つのキューまたはトピックに属するメッセージをグループ化してセットを形成し、一度に1人のユーザーのみによって消費されるように設定できます。そのためには、トランザクション処理によるメッセージのグループ化が可能なキュー表にキューまたはトピックを作成する必要があります。同じグループに属するメッセージはすべて同じトランザクション内に作成する必要があり、同じトランザクション内に作成されたメッセージはすべて同じグループに属します。
メッセージのグループ化は、JMS仕様に対するOracle Database Advanced Queuingの拡張機能です。
この機能を使用すると、複雑なメッセージを、リンクされた一連の単純なメッセージに分割できます。たとえば、請求書キュー宛ての請求書は、ヘッダーのメッセージ、詳細情報の複数のメッセージ、フッターのメッセージの順に分割できます。
小さいオブジェクトに分割できるイメージやビデオなどの複合ラージ・オブジェクトがメッセージ・ペイロードにある場合は、メッセージのグループ化が非常に有効です。
グループに含まれるメッセージの優先順位、遅延および期限切れの各プロパティは、単にグループの最初のメッセージ(ヘッダー)のプロパティによってのみ判断されます。グループの他のメッセージのプロパティは無視されます。
メッセージ・グループは、伝播中も保持されます。宛先トピックは、トランザクション処理のグループ化に対して使用可能にしておく必要があります。
関連項目:
トランザクション処理でグループ化可能なキューからメッセージをデキューするときにメッセージ・グループを保持する場合、注意する必要がある制限については、「デキュー機能」を参照してください。
JMSメッセージ・コンシューマ機能
この項の内容は次のとおりです。
メッセージの受信
JMSアプリケーションは、メッセージ・コンシューマを作成することによって、メッセージを受信できます。メッセージは、receive
コールを使用して同期的に受信するか、メッセージ・リスナーを使用して非同期的に受信できます。
受信モードには次の3つがあります。
-
メッセージがコンシューマに届くまでブロック
-
指定最大時間までブロック
-
非ブロック
受信におけるメッセージのナビゲーション
コンシューマがナビゲーション・モードを指定しない場合、セッションの最初のreceive
はキューまたはトピックの最初のメッセージを取り出し、第2のreceive
は次のメッセージを取得します。優先順位が高いメッセージがコンシューマに届く場合、すでに届いているメッセージをクリアするまで、コンシューマはこのメッセージを受信しません。
コンシューマが、メッセージに対してより効率的にキューのナビゲーションを制御できるように、Oracle Database Advanced QueuingにはJMS拡張機能として複数のナビゲーション・モードが用意されています。これらのモードは、TopicSubscriber
、QueueReceiver
またはTopicReceiver
で設定できます。
グループ化されていないメッセージには、次の2つのモードを使用できます。
-
FIRST_MESSAGE
このモードでは、位置がキューの先頭にリセットされます。コンシューマがキューの最上位にあるメッセージを削除できるため、優先順位に基づくキューで役立ちます。
-
NEXT_MESSAGE
このモードは、コンシューマの確立された位置の後のメッセージを取得します。たとえば、4番目のメッセージの位置で適用された
NEXT_MESSAGE
は、そのキューの5番目のメッセージを取得します。これはデフォルト・アクションです。
グループ化メッセージには、次の3つのモードを使用できます。
-
FIRST_MESSAGE
このモードでは、位置がキューの先頭にリセットされます。
-
NEXT_MESSAGE
このモードでは、位置が同一トランザクションの次のメッセージに設定されます。
-
NEXT_TRANSACTION
このモードでは、位置が次のトランザクションの最初のメッセージに設定されます。
ノート:
トランザクション・イベント・キューは、前述の3つのモードをサポートしていません。
次の方法でメッセージが受信される場合、グループ化トランザクションのプロパティを無効にできます。
-
セレクタに相関識別子を指定して受信
-
セレクタにメッセージ識別子を指定して受信
-
トランザクション・グループのメッセージがすべて受信される前にコミット
NEXT
_MESSAGE
またはNEXT
_TRANSACTION
オプションの使用中にコンシューマがキューの最後に到達したとします。ブロッキングreceive()
を指定していた場合は、ナビゲート位置は自動的にそのキューの先頭に変更されます。
デフォルトでは、QueueReceiver
、TopicReceiver
またはTopicSubscriber
は、最初のreceiveコールにFIRST_MESSAGE
を、以降のreceive()
コールにNEXT_MESSAGE
を使用します。
メッセージのブラウズ
デキューするクライアントがキューからメッセージを削除できる通常のreceive
の他に、JMSでは、JMSクライアントがキューで自身のメッセージをブラウズできるようにするインタフェースを提供しています。QueueBrowser
は、QueueSession
からcreateBrowser
メソッドを使用して作成できます。
メッセージが参照されると、そのメッセージは引き続き処理できます。メッセージがブラウズされた後は、同時セッションからreceive
コールがそのメッセージを削除する場合があるため、JMSセッションに引き続き使用できるとはかぎりません。
一度参照したメッセージが同時JMSクライアントによって削除されないようにするために、ロック・モードでメッセージを参照できます。そのためには、JMSインタフェースに対するOracle Database Advanced Queuingの拡張機能を使用して、ロック・モードを持つQueueBrowser
を作成する必要があります。メッセージのロックは、セッションがコミットまたはロールバックを実行すると解放されます。
QueueBrowser
によって参照されたメッセージを削除するには、セッションがQueueReceiver
を作成し、JMSmesssageID
をセレクタとして使用する必要があります。
取出しを伴わないメッセージの削除
コンシューマは、receiveNoData
コールを使用してキューまたはトピックからメッセージを取得することなく、メッセージを削除できます。アプリケーションがQueueBrowser
などを使用してメッセージをすでに確認した場合に役立ちます。このモードを使用すると、JMSクライアントがペイロードをデータベースから取得するオーバーヘッドを回避できます(メッセージが大きい場合、オーバーヘッドが顕著である可能性があります)。
遅延間隔をおいた後の再試行
キュー/トピックからメッセージを受信するトランザクションが失敗した場合、そのメッセージを削除する試行に失敗したとみなされます。Oracle Database Advanced Queuingは、メッセージ削除の試行に失敗した回数をメッセージ履歴に記録します。
アプリケーションでは、メッセージに対する再試行の最大回数をキュー/トピック・レベルで指定できます。メッセージ削除がこの数より多く失敗した場合、メッセージは例外キューに移動されます。
Oracle Database Advanced Queuingでは、ユーザーはmax_retries
とともにretry_delay
も指定できます。これは、受信の試行に失敗したメッセージを、retry_delay
間隔後に引き続きキューで参照し、デキューできることを意味します。それまでこのメッセージはWAITING
状態になります。Oracle Database Advanced Queuingのバックグラウンド・プロセスのタイム・マネージャは、再試行遅延プロパティを強制的に適用します。
再試行の最大回数および再試行の遅延は、キュー/トピックのプロパティです。このプロパティは、キュー/トピックの作成時、またはキュー/トピックに対する変更メソッドを使用して設定できます。MAX_RETRIES
のデフォルト値は5
です。
ノート:
トランザクション・イベント・キューは、再試行の遅延をサポートしていません。
MessageListenerを使用したメッセージの非同期受信
JMSクライアントは、setMessageListener
メソッドを使用してMessageListener
を設定することによって、メッセージを非同期的に受信できます。
コンシューマにメッセージが届いた場合、メッセージ・リスナーのonMessage
メソッドがそのメッセージで起動されます。メッセージ・リスナーは、メッセージの受信をコミットまたは異常終了できます。メッセージ・リスナーは、JMS Connection
が停止されている場合、メッセージを受信しません。一度メッセージ・リスナーがコンシューマに対して設定されると、メッセージの受信にreceive
コールを使用することはできません。
JMSクライアントは、セッションでMessageListener
を設定することによって、そのセッションのすべてのコンシューマに対してメッセージを非同期的に受信できます。一度メッセージ・リスナーが設定されると、そのセッションでは、その他のメッセージ受信モードを使用できません。
例外キュー
例外キューは、すべての期限切れのメッセージまたは使用できないメッセージが格納されるリポジトリです。アプリケーションがメッセージを例外キューに直接エンキューすることはできません。ただし、期限切れまたは処理できないメッセージを処理するアプリケーションは、例外キューからこれらのメッセージを受信または削除できます。
例外キューからメッセージを取り出すには、JMSクライアントはPoint-to-Pointインタフェースを使用する必要があります。トピック用のメッセージの例外キューは、使用可能な複数のコンシューマでキュー表に作成する必要があります。他のキューと同様に、例外キューもAQOracleQueue
クラスでstart
メソッドを使用して、メッセージを受信できる必要があります。例外キューをエンキュー可能に設定しようとすると、例外が発生します。
トランザクション・イベント・キュー(TxEventQ)は、DBMS_AQADM.CREATE_EQ_EXCEPTION_QUEUE
APIによって例外キューをサポートしています。
PROCEDURE CREATE_EQ_EXCEPTION_QUEUE(
queue_name IN VARCHAR2,
exception_queue_name IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
storage_clause IN VARCHAR2 DEFAULT NULL,
sort_list IN VARCHAR DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL
);
例外キューは、"JMS_OracleExcpQ"
と呼ばれるOracle固有のメッセージ・プロパティで、メッセージの送信/パブリッシュ前に設定できます。例外キューが指定されていないと、デフォルトの例外キューが使用されます。AQキューの場合、デフォルトの例外キューは、キュー表の作成時にAQ$_
queue_table_name
_E
という名前で自動的に作成されます。デフォルトでは、TxEventQには例外キューは作成されません。
メッセージは、次の条件が成立するときに例外キューに移されます。
-
そのメッセージが、指定された
timeToLive
内にデキューされなかった場合。複数のサブスクライバを指定したメッセージの場合、指定された
timeToLive
内にそのメッセージをデキューできない受信者が1つ以上あると、メッセージは例外キューに移されます。 -
メッセージが正常に受信されたが、メッセージ処理中のエラーのためアプリケーションが
receive
を実行したトランザクションを異常終了した場合。そのメッセージはキュー/トピックに戻され、メッセージ受信のために待機中のどのアプリケーションでも使用可能になります。アプリケーションがトランザクション全体を異常終了するか、
receive
前のセーブポイントまでロールバックしたときは、receive
がロールバックまたはUNDOされたとみなされます。これは失敗したメッセージ受信の試行であるため、その再試行回数は更新されます。メッセージの再試行回数が、メッセージが常駐するキュー/トピックに指定された最大値を超える場合、そのメッセージは例外キューに移されます。
メッセージが複数のサブスクライバを持つ場合、そのメッセージは、すべての受信者が再試行制限を超えたときにのみ、例外キューに移されます。
ノート:
サーバー・プロセスがインスタンスで停止した(ALTER
SYSTEM
KILL
SESSION
またはSHUTDOWN
ABORT
など)ためにデキュー・トランザクションが失敗した場合、RETRY_COUNT
は増分されません。
JMS伝播
この項の内容は次のとおりです。
ノート:
TxEventQキューは、RemoteSubscriber、伝播のスケジュール、拡張伝播スケジュール機能および伝播中の例外処理をサポートしていません。
RemoteSubscriber
Oracle Database Advanced Queuingによって、他のデータベースにあるサブスクライバをトピックにサブスクライブできます。トピックに対してパブリッシュされたメッセージがリモート・サブスクライバの基準を満たしている場合は、リモート・サブスクライバに指定されているリモート・データベースにあるキュー/トピックに自動的に伝播します。伝播は、データベース・リンクおよびOracle Net Servicesを使用して実行されます。これによって、同じデータベースに接続しなくても、アプリケーション同士が互いに通信できます。
リモート・サブスクライバを実装するには、次の2つの方法があります。
-
createRemoteSubscriber
メソッドは、トピック上またはトピックに対してリモート・サブスクライバを作成するために使用します。このリモート・サブスクライバは、クラスAQjmsAgent
のインスタンスとして指定されます。 -
AQjmsAgent
には、名前およびアドレスがあります。アドレスは、キュー/トピックおよびサブスクライバのデータベースへのデータベース・リンクで構成されています。
リモート・サブスクライバには、次の2種類があります。
-
リモート・サブスクライバがトピックである場合。
これは、
AQjmsAgent
オブジェクトのリモート・サブスクライバに名前が指定されず、アドレスがトピックである場合に発生します。サブスクライバのサブスクリプションを満たすメッセージが、リモート・トピックに伝播されます。伝播されたメッセージは、それが満たすリモート・トピックのすべてのサブスクリプションに対して使用可能になります。 -
メッセージに対して特定のリモート受信者を指定する場合。
リモート・サブスクリプションは、リモート・データベースにある特定のコンシューマに対して指定できます。リモート受信者の名前が(
AQjmsAgent
オブジェクトに)指定される場合、サブスクリプションを満たすメッセージが、その受信者専用のリモート・データベースに伝播されます。リモート・データベースにある受信者は、TopicReceiver
インタフェースを使用してメッセージを取り出します。リモート・サブスクリプションは、Point-to-Pointキューに対して指定することもできます。
伝播のスケジュール
伝播は、メッセージがターゲットの接続先データベースに伝播されるすべてのトピックについて、schedule_propagation
メソッドを使用してスケジューリングされる必要があります。
スケジュールは時間の枠を示し、メッセージはその枠内でソース・トピックから伝播されます。この時間枠は、ネットワーク通信量、ソース・データベースの負荷、接続先データベースの負荷などの複数の要因に左右されます。したがって、スケジュールは特定のソースおよび宛先にあわせて調整する必要があります。スケジュールが作成されると、ジョブは自動的にjob_queue
機能に発行され、伝播が処理されます。
伝播スケジュールのための運用管理コールによって、スケジュール管理を柔軟に行うことができます。あるスケジュールの存続時間または伝播枠パラメータによって、伝播が開始される時間枠が指定されます。存続時間が指定されない場合、時間枠は無制限の単一枠になります。枠を定期的に繰り返す必要がある場合、連続する枠の間の周期的間隔を定義するnext_time
機能を使用して有限の存続時間を指定します。
あるキューに定義された伝播スケジュールは、そのキューの有効期間中いつでも変更または削除できます。さらに、(スケジュールを削除するかわりに)一時的に使用不可にするコール、および使用不可のスケジュールを使用可能にするコールがあります。メッセージがスケジュール内で伝播されているとき、そのスケジュールはアクティブです。すべての管理コールは、スケジュールがアクティブかどうかに関係なく実行されます。スケジュールがアクティブの場合、コールが実行されるまでに数秒かかります。
伝播が開始されるには、ジョブ・キュー・プロセスを起動する必要があります。少なくとも2つのジョブ・キュー・プロセスを起動する必要があります。接続先データベースへのデータベース・リンクも有効にする必要があります。伝播のソースおよび宛先トピックは、同じメッセージ型である必要があります。リモート・トピックは、エンキューできる必要があります。データベース・リンクのユーザーもリモート・トピックに対するエンキュー権限を持つ必要があります。
関連項目:
拡張伝播スケジュール機能
伝播のために定義されたカタログ・ビューは、アクティブ・スケジュールに関する次の情報を提供します。
-
そのスケジュールを処理しているバックグラウンド・プロセスの名前
-
伝播を処理しているセッションのSID(セッションおよびシリアル番号)
-
スケジュールを処理しているインスタンス(Oracle RACを使用している場合)
-
先行して正常に実行されたスケジュール
-
次に実行予定のスケジュール
スケジュールごとに次の伝播統計が保持され、キュー管理者がスケジュール調整に役立てることができます。
-
スケジュールの中で伝播されたメッセージ合計数
-
スケジュールの中で伝播されたバイト合計数
-
伝播枠の中で伝播されたメッセージの最大数
-
伝播枠の中で伝播されたバイトの最大値
-
伝播枠の中で伝播されたメッセージの平均数
-
伝播済メッセージの平均サイズ
-
伝播済メッセージの平均時間
伝播機能には、障害対処およびエラー・レポートが組み込まれています。たとえば、指定されたデータベース・リンクが無効な場合、リモート・データベースが使用できない場合、またはリモート・トピック/キューにエンキューできない場合、適切なエラー・メッセージがレポートされます。伝播は指数バックオフ・スキームを使用して、障害が発生したスケジュールからの伝播を再試行します。あるスケジュールで続けて障害が発生したときは、最初の再試行は30秒後、次の再試行は60秒後、3回目の再試行は120秒後、というように続きます。再試行時間が現行の伝播枠の期限切れ時刻を超える場合は、次の再試行は、次の伝播枠の開始時刻に行われます。最大16回の再試行が行われた後、そのスケジュールは自動的に使用不可能になります。
ノート:
再試行が次の伝播ウィンドウに移動されると、常に移動されるようになり、指数バックオフ・スキームは再試行のスケジュールを管理しません。DBMS_AQADM.SCHEDULE_PROPAGATION()
のnext_time
パラメータで指定された日付関数の結果、ウィンドウ間の間隔が短くなると、再試行の失敗数はすぐに16に達し、スケジュールが無効になります。
障害のためにスケジュールが自動的に使用不可になると、関連情報がアラート・ログに書き込まれます。スケジュールで失敗が発生したか、その場合は何回連続して失敗が発生したか、失敗の原因を示すエラー・メッセージ、および最後の失敗が発生した時刻を、いつでも確認できます。この情報を調べることで、管理者は障害を回復し、スケジュールを使用可能にできます。
再試行の間に伝播が成功したときは、障害の数は0(ゼロ)にリセットされます。
伝播機能にはOracle Real Application Clustersサポートが組み込まれていますが、ユーザーおよび管理者には透過的です。伝播を処理するジョブは、ソース・トピックが常駐しているキュー表の所有者と同じインスタンスに送られます。あるインスタンスに障害が発生してトピックを保存しているキュー表が他のインスタンスに移される場合は、伝播ジョブも必ず自動的に新しいインスタンスに移行されます。これによって、インスタンス間のping操作は最小限に抑えられ、パフォーマンスが向上します。伝播は、同時スケジュールをいくつでも処理できるように設計されています。
job_queue_processes
の最大数は1000で、その一部は伝播に関連しないジョブの処理に使用できます。このために、伝播にはマルチタスキングおよびロード・バランシングのサポートが組み込まれています。伝播アルゴリズムは、複数スケジュールが単一スナップショット(job_queue
)のプロセスによって処理できるように設計されています。job_queue
プロセスに対する伝播の負荷は、異なるソース・トピックからのメッセージ到着の割合に基づいて偏りが発生する場合があります。あるプロセスが数個のアクティブ・スケジュールによって過負荷になっている一方で、別のプロセスは受動的なスケジュールが多いために余力があるというとき、伝播はプロセス間で負荷が均等になるようにスケジュールを自動的に再分配します。
伝播中の例外処理
ネットワーク障害のようなシステム・エラーが発生した場合、Oracle Database Advanced Queuingは指数バックオフ・アルゴリズムを使用してメッセージを伝播する試みを継続します。キューからデータベース・リンクへの伝播中のアプリケーション・エラーを示している状況では、Oracle Database Advanced QueuingはそのメッセージにUNDELIVERABLE
というマークを付けてalert.log
に記録します。このようなエラーは、リモート・キューが存在しない場合、またはソース・キューおよびリモート・キューの型が一致しない場合に発生します。background_dump_dest
ディレクトリのトレース・ファイルには、そのエラーに関する追加情報があります。
新規のジョブ・キュー・プロセスが開始すると、型を再検証できるように型の不一致エラーをクリアします。ジョブ・キュー・プロセス数に上限を設定して、伝播のビジー状態が続く場合、ジョブ・キュー・プロセスが終了して再開するまで待つ必要はありません。キューの型は、必要に応じてDBMS_AQADM.VERIFY_QUEUE_TYPES
を使用して再検証できます。
ノート:
キューからキューへの伝播中に型の不一致が検出されると、伝播は停止してエラーが発生します。このような場合は、DBA_SCHEDULES
ビューを問い合せて、特定の宛先への伝播中に発生した最後のエラーを判断する必要があります。このメッセージには、UNDELIVERABLE
マークは付いていません。
JMS AQのメッセージ変換
あるフォーマットのメッセージを別のフォーマットのメッセージにマップするために変換を定義できます。変換は、同一の情報を異なるフォーマットで表現するアプリケーションを統合する必要がある場合に有効です。変換はSQL式およびPL/SQLファンクションです。メッセージ変換は、標準JMSインタフェースに対するOracle Database Advanced Queuingの拡張機能です。
変換は、DBMS_TRANSFORM.create_transformation
プロシージャを使用して作成できます。変換は、次の操作を行う場合に指定できます。
-
キューまたはトピックへのメッセージの送信
-
キューまたはトピックからのメッセージの受信。
-
TopicSubscriber
の作成 -
RemoteSubscriber
の作成。これによって、異なるフォーマットのトピック間でメッセージを伝播できます。
ノート:
TxEventQは、メッセージ変換をサポートしていません。
JMSストリーミング
AQ JMSは、大量のメッセージ・データまたはペイロードを送受信するアプリケーション用に、AQjmsBytesMessage
およびAQjmsStreamMessage
によるTxEventQのエンキューおよびデキューでのストリーミングをサポートします。
JMSストリーミングは、大きい連続したバイトの配列を送受信するのではなく、メッセージ・ペイロードを小さいチャンクに分割することにより、大きいメッセージを扱うときのメモリー要件を緩和します。JMS標準にストリーミング・メカニズムは含まれないため、AQ JMSはAQストリーミング・エンキューおよびデキュー機能を示すための固有のインタフェースを提供します。これにより、ユーザーは既存のjava入出力ストリームを使用してメッセージ・データまたはペイロードを簡単に送受信できます。
データベースのRDBMS 12.2へのアップグレード時に既存のアプリケーションが変更なしで動作できるように、デフォルトではストリーミングAPIは無効化されます。
クライアント・アプリケーションはシステム・プロパティoracle.jms.useJmsStreaming
をtrue
に設定して使用することにより、JMSストリーミングを有効化できます。
ノート:
JMSストリーミングはThinドライバの場合のみサポートされます。
エンキューでのJMSストリーミング
AQ JMSではAQjmsBytesMessage
およびAQjmsStreamMessage
の新しいAPI setInputStream(java.io.InputStream)
が提供され、メッセージ・データ用の入力ストリームを設定します。
/** * @param inputStream - InputStream to read the message payload * @throws JMSException - if the JMS provided fails to read the payload due to * some internal error */ public void setInputStream(InputStream inputStream) throws JMSException
次のコード・スニペットでは、AQjmsBytesMessage
タイプのメッセージが作成され、メッセージ・データ用のFileInputStream
が設定されます。
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue("queueName"); MessageProducer producer = session.createProducer(destination); AQjmsBytesMessage bytesMessage = (AQjmsBytesMessage)session.createBytesMessage(); InputStream input = new FileInputStream("somefile.data"); bytesMessage.setInputStream(input); producer.send(bytesMessage);
ノート:
-
BytesMessage
およびStreamMessage
のメソッドはjava.io.DataInputStream
およびjava.io.DataOutputStream
で検出されるメソッドに基づいているため、様々なread*()
およびwrite*()
メソッドの意味のある変換はストリームでは不可能です。次の使用例では例外が発生します。-
bytesMessage.setInputStream(input);
bytesMessage.writeInt(99);
-
bytesMessage.writeInt(99);
bytesMessage.setInputStream(input);
-
-
通常のエンキュー操作と同様に、ストリーミングを使用したエンキューは同期操作となり、エンキューが完了した後でのみ、制御がクライアントに戻されます。
-
これらのAPIが明示的にクライアントにより使用される場合にのみ、ストリーミングがエンキューで使用されます。AQ JMSはメッセージ・データのサイズと関係なく、通常のエンキューではストリーミングを使用しません。
デキューでのJMSストリーミング
ストリーミングを使用したデキュー操作は、2つのステップで実行されます。サーバーでは、メッセージ本文のサイズに基づき、メッセージ本文をストリーミングするかどうかが決定されます。デフォルトのしきい値制限は10MBです。そのため、メッセージ本文が10MBよりも大きく、システム・プロパティoracle.jms.useJmsStreamingを使用して、ストリーミングがクライアントで有効な場合、サーバーはデキューでストリーミングを使用します。
-
これはクライアントが
receive()
メソッドを呼び出す場合の通常のデキュー・プロセスです。Destination destination = session.createQueue ("queueName"); AQjmsConsumer consumer = (AQjmsConsumer) session.createConsumer(destination); Message message = consumer.receive(10000);
-
クライアントがペイロードなしのメッセージを受信する場合、クライアントは受信メッセージでの
isLargeBody()
の呼び出しにより、ストリーミングがデキューで使用されるかどうかを特定します。/** * This method can be used by the client applications to check whether the message * contains large messaege body and hence requires streaming with dequeue. * * @return true when the message body is large and server decides to stream * the payload with dequeue */ public boolean isLargeBody()
isLargeBody()
により戻されたtrueの値は、デキューでのストリーミングを示します。デキューでストリーミングを使用する場合、AQ JMSはAQjmsStreamMessage
とAQjmsBytesMessage
に対して正確にメッセージ本文の長さを移入します。そのため、クライアント・アプリケーションはメッセージでgetBodyLength()
を呼び出し、ペイロードのサイズを判断できます。public long getBodyLength()
クライアントがデキューでのストリーミングについて理解したら、受信メッセージで次のAPIのいずれかを使用してメッセージ・データをフェッチできます。
クライアント・アプリケーションは、AQjmsBytesMessage
およびAQjmsStreamMessage
で使用可能な次のAPIを使用して、メッセージ・データを受信できます。
/** * Writes the message body to the OutputStream specified. * * @param outputStream - the OutputStream to which message body can be written * @return the OutputStream containing the message body. * @throws JMSException - if the JMS provided fails to receive the message body * due to some internal error */ public OutputStream getBody(OutputStream outputStream) throws JMSException /** * Writes the message body to the OutputStream specified, with chunkSize bytes * written at a time. * * @param outputStream - the OutputStream to which message body can be written * @param chunkSize - the number of bytes to be written at a time, default value * 8192 (ie. 8KB) * @return the OutputStream containing the message body. * @throws JMSException - if the JMS provided fails to receive the message body * due to some internal error */ public OutputStream getBody(OutputStream outputStream, int chunkSize)throws JMSException /** * Writes the message body to the OutputStream specified. This method waits until * the message body is written completely to the OutputStream or the timeout expires. * * A timeout of zero never expires, and a timeout of negative value is ignored. * * @param outputStream - the OutputStream to which message body can be written * @param timeout - the timeout value (in milliseconds) * @return the OutputStream containing the message body. * @throws JMSException - if the JMS provided fails to receive the message body * due to some internal error */ public OutputStream getBody(OutputStream outputStream, long timeout) throws JMSException /** * Writes the message body to the OutputStream specified, chunkSize bytes at a time. * This method waits until the message body is written completely to the OutputStream * or the timeout expires. * * A timeout of zero never expires, and a timeout of negative value is ignored. * * @param outputStream - the OutputStream to which message body can be written * @param chunkSize - the number of bytes to be written at a time, * default value 8192 (ie. 8KB) * @param timeout - the timeout value (in milliseconds) * @return the OutputStream containing the message body. * @throws JMSException - if the JMS provided fails to receive the message body * due to some internal error */ public OutputStream getBody(OutputStream outputStream, int chunkSize, long timeout) throws JMSException
次のコード・スニペットは、ストリーミングがデキューで使用され、受信したペイロードがFileOutputStream
に書き込まれるかどうかをチェックします。
if (message instanceof BytesMessage && (AQjmsBytesMessage)message.isLargeBody()){ // optional : check the size of the payload and take appropriate action before // receiving the payload. (AQjmsBytesMessage) message.getBody(new FileOutputStream(new File("…"))); } else { // normal dequeue }
通常、両方のステップが完了すると、メッセージは完全に消費されたとみなされます。AQサーバーにより、ステップ1の後でメッセージでのロックは保持され、ステップ2の後でのみ解除されます。
メッセージ・コンシューマによりメッセージが部分的に消費されることにより発生する可能性のある問題を考慮して、確認モードCLIENT_ACKNOWLEDGE
およびSESSION_TRANSACTED
でのセッション用のストリーミングAPIは制限されました。
そのため、部分的に消費されたメッセージを含む、すべてのメッセージは、次の時点で完全に消費されたとみなされます。
-
message.acknowledge()
はCLIENT_ACKNOWLEDGE
セッションで呼び出されます。 -
セッションの
commit()
はトランザクション・セッションで呼び出されます。
通常の場合、セッションrollback()
は、そのセッションで受信したメッセージをロールバックします。
JMSストリーミングは使用可能ですが、次の制限があります。
-
ストリーミングはデフォルトで無効化されており、システム・プロパティ
oracle.jms.useJmsStreaming
を使用してクライアント・アプリケーションにより有効化できる -
メッセージ・データのサイズがしきい値よりも大きい場合に、デキューでストリーミングが使用されます。デフォルトのしきい値は10MBです。
-
ストリーミング・サポートは
AQjmsBytesMessage
およびAQjmsStreamMessage
で使用可能 -
ストリーミング・サポートはTxEventQキューでのみ使用可能
-
ストリーミング・サポートはThinドライバでのみ使用可能
-
ストリーミング・サポートは、メッセージ・プロデューサがメッセージ配信モードに
NON_PERSISTENT
を使用しているときは使用できない -
ストリーミングはメッセージ・リスナーではサポートされません。そのため、MessageConsumerにメッセージ・リスナー・セットがある場合、およびメッセージ・データがしきい値制限を超えている場合は、内部的に通常のデキューが使用されます。
-
ストリーミング・サポートは、確認モード
CLIENT_ACKNOWLEDGE
およびSESSION_TRANSACTED
を使用してセッションで使用可能です。
Java EEの準拠
Oracle JMSは、Oracle Sun Microsystems JMS 1.1標準に準拠しています。実行時に、Oracle Java Message Service (Oracle JMS)クライアントに対してJava EE準拠モードを定義できます。準拠モードにするには、コマンドライン・オプションとしてJavaプロパティ oracle.jms.j2eeCompliant
をTRUE
に設定します。非準拠モードにする場合は何もしません。FALSE
がデフォルト値です。
Java EE準拠をサポートし、非準拠モードでも使用できるOracle Database Advanced Queuingの機能は次のとおりです。
-
非トランザクション・セッション
-
永続サブスクライバ
-
一時キューおよびトピック
-
非永続配信モード
-
AQ$_JMS_MESSAGE
型のOracle Database Advanced Queuingキューを使用した単一のJMSキューまたはトピックに対する複数のJMSメッセージ・タイプ -
永続サブスクライバに対する
noLocal
オプション -
TxEventQはネイティブにJMSをサポートしていて、Java EEコンプライアンスに準拠しています
関連項目:
-
『Java Message Service Specification』、バージョン1.1、2002年3月18日、Sun Microsystems, Inc.
-
JMSPriorityおよびJMSExpirationに対するJavaプロパティ
oracle.jms.j2eeCompliant
の影響の詳細は、「JMSメッセージ・ヘッダー」を参照してください。 -
永続サブスクライバに対するJavaプロパティ
oracle.jms.j2eeCompliant
の影響の詳細は、「DurableSubscriber」を参照してください。
Oracle Java Message Serviceの基本操作
次のトピックでは、Oracle Database Advanced Queuing (AQ)の基本操作のためのJava Message Service (JMS)管理インタフェースについて説明します。
DBMS_AQINに対するEXECUTE権限
ユーザーがDBMS_AQIN
パッケージ内のメソッドを直接コールすることはありませんが、DBMS_AQIN
に対するEXECUTE
権限が必要です。次の構文を使用して実行します。
GRANT EXECUTE ON DBMS_AQIN to user;
ConnectionFactoryの登録
ConnectionFactoryは、次の4つの方法で登録できます。
データベースを介した登録: JDBCコネクション・パラメータの使用
public static int registerConnectionFactory(java.sql.Connection connection, java.lang.String conn_name, java.lang.String hostname, java.lang.String oracle_sid, int portno, java.lang.String driver, java.lang.String type) throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、データベースを介してQueueConnectionFactory
またはTopicConnectionFactoryをLightweight Directory Access Protocol(LDAP)サーバーに登録します。このメソッドは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
登録に使用されるJDBCコネクション |
|
登録されるコネクションの名前 |
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
Oracleシステム識別子 |
|
ポート番号 |
|
JDBCドライバの型 |
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory
に渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLE
を付与する必要があります。登録後は、Java Naming and Directory Interface(JNDI)を使用してコネクション・ファクトリを検索できます。
例6-1 データベースを介した登録: JDBCコネクション・パラメータの使用
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.registerConnectionFactory( db_conn, "queue_conn1", "sun-123", "db1", 1521, "thin", "queue");
データベースを介した登録: JDBC URLの使用
public static int registerConnectionFactory(java.sql.Connection connection, java.lang.String conn_name, java.lang.String jdbc_url, java.util.Properties info, java.lang.String type) throws JMSException
このメソッドは、JDBC URLを使用し、データベースを介してQueueConnectionFactory
またはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
登録に使用されるJDBCコネクション |
|
登録されるコネクションの名前 |
|
接続先のURL |
|
プロパティの情報 |
|
ポート番号 |
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory
に渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLE
を付与する必要があります。登録後、JNDIを使用してコネクション・ファクトリを検索できます。
例6-2 データベースを介した登録: JDBC URLの使用
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.registerConnectionFactory( db_conn, "topic_conn1", url, null, "topic");
LDAPを介した登録: JDBCコネクション・パラメータの使用
public static int registerConnectionFactory(java.util.Hashtable env, java.lang.String conn_name, java.lang.String hostname, java.lang.String oracle_sid, int portno, java.lang.String driver, java.lang.String type) throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、LDAPを介してQueueConnectionFactory
またはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
LDAPコネクションの環境 |
|
登録されるコネクションの名前 |
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
Oracleシステム識別子 |
|
ポート番号 |
|
JDBCドライバの型 |
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory()
に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLE
が付与されている必要があります)。登録後、JNDIを使用してコネクション・ファクトリを検索します。
例6-3 LDAPを介した登録: JDBCコネクション・パラメータの使用
Hashtable env = new Hashtable(5, 0.75f); /* the following statements set in hashtable env: * service provider package * the URL of the ldap server * the distinguished name of the database server * the authentication method (simple) * the LDAP username * the LDAP user password */ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); env.put(Context.PROVIDER_URL, "ldap://sun-456:389"); env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com"); env.put(Context.SECURITY_AUTHENTICATION, "simple"); env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com"); env.put(Context.SECURITY_CREDENTIALS, "welcome"); AQjmsFactory.registerConnectionFactory(env, "queue_conn1", "sun-123", "db1", 1521, "thin", "queue");
LDAPを介した登録: JDBC URLの使用
public static int registerConnectionFactory(java.util.Hashtable env, java.lang.String conn_name, java.lang.String jdbc_url, java.util.Properties info, java.lang.String type) throws JMSException
このメソッドは、JDBCコネクション・パラメータを使用し、LDAPを介してQueueConnectionFactory
またはTopicConnectionFactoryをLDAPに登録します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
LDAPコネクションの環境 |
|
登録されるコネクションの名前 |
|
接続先のURL |
info |
プロパティの情報 |
|
コネクション・ファクトリのタイプ( |
registerConnectionFactory()
に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLE
が付与されている必要があります)。登録後、JNDIを使用してコネクション・ファクトリを検索します。
例6-4 LDAPを介した登録: JDBC URLの使用
String url; Hashtable env = new Hashtable(5, 0.75f); /* the following statements set in hashtable env: * service provider package * the URL of the ldap server * the distinguished name of the database server * the authentication method (simple) * the LDAP username * the LDAP user password */ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); env.put(Context.PROVIDER_URL, "ldap://sun-456:389"); env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com"); env.put(Context.SECURITY_AUTHENTICATION, "simple"); env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com"); env.put(Context.SECURITY_CREDENTIALS, "welcome"); url = "jdbc:oracle:thin:@sun-123:1521:db1"; AQjmsFactory.registerConnectionFactory(env, "topic_conn1", url, null, "topic");
キュー/トピックConnectionFactoryの登録解除
LDAPのキュー/トピックConnectionFactory
を登録解除するには、2つの方法があります。
データベースを介した登録解除
public static int unregisterConnectionFactory(java.sql.Connection connection, java.lang.String conn_name) throws JMSException
このメソッドは、LDAP内のQueueConnectionFactory
またはTopicConnectionFactory
の登録を解除します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
登録に使用されるJDBCコネクション |
|
登録されるコネクションの名前 |
unregisterConnectionFactory()
に渡されるデータベース接続には、AQ_ADMINISTRATOR_ROLE
を付与する必要があります。
例6-5 データベースを介した登録解除
String url; java.sql.connection db_conn; url = "jdbc:oracle:thin:@sun-123:1521:db1"; db_conn = DriverManager.getConnection(url, "scott", "tiger"); AQjmsFactory.unregisterConnectionFactory(db_conn, "topic_conn1");
LDAPを介した登録解除
public static int unregisterConnectionFactory(java.util.Hashtable env, java.lang.String conn_name) throws JMSException
このメソッドは、LDAP内のQueueConnectionFactory
またはTopicConnectionFactoryの登録を解除します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
LDAPコネクションの環境 |
|
登録されるコネクションの名前 |
unregisterConnectionFactory()
に渡されるハッシュ表に、LDAPサーバーと使用可能なコネクションを確立するための情報が含まれている必要があります。さらに、このコネクションには、LDAPサーバー内のコネクション・ファクトリのエントリに対する書込み権限が必要です(LDAPユーザーがデータベースそのものであるか、またはLDAPユーザーにGLOBAL_AQ_USER_ROLE
が付与されている必要があります)。
例6-6 LDAPを介した登録解除
Hashtable env = new Hashtable(5, 0.75f); /* the following statements set in hashtable env: * service provider package * the distinguished name of the database server * the authentication method (simple) * the LDAP username * the LDAP user password */ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); env.put(Context.PROVIDER_URL, "ldap://sun-456:389"); env.put("searchbase", "cn=db1,cn=Oraclecontext,cn=acme,cn=com"); env.put(Context.SECURITY_AUTHENTICATION, "simple"); env.put(Context.SECURITY_PRINCIPAL, "cn=db1aqadmin,cn=acme,cn=com"); env.put(Context.SECURITY_CREDENTIALS, "welcome"); url = "jdbc:oracle:thin:@sun-123:1521:db1"; AQjmsFactory.unregisterConnectionFactory(env, "queue_conn1");
QueueConnectionFactoryまたはTopicConnectionFactoryの取得
この項の内容は次のとおりです。
QueueConnectionFactoryの取得: JDBC URLの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory( java.lang.String jdbc_url, java.util.Properties info) throws JMSException
このメソッドは、JDBC URLを使用してQueueConnectionFactory
を取得します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
接続先のURL |
info |
プロパティの情報 |
例6-7 QueueConnectionFactoryの取得: JDBC URLの使用
String url = "jdbc:oracle:oci10:internal/oracle" Properties info = new Properties(); QueueConnectionFactory qc_fact; info.put("internal_logon", "sysdba"); qc_fact = AQjmsFactory.getQueueConnectionFactory(url, info);
QueueConnectionFactoryの取得: JDBCコネクション・パラメータの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory( java.lang.String hostname, java.lang.String oracle_sid, int portno, java.lang.String driver) throws JMSException
このメソッドは、JDBC接続パラメータを使用してQueueConnectionFactory
を取得します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
Oracleシステム識別子 |
|
ポート番号 |
|
JDBCドライバの型 |
例6-8 QueueConnectionFactoryの取得: JDBCコネクション・パラメータの使用
String host = "dlsun"; String ora_sid = "rdbms10i" String driver = "thin"; int port = 5521; QueueConnectionFactory qc_fact; qc_fact = AQjmsFactory.getQueueConnectionFactory(host, ora_sid, port, driver);
TopicConnectionFactoryの取得: JDBC URLの使用
public static javax.jms.QueueConnectionFactory getQueueConnectionFactory( java.lang.String jdbc_url, java.util.Properties info) throws JMSException
このメソッドは、JDBC URLを使用してTopicConnectionFactory
を取得します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
接続先のURL |
info |
プロパティの情報 |
例6-9 TopicConnectionFactoryの取得: JDBC URLの使用
String url = "jdbc:oracle:oci10:internal/oracle" Properties info = new Properties(); TopicConnectionFactory tc_fact; info.put("internal_logon", "sysdba"); tc_fact = AQjmsFactory.getTopicConnectionFactory(url, info);
TopicConnectionFactoryの取得: JDBCコネクション・パラメータの使用
public static javax.jms.TopicConnectionFactory getTopicConnectionFactory( java.lang.String hostname, java.lang.String oracle_sid, int portno, java.lang.String driver) throws JMSException
このメソッドは、JDBC接続パラメータを使用してTopicConnectionFactory
を取得します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
Oracle Database Advanced Queuingを実行しているホストの名前 |
|
Oracleシステム識別子 |
|
ポート番号 |
|
JDBCドライバの型 |
例6-10 TopicConnectionFactoryの取得: JDBCコネクション・パラメータの使用
String host = "dlsun"; String ora_sid = "rdbms10i" String driver = "thin"; int port = 5521; TopicConnectionFactory tc_fact; tc_fact = AQjmsFactory.getTopicConnectionFactory(host, ora_sid, port, driver);
LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの取得
このメソッドは、LDAPからQueueConnectionFactory
またはTopicConnectionFactory
を取得します。
例6-11 LDAP内のQueueConnectionFactoryまたはTopicConnectionFactoryの取得
Hashtable env = new Hashtable(5, 0.75f); DirContext ctx; queueConnectionFactory qc_fact; /* the following statements set in hashtable env: * service provider package * the URL of the ldap server * the distinguished name of the database server * the authentication method (simple) * the LDAP username * the LDAP user password */ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); env.put(Context.PROVIDER_URL, "ldap://sun-456:389"); env.put(Context.SECURITY_AUTHENTICATION, "simple"); env.put(Context.SECURITY_PRINCIPAL, "cn=db1aquser1,cn=acme,cn=com"); env.put(Context.SECURITY_CREDENTIALS, "welcome"); ctx = new InitialDirContext(env); ctx = (DirContext)ctx.lookup("cn=OracleDBConnections,cn=db1,cn=Oraclecontext,cn=acme,cn=com"); qc_fact = (queueConnectionFactory)ctx.lookup("cn=queue_conn1");
LDAP内のキューまたはトピックの取得
このメソッドは、LDAPからキューまたはトピックを取得します。
例6-12 LDAP内のキューまたはトピックの取得
Hashtable env = new Hashtable(5, 0.75f); DirContext ctx; topic topic_1; /* the following statements set in hashtable env: * service provider package * the URL of the ldap server * the distinguished name of the database server * the authentication method (simple) * the LDAP username * the LDAP user password */ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); env.put(Context.PROVIDER_URL, "ldap://sun-456:389"); env.put(Context.SECURITY_AUTHENTICATION, "simple"); env.put(Context.SECURITY_PRINCIPAL, "cn=db1aquser1,cn=acme,cn=com"); env.put(Context.SECURITY_CREDENTIALS, "welcome"); ctx = new InitialDirContext(env); ctx = (DirContext)ctx.lookup("cn=OracleDBQueues,cn=db1,cn=Oraclecontext,cn=acme,cn=com"); topic_1 = (topic)ctx.lookup("cn=topic_1");
AQキュー表の作成
public oracle.AQ.AQQueueTable createQueueTable( java.lang.String owner, java.lang.String name, oracle.AQ.AQQueueTableProperty property) throws JMSException
このメソッドはキュー表を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー表の所有者(スキーマ) |
name |
キュー表名 |
property |
キュー表のプロパティ |
キューを保持するためにキュー表を使用する場合、キュー表はマルチコンシューマに対して使用可能にすることはできません(デフォルト)。トピックを保持するためにキュー表を使用する場合、キュー表はマルチコンシューマに対して使用可能にする必要があります。
CLOB、BLOBおよびBFILEオブジェクトはOracle Database Advanced Queuingオブジェクト型のロードで有効な属性です。ただし、CLOBおよびBLOBのみ、Oracle8i以降のOracle Database Advanced Queuing 伝播を使用して伝播できます。
ノート:
現在、TxEventQキューは、DBMS_AQADM
PL/SQL APIのみで作成および削除できます。
例6-13 キュー表の作成
QueueSession q_sess = null; AQQueueTable q_table = null; AQQueueTableProperty qt_prop = null; qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE"); q_table = ((AQjmsSession)q_sess).createQueueTable( "boluser", "bol_ship_queue_table", qt_prop);
キューの作成
この項の内容は次のとおりです。
Point-to-Pointキューの作成
public javax.jms.Queue createQueue( oracle.AQ.AQQueueTable q_table, java.lang.String queue_name, oracle.jms.AQjmsDestinationProperty dest_property) throws JMSException
このメソッドは、指定したキュー表にキューを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キューが作成されるキュー表。キュー表はシングル・コンシューマ・キュー表である必要があります。 |
|
作成されるキューの名前。 |
|
キューのプロパティ |
このメソッドは、Oracle JMSに固有です。このメソッドでは標準のJava javax.jms.Session
オブジェクトは使用できません。かわりに、標準の型をOracle JMSの具体的なクラスoracle.jms.AQjmsSession
にキャストする必要があります。
例6-14 Point-to-Pointキューの作成
QueueSession q_sess; AQQueueTable q_table; AqjmsDestinationProperty dest_prop; Queue queue; queue = ((AQjmsSession)q_sess).createQueue(q_table, "jms_q1", dest_prop);
パブリッシュ・サブスクライブ・トピックの作成
public javax.jms.Topic createTopic( oracle.AQ.AQQueueTable q_table, java.lang.String topic_name, oracle.jms.AQjmsDestinationProperty dest_property) throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルにトピックを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キューが作成されるキュー表。キュー表はマルチ・コンシューマ・キュー表である必要があります。 |
|
作成されるキューの名前。 |
|
キューのプロパティ |
このメソッドは、Oracle JMSに固有です。このメソッドでは標準のJava javax.jms.Session
オブジェクトは使用できません。かわりに、標準の型をOracle JMSの具体的なクラスoracle.jms.AQjmsSession
にキャストする必要があります。
例6-16では、在庫不足のために受注を満たせない場合は、注文を処理しているトランザクションが終了されます。bookedorders
トピックは、max_retries
= 4、retry_delay
= 12時間で設定されています。したがって、注文が2日で満たされない場合は、例外キューに移動されます。
例6-15 パブリッシュ・サブスクライブ・トピックの作成
TopicSession t_sess; AQQueueTable q_table; AqjmsDestinationProperty dest_prop; Topic topic; topic = ((AQjmsSessa)t_sess).createTopic(q_table, "jms_t1", dest_prop);
例6-16 メッセージの最大試行回数と最大遅延の指定
public BolOrder process_booked_order(TopicSession jms_session) { Topic topic; TopicSubscriber tsubs; ObjectMessage obj_message; BolCustomer customer; BolOrder booked_order = null; String country; int i = 0; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Create local subscriber - to track messages for Western Region */ tsubs = jms_session.createDurableSubscriber(topic, "SUBS1", "Region = 'Western' ", false); /* wait for a message to show up in the topic */ obj_message = (ObjectMessage)tsubs.receive(10); booked_order = (BolOrder)obj_message.getObject(); customer = booked_order.getCustomer(); country = customer.getCountry(); if (country == "US") { jms_session.commit(); } else { jms_session.rollback(); booked_order = null; } }catch (JMSException ex) { System.out.println("Exception " + ex) ;} return booked_order; }
Point-to-Pointキューおよびパブリッシュ・サブスクライブ・トピックのためのTxEventQキューの作成
AQ JMSには、TxEventQを作成および削除するための新しいAPIが定義されています。JMSにはキュー変更APIはありません。署名は次のとおりです。
/** * Create a TxEventQ queue. It also internally creates the related queue * objects (table, indexes) based on this name. * * @param queueName name of the queue to be created, format is schema.queueName * (where the schema. is optional * @param isMultipleConsumer flag to indicate whether the queue is a * multi-consumer or single-consumer queue * @return javax.jms.Destination * @throws JMSException if the queue could not be created */ public synchronized javax.jms.Destination createJMSTransactionalEventQueue(String queueName, boolean isMultipleConsumer) throws JMSException { return createJMSTransactionalEventQueue(queueName, isMultipleConsumer, null, 0, null); }
/** * Create a TxEventQ queue. It also internally creates the related queue * objects (table, indexes) based on this name. * * @param queueName name of the queue to be created, format is schema.queueName * (where the schema. is optional * @param isMultipleConsumer flag to indicate whether the queue is a * multi-consumer or single-consumer queue * @param storageClause additional storage clause * @param maxRetries retry count before skip the message while dequeue * @param comment comment for the queue * @return javax.jms.Destination * @throws JMSException if the queue could not be created */ public Destination createJMSTransactionalEventQueue(java.lang.String queueName, boolean isMultipleConsumer, java.lang.String storageClause, int maxRetries, java.lang.String comment) throws JMSException
AQキュー表の取得
public oracle.AQ.AQQueueTable getQueueTable(java.lang.String owner, java.lang.String name) throws JMSException
このメソッドでは、AQキューのキュー表を取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー表の所有者(スキーマ) |
name |
キュー表名 |
コネクションをオープンしたコール側がキュー表の所有者ではない場合、コール側にはキュー表内のキュー/トピックに対するOracle Database Advanced Queuingエンキュー/デキュー権限が必要です。この権限がない場合、キュー表は取得できません。
例6-17 キュー表の取得
QueueSession q_sess; AQQueueTable q_table; q_table = ((AQjmsSession)q_sess).getQueueTable( "boluser", "bol_ship_queue_table");
権限の付与および取消し
この項の内容は次のとおりです。
Oracle Database Advanced Queuingシステム権限の付与
public void grantSystemPrivilege(java.lang.String privilege, java.lang.String grantee, boolean admin_option) throws JMSException
このメソッドは、ユーザーおよびロールにOracle Database Advanced Queuingシステム権限を付与します。
パラメータ | 説明 |
---|---|
|
|
|
権限受領者(ユーザー、ロールまたは |
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
最初は、SYS
およびSYSTEM
のみがこのプロシージャを正常に使用できます。ENQUEUE_ANY
権限を付与されたユーザーは、データベース内の任意のキューにメッセージをエンキューできます。DEQUEUE_ANY
権限を付与されたユーザーは、データベース内の任意のキューからメッセージをデキューできます。MANAGE_ANY
権限を付与されたユーザーは、データベースのすべてのスキーマに対してDBMS_AQADM
コールを実行できます。
例6-18 Oracle Database Advanced Queuingシステム権限の付与
TopicSession t_sess; ((AQjmsSession)t_sess).grantSystemPrivilege("ENQUEUE_ANY", "scott", false);
Oracle Database Advanced Queuingシステム権限の取消し
public void revokeSystemPrivilege(java.lang.String privilege, java.lang.String grantee) throws JMSException
このメソッドは、ユーザーまたはロールからOracle Database Advanced Queuingシステム権限を取り消します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
|
|
権限受領者(ユーザー、ロールまたは |
ENQUEUE_ANY
権限を付与されたユーザーは、データベース内の任意のキューにメッセージをエンキューできます。DEQUEUE_ANY
権限を付与されたユーザーは、データベース内の任意のキューからメッセージをデキューできます。MANAGE_ANY
権限を付与されたユーザーは、データベースのすべてのスキーマに対してDBMS_AQADM
コールを実行できます。
例6-19 Oracle Database Advanced Queuingシステム権限の取消し
TopicSession t_sess; ((AQjmsSession)t_sess).revokeSystemPrivilege("ENQUEUE_ANY", "scott");
パブリッシュ・サブスクライブ・トピック権限の付与
public void grantTopicPrivilege(javax.jms.Session session, java.lang.String privilege, java.lang.String grantee, boolean grant_option) throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルでトピック権限を付与します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのトピックの権限を付与できます。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
|
|
|
|
権限受領者(ユーザー、ロールまたは |
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
例6-20 パブリッシュ・サブスクライブ・トピック権限の付与
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).grantTopicPrivilege( t_sess, "ENQUEUE", "scott", false);
パブリッシュ・サブスクライブ・トピック権限の取消し
public void revokeTopicPrivilege(javax.jms.Session session, java.lang.String privilege, java.lang.String grantee) throws JMSException
このメソッドは、パブリッシュ・サブスクライブ・モデルでトピック権限を取り消します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
|
|
権限が取り消される権限受領者(ユーザー、ロールまたは |
例6-21 パブリッシュ・サブスクライブ・トピック権限の取消し
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).revokeTopicPrivilege(t_sess, "ENQUEUE", "scott");
Point-to-Pointキュー権限の付与
public void grantQueuePrivilege(javax.jms.Session session, java.lang.String privilege, java.lang.String grantee, boolean grant_option) throws JMSException
このメソッドは、Point-to-Pointモデルでキュー権限を付与します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのキューの権限を付与できます。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
|
|
権限受領者(ユーザー、ロールまたは |
|
TRUEに設定すると、権限受領者はこのプロシージャを使用して他のユーザーまたはロールにシステム権限を付与できます。 |
例6-22 Point-to-Pointキュー権限の付与
QueueSession q_sess; Queue queue; ((AQjmsDestination)queue).grantQueuePrivilege( q_sess, "ENQUEUE", "scott", false);
Point-to-Pointキュー権限の取消し
public void revokeQueuePrivilege(javax.jms.Session session, java.lang.String privilege, java.lang.String grantee) throws JMSException
このメソッドは、Point-to-Pointモデルでキュー権限を取り消します。初期設定では、キュー表の所有者のみがこのプロシージャを使用してそのキューの権限を付与できます。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
|
|
権限が取り消される権限受領者(ユーザー、ロールまたは |
権限を取り消すには、取消し実行者がその権限の付与者である必要があります。また、GRANT
オプションによって伝播された権限は、伝播させた付与者の権限が取り消されたときに取り消されます。
例6-23 Point-to-Pointキュー権限の取消し
QueueSession q_sess; Queue queue; ((AQjmsDestination)queue).revokeQueuePrivilege(q_sess, "ENQUEUE", "scott");
宛先の管理
宛先の開始
public void start(javax.jms.Session session, boolean enqueue, boolean dequeue) throws JMSException
このメソッドは、宛先を開始します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
|
デキュー |
|
例6-24 宛先の開始
TopicSession t_sess; QueueSession q_sess; Topic topic; Queue queue; (AQjmsDestination)topic.start(t_sess, true, true); (AQjmsDestination)queue.start(q_sess, true, true);
宛先の停止
public void stop(javax.jms.Session session, boolean enqueue, boolean dequeue, boolean wait) throws JMSException
このメソッドは、宛先を停止します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
|
デキュー |
|
|
TRUEに設定すると、キュー/トピックの保留トランザクションは、宛先が停止する前に完了できます。 |
例6-25 宛先の停止
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).stop(t_sess, true, false);
宛先の変更
public void alter(javax.jms.Session session, oracle.jms.AQjmsDestinationProperty dest_property) throws JMSException
このメソッドは、宛先を変更します。これには、次のプロパティがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
dest_property |
キューまたはトピックの新規プロパティ |
例6-26 宛先の変更
QueueSession q_sess; Queue queue; TopicSession t_sess; Topic topic; AQjmsDestionationProperty dest_prop1, dest_prop2; ((AQjmsDestination)queue).alter(dest_prop1); ((AQjmsDestination)topic).alter(dest_prop2);
伝播スケジュール
この項の内容は次のとおりです。
ノート:
現在、TxEventQは、DBMS_AQADM
PL/SQL APIでのみ管理され、伝播をサポートしていません。
伝播のスケジューリング
public void schedulePropagation(javax.jms.Session session, java.lang.String destination, java.util.Date start_time, java.lang.Double duration, java.lang.String next_time, java.lang.Double latency) throws JMSException
このメソッドは、伝播をスケジュールします。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
伝播がスケジュールされているリモート・データベースのデータベース・リンク。文字列NULLは、伝播がトピックのデータベース内のすべてのサブスクライバに対してスケジュールされていることを示します。 |
|
伝播開始時刻 |
|
伝播継続時間 |
|
次回の伝播開始時刻。 |
|
許容可能な待機時間(秒単位)。待機時間は、メッセージがエンキューされた時間と伝播された時間の差異です。 |
メッセージ受信者が、同一または異なるキュー内の同じ宛先に複数存在する場合、メッセージはすべての受信者に同時に伝播されます。
例6-28 伝播のスケジューリング
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).schedulePropagation( t_sess, null, null, null, null, new Double(0));
伝播スケジュールの有効化
public void enablePropagationSchedule(javax.jms.Session session, java.lang.String destination) throws JMSException
このメソッドは、伝播スケジュールを有効化します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-29 伝播スケジュールの有効化
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).enablePropagationSchedule(t_sess, "dbs1");
伝播スケジュールの変更
public void alterPropagationSchedule(javax.jms.Session session, java.lang.String destination, java.lang.Double duration, java.lang.String next_time, java.lang.Double latency) throws JMSException
このメソッドは、伝播スケジュールを変更します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
伝播がスケジュールされているリモート・データベースのデータベース・リンク。文字列NULLは、伝播がトピックのデータベース内のすべてのサブスクライバに対してスケジュールされていることを示します。 |
|
伝播継続時間 |
|
次回の伝播開始時刻。 |
|
許容可能な待機時間(秒単位)。待機時間は、メッセージがエンキューされた時間と伝播された時間の差異です。 |
例6-30 伝播スケジュールの変更
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).alterPropagationSchedule( t_sess, null, 30, null, new Double(30));
伝播スケジュールの無効化
public void disablePropagationSchedule(javax.jms.Session session, java.lang.String destination) throws JMSException
このメソッドは、伝播スケジュールを無効化します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-31 伝播スケジュールの無効化
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).disablePropagationSchedule(t_sess, "dbs1");
伝播スケジュールの解除
public void unschedulePropagation(javax.jms.Session session, java.lang.String destination) throws JMSException
このメソッドは、スケジュール済伝播のスケジュールを解除します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JMSセッション |
|
宛先データベースのデータベース・リンク。NULL文字列は、ローカル・データベースに伝播されることを意味します。 |
例6-32 伝播スケジュールの解除
TopicSession t_sess; Topic topic; ((AQjmsDestination)topic).unschedulePropagation(t_sess, "dbs1");
Oracle Java Message ServiceのPoint-to-Point
次のトピックでは、Oracle Database Advanced Queuing (AQ)のJava Message Service (JMS)操作インタフェースのうち、Point-to-Point操作固有のコンポーネントについて説明します。Point-to-Pointおよびパブリッシュ/サブスクライブで共有されるコンポーネントについては、「Oracle Java Message Serviceの共有インタフェース」を参照してください。
ユーザー名/パスワードが設定されたConnectionの作成
public javax.jms.Connection createConnection( java.lang.String username, java.lang.String password) throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用して、Point-to-Point操作とパブリッシュ/サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー用にデータベースに接続するユーザー名 |
|
サーバーへの接続を作成するためのパスワード |
コネクションの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.Connection createConnection() throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータを使用して、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。ConnectionFactory
プロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
ユーザー名/パスワードが設定されたQueueConnectionの作成
public javax.jms.QueueConnection createQueueConnection( java.lang.String username, java.lang.String password) throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用してキュー・コネクションを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー用にデータベースに接続するユーザー名 |
|
サーバーへの接続を作成するためのパスワード |
例6-33 ユーザー名/パスワードが設定されたQueueConnectionの作成
QueueConnectionFactory qc_fact = AQjmsFactory.getQueueConnectionFactory( "sun123", "oratest", 5521, "thin"); QueueConnection qc_conn = qc_fact.createQueueConnection("jmsuser", "jmsuser");
QueueConnectionの確立: オープンしているJDBCコネクションの使用
public static javax.jms.QueueConnection createQueueConnection( java.sql.Connection jdbc_connection) throws JMSException
このメソッドは、オープンしているJDBCコネクションを使用してキュー・コネクションを作成します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
データベースへの有効なオープン・コネクション |
ユーザーがJMS操作に対して既存の(たとえばコネクション・プールの)JDBCコネクションを使用するときに、例6-34のメソッドを使用できます。この場合、JMSは新しいコネクションをオープンせずに、提供されたJDBCコネクションを使用してJMS QueueConnection
オブジェクトを作成します。
例6-35のメソッドは、データベース(JDBCサーバー・ドライバ)内のJavaストアド・プロシージャからJMSを使用する場合に、JMS QueueConnection
を確立する唯一の方法です。
例6-34 QueueConnectionの確立: オープンしているJDBCコネクションの使用
Connection db_conn; /* previously opened JDBC connection */ QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection( db_conn);
例6-35 データベース内のJavaプロシージャからのQueueConnectionの作成
OracleDriver ora = new OracleDriver(); QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection(ora.defaultConnection());
QueueConnectionの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.QueueConnection createQueueConnection() throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータによってキュー・コネクションを確立します。QueueConnectionFactoryのプロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
QueueConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
public static javax.jms.QueueConnection createQueueConnection( oracle.jdbc.pool.OracleOCIConnectionPool cpool) throws JMSException
このメソッドは、オープンしているOracleOCIConnectionPool
を使用してキュー・コネクションを作成します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
データベースに対してオープンなOCIコネクション・プール |
ユーザーがJMS操作に対して既存のOracleOCIConnectionPool
インスタンスを使用するとき、例6-36のメソッドを使用できます。この場合、JMSは新しいOracleOCIConnectionPool
インスタンスをオープンせずに、提供されたOracleOCIConnectionPool
インスタンスを使用してJMS QueueConnectionオブジェクトを作成します。
例6-36 QueueConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
OracleOCIConnectionPool cpool; /* previously created OracleOCIConnectionPool */ QueueConnection qc_conn = AQjmsQueueConnectionFactory.createQueueConnection(cpool);
セッションの作成
public javax.jms.Session createSession(boolean transacted, int ack_mode) throws JMSException
このメソッドは、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするSession
を作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。トランザクションおよび非トランザクション・セッションがサポートされています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
QueueSessionの作成
public javax.jms.QueueSession createQueueSession( boolean transacted, int ack_mode) throws JMSException
このメソッドはQueueSession
を作成します。トランザクションおよび非トランザクション・セッションがサポートされています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
例6-37 トランザクションQueueSessionの作成
QueueConnection qc_conn; QueueSession q_sess = qc_conn.createQueueSession(true, 0);
QueueSenderの作成
public javax.jms.QueueSender createSender(javax.jms.Queue queue) throws JMSException
このメソッドはQueueSender
を作成します。送信者の作成時にデフォルト・キューが設定されていない場合は、すべての送信操作で宛先キューを指定する必要があります。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
宛先キューの名前 |
メッセージの送信: デフォルトの送信オプションのQueueSenderの使用
public void send(javax.jms.Queue queue, javax.jms.Message message) throws JMSException
このメソッドは、メッセージをデフォルトの送信オプションでQueueSender
を使用して送信します。この操作では、メッセージのpriority
(1
)とtimeToLive
(infinite
)のデフォルト値を使用します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
このメッセージを送信するためのキュー |
|
送信するメッセージ |
QueueSender
がデフォルト・キューで作成されている場合、必ずしもsend()
コールにqueueパラメータを指定する必要はありません。キューがsend()
操作で指定されている場合、この値はQueueSender
のデフォルト・キューをオーバーライドします。
QueueSender
がデフォルト・キューを使用しないで作成されている場合、send()
コールごとにqueueパラメータを指定する必要があります。
例6-38 任意のキューにメッセージを送信するセンダーの作成
/* Create a sender to send messages to any queue */ QueueSession jms_sess; QueueSender sender1; TextMessage message; sender1 = jms_sess.createSender(null); sender1.send(queue, message);
例6-39 特定のキューにメッセージを送信するセンダーの作成
/* Create a sender to send messages to a specific queue */ QueueSession jms_sess; QueueSender sender2; Queue billed_orders_que; TextMessage message; sender2 = jms_sess.createSender(billed_orders_que); sender2.send(queue, message);
メッセージの送信: 送信オプションを指定したQueueSenderの使用
public void send(javax.jms.Queue queue, javax.jms.Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
このメソッドは、送信オプションを指定してQueueSender
を使用してメッセージを送信します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
このメッセージを送信するためのキュー |
|
送信するメッセージ |
|
使用する配信モード |
|
このメッセージの優先順位 |
|
ミリ秒で指定されるメッセージの保存時間(ゼロは無制限) |
QueueSender
がデフォルト・キューで作成されている場合、必ずしもsend()
コールにqueueパラメータを指定する必要はありません。キューがsend()
操作で指定されている場合、この値はQueueSender
のデフォルト・キューをオーバーライドします。
QueueSender
がデフォルト・キューを使用しないで作成されている場合、send()
コールごとにqueueパラメータを指定する必要があります。
例6-40 メッセージの送信: 送信オプションを指定したQueueSenderの使用1
/* Create a sender to send messages to any queue */ /* Send a message to new_orders_que with priority 2 and timetoLive 100000 milliseconds */ QueueSession jms_sess; QueueSender sender1; TextMessage mesg; Queue new_orders_que sender1 = jms_sess.createSender(null); sender1.send(new_orders_que, mesg, DeliveryMode.PERSISTENT, 2, 100000);
例6-41 メッセージの送信: 送信オプションを指定したQueueSenderの使用2
/* Create a sender to send messages to a specific queue */ /* Send a message with priority 1 and timetoLive 400000 milliseconds */ QueueSession jms_sess; QueueSender sender2; Queue billed_orders_que; TextMessage mesg; sender2 = jms_sess.createSender(billed_orders_que); sender2.send(mesg, DeliveryMode.PERSISTENT, 1, 400000);
QueueBrowserの作成: 標準JMS型メッセージ
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue, java.lang.String messageSelector) throws JMSException
このメソッドは、Text、Stream、Objects、BytesまたはMapMessageメッセージ本体を使用するキューに対するQueueBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
java.util.Enumeration
内のメソッドを使用して、メッセージのリストを参照してください。
関連項目:
例6-42 QueueBrowserの作成: セレクタの指定なし
/* Create a browser without a selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; browser = jms_session.createBrowser(queue);
例6-43 QueueBrowserの作成: セレクタの指定あり
/* Create a browser for queues with a specified selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; /* create a Browser to look at messages with correlationID = RUSH */ browser = jms_session.createBrowser(queue, "JMSCorrelationID = 'RUSH'");
QueueBrowserの作成: 標準JMS型メッセージ、メッセージのロック
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue, java.lang.String messageSelector, boolean locked) throws JMSException
このメソッドは、TextMessage、StreamMessage、ObjectMessage、BytesMessageまたはMapMessageメッセージ本体を使用するキューに対して、ブラウズ中にメッセージをロックするQueueBrowser
を作成します。ロックされたメッセージは、ブラウズ・セッションによってトランザクションが終了されるまで、他のコンシューマによって削除できません。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
例6-44 QueueBrowserの作成: セレクタの指定なし、メッセージをロック
/* Create a browser without a selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; browser = jms_session.createBrowser(queue, null, true);
例6-45 QueueBrowserの作成: セレクタの指定あり、メッセージをロック
/* Create a browser for queues with a specified selector */ QueueSession jms_session; QueueBrowser browser; Queue queue; /* create a Browser to look at messages with correlationID = RUSH in lock mode */ browser = jms_session.createBrowser(queue, "JMSCorrelationID = 'RUSH'", true);
QueueBrowserの作成: Oracleオブジェクト型メッセージ
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue, java.lang.String messageSelector, java.lang.Object payload_factory) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対するQueueBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactory
は、静的メソッドgetFactory
を使用して取得できます。
ノート:
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。
キューtest_queue
は、SCOTT.EMPLOYEE
型のペイロードを持ち、このOracleオブジェクト型に対してJPublisherによって生成されるJavaクラスがEmployeeであると仮定します。Employeeクラスは、CustomDatum
インタフェースを実装します。このクラスに対するCustomDatumFactory
は、Employee.getFactory()
メソッドを使用して取得できます。
ノート:
TEQは、オブジェクト型メッセージをサポートしていません
関連項目:
例6-46 ADTMessageに対するQueueBrowserの作成
/* Create a browser for a Queue with AdtMessage messages of type EMPLOYEE*/ QueueSession jms_session QueueBrowser browser; Queue test_queue; browser = ((AQjmsSession)jms_session).createBrowser(test_queue, "corrid='EXPRESS'", Employee.getFactory());
QueueBrowserの作成: Oracleオブジェクト型メッセージ、メッセージのロック
public javax.jms.QueueBrowser createBrowser(javax.jms.Queue queue, java.lang.String messageSelector, java.lang.Object payload_factory, boolean locked) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対して、ブラウズ中にメッセージをロックするQueueBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
ノート:
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。
ノート:
TxEventQキューは、オブジェクト型メッセージをサポートしていません
例6-47 AdtMessageキューに対するQueueBrowserの作成、メッセージをロック
/* Create a browser for a Queue with AdtMessage messagess of type EMPLOYEE* in lock mode/ QueueSession jms_session QueueBrowser browser; Queue test_queue; browser = ((AQjmsSession)jms_session).createBrowser(test_queue, null, Employee.getFactory(), true);
QueueReceiverの作成: 標準JMS型メッセージ
public javax.jms.QueueReceiver createReceiver(javax.jms.Queue queue, java.lang.String messageSelector) throws JMSException
このメソッドは、標準JMS型メッセージ・キューに対するQueueReceiver
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
関連項目:
例6-48 QueueReceiverの作成: セレクタの指定なし
/* Create a receiver without a selector */ QueueSession jms_session QueueReceiver receiver; Queue queue; receiver = jms_session.createReceiver(queue);
例6-49 QueueReceiverの作成: セレクタの指定あり
/* Create a receiver for queues with a specified selector */ QueueSession jms_session; QueueReceiver receiver; Queue queue; /* create Receiver to receive messages with correlationID starting with EXP */ browser = jms_session.createReceiver(queue, "JMSCorrelationID LIKE 'EXP%'");
QueueReceiverの作成: Oracleオブジェクト型メッセージ
public javax.jms.QueueReceiver createReceiver(javax.jms.Queue queue, java.lang.String messageSelector, java.lang.Object payload_factory) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・キューに対するQueueReceiver
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセス対象のキュー |
|
|
payload_factory |
Oracleのユーザー定義型にマップするJavaクラスの |
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactory
は、静的メソッドgetFactory
を使用して取得できます。
ノート:
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。
キューtest_queue
は、SCOTT.EMPLOYEE
型のペイロードを持ち、このOracleオブジェクト型に対してJPublisherによって生成されるJavaクラスがEmployeeであると仮定します。Employeeクラスは、CustomDatum
インタフェースを実装します。このクラスに対するORADataFactory
は、Employee.getFactory()メソッドを使用して取得できます。
ノート:
TxEventQキューは、オブジェクト型メッセージをサポートしていません
関連項目:
例6-50 QueueReceiverの作成: AdtMessageメッセージ
/* Create a receiver for a Queue with AdtMessage messages of type EMPLOYEE*/ QueueSession jms_session QueueReceiver receiver; Queue test_queue; browser = ((AQjmsSession)jms_session).createReceiver( test_queue, "JMSCorrelationID = 'MANAGER', Employee.getFactory());
Oracle Java Message Serviceのパブリッシュ/サブスクライブ
次のトピックでは、Oracle Database Advanced Queuing (AQ)のJava Message Service (JMS)操作インタフェースのうち、パブリッシュ・サブスクライブ操作固有のコンポーネントについて説明します。Point-to-Pointおよびパブリッシュ/サブスクライブで共有されるコンポーネントについては、「Oracle Java Message Serviceの共有インタフェース」を参照してください。
ユーザー名/パスワードが設定されたConnectionの作成
public javax.jms.Connection createConnection( java.lang.String username, java.lang.String password) throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用して、Point-to-Point操作とパブリッシュ/サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー用にデータベースに接続するユーザー名 |
|
サーバーへの接続を作成するためのパスワード |
コネクションの確立: デフォルトのConnectionFactoryパラメータの使用
public javax.jms.Connection createConnection() throws JMSException
このメソッドは、デフォルトのConnectionFactoryパラメータを使用して、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするコネクションを作成します。これは新規メソッドで、JMSバージョン1.1仕様をサポートしています。ConnectionFactory
プロパティにデフォルトのユーザー名およびパスワードを含めないと、JMSExceptionが発生します。
ユーザー名/パスワードが設定されたTopicConnectionの作成
public javax.jms.TopicConnection createTopicConnection( java.lang.String username, java.lang.String password) throws JMSException
このメソッドは、指定されたユーザー名とパスワードを使用してTopicConnection
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
キュー用にデータベースに接続するユーザー名 |
|
サーバーへの接続を作成するためのパスワード |
例6-51 ユーザー名/パスワードが設定されたTopicConnectionの作成
TopicConnectionFactory tc_fact = AQjmsFactory.getTopicConnectionFactory("sun123", "oratest", 5521, "thin"); /* Create a TopicConnection using a username/password */ TopicConnection tc_conn = tc_fact.createTopicConnection("jmsuser", "jmsuser");
TopicConnectionの確立: オープンしているJDBCコネクションの使用
public static javax.jms.TopicConnection createTopicConnection( java.sql.Connection jdbc_connection) throws JMSException
このメソッドは、オープンしているJDBCコネクションを使用してTopicConnection
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
データベースへの有効なオープン・コネクション |
例6-52 TopicConnectionの確立: オープンしているJDBCコネクションの使用
Connection db_conn; /*previously opened JDBC connection */ TopicConnection tc_conn = AQjmsTopicConnectionFactory createTopicConnection(db_conn);
例6-53 TopicConnectionの確立: 新規JDBCコネクションの使用
OracleDriver ora = new OracleDriver(); TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(ora.defaultConnection());
TopicConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
public static javax.jms.TopicConnection createTopicConnection( oracle.jdbc.pool.OracleOCIConnectionPool cpool) throws JMSException
このメソッドは、オープンしているOracleOCIConnectionPool
を使用してTopicConnection
を作成します。これは静的であり、次のパラメータを取ります。
パラメータ | 説明 |
---|---|
|
データベースに対してオープンなOCIコネクション・プール |
例6-54 TopicConnectionの確立: オープンしているOracleOCIConnectionPoolの使用
OracleOCIConnectionPool cpool; /* previously created OracleOCIConnectionPool */ TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(cpool);
セッションの作成
public javax.jms.Session createSession(boolean transacted, int ack_mode) throws JMSException
このメソッドは、Point-to-Point操作とパブリッシュ・サブスクライブ操作の両方をサポートするSession
を作成します。これは新規で、JMSバージョン1.1仕様をサポートしています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
TopicSessionの作成
public javax.jms.TopicSession createTopicSession(boolean transacted, int ack_mode) throws JMSException
このメソッドはTopicSession
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
TRUEに設定すると、セッションはトランザクションになります。 |
|
コンシューマまたはクライアントが受信したメッセージを認識するかどうかを示します。トランザクション処理のセッションの場合は無視されます。有効な値は |
例6-55 TopicSessionの作成
TopicConnection tc_conn; TopicSession t_sess = tc_conn.createTopicSession(true,0);
TopicPublisherの作成
public javax.jms.TopicPublisher createPublisher(javax.jms.Topic topic) throws JMSException
このメソッドはTopicPublisher
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
パブリッシュ対象のトピック。識別されていないプロデューサの場合は、NULL。 |
メッセージのパブリッシュ: 最小限の指定
public void publish(javax.jms.Message message) throws JMSException
このメソッドは、最小限の指定でメッセージをパブリッシュします。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
送信するメッセージ |
TopicPublisher
では、メッセージのpriority
(1
)とtimeToLive
(infinite
)のデフォルト値を使用します。
例6-56 トピック指定なしのパブリッシュ
/* Publish without specifying topic */ TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicPublisher publisher1; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); /* create TopicSession */ jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* get shipped orders topic */ shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); publisher1 = jms_sess.createPublisher(shipped_orders); /* create TextMessage */ TextMessage jms_sess.createTextMessage(); /* publish without specifying the topic */ publisher1.publish(text_message);
例6-57 相関と遅延の指定によるパブリッシュ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicPublisher publisher1; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); publisher1 = jms_sess.createPublisher(shipped_orders); /* Create TextMessage */ TextMessage jms_sess.createTextMessage(); /* Set correlation and delay */ /* Set correlation */ jms_sess.setJMSCorrelationID("FOO"); /* Set delay of 30 seconds */ jms_sess.setLongProperty("JMS_OracleDelay", 30); /* Publish */ publisher1.publish(text_message);
トピック指定によるメッセージのパブリッシュ
public void publish(javax.jms.Topic topic, javax.jms.Message message) throws JMSException
このメソッドは、トピックを指定してメッセージをパブリッシュします。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
パブリッシュするトピック |
|
送信するメッセージ |
TopicPublisher
がデフォルトのトピックで作成されている場合、publish()
コールにtopic
パラメータを指定することはできません。トピックが指定されている場合、その値はTopicPublisher
のデフォルトのトピックをオーバーライドします。TopicPublisher
がデフォルトのトピックを使用しないで作成されている場合、publish()
コールごとにトピックを指定する必要があります。
例6-58 トピック指定によるパブリッシュ
/* Publish specifying topic */ TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicPublisher publisher1; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( 'MYHOSTNAME', 'MYSID', myport, 'oci8'); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* create TopicPublisher */ publisher1 = jms_sess.createPublisher(null); /* get topic object */ shipped_orders = ((AQjmsSession )jms_sess).getTopic( 'WS', 'Shipped_Orders_Topic'); /* create text message */ TextMessage jms_sess.createTextMessage(); /* publish specifying the topic */ publisher1.publish(shipped_orders, text_message);
メッセージのパブリッシュ: 配信モード、優先順位およびTimeToLiveの指定
public void publish(javax.jms.Topic topic, javax.jms.Message message, oracle.jms.AQjmsAgent[] recipient_list, int deliveryMode, int priority, long timeToLive) throws JMSException
このメソッドは、配信モード、優先順位およびTimeToLive
を指定してメッセージをパブリッシュします。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
メッセージのパブリッシュ先のトピック( |
|
パブリッシュするメッセージ |
|
メッセージがパブリッシュされる受信者のリスト。受信者の型は |
|
|
|
このメッセージの優先順位 |
|
ミリ秒で指定されるメッセージの保存時間(ゼロは無制限) |
例6-59 優先順位とTimeToLiveの指定によるパブリッシュ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicPublisher publisher1; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); publisher1 = jms_sess.createPublisher(shipped_orders); /* Create TextMessage */ TextMessage jms_sess.createTextMessage(); /* Publish message with priority 1 and time to live 200 seconds */ publisher1.publish(text_message, DeliveryMode.PERSISTENT, 1, 200000);
メッセージのパブリッシュ: 受信者リストの指定
public void publish(javax.jms.Message message, oracle.jms.AQjmsAgent[] recipient_list) throws JMSException
このメソッドは、トピック・サブスクライバをオーバーライドする受信者リストを指定して、メッセージをパブリッシュします。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
パブリッシュするメッセージ |
|
メッセージがパブリッシュされる受信者のリスト。受信者の型は |
例6-60 トピック・サブスクライバをオーバーライドする受信者リストの指定によるパブリッシュ
/* Publish specifying priority and timeToLive */ TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicPublisher publisher1; Topic shipped_orders; int myport = 5521; AQjmsAgent[] recipList; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); publisher1 = jms_sess.createPublisher(shipped_orders); /* create TextMessage */ TextMessage jms_sess.createTextMessage(); /* create two receivers */ recipList = new AQjmsAgent[2]; recipList[0] = new AQjmsAgent( "ES", "ES.shipped_orders_topic", AQAgent.DEFAULT_AGENT_PROTOCOL); recipList[1] = new AQjmsAgent( "WS", "WS.shipped_orders_topic", AQAgent.DEFAULT_AGENT_PROTOCOL); /* publish message specifying a recipient list */ publisher1.publish(text_message, recipList);
JMSトピックに対するDurableSubscriberの作成: セレクタの指定なし
public javax.jms.TopicSubscriber createDurableSubscriber( javax.jms.Topic topic, java.lang.String subs_name) throws JMSException
このメソッドは、セレクタ指定なしのJMSトピックにDurableSubscriberを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先の非一時トピック |
|
このサブスクリプションの識別に使用される名前 |
トピックへの排他アクセス
CreateDurableSubscriber()
およびUnsubscribe()
は、どちらもターゲット・トピックに排他的にアクセスする必要があります。これらのコールが適用されるときに同じトピックに対する保留中のJMS send()
、publish()
またはreceive()
操作がある場合、例外ORA-4020が発生します。この問題を解決するには、次の2つの方法があります。
-
設定またはクリーン・アップのフェーズでの
createDurableSubscriber()
およびUnsubscribe()
のコールを、トピックに対して保留中の他のJMS操作がない場合に制限します。これによって、必要なリソースが他のJMS操作のコールによって保持されないようにできます。 -
createDurableSubscriber()
またはUnsubscribe()
をコールする前に、TopicSession.commit
をコールします。
例6-61 JMSトピックに対する永続サブスクライバの作成: セレクタの指定なし
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int myport = 5521; AQjmsAgent[] recipList; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* create a durable subscriber on the shipped_orders topic*/ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping');
JMSトピックに対するDurableSubscriberの作成: セレクタの指定あり
public javax.jms.TopicSubscriber createDurableSubscriber( javax.jms.Topic topic, java.lang.String subs_name, java.lang.String messageSelector, boolean noLocal) throws JMSException
このメソッドは、セレクタを指定して、JMSトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先の非一時トピック |
|
このサブスクリプションの識別に使用される名前 |
|
|
|
TRUEに設定すると、独自のコネクションによってパブリッシュされたメッセージの配信が禁止されます。 |
クライアントは、同一の名前および異なるmessageSelector
で永続トピック・サブスクライバを作成することによって、既存の永続サブスクリプションを変更できます。トピックのサブスクリプションを終了するには、unsubscribeコールが必要です。
関連項目:
例6-62 JMSトピックに対する永続サブスクライバの作成: セレクタの指定あり
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int myport = 5521; AQjmsAgent[] recipList; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* create a subscriber */ /* with condition on JMSPriority and user property 'Region' */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', "JMSPriority > 2 and Region like 'Western%'", false);
Oracleオブジェクト型トピックに対するDurableSubscriberの作成: セレクタの指定なし
public javax.jms.TopicSubscriber createDurableSubscriber( javax.jms.Topic topic, java.lang.String subs_name, java.lang.Object payload_factory) throws JMSException
このメソッドは、セレクタを指定せずに、Oracleオブジェクト型のトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先の非一時トピック |
|
このサブスクリプションの識別に使用される名前 |
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-63 Oracleオブジェクト型トピックに対する永続サブスクライバの作成: セレクタの指定なし
/* Subscribe to an ADT queue */ TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int my[port = 5521; AQjmsAgent[] recipList; /* the java mapping of the oracle object type created by J Publisher */ ADTMessage message; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', AQjmsAgent.getFactory());
Oracleオブジェクト型トピックに対するDurableSubscriberの作成: セレクタの指定あり
public javax.jms.TopicSubscriber createDurableSubscriber( javax.jms.Topic topic, java.lang.String subs_name, java.lang.String messageSelector, boolean noLocal, java.lang.Object payload_factory) throws JMSException
このメソッドは、セレクタを指定して、Oracleオブジェクト型のトピックに対する永続サブスクライバを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先の非一時トピック |
|
このサブスクリプションの識別に使用される名前 |
|
|
|
TRUEに設定すると、独自のコネクションによってパブリッシュされたメッセージの配信が禁止されます。 |
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-64 Oracleオブジェクト型トピックに対する永続サブスクライバの作成: セレクタの指定あり
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int myport = 5521; AQjmsAgent[] recipList; /* the java mapping of the oracle object type created by J Publisher */ ADTMessage message; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* create a subscriber, specifying correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, "WesternShipping", "priority > 1 and tab.user_data.region like 'WESTERN %'", false, ADTMessage.getFactory());
トピック・サブスクライバ作成時の変換の指定
キュー/トピックにメッセージを送信/パブリッシュするときに変換を適用できます。メッセージを変換してからキュー/トピックに送信します。
変換は、AQjmsQueueSender
およびAQjmsTopicPublisher
のsetTransformation
インタフェースを使用して指定できます。
CreateDurableSubscriber()
コールを使用してトピック・サブスクライバを作成する場合も、変換を指定できます。取り出されたメッセージを変換してから、サブスクライバに戻します。指定されたサブスクライバがすでにCreateDurableSubscriber()
コールに存在する場合、その変換は指定した変換に設定されます。
例6-65 変換による宛先へのメッセージの送信
注文入力アプリケーションによって処理された注文情報をWS_bookedorders_topic
にパブリッシュする必要があると仮定します。正しいフォーマットでトピックにメッセージを挿入するには、前の項で定義した変換OE2WSを使用します。
public void ship_bookedorders( TopicSession jms_session, AQjmsADTMessage adt_message) { TopicPublisher publisher; Topic topic; try { /* get a handle to the WS_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); publisher = jms_session.createPublisher(topic); /* set the transformation in the publisher */ ((AQjmsTopicPublisher)publisher).setTransformation("OE2WS"); publisher.publish(topic, adt_message); } catch (JMSException ex) { System.out.println("Exception :" ex); } }
例6-66 トピック・サブスクライバ作成時の変換の指定
西部地域向け出荷処理アプリケーションは、変換OE2WS
とともにOE_bookedorders_topicにサブスクライブします。この変換はメッセージに適用され、Oracleオブジェクト型WS.WS_orders
のメッセージが戻されます。
WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_order
にマップされると仮定します。:
public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session) { TopicSubscriber subscriber; Topic topic; AQjmsAdtMessage msg = null; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); /* create a subscriber with the transformation OE2WS */ subs = ((AQjmsSession)jms_session).createDurableSubscriber( topic, 'WShip', null, false, WSOrder.getFactory(), "OE2WS"); msg = subscriber.receive(10); } catch (JMSException ex) { System.out.println("Exception :" ex); } return (AQjmsAdtMessage)msg; }
リモート・サブスクライバの作成: JMSメッセージ
public void createRemoteSubscriber(javax.jms.Topic topic, oracle.jms.AQjmsAgent remote_subscriber, java.lang.String messageSelector) throws JMSException
このメソッドは、JMSメッセージ・トピックに対するリモート・サブスクライバを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先のトピック。 |
|
リモート・サブスクライバを表す |
|
|
Oracle Database Advanced Queuingでは、トピックがリモート・サブスクライバ(同一または異なるデータベース内の他のトピックのサブスクライバ)を持つことができます。リモート・サブスクライバを使用するには、ローカル・トピックとリモート・トピック間の伝播を設定する必要があります。
リモート・サブスクライバは、リモート・トピックの特定のコンシューマまたはリモート・トピックのすべてのサブスクライバにできます。リモート・サブスクライバは、AQjmsAgent
構造を使用して定義されます。AQjmsAgent
は、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_name
を参照します。アドレスはリモート・トピックを参照します。構文は、schema.topic_name[@dblink]
です。
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_name
をAQjmsAgent
のnameフィールドに指定し、リモート・トピックをaddressフィールドに指定する必要があります。リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgent
のnameフィールドをNULLに設定する必要があります。
ノート:
TxEventQキューは、リモート・サブスクライバをサポートしていません。
関連項目:
例6-67 リモート・サブスクライバの作成: JMS型メッセージ・トピック
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int my[port = 5521; AQjmsAgent remoteAgent; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); remoteAgent = new AQjmsAgent("WesternRegion", "WS.shipped_orders_topic", null); /* create a remote subscriber (selector is null )*/ subscriber1 = ((AQjmsSession)jms_sess).createRemoteSubscriber( shipped_orders, remoteAgent, null);
リモート・サブスクライバの作成: Oracleオブジェクト型メッセージ
public void createRemoteSubscriber(javax.jms.Topic topic, oracle.jms.AQjmsAgent remote_subscriber, java.lang.String messageSelector, java.lang.Object payload_factory) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対するリモート・サブスクライバを作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ先のトピック。 |
|
リモート・サブスクライバを表す |
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、リモート・サブスクライバまたはオブジェクト型メッセージをサポートしていません。
Oracle Database Advanced Queuingでは、トピックがリモート・サブスクライバ(同一または異なるデータベース内の他のトピックのサブスクライバ)を持つことができます。リモート・サブスクライバを使用するには、ローカル・トピックとリモート・トピック間の伝播を設定する必要があります。
リモート・サブスクライバは、リモート・トピックの特定のコンシューマまたはリモート・トピックのすべてのサブスクライバにできます。リモート・サブスクライバは、AQjmsAgent
構造を使用して定義されます。AQjmsAgent
は、名前およびアドレスで構成されます。名前は、リモート・トピックのconsumer_name
を参照します。アドレスはリモート・トピックを参照します。構文は、schema.topic_name[@dblink]
です。
リモート・トピックで特定のコンシューマに対してメッセージをパブリッシュするには、リモート・トピックでの受信者のsubscription_name
をAQjmsAgent
のnameフィールドに指定し、リモート・トピックをaddressフィールドに指定する必要があります。リモート・トピックのすべてのサブスクライバに対してメッセージをパブリッシュするには、AQjmsAgent
のnameフィールドをNULLに設定する必要があります。
ノート:
AQは、宛先が同じである複数のdblinkの使用をサポートしていません。この問題を解決するには、各宛先に対し1つのデータベース・リンクを使用します。
関連項目:
例6-68 リモート・サブスクライバの作成: Oracleオブジェクト型メッセージ・トピック
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int my[port = 5521; AQjmsAgent remoteAgent; ADTMessage message; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); /* create TopicSession */ jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* get the Shipped order topic */ shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* create a remote agent */ remoteAgent = new AQjmsAgent("WesternRegion", "WS.shipped_orders_topic", null); /* create a remote subscriber with null selector*/ subscriber1 = ((AQjmsSession)jms_sess).createRemoteSubscriber( shipped_orders, remoteAgent, null, message.getFactory);
リモート・サブスクライバ作成時の変換の指定
Oracle Database Advanced Queuingによって、リモート・サブスクライバ(他のデータベースにあるサブスクライバ)をトピックにサブスクライブできます。
createRemoteSubscriber()
コールを使用してリモート・サブスクライバを作成する場合、変換を指定できます。これによって、異なるフォーマットのトピック間でメッセージを伝播できます。トピックに対してパブリッシュされたメッセージがリモート・サブスクライバの基準を満たしている場合、Oracle Database Advanced Queuingはリモート・サブスクライバに指定されているリモート・データベースにあるキュー/トピックにメッセージを自動的に伝播します。変換が指定される場合も、Oracle Database Advanced Queuingはその変換をメッセージに適用してからリモート・データベース上のキュー/トピックに伝播します。
ノート:
TxEventQキューは、リモート・サブスクライバをサポートしていません。
例6-69 リモート・サブスクライバ作成時の変換の指定
メッセージが自動的にWS.WS_bookedorders_topicに伝播されるようにリモート・サブスクライバをOE.OE_bookedorders_topicに作成します。変換OE2WSは、WS_bookedorders_topicに到達したメッセージが正しいフォーマットとなるように、リモート・サブスクライバを作成するときに指定します。
WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_order
にマップされると仮定します。
public void create_remote_sub(TopicSession jms_session) { AQjmsAgent subscriber; Topic topic; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); subscriber = new AQjmsAgent("WShip", "WS.WS_bookedorders_topic"); ((AQjmsSession )jms_session).createRemoteSubscriber( topic, subscriber, null, WSOrder.getFactory(),"OE2WS"); } catch (JMSException ex) { System.out.println("Exception :" ex); } }
永続サブスクリプションのサブスクライブの解除: ローカル・サブスクライバ
public void unsubscribe(javax.jms.Topic topic, java.lang.String subs_name) throws JMSException
このメソッドは、ローカル・サブスクライバについて永続サブスクリプションのサブスクライブを解除します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ解除先の非一時トピック |
|
このサブスクリプションの識別に使用される名前 |
関連項目:
例6-70 永続サブスクリプションのサブスクライブの解除: ローカル・サブスクライバ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession jms_sess; TopicSubscriber subscriber1; Topic shipped_orders; int myport = 5521; AQjmsAgent[] recipList; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); /* unsusbcribe "WesternShipping" from shipped_orders */ jms_sess.unsubscribe(shipped_orders, "WesternShipping");
永続サブスクリプションのサブスクライブの解除: リモート・サブスクライバ
public void unsubscribe(javax.jms.Topic topic, oracle.jms.AQjmsAgent remote_subscriber) throws JMSException
このメソッドは、リモート・サブスクライバについて永続サブスクリプションのサブスクライブを解除します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
サブスクライブ解除先の非一時トピック |
|
リモート・サブスクライバを表す |
ノート:
TEQキューは、リモート・サブスクライバをサポートしていません。
関連項目:
例6-71 永続サブスクリプションのサブスクライブの解除: リモート・サブスクライバ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; AQjmsAgent remoteAgent; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "OE", "Shipped_Orders_Topic"); remoteAgent = new AQjmsAgent("WS", "WS.Shipped_Orders_Topic", null); /* unsubscribe the remote agent from shipped_orders */ ((AQjmsSession)jms_sess).unsubscribe(shipped_orders, remoteAgent);
TopicReceiverの作成: 標準JMS型メッセージ・トピック
public oracle.jms.AQjmsTopicReceiver createTopicReceiver( javax.jms.Topic topic, java.lang.String receiver_name, java.lang.String messageSelector) throws JMSException
このメソッドは、標準JMS型メッセージ・トピックに対するTopicReceiver
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
メッセージ・レシーバの名前。 |
|
|
Oracle Database Advanced Queuingでは、特定の受信者にメッセージを送信できます。これらの受信者は、トピックのサブスクライバである場合とそうでない場合があります。受信者がトピックに対するサブスクライバでない場合、明示的にアドレス指定されているメッセージのみを受信します。このメソッドは、永続サブスクライバ以外のコンシューマに対するTopicReceiver
オブジェクトの作成に使用する必要があります。
関連項目:
例6-72 TopicReceiverの作成: 標準JMS型メッセージ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = ull; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; TopicReceiver receiver; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); receiver = ((AQjmsSession)jms_sess).createTopicReceiver( shipped_orders, "WesternRegion", null);
TopicReceiverの作成: Oracleオブジェクト型メッセージ・トピック
public oracle.jms.AQjmsTopicReceiver createTopicReceiver( javax.jms.Topic topic, java.lang.String receiver_name, java.lang.String messageSelector, java.lang.Object payload_factory) throws JMSException
このメソッドは、セレクタで、Oracleオブジェクト型メッセージ・トピックに対するTopicReceiver
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
メッセージ・レシーバの名前。 |
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
Oracle Database Advanced Queuingでは、トピックのすべてのサブスクライバ、または特定の受信者にメッセージを送信できます。これらの受信者は、トピックのサブスクライバである場合とそうでない場合があります。受信者がトピックに対するサブスクライバでない場合、明示的にアドレス指定されているメッセージのみを受信します。このメソッドは、永続サブスクライバ以外のコンシューマに対するTopicReceiver
オブジェクトの作成に使用する必要があります。
関連項目:
例6-73 TopicReceiverの作成: Oracleオブジェクト型メッセージ
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; TopicReceiver receiver; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); receiver = ((AQjmsSession)jms_sess).createTopicReceiver( shipped_orders, "WesternRegion", null);
TopicBrowserの作成: 標準JMSメッセージ
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic, java.lang.String cons_name, java.lang.String messageSelector) throws JMSException
このメソッドは、TextMessage
、StreamMessage
、ObjectMessage
、BytesMessage
またはMapMessage
メッセージ本体を使用するトピックに対するTopicBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
関連項目:
例6-74 TopicBrowserの作成: セレクタの指定なし
/* Create a browser without a selector */ TopicSession jms_session; TopicBrowser browser; Topic topic; browser = ((AQjmsSession) jms_session).createBrowser(topic, "SUBS1");
例6-75 TopicBrowserの作成: セレクタの指定あり
/* Create a browser for topics with a specified selector */ TopicSession jms_session; TopicBrowser browser; Topic topic; /* create a Browser to look at messages with correlationID = RUSH */ browser = ((AQjmsSession) jms_session).createBrowser( topic, "SUBS1", "JMSCorrelationID = 'RUSH'");
TopicBrowserの作成: 標準JMSメッセージ、メッセージのロック
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic, java.lang.String cons_name, java.lang.String messageSelector, boolean locked) throws JMSException
このメソッドは、Text、Stream、Objects、Bytesまたはマップ・メッセージを使用するトピックに対して、ブラウズ中にメッセージをロックするTopicBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
例6-76 TopicBrowserの作成: セレクタの指定なし、ブラウズ中にメッセージをロック
/* Create a browser without a selector */ TopicSession jms_session; TopicBrowser browser; Topic topic; browser = ((AQjmsSession) jms_session).createBrowser( topic, "SUBS1", true);
例6-77 TopicBrowserの作成: セレクタの指定あり、メッセージをロック
/* Create a browser for topics with a specified selector */ TopicSession jms_session; TopicBrowser browser; Topic topic; /* create a Browser to look at messages with correlationID = RUSH in lock mode */ browser = ((AQjmsSession) jms_session).createBrowser( topic, "SUBS1", "JMSCorrelationID = 'RUSH'", true);
TopicBrowserの作成: Oracleオブジェクト型メッセージ
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic, java.lang.String cons_name, java.lang.String messageSelector, java.lang.Object payload_factory) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対するTopicBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
SQLオブジェクト型ペイロードにマップされる特定のJavaクラスに対するCustomDatumFactory
は、静的メソッドgetFactory
を使用して取得できます。トピック test_topic
は、SCOTT.EMPLOYEE
型のペイロードを持ち、このOracleオブジェクト型に対してJpublisherによって生成されるJavaクラスがEmployee
であると仮定します。Employeeクラスは、CustomDatum
インタフェースを実装します。このクラスに対するCustomDatumFactory
は、Employee.getFactory()
メソッドを使用して取得できます。
関連項目:
例6-78 TopicBrowserの作成: AdtMessageメッセージ
/* Create a browser for a Topic with AdtMessage messages of type EMPLOYEE*/ TopicSession jms_session TopicBrowser browser; Topic test_topic; browser = ((AQjmsSession) jms_session).createBrowser( test_topic, "SUBS1", Employee.getFactory());
TopicBrowserの作成: Oracleオブジェクト型メッセージ、メッセージのロック
public oracle.jms.TopicBrowser createBrowser(javax.jms.Topic topic, java.lang.String cons_name, java.lang.String messageSelector, java.lang.Object payload_factory, boolean locked) throws JMSException
このメソッドは、Oracleオブジェクト型メッセージ・トピックに対して、ブラウズ中にメッセージをロックするTopicBrowser
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
アクセスするトピック |
|
永続サブスクライバまたはコンシューマの名前。 |
|
|
|
Oracleのユーザー定義型にマップするJavaクラスの |
|
TRUEに設定すると、ブラウズ中のメッセージはロックされます(UPDATEのSELECTと同様)。 |
ノート:
-
CustomDatum
は今後のリリースではサポートされなくなります。かわりに、ORADataFactory
ペイロード・ファクトリを使用してください。 -
TxEventQキューは、オブジェクト型メッセージをサポートしていません。
関連項目:
例6-79 TopicBrowserの作成: AdtMessageメッセージ、メッセージのロック
/* Create a browser for a Topic with AdtMessage messages of type EMPLOYEE* in lock mode/ TopicSession jms_session TopicBrowser browser; Topic test_topic; browser = ((AQjmsSession) jms_session).createBrowser( test_topic, "SUBS1", Employee.getFactory(), true);
TopicBrowserを使用したメッセージのブラウズ
public void purgeSeen() throws JMSException
このメソッドは、TopicBrowser
を使用してメッセージをブラウズします。java.util.Enumeration
内のメソッドを使用して、メッセージのリストを参照してください。現行のブラウズ中に参照したメッセージを削除するには、TopicBrowser
内のpurgeSeen
メソッドを使用してください。
例6-80 TopicBrowserの作成: セレクタの指定あり
/* Create a browser for topics with a specified selector */ public void browse_rush_orders(TopicSession jms_session) TopicBrowser browser; Topic topic; ObjectMessage obj_message BolOrder new_order; Enumeration messages; /* get a handle to the new_orders topic */ topic = ((AQjmsSession) jms_session).getTopic("OE", "OE_bookedorders_topic"); /* create a Browser to look at RUSH orders */ browser = ((AQjmsSession) jms_session).createBrowser( topic, "SUBS1", "JMSCorrelationID = 'RUSH'"); /* Browse through the messages */ for (messages = browser.elements() ; message.hasMoreElements() ;) {obj_message = (ObjectMessage)message.nextElement();} /* Purge messages seen during this browse */ browser.purgeSeen()
Oracle Java Message Serviceの共有インタフェース
次のトピックでは、Oracle Database Advanced Queuing (AQ)に対するJava Message Service (JMS)操作インタフェース(共有インタフェース)を説明します。
Oracle Database Advanced Queuing JMS操作インタフェース: 共有インタフェース
次のトピックでは、JMS操作のためのOracle Database Advanced Queuing共有インタフェースを説明します。
JMSコネクションの起動
public void start() throws JMSException
AQjmsConnection.start()
は、メッセージ受信用のJMSコネクションを起動します。
JMSコネクションの取得
public oracle.jms.AQjmsConnection getJmsConnection() throws JMSException
AQjmsSession.getJmsConnection()
は、セッションからJMSコネクションを取得します。
セッションにおけるすべての操作のコミット
public void commit() throws JMSException
AQjmsSession.commit()
は、セッションで実行されているすべてのJMSおよびSQL操作をコミットします。
セッションにおけるすべての操作のロールバック
public void rollback() throws JMSException
AQjmsSession.rollback()
は、セッションで実行されているすべてのJMSおよびSQL操作を終了します。
セッションからのJDBCコネクションの取得
public java.sql.Connection getDBConnection() throws JMSException
AQjmsSession.getDBConnection()
は、JMSセッションから基礎となるJDBCコネクションを取得します。JDBCコネクションは、JMS操作が実行される同一のトランザクションの一部としてSQL操作を実行するために使用される場合があります。
例6-81 JMSセッションからの基礎となるJDBCコネクションの取得
java.sql.Connection db_conn; QueueSession jms_sess; db_conn = ((AQjmsSession)jms_sess).getDBConnection();
JMSコネクションからのOracleOCIConnectionPoolの取得
public oracle.jdbc.pool.OracleOCIConnectionPool getOCIConnectionPool()
AQjmsConnection.getOCIConnectionPool()
は、JMSコネクションから基礎となるOracleOCIConnectionPool
を取得します。OracleOCIConnectionPool
インスタンスの設定は、コネクションの使用状況(ユーザーが既存のコネクションを使用して確立するセッション数など)に応じてユーザーがチューニングできます。ただし、JMSコネクションが使用しているOracleOCIConnectionPool
インスタンスはクローズしないでください。
例6-82 JMSコネクションからの基礎となるOracleOCIConnectionPoolの取得
oracle.jdbc.pool.OracleOCIConnectionPool cpool; QueueConnection jms_conn; cpool = ((AQjmsConnection)jms_conn).getOCIConnectionPool();
MapMessageの作成
public javax.jms.MapMessage createMapMessage() throws JMSException
AQjmsSession.createMapMessage()
は、Mapメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_MAP_MESSAGE
またはAQ$_JMS_MESSAGE
で作成された場合のみです。
StreamMessageの作成
public javax.jms.StreamMessage createStreamMessage() throws JMSException
AQjmsSession.createStreamMessage()
は、StreamMessageを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_STREAM_MESSAGE
またはAQ$_JMS_MESSAGE
で作成された場合のみです。
ObjectMessageの作成
public javax.jms.ObjectMessage createObjectMessage(java.io.Serializable object) throws JMSException
AQjmsSession.createObjectMessage()
は、Objectメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_OBJECT_MESSAGE
またはAQ$_JMS_MESSAGE
で作成された場合のみです。
TextMessageの作成
public javax.jms.TextMessage createTextMessage() throws JMSException
AQjmsSession.createTextMessage()
は、Textメッセージを作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型SYS.AQ$_JMS_TEXT_MESSAGE
またはAQ$_JMS_MESSAGE
で作成された場合のみです。
JMSメッセージの作成
public javax.jms.Message createMessage() throws JMSException
AQjmsSession.createMessage()
は、JMSメッセージを作成します。AQ$_JMS_MESSAGE
メッセージを使用して、様々な型のメッセージを構成できます。メッセージ型は、次のいずれかである必要があります。
-
DBMS_AQ.JMS_TEXT_MESSAGE
-
DBMS_AQ.JMS_OBJECT_MESSAGE
-
DBMS_AQ.JMS_MAP_MESSAGE
-
DBMS_AQ.JMS_BYTES_MESSAGE
-
DBMS_AQ.JMS_STREAM_MESSAGE
このユーザー定義型を使用して、ヘッダーのみのJMSメッセージを作成することもできます。
AdtMessageの作成
public oracle.jms.AdtMessage createAdtMessage() throws JMSException
AQjmsSession.createAdtMessage()
は、AdtMessage
を作成します。これを使用できるのは、宛先キュー/トピックを含むキュー表がペイロード型Oracle ADTで作成された場合のみです。AdtMessage
には、CustomDatum
インタフェースを実装するオブジェクトが移入される必要があります。このオブジェクトは、キュー/トピックのペイロードとして定義されたSQL ADTのJavaマッピングである必要があります。SQL ADT型に対応するJavaクラスは、Jpublisherツールを使用して生成できます。
JMSメッセージ・プロパティの指定
JMSで始まるプロパティ名は、プロバイダ固有です。ユーザー定義のプロパティは、JMSで始めることができません。
次のプロバイダのプロパティは、Text、Stream、Object、BytesまたはMapMessageを使用するクライアントが設定できます。
-
JMSXAppID (string)
-
JMSXGroupID (string)
-
JMSXGroupSeq (int)
-
JMS_OracleExcpQ (string)
このメッセージ・プロパティでは例外キューを指定します。
-
JMS_OracleDelay (int)
このメッセージ・プロパティではメッセージの遅延秒数を指定します。
次のプロパティは、AdtMessage
に対して設定できます。
-
JMS_OracleExcpQ (String)
このメッセージ・プロパティでは、例外キューを
schema
.queue_name
として指定します。 -
JMS_OracleDelay (int)
このメッセージ・プロパティではメッセージの遅延秒数を指定します。
この項の内容は次のとおりです。
Booleanメッセージ・プロパティの設定
public void setBooleanProperty(java.lang.String name, boolean value) throws JMSException
AQjmsMessage.setBooleanProperty()
は、メッセージ・プロパティをBooleanとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Booleanプロパティの名前 |
|
メッセージに設定するBooleanプロパティの値 |
Stringメッセージ・プロパティの設定
public void setStringProperty(java.lang.String name, java.lang.String value) throws JMSException
AQjmsMessage.setStringProperty()
は、メッセージ・プロパティをStringとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Stringプロパティの名前 |
|
メッセージに設定するStringプロパティの値 |
Integerメッセージ・プロパティの設定
public void setIntProperty(java.lang.String name, int value) throws JMSException
AQjmsMessage.setIntProperty()
は、メッセージ・プロパティをIntegerとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Integerプロパティの名前 |
|
メッセージに設定するIntegerプロパティの値 |
Doubleメッセージ・プロパティの設定
public void setDoubleProperty(java.lang.String name, double value) throws JMSException
AQjmsMessage.setDoubleProperty()
は、メッセージ・プロパティをDoubleとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Doubleプロパティの名前 |
|
メッセージに設定するDoubleプロパティの値 |
Floatメッセージ・プロパティの設定
public void setFloatProperty(java.lang.String name, float value) throws JMSException
AQjmsMessage.setFloatProperty()
は、メッセージ・プロパティをFloatとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Floatプロパティの名前 |
|
メッセージに設定するFloatプロパティの値 |
Byteメッセージ・プロパティの設定
public void setByteProperty(java.lang.String name, byte value) throws JMSException
AQjmsMessage.setByteProperty()
は、メッセージ・プロパティをByteとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Byteプロパティの名前 |
|
メッセージに設定するByteプロパティの値 |
Longメッセージ・プロパティの設定
public void setLongProperty(java.lang.String name, long value) throws JMSException
AQjmsMessage.setLongProperty()
は、メッセージ・プロパティをLongとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Longプロパティの名前 |
|
メッセージに設定するLongプロパティの値 |
Shortメッセージ・プロパティの設定
public void setShortProperty(java.lang.String name, short value) throws JMSException
AQjmsMessage.setShortProperty()
は、メッセージ・プロパティをShortとして指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Shortプロパティの名前 |
|
メッセージに設定するShortプロパティの値 |
Objectメッセージ・プロパティの設定
public void setObjectProperty(java.lang.String name, java.lang.Object value) throws JMSException
AQjmsMessage.setObjectProperty()
は、メッセージ・プロパティをObjectとして指定します。オブジェクト化されたプリミティブ値(Boolean、Byte、Short、Integer、Long、Float、DoubleおよびString)のみサポートされています。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
JavaのObjectプロパティの名前 |
|
メッセージに設定するJavaのObjectプロパティの値 |
MessageProducerが送信するすべてのメッセージに対するデフォルトのTimeToLiveの設定
public void setTimeToLive(long timeToLive) throws JMSException
このメソッドは、MessageProducer
が送信するすべてのメッセージに対するデフォルトのTimeToLive
を設定します。メッセージの遅延が発生した後に計算されます。このメソッドには、次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
ミリ秒で指定されるメッセージのTime-To-Live(ゼロは無制限) |
例6-83 MessageProducerが送信するすべてのメッセージに対するデフォルトのTimeToLiveの設定
/* Set default timeToLive value to 100000 milliseconds for all messages sent by the QueueSender*/ QueueSender sender; sender.setTimeToLive(100000);
MessageProducerが送信するすべてのメッセージに対するデフォルトの優先順位の設定
public void setPriority(int priority) throws JMSException
このメソッドは、MessageProducer
が送信するすべてのメッセージに対するデフォルトのPriority
を設定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
このMessageProducerのメッセージの優先順位。デフォルトは4です。 |
優先順位には、どの整数値でも指定できます。数値が小さいほど高い優先度を示します。優先順位がsend()
操作時に明示的に指定されている場合は、このメソッドで設定されたデフォルト値をオーバーライドします。
例6-84 QueueSenderが送信するすべてのメッセージに対するデフォルトの優先順位の設定
/* Set default priority value to 2 for all messages sent by the QueueSender*/ QueueSender sender; sender.setPriority(2);
例6-85 TopicPublisherが送信するすべてのメッセージに対するデフォルトの優先順位の設定
/* Set default priority value to 2 for all messages sent by the TopicPublisher*/ TopicPublisher publisher; publisher.setPriority(1);
AQjmsエージェントの作成
public void createAQAgent(java.lang.String agent_name, boolean enable_http, throws JMSException
このメソッドはAQjmsAgent
を作成します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
AQエージェントの名前。 |
|
TRUEに設定すると、このエージェントはHTTP経由でAQにアクセスできます。 |
メッセージの同期受信
タイムアウトを指定するか、待機なしで、メッセージを同期受信できます。また、変換を使用してメッセージを受信することもできます。
タイムアウト指定でのメッセージ・コンシューマの使用
public javax.jms.Message receive(long timeout) throws JMSException
このメソッドは、タイムアウトを指定し、メッセージ・コンシューマを使用してメッセージを受信します。
パラメータ | 説明 |
---|---|
|
タイムアウト値(ミリ秒単位) |
例6-86 タイムアウト指定でのメッセージ・コンシューマの使用
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', " priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); /* receive, blocking for 30 seconds if there were no messages */ Message = subscriber.receive(30000);
例6-87 JMS: メッセージが届くまでブロック
public BolOrder get_new_order1(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* wait for a message to show up in the queue */ obj_message = (ObjectMessage)qrec.receive(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
待機なしでのメッセージ・コンシューマの使用
public javax.jms.Message receiveNoWait() throws JMSException
このメソッドは、待機なしで、メッセージ・コンシューマを使用してメッセージを受信します。
例6-88 JMS: メッセージを非ブロック
public BolOrder poll_new_order3(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* check for a message to show in the queue */ obj_message = (ObjectMessage)qrec.receiveNoWait(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
変換による宛先からのメッセージの受信
キューまたはトピックからメッセージを受信するときに変換を適用できます。メッセージを変換してから、JMSアプリケーションに戻します。
変換は、AQjmsQueueReceiver
、AQjmsTopicSubscriber
またはAQjmsTopicReceiver
のsetTransformation()
インタフェースを使用して指定できます。
例6-89 JMS: 変換による宛先からのメッセージの受信
Western Shippingアプリケーションが、OE_bookedorders_topicからメッセージを取得すると仮定します。これは、変換OE2WS
がメッセージをOracleオブジェクト型WS_order
として取得するよう指定します。WSOrder JavaクラスがJPublisherによって生成され、Oracleオブジェクト型WS.WS_order
にマップされると仮定します。
public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session) AQjmsTopicReceiver receiver; Topic topic; Message msg = null; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); /* Create a receiver for WShip */ receiver = ((AQjmsSession)jms_session).createTopicReceiver( topic, "WShip, null, WSOrder.getFactory()); /* set the transformation in the publisher */ receiver.setTransformation("OE2WS"); msg = receiver.receive(10); } catch (JMSException ex) { System.out.println("Exception :", ex); } return (AQjmsAdtMessage)msg; }
メッセージの受信に対するナビゲーション・モードの指定
public void setNavigationMode(int mode) throws JMSException
このメソッドは、メッセージの受信に対するナビゲーション・モードを指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
ナビゲーション・モードの新規値 |
例6-90 メッセージの受信に対するナビゲーション・モードの指定
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic("WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', "priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); subscriber1.setNavigationMode(AQjmsConstants.NAVIGATION_FIRST_MESSAGE); /* get message for the subscriber, returning immediately if there was nomessage */ Message = subscriber.receive();
メッセージの非同期受信
メッセージを非同期に受信するには、次の2つの方法があります。
メッセージ・コンシューマに対するメッセージ・リスナーの指定
public void setMessageListener(javax.jms.MessageListener myListener) throws JMSException
このメソッドは、メッセージ・コンシューマに対してメッセージ・リスナーを指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
コンシューマに対してメッセージ・リスナーを設定します。 |
例6-91 メッセージ・コンシューマに対するメッセージ・リスナーの指定
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; MessageListener mLis = null; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', "priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); mLis = new myListener(jms_sess, "foo"); /* get message for the subscriber, returning immediately if there was nomessage */ subscriber.setMessageListener(mLis); The definition of the myListener class import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import java.util.*; public class myListener implements MessageListener { TopicSession mySess; String myName; /* constructor */ myListener(TopicSession t_sess, String t_name) { mySess = t_sess; myName = t_name; } public onMessage(Message m) { System.out.println("Retrieved message with correlation: " || m.getJMSCorrelationID()); try{ /* commit the dequeue */ mySession.commit(); } catch (java.sql.SQLException e) {System.out.println("SQL Exception on commit"); } } }
メッセージIDの取得
この項の内容は次のとおりです。
相関識別子の取得
public java.lang.String getJMSCorrelationID() throws JMSException
AQjmsMessage.getJMSCorrelationID()
は、メッセージの相関識別子を取得します。
JMSメッセージ・プロパティの取得
この項の内容は次のとおりです。
Booleanメッセージ・プロパティの取得
public boolean getBooleanProperty(java.lang.String name) throws JMSException
AQjmsMessage.getBooleanProperty()
は、メッセージ・プロパティをBooleanとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Booleanプロパティの名前 |
Stringメッセージ・プロパティの取得
public java.lang.String getStringProperty(java.lang.String name) throws JMSException
AQjmsMessage.getStringProperty()
は、メッセージ・プロパティをStringとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Stringプロパティの名前 |
Integerメッセージ・プロパティの取得
public int getIntProperty(java.lang.String name) throws JMSException
AQjmsMessage.getIntProperty()
は、メッセージ・プロパティをIntegerとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Integerプロパティの名前 |
Doubleメッセージ・プロパティの取得
public double getDoubleProperty(java.lang.String name) throws JMSException
AQjmsMessage.getDoubleProperty()
は、メッセージ・プロパティをDoubleとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Doubleプロパティの名前 |
Floatメッセージ・プロパティの取得
public float getFloatProperty(java.lang.String name) throws JMSException
AQjmsMessage.getFloatProperty()
は、メッセージ・プロパティをFloatとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Floatプロパティの名前 |
Byteメッセージ・プロパティの取得
public byte getByteProperty(java.lang.String name) throws JMSException
AQjmsMessage.getByteProperty()
は、メッセージ・プロパティをByteとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Byteプロパティの名前 |
Longメッセージ・プロパティの取得
public long getLongProperty(java.lang.String name) throws JMSException
AQjmsMessage.getLongProperty()
は、メッセージ・プロパティをLongとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Longプロパティの名前 |
Shortメッセージ・プロパティの取得
public short getShortProperty(java.lang.String name) throws JMSException
AQjmsMessage.getShortProperty()は、メッセージ・プロパティをShortとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Shortプロパティの名前 |
Objectメッセージ・プロパティの取得
public java.lang.Object getObjectProperty(java.lang.String name) throws JMSException
AQjmsMessage.getObjectProperty()
は、メッセージ・プロパティをObjectとして取得します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
Objectプロパティの名前 |
例6-92 Objectメッセージ・プロパティの取得
TextMessage message; message.getObjectProperty("empid", new Integer(1000);
クローズおよびシャットダウン
この項の内容は次のとおりです。
MessageProducerのクローズ
public void close() throws JMSException
AQjmsProducer.close()
はMessageProducer
をクローズします。
メッセージ・コンシューマのクローズ
public void close() throws JMSException
AQjmsConsumer.close()
は、メッセージ・コンシューマをクローズします。
トラブルシューティング
この項の内容は次のとおりです。
JMSエラー・コードの取得
public java.lang.String getErrorCode()
AQjmsException.getErrorCode()
は、JMS例外のエラー・コードを取得します。
JMSエラー番号の取得
public int getErrorNumber()
AQjmsException.getErrorNumber()
は、JMS例外のエラー番号を取得します。
ノート:
このメソッドは今後のリリースでは使用されなくなります。かわりに、getErrorCode()
を使用してください。
JMS例外にリンクされた例外の取得
public java.lang.String getLinkString()
AQjmsException.getLinkString()
は、JMS例外にリンクされた例外を取得します。一般に、この例外にはデータベースで発行されるSQL例外が含まれます。
JMS例外のスタック・トレースの出力
public void printStackTrace(java.io.PrintStream s)
AQjmsException.printStackTrace()
は、JMS例外のスタック・トレースを出力します。
例外リスナーの設定
public void setExceptionListener(javax.jms.ExceptionListener listener) throws JMSException
AQjmsConnection.setExceptionListener()
は、コネクション用の例外リスナーを指定します。次のパラメータがあります。
パラメータ | 説明 |
---|---|
|
例外リスナー |
例外リスナーの登録が完了している場合は、コネクションに重大な問題が検出されると通知されます。これは、リスナーのonException()
メソッドをコールして、問題を説明するJMS例外を渡すことによって実行されます。これによって、問題がJMSクライアントに非同期に通知されます。メッセージを処理するのみのコネクションもあるため、コネクションが失敗したことを知るための他の方法がありません。
例6-93 コネクション用の例外リスナーの指定
//register an exception listener Connection jms_connection; jms_connection.setExceptionListener( new ExceptionListener() { public void onException (JMSException jmsException) { System.out.println("JMS-EXCEPTION: " + jmsException.toString()); } }; );
例外リスナーの取得
public javax.jms.ExceptionListener getExceptionListener() throws JMSException
AQjmsConnection.getExceptionListener()
は、コネクション用の例外リスナーを取得します。
例6-94に、ExceptionListener
とMessageListener
の組合せ使用の方法を示します。次の条件が満たされていることを確認してください。
-
ユーザー
jmsuser
(パスワードjmsuser
)が、適切な権限を所有してデータベースに作成されます。 -
キュー
demoQueue
が作成され起動されます。
この例は、コネクション再起動があり、例外リスナーがJMSオブジェクトを再作成する場合に、MessageListenerが非同期にメッセージを受信する方法を示します。
例6-94 ExceptionListenerとMessageListenerの組合せ使用
import java.util.Enumeration; import java.util.Properties; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import oracle.jms.AQjmsConnectionFactory; import oracle.jms.AQjmsFactory; import oracle.jms.AQjmsSession; public class JMSDemo { static String queueName = "demoQueue"; static String queueOwner = "jmsuser"; static String queueOwnerPassword = "jmsuser"; static Connection connection = null; static int numberOfMessages = 25000; static int messageCount = 0; static String jdbcURL = ""; public static void main(String args[]) { try { jdbcURL = System.getProperty("JDBC_URL"); if (jdbcURL == null) System.out .println("The system property JDBC_URL has not been set, " + "usage:java -DJDBC_URL=xxx filename "); else { JMSDemo demo = new JMSDemo(); demo.performJmsOperations(); } } catch (Exception exception) { System.out.println("Exception : " + exception); exception.printStackTrace(); } finally { try { if (connection != null) connection.close(); } catch (Exception exc) { exc.printStackTrace(); } } System.out.println("\nEnd of Demo aqjmsdemo11."); } public void performJmsOperations() { try { connection = getConnection(jdbcURL); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); // remove the messages from the Queue drainQueue(queueName, queueOwner, jdbcURL, true); // set the exception listener on the Connection connection.setExceptionListener(new DemoExceptionListener()); MessageProducer producer = session.createProducer(queue); TextMessage textMessage = null; System.out.println("Sending " + numberOfMessages + " messages to queue " + queueName); for (int i = 0; i < numberOfMessages; i++) { textMessage = session.createTextMessage(); textMessage.setText("Sample message text"); producer.send(textMessage); } MessageConsumer consumer = session.createConsumer(queue); System.out.println("Setting the message listener ..."); consumer.setMessageListener(new DemoMessageListener()); connection.start(); // Introduce a long wait to allow the listener to receive all the messages while (messageCount < numberOfMessages) { try { Thread.sleep(5000); } catch (InterruptedException interruptedException) { } } } catch (JMSException jmsException) { jmsException.printStackTrace(); } } // Sample message listener static class DemoMessageListener implements javax.jms.MessageListener { public void onMessage(Message message) { try { System.out.println("Message listener received message with JMSMessageID " + message.getJMSMessageID()); messageCount++; } catch (JMSException jmsException) { System.out.println("JMSException " + jmsException.getMessage()); } } } // sample exception listener static class DemoExceptionListener implements javax.jms.ExceptionListener { public void onException(JMSException jmsException) { try { // As a first step close the connection if (connection != null) connection.close(); } catch (JMSException exception) {} try { System.out.println("Re-create the necessary JMS objects ..."); connection = getConnection(jdbcURL); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new DemoMessageListener()); } catch (JMSException newJmsException) { newJmsException.printStackTrace(); } } } // Utility method to get a connection static Connection getConnection(String jdbcUrl) throws JMSException { Properties prop = new Properties(); prop.put("user", queueOwner); prop.put("password", queueOwnerPassword); AQjmsConnectionFactory fact = (AQjmsConnectionFactory) AQjmsFactory .getConnectionFactory(jdbcUrl, prop); Connection conn = fact.createConnection(); return conn; } // Utility method to remove the messages from the queue static void drainQueue(String queueName, String queueOwner, String jdbcUrl, boolean debugInfo) { Connection connection = null; Session session = null; long timeout = 10000; int count = 0; Message message = null; try { connection = getConnection(jdbcUrl); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = ((AQjmsSession) session).getQueue(queueOwner, queueName); MessageConsumer messageConsumer = session.createConsumer(queue); QueueBrowser browser = session.createBrowser(queue); Enumeration enumeration = browser.getEnumeration(); if (enumeration.hasMoreElements()) { while ((message = messageConsumer.receive(timeout)) != null) { if (debugInfo) { count++; } } } messageConsumer.close(); if (debugInfo) { System.out.println("Removed " + count + " messages from the queue : " + queueName); } } catch (JMSException jmsException) { jmsException.printStackTrace(); } finally { try { if (session != null) session.close(); if (connection != null) connection.close(); } catch (Exception exception) { } } } }
例6-95 コネクション用の例外リスナーの取得
//Get the exception listener Connection jms_connection; ExceptionListener el = jms_connection.getExceptionListener();
Oracle Java Message Service型の例
Oracle Database Advanced QueuingのJMS型の設定方法の例
例6-96 JMS型の例を実行する環境の設定
connect sys;
enter password: password
Rem
Rem Create the JMS user: jmsuser
Rem
DROP USER jmsuser CASCADE;
CREATE USER jmsuser IDENTIFIED BY jmsuser;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
set echo offset verify offconnect sysDROP USER jmsuser CASCADE;ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDECREATE USER jmsuser IDENTIFIED BY &password;GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;GRANT EXECUTE ON DBMS_AQADM TO jmsuser;GRANT EXECUTE ON DBMS_AQ TO jmsuser;GRANT EXECUTE ON DBMS_LOB TO jmsuser;GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;connect jmsuser/&password
Rem
Rem Creating five AQ queue tables and five queues for five payloads:
Rem SYS.AQ$_JMS_TEXT_MESSAGE
Rem SYS.AQ$_JMS_BYTES_MESSAGE
Rem SYS.AQ$_JMS_STREAM_MESSAG
Rem SYS.AQ$_JMS_MAP_MESSAGE
Rem SYS.AQ$_JMS_MESSAGE
Rem
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_text',
Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_bytes',
Queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_stream',
Queue_payload_type => 'SYS.AQ$_JMS_STREAM_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_map',
Queue_payload_type => 'SYS.AQ$_JMS_MAP_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (Queue_table => 'jmsuser.jms_qtt_general',
Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE', compatible => '8.1.0');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_text_que',
Queue_table => 'jmsuser.jms_qtt_text');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_bytes_que',
Queue_table => 'jmsuser.jms_qtt_bytes');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_stream_que',
Queue_table => 'jmsuser.jms_qtt_stream');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_map_que',
Queue_table => 'jmsuser.jms_qtt_map');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'jmsuser.jms_general_que',
Queue_table => 'jmsuser.jms_qtt_general');
Rem
Rem Starting the queues and enable both enqueue and dequeue
Rem
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_text_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_bytes_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_stream_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_map_que');
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'jmsuser.jms_general_que');
Rem The supporting utility used in the example to help display results in SQLPLUS enviroment
Rem
Rem Display a RAW data in SQLPLUS
Rem
create or replace procedure display_raw(rdata raw)
IS
pos pls_integer;
length pls_integer;
BEGIN
pos := 1;
length := UTL_RAW.LENGTH(rdata);
WHILE pos <= length LOOP
IF pos+20 > length+1 THEN
dbms_output.put_line(UTL_RAW.SUBSTR(rdata, pos, length-pos+1));
ELSE
dbms_output.put_line(UTL_RAW.SUBSTR(rdata, pos, 20));
END IF;
pos := pos+20;
END LOOP;
END display_raw;
/
show errors;
Rem
Rem Display a BLOB data in SQLPLUS
Rem
create or replace procedure display_blob(bdata blob)
IS
pos pls_integer;
length pls_integer;
BEGIN
length := dbms_lob.getlength(bdata);
pos := 1;
WHILE pos <= length LOOP
display_raw(DBMS_LOB.SUBSTR(bdata, 2000, pos));
pos := pos+2000;
END LOOP;
END display_blob;
/
show errors;
Rem
Rem Display a VARCHAR data in SQLPLUS
Rem
create or replace procedure display_varchar(vdata varchar)
IS
pos pls_integer;
text_len pls_integer;
BEGIN
text_len := length(vdata);
pos := 1;
WHILE pos <= text_len LOOP
IF pos+20 > text_len+1 THEN
dbms_output.put_line(SUBSTR(vdata, pos, text_len-pos+1));
ELSE
dbms_output.put_line(SUBSTR(vdata, pos, 20));
END IF;
pos := pos+20;
END LOOP;
END display_varchar;
/
show errors;
Rem
Rem Display a CLOB data in SQLPLUS
Rem
create or replace procedure display_clob(cdata clob)
IS
pos pls_integer;
length pls_integer;
BEGIN
length := dbms_lob.getlength(cdata);
pos := 1;
WHILE pos <= length LOOP
display_varchar(DBMS_LOB.SUBSTR(cdata, 2000, pos));
pos := pos+2000;
END LOOP;
END display_clob;
/
show errors;
Rem
Rem Display a SYS.AQ$_JMS_EXCEPTION data in SQLPLUS
Rem
Rem When application receives an ORA-24197 error, It means the JAVA stored
Rem procedures has thrown some exceptions that could not be catergorized. The
Rem user can use GET_EXCEPTION procedure of SYS.AQ$_JMS_BYTES_MESSAGE,
Rem SYS.AQ$_JMS_STREAM_MESSAG or SYS.AQ$_JMS_MAP_MESSAGE
Rem to retrieve a SYS.AQ$_JMS_EXCEPTION object which contains more detailed
Rem information on this JAVA exception including the exception name, JAVA error
Rem message and stack trace.
Rem
Rem This utility function is to help display the SYS.AQ$_JMS_EXCEPTION object in
Rem SQLPLUS
Rem
create or replace procedure display_exp(exp SYS.AQ$_JMS_EXCEPTION)
IS
pos1 pls_integer;
pos2 pls_integer;
text_data varchar(2000);
BEGIN
dbms_output.put_line('exception:'||exp.exp_name);
dbms_output.put_line('err_msg:'||exp.err_msg);
dbms_output.put_line('stack:'||length(exp.stack));
pos1 := 1;
LOOP
pos2 := INSTR(exp.stack, chr(10), pos1);
IF pos2 = 0 THEN
pos2 := length(exp.stack)+1;
END IF;
dbms_output.put_line(SUBSTR(exp.stack, pos1, pos2-pos1));
IF pos2 > length(exp.stack) THEN
EXIT;
END IF;
pos1 := pos2+1;
END LOOP;
END display_exp;
/
show errors;
EXIT;
例6-97 例の設定
例6-96では、JMS型の例に必要な設定を実行します。この例をコピーし、setup.sql
として保存します。
JMS BytesMessageの例
この項では、JMS BytesMessage
のエンキューおよびデキューの説明の例を示します。
例6-98では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_bytes_message
型として表示されるJMS BytesMessage
を移入およびエンキューする方法を紹介します。このメッセージは、後でJAVA Oracle Java Message Service (Oracle JMS)クライアントによってデキューできます。
例6-99では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_bytes_message
型として表示されるJMS BytesMessage
からデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-98 BytesMessageの移入とエンキュー
set echo offset verify offconnect sysDROP USER jmsuser CASCADE;ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDECREATE USER jmsuser IDENTIFIED BY &password;GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;GRANT EXECUTE ON DBMS_AQADM TO jmsuser;GRANT EXECUTE ON DBMS_AQ TO jmsuser;GRANT EXECUTE ON DBMS_LOB TO jmsuser;GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;connect jmsuser/&password SET ECHO ON set serveroutput on DECLARE id pls_integer; agent sys.aq$_agent := sys.aq$_agent(' ', null, 0); message sys.aq$_jms_bytes_message; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN -- Consturct a empty BytesMessage object message := sys.aq$_jms_bytes_message.construct; -- Shows how to set the JMS header message.set_replyto(agent); message.set_type('tkaqpet1'); message.set_userid('jmsuser'); message.set_appid('plsql_enq'); message.set_groupid('st'); message.set_groupseq(1); -- Shows how to set JMS user properties message.set_string_property('color', 'RED'); message.set_int_property('year', 1999); message.set_float_property('price', 16999.99); message.set_long_property('mileage', 300000); message.set_boolean_property('import', True); message.set_byte_property('password', -127); -- Shows how to populate the message payload of aq$_jms_bytes_message -- Passing -1 reserve a new slot within the message store of sys.aq$_jms_bytes_message. -- The maximum number of sys.aq$_jms_bytes_message type of messges to be operated at -- the same time within a session is 20. Calling clean_body function with parameter -1 -- might result a ORA-24199 error if the messages currently operated is already 20. -- The user is responsible to call clean or clean_all function to clean up message store. id := message.clear_body(-1); -- Write data into the BytesMessage paylaod. These functions are analogy of JMS JAVA api's. -- See the document for detail. -- Write a byte to the BytesMessage payload message.write_byte(id, 10); -- Write a RAW data as byte array to the BytesMessage payload message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF'))); -- Write a portion of the RAW data as byte array to BytesMessage payload -- Note the offset follows JAVA convention, starting from 0 message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16); -- Write a char to the BytesMessage payload message.write_char(id, 'A'); -- Write a double to the BytesMessage payload message.write_double(id, 9999.99); -- Write a float to the BytesMessage payload message.write_float(id, 99.99); -- Write a int to the BytesMessage payload message.write_int(id, 12345); -- Write a long to the BytesMessage payload message.write_long(id, 1234567); -- Write a short to the BytesMessage payload message.write_short(id, 123); -- Write a String to the BytesMessage payload, -- the String is encoded in UTF8 in the message payload message.write_utf(id, 'Hello World!'); -- Flush the data from JAVA stored procedure (JServ) to PL/SQL side -- Without doing this, the PL/SQL message is still empty. message.flush(id); -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod population on this message anymore sys.aq$_jms_bytes_message.clean_all(); --message.clean(id); -- Enqueue this message into AQ queue using DBMS_AQ package dbms_aq.enqueue(queue_name => 'jmsuser.jms_bytes_que', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); EXCEPTION WHEN java_exp THEN dbms_output.put_line('exception information:'); display_exp(sys.aq$_jms_stream_message.get_exception()); END; / commit;
例6-99 JMS BytesMessageデータのデキューおよび取出し
set echo off set verify off DROP USER jmsuser CASCADE; ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE CREATE USER jmsuser IDENTIFIED BY &password; GRANT EXECUTE ON DBMS_AQADM TO jmsuser; GRANT EXECUTE ON DBMS_AQ TO jmsuser; GRANT EXECUTE ON DBMS_LOB TO jmsuser; GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser; connect jmsuser/&password set echo on set serveroutput on size 20000 DECLARE id pls_integer; blob_data blob; clob_data clob; blob_len pls_integer; message sys.aq$_jms_bytes_message; agent sys.aq$_agent; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); gdata sys.aq$_jms_value; java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN DBMS_OUTPUT.ENABLE (20000); -- Dequeue this message from AQ queue using DBMS_AQ package dbms_aq.dequeue(queue_name => 'jmsuser.jms_bytes_que', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => msgid); -- Retrieve the header agent := message.get_replyto; dbms_output.put_line('Type: ' || message.get_type || ' UserId: ' || message.get_userid || ' AppId: ' || message.get_appid || ' GroupId: ' || message.get_groupid || ' GroupSeq: ' || message.get_groupseq); -- Retrieve the user properties dbms_output.put_line('price: ' || message.get_float_property('price')); dbms_output.put_line('color: ' || message.get_string_property('color')); IF message.get_boolean_property('import') = TRUE THEN dbms_output.put_line('import: Yes' ); ELSIF message.get_boolean_property('import') = FALSE THEN dbms_output.put_line('import: No' ); END IF; dbms_output.put_line('year: ' || message.get_int_property('year')); dbms_output.put_line('mileage: ' || message.get_long_property('mileage')); dbms_output.put_line('password: ' || message.get_byte_property('password')); -- Shows how to retrieve the message payload of aq$_jms_bytes_message -- Prepare call, send the content in the PL/SQL aq$_jms_bytes_message object to -- Java stored procedure(Jserv) in the form of a byte array. -- Passing -1 reserves a new slot in msg store of sys.aq$_jms_bytes_message. -- Max number of sys.aq$_jms_bytes_message type of messges to be operated at -- the same time in a session is 20. Call clean_body fn. with parameter -1 -- might result in ORA-24199 error if messages operated on are already 20. -- You must call clean or clean_all function to clean up message store. id := message.prepare(-1); -- Read data from BytesMessage paylaod. These fns. are analogy of JMS Java -- API's. See the JMS Types chapter for detail. dbms_output.put_line('Payload:'); -- read a byte from the BytesMessage payload dbms_output.put_line('read_byte:' || message.read_byte(id)); -- read a byte array into a blob object from the BytesMessage payload dbms_output.put_line('read_bytes:'); blob_len := message.read_bytes(id, blob_data, 272); display_blob(blob_data); -- read a char from the BytesMessage payload dbms_output.put_line('read_char:'|| message.read_char(id)); -- read a double from the BytesMessage payload dbms_output.put_line('read_double:'|| message.read_double(id)); -- read a float from the BytesMessage payload dbms_output.put_line('read_float:'|| message.read_float(id)); -- read a int from the BytesMessage payload dbms_output.put_line('read_int:'|| message.read_int(id)); -- read a long from the BytesMessage payload dbms_output.put_line('read_long:'|| message.read_long(id)); -- read a short from the BytesMessage payload dbms_output.put_line('read_short:'|| message.read_short(id)); -- read a String from the BytesMessage payload. -- the String is in UTF8 encoding in the message payload dbms_output.put_line('read_utf:'); message.read_utf(id, clob_data); display_clob(clob_data); -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod retrieving on this message anymore message.clean(id); -- sys.aq$_jms_bytes_message.clean_all(); EXCEPTION WHEN java_exp THEN dbms_output.put_line('exception information:'); display_exp(sys.aq$_jms_bytes_message.get_exception()); END; / commit;
JMS StreamMessageの例
この項では、JMS StreamMessage
のエンキューおよびデキューの説明の例を示します。
例6-100では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_stream_message
型として表示されるJMS StreamMessage
を移入およびエンキューする方法を紹介します。このメッセージは、後でOracle JMSクライアントによってデキューできます。
例6-101では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_stream_message
型として表示されるJMS StreamMessage
からデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-100 JMS StreamMessageの移入とエンキュー
set echo off set verify off DROP USER jmsuser CASCADE; ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE CREATE USER jmsuser IDENTIFIED BY &password; GRANT EXECUTE ON DBMS_AQADM TO jmsuser; GRANT EXECUTE ON DBMS_AQ TO jmsuser; GRANT EXECUTE ON DBMS_LOB TO jmsuser; GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser; connect jmsuser/&password SET ECHO ON set serveroutput on DECLARE id pls_integer; agent sys.aq$_agent := sys.aq$_agent(' ', null, 0); message sys.aq$_jms_stream_message; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN -- Consturct a empty StreamMessage object message := sys.aq$_jms_stream_message.construct; -- Shows how to set the JMS header message.set_replyto(agent); message.set_type('tkaqpet1'); message.set_userid('jmsuser'); message.set_appid('plsql_enq'); message.set_groupid('st'); message.set_groupseq(1); -- Shows how to set JMS user properties message.set_string_property('color', 'RED'); message.set_int_property('year', 1999); message.set_float_property('price', 16999.99); message.set_long_property('mileage', 300000); message.set_boolean_property('import', True); message.set_byte_property('password', -127); -- Shows how to populate the message payload of aq$_jms_stream_message -- Passing -1 reserve a new slot within the message store of sys.aq$_jms_stream_message. -- The maximum number of sys.aq$_jms_stream_message type of messges to be operated at -- the same time within a session is 20. Calling clean_body function with parameter -1 -- might result a ORA-24199 error if the messages currently operated is already 20. -- The user is responsible to call clean or clean_all function to clean up message store. id := message.clear_body(-1); -- Write data into the message paylaod. These functions are analogy of JMS JAVA api's. -- See the document for detail. -- Write a byte to the StreamMessage payload message.write_byte(id, 10); -- Write a RAW data as byte array to the StreamMessage payload message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF'))); -- Write a portion of the RAW data as byte array to the StreamMessage payload -- Note the offset follows JAVA convention, starting from 0 message.write_bytes(id, UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16); -- Write a char to the StreamMessage payload message.write_char(id, 'A'); -- Write a double to the StreamMessage payload message.write_double(id, 9999.99); -- Write a float to the StreamMessage payload message.write_float(id, 99.99); -- Write a int to the StreamMessage payload message.write_int(id, 12345); -- Write a long to the StreamMessage payload message.write_long(id, 1234567); -- Write a short to the StreamMessage payload message.write_short(id, 123); -- Write a String to the StreamMessage payload message.write_string(id, 'Hello World!'); -- Flush the data from JAVA stored procedure (JServ) to PL/SQL side -- Without doing this, the PL/SQL message is still empty. message.flush(id); -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod population on this message anymore sys.aq$_jms_stream_message.clean_all(); --message.clean(id); -- Enqueue this message into AQ queue using DBMS_AQ package dbms_aq.enqueue(queue_name => 'jmsuser.jms_stream_que', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); EXCEPTION WHEN java_exp THEN dbms_output.put_line('exception information:'); display_exp(sys.aq$_jms_stream_message.get_exception()); END; / commit;
例6-101 JMS StreamMessageからのデータのデキューおよび取出し
set echo off set verify off DROP USER jmsuser CASCADE; ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE CREATE USER jmsuser IDENTIFIED BY &password; GRANT EXECUTE ON DBMS_AQADM TO jmsuser; GRANT EXECUTE ON DBMS_AQ TO jmsuser; GRANT EXECUTE ON DBMS_LOB TO jmsuser; GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser; connect jmsuser/&password set echo on set serveroutput on DECLARE id pls_integer; blob_data blob; clob_data clob; message sys.aq$_jms_stream_message; agent sys.aq$_agent; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); gdata sys.aq$_jms_value; java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN DBMS_OUTPUT.ENABLE (20000); -- Dequeue this message from AQ queue using DBMS_AQ package dbms_aq.dequeue(queue_name => 'jmsuser.jms_stream_que', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => msgid); -- Retrieve the header agent := message.get_replyto; dbms_output.put_line('Type: ' || message.get_type || ' UserId: ' || message.get_userid || ' AppId: ' || message.get_appid || ' GroupId: ' || message.get_groupid || ' GroupSeq: ' || message.get_groupseq); -- Retrieve the user properties dbms_output.put_line('price: ' || message.get_float_property('price')); dbms_output.put_line('color: ' || message.get_string_property('color')); IF message.get_boolean_property('import') = TRUE THEN dbms_output.put_line('import: Yes' ); ELSIF message.get_boolean_property('import') = FALSE THEN dbms_output.put_line('import: No' ); END IF; dbms_output.put_line('year: ' || message.get_int_property('year')); dbms_output.put_line('mileage: ' || message.get_long_property('mileage')); dbms_output.put_line('password: ' || message.get_byte_property('password')); -- Shows how to retrieve the message payload of aq$_jms_stream_message -- The prepare call send the content in the PL/SQL aq$_jms_stream_message object to -- JAVA stored procedure(Jserv) in the form of byte array. -- Passing -1 reserve a new slot within the message store of sys.aq$_jms_stream_message. -- The maximum number of sys.aq$_jms_stream_message type of messges to be operated at -- the same time within a session is 20. Calling clean_body function with parameter -1 -- might result a ORA-24199 error if the messages currently operated is already 20. -- The user is responsible to call clean or clean_all function to clean up message store. id := message.prepare(-1); -- Assume the users know the types of data in the StreamMessage payload. -- The user can use the specific read function corresponding with the data type. -- These functions are analogy of JMS JAVA api's. See the document for detail. dbms_output.put_line('Retrieve payload by Type:'); -- Read a byte from the StreamMessage payload dbms_output.put_line('read_byte:' || message.read_byte(id)); -- Read a byte array into a blob object from the StreamMessage payload dbms_output.put_line('read_bytes:'); message.read_bytes(id, blob_data); display_blob(blob_data); -- Read another byte array into a blob object from the StreamMessage payload dbms_output.put_line('read_bytes:'); message.read_bytes(id, blob_data); display_blob(blob_data); -- Read a char from the StreamMessage payload dbms_output.put_line('read_char:'|| message.read_char(id)); -- Read a double from the StreamMessage payload dbms_output.put_line('read_double:'|| message.read_double(id)); -- Read a float from the StreamMessage payload dbms_output.put_line('read_float:'|| message.read_float(id)); -- Read a int from the StreamMessage payload dbms_output.put_line('read_int:'|| message.read_int(id)); -- Read a long from the StreamMessage payload dbms_output.put_line('read_long:'|| message.read_long(id)); -- Read a short from the StreamMessage payload dbms_output.put_line('read_short:'|| message.read_short(id)); -- Read a String into a clob data from the StreamMessage payload dbms_output.put_line('read_string:'); message.read_string(id, clob_data); display_clob(clob_data); -- Assume the users do not know the types of data in the StreamMessage payload. -- The user can use read_object method to read the data into a sys.aq$_jms_value object -- These functions are analogy of JMS JAVA api's. See the document for detail. -- Reset the stream pointer to the begining of the message so that we can read throught -- the message payload again. message.reset(id); LOOP message.read_object(id, gdata); IF gdata IS NULL THEN EXIT; END IF; CASE gdata.type WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTE THEN dbms_output.put_line('read_object/byte:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_SHORT THEN dbms_output.put_line('read_object/short:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_INTEGER THEN dbms_output.put_line('read_object/int:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_LONG THEN dbms_output.put_line('read_object/long:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_FLOAT THEN dbms_output.put_line('read_object/float:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_DOUBLE THEN dbms_output.put_line('read_object/double:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_BOOLEAN THEN dbms_output.put_line('read_object/boolean:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_CHARACTER THEN dbms_output.put_line('read_object/char:' || gdata.char_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_STRING THEN dbms_output.put_line('read_object/string:'); display_clob(gdata.text_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTES THEN dbms_output.put_line('read_object/bytes:'); display_blob(gdata.bytes_val); ELSE dbms_output.put_line('No such data type'); END CASE; END LOOP; -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod retrieving on this message anymore message.clean(id); -- sys.aq$_jms_stream_message.clean_all(); EXCEPTION WHEN java_exp THEN dbms_output.put_line('exception information:'); display_exp(sys.aq$_jms_stream_message.get_exception()); END; / commit;
JMS MapMessageの例
この項では、JMS MapMessage
のエンキューおよびデキューの説明の例を示します。
例6-102では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_map_message
型として表示されるJMS MapMessage
を移入およびエンキューする方法を紹介します。このメッセージは、後でOracle JMSクライアントによってデキューできます。
例6-103では、JMS型メンバー関数とDBMS_AQ
関数を使用して、データベース内でsys.aq$_jms_map_message
型として表示されるJMS MapMessage
からデータをデキューおよび取り出す方法を紹介します。このメッセージは、Oracle JMSクライアントによってエンキューされる場合もあります。
例6-102 JMS MapMessageの移入とエンキュー
set echo off set verify off DROP USER jmsuser CASCADE; ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE CREATE USER jmsuser IDENTIFIED BY &password; GRANT EXECUTE ON DBMS_AQADM TO jmsuser; GRANT EXECUTE ON DBMS_AQ TO jmsuser; GRANT EXECUTE ON DBMS_LOB TO jmsuser; GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser; connect jmsuser/&password SET ECHO ON set serveroutput on DECLARE id pls_integer; agent sys.aq$_agent := sys.aq$_agent(' ', null, 0); message sys.aq$_jms_map_message; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN -- Consturct a empty map message object message := sys.aq$_jms_map_message.construct; -- Shows how to set the JMS header message.set_replyto(agent); message.set_type('tkaqpet1'); message.set_userid('jmsuser'); message.set_appid('plsql_enq'); message.set_groupid('st'); message.set_groupseq(1); -- Shows how to set JMS user properties message.set_string_property('color', 'RED'); message.set_int_property('year', 1999); message.set_float_property('price', 16999.99); message.set_long_property('mileage', 300000); message.set_boolean_property('import', True); message.set_byte_property('password', -127); -- Shows how to populate the message payload of aq$_jms_map_message -- Passing -1 reserve a new slot within the message store of sys.aq$_jms_map_message. -- The maximum number of sys.aq$_jms_map_message type of messges to be operated at -- the same time within a session is 20. Calling clean_body function with parameter -1 -- might result a ORA-24199 error if the messages currently operated is already 20. -- The user is responsible to call clean or clean_all function to clean up message store. id := message.clear_body(-1); -- Write data into the message paylaod. These functions are analogy of JMS JAVA api's. -- See the document for detail. -- Set a byte entry in map message payload message.set_byte(id, 'BYTE', 10); -- Set a byte array entry using RAW data in map message payload message.set_bytes(id, 'BYTES', UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF'))); -- Set a byte array entry using only a portion of the RAW data in map message payload -- Note the offset follows JAVA convention, starting from 0 message.set_bytes(id, 'BYTES_PART', UTL_RAW.XRANGE(HEXTORAW('00'), HEXTORAW('FF')), 0, 16); -- Set a char entry in map message payload message.set_char(id, 'CHAR', 'A'); -- Set a double entry in map message payload message.set_double(id, 'DOUBLE', 9999.99); -- Set a float entry in map message payload message.set_float(id, 'FLOAT', 99.99); -- Set a int entry in map message payload message.set_int(id, 'INT', 12345); -- Set a long entry in map message payload message.set_long(id, 'LONG', 1234567); -- Set a short entry in map message payload message.set_short(id, 'SHORT', 123); -- Set a String entry in map message payload message.set_string(id, 'STRING', 'Hello World!'); -- Flush the data from JAVA stored procedure (JServ) to PL/SQL side -- Without doing this, the PL/SQL message is still empty. message.flush(id); -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod population on this message anymore sys.aq$_jms_map_message.clean_all(); --message.clean(id); -- Enqueue this message into AQ queue using DBMS_AQ package dbms_aq.enqueue(queue_name => 'jmsuser.jms_map_que', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); END; / commit;
例6-103 JMS MapMessageからのデータのデキューおよび取出し
set echo off set verify off DROP USER jmsuser CASCADE; ACCEPT password CHAR PROMPT 'Enter the password for JMSUSER: ' HIDE CREATE USER jmsuser IDENTIFIED BY &password; GRANT EXECUTE ON DBMS_AQADM TO jmsuser; GRANT EXECUTE ON DBMS_AQ TO jmsuser; GRANT EXECUTE ON DBMS_LOB TO jmsuser; GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser; connect jmsuser/&password set echo on set serveroutput on DECLARE id pls_integer; blob_data blob; clob_data clob; message sys.aq$_jms_map_message; agent sys.aq$_agent; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); name_arr sys.aq$_jms_namearray; gdata sys.aq$_jms_value; java_exp exception; pragma EXCEPTION_INIT(java_exp, -24197); BEGIN DBMS_OUTPUT.ENABLE (20000); -- Dequeue this message from AQ queue using DBMS_AQ package dbms_aq.dequeue(queue_name => 'jmsuser.jms_map_que', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => msgid); -- Retrieve the header agent := message.get_replyto; dbms_output.put_line('Type: ' || message.get_type || ' UserId: ' || message.get_userid || ' AppId: ' || message.get_appid || ' GroupId: ' || message.get_groupid || ' GroupSeq: ' || message.get_groupseq); -- Retrieve the user properties dbms_output.put_line('price: ' || message.get_float_property('price')); dbms_output.put_line('color: ' || message.get_string_property('color')); IF message.get_boolean_property('import') = TRUE THEN dbms_output.put_line('import: Yes' ); ELSIF message.get_boolean_property('import') = FALSE THEN dbms_output.put_line('import: No' ); END IF; dbms_output.put_line('year: ' || message.get_int_property('year')); dbms_output.put_line('mileage: ' || message.get_long_property('mileage')); dbms_output.put_line('password: ' || message.get_byte_property('password')); -- Shows how to retrieve the message payload of aq$_jms_map_message -- 'Prepare' sends the content in the PL/SQL aq$_jms_map_message object to -- Java stored procedure(Jserv) in the form of byte array. -- Passing -1 reserve a new slot within the message store of -- sys.aq$_jms_map_message. The maximum number of sys.aq$_jms_map_message -- type of messges to be operated at the same time within a session is 20. -- Calling clean_body function with parameter -1 -- might result a ORA-24199 error if the messages currently operated is -- already 20. The user is responsible to call clean or clean_all function -- to clean up message store. id := message.prepare(-1); -- Assume the users know the names and types in the map message payload. -- The user can use names to get the corresponsing values. -- These functions are analogous to JMS Java API's. See JMS Types chapter -- for detail. dbms_output.put_line('Retrieve payload by Name:'); -- Get a byte entry from the map message payload dbms_output.put_line('get_byte:' || message.get_byte(id, 'BYTE')); -- Get a byte array entry from the map message payload dbms_output.put_line('get_bytes:'); message.get_bytes(id, 'BYTES', blob_data); display_blob(blob_data); -- Get another byte array entry from the map message payload dbms_output.put_line('get_bytes:'); message.get_bytes(id, 'BYTES_PART', blob_data); display_blob(blob_data); -- Get a char entry from the map message payload dbms_output.put_line('get_char:'|| message.get_char(id, 'CHAR')); -- get a double entry from the map message payload dbms_output.put_line('get_double:'|| message.get_double(id, 'DOUBLE')); -- Get a float entry from the map message payload dbms_output.put_line('get_float:'|| message.get_float(id, 'FLOAT')); -- Get a int entry from the map message payload dbms_output.put_line('get_int:'|| message.get_int(id, 'INT')); -- Get a long entry from the map message payload dbms_output.put_line('get_long:'|| message.get_long(id, 'LONG')); -- Get a short entry from the map message payload dbms_output.put_line('get_short:'|| message.get_short(id, 'SHORT')); -- Get a String entry from the map message payload dbms_output.put_line('get_string:'); message.get_string(id, 'STRING', clob_data); display_clob(clob_data); -- Assume users do not know names and types in map message payload. -- User can first retrieve the name array containing all names in the -- payload and iterate through the name list and get the corresponding -- value. These functions are analogous to JMS Java API's. -- See JMS Type chapter for detail. dbms_output.put_line('Retrieve payload by iteration:'); -- Get the name array from the map message payload name_arr := message.get_names(id); -- Iterate through the name array to retrieve the value for each of the name. FOR i IN name_arr.FIRST..name_arr.LAST LOOP -- Test if a name exist in the map message payload -- (It is not necessary in this case, just a demostration on how to use it) IF message.item_exists(id, name_arr(i)) THEN dbms_output.put_line('item exists:'||name_arr(i)); -- Because we do not know the type of entry, we must use sys.aq$_jms_value -- type object for the data returned message.get_object(id, name_arr(i), gdata); IF gdata IS NOT NULL THEN CASE gdata.type WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTE THEN dbms_output.put_line('get_object/byte:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_SHORT THEN dbms_output.put_line('get_object/short:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_INTEGER THEN dbms_output.put_line('get_object/int:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_LONG THEN dbms_output.put_line('get_object/long:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_FLOAT THEN dbms_output.put_line('get_object/float:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_DOUBLE THEN dbms_output.put_line('get_object/double:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_BOOLEAN THEN dbms_output.put_line('get_object/boolean:' || gdata.num_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_CHARACTER THEN dbms_output.put_line('get_object/char:' || gdata.char_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_STRING THEN dbms_output.put_line('get_object/string:'); display_clob(gdata.text_val); WHEN sys.dbms_jms_plsql.DATA_TYPE_BYTES THEN dbms_output.put_line('get_object/bytes:'); display_blob(gdata.bytes_val); ELSE dbms_output.put_line('No such data type'); END CASE; END IF; ELSE dbms_output.put_line('item not exists:'||name_arr(i)); END IF; END LOOP; -- Use either clean_all or clean to clean up the message store when the user -- do not plan to do paylaod population on this message anymore message.clean(id); -- sys.aq$_jms_map_message.clean_all(); EXCEPTION WHEN java_exp THEN dbms_output.put_line('exception information:'); display_exp(sys.aq$_jms_stream_message.get_exception()); END; / commit;
その他のOracle Database Advanced Queuing JMSの例
例6-104のサンプル・プログラムでは、JMS TEXT
メッセージを保持するために、Oracle JMS管理インタフェースによって作成されるOracle Database Advanced Queuingキュー内に、大きなTextMessage
を(JMSユーザー・プロパティとともに)エンキューします。この例でエンキューされるTextMessage
およびBytesMessage
は、どちらもOracle JMSクライアントでデキューできます。
例6-105のサンプル・プログラムでは、大きなBytesMessage
をエンキューします。
例6-104 大きなTextMessageのエンキュー
DECLARE text varchar2(32767); agent sys.aq$_agent := sys.aq$_agent(' ', null, 0); message sys.aq$_jms_text_message; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); BEGIN message := sys.aq$_jms_text_message.construct; message.set_replyto(agent); message.set_type('tkaqpet2'); message.set_userid('jmsuser'); message.set_appid('plsql_enq'); message.set_groupid('st'); message.set_groupseq(1); message.set_boolean_property('import', True); message.set_string_property('color', 'RED'); message.set_short_property('year', 1999); message.set_long_property('mileage', 300000); message.set_double_property('price', 16999.99); message.set_byte_property('password', 127); FOR i IN 1..500 LOOP text := CONCAT (text, '1234567890'); END LOOP; message.set_text(text); dbms_aq.enqueue(queue_name => 'jmsuser.jms_text_t1', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); END;
例6-105 大きなBytesMessageのエンキュー
DECLARE text VARCHAR2(32767); bytes RAW(32767); agent sys.aq$_agent := sys.aq$_agent(' ', null, 0); message sys.aq$_jms_bytes_message; body BLOB; position INT; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); BEGIN message := sys.aq$_jms_bytes_message.construct; message.set_replyto(agent); message.set_type('tkaqper4'); message.set_userid('jmsuser'); message.set_appid('plsql_enq_raw'); message.set_groupid('st'); message.set_groupseq(1); message.set_boolean_property('import', True); message.set_string_property('color', 'RED'); message.set_short_property('year', 1999); message.set_long_property('mileage', 300000); message.set_double_property('price', 16999.99); -- prepare a huge payload into a blob FOR i IN 1..1000 LOOP text := CONCAT (text, '0123456789ABCDEF'); END LOOP; bytes := HEXTORAW(text); dbms_lob.createtemporary(lob_loc => body, cache => TRUE); dbms_lob.open (body, DBMS_LOB.LOB_READWRITE); position := 1 ; FOR i IN 1..10 LOOP dbms_lob.write ( lob_loc => body, amount => FLOOR((LENGTH(bytes)+1)/2), offset => position, buffer => bytes); position := position + FLOOR((LENGTH(bytes)+1)/2) ; END LOOP; -- end of the preparation message.set_bytes(body); dbms_aq.enqueue(queue_name => 'jmsuser.jms_bytes_t1', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); dbms_lob.freetemporary(lob_loc => body); END;