使用redis的队列实现接口排队
在调用接口时将线程号(多实例的情况下得用uuid,线程号可能会重复)存入redis队列,查询队首线程号(uuid)如果是当前线程,则执行逻辑、出队,否则等待。
-
使用到的redis命令有:
- RPUSH key value1 [value2] 入队
- LPOP key 出队
- LRANGE key start stop 查询队首元素
-
增加一个注解
@QueuingPoll
来标记要排队的接口
/**
* @author Jenson
*/
public @interface QueuingPoll {
}
- controller 需要排队队接口
加了个sleep延长接口返回的时间以便看效果
@GetMapping("/slowly")
@QueuingPoll
public String slowlyInterface(@RequestParam Integer tenantId,
@RequestParam Integer time) throws InterruptedException {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
System.out.println(threadName + " 开始休眠: " + time);
Thread.sleep(time);
System.out.println(threadName + " 休眠结束: " + time);
return threadName;
}
- 使用切面来拦截接口,实现接口排队
/**
* @author Jenson
*/
@Aspect
@Component
public class QueuingPollAspect {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Around(value = "@annotation(com.jenson.annotation.QueuingPoll)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String tenantId = "null";
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
tenantId = request.getParameter("tenantId");
}
// Thread currentThread = Thread.currentThread();
// 线程 ID 是唯一的,并且在其生命周期内保持不变。 当一个线程终止时,这个线程 ID 可能会被重用。
// String threadId = String.valueOf(currentThread.getId());
// 在多实例多情况下线程ID可能会导致重复,所以使用UUID
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
// 相同的租户放入同一个redis队列里,实现同租户串行不同的租户并行
String key = "jenson:list-thread:" + tenantId;
redisTemplate.opsForList().rightPush(key, uuid);
boolean waitFlag = Boolean.TRUE;
while (waitFlag) {
waitFlag = Boolean.FALSE;
// System.out.println(threadName + "轮询查看是否轮到自己");
List<String> top = redisTemplate.opsForList().range(key, 0, 0);
if (top != null && top.size() > 0) {
// redis 里有数据
if (!uuid.equals(top.get(0))) {
// 队列顶部不是该接口,线程等待
waitFlag = Boolean.TRUE;
}
}
if (waitFlag) {
// 根据接口执行平均时长来适度调整休眠时间,休眠时会让出cpu给其他的线程
Thread.sleep(100);
}
}
Object result = proceedingJoinPoint.proceed();
// 执行结束,推出队列顶端元素
redisTemplate.opsForList().leftPop(key);
return result;
}
@AfterThrowing(value = "@annotation(com.jenson.annotation.QueuingPoll)", throwing = "e")
public void throwingAdvice(JoinPoint joinPoint, Exception e) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String tenantId = "null";
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
tenantId = request.getParameter("tenantId");
}
String key = "jenson:list-thread:" + tenantId;
// 抛出错误时也要推出队列顶端元素,否则后面的接口就堵死了
redisTemplate.opsForList().leftPop(key);
}
}
- 测试,先调用接口延时10s再调用接口演示100ms
http-nio-8010-exec-1 开始休眠: 10000
http-nio-8010-exec-1 休眠结束: 10000
http-nio-8010-exec-4 开始休眠: 100
http-nio-8010-exec-4 休眠结束: 100
后调用的接口后执行了,如果不加该注解的情况如下,执行快的先执行完:
http-nio-8010-exec-2 开始休眠: 10000
http-nio-8010-exec-4 开始休眠: 100
http-nio-8010-exec-4 休眠结束: 100
http-nio-8010-exec-2 休眠结束: 10000
- 但是这种办法有很大风险,在服务重启或停机时,如果redis队列中有数据,会导致服务重启后接口一直等待,所以在启动项目时将队列清空。
/**
* @author Jenson
*/
@Configuration
public class InitInterfaceQueueConfig {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 在启动项目时清空redis队列
* <p>
* PostConstruct注解的方法将会在依赖注入完成后被自动调用。
* PostConstruct是spring框架的注解,在方法上加该注解会在项目启动的时候执行该方法,也可以理解为在spring容器初始化的时候执行该方法。
*/
@PostConstruct
public void init() {
Set<String> keys = redisTemplate.keys("jenson:list-thread:" + "*");
System.out.println(keys);
if (keys != null && keys.size() > 0) {
redisTemplate.delete(keys);
System.out.println("删除redis命名空间 jenson:list-thread 成功...");
}
}
}
简化问题
假如不考虑多租户和多实例的情况,就有很多种实现方式
使用公平锁
/**
* @author Jenson
*/
@Aspect
@Component
public class Queuing1Aspect {
/**
* 重入锁,公平锁
*/
public static ReentrantLock lock = new ReentrantLock(true);
@Around(value = "@annotation(com.jenson.annotation.Queuing1)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
lock.lock();
try {
Object result = proceedingJoinPoint.proceed();
return result;
} catch (Exception e) {
throw e;
}
finally {
lock.unlock();
System.out.println("-------------unlock-----------");
}
}
}
使用并发队列
- ConcurrentLinkedQueue
使用到的方法有offer
,poll
和peek
ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();
// add 函数内部调用的offer,和offer等价
concurrentLinkedQueue.add("test1");
concurrentLinkedQueue.offer("test2");
System.out.println(concurrentLinkedQueue.poll());
System.out.println(concurrentLinkedQueue.poll());
// 如果队列中没有元素,会返回null
System.out.println(concurrentLinkedQueue.poll());
System.out.println("-----------------------------------------------");
concurrentLinkedQueue.add("test1");
concurrentLinkedQueue.offer("test2");
// peek只查看队首元素,但不移除队首元素
System.out.println(concurrentLinkedQueue.peek());
System.out.println(concurrentLinkedQueue.poll());
System.out.println(concurrentLinkedQueue.poll());
// peek 队列为空返回null
System.out.println(concurrentLinkedQueue.peek());
输出如下
test1
test2
null
-----------------------------------------------
test1
test1
test2
null
实现如下:
/**
* @author Jenson
*/
@Aspect
@Component
public class Queuing2Aspect {
public static final ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();
@Around(value = "@annotation(com.jenson.annotation.Queuing2)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
Thread currentThread = Thread.currentThread();
String threadId = String.valueOf(currentThread.getId());
concurrentLinkedQueue.offer(threadId);
boolean waitFlag = Boolean.TRUE;
while (waitFlag) {
waitFlag = Boolean.FALSE;
String top = concurrentLinkedQueue.peek();
if (top != null ) {
// 队列中有数据
if (!threadId.equals(top)) {
// 队列顶部不是该接口,线程等待
waitFlag = Boolean.TRUE;
}
}
if (waitFlag) {
// 根据接口执行平均时长来适度调整休眠时间,休眠时会让出cpu给其他的线程
Thread.sleep(100);
}
}
Object result = proceedingJoinPoint.proceed();
// 执行结束,推出队列顶端元素
concurrentLinkedQueue.poll();
return result;
}
@AfterThrowing(value = "@annotation(com.jenson.annotation.Queuing2)", throwing = "e")
public void throwingAdvice(JoinPoint joinPoint, Exception e) {
// 抛出错误时也要推出队列顶端元素,否则后面的接口就堵死了
concurrentLinkedQueue.poll();
}
}
- BlockingQueue
使用到LinkedBlockingQueue
这个实现类,使用到的方法有offer
,poll
和peek
// BlockingQueue 是个接口,实现中有 ArrayBlockingQueue 和 LinkedBlockingQueue
// ArrayBlockingQueue是基于数组实现但有界队列
// LinkedBlockingQueue 是链表实现的,可做无界队列
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>();
// 入队
try {
// 队列已满时会阻塞
linkedBlockingQueue.put("test1");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果可以在不超过队列容量的情况下立即在此队列的尾部插入指定的元素,则在成功时返回true ,如果队列已满则返回false
linkedBlockingQueue.offer("test2");
// 出队
try {
// 出队,队列为空会阻塞
System.out.println(linkedBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
// peek只查看队首元素,但不移除队首元素
System.out.println(linkedBlockingQueue.peek());
// 出队,队列为空返回null
System.out.println(linkedBlockingQueue.poll());
// peek 队列为空返回null
System.out.println(linkedBlockingQueue.peek());
输出如下:
test1
test2
test2
null
实现方式和使用ConcurrentLinkedQueue方式一样,BlockingQueue关键在于take
方法,当队列为空时,take会阻塞,适合实现生产者消费者模式。
看一下take的源码,当队列为空,调用的是await,当队列不为空时,出队,再判断队列还不为空,唤醒其他take的线程

再看一下offer的源码,当队列不为空,唤醒take的线程


网友评论