Java Library

The information provided in this section applies to Java library.

Topics:

Create Resources

The example shows a command-line program that creates the following resources when the program is started and its main method is run:

  • Two queues named Preprocessing and Postprocessing

  • One topic named Incoming

  • One message push listener named Forwarder, which receives messages from the topic named Incoming and sends them to the queue named Preprocessing

package oracle.cloud.messaging.samples;
 
import oracle.cloud.messaging.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.*;
  
import java.io.*;
 
public class CreateResources {
 
    public static void main(String[] argv) {
 
        MessagingServiceFactory factory = MessagingServiceFactory.getInstance();
        try {
 
            Namespace ns = new MessagingServiceNamespace("https://messaging.us2.oraclecloud.com/mymessaging-john");
            Credentials creds = new MessagingServiceCredentials("john.doe@oracle.com", "fFHG04x7");
            MessagingService ms = factory.getMessagingService(ns, creds);
 
            try {
                ms.createQueue("Preprocessing");                
            } catch (DestinationExistsException ex) {
                System.out.println("Preprocessing queue already exists.");
            }
 
            try {
                ms.createQueue("Postprocessing");
            } catch (DestinationExistsException ex) {
                System.out.println("Postprocessing queue already exists.");
            }
 
            try {
                ms.createTopic("Incoming");
            } catch (DestinationExistsException ex) {
                System.out.println("Incoming already exists.");
            }
 
            String name = "Forwarder";
            Medium topic = Medium.getMedium(MediumType.TOPIC,"Incoming");
            PushMedium target = PushMedium.getInstance(Medium.getMedium(MediumType.QUEUE,"Preprocessing"));
            MessagePushListener mpl = MessagePushListener.getInstance(name,
                                                                      topic,
                                                                      null,     // No selector
                                                                      target,
                                                                      null      // No failure policy
                                                                     );
            try {
                ms.createListener(null,     // No verification token; allowed because no URLs pushed to
                                  mpl
                                 );                
            } catch (ListenerExistsException ex) {
                System.out.println("Forwarder MPL already exists");
            }
                        
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    } 
}

Send a Message to a Topic

The example shows a JAX-RS web service that sends a TextMessage to a topic named Incoming. The body of the HTTP request is used as the body of the TextMessage.

package oracle.cloud.messaging.samples;
 
import javax.ws.rs.*;
import javax.ws.rs.core.*;
 
import javax.jms.*;
import oracle.cloud.messaging.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.*;
 
import java.io.*;
 
@Path("/sendMessageToTopic")
public class sendMessageToTopic {
    
  @PUT
  @Produces(MediaType.TEXT_PLAIN)
  @Consumes(MediaType.TEXT_PLAIN)
  public String sendMessage(String body) {
 
      Connection conn = null;
      MessagingServiceFactory factory = MessagingServiceFactory.getInstance();
      try {
 
          Namespace ns = new MessagingServiceNamespace("https://messaging.us2.oraclecloud.com/mymessaging-john");
          Credentials creds = new MessagingServiceCredentials("john.doe@oracle.com", "fFHG04x7");
          MessagingService ms = factory.getMessagingService(ns, creds);
          
          ConnectionFactory cf = ms.getConnectionFactory();
          conn = cf.createConnection();
          conn.start();
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Topic topic = session.createTopic("Incoming");  
          MessageProducer producer = session.createProducer(topic);         
          Message message = session.createTextMessage(body);
          producer.send(message);
      } catch (Exception ex) {
 
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          ex.printStackTrace(pw);
          return sw.toString();
      } finally {
 
          try {
              if (conn != null) {
                  conn.close();
              }
          } catch (Exception jmsex) {
              jmsex.printStackTrace();
          }
      }
 
      return "Message sent to topic successfully \n";
      
  }
 
} 

Receive a Message from a Queue with an Optional Selector

The example shows a JAX-RS web service that receives a message from a queue named Postprocessing.

If the optional query parameter palindromeOnly=true is passed with the request, the consumer will use a selector to receive only messages that have the boolean message property palindrome set to true. If a message was received from the queue, the body of the message is returned in the body of the HTTP response.

package oracle.cloud.messaging.samples;
 
import javax.ws.rs.*;
import javax.ws.rs.core.*;
 
import javax.jms.*;
import oracle.cloud.messaging.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.*;
 
import java.io.*;
 
@Path("/receiveMessageFromQueue")
public class receiveMessageFromQueue {
    
