Java 并发

作者: 4b4f3ceb6f71 | 来源:发表于2017-01-16 16:58 被阅读309次

    并发

    进程和线程

    在并发编程当中,有两个基本的执行单元:进程和线程。在java中,我们大部分关心的是线程。然而进程也很重要。

    一个电脑系统通常有许多活跃的进程和线程。在只有一个核心的系统当中,在任意一个时刻,实际上只有一个线程在执行。进程和线程通过操作系统的时间分片特性共享单个核心的处理时间。

    进程

    一个进程有独立的执行环境。进程一般有完整的、私有的基本运行资源。尤其要说的每个进程都有自己的独立内存空间。

    进程经常被视作一个程序或者是一个应用。然而那些被用户视作的单个应用程序进程也许实际上由一些相互协作的进程组合而成。为了促进进程间的通信,大部分操作系统支持内部进程交流(Inter Process Communication[IPC])资源,例如管道和套接字。IPC不仅仅可以在同一个系统的进程间进行通信,也可以在不同的系统的进程间进行通信。

    大多数的Java虚拟机的实现都是以单个进程的方式运行的。一个Java应用程序可以用ProcessBuilder对象创建额外的进程。多进程应用程序不再这节课的讨论范围之内。

    线程

    线程有的时候叫做轻量级的进程。进程和线程都提供一个执行的环境,但是创建一个新的线程所需要的资源比创建一个进程所需要的资源少。

    线程存在在进程之中,每一个进程至少有一个线程。线程共享进程的资源,包括内存和打开的文件。这样设计是为了提高效率和交流,但是可能带来隐藏的问题。

    多线程执行是Java虚拟机的基本特性。如果你计数系统的线程像内存管理和信号处理一样,那么每一个应用程序至少有一个线程或者是多个。但是从应用程序开发者的角度看,你仅仅只调用了一个叫做main thread的线程。这个线程有能力去创建额外的线程,我们将在下章节进行讲解。

    线程对象

    每一个线程都可以和类Thread的一个实例联系起来。有两种基本的策略使用线程对象创建一个并发的应用程序。

    • 为了直接控制线程的创建和管理,仅仅只在应用程序需要启动一个异步任务的时候实例化线程
    • 为了抽象线程的管理,可以将应用程序的任务加入到executor中。
      本节介绍Thread对象的使用。Executors将和更高级别并行对象一起讨论。

    线程的定义和启动

    一个程序创建一个线程的实例必须提供需要运行的代码。下面有两种实现的方法:

    • 提供一个Runnable的对象。Runnable接口定义了一个方法run,这个方法里面包含了在线程当中执行的代码。Runnable对象可以做为Thread的构造器参数就像下面HelloRunnable的例子:

      ` public class HelloRunnable implements Runnable {
      
            public void run() {
                System.out.println("Hello from a thread!");
            }
      
            public static void main(String args[]) {
               (new Thread(new HelloRunnable())).start();
            }
         }  `
      
    • 继承Thread。类Thread本身就实现了Runnable接口,Thread的run方法什么都没有做。程序可以继承类Thread,实现自己的run方法就像下面的HelloThread例子:

      `public class HelloThread extends Thread {
      
            public void run() {
                System.out.println("Hello from a thread!");
            }
      
            public static void main(String args[]) {
                  (new HelloThread()).start();
            }
          } `
      

    请注意两个例子都是调用Thread的start的方法启动新的线程。

    你应该使用哪一种方法?第一种方法实现Runnable接口,这种方法使用更加普遍,因为除了Thread类,这个对象还可以继承其他类。第二种方法在简单的程序中使用起来更简单,但是有个限制就是你的任务类必须是Thread的子类。这一节主要是聚焦在第一种方法上,它把用Runnable任务和用Thread对象去执行任务区分开来。这种方法不仅仅更加灵活,而且适用性更高,这点在后面更高级别的线程管理APIs中会提到。

    类Thread定义了一些有用的线程管理的方法。包括提供一些关于调用方法的线程的信息和影响线程状态的静态方法。被其他线程调用的一些方法也可以管理线程和Thread对象。我们将在接下来的章节中学习它们中的一些方法。

    用Sleep方法暂停线程的执行

    Thread的sleep方法调用会让当前执行的线程暂停一段特定的时间。这是让运行在电脑系统上的应用或者其他应用的其他线程可以占用处理器时间的有效方式。Sleep方法也可以让线程一步一步的执行,就像下面例子中展示的,或者是等待另外一个有时间需求的线程,就像后面章节中介绍的SimpleThreads例子。

    提供了两个重载的sleep方法:也是休眠时间单位是微秒,另外一个休眠的时间单位是纳米。然而,这些休眠的时间不能保证是准确的,因为它们受限于操作系统之下的硬件设备。同时,休眠过程中也可以被中断终止,我们将在后面的章节看到。在任何情况下,你不可以认为调用sleep方法可以让线程暂停指定的准确的时间。

    SleepMessages例子用sleep方法实现每4秒打印消息:

        `public class SleepMessages {
    
            public static void main(String args[]) throws InterruptedException {
                String importantInfo[] = {
                    "Mares eat oats",
                    "Does eat oats",
                    "Little lambs eat ivy",
                    "A kid will eat ivy too"
                };
    
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    //Pause for 4 seconds
                    Thread.sleep(4000);
                    //Print a message
                    System.out.println(importantInfo[i]);
                }
            }
        }`
    

    注音main函数声明抛出InterruptedException。当一个线程在睡眠当中,另外一个线程中断这个线程,那么sleep方法将抛出InterruptedException。因为这个程序没有定义另外一个线程去调用中断,所以就没有写catch语句去捕获InterruptedException。

    中断

    一个中断是告诉一个线程它应该暂停它正在做的事情。线程怎么去回应这个中断完全取决于开发者的决定,但是让线程终止也是很正常的决定。这是在这一节着重介绍的用处。一个线程通过调用Thread对象的interrupt方法发送一个中断给到需要中断的线程。为了让中断机制运行正确,被中断的线程必须支持自己的中断。

    支持中断

    一个线程怎样支持它自己的中断列?这个依赖于它正在做什么。假如说一个线程经常调用抛出InterruptedException的方法,在它捕获这个异常之后它仅仅是从run方法中返回。例如,假如在SleepMessages例子中的主要循环语句在Runnable对象的run方法当中。然后它可以被修改成下面的形式支持中断:

        `for (int i = 0; i < importantInfo.length; i++) {
            // Pause for 4 seconds
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                // We've been interrupted: no more messages.
                return;
            }
            // Print a message
            System.out.println(importantInfo[i]);
        }`
    

    很多方法抛出InterruptedException,例如sleep方法,这些方法被设计成当它们收到中断时立马取消当前的操作和立即返回。

    假如有个线程没有调用抛出InterruptedException的方法,那它怎样去响应中断列?那么它必须定期的去调用Thread的interrupted方法,这个方法在该线程设置了中断的情况下返回true。例如:

        `for (int i = 0; i < inputs.length; i++) {
            heavyCrunch(inputs[i]);
            if (Thread.interrupted()) {
                // We've been interrupted: no more crunching.
                return;
            }
        }`
    

    在这个简单的例子当中,代码仅仅检查中断如果那个线程已经接收了中断,那么这个check返回true。在很多复杂的应用当中,抛出一个InterruptedException更加让人明白:

        `if (Thread.interrupted()) {
            throw new InterruptedException();
        }`
    

    这个例子让中断处理的代码集中到catch的语句当中。

    中断状态标志

    中断原理是用内部的一个叫做中断状态的标志来实现的。调用Thread的interrupt方法会设置这个标志。当一个线程通过调用Thread的静态方法interrupted检查中断时,中断状态会被清除。一个线程会用另外一个线程的非静态的isInterrupted方法来查询它的中断状态,这个操作不会改变另外一个线程的中断状态标志。

    按照规定,任何一个可以抛出InterruptedException的方法在抛出InterruptedException之后会清除中断状态。然而,也很有可能,这个中断状态会立马被另外一个线程调用interrupt方法设置。

    Join方法

    Join方法让一个线程可以等待另外一个线程执行完。如果t是一个正在执行的线程对象,

        `t.join();`
    

    这个调用会让当前线程停止执行直到t执行完成。join的方法的重载让开发者可以指定特定的等待时间。然而和sleep方法一样,join方法等待的时间依赖于操作系统,因此你不可以认为join方法会准确的等待你所指定的时间。

    就像sleep方法,join通过抛出InterruptedException来响应中断。

    SimpleThreads例子

    下面的这个例子把这部分的一些概念综合起来进行展示。SimpleThreads包含两个线程。第一个线程是每个Java程序都会有的主线程。主线程通过Runnable对象创建了一个叫MessageLoop的新线程,然后主线程等待这个线程完成。如果MessageLoop这个线程花费了太久都没有完成,那么主线程就会中断这个线程。

    MessageLoop线程会打印一系列的消息。如果在打印完所有消息之前中断这个线程,MessageLoop线程将打印一个消息然后退出。

        `public class SimpleThreads {
            // Display a message, preceded by
            // the name of the current thread
            static void threadMessage(String message) {
                String threadName =
                    Thread.currentThread().getName();
                System.out.format("%s: %s%n",
                                  threadName,
                                  message);
            }
    
            private static class MessageLoop
                implements Runnable {
                public void run() {
                    String importantInfo[] = {
                        "Mares eat oats",
                        "Does eat oats",
                        "Little lambs eat ivy",
                        "A kid will eat ivy too"
                    };
                    try {
                        for (int i = 0;
                             i < importantInfo.length;
                             i++) {
                            // Pause for 4 seconds
                            Thread.sleep(4000);
                            // Print a message
                            threadMessage(importantInfo[i]);
                        }
                    } catch (InterruptedException e) {
                        threadMessage("I wasn't done!");
                    }
                }
            }
    
            public static void main(String args[])
                throws InterruptedException {
    
                // Delay, in milliseconds before
                // we interrupt MessageLoop
                // thread (default one hour).
                long patience = 1000 * 60 * 60;
    
                // If command line argument
                // present, gives patience
                // in seconds.
                if (args.length > 0) {
                    try {
                        patience = Long.parseLong(args[0]) * 1000;
                    } catch (NumberFormatException e) {
                        System.err.println("Argument must be an integer.");
                        System.exit(1);
                    }
                }
    
                threadMessage("Starting MessageLoop thread");
                long startTime = System.currentTimeMillis();
                Thread t = new Thread(new MessageLoop());
                t.start();
    
                threadMessage("Waiting for MessageLoop thread to finish");
                // loop until MessageLoop
                // thread exits
                while (t.isAlive()) {
                    threadMessage("Still waiting...");
                    // Wait maximum of 1 second
                    // for MessageLoop thread
                    // to finish.
                    t.join(1000);
                    if (((System.currentTimeMillis() - startTime) > patience)
                          && t.isAlive()) {
                        threadMessage("Tired of waiting!");
                        t.interrupt();
                        // Shouldn't be long now
                        // -- wait indefinitely
                        t.join();
                    }
                }
                threadMessage("Finally!");
            }
        }`
    

    同步

    线程通信从根本上是通过对属性和对象引用的属性的共享访问实现的。这种形式的通信效率特别高,但是会带来两种可能的错误:线程干扰和内存一致性错误。防止这类错误发生的工具就是同步。

    然而,同步又会引入线程竞争问题,这种问题在两个或者多个线程同时去访问相同的资源的时候会发生,会让Java执行一些线程变的更慢甚至可能暂停它们的执行。饥饿和活锁是线程竞争的表现形式。可以在章节活锁中了解关于这方面更多信息。

    这一节主要是讲解下面这些话题:

    • 线程干扰是描述多线程访问共享数据时错误是怎么引入的。
    • 内存一致性错误描述的是共享内存的不一致性错误。
    • 同步方法描述的是一种有效的防止线程干扰和内存一致性错误的方法。
    • 隐式锁和同步描述的是一种更加普遍的基于隐士锁的同步方法。
    • 原子性讨论的是不能被其他线程干扰的操作的大体概念。

    线程干扰

    下面有个叫做Counter的简单类

        `class Counter {
            private int c = 0;
    
            public void increment() {
                c++;
            }
    
            public void decrement() {
                c--;
            }
    
            public int value() {
                return c;
            }
    
        }`
    

    Counter类的每次increment的方法的调用会让c的值加一,decrement的方法的调用会让c的值减一。然而,如果这个Counter对象被多线程引用,线程之间的干扰让它没有按照预期的运行。

    当两个操作在不同的线程间在同样的数据上交替运行时会产生线程干扰。也就是说这两个操作包含多个步骤,然后步骤的序列产生了重叠。

    看起来似乎在Counter的实例对象上面的操作交替进行是不可能的,因为两个在变量c上面的操作都是单个的、简单的语句。然而,尽管简单的语句也可以被虚拟机转化成为多个步骤执行。我们不需要检查到底虚拟机会花了多少步,只需要知道表达式c++被分解成3部就足够了:

    1. 取得c现在的值。
    2. 在取得的值上面增加1.
    3. 在把增加之后的值存储到c中。

    表达式c--也会被按照相同的方式分解,除了把第二步的增加换为减少就行了。

    假定线程A调用了increment方法,在同一时刻,线程B调用了decrement方法。如果c的初始值是0,它们交替执行的序列可能是下面这样:

    1. 线程A:取得c的值。
    2. 线程B:取得c的值。
    3. 线程A:增加取得的值;结果是1。
    4. 线程B:减少取得的值;结果是-1。
    5. 线程A:存储结果到c中;c的值现在是1。
    6. 线程B:存储结果到c中;c的值现在是-1。

    线程A执行的结果被线程B重写了。这种交替序列仅仅是其中的一种可能。在不同的情况下,可能是线程B的结果丢失,或者是得到预期的结果。因为线程干扰的bugs是不可预测的。

    内存一致性错误

    数据一致性错误发生在不同的线程去读相同的数据时,读到的数据不一致。引起内存一致性错误的原因很复杂超出了这篇教程的范围。幸运的是,开发者不需要详细知道这些原因。开发者需要知道的是如何去避免这些错误。

    避免内存一致性错误的关键是理解happens-before关系。这个关系仅仅是保证一块内存被一个特定的语句的写操作对另外一个特定的语句是可见的。为了理解上面的这句话,我们来看下下面的例子。假定定义了一个简单的int类型的属性,且初始值是0:

        `int counter = 0;`
    

    这个属性在线程A和B之间是共享的。假定线程A增加了counter的值:

        `counter++;`
    

    然后在很短的时间内,线程B打印了counter的值:

        `System.out.println(counter);`
    

    如果这个两个语句在同一个线程当中执行,那么这个属性被打印出来的值肯定是‘’1‘’。但是如果两个语句在不同的线程当中执行,那么打印出来的值可能就是“0”,因为没有保证线程A对属性counter的改变对线程B是可见的,除非开发者在这两条语句之间建立了happens-before的关系。

    有数种建立happens-before关系的行为。其中一种就是同步,这个我们将在接下来的章节当中看到。

    我们已经看到下面两种行为会创建happens-before关系:

    • 当一个语句调用Thread.start方法时,那么每一个对这条语句有happens-before关系的语句对这个新线程执行的语句都有happens-before关系。
    • 当一个线程终止执行且在另外一个线程中调用Thread.join返回时,然后所有的在这个终止的线程中执行的语句都对那个被join的线程的接下来执行的语句有happens-before关系。在这个线程中代码的变化对被join的线程就是可见的。

    所有的创建happens-before关系的行为.

    同步方法

    Java编程语言提供了两种编程方式:同步方法和同步代码块。其中复杂的同步代码块在下个章节进行讲解。这个章节讲解同步方法。

    仅仅只需要在方法声明前面加上关键字synchronized,就也可以把这个方法变成同步的:

        `public class SynchronizedCounter {
            private int c = 0;
    
            public synchronized void increment() {
                c++;
            }
    
            public synchronized void decrement() {
                c--;
            }
    
            public synchronized int value() {
                return c;
            }
        }`
    

    如果count是SynchronizedCounter的一个实例,把这些方法变成同步的有下面两个影响:

    • 首先,交替调用同一个对象的同步方法是不可能的。当一个线程在执行一个对象的同步方法时,所有其他再调用这个对象的同步方法的线程都将被阻塞直到第一个调用的线程执行完成。
    • 其次,当一个同步方法退出时,它就自动对后面的同一个对象的同步方法的调用建立了happens-before关系。这样子保证了对象的状态的改变对所有的线程都是可见的。

    记住构造器方法是不可以同步的,在构造器方法前面加关键字synchronized是有语法错误的。同步构造器方法没有任何意义,因为只有创建这个对象的线程在这个创建这个对象的时候才有访问它的权限。

    *警告:当创建一个在多个线程中共享的对象时,要特别小心对象的引用提早泄漏出去。举个例子,假定你想让一个叫instances的List包含每一个类的实例。你也许会在你的构造器中加入下面的代码,但是然后其他线程可以在对象的构造完成之前用instances去访问对象(has issue,need optimize):

              `instances.add(this);`
    

    同步方法是一个简单的防止线程干扰和内存一致性错误的策略;如果一个对象对一个以上的线程是可见的,那么这个对象的变量的所有的读和写操作是通过同步方法完成的。(一个重要的特例:当对象创建之后不可以被修改的final属性是可以被非同步的方法安全的读的。)这个策略很有效,但是可能会产生并发活跃性的问题,我们将在后面的章节看到。

    内部锁和同步

    同步时建立在一个内部实体周围也就是大家知道的内部锁或者叫监控锁。(API文档里面几次把这实体叫做监控)内部锁不仅仅在同步方面(强制排它的访问对象的状态)起到作用,同时也在建立happens-before关系(对可见性是必须的)起到作用。

    每一个对象都有一个和它相关联的内部锁。一个需要排它和一致访问对象属性的线程在访问对象属性之前必须要先获得对象的内部锁,然后完成之后释放内部锁。也就是线程在持有这个对象的内部锁在获得和释放这个锁之间。只要一个线程持有了一个内部锁,那么其他的线程就都不可以拿到这个内部锁。其他的线程会被阻塞当它们尝试去获得这个锁时。

    当一个线程释放一个内部锁时,在这个动作和任意后面的获得这个锁的动作建立了happens-before关系。

    在同步方法中的锁

    当一个线程调用一个同步方法时,它会自动去获得这个方法的对象的内部锁,然后当这个方法退出时释放这个锁,即使是这个方法的退出是因为没有捕获的异常引起的。

    你也许想知道当一个同步的静态方法被调用时会发生什么,因为一个静态的方法是和类相关联的,不是对象。在这种情况下,线程获得的是和这个类相关的对象的内部锁。因此控制访问类的静态属性的锁是和任何类的实例的锁是不一样的。

    同步代码块

    另外一个种建立同步代码的的方法是同步代码块。和同步方法不一样,同步代码块必须指定哪一个对象的内部锁:

              `public void addName(String name) {
                      synchronized(this) {
                          lastName = name;
                          nameCount++;
                      }
                      nameList.add(name);
               }`
    

    在这个例子中,addName方法需要同步的改变lastName和nameCount的属性的值,但是同时又需要避免其他对象方法的同步调用。(通过同步代码调用其他对象的方法会带来像章节Liveness。)如果没有同步代码,则必须要有单独的非同步的方法去调用nameList.add。

    同步代码块用细粒度的锁可以提高并发。举个例子,类MsLunch有两个实例属性,c1和c2,它们从来不回被同时使用。所有的这两个属性的更新必须是同步的,如果c1的更新和c2 的更新用的是同一个对象的锁,那么在c1和c2交替更新时会造成不必要的阻塞从而降低了并发。我们可以创建两个对象单独的提供锁来代替使用同步方法获使用this锁。

              `public class MsLunch {
                        private long c1 = 0;
                        private long c2 = 0;
                        private Object lock1 = new Object();
                        private Object lock2 = new Object();
    
                        public void inc1() {
                              synchronized(lock1) {
                                      c1++;
                              }
                        }
    
                        public void inc2() {
                              synchronized(lock2) {
                                      c2++;
                              }
                        }
              }`
    

    使用这种非常谨慎的方式。你一定可以完全确认交替的去访问受影响的属性是线程安全的。

    重入同步

    回想一下,一个线程是不可以获得一个被其他线程持有的锁。但是一个线程可以获得他自己持有的锁。允许一个线程可以获得多次获得同一个锁让重入同步成为可能。这个描述的是这样一个场景,一个同步的代码直接或者间接的调用一个包含同步的代码的方法,且这两个同步包含的同一个锁。如果没有重入同步特性,同步代码不得不做很多额外的措施去避免自己把自己阻塞。

    原子访问

    在编写程序的过程中,一个原子是操作是指一次执行完所有的操作。一个原子操作不可以在中间停下来:要么全部执行完成,要么都没有执行。原子操作带来的影响只有当它全部完成了以后才是可见的。

    我们已经看过一个这样的自增表达式,例如c++,这不是一个原子操作。即使是非常简单的表达式也可以定义复杂的操作,这个操作可以被分解成其它的不同的操作。然而你可以定义一组操作是原子的。

    • 对于引用变量和大多数的简单类型的变量(除了long和double类型)的读和写操作都是原子类型的。

    • 对于所有声明为volatile的变量(包括long和double类型)的读和写操作都是原子类型的。

    原子操作不可以被打断,所以使用原子操作不会受线程干扰的影响。然而,这并不排除所有需要同步的原子操作的错误,因为内存一致性错误还是存在。使用volatile变量降低了内存一致性错误出现的风险,因为任何对volatile变量的写操作都对后续的这个变量的读操作建立了happens-before的关系。也就是说volatile变量的变化对其他的线程总是可见的。更重要的是,当一个线程读一个volatile的变量时,不仅仅可以看到最新的变化,连代码的副作用导致的变化也可以看到。

    使用简单的原子变量访问比用同步代码去控制变量访问更高效,但是开发者需要考虑的更多去避免内存一致性错误。这么做是否值得取决于应用的大小和复杂性。

    在java.util.concurrent的包中的一些类提供了原子的方法,这些方法不依赖同步。我们将在后续章节中讨论。

    活跃性

    一个并发应用及时执行任务的能力叫做活跃性。这个章节主要介绍最常见的活跃性问题死锁,然后会简短的介绍两种其他的活跃性问题,饥饿和活锁。

    死锁

    死锁描述的是这样一个场景,二个或者多个线程因为互相等待而永久阻塞。举个例子。

    A和B是朋友,同时也是非常有礼貌的信徒。一个严格的有礼貌的规则是当你向你的朋友鞠躬时,你必须保持鞠躬的动作直到你的朋友有机会向你鞠躬回礼。不幸的是,这个规则没有考虑两个朋友同时向对方鞠躬的情况。下面的这个死锁例子模拟了这种情况:

        `public class Deadlock {
            static class Friend {
                private final String name;
                public Friend(String name) {
                    this.name = name;
                }
                public String getName() {
                    return this.name;
                }
                public synchronized void bow(Friend bower) {
                    System.out.format("%s: %s"
                        + "  has bowed to me!%n", 
                        this.name, bower.getName());
                    bower.bowBack(this);
                }
                public synchronized void bowBack(Friend bower) {
                    System.out.format("%s: %s"
                        + " has bowed back to me!%n",
                        this.name, bower.getName());
                }
            }
    
            public static void main(String[] args) {
                final Friend alphonse =
                    new Friend("Alphonse");
                final Friend gaston =
                    new Friend("Gaston");
                new Thread(new Runnable() {
                    public void run() { alphonse.bow(gaston); }
                }).start();
                new Thread(new Runnable() {
                    public void run() { gaston.bow(alphonse); }
                }).start();
            }
        }`
    

    当这个死锁例子运行时,两个线程极有可能阻塞当它们尝试去调用bowBack方法时。两个线程都会一直阻塞下去,因为它们都在等待对方鞠躬回礼。

    饥饿和活锁

    饥饿和活锁没有死锁那么常见,但也是每一个并发程序设计者可能会遇到的问题。

    饥饿

    饥饿描述的是一种线程不能够获得定期访问共享资源的场景,且不能够取得进展。饥饿发生在共享资源被“贪婪的”线程长期占用造成共享资源不可用的情况下。例如,假如一个对象提供一个同步方法,然后这个同步方法经常需要很长的时间才能返回。如果一个线程经常调用这个方法,其他需要经常同步访问这个对象的线程就会经常被阻塞。

    活锁

    一个线程经常是去响应另外一个线程的操作。如果这个另外的一个线程的执行又是响应另外的一个线程,然后活锁可能就产生了。和死锁一样,活锁线程也不能够取得进一步的进展。然而这些线程并没有被阻塞——它们仅仅是因为太忙而不能去互相响应然后继续工作。这就和两个人在一条走廊里面尝试去超过对方:A移到他的左边让B通过,同事B也同时移动他的右边让A通过。他们仍然阻塞对方。

    警戒块

    线程经常需要去调整它们的行为。最常见的调整方式是警戒块。这个块在可以继续执行之前通过轮询一个必须为true的条件开始。为了保证这个做正确有一些步骤需要做。

    假定,例如guardedJoy是一个在共享变量joy被另外一个线程设置了以后才会继续执行的一个方法。这个方法理论上可以一直循环直到条件满足,但是轮询是很浪费的,因为它在等待的过程中一直在执行。

        `public void guardedJoy() {
            // Simple loop guard. Wastes
            // processor time. Don't do this!
            while(!joy) {}
            System.out.println("Joy has been achieved!");
        }`
    

    一个更有效的警戒方式是调用Object的wait方法挂起当前线程。wait方法的调用直到另外一个线程发布通知之后,才会返回,这个通知可能是一些事件已经发生,尽管这个事件可能不是这个线程等待的:

              `public synchronized void guardedJoy() {
                       // This guard only loops once for each special event, which may not
                       // be the event we're waiting for.
                       while(!joy) {
                             try {
                                  wait();
                            } catch (InterruptedException e) {}
                       }
                       System.out.println("Joy and efficiency have been achieved!");
               }`
    

    注意:你需要在检测特定的条件的循环中调用wait方法。不要认为这个中断是你等待的特定的条件或者这个条件这个条件仍然是正确的。


    像许多其他挂起执行的方法一样,wait方法也会抛出InterruptedException。在上面的这个例子中,我们可以忽略这个异常,变量joy的值是我们唯一关心的。

    为什么这个版本的guardedJoy方法需要用关键字synchronized修饰咧?假定d是我们调用wait方法的对象。当一个线程调用 d的wait方法时,它必须要先获得d的对象内部锁否则会抛出异常——调用wait方法前,必须要先获得对象的内部锁。在同步方法内部调用wait时一种简单的获得内部锁的方法。

    当wait方法被调用时,线程释放了内部锁,挂起了执行。在将来的某个时间,另外一个线程将获得同一个内部锁,调用该对象的notifyAll方法,告诉所有等待在这个锁上面的线程重要的事情放生了:

             `public synchronized notifyJoy() {
                    joy = true;
                    notifyAll();
             }`�
    

    在第二个线程已经释放锁之后的某个时间,第一个线程重新获得了锁,从wait方法中返回然后继续执行。


    注意:有另外一种通知的方法,notify,这个方法只唤醒一个线程。因为notify这个方法不允许你指定将要被唤醒的线程,所以notify方法只适用于大规模的并行应用程序,那种大量的做类似工作的 线程。在这种应用程序中,你不需要去关心哪一个线程被唤醒。


    让我们用警戒块来创建一个生产者——消费者应用程序。这中应用程序在两个线程当中共享数据:生产者线程,负责创建数据;消费者线程,负责消费数据。两个线程用一个共享的对象进行交流。协调是必不可少的:消费者线程在生产者传递数据之前不可以尝试获得数据,生产者在消费者还没有获得就数据之前不可以尝试去传递新数据。

    在下面的这个例子中,数据时一系列的文本消息,这些数据在Drop对象中共享。

        `public class Drop {
            // Message sent from producer
            // to consumer.
            private String message;
            // True if consumer should wait
            // for producer to send message,
            // false if producer should wait for
            // consumer to retrieve message.
            private boolean empty = true;
    
            public synchronized String take() {
                // Wait until message is
                // available.
                while (empty) {
                    try {
                        wait();
                    } catch (InterruptedException e) {}
                }
                // Toggle status.
                empty = true;
                // Notify producer that
                // status has changed.
                notifyAll();
                return message;
            }
    
            public synchronized void put(String message) {
                // Wait until message has
                // been retrieved.
                while (!empty) {
                    try { 
                        wait();
                    } catch (InterruptedException e) {}
                }
                // Toggle status.
                empty = false;
                // Store message.
                this.message = message;
                // Notify consumer that status
                // has changed.
                notifyAll();
            }
        }`
    

    消费者线程在类Producer中进行定义,它发送一系列常见的消息。DONE字符串表示所有的消息已经被发送完成。为了模仿真实世界中应用程序的不可预知性,生产者线程在发送消息之间停止随机的时间间隔。

        `import java.util.Random;
    
        public class Producer implements Runnable {
            private Drop drop;
    
            public Producer(Drop drop) {
                this.drop = drop;
            }
    
            public void run() {
                String importantInfo[] = {
                    "Mares eat oats",
                    "Does eat oats",
                    "Little lambs eat ivy",
                    "A kid will eat ivy too"
                };
                Random random = new Random();
    
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    drop.put(importantInfo[i]);
                    try {
                        Thread.sleep(random.nextInt(5000));
                    } catch (InterruptedException e) {}
                }
                drop.put("DONE");
            }
        }`
    

    消费者线程在类Consumer中定义,它仅仅获取这些数据然后打印出来直到收到DONE字符串。这个线程也停止随机时间。

        `import java.util.Random;
    
        public class Consumer implements Runnable {
            private Drop drop;
    
            public Consumer(Drop drop) {
                this.drop = drop;
            }
    
            public void run() {
                Random random = new Random();
                for (String message = drop.take();
                     ! message.equals("DONE");
                     message = drop.take()) {
                    System.out.format("MESSAGE RECEIVED: %s%n", message);
                    try {
                        Thread.sleep(random.nextInt(5000));
                    } catch (InterruptedException e) {}
                }
            }
        }`
    

    最后,下面是启动生产线程和消费着线程的类ProducerConsumerExample。

        `public class ProducerConsumerExample {
            public static void main(String[] args) {
                Drop drop = new Drop();
                (new Thread(new Producer(drop))).start();
                (new Thread(new Consumer(drop))).start();
            }
        }`
    

    注意:类Drop这样写是为了展示警戒块。为了避免重复造轮子,在去编写你的共享对象时吗,先去Java Collections Framework 中检查一下已经存在的数据结构。如果你想了解更多的信息,请跳转到章节Questions and Exercises


    不可变对象

    如果一个对象在创建之后,它的状态就不可以改变了,那么这个对象就是不可变对象。对不可变对象的最大依赖被广泛认为是创建简单可靠代码的合理策略。

    不可变对象在并发应用当中特别有用。因为它们的状态无法改变,它们不可能受线程干扰和状态不一致的影响。

    开发者经常不情愿的去使用不可变对象,比起更新对象的开销它们更担心的是创建一个新的对象的开销。创建新对象的影响经常被估高了,这些影响可以被一些高效的不可变对像抵消。这包括减少由于gc或者为了消除让那些可变对象在并发中运行的代码的开销。

    下面的几个小节从可变的对象的类当中得到不可变对象的类。通过这么做,让大家知道这种通用的转换规则,同时也展示了一些不可变对象的优点。

    一个同步类的例子

    类SynchronizedRGB定义了代表颜色的对象。每一个对象代表一种颜色,由三个int类型代表颜色的和颜色的名字组成。

        `public class SynchronizedRGB {
    
            // Values must be between 0 and 255.
            private int red;
            private int green;
            private int blue;
            private String name;
    
            private void check(int red,
                               int green,
                               int blue) {
                if (red < 0 || red > 255
                    || green < 0 || green > 255
                    || blue < 0 || blue > 255) {
                    throw new IllegalArgumentException();
                }
            }
    
            public SynchronizedRGB(int red,
                                   int green,
                                   int blue,
                                   String name) {
                check(red, green, blue);
                this.red = red;
                this.green = green;
                this.blue = blue;
                this.name = name;
            }
    
            public void set(int red,
                            int green,
                            int blue,
                            String name) {
                check(red, green, blue);
                synchronized (this) {
                    this.red = red;
                    this.green = green;
                    this.blue = blue;
                    this.name = name;
                }
            }
    
            public synchronized int getRGB() {
                return ((red << 16) | (green << 8) | blue);
            }
    
            public synchronized String getName() {
                return name;
            }
    
            public synchronized void invert() {
                red = 255 - red;
                green = 255 - green;
                blue = 255 - blue;
                name = "Inverse of " + name;
            }
        }`
    

    使用类SynchronizedRGB必须特别小心,避免出现名字和颜色的状态不统一的情况。假定,例如,一个线程执行了下面的代码:

        `SynchronizedRGB color =
            new SynchronizedRGB(0, 0, 0, "Pitch Black");
        ...
        int myColorInt = color.getRGB();      //Statement 1
        String myColorName = color.getName(); //Statement 2`
    

    如果另外一个线程在语句一和语句二之间调用了color的set方法,myColorInt的值将和myColorName的值不符。为了避免这种情况发生,这个两条语句必须绑在一起执行:

        `synchronized (color) {
            int myColorInt = color.getRGB();
            String myColorName = color.getName();
        } `
    

    这一类的不一致只可能在可变的对象中出现,这对于类SynchronizedRGB的不可变版本来说不是问题。

    定义不可变对象的策略

    下面几条规则定义了一个简单的创建不可变对象的策略。不是所有记录在案的不可变对象都遵循这些规则。但这不是指这些类的创建者很粗心——他们也许有更好的理由去保证这些类的实例在创建之后不会改变状态。然而这些策略需要复杂的分析,不适合初学者。

    1. 不提供改变属性或者对象引用属性的setter方法。
    2. 所有的属性都声明为final和private的。
    3. 不允许子类重写方法。最简单实现这个的方法是声明类为final的。 一个更加复杂的方法是声明构造器是private的,在工厂方法中创建实例。
    4. 如果实例属性中包含可变对象,不允许这些对象被改变:
    • 不提供改变这些对象的方法。
    • 不要共享可变对象的引用。决不存储传递给构造器的外部可变的对象的引用;如果需要,创建副本,存储副本的引用。同样,创建你的内部可变对象的引用时,要避免直接使用原来的方法。

    将这些策略应用到类SynchronizedRGB的结果如果:

    1. 在这个类当中有两个setter方法。第一个方法set可以任意改变对象的状态,在不可变版本的类当中需要删除。第二个方法invert可以转化为创建一个新对象而不是改变现有的对象。
    2. 所有的属性已经是private的了;它们需要进一步声明为final的。
    3. 将类声明为final的。
    4. 只有一个属性指向这个对象,且这个对象本身是不可变的。所以防范改变包含的可变的对象是不需要的。

    下面是改变之后的ImmutableRGB:

        `final public class ImmutableRGB {
    
            // Values must be between 0 and 255.
            final private int red;
            final private int green;
            final private int blue;
            final private String name;
    
            private void check(int red,
                               int green,
                               int blue) {
                if (red < 0 || red > 255
                    || green < 0 || green > 255
                    || blue < 0 || blue > 255) {
                    throw new IllegalArgumentException();
                }
            }
    
            public ImmutableRGB(int red,
                                int green,
                                int blue,
                                String name) {
                check(red, green, blue);
                this.red = red;
                this.green = green;
                this.blue = blue;
                this.name = name;
            }
    
    
            public int getRGB() {
                return ((red << 16) | (green << 8) | blue);
            }
    
            public String getName() {
                return name;
            }
    
            public ImmutableRGB invert() {
                return new ImmutableRGB(255 - red,
                               255 - green,
                               255 - blue,
                               "Inverse of " + name);
            }
        }`
    

    高级别的并发对象

    到目前为止,从一开始我们主要集中在讲解Java平台的部分低级别的API。对于一些基本的任务这些API可以胜任,但是一些更高级的任务需要更高级别的构建块。这对于充分利用当今的多处理器和多核系统的大规模并发应用来说尤其如此。

    在本节中,我们将介绍Java平台5.0版本中引入的一些高级并发功能。这些功能中的大多数都是在新包java.util.concurrent中实现的。在Java容器框架中也有新的并发数据结构。

    • Lock objects支持简化许多应用程序的锁定方法。
    • Executors定义了一种高级别的启动和管理线程的API。包java.util.concurrent的Executor的实现提供了使用于大型应用程序的线程池管理。
    • 并发集合使管理大量数据变得更容易,可以极大的减少同步的需要。
    • 原子变量有最小化同步的特性,且可以帮助避免内存一致性错误。
    • 在JDK7中的ThreadLocalRandom类提供了在多线程中高效产生随机数的方式。

    锁对象

    同步代码依赖一种简单的重入锁。这种锁极易使用但是也有很多限制。包java.util.concurrent.locks支持更加复杂的锁方式。我们不会去检查这个包的详情,我们主要是集中在基础的接口Lock
    上面。

    锁对象和同步代码的隐士锁一样工作。和隐士锁一样,在同一时刻,只有一个线程可以占有锁对象。锁对象同时也支持wait或notify原理,通过相关联的Condition对象实现。

    锁对象相比于隐士锁最大的优势是锁对象有尝试获得锁然后退出的能力。tryLock方法会立即退出或者在一个指定的超时时间后退出。lockInterruptibly方法在它获得锁之前如果另外一个线程发送了中断会退出。

    让我们来用锁对象解决在Liveness章节中遇到的死锁问题。A和B通过训练知道什么时候别人会鞠躬。我们通过要求我们的朋友对象在继续进行鞠躬之前必须获得两个参与者的锁来模拟这种改进。下面是改进的类SafeLock的源代码。为了展示这种方式的多功能性,我们假定A和B非常喜欢他们新的安全的鞠躬的能力,他们不停的像对方鞠躬。

        `import java.util.concurrent.locks.Lock;
        import java.util.concurrent.locks.ReentrantLock;
        import java.util.Random;
    
        public class Safelock {
            static class Friend {
                private final String name;
                private final Lock lock = new ReentrantLock();
    
                public Friend(String name) {
                    this.name = name;
                }
    
                public String getName() {
                    return this.name;
                }
    
                public boolean impendingBow(Friend bower) {
                    Boolean myLock = false;
                    Boolean yourLock = false;
                    try {
                        myLock = lock.tryLock();
                        yourLock = bower.lock.tryLock();
                    } finally {
                        if (! (myLock && yourLock)) {
                            if (myLock) {
                                lock.unlock();
                            }
                            if (yourLock) {
                                bower.lock.unlock();
                            }
                        }
                    }
                    return myLock && yourLock;
                }
                    
                public void bow(Friend bower) {
                    if (impendingBow(bower)) {
                        try {
                            System.out.format("%s: %s has"
                                + " bowed to me!%n", 
                                this.name, bower.getName());
                            bower.bowBack(this);
                        } finally {
                            lock.unlock();
                            bower.lock.unlock();
                        }
                    } else {
                        System.out.format("%s: %s started"
                            + " to bow to me, but saw that"
                            + " I was already bowing to"
                            + " him.%n",
                            this.name, bower.getName());
                    }
                }
    
                public void bowBack(Friend bower) {
                    System.out.format("%s: %s has" +
                        " bowed back to me!%n",
                        this.name, bower.getName());
                }
            }
    
            static class BowLoop implements Runnable {
                private Friend bower;
                private Friend bowee;
    
                public BowLoop(Friend bower, Friend bowee) {
                    this.bower = bower;
                    this.bowee = bowee;
                }
            
                public void run() {
                    Random random = new Random();
                    for (;;) {
                        try {
                            Thread.sleep(random.nextInt(10));
                        } catch (InterruptedException e) {}
                        bowee.bow(bower);
                    }
                }
            }
                    
    
            public static void main(String[] args) {
                final Friend alphonse =
                    new Friend("Alphonse");
                final Friend gaston =
                    new Friend("Gaston");
                new Thread(new BowLoop(alphonse, gaston)).start();
                new Thread(new BowLoop(gaston, alphonse)).start();
            }
        }`
    

    Executors

    Executor的一些接口

    包java.util.concurrent定义了3个executor接口:

    • Executor,一个可以支持启动新任务的简单接口。
    • ExecutorService是Executor的子接口,它增加了有助于管理生命周期的功能,包括单个任务和执行器本身。
    • ScheduledExecutorService是ExecutorService的子接口,支持未来和/或定期执行任务。

    通常,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。

    Executor接口

    Executor接口提供了一个简单的execute方法,这个方法被设计成代替常见的线程创建方式。如果r是一个Runnable对象,且e是一个Executor对象,你可以用替换

        `(new Thread(r)).start();`
    

                `e.execute(r);`
    

    然而execute的定义不太具体。低级别的方式创建一个线程然后立即启动它。依赖于Executor的实现,execute方法可以做相同的事情,但是这个更像是使用一个已经存在的线程去跑r或者是说把r放到一个等待队列中,等到有空闲的工作线程。我们将在章节Thread Pools中讲述工作线程。

    ExecutorService接口

    ExecutorService接口提供类似于execute的更通用的submit方法。和execute方法一样,submit方法接受Runnable的对象,同时也接受Callable
    对象,这个对象允许任务有返回值。submit方法返回一个Future
    的对象,这个对象用来获得Callable的返回值,管理Callable和Runnable任务的状态。

    ExecutorService
    接口也提供方法提交大量的Callable对象。最后,ExecutorService提供一些管理executor停止执行的方法。为了支持立即的停止执行,任务需要正确的处理中断。

    ScheduledExecutorService Interface

    ScheduledExecutorService
    接口schedule方法,这个方法可以在指定的延迟之后执行Runnable或者是Callable任务。而且,接口还定义了scheduleAtFixedRate和schedulerWithFixedDelay方法,这些方法可以在指定的时间间隔内重复执行。

    线程池

    大多数在包java.util.concurrent中的executor的实现都使用了线程池,线程池当中包含工作线程。这种线程和Runable任务,Callable任务是不同的,它们经常被用来执行多任务。

    使用工作线程最小化线程创建所带来的开销。线程对象需要使用大量的内存,在大规模的应用中,多线程对象的分配和释放需要消耗大量的内存。

    一种常见的线程池类型是固定大小的线程池。这种类型的线程池当中总是保持指定大小的线程数运行着;如果一个线程在使用当中因为某些原因被终止了,它将自动的被一个新的线程替代。任务通过一个内部的队列提交到线程池当中,当提交的任务数超过线程数时,这些超过的任务会进入到队列当中去。

    固定线程池的一个重要优点是应用程序可以优雅降级的使用它。为了理解这个,假设一个网页服务应用程序用单独的线程处理每个http的请求。如果应用程序为每个新的http请求都创建一个新的线程,紧接着系统会立即收到很多线程,当所有这些线程的开销超过系统的容量时应用程序会突然停止响应所有的请求。因为线程创建的数量是有限制的,应用程序将不能够按照http请求进来的速度响应它们,但是应用程序会以自己所能处理的最快速度去响应它们。

    一个简单的创建固定大小的线程池的方法是调用java.util.concurrent.Executors
    newFixedThreadPool
    工厂方法。这个类同时也提供了下列的这些工厂方法:

    • newCachedThreadPool
      方法创建一个有着可以扩展大小的线程池的执行器。此执行器适用于启动许多短时间任务的应用程序。

    • newSingleThreadExecutor
      方法创建一个在同一时刻只执行单个任务的执行器。

    • 数个工厂方法是上述执行器的ScheduledExecutorService数个版本。

    如果上述executors提供的工厂方法没有一个满足你的要求,创建 java.util.concurrent.ThreadPoolExecutor
    java.util.concurrent.ScheduledThreadPoolExecutor
    的实例将给你更多的选择。

    Fork/Join

    Fork/Join是一种实现了ExecutorService的框架,它可以帮助你充分利用多处理器的优势。它是为那些可以递归的分解成更小的任务的工作而设计的。它的目的是使用所有可用的处理器的能力来增强应用的性能。

    和其它ExecutorService的实现一样,fork/join框架将任务分配给在线程池中的工作线程。但是fork/join框架的区别是它使用了work-stealing算法。工作线程在做完自己的任务之后可以偷其它繁忙的线程的任务来做。

    fork/join框架的核心是类是ForkJoinPool
    ,它是类AbstractExecutorService的扩展。ForkJoinPool实现了work-stealing算法,它可以执行ForkJoinTask

    基本使用方法

    使用fork/join框架的第一步是编写工作的一段的代码。你的代码应该和下面的这些代码类似:

    if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results

    把这段代码放在ForkJoinTask的子类当中,典型的使用更加专业的类,RecursiveTask
    (可以返回结果)或者是RecursiveAction

    在你的ForkJoinTask的子类准备好了之后,创建一个代表所有的需要做的工作的对象,然后把它给到ForkJoinPool实例的invoke方法。

    模糊处理

    为了帮你理解fork/join框架是如何工作的,看一下下面的例子。假定你要模糊处理一张图片。一个整数类型的数组代表原始的图片,每一个单个的整数代表单个像素的颜色的值。经过模糊处理的目标图片也是用与原图片相同大小的整数数组表示的。

    模糊处理是通过一次处理原数组的一个像素来完成的。每一个像素取它的周围的像素的平均值 (红色、绿色和蓝色分别平均),然后将结果放到目标数组当中。因为一张图片是一个很大的数组,这样子处理会消耗很长的时间。你可以使用fork/join框架充分利用多处理器系统的并发处理能力。下面是一个可能的实现:

    `public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
    
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }
    
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }`
    

    ...

    未完待续~~~

    相关文章

      网友评论

      本文标题:Java 并发

      本文链接:https://www.haomeiwen.com/subject/wvjpbttx.html