美文网首页
利用redis做一个简单的消息队列

利用redis做一个简单的消息队列

作者: guessguess | 来源:发表于2020-08-23 19:46 被阅读0次

    下面先贴pom文件的配置

    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <!-- Maven build descriptor for the sb-demo Spring Boot application. -->
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.gee</groupId>
        <artifactId>sb-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <!-- Dependency versions that are not given explicitly below are inherited from this parent. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.5.RELEASE</version>
        </parent>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Enable hot reload during development -->
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- JSON (used to serialize queue tasks) -->
            <!-- NOTE(review): 1.2.28 is an old fastjson release; deserialization advisories were
                 published against the 1.2.x line. Confirm whether a newer patched release should be used. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.28</version>
            </dependency>
    
            <!-- Unit testing -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- Redis access (provides StringRedisTemplate used by the queue classes) -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            
        </dependencies>
    
        <build>
            <plugins>
                <!-- Set the JDK version used by the Maven compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            
    
                <!-- Package as a Spring Boot specific jar (sets the entry point information, etc.) -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
    
                <!-- Tests are skipped during the build (skipTests=true). -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <configuration>
                        <skipTests>true</skipTests>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    </project>
    

    配置文件

    # Redis connection settings: database index, host, port and timeout.
    spring.redis.database=0
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    spring.redis.timeout=3000
    
    # Connection pool settings.
    # NOTE(review): the pom inherits Spring Boot 2.0.5.RELEASE; in Boot 2.x the pool keys are
    # presumably spring.redis.lettuce.pool.* / spring.redis.jedis.pool.*, so the
    # spring.redis.pool.* keys below may not be bound at all - confirm against the Boot docs.
    spring.redis.pool.max-idle=200
    
    spring.redis.pool.min-idle=200
    
    spring.redis.pool.max-active=2000
    
    spring.redis.pool.max-wait=1000
    

    相关的实体类

    /**
     * Generic envelope for a queued task: an identifier plus a typed payload.
     *
     * @param <T> type of the payload carried in {@link #msg}
     */
    public class TaskItem<T> {

        /** Identifier of the task; {@code null} until one is assigned. */
        public String taskId;

        /** Payload of the task; {@code null} until one is assigned. */
        public T msg;

        /** @return the task identifier, possibly {@code null} */
        public String getTaskId() {
            return this.taskId;
        }

        /** @param taskId the task identifier to store */
        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        /** @return the payload, possibly {@code null} */
        public T getMsg() {
            return this.msg;
        }

        /** @param msg the payload to store */
        public void setMsg(T msg) {
            this.msg = msg;
        }

        /** Renders the item as {@code TaskItem [taskId=..., msg=...]}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TaskItem [taskId=");
            text.append(this.taskId);
            text.append(", msg=");
            text.append(this.msg);
            text.append("]");
            return text.toString();
        }
    }
    
    /**
     * Plain serializable bean describing a user; used in this example as the
     * payload type of queued tasks.
     */
    public class User implements Serializable {

        /** Serialization version of this class. */
        private static final long serialVersionUID = 1L;

        private Integer id;
        /** User name. */
        private String username;
        /** Sex. */
        private String sex;
        /** Birthday. */
        private Date birthday;
        /** Address. */
        private String address;

        public Integer getId() {
            return this.id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getUsername() {
            return this.username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getSex() {
            return this.sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        public Date getBirthday() {
            return this.birthday;
        }

        public void setBirthday(Date birthday) {
            this.birthday = birthday;
        }

        public String getAddress() {
            return this.address;
        }

        public void setAddress(String address) {
            this.address = address;
        }

        /** Renders every field as {@code User [id=..., username=..., sex=..., birthday=..., address=...]}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("User [id=");
            text.append(this.id);
            text.append(", username=").append(this.username);
            text.append(", sex=").append(this.sex);
            text.append(", birthday=").append(this.birthday);
            text.append(", address=").append(this.address);
            text.append("]");
            return text.toString();
        }
    }
    
    

    消费者的接口。由于我是使用一个线程作为消费者,重复从队列中获取消息,因此结构如下

    /**
     * Consumer side of the queue. The interface extends {@link Runnable} so an
     * implementation can be handed directly to a thread.
     *
     * @param <T> payload type of the tasks being consumed
     */
    public interface RedisQueueConsumer<T> extends Runnable {

        /** Consumption entry point. */
        void consumer();

        /**
         * Processes a single task.
         *
         * @param t the task to process
         */
        void handle(TaskItem<T> t);
    }
    
    

    生产者,对于生产者来说功能都是一样的,因此没必要实现接口。

    /**
     * Producer side of a queue stored in a Redis sorted set. Tasks are serialized
     * to JSON and added under {@code queueKey} with an increasing score.
     *
     * @param <T> payload type of the tasks placed on the queue
     */
    public class RedisDelayQueue<T> {

        private final StringRedisTemplate strRedisTemplate;

        private final String queueKey;

        // Used as the sorted-set score; members with a smaller score are consumed first.
        // The counter lives in memory and starts at 0 for every instance of this class.
        private final AtomicInteger scoreSequence = new AtomicInteger(0);

        /**
         * Creates the queue and immediately starts the consumer on a new thread.
         * A message queue needs three things: the producer (this object), the
         * consumer, and the queue itself (the Redis key).
         *
         * @param strRedisTemplate template used to talk to Redis
         * @param queueKey         key of the sorted set holding the tasks
         * @param consumer         consumer to run on its own thread
         */
        public RedisDelayQueue(StringRedisTemplate strRedisTemplate, String queueKey, RedisQueueConsumer<T> consumer) {
            this.strRedisTemplate = strRedisTemplate;
            this.queueKey = queueKey;
            new Thread(consumer).start();
        }

        /**
         * Single entry point for producing tasks.
         *
         * @param taskItem the task to enqueue
         * @return {@code true} only when Redis reports the member as added
         */
        public boolean addTask(TaskItem<T> taskItem) {
            String value = JSON.toJSONString(taskItem);
            // add(...) returns a nullable Boolean; unboxing it directly could throw a
            // NullPointerException, so a missing result is treated as "not added".
            Boolean added = strRedisTemplate.opsForZSet().add(this.queueKey, value, scoreSequence.getAndIncrement());
            boolean addResult = Boolean.TRUE.equals(added);
            System.out.println("task = " + value + "  addResult = " + addResult);
            return addResult;
        }
    }
    

    最终的消息队列实现类。

    package com.gee.configuration;
    
    import java.lang.reflect.Type;
    import java.util.Set;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.util.CollectionUtils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    import com.gee.entity.TaskItem;
    import com.gee.entity.User;
    import com.gee.redis.RedisDelayQueue;
    import com.gee.redis.RedisQueueConsumer;
    
    /**
     * Spring configuration that wires the example queue: a {@link RedisDelayQueue}
     * of {@link User} tasks together with an anonymous polling consumer.
     */
    @Configuration
    public class RedisQueueConfiguration {

        @Autowired
        private StringRedisTemplate redisTemplate;

        /**
         * Builds the queue bean. Constructing the queue also starts the consumer
         * thread (see the {@link RedisDelayQueue} constructor).
         *
         * @return the queue used to publish {@code TaskItem<User>} tasks
         */
        @Bean
        public RedisDelayQueue<User> redisDelayQueue() {
            // Queue name, the score range to read, and how many members to fetch per poll.
            String queueKey = "test-queue";
            double min = 0;
            double max = Double.MAX_VALUE;
            int startIndex = 0;
            int limit = 1;
            return new RedisDelayQueue<User>(redisTemplate, queueKey, new RedisQueueConsumer<User>() {

                @Override
                public void run() {
                    consumer();
                }

                /**
                 * Polls the sorted set until the thread is interrupted, handling
                 * one task per successful poll.
                 */
                @Override
                public void consumer() {
                    Type taskType = new TypeReference<TaskItem<User>>() { }.getType();
                    while (!Thread.currentThread().isInterrupted()) {
                        Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, min, max, startIndex, limit);
                        if (CollectionUtils.isEmpty(values)) {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to whoever owns this thread.
                                Thread.currentThread().interrupt();
                                break;
                            }
                            // Nothing to consume (values may even be null): poll again
                            // instead of falling through and dereferencing it.
                            continue;
                        }
                        String value = values.iterator().next();
                        // Only the caller whose remove succeeds handles the task; the
                        // result is a nullable Long, so guard before comparing.
                        Long removed = redisTemplate.opsForZSet().remove(queueKey, value);
                        if (removed != null && removed > 0) {
                            TaskItem<User> task = JSON.parseObject(value, taskType);
                            handle(task);
                        }
                    }
                }

                @Override
                public void handle(TaskItem<User> t) {
                    System.out.println(t.toString());
                }
            });
        }
    }
    
    

    测试方法

    /**
     * Manual test: publishes two tasks to the queue bean and then keeps the test
     * thread alive so the consumer thread has time to print them.
     */
    public class TestRedis extends ConfigTest {

        @Test
        public void testRedis() {
            // The bean is looked up by name, so the generic type cannot be checked;
            // the unchecked cast is limited to this one declaration.
            @SuppressWarnings("unchecked")
            RedisDelayQueue<User> redisDelayQueue =
                    (RedisDelayQueue<User>) ApplicationContextUtils.APP_CONTEXT.getBean("redisDelayQueue");

            User user = new User();
            user.setUsername("username");
            user.setId(100);

            // First task carries a random id.
            TaskItem<User> taskItem = new TaskItem<User>();
            taskItem.setTaskId(UUID.randomUUID().toString());
            taskItem.setMsg(user);

            // Second task has no id, only the same payload.
            TaskItem<User> taskItem2 = new TaskItem<User>();
            taskItem2.setMsg(user);

            redisDelayQueue.addTask(taskItem);
            redisDelayQueue.addTask(taskItem2);

            try {
                // Keep the test alive while the consumer thread processes the tasks.
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of only printing the stack trace.
                Thread.currentThread().interrupt();
            }
        }
    }
    

    最终输出结果

    task = {"msg":{"id":100,"username":"username"},"taskId":"551e5636-6857-487a-85be-c5d7ec680ad4"}  addResult = true
    task = {"msg":{"id":100,"username":"username"}}  addResult = true
    TaskItem [taskId=551e5636-6857-487a-85be-c5d7ec680ad4, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]
    TaskItem [taskId=null, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]
    

    相关文章

      网友评论

          本文标题:利用redis做一个简单的消息队列

          本文链接:https://www.haomeiwen.com/subject/iqcwjktx.html