  @POST
  @Produces(MediaType.TEXT_PLAIN)
  @Consumes(MediaType.TEXT_PLAIN)
  public String receiveMessage(@QueryParam("palindromeOnly") String palindromeOnly) {
 
      Connection conn = null;
      MessagingServiceFactory factory = MessagingServiceFactory.getInstance();
      try {
 
          Namespace ns = new MessagingServiceNamespace("https://messaging.us2.oraclecloud.com/mymessaging-john");
          Credentials creds = new MessagingServiceCredentials("john.doe@oracle.com", "fFHG04x7");
          MessagingService ms = factory.getMessagingService(ns, creds);
          
          ConnectionFactory cf = ms.getConnectionFactory();
          conn = cf.createConnection();
          conn.start();
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = session.createQueue("Postprocessing");
          MessageConsumer consumer;
  
          if(palindromeOnly != null && palindromeOnly.equals("true")) {
              consumer = session.createConsumer(queue, "palindrome");
          } else {
              consumer = session.createConsumer(queue);
          }
 
          Message message = consumer.receive(1000);
 
          if (message != null) {
              if (message instanceof TextMessage) {
                  return (((TextMessage) message).getText());                  
              } else {
                  return "A message not of type TextMessage was received\n";
              }
          } else {
              return "No message on queue\n";
          }
 
      } catch (Exception ex) {
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          ex.printStackTrace(pw);
          return sw.toString();
      } finally {
          try {
              if (conn != null) {
                  conn.close();
              }
          } catch (Exception jmsex) {
              jmsex.printStackTrace();
          }
      }
      
  }
 
} 

Asynchronously Receive Messages with a Durable Subscription

The example shows a command-line program that also implements the MessageListener interface.

When the program is started and its main method is run, a durable subscription named audit is either created or reconnected to if it already exists. Messages are asynchronously received from the durable subscription and printed until an input is made to the program's standard input (System.in).

package oracle.cloud.messaging.samples;
 
import javax.jms.*;
 
import oracle.cloud.messaging.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.*;
 
import java.io.*;
 
public class AsyncReceiveFromDurableSubscription implements MessageListener {
 
    public static void main(String[] argv) {
 
        Connection conn = null;
 
        try {
            MessagingServiceFactory factory = MessagingServiceFactory.getInstance();
 
            Namespace ns = new MessagingServiceNamespace("https://messaging.us2.oraclecloud.com/mymessaging-john");
            Credentials creds = new MessagingServiceCredentials("john.doe@oracle.com", "fFHG04x7");
            MessagingService ms = factory.getMessagingService(ns, creds);
 
            ConnectionFactory cf = ms.getConnectionFactory();
            conn = cf.createConnection();
            conn.setClientID("AuditClient");
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("Incoming");            
            MessageConsumer consumer = session.createDurableSubscriber(topic, "audit");            
 
            consumer.setMessageListener(new AsyncReceiveFromDurableSubscription());
            conn.start();
 
            System.out.println("Hit RETURN to exit");
            System.in.read(new byte[1024]);
        } catch (Exception ex) { 
            ex.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception jmsex) {
                    jmsex.printStackTrace();
                }
            }
        }
    }
 
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("A message not of type TextMessage was received");
        } else {
            try {
                System.out.println("Message Received from durable subscription: " + 
                                   ((TextMessage)message).getText());
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    } 
}

Asynchronously Process Messages Within a Transaction

The example shows a command-line program that also implements the MessageListener interface.

When the program is started and its main method is run, a transacted session is created. Messages are asynchronously received from the queue named Preprocessing via the MessageListener. When a message is received, the contents of the message are reversed. A new message is created from the reversed contents of the original message and sent to the queue named Postprocessing. If the original message and the reversed message are identical, then a boolean message property named palindrome is set to true. After the send completes, the transacted session is committed. Messages are processed until an input is made to the program's standard input (System.in).

package oracle.cloud.messaging.samples;
 
import javax.jms.*;
 
import oracle.cloud.messaging.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.*;
 
import java.io.*;
 
public class AsyncTransactionProcessing implements MessageListener {
 
    private Session session;
    private MessageProducer producer;
 
    public AsyncTransactionProcessing(Session session, MessageProducer producer) {
        this.session = session;
        this.producer = producer;
    }
 
