Microsoft .NET 対応の WebLogic JMS クライアントの使用

     前  次    新しいウィンドウで目次を開く     
ここから内容の開始

JMS .NET クライアント サンプル アプリケーション

 


MessagingSample.cs

以下の C# で記述された .NET クライアント サンプル プログラムでは、WebLogic JMS .NET API の基本的な機能の概要を示します。API の詳細については、.NET クライアントの WebLogic メッセージング API リファレンスを参照してください。

このサンプルのコピーを作成してフォーマットを維持するには、サポートされるブラウザで MessagingSample.cs ファイルを開き、テキストをコピーして任意のエディタに貼り付けてください。

コード リスト A-1 MessagingSample.cs
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

using WebLogic.Messaging;


/// <summary> WebLogic JMS .NET API の例示
/// <para>
/// このコマンドライン プログラムは WebLogic JMS に接続して、
/// キューとトピックのメッセージング処理を実行する。
/// バージョン 10g リリース 3 以降でサポートされる。
/// プログラムをコンパイルするには、「WebLogic.Messaging.dll」にリンクする。
/// 使用方法を表示するには、パラメータとして「-help」を指定してプログラムを実行する。
/// </para>
/// <para>
/// Copyright 1996,2008, Oracle and/or its affiliates. All rights reserved.
/// </para>
/// </summary>

public class MessagingSample
{
private static string NL = Environment.NewLine;

private string host = "localhost";
private int port = 7001;

private string cfName = "weblogic.jms.ConnectionFactory";
private string queueName = "jms.queue.TestQueue1";
private string topicName = "jms.topic.TestTopic1";

private static string USAGE =
"Usage: " + Environment.GetCommandLineArgs()[0] + NL +
" [-host <hostname>] [-port <portnum>] " + NL +
" [-cf <connection factory JNDI name>] " + NL +
" [-queue <queue JNDI name>] [-topic <topic JNDI name>]";


public static void Main(string[] args)
{
try {
MessagingSample ms = new MessagingSample();

      // コマンドライン引数でデフォルトをオーバーライド
      if (!ms.ParseCommandLine(args)) return;

ms.DemoSyncQueueReceiveWithAutoAcknowledge();

ms.DemoAsyncNondurableTopicConsumerAutoAcknowledge();

ms.DemoSyncTopicDurableSubscriberClientAcknowledge();

} catch (Exception e) {
Console.WriteLine(e);
}
}


private void DemoSyncQueueReceiveWithAutoAcknowledge()
{
Console.WriteLine(
NL + "-- DemoSyncQueueReceiveWithAutoAcknowledge -- " + NL);

    // ------------------------------------------------
    // WebLogic へのネットワーク接続を確立してログイン
    // ------------------------------------------------

    IDictionary<string, Object> paramMap = new Dictionary<string, Object>();
    
    paramMap[Constants.Context.PROVIDER_URL] =
      "t3://" + this.host + ":" + this.port;

    IContext context = ContextFactory.CreateContext(paramMap);

    try {
      // -------------------------------------
      // コンテキストでリソースをルックアップ
      // -------------------------------------

IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);

IQueue queue = (IQueue)context.LookupDestination(this.queueName);

      // -------------------------------------------------
      // 接続ファクトリを使用して接続を作成
      // -------------------------------------------------

IConnection connection = cf.CreateConnection();

      // -----------------------------------------------------------------
      // 受信側がメッセージを取得できるように接続を開始
      // -----------------------------------------------------------------

connection.Start();

      // -----------------
      // セッションを作成
      // -----------------
      // 重要 : セッションはスレッドセーフではない。プロデューサとコンシューマを
      // 同時に実行する必要がある場合は、複数のセッションを使用する。
      // 詳細については、以下の非同期コンシューマ例を参照。
      //

ISession session = connection.CreateSession(
Constants.SessionMode.AUTO_ACKNOWLEDGE);

      // ------------------------------------------------
      // プロデューサを作成して永続メッセージを送信
      // ------------------------------------------------

IMessageProducer producer = session.CreateProducer(queue);

producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;

ITextMessage sendMessage = session.CreateTextMessage("My q message");

producer.Send(sendMessage);

PrintMessage("Sent Message:", sendMessage);

      // ----------------------------------------
      // コンシューマを作成してメッセージを受信
      // ----------------------------------------
      // コンシューマのセッションが AUTO_ACKNOWLEDGE モードで作成されたため、
      // メッセージはサーバから自動的に削除される。
      //

IMessageConsumer consumer = session.CreateConsumer(queue);

IMessage recvMessage = consumer.Receive(500);

PrintMessage("Received Message:", recvMessage);

      // ------------------------------------------------------------------
      // 接続を閉じる。接続を閉じるとその子セッション、コンシューマ、
      // プロデューサも閉じられる。
      // ------------------------------------------------------------------

connection.Close();

} finally {

      // ------------------------------------------------------------------
      // コンテキストを閉じる。CloseAll メソッドでは、ネットワーク接続と、
      // 関連する開いている接続、セッション、プロデューサ、コンシューマを
      // すべて閉じる。
      // ------------------------------------------------------------------

context.CloseAll();
}
}

  // MessageEventHandler 委託を実装する。
  // 非同期で配信されたメッセージを受信する。

