Thursday, December 11, 2008

Spring, JMS, Topics, Queues, Templating...

I recently had the opportunity to write some software at work that needed the ability to use JMS Message Broker to send asynchronous messages around my cluster and to perform data lookup and aggregation; then publish batches of aggregate data back out to another queue so that another cluster could then take that data and present it to the end user. I found that there was a lot of information out there to get this all started, but nothing too complete. So here is my attempt to take all the information that was out there and aggregate it so that someone else out there looking for this same information doesn't have to spend the hours it took to get all of the pieces together.

Summary of Technologies Used
JMS - Java Message Service
Topic - A distribution mechanism for publishing messages that are delivered to multiple subscribers.
Queue - A staging area that contains messages that have been sent and are waiting to be read. As the name queue suggests, the messages are delivered in the order
sent. A message is removed from the queue once it has been read.
Spring - using 2.5 at time of this article ( of particular interest JmsTemplate )
Tomcat - using Tomcat 6.0
JNDI - Java Naming and Directory Interface
ActiveMQ - Powerful open source Message Broker. Free, Open and easy to tweak and run. Great documentation! Always important.
Java - JSE 6 and Servlet Spec 2.4


Getting started with ActiveMQ really took just a few minutes.
  1. Get the binary distribution from ActiveMQ on Apache.
  2. Unzip the binaries to your place of choice.
  3. cd into the $ACTIVEMQ_HOME/bin and run activemq script.
  4. then check http://localhost:8161/admin/ to see that it is running. This is the cool admin console that comes with ActiveMQ. Great!

So now that we have our Message Queue Server up and running we can move on to how to use Spring to wire up your application to use the Queue.

Creating a Message Producer
First we need to have a connection to our super secret JMS Broker. So to get this into our Spring configuration we add the following:

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" ref="brokerLocation" />
</bean>
</property>
</bean>


brokerLocation is a reference to my JNDI config defined like so in my applicationContext.xml:

<jee:jndi-lookup id="brokerLocation" lookup-on-startup="true" jndi-name="jms/brokerLocation" resource-ref="true" cache="true" />


Spring's support for JNDI is truly amazing and painless... Definitely check it out. This is literally all you have to do reference JNDI values in your Spring application!


<Environment name="jms/brokerLocation" override="false" type="java.lang.String" value="tcp://some.host.at.your.com:61616"/>


So all this wires up the connection factory to use for, well, connecting to your message broker.

The cool thing with this, is that if you decide at some point to change what message broker you were using this is the only thing you would have to change in your application.

The next step is to define a name for the queue you want to put messages on. This is what you do. The constructor-arg is the name of the queue that you want to send messages to.

<bean id="queueOne" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="QUEUE.ONE" />
</bean>


Then simply define the class that will be used to create the messages to be place on the queue. You need to have a handle on a javax.jmx.Queue and a org.springframework.jms.core.JmsTemplate. The template is then used to send messages to the broker. Below you will see this set up. The produce method is called from an external class with what ever you want to put on the Queue. Here I am passing in 2 String values in and then using a javax.jms.MapMessage to create a message of key/value pairs to be placed on the queue.

package com.foo.jms;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TestProducer {

private Logger logger = Logger.getLogger(TestProducer.class);
private JmsTemplate jmsTemplate;
private Queue queue;

public void produce(final String foo, final String bar) {
this.jmsTemplate.send(this.queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mm = session.createMapMessage();
mm.setString("foo", foo);
mm.setString("bar", bar);
return mm;
}
});
logger.info("Message sent to message broker");
}

@Required
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.jmsTemplate = new JmsTemplate(connectionFactory);
}

@Required
public void setQueue(Queue queue) {
this.queue = queue;
}
}



Here is how we wire up the above class with Spring. You can see that we pass in the connectionFactory that we defined earlier and a reference to the Queue that we also defined above.


<bean name="testProducer" class="com.foo.jms.TestProducer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queue" ref="queueOne" />
</bean>



Creating a Message Consumer
This is going to be fairly similar to creating a message producer. So now we define the message listener or consumer; whatever you want to call it.


<bean name="fooListener" class="com.foo.jms.FooListener" />



This looks a little like WTF?! Right? Yeah I thought so as well. So here is the class com.foo.jms.FooListener:


package com.foo.jms;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.log4j.Logger;


public class FooListener implements MessageListener {

private Logger logger = Logger.getLogger(FooListener.class);

@Override
public void onMessage(Message message) {
try {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
String foo = mapMessage.getString("foo");
String bar = mapMessage.getString("bar");
//...do something with the data you just got
}
} catch (Exception e) {
logger.error(e);
}
}
}



The cool thing here is that your Message Listener just has to implement the javax.jms.MessageListener interface and then you wire that class up in Spring and then add this to your beans tag xmlns:jms="http://www.springframework.org/schema/jms" and in the xmlns:context add "http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd". Also you need to specify the destination which is just the name of the queue that you want to listen for. Like this:

