美文网首页程序员今日看点IT在线课程
java自旋锁 去适应不同的CPU处理技术

java自旋锁 去适应不同的CPU处理技术

作者: holly_wang_王小飞 | 来源:发表于2016-12-23 21:38 被阅读0次

    主要的CPU处理技术:多线程、多核心、SMP(Symmetric Multi-Processing对称多处理结构)、NUMA技术、乱序执行、分枝技术、控制器 来自百度百科中央处理器CPU。目前主流的查百度都是说SMP/MPP/NUMA这三种。

    1 SMP (Symmetric Multi Processing),对称多处理系统内有许多紧耦合多处理器,在这样的系统中,所有的CPU共享全部资源,如总线,内存和I/O系统等,操作系统或管理数据库的复本只有一个,这种系统有一个最大的特点就是共享所有资源。多个CPU之间没有区别,平等地访问内存、外设、一个操作系统。操作系统管理着一个队列,每个处理器依次处理队列中的进程。如果两个处理器同时请求访问一个资源(例如同一段内存地址),由硬件、软件的锁机制去解决资源争用问题。Access to RAM is serialized; this and cache coherency issues causes performance to lag slightly behind the number of additional processors in the system.

    SMP.png
    SMP 服务器 CPU 利用率最好的情况是 2 至 4 个 CPU

    2 NUMA(Non-Uniform Memory Access) 由于 SMP 在扩展能力上的限制,人们开始探究如何进行有效地扩展从而构建大型系统的技术, NUMA 就是这种努力下的结果之一。利用 NUMA 技术,可以把几十个 CPU( 甚至上百个 CPU) 组合在一个服务器内。

    NUMA CPU 模块结构

    3 MPP(Massive Parallel Processing) 和 NUMA 不同, MPP 提供了另外一种进行系统扩展的方式,它由多个 SMP 服务器通过一定的节点互联网络进行连接,协同工作,完成相同的任务,从用户的角度来看是一个服务器系统。其基本特征是由多个 SMP 服务器 ( 每个 SMP 服务器称节点 ) 通过节点互联网络连接而成,每个节点只访问自己的本地资源 ( 内存、存储等 ) ,是一种完全无共享 (Share Nothing) 结构,因而扩展能力最好,理论上其扩展无限制,目前的技术可实现 512 个节点互联,数千个 CPU 。目前业界对节点互联网络暂无标准,如 NCR 的 Bynet , IBM 的 SPSwitch ,它们都采用了不同的内部实现机制。但节点互联网仅供 MPP 服务器内部使用,对用户而言是透明的。

    MPP CPU 模块结构

    更多三者的阅读请到SMP、NUMA、MPP体系结构介绍

    MESI(Modified Exclusive Shared Or Invalid)(也称为伊利诺斯协议,是因为该协议由伊利诺斯州立大学提出)是一种广泛使用的支持写回策略的缓存一致性协议,该协议被应用在Intel奔腾系列的CPU中,详见“support the more efficient write-back cache in addition to the write-through cache previously used by the Intel 486 processor
    MESI协议中的状态
    CPU中每个缓存行(caceh line)使用4种状态进行标记(使用额外的两位(bit)表示):
    M: 被修改(Modified)
    该缓存行只被缓存在该CPU的缓存中,并且是被修改过的(dirty),即与主存中的数据不一致,该缓存行中的内存需要在未来的某个时间点(允许其它CPU读取请主存中相应内存之前)写回(write back)主存。当被写回主存之后,该缓存行的状态会变成独享(exclusive)状态。
    E: 独享的(Exclusive)
    该缓存行只被缓存在该CPU的缓存中,它是未被修改过的(clean),与主存中数据一致。该状态可以在任何时刻当有其它CPU读取该内存时变成共享状态(shared)。同样地,当CPU修改该缓存行中内容时,该状态可以变成Modified状态。
    S: 共享的(Shared)
    该状态意味着该缓存行可能被多个CPU缓存,并且各个缓存中的数据与主存数据一致(clean),当有一个CPU修改该缓存行中,其它CPU中该缓存行可以被作废(变成无效状态(Invalid))。
    I: 无效的(Invalid)
    该缓存是无效的(可能有其它CPU修改了该缓存行)。
    操作:
    在一个典型系统中,可能会有几个缓存(在多核系统中,每个核心都会有自己的缓存)共享主存总线,每个相应的CPU会发出读写请求,而缓存的目的是为了减少CPU读写共享主存的次数。
    一个缓存除在Invalid状态外都可以满足cpu的读请求,一个invalid的缓存行必须从主存中读取(变成S或者 E状态)来满足该CPU的读请求。
    一个写请求只有在该缓存行是M或者E状态时才能被执行,如果缓存行处于S状态,必须先将其它缓存中该缓存行变成Invalid状态(也既是不允许不同CPU同时修改同一缓存行,即使修改该缓存行中不同位置的数据也不允许)。该操作经常作用广播的方式来完成,例如:Request For Ownership (RFO)
    缓存可以随时将一个非M状态的缓存行作废,或者变成Invalid状态,而一个M状态的缓存行必须先被写回主存。
    一个处于M状态的缓存行必须时刻监听所有试图读该缓存行相对就主存的操作,这种操作必须在缓存将该缓存行写回主存并将状态变成S状态之前被延迟执行。
    一个处于S状态的缓存行也必须监听其它缓存使该缓存行无效或者独享该缓存行的请求,并将该缓存行变成无效(Invalid)。
    一个处于E状态的缓存行也必须监听其它缓存读主存中该缓存行的操作,一旦有这种操作,该缓存行需要变成S状态。
    对于M和E状态而言总是精确的,他们在和该缓存行的真正状态是一致的。而S状态可能是非一致的,如果一个缓存将处于S状态的缓存行作废了,而另一个缓存实际上可能已经独享了该缓存行,但是该缓存却不会将该缓存行升迁为E状态,这是因为其它缓存不会广播他们作废掉该缓存行的通知,同样由于缓存并没有保存该缓存行的copy的数量,因此(即使有这种通知)也没有办法确定自己是否已经独享了该缓存行。
    从上面的意义看来E状态是一种投机性的优化:如果一个CPU想修改一个处于S状态的缓存行,总线事务需要将所有该缓存行的copy变成invalid状态,而修改E状态的缓存不需要使用总线事务。

    转换图.png

    好了了解了这些概念后可以来测试了,首先实现第一个简单的自旋锁 借助AtomicBoolean

    package com.alibaba.otter.canal.common;
    import java.util.concurrent.atomic.AtomicBoolean;
    public class TASLock { //test and set lock 尝试去set 锁
        AtomicBoolean state = new AtomicBoolean(false);  
        void lock() {
            while (true) {
                 if (state.compareAndSet(state.get(),true)) { 
                     return;
                 }  
            }
        } 
        void unLock() {  
            state.set(false);  
        }  
    }
    

    然后进行优化先取得是否被占用再去执行占用

    package com.alibaba.otter.canal.common;
    import java.util.concurrent.atomic.AtomicBoolean;
    public class TTASLock { // test for first then test set lock
        AtomicBoolean state = new AtomicBoolean(false);
        void lock() {
            while (true) {
                while (state.get()) {
                }
                if (state.compareAndSet(state.get(), true)) {
                    return;
                }
            }
        }
        void unLock() {
            state.set(false);
        }
    }
    

    测试类

    package com.alibaba.otter.canal.common;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    @SuppressWarnings("restriction")
    public class TASTest extends AbstractZkTest {
         //并发线程数  
        private final static int NTHREAD = 10;  
        //计数  
        private static volatile int count = 0;  
        //最大计数  
        private final static int MAX = 1000000;  
        //TAS锁  
        private static TASLock lock = new TASLock();  
        //TTAS锁  
    //    private static TTASLock lock = new TTASLock();  
        @Before
        public void setUp() {
        }
        @After
        public void tearDown() {
        }
        @Test
        public void testUnsafe() throws InterruptedException {
                final CountDownLatch endGate = new CountDownLatch(1);  
                final CountDownLatch startGate = new CountDownLatch(1);  
                ExecutorService e = Executors.newFixedThreadPool(NTHREAD);  
                for (int i = 0; i < NTHREAD; i++) {  
                    e.execute(new Runnable() {  
                        @Override  
                        public void run() {  
                            try {  
                                startGate.await();  
                                while (true) {  
                                    lock.lock();  
                                    if (count < MAX) {  
                                        count++;  
                                        lock.unLock();  
                                        continue;  
                                    }  
                                    lock.unLock();  
                                    break;  
                                }  
                                endGate.countDown();  
                            } catch (InterruptedException ignored) {  
                            }  
          
                        }  
                    });  
                }  
                long start = System.currentTimeMillis();  
                startGate.countDown();  
                endGate.await();  
                long end = System.currentTimeMillis();  
                long time = end - start;  
                e.shutdown();  
                System.out.print(time);  
            }  
        @Test
        public void testSingleThread() throws InterruptedException {
            long start = System.currentTimeMillis();  
             while (true) {  
                 lock.lock();  
                 if (count < MAX*10) {  
                     count++;  
                     lock.unLock();  
                     continue;  
                 }  
                 lock.unLock();  
                 break;  
             }
             long end = System.currentTimeMillis();
             long time = end - start;  
             System.out.print(time);  
        }
        
    }
    

    测试结果,单线程的执行效率最高,多线程的情况下导致执行效率变慢,我的电脑时4核的,可以见线程的上下文切换很需要时间,而且CPU容易飙升到100%,这种情况下反而优化后的锁更浪费时间,因为CPU耗尽的原因,所以实际工作中还是需要结合服务器去做测试。
    接下来我们看下著名的CLH锁和MCS锁

    TAS、TTAS两种自旋锁。这两种锁的缺点是:任何一个处理器每一次对锁成功的访问(state.compareAndSet(state.get(),true)和set(false)任意一个方法的调用),都会将其他处理器的cache中的缓存失效掉。这样会导致以下后果:
    其他处理器无法再采用局部自旋的方式对相同数据进行访问,后续的其他处理器对锁的访问都会独占bus发送消息。而锁的释放也是需要占用bus,因此有可能其他处理器对bus的占用而导致锁的释放被延迟;
    当锁释放后的一刹那,其他正在局部自旋的处理器同时竞争去获取锁(调用state.compareAndSet(state.get(),true)),原本只有一个处理器能在此次竞争中获胜,但其他获取失败的处理器还是在bus上制造了大量无用的交通阻塞,因此也会导致锁的释放被延迟;
    由于数据访问存在局部性,cache从memory中加载数据时,一次会加载一个数据块,即cache存储的是cache line。Cache中锁状态数据的失效,会导致其他和锁状态处于同一个cache line中的数据也被失效掉,从而影响其他原本与锁不存在竞争的处理器的执行速度。
    另外,最后一个缺点就是无法实现锁的公平性,最先自旋等待获取锁的处理器,在竞争中,不一定最先获取到锁。
    虽然基于时间延迟的BackoffTTAS锁能一定程度的降低cache被无故失效的次数,但如何设置最优的延迟时间却是一个难题,因为不同的平台会有不同的最优参数,同时,引入延迟后,有可能导致处理器不能及时的得到锁,造成无谓的等待。 那么我们希望最理想的情况是:
    由于锁(此处指排它锁)同一时刻只能由一个处理器持有,所以在锁释放后的一刹那,最多只需要通知一个正在等待持有锁的处理器去获取锁,其他处理器就不需要同时去凑热闹了,轮到它的时候,会自动只通知他。
    当某个处理器获取到锁或者释放锁时,不要失效掉其他还未轮到顺序的处理器的cache,这样这些处理器就能一直在自己的cache中局部自旋,不会占用bus。
    CLH锁和MCS锁就是基于这种理念设计出来的

    先看CLHLock

    package com.alibaba.otter.canal.common;
    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    public class CLHLock {
        public static class CLHNode {
            private volatile boolean isLocked = true;
                  //CLH算法中,每个申请锁的线程通过一个CLHNode 对象表示,
                  //CLHNode 中有一个**volatile ****boolean**类型的成员变量isLocked ,
                  //isLocked 为true,则表示对应的线程已经获取到了锁或者正在等待获取锁;
                  //isLocked 为false,则表示对应的线程已经释放了锁。
        }
        @SuppressWarnings("unused")
        private volatile CLHNode tail;
        private static final ThreadLocal<CLHNode> LOCAL = new ThreadLocal<CLHNode>();
        private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER = AtomicReferenceFieldUpdater
                .newUpdater(CLHLock.class, CLHNode.class, "tail");
        public void lock() {
            CLHNode node = new CLHNode();
            LOCAL.set(node);
            CLHNode preNode = UPDATER.getAndSet(this, node);
            if (preNode != null) {
                while (preNode.isLocked) {
                }
                preNode = null;
                LOCAL.set(node);
            }
        }
        public void unlock() {
            CLHNode node = LOCAL.get();
            if (!UPDATER.compareAndSet(this, node, null)) {
                node.isLocked = false;
            }
            node = null;
        }
    }
    

    锁由多个CLHNode 对象组成的虚拟链表表示,之所以称为虚拟链表,是因为CLHNode 之间并没有类似于next指针之类的引用,CLHNode 之间通过锁的一个本地线程(ThreadLocal)变量LOCAL 相连,preNode 指向当前节点的前驱节点,即当前线程的前一个线程。而链表的尾节点则通过锁AtomicReferenceFieldUpdater<CLHLock, CLHNode>类型的tail成员变量指向,即tail指向加入到申请锁的队列中的最近一个线程。
    当一个线程申请锁时:
    首先会实例化一个CLHNode 节点,并将其设置为自己的本地线程变量LOCAL中;
    然后利用 UPDATER.getAndSet(this, node); 得到 前一个节点 其实就是实例化的 CLHNode 的节点node 然后在此node上自旋。
    释放锁时:
    本地变量中获得node节点 然后进行tail的指向,释放锁 然后归还对象node。

    再看一下MCSLock

    package com.alibaba.otter.canal.common;
    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    public class MCSLock {
        public static class MCSNode {
            volatile MCSNode next;
            volatile boolean isLocked = true;
        }
        private static final ThreadLocal<MCSNode>                          NODE    = new ThreadLocal<MCSNode>();
        @SuppressWarnings("unused")
        private volatile MCSNode                                           queue;
        private static final AtomicReferenceFieldUpdater<MCSLock, MCSNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(MCSLock.class,
                                                                                       MCSNode.class, "queue");
        public void lock() {
            MCSNode currentNode = new MCSNode();
            NODE.set(currentNode);
            MCSNode preNode = UPDATER.getAndSet(this, currentNode);
            if (preNode != null) {
                preNode.next = currentNode;
                while (currentNode.isLocked) {
    
                }
            }
        }
        public void unlock() {
            MCSNode currentNode = NODE.get();
            if (currentNode.next == null) {
                if (UPDATER.compareAndSet(this, currentNode, null)) {
    
                } else {
                    while (currentNode.next == null) {//队列尾发生了改变,必然有新的节点正在或者将要添加进来,因此循环等待
                    }
                }
            } else {
                currentNode.next.isLocked = false;
                currentNode.next = null;
            }
        }
    }
    

    MCS队列锁也是为每个线程分配一个节点,节点中任然包含一个locked属性,和CLH队列锁不同,MCS队列锁使用一个真正的队列来保存等待线程,因此节点中还包含一个next属性,并且locked属性的含义也不一样,在这里表示该线程是否应该被阻塞,线程将循环探测自己节点的locked属性,直到该属性被前续节点的线程修改为false。
    线程调用lock后,首先获取自己对应的节点node,并将node设置为队列尾,并返回前续节点,如果前续节点不为空,则表明线程应该等待:线程首先将node的locked设置为true,表示自己将被阻塞,然后设置前续节点的next指向node,然后就开始循环直到node的locked属性被设置为false。 线程在调用unlock后,首先获取自己对应的节点node,如果node的next为空,则尝试将队列尾设置到空,如果成功,则说明队列已经为空,则退出;否则则说明队列尾发生了变化,需要等待其它线程设置node的next属性,最后设置node的next的locked属性,并退出。 MCS队列锁和CLH队列锁具有相同的特点,前续节点对状态的改变只会影响到后续的节点,不同点是MCS队列锁是在本地cache自旋等待。

    总结:我个人觉得MCSLock更好理解 (不知道为啥会这么想,可能是自旋的自己的节点的状态 而 CLHLock是前驱节点的状态) 适合MUMA系统,因为它解决了CLH在NUMA系统架构中获取locked域状态内存过远的问题

    相关文章

      网友评论

        本文标题:java自旋锁 去适应不同的CPU处理技术

        本文链接:https://www.haomeiwen.com/subject/karkvttx.html