消息队列的需求
1.消息保序
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
2.处理重复的消息
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
3.保证消息可靠性
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
基于List实现
消息保序解决方案
List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
处理重复的消息解决方案
生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。
保证消息可靠性解决方案
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
提高消费消息性能
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。
消费者实现
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Component
public class RedisQueueTask {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// @PostConstruct 该注解被用来修饰一个非静态的void方法。该方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
@PostConstruct
public void consumeMessage() {
// 创建一个单线程的执行器
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 放入循环消费消息的任务
executorService.execute(() -> {
// 存储已处理过的消息id,这里不考虑历史id占用内存大小问题
HashSet<String> processed = new HashSet<>();
// 循环出队
while (true) {
try {
// 读取备份list,存在则先消费备份list
HashMap<String, String> map = (HashMap<String, String>) redisTemplate.opsForList()
.rightPop("backup-mp");
if (map == null) {
// 使用BRPOP命令进行阻塞式读取,这里没有读到队列数据时阻塞10s,超时或者读取到数据后,再开始下一轮读取
// 同时将map存入备份list中,防止由于故障或宕机而造成消息丢失
map = (HashMap<String, String>) redisTemplate.opsForList()
.rightPopAndLeftPush("messageQueue", "backup-mp", 10, TimeUnit.SECONDS);
}
// 无消息时,开始下一轮
if (map == null) {
continue;
}
// 重复消息直接丢弃
if (processed.contains(map.get("messageId"))) {
// 备份list出队
redisTemplate.opsForList().rightPop("backup-mp");
continue;
}
// 进行业务处理
System.out.println("消费消息:" + JSON.toJSONString(map));
// 记录已消费的messageId
processed.add(map.get("messageId"));
// 备份list出队
redisTemplate.opsForList().rightPop("backup-mp");
} catch (Exception e) {
// 异常捕获,防止循环因异常停止
e.printStackTrace();
}
}
});
}
}
生产者实现
import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.HashMap;
import java.util.List;
@SpringBootTest
public class MyTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* redis 发送消息
*/
@Test
void test2() {
for (int i = 0; i < 10; i++) {
HashMap<String, String> map = new HashMap<>();
// 生产者设置消息id,用来防止消费者重复消费
map.put("messageId", String.valueOf(i));
map.put("userId", "user-id-" + i);
// 发送消息
redisTemplate.opsForList().leftPush("messageQueue", map);
System.out.println("发送消息:" + JSON.toJSONString(map));
// 制造 messageId 为 6 7 8 9 的重复消息
if (i > 5) {
redisTemplate.opsForList().leftPush("messageQueue", map);
System.out.println("发送消息:" + JSON.toJSONString(map));
}
}
}
}
测试结果
发送消息日志
image
消费消息日志
image
基于Streams实现
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
XREAD:用于读取消息,可以按 ID 读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,
XACK:XACK 命令用于向消息队列确认消息处理已完成。
RedisTemplate操作Streams
import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.HashMap;
import java.util.List;
@SpringBootTest
public class RedisMqTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 初始化 消息队列
* XADD 添加消息到末尾
* XGROUP CREATE 创建消费者组
* XDEL 删除消息
*/
@Test
void initMessageQueue() {
HashMap<String, String> pram = new HashMap<>();
pram.put("name", "init");
RecordId recordId = redisTemplate.opsForStream().add("integral-queue", pram);
redisTemplate.opsForStream().createGroup("integral-queue", "integral-group");
redisTemplate.opsForStream().delete("integral-queue", recordId);
}
/**
* 发送消息
* XADD 添加消息到末尾
*/
@Test
void sendMessage() {
HashMap<String, String> pram = new HashMap<>();
pram.put("name", "加积分");
redisTemplate.opsForStream().add("integral-queue", pram);
}
/**
* 删除消息
* XDEL 删除消息
*/
@Test
void delMessage() {
RecordId recordId = RecordId.of("1608255921129-0");
redisTemplate.opsForStream().delete("integral-queue", recordId);
}
/**
* 根据is获取消息
* XREAD 以阻塞或非阻塞方式获取消息列表
*/
@Test
void getMessage() {
Range<String> idRange = Range.just("1608254725290-0");
List<MapRecord<String, Object, Object>> range = redisTemplate.opsForStream().range("integral-queue", idRange);
System.out.println(JSON.toJSONString(range));
}
/**
* 获取未消费的消息
* XREAD 以阻塞或非阻塞方式获取消息列表
*/
@Test
void getMessageQueue() {
ReadOffset offset = ReadOffset.from("0-0");
StreamOffset<String> streamOffset = StreamOffset.create("integral-queue", offset);
List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(streamOffset);
System.out.println(JSON.toJSONString(read));
}
/**
* 获取已读取但未消费的消息
* XPENDING 显示待处理消息的相关信息
* XREAD 以阻塞或非阻塞方式获取消息列表
*/
@Test
void getPendingQueue() {
PendingMessagesSummary pending = redisTemplate.opsForStream().pending("integral-queue", "integral-group");
Range<String> idRange = pending.getIdRange();
List<MapRecord<String, Object, Object>> range = redisTemplate.opsForStream().range("integral-queue", idRange);
System.out.println(JSON.toJSONString(range));
}
/**
* 将消息标记为"已处理"
* XACK 将消息标记为"已处理"
*/
@Test
void ack() {
RecordId id = RecordId.of("1608255921129-0");
redisTemplate.opsForStream().acknowledge("integral-queue", "integral-group", id);
}
}
消费者实现
监听器
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class IntegralRedisStreamsListener implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
// 获取参数
String stream = message.getStream();
RecordId id = message.getId();
Map<String, String> map = message.getValue();
// 执行业务逻辑
System.out.println(stream + ":" + id.toString() + ":" + JSON.toJSONString(map));
// 消费成功确认,消息删除和消息确认是一个事务
redisTemplate.multi();
redisTemplate.opsForStream().delete("integral-queue", id);
redisTemplate.opsForStream().acknowledge("integral-group", message);
redisTemplate.exec();
}
}
监听器容器
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.time.Duration;
@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private IntegralRedisStreamsListener integralRedisStreamsListener;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;
@Override
public void run(ApplicationArguments args) {
// 初始化 streamMessageListenerContainer
this.init();
// 使用监听容器对象开始监听消费(使用的是手动确认方式),自动ack使用receiveAutoAck方法
this.streamMessageListenerContainer.receive(
Consumer.from("integral-group", "consumer-1"),
StreamOffset.create("integral-queue", ReadOffset.lastConsumed()),
this.integralRedisStreamsListener
);
// 启动监听
this.streamMessageListenerContainer.start();
}
@Override
public void destroy() {
this.streamMessageListenerContainer.stop();
}
private void init() {
// 创建配置对象
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
.executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(t -> {
// StreamListener中的异常会在这里抛出
System.out.println("errorHandler: " + t.getMessage());
})
// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
.build();
this.streamMessageListenerContainer = StreamMessageListenerContainer
.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
}
}
测试
1.初始化
初始化消息队列(integral-queue)和消费者组(integral-group),执行RedisMqTest.initMessageQueue()即可
2.启动消费者程序
3.调用RedisMqTest.sendMessage()发送消息
4.消费者程序成功接收到消息并进行业务处理
image5.消息去重解决方案未实现
可参考List实现的处理方式
6.消息可靠性解决方案未实现
可使用RedisMqTest.getPendingQueue()获取未成功消费的消息队列进行处理
网友评论