美文网首页
多线程编程那些事

多线程编程那些事

作者: 小龙的城堡 | 来源:发表于2020-12-10 23:56 被阅读0次

    多线程编程那些事

    标签:HPC、多线程、JMM、Volatile、锁、CPU多核构架、Happens before、LOCK指令

    先看一段代码:

    package jvm.valatile;
    
    public class VolatileTest extends Thread {
        boolean flag = true;
        long i = 0L;
    
        @Override
        public void run() {
            while (flag) {
                i++;
            }
        }
    
        public static void main(String[] args) throws Exception {
            VolatileTest vt = new VolatileTest();
            vt.start();
            Thread.sleep(1000);//挂起下主线程,确保子线程完成初始化并启动成功。
            setFlag(vt);//设置flag,尝试停止线程
            System.out.println("vt value: " + vt.i);
            vt.join();
            System.out.println("stop and final flag is:" + vt.flag);
        }
    
        private static void setFlag(VolatileTest vt) {
            vt.flag = false;
        }
    }
    
    

    对多线程并发编程比较熟悉的人大概一眼就能看出这段代码的问题,以及背后的技术。

    我们看结果:

    root@moon-light:~/share/java# java VolatileTest
    vt value: 744337091
    

    是的,风扇已经开始转了,后台线程没有停下来,此时如果运行top会看到用户态的CPU时间是99.5%以上,程序陷入了死循环。

    image.png

    我们稍微改一行代码,就能让子线程收到主线程的消息,停下来,加上volatile关键字:

    volatile boolean flag = true;
    

    结果:

    root@moon-light:~/share/java# java VolatileTest
    vt value: 763169009
    stop and final flag is:false
    

    是的java中靠volatile关键字来保证线程间的共享变量的可见,那么为什么volatile能够保证共享变量的可见性呢?这可能要从java的内存模型JMM说起了。

    JMM

    JMM有个公开文档来说明java内存模型的设计目的、用意与特性,我这里总结下:

    首先,为什么要有JMM?

    1. 因为java运行在jvm基础上,既然是虚拟机,而且一次编译还要处处运行,那么首要任务必须能屏蔽掉所有的可能的计算平台(Intel、AMD、PowerPC以及各种大中小型机),而不同的机器在运行程序的时候有不同的特性,比如:乱序执行、分支预测、以及CPU级别的内存模型;不同的缓存构架(不同的缓存行同步协议);所以为了统一并对下进行屏蔽,要为java字节码运行扫清障碍,定义一个虚拟的机器——jvm以及运行在这种机器上的内存访问模型——jmm;所以,jvm控制字节码的执行,相当于CPU,jmm相当于内存;

    2. CPU、编译器都会对程序的执行顺序进行修改以使用CPU的各种加速技术,那么在多线程的情况下,程序的运行结果就能难预测,特别是在多核构架下的CPU运行多线程程序,事情就变得更加复杂了。而多线程程序之间的通信归根结底只有一种方式——共享变量;而共享变量是存在内存中的,加上不同CPU对内存的访问是有区别的,所以需要JMM制定规则来通过控制内存的访问来控制程序的执行预期。不然,同一段开吗在A机器得到的结果很可能在B机器就是另一个不同的结果。引用一下原文:

      A memory model describes, given a program and an execution trace of that program, whether the execution trace is a legal execution of the program.

      The memory model predicts the possible behaviors of a program. An implementation is free to produce any code it likes, as long as all resulting executions of a program produce a result that can be predicted by the memory model.

    一句话:JVM通过JMM来控制(多线程)程序的执行顺序。

    那么什么是JVM定义的执行顺序呢?简单的说就是一个单线程程序的执行顺序。原文:

    Program Order Among all the inter-thread actions performed by each thread t, the program order of t is a total order that reflects the order in which these actions would be performed according to the intra-thread semantics of t.

    以前面的例子来说,我们的预期顺序是:

    1. 启动主线程;
    2. 主线程启动一个子线程,子线程运行一个循环来对共享变量进行累加;
    3. 主线程关闭子线程;
    4. 子线程退出;
    5. 打印累加结果。

    但是,如果不加volatile关键字的共享变量是不会按照这个顺序得到预期的结果的。所以,JMM的作用就是通过volatile关键字来保证程序按照预期输出结果。

    Happens-Before

    JMM为了保证大家写的程序能够获得预期的结果,特别是多线程程序也能按照正常顺序也就是”Program Order“来执行,引入了一些控制执行顺序的规则,统一叫做Happens-Before的规则,简写成hb。

    简单说就是,加持了hb属性的读写操作,在执行的时候,甚至是多线程执行的时候,jmm会保证按照你的意思来执行。就上面的例子,volatile boolean flag = true; flag就被jmm加持了hb属性,jmm会保证对它的写操作完成后,其他拥有这个变量的线程会立即同步到最新的写入结果,也就是能立马结束线程的运行。

    其实除开volatile关键字有hb语义,还有几种情况,引自原文:

    •1、Each action in a thread happens before every subsequent action in that thread.

    •2、An unlock on a monitor happens before every subsequent lock on that monitor.

    • 3、A write to a volatile field happens before every subsequent read of that volatile.

    • 4、A call to start() on a thread happens before any actions in the started thread.

    • 5、All actions in a thread happen before any other thread successfully returns from a join() on that thread.

    • 6、If an action a happens before an action b, and b happens before an action c, then a happens before c.

    我们例子中是命中了第3条规则。

    问题

    大家发现了一个问题吗?两个线程其实共享了两个变量的,一个是flag还有一个i。问题是,i如果没有被volatile加持,为啥最后在主线程中输出i的累加结果时不是主线程的本地变量0呢?也就是说,主线程执行System.out.println("vt value: " + vt.i);的时候,子线程已经将i的累加值同步回主线程了。怎么做到的?

    我想,问题肯定出在println这个函数上,它肯定也被加持了hb的属性了。我们跟进去看看println的实现:

    public void println(String x) {
            synchronized (this) {
                print(x);
                newLine();
            }
        }
    

    我们发现了synchronized关键字,结合hb规则第2条,我觉可能是这样的:

    1. synchronized在jvm底层C++代码中对应一个monitor对象;
    2. 因为monitor对象有hb语义,也就是在打印x之前,从堆上给我同步下其他线程的结果。

    所以能够每次打印出正确的i值结果来。

    至于,如果有超过一个线程对i累加,到底同步哪个线程的值,就取决于jvm特定平台的实现了。

    引伸下

    既然printlnhb的属性加持,那么是不是可以将程序稍微改下,不用volatile也可以达到让线程停下来的目的?改一下程序:

    public class VolatileTest extends Thread {
        boolean flag = true;
        long count = 0L;
    
        public void run() {
            while (flag) {
                if (count % 4300000000L == 0 && count != 0L) {//让程序运行久一点
                    System.out.println(count);
                }
                count++;
            }
        }
    
        public static void main(String[] args) throws Exception {
            VolatileTest vt = new VolatileTest();
            vt.start();
            Thread.sleep(1000);
            setFlag(vt);
            vt.join();
            System.out.println("vt value: " + vt.count);
            System.out.println("stop and final flag is:" + vt.flag);
        }
    
        private static void setFlag(VolatileTest vt) {
            vt.flag = false;
        }
    }
    

    结果也能停下来:

    root@moon-light:~/share/java# java VolatileTest
    4300000000
    vt value: 4300000001
    stop and final flag is:false
    

    也是符合预期的。

    再引伸下

    我们知道,jmm的hb就那6种条件,我大胆的猜测下,如果我让子线程主动切换出去休息,是不是回来后也会同步共享变量呢?如果我是jmm的某个平台的实现者我会这么做的;因为,发生线程上下文切换肯定是惊动了内核,到内核去玩资源了,一定是做了什么不得了的事情,比如网络数据包来了,文件读写了,这么一来大概率会对共享变量有所修改,所以不如同步一把,以免程序员报编译器bug。

    我们就测试一下

    public class VolatileTest extends Thread {
        boolean flag = true;
        long count = 0L;
    
        public void run() {
            while (flag) {
                if (count % 4300000000L == 0 && count != 0L) {
                   try {
                     Thread.sleep(1);
                   } catch (InterruptedException e) {
                      e.printStackTrace();
                   }
                }
                count++;
            }
        }
    
        public static void main(String[] args) throws Exception {
            VolatileTest vt = new VolatileTest();
            vt.start();
            Thread.sleep(1000);
            setFlag(vt);
            vt.join();
            System.out.println("vt value: " + vt.count);
            System.out.println("stop and final flag is:" + vt.flag);
        }
    
        private static void setFlag(VolatileTest vt) {
            vt.flag = false;
        }
    }
    

    嗯,结果是也能停下来:

    root@moon-light:~/share/java# java VolatileTest
    vt value: 4300000001
    stop and final flag is:false
    

    一种可能的JMM的内存布局

    测试了这些代码,我不由的想猜测下JMM的实际内存布局,来彻底搞清楚文章开头那个例子为啥会同步不到主线程的变量。于是我画了张图: JMM

    可以看到,主线程与子线程的线程栈中都有变量i,但是他们并不共享缓存行(CacheLine),而是通过JMM+执行引擎来完成线程同步;实际上JMM的工作机制更像是缓存行一致性协议负责在各个线程之间同步信息。所以,JVM的线程并不直接收到CPU的控制,而是受控于JVM,JMM本身。

    这也能解释为啥setFlag(vt);//设置flag,尝试停止线程不工作的原因。因为:

    1. 两个线程不共享缓存行;
    2. JMM会在合适的时候(那6个hb规则)更新i在线程栈的值,已同步线程。

    问题2

    如果真是这样,那么,对于编译型语言那岂不是不用加volatile也能同步共享变量了?因为,我知道,多核构架下CPU核心之间是有缓存行一致性协议存在的。比如,Intel就是MESI(modified, exclu- sive, shared, invalid)协议就是干这个的。这是个很大的话题,不细讲了。简单来说,可以把CPU类比成微服务中的服务缓存,当拥有多个实例的时候,就要配置缓存的刷新或者一致性协议,当一个服务实例更新了缓存数据,就必须要广播到其他实例,以免读脏。

    那么,对于C++来说,这个问题的内存模型应该是:

    C++

    我们测试下C++的代码:

    #include <iostream>
    #include <string>
    #include <thread>
    #include <mutex>
    std::mutex g_display_mutex;
    
    const int TC = 4;
    int count = 0;
    bool flag = true;
    void inc_count()
    {
        count += 1;
    }
    void testRun()
    {
        while (flag)
        {
            inc_count();
        }
        g_display_mutex.lock();
        std::cout << "child thread id:" << std::this_thread::get_id() <<" count is:" <<count<<std::endl;
        std::cout << "terminated\n";
        g_display_mutex.unlock();
    }
    
    int main()
    {
        std::thread threads[TC]; // 默认构造线程
        for (int i = 0; i < TC; ++i)
        {
            threads[i] = std::thread(testRun); // move-assign threads
        }
    
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "main thread id:" << std::this_thread::get_id() << std::endl;
        flag = false;//终止线程
        for (auto &thread : threads)
        {
            thread.join();
        }
    
        std::cout << "All threads joined!"
                  << " count:" << count;
    }
    

    结果为:

    root@moon-light:~/share# g++ -std=c++11 -pthread -g test_volatile_1.cpp -o test_volatile_1
    root@moon-light:~/share# ./test_volatile_1
    main thread id:140351807989568
    child thread id:140351782807296 count is:72319772
    terminated
    child thread id:140351807985408 count is:72319772
    terminated
    child thread id:140351799592704 count is:72319772
    terminated
    child thread id:140351791200000 count is:72319772
    terminated
    

    完美的停下来了。

    多运行几次试试

    All threads joined! count:62006648root@moon-light:~/share# ./test_volatile_1
    main thread id:140251589162816
    child thread id:140251563980544 count is:61286071
    terminated
    child thread id:140251580765952 count is:61262101
    terminated
    child thread id:140251589158656 count is:61262101
    terminated
    child thread id:140251572373248 count is:61282797
    terminated  
    
    All threads joined! count:70375805root@moon-light:~/share# ./test_volatile_1
    main thread id:140332342216512
    child thread id:140332333819648 count is:65634928
    terminated
    child thread id:140332317034240 count is:65634929
    terminated
    child thread id:140332325426944 count is:65634929
    terminated
    child thread id:140332342212352 count is:65460361
    

    发现,确实都停下来了,但是这个累加的结果好像并不是一样的,也就是,当我设置完flag = false,如果每个core都实时收到主线程对flag的更新,那么理论上来讲每个线程count的结果是一样的,为啥会出现差别呢?

    而且,我发现这个差别并不大,鉴于CPU的速度,应该只是”卡了“那么一丢丢。

    CPU多核构架与缓存行一致性协议

    查了很多资料,我发现很可能跟intel CPU的多核构架有关,有兴趣的人可以看看Intel CPU手册。

    简单来说就是(intel cpu):

    1. 缓存行一致性协议(MESI)类似于微服务中的多实例+kafka的模型;

    2. 在多核构架下,每个核心中的线程共享L1与L2,共享变量会通过L3进行广播、共享(L3作用类似于Kafka)

    3. 在通过L3同步本地变量时,在CPU中还有读写buffer,这会影响同步的及时性,也就是可见性的延迟,这个优化也是”万恶之源“(直接导致了cpu也有所谓的happens before属性的存在)

    4. CPU的hb属性主要是一些内存屏障指令最常见的是LOCK指令。

      注:使用读写buffer来延迟发送更新命令很好理解,可以提高系统的吞吐量,但是弊端就是出现同步不及时的问题,这个问题需要使用特殊的CPU指令来规避;这跟我们平时设计高并发读多写少时的策略差不多,可见处理IO的策略在各个层次都是一致的。

    好了,我们画个图来说明。


    MESI

    说明:

    1、CPU不直接跟内存打交道,而是通过L1-L3来访问;

    2、CPU每次从内存加载一个Cacheline大小的数据(跟局部性原理相关,可以优化程序运行速度);

    3、MESI协议作用在缓存行这个级别,这也是最小的内存访问粒度;

    4、L3之上还有个环形互联的总线,MESI用它来广播数据;(类似消息队列)

    5、每个缓存行都有自己的状态,MESI通过这些状态来在不同的核心之间同步数据;

    6、当T1读入C1缓存行的时候,会发送指令到环形互联ring,看看是否有其他核心有C1这个缓存行,如果有,则从ring上拉取下来(从L2)标记为S;如果没有则从L3上取(或者击穿到内存)并标记为Exclusive状态;

    7、当T1要写C1缓存行的时候,会发送invalid消息到ring,通知其他core我要写了,状态同步成功后,其他核心中C1变成Invalid状态,自己的C1变成Modified状态,并发送指令更新L3与内存;

    8、当其他core要访问C1,发现C1状态为Invalid,则发送消息到ring获取最新的C1值;然后Core1嗅探到这个消息,就将最新的值从L2通知到ring,并将C1状态改成Shared;

    9、而这个MESI协议也有同步问题。因为性能问题,不可能每次读写都发送如此多的嗅探消息,性能太低,因此每个核心并不是每次读写都会发送消息,而是在读端加入读buffer;写端加入写buffer来做缓冲,因此Core对Cacheline的读写都有延迟;

    10、正是因为这个buffer的存在——core的流水线去读取buffer有延迟,所以会造成上述例子中每次运行后,每个线程(core)的累加最终结果不同的原因。这个延迟的存在使得有些core会多运行”几次“。

    猜测

    我们稍微改一下程序,让4个线程做加法,每个线程累加10000次,然后预期的结果是4个线程都结束时,count的值是40000;我想因为有这个buffer的存在,应该是加不到40000的。

    #include <iostream>
    #include <string>
    #include <thread>
    #include <mutex>
    std::mutex g_display_mutex;
    
    const int TC = 4;
    int count = 0;
    bool flag = true;
    void inc_count()
    {
        count += 1;
    }
    void testRun()
    {
        for(int i=0; i<10000;i++)
        {
            inc_count();
        }
        g_display_mutex.lock();
        std::cout << "child thread id:" << std::this_thread::get_id() <<" count is:" <<count<<std::endl;
        std::cout << "terminated\n";
        g_display_mutex.unlock();
    }
    
    int main()
    {
        std::thread threads[TC]; // 默认构造线程
        for (int i = 0; i < TC; ++i)
        {
            threads[i] = std::thread(testRun); // move-assign threads
        }
    
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "main thread id:" << std::this_thread::get_id() << std::endl;
        flag = false;
        for (auto &thread : threads)
        {
            thread.join();
        }
    
        std::cout << "All threads joined!"
                  << " count:" << count<<std::endl;
    }
    

    运行结果:

    child thread id:139688286332672 count is:10000
    terminated
    child thread id:139688277939968 count is:20000
    terminated
    child thread id:139688269547264 count is:31117
    terminated
    child thread id:139688261154560 count is:37526
    terminated
    main thread id:139688286336832
    All threads joined! count:37526
    

    嗯,不错,果然加不到40000,

    很简单嘛,因为没有加锁,加锁就行了啊。

    对,加锁当然可以解决这个问题,但是锁是什么呢?锁是怎么实现的呢?

    经过我们前面的分析,产生这个现象的根本原因是MESI协议因为有buffer,所以导致多线程程序在多核运行时共享变量并不能达到强一致性,那么如果最底层都没法达到强一致性,那么我们写程序时的锁又是从哪来的呢?或者说怎么实现的呢?是否有什么操作可以保证强一致性呢?

    HACK

    我不想写mutexsynchronizedReentryLock来同步程序,因为那样会索然无味,于是,我试图从Intel手册中找到答案。

    我真的找到了,不然就不会有这篇文章了吧。

    在手册的第三章第8章MULTIPLE-PROCESSOR MANAGEMENT中,我找到了一个叫做LOCK的指令,它可以实现多核多线程的原子指令

    8.1 LOCKED ATOMIC OPERATIONS

    The 32-bit IA-32 processors support locked atomic operations on locations in system memory. These operations are typically used to manage shared data structures (such as semaphores, segment descriptors, system segments, or page tables) in which two or more processors may try simultaneously to modify the same field or flag. The processor uses three interdependent mechanisms for carrying out locked atomic operations:

    •Guaranteed atomic operations •Bus locking, using the LOCK# signal and the LOCK instruction prefix

    •Cache coherency protocols that ensure that atomic operations can be carried out on cached data structures (cache lock); this mechanism is present in the Pentium 4, Intel Xeon, and P6 family processors

    简单来说就是:LOCK是一种指令前缀,能够加这个前缀的指令就能保证操作的原子性(多核下),可以实现对内存的原子访问,是实现锁的基础。

    哪些指令可以加这个前缀呢?继续翻手册。

    To explicitly force the LOCK semantics, software can use the LOCK prefix with the following instructions when they are used to modify a memory location. An invalid-opcode exception (#UD) is generated when the LOCK prefix is used with any other instruction or when no write operation is made to memory (that is, when the destination operand is in a register).
    •The bit test and modify instructions (BTS, BTR, and BTC).
    •The exchange instructions (XADD, CMPXCHG, and CMPXCHG8B).
    •The LOCK prefix is automatically assumed for XCHG instruction.
    •The following single-operand arithmetic and logical instructions: INC, DEC, NOT , and NEG.
    •The following two-operand arithmetic and logical instructions: ADD, ADC, SUB, SBB, AND, OR, and XOR.

    可以看到有个ADD指令,是的count+=1不就是Add操作吗?那我们不就有办法用这个最原始的锁来实现多线程累加程序了吗?

    我们知道,CPU上层是汇编,我想java肯定干不成这事了,只能用C++了。

    void inc_count()
    {
        asm("movl   $1, %eax");
        asm("lock addl %eax,count(%rip)");
    }
    

    我们只需要修改inc_count函数即可,在这个函数中显式的调用LOCK指令来强制同步下缓存行即可。

    编译、链接、运行...

    root@moon-light:~/share# ./test_volatile_2
    main thread id:139873222453056
    child thread id:139873222448896 count is:17628
    terminated
    child thread id:139873214056192 count is:24639
    terminated
    child thread id:139873205663488 count is:35229
    terminated
    child thread id:139873197270784 count is:40000
    terminated
    All threads joined! count:40000
    root@moon-light:~/share# ./test_volatile_2
    main thread id:140615447291712
    child thread id:140615447287552 count is:13509
    terminated
    child thread id:140615430502144 count is:24803
    terminated
    child thread id:140615422109440 count is:33008
    terminated
    child thread id:140615438894848 count is:40000
    terminated
    All threads joined! count:40000
    root@moon-light:~/share# ./test_volatile_2
    main thread id:140172760246080
    child thread id:140172760241920 count is:17644
    terminated
    child thread id:140172751849216 count is:29650
    terminated
    child thread id:140172743456512 count is:32806
    terminated
    child thread id:140172735063808 count is:40000
    terminated
    All threads joined! count:40000
    

    不管运行了多少次都是正确的结果——40000。

    去掉LOCK再试试?

    void inc_count()
    {
        asm("movl   $1, %eax");
        asm("addl %eax,count(%rip)");
    }
    

    结果是:

    root@moon-light:~/share# ./test_volatile_2
    main thread id:child thread id:140026343032640140026343028480 count is:
    15217
    terminated
    child thread id:140026326243072 count is:26761
    terminated
    child thread id:140026334635776 count is:26761
    terminated
    child thread id:140026317850368 count is:26761
    terminated
    All threads joined! count:26761
    root@moon-light:~/share# ./test_volatile_2
    main thread id:child thread id:140632667023168140632658626304 count is:
    12776
    terminated
    child thread id:140632667019008 count is:33191
    terminated
    child thread id:140632650233600 count is:33191
    terminated
    child thread id:140632641840896 count is:33191
    terminated
    All threads joined! count:33191
    root@moon-light:~/share# ./test_volatile_2
    main thread id:child thread id:140121883453248140121883449088 count is:
    10000
    terminated
    child thread id:140121875056384 count is:29073
    terminated
    child thread id:140121866663680 count is:32190
    terminated
    child thread id:140121858270976 count is:37510
    terminated
    All threads joined! count:37510
    

    结果又回到了最初。

    有了这个指令,我想什么synchronizedCASvolatile不就都能理解了吗?它们都依赖这个最底层的指令,而且据说这个指令是所有CPU都必须实现的,所以Linux、JVM都在大量使用。在底层代码中看到这个就不足为奇了。

    具体怎么用这个指令实现这些锁,那就到下篇文章讲解了。

    另外不管什么语言,框架如果说自己是最快的、最好的,多半是假的,可以想想Intel有几千页的文档,你最好的方案永远都出自硬件的支持;应该只有在某些方面表现比其他工具更好而已,不存在绝对性。

    总结

    多线程编程,有趣而又深邃。

    往大了说能够提升性能,有着化腐朽为神奇的力量;

    往深了说涉及到CPU、操作系统与编程语言的精准操控

    总之,趣味十足。

    相关文章

      网友评论

          本文标题:多线程编程那些事

          本文链接:https://www.haomeiwen.com/subject/pgydgktx.html