美文网首页
基于Redission使用Redis的Stream

基于Redission使用Redis的Stream

作者: 小狼在IT | 来源:发表于2019-01-16 16:40 被阅读0次
            redisson = Redisson.create(config);
    
            RStream<String, String> stream = redisson.getStream("test3");
            //初始化,不知为啥,但不这样做create不到group
            stream.add("0","0");
            //创建一个group,一个group需要在stream数据添加前创建,否则这个group只能读它创建以后写入stream的数据
            stream.createGroup("testGroup31");
            //往stream添加消息
            for(Integer i=0;i<30;i++){
                stream.add(i.toString(), i.toString());
            }
    
            //消费消息
            for(Integer i=0;i<6;i++){
                Integer finalI = i;
                Thread t = new Thread( ()->{
                    try {
                        Thread.sleep(1000);
                    }catch (Exception e){
    
                    }
    
                    Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup31", "consumer"+ finalI.toString(),1);
                    if(s!=null && s.size()>0){
                        for (Map.Entry<StreamMessageId, Map<String, String>> entry : s.entrySet()) {
                            Map<String, String> m2 = entry.getValue();
                            for(Map.Entry<String,String> entry1:m2.entrySet()){
                                System.out.println(Thread.currentThread().getName()+" : Key = " + entry1.getKey() + ", Value = " + entry1.getValue());
                            }
                            //消费了消息,要应答一下
                            stream.ack("testGroup31",entry.getKey());
                            //如果消费了消息想删除,可以删除掉
                            //stream.remove(entry.getKey());
                        }
                    }
    
                });
                t.start();
            }
    
    image.png image.png

    这里,有个group名字叫testGroup31,里面有消费者6个,pending代表目前有6个数据被读取了,但没有ack。last_delivered-id代表这个group目前读到哪条消息。

    相关文章

      网友评论

          本文标题:基于Redission使用Redis的Stream

          本文链接:https://www.haomeiwen.com/subject/lbimdqtx.html