A JMS .NET Client Sample Application

This chapter provides a .NET client sample program written in C# which includes basic features of the WebLogic JMS .NET API. For details about the API, see the "WebLogic Messaging API Reference for .NET Clients" documentation.

Example A-1 MessagingSample.cs

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

using WebLogic.Messaging;


/// <summary> Demonstrate the WebLogic JMS .NET API.
/// <para>
/// This command line program connects to WebLogic JMS and performs
/// queue and topic messaging operations.  It is supported with
/// versions 10g Release 3 and later.  To compile the program,
/// link it with "WebLogic.Messaging.dll".   For usage information,
/// run the program with "-help" as a parameter.
/// </para>
/// <para>
/// Copyright 1996,2008, Oracle and/or its affiliates. All rights reserved. 
/// </para>
/// </summary>

public class MessagingSample
{
  private static string NL = Environment.NewLine;

  private string host      = "localhost";
  private int    port      = 7001;

  private string cfName    = "weblogic.jms.ConnectionFactory";
  private string queueName = "jms.queue.TestQueue1";
  private string topicName = "jms.topic.TestTopic1";

  private static string USAGE =
    "Usage: " + Environment.GetCommandLineArgs()[0] + NL +
    "          [-host <hostname>] [-port <portnum>] " + NL +
    "          [-cf <connection factory JNDI name>] " + NL +
    "          [-queue <queue JNDI name>] [-topic <topic JNDI name>]";


  public static void Main(string[] args) 
  {
    try {
      MessagingSample ms = new MessagingSample();

      // override defaults with command line arguments
      if (!ms.ParseCommandLine(args)) return;

      ms.DemoSyncQueueReceiveWithAutoAcknowledge();

      ms.DemoAsyncNondurableTopicConsumerAutoAcknowledge();

      ms.DemoSyncTopicDurableSubscriberClientAcknowledge();

    } catch (Exception e) {
      Console.WriteLine(e);
    }
  }


  private void DemoSyncQueueReceiveWithAutoAcknowledge()
  { 
    Console.WriteLine(
      NL + "-- DemoSyncQueueReceiveWithAutoAcknowledge -- " + NL);

    // ------------------------------------------------
    // Make a network connection to WebLogic and login:
    // ------------------------------------------------

    IDictionary<string, Object> paramMap = new Dictionary<string, Object>();
    
    paramMap[Constants.Context.PROVIDER_URL] = 
      "t3://" + this.host + ":" + this.port;

    IContext context = ContextFactory.CreateContext(paramMap);

    try {
      // -------------------------------------
      // Look up our resources in the context: 
      // -------------------------------------

      IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
  
      IQueue queue = (IQueue)context.LookupDestination(this.queueName);
  
      // -------------------------------------------------
      // Create a connection using the connection factory:
      // -------------------------------------------------
  
      IConnection connection = cf.CreateConnection();

      // -----------------------------------------------------------------
      // Start the connection in order to allow receivers to get messages:
      // -----------------------------------------------------------------

      connection.Start();

      // -----------------
      // Create a session:
      // -----------------
      // IMPORTANT:  Sessions are not thread-safe.   Use multiple sessions 
      // if you need to run producers and/or consumers concurrently. For 
      // more information, see the asynchronous consumer example below.
      //
  
      ISession session = connection.CreateSession(
         Constants.SessionMode.AUTO_ACKNOWLEDGE);

      // ------------------------------------------------
      // Create a producer and send a persistent message:
      // ------------------------------------------------

      IMessageProducer producer = session.CreateProducer(queue);

      producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;
  
      ITextMessage sendMessage = session.CreateTextMessage("My q message");
  
      producer.Send(sendMessage);
  
      PrintMessage("Sent Message:", sendMessage);

      // ----------------------------------------
      // Create a consumer and receive a message:
      // ----------------------------------------
      // The message will automatically be deleted from the server as the 
      // consumer's session was created in AUTO_ACKNOWLEDGE mode.
      //

      IMessageConsumer consumer = session.CreateConsumer(queue);

      IMessage recvMessage = consumer.Receive(500);

      PrintMessage("Received Message:", recvMessage);
  
      // ------------------------------------------------------------------
      // Close the connection.   Note that closing a connection also closes
      // its child sessions, consumers, and producers.
      // ------------------------------------------------------------------

      connection.Close();      

    } finally {

      // ------------------------------------------------------------------
      // Close the context.  The CloseAll method closes the network
      // connection and all related open connections, sessions, producers,
      // and consumers.
      // ------------------------------------------------------------------

      context.CloseAll();
    }
  }

