How do you implement a client that subscribes to a topic?
Created May 4, 2012
Jerry Smith
Client implementations vary with respect to set-up operations, depending on the JMS implementation. The following client, TimeSubscriberSonicMQ, works with the SonicMQ JMS server:
import javax.jms.*; public class TimeSubscriberSonicMQ implements MessageListener, Runnable { public static final String SERVER_URL = "localhost:2506"; public static final String TOPIC = "j2ee.jms.timeTopic"; public static final String USER_ID = "subuser"; public static final String PASSWORD = "password"; public static final String SHUTDOWN_MSG = "Publishing Terminated..."; private String url; private TopicConnectionFactory tcf; private TopicConnection con; private TopicSession session; private TopicSubscriber subscriber; private Topic topic; private String statusMessage = ""; private boolean quit = false; private boolean readyToSubscribe; public static void main(String[] args) { if (!checkUsage(args)) System.exit(-1); TimeSubscriberSonicMQ ts = new TimeSubscriberSonicMQ(args[0]); if (!ts.isReadyToSubscribe()) { System.out.println("Not ready to subscribe."); System.exit(-1); } ts.run(); ts.close(); } public TimeSubscriberSonicMQ() { this(SERVER_URL); } public TimeSubscriberSonicMQ(String url) { readyToSubscribe = true; this.url = url; init(); } public void init() { if (!readyToSubscribe) return; try { tcf = new progress.message.jclient.TopicConnectionFactory(url, ""); con = tcf.createTopicConnection(USER_ID, PASSWORD); session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { readyToSubscribe = false; setStatusMessage("Can't initialize time subscribing."); System.out.println(getStatusMessage()); return; } try { topic = session.createTopic(TOPIC); } catch (Exception e) { readyToSubscribe = false; closeConnection(); return; } try { subscriber = session.createSubscriber(topic); subscriber.setMessageListener(this); con.start(); } catch (Exception e) { readyToSubscribe = false; closeConnection(); return; } } public void close() { if (!readyToSubscribe) return; try { subscriber.close(); closeConnection(); } catch (Exception e) { setStatusMessage("Can't close TimeSubscriberSonicMQ instance."); System.out.println(getStatusMessage()); } readyToSubscribe = false; } private void closeConnection() { if (!readyToSubscribe) return; try { session.close(); con.close(); } catch (Exception e) { setStatusMessage("Can't close connection."); System.out.println(getStatusMessage()); } } public boolean isReadyToSubscribe() { return readyToSubscribe; } public String getStatusMessage() { return statusMessage; } private void setStatusMessage(String msg) { statusMessage = msg; } public void run() { if (!readyToSubscribe) return; synchronized (this) { while (!quit) { try { wait(); } catch (InterruptedException ie) { } } } } public void onMessage(Message msg) { if (!readyToSubscribe) return; try { String msgText; if (msg instanceof TextMessage) { msgText = ((TextMessage) msg).getText(); } else { msgText = msg.toString(); } System.out.println("The time is: " + msgText); if (msgText.equalsIgnoreCase(SHUTDOWN_MSG)) { synchronized (this) { quit = true; notifyAll(); } } } catch (Exception e) { setStatusMessage("Can't receive message: " + e); System.out.println(getStatusMessage()); } } public static boolean checkUsage(String[] args) { if (args.length != 1) { System.out.println("Usage: java TimeSubscriberSonicMQ <server-url>"); return false; } return true; } }