<jms:listener-container connection-factory="connectionFactory" cache="none">
<jms:listener destination="QUEUE.ONE" ref="fooListener" />
</jms:listener-container>



Now we are all set to send and receive messages to/from the Message Broker. But how about subscribing to a topic. The reason for doing this is so that all N of your servers in your cluster pick up on the message that gets published to the topic. Here is how we publish messages to the topic queue.

Publishing to a Topic
First we need to define an instance of a javax.jms.Topic by using the org.apache.activemq.ActiveMQTopic implemementation of the Topic interface.


<bean id="topicOne" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="TOPIC.ONE" />
</bean>



Now that we have the Topic defined we need to pass that to a class that knows how to publish to the topic. Here is the configuration:


<bean name="topicPublisher" class="com.foo.jms.TopicPublisher">
<property name="connectionFactory" ref="connectionFactory" />
<property name="topic" ref="topicOne" />
</bean>



Here we are using the same config as defined earlier for the connectionFactory. We just need to pass the reference in along with a reference to the Topic that we just created.

Here is what you class will look like:


package com.foo.jms;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TopicPublisher {

private JmsTemplate jmsTemplate;
private Topic topic;

public void produce(final Object object) {
this.jmsTemplate.send(this.topic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage mm = session.createObjectMessage();
mm.setObject(object);
return mm;
}
});
}

@Required
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.jmsTemplate = new JmsTemplate(connectionFactory);
}

@Required
public void setTopic(Topic topic) {
this.topic = topic;
}
}



Now this looks pretty much identical to the TestProducer we defined earlier and it is, except the fact that it is using a javax.jms.Topic instead of a javax.jms.Queue. You can push any Serializable object on to the Queue/Topic.

Subscribing to a Topic
The last portion here is subscribing to this Topic we just created. This is similar to listening to a Queue, the difference here is the destination-type is set to "topic". With this listener defined we are now 'subscribed' to the Topic.


<bean name="topicListener" class="com.foo.jms.TopicListener"/>

<jms:listener-container connection-factory="connectionFactory" cache="none" destination-type="topic">
<jms:listener destination="TOPIC.ONE" ref="topicListener" />
</jms:listener-container>



Here is the code. Same as before pretty much. Implement MessageListener and Spring takes of registering with the Message Broker. When a new message has been published to the topic, this listener will pick it up and automagically call the onMessage method defined below in the TopicListener class.


package com.foo.jms;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

public class TopicListener implements MessageListener {

private Logger logger = Logger.getLogger(GeoDisplayListener.class);

@Override
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage mapMessage = (ObjectMessage) message;
Object obj = mapMessage.getObject();
//... do something with the data here
}
} catch (Exception e) {
logger.error(e);
}
}
}



Summary
So adding JMS support into your Spring-based application is pretty easy and painless. The one thing that you should take note is the difference between a Queue and a Topic. If you don't need multiple machine to get the same message as it is placed on the Message Broker then use a Queue. That is a Queue holds onto a message just long enough for 1 message listener to pick it off. Otherwise, if you have the need to have a message to be picked off by a cluster of machines then use the Topic. With a Topic, the Message Broker knows that it needs to hold onto the message until all subscribers of the Topic get their copy of the message.

Hope this was helpful to someone. This was definitely a fun exercise in resume building. ;)




Wednesday, December 3, 2008

Spring, Filters and Configuration easier than you think.

So I needed to add a filter for some cool stuff in my application.

First off I needed to get my filter to play nicely with Spring, so I needed to do the following in my web.xml

<filter>
<filter-name>myCoolNewFilter</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
<init-param>
<param-name>targetBeanName</param-name>
<param-value>myCoolNewFilterSpringBean</param-value>
</init-param>
</filter>

So then I go ahead and open up my applicationContext.xml file and wire up the bean like normal

<bean name="myCoolNewFilterSpringBean" class="com.mycompany.filter.MyCoolNewFilter" />

Notice that the bean name in the app context matches the param-value in the filter config: myCoolNewFilterSpringBean

Then instead of trying to do URL patterns for the config of the filter I just tried the following:

<filter-mapping>
<filter-name>myCoolNewFilterSpringBean</filter-name>
<servlet-name>dispatcher</servlet-name>
</filter-mapping>

Which maps this filter to all filter all calls being directed to my dispatcher servlet! No nasty or convoluted URL patterns... Just easily readable servlet names!

Kill your database! Pt. 2

Friend of mine beat me to it... check out the post Kill Your Database with Terracotta over at Will Code 4 Beer.

It is an interesting article challenging the idea that we need a database to begin with. There may be an excuse for a database in some cases, but for many applications it is overkill. When you need a super fast, scalable application to hold on to just enough data then Terracotta can store your objects to disk and allow your entire cluster to share state through the shared Heap. It is an interesting read, so go check it out.