美文网首页
第五章——基础构建模块

第五章——基础构建模块

作者: 你可记得叫安可 | 来源:发表于2020-10-25 15:47 被阅读0次

5.1 同步容器类

同步容器类包括 VectorHashtable,二者是早期 JDK 的一部分,此外还包括在 JDK 1.2 中添加的一些功能相似的类,这些同步的封装器类是由 Collections.synchronizedXxx 等工厂方法去创建的:

Collections.synchronizedXxx
这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
5.1.1 同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。如下程序清单 5-1:

// 程序清单 5-1
public static Object getLast(Vector list) {
    int lastIndex = list.size() - 1;
    // 调用 get 方法时,vector 可能已经被下面的 deleteLast 更改了
    return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
    int lastIndext = list.size() - 1;
    list.remove(lastIndext);
}

上面代码出错的根本原因是因为代码中的方法不是原子操作。我们可以通过获得容器类的锁,来使 getLastdeleteLast 成为原子操作:

// 程序清单 5-2
public static Object getLast(Vector list) {
    synchronized (list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }
}
public static void deleteLast(Vector list) {
    synchronized (list) {
        int lastIndext = list.size() - 1;
        list.remove(lastIndext);
    }
}

5.2 并发容器

同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发行,当多个线程竞争容器的锁时,吞吐量将严重减低。

另一方面,并发容器是针对多个线程并发访问设计的。在 Java 5.0 中增加了 ConcurrentHashMap,用来替代同步且基于散列的 Map,以及 CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的 ListJava 5.0 增加了两种新的容器类型:QueueBlockingQueue

Queue 用来临时保存一组等待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个传统的先进先出队列,以及 PriorityQueue,这是一个(非并发的)优先队列。

BlockingQueue 扩展了 Queue,增加了可阻塞的插入和获取等操作。

5.2.1 ConcurrentHashMap

ConcurrentHashMap 并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器。
事实上,ConcurrentHashMapJava 1.7Java 1.8 中的实现有一点是不一样的。

Java 1.7 中的实现是该书中所介绍的,使用分段锁机制来在粒度更细的层次进行加锁。我们去看 Java 1.7 中的源码实现就会发现,它的分段锁机制由一个 Segment 的数据结构来实现,Segment 自身继承自可重入锁 ReentrantLock。而且 HashEntry 中的 value 是被 volatile 修饰的,因此它支持任意数量的读取线程并发地访问 Map,执行读取操作的线程和执行写入操作的线程可以并发地访问 Map,由 volatile 来保证可见性。

并且它还支持一定数量的写入线程可以并发地修改 Map,并发数量由 Segment 的数量来决定。这是因为 ConcurrentHashMap 采用 Segment 的分段锁机制。但一个写入线程在一个 Segment 上操作时,因为它只持有这个 Segment 的锁,所以其他的写入线程依然可以进入另外的 Segment 进行操作。

Java 1.8 中,ConcurrentHashMap 的实现使用了更进一步的优化。在数据结构上放弃了分段锁的实现,而是采用了根普通 HashMap 差不多的结构,只不过在多线程更新数据结构时采用了 CAS机制 和 synchronized 加锁。

ConcurrentHashMap 与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayList 用于替代同步 List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器返回的迭代器不会抛出 ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。

5.3 阻塞队列和生产者 - 消费者模式

一种常见的生产者 - 消费者设计模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式。

在类库中包含了 BlockingQueue 的多种实现,其中,LinkedBlockingQueueArrayBlockingQueueFIFO 队列,二者分别与 LinkedListArrayList 类似,但比同步 List 拥有更好的并发性能。PriorityBlockingQueue 是一个按优先级排列的队列,当你希望按照某种顺序而不是 FIFO 来处理元素时,这个队列将非常有用。

最后一个 BlockingQueue 实现是 SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移除队列。

5.3.3 双端队列和工作密取

Java 6 增加了两种容器类型,Deque(发音为 "deck",是 Double Enabled Queue 的缩写)和 BlockingDeque,它们分别对 QueueBlockingQueue 进行了扩展。Deque 是一个双端队列,实现了在队列头和对列尾的高效插入和移除。具体实现包括 ArrayDequeLinkedBlockingDeque

正如阻塞队列适用于生产者 - 消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing)。在生产者 - 消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者 - 消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。

Java 7Java 8 中,都原生的实现了拥有工作密取机制的线程池,分别是 ForkJoinPoolnewWorkStealingPool 类。

