Thursday, December 11, 2008

Spring, JMS, Topics, Queues, Templating...

I recently had the opportunity to write some software at work that needed to use a JMS message broker to send asynchronous messages around my cluster, perform data lookup and aggregation, and then publish batches of aggregate data to another queue so that another cluster could take that data and present it to the end user. I found a lot of information out there to get started, but nothing complete. So here is my attempt to gather all of that information in one place, so that someone else looking for the same thing doesn't have to spend the hours it took me to get all of the pieces together.

Summary of Technologies Used
JMS - Java Message Service
Topic - A distribution mechanism for publishing messages that are delivered to multiple subscribers.
Queue - A staging area that contains messages that have been sent and are waiting to be read. As the name queue suggests, the messages are delivered in the order sent. A message is removed from the queue once it has been read.
Spring - using 2.5 at the time of this article (of particular interest: JmsTemplate)
Tomcat - using Tomcat 6.0
JNDI - Java Naming and Directory Interface
ActiveMQ - Powerful open source Message Broker. Free, Open and easy to tweak and run. Great documentation! Always important.
Java - JSE 6 and Servlet Spec 2.4


Getting started with ActiveMQ really took just a few minutes.
  1. Get the binary distribution from ActiveMQ on Apache.
  2. Unzip the binaries to your place of choice.
  3. cd into $ACTIVEMQ_HOME/bin and run the activemq script.
  4. Then check http://localhost:8161/admin/ to see that it is running. This is the cool admin console that comes with ActiveMQ. Great!
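
If you would rather check from code than from a browser, here is a minimal sketch that only tests whether something is accepting TCP connections on a given port. The class and method names are made up for illustration. Port 8161 is the admin console mentioned above and 61616 is the port used in the broker URL later in this post; your install may use different ones. It uses try-with-resources, so it needs Java 7 or later.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ or Spring.
final class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("admin console (8161): " + isListening("localhost", 8161, 1000));
        System.out.println("broker transport (61616): " + isListening("localhost", 61616, 1000));
    }
}
```

An open port only tells you that a process is listening there, not that the broker is healthy, so treat this as a quick smoke test.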

So now that we have our Message Queue Server up and running we can move on to how to use Spring to wire up your application to use the Queue.

Creating a Message Producer
First we need to have a connection to our super secret JMS Broker. So to get this into our Spring configuration we add the following:

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" ref="brokerLocation" />
        </bean>
    </property>
</bean>


brokerLocation is a reference to my JNDI config defined like so in my applicationContext.xml:

<jee:jndi-lookup id="brokerLocation" lookup-on-startup="true" jndi-name="jms/brokerLocation" resource-ref="true" cache="true" />


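Spring's support for JNDI is truly amazing and painless... Definitely check it out. This is literally all you have to do to reference JNDI values in your Spring application! The value itself is not part of the Spring configuration; it is declared on the Tomcat side (for example, inside the Context element of your context.xml) like so: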


<Environment name="jms/brokerLocation" override="false" type="java.lang.String" value="tcp://some.host.at.your.com:61616"/>


So all this wires up the connection factory to use for, well, connecting to your message broker.

The cool thing is that if you decide at some point to change which message broker you are using, this is the only thing you would have to change in your application.
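
For anyone wondering what that jee:jndi-lookup element boils down to, here is a rough sketch in plain Java. As far as I can tell, resource-ref="true" means the name gets the java:comp/env/ prefix unless it already has a scheme. The class name, method names, and the fallback value are my own additions for illustration; the Spring element above does not fall back to anything.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper that approximates the JNDI lookup Spring performs.
final class BrokerLocationLookup {

    private static final String ENV_PREFIX = "java:comp/env/";

    // Approximates resource-ref="true": prefix the name unless it is already qualified.
    static String toEnvName(String jndiName) {
        if (jndiName.startsWith(ENV_PREFIX) || jndiName.indexOf(':') != -1) {
            return jndiName;
        }
        return ENV_PREFIX + jndiName;
    }

    // Looks the value up; returns the fallback when no JNDI provider or entry exists.
    static String lookupOrDefault(String jndiName, String fallback) {
        try {
            Object value = new InitialContext().lookup(toEnvName(jndiName));
            return value instanceof String ? (String) value : fallback;
        } catch (NamingException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("jms/brokerLocation")); // prints java:comp/env/jms/brokerLocation
        // Inside Tomcat this should return the Environment value; elsewhere, the fallback.
        System.out.println(lookupOrDefault("jms/brokerLocation", "tcp://localhost:61616"));
    }
}
```

Run outside of a container, there is normally no JNDI provider at all, which is why the sketch treats any NamingException as "not configured".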

The next step is to define the queue you want to put messages on. The constructor-arg is the name of the queue that you want to send messages to.

<bean id="queueOne" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="QUEUE.ONE" />
</bean>


Then simply define the class that will be used to create the messages to be placed on the queue. You need to have a handle on a javax.jms.Queue and an org.springframework.jms.core.JmsTemplate. The template is then used to send messages to the broker. Below you will see this set up. The produce method is called from an external class with whatever you want to put on the Queue. Here I am passing in 2 String values and then using a javax.jms.MapMessage to create a message of key/value pairs to be placed on the queue.

package com.foo.jms;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TestProducer {

    private Logger logger = Logger.getLogger(TestProducer.class);
    private JmsTemplate jmsTemplate;
    private Queue queue;

    // Called by other classes with the values to place on the queue.
    public void produce(final String foo, final String bar) {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                // A MapMessage carries the payload as key/value pairs.
                MapMessage mm = session.createMapMessage();
                mm.setString("foo", foo);
                mm.setString("bar", bar);
                return mm;
            }
        });
        logger.info("Message sent to message broker");
    }

    // The template is built from the injected connection factory.
    @Required
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.jmsTemplate = new JmsTemplate(connectionFactory);
    }

    @Required
    public void setQueue(Queue queue) {
        this.queue = queue;
    }
}



Here is how we wire up the above class with Spring. You can see that we pass in the connectionFactory that we defined earlier and a reference to the Queue that we also defined above.


<bean name="testProducer" class="com.foo.jms.TestProducer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queue" ref="queueOne" />
</bean>



Creating a Message Consumer
This is going to be fairly similar to creating a message producer. So now we define the message listener or consumer; whatever you want to call it.


<bean name="fooListener" class="com.foo.jms.FooListener" />



This looks a little like WTF?! Right? Yeah I thought so as well. So here is the class com.foo.jms.FooListener:


package com.foo.jms;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.log4j.Logger;


public class FooListener implements MessageListener {

    private Logger logger = Logger.getLogger(FooListener.class);

    @Override
    public void onMessage(Message message) {
        try {
            // Only handle the MapMessage type that TestProducer sends.
            if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                String foo = mapMessage.getString("foo");
                String bar = mapMessage.getString("bar");
                //...do something with the data you just got
            }
        } catch (Exception e) {
            // Pass the exception as the second argument so the stack trace is logged.
            logger.error("Failed to process message", e);
        }
    }
}



The cool thing here is that your Message Listener just has to implement the javax.jms.MessageListener interface, and then you wire that class up in Spring. To use the jms namespace, add xmlns:jms="http://www.springframework.org/schema/jms" to your beans tag, and add "http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" to its xsi:schemaLocation. You also need to specify the destination, which is just the name of the queue that you want to listen on. Like this:

<jms:listener-container connection-factory="connectionFactory" cache="none">
    <jms:listener destination="QUEUE.ONE" ref="fooListener" />
</jms:listener-container>



Now we are all set to send and receive messages to/from the Message Broker. But how about subscribing to a topic? The reason for doing this is so that all N servers in your cluster pick up each message that gets published to the topic. First, here is how we publish messages to the topic.

Publishing to a Topic
First we need to define an instance of a javax.jms.Topic by using the org.apache.activemq.command.ActiveMQTopic implementation of the Topic interface.


<bean id="topicOne" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="TOPIC.ONE" />
</bean>



Now that we have the Topic defined we need to pass that to a class that knows how to publish to the topic. Here is the configuration:


<bean name="topicPublisher" class="com.foo.jms.TopicPublisher">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="topic" ref="topicOne" />
</bean>



Here we are using the same config as defined earlier for the connectionFactory. We just need to pass the reference in along with a reference to the Topic that we just created.

Here is what your class will look like:


package com.foo.jms;

import java.io.Serializable;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TopicPublisher {

    private JmsTemplate jmsTemplate;
    private Topic topic;

    // ObjectMessage.setObject only accepts Serializable payloads,
    // so the parameter is typed as Serializable rather than Object.
    public void produce(final Serializable object) {
        this.jmsTemplate.send(this.topic, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage mm = session.createObjectMessage();
                mm.setObject(object);
                return mm;
            }
        });
    }

    @Required
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.jmsTemplate = new JmsTemplate(connectionFactory);
    }

    @Required
    public void setTopic(Topic topic) {
        this.topic = topic;
    }
}



Now this looks pretty much identical to the TestProducer we defined earlier, and it is, except for the fact that it uses a javax.jms.Topic instead of a javax.jms.Queue and sends an ObjectMessage instead of a MapMessage. You can push any Serializable object onto the Queue/Topic this way.
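
Since the payload has to be Serializable, it can save some head scratching to check that your object really survives serialization before you hand it to the broker. Here is a minimal sketch, assuming the broker client relies on standard Java serialization for ObjectMessage payloads. The Batch class and the roundTrip helper are hypothetical names of mine, not anything from Spring or ActiveMQ, and the code needs Java 7 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {

    // Hypothetical payload type, used only for this illustration.
    static final class Batch implements Serializable {
        private static final long serialVersionUID = 1L;
        final String source;
        final int count;

        Batch(String source, int count) {
            this.source = source;
            this.count = count;
        }
    }

    // Writes the value to bytes and reads it back, as a receiver on another JVM would.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // Typically a non-serializable field somewhere inside the object graph.
            throw new IllegalStateException("Payload is not safely serializable", e);
        }
    }

    public static void main(String[] args) {
        Batch copy = roundTrip(new Batch("cluster-a", 3));
        System.out.println(copy.source + " " + copy.count); // prints cluster-a 3
    }
}
```

Keep in mind that the receiving side also needs the payload class on its classpath, which this local check cannot tell you.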

Subscribing to a Topic
The last portion here is subscribing to the Topic we just created. This is similar to listening to a Queue; the difference here is that the destination-type is set to "topic". With this listener defined we are now 'subscribed' to the Topic.


<bean name="topicListener" class="com.foo.jms.TopicListener"/>

<jms:listener-container connection-factory="connectionFactory" cache="none" destination-type="topic">
    <jms:listener destination="TOPIC.ONE" ref="topicListener" />
</jms:listener-container>



Here is the code. Pretty much the same as before: implement MessageListener and Spring takes care of registering with the Message Broker. When a new message is published to the topic, this listener will pick it up and automagically call the onMessage method defined below in the TopicListener class.


package com.foo.jms;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

public class TopicListener implements MessageListener {

    private Logger logger = Logger.getLogger(TopicListener.class);

    @Override
    public void onMessage(Message message) {
        try {
            // Only handle the ObjectMessage type that TopicPublisher sends.
            if (message instanceof ObjectMessage) {
                ObjectMessage objectMessage = (ObjectMessage) message;
                Object obj = objectMessage.getObject();
                //... do something with the data here
            }
        } catch (Exception e) {
            // Pass the exception as the second argument so the stack trace is logged.
            logger.error("Failed to process message", e);
        }
    }
}



Summary
So adding JMS support to your Spring-based application is pretty easy and painless. The one thing that you should take note of is the difference between a Queue and a Topic. If you don't need multiple machines to get the same message as it is placed on the Message Broker, then use a Queue. That is, a Queue holds onto a message until one message listener picks it off. Otherwise, if you need a message to be picked up by every machine in a cluster, then use a Topic. With a Topic, the Message Broker delivers a copy of the message to every subscriber of the Topic. Note that with plain (non-durable) subscriptions like the one configured above, that means every subscriber that is connected at the time the message is published.
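
To make the difference concrete, here is a tiny in-memory sketch of the two delivery styles. It does not use JMS at all; the class and method names are made up, and the round-robin hand-out for the queue is a simplifying assumption (a real broker decides which consumer gets a message in its own way).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of delivery semantics, not a JMS implementation.
final class DeliverySemantics {

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<List<String>>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    // Queue: each message goes to exactly one consumer (round-robin assumed here).
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        int next = 0;
        for (String message : messages) {
            received.get(next).add(message);
            next = (next + 1) % consumers;
        }
        return received;
    }

    // Topic: every subscriber gets its own copy of each message.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(deliverAsQueue(messages, 2)); // prints [[m1, m3], [m2, m4]]
        System.out.println(deliverAsTopic(messages, 2)); // prints [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

With two machines listening, the queue splits the work between them while the topic gives both the full stream, which is the choice you are making when you pick one or the other.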

Hope this was helpful to someone. This was definitely a fun exercise in resume building. ;)




12 comments:

Eduardo Luttner said...

great article

Arun said...

i was using linux (ubuntu) for this and seems for active mq you need to have multicast enabled...the following should get you out of woods.
"sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev eth0" what it does is enable the loopback for multicast addresses...for more details look here

Unknown said...

Thank you for posting such a coherent article on this subject matter. This is the first I've come across that lays out it out this simply and it works great. You rock.

Helena Hjertén said...

Great writing! The way you put it all together is very useful, I wish I found this a few hours ago.

I just struggled to get it to work. I am using JBoss, Maven, Spring (and Seam) but now it finally works.

We couldn't get the jms:listener-container configuration to work. The Consumers refused to pick the topic I choose with this configuration.

Had to use this instead:
(Don´t know how to go around the problems of using "greater than". Used @-sign instead...)

@bean id="myContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" depends-on="myBroker"@
@property name="connectionFactory" ref="jmsConnectionFactory"/@
@property name="destination" ref="myTopic"/@
@property name="messageListener" ref="MyListener"/@
@/bean@

Now I need to configure one for each subscriber (which seems unnecessary), but I am googling right now to figure out how to solve it.

Thanks for you informative article. I will definitely come back to see what else you are sharing with us...

Unknown said...

Thanks a lot, it was a lifesaver for me :-)

Kshitiz said...

Hi,

Thanks for such a great post.....

I am trying to get ur example running for both a topic and a queue....but getting the following error

log4j:WARN No appenders could be found for logger (org.springframework.context.support.ClassPathXmlApplicationContext).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "main" org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 18 in XML document from class path resource [springJMSExample/spring.xml] is invalid; nested exception is org.xml.sax.SAXParseException: cvc-complex-type.2.4.a: Invalid content was found starting with element 'Environment'. One of '{"http://www.springframework.org/schema/beans":import, "http://www.springframework.org/schema/beans":alias, "http://www.springframework.org/schema/beans":bean, WC[##other:"http://www.springframework.org/schema/beans"]}' is expected.
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:396)
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)

my spring.xml is in the next post:

Kshitiz said...

Line 18 in my spring.xml looks like

<Environment name="jms/brokerLocation" override="false" type="java.lang.String" value="tcp://localhost:8161"/>

Kshitiz said...

By not using JNDI lookup and having a connection setting as









I was able to run your example ...this is of great help....thanks !

Kshitiz said...

can you throw some light on what is the use case of using Spring Integration API http://www.springsource.org/spring-integration with ActiveMQ.....

I meant to ask that in this example, we are simply using spring with activemq....what if we use spring integration and how does it matter to us ?