美文网首页
Java并发编程 -- 基础知识

Java并发编程 -- 基础知识

作者: TomyZhang | 来源:发表于2019-07-17 20:11 被阅读0次

    一、线程安全性

    如果当多个线程访问同一个可变的状态变量时没有使用合适的同步,那么程序就会出现错误。有三种方式可以修复这个问题:

    • 不在线程之间共享该状态变量。
    • 将状态变量修改为不可变的变量。
    • 在访问状态变量时使用同步。

    1.什么是线程安全性

    当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

    在线程安全类中封装了必要的同步机制,因此客户端无须进一步采取同步措施。

    无状态对象一定是线程安全的。

    2.原子性

    递增操作:

    ++count;
    

    这是一个"读取-修改-写入"的操作序列,并且其结果状态依赖于之前的状态。因此这个操作并非原子的。

    竞态条件:
    在并发编程中,由于不恰当的执行时序而出现不正确的结果是一种非常重要的情况,它有一个正式的名字,称为竞态条件。

    要避免竞态条件问题,就必须在某个线程修改该变量时,通过某种方式防止其他线程使用这个变量,从而确保其他线程只能在修改操作完成之前或之后读取和修改状态,而不是在修改状态的过程中。

    3.加锁机制

    要保持状态的一致性,就需要在单个原子操作中更新所有相关的状态变量。

    Java提供了一种内置的锁机制来支持原子性:同步代码块。同步代码块包括两部分:一个作为锁的对象引用,一个作为由这个锁保护的代码块。以关键字synchronized来修饰的方法就是一种横跨整个方法体的同步代码块,其中该同步代码块的锁就是方法调用所在的对象。静态的synchronized方法以Class对象作为锁。

    synchronized(lock) {
        //访问或修改由锁保护的共享状态
    }
    

    内置锁:
    每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁。线程在进入同步代码块之前会自动获得锁,并且在退出同步代码块时自动释放锁,而无论是通过正常的控制路径退出,还是通过从代码块中抛出异常退出。获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法。

    Java的内置锁相当于一种互斥体(或互斥锁),这意味着最多只有一个线程能持有这种锁。当线程A尝试获取一个由线程B持有的锁时,线程A必须等待或者阻塞,直到线程B释放这个锁。如果B永远不释放锁,那么A也将永远地等下去。

    由于每次只能有一个线程执行内置锁保护的代码块,因此,由这个锁保护的同步代码块会以原子方式执行,多个线程在执行该代码块时也不会相互干扰。并发环境中的原子性与事务应用程序中的原子性有着相同的含义——一组语句作为一个不可分割的单元被执行。任何一个执行同步代码块的线程,都不可能看到有其他线程正在执行由同一个锁保护的同步代码块。

    重入:
    当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞。然而,由于内置锁是可重入的,因此如果某个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功。

    4.用锁来保护状态

    对于可能被多个线程同时访问的可变状态变量,在访问它时都需要持有同一个锁,在这种情况下,我们称状态变量是由这个锁保护的。

    每个共享的和可变的变量都应该只由一个锁来保护,从而使维护人员知道是哪一个锁。

    对于每个包含多个变量的不变性条件,其中涉及的所有变量都需要由同一个锁来保护。

    5.活跃性与性能

    通常,在简单性与性能之间存在着相互制约因素。当实现某个同步策略时,一定不要盲目地为了性能而牺牲简单性(这可能会破坏安全性)。

    当执行时间较长的计算或者可能无法快速完成的操作时(例如,网络I/O或控制台I/O),一定不要持有锁。

    二、对象的共享

    1.可见性

    在单线程环境中,如果向某个变量先写入值,然后在没有其他写入操作的情况下读取这个变量,那么总能得到相同的值。然而,当读操作和写操作在不同的线程中执行时,情况却并非如此,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。为了确保多个线程之间对内存写入操作的可见性,必须使用同步机制。

    重排序:
    在没有同步的情况下,编译器、处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整。在缺乏足够同步的多线程程序中,要想对内存操作的执行顺序进行判断,几乎无法得出正确的结论。

    加锁的含义不仅仅局限于互斥行为,还包括内存可见性。为了确保所有线程都能看到共享变量的最新值,所有执行读操作或者写操作的线程都必须在同一个锁上同步。

    volatile变量:
    Java语言提供了一种稍弱的同步机制,即volatile变量,用来确保将变量的更新操作通知到其他线程。当把变量声明为volatile类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总会返回最新写入的值。

    加锁机制既可以确保可见性又可以确保原子性,而volatile变量只能确保可见性。

    当且仅当满足以下所有条件时,才应该使用volatile变量:

    • 对变量的写入操作不依赖变量的当前值,或者能确保只有单个线程更新变量的值。
    • 该变量不会与其他状态变量一起纳入不变性条件中。
    • 在访问变量时不需要加锁。

    2.发布与逸出

    发布一个对象的意思是指,使对象能够在当前作用域之外的代码中使用。例如,将一个指向该对象的引用保存到其他代码可以访问的地方,或者在某一个非私有的方法中返回该引用,或者将引用传递到其他类的方法中。在许多情况中,我们要确保对象及其内部状态不被发布。而在某些情况下,我们又需要发布某个对象,但如果在发布时要确保线程安全性,则可能需要同步。发布内部状态可能会破坏封装性,并使得程序难以维持不变性条件。例如,如果在对象构造完成之前就发布该对象,就会破坏线程安全性。当某个不应该发布的对象被发布时,这种情况就被称为逸出。

    当发布一个对象时,在该对象的非私有域中引用的所有对象同样会被发布。一般来说,如果一个已经发布的对象能够通过非私有的变量引用和方法调用到达其他的对象,那么这些对象也都会被发布。

    3.线程封闭

    当访问共享的可变数据时,通常需要使用同步。一种避免使用同步的方式就是不共享数据。如果仅在单线程内访问数据,就不需要同步。这种技术被称为线程封闭。

    在Java语言中并没有强制规定某个变量必须由锁来保护,同样在Java语言中也无法强制将对象封闭在某个线程中。线程封闭是在程序设计中的一个考虑因素,必须在程序中实现。Java语言及其核心库提供了一些机制来帮助维持线程封闭性,例如局部变量和ThreadLocal类,但即便如此,程序员仍然需要负责确保封闭在线程中的对象不会从线程中逸出。

    Ad-hoc线程封闭:
    Ad-hoc线程封闭是指,维护线程封闭性的职责完全由程序实现来承担。Ad-hoc线程封闭是非常脆弱的,因为没有任何一种语言特性,例如可见性修饰符或局部变量,能将对象封闭到目标线程上。

    在volatile变量上存在一种特殊的线程封闭。只要能确保只有单个线程对共享的volatile变量执行写入操作,那么就可以安全地在这些共享的volatile变量上执行"读取-修改-写入"的操作。在这种情况下,相当于将修改操作封闭在单个线程中以防止发生竞态条件,并且volatile变量的可见性保证还确保了其他线程能看到最新的值。

    由于Ad-hoc线程封闭技术的脆弱性,因此在程序中尽量少用它,在可能的情况下,应该使用更强的线程封闭技术(例如,栈封闭或ThreadLocal类)。

    栈封闭:
    栈封闭是线程封闭的一种特例,在栈封闭中,只能通过局部变量才能访问对象。栈封闭(也被称为线程内部使用或者线程局部使用,不要与核心类库中的ThreadLocal混淆)比Ad-hoc线程封闭更易于维护,也更加健壮。

    对于基本类型的局部变量,无论如何都不会破坏栈封闭性。由于任何方法都无法获得对基本类型的引用,因此Java语言的这种语义就确保了基本类型的局部变量始终封闭在线程内。

    在维持对象引用的栈封闭性时,程序员需要多做一些工作以确保被引用的对象不会逸出。

    ThreadLocal类:
    维持线程封闭性的一种更规范方法是使用ThreadLocal,这个类能使线程中的某个值与保存值的对象关联起来。ThreadLocal提供了get与set等访问接口或方法,这些方法为每个使用该变量的线程都存有一份独立的副本,因此get总是返回由当前执行线程在调用set时设置的最新值。

    ThreadLocal变量类似于全局变量,它能降低代码的可重用性,并在类之间引入隐含的耦合性,因此在使用时要格外小心。

    4.不变性

    如果某个对象在被创建后其状态就不能被修改,那么这个对象就称为不可变对象。不可变对象一定是线程安全的。

    当满足以下条件时,对象才是不可变的:

    • 对象创建以后其状态就不能修改。
    • 对象的所有域都是final类型。
    • 对象是正确创建的(在对象的创建期间,this引用没有逸出)。

    final域:
    final类型的域是不能修改的,但如果final域所引用的对象是可变的,那么这些被引用的对象是可以修改的。

    正如"除非需要更高的可见性,否则应将所有的域都声明为私有域"是一个良好的编程习惯,"除非需要某个域是可变的,否则应将其声明为final域"也是一个良好的编程习惯。

    5.安全发布

    在某些情况下我们希望在多个线程间共享对象,此时必须确保安全地进行共享。

    不可变对象与初始化安全性:
    任何线程都可以在不需要额外同步的情况下安全地访问不可变对象,即使在发布这些对象时没有使用同步。

    安全发布的常用模式:
    要安全地发布一个对象,对象的引用以及对象的状态必须同时对其他线程可见。一个正确构造的对象可以通过以下方式来安全地发布:

    • 在静态初始化函数中初始化一个对象引用。
    • 将对象的引用保存到volatile类型的域或者AtomicReference对象中。
    • 将对象的引用保存到某个正确构造对象的final类型域中。
    • 将对象的引用保存到一个由锁保护的域中。

    事实不可变对象:
    如果对象从技术上来看是可变的,但其状态在发布后不会再改变,那么把这种对象称为"事实不可变对象"。在没有额外的同步的情况下,任何线程都可以安全地使用被安全发布的事实不可变对象。

    可变对象:
    如果对象在构造后可以修改,那么安全发布只能确保"发布当时"状态的可见性。对于可变对象,不仅在发布对象时需要使用同步,而且在每次对象访问时同样需要使用同步来确保后续修改操作的可见性。

    对象的发布需求取决于它的可变性:

    • 不可变对象可以通过任意机制来发布。
    • 事实不可变对象必须通过安全方式来发布。
    • 可变对象必须通过安全方式来发布,并且必须是线程安全的或者由某个锁保护起来。

    在并发程序中使用和共享对象时,可以使用一些实用的策略,包括:

    • 线程封闭。
      线程封闭的对象只能由一个线程拥有,对象被封闭在该线程中,并且只能由这个线程修改。
    • 只读共享。
      在没有额外同步的情况下,共享的只读对象可以由多个线程并发访问,但任何线程都不能修改它。共享的只读对象包括不可变对象和事实不可变对象。
    • 线程安全共享。
      线程安全的对象在其内部实现同步,因此多个线程可以通过对象的公有接口来进行访问而不需要进一步的同步。
    • 保护对象。
      被保护的对象只能通过持有特定的锁来访问。保护对象包括封装在其他线程安全对象中的对象,以及已发布的并且由某个特定锁保护的对象。

    三、对象的组合

    1.设计线程安全的类

    通过使用封装技术,可以使得在不对整个程序进行分析的情况下就可以判断一个类是否是线程安全的。

    在设计线程安全类的过程中,需要包含以下三个基本要素:

    • 找出构成对象状态的所有变量。
    • 找出约束状态变量的不变性条件。
    • 建立对象状态的并发访问管理策略。

    要分析对象的状态,首先从对象的域开始。如果对象中所有的域都是基本类型的变量,那么这些域将构成对象的全部状态。如果在对象的域中引用了其他对象,那么该对象的状态将包含被引用对象的域。

    同步策略定义了如何在不违背对象不变条件或后验条件的情况下对其状态的访问操作进行协同。同步策略规定了如何将不可变性、线程封闭与加锁机制等结合起来以维护线程的安全性,并且还规定了哪些变量由哪些锁来保护。

    2.实例封闭

    如果某对象不是线程安全的,那么可以通过多种技术使其在多线程程序中安全地使用。可以确保该对象只能由单个线程访问(线程封闭),或者通过一个锁来保护对该对象的所有访问。

    将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。

    封闭机制更易于构造线程安全的类,因为当封闭类的状态时,在分析类的线程安全性时就无须检查整个程序。

    3.线程安全性的委托

    如果一个类是由多个独立且线程安全的状态变量组成,并且在所有的操作中都不包含无效状态转换,那么可以将线程安全性委托给底层的状态变量。

    如果一个状态变量是线程安全的,并且没有任何不变性条件来约束它的值,在变量的操作上也不存在任何不允许的状态转换,那么就可以安全地发布这个变量。

    如果某个类含有复合操作,那么仅靠委托并不足以实现线程安全性。在这种情况下,这个类必须提供自己的加锁机制以保证这些复合操作都是原子操作,除非整个复合操作都可以委托给状态变量。

    4.在现有的线程安全类中添加功能

    要添加一个新的原子操作,最安全的做法是修改原始的类,但这通常无法做到,因为你可能无法访问或修改类的源代码。要想修改原始的类,就需要理解代码中的同步策略,这样增加的功能才能与原有的设计保持一致。如果直接将新方法添加到类中,那么意味着实现同步策略的所有代码仍然处于一个源代码文件中,从而更容易理解与维护。

    另一种方法是扩展这个类,假定在设计这个类时考虑了可扩展性。"扩展"方法比直接将代码添加到类中更加脆弱,因为现在的同步策略实现被分布到多个单独维护的源代码文件中。如果底层的类改变了同步策略并选择了不同的锁来保护它的状态变量,那么子类会被破坏,因为在同步策略改变后它无法再使用正确的锁来控制对基类状态的并发访问。

    客户端加锁机制:
    客户端加锁机制与扩展类机制有许多共同点,二者都是将派生类的行为与基类的实现耦合在一起。通过添加一个原子操作来扩展类是脆弱的,因为它将类的加锁代码分布到多个类中。然而,客户端加锁却更加脆弱,因为它将类C的加锁代码放到与C完全无关的其他类中。

    组合:
    当为现有的类添加一个原子操作时,有一种更好的方法:组合。

    public class ImprovedList<T> implements List<T> {
        private final List<T> list;
    
        public ImprovedList(List<T> list) {
            this.list = list;
        }
    
        public synchronized boolean putIfAbsent(T x) { //同步
            boolean contains = list.contains(x);
            if(!contains) {
                list.add(x);
            }
            return !contains;
        }
    
        public synchronized void clear() { //同步
            list.clear();
        }
    
        ...
    }
    

    虽然额外的同步可能导致轻微的性能损失,但是能实现线程安全性,且程序更为健壮。

    5.将同步策略文档化

    在文档中说明客户代码需要了解的线程安全性保证,以及代码维护人员需要了解的同步策略。

    在设计同步策略时需要考虑多个方面,例如,将哪些变量声明为volatile类型,哪些变量用锁来保护,哪些锁保护哪些变量,哪些变量必须是不可变的或者被封闭在线程中的,哪些操作必须是原子操作等。其中某些方面是严格的实现细节,应该将它们文档化以便于日后的维护。还有一些方面会影响类中加锁行为的外在表现,也应该将其作为规范的一部分写入文档。

    四、基础构建模块

    1.同步容器类

    同步容器类包括Vector和Hashtable,二者是早期JDK的一部分,此外还包括在JDK 1.2中添加的一些功能相似的类,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

    当容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。因此必须在所有对共享容器进行迭代的地方进行加锁。

    2.并发容器

    Java 5.0提供了多种并发容器类来改进同步容器的性能。通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

    ConcurrentHashMap:
    同步容器类在执行每个操作期间都持有一个锁。然而ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。在这种机制中,任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。

    ConcurrentHashMap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap返回的迭代器具有弱一致性,而并非"及时失败"。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。

    与Hashtable和synchronizedMap相比,ConcurrentHashMap有着更多的优势以及更少的劣势,因此在大多数情况下,用ConcurrentHashMap来代替同步Map能进一步提高代码的可伸缩性。只有当应用程序需要加锁Map以进行独占访问时,才应该放弃使用ConcurrentHashMap。

    额外的原子Map操作:
    由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。但是,一些常见的复合操作,例如"若没有则添加"、"若相等则移除"和"若相等则替换"等,都已经实现为原子操作并且在ConcurrentMap的接口中声明。

    public interface ConcurrentMap<K, V> extends Map<K, V> {
        V putIfAbsent(K key, V value); //仅当K没有相应的映射值时才插入
        boolean remove(K key, V value); //仅当K被映射到V时才移除
        boolean replace(K key, V oldValue, V newValue); //仅当K被映射到oldValue时才替换为newValue
        V replace(K key, V newValue); //仅当K被映射到某个值时才替换为newValue
    }
    

    CopyOnWriteArrayList:
    CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。(类似地,CopyOnWriteArraySet的作用是替代同步Set。)

    "写入时复制(Copy-On-Write)"容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。"写入时复制"容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。"写入时复制"容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作带来的影响。

    显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用"写入时复制"容器。

    3.阻塞队列和生产者-消费者模式

    阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法也永远不会阻塞。

    阻塞队列支持生产者-消费者这种设计模式,该模式将"找出需要完成的工作"与"执行工作"这两个过程分离开来,并把工作项放入一个"待完成"列表中以便在随后处理,而不是找出后立即处理。生产者-消费者模式能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。

    在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

    在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。正如其他有序的容器一样,PriorityBlockingQueue既可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较。

    最后一个BlockingQueue实现是SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

    4.阻塞方法与中断方法

    线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。

    BlockingQueue的put和take等方法会抛出受检异常InterruptedException,这与类库中其他一些方法的做法相同,例如Thread.sleep。当某方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

    当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:

    • 传递InterruptedException。
      避开这个异常通常是最明智的策略,只需把InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
    • 恢复中断。
      有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。
    public class TaskRunnable implements Runnable {
        BlockingQueue<Task> queue;
        ...
        public void run() {
            try {
                processTask(queue.take());
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt(); //恢复被中断的状态
            } 
        }
    }
    

    5.同步工具类

    在容器类中,阻塞队列是一种独特的类,它们不仅能作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流,因为take和put等方法将阻塞,直到队列达到期望的状态(队列既非空,也非满)。同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

    所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

    闭锁:
    闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。

    CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

    //TestHarness
    public class TestHarness {
        public void timeTasks(int nThreads) throws InterruptedException {
            final CountDownLatch startGate = new CountDownLatch(1);
            final CountDownLatch endGate = new CountDownLatch(nThreads);
    
            for(int i=0; i<nThreads; i++) {
                Thread t = new Thread() {
                    @Override
                    public void run() {
                        try {
                            Log.d("TestHarness", "zwm, thread: " + Thread.currentThread().getName() + " startGate await before");
                            startGate.await();
                            Log.d("TestHarness", "zwm, thread: " + Thread.currentThread().getName() + " startGate await after");
                            try {
                                //do something
                            } finally {
                                endGate.countDown();
                            }
                        } catch (InterruptedException e) {
                        }
                    }
                };
                t.start();
            }
    
            startGate.countDown();
            Log.d("TestHarness", "zwm, thread: " + Thread.currentThread().getName() + " endGate await before");
            endGate.await();
            Log.d("TestHarness", "zwm, thread: " + Thread.currentThread().getName() + " endGate await after");
        }
    }
    
    //测试代码
    private void testMethod() {
        TestHarness testHarness = new TestHarness();
        try {
            testHarness.timeTasks(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    //输出log
    2019-07-17 16:09:43.361 main endGate await before
    2019-07-17 16:09:43.362 Thread-10 startGate await before
    2019-07-17 16:09:43.362 Thread-10 startGate await after
    2019-07-17 16:09:43.375 Thread-12 startGate await before
    2019-07-17 16:09:43.375 Thread-11 startGate await before
    2019-07-17 16:09:43.375 Thread-12 startGate await after
    2019-07-17 16:09:43.375 Thread-11 startGate await after
    2019-07-17 16:09:43.378 main endGate await after
    

    FutureTask:
    FutureTask也可以用做闭锁。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行、正在运行和运行完成。"执行完成"表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会永远停止在这个状态上。

    Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

    //Preloader
    public class Preloader {
    
        public class MyExcepton extends Exception {
    
        }
    
        private final FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws MyExcepton {
                Log.d("Preloader", "zwm, call");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    return "InterruptedException";
                }
    
                if(false) {
                    throw new MyExcepton();
                }
    
                return "tomorrow";
            }
        });
    
        public void runTask() {
            Log.d("Preloader", "runTask");
            Thread threaad1 = new Thread(futureTask);
            threaad1.start();
        }
    
        public void getResult() throws MyExcepton, InterruptedException{
            Log.d("Preloader", "getResult");
            String result;
            try {
                result = futureTask.get();
                Log.d("Preloader", "zwm, result: " + result);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if(cause instanceof MyExcepton) {
                    throw (MyExcepton) cause;
                } else {
                    throw launderThrowable(cause);
                }
            }
        }
    
        private RuntimeException launderThrowable(Throwable t) {
            if(t instanceof RuntimeException) {
                return (RuntimeException) t;
            } else if(t instanceof Error) {
                throw (Error) t;
            } else {
                throw new IllegalStateException("Not checked", t);
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        Preloader preloader = new Preloader();
        preloader.runTask();
        try {
            preloader.getResult();
        } catch (Preloader.MyExcepton e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    //输出log
    2019-07-17 17:08:09.724 zwm, call
    2019-07-17 17:08:12.725 zwm, result: tomorrow
    

    Callable表示的任务可以抛出受检查或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中被重新抛出。这将使调用get的代码变得复杂,因为它不仅需要处理可能出现的ExecutionException(以及未检查的CancellationException),而且还由于ExecutionException是作为一个Throwable类返回的,因此处理起来不容易。

    信号量:
    计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

    Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二者信号量可以用做互斥体,并具备不可重入的加锁语义,谁拥有这个唯一的许可,谁就拥有了互斥锁。

    //SemaphoreTest
    public class SemaphoreTest {
        private final Semaphore semaphore = new Semaphore(3);
    
        public void doSomething() {
            try {
                semaphore.acquire();
                Log.d("Semaphore", "zwm, thread: " + Thread.currentThread().getName() + " acquire after");
                Thread.sleep(5000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final SemaphoreTest semaphoreTest = new SemaphoreTest();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    semaphoreTest.doSomething();
                }
            });
            thread.start();
        }
    }
    
    //输出log
    2019-07-17 17:30:51.601 zwm, thread: Thread-10 acquire after
    2019-07-17 17:30:51.601 zwm, thread: Thread-11 acquire after
    2019-07-17 17:30:51.612 zwm, thread: Thread-12 acquire after
    2019-07-17 17:30:56.602 zwm, thread: Thread-15 acquire after
    2019-07-17 17:30:56.602 zwm, thread: Thread-14 acquire after
    2019-07-17 17:30:56.613 zwm, thread: Thread-16 acquire after
    2019-07-17 17:31:01.603 zwm, thread: Thread-17 acquire after
    2019-07-17 17:31:01.603 zwm, thread: Thread-13 acquire after
    2019-07-17 17:31:01.614 zwm, thread: Thread-18 acquire after
    2019-07-17 17:31:06.605 zwm, thread: Thread-19 acquire after
    

    栅栏:
    通过闭锁可以启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于用于等待其他线程。

    CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

    //TestHarness
    public class TestHarness {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
        public void doSomething() {
            try {
                cyclicBarrier.await();
                Log.d("TestHarness", "zwm, thread: " + Thread.currentThread().getName() + " cyclicBarrier await after");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final TestHarness testHarness = new TestHarness();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    testHarness.doSomething();
                }
            });
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            thread.start();
        }
    }
    
    //输出log
    2019-07-17 19:04:00.131 zwm, thread: Thread-10 cyclicBarrier await after
    2019-07-17 19:04:00.131 zwm, thread: Thread-11 cyclicBarrier await after
    2019-07-17 19:04:00.131 zwm, thread: Thread-12 cyclicBarrier await after
    2019-07-17 19:04:03.136 zwm, thread: Thread-13 cyclicBarrier await after
    2019-07-17 19:04:03.136 zwm, thread: Thread-14 cyclicBarrier await after
    2019-07-17 19:04:03.136 zwm, thread: Thread-15 cyclicBarrier await after
    2019-07-17 19:04:06.146 zwm, thread: Thread-18 cyclicBarrier await after
    2019-07-17 19:04:06.147 zwm, thread: Thread-16 cyclicBarrier await after
    2019-07-17 19:04:06.147 zwm, thread: Thread-17 cyclicBarrier await after
    

    另一种形式的栅栏是Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

    //TestHarness
    public class TestHarness {
        final Exchanger exchanger = new Exchanger();
    
        public void prepareData1() {
            Thread thread1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    String data1 = "This is data1";
                    try {
                        Log.d("TestHarness", "zwm, exchange data1 before");
                        data1 = (String)exchanger.exchange(data1);
                        Log.d("TestHarness", "zwm, exchange data1 after, data1: " + data1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            thread1.start();
        }
    
        public void prepareData2() {
            Thread thread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String data2 = "This is data2";
                    try {
                        Log.d("TestHarness", "zwm, exchange data2 before");
                        data2 = (String)exchanger.exchange(data2);
                        Log.d("TestHarness", "zwm, exchange data2 after, data2: " + data2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            thread2.start();
        }
    }
    
    //测试代码
    private void testMethod() {
        final TestHarness testHarness = new TestHarness();
        testHarness.prepareData1();
        testHarness.prepareData2();
    }
    
    //输出log
    2019-07-17 19:23:06.036 zwm, exchange data1 before
    2019-07-17 19:23:11.043 zwm, exchange data2 before
    2019-07-17 19:23:11.043 zwm, exchange data1 after, data1: This is data2
    2019-07-17 19:23:11.043 zwm, exchange data2 after, data2: This is data1
    

    五、总结

    • 可变状态是至关重要的。
      所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。
    • 尽量将域声明为final类型,除非需要它们是可变的。
    • 不可变对象一定是线程安全的。
      不可变对象能极大地降低并发编程的复杂性。它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制。
    • 封装有助于管理复杂性。
      在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
    • 用锁来保护每个可变变量。
    • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
    • 在执行复合操作期间,要持有锁。
    • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。
    • 不要故作聪明地推断出不需要使用同步。
    • 在设计过程中考虑线程安全性,或者在文档中明确地指出它不是线程安全的。
    • 将同步策略文档化。

    相关文章

      网友评论

          本文标题:Java并发编程 -- 基础知识

          本文链接:https://www.haomeiwen.com/subject/eloclctx.html