    public static void main(String[] argv) {
 
        Connection conn = null;
 
        try {
            MessagingServiceFactory factory = MessagingServiceFactory.getInstance();
            Namespace ns = new MessagingServiceNamespace("https://messaging.us2.oraclecloud.com/mymessaging-john");
            Credentials creds = new MessagingServiceCredentials("john.doe@oracle.com", "fFHG04x7");
            MessagingService ms = factory.getMessagingService(ns, creds);
 
            ConnectionFactory cf = ms.getConnectionFactory();
            conn = cf.createConnection();
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue preprocessingQueue = session.createQueue("Preprocessing");            
            MessageConsumer consumer = session.createConsumer(preprocessingQueue);
            Queue postprocessingQueue = session.createQueue("Postprocessing");
            MessageProducer producer = session.createProducer(postprocessingQueue);
 
            consumer.setMessageListener(new AsyncTransactionProcessing(session,producer));
            conn.start();
 
            System.out.println("Hit RETURN to exit");
            System.in.read(new byte[1024]);
        } catch (Exception ex) { 
            ex.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception jmsex) {
                    jmsex.printStackTrace();
                }
            }
        }
    }
 
    @Override
    public void onMessage(Message message) {
 
        if (!(message instanceof TextMessage)) {
            System.err.println("A message not of type TextMessage was received");
            return;
        }
 
        try {
 
            String body = ((TextMessage)message).getText();            
            String reversed_body = new StringBuilder(body).reverse().toString();
            TextMessage outgoingMessage = session.createTextMessage(reversed_body);
            
            if (body.equals(reversed_body)) {
                outgoingMessage.setBooleanProperty("palindrome", true);
            }
 
            producer.send(outgoingMessage);
            session.commit();
        } catch (Exception ex) {
            try {
                session.rollback();
                ex.printStackTrace();
            } catch (JMSException jmsex) {
                jmsex.printStackTrace();
            }
        }
    }
 
}

Use Message Groups

The example explains the step-by-step process to send a large message as a group of smaller messages. The example includes sample code to demonstrate how messages can be grouped and sequenced for the consumer.

Steps

  1. Create a queue.

  2. Create the consumers.

  3. Create a producer.

  4. Send a large message over the queue:
    1. Divide the large message into multiple smaller messages.

    2. Set the message groupId and groupSeq.

    3. Send the messages over the queue.

  5. Consume the messages.

    Consolidate the messages with the same groupId into a single large message/file.

  6. Send a large message over the queue.

  7. Divide the large message into multiple smaller messages.

  8. Set the message groupId and groupSeq.

Here’s a sample code:

import java.io.*;
import javax.jms.*;
import oracle.cloud.messaging.client.*;
import oracle.cloud.messaging.common.DestinationExistsException;


/**
 *  Sends a file in chunks, optionally grouped using
 *  message properties.
 */
public class Sender
{
    private static void send(
        String fileName,
        String messageGroupID,
        MessageProducer prod,
        Session sess
    )
        throws Exception
    {
        FileReader in = null;

        try
        {
            int messageGroupSeq = 1;

            in = new FileReader(fileName);

            char[] buffer = new char[102400];

            for(
                int numRead = in.read(buffer);
                numRead > 0;
                numRead = in.read(buffer)
            )
            {
                TextMessage tmessage =
                    sess.createTextMessage(new String(buffer,0,numRead));

                if (messageGroupID != null)
                {
                    tmessage.setStringProperty("JMSXGroupID",messageGroupID);
                    tmessage.setIntProperty("JMSXGroupSeq",messageGroupSeq++);
                }

                System.err.printf("Sending %d character\n",numRead);
                System.err.flush();

                prod.send(tmessage);
            }

            System.err.printf("Sending EOF messages\n");
            System.err.flush();

            for(
                int i = 0;
                i < 5;
                i++
            )
            {
                TextMessage tmessage = sess.createTextMessage();

                if (messageGroupID != null)
                {
                    tmessage.setStringProperty("JMSXGroupID",messageGroupID);
                    tmessage.setIntProperty("JMSXGroupSeq",messageGroupSeq++);
                }

                prod.send(tmessage);
            }
        }
        finally
        {
            if (in != null)
            {
                in.close();
            }
        }
    }

