Summary of Technologies Used
JMS - Java Message Service
Topic - A distribution mechanism for publishing messages that are delivered to multiple subscribers.
Queue - A staging area that contains messages that have been sent and are waiting to be read. As the name queue suggests, the messages are delivered in the order
sent. A message is removed from the queue once it has been read.
Spring - version 2.5 at the time of this article (JmsTemplate is of particular interest)
Tomcat - using Tomcat 6.0
JNDI - Java Naming and Directory Interface
ActiveMQ - Powerful open source Message Broker. Free, Open and easy to tweak and run. Great documentation! Always important.
Java - JSE 6 and Servlet Spec 2.4
Getting started with ActiveMQ really took just a few minutes.
- Get the binary distribution from ActiveMQ on Apache.
- Unzip the binaries to your place of choice.
- cd into $ACTIVEMQ_HOME/bin and run the activemq script.
- then check http://localhost:8161/admin/ to see that it is running. This is the cool admin console that comes with ActiveMQ. Great!
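If you would rather check from code than from a browser, here is a minimal sketch that only tests whether something is accepting TCP connections on a port. The class name BrokerPortCheck and the timeout are made up for illustration. Port 8161 is the admin console from the step above, and 61616 is the broker port used in the tcp:// URL later in this article. It does not prove that ActiveMQ is what is listening.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that some process accepts TCP connections on a port.
class BrokerPortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            // connect() throws if nothing accepts the connection within the timeout
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("admin console (8161): " + isListening("localhost", 8161, 1000));
        System.out.println("broker transport (61616): " + isListening("localhost", 61616, 1000));
    }
}
```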
Creating a Message Producer
First we need to have a connection to our super secret JMS Broker. So to get this into our Spring configuration we add the following:
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" ref="brokerLocation" />
</bean>
</property>
</bean>
brokerLocation is a reference to my JNDI config defined like so in my applicationContext.xml:
<jee:jndi-lookup id="brokerLocation" lookup-on-startup="true" jndi-name="jms/brokerLocation" resource-ref="true" cache="true" />
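Spring's support for JNDI is truly amazing and painless... Definitely check it out. This is literally all you have to do to reference JNDI values in your Spring application! The value itself is declared as an Environment entry in Tomcat's context configuration (for example context.xml):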
<Environment name="jms/brokerLocation" override="false" type="java.lang.String" value="tcp://some.host.at.your.com:61616"/>
So all this wires up the connection factory to use for, well, connecting to your message broker.
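The cool thing with this is that if you decide at some point to change which message broker you are using, this configuration (along with the ActiveMQ-specific destination beans) is the only thing you would have to change. The Java code only depends on the javax.jms interfaces.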
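To make the JNDI step less magical, here is a rough plain-Java sketch of what the jee:jndi-lookup element does for us. With resource-ref="true", Spring prefixes the name with java:comp/env/ before looking it up. The class name BrokerLocationLookup and the fallback URL are assumptions for illustration only. This is not Spring's actual implementation.

```java
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper that approximates <jee:jndi-lookup resource-ref="true" .../>.
class BrokerLocationLookup {

    static final String ENV_PREFIX = "java:comp/env/";

    // Adds the container prefix unless the name already carries a scheme.
    static String toEnvName(String jndiName) {
        if (jndiName.startsWith(ENV_PREFIX) || jndiName.indexOf(':') != -1) {
            return jndiName;
        }
        return ENV_PREFIX + jndiName;
    }

    // Returns the configured broker URL, or the fallback when no value is available
    // (for example when running outside Tomcat, where no JNDI provider exists).
    static String lookupBrokerUrl(String jndiName, String fallback) {
        try {
            Context context = new InitialContext();
            Object value = context.lookup(toEnvName(jndiName));
            return (value instanceof String) ? (String) value : fallback;
        } catch (NamingException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // The fallback URL is an assumption (a broker on the local machine).
        System.out.println(lookupBrokerUrl("jms/brokerLocation", "tcp://localhost:61616"));
    }
}
```

Inside Tomcat the lookup should return whatever value the Environment entry holds. Outside a container it falls back to the default, which can be handy for local testing.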
The next step is to define the queue you want to put messages on. The constructor-arg is the name of the queue that you want to send messages to.
<bean id="queueOne" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="QUEUE.ONE" />
</bean>
Then simply define the class that will be used to create the messages to be placed on the queue. You need to have a handle on a javax.jms.Queue and an org.springframework.jms.core.JmsTemplate. The template is then used to send messages to the broker. Below you will see this set up. The produce method is called from an external class with whatever you want to put on the Queue. Here I am passing in 2 String values and then using a javax.jms.MapMessage to create a message of key/value pairs to be placed on the queue.
package com.foo.jms;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class TestProducer {
private Logger logger = Logger.getLogger(TestProducer.class);
private JmsTemplate jmsTemplate;
private Queue queue;
public void produce(final String foo, final String bar) {
this.jmsTemplate.send(this.queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mm = session.createMapMessage();
mm.setString("foo", foo);
mm.setString("bar", bar);
return mm;
}
});
logger.info("Message sent to message broker");
}
@Required
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.jmsTemplate = new JmsTemplate(connectionFactory);
}
@Required
public void setQueue(Queue queue) {
this.queue = queue;
}
}
Here is how we wire up the above class with Spring. You can see that we pass in the connectionFactory that we defined earlier and a reference to the Queue that we also defined above.
<bean name="testProducer" class="com.foo.jms.TestProducer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queue" ref="queueOne" />
</bean>
Creating a Message Consumer
This is going to be fairly similar to creating a message producer. So now we define the message listener or consumer; whatever you want to call it.
<bean name="fooListener" class="com.foo.jms.FooListener" />
This looks a little like WTF?! Right? Yeah I thought so as well. So here is the class com.foo.jms.FooListener:
package com.foo.jms;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
public class FooListener implements MessageListener {
private Logger logger = Logger.getLogger(FooListener.class);
@Override
public void onMessage(Message message) {
try {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
String foo = mapMessage.getString("foo");
String bar = mapMessage.getString("bar");
//...do something with the data you just got
}
} catch (Exception e) {
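// pass the exception as the second argument so log4j records the stack trace
logger.error("Failed to process message", e);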
}
}
}
The cool thing here is that your Message Listener just has to implement the javax.jms.MessageListener interface, and then you wire that class up in Spring. To use the jms namespace, add xmlns:jms="http://www.springframework.org/schema/jms" to your beans tag, and add "http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" to its xsi:schemaLocation attribute. You also need to specify the destination, which is just the name of the queue that you want to listen on. Like this:
<jms:listener-container connection-factory="connectionFactory" cache="none">
<jms:listener destination="QUEUE.ONE" ref="fooListener" />
</jms:listener-container>
Now we are all set to send and receive messages to/from the Message Broker. But how about subscribing to a topic? The reason for doing this is so that all N servers in your cluster pick up each message that gets published to the topic. Here is how we publish messages to the topic.
Publishing to a Topic
First we need to define an instance of a javax.jms.Topic by using the org.apache.activemq.command.ActiveMQTopic implementation of the Topic interface.
<bean id="topicOne" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="TOPIC.ONE" />
</bean>
Now that we have the Topic defined we need to pass that to a class that knows how to publish to the topic. Here is the configuration:
<bean name="topicPublisher" class="com.foo.jms.TopicPublisher">
<property name="connectionFactory" ref="connectionFactory" />
<property name="topic" ref="topicOne" />
</bean>
Here we are using the same config as defined earlier for the connectionFactory. We just need to pass the reference in along with a reference to the Topic that we just created.
Here is what your class will look like:
package com.foo.jms;
import java.io.Serializable;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class TopicPublisher {
private JmsTemplate jmsTemplate;
private Topic topic;
// ObjectMessage.setObject only accepts Serializable payloads
public void produce(final Serializable object) {
this.jmsTemplate.send(this.topic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage mm = session.createObjectMessage();
mm.setObject(object);
return mm;
}
});
}
@Required
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.jmsTemplate = new JmsTemplate(connectionFactory);
}
@Required
public void setTopic(Topic topic) {
this.topic = topic;
}
}
Now this looks pretty much identical to the TestProducer we defined earlier, and it is, except that it uses a javax.jms.Topic instead of a javax.jms.Queue. You can push any Serializable object onto the Queue/Topic.
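Since the payload has to be Serializable, here is a small sketch of what such an object might look like, plus a serialize/deserialize round trip that roughly mimics what happens to it between publisher and subscriber. The CacheInvalidation class, its fields and the PayloadRoundTrip helper are invented for illustration. They are not part of the application described here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload; the class name and fields are illustrative only.
class CacheInvalidation implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String region;
    private final long issuedAt;

    CacheInvalidation(String region, long issuedAt) {
        this.region = region;
        this.issuedAt = issuedAt;
    }

    String getRegion() {
        return region;
    }

    long getIssuedAt() {
        return issuedAt;
    }
}

class PayloadRoundTrip {

    // Writes the payload to bytes and reads it back, the way Java
    // serialization carries an object across a process boundary.
    static Object roundTrip(Serializable payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(payload);
            out.close();

            ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            try {
                return in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException("Payload could not be serialized", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class missing on the receiving side", e);
        }
    }

    public static void main(String[] args) {
        CacheInvalidation copy =
                (CacheInvalidation) roundTrip(new CacheInvalidation("products", 42L));
        System.out.println(copy.getRegion() + " @ " + copy.getIssuedAt());
    }
}
```

With a class like this, the call would be something like topicPublisher.produce(new CacheInvalidation("products", 42L)). Keep in mind that every subscriber needs the same class on its classpath to read the object back.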
Subscribing to a Topic
The last portion here is subscribing to this Topic we just created. This is similar to listening to a Queue; the difference here is that destination-type is set to "topic". With this listener defined we are now 'subscribed' to the Topic.
<bean name="topicListener" class="com.foo.jms.TopicListener"/>
<jms:listener-container connection-factory="connectionFactory" cache="none" destination-type="topic">
<jms:listener destination="TOPIC.ONE" ref="topicListener" />
</jms:listener-container>
Here is the code. Same as before pretty much. Implement MessageListener and Spring takes care of registering with the Message Broker. When a new message has been published to the topic, this listener will pick it up and automagically call the onMessage method defined below in the TopicListener class.
package com.foo.jms;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;
public class TopicListener implements MessageListener {
private Logger logger = Logger.getLogger(TopicListener.class);
@Override
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
Object obj = objectMessage.getObject();
//... do something with the data here
}
} catch (Exception e) {
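// pass the exception as the second argument so log4j records the stack trace
logger.error("Failed to process message", e);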
}
}
}
Summary
So adding JMS support to your Spring-based application is pretty easy and painless. The one thing that you should take note of is the difference between a Queue and a Topic. If you don't need multiple machines to get the same message, then use a Queue: a Queue holds onto a message just long enough for 1 message listener to pick it off. Otherwise, if you need a message to be picked up by every machine in a cluster, then use a Topic. With a Topic, the Message Broker delivers a copy of the message to each subscriber of the Topic. Note that with a plain (non-durable) subscription like the one configured above, a subscriber only receives messages published while it is connected.
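If the Queue/Topic difference is still fuzzy, here is a toy in-memory sketch in plain Java. It has nothing to do with how ActiveMQ is implemented, and all the class names are made up. The round-robin hand-off between queue consumers is just an assumption to keep the example deterministic. The point is only that a queue hands each message to exactly one consumer, while a topic hands a copy to every subscriber.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of queue vs. topic delivery; not a real broker.
class QueueVsTopicSketch {

    interface Listener {
        void onMessage(String message);
    }

    // Remembers everything it receives so delivery can be inspected.
    static class RecordingListener implements Listener {
        final List<String> received = new ArrayList<String>();

        public void onMessage(String message) {
            received.add(message);
        }
    }

    // Each message goes to exactly one consumer and is then gone.
    static class SketchQueue {
        private final List<Listener> consumers = new ArrayList<Listener>();
        private final Deque<String> pending = new ArrayDeque<String>();
        private int next = 0;

        void addConsumer(Listener consumer) {
            consumers.add(consumer);
            drain();
        }

        void send(String message) {
            pending.addLast(message); // held until some consumer can take it
            drain();
        }

        private void drain() {
            while (!consumers.isEmpty() && !pending.isEmpty()) {
                Listener target = consumers.get(next % consumers.size());
                next++;
                target.onMessage(pending.removeFirst());
            }
        }
    }

    // Each message is copied to every current subscriber.
    static class SketchTopic {
        private final List<Listener> subscribers = new ArrayList<Listener>();

        void subscribe(Listener subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Listener subscriber : subscribers) {
                subscriber.onMessage(message);
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener serverA = new RecordingListener();
        RecordingListener serverB = new RecordingListener();

        SketchQueue queue = new SketchQueue();
        queue.addConsumer(serverA);
        queue.addConsumer(serverB);
        queue.send("job-1");
        queue.send("job-2");
        System.out.println("queue: A=" + serverA.received + " B=" + serverB.received);

        RecordingListener nodeA = new RecordingListener();
        RecordingListener nodeB = new RecordingListener();
        SketchTopic topic = new SketchTopic();
        topic.subscribe(nodeA);
        topic.subscribe(nodeB);
        topic.publish("refresh");
        System.out.println("topic: A=" + nodeA.received + " B=" + nodeB.received);
    }
}
```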
Hope this was helpful to someone. This was definitely a fun exercise in resume building. ;)