How do you implement a client that receives messages via a queue?

Jerry Smith

Client implementations vary with respect to set-up operations, depending on the JMS implementation. The following client, ObjectReceiver, works with the WebLogic Java application server, which supports JMS:

import javax.jms.*;
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
 * JMS point-to-point client that listens on the queue named by
 * {@link #QUEUE} and executes each arriving {@code ObjectMessage}
 * payload as a {@code Runnable}.
 *
 * <p>The JNDI context is obtained through the WebLogic initial context
 * factory. Construction performs all set-up; callers must check
 * {@link #isReadyToReceive()} afterwards, because set-up failures are
 * reported on standard output rather than thrown.
 *
 * <p>{@link #run()} blocks the calling thread until a message arrives
 * that carries no runnable payload, which serves as the shutdown signal.
 * The {@code quit} flag is only read and written while holding this
 * object's monitor.
 */
public class ObjectReceiver
    implements MessageListener, Runnable {
  /** Default provider URL used by the no-argument constructor. */
  public static final String SERVER_URL = "t3://localhost:7001";
  /** JNDI name under which the queue connection factory is looked up. */
  public static final String JMS_FACTORY =
    "javax.jms.QueueConnectionFactory";
  /** JNDI name of the queue to receive from. */
  public static final String QUEUE = "j2ee.jms.objectQueue";

  private Context c;
  private QueueConnectionFactory qcf;
  private QueueConnection con;
  private QueueSession session;
  private QueueReceiver receiver;
  private Queue queue;
  private boolean quit = false;
  private boolean readyToReceive;

  /**
   * Command-line entry point: {@code java ObjectReceiver [<server-url>]}.
   * Exits with status -1 on bad usage or when set-up fails; otherwise
   * blocks in {@link #run()} and releases all resources afterwards.
   *
   * @param args optional single argument, the provider URL
   */
  public static void main(String[] args) {
    if (!checkUsage(args))
      System.exit(-1);
    ObjectReceiver or =
      (args.length == 1) ?
        new ObjectReceiver(args[0]) :
        new ObjectReceiver();
    if (!or.isReadyToReceive()) {
      System.out.println("Not ready to receive.");
      System.exit(-1);
    }
    or.run();
    or.close();
  }

  /** Creates a receiver connected to {@link #SERVER_URL}. */
  public ObjectReceiver() {
    this(SERVER_URL);
  }

  /**
   * Creates a receiver connected to the given provider URL and performs
   * the JMS set-up. On any failure the instance is left in the
   * "not ready" state; see {@link #isReadyToReceive()}.
   *
   * @param url JNDI provider URL of the server
   */
  public ObjectReceiver(String url) {
    readyToReceive = true;
    c = getInitialContextWebLogic(url);
    if (c == null)
      readyToReceive = false;
    else
      init();
  }

  /**
   * Performs the JMS set-up: looks up the connection factory, creates a
   * connection and a non-transacted, auto-acknowledging session, resolves
   * the queue (creating and binding it when the lookup fails), registers
   * this object as the message listener and starts the connection.
   *
   * <p>Does nothing when the receiver is already marked not ready. On
   * failure, every resource acquired so far is released and the receiver
   * is marked not ready.
   */
  public void init() {
    if (!readyToReceive)
      return;
    try {
      qcf = (QueueConnectionFactory) c.lookup(JMS_FACTORY);
      con = qcf.createQueueConnection();
      session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }
    catch (Exception e) {
      readyToReceive = false;
      System.out.println("Can't initialize object receiving.");
      // The connection may already exist if only the session creation
      // failed, so release everything, not just the JNDI context.
      closeConnection();
      return;
    }
    try {
      queue = (Queue) c.lookup(QUEUE);
    }
    catch (NamingException ne) {
      // Queue is not bound yet: create it and publish it under QUEUE.
      try {
        queue = session.createQueue(QUEUE);
        c.bind(QUEUE, queue);
      }
      catch (Exception e) {
        readyToReceive = false;
        System.out.println("Can't create or bind queue:  " + e);
        closeConnection();
        return;
      }
    }
    try {
      receiver = session.createReceiver(queue);
      receiver.setMessageListener(this);
      con.start();
    }
    catch (Exception e) {
      readyToReceive = false;
      System.out.println("Can't start receiving:  " + e);
      closeReceiver();
      closeConnection();
      return;
    }
  }

  /**
   * Releases the receiver, session, connection and JNDI context. Each
   * resource is closed independently, so a failure to close one does not
   * prevent the others from being closed. Afterwards the receiver is no
   * longer ready; calling this method again has no effect.
   */
  public void close() {
    if (!readyToReceive)
      return;
    readyToReceive = false;
    closeReceiver();
    closeConnection();
  }

  /** Closes the queue receiver, if one was created. */
  private void closeReceiver() {
    if (receiver == null)
      return;
    try {
      receiver.close();
    }
    catch (Exception e) {
      System.out.println("Can't close connections.");
    }
    finally {
      receiver = null;
    }
  }

  /**
   * Closes the session, the connection and finally the JNDI context.
   * Safe to call with any subset of them created, which is what the
   * failure paths in {@link #init()} rely on.
   */
  private void closeConnection() {
    boolean failed = false;
    if (session != null) {
      try {
        session.close();
      }
      catch (Exception e) {
        failed = true;
      }
      session = null;
    }
    if (con != null) {
      try {
        con.close();
      }
      catch (Exception e) {
        failed = true;
      }
      con = null;
    }
    if (failed)
      System.out.println("Can't close connection.");
    closeContext();
  }

  /**
   * Closes the JNDI context if it is open. Does not depend on the ready
   * flag, so it also works on the failure paths that have already
   * cleared that flag.
   */
  public void closeContext() {
    if (c == null)
      return;
    try {
      c.close();
    }
    catch (Exception e) {
      System.out.println("Can't close JNDI context.");
    }
    finally {
      c = null;
    }
  }

  /**
   * @return {@code true} when set-up succeeded and {@link #close()} has
   *         not been called
   */
  public boolean isReadyToReceive() {
    return readyToReceive;
  }

  /**
   * Blocks the calling thread until {@link #onMessage(Message)} signals
   * shutdown. Returns immediately when the receiver is not ready. If the
   * waiting thread is interrupted, the interrupt status is restored and
   * the method returns so the caller can shut down.
   */
  public void run() {
    if (!readyToReceive)
      return;
    synchronized (this) {
      while (!quit) {
        try {
          wait();
        }
        catch (InterruptedException ie) {
          // Re-assert the flag for the caller and stop waiting; looping
          // back into wait() would make the thread impossible to stop.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /**
   * Handles one delivered message. An {@code ObjectMessage} whose payload
   * is a non-null {@code Runnable} is executed on the delivery thread.
   * A {@code null} message, a non-object message, or an object message
   * with a {@code null} payload wakes up {@link #run()} so it can return.
   * A payload that cannot be read or is not a {@code Runnable} is
   * reported and otherwise ignored.
   *
   * @param msg the delivered message
   */
  public void onMessage(Message msg) {
    if (!readyToReceive)
      return;
    Runnable obj = null;
    try {
      if (msg instanceof ObjectMessage) {
        try {
          // NOTE(security): this deserializes and then executes whatever
          // object was put on the queue. Only use with a queue whose
          // senders are fully trusted.
          obj = (Runnable) ((ObjectMessage) msg).getObject();
        }
        catch (Exception e) {
          System.out.println("Message is not an object!");
          return;
        }
        try {
          if (obj != null)
            obj.run();
        }
        catch (Exception e) {
          System.out.println("Can't execute the arriving object.");
        }
      }
      else {
        System.out.println("Message should be an 'ObjectMessage'.");
        // String.valueOf tolerates a null message, so the shutdown
        // check below is still reached in that case.
        System.out.println(String.valueOf(msg));
      }
      if (msg == null || obj == null) {
        synchronized (this) {
          quit = true;
          notifyAll();
        }
      }
    }
    catch (Exception e) {
      System.out.println("Can't receive message:  " + e);
    }
  }

  /**
   * Builds a JNDI initial context using the WebLogic initial context
   * factory.
   *
   * @param url JNDI provider URL of the server
   * @return the context, or {@code null} when it cannot be created
   */
  private Context getInitialContextWebLogic(String url) {
    try {
      Hashtable h = new Hashtable();
      h.put(Context.INITIAL_CONTEXT_FACTORY,
        "weblogic.jndi.WLInitialContextFactory");
      h.put("weblogic.jndi.createIntermediateContexts", "true");
      h.put(Context.PROVIDER_URL, url);
      return new InitialContext(h);
    }
    catch (Exception e) {
      System.out.println("Can't get JNDI context.");
      return null;
    }
  }

  /**
   * Validates the command-line arguments and prints a usage line when
   * more than one argument is given.
   *
   * @param args the command-line arguments
   * @return {@code true} when zero or one argument was supplied
   */
  public static boolean checkUsage(String[] args) {
    if (args.length > 1) {
      System.out.println("Usage:  java ObjectReceiver [<server-url>]");
      return false;
    }
    return true;
  }
}
0 Comments  (click to add your comment)
Comment and Contribute

 

 

 

 

 


(Maximum characters: 1200). You have 1200 characters left.

 

 

About | Sitemap | Contact