using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using WebLogic.Messaging;

/// <summary>
/// Demonstrate the WebLogic JMS .NET API.
/// </summary>
/// <remarks>
/// This command line program connects to WebLogic JMS and performs
/// queue and topic messaging operations. It is supported with
/// versions 10.3 and later. To compile the program,
/// link it with "WebLogic.Messaging.dll". For usage information,
/// run the program with "-help" as a parameter.
///
/// Copyright 1996,2008, Oracle and/or its affiliates. All rights reserved.
/// </remarks>
public class MessagingSample
{
    private static readonly string NL = Environment.NewLine;

    // Timeout handed to IMessageConsumer.Receive by the synchronous demos.
    private const int ReceiveTimeout = 500;

    // Identifiers used by the durable subscriber demo.
    private const string DurableClientId = "MyConnectionID";
    private const string DurableSubscriptionName = "MySubscriberID";

    // Connection settings; each can be overridden on the command line.
    private string host = "localhost";
    private int port = 7001;
    private string cfName = "weblogic.jms.ConnectionFactory";
    private string queueName = "jms.queue.TestQueue1";
    private string topicName = "jms.topic.TestTopic1";

    // NOTE: must be declared after NL, because static initializers run in
    // textual order and this string is built from NL.
    private static readonly string USAGE =
        "Usage: " + Environment.GetCommandLineArgs()[0] + NL +
        " [-host <hostname>] [-port <portnum>] " + NL +
        " [-cf <connection factory JNDI name>] " + NL +
        " [-queue <queue JNDI name>] [-topic <topic JNDI name>]";

