美文网首页
java基础-多线程

java基础-多线程

作者: 巨子联盟 | 来源:发表于2018-07-23 14:24 被阅读0次
    • java线程池的实现 ThreadPoolExecutor
    • java线程池几个参数
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    1. corePoolSize
      当线程数没达到 corePoolSize时就创建新的线程,直到创建到corePoolSize就会用队列,队列满了就会创建新的线程直到maximumPoolSize,详见下面 线程池任务提交的判断逻辑
    2. keepAliveTime
      当线程池的线程数超过corePoolSize时,如果没有新的任务提交会等待keepAliveTime后才销毁
    3. ThreadFactory
      创建新线程,默认使用Executors.defaultThreadFactory()创建,使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
    4. BlockingQueue
    Queue名称 是否有界 是否阻塞 是否有优先级 executors 其他说明
    ArrayBlockingQueue 有界 阻塞 无优先级 FIFO,底层是个Object数组
    LinkedBlockingQueue 无界 阻塞 newFixedThreadPool,
    newSingleThreadExecutor
    FIFO,比ArrayBlockingQueue吞吐量高
    SynchronousQueue 不存储元素,等上一个元素移除才能插入 阻塞 newCachedThreadPool 吞吐量比LinkedBlockingQueue通常要高
    PriorityBlockingQueue 无界 阻塞 底层是数组,但是会自动增
    1. RejectedExecutionHandler 见下方
    • 溢出的时候的抛弃策略 RejectedExecutionHandler
    1. AbortPolicy 直接抛异常
    2. CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务
    3. DiscardPolicy 什么也不干,空方法.
    4. DiscardOldestPolicy 当任务拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
    • 线程池任务提交的判断逻辑

    当有新任务在execute()方法提交时,会执行以下判断:

    1. 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
    2. 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
    3. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
    4. 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
      所以,任务提交时,判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize
    1. 用法区别
      synchronized:在需要同步的对象中加入此控制,synchronized可以加在方法上,也可以加在特定代码块中,括号中表示需要锁的对象。
      lock:需要显示指定起始位置和终止位置。一般使用ReentrantLock类做为锁,多个线程中必须要使用一个ReentrantLock类做为对象才能保证锁的生效。且在加锁和解锁处需要通过lock()和unlock()显示指出。所以一般会在finally块中写unlock()以防死锁。
    2. 性能区别
      synchronized是托管给JVM执行,lock是java写的;在JDK1.5中是性能低下的,因为这个是个重量级操作,有可能加锁时间比执行的时间还长;但是到了JDK1.6以后就有很大不同了,因为synchronized的语义很清晰,可以进行很多优化,比如:适应性自旋(如果上一次失败了,下一次就会少自旋几次),锁消除,轻量级锁,偏向锁等;synchronized原始采用的是CPU悲观锁机制,即线程获得的是独占锁,都赞锁意味着其他线程只能依靠线程阻塞来等待锁释放,而CPU在转换线程阻塞时会引起上下文切换,当有很多线程竞争时,上下文切换回导致效率很低.
      而Lock是用乐观锁的方式,采用的是CAS操作,性能相对较高.
    3. 用途区别
      复杂用途时用ReentrantLock,比如:
      1.某个线程在等待一个锁的控制权的这段时间需要中断,lock.lockInterruptibly()能有效的响应中断
      2.需要分开处理一些wait-notify,ReentrantLock里面的Condition应用,能够控制notify哪个线程
      3.具有公平锁功能,每个到来的线程都将排队等候
      4.尝试非阻塞式的获取锁,tryLock
    • java跳表的并发问题
    • 什么是跳表SkipList
      SkipList是基于链表的数据结构,而且是有层次的链表结构,是有序的,默认按照key值升序
      如图:

      image.png
      普通的链表搜索时时间复杂度是O(N),跳表的按层次进行,一层一层的搜索.时间复杂度为O(logn)
      当需要添加一个元素的时候,会先按照上面的方法很容易找到应该加入的位置,然后在用一种随机的算法来确定层次,再重新调整不同层次的链表,如图:
      image.png
      java中提供了跳表的两种实现,ConcurrentSkipListMap 和 ConcurrentSkipListSet
    • ConcurrentSkipListMap
      内部有3个比较重要的结构:Node,Index,HeadIndex;Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次;ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了.

    • 容器

    线程安全容器 说明 线程不安全容器 说明
    CopyOnWriteArrayList volatile修饰数组,修改时同步并复制;
    只适合小数据量的,大数据量复制效率超低
    ArrayList
    CopyOnWriteArraySet 内部包含了一个CopyOnWriteArrayList HashSet
    ConcurrentSkipListSet 用ConcurrentSkipListMap实现 TreeSet
    ConcurrentHashMap HashMap
    ConcurrentSkipListMap TreeMap 红黑树实现排序
    ConcurrentLinkedQueue 无界队列,支持FIF
    ConcurrentLinkedDeque 无界双端队列,支持FIFO和FILO。
    ArrayBlockingQueue 数组实现的 线程安全的 有限 阻塞队列
    LinkedBlockingQueue 单链表实现的、线程安全的、无限 阻塞队列
    LinkedBlockingDeque 双向链表实现的、线程安全的、 双端 无限 阻塞队列

    并发容器大合集-腾讯

    • 怎么设计线程池的大小
    • 影响因素: 负载特性以及底层硬件;任务阻塞的频率.
      1. 任务的性质:IO密集型,CPU密集型,混合型.
      2. 任务的优先级: 高中低
      3. 任务的平均执行时间:长,中,短
      4. 任务的依赖性:是否依赖其他系统资源,如数据库连接等。
    线程数
    CPU密集型线程数 = CPU 数+1,设置尽可能小的线程数,和CPU数一样就可以了,减少线程上下文的切换
    IO密集型线程数 = 2*CPU数 +1 ,尽可能大的线程数,不要让CPU闲下来
    混合型= 等待资源的时间越长,线程数应该设置的越大,不要让 CPU闲下来
    • 公式
    最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 =(线程等待时间/线程CPU时间 + 1)* CPU数目
    
    场景 设置方法
    高并发,任务执行时间短 设置为CPU数+1,减少上下文切换
    低并发,任务执行时间长 1.CPU密集型:线程数 = CPU数+1
    2.IO密集型:适当加大线程数以免CPU闲下来
    高并发,任务执行时间长 解决这种类型任务的关键不在于线程池而在于整体架构的设计.主要方向为减少任务执行时间,比如
    1.是否可以利用缓存;2.增加服务器;3.能否用消息中间件进行解耦
    • 应用KISS原则(Keep it simple,stupid),可以将ThreadPoolExecutor的核心线程数和最大线程数设置成一样,在选择队列方面,如果适合使用无界任务列表就用LinkedBlockingQueue,如果适合有界的就用ArrayBlockingQueue

    • AbstractQueuedSynchronizer剖析

    队列同步器AbstractQueuedSynchronizer 是用来构建锁和其他同步组件的框架,使用了一个int成员变量private volatile int state表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作
    同步器的三个操作状态的方法分别是:getState(),setState(int newState),compareAndSetState(int expect,int update)

    1. 同步队列

    一个FIFO的双向队列

    2. 独占式同步状态获取与释放
    3. 共享式同步状态获取与释放
    4. 超时获取同步状态
    • ReentrantLock剖析
    • 死锁
    • 例子

    package com.byedbl.lock;
    
    public class DeadLockDemo {
    
        private String A = "a";
        private String B = "b";
        public static void main(String[] args) {
            new DeadLockDemo().deadlock();
        }
    
        private void deadlock() {
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (A) {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        synchronized (B) {
                            System.out.println("t1");
                        }
    
                    }
                }
            });
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (B) {
                        synchronized (A) {
                            System.out.println("t2");
                        }
                    }    
                }
            });
    
            t1.start();
            t2.start();
        }
    }
    
    

    运行发现一直在跑,没打印任何东西出来,查看dump文件有如下内容:


    image.png
    • 怎么避免
      1.避免一个线程同时获取多个锁
      2.避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占一个资源
      3.尝试使用定时锁,lock.tryLock(timeout)来替代使用内部锁
      4.对于数据库锁,加锁和解锁必须在数据库连接内,否则会出现解锁失败的情况.

    多线程的问题
    1.上下文切换
    线程让出CPU时间片会产生上下文切换;
    解决方案:
    1)无锁编程,数据hash(id)取模分段处理.
    2)用CAS,Atomic包就是CAS算法.
    3)合理的创建线程,不要创建过多的线程.

    Synchronize 关键字原理:
    有三种使用方式:

    1. 同步方法,锁的是当前的实例对象
    2. 同步静态方法,锁的是class类对象
    3. 同步代码块,锁的是{}代码中的对象
      实现原理就是 JVM通过在进入退出对象监视器Monitor来实现对方法,同步块的同步的.没有获取到Monitor的就进入等待队列.

    锁优化:
    synchronize为重量级锁
    轻量锁:
    认为大多数锁在整个同步周期都不存在竞争,所以使用CAS比使用互斥开销更小,但是如果竞争激烈,轻量锁就不仅有互斥开销,而且有CAS开销,效率更差.
    偏向锁:
    进一步降低了获取锁的代价,偏向锁可以提高带有同步却没有竞争的程序性能,但如果程序中大多数锁都存在竞争时,那偏向锁就起不到太大作用.
    适应性自旋:
    自旋耗CPU,所以加入了适应性自旋,下次就会减少自旋.


    volatile 可以保证可见性和顺序性.
    JVM会重排指令,所以双重检查锁机制会加个volatile关键字.


    锁:
    重入锁 ReentrantLock :每次获取锁时会判断当前线程是否为获取锁的线程,如果是就状态+1,释放时是将状态-1,只有将同步状态为0时才会释放锁.
    AQS(AbstractQueuedSynchronizer)
    ReentrantLock分公平锁和非公平锁,公平锁要关心队列情况效率更慢
    读写锁:
    ReentrantReadWriteLock,同时维护一堆锁,读锁和写锁,当写线程时则其他锁都将阻塞,读时就不会.这样可大大增加吞吐量和并发量.
    分布式锁
    1)基于数据库主键
    2)基于数据库for update
    3)基于Redis的 setNX 和 setEX(timeout)
    4)基于ZK


    • 什么时候用线程池
    image.png
    • 手写线程池
    1. 创建线程池接口 ThreadPool
    package com.byedbl.threadpool;
    
    import java.util.List;
    
    public interface ThreadPool {
        //执行一个Runnable类型的任务
        void execute(Runnable task);
        void execute(Runnable[] tasks);
        void execute(List<Runnable> tasks);
        
        //返回已经执行任务的个数  executor执行者 task任务
        int getExecuteTaskNumber();
        
        //返回任务队列的长度,即还没处理的任务个数  wait等待
        int getWaitTaskNumber();
        
        //返回工作线程的个数
        int getWorkThreadNumber();
        //关闭线程池
        void destroy();
    }
    
    
    1. 实现线程池接口
    package com.byedbl.threadpool;
    
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    public class ThreadPoolManager implements ThreadPool {
        //线程池中默认的线程个数为5
        private static  int workerNum=5;
        //工作线程组
        WorkThread[] workThreads;
        //执行任务的数量 volatile
        private static AtomicInteger executeTaskNumber= new AtomicInteger(0);
        //任务队列,作为一个缓冲,List线程不安全(阻塞队列)
        private BlockingQueue<Runnable> taskQueue=new LinkedBlockingDeque<Runnable>();
        private static ThreadPoolManager threadPool;
        //可以用原子方式更新的 long 值(原子类)
        private AtomicLong threadNum=new AtomicLong();
        
        private ThreadPoolManager(){
            this(workerNum);//创建默认线程个数的线程池
        }
        
        private ThreadPoolManager(int worker_Num){
            ThreadPoolManager.workerNum=worker_Num;
            workThreads=new WorkThread[worker_Num];
            for(int i=0;i<worker_Num;i++){
                workThreads[i]=new WorkThread();//初始化(调用父类Thread类的无参构造方法,分配新的Thread对象)
                System.out.println("线程池中的工作线程数量:"+(i+1)+" 当前线程的名称是:"+workThreads[i].getName());
                workThreads[i].start();//一个native方法,它将启动一个新线程,并执行run()方法
            }
        }
        public static ThreadPool getThreadPool(){//获得默认线程个数的线程池
             return getThreadPool(ThreadPoolManager.workerNum);
        }
        //单例模式
        public static ThreadPool getThreadPool(int worker_Num){
            if(worker_Num<=0){
                worker_Num=ThreadPoolManager.workerNum;
            }
            if(threadPool==null){
                threadPool=new ThreadPoolManager(worker_Num);
            }
            return threadPool;
        }
         // 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(Runnable task) {  
            synchronized (taskQueue) {  
                taskQueue.add(task);  
                taskQueue.notifyAll();  
            }  
        }  
      
        // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(Runnable[] task) {  
            synchronized (taskQueue) {  
                for (Runnable t : task)  
                    taskQueue.add(t);  
                    taskQueue.notifyAll();  
                }  
        }  
      
        // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(List<Runnable> task) {  
            synchronized (taskQueue) {  
                for (Runnable t : task)  
                    taskQueue.add(t);  //把任务加入队列
                taskQueue.notifyAll();   //当调用execute()方法的时候,执行notiry(),只会唤醒线程池中的一个线程,注意与notoryAll()的区别
            }  
        }  
     
        @Override
        public int getExecuteTaskNumber() {
            return executeTaskNumber.get();
        }
     
        @Override
        public int getWaitTaskNumber() {
            return taskQueue.size();
        }
     
        @Override
        public int getWorkThreadNumber() {
            return workerNum;
        }
        @Override
        public String toString() {
              return "当前线程数量:" +workerNum+ " 已完成任务:"  
                        +getExecuteTaskNumber()+ "  等待任务数:" + getWaitTaskNumber();
        }
        @Override
        public void destroy() {
            //不断检查任务队列中存在任务
            while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧  
                try {  
                    Thread.sleep(10);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            // 工作线程停止工作,且置为null  
            for (int i = 0; i < workerNum; i++) {  
                workThreads[i].stopWorker();  
                workThreads[i] = null;  
            }  
            threadPool=null;  
            taskQueue.clear();// 清空任务队列  
        }
        /*内部类 即一个线程池对象*/
        private class WorkThread extends Thread{
            //该工作线程是否有效,用来接收该工作线程
            private volatile boolean isRunnable=true;
            /*
             * 关键所在,如果任务队列不空,则取出任务执行,若任务队列为空,则等待
            */
            @Override
            public void run() {
                //接收队列当中的任务对象 任务对象Runnable类型
                Runnable r=null;
                while(isRunnable){
                    //队列同步机制
                    synchronized(taskQueue){
                        while(isRunnable && taskQueue.isEmpty()){//队列为空
                            try {
                                taskQueue.wait(20);
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                        if(!taskQueue.isEmpty()){
                            try {
                                r=taskQueue.take();// 获取并移除第一个元素
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                        if(r!=null){
                            r.run();//执行任务
                            executeTaskNumber.incrementAndGet();
                        }
                        
                        r=null;
                    }
                }
            }
            public void stopWorker(){
                isRunnable=false;
            }
        }
    }
    
    1. 客户端实现
    package com.byedbl.threadpool;
    
    public class Test {
        public static void main(String[] args) {  
            // 创建3个线程的线程池  
            ThreadPool t = ThreadPoolManager.getThreadPool();  
            t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
            t.execute(new Runnable[] { new Task(), new Task(), new Task() });   
            System.out.println(t);
            t.destroy();// 所有线程都执行完成才destory  
            System.out.println(t);
        }  
      
        // 任务类  
        static class Task implements Runnable {  
            private static volatile int i = 1;  
            @Override  
            public void run() {// 执行任务  
                System.out.println("任务 " + (i++) + " 完成");  
            }  
        } 
    }
    
    

    相关文章

      网友评论

          本文标题:java基础-多线程

          本文链接:https://www.haomeiwen.com/subject/mqptmftx.html