美文网首页
java 多线程、线程池

java 多线程、线程池

作者: 汪梓文 | 来源:发表于2017-05-10 18:12 被阅读0次

    概念:

    • 原子性
    • 原子是世界上的最小单位,具有不可分割性。比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以他不是一个原子操作。非原子操作都会存在线程安全问题,需要我们使用同步技术(sychronized)来让它变成一个原子操作。一个操作是原子操作,那么我们称它具有原子性。Java的concurrent包下提供了一些原子类,我们可以通过阅读API来了解这些原子类的用法。比如:AtomicInteger、AtomicLong、AtomicReference等。
    • 可见性,是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的。也就是一个线程修改的结果。另一个线程马上就能看到。比如:用volatile修饰的变量,就会具有可见性。volatile修饰的变量不允许线程内部缓存和重排序,即直接修改内存。所以对其他线程是可见的。但是这里需要注意一个问题,volatile只能让被他修饰内容具有可见性,但不能保证它具有原子性。比如 volatile int a = 0;之后有一个操作 a++;这个变量a具有可见性,但是a++ 依然是一个非原子操作,也就这这个操作同样存在线程安全问题。
    • 什么是线程安全:
      • 线程安全是指,同一段代码多条线程访问,不会产生不同的结果
    • 并行和并发区别
      • 并行是多条线程同时再赛跑。
      • 并发是指同个资源,两者交替轮流使用资源。
    • Fifo(First In first Out):
      • Queue队列的特性就是先入先出
    • Filo(First ln Last Out):
      • stact栈的特性就是先入后出
    • CAS(Compare And Swap):
      • CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。简单介绍一下这个指令的操作过程:首先,CPU 会将内存中将要被更改的数据与期望的值做比较。然后,当这两个值相等时,CPU 才会将内存中的数值替换为新的值。否则便不做操作。最后,CPU 会将旧的数值返回。这一系列的操作是原子的。它们虽然看似复杂,但却是 Java 5 并发机制优于原有锁机制的根本。简单来说,CAS 的含义是“我认为原有的值应该是什么,如果是,则将原有的值更新为新值,否则不做修改,并告诉我原来的值是多少”。(这段描述引自《Java并发编程实践》)
        简单的来说,CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则返回V。这是一种乐观锁的思路,它相信在它修改之前,没有其它线程去修改它;而Synchronized是一种悲观锁,它认为在它修改之前,一定会有其它线程去修改它,悲观锁效率很低
    CAS的ABA问题
    
    所谓 ,问题基本是这个样子:
    
    进程P1在共享变量中读到值为A
    P1被抢占了,进程P2执行
    P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
    P1回来看到共享变量里的值没有被改变,于是继续执行。
    虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)
    
    比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为)
    
    这个例子你可能没有看懂,维基百科上给了一个活生生的例子——
    
    你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。
    
    这就是ABA的问题。
    

    一、Java线程池ThreadPoolExecutor -- 参数

    • corePoolSize: 核心线程数
      • 核心线程会一直存活,及时没有任务需要执行
      • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
      • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
      • 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
    • maxmumPoolSize : 最大线程数
      • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
      • 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
    • keepAliveTime : 线程空闲时间
      • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
      • 如果allowCoreThreadTimeout=true,则会直到线程数量=0
      • 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
    • unit : keepAliveTime 时间单位
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
    • threadFactory

      • 线程工厂,主要用来创建线程
    • handler

      • 表示当拒绝处理任务时的策略,有以下四种取值:

      ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
      ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
      ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
      ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    • workQueue : 任务队列

      • Queue:Queue中元素按Fifo原则进行排序
        • ConcurrentLinkedQueue : 非阻塞队列
          • Queue是一个安全实现
          • 使用CAS无锁编程,来保证数据的一致性
    public class ConcurrentLinkedQueueTest {
        private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        private static int count = 2; // 线程个数
        //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
        private static CountDownLatch latch = new CountDownLatch(count);
    
        public static void main(String[] args) throws InterruptedException {
            long timeStart = System.currentTimeMillis();
            ExecutorService es = Executors.newFixedThreadPool(4);
            ConcurrentLinkedQueueTest.offer();
            for (int i = 0; i < count; i++) {
                es.submit(new Poll());
            }
            latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
            System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
            es.shutdown();
        }
    
        /**
         * 生产
         */
        public static void offer() {
            for (int i = 0; i < 100000; i++) {
                queue.offer(i);
            }
        }
    
    
        /**
         * 消费
         *
         * @author 林计钦
         * @version 1.0 2013-7-25 下午05:32:56
         */
        static class Poll implements Runnable {
            public void run() {
                // while (queue.size()>0) {
                while (!queue.isEmpty()) {
                    System.out.println(queue.poll());
                }
                latch.countDown();
            }
        }
    }
    
    
    + BlockingQueue:阻塞队列
      + LinkedBlockingQueue:实现是线程安全的,实现了先进先出的特性,是作为生产者消费者的首选
        + put:添加方法,在队列满的时候会阻塞
        + take : 读取方法,在队列为空的时候会阻塞,直到有成员被加
    
    package com.example;  
    
    import java.util.concurrent.*;
    // LinkedBlockingQueue Demo
    public class BlockingQueueTest {
        /**
         * 定义装苹果的篮子
         */
        public class Basket {
            // 篮子,能够容纳3个苹果
            BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
    
            // 生产苹果,放入篮子
            public void produce() throws InterruptedException {
                // put方法放入一个苹果,若basket满了,等到basket有位置
                basket.put("An apple");
            }
    
            // 消费苹果,从篮子中取走
            public String consume() throws InterruptedException {
                // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
                return basket.take();
            }
        }
    
        // 定义苹果生产者
        class Producer implements Runnable {
            private String instance;
            private Basket basket;
    
            public Producer(String instance, Basket basket) {
                this.instance = instance;
                this.basket = basket;
            }
    
            public void run() {
                try {
                    while (true) {
                        // 生产苹果
                        System.out.println("生产者准备生产苹果:" + instance);
                        basket.produce();
                        System.out.println("!生产者生产苹果完毕:" + instance);
                        // 休眠300ms
                        Thread.sleep(300);
                    }
                } catch (InterruptedException ex) {
                    System.out.println("Producer Interrupted");
                }
            }
        }
    
        // 定义苹果消费者
        class Consumer implements Runnable {
            private String instance;
            private Basket basket;
    
            public Consumer(String instance, Basket basket) {
                this.instance = instance;
                this.basket = basket;
            }
    
            public void run() {
                try {
                    while (true) {
                        // 消费苹果
                        System.out.println("消费者准备消费苹果:" + instance);
                        System.out.println(basket.consume());
                        System.out.println("!消费者消费苹果完毕:" + instance);
                        // 休眠1000ms
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException ex) {
                    System.out.println("Consumer Interrupted");
                }
            }
        }
    
        public static void main(String[] args) {
            BlockingQueueTest test = new BlockingQueueTest();
    
            // 建立一个装苹果的篮子
            Basket basket = test.new Basket();
    
            ExecutorService service = Executors.newCachedThreadPool();
            Producer producer = test.new Producer("生产者001", basket);
            Producer producer2 = test.new Producer("生产者002", basket);
            Consumer consumer = test.new Consumer("消费者001", basket);
            service.submit(producer);
            service.submit(producer2);
            service.submit(consumer);
            // 程序运行5s后,所有任务停止
    //        try {
    //            Thread.sleep(1000 * 5);
    //        } catch (InterruptedException e) {
    //            e.printStackTrace();
    //        }
    //        service.shutdownNow();
        }
    }
    

    二、Runnable And Callable
    1、Runnable实现是Run方法,Callable实现是call方法
    2、Callable的call方法可以有返回值使用Future接受返回值、Runnable的run方法没有返回值
    3、Callable的call方法可以抛出异常、run方法无法捕获线程异常

    三、Future AND CompletionService
    1、用于接受Callable线程执行完成返回的结果

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * 多线程执行,异步获取结果
     * 
     * @author i-clarechen
     *
     */
    public class AsyncThread {
    
        public static void main(String[] args) {
            AsyncThread t = new AsyncThread();
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            t.generate(3, futureList);
            t.doOtherThings();
            t.getResult(futureList);
        }
    
        /**
         * 生成指定数量的线程,都放入future数组
         * 
         * @param threadNum
         * @param fList
         */
        public void generate(int threadNum, List<Future<String>> fList) {
            ExecutorService service = Executors.newFixedThreadPool(threadNum);
            for (int i = 0; i < threadNum; i++) {
                Future<String> f = service.submit(getJob(i));
                fList.add(f);
            }
            service.shutdown();
        }
    
        /**
         * other things
         */
        public void doOtherThings() {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println("do thing no:" + i);
                    Thread.sleep(1000 * (new Random().nextInt(10)));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 从future中获取线程结果,打印结果
         * 
         * @param fList
         */
        public void getResult(List<Future<String>> fList) {
            ExecutorService service = Executors.newSingleThreadExecutor();
            service.execute(getCollectJob(fList));
            service.shutdown();
        }
    
        /**
         * 生成指定序号的线程对象
         * 
         * @param i
         * @return
         */
        public Callable<String> getJob(final int i) {
            final int time = new Random().nextInt(10);
            return new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(1000 * time);
                    return "thread-" + i;
                }
            };
        }
    
        /**
         * 生成结果收集线程对象
         * 
         * @param fList
         * @return
         */
        public Runnable getCollectJob(final List<Future<String>> fList) {
            return new Runnable() {
                public void run() {
                    for (Future<String> future : fList) {
                        try {
                            while (true) {
                                if (future.isDone() && !future.isCancelled()) {
                                    System.out.println("Future:" + future
                                            + ",Result:" + future.get());
                                    break;
                                } else {
                                    Thread.sleep(1000);
                                }
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
        }
    
    }
    

    结果

    do thing no:0
    do thing no:1
    do thing no:2
    Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0
    Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1
    Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2
    

    2、使用误CompletionService实现非阻塞式Future

    • 当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高。
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    
    
    public class testCallable {
        public static void main(String[] args) {
            try {
                completionServiceCount();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
      
        /**
         * 使用completionService收集callable结果
         * @throws ExecutionException 
         * @throws InterruptedException 
         */
        public static void completionServiceCount() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                    executorService);
            int threadNum = 5;
            for (int i = 0; i < threadNum; i++) {
                completionService.submit(getTask(i));
            }
            int sum = 0;
            int temp = 0;
            for(int i=0;i<threadNum;i++){
                temp = completionService.take().get();
                sum += temp;
                System.out.print(temp + "\t");
            }
            System.out.println("CompletionService all is : " + sum);
            executorService.shutdown();
        }
    
        public static Callable<Integer> getTask(final int no) {
            final Random rand = new Random();
            Callable<Integer> task = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int time = rand.nextInt(100)*100;
                    System.out.println("thead:"+no+" time is:"+time);
                    Thread.sleep(time);
                    return no;
                }
            };
            return task;
        }
    }
    

    结果:最先执行完成的线程先输出结果

    thead:0 time is:4200
    thead:1 time is:6900
    thead:2 time is:2900
    thead:3 time is:9000
    thead:4 time is:7100
    2    0    1    4    3    CompletionService all is : 10
    

    相关文章

      网友评论

          本文标题:java 多线程、线程池

          本文链接:https://www.haomeiwen.com/subject/gndgtxtx.html