Chapter 4. Publish-and-Subscribe Messaging

This chapter focuses on the publish-and-subscribe (pub/sub) messaging model that was introduced in Chapter 2. The pub/sub messaging model allows a message producer (also called a publisher) to broadcast a message to one or more consumers (called subscribers). There are three important aspects of the pub/sub model:

  • Messages are pushed to consumers, which means that consumers are delivered messages without having to request them. Messages are exchanged through a virtual channel called a topic. A topic is a destination where producers can publish, and subscribers can consume, messages. Messages delivered to a topic are automatically pushed to all qualified consumers.

  • As in enterprise messaging in general, there is no coupling of the producers to the consumers. Subscribers and publishers can be added dynamically at runtime, which allows the system to grow or shrink in complexity over time.

  • Every client that subscribes to a topic receives its own copy of messages published to that topic. A single message produced by one publisher may be copied and distributed to hundreds, or even thousands of subscribers.
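The three properties above can be illustrated without a JMS provider at all. The following is a minimal sketch in plain Java, not JMS code: ToyTopic and its methods are hypothetical names invented for this illustration. It shows messages being pushed to subscribers, the publisher holding no reference to any particular subscriber, and each subscriber receiving the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory stand-in for a topic; it is not part of JMS. */
final class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Subscribers may join at any time; publishers never reference them. */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Pushes the message to every current subscriber; returns how many received it. */
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            // String is immutable, so sharing the reference acts like a private copy.
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        ToyTopic hotDeals = new ToyTopic();
        List<String> retailerA = new ArrayList<>();
        List<String> retailerB = new ArrayList<>();
        hotDeals.subscribe(retailerA::add);
        hotDeals.subscribe(retailerB::add);

        int delivered = hotDeals.publish("Bowling Shoes, 100.00, 55.00");
        System.out.println("Delivered to " + delivered + " subscribers");
        System.out.println("Retailer A saw: " + retailerA);
        System.out.println("Retailer B saw: " + retailerB);
    }
}
```

A real JMS provider adds everything this sketch leaves out: network transport, acknowledgment, persistence, and security.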

In Chapter 2 you learned the basics of the pub/sub model by developing a simple chat client. In this chapter we will build on those lessons and examine more advanced features of this model, including guaranteed messaging, topic-based addressing, durable subscriptions, request-reply, and temporary topics.

Getting Started with the B2B Application

In this chapter we abandon the simple chat example for a more complex and real-world Business-to-Business (B2B) scenario. In our new example, a wholesaler wants to distribute price information to retailers, and the retailers want to respond by generating orders. We’ll implement this scenario using the publish-and-subscribe model: the wholesaler will publish messages containing new prices and hot deals, and the retailers will respond by creating their own messages to order stock.

This scenario is typical of many Business-to-Business operations. We call the clients retailers and wholesalers, but these names are really only for convenience. There’s little difference between our wholesaler/retailer scenario and a stock broker broadcasting stock prices to investors, or a manufacturer broadcasting bid requests to multiple suppliers. The fact that we use a retailer and a wholesaler to illustrate our example is much less important than the way we apply JMS.

Our simple trading system is implemented by two classes, both of which are JMS clients: Wholesaler and Retailer. In the interest of keeping the code simple, we won’t implement a fancy user interface; our application has a rudimentary command-line user interface.

Running the B2B Application

Before looking at the code, let’s look at how the application works. As with the Chat application, the Wholesaler class includes a main( ) method so it can be run as a standalone Java application. It’s executed from the command line as follows:

java chap4.B2B.Wholesaler localhost username password

username and password are the authentication information for the client. The Retailer class can be executed in the same manner:

java chap4.B2B.Retailer localhost username password

Start your JMS server, then run one instance of a Wholesaler client and a Retailer client in separate command windows. In the Wholesaler client you are prompted to enter an item description, an old price, and a new price. Enter the following as shown:

Bowling Shoes, 100.00, 55.00

Upon hitting the Enter key, you should see the Retailer application display information on the screen indicating that it has received a price change notice. You should then see the Wholesaler indicating that it has received a “buy” order from the Retailer. Here’s the complete interaction with the Wholesaler and the Retailer:[1]

java chap4.B2B.Wholesaler localhost WHOLESALER passwd1
Enter: Item, Old Price, New Price
e.g., Bowling Shoes, 100.00, 55.00
Bowling Shoes, 100.00, 55.00
Order received - 1000 Bowling Shoes from DurableRetailer
-----------------------
java chap4.B2B.Retailer localhost RETAILER passwd2
Retailer application started.
Received Hot Buy: Bowling Shoes, 100.00, 55.00
Buying 1000 Bowling Shoes

Here’s what happened. The Wholesaler publishes a price quotation on a topic, “Hot Deals,” which is intended for one or more Retailers. The Retailers subscribe to the “Hot Deals” topic in order to receive price quotes. The Retailer application has no interaction with a live user. Instead, it has an autoBuy( ) method that examines the old price and the new price. If the new price represents a reduction of greater than ten percent, the Retailer sends a message back to the Wholesaler on the “Buy Order” topic, telling it to purchase 1,000 items. In JMS terms, the Wholesaler is a producer of the “Hot Deals” topic and a consumer of the “Buy Order” topic. Conversely, the Retailer is a consumer of the “Hot Deals” topic and a producer of the “Buy Order” topic, as illustrated in Figure 4.1.

Figure 4.1. Producers and consumers in the B2B example

The B2B Source Code

The rest of this chapter examines the source code for the Wholesaler and Retailer classes, and covers several advanced subjects related to the pub/sub messaging model.

The Wholesaler class

After the listing, we will take a brief tour of the methods in this class, and discuss their responsibilities. We will go into detail about the implementation later in this chapter. Now, here is the complete definition of the Wholesaler class, which is responsible for publishing items to the “Hot Deals” topic and receiving “Buy Orders” on those deals from retailers:

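// Imports for the class names used without qualification below
import java.util.Properties;
import java.util.StringTokenizer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;

public class Wholesaler implements javax.jms.MessageListener{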

   private javax.jms.TopicConnection connect = null;
   private javax.jms.TopicSession pubSession = null;
   private javax.jms.TopicSession subSession = null;
   private javax.jms.TopicPublisher publisher = null;
   private javax.jms.TopicSubscriber subscriber = null;
   private javax.jms.Topic hotDealsTopic = null;
   private javax.jms.TemporaryTopic buyOrdersTopic = null;

   public Wholesaler(String broker, String username, String password){
      try {
         Properties env = new Properties( );
         // ... specify the JNDI properties specific to the vendor
         
         InitialContext jndi = new InitialContext(env);
                  
         TopicConnectionFactory factory = 
          (TopicConnectionFactory)jndi.lookup(broker);
         connect = factory.createTopicConnection (username, password);

         pubSession = 
          connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         subSession = 
          connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         
         hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
         publisher = pubSession.createPublisher(hotDealsTopic);

         buyOrdersTopic = subSession.createTemporaryTopic( );

         subscriber = subSession.createSubscriber(buyOrdersTopic);
         subscriber.setMessageListener(this);
         
         connect.start( );
         
      } catch (javax.jms.JMSException jmse){
         jmse.printStackTrace( ); System.exit(1);
      } catch (javax.naming.NamingException jne){
         jne.printStackTrace( ); System.exit(1);
      }
   }
   private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
      try {
        javax.jms.StreamMessage message = 
           pubSession.createStreamMessage( );
        message.writeString(dealDesc);
        message.writeString(itemDesc);
        message.writeFloat(oldPrice);
        message.writeFloat(newPrice);
                   
        message.setStringProperty("Username", username);
        message.setStringProperty("Itemdesc", itemDesc);
                   
        message.setJMSReplyTo(buyOrdersTopic);               
                   
        publisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);
            
      } catch ( javax.jms.JMSException jmse ){
         jmse.printStackTrace( );
      }      
   }
   public void onMessage( javax.jms.Message message){
      try {
         TextMessage textMessage = (TextMessage) message;
         String text = textMessage.getText( );
         System.out.println("\nOrder received - "+text+
                            " from " + message.getJMSCorrelationID( ));
      } catch (java.lang.Exception rte){
         rte.printStackTrace( );
      }         
   }
   public void exit( ){
      try {
        connect.close( );
      } catch (javax.jms.JMSException jmse){
        jmse.printStackTrace( );
      }
      System.exit(0);
   }
   public static void main(String argv[]) {
      String broker, username, password;
      if  (argv.length == 3){
         broker = argv[0];
         username = argv[1];
         password = argv[2];
      } else  {
         System.out.println("Invalid arguments. Should be: ");
         System.out.println("java Wholesaler broker username password");
         return;
      }
      
      Wholesaler wholesaler = new Wholesaler(broker,username,password);
      
      try {
         // Read all standard input and send it as a message.
         java.io.BufferedReader stdin = new java.io.BufferedReader
            (new java.io.InputStreamReader( System.in ) );
         System.out.println ("Enter: Item, Old Price, New Price");
         System.out.println("\ne.g., Bowling Shoes, 100.00, 55.00");

         while ( true ){
            String dealDesc = stdin.readLine( );
            if  (dealDesc != null && dealDesc.length( ) > 0){
               // Parse the deal description
               StringTokenizer tokenizer = 
                  new StringTokenizer(dealDesc,",") ;
               String itemDesc = tokenizer.nextToken( );
               String temp = tokenizer.nextToken( );
               float oldPrice = 
                  Float.valueOf(temp.trim()).floatValue( );
               temp = tokenizer.nextToken( );
               float newPrice = 
                  Float.valueOf(temp.trim()).floatValue( );

               wholesaler.publishPriceQuotes(dealDesc,username,
                                           itemDesc,oldPrice,newPrice);
            } else  {
                wholesaler.exit( );
            }
         }
      } catch ( java.io.IOException ioe ){
         ioe.printStackTrace( );
      }
   }
}

The main( ) method creates an instance of the Wholesaler class, passing it the information it needs to set up its publishers and subscribers.

In the Wholesaler class’s constructor, JNDI is used to obtain the “Hot Deals” topic identifier, which is then used to create a publisher. Most of this should look familiar to you; it’s similar in many ways to the Chat application, except for the creation of a temporary topic, which is discussed in more detail later in this section.

Once the Wholesaler is instantiated, the main( ) method continues to monitor the command line for new “Hot Deals.” When a “Hot Deal” is entered at the command prompt, the main( ) method parses the information and passes it to the Wholesaler instance via the publishPriceQuotes( ) method.

The publishPriceQuotes( ) method is responsible for publishing messages containing information about price quotes to the “Hot Deals” topic.

The onMessage( ) method receives messages from clients responding to deals published on the “Hot Deals” topic. The contents of these messages are simply printed to the command line.
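The parsing step in main( ) assumes that every input line has exactly three well-formed fields; a mistyped line makes StringTokenizer or Float.valueOf( ) throw an unchecked exception. The following is a minimal sketch of a more defensive parser, assuming the same "Item, Old Price, New Price" format. DealParser and Deal are hypothetical names that do not appear in the Wholesaler listing.

```java
import java.util.Optional;

/** Hypothetical helper; not part of the Wholesaler listing. */
final class DealParser {

    /** Parsed form of one "Item, Old Price, New Price" line. */
    static final class Deal {
        final String itemDesc;
        final float oldPrice;
        final float newPrice;

        Deal(String itemDesc, float oldPrice, float newPrice) {
            this.itemDesc = itemDesc;
            this.oldPrice = oldPrice;
            this.newPrice = newPrice;
        }
    }

    /** Returns the parsed deal, or empty if the line is malformed. */
    static Optional<Deal> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length != 3 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            float oldPrice = Float.parseFloat(parts[1].trim());
            float newPrice = Float.parseFloat(parts[2].trim());
            return Optional.of(new Deal(parts[0].trim(), oldPrice, newPrice));
        } catch (NumberFormatException e) {
            return Optional.empty(); // a price field was not a number
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("Bowling Shoes, 100.00, 55.00").isPresent()); // true
        System.out.println(parse("Bowling Shoes, cheap, 55.00").isPresent());  // false
    }
}
```

Like the original tokenizer-based code, this sketch cannot handle item descriptions that themselves contain a comma.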

The Retailer class

Here is the complete definition of the Retailer class, which subscribes to the “Hot Deals” topic and responds with “Buy Orders” on attractive deals:

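// Imports for the class names used without qualification below
import java.util.Properties;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;

public class Retailer implements javax.jms.MessageListener{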

    private javax.jms.TopicConnection connect = null;
    private javax.jms.TopicSession session = null;
    private javax.jms.TopicPublisher publisher = null;
    private javax.jms.Topic hotDealsTopic = null;
    // Kept as a field so that exit( ) can close the subscriber
    private javax.jms.TopicSubscriber subscriber = null;

    public Retailer( String broker, String username, String password){
        try {
            Properties env = new Properties( );
            // ... specify the JNDI properties specific to the vendor
                 
            InitialContext jndi = new InitialContext(env);
                          
            TopicConnectionFactory factory = 
            (TopicConnectionFactory)jndi.lookup(broker);
                
            connect = factory.createTopicConnection(username, password);
            connect.setClientID("DurableRetailer");

            session = 
            connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
                
            hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
                
            subscriber = 
                session.createDurableSubscriber(hotDealsTopic,
                   "Hot Deals Subscription");
            subscriber.setMessageListener(this);
            connect.start( );
            
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
            System.exit(1);
        } catch (javax.naming.NamingException jne){
            jne.printStackTrace( );
            System.exit(1);
        }
    }
    public void onMessage(javax.jms.Message aMessage){
        try {
            autoBuy(aMessage);
        } catch (java.lang.RuntimeException rte){
            rte.printStackTrace( );
        }            
    }
    
    private void autoBuy (javax.jms.Message message){
        int count = 1000;
        try {
            StreamMessage strmMsg = (StreamMessage)message;
            String dealDesc = strmMsg.readString( );
            String itemDesc = strmMsg.readString( );
            float oldPrice = strmMsg.readFloat( );
            float newPrice = strmMsg.readFloat( );
            System.out.println("Received Hot Buy: "+dealDesc);
            
            // If price reduction is greater than 10 percent, buy
            if (newPrice == 0 || oldPrice / newPrice > 1.1){
                System.out.println("\nBuying " + count +" "+ itemDesc);

                TextMessage textMsg = session.createTextMessage( );
                textMsg.setText(count + " " + itemDesc );

                javax.jms.Topic buytopic = 
                    (javax.jms.Topic)message.getJMSReplyTo( );
                    
                publisher = session.createPublisher(buytopic);
                
                textMsg.setJMSCorrelationID("DurableRetailer");
                
                publisher.publish(
                    textMsg,
                    javax.jms.DeliveryMode.PERSISTENT,
                    javax.jms.Message.DEFAULT_PRIORITY,
                    1800000);
            } else  {
                System.out.println ("\nBad Deal- Not buying.");
            }
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
    }
    private void exit(String s){
        try {
            if ( s != null && 
                s.equalsIgnoreCase("unsubscribe"))
            {
                subscriber.close( );
                session.unsubscribe("Hot Deals Subscription");
            }
            connect.close( );
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
        System.exit(0);
    }
    public static void main(String argv[]) {
        String broker, username, password;
        if  (argv.length == 3){
            broker = argv[0];
            username = argv[1];
            password = argv[2];
        } else  {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println
            ("java Retailer broker username password");
            return;
        }
        
        Retailer retailer  = new Retailer(broker, username, password);
        
        try {
            System.out.println("\nRetailer application started.\n");
            // Read all standard input and send it as a message.
            java.io.BufferedReader stdin =
                new java.io.BufferedReader
                ( new java.io.InputStreamReader( System.in ) );
            while ( true ){
                String s = stdin.readLine( );
                if ( s == null )retailer.exit(null);
                else if ( s.equalsIgnoreCase("unsubscribe") )
                    retailer.exit ( s );
            }
        } catch ( java.io.IOException ioe ){
            ioe.printStackTrace( );
        }
    }
}

The main( ) method of Retailer is much like the main( ) method of Wholesaler. It creates an instance of the Retailer class and passes it the information it needs to set up its publishers and subscribers.

The constructor of the Retailer class is also similar to that of the Wholesaler class, except that it creates a durable subscription using the “Hot Deals” topic. Durable subscriptions will be discussed in more detail later in this section.

Once the Retailer is instantiated, the main( ) method uses the readLine( ) method as a way of blocking program execution in order to monitor for message input.

The onMessage( ) method receives messages from the Wholesaler client, then delegates its work to the autoBuy( ) method. The autoBuy( ) method examines the message, determines whether the price change is significant, and arbitrarily orders 1000 items. It orders the items by publishing a persistent message back to the Wholesaler client’s temporary topic, using the JMSCorrelationID as a way of identifying itself. We will examine persistent publishing and temporary topics in the next section.
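The decision rule inside autoBuy( ) is easy to study in isolation. The sketch below copies the condition from the Retailer listing into a hypothetical AutoBuyRule class (the class and method names are assumptions made for this illustration). Read literally, the test oldPrice / newPrice > 1.1 is satisfied whenever the new price is below oldPrice / 1.1, which is a reduction of a little more than 9 percent of the old price, so it is slightly more permissive than a strict ten percent threshold.

```java
/** Hypothetical wrapper around the condition used in Retailer.autoBuy( ). */
final class AutoBuyRule {

    /** Same expression as in the listing: buy if the item is free or the ratio exceeds 1.1. */
    static boolean shouldBuy(float oldPrice, float newPrice) {
        return newPrice == 0 || oldPrice / newPrice > 1.1;
    }

    public static void main(String[] args) {
        System.out.println(shouldBuy(100.00f, 55.00f)); // true: ratio is about 1.82
        System.out.println(shouldBuy(100.00f, 95.00f)); // false: ratio is about 1.05
        System.out.println(shouldBuy(100.00f, 90.00f)); // true: ratio is about 1.11
        System.out.println(shouldBuy(100.00f, 0.00f));  // true: guarded against division by zero
    }
}
```

If a strict "more than ten percent off" rule were wanted instead, one option would be to compare newPrice against 0.9 times oldPrice; that is a design choice rather than something the listing does.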

Temporary Topics

In the chat example we explored in Chapter 2, we assumed that JMS clients would communicate with each other using established topics on which messages are asynchronously produced and consumed. In the next sections, we’ll explore ways to augment this basic mechanism. We’ll start by looking at temporary topics, which are a mechanism for JMS clients to create topics dynamically.

The constructor of the Wholesaler class creates a temporary topic. This topic is used as a JMSReplyTo destination for messages published to the “Hot Deals” topic in the publishPriceQuotes( ) method:

public Wholesaler(String broker, String username, String password){
    try {
        ...
        subSession = 
        connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        ...
        buyOrdersTopic = subSession.createTemporaryTopic( );
        ...
}
...
private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
      try {
        javax.jms.StreamMessage message = 
           pubSession.createStreamMessage( );
        ...
        message.setJMSReplyTo(buyOrdersTopic);               
                   
        publisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);
       ...
}

When the Retailer client decides to respond to a “Hot Deals” message with a buy order, it uses the JMSReplyTo destination, which is the temporary topic created by the Wholesaler application:

private void autoBuy (javax.jms.Message message){
    int count = 1000;
    try {
        StreamMessage strmMsg = (StreamMessage)message;
        ...           
        // If price reduction is greater than 10 percent, buy
        if (newPrice == 0 || oldPrice / newPrice > 1.1){
            ...
            javax.jms.Topic buytopic = 
                (javax.jms.Topic)message.getJMSReplyTo( );
                    
            publisher = session.createPublisher(buytopic);
        ...
}

A temporary topic is a topic that is dynamically created by the JMS provider, using the createTemporaryTopic( ) method of the TopicSession object. A temporary topic is associated with the connection that belongs to the TopicSession that created it. It is only active for the duration of the connection, and it is guaranteed to be unique across all connections. Since it is temporary, it can’t be durable: it lasts only as long as its associated client connection is active. In all other respects it is just like a “regular” topic.

Since a temporary topic is unique across all client connections (it is obtained dynamically through a method call on a client’s session object), it is unavailable to other JMS clients unless the topic identity is transferred using the JMSReplyTo header. While any client may publish messages on another client’s temporary topic, only the sessions that are associated with the JMS client connection that created the temporary topic may subscribe to it. JMS clients can also, of course, publish messages to their own temporary topics.
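The rules in the last two paragraphs can be summarized as a small model. This is a sketch in plain Java rather than JMS code; ToyTemporaryTopics, Connection, and TempTopic are hypothetical classes invented for the illustration, and the "TEMP-n" identifiers are made up. It encodes three rules: every temporary topic has a unique identity, any connected client may publish to it while it is active, and only the connection that created it may subscribe.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of temporary-topic rules; names are illustrative, not JMS API. */
final class ToyTemporaryTopics {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    static final class Connection {
        private boolean open = true;

        /** Every call yields a topic with a fresh, never-reused identity. */
        TempTopic createTemporaryTopic() {
            return new TempTopic(this, "TEMP-" + NEXT_ID.incrementAndGet());
        }

        void close() { open = false; }

        boolean isOpen() { return open; }
    }

    static final class TempTopic {
        final Connection owner;
        final String id;

        TempTopic(Connection owner, String id) {
            this.owner = owner;
            this.id = id;
        }

        /** The topic exists only while the connection that created it is open. */
        boolean isActive() { return owner.isOpen(); }

        /** Any connected client may publish to an active temporary topic. */
        boolean canPublish(Connection client) { return isActive() && client.isOpen(); }

        /** Only the creating connection may subscribe. */
        boolean canSubscribe(Connection client) { return isActive() && client == owner; }
    }

    public static void main(String[] args) {
        Connection wholesaler = new Connection();
        Connection retailer = new Connection();
        TempTopic buyOrders = wholesaler.createTemporaryTopic();

        System.out.println(buyOrders.canPublish(retailer));     // true
        System.out.println(buyOrders.canSubscribe(retailer));   // false
        System.out.println(buyOrders.canSubscribe(wholesaler)); // true

        wholesaler.close();
        System.out.println(buyOrders.canPublish(retailer));     // false: topic is gone
    }
}
```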

In the interest of exploring concepts like temporary topics we have designed our B2B example so that the consumer responds directly to the producer. In larger real-world applications, however, there may be many publishers and subscribers exchanging messages across many topics. A message may represent a workflow, which may take multiple hops through various stages of a business process. In that type of scenario the consumer of a message may never respond directly to the producer that originated the message. It is more likely that the response to the message will be forwarded to some other process. Thus, the JMSReplyTo header can be used as a place to specify a forwarding address, rather than the destination address of the original sender.

JMS provides a set of design patterns and helper classes for performing a direct request-reply conversation, which we will get into later in Section 4.6 of this chapter.

Durable Subscriptions

A durable subscription is one that outlasts a client’s connection with a message server. While a durable subscriber is disconnected from the JMS server, it is the responsibility of the server to store messages the subscriber misses. When the durable subscriber reconnects, the message server sends it all the unexpired messages that accumulated. This behavior is commonly referred to as store-and-forward messaging. Store-and-forward messaging is a key component of the guaranteed messaging solution. Durable subscriptions make a JMS consumer tolerant of disconnections, whether they are intentional or the result of a partial failure.
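Store-and-forward can be pictured with a very small model. The following sketch is plain Java, not JMS: ToyDurableSubscription is a hypothetical class that stands in for the server-side state of a single durable subscription. It deliberately ignores message expiration, persistence to disk, and acknowledgment, all of which a real provider must handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy server-side view of one durable subscription; not JMS API. */
final class ToyDurableSubscription {
    private final Deque<String> stored = new ArrayDeque<>();  // held while offline
    private final List<String> delivered = new ArrayList<>(); // what the client has seen
    private boolean connected = false;

    /** Called for every message published to the topic. */
    void publish(String message) {
        if (connected) {
            delivered.add(message);   // push immediately
        } else {
            stored.addLast(message);  // store for later
        }
    }

    /** On reconnect, forward everything that accumulated, oldest first. */
    void connect() {
        connected = true;
        while (!stored.isEmpty()) {
            delivered.add(stored.removeFirst());
        }
    }

    void disconnect() { connected = false; }

    List<String> delivered() { return delivered; }

    public static void main(String[] args) {
        ToyDurableSubscription sub = new ToyDurableSubscription();
        sub.connect();
        sub.publish("Bowling Shoes, 100.00, 55.00");
        sub.disconnect();                           // simulate Ctrl-C
        sub.publish("Surfboards, 500.00, 299.99");
        sub.publish("Hockey Sticks, 20.00, 9.99");
        sub.connect();                              // restart the Retailer
        System.out.println(sub.delivered().size()); // 3
    }
}
```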

We can demonstrate durable subscriptions with the B2B example. If you still have the Retailer application up and running, try simulating an abnormal shutdown by typing Ctrl-C in the command window. Leave the Wholesaler running. In the command window for the wholesaler application, type:

Surfboards, 500.00, 299.99
Hockey Sticks, 20.00, 9.99

Once the deals have been entered, restart the Retailer application:

java chap4.B2B.Retailer localhost username password

The first time you ran the Retailer application, a durable subscription was registered with the JMS provider. When you abnormally terminated the application, the subscription information was retained by the JMS provider. When the Retailer application comes back up, the surfboards and hockey sticks messages are received, processed, and responded to. Because the Retailer had a durable subscription to the “Hot Deals” topic, the JMS server saved the messages that arrived while the Retailer was down. The messages were then delivered when the Retailer resubscribed to the topic.

Here’s how we set up the durable subscription. A durable subscription is created by a TopicSession object, the same as with a nondurable subscription. The Retailer class obtains a durable subscription in its constructor:

public Retailer( String broker, String username, String password){
    try {
        ...           
        hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
        javax.jms.TopicSubscriber subscriber = 
            session.createDurableSubscriber(hotDealsTopic, 
            "Hot Deals Subscription");
        subscriber.setMessageListener(this);
        connect.start( );
        ....
}

The createDurableSubscriber( ) method takes two parameters: a Topic object and a subscription name. In our example we use the String “Hot Deals Subscription” as the subscription name. Topics are JMS administered objects, but subscription names are not. Although JMS does not require it, it is good practice for a JMS provider to supply an administration tool that monitors active subscription names, as illustrated in Figure 4.2.

Figure 4.2. Managing active durable subscriptions

A durable subscription’s uniqueness is defined by the client ID and the subscription name. In the event that the client disconnects without unsubscribing, a JMS provider will store the messages published to the topic until they can be delivered later. When the client reconnects and resubscribes, the JMS provider matches the stored messages to the subscription using these two identifiers, and delivers them to the subscriber.

You might think that the client ID and the topic would be enough for the provider to uniquely identify a durable subscription. However, a client may have multiple subscriptions on the same topic; for example, a client may want to use different message selectors to sort the incoming messages. (Message selectors are discussed in detail in Appendix D.) Therefore, durable subscriptions must be identified by their own name; simply using the topic name and the client ID will not suffice.
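The identity rule can be sketched as a map keyed by the pair (client ID, subscription name). This is an illustration in plain Java, not a description of how any particular provider stores subscriptions; ToySubscriptionRegistry is a hypothetical class, and the second subscription name, "Clearance Only", is invented for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy registry of durable subscriptions; not JMS API. */
final class ToySubscriptionRegistry {
    // Key: [clientId, subscriptionName]; value: the topic subscribed to.
    private final Map<List<String>, String> subscriptions = new HashMap<>();

    /** Returns true if a new subscription was created, false if an existing one was resumed. */
    boolean subscribe(String clientId, String subscriptionName, String topic) {
        List<String> key = Arrays.asList(clientId, subscriptionName);
        return subscriptions.putIfAbsent(key, topic) == null;
    }

    int size() { return subscriptions.size(); }

    public static void main(String[] args) {
        ToySubscriptionRegistry registry = new ToySubscriptionRegistry();
        // Two subscriptions from the same client on the same topic are distinct.
        System.out.println(registry.subscribe("DurableRetailer", "Hot Deals Subscription", "Hot Deals")); // true
        System.out.println(registry.subscribe("DurableRetailer", "Clearance Only", "Hot Deals"));         // true
        // Reconnecting with the same client ID and name resumes the first one.
        System.out.println(registry.subscribe("DurableRetailer", "Hot Deals Subscription", "Hot Deals")); // false
        System.out.println(registry.size());                                                              // 2
    }
}
```

Keying on the topic name instead of the subscription name would collapse the first two entries into one, which is exactly the ambiguity the paragraph above describes.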

The JMS specification is intentionally vague about how the JMS provider determines the uniqueness of a client ID. Various provider implementations are allowed to have their own internal rules for what constitutes a unique client. The setClientID( ) method on the connection object is provided in the API as a hint. The client ID is set in the constructor of our Retailer example:

public Retailer( String broker, String username, String password){
    try {
        ....
        connect = factory.createTopicConnection (username, password);
        connect.setClientID("DurableRetailer");
        ....
    }
    ....
}

Publishing the Message Persistently

Both the Wholesaler and Retailer classes publish messages using the persistent delivery mode:

publisher.publish(
    message,
    javax.jms.DeliveryMode.PERSISTENT,
    javax.jms.Message.DEFAULT_PRIORITY,
    1800000);

Note the use of the overloaded publish( ) method, with parameters that specify delivery mode, priority, and message expiration. This method provides an alternative to setting these values as publisher defaults with the TopicPublisher.setDeliveryMode( ), setPriority( ), and setTimeToLive( ) operations (see Chapter 3). In JMS, the delivery mode (persistent, nonpersistent) is a Quality of Service (QoS) setting on the message itself. Marking the message as persistent means that the JMS provider saves it to a reliable persistent store before the publish( ) method returns and client execution continues. More on how and why this works reliably can be found in Chapter 6.
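The last argument to publish( ) is the time to live in milliseconds, so 1800000 corresponds to 30 minutes. The sketch below works through that arithmetic in plain Java. TimeToLive and its methods are hypothetical helpers, not JMS API; they follow the JMS convention that the expiration time is the send time plus the time to live, and that a time to live of zero means the message never expires.

```java
import java.time.Duration;

/** Hypothetical helper that mirrors the JMS time-to-live convention. */
final class TimeToLive {
    static final long TIME_TO_LIVE_MS = 1_800_000L; // value used in the B2B example

    /** Expiration is send time plus time to live; zero means "never expires". */
    static long expirationTime(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** A message with expiration 0 never expires. */
    static boolean isExpired(long expirationTime, long nowMillis) {
        return expirationTime != 0 && nowMillis > expirationTime;
    }

    public static void main(String[] args) {
        long minutes = Duration.ofMillis(TIME_TO_LIVE_MS).toMinutes();
        System.out.println("Time to live: " + minutes + " minutes"); // 30 minutes

        long sentAt = System.currentTimeMillis();
        long expiresAt = expirationTime(sentAt, TIME_TO_LIVE_MS);
        System.out.println(isExpired(expiresAt, sentAt + 60_000L));    // false after 1 minute
        System.out.println(isExpired(expiresAt, sentAt + 3_600_000L)); // true after 1 hour
    }
}
```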

Persistent Messages and Temporary Topics

When you are using a temporary topic as a way of posting a reply to a message, you should realize that the total round trip (the initial message and the reply) isn’t guaranteed to survive a certain failure condition, even if you use persistent messages. The problem is that temporary topics cannot be used for durable subscriptions. Consider the following scenario:

  1. A JMS client (producer) creates a temporary topic, puts it in the JMSReplyTo header of a message, marks the message as persistent, and publishes it.

  2. The subscriber gets the message and publishes a response on the temporary topic using a persistent message.

  3. The original producer expects a reply on the temporary topic, but disconnects or crashes before it is received.

  4. The original producer restarts, and is no longer able to subscribe to the original temporary topic that it had established in its previous life. It can’t resubscribe because the temporary topic was only valid for the duration of the previous connection. Calling createTemporaryTopic( ) in the new session returns a new temporary topic, not the previous one.

This is a subtle point, since any client with a nondurable subscription will not get messages during a failure. In other scenarios it may be acceptable to lose messages for a time, yet still be able to start receiving newly published “responses” when the original producer of the message starts up again. In the B2B example, a failure of the Wholesaler means that the reply messages sent to the temporary topic will be lost. An alternative and superior design would use the JMSReplyTo header, with an established topic instead of a temporary one. Chapter 6 provides more detail on message delivery semantics, Quality of Service, and failure conditions.

JMSCorrelationID

In the B2B example, we are using the JMSCorrelationID as a way for the Retailer to associate its identity with its reply message, as illustrated by the following code in Retailer.autoBuy( ):

private void autoBuy (javax.jms.Message message){
    ...
    publisher = session.createPublisher(buytopic);
    textMsg.setJMSCorrelationID("DurableRetailer");
                
    publisher.publish(
           textMsg,
           javax.jms.DeliveryMode.PERSISTENT,
           javax.jms.Message.DEFAULT_PRIORITY,
           1800000);
        ...           
}

In Wholesaler, the JMSCorrelationID is extracted in the onMessage( ) handler, and simply printed on the command line:

public void onMessage( javax.jms.Message message){
        ...           
         System.out.println("Order received - "+text+
                            " from " + message.getJMSCorrelationID( ));
        ...           
   }

Another way to associate the Retailer’s identity with the reply message would be to store something unique in a message property, or in the message body itself.

A more common use of the JMSCorrelationID is not for the sake of establishing identity; it is for correlating the asynchronous reception of a message with a message that had been previously sent. A message consumer wishing to create a message to be used as a response may place the JMSMessageID of the original message in the JMSCorrelationID of the response message.
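The correlation pattern described in the previous paragraph amounts to a lookup table on the requesting side. Here is a minimal sketch in plain Java, assuming the requester remembers each outstanding request by its message ID. ToyCorrelator and its methods are hypothetical names, and the generated IDs are made up; only the "ID:" prefix mirrors the form of real JMS message IDs.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy requester-side bookkeeping for matching replies to requests; not JMS API. */
final class ToyCorrelator {
    private final Map<String, String> pendingByMessageId = new HashMap<>();
    private int nextId = 1;

    /** Records the request and returns its (made-up) message ID. */
    String send(String requestBody) {
        String messageId = "ID:" + nextId++;
        pendingByMessageId.put(messageId, requestBody);
        return messageId;
    }

    /**
     * Called when a reply arrives. The replier is assumed to have copied the
     * request's message ID into the reply's correlation ID.
     * Returns the original request body, or null if nothing matches.
     */
    String onReply(String correlationId) {
        return pendingByMessageId.remove(correlationId);
    }

    public static void main(String[] args) {
        ToyCorrelator correlator = new ToyCorrelator();
        String first = correlator.send("Surfboards, 500.00, 299.99");
        String second = correlator.send("Hockey Sticks, 20.00, 9.99");

        // Replies may arrive in any order; the correlation ID sorts them out.
        System.out.println(correlator.onReply(second)); // Hockey Sticks, 20.00, 9.99
        System.out.println(correlator.onReply(first));  // Surfboards, 500.00, 299.99
    }
}
```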

Request and Reply

JMS provides design patterns and helper classes to make it easier to write applications that need a direct request-reply between two end points. We have already shown two JMS features that can be used as part of a request-reply solution: temporary topics and the JMSReplyTo header. These features can be used independently or in combination to create an asynchronous request-reply conversation. On occasion you may want to create a synchronous request-reply conversation. There are two ways of doing this. You may call the TopicSubscriber.receive( ) method directly, or you may make use of the TopicRequestor class.

TopicSubscriber.receive( )

The receive( ) method is defined in the MessageConsumer interface, which is the superinterface of TopicSubscriber. The receive( ) method is a way of proactively asking for the message rather than passively receiving it through the onMessage( ) callback. In fact, the use of the receive( ) method negates the use of the onMessage( ) callback. The default behavior of the receive( ) method is to block program execution until a message is retrieved from the message server. The receive( ) method effectively changes the pub/sub model from a “push” to a “pull” model. From the client’s perspective, you can think of this as a polling mechanism, although that’s not necessarily how it is implemented by the JMS provider.

There are three flavors of the receive( ) method:

package javax.jms;
public interface MessageConsumer{
    ...
    Message receive( ) throws JMSException;
    Message receive(long timeout) throws JMSException;
    Message receiveNoWait( ) throws JMSException;
    ...
}

The receive( ) method with no parameters blocks indefinitely until a message is received. The receive(long timeout) method blocks until a message is received or until the timeout period, given in milliseconds, expires, whichever comes first; it returns null if the timeout expires. Either method also returns null if the subscriber is closed (for example, because its session is closed) while the method is blocking. The receiveNoWait( ) method does not block at all. It returns a message if one is available, or null if there is nothing currently pending to be delivered. Here is a slightly modified version of Wholesaler.publishPriceQuotes( ) that makes use of the receive( ) method:

private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
  ...
  System.out.println("\nInitiating Synchronous Request");

  // Publish the message persistently
  publisher.publish(
     msg,                               //message
     javax.jms.DeliveryMode.PERSISTENT, //publish persistently
     javax.jms.Message.DEFAULT_PRIORITY,//priority
     MESSAGE_LIFESPAN);                 //Time to Live
                        
  javax.jms.Message aMessage = subscriber.receive( );
    
  System.out.println("\nRequest Sent, Reply Received!");
  if (aMessage != null)
  {
     onMessage(aMessage);
  }
  ...
}

In this example the subscriber, which subscribes to the “Buy Order” temporary topic, has its receive( ) method called. The receive( ) method blocks until a message is published by the Retailer to the “Buy Order” topic. The Wholesaler client becomes a synchronous client waiting for the Retailer to respond. When the receive( ) method returns with a message, the Wholesaler simply calls onMessage( ) directly to process the message.
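The blocking behavior of the three receive( ) flavors can be tried out without a JMS provider. The following sketch is only an analogy: a java.util.concurrent.BlockingQueue stands in for the messages a provider holds for one subscriber, and the ReceiveFlavors class and its deliver( ) method are names we made up for illustration. It mirrors the rules described earlier, including the JMS convention that a timeout of zero never expires.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JMS-free analogy for the three receive flavors. This is NOT the JMS API:
// a BlockingQueue plays the part of the messages pending for one subscriber.
final class ReceiveFlavors {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Hypothetical hook that plays the role of the provider delivering a message.
    void deliver(String message) {
        inbox.add(message);
    }

    // Like receive( ): blocks until a message arrives.
    String receive() {
        try {
            return inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Like receive(long timeout): waits up to timeoutMillis, then returns null.
    // As in JMS, a timeout of 0 means "never expire".
    String receive(long timeoutMillis) {
        if (timeoutMillis == 0) {
            return receive();
        }
        try {
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Like receiveNoWait( ): returns immediately, with null if nothing is pending.
    String receiveNoWait() {
        return inbox.poll();
    }

    public static void main(String[] args) {
        ReceiveFlavors subscriber = new ReceiveFlavors();
        System.out.println(subscriber.receiveNoWait()); // null: nothing pending yet
        System.out.println(subscriber.receive(50));     // null: timed out after 50 ms
        subscriber.deliver("Buy Order");
        System.out.println(subscriber.receive());       // Buy Order
    }
}
```

A caller of the timed and no-wait flavors has to be prepared for null, which is why the Wholesaler example checks the returned message before processing it.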

Due to threading restrictions imposed on a JMS session object, it is impractical to have both synchronous and asynchronous operations on a session. Hence the Wholesaler’s constructor does not make a call to setMessageListener(this). The onMessage( ) handler will never get called automatically.

The recipient side of the conversation still looks the same as in our previous example. The Retailer.autoBuy( ) method receives the message, gets the return address from the JMSReplyTo header, and publishes a response using that topic.

It is erroneous for a session to be operated by more than one thread of control at any given time. In our example, there appears to be only one thread of control: the main thread of the application. However, when the onMessage( ) handler is invoked, it is being called by another thread that is owned by the JMS provider. Due to the asynchronous nature of the onMessage( ) callback, it could possibly be invoked while the main thread is blocking on a synchronous receive( ).

TopicRequestor

The TopicRequestor class is distributed in source code form as a part of the JMS 1.0.2 distribution package. The class is very simple. Its constructor takes two parameters: a session and a topic. The constructor creates a temporary topic to be used for the duration of the session. Its most important method is request( ), which looks like this:

public Message request(Message message) throws JMSException {
    message.setJMSReplyTo(tempTopic);
    publisher.publish(message);
    return(subscriber.receive( ));
}

The use of the TopicRequestor is similar to our receive( ) example, except that the calls to publish( ) and receive( ) are replaced with one call to request( ). Here is a modified excerpt from Wholesaler.publishPriceQuotes( ) illustrating how to use a TopicRequestor:

private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
   ...
   System.out.println("\nInitiating Synchronous Request");

   javax.jms.TopicRequestor requestor = 
      new javax.jms.TopicRequestor(session, pricetopic);

   javax.jms.Message aMessage = requestor.request(msg);   
                     
   System.out.println("\nRequest Sent, Reply Received!");
   if (aMessage != null)
   {
      onMessage(aMessage);
   }
   ...
}

As in our previous receive( ) example, the recipient side of the conversation remains unchanged. Retailer.autoBuy( ) receives the message, gets the return address from the JMSReplyTo header, and publishes a response using that topic.

As you can see, the TopicRequestor object is a higher-level abstraction built on top of the TopicSubscriber.receive( ) mechanism. It is very handy if you are willing to live with its limitations. Here are some reasons why you may want to call receive( ) yourself instead of using the TopicRequestor:

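  • You may want to set the time-to-live or the delivery mode (persistent or nonpersistent) when publishing the message; request( ) calls publish( ) with the publisher’s default settings.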

  • You may not want to use a temporary topic. TopicRequestor creates its own temporary topic as its way of getting a response back.

  • You may want to use the receive(long timeout) or receiveNoWait( ) variants; request( ) always uses the blocking receive( ).

  • You may want to publish on a topic, and receive responses on a p2p queue.

  • You may want to receive more than one message in response to a request.

  • TopicRequestor.close( ) also closes the session that was passed to its constructor, which may not be the behavior you are looking for.

  • You may want to receive the responses using a transaction. (More on JMS transactions can be found in Chapter 6.)
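One of the reasons above, wanting a timeout on the reply, is easy to illustrate. The sketch below is a JMS-free model under stated assumptions: BlockingQueue objects play the parts of the request topic and the temporary reply topic, and the TimedRequestSketch, Request, request( ), and startResponder( ) names are all hypothetical. It shows the shape of a hand-rolled request that gives up and returns null when no reply arrives in time, which is what TopicRequestor.request( ) cannot do.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JMS-free model of a hand-rolled synchronous request with a timeout.
// Queues stand in for topics; none of this is the javax.jms API.
final class TimedRequestSketch {

    // Hypothetical request envelope: a body plus the "reply to" destination.
    static final class Request {
        final String body;
        final BlockingQueue<String> replyTo;

        Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Publishes the request, then waits at most timeoutMillis for a reply.
    // Returns null when no reply arrives in time.
    static String request(BlockingQueue<Request> topic, String body, long timeoutMillis) {
        // A fresh private queue plays the role of the temporary topic.
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        topic.add(new Request(body, replies));
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Starts a daemon thread that answers every request it takes from the topic.
    static Thread startResponder(BlockingQueue<Request> topic) {
        Thread responder = new Thread(() -> {
            try {
                while (true) {
                    Request r = topic.take();
                    r.replyTo.add("Order for: " + r.body);
                }
            } catch (InterruptedException e) {
                // Interrupted: stop serving requests.
            }
        });
        responder.setDaemon(true);
        responder.start();
        return responder;
    }

    public static void main(String[] args) {
        // A responder is running, so a reply is expected well within the timeout.
        BlockingQueue<Request> served = new LinkedBlockingQueue<>();
        startResponder(served);
        System.out.println(request(served, "Hot Deal", 2000));

        // Nobody listens on this topic, so the call times out and prints null.
        BlockingQueue<Request> unserved = new LinkedBlockingQueue<>();
        System.out.println(request(unserved, "Hot Deal", 100));
    }
}
```

With a real provider, the equivalent would be to set JMSReplyTo yourself, publish, and call receive(long timeout) on your own subscriber.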

Unsubscribing

Upon closing the session, the JMS provider should automatically take care of unsubscribing any nondurable subscriptions that were created by the session. But there may be cases where you want to explicitly unsubscribe a durable subscriber in a client application. Here is how that is accomplished in Retailer.exit( ):

private void exit(String s){
    try {
        if ( s != null && 
            s.equalsIgnoreCase("unsubscribe"))
        {
            subscriber.close( );
            session.unsubscribe("Hot Deals Subscription");
        }
        connect.close( );
    } catch (javax.jms.JMSException jmse){
        jmse.printStackTrace( );
    }
    System.exit(0);
}

For nondurable subscriptions, calling the close( ) method on the TopicSubscriber is sufficient. For durable subscriptions, there is an unsubscribe(String name) method on the TopicSession object, which takes the subscription name as its parameter. This informs the JMS provider that it should no longer store messages on behalf of this client. It is an error to call the unsubscribe( ) method while the subscription still has an active subscriber. Hence the subscriber must be closed first, and both methods need to be called for durable subscriptions.
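The ordering rule can be captured in a few lines. The following is a toy model, not provider code: the DurableSubscriptionSketch class and its open( ), close( ), unsubscribe( ), and exists( ) methods are hypothetical, and it signals errors with unchecked exceptions where a real provider would throw a JMSException. It only demonstrates that closing a subscriber leaves the durable subscription in place, and that removing the subscription is refused while a subscriber is still open.

```java
import java.util.HashMap;
import java.util.Map;

// JMS-free toy model of "close the durable subscriber, then unsubscribe".
// All names are illustrative assumptions; a real provider throws JMSException.
final class DurableSubscriptionSketch {
    // Subscription name -> true while a subscriber is open on it.
    private final Map<String, Boolean> active = new HashMap<>();

    // Models creating a durable subscriber: the subscription exists and is in use.
    void open(String name) {
        active.put(name, true);
    }

    // Models closing the subscriber: the subscription survives but is idle.
    void close(String name) {
        if (active.containsKey(name)) {
            active.put(name, false);
        }
    }

    // Models unsubscribe(name): refused while a subscriber is still open.
    void unsubscribe(String name) {
        Boolean inUse = active.get(name);
        if (inUse == null) {
            throw new IllegalArgumentException("unknown subscription: " + name);
        }
        if (inUse) {
            throw new IllegalStateException("close the subscriber first: " + name);
        }
        active.remove(name);
    }

    boolean exists(String name) {
        return active.containsKey(name);
    }

    public static void main(String[] args) {
        DurableSubscriptionSketch provider = new DurableSubscriptionSketch();
        provider.open("Hot Deals Subscription");
        try {
            provider.unsubscribe("Hot Deals Subscription"); // subscriber still open
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        provider.close("Hot Deals Subscription");
        provider.unsubscribe("Hot Deals Subscription");     // now allowed
        System.out.println(provider.exists("Hot Deals Subscription")); // false
    }
}
```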



[1] WHOLESALER and RETAILER are usernames you have set up when configuring your JMS server. passwd1 and passwd2 are the passwords you’ve assigned to those usernames. If you are using an evaluation version of a JMS provider, it may not be necessary to set up usernames and passwords; check your vendor’s documentation for more information.
