上次的学习
1.了解了MessageQueue运用场景结合handler、looper使用。
2.单向链表的基本运用,MessageQueue中enqueueSyncBarrier和removeSyncBarrier方法有实际运用。
3.同步方式,同步块,功能和逻辑分开,尽量提高性能和可读性。
4.同步分割栏(消息屏障),意识到项目中有很多延迟的处理可以用这个方法取代。
LinkedBlockingQueue 是一个阻塞队列,结合线程池使用
源码:
//序列号
private static final long serialVersionUID = -6903933977591709194L;
serialVersionUID的作用
简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。
在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地实体类的serialVersionUID进行比较
serialVersionUID有两种显示的生成方式:
1.默认的1L,比如:private static final long serialVersionUID = 1L;
2.根据类名、接口名、成员方法及属性等来生成一个64位的哈希字段,比如:private static final long serialVersionUID = xxxxL;
//所有的元素都通过Node这个静态内部类来进行存储
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
//阻塞队列所能存储的容量
private final int capacity;
//LinkedBlockingQueue的入队列和出队列使用的是两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head; //链表的头部
private transient Node<E> last; //链表的尾部
//元素出队列时线程所获取的锁,当执行take、poll等操作时线程需要获取的锁
private final ReentrantLock takeLock = new ReentrantLock();
//当队列为空时,通过该Condition让从队列中获取元素的线程处于等待状态
private final Condition notEmpty = takeLock.newCondition();
//元素入队列时线程所获取的锁,当执行add、put、offer等操作时线程需要获取锁
private final ReentrantLock putLock = new ReentrantLock();
//当队列的元素已经达到capactiy,通过该Condition让元素入队列的线程处于等待状态
private final Condition notFull = putLock.newCondition();
总结:从上面我们可以发现LinkedBlockingQueue在入队列和出队列时使用的不是同一个Lock,这也意味着它们之间的操作不会存在互斥操作。它们可以做到真正的在同一时刻既消费、又生产,能够做到并行处理。
//通过对应的方法唤醒获取元素的线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
//作用传进来的node = 下一个 再等于最后一个 最终放到最后面?
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 让头部元素出队列的过程,往头部移,移到头部置空,为什么不往尾部移?
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//加锁和解锁
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
//设置默认为最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//小于等于0报异常,头和尾都置空
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//在初始化LinkedBlockingQueue的时候,还可以直接将一个集合,中的元素全部入队列,此时队列最大容量依然是int的最大值。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
//将指定元素插入到此队列的尾部,如有必要等待空间变得可用。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
网友评论