Saturday, February 25, 2012

JMS in Java

JMS in Java

Dear Reader,
    Today we will discuss about JMS- Java Message Service API.
    JMS is the standard API used by J2EE applications to access Message-Oriented Middleware (MOM) services. 
    Messaging is the ability of applications to interact and communicate with each other asynchronously. This 
    communication can be either synchronous or asynchronous. Message-driven applications are designed to 
    accomplish loose coupling and portability. Each part of these enterprise applications can be developed as 
    self-contained business components, and then can be integrated into a reliable and flexible system.
    
    JMS provides a unified API for J2EE enterprise applications to create, send, receive, and process messages 
    using any MOM products. These MOM products, also known as JMS providers, implement JMS API so that Java 
    applications can use the JMS API in a vendor-neutral manner. This allows applications to communicate with 
    each other using messages through any JMS provider. Communications between applications occur in an 
    asynchronous manner, which means that a sender sends a message and does not wait for the response, but 
    continues the flow of execution. This is similar to sending an e-mail message, and you don’t have to wait 
    for a reply.
    
    Because messaging is peer-to-peer, all users of JMS are referred to as JMS clients. A JMS application 
    consists of a set of application-defined messages and a set of JMS clients that exchange them. 
    A message basically has two parts: a header and payload/body. The header is comprised of special fields that 
    are used to identify the message, declare attributes of the message, and provide information for routing. 
    The difference between message types is determined largely by their payload, i.e., the type of application 
    data the message contains. The Message class, which is the super-class of all message objects, has no payload.
    Type of messages are below:

    Message
        This type has no payload. It is useful for simple event notification.
    
    TextMessage
        This type carries a java.lang.String as its payload. It is useful for exchanging simple text messages and 
        also for more complex character data, such as XML documents.
    
    ObjectMessage
        This type carries a serializable Java object as its payload. It's useful for exchanging Java objects.
    
    BytesMessage
        This type carries an array of primitive bytes as its payload. It's useful for exchanging data in an application's 
        native format, which may not be compatible with other existing Message types. It is also useful where JMS is used 
        purely as a transport between two systems, and the message payload is opaque to the JMS client.
    
    StreamMessage
        This type carries a stream of primitive Java types (int, double, char, etc.) as its payload. It provides a set of 
        convenience methods for mapping a formatted stream of bytes to Java primitives. It's an easy programming model 
        when exchanging primitive application data in a fixed order. 

    MapMessage
        This type carries a set of name-value pairs as its payload. The payload is similar to a java.util.Properties object, 
        except the values must be Java primitives or their wrappers. The MapMessage is useful for delivering keyed data.

    
    A destination is a logical channel that encapsulates the addresses of both the sending and receiving endpoints, 
    like a queue or a topic. JMS providers either broadcast a message from a destination to many clients, or send 
    a message to a single client. A JMS provider optionally supports the guaranteed delivery of messages, but does 
    not guarantee the order in which they are delivered. In many systems, such as financial and banking applications, 
    messages are required to be delivered once and only once. Messages can be delivered based on priority, expiration 
    time, or whether acknowledgment is required.

So there are two programming models in JMS API:
    Point to Point (PTP) (called Queue)
    Publish and Subscribe (pub-and-sub) (called Topic)

In PTP model, a message is delivered only to a single receiver but in pub-and-sub a message is broadcasted 
to multiple receivers.

Example of few JMS providers: Apache ActiveMQ, Websphere, Weblogic etc.

Below are the steps to write a JMS client (Either sender or receiver):
        1. Import javax.jms package   
        2. Lookup ConnectionFactory  //JNDI name must be set in XML config file for ConnectionFactory.
        3. Create Connection
        4. Create Session
        5. Lookup Destination (Topic or Queue)
        6. Create Producers and Consumers
        7. Create Message
        8. Send and receive Messages

Complete coding is below (Sender):
        QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("jms/qcf"); //JNDI name must be set in XML file.
        QueueConnection jmsconn = qcf.createQueueConnection();
        QueueSession session = jmsconn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue)ctx.lookup("jms/q");
        QueueSender sender = session.createSender(queue);
        TextMessage message = session.createTextMessage("Message Text");
        sender.send(message);
        
