一、前言
上篇 https://www.jianshu.com/p/6402676abc86 文章讲解了一个定时生产消费时候消费队列里面最多有几个元素的问题。本文来探讨另外一个问题,由于生产和消费线程执行的不确定性,会产生当生产线程t1时间投递任务到队列后,消费线程可能在t1+1左右时候才会开始消费其中的一个队列,也就是生产与消费之间会有1s时间的的间隔,那么有没有办法保证生产线程t1时间投递完毕后,消费线程能在接近于t1时刻就开始消费那?
二、问题出现
image.pngt1时刻生产线程投递一个元素分别到队列1,2,假如t1+0.001时刻投递元素到第3个队列,而消费线程正好在t1+0.0001时刻刚刚处理完队列3发现队列为空后就放弃当前1s的执行,那么消费线程会在t1+0.0001+1时刻消费第一个队列的元素。到这里生成线程明明是t1时刻放入元素到队列,而消费线程却在1s后才开始处理。
三、改进方案
消费线程在执行当前1s的任务时候如果发现当前队列为空,则去看下一个队列,直到有一个队列不为空为止。这时候有可能在1s内队列都为空,那么定时消费线程的下一个定时任务会被延迟,不过没关系。但是这会导致消费线程同1s内可能消费了两个队列。那么有没有更好的方法那?
既然要求生产后马上消费那自然会想到通知等待模型,也就是消费线程一开始阻塞,当生产线程把元素放入队列后,发送通知激活消费线程,那么直接用notify,wait?不不,并发包里面有个Semaphore可以做这个事情:
- 生产线程放入元素到三个队列后,调用semaphore.release(3);释放3个信号量,内部信号量计数器值递增3;
- 消费线程则使用如下结构:
static volatile int curTagIndex = 0;
public static void consume() {
while (!Thread.currentThread().isInterrupted()) {
try {
//(1)获取一个信号量,内部计数器会减轻1
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
int seconds = new Date().getSeconds();
ArrayBlockingQueue<String> curTagQueue = tagQueue.get(curTagIndex);
System.out.println(j.join("timer curqueue ", curTagIndex, curTagQueue.poll(), curTagQueue.size(), System.currentTimeMillis()));
//休眠1s
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
curTagIndex++;
if (curTagIndex >= TAG_QUEUE_SIZE) {
curTagIndex = 0;
}
}
}
四、总结
定时生产消费模型一般使用的比较多,但是消费与生产直接可能会存在大概1s的延迟,一旦延迟产生,那么后面每个元素的消费都会进行延迟。而基于信号量的生成消费模型,使用通知等待,当元素生成后就通知消费者来消费,这在一定程度上减少了延迟。
作者:加多
网友评论