  // Implement a MessageEventHandler delegate.  It will receive 
  // asynchronously delivered messages.

  public void OnMessage(IMessageConsumer consumer, MessageEventArgs args) {
    PrintMessage("Received Message Asynchronously:", args.Message);

    // -----------------------------------------------------------------
    // If the consumer's session is CLIENT_ACKNOWLEDGE, remember to
    // call args.Message.Acknowledge() to prevent the message from
    // getting redelivered, or consumer.Session.Recover() to force redelivery.
    // Similarly, if the consumer's session is TRANSACTED, remember to
    // call consumer.Session.Commit() to prevent the message from
    // getting redeliverd, or consumer.Session.Rollback() to force redeivery.
  }

  private void DemoAsyncNondurableTopicConsumerAutoAcknowledge()
  { 
    Console.WriteLine(
      NL + "-- DemoAsyncNondurableTopicConsumerAutoAcknowledge -- " + NL);

    // ------------------------------------------------
    // Make a network connection to WebLogic and login:
    // ------------------------------------------------

    IDictionary<string, Object> paramMap = new Dictionary<string, Object>();
    
    paramMap[Constants.Context.PROVIDER_URL] = 
      "t3://" + this.host + ":" + this.port;

    IContext context = ContextFactory.CreateContext(paramMap);

    try {
      // -------------------------------------
      // Look up our resources in the context:
      // -------------------------------------

      IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
  
      ITopic topic = (ITopic)context.LookupDestination(this.topicName);
  
      // --------------------------------------------------------------
      // Create a connection using the connection factory and start it:
      // --------------------------------------------------------------
  
      IConnection connection = cf.CreateConnection();

      // -----------------------------------------------------------------
      // Start the connection in order to allow receivers to get messages:
      // -----------------------------------------------------------------

      connection.Start();

      // ------------------------------------------
      // Create the asynchronous consumer delegate.   
      // ------------------------------------------
      // Create a session and a consumer; also designate a delegate 
      // that listens for messages that arrive asynchronously.  
      //
      // Unlike queue consumers, topic consumers must be created
      // *before* a message is sent in order to receive the message!
      //
      // IMPORTANT:  Sessions are not thread-safe.   We use multiple sessions 
      // in order to run the producer and async consumer concurrently.  The
      // consumer session and any of its producers and consumers 
      // can no longer be used outside of the OnMessage
      // callback once OnMessage is designated as its event handler, as
      // messages for the event handler may arrive in another thread.
      //
  
      ISession consumerSession = connection.CreateSession(
         Constants.SessionMode.AUTO_ACKNOWLEDGE);

      IMessageConsumer consumer = consumerSession.CreateConsumer(topic);

      consumer.Message += new MessageEventHandler(this.OnMessage);

      // -------------
      // Send Message:
      // -------------
      // Create a producer and send a non-persistent message.  Note
      // that even if the message were sent as persistent, it would be
      // automatically downgraded to non-persistent, as there are only
      // non-durable consumers subscribing to the topic.
      //

      ISession producerSession = connection.CreateSession(
         Constants.SessionMode.AUTO_ACKNOWLEDGE);

      IMessageProducer producer = producerSession.CreateProducer(topic);

      producer.DeliveryMode = Constants.DeliveryMode.NON_PERSISTENT;
  
      ITextMessage sendMessage = producerSession.CreateTextMessage(
                                   "My topic message");
  
      producer.Send(sendMessage);
  
      PrintMessage("Sent Message:", sendMessage);

      // -----------------
      // Wait for Message:
      // -----------------
      // Sleep for one second to allow the delegate time to receive and
      // automatically acknowledge the message.  The delegate will print
      // to the console when it receives the message.
      //

      Thread.Sleep(1000);


      // ---------
      // Clean Up:
      // ---------
      // We could just call connection.Close(), which would close
      // the connection's sessions, etc, or we could even just
      // call context.CloseAll(), but we want to demonstrate closing each
      // individual resource.
      //

      producer.Close();
      consumer.Close();
      producerSession.Close();
      consumerSession.Close();
      connection.Close();      

    } finally {

      // -------------------------------------------------------------
      // Close the context.  The CloseAll method closes the network
      // connection and any open JMS connections, sessions, producers,
      // or consumers.
      // -------------------------------------------------------------

      context.CloseAll();
    }
  }

