How do you implement a client that publishes to a topic?

Jerry Smith

Client implementations vary with respect to set-up operations, depending on the JMS implementation. The following client, TimePublisherSonicMQ, works with the SonicMQ JMS server:

import java.util.Date;
import javax.jms.*;

/**
 * Publishes the current date/time as a JMS {@code TextMessage} to the topic
 * {@link #TOPIC} at a fixed interval, using the SonicMQ topic connection
 * factory.
 *
 * <p>Typical use: construct an instance, check {@link #isReadyToPublish()},
 * call {@link #startPublishing()}, later {@link #stopPublishing()} and
 * finally {@link #close()}.
 *
 * <p>The session, publisher and message are shared between the publishing
 * thread and callers of {@link #sendMessageToSubscribers(String)}; every
 * method that touches them holds this object's monitor so that only one
 * thread uses them at a time.
 */
public class TimePublisherSonicMQ extends Thread {
  public static final String SERVER_URL = "localhost:2506";
  public static final String TOPIC = "j2ee.jms.timeTopic";
  public static final String USER_ID = "pubuser";
  public static final String PASSWORD = "password";
  public static final int REPORTING_DURATION = 30000; // milliseconds
  public static final int REPORTING_INTERVAL = 5000; // milliseconds

  private String url;
  private TopicConnectionFactory tcf;
  private TopicConnection con;
  private TopicSession session;
  private TopicPublisher publisher;
  private Topic topic;
  private TextMessage message;
  // Pause between two publications, in milliseconds; passed to wait(long).
  private int interval = REPORTING_INTERVAL;
  // Set (under this object's monitor) to ask the publishing loop to end.
  private boolean quit;
  // Read without the monitor by several methods, hence volatile.
  private volatile boolean readyToPublish;

  /**
   * For testing only. Starts time publishing against the server URL given
   * as the single argument, waits {@link #REPORTING_DURATION} milliseconds,
   * then stops publishing and closes the publisher.
   *
   * @param args exactly one element: the server URL
   */
  public static void main(String[] args) {
    if (!checkUsage(args)) {
      System.exit(-1);
    }
    TimePublisherSonicMQ tp = new TimePublisherSonicMQ(args[0], REPORTING_INTERVAL);
    if (!tp.isReadyToPublish()) {
      System.out.println("Not ready to publish.");
      System.exit(-1);
    }
    tp.startPublishing();
    try {
      Thread.sleep(REPORTING_DURATION);
    }
    catch (InterruptedException e) {
      // Cut the test run short, but still shut down cleanly below.
      Thread.currentThread().interrupt();
    }
    tp.stopPublishing();
    tp.close();
  }

  /** Creates a publisher for {@link #SERVER_URL} using {@link #REPORTING_INTERVAL}. */
  public TimePublisherSonicMQ() {
    this(SERVER_URL, REPORTING_INTERVAL);
  }

  /**
   * Creates a publisher for the given server using {@link #REPORTING_INTERVAL}.
   *
   * @param url the server URL handed to the connection factory
   */
  public TimePublisherSonicMQ(String url) {
    this(url, REPORTING_INTERVAL);
  }

  /**
   * Creates a publisher and immediately tries to connect via {@link #init()}.
   * The constructor does not throw on connection failure; check
   * {@link #isReadyToPublish()} afterwards.
   *
   * @param url      the server URL handed to the connection factory
   * @param interval pause between publications, in milliseconds
   */
  public TimePublisherSonicMQ(String url, int interval) {
    readyToPublish = true;
    this.url = url;
    this.interval = interval;
    init();
  }

  /**
   * Creates the connection, session, topic, publisher and reusable message,
   * then starts the connection. On any failure the instance is marked as not
   * ready and whatever part of the connection was already created is closed.
   */
  public void init() {
    if (!readyToPublish) {
      return;
    }
    try {
      tcf = new progress.message.jclient.TopicConnectionFactory(url, "");
      con = tcf.createTopicConnection(USER_ID, PASSWORD);
      session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      topic = session.createTopic(TOPIC);
      publisher = session.createPublisher(topic);
      message = session.createTextMessage();
      con.start();
    }
    catch (Exception e) {
      readyToPublish = false;
      System.out.println("Can't initialize time publishing.");
      // Release a partially created connection/session instead of leaking it.
      closeConnection();
    }
  }

