美文网首页
Java并发编程

Java并发编程

作者: HannahLi_9f1c | 来源:发表于2020-11-28 21:07 被阅读0次

    这篇文章目的就是将Java并发编程的知识点串联起来,不仅仅是为了面试,还跟实际工作中的最佳实践相结合,当然,楼主也只是工作不到一年的小菜鸟,可能会有疏漏或者有错误之处,恳请指正。以下是这篇文章的大纲


    i并发编程大纲

    线程共享

    线程共享问题是并发编程中不可避免的问题,多线程协作通常会共享全局变量。我们期望多个线程想要修改同一个共享变量,同一时刻只有一个线程修改成功;并且当一个线程读取共享变量的值时,不会被其他线程修改,否则会造成不一致。总的来说读读可以同时,读写和写写不可同时。而且当其他线程修改共享变量的值之后,其他线程能够看到,这指的是内存的可见性。

    Java内存模型

    说到可见性,就不得不提到Java的内存模型了,操作系统是底层硬件资源的抽象,在操作系统中,多线程共享进程的地址空间。Java虚拟机定义了“Java内存模型”,屏蔽操作系统对不同硬件的处理差异,实现了多平台的统一性。在Java内存模型中,每个线程有自己的工作内存,对于读取和赋值,只能在自己的工作线程进行。


    image.png

    那么什么时候才会从主内存中加载值呢?Java虚拟机的设计人员定义了8种操作访问规则,相当繁琐,这里就不列出了。说了那么多Java内存模型,就是为了让大家理解什么是可见性,当一个线程在工作内存修改共享变量的值后,将值赋给主内存保存,其他线程加载修改的值,就能看到最新的值。而volatile就保证了其他线程能够看到最新修改后的值。

    volatile

    原理

    当一个变量被volatile修饰,其保证的语义是可见性和防止指令重排序。是因为线程在读取volatile的值时,都会从主内存中刷到最新的值,普通变量没有这种保证,所以看到的可能是旧值。
    指令重排序:CPU在执行指令过程中,在不改变指令的执行结果下会对指令进行优化,改变顺序,volatile通过读写屏障保证了指令的顺序性。我们可以通过单例模式知道这种优化带来的好处。

    public class Singeton {
        private volatile static Singeton instance ;
        public static getInstance{
            if(instance ==  null) {
                synchronized (Singeton.class) {
                    if(instance == null) {
                        instance = new Singeton(); 
                     }
                }
              return instance;
        }
    }
    

    这是双重锁检验的代码。其中实例用volatile使用volatile修饰,用来防止指令重排序。new Singeton()实例化包含多条指令,如果多条指令不能保证顺序,一个线程尚未实例化出单例对象,其他线程判断instance==null为false,就会返回一个尚未实例化完的对象,造成问题。

    使用方式

    volatile保证可见性和有序性,但是不能保证原子性,因此多个线程的修改动作不是原子性时,会有线程安全问题。volatile适用的场景。

    • 运算结算不依赖当前值,或者确保只有单一的线程能够改变量的值。“运算结果不依赖当前值”是多个线程同时访问并运算时,不依赖当前值是多少,比如说将boolean类型修饰为volatile,然后将其置为true/false,最终的结果都是true/flase,不依赖当前的值。“单一线程能够改变量的值“,cas自旋volatile的场景就能够保证只有一个线程进入代码段,并改变变量。下文会介绍cas自旋volatile变量的经典使用姿势。
    • 变量不需要与其他状态变量共同参与不变约束
    源码经典使用
    1. FutureTask源码
    private volatile int state;
     private static final long stateOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = FutureTask.class;
                stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    截取了部分FutureTask的源码,重点看state变量是如何使用的,FutureTask中用state来表示线程的执行状态,stateOffset是内存反射值。当使用set方法对结果进行赋值,并修改state的值时,使用CAS原子指令,表明如果没有其他线程进入这个方法,那么当前线程可以进入设置线程结果,并使用反射将state值设置为NORMAL。那么为什么不用state=NORMAL而是用UNSAFE.putOrderedInt(this, stateOffset, NORMAL); ?

    ThreadLocal

    ThreadLocal用线程隔离的方式解决了线程共享的问题,每个线程保存了变量的副本,其他线程访问不了。由于不需要考虑解决线程安全问题,因此不需要加锁,是一种以空间换时间的思路。

    使用方式

    我们通常会在项目中定义一个请求处理的拦截器,实现HandlerInterceptor,在请求前,请求方法执行完和请求结束对RequestBody进行处理或者打印相关日志。代码中ThreadLocal修饰了requestBody变量,因为服务器使用的是Tomcat,所以一个请求会交给一个线程来处理,那么requestBody的get()和set方法设置的就是当前线程的请求体的值,跟其他线程互不影响。除了这种场景,ThreadLocal还用在数据库连接,每个请求用户名等等

    public class ReqLogInterceptor implements HandlerInterceptor {
    ThreadLocal<String> requestBody = new ThreadLocal<String>();
      @Override
      public boolean preHandle(HttpServletRequest httpServletRequest,
          HttpServletResponse httpServletResponse, Object o) throws Exception {
        requestBody.set("");
        return true;
      }
      @Override
      public void postHandle(HttpServletRequest httpServletRequest,
          HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView)
          throws Exception {
        if (httpServletResponse instanceof ContentCachingResponseWrapper) {
          responseBody.set("");
          byte[] body = ((ContentCachingResponseWrapper) httpServletResponse).getContentAsByteArray();
          responseBody.set(new String(body, httpServletResponse.getCharacterEncoding()));
        }
      }
    
      }
    
    源码

    小伙伴们一定也很好奇其中的源码是如何实现的吧?我们进入Jdk查看

    1. set(T value)源码,讲解请看注释
        public void set(T value) {
    //获取当前线程,因为每个线程拥有自己的ThreadLocalMap
            Thread t = Thread.currentThread();
    //通过Thread获取ThreadLocalMap
            ThreadLocalMap map = getMap(t);
    //如果map不为空,以当前ThreadLocal对象为key,存储的值为value放置进map
            if (map != null)
                map.set(this, value);
    //否则。创建map,并放入当前值
            else
                createMap(t, value);
        }
    

    继续看createMap做了什么?

       void createMap(Thread t, T firstValue) {
    //将ThreadLocal对象和set的value值传入ThreadLocalMap的构造函数
            t.threadLocals = new ThreadLocalMap(this, firstValue);
        }
    

    构造函数初始化并put value

            ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //table是Entry[]数组,Entry以ThreadLocal作为key,以设置的值作为value,类似于HashMap的Entry
                table = new Entry[INITIAL_CAPACITY];
    //算出放入的值在table的下标
                int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
                table[i] = new Entry(firstKey, firstValue);
                size = 1;
    //设置初始容积
                setThreshold(INITIAL_CAPACITY);
            }
    

    虽然ThreadLocal的结构跟HashMap,不过我觉得冲突的概念会更小
    如果已经初始化过了,可以直接往map里set值

            private void set(ThreadLocal<?> key, Object value) {
                Entry[] tab = table;
                int len = tab.length;
                int i = key.threadLocalHashCode & (len-1);
    //从这里可以看出,ThreadLocalMap解决冲突的方式不是拉链法,而是线性探测法
    //从计算出的下标继续往下找
                for (Entry e = tab[i];
                     e != null;
                     e = tab[i = nextIndex(i, len)]) {
                    ThreadLocal<?> k = e.get();
    //key相同。放置当前值并返回
                    if (k == key) {
                        e.value = value;
                        return;
                    }
    //key为null。调用replaceStaleEntry放置当前值,并处理key为null的结点,下一节内存泄漏会讲解
                    if (k == null) {
                        replaceStaleEntry(key, value, i);
                        return;
                    }
                }
    
                tab[i] = new Entry(key, value);
                int sz = ++size;
    //清理为null的结点
                if (!cleanSomeSlots(i, sz) && sz >= threshold)
                    rehash();
            }
    
    内存泄漏
    1. 内存泄漏的原因
      通过阅读源码,我们发现ThreadLocalMap中的Entry是弱引用WeakReference,Java中有四种引用,分别是强引用、软引用、弱引用、虚引用,弱引用关联的对象在下一次虚拟机收集垃圾时就会消亡,那么为什么Java要把Entry中的key设置为弱引用呢?这是因为ThreadLocal是一种以时间换空间的策略,如果有一千个线程,那么就需要存储1千份的副本,而且每个线程的ThreadLocal对象不仅有一个,所以Java将其设计为弱引用就是为了在GC回收的时候,能够将对象进行回收。


      ThreadLocal引用分析

      但是即使ThreadLocal被回收了,还会有内存泄漏的可能,如上图的引用关系,ThreadLocal的强引用被设置为null后,那么没有到达Entry中的key的引用链(这里可以去复习下Java的垃圾回收)。但是依然存在从Thread Ref-> Thread-> Map->value的引用,所以Entry没有被回收,虽然说Thread如果执行完被销毁之后,Thread Ref到Thread的引用会断开,那么Entry就能够被回收,但是现在大部分项目使用线程池来管理线程,所以复用线程时依然存在这样的引用链。

    2. 如何避免内存泄漏
      ThreadLocal的set、get、remove方法都会检查key为null的结点,将其进行回收。我们再一次阅读set方法的源码,可以发现在判断到key为null的时候,会调用replaceStaleEntry方法,环形遍历回收附近key为null的结点。如果当前table[i]为null,也会调用cleanSomeSlots清除掉过期的key。更多的源码回收细节可以参考博客:https://www.jianshu.com/p/dde92ec37bd1,讲的十分全面。
            private void set(ThreadLocal<?> key, Object value) {
                Entry[] tab = table;
                int len = tab.length;
                int i = key.threadLocalHashCode & (len-1);
                for (Entry e = tab[i];
                     e != null;
                     e = tab[i = nextIndex(i, len)]) {
                    ThreadLocal<?> k = e.get();
                    if (k == key) {
                        e.value = value;
                        return;
                    }
                    if (k == null) {
                        replaceStaleEntry(key, value, i);
                        return;
                    }
                }
    
                tab[i] = new Entry(key, value);
                int sz = ++size;
                if (!cleanSomeSlots(i, sz) && sz >= threshold)
                    rehash();
            }
    
    ThreadLocal最佳实践
    1. 及时调用ThreadLocal的remove方法,可以避免内存泄漏问题,更重要的是防止造成业务逻辑的错乱,因为通常会使用线程池管理线程,如果一个用户登录之后的name相关的ThreadLocal对象,没有及时remove,那么其他用户登录进来之后,会发现自己的用户名显示错误。

    显式锁

    另一种解决线程共享的方式就是使用显式锁,显式锁的作用就是让所有线程排队使用共享变量,只有当前线程拥有锁,其他线程需要等待锁释放。显式锁的实现主要有Java内置锁synchronized和JDK实现的锁ReetrantLock,他们的共同点是可重入、独占性,在JDK1.6优化后性能差异不大。Lock具有更多的高级特性,例如公平锁,可中断性。

    synchronized
    1. 原理
      synchronized关键字经过Javac编译,会产生monitorenter和montinorexit两条指令。他们都需要绑定到一个对象上,如果synchronized中指定了要锁住的对象,那么就以对象引用作为绑定对象,如果没有指定,那么就根据修饰的方法的类型是静态方法还是实例方法,来选择绑定类的Class对象还是对象实例。
    2. 加锁过程

    《深入理解Java虚拟机》
    在执行montiorenter时,首先尝试去获取对象的锁。如果对象没被锁定,或者当前线程已经拥有对象的锁,就把锁的计数器加一,而在执行montiorexit指令时会被锁计数器减一。一旦计数器减为0,锁就会被释放。如果获取对象锁失败,当前线程就会阻塞等待,直到请求锁定的对象被持有它的线程释放为止

    1. synronized的优化
      由于Java中的线程是映射到操作系统内核线程上的,因此阻塞和唤醒线程,都需要操作系统的帮助,阻塞和唤醒都涉及到用户态和内核态之间的转换,为什么内核态和用户态之间的转换成本会那么高呢?原因就是线程的执行不仅仅需要代码和数据,还需要上下文,这里的上下文包括寄存器和程序计数器,上下文的保存和恢复的时间和空间的开销都很大,所以说synchronized开销较大,因此在JDK1.6之前性能远不如ReetrantLock,但是JDK1.6之后,虚拟机团队对其进行了优化,性能有了较大提升。锁的升级过程如流程图所示,下面也会逐一介绍每个优化


      升级过程
    • 自旋锁、自适应锁:我们已经知道当一个线程无法获取锁,需要阻塞就会进入内核态,但是很多时候线程占用锁的时间比较短,那么这个线程经过短时间的阻塞又要被唤醒了,在短时间内在内核态和用户态之间切换,开销会很大。自旋锁在线程获取不到锁的时候,先进行空循环,占用CPU时间等待锁,如果能够等到锁释放,就能够避免用户态和内核态的切换了。而自适应锁是通过判断锁前一次在同一个锁的自旋时间和拥有者状态来决定是否自旋和自旋时间,如果在一个锁上很少能够自旋等待成功,那么虚拟机会选择不在这个锁上进行自旋。
    • 锁消除:锁消除就是通过逃逸分析,如果判断一个一段代码在堆上的数据不会被其他线程访问到,那么可以把这段代码当成栈数据来使用,不需要锁
    • 锁粗化:如果在同一个对象上连续进行加锁解锁,消耗也很大,锁粗化就是把同步范围变大,避免反复加锁解锁。
    • 轻量级锁 :


      对象头结构

      在进入同步块之前,对象是无锁的状态,锁标志位是01,虚拟机在当前线程的栈帧建立一个Lock Record存储Mark Word的拷贝。然后虚拟机利用CAS尝试将对象的Mark Word更新为指向Lock Record的指针,如果成功,表示加锁成功,Mark Word标志位变为00。如果CAS失败,虚拟机会检查对象的Mark Word是否已变为当前线程的栈帧,如果不是,说明其他线程已经占有了该对象。此时就会膨胀为重量级锁,标志位变为10。解锁的过程也是使用CAS。轻量级锁提升性能的依据就是绝大部分锁,在整个同步周期,是不存在竞争的

    • 偏向锁:如果当前虚拟机开启了偏向锁,那么当锁对象第一次被线程获取的时候,会把对象头的标志位设置为“01”,偏向标志记为1,并用CAS把当前线程的ID记录在对象头。下次这个线程再次获取这个对象锁时,就不需要同步了。一旦其他线程尝试获取这个锁,偏向模式就结束了
    ReetrantLock

    ReetrantLock的优势在于可以中断等待的锁,公平锁。ReetrantLock内部包含两种锁,公平锁Sync和非公平锁NonfairSync都是继承了AQS。

    1. AQS


      AQS框架

    AQS是同步工具的抽象,AQS通过state将共享变量资源抽象化,同步队列控制线程的访问顺序,子类可以重写获取独占锁tryRelease或者重写获取共享锁tryReleaseShared,而对于出队、入队的细节都封装在AQS。

    1. 公平锁和非公平锁实现
      ReetrantLock中定义了一个抽象类Sync,继承了AQS,定义了抽象方法lock,重写了AQS的tryRealse方法,这个是公平锁和非公平锁使用的方法。
      公平锁:
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
    //这里有一个公平锁专门的判断hasQueuedPredecessors,这个方法判断队列是否有后继者,如果没有才会尝试去获取锁
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    非公平锁

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    //大家可以发现非公平锁的获取方法除了一个判断是否有后继者的方法,其他跟公平锁一毛一样
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    1. 等待可中断

    相关文章

      网友评论

          本文标题:Java并发编程

          本文链接:https://www.haomeiwen.com/subject/qgbfwktx.html