背景
- 线程批次概念主要作用于消息消费的过程,在PMQ项目是比较好的设计功能点之一,可以借鉴学习在日后工作中。
概念
- 在PMQ中,消息消费默认是采用多线程进行消费,所以偏移量的提交是一个很复杂的问题。试想一下,在单个队列里面会出现多个线程消费,当个某个线程中消息id比较大的消息消费完了,如果此时提交此大id的偏移量,就可能会出现消息id小的消息还没有消费完的情况。如果此时出现系统宕机,这个时候就会产生部分消息丢失的情况。为了保证消息不丢失,必须要最小化偏移量提交的方式,这样子可以保证偏移量之前的消息都被客户端消费到。那在PMQ中是如何做到最小化提交呢?一种方式是当调度线程启动多线程消费时,会等待多线程全部执行完毕后,再提交偏移量,这样可以可以保证偏移量之前的消息都被消费到,不管成功还是失败。但是这种方式会有一个问题,就是如果出现部分线程消费很慢,部分线程消费很快,如果这时采用等待全部消费完毕再提交,然后再进行下一次的消费的方式,会导致线程池资源浪费。所以在PMQ中,在上面这种方式的基础上引入了线程批次的概念。
- 线程批次的意思是,当线程调度器每次检查线程池中有空闲的线程和缓冲队列里面有数据就会产生新的批次消费线程组,并将此线程批次保存到一个列表中。每个批次线程组都有一个自增编号,当批次线程组中的线程都执行完成时,就表示当前批次线程组执行完毕,并记录好当前批次线程组的偏移量。然后再在批次线程组列表中,检查是否有连续执行完毕的批次线程组,直到遇到未执行完毕的批次线程组为止。找到此批次线程组后,就可以提交此队列的偏移量。这种方式的好处是,不用等待线程池所有的线程都执行完毕。只要线程池中还有空闲的资源就可以进行新的消费,提高了消费速度。
- 上述简单举例是:假设当前线程开始消费,先查询线程池中有多少个空闲的线程,假设空闲线程有5个,则从message中拉取5条消息来消费,这个5条消息结合成[批次3]。当[批次3]中完成消费的消息条数达到5时,则表示本批次消费完成。此处开始从0批次遍历,如果[批次3]之前的[批次2]或者[批次1]仍未提交,那么[批次3]仍不commit。只有当[批次3]之前不存在未提交批次时才可以commit。
代码流程
- (1)获取线程池中空闲线程的个数,其中ConsumerQueueDto默认线程是10(MqQueueExcutorService.java)
//线程批次入口
private void doHandleData(ConsumerQueueDto pre, int msgSize) {
//消费线程默认大小 - 核心线程活跃个数 = 核心线程空闲格式
int threadSize = pre.getThreadSize() + 2 - executor.getActiveCount();
int startThread = (int) ((msgSize + pre.getConsumerBatchSize() - 1) / pre.getConsumerBatchSize());
//限制下每个批次最大数量也就是pre的默认值10
if (startThread >= threadSize) {
startThread = threadSize;
}
if (startThread > pre.getThreadSize()) {
startThread = pre.getThreadSize();
}
//获取起进程批次号,转【2】
long batchRecorderId = batchRecorder.begin(startThread);
CountDownLatch countDownLatch = new CountDownLatch(startThread);
//批量的进行消息消费,转【3】
batchExcute(pre, startThread, batchRecorderId, countDownLatch);
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
}
- (2)获取进程批号(MqQueueExcutorService.java)
//在内存中缓存进程批次和详情
public class BatchRecorder {
//线程安全的hashMap
public Map<Long, MqQueueExcutorService.BatchRecorderItem> recordMap = new ConcurrentHashMap<Long, MqQueueExcutorService.BatchRecorderItem>();
// 记录最小的线程批次编号
private volatile long start = 0L;
// 记录当前开启的线程批次编号
private volatile long current = 0L;
private Object lockObject = new Object();
//线程批次current自增
public long begin(int threadCount) {
current++;
MqQueueExcutorService.BatchRecorderItem batchRecorderItem = new MqQueueExcutorService.BatchRecorderItem();
batchRecorderItem.batchReacorderId = current;
batchRecorderItem.threadCount = threadCount;
recordMap.put(current, batchRecorderItem);
//返回本次线程批次号
return current;
}
}
- (3)批量消息消费,对消息列表进行遍历,执行MsgThread中的run方法(MqQueueExcutorService.java)
public class MsgThread implements Runnable {
......
/***
* @author: kantlin
* @date: 2021/10/28 13:47
* @description:【消费者message流程】 执行消费线程中的run方法
*/
@Override
public void run() {
......
MqQueueExcutorService.BatchRecorderItem batchRecorderItem = null;
long maxId = 0;
try {
if (isRunning && checkOffsetVersion(pre)) {
//从message队列中拉取消息进行消费,并记录下最大消息的maxId,转【4】
maxId = threadExcute(pre);
//更新pre中maxId的值
updateOffset(pre, maxId);
}
} catch (Exception e) {
}
//根据batchRecorderItem的值是否为null在判断可否提交,转【6】
batchRecorderItem = batchRecorder.end(batchRecorderId, maxId);
if (batchRecorderItem != null && iAsynSubscriber == null) {
//若batchRecorderItem 不为空则执行提交逻辑,转【8】
doCommit(pre, batchRecorderItem);
}
......
}
}
- (4)拉取待消费消息并获取消息最大Id,之后遍历列表执行真正的消费操作(MqQueueExcutorService.java)
protected long threadExcute(ConsumerQueueDto pre) {
if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
......
//拉取待消费的消息并获取最大的Id,转【5】
MqQueueExcutorService.Pair<Long, Boolean> pair = prepareValue(pre, messageMap);
long maxId = pair.item1;
......
//从
if (messageMap.size() > 0) {
......
//消费消息
List<Long> failIds = invokeMessage(pre, messageMap);
......
return maxId;
} else {
......
}
}
return 0;
}
- (5)获取消息的最大Id(MqQueueExcutorService.java)
protected MqQueueExcutorService.Pair<Long, Boolean> prepareValue(ConsumerQueueDto pre, Map<Long, MessageDto> messageMap) {
......
while (count < pre.getConsumerBatchSize()) {
//从message队列拉取消息,数量为批次大小
MessageDto messageDto = messages.poll();
if (isRunning && messageDto != null && checkOffsetVersion(pre)) {
if (onMsgFilter(messageDto)) {
if (checkTag(pre, messageDto)) {
if (checkDelay(messageDto, pre))
if (checkRetryCount(messageDto, pre)) {
......
}
}
}
//前后对比获取拉取消息的最大ID
maxId = maxId < messageDto.getId() ? messageDto.getId() : maxId;
// flag = true;
pair.item1 = maxId;
pair.item2 = true;
}
count++;
}
return pair;
}
- (6)判断本批次是否可提交(MqQueueExcutorService.java)
public MqQueueExcutorService.BatchRecorderItem end(long batchReacorderId, long maxId) {
MqQueueExcutorService.BatchRecorderItem finishedItem = recordMap.get(batchReacorderId);
if (finishedItem == null) {
return null;
}
//本批次完成数+1
int count = finishedItem.counter.incrementAndGet();
//排它锁更新操作
synchronized (lockObject) {
//更新本次批次消费信息的最大maxId
if (finishedItem.maxId < maxId) {
finishedItem.maxId = maxId;
}
//判断是否本次已执行完成
if (!finishedItem.batchFinished) {
finishedItem.batchFinished = count == finishedItem.threadCount;
}
}
//若已完成还得看是否为可提交批次,转【7】
if (finishedItem.batchFinished) {
MqQueueExcutorService.BatchRecorderItem rs = getLastestItem();
return rs;
}
return null;
}
- (7)若批次执行完成,则判断是否为可提交批次(MqQueueExcutorService.java)
public MqQueueExcutorService.BatchRecorderItem getLastestItem() {
MqQueueExcutorService.BatchRecorderItem finishedItem = null;
boolean rs = false;
//若首次提交start从0开始,后续start值等于最小提交批次
for (long i = start + 1; i <= current; i++) {
finishedItem = recordMap.get(i);
if (finishedItem == null) {
continue;
}
//若当前i !=current,且i批次未完成,则current还不可提交
if (!finishedItem.batchFinished) {
break;
} else {
rs = true;
}
}
//current为最新可提交批次了,则返回为not null
if (!rs) {
finishedItem = null;
}
return finishedItem;
}
- (8)开始执行doCommit方法进行提交(MqQueueExcutorService.java)
private void doCommit(ConsumerQueueDto temp, MqQueueExcutorService.BatchRecorderItem batchRecorderItem) {
if (batchRecorderItem == null)
return;
CommitOffsetRequest request = new CommitOffsetRequest();
if (checkOffsetVersion(temp)) {
List<ConsumerQueueVersionDto> queueVersionDtos = new ArrayList<>();
request.setQueueOffsets(queueVersionDtos);
ConsumerQueueVersionDto consumerQueueVersionDto = new ConsumerQueueVersionDto();
......
queueVersionDtos.add(consumerQueueVersionDto);
//调用broke的/commitOffset接口
mqResource.commitOffset(request);
}
batchRecorder.delete(batchRecorderItem.batchReacorderId);
}
//修改start的开始值并remove map中缓存
public void delete(long batchReacorderId) {
long temp = start;
start = batchReacorderId;
for (long i = temp + 1; i <= batchReacorderId; i++) {
recordMap.remove(i);
}
}
网友评论