美文网首页
activeMQ-04Java实现通讯(主题TOPIC)

activeMQ-04Java实现通讯(主题TOPIC)

作者: 誓俭草 | 来源:发表于2020-02-01 20:37 被阅读0次

回顾下,activeMQ的发布/订阅模式(topic主题)
特点:
1)每一个消息可以有多个消费者,即一对多的关系;
2)生产者与消费者有时间上的相关性,消费者只能消费订阅之后发布的消息;
3)生产者topic不保存消息,当没有消费者时,则视为废消息,所以一般先启动消费者,再启动生产者。

  • 简易图如下:


    activemq.png
  • 代码实现
    生产者:
package com.jjclub.activeMQ_01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

//生产者01
public class Producer_topic01 {
    //服务器地址
    private static String url = "tcp://localhost:61616";
    //主题名称
    private static String topicName="queue1";
    public static void main(String[] args) {
        //创建activemq连接工场
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = null;
        Session session =null;
        MessageProducer messageProducer =null;
        try {
            //创建连接connection
            connection = activeMQConnectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建消息目的地topic(主题名称)
            Topic topic = session.createTopic(topicName);
            //创建生产者
            messageProducer = session.createProducer(topic);
            //创建消息体(文本消息内容)
            TextMessage textMessage = session.createTextMessage("hello");
            //发送消息到队列中
            messageProducer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            try {
                //关闭消息
                messageProducer.close();
                session.close();
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }       
    }   
}

消费者:
同样有两种方式,此处就举例一种:

package com.jjclub.activeMQ_01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

//消费者
public class Consumer_topic01 {
        //服务器地址
        private static String url = "tcp://localhost:61616";
        //主题名称
        private static String topicName="queue1";
        
        public static void main(String[] args) {
            //创建activemq连接工场
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = null;
            Session session =null;
            MessageConsumer messageConsumer =null;
            try {
                //创建连接connection
                connection = activeMQConnectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建消息目的地topic(主题名称)
                Topic topic = session.createTopic(topicName);
                //创建消费者
                messageConsumer = session.createConsumer(topic);
                while(true) {
                    // messageConsumer.receive();此方法会一直等待消息,不会中止进程
                    // messageConsumer.receive(4000L);等待4s后,若无消息,则中止进程,不再等待
                    Message message = messageConsumer.receive();
                    if(message!=null) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("消费的消息是"+textMessage);
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                try {
                    //关闭消息
                    messageConsumer.close();
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
}

相关文章

  • activeMQ-04Java实现通讯(主题TOPIC)

    回顾下,activeMQ的发布/订阅模式(topic主题)特点:1)每一个消息可以有多个消费者,即一对多的关系;2...

  • service 通讯与自定义 srv 类型

    引言 ROS node 之间的通讯形式主要包括两种:topic 和 service。 通过 topic 通讯时,不...

  • topic 通讯与自定义 msg 类型

    引言 ROS 中各个 node 通过 topic 传输数据。topic 可以直观理解成通讯管道,每个 topic...

  • ConsumeQueue索引文件及构建

    ConsumeQueue概览 RocketMQ是基于主题订阅模式实现消息消费,消费者关心的是主题Topic下的所有...

  • RabbitMQ消息队列---Topic Exchange细节总

    RabbitMQ主题交换机(Topic Exchange)细节学习,英文官方文档 主题交换机(Topic Exch...

  • Kafka

    介绍 Broker: 服务 Topic: 消息主题 Partition: 分区,一个topic存在多个Partit...

  • vue-全局通讯层

    今天分享的主题是复杂组件中的事件通讯,内容包括建立通讯层的出发点,建立过程中的方案确定,以及最终实现。 建立通讯层...

  • 8.13托福写作课堂笔记

    一、主题句的注意事项 1.主题句一定要直白,不要写谚语、哲学思想 2.主题句一定要包含topic,topic的同义...

  • Kafka命令行操作

    1)查看当前集群中已存在的主题topic 2)创建topic --zookeeper 连接zk集群--create...

  • RabbitMQ(六) - 主题(topic)

    主题(topic) 在上一个教程中我们改善了我们的日志系统。我们使用direct类型的exchange,可以选择性...

网友评论

      本文标题:activeMQ-04Java实现通讯(主题TOPIC)

      本文链接:https://www.haomeiwen.com/subject/aqkxxhtx.html