Re.Mark

My Life As A Blog

Implementing Request-Response with ActiveMQ and NMS

In my last article about using ActiveMQ and NMS, I demonstrated sending a message from a sender to a receiver, which is fine for a number of patterns including asynchronous interactions.  Sometimes, however, the request-response paradigm is what you need.

Where do we start?
Our starting point is this article, which explains how to implement the request-response paradigm with JMS.  The crux of this is to create a temporary queue and consumer per client on startup – the temporary queue creates a channel for the response.  While there’s some sample code provided, it’s JMS code, so let’s figure out what that would look like using NMS.  I’m assuming that you’ve got ActiveMQ installed and running.  If not, follow the instructions in my last article.

The Client
Create a new Console application.  I called mine ActiveMqRequestResponseConsole - you may be able to come up with a better name.  This application is going to send a message and wait for the response.  Add references to:

  1. Spring.Core
  2. ActiveMQ
  3. NMS
  4. Spring.Messaging.NMS

Here’s the code for the Program class:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Support.IDestinations;
using NMS;

namespace ActiveMqRequestResponseConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory(URI);
            try
            {
                using (IConnection connection = connectionFactory.CreateConnection())
                {
                    // Start delivery explicitly; some NMS releases will not hand
                    // messages to a consumer until the connection has been started.
                    connection.Start();
                    using (ISession session = connection.CreateSession())
                    {
                        // The temporary queue is the private channel for the reply.
                        ITemporaryQueue queue = session.CreateTemporaryQueue();
                        using (IMessageConsumer consumer = session.CreateConsumer(queue))
                        {
                            string text = "A message for you.";
                            ITextMessage message = session.CreateTextMessage(text);
                            message.NMSReplyTo = queue;
                            string correlationId = Guid.NewGuid().ToString();
                            message.NMSCorrelationID = correlationId;
                            using (IMessageProducer producer = session.CreateProducer())
                            {
                                NmsDestinationAccessor destinationResolver = new NmsDestinationAccessor();
                                IDestination destination = destinationResolver.ResolveDestinationName(session, DESTINATION);
                                producer.Send(destination, message);
                            }
                            // Receive returns null if nothing arrives within the timeout.
                            IMessage response = consumer.Receive(TimeSpan.FromSeconds(60));
                            ITextMessage textMessage = response as ITextMessage;
                            if (textMessage == null)
                            {
                                Console.WriteLine("No text response received within 60 seconds.");
                            }
                            else
                            {
                                Console.WriteLine(textMessage.Text);
                            }
                        }

                    }
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            Console.WriteLine("Press any key to exit...");
            Console.ReadLine();
        }
    }
}

This is a little different from what we did before.  We create a ConnectionFactory and then create an IConnection.  Then, we create an ISession.  According to the JMS documentation, a Session is “a single threaded context for producing and consuming messages.”  It provides factory methods for MessageProducers and MessageConsumers.  A Session can also be used to create Queues.  Using the session, we create an ITemporaryQueue, which we use as the ReplyTo property of the message that we are about to send.  Temporary destinations (like this queue we have created) are unique to a connection. We also use the session to create an IMessageConsumer – it is this consumer that will be used to receive the response.

With all this set up, we’re ready to create and send a message.  To do this, we create an IMessageProducer and an ITextMessage.  The important things to note are setting the NMSReplyTo property and the NMSCorrelationID property.  The NMSReplyTo property tells the server where to send its reply, and the NMSCorrelationID is used to tie responses up with the corresponding requests (in this code example, we’re only going to send one message, so correlation is trivial).  All that’s left is to create an IDestination for the message (using the NmsDestinationAccessor provided by the Spring.Messaging.NMS framework).

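Once the message has been sent, we instruct the IMessageConsumer to receive the response (I’ve specified a generous, if not excessive, timeout of 60 seconds).  Receive returns null if nothing arrives within the timeout, so it’s worth checking the result before using it.  Once the response has come back, it gets written out to the console.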
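Correlation stops being trivial as soon as one reply queue serves more than one outstanding request.  Here’s a minimal sketch of what the check might look like in that case.  ReplyReceiver and ReceiveReply are names I’ve made up for illustration (they are not part of NMS or Spring.Messaging.NMS), and the using directive assumes the NMS namespace used in this article; later releases use Apache.NMS instead.

```csharp
using System;
using NMS; // namespace as used in this article; later releases use Apache.NMS

// Hypothetical helper, not part of NMS or Spring.Messaging.NMS.
static class ReplyReceiver
{
    // Waits up to 'timeout' for a reply whose NMSCorrelationID matches
    // 'correlationId'. Returns null if the time runs out first, or if the
    // matching reply is not a text message.
    public static ITextMessage ReceiveReply(
        IMessageConsumer consumer, string correlationId, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            TimeSpan remaining = deadline - DateTime.UtcNow;
            if (remaining <= TimeSpan.Zero)
            {
                return null; // out of time
            }

            IMessage candidate = consumer.Receive(remaining);
            if (candidate == null)
            {
                return null; // Receive itself timed out
            }

            if (candidate.NMSCorrelationID == correlationId)
            {
                return candidate as ITextMessage;
            }
            // Otherwise this is a reply to some other request: skip it and keep waiting.
        }
    }
}
```

In the client, a call such as ReplyReceiver.ReceiveReply(consumer, correlationId, TimeSpan.FromSeconds(60)) would take the place of the plain consumer.Receive call.  Note that this sketch simply discards replies that don’t match; if several requests are in flight at once, you’d want to hand those replies to their owners instead.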

The Server
Add another Console application to the solution.  I called this one ActiveMqServer and I renamed the Program class to Server to reduce the chances of confusing myself.  Admittedly, the name of ActiveMqServer.Server needs some more thought.  You need to add the same references that you added to the client console application.  Here’s the code for the Server class:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;

namespace ActiveMqServer
{
    class Server
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory connectionFactory = new ConnectionFactory(URI);
                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new ActiveMqServer.Listener(listenerContainer);
                    listenerContainer.AfterPropertiesSet();
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press any key to exit.");
                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }
}

This is very similar to the code in the last article.  I’ve moved the creation of the SimpleMessageListenerContainer into the Main method so that it can be disposed of easily.  The Listener class code is below:

using System;
using System.Collections.Generic;
using System.Text;

using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;
using NMS;

namespace ActiveMqServer
{
    class Listener : IMessageListener
    {
        private readonly SimpleMessageListenerContainer container;

        public Listener(SimpleMessageListenerContainer container)
        {
            this.container = container;
            Console.WriteLine("Listener created.");
        }

        #region IMessageListener Members

        public void OnMessage(IMessage message)
        {
            using (ISession session = this.container.SharedConnection.CreateSession())
            {
                Console.WriteLine("Message Received.");
                ITextMessage textMessage = message as ITextMessage;
                string incomingText = textMessage.Text;
                Console.WriteLine("Message: {0}", incomingText);
                string outgoingText = string.Format("Thanks for sending the following message: {0}", incomingText);

                IDestination destination = message.NMSReplyTo;
                if (destination != null)
                {
                    ITextMessage response = session.CreateTextMessage(outgoingText);
                    response.NMSCorrelationID = message.NMSCorrelationID;
                    using (IMessageProducer producer = session.CreateProducer(destination))
                    {
                        producer.Send(response);
                    }
                }
            }
        }

        #endregion
    }
}

As you can see, this Listener takes a reference to the container in the constructor.  This reference is used to get access to the shared connection and create an ISession, which in turn creates an IMessageProducer to send the response.  The important things to note are the use of the NMSReplyTo property of the incoming message as the IDestination for the response and the setting of the NMSCorrelationID property of the response to the NMSCorrelationID property of the incoming message.

Putting it all together
It’s time to run the code.  The first thing to do is to ensure that ActiveMQ is running.  (If it isn’t, you’ll get an exception in the client and you’ll notice that the exception handling in the server is, well, absent.)  Next, right click on the Solution and select Set StartUp Projects.  Choose Multiple startup projects and set the Action of each to be Start.  It makes sense to have the server start before the client, so alter the start up order by moving the server to the top of the list.

Take a deep breath, we’re going in.  Run the solution.  All being well, you should see the message receipt in the server console window and the response message (“Thanks for sending the following message: A message for you.”) in the client console window.

Conclusion
When coming to messaging from an RPC or web service background, it isn’t necessarily obvious how to go about implementing the request-response paradigm.  The code in this article shows one way to do it with ActiveMQ and NMS.  With a little refactoring and maybe a light sprinkling of encapsulation, the mechanics could be abstracted away from the calling code.
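As a rough illustration of that encapsulation, here’s a sketch of a small wrapper that hides the temporary queue, the consumer and the correlation ID behind a single Request method.  TextRequestor is a hypothetical name of my own, not something NMS or Spring.Messaging.NMS provides; it only uses the session calls already seen in this article, and it assumes the caller supplies a session on a started connection along with a resolved request destination.

```csharp
using System;
using NMS; // namespace as used in this article; later releases use Apache.NMS

// Hypothetical wrapper, not part of NMS or Spring.Messaging.NMS.
public sealed class TextRequestor : IDisposable
{
    private readonly ISession session;
    private readonly ITemporaryQueue replyQueue;
    private readonly IMessageConsumer replyConsumer;
    private readonly IMessageProducer producer;

    // The caller owns the session and is responsible for disposing of it.
    public TextRequestor(ISession session, IDestination requestDestination)
    {
        this.session = session;
        this.replyQueue = session.CreateTemporaryQueue();
        this.replyConsumer = session.CreateConsumer(this.replyQueue);
        this.producer = session.CreateProducer(requestDestination);
    }

    // Sends 'text' as a request and returns the text of the matching reply.
    // Throws TimeoutException if a receive times out before a match arrives.
    public string Request(string text, TimeSpan timeout)
    {
        ITextMessage request = this.session.CreateTextMessage(text);
        request.NMSReplyTo = this.replyQueue;
        request.NMSCorrelationID = Guid.NewGuid().ToString();
        this.producer.Send(request);

        // Skip stale replies left over from earlier requests. For simplicity,
        // the timeout applies to each receive rather than to the whole call.
        IMessage reply;
        do
        {
            reply = this.replyConsumer.Receive(timeout);
        }
        while (reply != null && reply.NMSCorrelationID != request.NMSCorrelationID);

        ITextMessage textReply = reply as ITextMessage;
        if (textReply == null)
        {
            throw new TimeoutException("No text reply received in time.");
        }
        return textReply.Text;
    }

    public void Dispose()
    {
        this.producer.Dispose();
        this.replyConsumer.Dispose();
    }
}
```

With something like this in place, the body of the client’s Main method shrinks to creating the connection and session, wrapping a TextRequestor in a using block, and calling Request("A message for you.", TimeSpan.FromSeconds(60)).  Because a session is a single threaded context, one TextRequestor should only be used from one thread at a time.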

Written by remark

June 16, 2007 at 6:00 pm

20 Responses

  1. Hi.
    How is the throughput? I’ve implemented an SSL tunnel and saw that throughput is one message per 20 seconds, which is obviously awful. Have I done something wrong? I think there may be some misunderstanding.

    Hamzeh

    May 31, 2008 at 8:32 am

  2. I haven’t done any performance profiling. One message per 20 seconds does sound poor. I’d suggest trying the forums. I wonder what other people’s experience has been.

    remark

    May 31, 2008 at 8:57 am

  3. Reuse at least the connection and performance will improve. Even the Java version of MessageListenerContainer opened and closed a connection by default on every Receive unless reconfigured.

    Anthavio Lenz

    January 30, 2009 at 11:50 am

  4. Error 2 The property or indexer ‘Spring.Messaging.Nms.Listener.AbstractListenerContainer.SharedConnection’ cannot be used in this context because the get accessor is inaccessible C:\Documents and Settings\V644700\My Documents\Visual Studio 2008\Projects\ActiveMqRequestResponseConsole\ActiveMqServer\Listener.cs 26 39 ActiveMqServer

    Kash

    August 25, 2009 at 9:41 pm

  5. i am getting the below exception while compiling the …

    The property or indexer ‘Spring.Messaging.Nms.Listener.AbstractListenerContainer.SharedConnection’ cannot be used in this context because the get accessor is inaccessible

    any idea ??

    Kash

    August 26, 2009 at 6:03 pm

  6. I had the same SharedConnection problem you guys are experiencing. I’m not sure if this is the “correct” way of fixing it because I’m going through the same tutorial that you are but this is how I fixed mine.

    Change the line

    using (ISession session = this.container.SharedConnection.CreateSession())

    to

    using (ISession session = this.container.ConnectionFactory.CreateConnection().CreateSession())

    Nathan Palmer

    Nathan Palmer

    October 20, 2009 at 5:12 am

  7. You show setting the NMSCorrelationID when sending the message and also when sending the response message but you don’t show how to receive based on NMSCorrelationID and I cannot find anywhere so far how to make my consumer do so. I want to be able to receive the response message based on this value. I have it working in JMS just fine, but am struggling with NMS.

    Am setting my consumer like this, but it doesn’t seem to matter what the value is, it receives all messages:
    using (IMessageConsumer consumer = session.CreateConsumer(destination, "NMSCorrelationID = abc"))

    Diane

    November 5, 2009 at 12:47 am

  8. Hi Mark,
    Thanks for a great and informative article.
    I was wondering if you had played around with ActiveMQ, Camel (running in the broker) in .NET?

    I tried adapting your example above to using a Camel route, but was not having any luck. I was not setting NMSReplyTo and CorrelationID, since the InOut parameter in Camel is supposed to do that for you.

    Any suggestions?

    Thanks,
    Chet

    Chet Bannerjee

    May 13, 2010 at 4:53 pm

  9. Hi Chet, I haven’t looked at ActiveMQ and Camel with .NET. Have you tried the Camel forums?

    remark

    May 13, 2010 at 11:20 pm

  10. Hi Mark,

    Good post which I was looking for.

    I followed your example but was not able to produce the expected output. In the Listener we read the destination like this:

    — IDestination replyTo = textMessage.NMSReplyTo;

    but in my code NMSReplyTo is null and I never get the response from the server class. So I set the NMSReplyTo value when sending the original message; after that, I received the message back in the same console window instead of in the first one! :(

    I am not sure where it went wrong! Could you please tell me what could be wrong? If you need, I can send my code to you.

    Regards.

    Sree Harshavardhana

    June 24, 2010 at 9:55 am

    • Hi Mark,

      Sorry.. I corrected the issue I was facing. I had left out the ‘textMessage.NMSReplyTo = queue’ line while creating the example.

      Now it’s working.

      Thanks for the article.

      Regards.

      Sree Harshavardhana

      June 24, 2010 at 11:00 am

  11. Hi Mark,

    We are working on a model in which the listener should always be running, so that whenever a request comes in, the caller gets the reply back immediately.

    What approach would you suggest? We are trying to implement the same Listener using WCF Services, but sometimes the listener isn’t invoked and we don’t get the response immediately, though sometimes we do.

    Please let me know what would be the best approach for this.

    Regards.

    Sree Harshavardhana

    June 24, 2010 at 12:55 pm

  13. hey,
    I am getting a null reference exception; it says object reference not set to an instance of an object. I copied the code as it is and ran it. Please help me with this.

    Puneeth

    October 12, 2010 at 1:05 pm

  14. Hello,

    I have several problems with the script (I am using Spring.NET-1.3.1). First, I made the changes regarding the using directives to make it work with Spring.NET-1.3.1, similarly as one has to do at http://remark.wordpress.com/articles/messaging-with-net-and-activemq/ if one wants to make it run.

    In addition, I am missing the package Spring.Messaging.Nms.Support.IDestinations, changing to Spring.Messaging.Nms.Support.Destinations works. Also, I had the problem and used the solution of Nathan Palmer in the comments above regarding the line “using (ISession session = this…)”.

    Now, the two consoles do run and the message of the client is received by the server BUT the response of the server is not received by the client. Instead I get an exception in the client thread: the error is in the line containing “Console.WriteLine(textMessage.Text);” and it says that the object reference was not set to an object instance.

    Does anyone have a solution? Thanks a lot in advance! :)

    Tyrex

    February 10, 2011 at 4:03 pm

    • I have the same error.
      The response message is null.

      You fix this?

      Marcio Althmann

      May 2, 2011 at 9:02 pm

      • Hi, I had this problem as well. I changed the example code following Nathan’s comment, and then modified it to call connection.Start() on my listener like so:

        public void OnMessage(IMessage message)
        {
        var conn = this.container.ConnectionFactory.CreateConnection();
        conn.Start();
        using (ISession session = conn.CreateSession())

        Alex

        June 2, 2011 at 10:40 am

  15. How can we send and receive messages with ActiveMQ using only Silverlight? Can you please help me out? I need it urgently for my project.

    nichole

    April 28, 2011 at 10:07 am

  16. [...] http://remark.wordpress.com/articles/implementing-request-response-with-activemq-and-nms/.  This article was my starting point and is very well written, however I wasn't able to get it to work properly (couldn't get the response back as noted in some of the comments), and it uses Spring.NET which is a little heavy for my purposes. [...]

  17. Hi Mark,

    Thanks a lot for such wonderful articles. I am referring to all of these related to Spring.NET and Apache NMS, and they are really very useful.

    I am trying to implement the request-response one and I get the error below:

    ‘Spring.Messaging.Nms.Listener.AbstractListenerContainer.SharedConnection’ is inaccessible due to its protection level.

    I am referencing the Spring and Apache libraries for .NET Framework 4.

    Could you please help me to get rid of this?

    Your prompt reply would be highly appreciated.

    Thanks and Regards,
    Yogita

    Yogita

    January 24, 2014 at 2:18 pm

