Re.Mark

My Life As A Blog

Publish-Subscribe with ActiveMq and NMS

with 55 comments

In my previous articles about using ActiveMq and NMS, I have looked at sending a message from a sender to a receiver and implementing request-response.  In this article, I’m going to investigate how to use the publish-subscribe pattern with ActiveMq and NMS.

The Publish-Subscribe pattern

The Publish-Subscribe pattern (pub-sub) can be thought of as a distributed implementation of the Observer pattern.  The Observer pattern defines a “dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.”  In Enterprise Integration Patterns, Publish-Subscribe is defined as a way for “the sender [to] broadcast an event to all interested receivers.”  The JMS tutorial describes the basics of pub-sub as applied to JMS.  It is important to note the difference between a subscriber and a durable subscriber.  As subscriber will only receive messages that were sent while it is connected whereas a durable subscriber can receive messages sent while they were disconnected.  Also worth noting is the difference between synchronous message consumption and asynchronous message consumption.

Before we go any further…

…I’m making a couple of assumptions.  I’m assuming that you have already installed ActiveMq and have it running.  I’m also assuming that you have downloaded Spring.Messaging.Nms.  If you need more information about how to ensure these assumptions are valid, read my first article about ActiveMq and NMS.

A simple subscriber

Let’s get started by creating a simple subscriber.  Create a Windows Class Library project.  I called mine Core.  Add references to:

  • ActiveMQ
  • NMS

Create a class called SimpleTopicSubscriber.  The code for the class is listed below:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(string topicName, string brokerUri, string clientId, string consumerId)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.consumer = this.session.CreateDurableConsumer(topic, consumerId, "2 > 1", false);
            this.consumer.Listener += new MessageListener(OnMessage);

        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }


        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class creates a durable subscription and consumes messages asynchronously.  All the work is done in the constructor.  The assignment of the client id is important in making this subscription durable.  Registering the OnMessage event handler provides the means to consume messages asynchronously.  The CreateDurableConsumer method on the session takes four parameters:

  • NMS.ITopic destination- the topic to which to subscribe
  • string name – the name of the consumer
  • string selector – I’m not sure what this parameter means yet.  I have copied this setting from some test cases I found for NMS.  And it seems to work.  I’ll try and figure out what it does and update the article because I don’t like to depend on magic.
  • bool noLocal – indicates whether you want to consume messages that originate locally.  So, setting this to false means we’ll receive all messages sent to the topic.

It’s worth noting that there’s a bunch of stuff that’ll need disposing – the connection, session and message consumer.  So, there’s a rudimentary implementation of IDisposable to clean up.  Finally, note that there’s an event that allows this class to bubble up the text in each message received.

Subscribe in action

Let’s see how this subscriber works.  Create a Windows Console application project.  I called mine ActiveMqFirstSubscriber.  This project needs references to:

  • the Core project
  • ActiveMQ
  • NMS

The code for Program class is:

using System;

using Core;

namespace ActiveMqFirstSubscriber
{
    class Program
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";
        const string CLIENT_ID = "ActiveMqFirstSubscriber";
        const string CONSUMER_ID = "ActiveMqFirstSubscriber";

        static void Main(string[] args)
        {
            try
            {
                using (SimpleTopicSubscriber subscriber = new SimpleTopicSubscriber(TOPIC_NAME, BROKER, CLIENT_ID, CONSUMER_ID))
                {
                    subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                    Console.WriteLine("Press any key to exit...");
                    Console.ReadKey();
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press any key to exit...");
                Console.ReadKey();
            }

            
        }

        static void subscriber_OnMessageReceived(string message)
        {
            Console.WriteLine(message);
        }
    }
}

Run the console.  You should see a line telling you that you can press any key to exit.  Don’t exit yet.  Run jconsole and connect to ActiveMq (instructions here.)  Select the MBeans tab. Navigate to org.apache.activemq->localhost->Topic->SampleSubscriptionTopic->Operations->sendTextMessage as shown below:

jconsole-sendTextMessage-topic

In the text box next to the sendTextMessage button, enter some text.  Not wanting to break with tradition, I entered “Hello”.  You should see the text you entered show up in the console.  Now press any key in the console window and it will exit.  Send another message via jconsole (this time I sent “Hello again.”)  Run the console and there’s your message.

A simple publisher

Now we can subscribe to messages, the next step is to write a publisher.  Add a new class to the Core project and call it SimpleTopicPublisher.  Here’s the code:

