1 场景
生产者/消费者
是java中的一种线程模型,用来保证同一个资源同一个时刻
只被一个生产者或者一个消费者访问。
生产者
向存储空间放入数据
,消费者
从存储空间拿出数据
。
存储空间为空
,消费者
阻塞等待。
存储空间已满
,生产者
阻塞等待。
2 wait/notify实现
借助在synchronized
块中使用wait()/notifyAll()
的方式来实现线程的休眠
和唤醒
。
synchronized只支持非公平锁
。
2.1 代码
如下代码,存储空间
大小为3,生产者、消费者各执行5次。
通过Thread.sleep(xx),让生产者的速度比消费者的速度快
些,仅测试用。
- 定义生产者消费者
import java.util.concurrent.LinkedBlockingQueue;
/**
* 生产者消费者命名空间
* <p>
* wait:告诉当前线程,释放锁,然后开始睡眠等待,此时的状态为Watting,直到有线程进入一样的监视器调用notify或者notifyAll唤醒它
* notify:随机唤醒一个在一样的对象监视器上等待的线程(notify()非常容易导致死锁)
* notifyAll:唤醒所有的在一样对象监视器上等待的线程
**/
public class WaitNotityContext {
/**
* 队列大小
*/
private static final Integer MAX_SIZE = 3;
/**
* 队列
*/
private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_SIZE);
/**
* 生产者
*/
public static class Producter implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 测试使用(让生产者生产快一些)
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
String value = "{" + Thread.currentThread().getName() + "生产" + (i + 1) + "}";
synchronized (QUEUE) {
while (QUEUE.size() >= MAX_SIZE) {
System.out.println("【" + Thread.currentThread().getName() + "】:队列已满,阻塞" + ",pool size :" + QUEUE.size());
try {
// 让出当前锁,让其他线程可以拿到锁
QUEUE.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
QUEUE.add(value);
System.out.println("【" + Thread.currentThread().getName() + "】:放入队列内容:" + value + ",pool size :" + QUEUE.size());
// 唤醒全部wait状态的线程
QUEUE.notifyAll();
}
}
}
}
/**
* 消费者
*/
public static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 测试使用(让生产者生产快一些)
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (QUEUE) {
while (QUEUE.size() <= 0) {
System.out.println("【" + Thread.currentThread().getName() + "】:队列为空,阻塞" + ",pool size :" + QUEUE.size());
try {
QUEUE.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String value = QUEUE.remove();
System.out.println("【" + Thread.currentThread().getName() + "】:拿出队列内容:" + value + ",pool size :" + QUEUE.size());
// 唤醒全部wait状态的线程
QUEUE.notifyAll();
}
}
}
}
}
- 使用
public static void main(String[] args) {
// 初始化
Consumer consumer = new Consumer();
Producter producter = new Producter();
// 启动生产者
new Thread(producter, "producter01").start();
new Thread(producter, "producter02").start();
// 启动消费者
new Thread(consumer, "consumer01").start();
new Thread(consumer, "consumer02").start();
}
2.2 结果
输出结果如下:
【producter02】:放入队列内容:{producter02生产1},pool size :1
【producter01】:放入队列内容:{producter01生产1},pool size :2
【consumer01】:拿出队列内容:{producter02生产1},pool size :1
【producter01】:放入队列内容:{producter01生产2},pool size :2
【producter02】:放入队列内容:{producter02生产2},pool size :3
【consumer02】:拿出队列内容:{producter01生产1},pool size :2
【producter02】:放入队列内容:{producter02生产3},pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产2},pool size :2
【producter01】:放入队列内容:{producter01生产3},pool size :3
【consumer01】:拿出队列内容:{producter02生产2},pool size :2
【producter02】:放入队列内容:{producter02生产4},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer01】:拿出队列内容:{producter02生产3},pool size :2
【producter01】:放入队列内容:{producter01生产4},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产3},pool size :2
【producter02】:放入队列内容:{producter02生产5},pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer01】:拿出队列内容:{producter02生产4},pool size :2
【producter01】:放入队列内容:{producter01生产5},pool size :3
【consumer02】:拿出队列内容:{producter01生产4},pool size :2
【consumer01】:拿出队列内容:{producter02生产5},pool size :1
【consumer02】:拿出队列内容:{producter01生产5},pool size :0
3 ReetranLock实现
ReetranLock是Java中JUC
并发包 中的可重入锁
,支持公平锁
和非公平锁
,默认为非公平锁
。其使用方式如下:
// 定义锁
ReentrantLock reentrantLock = new ReentrantLock();
// 获取锁
reentrantLock.lock();
try {
// 业务逻辑......
}finally {
// 释放锁
reentrantLock.unlock();
}
3.1 代码
- 定义生产者消费者
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReetranLock生产者消费者命名空间
*/
public class ReentrantLockContext {
/**
* 队列大小
*/
private static final Integer MAX_SIZE = 3;
/**
* 队列
*/
private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_SIZE);
/**
* 锁
*/
private static final Lock LOCK = new ReentrantLock();
/**
* 等待队列
*/
private static final Condition CONDITION = LOCK.newCondition();
/**
* 生产者
*/
public static class Producter implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 测试使用(让生产者生产快一些)
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
String value = "{" + Thread.currentThread().getName() + "生产" + (i + 1) + "}";
// 获得锁。如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获得锁为止。
LOCK.lock();
try {
while (QUEUE.size() >= MAX_SIZE) {
System.out.println("【" + Thread.currentThread().getName() + "】:队列已满,阻塞" + ",pool size :" + QUEUE.size());
// 使当前线程等待,直到被唤醒或中断
CONDITION.await();
}
QUEUE.add(value);
System.out.println("【" + Thread.currentThread().getName() + "】:放入队列内容:" + value + ",pool size :" + QUEUE.size());
// 唤醒所有等待的线程
CONDITION.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
LOCK.unlock();
}
}
}
}
/**
* 消费者
*/
public static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 测试使用(让生产者生产快一些)
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOCK.lock();
try {
while (QUEUE.size() <= 0) {
System.out.println("【" + Thread.currentThread().getName() + "】:队列为空,阻塞" + ",pool size :" + QUEUE.size());
try {
// 使当前线程等待,直到被唤醒或中断
CONDITION.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String value = QUEUE.remove();
System.out.println("【" + Thread.currentThread().getName() + "】:拿出队列内容:" + value + ",pool size :" + QUEUE.size());
// 唤醒所有等待的线程
CONDITION.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
}
}
- 使用
public static void main(String[] args) {
// 初始化
Consumer consumer = new Consumer();
Producter producter = new Producter();
// 启动生产者
new Thread(producter, "producter01").start();
new Thread(producter, "producter02").start();
// 启动消费者
new Thread(consumer, "consumer01").start();
new Thread(consumer, "consumer02").start();
}
3.2 结果
【producter01】:放入队列内容:{producter01生产1},pool size :1
【producter02】:放入队列内容:{producter02生产1},pool size :2
【consumer02】:拿出队列内容:{producter01生产1},pool size :1
【producter01】:放入队列内容:{producter01生产2},pool size :2
【consumer01】:拿出队列内容:{producter02生产1},pool size :1
【producter02】:放入队列内容:{producter02生产2},pool size :2
【producter01】:放入队列内容:{producter01生产3},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产2},pool size :2
【producter01】:放入队列内容:{producter01生产4},pool size :3
【consumer01】:拿出队列内容:{producter02生产2},pool size :2
【producter02】:放入队列内容:{producter02生产3},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产3},pool size :2
【consumer01】:拿出队列内容:{producter01生产4},pool size :1
【producter02】:放入队列内容:{producter02生产4},pool size :2
【producter01】:放入队列内容:{producter01生产5},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter02生产3},pool size :2
【consumer01】:拿出队列内容:{producter02生产4},pool size :1
【producter02】:放入队列内容:{producter02生产5},pool size :2
【consumer02】:拿出队列内容:{producter01生产5},pool size :1
【consumer01】:拿出队列内容:{producter02生产5},pool size :0
网友评论