美文网首页
activeMQ-04Java实现通讯(主题TOPIC)

activeMQ-04Java实现通讯(主题TOPIC)

作者: 誓俭草 | 来源:发表于2020-02-01 20:37 被阅读0次

    回顾下,activeMQ的发布/订阅模式(topic主题)
    特点:
    1)每一个消息可以有多个消费者,即一对多的关系;
    2)生产者与消费者有时间上的相关性,消费者只能消费订阅之后发布的消息;
    3)生产者topic不保存消息,当没有消费者时,则视为废消息,所以一般先启动消费者,再启动生产者。

    • 简易图如下:


      activemq.png
    • 代码实现
      生产者:
    package com.jjclub.activeMQ_01;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    //生产者01
    public class Producer_topic01 {
        //服务器地址
        private static String url = "tcp://localhost:61616";
        //主题名称
        private static String topicName="queue1";
        public static void main(String[] args) {
            //创建activemq连接工场
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = null;
            Session session =null;
            MessageProducer messageProducer =null;
            try {
                //创建连接connection
                connection = activeMQConnectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建消息目的地topic(主题名称)
                Topic topic = session.createTopic(topicName);
                //创建生产者
                messageProducer = session.createProducer(topic);
                //创建消息体(文本消息内容)
                TextMessage textMessage = session.createTextMessage("hello");
                //发送消息到队列中
                messageProducer.send(textMessage);
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                try {
                    //关闭消息
                    messageProducer.close();
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }       
        }   
    }
    

    消费者:
    同样有两种方式,此处就举例一种:

    package com.jjclub.activeMQ_01;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    //消费者
    public class Consumer_topic01 {
            //服务器地址
            private static String url = "tcp://localhost:61616";
            //主题名称
            private static String topicName="queue1";
            
            public static void main(String[] args) {
                //创建activemq连接工场
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
                Connection connection = null;
                Session session =null;
                MessageConsumer messageConsumer =null;
                try {
                    //创建连接connection
                    connection = activeMQConnectionFactory.createConnection();
                    //启动连接
                    connection.start();
                    //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //创建消息目的地topic(主题名称)
                    Topic topic = session.createTopic(topicName);
                    //创建消费者
                    messageConsumer = session.createConsumer(topic);
                    while(true) {
                        // messageConsumer.receive();此方法会一直等待消息,不会中止进程
                        // messageConsumer.receive(4000L);等待4s后,若无消息,则中止进程,不再等待
                        Message message = messageConsumer.receive();
                        if(message!=null) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println("消费的消息是"+textMessage);
                        }else {
                            break;
                        }
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        //关闭消息
                        messageConsumer.close();
                        session.close();
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    }
    

    相关文章

      网友评论

          本文标题:activeMQ-04Java实现通讯(主题TOPIC)

          本文链接:https://www.haomeiwen.com/subject/aqkxxhtx.html