Implementing Request-Response with ActiveMQ and NMS
In my last article about using ActiveMQ and NMS, I demonstrated sending a message from a sender to a receiver, which is fine for a number of patterns including asynchronous interactions. Sometimes, however, the request-response paradigm is what you need.
Where do we start?
Our starting point is this article, which explains how to implement the request-response paradigm with JMS. The crux of this is to create a temporary queue and consumer per client on startup – the temporary queue creates a channel for the response. While there’s some sample code provided, it’s JMS code, so let’s figure out what that would look like using NMS. I’m assuming that you’ve got ActiveMQ installed and running. If not, follow the instructions in my last article.
The Client
Create a new Console application. I called mine ActiveMqRequestResponseConsole - you may be able to come up with a better name. This application is going to send a message and wait for the response. Add references to:
- Spring.Core
- ActiveMQ
- NMS
- Spring.Messaging.NMS
Here’s the code for the Program class:
using System;
using System.Collections.Generic;
using System.Text;
using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Support.IDestinations;
using NMS;

namespace ActiveMqRequestResponseConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory(URI);
            try
            {
                using (IConnection connection = connectionFactory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        // The temporary queue is the channel the response comes back on.
                        ITemporaryQueue queue = session.CreateTemporaryQueue();
                        using (IMessageConsumer consumer = session.CreateConsumer(queue))
                        {
                            string text = "A message for you.";
                            ITextMessage message = session.CreateTextMessage(text);
                            // Tell the server where to reply and how to tag the reply.
                            message.NMSReplyTo = queue;
                            string correlationId = Guid.NewGuid().ToString();
                            message.NMSCorrelationID = correlationId;
                            using (IMessageProducer producer = session.CreateProducer())
                            {
                                NmsDestinationAccessor destinationResolver = new NmsDestinationAccessor();
                                IDestination destination = destinationResolver.ResolveDestinationName(session, DESTINATION);
                                producer.Send(destination, message);
                            }
                            // Block until the response arrives or the timeout expires.
                            IMessage response = consumer.Receive(TimeSpan.FromSeconds(60));
                            ITextMessage textMessage = response as ITextMessage;
                            if (textMessage == null)
                            {
                                // Nothing arrived in time, or the reply was not a text message.
                                Console.WriteLine("No text response received.");
                            }
                            else
                            {
                                Console.WriteLine(textMessage.Text);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            Console.WriteLine("Press any key to exit...");
            Console.ReadLine();
        }
    }
}
This is a little different from what we did before. We create a ConnectionFactory and then create an IConnection. Then, we create an ISession. According to the JMS documentation, a Session is “a single threaded context for producing and consuming messages.” It provides factory methods for MessageProducers and MessageConsumers. A Session can also be used to create Queues. Using the session, we create an ITemporaryQueue, which we use as the ReplyTo property of the message that we are about to send. Temporary destinations (like this queue we have created) are unique to a connection. We also use the session to create an IMessageConsumer – it is this consumer that will be used to receive the response.
With all this set up, we’re ready to create and send a message. To do this, we create an IMessageProducer and an ITextMessage. The important things to note are setting the NMSReplyTo property and the NMSCorrelationID property. The NMSReplyTo property tells the server where to send its reply, and the NMSCorrelationID is used to tie responses up with the corresponding requests (in this code example, we’re only going to send one message, so correlation is trivial). All that’s left is to create an IDestination for the message (using the NmsDestinationAccessor provided by the Spring.Messaging.NMS framework).
Once the message has been sent, we instruct the MessageConsumer to receive the response (I’ve specified a generous if not excessive timeout of 60 seconds.) Once the response has come back, it gets written out to the console.
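Correlation only starts to matter once a client has more than one request in flight. As a rough illustration of the idea, here is a minimal in-memory sketch that needs no broker at all. The PendingRequests class and its Register and TryMatch methods are hypothetical names I've made up for this example; they are not part of NMS or Spring.NET. The string returned by Register is what you would assign to NMSCorrelationID on the request, and the value passed to TryMatch is what you would read from NMSCorrelationID on the response.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of NMS or Spring.NET.
class PendingRequests
{
    // Maps each outstanding correlation ID to the request text it was sent with.
    private readonly Dictionary<string, string> requests = new Dictionary<string, string>();

    // Records a request and returns the correlation ID to send along with it.
    public string Register(string requestText)
    {
        string correlationId = Guid.NewGuid().ToString();
        requests[correlationId] = requestText;
        return correlationId;
    }

    // Looks up the request that a response belongs to and forgets it.
    // Returns false for unknown, null or already matched correlation IDs.
    public bool TryMatch(string correlationId, out string requestText)
    {
        if (correlationId != null && requests.TryGetValue(correlationId, out requestText))
        {
            requests.Remove(correlationId);
            return true;
        }
        requestText = null;
        return false;
    }
}

class CorrelationDemo
{
    static void Main()
    {
        PendingRequests pending = new PendingRequests();
        string firstId = pending.Register("first request");
        string secondId = pending.Register("second request");

        // Responses can come back in any order; the ID says which request each one answers.
        string matched;
        if (pending.TryMatch(secondId, out matched))
        {
            Console.WriteLine("Reply received for: {0}", matched);
        }
        if (pending.TryMatch(firstId, out matched))
        {
            Console.WriteLine("Reply received for: {0}", matched);
        }
        // A second reply carrying the same ID no longer matches anything.
        if (!pending.TryMatch(firstId, out matched))
        {
            Console.WriteLine("Ignoring a reply with an unknown correlation ID.");
        }
    }
}
```

A dictionary keyed by correlation ID is just one simple way to do the bookkeeping. With a single outstanding request, as in the client above, comparing the response's NMSCorrelationID with the local correlationId variable would do the same job.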
The Server
Add another Console application to the solution. I called this one ActiveMqServer and I renamed the Program class to Server to reduce the chances of confusing myself. Admittedly, the name of ActiveMqServer.Server needs some more thought. You need to add the same references that you added to the client console application. Here’s the code for the Server class:
using System;
using System.Collections.Generic;
using System.Text;
using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;

namespace ActiveMqServer
{
    class Server
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory connectionFactory = new ConnectionFactory(URI);
                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new ActiveMqServer.Listener(listenerContainer);
                    listenerContainer.AfterPropertiesSet();
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press any key to exit.");
                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }
}
This is very similar to the code in the last article. I’ve moved the creation of the SimpleMessageListenerContainer into the Main method so that it can be disposed of easily. The Listener class code is below:
using System;
using System.Collections.Generic;
using System.Text;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;
using NMS;

namespace ActiveMqServer
{
    class Listener : IMessageListener
    {
        private readonly SimpleMessageListenerContainer container;

        public Listener(SimpleMessageListenerContainer container)
        {
            this.container = container;
            Console.WriteLine("Listener created.");
        }

        #region IMessageListener Members

        public void OnMessage(IMessage message)
        {
            using (ISession session = this.container.SharedConnection.CreateSession())
            {
                Console.WriteLine("Message Received.");
                ITextMessage textMessage = message as ITextMessage;
                string incomingText = textMessage.Text;
                Console.WriteLine("Message: {0}", incomingText);
                string outgoingText = string.Format("Thanks for sending the following message: {0}", incomingText);
                IDestination destination = message.NMSReplyTo;
                if (destination != null)
                {
                    ITextMessage response = session.CreateTextMessage(outgoingText);
                    response.NMSCorrelationID = message.NMSCorrelationID;
                    using (IMessageProducer producer = session.CreateProducer(destination))
                    {
                        producer.Send(response);
                    }
                }
            }
        }

        #endregion
    }
}
As you can see, this Listener takes a reference to the container in the constructor. This reference is used to get access to the ISession, in order to create an IMessageProducer to send the response. The important things to note are the use of the NMSReplyTo property of the incoming message as the IDestination for the response and the setting of the NMSCorrelationID property of the response to the NMSCorrelationID property of the incoming message.
Putting it all together
It’s time to run the code. The first thing to do is to ensure that ActiveMQ is running. (If it isn’t, you’ll get an exception in the client and you’ll notice that the exception handling in the server is, well, absent.) Next, right click on the Solution and select Set Start Up Projects. Choose Multiple startup projects and set the Action of each to be Start. It makes sense to have the server start before the client, so alter the start up order by moving the server to the top of the list.
Take a deep breath, we’re going in. Run the solution. All being well, you should see the message receipt in the server console window and the response message (“Thanks for sending the following message: A message for you.”) in the client console window.
Conclusion
When coming to messaging from an RPC or web service background, it isn’t necessarily obvious how to go about implementing the request-response paradigm. The code in this article shows one way to do it with ActiveMQ and NMS. With a little refactoring and maybe a light sprinkling of encapsulation, the mechanics could be abstracted away from the calling code.
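To give a flavour of what that encapsulation might look like, here is a rough sketch of a small wrapper around the client code above. The Requestor class and its Request method are hypothetical names of my own, not anything provided by NMS or Spring.NET. The sketch only uses the NMS calls already shown in this article, assumes the same library versions and references as the client project, and hasn't been hardened in any way, so treat it as a starting point rather than a finished component.

```csharp
using System;
using NMS;

namespace ActiveMqRequestResponseConsole
{
    // Hypothetical wrapper for illustration; not part of NMS or Spring.NET.
    // It owns one temporary queue and one consumer for its whole lifetime.
    class Requestor : IDisposable
    {
        private readonly ISession session;
        private readonly ITemporaryQueue replyQueue;
        private readonly IMessageConsumer consumer;

        public Requestor(ISession session)
        {
            this.session = session;
            this.replyQueue = session.CreateTemporaryQueue();
            this.consumer = session.CreateConsumer(this.replyQueue);
        }

        // Sends the text to the destination and waits for the reply text.
        // Returns null if no matching text reply arrives within the timeout.
        public string Request(IDestination destination, string text, TimeSpan timeout)
        {
            ITextMessage request = session.CreateTextMessage(text);
            request.NMSReplyTo = replyQueue;
            request.NMSCorrelationID = Guid.NewGuid().ToString();

            using (IMessageProducer producer = session.CreateProducer(destination))
            {
                producer.Send(request);
            }

            ITextMessage response = consumer.Receive(timeout) as ITextMessage;
            if (response == null)
            {
                return null;
            }
            // Ignore a reply that belongs to some earlier request.
            if (response.NMSCorrelationID != request.NMSCorrelationID)
            {
                return null;
            }
            return response.Text;
        }

        public void Dispose()
        {
            consumer.Dispose();
        }
    }
}
```

With something like this in place, the body of the client's innermost using block could shrink to creating a Requestor, resolving the destination as before, and calling Request with the message text and a timeout. Giving up as soon as a stale reply is seen is a deliberate simplification; a more careful version would keep receiving until the timeout runs out.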
Hi.
How is the throughput? I’ve implemented an SSL tunnel and saw that throughput is one message per 20 seconds, which is obviously awful. Have I done something wrong? I think there may be some misunderstanding.
I haven’t done any performance profiling. One message per 20 seconds does sound poor. I’d suggest trying the forums. I wonder what other people’s experience has been.
Reuse at least the connection and performance will improve. Even the Java version of MessageListenerContainer opens and closes a connection by default on every Receive unless it is reconfigured.
Error 2 The property or indexer ‘Spring.Messaging.Nms.Listener.AbstractListenerContainer.SharedConnection’ cannot be used in this context because the get accessor is inaccessible C:\Documents and Settings\V644700\My Documents\Visual Studio 2008\Projects\ActiveMqRequestResponseConsole\ActiveMqServer\Listener.cs 26 39 ActiveMqServer
I am getting the exception below while compiling the …
The property or indexer ‘Spring.Messaging.Nms.Listener.AbstractListenerContainer.SharedConnection’ cannot be used in this context because the get accessor is inaccessible
Any ideas?
I had the same SharedConnection problem you guys are experiencing. I’m not sure if this is the “correct” way of fixing it because I’m going through the same tutorial that you are but this is how I fixed mine.
Change the line
using (ISession session = this.container.SharedConnection.CreateSession())
to
using (ISession session = this.container.ConnectionFactory.CreateConnection().CreateSession())
Nathan Palmer
You show setting the NMSCorrelationID when sending the message and also when sending the response message, but you don’t show how to receive based on NMSCorrelationID, and so far I cannot find anywhere how to make my consumer do so. I want to be able to receive the response message based on this value. I have it working in JMS just fine, but am struggling with NMS.
I am setting up my consumer like this, but it doesn’t seem to matter what the value is, it receives all messages:
using (IMessageConsumer consumer = session.CreateConsumer(destination, "NMSCorrelationID = abc"))