English 中文(简体)
ActiveMQ Consumer / Producer Connection Listener
原标题:

I can t seem to find a way to listen for new producer and consumer connections (or connection interrupts) in ActiveMQ (Java Version). I want to be able to tell the consumers (or they can find out themselves) that the producer s connection dropped. The other way around (the producer finding out that a certain consumer disconnected) is also required.

I d appreciate some help.

最佳回答

I think you want to listen for new producers and consumers on a particular destination (a particular queue or topic). Is that right?

You can instantiate ConsumerEventSource and ProducerEventSource, and register your own listeners by calling their setConsumerListener and setProducerListener on them, respectively.

So:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you re paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

If you look at the code for ConsumerEventSource or ProducerEventSource, you ll see that they re simple objects that use the methods of AdvisorySupport to listen on a special advisory topic whose business it is to broadcast news about producers and consumers. You might learn more by reading the source code for those classes.

Your use of "connection" is potentially a problem; in ActiveMQ land (which is a subset of JMS land), a "Connection" is a lower-level object that isn t associated with a particular destination. A particular client creates a "Session" from a Connection - still not specific to a destination - and then creates a destination-specific QueueSender, QueueReceiver, TopicPublisher, or TopicSubscriber. When those are created, or when the sessions that created them die, those are the events you want to hear about, and will hear about if you use the code above.

问题回答

All the information I need is published in the ActiveMQ Advisory topics such as "ActiveMQ.Advisory.Connection" or simply "ActiveMQ.Advisory..>" for all of them. Even the events that happen in a Stomp Connection are published in the ActiveMQ Advisory topics. The following code gives an example of this (tested with a Flex Client connected through Stomp):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});




相关问题
Spring Properties File

Hi have this j2ee web application developed using spring framework. I have a problem with rendering mnessages in nihongo characters from the properties file. I tried converting the file to ascii using ...

Logging a global ID in multiple components

I have a system which contains multiple applications connected together using JMS and Spring Integration. Messages get sent along a chain of applications. [App A] -> [App B] -> [App C] We set a ...

Java Library Size

If I m given two Java Libraries in Jar format, 1 having no bells and whistles, and the other having lots of them that will mostly go unused.... my question is: How will the larger, mostly unused ...

How to get the Array Class for a given Class in Java?

I have a Class variable that holds a certain type and I need to get a variable that holds the corresponding array class. The best I could come up with is this: Class arrayOfFooClass = java.lang....

SQLite , Derby vs file system

I m working on a Java desktop application that reads and writes from/to different files. I think a better solution would be to replace the file system by a SQLite database. How hard is it to migrate ...

热门标签