using System;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly string topicName = null;
        private readonly IConnectionFactory connectionFactory;
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(string topicName, string brokerUri)
        {
            this.topicName = topicName;
            this.connectionFactory = new ConnectionFactory(brokerUri);
            this.connection = this.connectionFactory.CreateConnection();
            this.connection.Start();
            this.session = connection.CreateSession();
            ActiveMQTopic topic = new ActiveMQTopic(topicName);
            this.producer = this.session.CreateProducer(topic);
            
        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = this.session.CreateTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This code should all look pretty familiar after the subscriber code.  We don’t need to assign a clientId to the connection.  To be able to send messages, we create an IMessageProducer from the session.  The SendMessage method provides client code the ability to send messages to the topic.  Just like the subscriber, there’s a little cleaning up to do, so once again there’s a rudimentary implementation of IDisposable.

The publisher in action

Create a Windows Forms project.  I called mine TopicPublisher.  Create a form and call it MainForm.  Set the text of the form to “Publisher”.  Add a label called instructionLabel and set the text to “Enter a message.”  Add a text box called messageTextBox.  Add a button called sendButton and set the text to “Send Message”.  Create event handlers for the click event of button and the load and closed events of the form.  Here’s the code for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

namespace TopicPublisher
{
    public partial class FirstMainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private SimpleTopicPublisher publisher;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public FirstMainForm()
        {
            InitializeComponent();
        }

        private void FirstMainForm_Load(object sender, EventArgs e)
        {
            try
            {
                this.publisher = new SimpleTopicPublisher(TOPIC_NAME, BROKER);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }

        private void sendButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void FirstMainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }
    }
}

Hopefully, this is all fairly obvious.  We create a publisher on load and dispose of it when the form closes.  When the button is clicked the text from the text box is sent to all subscribers.  If, like me, you have these projects in one solution, you’ll need to set both the Console application and the Windows Forms application to run at startup.  Go ahead and run the subscriber and the publisher.  You should be able to send messages form the form to the console.

The next step

So we can send messages from a to b.  And we know that if b is disconnected, it will receive the messages as soon as it reconnects.  To make this a little more interesting, how about a form that is both a publisher and a subscriber?

A little light refactoring