A connection represents an open connection (TCP/IP) from the JMS client to the JMS server. 
Connection is used to create Session objects that in turn create message producers and consumers.
A Connection object is created by ConnectionFactory that could either be a TopicConnection or a QueueConnection.
A Session object is a single-threaded context for producing and consuming messages. But a connection object
can be multi-threaded as one connection can serve many sessions.

Optimization with Connection:
    Start the Connection when appropriate.
    Process messages concurrently.
    Close the Connection when finished.
    
1. Start the Connection when appropriate
    You need to start a Connection using the start() method for allowing the flow of messages from the producer 
    to the JMS server. First start Receiver/Consumer and then start the Producer Connection else the messages have 
    to wait in the JMS server or they persist if they are durable messages. This is an unnecessary overhead.

2. Process messages concurrently
    JMS provides a facility to process messages concurrently by getting a ConnectionConsumer that uses server 
    session pool. The server session pool is a pool of Sessions, each one executes separate message concurrently. 
    This facility gives an application ability to process messages concurrently thus improving performance.
    By the way this point can be avoided as it is intended for advance configurations.

You can create ConnectionConsumer using these methods.
For Queues :
    public ConnectionConsumer createConnectionConsumer(
        Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
        throws JMSException

For Topics :
public ConnectionConsumer createConnectionConsumer(
        Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
        throws JMSException

    In these methods the main parameters are maxMessages and ServerSessionPool. maxMessages denote the maximum 
    number of messages that can be simultaneously assigned to a server session. ServerSessionPool is an 
    administered object that you configure in vendor specific manner. This process works similar to Message 
    driven beans where you can process multiple messages concurrently. This gives good performance and 
    scalability. Some of the JMS vendors support this facility, but some vendors don't.

3. Close the Connection when finished
    You are closing the external resources like network or a database connection explicitly as soon 
    as you are done with them. Similarly a JMS connection is also a TCP/IP connection to the JMS server, 
    you should close the Connection using the close() method as and when you finish your work, when you 
    close the Connection it closes it's Session and Producer/Consumer objects also.

Optimization with Session
    A Session is used to create multiple producers and consumers. A Session can be a QueueSession for a PTP 
    or a TopicSession for a pub-and-sub model.

When you create a Session object, consider the following optimization techniques to improve performance:
    1. Choose proper acknowledgement mode
    2. Control Transaction
    3. Close the Session when finished.

1. Choose proper acknowledgement mode
    When you create a Session object, you can choose anyone of the three acknowledgement modes, 
    AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE. 
    For example,

    For Topic:
        topicSession=topicConnect.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);

    For Queue:
        qsession=queueConnect.createQueueSession(false, session.AUTO_ACKNOWLEDGE);

Here you have a choice of choosing an acknowledgement among three modes. Each of these modes has a specific 
functionality. As per performance perspective, which mode gives the best performance?

    CLIENT_ACKNOWLEDGE mode is not a feasible option (when you have the freedom to choose from the other two 
    options) since the JMS server cannot send subsequent messages till it receives an acknowledgement from 
    the client.

    AUTO_ACKNOWLEDGE mode follows the policy of delivering the message once-and-only once but this incurs an 
    overhead on the server to maintain this policy.

    DUPS_OK_ACKNOWLEDGE mode has a different policy of sending the message more than once thereby reducing the 
    overhead on the server (imposed when using the  AUTO_ACKNOWLEDGE) but imposes an 
    overhead on the network traffic by sending the message more than once. But the AUTO_ACKNOWLEDGE mode 
    cleans up resources early from the persistent storage/memory which reduces the overhead because of that.

In summary, AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE give better performance than CLIENT_ACKNOWLEDGE.

2. Control Transaction
    In JMS a transaction is a group of messages that are consumed/delivered in all-or-none fashion. 
    You make messages as transactional messages by giving 'true' flag when creating a session.

    topicSession = tConnect.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
    queueSession = qConnect.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);

In the above code the first parameter indicates the session as transactional session. 
Session has commit(), rollback() and isTransacted() methods to deal with transactional messages. 
A transaction starts implicitly when session is created and ends when commit() or rollback() method is 
called. Transactional messages are cumulated at JMS server until the transaction is committed or rolledback. 
This imposes significant overhead on JMS server.