    public static void main(String[] argv)
    {
        if (argv.length < 5)
        {
            System.err.printf(
                "<URL> <user> <password> <queue name> <file name> [<message group ID>]\n"
            );
            return;
        }

        OracleCloudConnection conn = null;

        try
        {
            OracleCloudConnectionFactory fact =
                MessagingServiceFactory
                    .getInstance()
                    .getMessagingService(
                        new MessagingServiceNamespace(argv[0]),
                        new MessagingServiceCredentials(argv[1],argv[2])
                    )
                    .getConnectionFactory();

            conn = fact.createConnection();

            OracleCloudSession sess =
                conn.createSession(
                    TransactionMode.NON_TRANSACTED,
                    AcknowledgementMode.AUTO_ACKNOWLEDGE
                );

            OracleCloudQueue q = sess.createQueue(argv[3]);

            OracleCloudMessageProducer prod = sess.createProducer(q);
            prod.setTimeToLive(TimeToLive.timeInMilliseconds(10000));

            conn.start();

            Sender.send(
                argv[4],
                ((argv.length >= 6) ? argv[5] : null),
                prod,
                sess
            );
        }
        catch(Exception exc)
        {
            exc.printStackTrace();
        }
        finally
        {
            if (conn != null)
            {
                try
                {
                    conn.close();
                }
                catch(Exception exc)
                {
                    exc.printStackTrace();
                }
            }
        }
    }
}

/*
 *  Receives a file sent in chunks and accumulates them
 *  into a file.
 */
public class Receiver
{
    public static void main(String[] argv)
    {
        if (argv.length < 5)
        {
            System.err.printf(
                "<URL> <user> <password> <queue name> <file name for output>\n"
            );
            return;
        }

        OracleCloudConnection conn = null;

        try
        {
            OracleCloudConnectionFactory fact =
                MessagingServiceFactory
                    .getInstance()
                    .getMessagingService(
                        new MessagingServiceNamespace(argv[0]),
                        new MessagingServiceCredentials(argv[1],argv[2])
                    )
                    .getConnectionFactory();

            conn = fact.createConnection();

            OracleCloudSession sess =
                conn.createSession(
                    TransactionMode.NON_TRANSACTED,
                    AcknowledgementMode.AUTO_ACKNOWLEDGE
                );

            OracleCloudQueue q = sess.createQueue(argv[3]);

            OracleCloudMessageConsumer cons = sess.createConsumer(q);

            Accumulator accumulator = new Accumulator(cons,argv[4]);

            conn.start();

            accumulator.start();

            System.err.printf("RETURN to stop receiving\n");
            System.err.flush();
            System.in.read(new byte[32]);

            accumulator.interrupt();
            while(accumulator.isAlive());
        }
        catch(Exception exc)
        {
            exc.printStackTrace();
        }
        finally
        {
            if (conn != null)
            {
                try
                {
                    conn.close();
                }
                catch(Exception exc)
                {
                    exc.printStackTrace();
                }
            }
        }
    }
}

/**
 *  This creates the queue to use for the demo, and then
 *  deletes it after a RETURN on the console.  It can
 *  probably be omitted from the sample code.
 */
public class Initializer
{
    public static void main(String[] argv) throws Exception
    {
        if (argv.length < 4)
        {
            System.err.printf(
                "<URL> <user> <password> <queue name>\n"
            );
            return;
        }

        MessagingService ms =
            MessagingServiceFactory
                .getInstance()
                .getMessagingService(
                    new MessagingServiceNamespace(argv[0]),
                    new MessagingServiceCredentials(argv[1],argv[2])
                );

        try
        {
            ms.createQueue(argv[3]);
        }
        catch(DestinationExistsException deexc)
        {
            // Already exists; ignore
        }

        System.err.printf("RETURN to delete the queue");
        System.in.read(new byte[32]);

        ms.deleteQueue(argv[3]);
    }
}

import java.io.*;
import javax.jms.*;

/*
 *  Thread that repeatedly consumes from a queue,
 *  concatenating received text payloads into a file until
 *  it receives an empty payload, at which point it closes
 *  the file.
 */
public class Accumulator extends Thread
{
    private MessageConsumer consumer = null;
    private byte[] buffer = new byte[102400];
    private FileWriter out = null;
    private String outName = null;

    public Accumulator(MessageConsumer consumer, String outName)
    {
        this.outName = outName;
        this.consumer = consumer;
    }

    @Override
    public void run()
    {
        while(!Thread.currentThread().isInterrupted())
        {
            try
            {
                this.process();
            }
            catch(Exception exc)
            {
                exc.printStackTrace();
                break;
            }
        }
    }