public void OnMessage(IMessageConsumer consumer, MessageEventArgs args) {
PrintMessage("Received Message Asynchronously:", args.Message);

    // -----------------------------------------------------------------
    // コンシューマのセッションが CLIENT_ACKNOWLEDGE である場合は、
    // args.Message.Acknowledge() を呼び出してメッセージの再配信を防止するか、
    // consumer.Session.Recover() を呼び出して再配信を強制すること。
    // 同様に、コンシューマのセッションが TRANSACTED である場合は、
    // consumer.Session.Commit() を呼び出してメッセージの再配信を防止するか、
    // consumer.Session.Rollback() を呼び出して再配信を強制すること。
}

private void DemoAsyncNondurableTopicConsumerAutoAcknowledge()
{
Console.WriteLine(
NL + "-- DemoAsyncNondurableTopicConsumerAutoAcknowledge -- " + NL);

    // ------------------------------------------------
    // WebLogic へのネットワーク接続を確立してログイン
    // ------------------------------------------------

IDictionary<string, Object> paramMap = new Dictionary<string, Object>();

paramMap[Constants.Context.PROVIDER_URL] =
"t3://" + this.host + ":" + this.port;

IContext context = ContextFactory.CreateContext(paramMap);

try {
      // -------------------------------------
      // コンテキストでリソースをルックアップ
      // -------------------------------------

IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);

ITopic topic = (ITopic)context.LookupDestination(this.topicName);

      // --------------------------------------------------------------
      // 接続ファクトリを使用して接続を作成して開始
      // --------------------------------------------------------------

IConnection connection = cf.CreateConnection();

      // -----------------------------------------------------------------
      // 受信側がメッセージを取得できるように接続を開始
      // -----------------------------------------------------------------

connection.Start();

      // ------------------------------------------
      // 非同期コンシューマの委託を作成
      // ------------------------------------------
      // セッションとコンシューマを作成する。
      // 非同期に到着するメッセージをリスンする委託も指定する。
      //
      // キュー コンシューマとは異なり、メッセージを受信するには、
      // メッセージが送信される「前に」トピック コンシューマを作成しておく必要がある。
      //
      // 重要 : セッションはスレッドセーフではない。プロデューサと
      // コンシューマを同時に実行するために、複数のセッションを使用する。
      // OnMessage をイベント ハンドラとして指定すると、イベント ハンドラに
      // 対するメッセージは別のスレッドに届く可能性があるため、コンシューマ
      // セッションとそのプロデューサとコンシューマを OnMessage コールバック
      // の外側で使用できなくなる。
      //

ISession consumerSession = connection.CreateSession(
Constants.SessionMode.AUTO_ACKNOWLEDGE);

IMessageConsumer consumer = consumerSession.CreateConsumer(topic);

consumer.Message += new MessageEventHandler(this.OnMessage);

      // -------------
      // メッセージを送信
      // -------------
      // プロデューサを作成して非永続メッセージを送信する。
      // トピックをサブスクライブしている非恒久コンシューマしか
      // 存在しないため、メッセージは「永続」として送信された場合でも、
      // 自動的に「非永続」にダウングレードされる。
      //

ISession producerSession = connection.CreateSession(
Constants.SessionMode.AUTO_ACKNOWLEDGE);

IMessageProducer producer = producerSession.CreateProducer(topic);

producer.DeliveryMode = Constants.DeliveryMode.NON_PERSISTENT;

ITextMessage sendMessage = producerSession.CreateTextMessage(
"My topic message");

producer.Send(sendMessage);

PrintMessage("Sent Message:", sendMessage);

      // -----------------
      // メッセージを待機
      // -----------------
      // 委託がメッセージを受信して自動的に確認応答できるように、
      // 1 秒間スリープする。
      // 委託はメッセージを受信するとコンソールに出力する。
      //

Thread.Sleep(1000);


      // ---------
      // クリーンアップ
      // ---------
      // 単に connection.Close() を呼び出してもよい
      // (接続のセッションなどが閉じられる)。
      // または、単に context.CloseAll() を呼び出してもよい。
      // しかし、ここでは個々のリソースを閉じる方法を示す。
      //

producer.Close();
consumer.Close();
producerSession.Close();
consumerSession.Close();
connection.Close();

} finally {

      // -------------------------------------------------------------
      // コンテキストを閉じる。CloseAll メソッドは、ネットワーク接続と、
      // 開いている JMS 接続、セッション、プロデューサ、または
      // コンシューマを閉じる。
      // -------------------------------------------------------------

context.CloseAll();
}
}

