Re.Mark

My Life As A Blog

ActiveMQ version 5 is released

I’ve written a few articles about ActiveMQ over the last few months.  Version 5 has just been released.  There’s more info on James Strachan’s blog here.

Written by remark

December 17, 2007 at 11:11 pm

Posted in Development, Java, Open Source, Software, Tools

Transactional Message Processing with ActiveMQ and NMS

In my previous articles on ActiveMQ and NMS, I’ve looked at sending a message from a sender to a receiver, implementing request-response and implementing publish-subscribe.  In this article I’m going to look at how you can process incoming messages transactionally.  The same techniques can be applied to outgoing messages.

Getting started

In previous articles, I’ve assumed that you’ve downloaded Spring.Messaging.Nms, which includes ActiveMQ.dll and NMS.dll.  You can of course download the source and build these libraries yourself.  You’ll need a Subversion client installed.  Assuming you are using a command-line Subversion client, create a folder and open a command prompt in that folder.  Type the following command:

svn co https://svn.apache.org/repos/asf/activemq/activemq-dotnet/trunk activemq-dotnet

Once all the code has been downloaded, you can build it using Visual Studio 2005, SharpDevelop, MonoDevelop, NAnt or Maven 2.  You’ll find solution files for Visual Studio, SharpDevelop and MonoDevelop in the folder you created.  There’s information on how to use Maven and NAnt here.

I’m still making the assumption that you’ve downloaded and set up ActiveMQ itself.  If you haven’t, please refer to this article to get you started.

Transactional Message Processing

Before we get down and dirty with some code, let’s spend a moment reviewing what we’re trying to achieve.  Consider a situation in which we receive a message that for some reason we cannot process – for example, we need to do a database insert and the database is currently unavailable.  We don’t want to lose the message, and we’d rather not have to write code that handled this case and put the message back on the queue.  By making the read transactional, we can signal to the message broker whether we were able to process the message or not – if not, the broker rolls back the read and the message remains in the queue.
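To make the commit and rollback behaviour concrete before bringing in the real libraries, here is a minimal in-memory sketch.  It is not the NMS API: TransactedQueueSketch and all of its members are names made up for illustration, and a real broker does far more (persistence, multiple consumers, redelivery limits).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a broker queue; NOT part of NMS or ActiveMQ.
public class TransactedQueueSketch
{
    private readonly LinkedList<string> pending = new LinkedList<string>();
    private string inFlight; // message that has been read but not yet committed

    // Number of messages still waiting on the queue.
    public int PendingCount
    {
        get { return this.pending.Count; }
    }

    public void Enqueue(string message)
    {
        this.pending.AddLast(message);
    }

    // Read the next message as part of a transaction; returns null if the queue is empty.
    public string Receive()
    {
        if (this.inFlight != null)
        {
            throw new InvalidOperationException("Commit or roll back the current message first.");
        }
        if (this.pending.Count == 0)
        {
            return null;
        }
        this.inFlight = this.pending.First.Value;
        this.pending.RemoveFirst();
        return this.inFlight;
    }

    // Processing succeeded: the read becomes permanent.
    public void Commit()
    {
        this.inFlight = null;
    }

    // Processing failed: the read is undone and the message returns to the head of the queue.
    public void Rollback()
    {
        if (this.inFlight != null)
        {
            this.pending.AddFirst(this.inFlight);
            this.inFlight = null;
        }
    }
}
```

Calling Receive followed by Rollback leaves the message on the queue, so the next Receive returns it again.  Calling Receive followed by Commit removes it for good.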

Poison Messages

A poison message is a message that the receiver cannot process.  If we simply roll back as described earlier, a poison message will be received over and over again.  To avoid this, there are a couple of options:

  • Move the poison message to another queue
  • Discard the message

Of course, in order to take either option, we need to know whether the message we are handling has been sent to us before.  NMS provides an NMSRedelivered property on messages for this reason.
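The first option can be sketched without any messaging library at all.  The code below is an illustration only: PoisonMessageSketch, Drain and the errorQueue list are hypothetical names, a plain Queue stands in for the broker, and a single boolean stands in for the redelivered flag.  Each message gets one retry, and if it fails again it is moved aside rather than rolled back forever.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of "move the poison message to another queue"; NOT the NMS API.
public static class PoisonMessageSketch
{
    // Processes every message on the queue.  A message that fails once is left at the
    // head of the queue (a "rollback"); if it fails again it is moved to errorQueue.
    public static List<string> Drain(Queue<string> queue, Func<string, bool> handler, List<string> errorQueue)
    {
        List<string> processed = new List<string>();
        bool headRedelivered = false; // stands in for the redelivered flag on the message

        while (queue.Count > 0)
        {
            string message = queue.Peek();
            if (handler(message))
            {
                queue.Dequeue();           // "commit": the message is done
                processed.Add(message);
                headRedelivered = false;
            }
            else if (headRedelivered)
            {
                queue.Dequeue();           // second failure: treat as poison and move it aside
                errorQueue.Add(message);
                headRedelivered = false;
            }
            else
            {
                headRedelivered = true;    // first failure: leave it on the queue for one retry
            }
        }
        return processed;
    }
}
```

The second option, discarding the message, would be the same sketch without the errorQueue.Add call.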

The scaffolding

Let’s create a structure to help us explore transactional message processing.  Create three projects:

  • Core – a Class Library
  • ListenerConsole – a Console Application
  • SenderConsole – another Console Application

Core is going to contain classes used by both console applications.  I hope the names I’ve used for the other two projects give you a fairly clear indication of their intended purpose.  To run the example from one solution, use the Multiple startup projects option to set both console applications to run at start up.  All three projects need to reference both ActiveMQ and NMS.  The console applications both need to reference Core.

In the Core project, create 4 classes and an interface (I’m using the .NET convention of denoting interfaces by prefixing the name with a capital I) as follows:

  • IMessageProcessor
  • QueueConnection
  • QueueConnectionFactory
  • SimpleQueueListener
  • SimpleQueuePublisher

IMessageProcessor

This interface defines how we want to handle incoming messages.  Here it is:

public interface IMessageProcessor
{
    bool ReceiveMessage(ITextMessage message);
}

By implementing this interface, we can indicate easily whether or not we successfully processed the message.

SimpleQueuePublisher

The purpose of SimpleQueuePublisher is to encapsulate enqueuing messages.  Here it is:

public class SimpleQueuePublisher : IDisposable
{
    private readonly IMessageProducer producer;
    private bool isDisposed = false;

    public SimpleQueuePublisher(IMessageProducer producer)
    {
        this.producer = producer;
    }

    public void SendMessage(string message)
    {
        if (!this.isDisposed)
        {
            ITextMessage textMessage = new ActiveMQTextMessage(message);
            this.producer.Send(textMessage);
        }
        else
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.producer.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

As you can see, the SimpleQueuePublisher takes an IMessageProducer in the constructor and uses it to send messages.

SimpleQueueListener

The purpose of SimpleQueueListener is to encapsulate receiving messages from a queue.  It looks like this:

public class SimpleQueueListener : IDisposable
{
    private readonly IMessageConsumer consumer;
    private bool isDisposed = false;
    private readonly IMessageProcessor processor;
    private readonly ISession session;

    public SimpleQueueListener(IMessageConsumer consumer, IMessageProcessor processor, ISession session)
    {
        this.consumer = consumer;
        MessageConsumer activeMqConsumer = this.consumer as MessageConsumer;
        if (activeMqConsumer != null)
        {
            activeMqConsumer.MaximumRedeliveryCount = 3;
        }
        this.consumer.Listener += new MessageListener(OnMessage);
        this.processor = processor;
        this.session = session;
    }

    public void OnMessage(IMessage message)
    {
        ITextMessage textMessage = message as ITextMessage;
        if (this.processor.ReceiveMessage(textMessage))
        {
            this.session.Commit();
        }
        else
        {
            Console.WriteLine("Error - returning message to queue.");
            this.session.Rollback();
        }
    }
    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.consumer.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

SimpleQueueListener takes an IMessageConsumer, an IMessageProcessor and an ISession.  The IMessageConsumer receives incoming messages and the IMessageProcessor processes them.  We need the ISession in order to be able to commit or roll back the transaction, as can be seen in the OnMessage method.  Note that we’ve set the MaximumRedeliveryCount of the message consumer to 3.  This setting limits the number of times the same message will be redelivered to us.  Once a message has reached the limit, it is put on a dead letter queue (the queue name is ActiveMQ.DLQ for those who are interested) where it can be read at a later stage, allowing for diagnosis of the problem.

QueueConnection

QueueConnection encapsulates the connection to a queue.  Here it is:

public class QueueConnection : IDisposable
{
    private readonly IConnection connection;
    private readonly ISession session;
    private readonly IQueue queue;
    private bool isDisposed = false;

    public QueueConnection(IConnectionFactory connectionFactory, string queueName) : this(connectionFactory, queueName, AcknowledgementMode.AutoAcknowledge)
    {
    }

    public QueueConnection(IConnectionFactory connectionFactory, string queueName, AcknowledgementMode acknowledgementMode)
    {
        this.connection = connectionFactory.CreateConnection();
        this.connection.Start();
        this.session = this.connection.CreateSession(acknowledgementMode);
        this.queue = new ActiveMQQueue(queueName);
    }

    public SimpleQueuePublisher CreateSimpleQueuePublisher()
    {
        IMessageProducer producer = this.session.CreateProducer(this.queue);
        return new SimpleQueuePublisher(producer);
    }

    public SimpleQueueListener CreateSimpleQueueListener(IMessageProcessor processor)
    {
        IMessageConsumer consumer = this.session.CreateConsumer(this.queue, "2 > 1");
        return new SimpleQueueListener(consumer, processor, this.session);
    }

    #region IDisposable Members

    public void Dispose()
    {
        if (!this.isDisposed)
        {
            this.session.Dispose();
            this.connection.Stop();
            this.connection.Dispose();
            this.isDisposed = true;
        }
    }

    #endregion
}

QueueConnection provides methods to create a SimpleQueueListener and a SimpleQueuePublisher.  It takes an IConnectionFactory and a string containing the queue name in the constructor.  It also optionally takes an AcknowledgementMode, which is set to AutoAcknowledge by default.  The AcknowledgementMode is crucial to transactional processing.

Acknowledging Messages

When creating a session, there are 4 options for the AcknowledgementMode – the way we want to acknowledge receipt of messages.  They are:

  • AutoAcknowledge
  • DupsOkAcknowledge
  • ClientAcknowledge
  • Transactional

AutoAcknowledge – messages are acknowledged automatically on receipt.  This setting means that messages are delivered exactly once.

DupsOkAcknowledge – messages are acknowledged automatically but lazily, which can lead to messages being delivered more than once.  This setting means that messages are delivered at least once.

ClientAcknowledge – the client must acknowledge receipt of the message, which means that the client has complete control over when and how messages are acknowledged.

Transactional – message receipt is wrapped in a transaction: if the client commits the transaction the message is acknowledged, and if the client rolls back the message is redelivered.

Persistent Messages

ActiveMQ supports persistent and non-persistent delivery.  The difference is that persistent messages are saved to a datastore (disk or a database) and will, consequently, survive a broker restart.  Of course, non-persistent messages will not survive a restart, so transactional message receipt with non-persistent messages may lead to some messages getting lost.  Persistent delivery is the default.

QueueConnectionFactory

QueueConnectionFactory is a simple wrapper to IConnectionFactory.  Here’s the code:

public class QueueConnectionFactory
{
    private readonly IConnectionFactory connectionFactory;

    public QueueConnectionFactory(IConnectionFactory connectionFactory)
    {
        this.connectionFactory = connectionFactory;
    }

    public QueueConnection CreateConnection(string queueName)
    {
        return new QueueConnection(this.connectionFactory, queueName);
    }

    public QueueConnection CreateTransactedConnection(string queueName)
    {
        return new QueueConnection(this.connectionFactory, queueName, AcknowledgementMode.Transactional);
    }
}

QueueConnectionFactory provides two methods – one to create a connection and another to create a transacted connection.

The Sender

Now that we’re done building the scaffolding, let’s build the sender.  Here’s the Program class from the SenderConsole project:

class Program
{
    const string BROKER_URI = "tcp://localhost:61616";
    const string QUEUE_NAME = "test.queue";

    const string FORMAT = "Message {0}";

    static void Main(string[] args)
    {
        try
        {
            QueueConnectionFactory factory = new QueueConnectionFactory(new ActiveMQ.ConnectionFactory(BROKER_URI));

            for (int i = 1; i < 11; i++)
            {
                using (QueueConnection connection = factory.CreateConnection(QUEUE_NAME))
                {
                    using (SimpleQueuePublisher publisher = connection.CreateSimpleQueuePublisher())
                    {
                        publisher.SendMessage(string.Format(FORMAT, i));
                        Console.WriteLine(i);
                    }
                }
            }
            Exit();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            Exit();
        }
    }

    static void Exit()
    {
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

Our sender is fairly simple – it will send ten messages to the queue (in this case, “test.queue”).

The Listener

The final step is to build the listener.  Firstly, add a class called MessageProcessor to the ListenerConsole project.  Here’s the code for it:

public class MessageProcessor : IMessageProcessor
{
    private bool errorOnNextMessage = false;
    private int numberOfErrors;
    private int errorCount = 0;

    public MessageProcessor(): this(false)
    {
    }

    public MessageProcessor(bool errorOnNextMessage) : this(errorOnNextMessage, 10)
    {
    }

    public MessageProcessor(bool errorOnNextMessage, int numberOfErrors)
    {
        this.errorOnNextMessage = errorOnNextMessage;
        this.numberOfErrors = numberOfErrors;
    }

    internal bool ErrorOnNextMessage
    {
        set
        {
            this.errorOnNextMessage = value;
        }
    }

    #region IMessageProcessor Members

    public bool ReceiveMessage(NMS.ITextMessage message)
    {
        if (message.NMSRedelivered)
        {
            Console.WriteLine("The following message is being redelivered:");
        }
        Console.WriteLine(message.Text);
        bool result = !this.errorOnNextMessage;
        if (this.errorOnNextMessage)
        {
            this.errorCount++;
            this.errorOnNextMessage = (errorCount <= numberOfErrors);
        }
        return result;
    }

    #endregion
}

Were this a real application, we’d expect to do some message processing here, but what we’re looking at is transactional processing, so you can see that the logic is rigged to error a number of times.  By using this class from the Program class in the ListenerConsole project, we should be able to see messages being delivered and redelivered after transactions that have been rolled back.  Here’s the code for the Program class:

class Program
{
    const string BROKER_URI = "tcp://localhost:61616";
    const string QUEUE_NAME = "test.queue";

    static void Main(string[] args)
    {
        try
        {                
            QueueConnectionFactory factory = new QueueConnectionFactory(new ActiveMQ.ConnectionFactory(BROKER_URI));

            using (QueueConnection connection = factory.CreateTransactedConnection(QUEUE_NAME))
            {
                using (SimpleQueueListener listener = connection.CreateSimpleQueueListener(new MessageProcessor(true)))
                {
                    Exit();
                }

                    
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            Exit();
        }
    }

    static void Exit()
    {
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

If you want to observe non-transactional behaviour, change the connection creation statement to CreateConnection.

Running It

That’s all the code.  It should compile and run.  Make sure ActiveMQ is running first.  You should see output like the following from the SenderConsole:

1
2
3
4
5
6
7
8
9
10
Press any key to exit...

And output like the following from the ListenerConsole:

Press any key to exit...
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
The following message is being redelivered:
Message 1
Error - returning message to queue.
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
The following message is being redelivered:
Message 2
Error - returning message to queue.
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Error - returning message to queue.
The following message is being redelivered:
Message 3
Message 4
Message 5
Message 6
Message 7
Message 8
Message 9
Message 10

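Here we can see the first and second messages each being retried 3 times after the initial delivery before being put on the dead letter queue.  The processor stops failing after its eleventh error (the errorCount <= numberOfErrors check allows one more failure than the default numberOfErrors of 10), so the third message fails three times and then succeeds on its third retry.  All the remaining messages are successful first time.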
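If you want to check that arithmetic without running a broker, the following sketch replays the counting logic from MessageProcessor.ReceiveMessage against a simulated redelivery limit.  RedeliverySimulation and Run are hypothetical names, and the assumption that a message gets one initial delivery plus up to MaximumRedeliveryCount redeliveries before being dead-lettered is inferred from the output above rather than from the ActiveMQ documentation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation of the run above; no broker, NMS or ActiveMQ types involved.
public static class RedeliverySimulation
{
    // Returns one line per message describing how many times it was delivered
    // and whether it was eventually processed or dead-lettered.
    public static List<string> Run(int messageCount, int numberOfErrors, int maximumRedeliveryCount)
    {
        bool errorOnNextMessage = true; // the listener is created with MessageProcessor(true)
        int errorCount = 0;
        List<string> results = new List<string>();

        for (int i = 1; i <= messageCount; i++)
        {
            int deliveries = 0;
            bool processed = false;

            // Assumed behaviour: one initial delivery plus up to maximumRedeliveryCount redeliveries.
            while (!processed && deliveries <= maximumRedeliveryCount)
            {
                deliveries++;

                // Same counting logic as MessageProcessor.ReceiveMessage.
                processed = !errorOnNextMessage;
                if (errorOnNextMessage)
                {
                    errorCount++;
                    errorOnNextMessage = (errorCount <= numberOfErrors);
                }
            }

            results.Add(string.Format("Message {0}: deliveries={1}, {2}",
                i, deliveries, processed ? "processed" : "dead-lettered"));
        }
        return results;
    }
}
```

Calling RedeliverySimulation.Run(10, 10, 3) reports four deliveries for each of the first three messages, with only the third one processed, and a single delivery for each of the rest.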

Where next?

Having got to grips with transactional processing with ActiveMQ and NMS, you should be able to incorporate these techniques into your applications.  Of course there are more features to look at – such as message expiration and message groups – that can be combined with this basic transactional behaviour to provide more sophistication and complexity when required.

Written by remark

September 16, 2007 at 11:18 am

ActiveMQ and SQL Server

At first, when I tried to use SQL Server (2000) with ActiveMQ there was an error. The error was:

Failed to acquire lock: com.microsoft.sqlserver.jdbc.SQLServerException: Line 1: FOR UPDATE clause allowed only for DECLARE CURSOR.

I’m using the jtds jdbc driver. I attached a profiler and the SQL that SQL Server finds offensive is:

SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE

I searched through the Nabble forums for ActiveMQ and found the answer here. My jtds config looks like this:

<bean id="jtds-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource">
	<property name="serverName" value="localhost"/>
	<property name="portNumber" value="1433"/>
	<property name="databaseName" value="databaseName"/>
	<property name="user" value="userName"/>
	<property name="password" value="password"/>
</bean>

and my persistence adapter is configured like this:

<persistenceAdapter>
    <journaledJDBC journalLogFiles="5" dataDirectory="../activemq-data" dataSource="#jtds-ds" useDatabaseLock="false"/>
</persistenceAdapter>

The useDatabaseLock="false" attribute is the important bit.

All of which seems to work.

Written by remark

July 19, 2007 at 4:43 pm

Pub-sub with ActiveMQ

I’ve just added an article about publish-subscribe with ActiveMQ and NMS.  Read it here.

Written by remark

July 7, 2007 at 10:12 pm

Publish-Subscribe with ActiveMQ and NMS

In my previous articles about using ActiveMQ and NMS, I have looked at sending a message from a sender to a receiver and implementing request-response.  In this article, I’m going to investigate how to use the publish-subscribe pattern with ActiveMQ and NMS.

The Publish-Subscribe pattern

The Publish-Subscribe pattern (pub-sub) can be thought of as a distributed implementation of the Observer pattern.  The Observer pattern defines a “dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.”  In Enterprise Integration Patterns, Publish-Subscribe is defined as a way for “the sender [to] broadcast an event to all interested receivers.”  The JMS tutorial describes the basics of pub-sub as applied to JMS.  It is important to note the difference between a subscriber and a durable subscriber.  A subscriber will only receive messages that are sent while it is connected, whereas a durable subscriber can also receive messages that were sent while it was disconnected.  Also worth noting is the difference between synchronous message consumption and asynchronous message consumption.
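The difference between the two kinds of subscriber is easier to see in a toy model.  The sketch below is purely illustrative: TopicSketch, Connect, Disconnect and Publish are invented names and nothing here touches ActiveMQ or NMS.  It simply remembers durable subscriptions while they are disconnected and forgets the others.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory topic; NOT the NMS API.
public class TopicSketch
{
    private class Subscription
    {
        public bool Durable;
        public Action<string> Handler; // null while the subscriber is disconnected
        public readonly Queue<string> Backlog = new Queue<string>();
    }

    private readonly Dictionary<string, Subscription> subscriptions =
        new Dictionary<string, Subscription>();

    // Connect (or reconnect) a named subscriber and hand over anything it missed.
    public void Connect(string name, bool durable, Action<string> handler)
    {
        Subscription subscription;
        if (!this.subscriptions.TryGetValue(name, out subscription))
        {
            subscription = new Subscription { Durable = durable };
            this.subscriptions[name] = subscription;
        }
        subscription.Handler = handler;
        while (subscription.Backlog.Count > 0)
        {
            handler(subscription.Backlog.Dequeue());
        }
    }

    public void Disconnect(string name)
    {
        Subscription subscription;
        if (!this.subscriptions.TryGetValue(name, out subscription))
        {
            return;
        }
        if (subscription.Durable)
        {
            subscription.Handler = null;      // the subscription is remembered
        }
        else
        {
            this.subscriptions.Remove(name);  // the subscription is forgotten
        }
    }

    public void Publish(string message)
    {
        foreach (Subscription subscription in this.subscriptions.Values)
        {
            if (subscription.Handler != null)
            {
                subscription.Handler(message);
            }
            else
            {
                subscription.Backlog.Enqueue(message); // only durable subscriptions reach here
            }
        }
    }
}
```

If two subscribers connect, one durable and one not, and both disconnect before a message is published, only the durable one receives that message when it reconnects.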

Before we go any further…

…I’m making a couple of assumptions.  I’m assuming that you have already installed ActiveMQ and have it running.  I’m also assuming that you have downloaded Spring.Messaging.Nms.  If you need more information about how to ensure these assumptions are valid, read my first article about ActiveMQ and NMS.

A simple subscriber

Let’s get started by creating a simple subscriber.  Create a Windows Class Library project.  I called mine Core.  Add references to:

  • ActiveMQ
  • NMS

Create a class called SimpleTopicSubscriber.  The code for the class is listed below:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(string topicName, string brokerUri, string clientId, string consumerId)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.consumer = this.session.CreateDurableConsumer(topic, consumerId, "2 > 1", false);
            this.consumer.Listener += new MessageListener(OnMessage);

        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }


        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class creates a durable subscription and consumes messages asynchronously.  All the work is done in the constructor.  The assignment of the client id is important in making this subscription durable.  Registering the OnMessage event handler provides the means to consume messages asynchronously.  The CreateDurableConsumer method on the session takes four parameters:

  • NMS.ITopic destination – the topic to which to subscribe
  • string name – the name of the consumer
  • string selector – a message selector: a conditional expression that is evaluated against each message’s headers and properties, so that only matching messages are delivered.  I copied the expression “2 > 1” from some test cases I found for NMS; since it is always true, every message is delivered.
  • bool noLocal – indicates whether you want to consume messages that originate locally.  So, setting this to false means we’ll receive all messages sent to the topic.
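As a rough mental model of the last two parameters, delivery to a subscriber can be thought of as passing two checks.  The sketch below is an assumption-laden illustration, not how NMS or ActiveMQ implement it: DeliveryFilterSketch and ShouldDeliver are hypothetical names, connection ids are plain strings, and a delegate stands in for a parsed selector expression.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the selector and noLocal checks; NOT the NMS API.
public static class DeliveryFilterSketch
{
    public static bool ShouldDeliver(
        string publisherConnectionId,
        string subscriberConnectionId,
        bool noLocal,
        Func<IDictionary<string, object>, bool> selector,
        IDictionary<string, object> messageProperties)
    {
        // noLocal: skip messages that were published on the subscriber's own connection.
        if (noLocal && publisherConnectionId == subscriberConnectionId)
        {
            return false;
        }

        // selector: deliver only if the expression holds for this message.
        // A selector such as "2 > 1" corresponds to a delegate that always returns true.
        return selector == null || selector(messageProperties);
    }
}
```

With noLocal set to false and an always-true selector, as in SimpleTopicSubscriber, every message sent to the topic passes both checks.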

It’s worth noting that there’s a bunch of stuff that’ll need disposing – the connection, session and message consumer.  So, there’s a rudimentary implementation of IDisposable to clean up.  Finally, note that there’s an event that allows this class to bubble up the text in each message received.

Subscribe in action

Let’s see how this subscriber works.  Create a Windows Console application project.  I called mine ActiveMqFirstSubscriber.  This project needs references to:

  • the Core project
  • ActiveMQ
  • NMS

The code for Program class is:

using System;

using Core;

namespace ActiveMqFirstSubscriber
{
    class Program
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";
        const string CLIENT_ID = "ActiveMqFirstSubscriber";
        const string CONSUMER_ID = "ActiveMqFirstSubscriber";

        static void Main(string[] args)
        {
            try
            {
                using (SimpleTopicSubscriber subscriber = new SimpleTopicSubscriber(TOPIC_NAME, BROKER, CLIENT_ID, CONSUMER_ID))
                {
                    subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                    Console.WriteLine("Press any key to exit...");
                    Console.ReadKey();
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press any key to exit...");
                Console.ReadKey();
            }

            
        }

        static void subscriber_OnMessageReceived(string message)
        {
            Console.WriteLine(message);
        }
    }
}

Run the console.  You should see a line telling you that you can press any key to exit.  Don’t exit yet.  Run jconsole and connect to ActiveMQ (instructions here).  Select the MBeans tab.  Navigate to org.apache.activemq->localhost->Topic->SampleSubscriptionTopic->Operations->sendTextMessage as shown below:

[Screenshot: jconsole showing the sendTextMessage operation for the topic]

In the text box next to the sendTextMessage button, enter some text.  Not wanting to break with tradition, I entered “Hello”.  You should see the text you entered show up in the console.  Now press any key in the console window and it will exit.  Send another message via jconsole (this time I sent “Hello again.”)  Run the console and there’s your message.

A simple publisher

Now we can subscribe to messages, the next step is to write a publisher.  Add a new class to the Core project and call it SimpleTopicPublisher.  Here’s the code:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(string topicName, string brokerUri)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.producer = this.session.CreateProducer(topic);
            
        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = this.session.CreateTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This code should all look pretty familiar after the subscriber code.  We don’t need to assign a clientId to the connection.  To be able to send messages, we create an IMessageProducer from the session.  The SendMessage method provides client code the ability to send messages to the topic.  Just like the subscriber, there’s a little cleaning up to do, so once again there’s a rudimentary implementation of IDisposable.

The publisher in action

Create a Windows Forms project.  I called mine TopicPublisher.  Create a form and call it FirstMainForm.  Set the text of the form to “Publisher”.  Add a label called instructionLabel and set the text to “Enter a message.”  Add a text box called messageTextBox.  Add a button called sendButton and set the text to “Send Message”.  Create event handlers for the click event of the button and the load and closed events of the form.  Here’s the code for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

namespace TopicPublisher
{
    public partial class FirstMainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private SimpleTopicPublisher publisher;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public FirstMainForm()
        {
            InitializeComponent();
        }

        private void FirstMainForm_Load(object sender, EventArgs e)
        {
            try
            {
                this.publisher = new SimpleTopicPublisher(TOPIC_NAME, BROKER);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }

        private void sendButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void FirstMainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }
    }
}

Hopefully, this is all fairly obvious.  We create a publisher on load and dispose of it when the form closes.  When the button is clicked the text from the text box is sent to all subscribers.  If, like me, you have these projects in one solution, you’ll need to set both the Console application and the Windows Forms application to run at startup.  Go ahead and run the subscriber and the publisher.  You should be able to send messages from the form to the console.

The next step

So we can send messages from a to b.  And we know that if b is disconnected, it will receive the messages as soon as it reconnects.  To make this a little more interesting, how about a form that is both a publisher and a subscriber?

A little light refactoring

The obvious thing about SimpleTopicPublisher and SimpleTopicSubscriber is that they use their own connections and sessions.  If we are going to publish and subscribe from the same form, it’d make more sense to share connections and sessions.  Here’s the new code for SimpleTopicSubscriber:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(IMessageConsumer consumer)
        {
            this.consumer = consumer;
            this.consumer.Listener += new MessageListener(OnMessage);
        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Now we pass an instance of IMessageConsumer in the constructor.  We can make a similar change to SimpleTopicPublisher by passing an instance of IMessageProducer in the constructor.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(IMessageProducer producer)
        {
            this.producer = producer;
        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = new ActiveMQTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Since we probably don’t want to put all the code that creates IMessageConsumer and IMessageProducer instances into our form, we’ll need a new class that takes that responsibility.  Add a new class to the Core project and call it TopicConnection.  The code for the class is listed below:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class TopicConnection : IDisposable
    {
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly ITopic topic;
        private bool isDisposed = false;

        public TopicConnection(IConnectionFactory connectionFactory, string clientId, string topicName)
        {
            this.connection = connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = this.connection.CreateSession();
            this.topic = new ActiveMQTopic(topicName);
        }

        public SimpleTopicPublisher CreateTopicPublisher()
        {
            IMessageProducer producer = this.session.CreateProducer(this.topic);
            return new SimpleTopicPublisher(producer);
        }

        public SimpleTopicSubscriber CreateSimpleTopicSubscriber(string consumerId)
        {
            IMessageConsumer consumer = this.session.CreateDurableConsumer(this.topic, consumerId, "2 > 1", false);
            return new SimpleTopicSubscriber(consumer);
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class has a method to create a SimpleTopicSubscriber and another to create a SimpleTopicPublisher.  The constructor takes an instance of IConnectionFactory, the client id and the name of the topic.  To complete this exercise, let’s add a class called TopicConnectionFactory.  This is a simple class that does little more than hold a reference to an instance of IConnectionFactory.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public class TopicConnectionFactory
    {
        private readonly IConnectionFactory connectionFactory;

        public TopicConnectionFactory(IConnectionFactory connectionFactory)
        {
            this.connectionFactory = connectionFactory;
        }

        public TopicConnection CreateConnection(string clientId, string topicName)
        {
            return new TopicConnection(this.connectionFactory, clientId, topicName);
        }
    }
}
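
Before wiring these classes into the form, it can help to see them working together.  The following is a minimal console sketch, assuming the Core classes listed above compile as shown and a broker is listening on tcp://localhost:61616.  The TopicConnectionSample namespace, the Program class and the "console-client" id are made up for illustration; everything else comes from the listings above.

```csharp
using System;

using ActiveMQ;
using Core;

namespace TopicConnectionSample   // hypothetical project name
{
    class Program
    {
        static void Main(string[] args)
        {
            TopicConnectionFactory factory =
                new TopicConnectionFactory(new ConnectionFactory("tcp://localhost:61616"));

            // One TopicConnection supplies both the publisher and the subscriber,
            // so they share a single connection and session.
            using (TopicConnection connection = factory.CreateConnection("console-client", "SampleSubscriptionTopic"))
            using (SimpleTopicPublisher publisher = connection.CreateTopicPublisher())
            using (SimpleTopicSubscriber subscriber = connection.CreateSimpleTopicSubscriber("console-client"))
            {
                subscriber.OnMessageReceived += new MessageReceivedDelegate(PrintMessage);
                publisher.SendMessage("Hello from the shared connection.");
                Console.WriteLine("Press <ENTER> to exit.");
                Console.ReadLine();
            }
        }

        // Called on the consumer's thread whenever a text message arrives.
        private static void PrintMessage(string message)
        {
            Console.WriteLine("Received: {0}", message);
        }
    }
}
```

The nested using statements dispose of the subscriber and publisher before the connection, which mirrors the order the form uses when it closes.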

Now the Core code is in better shape, let’s sort the form out.  We want to be able to set the client id from the form, so add a label, a text box and a button.  Call the label clientIdLabel and set the text to “Client Id”.  Call the text box clientIdTextBox.  Call the button connectButton and set the text to “Connect”.  Wire up an event handler to the click event of the button.  Here’s the code for the event handler:

private void connectButton_Click(object sender, EventArgs e)
{
    try
    {
        this.clientId = this.clientIdTextBox.Text;
        this.consumerId = this.clientId;
        this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
        this.publisher = this.connection.CreateTopicPublisher();
        this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
        this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
        this.clientIdLabel.Enabled = false;
        this.clientIdTextBox.Enabled = false;
        this.connectButton.Enabled = false;
        this.messageTextBox.Enabled = true;
        this.instructionLabel.Enabled = true;
        this.historyTextBox.Enabled = true;
        this.submitButton.Enabled = true;
    }
    catch (Exception ex)
    {
        MessageBox.Show(ex.ToString());
        this.Close();
    }
}

So that the publish and subscribe bits are disabled until a connection is established, remove the form load event and set the Enabled property of messageTextBox, instructionLabel and submitButton to false.  You can also see from the code above that I’ve added another text box called historyTextBox.  This text box has the Multiline property set to true.  Here’s how the form looks:

MessageSender

Here’s the full listing for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

using ActiveMQ;

namespace ActiveMqMessageChat
{
    public partial class MainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private readonly TopicConnectionFactory connectionFactory = new TopicConnectionFactory(new ConnectionFactory(BROKER));
        private TopicConnection connection;
        private SimpleTopicPublisher publisher;
        private SimpleTopicSubscriber subscriber;
        private string clientId;
        private string consumerId;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public MainForm()
        {
            InitializeComponent();
        }

        private void submitButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void subscriber_OnMessageReceived(string message)
        {
            this.builder.AppendLine(message);
            SetText(this.builder.ToString());
        }

        private void SetText(string text)
        {
            // InvokeRequired compares the thread ID of the
            // calling thread to the thread ID of the creating thread.
            // If these threads are different, it returns true.
            if (this.historyTextBox.InvokeRequired)
            {
                SetTextCallback d = new SetTextCallback(SetText);
                this.Invoke(d, new object[] { text });
            }
            else
            {
                this.historyTextBox.Text = text;
            }
        }


        private void MainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
                this.subscriber.Dispose();
                this.connection.Dispose();
            }
            catch { }
        }

        private void connectButton_Click(object sender, EventArgs e)
        {
            try
            {
                this.clientId = this.clientIdTextBox.Text;
                this.consumerId = this.clientId;

                this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
                this.publisher = this.connection.CreateTopicPublisher();
                this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
                this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                this.clientIdLabel.Enabled = false;
                this.clientIdTextBox.Enabled = false;
                this.connectButton.Enabled = false;
                this.messageTextBox.Enabled = true;
                this.instructionLabel.Enabled = true;
                this.historyTextBox.Enabled = true;
                this.submitButton.Enabled = true;
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }
    }
}

The launcher

To make this a touch simpler to run, add another form and call it LaunchForm.  Make this form the form that gets run from the Program class.  Add one button called launchButton and set the text to “Create Client”.   Wire up an event handler so that every time the button is clicked, a new instance of MainForm is created.
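
The click handler itself is only a couple of lines.  Here is a rough sketch of what LaunchForm might contain, assuming the designer has generated the usual InitializeComponent method and wired launchButton’s Click event to launchButton_Click (that handler name is my assumption, following the naming used in MainForm):

```csharp
using System;
using System.Windows.Forms;

namespace ActiveMqMessageChat
{
    public partial class LaunchForm : Form
    {
        public LaunchForm()
        {
            InitializeComponent();
        }

        private void launchButton_Click(object sender, EventArgs e)
        {
            // Each click opens an independent chat window; every MainForm
            // creates its own connection when its Connect button is clicked.
            MainForm form = new MainForm();
            form.Show();
        }
    }
}
```

Using Show rather than ShowDialog keeps the launcher responsive, so several clients can be open at once.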

OK.  Time to run the new form.  Create two instances and marvel at the ability to see messages sent to and fro.  Remember the purpose of the client id.  With two forms you can see the same message sent to two subscribers.  If you’ve got the console running too, then you can see the same message being sent to three subscribers.

And finally

It doesn’t end here.  The code in this article is an entry point to publish-subscribe with ActiveMQ and NMS.  There’s a lot more you can do.  Take a look at Virtual Destinations and Subscription Recovery Policy for starters.

Written by remark

July 7, 2007 at 10:08 pm

Request-Response with ActiveMQ and NMS

I’ve posted an article about how to implement request-response with ActiveMQ and NMS.   Read it here.

Written by remark

June 16, 2007 at 6:06 pm

Implementing Request-Response with ActiveMQ and NMS

with 20 comments

In my last article about using ActiveMQ and NMS, I demonstrated sending a message from a sender to a receiver, which is fine for a number of patterns including asynchronous interactions.  Sometimes, however, the request-response paradigm is what you need.

Where do we start?
Our starting point is this article, which explains how to implement the request-response paradigm with JMS.  The crux of this is to create a temporary queue and consumer per client on startup – the temporary queue creates a channel for the response.  While there’s some sample code provided, it’s JMS code, so let’s figure out what that would look like using NMS.  I’m assuming that you’ve got ActiveMQ installed and running.  If not, follow the instructions in my last article.

The Client
Create a new Console application.  I called mine ActiveMqRequestResponseConsole – you may be able to come up with a better name.  This application is going to send a message and wait for the response.  Add references to:

  1. Spring.Core
  2. ActiveMQ
  3. NMS
  4. Spring.Messaging.NMS

Here’s the code for the Program class:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Support.IDestinations;
using NMS;

namespace ActiveMqRequestResponseConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory(URI);
            try
            {
                using (IConnection connection = connectionFactory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        ITemporaryQueue queue = session.CreateTemporaryQueue();
                        using (IMessageConsumer consumer = session.CreateConsumer(queue))
                        {
                            string text = "A message for you.";
                            ITextMessage message = session.CreateTextMessage(text);
                            message.NMSReplyTo = queue;
                            string correlationId = Guid.NewGuid().ToString();
                            message.NMSCorrelationID = correlationId;
                            using (IMessageProducer producer = session.CreateProducer())
                            {
                                NmsDestinationAccessor destinationResolver = new NmsDestinationAccessor();
                                IDestination destination = destinationResolver.ResolveDestinationName(session, DESTINATION); 
                                producer.Send(destination, message);
                            }
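                            IMessage response = consumer.Receive(TimeSpan.FromSeconds(60));
                            ITextMessage textMessage = response as ITextMessage;
                            // textMessage is null if nothing (or a non-text message) arrived in time.
                            if (textMessage != null)
                            {
                                Console.WriteLine(textMessage.Text);
                            }
                            else
                            {
                                Console.WriteLine("No text response was received.");
                            }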
                        }

                    }
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            Console.WriteLine("Press <ENTER> to exit.");
            Console.ReadLine();
        }
    }
}

This is a little different from what we did before.  We create a ConnectionFactory and then create an IConnection.  Then, we create an ISession.  According to the JMS documentation, a Session is “a single threaded context for producing and consuming messages.”  It provides factory methods for MessageProducers and MessageConsumers.  A Session can also be used to create Queues.  Using the session, we create an ITemporaryQueue, which we use as the ReplyTo property of the message that we are about to send.  Temporary destinations (like this queue we have created) are unique to a connection.  We also use the session to create an IMessageConsumer – it is this consumer that will be used to receive the response.

With all this set up, we’re ready to create and send a message.  To do this, we create an IMessageProducer and an ITextMessage.  The important things to note are setting the NMSReplyTo property and the NMSCorrelationID property.  The NMSReplyTo property will inform the server how to reply to the message we send, and the NMSCorrelationID is used to tie responses up with the corresponding requests (in this code example, we’re only going to send one message, so correlation is trivial.)  All that’s left is to create an IDestination for the message (using the NmsDestinationAccessor provided by the Spring.Messaging.NMS framework.)

Once the message has been sent, we instruct the MessageConsumer to receive the response (I’ve specified a generous if not excessive timeout of 60 seconds.)  Once the response has come back, it gets written out to the console.
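
Correlation stops being trivial as soon as more than one request is in flight.  The sketch below is plain C# with no messaging calls at all; CorrelationTable is a hypothetical helper of my own, not part of NMS or Spring.Messaging.NMS.  It only illustrates the bookkeeping: remember each request under its correlation id and look the id up again when a response arrives.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of NMS: tracks outstanding requests by correlation id.
public class CorrelationTable
{
    private readonly Dictionary<string, string> pending = new Dictionary<string, string>();

    // Records a request and returns the correlation id to stamp on the outgoing message.
    public string Register(string requestText)
    {
        string correlationId = Guid.NewGuid().ToString();
        this.pending.Add(correlationId, requestText);
        return correlationId;
    }

    // Matches a response to its request; returns false for unknown or already-matched ids.
    public bool TryMatch(string correlationId, out string requestText)
    {
        if (correlationId != null && this.pending.TryGetValue(correlationId, out requestText))
        {
            this.pending.Remove(correlationId);
            return true;
        }
        requestText = null;
        return false;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        CorrelationTable table = new CorrelationTable();
        string first = table.Register("first request");
        string second = table.Register("second request");

        // Responses may come back in any order; the id says which request each one answers.
        string request;
        if (table.TryMatch(second, out request))
        {
            Console.WriteLine("Response matched: {0}", request);
        }
        if (table.TryMatch(first, out request))
        {
            Console.WriteLine("Response matched: {0}", request);
        }
        if (!table.TryMatch("not-a-known-id", out request))
        {
            Console.WriteLine("Unmatched response ignored.");
        }
    }
}
```

In a real client the value passed to TryMatch would be the NMSCorrelationID of the incoming response.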

The Server
Add another Console application to the solution.  I called this one ActiveMqServer and I renamed the Program class to Server to reduce the chances of confusing myself.  Admittedly, the name of ActiveMqServer.Server needs some more thought.  You need to add the same references that you added to the client console application.  Here’s the code for the Server class:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;

namespace ActiveMqServer
{
    class Server
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory connectionFactory = new ConnectionFactory(URI);
                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new ActiveMqServer.Listener(listenerContainer);
                    listenerContainer.AfterPropertiesSet();
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press <ENTER> to exit.");
                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }
}

This is very similar to the code in the last article.  I’ve moved the creation of the SimpleMessageListenerContainer into the Main method so that it can be disposed of easily.  The Listener class code is below:

using System;
using System.Collections.Generic;
using System.Text;

using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;
using NMS;

namespace ActiveMqServer
{
    class Listener : IMessageListener
    {
        private readonly SimpleMessageListenerContainer container;

        public Listener(SimpleMessageListenerContainer container)
        {
            this.container = container;
            Console.WriteLine("Listener created.");
        }

        #region IMessageListener Members

        public void OnMessage(IMessage message)
        {
            using (ISession session = this.container.SharedConnection.CreateSession())
            {
                Console.WriteLine("Message Received.");
                ITextMessage textMessage = message as ITextMessage;
                string incomingText = textMessage.Text;
                Console.WriteLine("Message: {0}", incomingText);
                string outgoingText = string.Format("Thanks for sending the following message: {0}", incomingText);

                IDestination destination = message.NMSReplyTo;
                if (destination != null)
                {
                    ITextMessage response = session.CreateTextMessage(outgoingText);
                    response.NMSCorrelationID = message.NMSCorrelationID;
                    using (IMessageProducer producer = session.CreateProducer(destination))
                    {
                        producer.Send(response);
                    }
                }
            }
        }

        #endregion
    }
}

As you can see, this Listener takes a reference to the container in the constructor.  This reference is used to reach the container’s shared connection and create an ISession, which in turn creates an IMessageProducer to send the response.  The important things to note are the use of the NMSReplyTo property of the incoming message as the IDestination for the response and the setting of the NMSCorrelationID property of the response to the NMSCorrelationID property of the incoming message.

Putting it all together
It’s time to run the code.  The first thing to do is to ensure that ActiveMQ is running.  (If it isn’t, you’ll get an exception in the client and you’ll notice that the exception handling in the server is, well, absent.)  Next, right click on the Solution and select Set Start Up Projects.  Choose Multiple startup projects and set the Action of each to be Start.  It makes sense to have the server start before the client, so alter the start up order by moving the server to the top of the list.

Take a deep breath, we’re going in.  Run the solution.  All being well, you should see the message receipt in the server console window and the response message (“Thanks for sending the following message: A message for you.”) in the client console window.

Conclusion
When coming to messaging from an RPC or web service background, it isn’t necessarily obvious how to go about implementing the request-response paradigm.  The code in this article lets you use ActiveMQ for the Request-Response paradigm.  With a little refactoring and maybe a light sprinkling of encapsulation, the mechanics could be abstracted away from the calling code.
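
As a rough idea of what that encapsulation could look like, here is a sketch of a wrapper class.  Requestor is a name I’ve made up; it is not part of NMS or Spring.Messaging.NMS.  It uses only the session, producer and consumer calls that appear in the client and server listings above, and it assumes the caller owns the ISession and has already resolved the request IDestination.

```csharp
using System;

using NMS;

namespace ActiveMqRequestResponseConsole
{
    // Hypothetical wrapper that hides the temporary queue and
    // correlation id from the calling code.
    public class Requestor : IDisposable
    {
        private readonly ISession session;
        private readonly IDestination requestDestination;
        private readonly ITemporaryQueue replyQueue;
        private readonly IMessageConsumer replyConsumer;

        public Requestor(ISession session, IDestination requestDestination)
        {
            this.session = session;
            this.requestDestination = requestDestination;
            this.replyQueue = session.CreateTemporaryQueue();
            this.replyConsumer = session.CreateConsumer(this.replyQueue);
        }

        // Sends the text and waits for one reply on the temporary queue.
        // Returns null if nothing arrives before the timeout, or if the
        // reply that does arrive carries a different correlation id.
        public string Request(string text, TimeSpan timeout)
        {
            ITextMessage request = this.session.CreateTextMessage(text);
            request.NMSReplyTo = this.replyQueue;
            request.NMSCorrelationID = Guid.NewGuid().ToString();

            using (IMessageProducer producer = this.session.CreateProducer(this.requestDestination))
            {
                producer.Send(request);
            }

            ITextMessage reply = this.replyConsumer.Receive(timeout) as ITextMessage;
            if (reply != null && reply.NMSCorrelationID == request.NMSCorrelationID)
            {
                return reply.Text;
            }
            return null;
        }

        public void Dispose()
        {
            this.replyConsumer.Dispose();
        }
    }
}
```

With something like this in place, the body of the client’s Main method could shrink to creating a Requestor inside the session’s using block and calling Request("A message for you.", TimeSpan.FromSeconds(60)).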

Written by remark

June 16, 2007 at 6:00 pm

ActiveMQ and NMS

I’ve posted an article about how to use ActiveMQ from .NET. You can read it here.

Written by remark

May 20, 2007 at 5:07 pm

Messaging with .NET and ActiveMQ

with 83 comments

You’ve probably heard of Java Message Service (JMS). It’s a standard Java API for creating, sending, receiving and reading messages. ActiveMQ, an Apache project, is an open source message broker that supports JMS 1.1. In addition to supporting JMS, it supports other protocols that allow clients to be written in a variety of languages. Look at this page for more information. Sounds good, doesn’t it? Let’s try it out using .NET, C# and Visual Studio 2005.

Download and install the JDK

ActiveMQ is written in Java, so you’ll need a Java Runtime Environment (JRE) to be able to run it. The Java Development Kit (JDK) comes with extra utilities that you’ll find useful later on. I used the Java Development Kit SE 6 update 1, which you can find here.

Download and install ActiveMQ

Get the latest release of ActiveMQ from the downloads section. I used version 4.1.1. Once you have downloaded the zip file, extract the contents to a suitable folder. To test the installation, open a command prompt and use the cd command to set the current folder to be the installation folder (in which you extracted the ActiveMQ files.) Then type the following command:

bin\activemq

All being well, you should see a number of lines of information – the last of which should be something like:

INFO  ActiveMQ JMS Message Broker (localhost, ID:your-PC-51222-1140729837569-0:0) has started

The installation notes on the ActiveMQ site point out that there are working directories that get created relative to the current working folder. This means that for ActiveMQ to work correctly, you must start it from the installation folder. To double check, start a new command prompt and type the following command:

netstat -an|find "61616"

The response should be something like the following:

  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING
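
If you’d rather make the same check from .NET, something along these lines should do it.  This is only a sketch using the standard TcpClient class; BrokerCheck and IsPortOpen are names I’ve invented, and a successful connection only shows that something is listening on the port, not that it is ActiveMQ.

```csharp
using System;
using System.Net.Sockets;

public class BrokerCheck
{
    // Returns true if something accepts TCP connections on host:port.
    public static bool IsPortOpen(string host, int port)
    {
        try
        {
            using (TcpClient client = new TcpClient())
            {
                client.Connect(host, port);
                return true;
            }
        }
        catch (SocketException)
        {
            // Connection refused, host unreachable and so on.
            return false;
        }
    }

    public static void Main(string[] args)
    {
        bool open = IsPortOpen("localhost", 61616);
        Console.WriteLine(open
            ? "Something is listening on port 61616."
            : "Nothing is listening on port 61616.");
    }
}
```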

When you want to stop ActiveMQ, enter <CTRL+C>. For now leave it running.

Download Spring.Messaging.NMS

NMS is the .NET Messaging API. Spring .NET has a messaging library built on NMS. It’s under development, but you can get it here. I used version 20070320-1632. Extract the files from the zip file to somewhere sensible.

The Listener

Create a new console application. I called mine ListenerConsole. You need to add 4 references:

  1. Spring.Core
  2. ActiveMQ
  3. NMS
  4. Spring.Messaging.NMS

These dlls can all be found in the bin\net\2.0\debug folder of Spring.Messaging.NMS. Here’s the code for the Program class in the listener console:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

using ActiveMQ;
using Spring.Messaging.Nms;
using Spring.Messaging.Nms.Listener;

namespace ListenerConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory connectionFactory = new ConnectionFactory(URI);

                using (SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new Listener();
                    listenerContainer.AfterPropertiesSet();
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press <ENTER> to exit.");
                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press <ENTER> to exit.");
                Console.Read();
            }
            
        }
    }
}

The interesting part of the code is the creation and set up of the SimpleMessageListenerContainer. ConnectionFactory is from the ActiveMQ namespace and implements the IConnectionFactory interface defined in NMS. The Uri of the message broker is passed to the constructor. SimpleMessageListenerContainer is part of Spring.Messaging.NMS (in the Listener namespace.) Note that SimpleMessageListenerContainer implements IDisposable. The missing part of this puzzle is the Listener class. Create a new class in the project, call it Listener and insert the following code:

using System;
using Spring.Messaging.Nms;
using NMS;
namespace ListenerConsole
{
    class Listener : IMessageListener
    {
        public Listener()
        {
            Console.WriteLine("Listener created.\r\n");
        }
        #region IMessageListener Members

        public void OnMessage(NMS.IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            Console.WriteLine(textMessage.Text);
        }

        #endregion
    }
}

IMessageListener is defined in Spring.Messaging.NMS. Build and run the console.

Start jconsole

The JDK comes with a utility called jconsole. You’ll find it in the bin folder. So, launch a command prompt and cd to the bin folder of the JDK. Then type:

jconsole

This will launch the Java Monitoring & Management Console. To connect to the running instance of ActiveMQ, select Remote Process as shown below:

jconsole_activemq

Enter the following in the Remote Process text box:

service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi

It’s easier to work out what to enter here than you might think. If you go back to the command prompt in which you launched ActiveMQ and look towards the top of the output, you’ll find a line that reads:

INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi

Click Connect. Select the MBeans tab. Navigate to org.apache.activemq->localhost->Queue->test.queue->Operations->sendTextMessage as shown below:

Sending a text message from jconsole.

In the text box next to the sendTextMessage button, enter some text. I entered (somewhat unimaginatively) “Hello”. Now, click sendTextMessage. In the .NET console window for the listener, you should see the text you entered. So what just happened? jconsole put a message on the queue using JMS and our console application read it using NMS. Why not send yourself a couple more messages?

The Sender

To complete our foray into messaging with ActiveMQ, let’s create an application that can send messages. Create another Console Application. I called mine SenderConsole. You may have started to notice a pattern in my naming conventions. Add the same references you added to the listener console. Here’s the code for the sender console:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using Spring.Messaging.Nms;

namespace SenderConsole
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";

        static void Main(string[] args)
        {
            ConnectionFactory connectionFactory = new ConnectionFactory(URI);
            NmsTemplate template = new NmsTemplate(connectionFactory);
            template.ConvertAndSend(DESTINATION, "Hello from the sender.");
        }
    }
}

Run the sender console and you should find that the message “Hello from the sender.” has appeared in the console window of the listener.

Conclusion

Sending and receiving messages using ActiveMQ is enabled by NMS and Spring.Messaging.NMS. We’ve seen how to create a simple set up using .NET that can be extended for real-world needs. JMS is no longer the preserve of Java developers.

Written by remark

May 20, 2007 at 4:20 pm

Transactional Messages

I’ve posted an article about transactional message processing using ActiveMQ and NMS. You can read it here.

Written by remark

September 16, 2007 at 3:24 pm