美文网首页
消息队列使用与改造点

消息队列使用与改造点

作者: 东郭先生李 | 来源:发表于2018-03-30 11:29 被阅读24次

    序言

    文档背景

    消息队列改造是双创框架升级工作的一部分。

    文档主题

    文档主要讲述消息队列代码更新后,新的使用方式和如何使用原有的消费者模式完成业务逻辑。

    文档结构图

    [站外图片上传中...(image-e177db-1522380525219)]

    文档变更历史

    作者 日期 版本 变更点
    李清泉 2018-3-29 0.5 创建文档

    配置文件的写法

    配置数据源

    配置数据源有多种方式,我使用过有效的有两种:

    1. 单独配置一个源
    2. 配置到绑定

    单独配置一个源

    rabbitmq:
      addresses: amqp://192.168.1.241:5672
      username: mqadmin
      password: mqadmin
    

    单独配置的方式是使用rabbitmq作为顶层配置,然后在其他配置中引用,如绑定时使用:

    binders:
        rabbit1:
            type: rabbit
            environment:
               spring:
                  rabbitmq:
                    addresses: ${rabbitmq.addresses}
                    username: ${rabbitmq.username}
                    password: ${rabbitmq.password}
                    virtual-host: test1
    

    在上述代码中引用了单独配置。这种好处是可以配置一次多处引用,避免重复写。

    配置到绑定

    binders:
        rabbit1:
            type: rabbit
            environment:
               spring:
                  rabbitmq:
                    addresses: amqp://192.168.1.241:5672
                    username: mqadmin
                    password: mqadmin
                    virtual-host: test1
    

    这种方式是直接写到绑定上。如果只有一个配置,可以这么写,但如果有多个绑定并且用同一个数据源,就变成了冗余。

    绑定数据源

    绑定数据源是将数据源配置到一个变量之中,方便配置接收或者发送时使用。上述已经说过配置方法了。在下面的配置代码中,是将一个数据源配置到变量 rabbit1之中。

    binders:
        rabbit1:
            type: rabbit
            environment:
               spring:
                  rabbitmq:
                    addresses: amqp://192.168.1.241:5672
                    username: mqadmin
                    password: mqadmin
                    virtual-host: test1
    

    可以绑定多个的,如果下面的配置:

    binders:
        rabbit1:
            type: rabbit
            environment:
               spring:
                  rabbitmq:
                    addresses: amqp://192.168.1.241:5672
                    username: mqadmin
                    password: mqadmin
                    virtual-host: test1
        rabbit2:
            type: rabbit
            environment:
               spring:
                  rabbitmq:
                    addresses: amqp://192.168.1.245:5672
                    username: mqadmin
                    password: mqadmin
                    virtual-host: test2
    

    上面的代码配置了两个变量,rabbit1和rabbit2. 在接收和发送配置中可以引用这两个不同的变量代表不同的源。

    配置接收

    spring:
      cloud:
        stream:
          bindings:
            input1:
              binder: rabbit1
              contentType: text/plain
              destination: testquene
    

    上述代码中,定义一个叫做 input1的接收,内容格式为文本,目标(队列名)为testquene ,绑定到rabbit1源变量(上述的绑定配置)。

    配置发送

    spring:
      cloud:
        stream:
          bindings:
            output1:
              binder: rabbit1
              destination: testquene2
              contentType: text/plain
    

    上述代码中,配置了一个叫做output1的消息发送,目标(队列名)为testquene2,模式是文本。

    完整配置示例

    server:
      port: 9087
    
    rabbitmq:
      addresses: amqp://192.168.1.241:5672
      username: mqadmin
      password: mqadmin
    
    spring:
      cloud:
        stream:
          bindings:
            input1:
              binder: rabbit1
              #group: test.qqqq
              contentType: text/plain
              destination: mqTestDefault.test.qqqq
            input2:
              binder: rabbit1
              #group: test.qqqq
              contentType: text/plain
              destination: quene1
            output1:
              binder: rabbit1
              destination: mqTestDefault.test.qqqq
              contentType: text/plain
            output2:
              binder: rabbit1
              destination: quene1
              contentType: text/plain
          binders:
            rabbit1:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: ${rabbitmq.addresses}
                    username: ${rabbitmq.username}
                    password: ${rabbitmq.password}
                    virtual-host: test1
      #              exchange:
          defaultBinder:
    
    

    消息队列使用方式

    消息队列涉及到两种操作:

    • 消费处理信息
    • 生产发送信息

    下面分别说明这两个操作的使用方式

    消息队列的接收处理

    消息队列的接收处理有两种方式:

    1. 直接使用监听
    2. 使用适配原有的消费模式

    将配置文件中的接收与发送定义到代码中

    无论是接收还是发送消息。都要先在一个接口类中定义信道。接收时,定义为SubscribableChannel ;发送时,定义为MessageChannel 。示例:

    public interface Sink {
        String INPUT1 = "input1";
    
        String INPUT2="input2";
    
        String OUTPUT1="output1";
        String OUTPUT2="output2";
    
        @Input(INPUT1)
        SubscribableChannel input1();
    
        @Input(INPUT2)
        SubscribableChannel input12();
    
        @Output(OUTPUT1)
        MessageChannel output1();
        @Output(OUTPUT2)
        MessageChannel output2();
    }
    

    上述代码中,分别定义了两个接收信道和两个发送信息。请务必注意名称一定要对应对配置文件中。比如上述的input1 input2 output1 output2 ,必须在配置文件中存在的。
    上述的定义是注册到spring之中的,使用时,只需要使用相应的名称的bean即可以。如果使用input1的bean名称,即为input1的接收信道。

    使用监听的方式

    定义好接收与发送的spring bean后,可以在监听中使用接收了。

    @Service
    @EnableBinding(Sink.class)
    public class StreamMessageQueneManager extends AbstractMessageQuene implements MessageQueueManager{
        // 监听 binding 为 Sink.INPUT 的消息
        @StreamListener(Sink.INPUT1)
        public void input1(Message<String> message) {
            System.out.println("第一个队列:" + message.getPayload());
            doSomething(message.getPayload());
        }
    }
    

    上述代码中,StreamMessageQueneManager 这个类为spring bean,使用了@EnableBinding(Sink.class)注解。代表将已经定义好的Sink接口中的定义好的接收在这个类中开启监听。在public void input1(Message<String> message)方法头上,加上了@StreamListener(Sink.INPUT1)注解,目的是将Sink中的INPUT1代表的接收在这个方法上开启监听。当监听收到消息时,将自动调用public void input1(Message<String> message)方法,传入message对象,我们就可以使用这个对象执行任何逻辑。

    使用适配原有消费模式

    原来的双创是使用生产-消费模式处理消息的,我们原来是使用MessageQueueManager接口接收和发送信息的,这个接口代码如下:

    public interface MessageQueueManager {
        /**
         * 发送消息
         *  @param queueName 队列名称
         *  @param message 放入队列的内容
         */
         void sendMessage(String queueName, Object message);
    
        /**
         * 获取通道消息
         *  @param queueName 队列名称
         *  @return message 队列的内容
         */
         Object getMessage(String queueName);
    
      
    }
    

    Object getMessage(String queueName);方法中,我们通过传入队列名的方法,主动获取消息的。因此改造后为了减少代码变动,这种方式保持不变。原有的代码不需要变动即可正常执行。原理是使用了适配的方法:

    @Service
    @EnableBinding(Sink.class)
    public class StreamMessageQueneManager  implements MessageQueueManager{
        private Map<String,Queue<String>> queueMap=new HashMap<>();
        // 监听 binding 为 Sink.INPUT 的消息
        @StreamListener(Sink.INPUT1)
        public void input1(Message<String> message) {
            System.out.println("第一个队列:" + message.getPayload());
            Queue queue = getQueue(Sink.INPUT1);
            queue.offer(message.getPayload());
        }
        private Queue getQueue(String queneName) {
            Queue queue=queueMap.get(queneName);
            if(queue==null){
                queue=new ConcurrentLinkedQueue();//非阻塞
                queueMap.put(queneName,queue);
            }
            return queue;
        }
         @Override
        public Object getMessage(String queueName) {
            Queue queue = getQueue(queueName);
            return queue.poll();
        }
        
        @Override
        public void sendMessage(String queueName, Object message) {
            MessageChannel channel=channelMap.get(queueName);
            if(channel==null){
                throw new RuntimeException(queueName+"对应的信通不存在!");
            }
            if(message!=null) {
                channel.send(MessageBuilder.withPayload(message).build());
            }
        }
    }
    

    在上述接口的实现类中,监听将input1收到的消息放入了临时非阻塞且线程安全的ConcurrentLinkedQueue中。业务逻辑通过定时主动调用public Object getMessage(String queueName),获取到目标队列并取出消息执行逻辑。值得注意的是由于ConcurrentLinkedQueue没有限制容量,如果不能及时消费掉里面存储的消息,可能会造成内存占用过多甚至溢出,因此需要考虑消费的速度和调用的间隔。

    发送消息

    消息的发送也有两种方式:

    • 使用 messageChannel
    • 使用原有生产消费模式接口

    使用 messageChannel

    相对于使用监听,发送也可以使用新的messageChannel方式。

    @Service
    public class TestSender  {
        @Autowired
        @Qualifier("output1")
        MessageChannel output1;
        
        public void send(){
            output1.send(MessageBuilder.withPayload("您好,这是一个测试消息").build());
        }
    }
    

    上述代码中,output1为spring bean的名称,代表了发送的信道代理。直接使用它就可以将消息发送到对应信道。

    使用原有的适配

    请参考上述的MessageQueueManager接口,调用public void sendMessage(String queueName, Object message) 指定队列名即可。原理是,适配代码中已将messagechannel封装起来:

    @Service
    @EnableBinding(Sink.class)
    public class StreamMessageQueneManager  implements MessageQueueManager{
        @Autowired
        @Qualifier("output1")
        MessageChannel output1;
    
        @Autowired
        @Qualifier("output2")
        MessageChannel output2;
        
        private Map<String,MessageChannel> channelMap;
        
         /**
         * bean初始化后执行这个方法
         */
        @PostConstruct
        public void postConstruct(){
            if (channelMap == null) {
                channelMap = new HashMap<>();
                channelMap.put(Sink.OUTPUT1, output1);
                channelMap.put(Sink.OUTPUT2, output2);
            }
        }
        
        @Override
        public void sendMessage(String queueName, Object message) {
            MessageChannel channel=channelMap.get(queueName);
            if(channel==null){
                throw new RuntimeException(queueName+"对应的信通不存在!");
            }
            if(message!=null) {
                channel.send(MessageBuilder.withPayload(message).build());
            }
        }
        
        @Override
        public Object getMessage(String queueName) {
            Queue queue = getQueue(queueName);
            return queue.poll();
        }
    
    }
    

    上述代码中,channel被放入Map中,通过队列名可以取出,然后发送消息。原理本质上是适配。

    改造点

    原有的双创中,只有solr和微博模块有用到消息队列的功能。
    我认为,至少下面的模块可以用消息队列:

    • solr写入消息
    • 微博发布
    • 站内信
    • 日志
    • 积分写入
    • 定时任务协调
    • 其他附合要求的模块
      什么是“附合要求”呢?

    消息队列应用点

    消息队列主要应用到以下情境:

    • 不要求该分逻辑与主逻辑实时性
    • 执行慢的逻辑
    • 任务协调 分布式集群中,同一个实例可以通过消息队列接收的唯一实例的特性进行任务协调。

    “附合要求”的其他模块是可以使用消息队列的,这需要我们在后续优化。

    相关文章

      网友评论

          本文标题:消息队列使用与改造点

          本文链接:https://www.haomeiwen.com/subject/ybobcftx.html