How do you implement a client that subscribes to a topic?

Jerry Smith

Client implementations vary with respect to set-up operations, depending on the JMS implementation. The following client, TimeSubscriberSonicMQ, works with the SonicMQ JMS server:

import javax.jms.*;

/**
 * JMS topic subscriber that prints every message received on
 * {@link #TOPIC} and stops once a message whose text equals
 * {@link #SHUTDOWN_MSG} (ignoring case) arrives.
 *
 * <p>The connection factory is the SonicMQ-specific
 * <code>progress.message.jclient.TopicConnectionFactory</code>; everything
 * else goes through the standard <code>javax.jms</code> interfaces.
 *
 * <p>Life cycle: the constructor calls {@link #init()}, which connects and
 * registers this object as the message listener. {@link #run()} then blocks
 * the calling thread until the shutdown message is seen, and
 * {@link #close()} releases the JMS resources.
 */
public class TimeSubscriberSonicMQ
    implements MessageListener, Runnable {
  /** Default broker address used by the no-argument constructor. */
  public static final String SERVER_URL = "localhost:2506";
  /** Name of the topic this client subscribes to. */
  public static final String TOPIC = "j2ee.jms.timeTopic";
  /** User name passed to <code>createTopicConnection</code>. */
  public static final String USER_ID = "subuser";
  /** Password passed to <code>createTopicConnection</code>. */
  public static final String PASSWORD = "password";
  /** Message text that makes {@link #run()} return. */
  public static final String SHUTDOWN_MSG = "Publishing Terminated...";

  private String url;
  private TopicConnectionFactory tcf;
  private TopicConnection con;
  private TopicSession session;
  private TopicSubscriber subscriber;
  private Topic topic;
  private String statusMessage = "";
  // Guarded by this object's monitor; set by onMessage(), awaited by run().
  private boolean quit = false;
  private boolean readyToSubscribe;

  /**
   * Command-line entry point. Expects exactly one argument, the server URL.
   * Exits with status -1 on bad usage or when the subscriber could not be
   * initialized.
   *
   * @param args command-line arguments; <code>args[0]</code> is the server URL
   */
  public static void main(String[] args) {
    if (!checkUsage(args)) {
      System.exit(-1);
    }
    TimeSubscriberSonicMQ ts = new TimeSubscriberSonicMQ(args[0]);
    if (!ts.isReadyToSubscribe()) {
      System.out.println("Not ready to subscribe.");
      System.exit(-1);
    }
    try {
      ts.run();
    }
    finally {
      // Release the JMS resources even if run() ends abnormally.
      ts.close();
    }
  }

  /** Creates a subscriber connected to {@link #SERVER_URL}. */
  public TimeSubscriberSonicMQ() {
    this(SERVER_URL);
  }

  /**
   * Creates a subscriber and immediately tries to connect to the given
   * server. Check {@link #isReadyToSubscribe()} afterwards to find out
   * whether the connection attempt succeeded.
   *
   * @param url server URL handed to the connection factory
   */
  public TimeSubscriberSonicMQ(String url) {
    readyToSubscribe = true;
    this.url = url;
    init();
  }

  /**
   * Opens the connection and session, looks up the topic, creates the
   * subscriber, registers this object as its listener and starts the
   * connection. On any failure the subscriber is marked not ready, whatever
   * was already opened is closed, and a status message is recorded and
   * printed. Does nothing when the subscriber is already marked not ready.
   */
  public void init() {
    if (!readyToSubscribe) {
      return;
    }
    try {
      tcf = new progress.message.jclient.TopicConnectionFactory(url, "");
      con = tcf.createTopicConnection(USER_ID, PASSWORD);
      session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    }
    catch (Exception e) {
      // The connection may already be open if only the session failed.
      failInit("Can't initialize time subscribing.");
      return;
    }
    try {
      topic = session.createTopic(TOPIC);
    }
    catch (Exception e) {
      failInit("Can't create topic " + TOPIC + ":  " + e);
      return;
    }
    try {
      subscriber = session.createSubscriber(topic);
      subscriber.setMessageListener(this);
      con.start();
    }
    catch (Exception e) {
      failInit("Can't subscribe to topic " + TOPIC + ":  " + e);
      return;
    }
  }

  /**
   * Marks initialization as failed: releases any session/connection that
   * was opened, then records and prints the given status message.
   */
  private void failInit(String msg) {
    readyToSubscribe = false;
    // Close first so the final status message names the root cause rather
    // than a secondary close failure.
    closeConnection();
    setStatusMessage(msg);
    System.out.println(getStatusMessage());
  }

  /**
   * Closes the subscriber, the session and the connection, then marks this
   * object not ready. Does nothing when it is already not ready, so calling
   * it more than once is harmless.
   */
  public void close() {
    if (!readyToSubscribe) {
      return;
    }
    try {
      if (subscriber != null) {
        subscriber.close();
      }
    }
    catch (Exception e) {
      setStatusMessage("Can't close TimeSubscriberSonicMQ instance.");
      System.out.println(getStatusMessage());
    }
    finally {
      // Still release the session and connection when the subscriber
      // could not be closed.
      subscriber = null;
      closeConnection();
      readyToSubscribe = false;
    }
  }

  /**
   * Closes the session and the connection if they exist. Each is closed
   * independently so a failure closing the session does not leave the
   * connection open. Deliberately not guarded by <code>readyToSubscribe</code>:
   * the failure paths in init() clear that flag before calling this method.
   */
  private void closeConnection() {
    try {
      if (session != null) {
        session.close();
      }
    }
    catch (Exception e) {
      setStatusMessage("Can't close connection.");
      System.out.println(getStatusMessage());
    }
    finally {
      session = null;
    }
    try {
      if (con != null) {
        con.close();
      }
    }
    catch (Exception e) {
      setStatusMessage("Can't close connection.");
      System.out.println(getStatusMessage());
    }
    finally {
      con = null;
    }
  }

  /**
   * @return <code>true</code> while the subscriber is initialized and has
   *         not been closed
   */
  public boolean isReadyToSubscribe() {
    return readyToSubscribe;
  }

  /** @return the most recently recorded status/error message, or "" if none */
  public String getStatusMessage() {
    return statusMessage;
  }

  private void setStatusMessage(String msg) {
    statusMessage = msg;
  }

  /**
   * Blocks the calling thread until {@link #onMessage(Message)} sees the
   * shutdown message. Returns immediately when the subscriber is not ready.
   * If the thread is interrupted while waiting, the interrupt flag is
   * restored and the method returns.
   */
  public void run() {
    if (!readyToSubscribe) {
      return;
    }
    synchronized (this) {
      // Loop guards against spurious wake-ups.
      while (!quit) {
        try {
          wait();
        }
        catch (InterruptedException ie) {
          // Treat interruption as a request to stop; keep the flag set so
          // callers can observe it.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /**
   * Listener callback registered in {@link #init()}. Prints the message
   * text (or <code>toString()</code> for non-text messages) and, when the
   * text equals {@link #SHUTDOWN_MSG} ignoring case, wakes up
   * {@link #run()}.
   *
   * @param msg the delivered message
   */
  public void onMessage(Message msg) {
    if (!readyToSubscribe) {
      return;
    }
    try {
      String msgText;
      if (msg instanceof TextMessage) {
        msgText = ((TextMessage) msg).getText();
      }
      else {
        msgText = msg.toString();
      }
      System.out.println("The time is: " + msgText);
      // Constant on the left: a text message with a null body must not
      // raise a NullPointerException here.
      if (SHUTDOWN_MSG.equalsIgnoreCase(msgText)) {
        synchronized (this) {
          quit = true;
          notifyAll();
        }
      }
    }
    catch (Exception e) {
      setStatusMessage("Can't receive message:  " + e);
      System.out.println(getStatusMessage());
    }
  }

  /**
   * Validates the command line and prints a usage line when it is wrong.
   *
   * @param args command-line arguments
   * @return <code>true</code> when exactly one argument was supplied
   */
  public static boolean checkUsage(String[] args) {
    if (args.length != 1) {
      System.out.println("Usage:  java TimeSubscriberSonicMQ <server-url>");
      return false;
    }
    return true;
  }
}
0 Comments  (click to add your comment)
Comment and Contribute

 

 

 

 

 


(Maximum characters: 1200). You have 1200 characters left.

 

 

About | Sitemap | Contact