美文网首页
多线程学习笔记

多线程学习笔记

作者: YFaye | 来源:发表于2019-05-03 12:44 被阅读0次
    • 多线程出现目的
    • 如何使用多线程
    • 线程状态(6种)
    • 线程开启/停止
    • 线程安全
      • Volilate
      • Sychronized
        • 机制
          • 如何实现锁
          • 为什么任何一个对象都可以成为锁
          • 锁的优化
      • Lock与Synchronized区别
      • CAS
      • AQS(AbstractQueuedSychronizer)
        • ReentrantLock
          • Lock()加锁分析
          • unlock()释放锁分析
        • CountDownLatch
          • 是什么
          • 如何使用
          • 分析

    多线程出现目的

    场景:

    1. 当一个进程处理过程中,遇到网络与IO操作都会进入阻塞状态,不再处理任何东西,浪费系统资源。
    2. 一个函数的处理非常耗时,其实其中多个逻辑可以并行处理。

    多线程的面世就是要解决以上问题。

    如何使用多线程

    1. extends Thread
    public class ThreadDemo extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + "ThreadDemo Running");
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new ThreadDemo().start();
            }
        }
    }
    
    1. implements Runnable
    public class RunnableDemo implements Runnable {
    
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + "RunnableDemo Running");
        }
    
        public static void main(String[] args) {
            RunnableDemo runnableDemo = new RunnableDemo();
            for (int i = 0; i < 10; i++) {
                new Thread(runnableDemo).start();
            }
        }
    }
    
    1. ExecutorService
      • Executors.newFixedThreadPool
      • Executors.newCachedThreadPool
      • Executors.newSingleThreadPool
      • Executors.newScheduledThreadPool
    public class ExecutorServiceDemo {
    
        private static ThreadPoolExecutor threadPool;
    
        private static ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger integer = new AtomicInteger();
    
            @Override
            public Thread newThread(Runnable r) {
                int threadName = integer.getAndIncrement();
                System.out.println("Created Thread:" + threadName);
                return new Thread(r, "ThreadPool Thread:" + threadName);
            }
        };
    
        private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
    
            threadPool = new ThreadPoolExecutor(10, 15, 1000L,
                    TimeUnit.SECONDS,
                    workQueue,
                    factory);
    
            //execute()与submit()的区别在于submit有一个Future类型的返回,
            // 实际submit是把Callable入参包装成RunnableFuture类型后再调用execute();
            for (int i = 0; i < 15; i++) {
                System.out.println("threadPool.execute");
                threadPool.execute(new RunnableDemo());
            }
    
            for (int i = 0; i < 15; i++) {
                System.out.println("threadPool.submit");
                Future<?> future = threadPool.submit(new CallableDemo());
                System.out.println(future.get());
            }
    
        }
    }
    
    1. implements Callable<>
    public class CallableDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + ":" +"CallableDemo Running");
            return "Callable Result";
        }
    
        public static void main(String[] args) throws Exception {
            CallableDemo callableDemo = new CallableDemo();
            String callableReturn = callableDemo.call();
            System.out.println("callableReturn :" + callableReturn);
        }
    }
    

    Callable与Runable区别:
    Re:

    1. Callable任务线程能返回执行结果,而Runnable任务线程不能返回结果
    2. Callable能向上抛出异常,而Runnable接口异常只能内部消化

    为什么提供extends Thread又提供implements Runnable
    Re:因为JAVA不支持多继承

    线程状态(6种)

    image.png
    • NEW 调用Start方法前
    • RUNNABLE 运行
    • BLOCKED 阻塞
      • 等待阻塞 wait
      • 同步阻塞 synchronized
      • 其它阻塞 sleep/join
    • WAITING 等待
    • TIMED_WAITING 超时等待
    • TERMINATED 终止

    状态变更图示:


    image.png

    线程开启/停止

    开始:start()
    停止:interrupt()
    通过设置标志位的方式终止线程,使其能有机会去清理资源,而非暴力的方式直接kill掉,这种方式更新安全。

    public class demo4 {
        private static int num;
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    num++;
                    System.out.println("Num:" + num);
                }
            });
            thread.start();
            TimeUnit.SECONDS.sleep(1);
            thread.interrupt();
        }
    }
    

    线程安全

    • 可见性
    • 原子性
    • 有序性

    Volilate

    public class VolatileDemo {
        private volatile static boolean stop = false;
    
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                int i = 0;
                while (!stop) {
                    i++;
                    System.out.println("i: " + i);
                }
            });
    
            try {
                long startTime = System.currentTimeMillis();
                thread.start();
                System.out.println("Thread Start");
                TimeUnit.SECONDS.sleep(1);
                stop = true;
                long endTime = System.currentTimeMillis();
                System.out.println("Runtime: " + (endTime - startTime) / 1000 + " Second");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    因为每个线程有自己私有的内存空间,修改变量需要同步回主存才能对其它线程可见,而volatile就会有哪下作用

    1. 修改volatile修饰的变量时,会强制将修改的值回写到主存。
    2. 读取volatile修饰的变量时,会强制到主存获取数据,不再到缓存读取
    3. volatile会使被volatile修饰的语句禁止指令重排序

    指令重排序实例:

    int a = 1;
    int b = 2;
    int c = 3;
    //以上例子,可能是int c = 3优先于int a = 1和int b = 2执行
    
    int a = 1;
    volatile int b = 2;
    int c = 3;
    //以上例子则volatile int b = 2一定是在int a = 1和int c = 3之间执行。
    

    问题:为什么需要编译器指令重排?
    Re:优化执行效率。

    问题:什么是CPU乱序执行?


    image.png

    问题:为什么要禁止编译器指令重排呢?
    Re:因为多线程下指令重排可能会导致处理出错,例如:

    Thread-1:
    int b = 10;
    int c = b;
    boolean flag = true;
    
    Thread-2:
    while(flag){
        System.out.println(b);
    }
    

    如果编译器把Thread-1的第3条指令重排到第一行,那Thread-2就有可能出错,因为B还没有初始化。

    问题:volilate为什么不能保证原子性
    Re:因为volilate对变量的操作在字节码层面是由多条指令组成,非原子性操作,所以它只保证了可见性,不保证原子性。


    Volilate因为只保证了Read and Load即从主存加载到工作内存时加载的值是最新的,例如:
    线程1和线程2在执行Read and Load的时候,发现主存里的值都是5,双方都加载了这个最新值,然后双方都对该值加1,再把值放回主存,事实主存值结果为6,此操作有线程安全问题。

    小结
    声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,把这个变量所在的缓存行的数据写回到系统内存,再根据我们前面提到过的MESI的缓存一致性协议,来保证多CPU下的各个高速缓存中的数据的一致性。

    Sychronized

    机制

    1. 使用方法
    • 修饰实例方法
    • 修饰静态方法
    • 修饰代码块
    1. 使用Sychronized后,会通过字节码生成以下指令:
    • 修饰方法时:ACC_SYNCHRONIZED


      image.png
    • 修饰代码块:monitorenter monitorexit


    1. 获取锁情况
    • 修饰实例方法
      进行同步代码前,需要获取当前实例的锁
    • 修饰静态方法
      进行同步代码前,需要获取当前类对象的锁
    • 修饰代码块
      进行同步代码前,需要获取给定对象的锁

    如何实现锁

    本质:对象监视器的获取(独占锁)

    为什么任何一个对象都可以成为锁

    因为对象在内存中分为三块区域:对象头、实例数据、对齐填充


    image.png

    对象头:


    image.png

    而Synchroned使用的锁存在每一个对象的对象头里,其中锁标志位指向的是monitor对象(也称为管程或监视器锁)的起始地址。

    锁的优化

    锁的状态:

    • 无向锁
    • 偏向锁
      大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得。使用传统的重量级锁会有频繁锁操作,为了让线程获得锁的代价更低,引入了偏向锁,
    • 轻量锁
      当多线程竟然偏向锁时会升级为轻量级锁
    • 重量锁
      基于monitor的锁实现。

    注意:锁只能从轻到重的方向发展,不可逆。

    Lock与Synchronized区别

    1. Lock是一个接口
    2. synchronized是JVM层的一个实现
    3. synchronized是被动的触发锁机制,而Lock是可以灵活的控制,锁的创建和释放都需要人为控制,特别是异常发生的时候要注意释放锁。
    4. Lock相对来讲控制粒度更小,例如还可以分别控制读写锁
    5. Lock支持公平、非公平锁,而synchronized只支持非公平锁

    CAS

    CAS是JDK提供的Unsafe类里的一系列操作,这一系列操作由JDK来保证原子性。

        public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    
        public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    
        public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    
    image.png

    Atomic一系列的对象是根据CAS的封装来实例原子性。

    AQS(AbstractQueuedSychronizer)

    AQS的关键数据结构:


    image.png

    链表的操作通过CAS原子操作来保证多线程下的原子性:

    • compareAndSetTail
        /**
         * CAS tail field. Used only by enq.
         */
        private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }
    
    • compareAndSetHead
         /**
         * CAS head field. Used only by enq.
         */
        private final boolean compareAndSetHead(Node update) {
            return unsafe.compareAndSwapObject(this, headOffset, null, update);
        }
    

    通过compareAndSwapObject这个native方法来保证链表操作的线程安全性

    ReentrantLock

    Lock()加锁分析

    image.png

    非公平锁逻辑流程图

    image.png

    公平锁与非公平锁的差异

    • 公平锁获取锁的过程
        final void lock() {
            acquire(1);
        }
        
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
    • 非公平锁获取锁的过程
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
     
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        } 
    
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    以上可以看出,非公平锁在Lock()方法被调用时是首先尝试当前线程是否能直接获得锁,然后tryAcquire()方法的时候公平锁是需要检查AQS队列里是否有等待的节点,有的话是当前线程获取锁不成功,而非公平锁是直接CAS当前锁的状态,若通过就把锁给当前线程了。同时也可以看出双方在获取不到锁的时候,进行AQS队列方式是一样的,都是加在队尾。在加入队列后,还需要根据当前节点的前驱节点的waitStatus若是Node.SIGNAL状态判断是否需要把当前线程挂起,以省系统资源,

    unlock()释放锁分析

        public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    

    可以看出,每个unlock()操作都是一个State - 1操作,直到State == 0的时候,把ExclusiveOwnerThread即当前获得锁的线程设置为null来释放锁。

    小结
    在获取锁的时候,会维护一个双向链表,用于存放获取锁失败的的线程到队列中进行自旋来获取锁,

    CountDownLatch

    是什么

    CountDownLatch是JUC中提供的一个同步工具,使用调用await()它可以使一个或者多个线程进行等待,直到其它线程执行CountDown()方法把倒数器减至0后,等待的线程才会启动。

    如何使用

    public class CountDownLatchDemo {
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            new Thread(() -> {
                System.out.println(Thread.currentThread() + "执行完毕");
                countDownLatch.countDown();
            }, "Thread-1").start();
            new Thread(() -> {
                System.out.println(Thread.currentThread() + "执行完毕");
                countDownLatch.countDown();
            }, "Thread-2").start();
            new Thread(() -> {
                System.out.println(Thread.currentThread() + "执行完毕");
                countDownLatch.countDown();
            }, "Thread-3").start();
    
            countDownLatch.await();
            System.out.println("全部线程执行完毕");
        }
    }
    

    分析

    • await()
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 若线程中端,直接抛异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    
    // 计数为0时,表示获取锁成功
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    // 阻塞,并入队
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 入队
        boolean failed = true;
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取锁成功,设置队列头为node节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
                  && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. 检查计数器是否为0,为0直接返回
    2. 计数器大于0,即当前线程需要阻塞并等待计数器变为0
    3. 当前线程需要被封装成Node对象并添加到AQS双向链表里去
    4. 最后自旋尝试获取锁,即检查计数器是否为0,获取成功即出队,然后放行当前线程
    • countDonw()
    // 计数-1
    public void countDown() {
        sync.releaseShared(1);
    }
    
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 首先尝试释放锁
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0) //如果计数已经为0,则返回失败
                return false;
            int nextc = c-1;
            // 原子操作实现计数-1
            if (compareAndSetState(c, nextc)) 
                return nextc == 0;
        }
    }
    
    // 唤醒被阻塞的线程
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                // 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head) // 没有线程被阻塞,直接跳出
                break;
        }
    }
    
    1. 尝试释放锁,即将计数器-1,并判断state是否为0,若为0即表示当前没有锁,可以开始唤醒链表中阻塞中的线程
    2. 如果链表里为空,即没有阻塞的线程,直接退出
    3. 如果头节点waitStatus为SIGNAL,就依次唤醒下个节点的线程,并出队

    相关文章

      网友评论

          本文标题:多线程学习笔记

          本文链接:https://www.haomeiwen.com/subject/unianqtx.html