  /**
   * Stops the publishing loop and releases the publisher, session and
   * connection. Does nothing if the instance is not ready (never initialized
   * successfully, or already closed).
   */
  public synchronized void close() {
    if (!readyToPublish) {
      return;
    }
    // Make sure the publishing loop ends rather than publishing on closed
    // resources; it re-checks 'quit' as soon as it gets the monitor back.
    quit = true;
    notifyAll();
    try {
      publisher.close();
    }
    catch (Exception e) {
      System.out.println("Can't close TimePublisherSonicMQ instance.");
    }
    // Always attempted, even when closing the publisher failed.
    closeConnection();
    readyToPublish = false;
  }

  /**
   * Closes the session and the connection if they exist. Each is closed
   * independently so a failure on the session does not leave the connection
   * open. Deliberately not guarded by {@code readyToPublish}: it must also
   * work on the failure path of {@link #init()}, where that flag is already
   * false.
   */
  private void closeConnection() {
    boolean failed = false;
    if (session != null) {
      try {
        session.close();
      }
      catch (Exception e) {
        failed = true;
      }
      session = null;
    }
    if (con != null) {
      try {
        con.close();
      }
      catch (Exception e) {
        failed = true;
      }
      con = null;
    }
    if (failed) {
      System.out.println("Can't close connection.");
    }
  }

  /**
   * @return {@code true} if initialization succeeded and {@link #close()}
   *         has not been called since
   */
  public boolean isReadyToPublish() {
    return readyToPublish;
  }

  /** Starts the publishing thread; does nothing if the instance is not ready. */
  public void startPublishing() {
    if (!readyToPublish) {
      return;
    }
    start();
  }

  /**
   * Publishing loop: publishes the current date/time, then waits
   * {@code interval} milliseconds (or until notified), until a stop is
   * requested or the thread is interrupted. The monitor is held while
   * publishing and released only while waiting.
   */
  public synchronized void run() {
    if (!readyToPublish) {
      return;
    }
    // 'quit' is intentionally NOT reset here: a stopPublishing() that wins
    // the race against thread start-up must not be lost.
    while (!quit) {
      try {
        message.setText(new Date().toString());
        publisher.publish(message);
      }
      catch (Exception e) {
        System.out.println("Can't publish times.");
      }
      try {
        wait(interval);
      }
      catch (InterruptedException e) {
        // Treat interruption as a request to stop; keep the flag set for
        // whoever inspects the thread afterwards.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /**
   * Publishes an arbitrary text to the topic, reusing the shared message
   * object. Synchronized so it never runs concurrently with a publication
   * made by the publishing thread.
   *
   * @param msg the text to publish
   */
  public synchronized void sendMessageToSubscribers(String msg) {
    if (!readyToPublish) {
      return;
    }
    try {
      message.setText(msg);
      publisher.publish(message);
    }
    catch (Exception e) {
      System.out.println("Can't send message:  " + e);
    }
  }

  /**
   * Asks the publishing loop to end and wakes it if it is waiting. Does
   * nothing if the instance is not ready.
   */
  public synchronized void stopPublishing() {
    if (!readyToPublish) {
      return;
    }
    quit = true;
    notifyAll();
  }

  /**
   * Changes the pause between publications. The value is read the next time
   * the publishing loop starts waiting; a wait already in progress is not
   * shortened.
   *
   * @param interval pause in milliseconds, passed unchanged to
   *                 {@code wait(long)}
   */
  public synchronized void setInterval(int interval) {
    this.interval = interval;
  }

  /**
   * Validates the command line of {@link #main(String[])} and prints a usage
   * message when it is wrong.
   *
   * @param args the command-line arguments
   * @return {@code true} if exactly one argument was supplied
   */
  public static boolean checkUsage(String[] args) {
    if (args.length != 1) {
      System.out.println("Usage:  java TimePublisherSonicMQ <server-url>");
      return false;
    }
    return true;
  }
}
0 Comments  (click to add your comment)
Comment and Contribute

 

 

 

 

 


(Maximum characters: 1200). You have 1200 characters left.

 

 

About | Sitemap | Contact