在并发编程中,阻塞队列非常重要,所谓的阻塞队列就是支持两个附加操作的队列.这两个附加操作是阻塞的插入与移除.阻塞的插入是指当队列满的之后,后续的插入线程会被阻塞直到队列不满,阻塞的移除是指当队列为空时后续的移除线程会被阻塞直接队列非空.
这里面就需要用到线程间的等待/通知,通常有两种实现,一种是synchronized,wait(),noftify()结合使用,一种是Lock,Condition结合使用.比如:ArrayBlockingQueue就是使用的第二种方式. 这种更灵活些.
那借助阻塞队列的思想也可以实现一个阻塞的list.
下面会通过两种方式实现,最后通过生产者与消费者这个场景来进行测试.
第一种:
package concurrent.aqs;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 实现阻塞的arrylist
*
* 1.通过 synchronized, wait(),notify()实现等待通知模型
* 2. 通过 lock, condition 实现等待通知模型
*
*/
public class BlockList<T> {
private static int size = 3;
private List<T> list;
public BlockList() {
if (size <= 0)
throw new IllegalArgumentException();
list = new ArrayList(size);
}
public void put(T t) {
if (null == t) {
throw new NullPointerException();
}
synchronized (BlockList.class) {
while (size == list.size()) {
try {
System.out.println("list已满,thread:"+Thread.currentThread().getName());
BlockList.class.wait();
} catch (InterruptedException e) {
System.out.println("put error !!!");
}
}
list.add(t);
//通知
BlockList.class.notify();
}
}
public T get() {
T t;
synchronized (BlockList.class) {
while (0 == list.size()) {
try {
System.out.println("list为空,thread:"+Thread.currentThread().getName());
BlockList.class.wait();
} catch (InterruptedException e) {
System.out.println("get error!!!");
}
}
t = list.remove(0);
//通知
BlockList.class.notify();
}
return t;
}
public static void main(String[] args) throws InterruptedException {
BlockList blockList = new BlockList();
Thread produce = new Thread(() -> {
for (;;){
int i = 1;
blockList.put(i);
System.out.println("生产者生产:"+ i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者");
Thread customer = new Thread(() -> {
for (;;){
Object o = blockList.get();
System.out.println("消费者得到:" + o);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者");
Thread monitor = new Thread(() -> {
for (; ; ) {
System.out.println("queue 长度:[" + blockList.list.size() + "] 当前线程为:" + Thread.currentThread().getName());
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "监控线程");
produce.start();
customer.start();
monitor.start();
//阻止主线程先执行完毕,可加可不加
produce.join();
}
}
第二种:
package concurrent.aqs;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockListByLock<T>{
private static int size = 10;
private List<T> list;
private Lock lock ;
private Condition fullCondition ;
private Condition emptyCondition;
public BlockListByLock() {
if (size <= 0)
throw new IllegalArgumentException();
list = new ArrayList<>(size);
lock = new ReentrantLock();
fullCondition = lock.newCondition();
emptyCondition = lock.newCondition();
}
public void put(T t) throws InterruptedException {
if (null == t) {
throw new NullPointerException();
}
lock.lockInterruptibly();
try {
while (size == list.size()) {
fullCondition.await();
}
list.add(t);
emptyCondition.signal();
}finally {
//释放锁
lock.unlock();
}
}
public T get() throws InterruptedException {
T t;
lock.lockInterruptibly();
try {
while (0 == list.size()) {
emptyCondition.await();
}
t = list.remove(0);
fullCondition.signal();
}finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) throws InterruptedException {
BlockListByLock blockList = new BlockListByLock();
Thread produce = new Thread(() -> {
for (;;){
int i = 1;
try {
blockList.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产:"+ i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者");
Thread customer = new Thread(() -> {
for (;;){
Object o = null;
try {
o = blockList.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者得到:" + o);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者");
Thread monitor = new Thread(() -> {
for (; ; ) {
System.out.println("queue 长度:[" + blockList.list.size() + "] 当前线程为:" + Thread.currentThread().getName());
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "监控线程");
produce.start();
customer.start();
monitor.start();
//阻止主线程先执行完毕,可加可不加
produce.join();
}
}
END
网友评论