美文网首页
如何使用消息队列发布与订阅【易扩展】

如何使用消息队列发布与订阅【易扩展】

作者: 逸如风飞 | 来源:发表于2019-06-27 13:57 被阅读0次

    redis消息队列实现

    定义消息接受注解

    @Component
    @Retention(value = RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface RedisTopic {
        String value();
    }
    

    定义消息接受interface

    public interface RedisTopicInterface<T> {
        public final static String PREX = "houdamis_";
        /**
         * 发布消息
         * @return
         */
        public String getChannel();
        /**
         * 接受消息
         * @return
         */
        public boolean receiveMsg(T message,String channel);
        /**
         * 接受消息
         * @return
         */
        public boolean isReceiver( String channel);
    
    }
    

    消息接受代码示例

    @RedisTopic(value = TOPIC)
    public class DataSourceReloadService  implements RedisTopicInterface {
        /**
            必须参数
         * 定义消息队列主题
         */
        public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
    
        public   final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        public String getChannel( ) {
            return TOPIC;
        }
    
        /**
         * 接受消息
         *
         * @param message 消息内容
         * @param channel 消息主题 同topic
         * @return
         */
        @Override
        public boolean receiveMsg(Object message, String channel) {
            if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
                try {
    //               TODO 进行业务处理
                    xxxx
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
                }
            }
            return false;
        }
    
        /**
         * 接受消息
         *
         * @param channel
         * @return
         */
        @Override
        public boolean isReceiver(String channel) {
            if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
                return true;
            }
            return false;
        }
    }
    

    消息队列工具类

    public class RedisTopicUtils {
        private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
        private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();
    
        /**
         * 发布消息
         * @param channel
         * @param messgae
         * @return
         */
        public static boolean sendMessage(String channel ,Object messgae){
            redisTemplate.convertAndSend(channel, messgae);
            return true;
        }
    
        /**
         * 接受消息
         * @param channel
         * @param messgae
         */
        public static void receiveMessage(String channel, Object messgae) {
            Set<RedisTopicInterface> beans = getReceiver(channel);
            for (RedisTopicInterface v : beans) {
                try {
                    v.receiveMsg(messgae,channel);
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 注册RedisTopicInterface
         * @param receiver
         */
        public static void regist(RedisTopicInterface receiver){
            String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
            Set<RedisTopicInterface>  beans = getReceiver(key);
            beans.add(receiver);
            topicBeans.put(key,beans);
        }
    
        public static Set<RedisTopicInterface> getReceiver(String key){
            Set<RedisTopicInterface>  beans = topicBeans.get(key);
            if(beans == null){
                beans = Sets.newHashSet();
            }
            return beans;
        }
    
        /**
         * 初始化
         */
        public static void init(){
            try {
                Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
                if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                    for (Object serviceBean : serviceBeanMap.values()) {
                        String interfaceName = serviceBean.getClass().getAnnotation(RedisTopic.class).value();
                        if(null != interfaceName){
                             regist( (RedisTopicInterface) serviceBean);
                            System.out.println(serviceBean.getClass().getName()+ " regist to RedisTopic[" + interfaceName+"]");
                        }else{
                            System.err.println(serviceBean.getClass().getName()+ "can't regist to RedisTopic[" + interfaceName+"]");
                        }
                    }
                }
            } catch ( Throwable e) {
                e.printStackTrace();
            }
        }
    
    
    }
    

    消息队列-接受者注册

    系统启动时进行注册

    @Service
    public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent evt) {
            
            RedisTopicUtils.init();
        }
    }
    

    redis xml配置

    <!-- 定义监听类 -->
        <bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
            <property name="redisTemplate" ref="redisTemplate"/>
        </bean>
        <!-- 定义监听容器 -->
        <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
              destroy-method="destroy">
            <property name="connectionFactory" ref="jedisConnectionFactory"/>
            <!-- 任务执行器 -->
            <property name="taskExecutor">
                <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                    <property name="poolSize" value="10"/>
                </bean>
            </property>
            <!-- 消息监听器 -->
            <property name="messageListeners">
                <map>
                    <entry key-ref="redisMessageListener">
                        <list>
                            <bean class="org.springframework.data.redis.listener.PatternTopic">
                                <constructor-arg value="*" />
                            </bean>
                        </list>
                    </entry>
                </map>
            </property>
        </bean>
    

    消息监听

    RedisMessageListener监听到消息之后,交由RedisTopicUtils处理,RedisTopicUtils根据topic找到已注册的Set<RedisTopicInterface>,然后通知每个RedisTopicInterface元素进行处理。

    
    /**
     * redis消息接受
     */
    public class RedisMessageListener implements MessageListener {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            byte[] body = message.getBody();
            byte[] channel = message.getChannel();
            String msgChannel = (String) getRedisTemplate().getKeySerializer().deserialize(channel);
            RedisTopicUtils.receiveMessage(msgChannel,getRedisTemplate().getValueSerializer().deserialize(body));
        }
        public RedisTemplate<String, Object> getRedisTemplate() {
            return redisTemplate;
        }
        public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
        
    }
    

    消息发布

    //        发布 消息
            RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, “这是xx消息”;
    

    如何使用消息队列发布与订阅

    订阅消息

    新增代码 实现RedisTopicInterface,添加@RedisTopic注解,并设置TOPIC对应的主题,实现3个方法(有2个直接使用示例代码即可)
    主要实现receiveMsg()方法。

    @RedisTopic(value = TOPIC)
    public class AAAService  implements RedisTopicInterface {
    /** * 定义消息队列主题 */
    public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
        /**
         * 接受消息
         * @param message
         * @param channel
         * @return
         */
        @Override
        public boolean receiveMsg(Object message, String channel) {
            if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
                try {
    //             TODO   进行业务处理
                   此处实现
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
                }
            }
            return false;
        }
         @Override
        public String getChannel( ) {
            return TOPIC;
        }
    
        /**
         * 接受消息
         *
         * @param channel
         * @return
         */
        @Override
        public boolean isReceiver(String channel) {
            if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
                return true;
            }
            return false;
        }
    
    

    发布消息

    //        发布 消息
           RedisTopicUtils.sendMessage(AAAService.TOPIC, “这是xx消息”;
    

    相关文章

      网友评论

          本文标题:如何使用消息队列发布与订阅【易扩展】

          本文链接:https://www.haomeiwen.com/subject/pkxrcctx.html