The obvious thing about SimpleTopicPublisher and SImpleTopicSubscriber is that they use their own connections and sessions.  If we are going to publish and subscribe from the same form, it’d make more sense to share connections and sessions.  Here’s the new code for SimpleTopicSubscriber:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public delegate void MessageReceivedDelegate(string message);

    public class SimpleTopicSubscriber : IDisposable
    {
        private readonly IMessageConsumer consumer;
        private bool isDisposed = false;
        public event MessageReceivedDelegate OnMessageReceived;

        public SimpleTopicSubscriber(IMessageConsumer consumer)
        {
            this.consumer = consumer;
            this.consumer.Listener += new MessageListener(OnMessage);
        }

        public void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (this.OnMessageReceived != null)
            {
                this.OnMessageReceived(textMessage.Text);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.consumer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Now we pass an instance of IMessageConsumer in the constructor.  We can make a similar change to SimpleTopicPubisher by passing an instance of IMessageProducer in the constructor.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class SimpleTopicPublisher : IDisposable
    {
        private readonly IMessageProducer producer;
        private bool isDisposed = false;

        public SimpleTopicPublisher(IMessageProducer producer)
        {
            this.producer = producer;
        }

        public void SendMessage(string message)
        {
            if (!this.isDisposed)
            {
                ITextMessage textMessage = new ActiveMQTextMessage(message);
                this.producer.Send(textMessage);
            }
            else
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.producer.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

Since we probably don’t want to put all the code that creates IMessageConsumer instances and IMessageProducer interfaces into our form, we’ll need a new class that takes that responsibility.  Add a new class to the Core project and call it TopicConnection.  The code for the class is listed below:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using ActiveMQ.Commands;
using NMS;

namespace Core
{
    public class TopicConnection : IDisposable
    {
        private readonly IConnection connection;
        private readonly ISession session;
        private readonly ITopic topic;
        private bool isDisposed = false;

        public TopicConnection(IConnectionFactory connectionFactory, string clientId, string topicName)
        {
            this.connection = connectionFactory.CreateConnection();
            this.connection.ClientId = clientId;
            this.connection.Start();
            this.session = this.connection.CreateSession();
            this.topic = new ActiveMQTopic(topicName);
        }

        public SimpleTopicPublisher CreateTopicPublisher()
        {
            IMessageProducer producer = this.session.CreateProducer(this.topic);
            return new SimpleTopicPublisher(producer);
        }

        public SimpleTopicSubscriber CreateSimpleTopicSubscriber(string consumerId)
        {
            IMessageConsumer consumer = this.session.CreateDurableConsumer(this.topic, consumerId, "2 > 1", false);
            return new SimpleTopicSubscriber(consumer);
        }

        #region IDisposable Members

        public void Dispose()
        {
            if (!this.isDisposed)
            {
                this.session.Dispose();
                this.connection.Dispose();
                this.isDisposed = true;
            }
        }

        #endregion
    }
}

This class has a method to create a SimpleTopicSubscriber and another to create a SimpleTopicPublisher.  The constructor takes an instance of IConnectionFactory, the client id and the name of the topic.  To complete this exercise, let’s add a class called TopicConnectionFactory.  This is a simple class that does little more than hold a reference to an instance of IConnectionFactory.  Here’s the code:

using System;
using System.Collections.Generic;
using System.Text;

using ActiveMQ;
using NMS;

namespace Core
{
    public class TopicConnectionFactory
    {
        private readonly IConnectionFactory connectionFactory;

        public TopicConnectionFactory(IConnectionFactory connectionFactory)
        {
            this.connectionFactory = connectionFactory;
        }

        public TopicConnection CreateConnection(string clientId, string topicName)
        {
            return new TopicConnection(this.connectionFactory, clientId, topicName);
        }
    }
}

Now the Core code is in better shape, let’s sort the form out.  We want to be able to set the client id from the form, so add a label, a text box and a button.  Call the label clientIdLabel and set the text to “Client Id”.  Call the text box clientIdTextBox.  Call the button connectButton and set the text to “Connect”.  Wire up an event handler to the click event of the button.  Here’s the code for the event handler:

private void connectButton_Click(object sender, EventArgs e)
{
    try
    {
        this.clientId = this.clientIdTextBox.Text;
        this.consumerId = this.clientId;
        this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
        this.publisher = this.connection.CreateTopicPublisher();
        this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
        this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
        this.clientIdLabel.Enabled = false;
        this.clientIdTextBox.Enabled = false;
        this.connectButton.Enabled = false;
        this.messageTextBox.Enabled = true;
        this.instructionLabel.Enabled = true;
        this.historyTextBox.Enabled = true;
        this.submitButton.Enabled = true;
     }
     catch (Exception ex)
     {
        MessageBox.Show(ex.ToString());
        this.Close();
     }
}

So that the publish and subscribe bits are disabled (until a connection is established, remove the form load event and set the enabled property of messageTextBox, instructionLabel and submitButton to false.  You can also see from the code above that I’ve added another text box called historyTextBox.  This text box has the MultiLine property set to true.  Here’s how the form looks:

MessageSender

Here’s the full listing for the form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Windows.Forms;

using Core;

using ActiveMQ;

namespace ActiveMqMessageChat
{
    public partial class MainForm : Form
    {
        const string TOPIC_NAME = "SampleSubscriptionTopic";
        const string BROKER = "tcp://localhost:61616";

        private readonly TopicConnectionFactory connectionFactory = new TopicConnectionFactory(new ConnectionFactory(BROKER));
        private TopicConnection connection;
        private SimpleTopicPublisher publisher;
        private SimpleTopicSubscriber subscriber;
        private string clientId;
        private string consumerId;
        private readonly StringBuilder builder = new StringBuilder();
        private delegate void SetTextCallback(string text);

        public MainForm()
        {
            InitializeComponent();
        }

        private void submitButton_Click(object sender, EventArgs e)
        {
            this.publisher.SendMessage(this.messageTextBox.Text);
        }

        private void subscriber_OnMessageReceived(string message)
        {
            this.builder.AppendLine(message);
            SetText(this.builder.ToString());
        }

        private void SetText(string text)
        {
            // InvokeRequired required compares the thread ID of the
            // calling thread to the thread ID of the creating thread.
            // If these threads are different, it returns true.
            if (this.historyTextBox.InvokeRequired)
            {
                SetTextCallback d = new SetTextCallback(SetText);
                this.Invoke(d, new object[] { text });
            }
            else
            {
                this.historyTextBox.Text = text;
            }
        }


        private void MainForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                this.publisher.Dispose();
                this.subscriber.Dispose();
                this.connection.Dispose();
            }
            catch { }
        }

        private void connectButton_Click(object sender, EventArgs e)
        {
            try
            {
                this.clientId = this.clientIdTextBox.Text;
                this.consumerId = this.clientId;

                this.connection = this.connectionFactory.CreateConnection(this.clientId, TOPIC_NAME);
                this.publisher = this.connection.CreateTopicPublisher();
                this.subscriber = this.connection.CreateSimpleTopicSubscriber(this.consumerId);
                this.subscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnMessageReceived);
                this.clientIdLabel.Enabled = false;
                this.clientIdTextBox.Enabled = false;
                this.connectButton.Enabled = false;
                this.messageTextBox.Enabled = true;
                this.instructionLabel.Enabled = true;
                this.historyTextBox.Enabled = true;
                this.submitButton.Enabled = true;
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
                this.Close();
            }
        }
    }
}

The launcher

To make this a touch simpler to run, add another form and call it LaunchForm.  Make this form the form that gets run from the Program class.  Add one button called launchButton and set the text to “Create Client”.   Wire up an event handler so that every time the button is clicked, a new instance of MainForm is created.

OK.  Time to run the new form.  Create two instances and marvel at the ability to see messages sent to and fro.  Remember the purpose of the client id.  With two forms you can see the same message sent to two subscribers.  If you’ve got the console running, too then you can see the same message being sent to three subscribers.

And finally

It doesn’t end here.  The code in this article is an entry point to publish-subscribe with ActiveMQ and NMS.  There’s a lot more you can do.  Take a look at Virtual Destinations and Subscription Recovery Policy for starters.

Written by remark

July 7, 2007 at 10:08 pm

55 Responses

Subscribe to comments with RSS.

  1. Yew Bwedy! Excellent article – thanks a lot for putting it together!! I needed to come up to speed on ActiveMQ and, for good measure, I decided to learn C# while I was at it – so I was ramping up two learning curves. Your article was a great – not only did I *finally* get some basic ActiveMQ publish-subscribe working but I also learned a lot about how C# works. You’re a champion – keep up the good work!

    Edlueze

    August 9, 2007 at 9:28 pm

  2. This is Excellent article, i need help about how to connect C# ssl client to ActiveMQ broker. If you known, give a some tips for how to connect C# ssl Client to AMQ

    Prakash

    October 12, 2007 at 8:58 am

  3. […] in .NET, a good starting point which makes compelling reading is provided by Mark Bloodworth (see here).  In the coming weeks we will provide you with further postings on ActiveMQ and its […]

  4. […] Publish-Subscribe with ActiveMq and NMS […]

  5. Great series! Did you ever find out more info on the messageSelector parameter “2 >1”? This is the description given by the JavaDocs:

    messageSelector – only messages with properties matching the message selector expression are delivered. A value of null or an empty string indicates that there is no message selector for the message consumer.

    I’ve tried giving both a null value and empty string, but both raises an InvalidMessageSelectorException. There is also an overload in the Java API that doesn’t require this parameter, but it is not included in the .NET version.

    Stefan

    May 5, 2008 at 7:42 pm

  6. Stefan,

    I didn’t pursue the “2>1” thing any further. I wonder if just passing in true would have the same effect. Interesting to see what you have tried and the results you have got. Someone out there must know the answer…

    remark

    May 5, 2008 at 8:26 pm

  7. I am trying Create Web Listener using AJAX and C# Dotnet. Do you any sample code for this?

    Sam

    May 12, 2008 at 9:10 pm

  8. Sam,

    That’s not something I’ve tried to do. I’m interested to see how you get on.

    remark

    May 12, 2008 at 9:32 pm

  9. I have a asp.Net web application where I need to display real time data for users.

    I thought Ajax whould be good solution. In aspNet 2.0, I have a scriptManager where I can start Triger to refresh every 2 seconds.I thought using ActiveMQ and NMS, I can write code to listen for Messages from JMS.

    Do you think this would be feasible solution for this kind of application?

    Sam

    May 13, 2008 at 8:09 pm

  10. ActiveMQ has some info about using AJAX here. The example uses a servlet, but you may be able to adapt it to use .NET if you look at how the servlet is implemented.

    remark

    May 13, 2008 at 8:20 pm

  11. Hello Mark!
    First of all, thank you so much for this quick example/tutorial! I tried it a few days ago and it worked fine, and I learned the basics.
    But now for some reason the messages are not delivered (I’m sending them from jconsole to my C# client), occasionally giving an error (in jconsole) such as “Error calling isBroadcaster: org.apache.activemq:BrokerName=localhost…”
    Do you have ideas on why this may be happening?
    Thank you!
    Sasha

    Sasha

    May 13, 2008 at 9:33 pm

  12. Sasha,

    The error message looks to be originating in ActiveMQ. I’ve not seen it before myself, so I’d suggest trying the forums.

    remark

    May 13, 2008 at 10:20 pm

  13. Mark, thank you for such prompt response!
    I’ve found out at ActiveMQ dev forum it’s a known issue and it’s supposed to be fixed in 5.2 version which should be out any day now. So, hopefully that’ll fix it.

    BTW, I’ve subscribed to your feed with my reader and definitely am enjoying reading your blog, thank you. 🙂

    Sasha

    May 14, 2008 at 4:59 pm

  14. Great article, thanks a lot. I have one question though. When you say “Send another message via jconsole (this time I sent “Hello again.”) Run the console and there’s your message.”, does that mean that messages sent before subscriber is run should be picked up after I start the subscriber? If that is the case, it’s not working. I am running 5.1.

    vekaz

    May 26, 2008 at 7:57 am

  15. vekaz,

    My understanding is that messages sent when there are no subscribers will be discarded – unless you have previously registered a durable subscriber, in which case the message will be stored until the durable subscriber reconnects. In the example above, the console that spins up the subscriber should be started before the message is sent via jconsole.

    The note about the environment variable is interesting. I wrote and ran the sample code on Windows Vista and version 4.1.1 of ActiveMq.

    remark

    May 26, 2008 at 9:26 am

  16. Hello Mark!
    First of all, thank you so much for these great examples!
    You have put me on the right way on dealing with ActiveMQ.

    I think I sorted out what the messageSelector does:
    it is filtering messages by investigating their properties (message header).

    a message (IMessage) has also Properties as method, this is a sort of Dictionary of key/value pairs, following code (VB.net) adds the Property ‘buddy’ with a value of ‘Crive’

    Dim textMessage As ITextMessage = New ActiveMQTextMessage(message)
    textMessage.Properties.SetString(“buddy”, “Crive”)
    producer.Send(textMessage)

    you will receive it by using the filterSelector “2 > 1” but you can also receive it by using the filterSelector “buddy=’Crive'”, you will not receive it by using the filterSelector “buddy=’Mark'”.

    this allows you to have a topic with several target audience but the receiver will receive only interested messages (not all sent to the topic).

    I hope I’ve been clear… I am not a native english speaker 😉

    Crive

    July 7, 2008 at 1:12 pm

  17. Thanks Crive. That’s really useful information.

    remark

    July 7, 2008 at 1:53 pm

  18. Hi Mark
    Great article!!
    I have a query about ActiveMQ load balancing. I want to know that does ActiveMQ supports multiple ActiveMQ brokers in LAN network to support load balancing like TIBCO Rendezvous?
    For example i have 2 mahines on LAN. I run one instance of ActiveMQ on each machine. I have two client programs of ActiveMQ one is publisher and the other is subscriber. I run publisher on one machine and subscriber on another machine. Publisher is connected to its local ActiveMQ broker and Subscriber is connected to its local ActiveMQ broker. Now if the publisher sends a message on a topic to which the subscriber is listening, does subscriber gets the message or not? Please reply in both cases. if yes then please tell me how i achive this. The picture of scenario is like this:

    same mahine LAN same machine
    Publihser ActiveMQ ActiveMQ Subscriber

    So is it possible that publisher and subscriber communicate with each other?

    Danish Ahmed

    July 15, 2008 at 3:37 pm

  19. I’ve not looked into the load balancing capabilities of ActiveMQ. I’d suggest taking a look at this information.

    remark

    July 15, 2008 at 4:53 pm

  20. I wish .NET comes with messaging support out of the box. I just recently had to implement a similar pub-sub model with WCF.

    Daniel

    August 1, 2008 at 1:39 am

  21. This article is helpful. However I have some troubles at the time of running durable consumer (given in “A simple Subscriber” and “Subscriber in action” sections).

    When I close the console based durable subscriber and send message through jconsole, I see the following message in a new window. “Error calling isBroadcaster: org.apache.activemq:BrokerName=localhost,
    Type=Subscription,persistentMode=Non-Durable,
    destinationType=Topic,destinationName=topic_//ActiveMQ.Advisory.TempQueue_topic_//ActiveMQ.Advisory.TempTopic,clientId=ID_willow-1155-1217553490828-18_0,consumerId=ID_willow-1155-1217553490828-2_15_-1_1”

    After that when I start the durable consumer console I don’t see any of the sent messages.

    So how can I receive those messages?

    Note: I am using ActiveMQ 5.1

    Jashan

    August 1, 2008 at 3:38 am

  22. “A message selector allows a JMS consumer to be more selective about the messages it receives from a particular destination (topic or queue). Message selectors use message properties and headers as criteria in conditional expressions. These conditional expressions use boolean logic to declare which messages should be delivered to a JMS consumer.” – Java Message Service by Richard Monson – Haefel & David A. Chappell

    Which means, having a selector “2 > 1”, your consumer will receive every message that is sent to it. Alternatively, if you had “1 > 2”, you wouldn’t receive any messages.

    Adam

    August 1, 2008 at 5:45 pm

  23. Hi,
    Is there any way to receibe a message syncronously by a correlationId using nms and activeMq?,
    Thanks for your help

    David Kepes

    August 5, 2008 at 12:45 am


  24. It is important to note the difference between a subscriber and a durable subscriber. As subscriber will only receive messages that were sent while it is connected whereas a durable subscriber can receive messages sent while they were disconnected.

    I’m not sure this is true. It’s not easy to find documentation about this topic, but as far as I’ve been able to experiment durable consumer and persistent messages are two distinct concepts.
    Durable consumers subscribe to a topic, and, if the broker goes down, they don’t need to subscribe again because the broker will remember them and start do dispatch messages again. This DOES NOT mean that they’ll receive messages that were sent if and while they were offline. In order for this to happen the messages must be persistent. So, in order to receive messages that were sent while me subscriber was offline I need to be a durable consumer and the message to be persistent.
    Note that I’m not sure about my statements, it’s a bit intricate so I would appreciate any insights about this.

    Simone Busoli

    August 14, 2008 at 11:37 am

  25. SImone,

    Good point about the effect of persistent messages – I don’t know either. I wrote this a while ago – so the detail’s a little fuzzy – but I did some rudimentary testing of shutting the subscriber down and restarting it. I did get messages that I would otherwise have missed – it would be good if shed some more light on this aspect of using ActiveMQ.

    remark

    August 14, 2008 at 2:02 pm

  26. Great article, thank you. I was wondering how to unsubscribe a durable subscriber from a topic? It doesn’t seem to be covered anywhere.

    Brenda

    September 10, 2008 at 10:30 pm

  27. @Brenda

    if you grab nms code from the repo you can see that in a recent chieckin they introduced the ability to unsubscribe a durable consumer. IIRC it should be on the connection class.

    Simone Busoli

    September 17, 2008 at 7:54 pm

  28. Great.. a year down the line and I got it running in 5min..

    just had to add:

    using Spring.Messaging.Nms;
    using Spring.Messaging.Nms.Listener;

    using Spring.Messaging.Nms.Core;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using Apache.NMS.ActiveMQ.Commands;

    Amit

    October 29, 2008 at 6:20 pm

  29. Great article. Thanks!

    Scott White

    December 2, 2008 at 8:55 pm

  30. Great article, thank you – got me MQ’ing in no time. Just one thing that bugged me that is worth a mention – when creating a durable consumer for a topic, if it is your intention to ignore all local messages, it is the connection that determines who is “local” – the last parameter in the constructor is worthless if the connection isn’t shared between the publisher and the subscriber.

    Quoting:
    In some cases, a connection may both publish and subscribe to a topic. The subscriber NoLocal attribute allows a subscriber to inhibit the delivery of messages published by its own connection.

    Marina K

    January 3, 2009 at 11:39 pm

  31. Great article, but i have a “bug” when trying to use the failover option in the publisher for example. Everything works great if I use your code, but if I change
    “const string BROKER = “tcp://localhost:61616″;” to
    “const string BROKER = “failover:tcp://localhost:61616″;”
    The subscriber will crash at startup, with the message
    “Specified argument was out of the range of valid values.
    Parameter name: port”
    I’m using a network of brokers and it is kindof useless if i cant use the failover option 🙂
    The debugger tells me the problems comes when The “SimpleTopicPublishers” executes the line
    “this.connection = this.connectionFactory.CreateConnection();”
    If anybody has been able to use the failover or has the same problem, just let met know please!
    Cheers,
    Sebas

    Srodrigu

    January 29, 2009 at 9:16 am

    • This was/is a bug inside the Spring Framework which does not know how to split this uri..

      Using the following string: failover:(tcp://localhost:61616) works like a charm in NMS itself ( your bug got me looking for a long time too)

      Since we decided not to use the Spring Framework in our communication with ActiveMQ because it only adds overhead to an already very complete library, it’s easier to just checkout activemq-dotnet and use NMS only instead.

      Mvhoof

      May 3, 2009 at 12:33 pm

  32. […] are some examples using the Spring.Messaging.Nms. And publish-subscribe is outlined […]

  33. Hi,

    Not sure if anyone will pick this up but i’m trying to write an asynchronous consumer in VB but i can’t work out what the VB equivalent of “this.consumer.Listener += new MessageListener(OnMessage);” is in the code above…can anyone help me please?!

    Thanks
    Dan

    Dan

    September 10, 2009 at 10:10 am

  34. Thank you for this article. I would have been lost without it.

    Is there a way to either monitor (poll) the state of the connection described here, OR react on some type of event trigger?

    Every now and then the connection drops out and I would like to check if comms are still running.

    mieliepap

    November 2, 2009 at 7:37 pm

  35. Ons way of doing monitoring the state of the connection is with the following code (in the TopicConnection Class):
    IConnection connection;
    this.connection.ExceptionListener += new ExceptionListener(myExceptionListener);
    private void myExceptionListener (Exception e)
    {
    MessageBox.Show(
    e.Message,
    “Connection_ExceptionListener”,
    MessageBoxButtons.OK,
    MessageBoxIcon.Error);

    }

    mieliepap

    November 4, 2009 at 9:35 am

  36. Great example.. It works 100% with an ActiveMQ server. However, I have a problem in that I can’t seem to make it work with a GlassFish/OpenESB.

    I am trying to use the same code you posted, i.e. the SimpleTopicSubscriber to connect to a topic on GlassFish. But I am unsuccessful. I have not succeeded in tracing down the error, and I feel that I have checked everything that I can think of.

    Essentially, I created a jms/TestTopic on my Glassfish server. The broker is on tcp://localhost:7676

    —error—snip snip—-
    Apache.NMS.ActiveMQ.BrokerException: : Unknown data type: 105

    at Apache.NMS.ActiveMQ.Transport.ResponseCorrelator.Request(Command command, TimeSpan timeout) in d:\Hudson\jobs\Regatta_Products\workspace\External\Apache.NMS\project\Apache.NMS.ActiveMQ\tags\1.1.0\src\main\csharp\Transport\ResponseCorrelator.cs:line 105
    at Apache.NMS.ActiveMQ.Connection.SyncRequest(Command command, TimeSpan requestTimeout) in d:\Hudson\jobs\Regatta_Products\workspace\External\Apache.NMS\project\Apache.NMS.ActiveMQ\tags\1.1.0\src\main\csharp\Connection.cs:line 338
    at Apache.NMS.ActiveMQ.Connection.SyncRequest(Command command) in d:\Hudson\jobs\Regatta_Products\workspace\External\Apache.NMS\project\Apache.NMS.ActiveMQ\tags\1.1.0\src\main\csharp\Connection.cs:line 332
    at Apache.NMS.ActiveMQ.Connection.CheckConnected() in d:\Hudson\jobs\Regatta_
    Products\workspace\External\Apache.NMS\project\Apache.NMS.ActiveMQ\tags\1.1.0\src\main\csharp\Connection.cs:line 410 at Apache.NMS.ActiveMQ.Connection.Start() in d:\Hudson\jobs\Regatta_Products\workspace\External\Apache.NMS\project\Apache.NMS.ActiveMQ\tags\1.1.0\src\main\csharp\Connection.cs:line 113
    at ESBTest2.SimpleTopicSubscriber..ctor(String topicName, String brokerUri, String clientId, String consumerId) in C:\Documents and Settings\Tobie\My Documents\Visual Studio 2008\Projects\ESBTest2\ESBTest2\SimpleTopicSubscriber.cs:line 28
    at ESBTest2.Program.Main(String[] args) in C:\Documents and Settings\Tobie\My
    Documents\Visual Studio 2008\Projects\ESBTest2\ESBTest2\Program.cs:line 16
    —error—snip snip—-

    Hope you have an idea for me. I am willing to try anything at this point.

    Kind regards,
    Tobie

    Boerewors

    November 13, 2009 at 2:15 pm

  37. I am trying to create a asp.net web application that asynchronously listens to an activemq topic. Is this possible and if so how would I do it. I know there is a java version that makes use of a servlet but I need a asp/.net client.

    Chris

    January 12, 2010 at 9:09 pm

  38. […] Home NMS Download ActiveMQ n .NET Request Response with NMS Publish n Subscribe with NMS Transactional Messaging with […]

  39. […] Publish-Subscribe with ActiveMq and NMS « Re.Mark (tags: .net java messaging activemq) […]

    • when i am running the above code, i am getting the following errors:
      Error 1: The best overloaded method match for ‘Core.TopicConnectionFactory.TopicConnectionFactory(Apache.NMS.IConnectionFactory)’ has some invalid arguments

      Error 2: Argument 1: cannot convert from ‘Apache.NMS.ActiveMQ.ConnectionFactory’ to ‘Apache.NMS.IConnectionFactory’

      Error 3: The type ‘Apache.NMS.IConnectionFactory’ is defined in an assembly that is not referenced. You must add a reference to assembly ‘Apache.NMS, Version=1.5.0.2194, Culture=neutral, PublicKeyToken=82756feee3957618’.

      When i am adding a reference to assembly ‘Apache.NMS’ as it is asking for this in Error 3, i am getting more errors including warnings. Please do help me in this regard as soon as possible. Thanks in advance!

      Pankaj

      April 29, 2011 at 6:25 am

      • Instead of ConnectionFactory, use NMSConnectionFactory defined in Apache.NMS namespace.

        Ashish Sharma

        October 17, 2011 at 7:43 am

  40. Great article, Remark!
    It really helps me started with ApacheMQ + .NET.
    However, I’ve an issue while trying reconnect when server shuts down and then comes back.
    So I track server’s state with an ExceptionListener: this.connection.ExceptionListener += new ExceptionListener(OnException);
    and it works fine in case of server’s error, it catches System.IO.EndOfStream exception and then I’m trying to reconnect. When server begins to run again and reconnect is successfully done, I catch instantly an exception again with the same listener. Then according to my algorythm the subscriber is doing reconnect and receives an exception that this broker is already registered and I got into vicious cirle.
    I tried to call Dispose() in exception and then start reconnects after some interval, but it didn’t help. So my question is how to implement in correct way reconnect algorythm? Thanks

    Sergii Tsegelnyk

    May 3, 2011 at 11:59 am

  41. Ultimate article Remark!
    It’s a very good article for starters. Good Work…

    Ashish Sharma

    October 17, 2011 at 4:16 am

  42. Hi,

    If any one has experience on SSL implementation , can you please send me some examples.
    I am trying for last few day but no sucesses. i was able to communicate on TCP.

    Thanks
    Suresh

    Suresh Chalicham

    November 3, 2011 at 1:48 am

  43. Hi,

    Thanks man. your articles are really really helpful !!!!
    I hope you’ll keep with it 🙂 it helps us all.

    Thanks,
    Yair

    Yair

    January 22, 2012 at 9:04 am

  44. how to dequeue a message?

    thoseeardo

    July 20, 2012 at 11:33 pm

  45. ogkd5FMvxx

    O653OKB2OY www.yandex.ru

    September 1, 2019 at 3:27 am

  46. Great Article Re.Mark, this helped me alot to understand AMQ PUB/SUB Mechanism
    But i need to read AMQ pub/sub and then write to Mainframe MQ in ASP.net web application without using forms.
    if i will get any help on this ,i am really very thankfull.

    suhasini

    May 21, 2020 at 11:23 am

  47. Awesome article,It helped me to spin up the code in few mins.
    Do you have similar examples for .net core and I also noticed if my consumer runs for a longer period more than 24 hrs then I am not receiving any messages but my connection is active.I have to restart the service.
    Any idea what could be the problem

    Muru

    September 23, 2020 at 11:08 am

  48. Trying to consume the message from a Topic, using .net core libraries and i am stuck. The execution stops are waiting for the message from consumer.receive() ” var message = consumer.Receive(); ” but no message return or any error. If i change the consumer to queue , the consumer.receive is fine. can some one point me where i am going wrong.

    public async Task SubscribeToTopic(string topic,string clientid,string selector, string durableConsumerName, Func messageReceived, CancellationToken cancellation)
    {
    //using (var connection = connectionFactory.CreateConnection())
    using (var connection = connectionFactory.CreateConnection())
    {
    connection.ClientId = clientid;
    connection.Start();
    using (var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    {
    // var dest = session.GetQueue(topic);
    // var dest = session.GetTopic(topic);
    ActiveMQTopic dest = new ActiveMQTopic(topic);
    //using (var consumer = session.CreateConsumer(dest))
    using (var consumer = session.CreateDurableConsumer(dest, durableConsumerName, selector, false))
    {
    while (!cancellation.IsCancellationRequested)
    {
    await SubscribeToMessage(consumer, messageReceived);
    }
    }
    }
    }
    }

    private async Task SubscribeToMessage(IMessageConsumer consumer, Func messageReceived)
    {
    string messageString = null;
    try
    {

    var message = consumer.Receive();
    var textMessage = message as ITextMessage;

    if (textMessage == null)
    {
    throw new InvalidOperationException(
    $”Invalid message type received, expected {nameof(ITextMessage)} but got {message.GetType().FullName}”);
    }

    messageString = textMessage.Text;

    logger.LogInformation($”Message received: {messageString}”);

    var messageModel = jsonSerializer.Deserialize(messageString);

    await messageReceived(messageModel).ConfigureAwait(false);

    return true;
    }
    catch (JsonException)
    {
    logger.LogError($”Invalid JSON content in message: {messageString}”);
    return false;
    }
    catch (Exception ex)
    {
    logger.LogError(ex.Message);
    return false;
    }
    }

    Muru

    December 4, 2020 at 7:01 pm


Leave a reply to remark Cancel reply