##### ArrayBlockingQueue
package com.byedbl.queue;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded FIFO blocking queue backed by a fixed-size circular array.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}; producers that find the
 * queue full wait on {@code notFull}, consumers that find it empty wait on
 * {@code notEmpty}. {@code null} elements are rejected by the insertion methods.
 *
 * @param <E> the type of elements held in this queue
 */
public class MyArrayBlockingQueue<E> implements Serializable {
    private static final long serialVersionUID = 102615140181641502L;

    /** Circular buffer of queued elements; vacated slots are reset to null. */
    private final E[] items;
    /** items index for next take, poll, or remove. */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Lock guarding every read and write of the fields above and below. */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    /** Number of elements currently in the queue. */
    private int count;

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        // Generic arrays cannot be instantiated directly; the array never escapes
        // this class and only E values are stored in it, so the cast is safe.
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Inserts the element, failing loudly when there is no room.
     *
     * @param e the element to add
     * @return {@code true} (the method never returns {@code false})
     * @throws IllegalStateException if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    public boolean add(E e) {
        if (offer(e)) {
            return true;
        }
        throw new IllegalStateException("Queue full");
    }

    /**
     * Inserts the element if there is room, without waiting.
     *
     * @param e the element to add
     * @return {@code true} if added, {@code false} if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            if (count == items.length) {
                return false;
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Circularly increments an index, wrapping to 0 at the end of the array. */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /** Stores the element at putIndex and advances it. Call only while holding the lock. */
    private void insert(E e) {
        items[putIndex] = e;
        putIndex = inc(putIndex);
        ++count;
        // Wake one thread waiting for the queue to become non-empty.
        notEmpty.signal();
    }

    /** Removes and returns the element at takeIndex. Call only while holding the lock. */
    private E extract() {
        final E[] items = this.items;
        E e = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return e;
    }

    /**
     * Inserts the element, waiting up to the given time for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up
     * @param unit unit of {@code timeout}
     * @return {@code true} if added, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // Room available: add the new element.
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                // Out of time: report failure.
                if (nanos <= 0) {
                    return false;
                }
                try {
                    // awaitNanos returns the remaining wait time, so after one
                    // full (un-signalled) wait the value is <= 0.
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException e1) {
                    // Hand the wakeup to another waiter in case this thread
                    // consumed a signal it can no longer act on.
                    notFull.signal();
                    throw e1;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the element, waiting as long as necessary for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length) {
                    notFull.await();
                }
            } catch (InterruptedException e1) {
                // Pass the wakeup on to another waiting producer.
                notFull.signal();
                throw e1;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head, waiting as long as necessary for an element.
     *
     * @return the head of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException e1) {
                // Pass the wakeup on to another waiting consumer.
                notEmpty.signal();
                throw e1;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head without waiting.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0) {
                return null;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head, waiting up to the given time for an element.
     *
     * @param timeout how long to wait before giving up
     * @param unit unit of {@code timeout}
     * @return the head of the queue, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // Always re-check the queue first: an element that is already
                // present must be returned even with a zero timeout, and after a
                // wakeup another consumer may have emptied the queue again.
                if (count != 0) {
                    return extract();
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Pass the wakeup on to another waiting consumer.
                    notEmpty.signal();
                    throw e;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the head without removing it.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     *
     * @return the current element count
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first element (in FIFO order) that equals the argument.
     *
     * @param e the element to remove
     * @return {@code true} if an element was removed
     * @throws NullPointerException if {@code e} is null
     */
    public boolean remove(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count) {
                    return false;
                }
                if (e.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Deletes the element at index {@code i}, closing the gap so FIFO order is kept.
     * Call only while holding the lock.
     */
    private void removeAt(int i) {
        final E[] items = this.items;
        if (i == takeIndex) {
            // Removing the head: just advance takeIndex.
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // Slide every later element one slot towards the head, then pull
            // putIndex back onto the slot that was freed.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Tests whether some queued element equals the argument.
     *
     * @param o the object to look for
     * @return {@code true} if found; {@code false} if absent or {@code o} is null
     */
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i])) {
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the queued elements in FIFO order, formatted like {@code [a, b, c]}.
     *
     * @return a snapshot of the queue contents as text
     */
    @Override
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            StringBuilder sb = new StringBuilder();
            sb.append('[');
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (k > 0) {
                    sb.append(", ");
                }
                E e = items[i];
                // Guard against infinite recursion if the queue contains itself.
                if (e == this) {
                    sb.append("(this Collection)");
                } else {
                    sb.append(e);
                }
                i = inc(i);
            }
            return sb.append(']').toString();
        } finally {
            lock.unlock();
        }
    }
}
-
测试
package com.byedbl.queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demo driver for {@code MyArrayBlockingQueue}: one consumer and one producer thread
 * take turns on a shared lock, handing control to each other through two conditions.
 */
public class MyArrayBlockingQueueTest {
    /**
     * Fills a capacity-2 queue, runs the consumer/producer ping-pong for 50 seconds,
     * then prints the queue's size and contents.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock();
        final Condition producer = lock.newCondition();
        final Condition consumer = lock.newCondition();
        final MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<Integer>(2);
        // Fill the queue to capacity; add() beyond that throws IllegalStateException.
        for (int i = 0; i < 2; i++) {
            queue.add(i);
        }
        Thread c = new Thread(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    lock.lock();
                    try {
                        if (queue.size() > 0) {
                            Integer value = queue.poll();
                            System.out.println("poll a value:" + value);
                            // Hand the turn to the producer and wait for ours.
                            producer.signal();
                            try {
                                consumer.await();
                            } catch (InterruptedException e) {
                                // Preserve the interrupt and stop this worker.
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
        });
        // Daemon so the endless loop does not keep the JVM alive after main returns.
        c.setDaemon(true);
        c.start();
        Thread p = new Thread(new Runnable() {
            @Override
            public void run() {
                final Random random = new Random();
                for (;;) {
                    int value = random.nextInt(100);
                    lock.lock();
                    try {
                        // offer() instead of add(): if this thread wins the lock while
                        // the queue is still full, retry later rather than dying with
                        // IllegalStateException.
                        if (!queue.contains(value) && queue.offer(value)) {
                            System.out.println("add a value:" + value);
                            // Hand the turn to the consumer and wait for ours.
                            consumer.signal();
                            try {
                                producer.await();
                            } catch (InterruptedException e) {
                                // Preserve the interrupt and stop this worker.
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
        });
        p.setDaemon(true);
        p.start();
        Thread.sleep(50000);
        System.out.println(queue.size());
        System.out.println(queue.toString());
    }
}
网友评论