    /// <summary>
    /// Program entry point: parses the command line and runs the three
    /// demos in sequence. Any exception is printed to the console.
    /// </summary>
    /// <param name="args">Command line arguments (see USAGE).</param>
    public static void Main(string[] args)
    {
        try
        {
            MessagingSample ms = new MessagingSample();

            // override defaults with command line arguments
            if (!ms.ParseCommandLine(args))
            {
                return;
            }

            ms.DemoSyncQueueReceiveWithAutoAcknowledge();
            ms.DemoAsyncNondurableTopicConsumerAutoAcknowledge();
            ms.DemoSyncTopicDurableSubscriberClientAcknowledge();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }

    /// <summary>
    /// Makes a network connection to WebLogic and logs in, using the
    /// configured host and port to build a "t3://host:port" provider URL.
    /// </summary>
    /// <returns>The context created by ContextFactory.CreateContext.</returns>
    private IContext CreateContext()
    {
        IDictionary<string, object> paramMap = new Dictionary<string, object>();
        paramMap[Constants.Context.PROVIDER_URL] =
            "t3://" + this.host + ":" + this.port;
        return ContextFactory.CreateContext(paramMap);
    }

    /// <summary>
    /// Sends one persistent text message to the queue and receives it
    /// synchronously on an AUTO_ACKNOWLEDGE session.
    /// </summary>
    private void DemoSyncQueueReceiveWithAutoAcknowledge()
    {
        Console.WriteLine(
            NL + "-- DemoSyncQueueReceiveWithAutoAcknowledge -- " + NL);

        // ------------------------------------------------
        // Make a network connection to WebLogic and login:
        // ------------------------------------------------
        IContext context = CreateContext();

        try
        {
            // -------------------------------------
            // Look up our resources in the context:
            // -------------------------------------
            IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
            IQueue queue = (IQueue)context.LookupDestination(this.queueName);

            // -------------------------------------------------
            // Create a connection using the connection factory:
            // -------------------------------------------------
            IConnection connection = cf.CreateConnection();

            // -----------------------------------------------------------------
            // Start the connection in order to allow receivers to get messages:
            // -----------------------------------------------------------------
            connection.Start();

            // -----------------
            // Create a session:
            // -----------------
            // IMPORTANT: Sessions are not thread-safe. Use multiple sessions
            // if you need to run producers and/or consumers concurrently. For
            // more information, see the asynchronous consumer example below.
            //
            ISession session = connection.CreateSession(
                Constants.SessionMode.AUTO_ACKNOWLEDGE);

            // ------------------------------------------------
            // Create a producer and send a persistent message:
            // ------------------------------------------------
            IMessageProducer producer = session.CreateProducer(queue);
            producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;

            ITextMessage sendMessage = session.CreateTextMessage("My q message");
            producer.Send(sendMessage);
            PrintMessage("Sent Message:", sendMessage);

            // ----------------------------------------
            // Create a consumer and receive a message:
            // ----------------------------------------
            // The message will automatically be deleted from the server as the
            // consumer's session was created in AUTO_ACKNOWLEDGE mode.
            //
            // PrintMessage tolerates a null message, which a timed receive
            // is expected to yield when nothing arrives before the timeout.
            //
            IMessageConsumer consumer = session.CreateConsumer(queue);
            IMessage recvMessage = consumer.Receive(ReceiveTimeout);
            PrintMessage("Received Message:", recvMessage);

            // ------------------------------------------------------------------
            // Close the connection. Note that closing a connection also closes
            // its child sessions, consumers, and producers.
            // ------------------------------------------------------------------
            connection.Close();
        }
        finally
        {
            // ------------------------------------------------------------------
            // Close the context. The CloseAll method closes the network
            // connection and all related open connections, sessions, producers,
            // and consumers.
            // ------------------------------------------------------------------
            context.CloseAll();
        }
    }

    /// <summary>
    /// MessageEventHandler delegate implementation. It receives
    /// asynchronously delivered messages and prints them.
    /// </summary>
    /// <param name="consumer">The consumer the message was delivered to.</param>
    /// <param name="args">Event data carrying the delivered message.</param>
    public void OnMessage(IMessageConsumer consumer, MessageEventArgs args)
    {
        PrintMessage("Received Message Asynchronously:", args.Message);

        // -----------------------------------------------------------------
        // If the consumer's session is CLIENT_ACKNOWLEDGE, remember to
        // call args.Message.Acknowledge() to prevent the message from
        // getting redelivered, or consumer.Session.Recover() to force
        // redelivery. Similarly, if the consumer's session is TRANSACTED,
        // remember to call consumer.Session.Commit() to prevent the message
        // from getting redelivered, or consumer.Session.Rollback() to force
        // redelivery.
        // -----------------------------------------------------------------
    }

    /// <summary>
    /// Publishes one non-persistent text message to the topic and receives
    /// it asynchronously through the OnMessage event handler, using
    /// separate sessions for the producer and the consumer.
    /// </summary>
    private void DemoAsyncNondurableTopicConsumerAutoAcknowledge()
    {
        Console.WriteLine(
            NL + "-- DemoAsyncNondurableTopicConsumerAutoAcknowledge -- " + NL);

        // ------------------------------------------------
        // Make a network connection to WebLogic and login:
        // ------------------------------------------------
        IContext context = CreateContext();

        try
        {
            // -------------------------------------
            // Look up our resources in the context:
            // -------------------------------------
            IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
            ITopic topic = (ITopic)context.LookupDestination(this.topicName);

            // -------------------------------------------------
            // Create a connection using the connection factory:
            // -------------------------------------------------
            IConnection connection = cf.CreateConnection();

            // -----------------------------------------------------------------
            // Start the connection in order to allow receivers to get messages:
            // -----------------------------------------------------------------
            connection.Start();

            // ------------------------------------------
            // Create the asynchronous consumer delegate.
            // ------------------------------------------
            // Create a session and a consumer; also designate a delegate
            // that listens for messages that arrive asynchronously.
            //
            // Unlike queue consumers, topic consumers must be created
            // *before* a message is sent in order to receive the message!
            //
            // IMPORTANT: Sessions are not thread-safe. We use multiple sessions
            // in order to run the producer and async consumer concurrently. The
            // consumer session and any of its producers and consumers
            // can no longer be used outside of the OnMessage
            // callback once OnMessage is designated as its event handler, as
            // messages for the event handler may arrive in another thread.
            //
            ISession consumerSession = connection.CreateSession(
                Constants.SessionMode.AUTO_ACKNOWLEDGE);
            IMessageConsumer consumer = consumerSession.CreateConsumer(topic);
            consumer.Message += new MessageEventHandler(this.OnMessage);

            // -------------
            // Send Message:
            // -------------
            // Create a producer and send a non-persistent message. Note
            // that even if the message were sent as persistent, it would be
            // automatically downgraded to non-persistent, as there are only
            // non-durable consumers subscribing to the topic.
            //
            ISession producerSession = connection.CreateSession(
                Constants.SessionMode.AUTO_ACKNOWLEDGE);
            IMessageProducer producer = producerSession.CreateProducer(topic);
            producer.DeliveryMode = Constants.DeliveryMode.NON_PERSISTENT;

            ITextMessage sendMessage = producerSession.CreateTextMessage(
                "My topic message");
            producer.Send(sendMessage);
            PrintMessage("Sent Message:", sendMessage);

            // -----------------
            // Wait for Message:
            // -----------------
            // Sleep for one second to allow the delegate time to receive and
            // automatically acknowledge the message. The delegate will print
            // to the console when it receives the message.
            //
            Thread.Sleep(1000);

            // ---------
            // Clean Up:
            // ---------
            // We could just call connection.Close(), which would close
            // the connection's sessions, etc, or we could even just
            // call context.CloseAll(), but we want to demonstrate closing each
            // individual resource.
            //
            producer.Close();
            consumer.Close();
            producerSession.Close();
            consumerSession.Close();
            connection.Close();
        }
        finally
        {
            // -------------------------------------------------------------
            // Close the context. The CloseAll method closes the network
            // connection and any open JMS connections, sessions, producers,
            // or consumers.
            // -------------------------------------------------------------
            context.CloseAll();
        }
    }

    /// <summary>
    /// Creates a durable topic subscription on a CLIENT_ACKNOWLEDGE session,
    /// sends one persistent message, receives it, forces redelivery with
    /// Recover, receives it again, acknowledges it, and finally deletes the
    /// durable subscription.
    /// </summary>
    private void DemoSyncTopicDurableSubscriberClientAcknowledge()
    {
        Console.WriteLine(
            NL + "-- DemoSyncTopicDurableSubscriberClientAcknowledge -- " + NL);

        // ------------------------------------------------
        // Make a network connection to WebLogic and login:
        // ------------------------------------------------
        IContext context = CreateContext();

        try
        {
            // -------------------------------------
            // Look up our resources in the context:
            // -------------------------------------
            IConnectionFactory cf = context.LookupConnectionFactory(this.cfName);
            ITopic topic = (ITopic)context.LookupDestination(this.topicName);

            // -------------------------------------------------
            // Create a connection using the connection factory:
            // -------------------------------------------------
            IConnection connection = cf.CreateConnection();

            // --------------------------------------------
            // Assign a unique client-id to the connection:
            // --------------------------------------------
            // Durable subscribers must use a connection with an assigned
            // client-id. Only one connection with a given client-id
            // can exist in a cluster at the same time. An alternative
            // to using the API is to configure a client-id via connection
            // factory configuration.
            connection.ClientID = DurableClientId;

            // -----------------------------------------------------------------
            // Start the connection in order to allow consumers to get messages:
            // -----------------------------------------------------------------
            connection.Start();

            // -----------------
            // Create a session:
            // -----------------
            // IMPORTANT: Sessions are not thread-safe. Use multiple sessions
            // if you need to run producers and/or consumers concurrently. For
            // more information, see the asynchronous consumer example above.
            //
            ISession session = connection.CreateSession(
                Constants.SessionMode.CLIENT_ACKNOWLEDGE);

            // -----------------------------------------------
            // Create a durable subscription and its consumer.
            // -----------------------------------------------
            // Only one consumer at a time can attach to the durable
            // subscription for connection ID "MyConnectionID" and
            // subscription ID "MySubscriberID".
            //
            // Unlike queue consumers, topic consumers must be created
            // *before* a message is sent in order to receive the message!
            //
            IMessageConsumer consumer = session.CreateDurableSubscriber(
                topic, DurableSubscriptionName);

            // ------------------------------------------------
            // Create a producer and send a persistent message:
            // ------------------------------------------------
            IMessageProducer producer = session.CreateProducer(topic);
            producer.DeliveryMode = Constants.DeliveryMode.PERSISTENT;

            ITextMessage sendMessage = session.CreateTextMessage("My durable message");
            producer.Send(sendMessage);
            PrintMessage("Sent Message To Durable Subscriber:", sendMessage);

            // ----------------------------------------------------
            // Demonstrate closing and re-creating the consumer.
            //
            // The new consumer will implicitly connect to the durable
            // subscription created above, as we specify the same
            // connection id and subscription id.
            //
            // A durable subscription continues to exist and accumulate
            // new messages when it has no consumer, and even keeps
            // its persistent messages in the event of a client or server
            // crash and restart.
            //
            // Non-durable subscriptions and their messages cease to
            // exist when they are closed, or when their host server
            // shuts down or crashes.
            // ----------------------------------------------------
            consumer.Close();
            consumer = session.CreateDurableSubscriber(
                topic, DurableSubscriptionName);

            // -------------------------------------------------------------------
            // Demonstrate client acknowledge. Get the message, force
            // it to redeliver, get it again, and then finally delete the message.
            // -------------------------------------------------------------------
            // In client ack mode "recover()" forces message redelivery, while
            // "acknowledge()" deletes the message. If the client application
            // crashes or closes without acknowledging a message, it will be
            // redelivered.
            ITextMessage recvMessage =
                (ITextMessage)consumer.Receive(ReceiveTimeout);
            PrintMessage("Durable Subscriber Received Message:", recvMessage);

            session.Recover();

            recvMessage = (ITextMessage)consumer.Receive(ReceiveTimeout);
            PrintMessage("Durable Subscriber Received Message Again:", recvMessage);

            // Only acknowledge when something was actually received; a timed
            // receive is expected to yield null if the timeout expires, and we
            // still want to reach the Unsubscribe call below in that case.
            if (recvMessage != null)
            {
                recvMessage.Acknowledge();
            }

            // ------------------------------------------------------------
            // Delete the durable subscription, otherwise it would continue
            // to exist after the demo exits.
            // ------------------------------------------------------------
            consumer.Close(); // closes consumer, but doesn't delete subscription
            session.Unsubscribe(DurableSubscriptionName); // deletes subscription

            // ------------------------------------------------------------------
            // Close the connection. Note that closing a connection also closes
            // its child sessions, consumers, and producers.
            // ------------------------------------------------------------------
            connection.Close();
        }
        finally
        {
            // ------------------------------------------------------------------
            // Close the context. The CloseAll method closes the network
            // connection and all related open connections, sessions, producers,
            // and consumers.
            // ------------------------------------------------------------------
            context.CloseAll();
        }
    }

    /// <summary>
    /// Writes a header line followed by the message's JMSMessageID,
    /// JMSRedelivered flag, delivery count property and, for text
    /// messages, the text itself.
    /// </summary>
    /// <param name="header">Caption printed before the message details.</param>
    /// <param name="msg">
    /// The message to describe. When null, a short notice is printed
    /// instead of the message details.
    /// </param>
    private void PrintMessage(String header, IMessage msg)
    {
        if (msg == null)
        {
            // Nothing to describe; avoid dereferencing a missing message.
            Console.WriteLine(header + NL + " No message was received." + NL);
            return;
        }

        ITextMessage textMessage = msg as ITextMessage;
        string msgtext;
        if (textMessage != null)
        {
            msgtext = " Text=" + textMessage.Text + NL;
        }
        else
        {
            msgtext = " The message is not an ITextMessage";
        }

        string dcProp = Constants.MessagePropertyNames.DELIVERY_COUNT_PROPERTY_NAME;

        Console.WriteLine(
            header + NL +
            " JMSMessageID=" + msg.JMSMessageID + NL +
            " JMSRedelivered=" + msg.JMSRedelivered + NL +
            " " + dcProp + "=" + msg.GetObjectProperty(dcProp) + NL +
            msgtext);
    }

    /// <summary>
    /// Applies command line overrides to the connection settings and, on
    /// success, prints the effective settings.
    /// </summary>
    /// <param name="args">
    /// Arguments in the form "-host h -port n -cf name -queue name
    /// -topic name"; "-help" or "-?" prints the usage text.
    /// </param>
    /// <returns>
    /// true when the demos should run; false when usage was printed because
    /// help was requested, an option was unrecognized, an option's value
    /// was missing, or the port was not a valid 32-bit integer.
    /// </returns>
    private bool ParseCommandLine(string[] args)
    {
        for (int i = 0; i < args.Length; i++)
        {
            string option = args[i];

            if (option.Equals("-help") || option.Equals("-?"))
            {
                Console.WriteLine(USAGE);
                return false;
            }

            bool takesValue =
                option.Equals("-host") ||
                option.Equals("-port") ||
                option.Equals("-cf") ||
                option.Equals("-queue") ||
                option.Equals("-topic");

            if (!takesValue)
            {
                Console.WriteLine("Unrecognized parameter '" + option + "'.");
                Console.WriteLine(USAGE);
                return false;
            }

            // Every remaining option needs a value in the next slot; check
            // the bounds explicitly instead of relying on an exception.
            if (i + 1 >= args.Length)
            {
                Console.WriteLine("Missing argument for " + option + ".");
                Console.WriteLine(USAGE);
                return false;
            }

            string value = args[++i];

            if (option.Equals("-host"))
            {
                host = value;
            }
            else if (option.Equals("-port"))
            {
                // TryParse rejects both malformed and out-of-range numbers.
                int parsedPort;
                if (!int.TryParse(value, out parsedPort))
                {
                    Console.WriteLine(
                        "Invalid argument '" + value + "' for " + option + ".");
                    Console.WriteLine(USAGE);
                    return false;
                }
                port = parsedPort;
            }
            else if (option.Equals("-cf"))
            {
                cfName = value;
            }
            else if (option.Equals("-queue"))
            {
                queueName = value;
            }
            else
            {
                topicName = value;
            }
        }

        Console.WriteLine(
            "WebLogic JMS .NET Client Demo " + NL +
            NL +
            "Settings: " + NL +
            " host = " + host + NL +
            " port = " + port + NL +
            " cf = " + cfName + NL +
            " queue = " + queueName + NL +
            " topic = " + topicName + NL);

        return true;
    }
}