Re.Mark

My Life As A Blog

Transactional Message Processing with ActiveMQ and NMS

In my previous articles on ActiveMQ and NMS, I’ve looked at sending a message from a sender to a receiver, implementing request-response and implementing publish-subscribe.  In this article I’m going to look at how you can process incoming messages transactionally – the same techniques can be applied to outgoing messages.

Getting started

In previous articles, I’ve assumed that you’ve downloaded Spring.Messaging.Nms, which includes ActiveMQ.dll and NMS.dll.  You can of course download the source and build these libraries yourself.  You’ll need a Subversion client installed.  Assuming you are using a command-line Subversion client, create a folder and open a command prompt in that folder.  Type the following command:

svn co https://svn.apache.org/repos/asf/activemq/activemq-dotnet/trunk activemq-dotnet

Once all the code has been downloaded, you can build the code using Visual Studio 2005, SharpDevelop, MonoDevelop, NAnt or Maven 2.  You’ll find solution files for Visual Studio, SharpDevelop and MonoDevelop in the folder you created.  There’s information on how to use Maven and NAnt here.

I’m still making the assumption that you’ve downloaded and set up ActiveMQ itself.  If you haven’t, please refer to this article to get you started.

Transactional Message Processing

Before we get down and dirty with some code, let’s spend a moment reviewing what we’re trying to achieve.  Consider a situation in which we receive a message that for some reason we cannot process – for example, we need to do a database insert and the database is currently unavailable.  We don’t want to lose the message, and we’d rather not have to write code that handled this case and put the message back on the queue.  By making the read transactional, we can signal to the message broker whether we were able to process the message or not – if not, the broker rolls back the read and the message remains in the queue.
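The commit and rollback behaviour described above can be sketched without a broker.  What follows is a minimal in-memory simulation, not NMS code: the InMemoryTransactedQueue class and its members are hypothetical names invented for illustration, and a real broker does considerably more.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a broker queue (not part of NMS or ActiveMQ).
public class InMemoryTransactedQueue
{
    private readonly List<string> messages = new List<string>();
    private string inFlight = string.Empty; // message read but not yet committed
    private bool hasInFlight = false;

    // Number of messages still held by the queue.
    public int Count
    {
        get { return this.messages.Count; }
    }

    public void Enqueue(string message)
    {
        this.messages.Add(message);
    }

    // Reads the message at the head of the queue inside a "transaction".
    // Returns false if the queue is empty.
    public bool TryReceive(out string message)
    {
        if (this.hasInFlight)
        {
            throw new InvalidOperationException("Commit or roll back the previous read first.");
        }
        if (this.messages.Count == 0)
        {
            message = string.Empty;
            return false;
        }
        this.inFlight = this.messages[0];
        this.hasInFlight = true;
        this.messages.RemoveAt(0);
        message = this.inFlight;
        return true;
    }

    // Processing succeeded: the read becomes permanent.
    public void Commit()
    {
        this.hasInFlight = false;
    }

    // Processing failed: the message returns to the head of the queue.
    public void Rollback()
    {
        if (this.hasInFlight)
        {
            this.messages.Insert(0, this.inFlight);
            this.hasInFlight = false;
        }
    }
}
```

Calling TryReceive followed by Rollback leaves Count unchanged, whereas TryReceive followed by Commit reduces it by one.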

Poison Messages

A poison message is a message that the receiver cannot process.  By simply rolling back as described earlier, a poison message will be received over and over again.  To avoid this eventuality, there are a couple of options:

  • Move the poison message to another queue
  • Discard the message

Of course, in order to take either option, we need to know whether the message we are handling has been sent to us before.  NMS provides an NMSRedelivered property on messages for this reason.
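As a rough sketch of the first option, the helper below sets aside any message that arrives flagged as redelivered.  It is deliberately independent of NMS: PoisonMessageRouter, ShouldProcess and ErrorQueue are hypothetical names, and the redelivered argument stands in for whatever the message's redelivered flag reports.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of NMS): diverts redelivered messages to an error list.
public class PoisonMessageRouter
{
    private readonly List<string> errorQueue = new List<string>();

    // Messages that were set aside instead of being processed again.
    public IList<string> ErrorQueue
    {
        get { return this.errorQueue; }
    }

    // Returns true if the caller should process the message normally.
    // 'redelivered' stands in for the message's redelivered flag.
    public bool ShouldProcess(string text, bool redelivered)
    {
        if (!redelivered)
        {
            return true;
        }
        // Option 1 from the list above: move the message elsewhere.
        // To discard instead (option 2), skip the Add call.
        this.errorQueue.Add(text);
        return false;
    }
}
```

In a transacted session the caller would still need to commit after diverting a message, otherwise the broker would keep redelivering it.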

The scaffolding

Let’s create a structure to help us explore transactional message processing.  Create three projects:

  • Core – a Class Library
  • ListenerConsole – a Console Application
  • SenderConsole – another Console Application

Core is going to contain classes used by both console applications.  I hope the names I’ve used for the other two projects give you a fairly clear indication of their intended purpose.  To run the example from one solution, use the Multiple startup projects option to set both console applications to run at start up.  All three projects need to reference both ActiveMQ and NMS.  The console applications both need to reference Core.

In the Core project, create 4 classes and an interface (I’m using the .NET convention of denoting interfaces by prefixing the name with a capital I) as follows:

  • IMessageProcessor
  • QueueConnection
  • QueueConnectionFactory
  • SimpleQueueListener
  • SimpleQueuePublisher

IMessageProcessor

This interface defines how we want to handle incoming messages.  Here it is:

public interface IMessageProcessor
{
    bool ReceiveMessage(ITextMessage message);
}

By implementing this interface, we can indicate easily whether or not we successfully processed the message.

SimpleQueuePublisher

The purpose of SimpleQueuePublisher is to encapsulate enqueuing messages.  Here it is:

public class SimpleQueuePublisher : IDisposable
{
    private readonly IMessageProducer producer;
    private bool isDisposed = false;

    public SimpleQueuePublisher(IMessageProducer producer)
    {
        this.producer = producer;
    }

    public void SendMessage(string message)
    {
        if (!this.isDisposed)
        {
            ITextMessage textMessage = new ActiveMQTextMessage(message);
            this.producer.Send(textMessage);
        }
        else
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.producer.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

As you can see, the SimpleQueuePublisher takes an IMessageProducer in the constructor and uses it to send messages.

SimpleQueueListener

The purpose of SimpleQueueListener is to encapsulate receiving messages from a queue.  It looks like this:

public class SimpleQueueListener : IDisposable
{
    private readonly IMessageConsumer consumer;
    private bool isDisposed = false;
    private readonly IMessageProcessor processor;
    private readonly ISession session;

    public SimpleQueueListener(IMessageConsumer consumer, IMessageProcessor processor, ISession session)
    {
        this.consumer = consumer;
        MessageConsumer activeMqConsumer = this.consumer as MessageConsumer;
        if (activeMqConsumer != null)
        {
            activeMqConsumer.MaximumRedeliveryCount = 3;
        }
        this.consumer.Listener += new MessageListener(OnMessage);
        this.processor = processor;
        this.session = session;
    }

    public void OnMessage(IMessage message)
    {
        ITextMessage textMessage = message as ITextMessage;
        if (this.processor.ReceiveMessage(textMessage))
        {
            this.session.Commit();
        }
        else
        {
            Console.WriteLine("Error - returning message to queue.");
            this.session.Rollback();
        }
    }

    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.consumer.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

SimpleQueueListener takes an IMessageConsumer, an IMessageProcessor and an ISession.  IMessageConsumer receives incoming messages and IMessageProcessor processes them.  We need ISession in order to be able to commit or roll back the transaction – as can be seen in the OnMessage method.  Note that we’ve set the MaximumRedeliveryCount of the message consumer to 3.  This setting limits the number of times the same message will be redelivered to us.  Once that limit is reached, the message is put on a dead letter queue (the queue name is ActiveMQ.DLQ for those who are interested) where it can be read at a later stage – allowing for diagnosis of the problem.

QueueConnection

QueueConnection encapsulates the connection to a queue.  Here it is:

public class QueueConnection : IDisposable
{
    private readonly IConnection connection;
    private readonly ISession session;
    private readonly IQueue queue;
    private bool isDisposed = false;

    public QueueConnection(IConnectionFactory connectionFactory, string queueName) : this(connectionFactory, queueName, AcknowledgementMode.AutoAcknowledge)
    {
    }

    public QueueConnection(IConnectionFactory connectionFactory, string queueName, AcknowledgementMode acknowledgementMode)
    {
        this.connection = connectionFactory.CreateConnection();
        this.connection.Start();
        this.session = this.connection.CreateSession(acknowledgementMode);
        this.queue = new ActiveMQQueue(queueName);
    }

    public SimpleQueuePublisher CreateSimpleQueuePublisher()
    {
        IMessageProducer producer = this.session.CreateProducer(this.queue);
        return new SimpleQueuePublisher(producer);
    }

    public SimpleQueueListener CreateSimpleQueueListener(IMessageProcessor processor)
    {
        IMessageConsumer consumer = this.session.CreateConsumer(this.queue, "2 > 1");
        return new SimpleQueueListener(consumer, processor, this.session);
    }

    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.session.Dispose();
            this.connection.Stop();
            this.connection.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

QueueConnection provides methods to create a SimpleQueueListener and a SimpleQueuePublisher.  It takes an IConnectionFactory and a string containing the queue name in the constructor.  It also optionally takes an AcknowledgementMode, which is set to AutoAcknowledge by default.  The AcknowledgementMode is crucial to transactional processing.

Acknowledging Messages

When creating a session, there are 4 options for the AcknowledgementMode – the way we want to acknowledge receipt of messages.  They are:

  • AutoAcknowledge
  • DupsOkAcknowledge
  • ClientAcknowledge
  • Transactional

AutoAcknowledge – messages are acknowledged automatically on receipt.  This setting means that messages are delivered exactly once.

DupsOkAcknowledge – messages are acknowledged automatically but lazily, which can lead to messages being delivered more than once.  This setting means that messages are delivered at least once.

ClientAcknowledge – the client must acknowledge receipt of the message, which means that the client has complete control over when and how messages are acknowledged.

Transactional – message receipt is wrapped in a transaction: if the client commits the transaction the message is acknowledged, and if the client rolls back the message is redelivered.
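One way to keep the four modes straight is to ask, for each one, what has to happen before a received message counts as acknowledged.  The sketch below encodes just that question.  AckMode and AckModeSketch are hypothetical names that mirror the list above rather than the NMS types themselves, and the model ignores timing details such as how lazily DupsOkAcknowledge acknowledges.

```csharp
using System;

// Hypothetical enum mirroring the four modes listed above (not the NMS type itself).
public enum AckMode
{
    AutoAcknowledge,
    DupsOkAcknowledge,
    ClientAcknowledge,
    Transactional
}

public static class AckModeSketch
{
    // Returns true if a received message counts as acknowledged, given what the client did.
    public static bool IsAcknowledged(AckMode mode, bool clientCalledAcknowledge, bool transactionCommitted)
    {
        switch (mode)
        {
            case AckMode.AutoAcknowledge:
            case AckMode.DupsOkAcknowledge:
                // Acknowledged automatically (lazily for DupsOk); the client has no say.
                return true;
            case AckMode.ClientAcknowledge:
                // Only an explicit acknowledgement from the client counts.
                return clientCalledAcknowledge;
            case AckMode.Transactional:
                // Only a committed transaction counts; a rollback leaves the message unacknowledged.
                return transactionCommitted;
            default:
                throw new ArgumentOutOfRangeException("mode");
        }
    }
}
```

Seen this way, only ClientAcknowledge and Transactional let a listener that fails to process a message leave it unacknowledged.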

Persistent Messages

ActiveMQ supports persistent and non-persistent delivery.  The difference is that persistent messages are saved to a datastore (disk or a database) and will, consequently, survive a broker restart.  Of course, non-persistent messages will not survive a restart, so transactional message receipt with non-persistent messages may lead to some messages getting lost.  Persistent delivery is the default.

QueueConnectionFactory

QueueConnectionFactory is a simple wrapper to IConnectionFactory.  Here’s the code:

public class QueueConnectionFactory
{
    private readonly IConnectionFactory connectionFactory;

    public QueueConnectionFactory(IConnectionFactory connectionFactory)
    {
        this.connectionFactory = connectionFactory;
    }

    public QueueConnection CreateConnection(string queueName)
    {
        return new QueueConnection(this.connectionFactory, queueName);
    }

    public QueueConnection CreateTransactedConnection(string queueName)
    {
        return new QueueConnection(this.connectionFactory, queueName, AcknowledgementMode.Transactional);
    }
}

QueueConnectionFactory provides two methods – one to create a connection and another to create a transacted connection.

The Sender

Now that we’re done building the scaffolding, let’s build the sender.  Here’s the Program class from the SenderConsole project:

class Program
{
    const string BROKER_URI = "tcp://localhost:61616";
    const string QUEUE_NAME = "test.queue";

    const string FORMAT = "Message {0}";

    static void Main(string[] args)
    {
        try
        {
            QueueConnectionFactory factory = new QueueConnectionFactory(new ActiveMQ.ConnectionFactory(BROKER_URI));

            for (int i = 1; i < 11; i++)
            {
                using (QueueConnection connection = factory.CreateConnection(QUEUE_NAME))
                {
                    using (SimpleQueuePublisher publisher = connection.CreateSimpleQueuePublisher())
                    {
                        publisher.SendMessage(string.Format(FORMAT, i));
                        Console.WriteLine(i);
                    }
                }
            }
            Exit();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            Exit();
        }
    }

    static void Exit()
    {
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

Our sender is fairly simple – it will send ten messages to the queue (in this case “test.queue”).

The Listener

The final step is to build the listener.  Firstly, add a class called MessageProcessor to the ListenerConsole project.  Here’s the code for it:

public class MessageProcessor : IMessageProcessor
{
    private bool errorOnNextMessage = false;
    private int numberOfErrors;
    private int errorCount = 0;

    public MessageProcessor(): this(false)
    {
    }

    public MessageProcessor(bool errorOnNextMessage) : this(errorOnNextMessage, 10)
    {
    }

    public MessageProcessor(bool errorOnNextMessage, int numberOfErrors)
    {
        this.errorOnNextMessage = errorOnNextMessage;
        this.numberOfErrors = numberOfErrors;
    }

    internal bool ErrorOnNextMessage
    {
        set
        {
            this.errorOnNextMessage = value;
        }
    }

    #region IMessageProcessor Members

    public bool ReceiveMessage(NMS.ITextMessage message)
    {
        if (message.NMSRedelivered)
        {
            Console.WriteLine("The following message is being redelivered:");
        }
        Console.WriteLine(message.Text);
        bool result = !this.errorOnNextMessage;
        if (this.errorOnNextMessage)
        {
            this.errorCount++;
            this.errorOnNextMessage = (errorCount <= numberOfErrors);
        }
        return result;
    }

    #endregion
}

Were this a real application, we’d expect to do some message processing here, but what we’re looking at is transactional processing, so you can see that the logic is rigged to error a number of times.  By using this class from the Program class in the ListenerConsole project, we should be able to see messages being delivered and redelivered after transactions that have been rolled back.  Here’s the code for the Program class:

class Program
{
    const string BROKER_URI = "tcp://localhost:61616";
    const string QUEUE_NAME = "test.queue";

    static void Main(string[] args)
    {
        try
        {                
            QueueConnectionFactory factory = new QueueConnectionFactory(new ActiveMQ.ConnectionFactory(BROKER_URI));

            using (QueueConnection connection = factory.CreateTransactedConnection(QUEUE_NAME))
            {
                using (SimpleQueueListener listener = connection.CreateSimpleQueueListener(new MessageProcessor(true)))
                {
                    Exit();
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            Exit();
        }
    }

    static void Exit()
    {
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

If you want to observe non-transactional behaviour, change the connection creation statement to CreateConnection.

Running It

That’s all the code.  It should compile and run.  Make sure ActiveMQ is running first.  You should see output like the following from the SenderConsole:

1
2
3
4
5
6
7
8
9
10
Press any key to exit...

And output like the following from the ListenerConsole:

Press any key to exit...
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Message 4
Message 5
Message 6
Message 7
Message 8
Message 9
Message 10

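Here we can see the first and second messages each being retried 3 times after the initial delivery before being put on the dead letter queue.  The processor is rigged to fail 11 times in total (the errorCount <= numberOfErrors test allows one more failure than the numberOfErrors value of 10), so the third message fails three times and succeeds on its third redelivery.  All the remaining messages are successful first time.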
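To check that reading of the output, the run can be replayed without a broker.  The sketch below copies the failure logic from MessageProcessor.ReceiveMessage and wraps it in a loop that allows one initial delivery plus a fixed number of redeliveries per message.  RedeliveryTrace and Simulate are hypothetical names, and treating the limit as "redeliveries after the first attempt" is an assumption based on the output shown above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation of the listener run (no broker involved).
public static class RedeliveryTrace
{
    // Same failure logic as the article's MessageProcessor.ReceiveMessage.
    private sealed class RiggedProcessor
    {
        private bool errorOnNextMessage;
        private readonly int numberOfErrors;
        private int errorCount = 0;

        public RiggedProcessor(bool errorOnNextMessage, int numberOfErrors)
        {
            this.errorOnNextMessage = errorOnNextMessage;
            this.numberOfErrors = numberOfErrors;
        }

        public bool Process()
        {
            bool result = !this.errorOnNextMessage;
            if (this.errorOnNextMessage)
            {
                this.errorCount++;
                this.errorOnNextMessage = (this.errorCount <= this.numberOfErrors);
            }
            return result;
        }
    }

    // Returns one summary line per message.
    public static List<string> Simulate(int messageCount, int maximumRedeliveryCount, int numberOfErrors)
    {
        RiggedProcessor processor = new RiggedProcessor(true, numberOfErrors);
        List<string> lines = new List<string>();
        for (int m = 1; m <= messageCount; m++)
        {
            int attempts = 0;
            bool succeeded = false;
            // One initial delivery plus up to maximumRedeliveryCount redeliveries.
            while (!succeeded && attempts <= maximumRedeliveryCount)
            {
                attempts++;
                succeeded = processor.Process();
            }
            string outcome = succeeded ? "committed" : "dead-lettered";
            lines.Add(string.Format("Message {0}: {1} attempt(s), {2}", m, attempts, outcome));
        }
        return lines;
    }
}
```

Calling RedeliveryTrace.Simulate(10, 3, 10) reports messages 1 and 2 as dead-lettered after 4 attempts each, message 3 as committed on its 4th attempt, and messages 4 to 10 as committed on the first attempt, which matches the listener output.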

Where next?

Having got to grips with transactional processing with ActiveMQ and NMS, you should be able to incorporate these techniques into your applications.  Of course there are more features to look at – such as message expiration and message groups – that can be combined with this basic transactional behaviour to provide more sophistication and complexity when required.

Written by remark

September 16, 2007 at 11:18 am

12 Responses

  1. Thanks for the great article – it helped me a lot.
    I am trying to get message groups working with AMQ 5.0.0 and NMS.
    I don’t see an easy way to configure it via classes. I tried adding exclusive=true to the destination URL, but the consumer doesn’t get messages in order.
    I’d appreciate any tips.

    ivo

    May 6, 2008 at 2:45 pm

  2. I am trying to put this into a Windows Service. My main thread looks like this:

    private void ThreadRun()
    {
        using (QueueConnection connection = factory.CreateTransactedConnection(ConsumerQueue))
        {
            using (QueueListener listener = connection.CreateQueueListener(new MessageProcessor(traceLogging, ServiceName, ConnectString, emailProps)))
            {
                while (threadrun)
                {
                    Thread.Sleep(interval); //interval = 1000
                }
            }
        }
    }

    The while loop just spins to keep the thread and the listener alive.

    Once in a while, I will start the service with messages waiting on the queue but they do not get processed by my service. No errors are logged; it just acts like they are not there.

    1. Is my ThreadRun method the best way to keep the listener alive and..

    2. Has anyone come across the problem I describe where messages are not seen?

    Rick

    July 29, 2008 at 10:06 pm

  3. Great Article!

    Do you know if there are any plans with incorporating XA Transactions with NMS?

    Thanks.

    Adam

    August 5, 2008 at 5:32 pm

  4. Adam, I’d suggest having a look at this article as a starting point. It’s not something I’ve looked into in any detail.

    remark

    August 7, 2008 at 3:49 pm

  5. Thanks for the code. I can now send and receive messages in ActiveMQ. But my problem is: how can I get the list of all queues on the ActiveMQ server?

    Thanks

    JuliusR

    October 7, 2008 at 5:37 am

    • >>JuliusR
      Me too. Native API provides easy to use getDestinations(), but I can’t find something like that in NMS…

      TimSail

      November 17, 2009 at 6:35 am

  6. I have written a Windows Service using code very similar to what is posted above. Occasionally, I receive messages in a different sequence than they were put on the queue. Is this expected behavior for the MessageListener? Should I rewrite to use consumer.receive instead?

    Thanks

    Rick

    January 9, 2009 at 1:11 am

  7. Hi,
    This code is working very well for me. Thanks for the sample.
    I would like to read the DLQ and reprocess the Message. Is there any sample .net code available for this?

    Thanks
    Murali

    Murali

    September 15, 2009 at 7:44 pm

  8. Hi,
    I found a way to loop through the DLQ.
    Just change the below constant
    private const string QUEUE_NAME = "test.queue";

    to

    private const string QUEUE_NAME = "ActiveMQ.DLQ";

    This is really great stuff and Easy to understand.

    Murali

    September 15, 2009 at 9:36 pm

  9. […] NMS Home NMS Download ActiveMQ n .NET Request Response with NMS Publish n Subscribe with NMS Transactional Messaging with NMS […]

  10. Why the Dispose method and logic? This is all single-threaded code. This Dispose is not necessary; Session and Connection already have a Dispose, so use them.

    Jack

    September 13, 2015 at 12:15 pm

  11. Hello,

    Since your session’s AcknowledgementMode is Transactional, the QueuePublisher needs to commit the message after sending it to the queue.

    FIKRAT HUSEYNKHANOV

    February 13, 2018 at 10:14 am

