ActiveMQ

作者: Aaron_Swartz | 来源:发表于2019-11-13 11:48 被阅读0次

    ActiveMQ: 本质的好处就是解决异步处理消息的问题。

    • toptic模式
      消息为广播模式,每一个监听到的队列都可以收到消息。
    @JmsListener(destination = "topic.receivemsg", containerFactory = "jmsListenerContainerMsgTopic", concurrency = "5")
    
    • queue模式
      消息为竞争消费模式,只有竞争到消息的队列可以消费,即使多个监听者监听同一个队列,最终也只有一个消费者可以消费。
    // 配置
    @JmsListener(destination = "${lly.queue.sendMsgInfo}" + "${spring.profiles.active}", containerFactory = "jmsListenerContainerQueue", concurrency = "5")
    
    package com.xes.ops.insight.plus.mp;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.concurrent.TimeUnit;
    
    public class Sender {
        public static void main(String[] args) throws JMSException, InterruptedException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "system",
                    "manager",
                    "tcp://203.195.176.21:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("first");
            MessageProducer messageProducer = session.createProducer(destination);
            for (int i = 0; i < 100; ++i) {
                TextMessage textMessage = session.createTextMessage("这是消息:" + i);
                messageProducer.send(destination, textMessage);
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println("ok");
        }
    }
    
    
    import javax.jms.*;
    import java.util.concurrent.TimeUnit;
    
    public class Consumer {
        public static void main(String[] args) throws JMSException, InterruptedException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "system",
                    "manager",
                    "tcp://203.195.176.21:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("first");
            MessageConsumer consumer = session.createConsumer(destination);
            for (int i = 0; i < 100; ++i) {
                Message message = consumer.receive();
                System.out.println(message);
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
    
    • 消息的消费者接收消息可以采用两种方式
    1. consumer.receive() 或 consumer.receive(int timeout);
    2. 注册一个MessageListener.
      采用第一种方式,消息的接收者会一直等待下去,知道有消息到达或者超时。后一种方式会注册一个监听器,
      当有消息到达的时候,会调用它的onMessage()方法。以下举例说明:
    MessageConsumer consumer=session.createConsumer(queue);
    consumer.setMessageListener(new MessageListener(){
    public void onMessage(Message msg)
    {
    System.out.println("接收到的消息为+"((TextMessage)msg).getText());
    }
    })
    

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/cnjhictx.html