How do you implement a client that receives messages via a queue?
Created May 4, 2012
Jerry Smith
Client implementations vary with respect to set-up operations, depending on the JMS implementation. The following client, ObjectReceiver, works with the WebLogic Java application server, which supports JMS:
import javax.jms.*; import java.util.Hashtable; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Hashtable; public class ObjectReceiver implements MessageListener, Runnable { public static final String SERVER_URL = "t3://localhost:7001"; public static final String JMS_FACTORY = "javax.jms.QueueConnectionFactory"; public static final String QUEUE = "j2ee.jms.objectQueue"; private Context c; private QueueConnectionFactory qcf; private QueueConnection con; private QueueSession session; private QueueReceiver receiver; private Queue queue; private boolean quit = false; private boolean readyToReceive; public static void main(String[] args) { if (!checkUsage(args)) System.exit(-1); ObjectReceiver or = (args.length == 1) ? new ObjectReceiver(args[0]) : new ObjectReceiver(); if (!or.isReadyToReceive()) { System.out.println("Not ready to receive."); System.exit(-1); } or.run(); or.close(); } public ObjectReceiver() { this(SERVER_URL); } public ObjectReceiver(String url) { readyToReceive = true; c = getInitialContextWebLogic(url); if (c == null) readyToReceive = false; else init(); } public void init() { if (!readyToReceive) return; try { qcf = (QueueConnectionFactory) c.lookup(JMS_FACTORY); con = qcf.createQueueConnection(); session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { readyToReceive = false; System.out.println("Can't initialize object receiving."); closeContext(); return; } try { queue = (Queue) c.lookup(QUEUE); } catch (NamingException ne) { try { queue = session.createQueue(QUEUE); c.bind(QUEUE, queue); } catch (Exception e) { readyToReceive = false; closeConnection(); return; } } try { receiver = session.createReceiver(queue); receiver.setMessageListener(this); con.start(); } catch (Exception e) { readyToReceive = false; closeConnection(); return; } } public void close() { if (!readyToReceive) return; try { receiver.close(); closeConnection(); } catch (Exception e) { System.out.println("Can't close connections."); } } private void closeConnection() { if (!readyToReceive) return; try { session.close(); con.close(); closeContext(); } catch (Exception e) { System.out.println("Can't close connection."); } } public void closeContext() { if (!readyToReceive) return; try { c.close(); } catch (Exception e) { System.out.println("Can't close JNDI context."); } } public boolean isReadyToReceive() { return readyToReceive; } public void run() { if (!readyToReceive) return; synchronized (this) { while (!quit) { try { wait(); } catch (InterruptedException ie) { } } } } public void onMessage(Message msg) { if (!readyToReceive) return; Runnable obj = null; try { if (msg instanceof ObjectMessage) { try { obj = (Runnable) ((ObjectMessage) msg).getObject(); } catch (Exception e) { System.out.println("Message is not an object!"); return; } try { if (obj != null) obj.run(); } catch (Exception e) { System.out.println("Can't execute the arriving object."); } } else { System.out.println("Message should be an 'ObjectMessage'."); System.out.println(msg.toString()); } if (msg == null || obj == null) { synchronized (this) { quit = true; notifyAll(); } } } catch (Exception e) { System.out.println("Can't receive message: " + e); } } private Context getInitialContextWebLogic(String url) { try { Hashtable h = new Hashtable(); h.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); h.put("weblogic.jndi.createIntermediateContexts", "true"); h.put(Context.PROVIDER_URL, url); return new InitialContext(h); } catch (Exception e) { System.out.println("Can't get JNDI context."); return null; } } public static boolean checkUsage(String[] args) { if (args.length > 1) { System.out.println("Usage: java ObjectReceiver [<server-url>]"); return false; } return true; } }