Suppose if you want to send 100 messages, out of which you want only 10 messages to be in a transaction 
means either commit or rollback (all or none fashion). How would you control transactions in such situations? 
The best method is to divide transactional messages and non-transactional messages separately. Create 
transactional session for transactional messages by giving 'true' flag (see code above) and create a 
separate non-transactional session for non-transactional messages by giving 'false' flag. 

3. Close the Session when finished
    It is always better to remove an object as early as possible when finished with, although closing a 
    connection class closes session, this allows the garbage collector to remove objects.

Optimization with Destination:
    Destination (for Topic or Queue) is a virtual end-point between producers and consumers. Producers send 
    messages to the Destination which in turn delivers messages to consumers.

Destination (Topic/Queue) is configured in a vendor specific manner, The following parameters can be configured :
    Maximum size of the Destination
    Maximum number of messages in the Destination
    Priority of messages
    Time to live
    Time to deliver
    Delivery mode
    Re-delivery delay
    Re-delivery limit
    
We need to look up JNDI to get Destination object:
    InitialContext ic = new InitialContext(properties);
    Topic topic = (Topic) ic.lookup(topicName);
    Queue queue = (Queue) ic.lookup(queueName);

All the above configurable parameters have an impact on the performance. Here we will discuss the 
size of Destination, maximum messages in the Destination, Redelivery delay and Redelivery limit.

A message can be durable or non-durable. For non-durable messages, the time that messages take to deliver to the 
Destination depends upon its number and Destination size. If a large number of messages got collected in a Destination, 
they take more time to be delivered. So give less Destination size and less number of maximum messages to the Destination 
to improve performance.

Re-delivery delay time defines when to re-deliver a message if a failure occurs. If this is less, the frequency of 
re-delivery of a message is high thus increasing network traffic and vice versa. So high Redelivery delay time gives 
better performance. 

Re-delivery Limit defines the number of times a message should be re-delivered if a failure occurs and message is not 
delivered. If the Redelivery limit is less, then the performance is better because the memory overhead for non-durable 
messages and persistent overhead for durable messages is reduced. So set optimal Re-delivery limit to improve performance.


Optimization with Message Producer/Consumer:
    Producer(Sender/Publisher) sends messages to the Destination(Queue/Topic) where as Consumer(Receiver/Subscriber) 
    consumes messages from the Destination. Message Producer/Consumer is created by Session object.

For Topics:
    InitialContext ic = new InitialContext(properties);  //properties having URL related details.
    Topic topic = (Topic) ic.lookup(topicName);    
    TopicPublisher publisher = pubSession.createPublisher(topic);
    TopicSubscriber subscriber = subSession.createSubscriber(topic);

For Queues:
    InitialContext ic = new InitialContext(properties);    
    Queue queue = (Queue) ic.lookup(queueName);
    QueueSender sender = sendSession.createSender(queue);
    QueueReceiver receiver = receiverSession.createReceiver(queue);

You send the messages using Producer:
For Topics:
    publisher.publish(Message message); or
    publisher.publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive);

For Queues:
    sender.send(Message message); or
    sender.send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive);

The parameters DeliveryMode and TimeToLive are important from performance perspective. You can give values 
for these parameters when you configure ConnectionFactory or Destination or before sending a message.