    public void process() throws IOException, JMSException
    {
        Message message = this.consumer.receive(1000);

        if (message instanceof TextMessage)
        {
            TextMessage tmessage = (TextMessage)message;

            String payload = tmessage.getText();

            if (payload == null)
            {
                if (this.out != null)
                {
                    System.err.printf(
                        "Closing file '%s'\n",
                        this.outName
                    );
                    System.err.flush();

                    this.out.close();
                    this.out = null;
                }
            }
            else
            {
                if (this.out == null)
                {
                    System.err.printf(
                        "Opening file '%s'\n",
                        this.outName
                    );
                    System.err.flush();

                    this.out = new FileWriter(this.outName);
                }

                System.err.printf(
                    "Writing %d chars to '%s'\n",
                    payload.length(),
                    this.outName
                );

                this.out.write(payload);
                this.out.flush();
            }
        }
    }
}

Receive Messages from a Queue Using a MessageListener

This example shows sample code to receive messages from a queue using a MessageListener.

The following is a command-line client to set up the MessageListener:

package oracle.cloud.messaging.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import oracle.cloud.messaging.client.MessagingService;
import oracle.cloud.messaging.client.MessagingServiceCredentials;
import oracle.cloud.messaging.client.MessagingServiceFactory;
import oracle.cloud.messaging.client.MessagingServiceNamespace;

import oracle.cloud.messaging.MessagingException;

/**
 *  Oracle Messaging Service client code to receive
 *  messages from a queue using a MessageListener.
 */
public class MessageToFileClient
{
    private String urlWithNamespace = null;
    private String user = null;
    private String password = null;
    private String queueName = null;
    private String dir = null;

    private boolean started = false;

    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;

    /**
     *  @param urlWithNamespace
     *      Namespace URL for the messaging service
     *      instance to use
     *
     *  @param user
     *      User with admin privileges for the service
     *      instance
     *
     *  @param password
     *      Password for the user
     *
     *  @param queueName
     *      Name of the queue for the listener to listen on
     *
     *  @param dir
     *      Name of the directory into which to put text
     *      message payloads as files
     */
    public MessageToFileClient(
        String urlWithNamespace,
        String user,
        String password,
        String queueName,
        String dir
    )
    {
        if (urlWithNamespace == null)
        {
            throw new IllegalArgumentException("urlWithNamespace == null");
        }

        if (user == null)
        {
            throw new IllegalArgumentException("user == null");
        }

        if (password == null)
        {
            throw new IllegalArgumentException("password == null");
        }

        if (queueName == null)
        {
            throw new IllegalArgumentException("queueName == null");
        }

        if (dir == null)
        {
            throw new IllegalArgumentException("dir == null");
        }

        this.urlWithNamespace = urlWithNamespace;
        this.user = user;
        this.password = password;
        this.queueName = queueName;
        this.dir = dir;
    }

    /**
     *  Start the client. If already started, this is
     *  a no-op. On return, the client is set up and
     *  listening.
     */
    public void start() throws MessagingException, JMSException
    {
        synchronized(this)
        {
            if (this.started)
            {
                return;
            }

            MessagingServiceFactory factory = MessagingServiceFactory.getInstance();

            MessagingServiceNamespace ns =
                new MessagingServiceNamespace(this.urlWithNamespace);
            MessagingServiceCredentials cred =
                new MessagingServiceCredentials(this.user,this.password);

            MessagingService ms = factory.getMessagingService(ns,cred);

            ConnectionFactory cf = ms.getConnectionFactory();

            this.connection = cf.createConnection();

            this.session =
                this.connection.createSession(
                    false,      // Not transacted
                    Session.AUTO_ACKNOWLEDGE
                );

            Queue q = this.session.createQueue(this.queueName);

            this.consumer = this.session.createConsumer(q);

            this.consumer
                .setMessageListener(
                    new MessageToFileListener(this.dir)
                );

            this.connection.start();

            this.started = true;
        }
    }

    /**
     *  Return whether the client is started.
     */
    public boolean isStarted()
    {
        synchronized(this)
        {
            return this.started;
        }
    }

