美文网首页
如何使用消息队列发布与订阅【易扩展】

如何使用消息队列发布与订阅【易扩展】

作者: 逸如风飞 | 来源:发表于2019-06-27 13:57 被阅读0次

redis消息队列实现

定义消息接受注解

@Component
@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisTopic {
    String value();
}

定义消息接受interface

public interface RedisTopicInterface<T> {
    public final static String PREX = "houdamis_";
    /**
     * 发布消息
     * @return
     */
    public String getChannel();
    /**
     * 接受消息
     * @return
     */
    public boolean receiveMsg(T message,String channel);
    /**
     * 接受消息
     * @return
     */
    public boolean isReceiver( String channel);

}

消息接受代码示例

@RedisTopic(value = TOPIC)
public class DataSourceReloadService  implements RedisTopicInterface {
    /**
        必须参数
     * 定义消息队列主题
     */
    public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";

    public   final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * 接受消息
     *
     * @param message 消息内容
     * @param channel 消息主题 同topic
     * @return
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
//               TODO 进行业务处理
                xxxx
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
            }
        }
        return false;
    }

    /**
     * 接受消息
     *
     * @param channel
     * @return
     */
    @Override
    public boolean isReceiver(String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            return true;
        }
        return false;
    }
}

消息队列工具类

public class RedisTopicUtils {
    private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
    private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();

    /**
     * 发布消息
     * @param channel
     * @param messgae
     * @return
     */
    public static boolean sendMessage(String channel ,Object messgae){
        redisTemplate.convertAndSend(channel, messgae);
        return true;
    }

    /**
     * 接受消息
     * @param channel
     * @param messgae
     */
    public static void receiveMessage(String channel, Object messgae) {
        Set<RedisTopicInterface> beans = getReceiver(channel);
        for (RedisTopicInterface v : beans) {
            try {
                v.receiveMsg(messgae,channel);
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 注册RedisTopicInterface
     * @param receiver
     */
    public static void regist(RedisTopicInterface receiver){
        String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
        Set<RedisTopicInterface>  beans = getReceiver(key);
        beans.add(receiver);
        topicBeans.put(key,beans);
    }

    public static Set<RedisTopicInterface> getReceiver(String key){
        Set<RedisTopicInterface>  beans = topicBeans.get(key);
        if(beans == null){
            beans = Sets.newHashSet();
        }
        return beans;
    }

    /**
     * 初始化
     */
    public static void init(){
        try {
            Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RedisTopic.class).value();
                    if(null != interfaceName){
                         regist( (RedisTopicInterface) serviceBean);
                        System.out.println(serviceBean.getClass().getName()+ " regist to RedisTopic[" + interfaceName+"]");
                    }else{
                        System.err.println(serviceBean.getClass().getName()+ "can't regist to RedisTopic[" + interfaceName+"]");
                    }
                }
            }
        } catch ( Throwable e) {
            e.printStackTrace();
        }
    }


}

消息队列-接受者注册

系统启动时进行注册

@Service
public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent evt) {
        
        RedisTopicUtils.init();
    }
}

redis xml配置

<!-- 定义监听类 -->
    <bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>
    <!-- 定义监听容器 -->
    <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
          destroy-method="destroy">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <!-- 任务执行器 -->
        <property name="taskExecutor">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                <property name="poolSize" value="10"/>
            </bean>
        </property>
        <!-- 消息监听器 -->
        <property name="messageListeners">
            <map>
                <entry key-ref="redisMessageListener">
                    <list>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>

消息监听

RedisMessageListener监听到消息之后,交由RedisTopicUtils处理,RedisTopicUtils根据topic找到已注册的Set<RedisTopicInterface>,然后通知每个RedisTopicInterface元素进行处理。


/**
 * redis消息接受
 */
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String msgChannel = (String) getRedisTemplate().getKeySerializer().deserialize(channel);
        RedisTopicUtils.receiveMessage(msgChannel,getRedisTemplate().getValueSerializer().deserialize(body));
    }
    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
}

消息发布

//        发布 消息
        RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, “这是xx消息”;

如何使用消息队列发布与订阅

订阅消息

新增代码 实现RedisTopicInterface,添加@RedisTopic注解,并设置TOPIC对应的主题,实现3个方法(有2个直接使用示例代码即可)
主要实现receiveMsg()方法。

@RedisTopic(value = TOPIC)
public class AAAService  implements RedisTopicInterface {
/** * 定义消息队列主题 */
public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
    /**
     * 接受消息
     * @param message
     * @param channel
     * @return
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
//             TODO   进行业务处理
               此处实现
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
            }
        }
        return false;
    }
     @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * 接受消息
     *
     * @param channel
     * @return
     */
    @Override
    public boolean isReceiver(String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            return true;
        }
        return false;
    }

发布消息

//        发布 消息
       RedisTopicUtils.sendMessage(AAAService.TOPIC, “这是xx消息”;

相关文章

  • 如何使用消息队列发布与订阅【易扩展】

    redis消息队列实现 定义消息接受注解 定义消息接受interface 消息接受代码示例 消息队列工具类 消息队...

  • 最通俗易懂的Redis发布订阅及代码实战

    发布订阅简介 除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息...

  • MQ

    1. 消息队列的通信模型 点对点模式 发布/订阅 2.kafka使用场景 消息队列 追踪网站活动 Metrics(...

  • redis的高级特性

    发布订阅 特点 可以使用发布订阅来实现消息队列,但因为发出的消息不会被持久化,所以消费者只能收到从它开始订阅这个频...

  • Redis实现发布订阅模式

    简述 Redis可作为消息队列中间件使用,属于发布订阅模式 消息队列可以实现【系统解耦】、【异步处理】、【流量削峰...

  • Redis消息队列&发布订阅模式使用

    Redis实现消息队列原理 Redis的列表是使用双向链表实现的,保存了头尾节点,在列表头尾两边插取元素是非常...

  • Redis消息队列(Message queues)

    Redis消息队列(Message queues) 场景 + 发布者订阅者模式:发布者生产消息放到队列里,多个监听...

  • redis实现MQTT模型订阅分布

    订阅发布其实也是属于一种特殊的消息队列Queue机制,与redis的普通队列list区别在于,如果订阅端存在多个消...

  • Kafka 与 RabbitMQ 如何做选择

    前言 我们在工作中经常会用到异步消息,主要使用两种消息模式: 消息队列 发布/订阅 消息队列:多个生产者可以向同一...

  • 一文带你了解 Redis 的发布与订阅的底层原理

    01、前言 发布订阅系统在我们日常的工作中经常会使用到,这种场景大部分情况我们都是使用消息队列的,常用的消息队列有...

网友评论

      本文标题:如何使用消息队列发布与订阅【易扩展】

      本文链接:https://www.haomeiwen.com/subject/pkxrcctx.html