When you send the message using send() method or when you configure the delivery mode and timeToLive parameters 
in ConnectionFactory or Destination, consider the following optimization techniques to improve performance.
    1. Choose non-durable messages where appropriate
    2. Set TimeToLive value properly
    3. Receive messages asynchronously
    4. Close Producer/Consumer when finished

    Below are descriptions about above points mentioned:
    1. Choose non-durable messages where appropriate
            Delivery mode defines whether the message can be durable/persistent or non-durable/non-persistent. 
            This factor has an impact on the performance. This parameter ensures that message delivery is guaranteed. 
            For durable messages the delivery mode value is Deliverymode.PERSISTENT, for non-durable messages 
            delivery mode value is Deliverymode.NON_PERSISTENT.

        If you define the delivery mode as durable then the message is stored by the JMS server before delivering 
        it to the consumer. When using the durable delivery mode, each message has to be stored by the JMS server either 
        in the database or the file system depending on the vendor before delivery of message to consumer and 
        removed after delivery of message. This has a huge impact on the performance. So as far as possible 
        restrict the use of durable delivery mode unless and until absolutely necessary for your application 
        to avoid the overheads involved. The following lines will elaborate this:

    Durable/Persistent Message Flow:
        1) JMS Message Producer sends message to JMS Server.
        2) JMS Server store message to Persistent Storage.
        3) JMS Server sends acknoledgement to Producer.
        4) Consumer receives message from Server.
        5) Consumer sends acknoledgement to Server.
        6) Server removes message from Persistent storage.  

    Non-Durable/Non-Persistent Message Flow:
        1) JMS Message Producer sends message to JMS Server.
        2) JMS Server sends acknoledgement to Producer.
        3) Consumer receives message from Server.
        4) Consumer sends acknoledgement to Server.

    2. Set TimeToLive value properly
        You can set the age of the message by setting the Time_To_Live parameter after which the message 
        expires. By default the message never expires, set optimal message age so as to reduce memory overhead, 
        thus improving performance.

    3. Receive messages asynchronously
        You can receive messages synchronously or asynchronously. For recieving asynchronous messages you need 
        to implement the MessageListener interface and hence onMessage() method should be implemented. 
        
        For receiving Synchronous messages you need to use anyone of the following methods of MessageConsumer:
            receive();  //This method blocks the call until it receives the next message
            receive(long timeout); //This method blocks till a timeout occurs
            receiveNoWait();   //This method never blocks.
        
        For Asynchronous:
            queueCon = queueConFactory.createQueueConnection();
            queueSession = queueCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueSender = queueSession.createSender(queue);
            queueSession.setMessageListener(new MyMessageListener());  //MyMessageListener implements MessageListener
            queueCon.start();

               public class MyMessageListener implements javax.jms.MessageListener {
              public void onMessage(Message message){
                try{
                    TextMessage textMessage = (TextMessage)message;
                    String text = textMessage.getText( );
                    System.out.println(text);
                } catch (JMSException jmse){jmse.printStackTrace( );}
              }            
            }

    4. Close Producer/Consumer when finished
        It is always better to remove an object as early as possible when finished with, although closing a 
        connection class closes session and Producer/Consumer, this allows the garbage collector to remove objects.


Optimization with Message:
    A Message object contains information that is passed between applications. As per performance perspective, 
    you need to consider the type of message-Text, Object, Byte, Stream or Map message. The message size depends 
    on the type of message you choose which in turn has an impact on the performance.

    Less size gives better performance and vice versa. For example, ByteMessage takes less memory than TextMessage.
    ObjectMessage carries a serialized java object, when you choose ObjectMessage you need to use 'transient' 
    keyword for variables that need not be sent over the network to reduce overhead. 

    In summary choose message type carefully to avoid unnecessary memory overhead.

Also any Vendor specific optimization features like providing connection pool feature for ConnectionFactory and 
Sessions, Support for Clustering for scalability etc would be beneficial for application.

Key Points
    a. Start Consumer connection first then start producer connection.
    b. Use concurrent processing of messages.
    c. Close the Connection when finished.
    d. Choose either DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE rather than CLIENT_ACKNOWLEDGE.
    e. Control transactions by using separate transactional session for transactional messages 
        and non-transactional session for non-transactional messages.
    f. Close session object when finished.
    g. Make Destination with less capacity and send messages accordingly.
    h. Set high Redelivery delay time to reduce network traffic.
    i. Set less Redelivery limit for reducing number of message hits.
    j. Choose non-durable messages wherever appropriate to avoid persistence overhead.
    k. Set optimal message age (TimeToLive value).
    l. Receive messages asynchronously.
    m. Close Producer/Consumer when finished.
    n. Choose message type carefully to avoid unnecessary memory overhead.
    o. Use 'transient' key word for variables of ObjectMessage which need not be transferred.

-----------------------------------------END---------------------------------------

No comments:

Post a Comment