Redis Queue

Author: spilledyear | Published: 2018-05-09 17:53

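    Messaging models

    Generally speaking, a message queue works in one of two models:

    • Producer-consumer model (Queue);

    A producer sends messages to a queue, and a consumer takes them off the queue and processes them.
    Once a message has been consumed it is no longer stored in the queue, so no consumer can receive a message that has already been consumed. A queue may have several consumers, but any given message is consumed by only one of them.

    • Publisher-subscriber model (Topic);

    A producer (the publisher) publishes a message to a topic, and multiple consumers (the subscribers) consume it. Unlike the point-to-point model, a message published to a topic is consumed by every subscriber.

    Redis can implement both kinds of message queue.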
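
    To make the difference between the two models concrete, here is a minimal in-memory sketch. It does not touch Redis; the class DeliveryModels and its method names are made up for illustration, and round-robin polling merely stands in for the race between real consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** In-memory illustration only: no Redis involved, all names are hypothetical. */
class DeliveryModels {

    /** Queue model: each message is removed from the queue by exactly one consumer. */
    static List<List<String>> queueDelivery(List<String> messages, int consumers) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> received = emptyInboxes(consumers);
        int next = 0;
        while (!queue.isEmpty()) {
            // Whoever polls takes the message; round-robin stands in for the race.
            received.get(next).add(queue.poll());
            next = (next + 1) % consumers;
        }
        return received;
    }

    /** Topic model: every subscriber gets its own copy of every message. */
    static List<List<String>> topicDelivery(List<String> messages, int subscribers) {
        List<List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int n) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println("queue: " + queueDelivery(messages, 2)); // queue: [[m1, m3], [m2]]
        System.out.println("topic: " + topicDelivery(messages, 2)); // topic: [[m1, m2, m3], [m1, m2, m3]]
    }
}
```

    In the queue case the three messages are split between the two consumers, while in the topic case both subscribers see all three.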

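    Queue model

    The producer puts messages into a queue while multiple consumers listen on it at the same time. Whichever consumer grabs a message first removes it from the queue, so each message is owned by at most one consumer.
    Concretely, create a task queue, have the producer push messages with lpush, and have consumers fetch them with rpop. The problem is that consumers must then actively request data, and periodic polling wastes resources. It would be better if consumers were notified as soon as a new message joined the queue, and the brpop command provides exactly that. brpop behaves like rpop except that, when the list is empty, it blocks the connection until a new element arrives (a timeout of 0 blocks indefinitely).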
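
    The contrast between polling with rpop and waiting with brpop can be sketched without a Redis server. The class below is a hypothetical in-memory stand-in built on java.util.concurrent.LinkedBlockingDeque; its method names only mirror the Redis commands, and its timeout is in milliseconds (unlike Redis, a timeout of 0 here returns immediately).

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a Redis list; class and method names are illustrative. */
class BlockingPopSketch {
    private final BlockingDeque<String> list = new LinkedBlockingDeque<>();

    /** Like LPUSH: insert at the head of the list. */
    void lpush(String value) {
        list.addFirst(value);
    }

    /** Like RPOP: return the tail element, or null immediately when the list is empty. */
    String rpop() {
        return list.pollLast();
    }

    /** Like BRPOP with a timeout: wait up to timeoutMillis for a tail element, null on timeout. */
    String brpop(long timeoutMillis) {
        try {
            return list.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingPopSketch queue = new BlockingPopSketch();
        System.out.println(queue.rpop()); // empty list, so rpop returns null at once

        Thread producer = new Thread(() -> {
            queue.lpush("first");
            queue.lpush("second");
        });
        producer.start();

        // The consumer waits inside brpop instead of polling in a loop.
        System.out.println(queue.brpop(1000)); // first
        System.out.println(queue.brpop(1000)); // second
        producer.join();
    }
}
```

    Pushing at the head and popping at the tail yields first-in, first-out order, which is why lpush is paired with rpop or brpop.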

    package com.hand.hap.message;
    
    import redis.clients.jedis.Jedis;
    
    import java.util.List;
    
    public class TestQueue {
    
        public static void main(String[] args) throws Exception {
            Thread producer = new Thread(TestQueue::produce, "message-producer-thread");
            producer.start();
    
            // A Jedis instance is not thread-safe, so the consumer uses its own connection
            try (Jedis consumer = new Jedis("localhost", 6379)) {
                while (true) {
                    // A timeout of 0 means block indefinitely; the reply is a [key, value] pair
                    List<String> message = consumer.brpop(0, "queue1");
    
                    System.out.println(message);
                }
            }
        }
    
        // Pushes ten messages onto the head of queue1 over a separate connection
        private static void produce() {
            try (Jedis producer = new Jedis("localhost", 6379)) {
                for (long count = 1; count <= 10; count++) {
                    System.out.println(Thread.currentThread().getName());
                    producer.lpush("queue1", "message: hello redis queue" + count);
                }
            }
        }
    
    }
    

    Run each of the following commands on the command line:

    brpop queue1 0
    
    pop queue1 0
    

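    Publisher-subscriber model

    The publisher sends messages to a channel, and every subscriber listening on that channel receives a copy of the same message. A subscriber can subscribe to multiple topics.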

    package com.hand.hap.message;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    import java.util.Date;
    
    public class TestTopic {
        static Jedis jedis = new Jedis("localhost", 6379);
    
        public static void main(String[] args) throws Exception {
            MessageHandler n = new MessageHandler();
            Thread thread1 = new Thread(n);
            thread1.start();
    
            Thread thread2 = new Thread(n);
            thread2.start();
    
            Thread thread3 = new Thread(n);
            thread3.start();
    
            Thread.sleep(1000);
    
            // Publish a message to the "channel1" channel; the return value is the number of subscribers that received it
            Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
            System.out.println("Published successfully; the channel has " + publishCount + " subscriber(s)");
            jedis.publish("channel1", "close channel");
    
    
        }
    }
    
    
    class MessageHandler extends JedisPubSub implements Runnable {
    
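        /**
         * Logic to run when a new message arrives on the channel
         *
         * @param channel the channel the message was published to
         * @param message the message body
         */
        @Override
        public void onMessage(String channel, String message) {
            // handle the message
            System.out.println("Message from channel " + channel + ": " + message);
            // If the message is "close channel", unsubscribe from this channel
            if ("close channel".equals(message)) {
                this.unsubscribe(channel);
            }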
        }
    
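        /**
         * Logic to run once this client's subscription to the channel is confirmed
         *
         * @param channel            the channel that was subscribed to
         * @param subscribedChannels number of channels this connection is now subscribed to
         */
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("Subscribed to " + channel + "; this connection now has " + subscribedChannels + " subscription(s)");
        }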
    
    
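        /**
         * Logic to run once this client's unsubscription from the channel is confirmed
         *
         * @param channel            the channel that was unsubscribed from
         * @param subscribedChannels number of channels this connection is still subscribed to
         */
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("Unsubscribed from " + channel);
        }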
    
        @Override
        public void run() {
            MessageHandler handler = new MessageHandler();
            Jedis jedis = new Jedis("localhost", 6379);
            jedis.subscribe(handler, "channel1");
    
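            /**
             * Subscribing on the shared connection with the line below fails, because a
             * connection in subscribe mode cannot run other commands such as PUBLISH:
             * ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
             */
    //        TestTopic.jedis.subscribe(handler, "channel1");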
        }
    }
    

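    Spring integration

    This section covers how the two Redis message queue models are used with Spring, which relies on spring-data-redis. So first, a brief look at spring-data-redis.

    RedisMessageListenerContainer

    In spring-data-redis, RedisMessageListenerContainer is responsible for message listening. Client code implements MessageListener and registers it with the RedisMessageListenerContainer under a given topic; whenever a message arrives on that topic, the container notifies the MessageListener. That is all we need to know about spring-data-redis here, because the main goal of this article is to show how the two Redis message queue models are used in Hap. The rest is optional reading, depending on your interest.

    Configuration file

    Hap supports two message queue back ends: Redis and RabbitMQ. RabbitMQ is the more heavyweight of the two, and for simple business scenarios Redis is sufficient as a message queue. RabbitMQ is outside the scope of this article, so we will focus on the configuration file for the Redis message queue.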

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
    
    
        <bean id="mapSerializer" class="org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer">
            <constructor-arg type="java.lang.Class" value="java.util.HashMap"/>
            <property name="objectMapper" ref="objectMapper"/>
        </bean>
    
        <!-- message publishing utility class -->
        <bean class="com.hand.hap.message.impl.MessagePublisherImpl"/>
    
        <!-- publish/subscribe listener -->
        <bean class="com.hand.hap.message.TopicListenerContainer">
            <property name="connectionFactory" ref="v2redisConnectionFactory"/>
            <property name="recoveryInterval" value="10000"/>
            <!--<property name="messageListeners" ref="messageListeners"/>-->
        </bean>
    
        <bean id="simpleQueueConsumer" class="com.hand.hap.message.impl.SimpleMessageConsumer"/>
    
        <!-- queue listener -->
        <bean class="com.hand.hap.message.QueueListenerContainer">
            <property name="connectionFactory" ref="v2redisConnectionFactory"/>
            <property name="recoveryInterval" value="5000"/>
            <property name="stringRedisSerializer" ref="stringRedisSerializer"/>
            <property name="listeners">
                <list>
                    <!-- auto detect bean with annotation QueueMonitor -->
                </list>
            </property>
        </bean>
    
    </beans>
    

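    v2redisConnectionFactory is already defined in the application-redis.x configuration file. There is little point in digging into what each of these configuration properties means. SimpleMessageConsumer plays no real role here, so the three things to focus on are MessagePublisherImpl, TopicListenerContainer, and QueueListenerContainer. MessagePublisherImpl deals with publishing messages, while the other two handle message listening and map directly onto the two Redis message queue models: TopicListenerContainer corresponds to the Topic model and QueueListenerContainer to the Queue model.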

    MessagePublisherImpl

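    This is simply a utility class for publishing messages. I am not sure why it is declared in the configuration file; wouldn't an annotation be better? Perhaps it is there to make things easier for us to follow. Its source code is as follows: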

    /*
     * #{copyright}#
     */
    
    package com.hand.hap.message.impl;
    
    import com.hand.hap.message.components.ChannelAndQueuePrefix;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.hand.hap.message.IMessagePublisher;
    
    @Component
    public class MessagePublisherImpl implements IMessagePublisher {
    
        @Autowired
        private ObjectMapper objectMapper;
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        private Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);
    
        @Override
        public void publish(String channel, Object message) {
            // add the prefix
            channel = ChannelAndQueuePrefix.addPrefix(channel);
            if (message instanceof String || message instanceof Number) {
                redisTemplate.convertAndSend(channel, message.toString());
            } else {
                try {
                    redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(message));
                } catch (JsonProcessingException e) {
                    if (logger.isErrorEnabled()) {
                        logger.error("publish message failed.", e);
                    }
                }
            }
        }
    
        @Override
        public void rPush(String list, Object message) {
            message(list, message);
        }
    
        @Override
        public void message(String name, Object message) {
            if (message instanceof String || message instanceof Number) {
                redisTemplate.opsForList().rightPush(name, message.toString());
            } else {
                try {
                    redisTemplate.opsForList().rightPush(name, objectMapper.writeValueAsString(message));
                } catch (JsonProcessingException e) {
                    if (logger.isErrorEnabled()) {
                        logger.error("push data failed.", e);
                    }
                }
            }
        }
    }
    

    Publishing likewise just calls the spring-data-redis API, so I won't go into further detail here.
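
    One detail of the publisher worth isolating is how it chooses the payload: strings and numbers are sent as their toString() form, and everything else is serialized to JSON. The sketch below reproduces only that branch. PayloadEncoder and the toJson parameter are hypothetical, with toJson standing in for ObjectMapper.writeValueAsString so the example needs no extra libraries.

```java
import java.util.function.Function;

/** Hypothetical helper that mirrors the payload branch in MessagePublisherImpl. */
class PayloadEncoder {

    /**
     * Strings and numbers are sent as their toString() form; anything else goes
     * through the supplied JSON encoder (a stand-in for ObjectMapper.writeValueAsString).
     */
    static String encode(Object message, Function<Object, String> toJson) {
        if (message instanceof String || message instanceof Number) {
            return message.toString();
        }
        return toJson.apply(message);
    }

    public static void main(String[] args) {
        // Fake encoder for the demo only; a real one would produce proper JSON.
        Function<Object, String> fakeJson = o -> "{\"value\":\"" + o + "\"}";
        System.out.println(encode("hello", fakeJson));      // hello
        System.out.println(encode(42, fakeJson));           // 42
        System.out.println(encode(Boolean.TRUE, fakeJson)); // {"value":"true"}
    }
}
```

    A consequence of this rule is that the string "42" and the number 42 produce the same payload, so the consumer has to know which type it expects.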

    TopicListenerContainer

    This class deserves a closer look. Its source code is as follows:

    /*
     * Copyright Hand China Co.,Ltd.
     */
    
    package com.hand.hap.message;
    
    import com.hand.hap.core.AppContextInitListener;
    import com.hand.hap.message.components.ChannelAndQueuePrefix;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.Topic;
    import org.springframework.data.redis.serializer.RedisSerializer;
    
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author shengyang.zhou@hand-china.com
     */
    public class TopicListenerContainer extends RedisMessageListenerContainer implements AppContextInitListener {
    
        private Logger logger = LoggerFactory.getLogger(TopicListenerContainer.class);
    
        @Override
        public boolean isAutoStartup(){
            return false;
        }
    
        @Override
        public void contextInitialized(ApplicationContext applicationContext) {
            Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
            Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
            monitors.forEach((k, v) -> {
                Class<?> clazz = v.getClass();
                TopicMonitor tm = clazz.getAnnotation(TopicMonitor.class);
                String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
                List<Method> avaMethods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
    
                if (avaMethods.isEmpty()) {
                    if (logger.isErrorEnabled()) {
                        logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                    }
                    return;
                }
    
                SimpleMessageListener adaptor = new SimpleMessageListener(v, avaMethods.get(0));
                List<Topic> topics = new ArrayList<>();
                for (String t : tm.channel()) {
                    // add the prefix
                    t = ChannelAndQueuePrefix.addPrefix(t);
                    Topic topic = new PatternTopic(t);
                    topics.add(topic);
                }
    
                listeners.put(adaptor, topics);
            });
            setMessageListeners(listeners);
            // start(); // auto call
    //        if (listeners != null) {
    //            for (ITopicMessageListener receiver : listeners) {
    //                MessageListenerAdapter messageListener = new MessageListenerAdapter(receiver, "onTopicMessage");
    //                if (receiver.getRedisSerializer() != null) {
    //                    messageListener.setSerializer(receiver.getRedisSerializer());
    //                }
    //                messageListener.afterPropertiesSet();
    //                List<Topic> topics = new ArrayList<>();
    //                for (String t : receiver.getTopic()) {
    //                    Topic topic = new PatternTopic(t);
    //                    topics.add(topic);
    //                }
    //                listeners.put(messageListener, topics);
    //            }
    //        }
        }
    
        private static class SimpleMessageListener implements MessageListener {
            private RedisSerializer redisSerializer;
    
            private Object target;
            private Method method;
    
            private Logger logger;
    
            SimpleMessageListener(Object target, Method method) {
                this.target = target;
                this.method = method;
                Class p0 = method.getParameterTypes()[0];
                redisSerializer = MethodReflectUtils.getProperRedisSerializer(p0);
                logger = LoggerFactory.getLogger(target.getClass());
            }
    
            @Override
            public void onMessage(Message message, byte[] pattern) {
                try {
                    Object obj = redisSerializer.deserialize(message.getBody());
                    String p = new String(pattern, "UTF-8");
                    // strip the prefix
                    p = ChannelAndQueuePrefix.removePrefix(p);
                    method.invoke(target, obj, p);
                } catch (Exception e) {
                    Throwable thr = e;
                    while (thr.getCause() != null) {
                        thr = thr.getCause();
                    }
                    if (logger.isErrorEnabled()) {
                        logger.error(thr.getMessage(), thr);
                    }
                }
            }
        }
    }
    

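    It implements the AppContextInitListener interface, so its contextInitialized method is called when the application starts. The overall idea: at startup, find all subscribers via the annotation, build the mapping between each subscriber and its topics, and hand that mapping to the message listener container.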

            Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
    
    

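    This line finds all message subscribers. The TopicMonitor annotation, shown below, has two attributes: channel and method. channel is the topic. The purpose of method may be less obvious. A message listener that implements MessageListener runs its onMessage() method when a message arrives; the method attribute lets the listener run a different method instead, and names that method.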

    package com.hand.hap.message;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface TopicMonitor {
        String[] channel() default {};
    
        /**
         * default empty,auto detect depends on object type.
         * <p>
         * IQueueMessageListener:onQueueMessage<br>
         * OTHERS:onMessage
         *
         */
        String method() default "";
    }
    

    Once all the subscribers have been collected, they are iterated over. The logic that resolves the method name is shown below:

        String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
    
    
        public static String getTopicMethodName(String mn, Object target) {
            if (org.apache.commons.lang.StringUtils.isBlank(mn)) {
                if (target instanceof ITopicMessageListener) {
                    mn = ITopicMessageListener.DEFAULT_METHOD_NAME;
                } else {
                    mn = IMessageConsumer.DEFAULT_METHOD_NAME;
                }
            }
            return mn;
        }
    

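    In other words, when the method attribute is left blank: if the listener is written by implementing ITopicMessageListener, the onTopicMessage method is executed; otherwise the onMessage method is executed.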

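    After the method name has been resolved, each subscriber and its Topics are assembled into a Map. A subscriber can subscribe to multiple Topics, so the relationship is one-to-many. The resulting listeners map is handed to spring-data-redis to manage, and when a message arrives the target method is invoked through reflection.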
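
    The discovery-and-dispatch idea can be reduced to a few lines of plain reflection. This is only a sketch of the technique, not Hap's code: the Monitor annotation, DemoSubscriber, and ReflectiveDispatch are hypothetical names, and the lookup simply picks the first public method with the right name and two parameters.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical dispatcher: resolves the handler method by name and invokes it reflectively. */
class ReflectiveDispatch {

    /** Uses the annotated method name, or "onMessage" when it is blank; requires two parameters. */
    static Method resolveHandler(Object bean) {
        Monitor monitor = bean.getClass().getAnnotation(Monitor.class);
        String name = monitor.method().isEmpty() ? "onMessage" : monitor.method();
        for (Method m : bean.getClass().getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 2) {
                return m;
            }
        }
        throw new IllegalStateException("no method '" + name + "' with two parameters on " + bean);
    }

    /** Delivers one message to the bean through reflection. */
    static void dispatch(Object bean, Object message, String channel) {
        try {
            resolveHandler(bean).invoke(bean, message, channel);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoSubscriber subscriber = new DemoSubscriber();
        // One subscriber, several channels: the one-to-many relationship.
        for (String channel : subscriber.getClass().getAnnotation(Monitor.class).channel()) {
            dispatch(subscriber, "hello", channel);
        }
        System.out.println(subscriber.seen); // [employee.change:hello, employee.delete:hello]
    }
}

/** Hypothetical annotation modelled on TopicMonitor. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Monitor {
    String[] channel() default {};

    String method() default "";
}

/** Hypothetical subscriber; it records what it receives so the effect is visible. */
@Monitor(channel = {"employee.change", "employee.delete"})
class DemoSubscriber {
    final List<String> seen = new ArrayList<>();

    public void onMessage(String message, String pattern) {
        seen.add(pattern + ":" + message);
    }
}
```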

    QueueListenerContainer

    QueueListenerContainer does not extend RedisMessageListenerContainer, so it is implemented differently: in effect, the framework wraps its own API for us. The implementation is as follows:

    /*
     * #{copyright}#
     */
    
    package com.hand.hap.message;
    
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.SmartLifecycle;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.scheduling.SchedulingAwareRunnable;
    import org.springframework.util.Assert;
    
    import com.hand.hap.core.AppContextInitListener;
    
    import redis.clients.jedis.Jedis;
    
    
    public class QueueListenerContainer implements AppContextInitListener, DisposableBean, SmartLifecycle {
    
        private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);
    
        private RedisConnectionFactory connectionFactory;
    
        private static final int PHASE = 9999;
    
        private static final long MIN_RECOVERY_INTERVAL = 2000L;
    
        private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    
        /**
         * 100ms.
         */
        private static final long IDLE_SLEEP_TIME = 100L;
    
        private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
    
        private volatile boolean running = false;
    
        private ExecutorService executorService;
    
        private List<IQueueMessageListener<?>> listeners;
    
        private List<MonitorTask> monitorTaskList = new ArrayList<>();
    
        private RedisSerializer<String> stringRedisSerializer;
    
        public RedisConnectionFactory getConnectionFactory() {
            return connectionFactory;
        }
    
        public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }
    
        public long getRecoveryInterval() {
            return recoveryInterval;
        }
    
        public void setRecoveryInterval(long recoveryInterval) {
            this.recoveryInterval = recoveryInterval;
            if (recoveryInterval < MIN_RECOVERY_INTERVAL) {
                if (logger.isWarnEnabled()) {
                    logger.warn("minimum for recoveryInterval is {}", MIN_RECOVERY_INTERVAL);
                }
                this.recoveryInterval = MIN_RECOVERY_INTERVAL;
            }
        }
    
        public List<IQueueMessageListener<?>> getListeners() {
            return listeners;
        }
    
        public void setListeners(List<IQueueMessageListener<?>> listeners) {
            this.listeners = listeners;
        }
    
        public RedisSerializer<String> getStringRedisSerializer() {
            return stringRedisSerializer;
        }
    
        @Autowired
        public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
            this.stringRedisSerializer = stringRedisSerializer;
        }
    
        @Override
        public void destroy() throws Exception {
            stop();
        }
    
        @Override
        public void contextInitialized(ApplicationContext applicationContext) {
            if (listeners == null) {
                listeners = new ArrayList<>();
            }
            Map<String, Object> lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
            lts.forEach((k, v) -> {
                Class clazz = v.getClass();
                QueueMonitor qm = (QueueMonitor) clazz.getAnnotation(QueueMonitor.class);
                final String queue = qm.queue();
                String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
                List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
                if (methods.isEmpty()) {
                    if (logger.isErrorEnabled()) {
                        logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                    }
                    return;
                }
                final Method method = methods.get(0);
                IQueueMessageListener qml = new SimpleQueueListener(queue, v, method);
                listeners.add(qml);
    
            });
            executorService = Executors.newFixedThreadPool(listeners.size());
            for (IQueueMessageListener<?> receiver : listeners) {
                MonitorTask task = new MonitorTask(receiver);
                monitorTaskList.add(task);
                executorService.execute(task);
            }
        }
    
        @Override
        public boolean isAutoStartup() {
            return true;
        }
    
        @Override
        public void stop(Runnable callback) {
            stop();
            callback.run();
        }
    
        @Override
        public void start() {
            if (!running) {
                running = true;
    
                if (logger.isDebugEnabled()) {
                    logger.debug("startup success");
                }
            }
        }
    
        @Override
        public void stop() {
            if (isRunning()) {
                running = false;
                monitorTaskList.forEach(MonitorTask::stop);
                executorService.shutdownNow();
                if (logger.isDebugEnabled()) {
                    logger.debug("shutdown complete");
                }
            }
        }
    
        @Override
        public boolean isRunning() {
            return running;
        }
    
        @Override
        public int getPhase() {
            return PHASE;
        }
    
        private static class SimpleQueueListener implements IQueueMessageListener {
            private String queue;
            private Object target;
            private Method method;
            private RedisSerializer redisSerializer;
            private Logger logger;
    
            SimpleQueueListener(String queue, Object target, Method method) {
                this.queue = queue;
                this.target = target;
                this.method = method;
                this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
                this.logger = LoggerFactory.getLogger(target.getClass());
            }
    
            @Override
            public String getQueue() {
                return queue;
            }
    
            @Override
            public RedisSerializer getRedisSerializer() {
                return redisSerializer;
            }
    
            @Override
            public void onQueueMessage(Object message, String queue) {
                try {
                    method.invoke(target, message, queue);
                } catch (Exception e) {
                    Throwable thr = e;
                    while (thr.getCause() != null) {
                        thr = thr.getCause();
                    }
                    if (logger.isErrorEnabled()) {
                        logger.error(thr.getMessage(), thr);
                    }
                }
            }
        }
    
        /**
         * 
         * @param <T>
         */
        private class MonitorTask<T> implements SchedulingAwareRunnable {
    
            private IQueueMessageListener<T> receiver;
            private RedisConnection connection;
    
            private boolean running = false;
    
            MonitorTask(IQueueMessageListener<T> receiver) {
                this.receiver = receiver;
                Assert.notNull(receiver, "receiver is null.");
                Assert.hasText(receiver.getQueue(), "queue is not valid");
            }
    
            public void stop() {
                running = false;
                safeClose(true);
            }
    
            @Override
            public void run() {
                running = true;
                T message;
                while (running) {
                    try {
                        if (connection == null) {
                            connection = connectionFactory.getConnection();
                        }
                        message = fetchMessage(connection, receiver.getQueue());
                        if (message == null) {
                            sleep_(IDLE_SLEEP_TIME);
                            continue;
                        }
                    } catch (Throwable thr) {
                        if (!running) {
                            break;
                        }
                        safeClose();
                        if (logger.isDebugEnabled()) {
                            logger.error("exception occurred while get message from queue [" + receiver.getQueue() + "]",
                                    thr);
                            logger.debug("try recovery after {}ms", getRecoveryInterval());
                        }
                        sleep_(getRecoveryInterval());
                        continue;
                    }
                    try {
                        receiver.onQueueMessage(message, receiver.getQueue());
                    } catch (Throwable thr) {
                        if (logger.isWarnEnabled()) {
                            logger.warn("exception occurred while receiver consume message.", thr);
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("stop monitor:" + this);
                }
                safeClose();
            }
    
            T fetchMessage(RedisConnection connection, String queue) {
                List<byte[]> bytes = connection.bLPop(0, stringRedisSerializer.serialize(queue));
                if (bytes == null || bytes.isEmpty()) {
                    return null;
                }
                return receiver.getRedisSerializer().deserialize(bytes.get(1));
            }
    
            void safeClose(boolean... closeNative) {
                if (connection != null) {
                    try {
                        if (closeNative.length > 0 && closeNative[0]) {
                            // close native connection to interrupt blocked
                            // operation
                            ((Jedis) connection.getNativeConnection()).disconnect();
                        }
                        connection.close();
                    } catch (Exception e) {
                        // if (logger.isErrorEnabled()) {
                        // logger.error(e.getMessage(), e);
                        // }
                    }
                }
                connection = null;
            }
    
            void sleep_(long time) {
                try {
                    Thread.sleep(time);
                } catch (Exception e) {
                    // if (logger.isErrorEnabled()) {
                    // logger.error(e.getMessage(), e);
                    // }
                }
            }
    
            @Override
            public boolean isLongLived() {
                return true;
            }
        }
    }
    

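    The first part works much as before. One difference is that if the bean implements the IQueueMessageListener interface, the listener runs the onQueueMessage method when a message arrives. The other is that a different annotation, QueueMonitor, is used here: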

    package com.hand.hap.message;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface QueueMonitor {
        String queue() default "";
    
        /**
         * default empty,auto detect depends on object type.
         * <p>
         * ITopicMessageListener:onTopicMessage<br>
         * OTHERS:onMessage
         * 
         */
        String method() default "";
    
    }
    
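
    Once all the message listeners have been collected, a thread pool runs them concurrently, one thread per listener. Several consumers may listen on the same queue and compete for messages, so each fetch checks whether a message was actually returned.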
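
    The one-thread-per-listener loop can be sketched with the JDK alone. Everything below is a hypothetical stand-in: a BlockingQueue replaces the Redis list, and a bounded poll replaces the blocking pop so the loop can notice a stop request. Hap itself blocks indefinitely and interrupts the wait by disconnecting the native connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class QueueMonitorSketch {

    /** One long-lived task per listener, in the spirit of MonitorTask. */
    static class Monitor implements Runnable {
        private final BlockingQueue<String> queue; // stand-in for a Redis list
        private final Consumer<String> listener;
        private volatile boolean running = true;

        Monitor(BlockingQueue<String> queue, Consumer<String> listener) {
            this.queue = queue;
            this.listener = listener;
        }

        void stop() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    // Nothing may arrive (or a competitor took it), hence the null check.
                    String message = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (message == null) {
                        continue;
                    }
                    listener.accept(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failing listener must not kill the monitor thread.
                    System.err.println("listener failed: " + e);
                }
            }
        }
    }

    /** Starts one monitor per queue, feeds one message to each, then shuts down. */
    static List<String> runDemo() {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        BlockingQueue<String> orders = new LinkedBlockingQueue<>();
        BlockingQueue<String> mails = new LinkedBlockingQueue<>();
        List<Monitor> monitors = Arrays.asList(
                new Monitor(orders, m -> { handled.add("orders:" + m); done.countDown(); }),
                new Monitor(mails, m -> { handled.add("mails:" + m); done.countDown(); }));

        // The pool is sized to the number of listeners, so every monitor gets a thread.
        ExecutorService pool = Executors.newFixedThreadPool(monitors.size());
        monitors.forEach(pool::execute);
        orders.add("o1");
        mails.add("e1");
        try {
            done.await(2, TimeUnit.SECONDS);
            monitors.forEach(Monitor::stop);
            pool.shutdown();
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> sorted = new ArrayList<>(handled);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [mails:e1, orders:o1]
    }
}
```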

    Usage

    package com.hand.hap.activiti.manager;
    
    import com.hand.hap.activiti.util.ActivitiUtils;
    import com.hand.hap.hr.dto.Employee;
    import com.hand.hap.hr.dto.Position;
    import com.hand.hap.hr.mapper.EmployeeMapper;
    import com.hand.hap.hr.mapper.PositionMapper;
    import com.hand.hap.message.IMessageConsumer;
    import com.hand.hap.message.TopicMonitor;
    import org.activiti.engine.identity.Group;
    import org.activiti.engine.impl.persistence.entity.UserEntity;
    import org.activiti.engine.impl.persistence.entity.data.impl.MybatisUserDataManager;
    import org.activiti.spring.SpringProcessEngineConfiguration;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author shengyang.zhou@hand-china.com
     */
    @TopicMonitor(channel = "employee.change")
    public class CustomUserDataManager extends MybatisUserDataManager
            implements IMessageConsumer<Employee>, InitializingBean {
    
        @Autowired
        private PositionMapper positionMapper;
    
        @Autowired
        private EmployeeMapper employeeMapper;
    
        public Map<String, UserEntity> userCache = new HashMap<>();
    
        @Autowired
        private SpringProcessEngineConfiguration pec;
    
        public CustomUserDataManager() {
            super(null);
        }
    
        @Override
        public List<Group> findGroupsByUser(String userId) {
            List<Position> positions = positionMapper.getPositionByEmployeeCode(userId);
            List<Group> gs = new ArrayList<>();
            for (Position position : positions) {
                gs.add(ActivitiUtils.toActivitiGroup(position));
            }
            return gs;
        }
    
        /**
         * This method is called very frequently, so its results are cached
         *
         * @param entityId
         * @return
         */
        @Override
        public UserEntity findById(String entityId) {
            UserEntity userEntity = userCache.get(entityId);
            if (userEntity == null) {
                Employee employee = employeeMapper.queryByCode(entityId);
                userEntity = ActivitiUtils.toActivitiUser(employee);
                userCache.put(entityId, userEntity);
            }
            return userEntity;
        }
    
        @Override
        public void onMessage(Employee message, String pattern) {
            userCache.remove(message.getEmployeeCode());
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            this.processEngineConfiguration = pec;
        }
    }
    
    package com.hand.hap.hr.service.impl;
    
    import com.github.pagehelper.PageHelper;
    import com.github.pagehelper.StringUtil;
    import com.hand.hap.account.dto.User;
    import com.hand.hap.account.dto.UserRole;
    import com.hand.hap.account.mapper.UserRoleMapper;
    import com.hand.hap.account.service.IUserRoleService;
    import com.hand.hap.account.service.IUserService;
    import com.hand.hap.cache.impl.UserCache;
    import com.hand.hap.core.IRequest;
    import com.hand.hap.hr.dto.Employee;
    import com.hand.hap.hr.dto.UserAndRoles;
    import com.hand.hap.hr.mapper.EmployeeAssignMapper;
    import com.hand.hap.hr.mapper.EmployeeMapper;
    import com.hand.hap.hr.service.IEmployeeService;
    import com.hand.hap.message.IMessagePublisher;
    import com.hand.hap.message.TopicMonitor;
    import com.hand.hap.mybatis.common.Criteria;
    import com.hand.hap.system.dto.DTOStatus;
    import com.hand.hap.system.service.impl.BaseServiceImpl;
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.List;
    
    @Service
    public class EmployeeServiceImpl extends BaseServiceImpl<Employee> implements IEmployeeService {
    
        @Autowired
        EmployeeMapper employeeMapper;
    
        @Autowired
        private IMessagePublisher messagePublisher;
    
        @Autowired
        private IUserRoleService userRoleService;
    
        @Autowired
        private IUserService userService;
    
        @Autowired
        private UserRoleMapper userRoleMapper;
    
        @Autowired
        private EmployeeAssignMapper employeeAssignMapper;
    
        @Autowired
        private UserCache userCache;
    
        @Autowired
        private ApplicationContext applicationContext;
    
        @Override
        public List<Employee> submit(IRequest request, List<Employee> list) {
            self().batchUpdate(request, list);
            for (Employee e : list) {
                messagePublisher.publish("employee.change", e);
            }
            return list;
        }
    .......
    }
    

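    When employee information is modified in the UI, messagePublisher.publish("employee.change", e) is called. Because CustomUserDataManager listens on the "employee.change" topic, its CustomUserDataManager.onMessage method runs once the message has been published. The message queue is used in much the same way.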
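
    The pattern at work here is cache invalidation by message: reads fill the cache, and a change notification evicts the stale entry. A minimal sketch under my own assumptions follows. InvalidatingCache is a hypothetical class, and I use ConcurrentHashMap rather than the HashMap in the original, on the assumption that the listener runs on a different thread from the readers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache that is filled on read and evicted when a change message arrives. */
class InvalidatingCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // stand-in for the database query

    InvalidatingCache(Function<String, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, loading it on the first request for this id. */
    String findById(String id) {
        return cache.computeIfAbsent(id, loader);
    }

    /** Message callback: drop the entry so the next read reloads fresh data. */
    void onMessage(String changedId, String channel) {
        cache.remove(changedId);
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        InvalidatingCache users =
                new InvalidatingCache(id -> "user-" + id + "-v" + loads.incrementAndGet());
        System.out.println(users.findById("E001")); // user-E001-v1
        System.out.println(users.findById("E001")); // user-E001-v1 (served from the cache)
        users.onMessage("E001", "employee.change");
        System.out.println(users.findById("E001")); // user-E001-v2 (reloaded after eviction)
    }
}
```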

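    Thoughts

    In Hap, does defining too many Queue message listeners hurt system performance? Each message listener needs its own thread, so if there are too many listeners, will the thread pool be enough? I am not sure whether my reasoning is sound; it is worth verifying.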

    Article link: https://www.haomeiwen.com/subject/cmonrftx.html