Publish-Subscribe with ActiveMq and NMS

In my previous articles about using ActiveMq and NMS, I have looked at sending a message from a sender to a receiver and implementing request-response.  In this article, I’m going to investigate how to use the publish-subscribe pattern with ActiveMq and NMS.

The Publish-Subscribe pattern

The Publish-Subscribe pattern (pub-sub) can be thought of as a distributed implementation of the Observer pattern.  The Observer pattern defines a ”dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.”  In Enterprise Integration Patterns, Publish-Subscribe is defined as a way for “the sender [to] broadcast an event to all interested receivers.”  The JMS tutorial describes the basics of pub-sub as applied to JMS.  It is important to note the difference between a subscriber and a durable subscriber.  As subscriber will only receive messages that were sent while it is connected whereas a durable subscriber can receive messages sent while they were disconnected.  Also worth noting is the difference between synchronous message consumption and asynchronous message consumption.

Before we go any further…

…I’m making a couple of assumptions.  I’m assuming that you have already installed ActiveMq and have it running.  I’m also assuming that you have downloaded Spring.Messaging.Nms.  If you need more information about how to ensure these assumptions are valid, read my first article about ActiveMq and NMS.

A simple subscriber

Let’s get started by creating a simple subscriber.  Create a Windows Class Library project.  I called mine Core.  Add references to:

  • ActiveMQ
  • NMS

Create a class called SimpleTopicSubscriber.  The code for the class is listed below:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(string topicName, string brokerUri, string clientId, string consumerId)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.consumer = this.session.CreateDurableConsumer(topic, consumerId, "2 > 1", false);
            this.consumer.Listener += new MessageListener(OnMessage);

        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class creates a durable subscription and consumes messages asynchronously.  All the work is done in the constructor.  The assignment of the client id is important in making this subscription durable.  Registering the OnMessage event handler provides the means to consume messages asynchronously.  The CreateDurableConsumer method on the session takes four parameters:

  • NMS.ITopic destination- the topic to which to subscribe
  • string name - the name of the consumer
  • string selector - I’m not sure what this parameter means yet.  I have copied this setting from some test cases I found for NMS.  And it seems to work.  I’ll try and figure out what it does and update the article because I don’t like to depend on magic.
  • bool noLocal - indicates whether you want to consume messages that originate locally.  So, setting this to false means we’ll receive all messages sent to the topic.

It’s worth noting that there’s a bunch of stuff that’ll need disposing - the connection, session and message consumer.  So, there’s a rudimentary implementation of IDisposable to clean up.  Finally, note that there’s an event that allows this class to bubble up the text in each message received.

Subscribe in action

Let’s see how this subscriber works.  Create a Windows Console application project.  I called mine ActiveMqFirstSubscriber.  This project needs references to:

  • the Core project
  • ActiveMQ
  • NMS

The code for Program class is:

using System;

using Core;

namespace ActiveMqFirstSubscriber
{
    class Program
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";
        const string CLIENT_ID = "ActiveMqFirstSubscriber";
        const string CONSUMER_ID = "ActiveMqFirstSubscriber";

        static void Main(string[] args)
        {
            try
            {
                using (SimpleTopicSubscriber subscriber = new SimpleTopicSubscriber(TOPIC_NAME, BROKER, CLIENT_ID, CONSUMER_ID))
                {
                    subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                    Console.WriteLine("Press any key to exit...");
                    Console.ReadKey();
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press any key to exit...");
                Console.ReadKey();
            }

        }

        static void subscriber_OnMessageReceived(string message)
        {
            Console.WriteLine(message);
        }
    }
}

Run the console.  You should see a line telling you that you can press any key to exit.  Don’t exit yet.  Run jconsole and connect to ActiveMq (instructions here.)  Select the MBeans tab. Navigate to org.apache.activemq->localhost->Topic->SampleSubscriptionTopic->Operations->sendTextMessage as shown below:

jconsole-sendTextMessage-topic

In the text box next to the sendTextMessage button, enter some text.  Not wanting to break with tradition, I entered “Hello”.  You should see the text you entered show up in the console.  Now press any key in the console window and it will exit.  Send another message via jconsole (this time I sent “Hello again.”)  Run the console and there’s your message.

A simple publisher