  private void DemoSyncTopicDurableSubscriberClientAcknowledge() {

    Console.WriteLine(
      NL + "-- DemoSyncTopicDurableSubscriberClientAcknowledge -- " + NL);

    // ------------------------------------------------
    // Make a network connection to WebLogic and login:
    // ------------------------------------------------

    IDictionary<string, Object> paramMap = new Dictionary<string, Object>();
    
    paramMap[Constants.Context.PROVIDER_URL] = 
      "t3://" + this.host + ":" + this.port;

    IContext context = ContextFactory.CreateContext(paramMap);

    try {
      // -------------------------------------
      // Look up our resources in the context: 
      // -------------------------------------

      IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
  
      ITopic topic = (ITopic)context.LookupDestination(this.topicName);
  
      // -------------------------------------------------
      // Create a connection using the connection factory:
      // -------------------------------------------------
  
      IConnection connection = cf.CreateConnection();

      // --------------------------------------------
      // Assign a unique client-id to the connection:
      // --------------------------------------------
      // Durable subscribers must use a connection with an assigned
      // client-id.   Only one connection with a given client-id
      // can exist in a cluster at the same time.  An alternative
      // to using the API is to configure a client-id via connection
      // factory configuration.

      connection.ClientID = "MyConnectionID";
  
      // -----------------------------------------------------------------
      // Start the connection in order to allow consumers to get messages:
      // -----------------------------------------------------------------

      connection.Start();

      // -----------------
      // Create a session:
      // -----------------
      // IMPORTANT:  Sessions are not thread-safe.   Use multiple sessions 
      // if you need to run producers and/or consumers concurrently. For 
      // more information, see the asynchronous consumer example above.
      //
  
      ISession session = connection.CreateSession(
         Constants.SessionMode.CLIENT_ACKNOWLEDGE);

      // -----------------------------------------------
      // Create a durable subscription and its consumer.
      // -----------------------------------------------
      // Only one consumer at a time can attach to the durable
      // subscription for connection ID "MyConnectionID" and
      // subscription ID "MySubscriberID.
      //
      // Unlike queue consumers, topic consumers must be created
      // *before* a message is sent in order to receive the message!
      //

      IMessageConsumer consumer = session.CreateDurableSubscriber(
        topic, "MySubscriberID");

      // ------------------------------------------------
      // Create a producer and send a persistent message:
      // ------------------------------------------------

      IMessageProducer producer = session.CreateProducer(topic);

      producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;
  
      ITextMessage sendMessage = session.CreateTextMessage("My durable message");
  
      producer.Send(sendMessage);
  
      PrintMessage("Sent Message To Durable Subscriber:", sendMessage);

      // ----------------------------------------------------
      // Demonstrate closing and re-creating the consumer.
      //
      // The new consumer will implicitly connect to the durable
      // subscription created above, as we specify the same
      // connection id and subscription id.
      //
      // A durable subscription continues to exist and accumulate
      // new messages when it has no consumer, and even keeps
      // its persistent messages in the event of a client or server
      // crash and restart.
      //
      // Non-durable subscriptions and their messages cease to
      // exist when they are closed, or when their host server
      // shuts down or crashes.
      // ----------------------------------------------------

      consumer.Close();

      consumer = session.CreateDurableSubscriber(
        topic, "MySubscriberID");

      // -------------------------------------------------------------------
      // Demonstrate client acknowledge.  Get the message, force
      // it to redeliver, get it again, and then finally delete the message.
      // -------------------------------------------------------------------
      // In client ack mode "recover()" forces message redelivery, while
      // "acknowledge()" deletes the message.  If the client application
      // crashes or closes without acknowledging a message, it will be 
      // redelivered.

      ITextMessage recvMessage = (ITextMessage)consumer.Receive(500);

      PrintMessage("Durable Subscriber Received Message:", recvMessage);

      session.Recover(); 

      recvMessage = (ITextMessage)consumer.Receive(500);

      PrintMessage("Durable Subscriber Received Message Again:", recvMessage);

      recvMessage.Acknowledge();

      // ------------------------------------------------------------
      // Delete the durable subscription, otherwise it would continue
      // to exist after the demo exits.
      // ------------------------------------------------------------
      //

      consumer.Close();  // closes consumer, but doesn't delete subscription

      session.Unsubscribe("MySubscriberID"); // deletes subscription
  
      // ------------------------------------------------------------------
      // Close the connection.   Note that closing a connection also closes
      // its child sessions, consumers, and producers.
      // ------------------------------------------------------------------

      connection.Close();      

    } finally {

      // ------------------------------------------------------------------
      // Close the context.  The CloseAll method closes the network
      // connection and all related open connections, sessions, producers,
      // and consumers.
      // ------------------------------------------------------------------

      context.CloseAll();
    }
  }

