美文网首页
利用redis做一个简单的消息队列

利用redis做一个简单的消息队列

作者: guessguess | 来源:发表于2020-08-23 19:46 被阅读0次

下面先贴pom文件的配置

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build for the sb-demo project: Spring Boot web application with Redis support -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gee</groupId>
    <artifactId>sb-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Dependency versions without an explicit <version> are managed by this parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Enable hot reloading during development -->
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- JSON serialization (fastjson) -->
        <!-- NOTE(review): 1.2.28 is an old fastjson release; check its security advisories and consider upgrading -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

        <!-- Unit testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Redis access (provides StringRedisTemplate used by the queue code) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <!-- Set the JDK version used by the Maven compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        

            <!-- Package as a Spring Boot specific jar (sets the entry point information, etc.) -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <!-- Tests are skipped during the build (skipTests=true) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

配置文件

# Redis connection settings: database index, host, port and timeout.
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.timeout=3000

# Connection pool settings.
# NOTE(review): the project builds against Spring Boot 2.0.5 (see the parent POM). In Spring Boot 2.x
# the pool keys are documented under spring.redis.lettuce.pool.* / spring.redis.jedis.pool.* --
# confirm that the spring.redis.pool.* keys below are actually bound.
spring.redis.pool.max-idle=200

spring.redis.pool.min-idle=200

spring.redis.pool.max-active=2000

spring.redis.pool.max-wait=1000

相关的实体类

/**
 * Envelope for one queue entry: a task identifier together with a typed payload.
 *
 * @param <T> type of the payload stored in {@link #msg}
 */
public class TaskItem<T> {

    /** Identifier of the task; {@code null} until one is assigned. */
    public String taskId;

    /** Payload carried by the task; {@code null} until one is assigned. */
    public T msg;

    /**
     * @return the task identifier, possibly {@code null}
     */
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * @param taskId the task identifier to store
     */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /**
     * @return the payload, possibly {@code null}
     */
    public T getMsg() {
        return this.msg;
    }

    /**
     * @param msg the payload to store
     */
    public void setMsg(T msg) {
        this.msg = msg;
    }

    /**
     * Renders the item as {@code TaskItem [taskId=..., msg=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TaskItem [taskId=");
        text.append(this.taskId);
        text.append(", msg=");
        text.append(this.msg);
        text.append("]");
        return text.toString();
    }

}

/**
 * Serializable bean describing a user; used as the message payload in the queue example.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Numeric identifier of the user. */
    private Integer id;

    /** Name of the user. */
    private String username;

    /** Sex of the user. */
    private String sex;

    /** Birthday of the user. */
    private Date birthday;

    /** Address of the user. */
    private String address;

    /**
     * @return the user's identifier, possibly {@code null}
     */
    public Integer getId() {
        return this.id;
    }

    /**
     * @param id the identifier to store
     */
    public void setId(Integer id) {
        this.id = id;
    }

    /**
     * @return the user's name, possibly {@code null}
     */
    public String getUsername() {
        return this.username;
    }

    /**
     * @param username the name to store
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * @return the user's sex, possibly {@code null}
     */
    public String getSex() {
        return this.sex;
    }

    /**
     * @param sex the sex to store
     */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /**
     * @return the user's birthday, possibly {@code null}; the stored instance itself is returned
     */
    public Date getBirthday() {
        return this.birthday;
    }

    /**
     * @param birthday the birthday to store; the reference is kept as-is
     */
    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    /**
     * @return the user's address, possibly {@code null}
     */
    public String getAddress() {
        return this.address;
    }