private void DemoSyncTopicDurableSubscriberClientAcknowledge() {

Console.WriteLine(
NL + "-- DemoSyncTopicDurableSubscriberClientAcknowledge -- " + NL);

    // ------------------------------------------------
    // WebLogic へのネットワーク接続を確立してログイン
    // ------------------------------------------------

IDictionary<string, Object> paramMap = new Dictionary<string, Object>();

paramMap[Constants.Context.PROVIDER_URL] =
"t3://" + this.host + ":" + this.port;

IContext context = ContextFactory.CreateContext(paramMap);

try {
      // -------------------------------------
      // コンテキストでリソースをルックアップ
      // -------------------------------------

IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);

ITopic topic = (ITopic)context.LookupDestination(this.topicName);

      // -------------------------------------------------
      // 接続ファクトリを使用して接続を作成
      // -------------------------------------------------

IConnection connection = cf.CreateConnection();

      // --------------------------------------------
      // ユニークなクライアント ID を接続に割り当て
      // --------------------------------------------
      // 恒久サブスクライバは割り当てられたクライアント ID を使って
      // 接続を使用する必要がある。クラスタ内に一度に存在できるのは、
      // 特定のクライアント ID を持つ 1 つの接続のみ。
      // API を使用する代わりに、接続ファクトリのコンフィグレーションから
      // クライアント ID をコンフィグレーションする方法もある。

connection.ClientID = "MyConnectionID";

     // -----------------------------------------------------------------
      // コンシューマがメッセージを取得できるように接続を開始
      // -----------------------------------------------------------------

connection.Start();

      // -----------------
      // セッションを作成
      // -----------------
      // 重要 : セッションはスレッドセーフではない。プロデューサとコンシューマを
      // 同時に実行する必要がある場合は、複数のセッションを使用する。
      // 詳細については、以下の非同期コンシューマ例を参照。
      //

ISession session = connection.CreateSession(
Constants.SessionMode.CLIENT_ACKNOWLEDGE);

      // -----------------------------------------------
      // 恒久サブスクリプションとそのコンシューマを作成
      // -----------------------------------------------
      // 接続 ID「MyConnectionID」とサブスクリプション ID「MySubscriberID」で、
      // 一度に 1 つのコンシューマのみが恒久サブスクリプションに
      // アタッチできる。
      //
      // キュー コンシューマとは異なり、メッセージを受信するには、
      // メッセージが送信される「前に」トピック コンシューマを作成しておく必要がある。
      //

IMessageConsumer consumer = session.CreateDurableSubscriber(
topic, "MySubscriberID");

      // ------------------------------------------------
      // プロデューサを作成して永続メッセージを送信
      // ------------------------------------------------

IMessageProducer producer = session.CreateProducer(topic);

producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;

ITextMessage sendMessage = session.CreateTextMessage("My durable message");

producer.Send(sendMessage);

PrintMessage("Sent Message To Durable Subscriber:", sendMessage);

      // ----------------------------------------------------
      // コンシューマのクローズと再作成を例示する。
      //
      // 同じ接続 ID とサブスクリプション ID を指定するので、
      // 新しいコンシューマは上記で作成された恒久サブスクリプション
      // に暗黙的に接続する。
      //
      // 恒久サブスクリプションは存在し続け、
      // コンシューマがない場合は新しいメッセージを蓄積する。
      // クライアントまたはサーバがクラッシュして再起動した場合は、
      // 永続メッセージを保持する。
      //
      // 非恒久サブスクリプションとそのメッセージは、閉じられた場合に、
      // またはホスト サーバが停止またはクラッシュした場合に、
      // 存在しなくなる。
      // ----------------------------------------------------

consumer.Close();

consumer = session.CreateDurableSubscriber(
topic, "MySubscriberID");

      // -------------------------------------------------------------------
      // クライアントの確認応答を例示する。メッセージを取得し、
      // 再配信を強制し、再びメッセージを取得して、最後にメッセージを削除する。
      // -------------------------------------------------------------------
      // クライアントの確認応答モードの場合、「acknowledge()」がメッセージを
      // 削除する一方で「recover()」がメッセージの再配信を強制する。
      // メッセージを確認応答せずにクライアント アプリケーションが
      // クラッシュするか閉じられた場合、メッセージは再配信される。

      ITextMessage recvMessage = (ITextMessage)consumer.Receive(500);

PrintMessage("Durable Subscriber Received Message:", recvMessage);

session.Recover();

recvMessage = (ITextMessage)consumer.Receive(500);

PrintMessage("Durable Subscriber Received Message Again:", recvMessage);

recvMessage.Acknowledge();

      // ------------------------------------------------------------
      // 恒久サブスクリプションを削除する。
      // 削除しないと、このデモが終了した後も存在し続ける。
      // ------------------------------------------------------------
      //

consumer.Close(); // コンシューマを閉じるが、サブスクリプションは削除されない

      session.Unsubscribe("MySubscriberID"); // サブスクリプションを削除

      // ------------------------------------------------------------------
      // 接続を閉じる。接続を閉じるとその子セッション、コンシューマ、
      // プロデューサも閉じられる。
      // ------------------------------------------------------------------

connection.Close();

} finally {

      // ------------------------------------------------------------------
      // コンテキストを閉じる。CloseAll メソッドでは、ネットワーク接続と、
      // 関連する開いている接続、セッション、プロデューサ、コンシューマを
      // すべて閉じる。
      // ------------------------------------------------------------------

context.CloseAll();
}
}

