美文网首页
消息队列

消息队列

作者: 刺猬_3e91 | 来源:发表于2019-04-29 15:30 被阅读0次

    什么是消息队列

    消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它


    u=2714269904,457215482&fm=173&app=25&f=JPEG.jpg

    为什么要用消息队列

    一,消息队列的特性

    • 解耦:一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同模块反而需要依赖消息队列所定义的规范进行通信
    • FIFO:先进先出可以保证消息的顺序
    • 容灾:下游消费者由于各种原因宕机无法消费,队列可以暂时保存消息,直到消费者重新上线

    二,消息队列的好处

    • 提高系统响应速度:使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回,响应用户了。无需等待处理结果。
      例如:医院取化验单
    • 提高系统稳定性:异步,解耦,削峰
      • 削峰(消除峰值):在分布式系统中,一次分布式事务关联的是多个节点,其中每一个节点出现问题都会成为整个事务处理流程中的瓶颈。如果逻辑节点与数据库之间没有一个起到缓冲作用的节点,那就是每次操作都要访问数据库,如果有10万设备同事在线足以搞垮MySQL节点。消息队列就是在这之间的缓冲。
      • 横向扩展:便捷扩容,如果消费者处理业务的速度不够快导致消息积压,可以很方便的扩展成N多个消费者。
        例如:


        微信截图_20190429115807.png

    消息队列的选择

    https://blog.csdn.net/songfeihu0810232/article/details/78648706

    • rabbitMQ:稳定,可靠,数据一致,支持多协议,有消息确认,性能一般,基于erlang语言,二次开发困难.
    • kafka:高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高.
    • ZeroMQ:灵活快速,不支持持久化,需要大量编码来实现稳定可靠.
    • rocketMQ:性能好,高吞吐,高可用性,支持大规模分布式.
    • ActiveMQ:性能没有rocketMQ好,吞吐没有kafka高,没有ZeroMQ灵活,没有rabbitMQ稳定, 但是性价比高,使用简单

    对比图:


    20171127202420239.png

    消息队列的两种模式

    JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。

    • 点对点:Queue,不可重复消费
      消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
      消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
    • 发布/订阅:Topic,可以重复消费
      消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

    ActiveMq发送消息

    1. Queue模式

    注入队列bean ActiveMQQueue(队列名)

    @Bean
    public Queue information() {
        return new ActiveMQQueue(jmsParmConfig.getQueue().getInformation());
    }
    

    发送消息 convertAndSend(队列名,消息)
    消息类型可以是String 也可以是对象 但是必须要实现Serializable接口

    public void sendLocationInfoMessage(final String message) {
        try {
            jmsTemplate.convertAndSend(pub.information(), message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    ActiveMq消费消息

    JmsListener 为MQ监听destination =“对列名”

      @JmsListener(destination ="${jmsconfig.queue.information}")
    public void locationBathLinforationQueue(String message) throws Exception {
         log.info("MQ消费数据" +message);
    }
    

    这就是一个简单的点对点Queue模式的消息队列

    1. Topic模式

    注入bean方法差不多 注意注入的类是ActiveMQTopic

    @Bean
    public Topic information() {
        return new ActiveMQTopic(jmsParmConfig.getTopic().getInformation());
    }
    

    发送消息

     public void selectInformation(final AdminMessage message) {
        try {
            log.info("收到后台查询位置请求:"+message.toString());
            jmsTemplate.convertAndSend(topicConfig.information(), message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    订阅toipc 如果传输的是对象则接收需要用ObjectMessage 并且不能直接强转,需要先转成Object

    @JmsListener(destination = "${jmsconfig.topic.information}")
    public void selectInformation(ObjectMessage message) throws JMSException {
        AdminMessage msg= (AdminMessage) message.getObject();
        Session session = sessionManager.findByTerminalPhone(msg.getSerialNo());
        if (session==null){
            return;
        }
        adminMsgProcessService.sendSelect(session);
    }
    

    注:topic模式下消费者需要修改配置文件

      jms:pub-sub-domain: true #使用订阅模式
    

    Redis队列模式

    Redis主要还是内存数据库,当然也支持队列queue可以作为一个轻量级MQ使用,但是没有真正的MQ功能全。

    • 可靠性:没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中
    • 实时性:实时性高,redis作为高效的缓存服务器,所有数据都存在在服务器中,所以它具有更高的实时性
    • 总结:Redis 轻量级,低延迟,高并发,低可靠性;

    Redis队列处理

    发送消息redisTemplate方法为convertAndSend,jedis方法为lpush我们使用的是redisTemplate

      /**
     * 发布消息
     * @param channel 通道名字
     * @param o
     */
    public void pubMessage(String channel,Object o) {
        redisTemplate.convertAndSend(channel, o);
    }
    

    消息监听 注入监听 RedisReceiver

      @Bean(value = "MessageListener")
    public MessageListenerAdapter getMessageListenerAdapter() {
        return new MessageListenerAdapter(new RedisReceiver());
    }
    
    @Bean(value = "RedisMessageListenerContainer")
    public RedisMessageListenerContainer getRedisMessageListenerContainer(
            RedisConnectionFactory jedisConnectionFactory,
            MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(jedisConnectionFactory);
    
        List<ChannelTopic> topics = Arrays.asList(new ChannelTopic("deviceInfo"));
        container.addMessageListener(messageListenerAdapter, topics);
    
        return container;
    }
    

    使用消费实现MessageListener 接口

         public class RedisReceiver implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println(message);
    }
    

    }

    总结

    消息队列可以提升系统的性能,也能降低峰值时某个操作的异常而导致后续所有操作都异常。但是也有一定的弊端。
    系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低
    系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。
    不要盲目选型,选择当前项目最适合的方案才是王道

    相关文章

      网友评论

          本文标题:消息队列

          本文链接:https://www.haomeiwen.com/subject/fzsqnqtx.html