    /**
     * @param address the address to store
     */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders all five properties as {@code User [id=..., username=..., sex=..., birthday=..., address=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [id=");
        text.append(this.id);
        text.append(", username=").append(this.username);
        text.append(", sex=").append(this.sex);
        text.append(", birthday=").append(this.birthday);
        text.append(", address=").append(this.address);
        text.append("]");
        return text.toString();
    }
}

消费者的接口。由于我是使用一个线程作为消费者,重复从队列中获取消息,因此接口继承了 Runnable,结构如下

/**
 * Consumer side of the Redis queue. It extends {@link Runnable} so that an implementation
 * can be handed directly to a thread.
 *
 * @param <T> payload type of the {@link TaskItem}s this consumer handles
 */
public interface RedisQueueConsumer<T> extends Runnable {

    /**
     * Consumption entry point; what it fetches and how often is defined by the implementation.
     */
    void consumer();

    /**
     * Processes a single task.
     *
     * @param t the task to process
     */
    void handle(TaskItem<T> t);

}

生产者,对于生产者来说功能都是一样的,因此没必要实现接口。

/**
 * Producer side of a simple queue kept in a Redis sorted set.
 *
 * <p>Each task is serialized to JSON and added to the sorted set stored at {@code queueKey},
 * using a counter local to this instance as the score. Creating the queue also starts a
 * thread that runs the supplied consumer.
 *
 * @param <T> payload type of the tasks put on the queue
 */
public class RedisDelayQueue<T> {

    private final StringRedisTemplate strRedisTemplate;

    private final String queueKey;

    // Used as the score; entries with a smaller score are consumed first.
    // NOTE(review): the counter lives in this instance only, so scores start again at 0 after a
    // restart and are not coordinated between several producers.
    private final AtomicInteger actomicInteger = new AtomicInteger(0);

    /**
     * A message queue needs three things: a producer (this object), a consumer and the queue itself.
     *
     * @param strRedisTemplate template used to talk to Redis
     * @param queueKey key of the sorted set that backs the queue
     * @param consumer consumer to run; it is started immediately on its own thread
     */
    public RedisDelayQueue(StringRedisTemplate strRedisTemplate, String queueKey, RedisQueueConsumer<T> consumer) {
        this.strRedisTemplate = strRedisTemplate;
        this.queueKey = queueKey;
        // Name the thread so it can be identified in thread dumps.
        new Thread(consumer, "redis-queue-consumer-" + queueKey).start();
    }

    /**
     * Single entry point for producing tasks.
     *
     * @param taskItem the task to enqueue
     * @return {@code true} if Redis reported the member as newly added; {@code false} otherwise,
     *         including when the template returns no result
     */
    public boolean addTask(TaskItem<T> taskItem) {
        String value = JSON.toJSONString(taskItem);
        // Wrap back to 0 instead of overflowing to a negative score, which a consumer
        // reading scores from 0 upwards would never see.
        int score = actomicInteger.getAndUpdate(current -> current == Integer.MAX_VALUE ? 0 : current + 1);
        // The template returns a boxed Boolean that may be null; unboxing it directly would throw.
        Boolean added = strRedisTemplate.opsForZSet().add(this.queueKey, value, score);
        boolean addResult = Boolean.TRUE.equals(added);
        System.out.println("task = " + value + "  addResult = " + addResult);
        return addResult;
    }
}

最终的消息队列配置类:在这里装配队列,并以匿名类的形式实现消费者。

package com.gee.configuration;

import java.lang.reflect.Type;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.gee.entity.TaskItem;
import com.gee.entity.User;
import com.gee.redis.RedisDelayQueue;
import com.gee.redis.RedisQueueConsumer;

/**
 * Spring configuration that wires the demo queue: it registers a {@link RedisDelayQueue} bean
 * for {@link User} payloads and supplies the consumer as an anonymous class.
 */
@Configuration
public class RedisQueueConfiguration {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Builds the queue bean. Constructing the {@link RedisDelayQueue} starts the consumer thread.
     *
     * @return the queue used to produce {@code TaskItem<User>} messages on {@code test-queue}
     */
    @Bean
    public RedisDelayQueue<User> redisDelayQueue() {
        // Queue name, the score range to read, and how many entries to fetch per poll
        String queueKey = "test-queue";
        double min = 0;
        double max =  Double.MAX_VALUE;
        int startIndex = 0;
        int limit = 1;
        return new RedisDelayQueue<User>(redisTemplate, queueKey, new RedisQueueConsumer<User>() {

            @Override
            public void run() {
                consumer();
            }

            /**
             * Polls the sorted set for the lowest-scored entry, claims it by removing it, and
             * passes it to {@link #handle}. Sleeps 100 ms when the queue is empty and stops
             * when the thread is interrupted.
             */
            @Override
            public void consumer() {
                Type taskType = new TypeReference<TaskItem<User>>() { }.getType();
                while (!Thread.currentThread().isInterrupted()) {
                    Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, min, max, startIndex, limit);
                    if (CollectionUtils.isEmpty(values)) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Restore the flag so whoever owns the thread can see the interruption
                            Thread.currentThread().interrupt();
                            break;
                        }
                        // Nothing to process (values may even be null): poll again
                        continue;
                    }
                    String value = values.iterator().next();
                    // Only the caller whose remove succeeds owns the entry; the result may be null
                    Long removed = redisTemplate.opsForZSet().remove(queueKey, value);
                    if (removed != null && removed > 0) {
                        try {
                            TaskItem<User> task = JSON.parseObject(value, taskType);
                            handle(task);
                        } catch (RuntimeException e) {
                            // One bad message must not kill the consumer thread; the entry has
                            // already been removed, so report it and keep going
                            System.err.println("failed to process task = " + value + "  error = " + e);
                        }
                    }
                }

            }

            /**
             * Handles one task by printing it to standard output.
             */
            @Override
            public void handle(TaskItem<User> t) {
                System.out.println(t.toString());
            }
        });
    }
}

测试方法

/**
 * Manual smoke test for the Redis queue: enqueues two tasks and then keeps the test alive
 * so the consumer thread has time to print them.
 */
public class TestRedis extends ConfigTest{

    /**
     * Adds one task with a random id and one without an id, both carrying the same user.
     * The method makes no assertions; the result is checked by reading the console output.
     */
    @Test
    public void testRedis() {
        // The bean is looked up by name, so the generic type has to be restored with a cast
        @SuppressWarnings("unchecked")
        RedisDelayQueue<User> redisDelayQueue = (RedisDelayQueue<User>) ApplicationContextUtils.APP_CONTEXT.getBean("redisDelayQueue");
        TaskItem<User> taskItem = new TaskItem<User>();
        taskItem.setTaskId(UUID.randomUUID().toString());
        User user = new User();
        user.setUsername("username");
        user.setId(100);
        taskItem.setMsg(user);

        // Second task deliberately has no task id
        TaskItem<User> taskItem2 = new TaskItem<User>();
        taskItem2.setMsg(user);

        redisDelayQueue.addTask(taskItem);
        redisDelayQueue.addTask(taskItem2);


        try {
            // Keep the test running while the consumer thread processes the tasks
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }
}

最终输出结果

task = {"msg":{"id":100,"username":"username"},"taskId":"551e5636-6857-487a-85be-c5d7ec680ad4"}  addResult = true
task = {"msg":{"id":100,"username":"username"}}  addResult = true
TaskItem [taskId=551e5636-6857-487a-85be-c5d7ec680ad4, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]
TaskItem [taskId=null, msg=User [id=100, username=username, sex=null, birthday=null, address=null]]

相关文章

网友评论

      本文标题:利用redis做一个简单的消息队列

      本文链接:https://www.haomeiwen.com/subject/iqcwjktx.html