    /**
     *  Pause the listener. This will cause the
     *  listener to stop receiving messages until {@link
     *  #restart()} is called. If the client has not been
     *  started, IllegalStateException is thrown.
     */
    public void pause() throws JMSException
    {
        synchronized(this)
        {
            if (this.started)
            {
                this.connection.stop();
            }
            else
            {
                throw new IllegalStateException("Client unstarted");
            }
        }
    }

    /**
     *  Make the listener resume receiving messages.
     *  If the listener is not paused, this is a
     *  no-op. If the client has not been started,
     *  IllegalStateException is thrown.
     */
    public void restart() throws JMSException
    {
        synchronized(this)
        {
            if (this.started)
            {
                this.connection.start();
            }
            else
            {
                throw new IllegalStateException("Client unstarted");
            }
        }
    }

    /**
     *  Stop the client. If the client has not been
     *  started, this is a no-op. Once the client has
     *  been stopped, it cannot be re-started.
     */
    public void stop() throws JMSException
    {
        synchronized(this)
        {
            if (this.started)
            {
                this.connection.close();
            }
        }
    }

    /**
     *  Run the client from the command line. The first
     *  5 arguments to the command line are the 5 inputs
     *  to the constructor, in order. After starting,
     *  the client will run until a newline is input to
     *  System.in, after which
     *  it will stop itself.
     */
    public static void main(String[] argv) throws Exception
    {
        MessageToFileClient client =
            new MessageToFileClient(
                argv[0],    // urlWithNamespace,
                argv[1],    // user,
                argv[2],    // password,
                argv[3],    // queueName,
                argv[4]     // dir
            );

        System.err.printf("Starting client ... ");
        System.err.flush();
        client.start();
        System.err.printf("started\n");
        System.err.flush();

        byte[] buffer = new byte[1024];

        System.in.read(buffer);

        System.err.printf("Stopping client ... ");
        System.err.flush();
        client.stop();
        System.err.printf("stopped\n");
        System.err.flush();
    }

    protected void finalize() throws Throwable
    {
        try
        {
            this.stop();
        }
        finally
        {
            super.finalize();
        }
    }
}

The following is the MessageListener class:

package oracle.cloud.messaging.demo;

import java.io.File;
import java.io.FileOutputStream;

import java.util.UUID;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import oracle.cloud.messaging.client.HttpContent;

/**
 *  Client-side asynchronous message listener. This
 *  listener assumes that the messages it receives are
 *  ObjectMessages with HttpContent payloads, as would
 *  be the case if they had been sent via the REST
 *  API with message type HTTP, or no message type.
 *  Any messages not of this form are logged and discarded.
 *  The listener puts the body content of messages it processes
 *  into files in a specified directory.
 */
public class MessageToFileListener implements MessageListener
{
    private static final Logger logger =
        Logger.getLogger(MessageToFileListener.class.getName());

    private String dir = null;

    /**
     *  @param dir
     *      Path to the directory in which files containing
     *      message payloads will be put; may not be null
     */
    public MessageToFileListener(String dir)
    {
        if (dir == null)
        {
            throw new IllegalArgumentException("dir == null");
        }

        this.dir = dir;
    }

    @Override
    public void onMessage(Message message)
    {
        if (message instanceof ObjectMessage)
        {
            ObjectMessage omessage = (ObjectMessage)message;
            byte[] body = null;

            try
            {
                Object payload = omessage.getObject();

                if (payload instanceof HttpContent)
                {
                    String type = ((HttpContent)payload).getContentType();
                    body = ((HttpContent)payload).getContent();

                    System.err.printf("Got object message with '%s' content:\n",type);
                    System.err.flush();
                    System.err.write(body);

                    FileOutputStream out =
                        new FileOutputStream(
                            this.dir +
                            File.separator +
                            UUID.randomUUID().toString() +
                            ".dat"
                        );

                    out.write(body);
                    out.flush();
                    out.close();
                }
                else
                {
                    MessageToFileListener.logger.log(
                        Level.SEVERE,
                        "Message delivered to listener is an ObjectMessage, but payload is not HttpContent; payload class is '" +
                        payload.getClass().getName() +
                        "'"
                    );
                }
            }
            catch(Exception exc)
            {
                MessageToFileListener.logger.log(
                    Level.SEVERE,
                    "Exception writing message to file",
                    exc
                );
            }
        }
        else
        {
            MessageToFileListener.logger.log(
                Level.SEVERE,
                "Message delivered to listener is not an ObjectMessage; class is '" +
                message.getClass().getName() +
                "'"
            );
        }
    }
}