多线程学习

作者: 奇点一氪 | 来源:发表于2021-05-08 10:22 被阅读0次

    多线程状态

    1. 新建(NEW):新创建了一个线程对象。

    2. 可运行(RUNNABLE):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 。

    3. 运行(RUNNING):可运行状态(runnable)的线程获得了cpu 时间片(timeslice) ,执行程序代码。

    4. 阻塞(BLOCKED):阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice 转到运行(running)状态。阻塞的情况分三种:

    (1). 等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。
    (2). 同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。
    (3). 其他阻塞:运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。

    1. 死亡(DEAD):线程run()、main() 方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。

    线程的状态图

    image.png

    wait 和sleep区别

    1.不同类

    wait --> object类
    sleep --> Thread类
    睡眠使用(TimeUnit)

    2.关于锁释放

    wait会释放锁;
    sleep不会释放锁;

    3.使用范围不同

    wait 必须在同步在代码块中;
    sleep 可以在任何地方睡觉;

    4.需要捕获异常

    wait 不需要;
    sleep 需要捕获异常;

    lock

    ReentrantLock
    一个可重入互斥Lock

    公平锁:十分公平,先来先用;
    非公平锁:不公平,可以插队(默认);

    使用锁;
    1.new ReentrantLock
    2.lock.lock()
    3.finally 代码块 lock.unlock();解锁

    synchronized 和lock区别

    1.synchronized 内置的Java关键字;lock 是一个java类;
    2.synchronized 无法判断获取锁的状态;lock 可以获取锁的状态;
    3.synchronized 会自动释放锁;lock 需要手动释放锁;不释放锁会造成死锁;
    4.synchronized 线程1(获取锁,阻塞)、线程2(等待);lock锁不会一直等下去,调用lock.trylock()
    尝试加锁;
    5.synchronized 可重入锁,不可以中断,非公平;lock:可重入锁,可以判断锁,非公平(可以自己设置);
    6.synchronized:适合锁少量的代码同步问题;lock:适合锁大量同步代码;

    锁是什么,如何判断锁是谁!

    生产者和消费者问题;

    虚假唤醒


    将if判断改成while循环;

    synchronized 生产者和消费者

    JUC生产者和消费者问题;

    Condition因素出Object监视器方法( waitnotifynotifyAll )成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock实现。 Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。

    条件(也称为条件队列条件变量 )为一个线程暂停执行(“等待”)提供了一种方法,直到另一个线程通知某些状态现在可能为真。 因为访问此共享状态信息发生在不同的线程中,所以它必须被保护,因此某种形式的锁与该条件相关联。 等待条件的关键属性是它原子地释放相关的锁并挂起当前线程,就像Object.wait

    一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其newCondition()方法。

    例如,假设我们有一个有限的缓冲区,它支持puttake方法。 如果在一个空的缓冲区尝试一个take ,则线程将阻塞直到一个项目可用; 如果put试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put线程和take线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition实例来实现。

    class BoundedBuffer {
       final Lock lock = new ReentrantLock();
       final Condition notFull  = lock.newCondition(); 
       final Condition notEmpty = lock.newCondition(); 
    
       final Object[] items = new Object[100];
       int putptr, takeptr, count;
    
       public void put(Object x) throws InterruptedException {
         lock.lock(); try {
           while (count == items.length)
             notFull.await();
           items[putptr] = x;
           if (++putptr == items.length) putptr = 0;
           ++count;
           notEmpty.signal();
         } finally { lock.unlock(); }
       }
    
       public Object take() throws InterruptedException {
         lock.lock(); try {
           while (count == 0)
             notEmpty.await();
           Object x = items[takeptr];
           if (++takeptr == items.length) takeptr = 0;
           --count;
           notFull.signal();
           return x;
         } finally { lock.unlock(); }
       }
     } 
    

    synchronized

    static操作的class,对类加锁
    phone操作的对象,对对象加锁

    CopyOnWriteArrayList

    是一个线程安全的变体ArrayList ,其中所有可变操作( addset ,等等)通过对底层数组的最新副本实现。

    这通常是太昂贵的,但是当遍历操作大大超过突变时,可能比替代方法有效,并且在不能或不想同步遍历时需要有用,但需要排除并发线程之间的干扰。 “快照”样式迭代器方法在创建迭代器的时候使用对数组状态的引用。 该数组在迭代器的生命周期内永远不会改变,所以干扰是不可能的,迭代器保证不会丢弃ConcurrentModificationException 。 自迭代器创建以来,迭代器将不会反映列表的添加,删除或更改。 元变化的迭代器操作本身( removesetadd )不被支持。 这些方法抛出UnsupportedOperationException

    允许所有元素,包括null

    内存一致性效果:与其他并发集合一样,在将对象放入CopyOnWriteArrayList 之后的线程中的操作,在另一个线程中从CopyOnWriteArrayList访问或删除该元素之后。

    CopyOnWriteArraySet

    一个Set使用内部CopyOnWriteArrayList其所有操作。 因此,它具有相同的基本属性:

    • 它最适合于集合大小通常保持较小,只读操作大大超过突变操作的应用程序,并且您需要防止遍历期间线程之间的干扰。
    • 它是线程安全的。
    • 可变操作( addsetremove ,等)是昂贵的,因为它们通常意味着复制整个底层数组。
    • 迭代器不支持突变remove操作。
    • 遍历遍历迭代器是快速的,不能遇到来自其他线程的干扰。 迭代器构建时迭代器依赖于数组的不变快照。

    hashSet底层

    底层hashMap,使用Map.put(e,PRESENT); PRESENT是一个固定对象;

    ConcurrentHashMap

    Callable

    package com.ws.util;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    /**
     * @Author
     * @Description //TODO
     * @Date  2019/7/12 11:24
     * @Param
     * @return
     **/
    public class CallableTest {
        public static void main(String[] args) {
            Map<String, Object> returnMap = new HashMap<>();
            Callable<List> callable = new Callable<List>() {
                @Override
                public List call() throws Exception {
                    return  new ArrayList(); //去service 查询数据返回
                }
            };
    
            Callable<List> callable2 = new Callable<List>() {
                @Override
                public List call() throws Exception {
                    return  new ArrayList(); //去service 查询数据返回2
                }
            };
    
            Callable<List<String>> listCallable = new Callable<List<String>>() {
                @Override
                public List<String> call() throws Exception {
                    return null;
                }
            };
    
            FutureTask<List> futureTask = new FutureTask<>(callable);
            FutureTask<List> futureTask2= new FutureTask<>(callable2);
            FutureTask<List<String>> listFutureTask = new FutureTask<>(listCallable);
            Thread t1 = new Thread(futureTask); //声明线程
            Thread t2 = new Thread(futureTask2);
            Thread t3 = new Thread(listFutureTask);
            t1.start(); //开始线程
            t2.start();
            t3.start();
            try {
                List list1 = futureTask.get(); //等待结果返回,可能会产生阻塞;
                List list2 = futureTask2.get();
                List list3 = listFutureTask.get();
                returnMap.put("list1",list1); //将结果放入map中返回
                returnMap.put("list2",list2);
                returnMap.put("list3",list3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }finally {
                t1.interrupt(); //关闭线程
                t2.interrupt();
                t3.interrupt();
            }
             //返回  returnMap
        }
    }
    

    常用辅助类

    CountDownLatch

    允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

    • A CountDownLatch用给定的计数初始化。await方法阻塞,直到由于countDown方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier。

      A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。

      CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过

    package com.fjq.thread.countdownlatch;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author: fjq
     * @date: 2021/5/7 17:20
     */
    public class CountDownLatchTest {
        public static void main(String[] args) {
    
            //必须要执行任务时在使用;
            CountDownLatch countDownLatch = new CountDownLatch(6);
    
            for (int i = 1; i < 6; i++) {
                new Thread(()-> {
                    System.out.println( Thread.currentThread().getName()+"Go out");
                    countDownLatch.countDown();
                },String.valueOf(i)).start();
            }
    
            try {
                //计数器归0在向下运行
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(" Close Door");
        }
    }
    

    CyclicBarrier 栅栏/屏障 加法计数器

    允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

    Semaphore

    一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。

    信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。 例如,这是一个使用信号量来控制对一个项目池的访问的类:

    package com.fjq.thread.semaphore;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author: fjq
     * @date: 2021/5/7 18:17
     */
    public class SemaphoreDemo {
        public static void main(String[] args) {
    
            Semaphore semaphore = new Semaphore(3);
    
            for (int i = 1; i <= 6; i++) {
                new Thread(() -> {
                    try {
                        //获得
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + "抢到停车位!");
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName() + "离开停车位!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //释放
                        semaphore.release();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    

    原理

    ReadWiterLock

    • ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。read lock可以由多个阅读器线程同时进行,只要没有作者。write lock是独家的。

    所有ReadWriteLock实现必须保证的存储器同步效应writeLock操作(如在指定Lock接口)也保持相对于所述相关联的readLock 。 也就是说,一个线程成功获取读锁定将会看到在之前发布的写锁定所做的所有更新。

    读写锁允许访问共享数据时的并发性高于互斥锁所允许的并发性。 它利用了这样一个事实:一次只有一个线程( 写入线程)可以修改共享数据,在许多情况下,任何数量的线程都可以同时读取数据(因此读取器线程)。 从理论上讲,通过使用读写锁允许的并发性增加将导致性能改进超过使用互斥锁。 实际上,并发性的增加只能在多处理器上完全实现,然后只有在共享数据的访问模式是合适的时才可以。

    读写锁是否会提高使用互斥锁的性能取决于数据被读取的频率与被修改的频率相比,读取和写入操作的持续时间以及数据的争用 - 即是,将尝试同时读取或写入数据的线程数。 例如,最初填充数据的集合,然后经常被修改的频繁搜索(例如某种目录)是使用读写锁的理想候选。 然而,如果更新变得频繁,那么数据的大部分时间将被专门锁定,并且并发性增加很少。 此外,如果读取操作太短,则读写锁定实现(其本身比互斥锁更复杂)的开销可以支配执行成本,特别是因为许多读写锁定实现仍将序列化所有线程通过小部分代码。 最终,只有剖析和测量将确定使用读写锁是否适合您的应用程序。

    虽然读写锁的基本操作是直接的,但是执行必须做出许多策略决策,这可能会影响给定应用程序中读写锁定的有效性。 这些政策的例子包括:

    • 在写入器释放写入锁定时,确定在读取器和写入器都在等待时是否授予读取锁定或写入锁定。 作家偏好是常见的,因为写作预计会很短,很少见。 读者喜好不常见,因为如果读者经常和长期的预期,写作可能导致漫长的延迟。 公平的或“按顺序”的实现也是可能的。
    • 确定在读卡器处于活动状态并且写入器正在等待时请求读取锁定的读取器是否被授予读取锁定。 读者的偏好可以无限期地拖延作者,而对作者的偏好可以减少并发的潜力。
    • 确定锁是否可重入:一个具有写锁的线程是否可以重新获取? 持有写锁可以获取读锁吗? 读锁本身是否可重入?
    • 写入锁可以降级到读锁,而不允许插入写者? 读锁可以升级到写锁,优先于其他等待读者或作者吗?

    在评估应用程序的给定实现的适用性时,应考虑所有这些问题。

    package com.fjq.thread.readwritelock;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
     * @author: fjq
     * @date: 2021/5/8 9:53
     */
    public class ReadWirteLockDemo {
    
        public static void main(String[] args) {
    
            MyCacheLock myCacheLock = new MyCacheLock();
    
            for (int i = 1; i <= 6; i++) {
                final int temp = i;
                new Thread(() -> {
                    myCacheLock.put(temp + "", temp + "");
                }).start();
            }
    
    
            for (int i = 1; i <= 6; i++) {
                final int temp = i;
                new Thread(() -> {
                    myCacheLock.get(temp + "");
                }).start();
            }
    
        }
    }
    
    class MyCacheLock {
    
        private volatile Map<String, Object> map = new HashMap<>();
    
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
        public void put(String key, Object value) {
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "写入" + key);
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + "写入OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
    
    
        public Object get(String key) {
            readWriteLock.readLock().lock();
            Object o = null;
            try {
                System.out.println(Thread.currentThread().getName() + "读取" + key);
                o = map.get(key);
                System.out.println(Thread.currentThread().getName() + "读取OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
            }
            return o;
        }
    }
    

    BlockingQueue 阻塞队列

    多线程并发处理,线程池;



    Queue 另外支持在检索元素时等待队列变为非空的操作,并且在存储元素时等待队列中的空间变得可用。
    BlockingQueue方法有四种形式,具有不同的操作方式,不能立即满足,但可能在将来的某个时间点满足:

    • 第一个抛出异常;
    • 第二个返回一个特殊值( null或false ,具体取决于操作);
    • 第三个程序将无限期地阻止当前线程,直到操作成功为止;
    • 第四个程序块在放弃之前只有给定的最大时限;
      这些方法总结在下表中:


    BlockingQueue不接受null元素。 实现抛出NullPointerException上尝试add , put或offer一个null 。null用作哨兵值以指示poll操作失败。

    BlockingQueue可能是容量有限的。 在任何给定的时间它可能有一个remainingCapacity超过其中没有额外的元素可以put没有阻止。 没有任何内在容量限制的A BlockingQueue总是报告剩余容量为Integer.MAX_VALUE 。
    BlockingQueue实现被设计为主要用于生产者 - 消费者队列,但另外支持Collection接口。 因此,例如,可以使用remove(x)从队列中删除任意元素。 然而,这样的操作通常不能非常有效地执行,并且仅用于偶尔使用,例如当排队的消息被取消时。
    BlockingQueue实现是线程安全的。 所有排队方法使用内部锁或其他形式的并发控制在原子上实现其效果。 然而, 大量的Collection操作addAll , containsAll , retainAll和removeAll 不一定原子除非在实现中另有规定执行。 因此有可能,例如,为addAll(c)到只增加一些元件在后失败(抛出异常) c 。
    BlockingQueue上不支持任何类型的“关闭”或“关闭”操作,表示不再添加项目。 这些功能的需求和使用往往依赖于实现。 例如,一个常见的策略是生产者插入特殊的尾流或毒物 ,这些消费者在被消费者摄取时被相应地解释。

    ArrayBlockingQueue

    LinkedBlockingQueue

    SynchronousQueue

    其中每个插入操作必须等待另一个线程相应的删除操作,反之亦然。 同步队列没有任何内部容量,甚至没有一个容量。 你不能peek在同步队列,因为一个元素,当您尝试删除它才存在; 您无法插入元素(使用任何方法),除非另有线程正在尝试删除它; 你不能迭代,因为没有什么可以迭代。 队列的头部是第一个排队的插入线程尝试添加到队列中的元素; 如果没有这样排队的线程,那么没有元素可用于删除,并且poll()将返回null 。 为了其他Collection方法(例如contains )的目的, SynchronousQueue充当空集合。 此队列不允许null元素。

    线程池

    三大方法、7大参数、4中拒接策略;

    线程池好处:
    1.降低资源的消耗;
    2.提高响应速度;
    3.方便管理;

    线程服用,可以控制最大并发数,管理线程

    三大方法

       ExecutorService pool = Executors.newSingleThreadExecutor(); //创建一个线程
       ExecutorService pool =Executors.newFixedThreadPool(5);    //创建固定线程
       ExecutorService pool = Executors.newCachedThreadPool();  //创建一个伸缩的线程
    
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
        }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }
    
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
        }
    

    ThreadPoolExecutor 7个参数

    • corePoolSize:核心线程数
    • maximumPoolSize:最大线程数
    • keepAliveTime:超时了没有人调用就会释放
    • unit:超时单位
    • workQueue:阻塞队列
    • threadFactory 线程工厂,创建线程
    • handler 拒绝策略
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    四种拒绝策略

    new ThreadPoolExecutor.AbortPolicy(); 队列满了,在有进入的直接抛出个RejectedExecutionException异常,也不执行这个任务了。
    new ThreadPoolExecutor.CallerRunsPolicy(); 队列满了,在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
    new ThreadPoolExecutor.DiscardPolicy(); 队列满了,不会抛异常也不会执行 ,直接丢掉任务;
    new ThreadPoolExecutor.DiscardOldestPolicy(); 队列满了,尝试去和最早的竞争,不抛异常;

    最大线程如何设置:

      1. CPU密集型:指的是系统的磁盘读写效率高于cpu效率
      1. IO密集型:磁盘读写频繁,cpu等待io执行导致cpu使用率不高;
        设置最大线程数个数;
        如果是CPU密集型的,线程数量一般为CPU的核数+1;
        如果是IO密集型:可多分配一点 cpu核数*2
        也可以使用公式:CPU 核数 / (1 - 阻塞系数);其中阻塞系数 在 0.8 ~ 0.9 之间。

    四大函数式接口

    • 函数式接口;
      1.有且只有一个方法抽象方法的接口,可以有其他的方法.
      格式;interface name{
      //有且只有一个抽象方法
      //可以有其它默认方法,静态方法,私有方法…}
      2.@FunctionalInterface注解;
      在定义"函数式接口"时,为防止发生定义错误.可以使用@FunctionalInterface注解,强制按照"函数式接口"的语法检查,如果语法错误.编译器将会方法错误.
      @FunctionalInterface
      public interface Runnable {
      public abstract void run();
      }
      3.自定义函数式接口DEMO;
    /** 函数式接口
     * @author: fjq
     * @date: 2021/5/8 17:45
     */
    public class demo1 {
        public static void main(String[] args) {
    //        Function<String, String> function = new Function<String, String>() {
    //            @Override
    //            public String apply(String s) {
    //                return "返回"+s;
    //            }
    //        };
            Function function = (str) -> {return  "返回"+str;};
            System.out.println(function.apply("+入参"));
        }
    }
    
    • 链式编程;
    • lambda表达式;
    • Stream流式计算;
    List<Long> userIdList = new ArrayList<>();
    for (User user: list) {
         userIdList.add(user.id);
    }
    

    在1.8有了lambda表达式以后,我们会这样写:

    List<Long> userIdList = new ArrayList<>();
    list.forEach(user -> list.add(user.id));
    

    在有了stream之后,我们还可以这样写:

    List<Long> userIdList = list.stream().map(User::getId).collect(Collectors.toList());
    

    ForkJoin

    在jdk1.7,并行执行任务,提高效率,大数据量;


    public class Main {
        public static void main(String[] args) throws Exception {
            // 创建2000个随机数组成的数组:
            long[] array = new long[2000];
            long expectedSum = 0;
            for (int i = 0; i < array.length; i++) {
                array[i] = random();
                expectedSum += array[i];
            }
            System.out.println("Expected sum: " + expectedSum);
            // fork/join:
            ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
            long startTime = System.currentTimeMillis();
            Long result = ForkJoinPool.commonPool().invoke(task);
            long endTime = System.currentTimeMillis();
            System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
        }
    
        static Random random = new Random(0);
    
        static long random() {
            return random.nextInt(10000);
        }
    }
    
    class SumTask extends RecursiveTask<Long> {
        static final int THRESHOLD = 500;
        long[] array;
        int start;
        int end;
    
        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 如果任务足够小,直接计算:
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += this.array[i];
                    // 故意放慢计算速度:
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                }
                return sum;
            }
            // 任务太大,一分为二:
            int middle = (end + start) / 2;
            System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
            SumTask subtask1 = new SumTask(this.array, start, middle);
            SumTask subtask2 = new SumTask(this.array, middle, end);
            invokeAll(subtask1, subtask2);
            Long subresult1 = subtask1.join();
            Long subresult2 = subtask2.join();
            Long result = subresult1 + subresult2;
            System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
            return result;
        }
    }
    
    Expected sum: 9788366 
    split 0~2000 ==> 0~1000, 1000~2000 
    split 0~1000 ==> 0~500, 500~1000 
    split 1000~2000 ==> 1000~1500, 1500~2000 
    result = 2485485 + 2491717 ==> 4977202 
    result = 2391591 + 2419573 ==> 4811164 
    result = 4811164 + 4977202 ==> 9788366 
    Fork/join sum: 9788366 in 1135 ms. 
    

    ForkJoin特点:工作窃取

    volatile

    是java虚拟机提供的轻量级的同步机制
    1.保证可见性
    2.保证原子性
    3.禁止指令重排

    线程加锁前从主内存中读取变量到工作内从中,


    volatile:可以保证可见性,不能保证原子性,由于内存屏障,可以保证指令重排现象;

    CAS 比较并交换

    compareAndSet(int expect, int update)比较当前工作内存的值==`为预期值,则将该值原子设置为给定的更新值,如果不是期望值,就一会循环。
    好处:自带原子性;
    缺点:
    1.循环会耗时;
    2.一次性只能保证一个共享变量 的愿你性;
    3.ABA问题

    • 参数:
    • expect - 预期值
    • update - 新价值
    • 结果:
      true如果成功。 False return表示实际值不等于预期值。
      java 无法操作内存,java可以调用c++的native方法操作内存,java的后门,可以已通过Unsafe类操作内存;

    Unsafe类

    ABA问题

    原子引用

    公平锁/非公平锁

    公平锁:公平,不能够插队;
    非公平锁:不公平。能够插队

    ReentrantLock可重入锁

    java除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。

    ReentrantLock常常对比着synchronized来分析,我们先对比着来看然后再一点一点分析。
    (1)synchronized是独占锁,加锁和解锁的过程自动进行,易于操作,但不够灵活。ReentrantLock也是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。
    (2)synchronized可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁;ReentrantLock也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
    (3)synchronized不可响应中断,一个线程获取不到锁就一直等着;ReentrantLock可以相应中断。

    ReentrantLock好像比synchronized关键字没好太多,我们再去看看synchronized所没有的,一个最主要的就是ReentrantLock还可以实现公平锁机制。什么叫公平锁呢?也就是在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁。

    1、简单使用
    我们先给出一个最基础的使用案例,也就是实现锁的功能


    在这里我们定义了一个ReentrantLock,然后再test方法中分别lock和unlock,运行一边就可以实现我们的功能。这就是最简单的功能实现,代码很简单。我们再看看ReentrantLock和synchronized不一样的地方,那就是公平锁的实现。

    2、公平锁实现
    对于公平锁的实现,就要结合着我们的可重入性质了。公平锁的含义我们上面已经说了,就是谁等的时间最长,谁就先获取锁。


    首先new一个ReentrantLock的时候参数为true,表明实现公平锁机制。在这里我们多定义几个线程ABCDE,然后再test方法中循环执行了两次加锁和解锁的过程。


    3、非公平锁实现
    非公平锁那就随机的获取,谁运气好,cpu时间片轮到哪个线程,哪个线程就能获取锁,和上面公平锁的区别很简单,就在于先new一个ReentrantLock的时候参数为false,当然我们也可以不写,默认就是false。直接测试一下


    4、响应中断
    响应中断就是一个线程获取不到锁,不会傻傻的一直等下去,ReentrantLock会给予一个中断回应。在这里我们举一个死锁的案例。
    首先我们定义一个测试类ReentrantLockTest3。


    在这里我们定义了两个锁lock1和lock2。然后使用两个线程thread和thread1构造死锁场景。正常情况下,这两个线程相互等待获取资源而处于死循环状态。但是我们此时thread中断,另外一个线程就可以获取资源,正常地执行了。

    5、限时等待

    这个是什么意思呢?也就是通过我们的tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。我们可以将这种方法用来解决死锁问题。

    首先还是测试代码,不过在这里我们不需要再去中断其中的线程了,我们直接看线程类是如何实现的。


    在这个案例中,一个线程获取lock1时候第一次失败,那就等10毫秒之后第二次获取,就这样一直不停的调试,一直等到获取到相应的资源为止。

    当然,我们可以设置tryLock的超时等待时间tryLock(long timeout,TimeUnit unit),也就是说一个线程在指定的时间内没有获取锁,那就会返回false,就可以再去做其他事了。

    自选锁

    死锁

    所谓死锁,是指多个进程在运行过程中因争夺资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进。 因此我们举个例子来描述,如果此时有一个线程A,按照先锁a再获得锁b的的顺序获得锁,而在此同时又有另外一个线程B,按照先锁b再锁a的顺序获得锁。如下图所示:



    产生死锁的原因?
    可归结为如下两点:

      1. 竞争资源
    • 系统中的资源可以分为两类:
      (1).可剥夺资源,是指某进程在获得这类资源后,该资源可以再被其他进程或系统剥夺,CPU和主存均属于可剥夺性资源;
      (2).另一类资源是不可剥夺资源,当系统把这类资源分配给某进程后,再不能强行收回,只能在进程用完后自行释放,如磁带机、打印机等。

    • 产生死锁中的竞争资源之一指的是竞争不可剥夺资源(例如:系统中只有一台打印机,可供进程P1使用,假定P1已占用了打印机,若P2继续要求打印机打印将阻塞)

    • 产生死锁中的竞争资源另外一种资源指的是竞争临时资源(临时资源包括硬件中断、信号、消息、缓冲区内的消息等),通常消息通信顺序进行不当,则会产生死锁

    • b. 进程间推进顺序非法

    • 若P1保持了资源R1,P2保持了资源R2,系统处于不安全状态,因为这两个进程再向前推进,便可能发生死锁

    • 例如,当P1运行到P1:Request(R2)时,将因R2已被P2占用而阻塞;当P2运行到P2:Request(R1)时,也将因R1已被P1占用而阻塞,于是发生进程死锁

    死锁产生的4个必要条件?

    产生死锁的必要条件:
    1.互斥条件:进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
    2.请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
    3.不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
    4.环路等待条件:在发生死锁时,必然存在一个进程--资源的环形链。

    解决死锁的基本方法

    预防死锁:

    • 资源一次性分配:一次性分配所有资源,这样就不会再有请求了:(破坏请求条件)
    • 只要有一个资源得不到分配,也不给这个进程分配其他的资源:(破坏请保持条件)
    • 可剥夺资源:即当某进程获得了部分资源,但得不到其它资源,则释放已占有的资源(破坏不可剥夺条件)
    • 资源有序分配法:系统给每类资源赋予一个编号,每一个进程按编号递增的顺序请求资源,释放则相反(破坏环路等待条件)
      1、以确定的顺序获得锁

    如果必须获取多个锁,那么在设计的时候需要充分考虑不同线程之前获得锁的顺序。按照上面的例子,两个线程获得锁的时序图如下:



    如果此时把获得锁的时序改成:



    那么死锁就永远不会发生。 针对两个特定的锁,开发者可以尝试按照锁对象的hashCode值大小的顺序,分别获得两个锁,这样锁总是会以特定的顺序获得锁,那么死锁也不会发生。问题变得更加复杂一些,如果此时有多个线程,都在竞争不同的锁,简单按照锁对象的hashCode进行排序(单纯按照hashCode顺序排序会出现“环路等待”),可能就无法满足要求了,这个时候开发者可以使用银行家算法,所有的锁都按照特定的顺序获取,同样可以防止死锁的发生,该算法在这里就不再赘述了,有兴趣的可以自行了解一下。

    2、超时放弃

    当使用synchronized关键词提供的内置锁时,只要线程没有获得锁,那么就会永远等待下去,然而Lock接口提供了boolean tryLock(long time, TimeUnit unit) throws InterruptedException方法,该方法可以按照固定时长等待锁,因此线程可以在获取锁超时以后,主动释放之前已经获得的所有的锁。通过这种方式,也可以很有效地避免死锁。 还是按照之前的例子,时序图如下:

    避免死锁:

    • 预防死锁的几种策略,会严重地损害系统性能。因此在避免死锁时,要施加较弱的限制,从而获得 较满意的系统性能。由于在避免死锁的策略中,允许进程动态地申请资源。因而,系统在进行资源分配之前预先计算资源分配的安全性。若此次分配不会导致系统进入不安全的状态,则将资源分配给进程;否则,进程等待。其中最具有代表性的避免死锁算法是银行家算法。
    • 银行家算法:首先需要定义状态和安全状态的概念。系统的状态是当前给进程分配的资源情况。因此,状态包含两个向量Resource(系统中每种资源的总量)和Available(未分配给进程的每种资源的总量)及两个矩阵Claim(表示进程对资源的需求)和Allocation(表示当前分配给进程的资源)。安全状态是指至少有一个资源分配序列不会导致死锁。当进程请求一组资源时,假设同意该请求,从而改变了系统的状态,然后确定其结果是否还处于安全状态。如果是,同意这个请求;如果不是,阻塞该进程知道同意该请求后系统状态仍然是安全的。

    检测死锁

    • 首先为每个进程和每个资源指定一个唯一的号码;
    • 然后建立资源分配表和进程等待表。

    解除死锁:

    当发现有进程死锁后,便应立即把它从死锁状态中解脱出来,常采用的方法有:

    • 剥夺资源:从其它进程剥夺足够数量的资源给死锁进程,以解除死锁状态;
    • 撤消进程:可以直接撤消死锁进程或撤消代价最小的进程,直至有足够的资源可用,死锁状态.消除为止;所谓代价是指优先级、运行代价、进程的重要性和价值等。

    死锁检测

    1、Jstack命令

    jstack是java虚拟机自带的一种堆栈跟踪工具。jstack用于打印出给定的java进程ID或core file或远程调试服务的Java堆栈信息。 Jstack工具可以用于生成java虚拟机当前时刻的线程快照。线程快照是当前java虚拟机内每一条线程正在执行的方法堆栈的集合,生成线程快照的主要目的是定位线程出现长时间停顿的原因,如线程间死锁、死循环、请求外部资源导致的长时间等待等。 线程出现停顿的时候通过jstack来查看各个线程的调用堆栈,就可以知道没有响应的线程到底在后台做什么事情,或者等待什么资源。

    2、JConsole工具

    Jconsole是JDK自带的监控工具,在JDK/bin目录下可以找到。它用于连接正在运行的本地或者远程的JVM,对运行在Java应用程序的资源消耗和性能进行监控,并画出大量的图表,提供强大的可视化界面。而且本身占用的服务器内存很小,甚至可以说几乎不消耗。

    死锁参考博文:
    https://blog.csdn.net/jonnyhsu_0913/article/details/79633656
    https://blog.csdn.net/sinat_21043777/article/details/53457233
    https://blog.csdn.net/wsq119/article/details/82218911

    相关文章

      网友评论

        本文标题:多线程学习

        本文链接:https://www.haomeiwen.com/subject/lmdddltx.html