Now we can subscribe to messages, the next step is to write a publisher.  Add a new class to the Core project and call it SimpleTopicPublisher.  Here’s the code:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(string topicName, string brokerUri)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.producer = this.session.CreateProducer(topic);

        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = this.session.CreateTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This code should all look pretty familiar after the subscriber code.  We don’t need to assign a clientId to the connection.  To be able to send messages, we create an IMessageProducer from the session.  The SendMessage method provides client code the ability to send messages to the topic.  Just like the subscriber, there’s a little cleaning up to do, so once again there’s a rudimentary implementation of IDisposable.

The publisher in action

Create a Windows Forms project.  I called mine TopicPublisher.  Create a form and call it MainForm.  Set the text of the form to “Publisher”.  Add a label called instructionLabel and set the text to “Enter a message.”  Add a text box called messageTextBox.  Add a button called sendButton and set the text to “Send Message”.  Create event handlers for the click event of button and the load and closed events of the form.  Here’s the code for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

namespace TopicPublisher
{
    public partial class FirstMainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private SimpleTopicPublisher publisher;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public FirstMainForm()
        {
            InitializeComponent();
        }

        private void FirstMainForm_Load(object sender, EventArgs e)
        {
            try
            {
                this.publisher = new SimpleTopicPublisher(TOPIC_NAME, BROKER);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }

        private void sendButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void FirstMainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }
    }
}

Hopefully, this is all fairly obvious.  We create a publisher on load and dispose of it when the form closes.  When the button is clicked the text from the text box is sent to all subscribers.  If, like me, you have these projects in one solution, you’ll need to set both the Console application and the Windows Forms application to run at startup.  Go ahead and run the subscriber and the publisher.  You should be able to send messages form the form to the console.

The next step

So we can send messages from a to b.  And we know that if b is disconnected, it will receive the messages as soon as it reconnects.  To make this a little more interesting, how about a form that is both a publisher and a subscriber?

A little light refactoring

