How do you implement a client that publishes to a topic?
Created May 4, 2012
Jerry Smith
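Client implementations vary in their set-up operations, depending on the JMS implementation; the vendor-specific part is typically how the connection factory is obtained. The following client, TimePublisherSonicMQ, instantiates SonicMQ's own TopicConnectionFactory class directly and therefore works with the SonicMQ JMS server: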
import java.util.Date; import javax.jms.*; public class TimePublisherSonicMQ extends Thread { public static final String SERVER_URL = "localhost:2506"; public static final String TOPIC = "j2ee.jms.timeTopic"; public static final String USER_ID = "pubuser"; public static final String PASSWORD = "password"; public static final int REPORTING_DURATION = 30000; // milliseconds public static final int REPORTING_INTERVAL = 5000; // milliseconds private String url; private TopicConnectionFactory tcf; private TopicConnection con; private TopicSession session; private TopicPublisher publisher; private Topic topic; private TextMessage message; private int interval = REPORTING_INTERVAL; private boolean quit; private boolean readyToPublish; // // main() is for testing only. It starts time publishing, waits // for a short period of time, and then stops time publishing. // public static void main(String[] args) { if (!checkUsage(args)) System.exit(-1); TimePublisherSonicMQ tp = new TimePublisherSonicMQ(args[0], REPORTING_INTERVAL); if (!tp.isReadyToPublish()) { System.out.println("Not ready to publish."); System.exit(-1); } tp.startPublishing(); try { Thread.sleep(REPORTING_DURATION); } catch (Exception e) {} tp.stopPublishing(); tp.close(); } public TimePublisherSonicMQ() { this(SERVER_URL, REPORTING_INTERVAL); } public TimePublisherSonicMQ(String url) { this(url, REPORTING_INTERVAL); } public TimePublisherSonicMQ(String url, int interval) { readyToPublish = true; this.url = url; this.interval = interval; init(); } public void init() { if (!readyToPublish) return; try { tcf = new progress.message.jclient.TopicConnectionFactory(url, ""); con = tcf.createTopicConnection(USER_ID, PASSWORD); session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { readyToPublish = false; System.out.println("Can't initialize time publishing."); return; } try { topic = session.createTopic(TOPIC); } catch (Exception e) { readyToPublish = false; closeConnection(); 
return; } try { publisher = session.createPublisher(topic); message = session.createTextMessage(); con.start(); } catch (Exception e) { readyToPublish = false; closeConnection(); return; } } public void close() { if (!readyToPublish) return; try { publisher.close(); closeConnection(); } catch (Exception e) { System.out.println("Can't close TimePublisherSonicMQ instance."); } readyToPublish = false; } private void closeConnection() { if (!readyToPublish) return; try { session.close(); con.close(); } catch (Exception e) { System.out.println("Can't close connection."); } } public boolean isReadyToPublish() { return readyToPublish; } public void startPublishing() { if (!readyToPublish) return; start(); } synchronized public void run() { if (!readyToPublish) return; quit = false; while (!quit) { try { message.setText(new Date().toString()); publisher.publish(message); } catch (Exception e) { System.out.println("Can't publish times."); } try { wait(interval); } catch (InterruptedException e) { } } } public void sendMessageToSubscribers(String msg) { if (!readyToPublish) return; try { message.setText(msg); publisher.publish(message); } catch (Exception e) { System.out.println("Can't send message: " + e); } } synchronized public void stopPublishing() { if (!readyToPublish) return; quit = true; notifyAll(); } synchronized public void setInterval(int interval) { this.interval = interval; } public static boolean checkUsage(String[] args) { if (args.length != 1) { System.out.println("Usage: java TimePublisherSonicMQ <server-url>"); return false; } return true; } }