private void PrintMessage(String header, IMessage msg) {
string msgtext;

if (msg is ITextMessage)
msgtext = " Text=" + ((ITextMessage)msg).Text + NL;
else
msgtext = " The message is not an ITextMessage";

string dcProp =
Constants.MessagePropertyNames.DELIVERY_COUNT_PROPERTY_NAME;

System.Console.WriteLine(
header + NL +
" JMSMessageID=" + msg.JMSMessageID + NL +
" JMSRedelivered=" + msg.JMSRedelivered + NL +
" " + dcProp + "=" + msg.GetObjectProperty(dcProp) + NL +
msgtext);
}

private bool ParseCommandLine(string[] args)
{
int i = 0;
try {
for(i = 0; i < args.Length; i++) {
if (args[i].Equals("-host")) {
host = args[++i];
continue;
}
if (args[i].Equals("-port")) {
port = Convert.ToInt32(args[++i]);
continue;
}
if (args[i].Equals("-cf")) {
cfName = args[++i];
continue;
}
if (args[i].Equals("-queue")) {
queueName = args[++i];
continue;
}
if (args[i].Equals("-topic")) {
topicName = args[++i];
continue;
}
if (args[i].Equals("-help") || args[i].Equals("-?")) {
Console.WriteLine(USAGE);
return false;
}
Console.WriteLine("Unrecognized parameter '" + args[i] + "'.");
Console.WriteLine(USAGE);
return false;
}
} catch (System.IndexOutOfRangeException) {
Console.WriteLine(
"Missing argument for " + args[i - 1] + "."
);
Console.WriteLine(USAGE);
return false;
} catch (FormatException) {
Console.WriteLine(
"Invalid argument '" + args[i] + "' for " + args[i - 1] + "."
);
Console.WriteLine(USAGE);
return false;
}
Console.WriteLine(
"WebLogic JMS .NET Client Demo " + NL +
NL +
"Settings: " + NL +
" host = " + host + NL +
" port = " + port + NL +
" cf = " + cfName + NL +
" queue = " + queueName + NL +
" topic = " + topicName + NL
);
return true;
}
}

  ページの先頭       前  次