The obvious thing about SimpleTopicPublisher and SImpleTopicSubscriber is that they use their own connections and sessions.  If we are going to publish and subscribe from the same form, it’d make more sense to share connections and sessions.  Here’s the new code for SimpleTopicSubscriber:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(IMessageConsumer consumer)
        {
            this.consumer = consumer;
            this.consumer.Listener += new MessageListener(OnMessage);
        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Now we pass an instance of IMessageConsumer in the constructor.  We can make a similar change to SimpleTopicPubisher by passing an instance of IMessageProducer in the constructor.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(IMessageProducer producer)
        {
            this.producer = producer;
        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = new ActiveMQTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Since we probably don’t want to put all the code that creates IMessageConsumer instances and IMessageProducer interfaces into our form, we’ll need a new class that takes that responsibility.  Add a new class to the Core project and call it TopicConnection.  The code for the class is listed below:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class TopicConnection : IDisposable
    {
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly ITopic topic;
        private bool isDisposed = false;

        public TopicConnection(IConnectionFactory connectionFactory, string clientId, string topicName)
        {
            this.connection = connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = this.connection.CreateSession();
            this.topic = new ActiveMQTopic(topicName);
        }

        public SimpleTopicPublisher CreateTopicPublisher()
        {
            IMessageProducer producer = this.session.CreateProducer(this.topic);
            return new SimpleTopicPublisher(producer);
        }

        public SimpleTopicSubscriber CreateSimpleTopicSubscriber(string consumerId)
        {
            IMessageConsumer consumer = this.session.CreateDurableConsumer(this.topic, consumerId, "2 > 1", false);
            return new SimpleTopicSubscriber(consumer);
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class has a method to create a SimpleTopicSubscriber and another to create a SimpleTopicPublisher.  The constructor takes an instance of IConnectionFactory, the client id and the name of the topic.  To complete this exercise, let’s add a class called TopicConnectionFactory.  This is a simple class that does little more than hold a reference to an instance of IConnectionFactory.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public class TopicConnectionFactory
    {
        private readonly IConnectionFactory connectionFactory;

        public TopicConnectionFactory(IConnectionFactory connectionFactory)
        {
            this.connectionFactory = connectionFactory;
        }

        public TopicConnection CreateConnection(string clientId, string topicName)
        {
            return new TopicConnection(this.connectionFactory, clientId, topicName);
        }
    }
}

Now the Core code is in better shape, let’s sort the form out.  We want to be able to set the client id from the form, so add a label, a text box and a button.  Call the label clientIdLabel and set the text to “Client Id”.  Call the text box clientIdTextBox.  Call the button connectButton and set the text to “Connect”.  Wire up an event handler to the click event of the button.  Here’s the code for the event handler:

private void connectButton_Click(object sender, EventArgs e)
{
    try
    {
        this.clientId = this.clientIdTextBox.Text;
        this.consumerId = this.clientId;
        this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
        this.publisher = this.connection.CreateTopicPublisher();
        this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
        this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
        this.clientIdLabel.Enabled = false;
        this.clientIdTextBox.Enabled = false;
        this.connectButton.Enabled = false;
        this.messageTextBox.Enabled = true;
        this.instructionLabel.Enabled = true;
        this.historyTextBox.Enabled = true;
        this.submitButton.Enabled = true;
     }
     catch (Exception ex)
     {
        MessageBox.Show(ex.ToString());
        this.Close();
     }
}

So that the publish and subscribe bits are disabled (until a connection is established, remove the form load event and set the enabled property of messageTextBox, instructionLabel and submitButton to false.  You can also see from the code above that I’ve added another text box called historyTextBox.  This text box has the MultiLine property set to true.  Here’s how the form looks:

MessageSender

Here’s the full listing for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

using ActiveMQ;

namespace ActiveMqMessageChat
{
    public partial class MainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private readonly TopicConnectionFactory connectionFactory = new TopicConnectionFactory(new ConnectionFactory(BROKER));
        private TopicConnection connection;
        private SimpleTopicPublisher publisher;
        private SimpleTopicSubscriber subscriber;
        private string clientId;
        private string consumerId;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public MainForm()
        {
            InitializeComponent();
        }

        private void submitButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void subscriber_OnMessageReceived(string message)
        {
            this.builder.AppendLine(message);
            SetText(this.builder.ToString());
        }

        private void SetText(string text)
        {
            // InvokeRequired required compares the thread ID of the
            // calling thread to the thread ID of the creating thread.
            // If these threads are different, it returns true.
            if (this.historyTextBox.InvokeRequired)
            {
                SetTextCallback d = new SetTextCallback(SetText);
                this.Invoke(d, new object[] { text });
            }
            else
            {
                this.historyTextBox.Text = text;
            }
        }

        private void MainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
                this.subscriber.Dispose();
                this.connection.Dispose();
            }
            catch { }
        }

        private void connectButton_Click(object sender, EventArgs e)
        {
            try
            {
                this.clientId = this.clientIdTextBox.Text;
                this.consumerId = this.clientId;

                this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
                this.publisher = this.connection.CreateTopicPublisher();
                this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
                this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                this.clientIdLabel.Enabled = false;
                this.clientIdTextBox.Enabled = false;
                this.connectButton.Enabled = false;
                this.messageTextBox.Enabled = true;
                this.instructionLabel.Enabled = true;
                this.historyTextBox.Enabled = true;
                this.submitButton.Enabled = true;
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }
    }
}

The launcher

To make this a touch simpler to run, add another form and call it LaunchForm.  Make this form the form that gets run from the Program class.  Add one button called launchButton and set the text to “Create Client”.   Wire up an event handler so that every time the button is clicked, a new instance of MainForm is created.

OK.  Time to run the new form.  Create two instances and marvel at the ability to see messages sent to and fro.  Remember the purpose of the client id.  With two forms you can see the same message sent to two subscribers.  If you’ve got the console running, too then you can see the same message being sent to three subscribers.

And finally

It doesn’t end here.  The code in this article is an entry point to publish-subscribe with ActiveMQ and NMS.  There’s a lot more you can do.  Take a look at Virtual Destinations and Subscription Recovery Policy for starters.

26 responses to “Publish-Subscribe with ActiveMq and NMS”

9 08 2007
Edlueze (21:28:24) :

Yew Bwedy! Excellent article - thanks a lot for putting it together!! I needed to come up to speed on ActiveMQ and, for good measure, I decided to learn C# while I was at it - so I was ramping up two learning curves. Your article was a great - not only did I *finally* get some basic ActiveMQ publish-subscribe working but I also learned a lot about how C# works. You’re a champion - keep up the good work!

12 10 2007
Prakash (08:58:56) :

This is Excellent article, i need help about how to connect C# ssl client to ActiveMQ broker. If you known, give a some tips for how to connect C# ssl Client to AMQ

19 02 2008
Enterprise Messaging with ActiveMQ - Enterprise Application Integration (15:24:33) :

[...] in .NET, a good starting point which makes compelling reading is provided by Mark Bloodworth (see here).  In the coming weeks we will provide you with further postings on ActiveMQ and its [...]

25 02 2008
Publish-Subscribe with ActiveMq and NMS · Internet Articles ∞ (12:16:43) :

[...] Publish-Subscribe with ActiveMq and NMS [...]

5 05 2008
Stefan (19:42:42) :

Great series! Did you ever find out more info on the messageSelector parameter “2 >1″? This is the description given by the JavaDocs:

messageSelector - only messages with properties matching the message selector expression are delivered. A value of null or an empty string indicates that there is no message selector for the message consumer.

I’ve tried giving both a null value and empty string, but both raises an InvalidMessageSelectorException. There is also an overload in the Java API that doesn’t require this parameter, but it is not included in the .NET version.

5 05 2008
remark (20:26:29) :

Stefan,

I didn’t pursue the “2>1″ thing any further. I wonder if just passing in true would have the same effect. Interesting to see what you have tried and the results you have got. Someone out there must know the answer…

12 05 2008
Sam (21:10:45) :

I am trying Create Web Listener using AJAX and C# Dotnet. Do you any sample code for this?

12 05 2008
remark (21:32:37) :

Sam,

That’s not something I’ve tried to do. I’m interested to see how you get on.

13 05 2008
Sam (20:09:52) :

I have a asp.Net web application where I need to display real time data for users.

I thought Ajax whould be good solution. In aspNet 2.0, I have a scriptManager where I can start Triger to refresh every 2 seconds.I thought using ActiveMQ and NMS, I can write code to listen for Messages from JMS.

Do you think this would be feasible solution for this kind of application?

13 05 2008
remark (20:20:14) :

ActiveMQ has some info about using AJAX here. The example uses a servlet, but you may be able to adapt it to use .NET if you look at how the servlet is implemented.

13 05 2008
Sasha (21:33:40) :

Hello Mark!
First of all, thank you so much for this quick example/tutorial! I tried it a few days ago and it worked fine, and I learned the basics.
But now for some reason the messages are not delivered (I’m sending them from jconsole to my C# client), occasionally giving an error (in jconsole) such as “Error calling isBroadcaster: org.apache.activemq:BrokerName=localhost…”
Do you have ideas on why this may be happening?
Thank you!
Sasha

13 05 2008
remark (22:20:18) :

Sasha,

The error message looks to be originating in ActiveMQ. I’ve not seen it before myself, so I’d suggest trying the forums.

14 05 2008
Sasha (16:59:02) :

Mark, thank you for such prompt response!
I’ve found out at ActiveMQ dev forum it’s a known issue and it’s supposed to be fixed in 5.2 version which should be out any day now. So, hopefully that’ll fix it.

BTW, I’ve subscribed to your feed with my reader and definitely am enjoying reading your blog, thank you. :)

26 05 2008
vekaz (07:57:31) :

Great article, thanks a lot. I have one question though. When you say “Send another message via jconsole (this time I sent “Hello again.”) Run the console and there’s your message.”, does that mean that messages sent before subscriber is run should be picked up after I start the subscriber? If that is the case, it’s not working. I am running 5.1.

26 05 2008
26 05 2008
remark (09:26:50) :

vekaz,

My understanding is that messages sent when there are no subscribers will be discarded - unless you have previously registered a durable subscriber, in which case the message will be stored until the durable subscriber reconnects. In the example above, the console that spins up the subscriber should be started before the message is sent via jconsole.

The note about the environment variable is interesting. I wrote and ran the sample code on Windows Vista and version 4.1.1 of ActiveMq.

7 07 2008
Crive (13:12:00) :

Hello Mark!
First of all, thank you so much for these great examples!
You have put me on the right way on dealing with ActiveMQ.

I think I sorted out what the messageSelector does:
it is filtering messages by investigating their properties (message header).

a message (IMessage) has also Properties as method, this is a sort of Dictionary of key/value pairs, following code (VB.net) adds the Property ‘buddy’ with a value of ‘Crive’

Dim textMessage As ITextMessage = New ActiveMQTextMessage(message)
textMessage.Properties.SetString(”buddy”, “Crive” ;)
producer.Send(textMessage)

you will receive it by using the filterSelector “2 > 1″ but you can also receive it by using the filterSelector “buddy=’Crive’”, you will not receive it by using the filterSelector “buddy=’Mark’”.

this allows you to have a topic with several target audience but the receiver will receive only interested messages (not all sent to the topic).

I hope I’ve been clear… I am not a native english speaker ;)

7 07 2008
remark (13:53:31) :

Thanks Crive. That’s really useful information.

15 07 2008
Danish Ahmed (15:37:45) :

Hi Mark
Great article!!
I have a query about ActiveMQ load balancing. I want to know that does ActiveMQ supports multiple ActiveMQ brokers in LAN network to support load balancing like TIBCO Rendezvous?
For example i have 2 mahines on LAN. I run one instance of ActiveMQ on each machine. I have two client programs of ActiveMQ one is publisher and the other is subscriber. I run publisher on one machine and subscriber on another machine. Publisher is connected to its local ActiveMQ broker and Subscriber is connected to its local ActiveMQ broker. Now if the publisher sends a message on a topic to which the subscriber is listening, does subscriber gets the message or not? Please reply in both cases. if yes then please tell me how i achive this. The picture of scenario is like this:

same mahine LAN same machine
Publihser ActiveMQ ActiveMQ Subscriber

So is it possible that publisher and subscriber communicate with each other?

15 07 2008
remark (16:53:42) :

I’ve not looked into the load balancing capabilities of ActiveMQ. I’d suggest taking a look at this information.

1 08 2008
Daniel (01:39:14) :

I wish .NET comes with messaging support out of the box. I just recently had to implement a similar pub-sub model with WCF.

1 08 2008
Jashan (03:38:41) :

This article is helpful. However I have some troubles at the time of running durable consumer (given in “A simple Subscriber” and “Subscriber in action” sections).

When I close the console based durable subscriber and send message through jconsole, I see the following message in a new window. “Error calling isBroadcaster: org.apache.activemq:BrokerName=localhost,
Type=Subscription,persistentMode=Non-Durable,
destinationType=Topic,destinationName=topic_//ActiveMQ.Advisory.TempQueue_topic_//ActiveMQ.Advisory.TempTopic,clientId=ID_willow-1155-1217553490828-18_0,consumerId=ID_willow-1155-1217553490828-2_15_-1_1″

After that when I start the durable consumer console I don’t see any of the sent messages.

So how can I receive those messages?

Note: I am using ActiveMQ 5.1

1 08 2008
Adam (17:45:21) :

“A message selector allows a JMS consumer to be more selective about the messages it receives from a particular destination (topic or queue). Message selectors use message properties and headers as criteria in conditional expressions. These conditional expressions use boolean logic to declare which messages should be delivered to a JMS consumer.” - Java Message Service by Richard Monson - Haefel & David A. Chappell

Which means, having a selector “2 > 1″, your consumer will receive every message that is sent to it. Alternatively, if you had “1 > 2″, you wouldn’t receive any messages.

5 08 2008
David Kepes (00:45:13) :

Hi,
Is there any way to receibe a message syncronously by a correlationId using nms and activeMq?,
Thanks for your help

14 08 2008
Simone Busoli (11:37:26) :


It is important to note the difference between a subscriber and a durable subscriber. As subscriber will only receive messages that were sent while it is connected whereas a durable subscriber can receive messages sent while they were disconnected.

I’m not sure this is true. It’s not easy to find documentation about this topic, but as far as I’ve been able to experiment durable consumer and persistent messages are two distinct concepts.
Durable consumers subscribe to a topic, and, if the broker goes down, they don’t need to subscribe again because the broker will remember them and start do dispatch messages again. This DOES NOT mean that they’ll receive messages that were sent if and while they were offline. In order for this to happen the messages must be persistent. So, in order to receive messages that were sent while me subscriber was offline I need to be a durable consumer and the message to be persistent.
Note that I’m not sure about my statements, it’s a bit intricate so I would appreciate any insights about this.

14 08 2008
remark (14:02:02) :

SImone,

Good point about the effect of persistent messages - I don’t know either. I wrote this a while ago - so the detail’s a little fuzzy - but I did some rudimentary testing of shutting the subscriber down and restarting it. I did get messages that I would otherwise have missed - it would be good if shed some more light on this aspect of using ActiveMQ.

Leave a comment

You can use these tags : <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>