5.4 阻塞方法与中断方法

Java 中 InterruptedException 的最佳实践

5.5 同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调现成的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

5.5.1 闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。当闭锁到达结束状态后,将不会再改变状态,闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。

CountDownLatch 是一种灵活的闭所实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生了,而 await 方法等待计数器达到零,表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

// 程序清单 5-11
static class TestHarness {
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {}
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

我们使用 startGate 闭锁来达到所有子线程都调用了 start 方法启动后,主线程才开始计时的功能。同时我们使用 endGate 闭锁来达到最后一个子线程执行完时,主线程才结束计时的功能。

5.5.2 FutureTask

FutureTask 也可以用作闭锁。FutureTask 表示的计算是通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以处于以下 3 种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当 FutureTask 进入完成状态后,它会永远停止在这个状态上。

Future.get 的行为取决于任务的状态。如果任务已经完成,那么 get 会立即返回结果,否则 get 将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask 的规范确保了这种传递过程能实现结果的安全发布。

static class Preloader {
    private final FutureTask<ProductInfo> future =
            new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
                @Override
                public ProductInfo call() throws Exception {
                    return loadProductInfo();
                }
            });
    private final Thread thread = new Thread(future);
    public void start() {
        thread.start();
    }
    public ProductInfo get() throws DataLoadException, InterruptedException {
        try {
            return future.get();  // 可能会阻塞地等待结果
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) {
                throw (DataLoadException) cause;
            } else {
                throw launderThrowable(cause);
            }
        }
    }
    
    public static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IllegalStateException("Not unchecked", t);
        }
    }
}

无论任务代码抛出什么异常,都会被封装到一个 ExecutionException 中,并在 Future.get 中被重新抛出。这将使调用 get 的代码变得复杂,因为它不仅需要处理可能出现的 ExecutionException(以及未检查的 CancellationException),而且还由于 ExecutionException 是作为一个 Throwable 类返回的,因此处理起来并不容易。我们使用 launderThrowable 方法来处理 get 抛出的各种情况的异常。

5.5.3 信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 操作将返回一个许可给信号量。而且 Semaphore 并不受限于它在创建时的初始许可数量。

计算信号量的一种简化形式是二值信号量,即初始值为 1 的 Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,就拥有了互斥锁。

Semaphore 可以用于构造阻塞对象池:如果将 Semaphore 的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire 方法获取一个许可,在将资源返回给池之后调用 release 释放许可,那么 acquire 将一直阻塞直到资源不为空。(但是,实现阻塞对象池时,一种更简单的方法是使用 BlockingQueue 来保存池的资源。

同样,你也可以使用 Semaphore 将任何一种容器变成有界阻塞容器。

// 程序清单 5-14
static class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        sem = new Semaphore(bound);
    }
    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                sem.release();
            }
        }
    }
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();
        }
        return wasRemoved;
    }
}

信号量 sem 的计数值会初始化为容器容量的最大值。add 操作在向底层容器中添加一个元素之前,首先要获取一个许可。如果 add 操作没有添加任何元素,那么会立刻释放许可。同样,remove 操作释放一个许可,使更多的元素能够添加到容器中。底层的 Set 实现并不知道关于边界的任何信息,这是由 BoundedHashSet 来处理的。

5.5.4 栅栏

这个在应用中比较少见,先暂时不看。

5.6 构建高效且可伸缩的结果缓存

本节我们将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的函数。我们首先从简单的 HashMap 开始,然后分析它的并发性缺陷,并讨论如何修复它们。

// 程序清单 5-16

public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

public class ExpensiveFunction implements Computable<String, BigInteger> {
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        // 在经过长时间的计算后
        return new BigInteger(arg);
    }
}

public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<>();
    private final Computable<A, V> c;

    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }

    // 通过 synchronized 关键字来进行同步
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

上面的程序逻辑很简单,使用 HashMap 来保存之前计算的结果。compute 方法将首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算的值,否则,将把计算结果缓存在 HashMap 中,然后再返回。

HashMap 不是线程安全的,因此要确保两个线程不会同时访问 HashMapMemoizer1 使用了 synchronize 关键字对 compute 方法进行同步。这种方法不会有线程安全性问题,但是会有一个明显的可伸缩性问题:每次只有一个线程能够执行 compute。这显然不是我们想要的。

// 程序清单 5-17
public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;

    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