  private void PrintMessage(String header, IMessage msg) {
    string msgtext;

    if (msg is ITextMessage) 
      msgtext = " Text=" + ((ITextMessage)msg).Text + NL;
    else
      msgtext = " The message is not an ITextMessage";

    string dcProp =
       Constants.MessagePropertyNames.DELIVERY_COUNT_PROPERTY_NAME;

    System.Console.WriteLine(
      header + NL +
      " JMSMessageID=" + msg.JMSMessageID + NL +
      " JMSRedelivered=" + msg.JMSRedelivered + NL +
      " " + dcProp + "=" + msg.GetObjectProperty(dcProp) + NL +
      msgtext); 
  }

  private bool ParseCommandLine(string[] args)
  {
    int i = 0;
    try {
      for(i = 0; i < args.Length; i++) {
        if (args[i].Equals("-host")) {
          host = args[++i];
          continue;
        }
        if (args[i].Equals("-port")) {
          port = Convert.ToInt32(args[++i]);
          continue;
        }
        if (args[i].Equals("-cf")) {
          cfName = args[++i];
          continue;
        }
        if (args[i].Equals("-queue")) {
          queueName = args[++i];
          continue;
        }
        if (args[i].Equals("-topic")) {
          topicName = args[++i];
          continue;
        }
        if (args[i].Equals("-help") || args[i].Equals("-?")) {
          Console.WriteLine(USAGE);
          return false;
        }
        Console.WriteLine("Unrecognized parameter '" + args[i] + "'.");
        Console.WriteLine(USAGE);
        return false;
      }
    } catch (System.IndexOutOfRangeException) {
      Console.WriteLine(
        "Missing argument for " + args[i - 1] + "."
      ); 
      Console.WriteLine(USAGE);
      return false;
    } catch (FormatException) {
      Console.WriteLine(
        "Invalid argument '" + args[i] + "' for " + args[i - 1] + "."
      ); 
      Console.WriteLine(USAGE);
      return false;
    }  
    Console.WriteLine(
      "WebLogic JMS .NET Client Demo " + NL +
      NL +
      "Settings: " + NL +
      "  host     = " + host + NL +
      "  port     = " + port + NL +
      "  cf       = " + cfName + NL +
      "  queue    = " + queueName + NL +
      "  topic    = " + topicName + NL
    );
    return true;
  }
}