美文网首页
实现发布订阅的三种方式

实现发布订阅的三种方式

作者: 钟离惜 | 来源:发表于2021-04-02 11:44 被阅读0次

    一、Redis

    Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

    Redis 客户端可以订阅任意数量的频道。

    redis 127.0.0.1:6379> SUBSCRIBE runoobChat
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"    # 返回值的类型:显示订阅成功
    2) "runoobChat"    # 订阅的频道
    3) (integer) 1    # 目前已订阅的模式的数量
    

    现在,我们先重新开启个 redis 客户端,然后在同一个频道 runoobChat 发布两次消息,订阅者就能接收到消息。

    redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"
    (integer) 1
    redis 127.0.0.1:6379> PUBLISH runoobChat "Learn redis by runoob.com"
    (integer) 1
    
    # 订阅者的客户端会显示如下消息
    1) "message"    # 返回值的类型:信息
    2) "runoobChat"    # 信息本身的目标频道
    3) "Redis PUBLISH test"    # 信息的内容
     1) "message"
    2) "runoobChat"
    3) "Learn redis by runoob.com"
    

    二、RabbitMQ

    RabbitMQ的发布订阅使用交换机类型:Fanout,也称为广播。
    声明Exchange,不再声明Queue。发送消息到Exchange,不再发送到Queue。

      private static String EXCHANGE_NAME="hello111";
        @Autowired
        AmqpTemplate amqpt;
    
        @RequestMapping(value="test")
        public void test() throws InterruptedException{
    
            for(int i=0;i<10;i++){
                Thread.sleep(i*20);
                amqpt.convertAndSend(EXCHANGE_NAME,"fanout", "hello word"+i);
            }
        System.err.println("消息发送成功");
        }
    

    在生产者声明Exchange,EXCHANGE_NAME="hello111",使用Fanout类型的交换机,这里不声明Queue。
    在消费者指定接收的交换器名字。

    @Configuration
    public class Confi {
          private static String EXCHANGE_NAME="hello111";
          //声明两个队列
        @Bean
        public Queue queue(){
            return new Queue("hello");
        }
    
        @Bean
        public Queue queue1(){
            return new Queue("hello1");
        }
        //声明一个fanout的交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange(EXCHANGE_NAME);   
        }
        //将队列和交互机进行绑定
        @Bean
        public Binding bindfanout(Queue queue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue).to(fanoutExchange);
    
        }
    
        @Bean
        public Binding bindfanout1(Queue queue1,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue1).to(fanoutExchange);
    
        }
    }
    
    @Service
    @RabbitListener(queues="hello") //这个队列就是我们之前定义的队列名称,监听hello队列的消息
    public class Receive1 {
    
        @RabbitHandler 
        public void receive(String message) throws InterruptedException{
    
            System.err.println("1:收到消息"+message);
    
        }
    }
    
    @Service
    @RabbitListener(queues="hello1")
    public class Receive2 {
    
        @RabbitHandler
        public void receive(String message) throws InterruptedException{
            System.err.println("2收到消息:"+message);
        }
    }
    

    三、ZooKeeper

    ZooKeeper本身有发布订阅机制,当某个Znode发生变化时,所有监听这个Znode的客户端都会受到消息。

    先创建一个Znode:


    然后启动两个ZooKeeper客户端,主要监听代码如下:

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
     
    /**
     * 分布式配置中心demo
     * @author 
     *
     */
    public class ZooKeeperProSync implements Watcher {
     
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk = null;
        private static Stat stat = new Stat();
     
        public static void main(String[] args) throws Exception {
            //zookeeper配置数据存放路径
            String path = "/username";
            //连接zookeeper并且注册一个默认的监听器
            zk = new ZooKeeper("192.168.31.100:2181", 5000, //
                    new ZooKeeperProSync());
            //等待zk连接成功的通知
            connectedSemaphore.await();
            //获取path目录节点的配置数据,并注册默认的监听器
            System.out.println(new String(zk.getData(path, true, stat)));
     
            Thread.sleep(Integer.MAX_VALUE);
        }
     
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState()) {  //zk连接成功通知事件
                if (EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                } else if (event.getType() == EventType.NodeDataChanged) {  //zk目录节点数据变化通知事件
                    try {
                        System.out.println("配置已修改,新值为:" + new String(zk.getData(event.getPath(), true, stat)));
                    } catch (Exception e) {
                    }
                }
            }
        }
    }
    

    两个程序启动后都正确的读取到了zookeeper的/username目录节点下的数据'qingfeng'。

    然后在zookeeper里修改下目录节点/username下的数据:


    修改完成后,我们看见两个监听客户端都及时收到了他们监听的目录节点数据变更后的值,如下所示:


    四、三种发布订阅的对比

    Redis的PUBLISH和SUBSCRIBE的缺陷在于客户端必须一直在线才能接收到消息,断线可能会导致客户端丢失消息,除此之外,旧版的redis可能会由于订阅者消费不够快而变的不稳定导致崩溃,甚至被管理员杀掉。
    Redis订阅发布简单,轻量级,延迟比较低,适合业务量不大非核心的一些订阅功能。

    RabbitMQ应该是最合适也是最可靠的发部订阅方案。
    RabbitMQ有发送方确认模式、接收方确认机制、消息持久化等机制确保方案的可行性。

    ZooKeeper不适宜做发部订阅,虽然本质上zookeeper=文件系统+监听通知机制,但是主要应用场景如下图:


    转载文章
    Redis 发布订阅
    rabbitmq-----发布订阅模式
    Zookeeper入门看这篇就够了
    Zookeeper典型应用场景篇

    相关文章

      网友评论

          本文标题:实现发布订阅的三种方式

          本文链接:https://www.haomeiwen.com/subject/vipfkltx.html