上面 Memoizer2ConcurrentHashMap 代替 HashMap 来改进 Memoizer 中糟糕的并发行为。由于 ConcurrentHashMap 是线程安全的,因此在访问底层 Map 时就不需要进行同步,因而避免了在对 Memoizer1 中的 compute 方法进行同步时带来的串行性。

但是 Memoizer2 仍然存在一个问题,当两个线程同时调用 compute 时,可能会导致计算得到相同值,这是因为一个线程不知道另一个线程也在做相同的计算。因此我们的一个解决方案是,一个线程在进行计算前,需要有能力去查看是否有其他线程正在进行同样的计算?

我们已经知道有一个类能基本实现这个功能:FutureTaskFutureTask 表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么 FutureTask.get 将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。

// 程序清单 5-18
public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;
    public Memoizer3(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        // 这里的 get 与下面的 put 不是原子操作,因此同样存在短暂的时间,两个线程可能会同时进入下面的 if 语句
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = () -> c.compute(arg); // 将计算包装成一个 Callable
            FutureTask<V> ft = new FutureTask<>(eval);
            f = ft;
            cache.put(arg, ft);
            ft.run(); // 开始执行计算
        }
        try {
            return f.get(); // 阻塞等待计算结果
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

上面的 Memoizer3 几乎是完美的:若结果已经计算出来,那么将立即返回。如果其他线程正在计算该结果,那么新到的线程将一直等待这个结果被计算出来。它只有一个缺陷:正如代码中注释所说的,compute 方法中的 if 代码块仍然是非原子(nonatomic)的 “先检查再执行” 操作,因此两个线程仍有可能在同一时间内调用 compute 来计算相同的值,即二者都没有在缓存中找到期望的值,因此都开始计算。

// 程序清单 5-19
public class Memoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = () -> c.compute(arg);
                FutureTask<V> ft = new FutureTask<>(eval);
                f = cache.putIfAbsent(arg, ft); // 使用 putIfAbsent 原子方法来避免两个线程同时执行下面的 run 方法
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }
}

上面的 Memoizer 使用了 ConcurrentMap 中的原子方法 putIfAbsent,避免了 Memozier3 的漏洞。当缓存的是 Future 而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个计算被取消或失败,那么将把 Future 从缓存中移除,这样将来的计算才可能成功。

Memoizer 的代码中,实际上永远不会抛出 CancellationException,但是由于无法保证使用者在 Callable 的实现中会执行怎么样的操作(包括取消操作),因此考虑 catch 住这个异常并没有任何问题。

Memoizer 同样没有解决缓存逾期的问题,

相关文章

  • 第五章——基础构建模块

    5.1 同步容器类 同步容器类包括 Vector 和 Hashtable,二者是早期 JDK 的一部分,此外还包括...

  • 第五章 管理多模块构建

    第五章 管理多模块构建 Android Studio 不仅可以为 应用 和 依赖库 建立模块,还可以为 Andro...

  • 被动扫描框架Recon-ng

    目录 基础使用步骤启动菜单使用模块查看结果 具体模块侦查发现攻击报告 自行构建模块 基础使用步骤 启动 recon...

  • 使用gradle构建springboot多模块项目,并构建成do

    使用gradle构建springboot多模块项目,并构建成docker容器的demo. 基础环境 gradle(...

  • TypeScript基础入门之模块(五)

    转载 # TypeScript基础入门之模块(五) 构建模块的指南 导出尽可能接近顶级 使用您导出的东西时,模块的...

  • Android 模块化/组件化

    项目例子 问题 解耦 模块间的通信 基础核心模块的构建 业务组件化 代码边界 资源合并 Arouter 配置 各个...

  • Maven构建多模块工程

    构建多模块Maven工程 基础知识铺垫  Maven多模块项目,适用于一些比较大的项目,通过合理的模块拆分,实现代...

  • 五、基础构建模块

    同步容器类1.1 同步容器类的问题同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护符合操作。...

  • RESTful是什么

    在程序猿DD Spring Boot基础教程的开山篇《基础项目构建,引入Web模块,完成一个简单的RESTful ...

  • Getting started with multi-modul

    这一章介绍Go语言多模块工作空间的基础概念,我们会在共享的多模块工作空间创建两个模块并修改它们,从而在构建程序过程...

网友评论

      本文标题:第五章——基础构建模块

      本文链接:https://www.haomeiwen.com/subject/ovrsmktx.html