最近在阅读《多处理器编程艺术》一书,掌握了很多Java多线程的底层知识,现在就做一下书中链表-锁的作用一章的总结。
为了节约你的时间,本文主要内容如下:
- 带锁的链表队列
- 细粒度同步
- 乐观同步
- 惰性同步
- 非阻塞同步
粗粒度同步
所谓粗粒度同步其实很简单,就是在List的add
,remove
,contains
函数的开始就直接使用Lock加锁,然后在函数结尾释放。
add
函数的代码如下所示,函数的主体就是链表的遍历添加逻辑,只不过在开始和结束进行了锁的获取和释放。
private Node head;
private Lock lock = new ReentrantLock();
public boolean add(T item) {
Node pred, curr;
int key = item.hashCode();
lock.lock();
try {
pred = head;
curr = pred.next;
while(curr.key < key) {
pred = curr;
curr = pred.next;
}
if (key == curr.key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
return true;
}
} finally {
lock.unlock();
}
}
大家看到这里就会想到,这不就是类似于Hashtable
的实现方式吗?把可能出现多线程问题的函数都用一个重入锁锁住。但是这个方法的缺点很明显,如果竞争激烈的话,对链表的操作效率会很低,因为add
,remove
,contains
三个函数都需要获取锁,也都需要等待锁的释放。至于如何优化,我们可以一步一步往下看
细粒度同步
我们可以通过锁定单个节点而不是整个链表来提高并发。给每个节点增加一个Lock变量以及相关的lock()和unlock()函数,当线程遍历链表的时候,若它是第一个访问节点的线程,则锁住被访问的节点,在随后的某个时刻释放锁。这种细粒度的锁机制允许并发线程以流水线的方式遍历链表。
使用这种方式来遍历链表,必须同时获取两个相邻节点的锁,通过“交叉手”的方式来获取锁:除了初始的head哨兵节点外,只有在已经获取pred的锁时,才能获取curr的锁。
//每个Node对象中都有一个Lock对象,可以进行lock()和unlock()操作
public boolean add(T item) {
int key = item.hashCode();
head.lock();
Node pred = head;
try {
Node curr = pred.next;
curr.lock();
try {
while (curr.key < key) {
pred.unlock();
pred = curr;
curr = pred.next;
curr.lock();
}
if (curr.key == key) {
return false;
}
Node newNode = new Node(item);
newNode.next = curr;
pred.next = newNode;
return true;
} finally {
curr.unlock();
}
} finally {
pred.unlock();
}
}
乐观同步
虽然细粒度锁是对单一粒度锁的一种改进,但它仍然出现很长的获取锁和释放锁的序列。而且,访问链表中不同部分的线程仍然可能相互阻塞。例如,一个正在删除链表中第二个元素的线程将会阻塞所有试图查找后继节点的线程。
减少同步代价的一种方式就是乐观:不需要获取锁就可以查找,对找到的节点进行加锁,然后确认锁住的节点是正确的;如果一个同步冲突导致节点被错误的锁定,则释放这些锁重新开始。
public boolean add(T item) {
int key = item.hashCode();
while (true) { //如果不成功,就进行重试
Node pred = head;
Node curr = pred.next;
while (curr.key < key) {
pred = curr;
curr = pred.next;
}
//找到目标相关的pred和curr之后再将二者锁住
pred.lock();
curr.lock();
try {
//锁住二者之后再进行判断,是否存在并发冲突
if (validate(pred, curr)) {
//如果不存在,那么就直接进行正常操作
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
}
}
} finally {
pred.unlock();
curr.unlock();
}
}
}
public boolean validate(Node pred, Node curr) {
//从队列头开始查找pred和curr,判断是否存在并发冲突
Node node = head;
while (node.key <= pred.key) {
if (node == pred) {
return pred.next == curr;
}
node = node.next;
}
return false;
}
由于不再使用能保护并发修改的锁,所以每个方法调用都可能遍历那些已经被删除的节点,所以在进行添加,删除获取判断是否存在的之前必须再次进行验证。
惰性同步
当不用锁遍历两次链表的代价比使用锁遍历一次链表的代价小很多时,乐观同步的实现效果非常好。但是这种算法的缺点之一就是contains()方法在遍历时需要锁,这一点并不令人满意,其原因在于对contains()的调用要比其他方法的调用频繁得多。
使用惰性同步的方法,使得contains()调用是无等待的,同时add()和remove()方法即使在被阻塞的情况下也只需要遍历一次链表。
对每个节点增加一个布尔类型的marked域,用于说明该节点是否在节点集合中。现在,遍历不再需要锁定目标结点,也没有必须通过重新遍历整个链表来验证结点是否可达。所有未被标记的节点必然是可达的。
//add方法和乐观同步的方法一致,只有检验方法做了修改。
//只需要检测节点的marked变量就可以,并且查看pred的next是否还是指向curr,需要注意的是marked变量一定是voliate的。
private boolean validate(Node pred, Node curr) {
return !pred.marked && !curr.marked && pred.next == curr;
}
惰性同步的优点之一就是能够将类似于设置一个flag这样的逻辑操作与类似于删除结点的链接这种对结构的物理改变分开。通常情况下,延迟操作可以是批量处理方式进行,且在某个方便的时候再懒惰地进行处理,从而降低了对结构进行物理修改的整体破裂性。惰性同步的主要缺点是add()和remove()调用是阻塞的:如果一个线程延迟,那么其他线程也将延迟。
非阻塞同步
使用惰性同步的思维是非常有益处的。我们可以进一步将add(),remove()和contains()这三个方法都变成非阻塞的。前两个方法是无锁的,最后一个方法是无等待的。我们无法直接使用compareAndSet()来改变next域来实现,因为这样会出现问题。但是我们可以将结点的next域和marked域看作是单个的原子单位:当marked域为true时,对next域的任何修改都将失败。
我们可以使用AtomicMarkableReference<T>对象将指向类型T的对象引用next和布尔值marked封装在一起。这些域可以一起或单个地原子更新。可以让每个结点的next域为一个AtomicMarkableReference<Node>。线程可以通过设置结点next域中的标记位来逻辑地删除curr,和其他正在执行add()和remove()的线程共享物理删除:当每个线程遍历链表时,通过物理删除所有被标记的节点来清理链表。
public Window find(Node head, int key) {
Node pred = null, curr = null, succ = null;
boolean[] marked = {false};
boolean snip;
retry: while(true) {
pred = head;
curr = curr.next.get(marked);
while(true) {
succ = curr.next.get(marked); //获取succ,并且查看是被被标记
while (marked[0]) {//如果被标记了,说明curr被逻辑删除了,需要继续物理删除
snip = pred.next.compareAndSet(curr, succ, false, false);//
if (!snip) continue retry;
curr = succ;
succ = curr.next.get(marked);
}
//当不需要删除后,才继续遍历
if (curr.key >= key) {
return new Window(pred, curr);
}
pred = curr;
curr = succ;
}
}
}
public boolean add(T item) {
int key = item.hashCode();
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = new AtomicMarkableReference<>(curr, false);
if (pred.next.compareAndSet(curr, node, false, false)) {
return true;
}
}
}
}
public boolean remove(T item) {
int key = item.hashCode();
boolean sinp;
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key != key) {
return false;
} else {
Node succ = curr.next.getReference();
//要进行删除了,那么就直接将curr.next设置为false,然后在进行真正的物理删除。
sinp = curr.next.compareAndSet(curr, succ, false, true);
if (!sinp) {
continue;
}
pred.next.compareAndSet(curr, succ, false, false);
return true;
}
}
}
class Node {
AtomicMarkableReference<Node> next;
}
后记
文中的代码在我的github的这个repo中都可以找到。
网友评论