美文网首页
Java编程思想(二十) 并发

Java编程思想(二十) 并发

作者: kaiker | 来源:发表于2022-07-10 16:21 被阅读0次

    1、并发的多面性

    更快的执行

    • 如果没有任务会阻塞,那么在单处理器机器上使用并发就没有任何意义。
    • 实现并发最直接的方式是在操作系统级别使用进程。
    • 多任务操作系统可以通过周期性地将CPU从一个进程切换到另一个进程,来实现同时运行多个进程。进程会互相隔离开。

    改进代码设计

    • Java的线程机制是抢占式的,这表示调度机制会周期性地中断线程,将上下文切换到另一个线程,从而为每个线程都提供时间片,使得每个线程都提供时间片。
    • 上下文切换的开销通常比抢占式系统要低廉许多。

    2、基本的线程机制

    • 并发机制使得程序可划分为多个分离的、独立运行的任务。
    • 这些独立的任务中的每一个都将由执行线程来驱动。
    • 一个线程就是在进程中的一个单一的顺序控制流。
    • 底层机制是切分CPU时间,CPU将轮流给每个任务分配其占用时间。

    定义任务

    • 只需实现Runnable接口并编写run方法
    public class LiftOff implements Runnable {
      protected int countDown = 10; // Default
      private static int taskCount = 0;
      private final int id = taskCount++;
      public LiftOff() {}
      public LiftOff(int countDown) {
        this.countDown = countDown;
      }
      public String status() {
        return "#" + id + "(" +
          (countDown > 0 ? countDown : "Liftoff!") + "), ";
      }
      public void run() {
        while(countDown-- > 0) {
          System.out.print(status());
          Thread.yield();
        }
      }
    } ///:
    
    • Thread.yield()的调用是对线程调度器的一种简易。声明此时可以切换给其他任务执行。
    • 要实现线程行为,必须显示地将一个任务附着到线程上。

    Thread类

    • Runnable对象转变为工作任务的传统方式是把它提交给一个Thread构造器。
    public class BasicThreads {
      public static void main(String[] args) {
        Thread t = new Thread(new LiftOff());
        t.start();
        System.out.println("Waiting for LiftOff");
      }
    } /* Output: (90% match)
    Waiting for LiftOff
    #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
    *///:~
    
    • Thread构造器只需要一个Runnable对象。调用Thread对象的start()方法为该线程执行必须的初始化操作,然后调用Runnable的run()方法。
    • 线程调度器将在多个处理器之间默默地分发线程。
    • 每个Thread都注册了自己,因此确实有一个对它的引用,而且在它的任务退出其run()并死亡之前,垃圾回收器都无法清除它。

    使用Executor

    • 执行器将为你管理Thread对象,从而简化并发编程。
    • Excutor代替显示创建Thread对象。
    public class CachedThreadPool {
      public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
          exec.execute(new LiftOff());
        exec.shutdown();
      }
    } /* Output: (Sample)
    #0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
    *///:~
    
    • 非常常见的情况是,单个Executor被用来创建和管理系统中所有的任务。
    • 有了FixedThreadPool,就可以一次性预先执行代价高昂的线程分配。
    • 但通常还是优先用CachedThreadPool
    public class FixedThreadPool {
      public static void main(String[] args) {
        // Constructor argument is number of threads:
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; i++)
          exec.execute(new LiftOff());
        exec.shutdown();
      }
    } /* Output: (Sample)
    #0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
    *///:~
    

    从任务中产生返回值

    • 如果你希望任务在完成时能够返回一个值,那么可以实现Callable接口而不是Runnable接口。
    • Callable是一种具有类型参数的泛型,它的类型参数表示的是从方法call()中返回的值。
    • 必须使用ExecutorService.submit()方法调用它。
    class TaskWithResult implements Callable<String> {
      private int id;
      public TaskWithResult(int id) {
        this.id = id;
      }
      public String call() {
        return "result of TaskWithResult " + id;
      }
    }
    
    public class CallableDemo {
      public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<Future<String>> results =
          new ArrayList<Future<String>>();
        for(int i = 0; i < 10; i++)
          results.add(exec.submit(new TaskWithResult(i)));
        for(Future<String> fs : results)
          try {
            // get() blocks until completion:
            System.out.println(fs.get());
          } catch(InterruptedException e) {
            System.out.println(e);
            return;
          } catch(ExecutionException e) {
            System.out.println(e);
          } finally {
            exec.shutdown();
          }
      }
    } /* Output:
    result of TaskWithResult 0
    result of TaskWithResult 1
    result of TaskWithResult 2
    result of TaskWithResult 3
    result of TaskWithResult 4
    result of TaskWithResult 5
    result of TaskWithResult 6
    result of TaskWithResult 7
    result of TaskWithResult 8
    result of TaskWithResult 9
    *///:~
    
    • submit()方法会产生Future对象,它用Callable返回结果的特定类型进行了参数化,可以用isDone()方法来查询Future是否已完成。当任务完成时,调用get()来获取结果。如果没完成,调用get()将会阻塞。

    休眠

    • 对sleep()的调用可以跑出InterruptedException异常。
    public class SleepingTask extends LiftOff {
      public void run() {
        try {
          while(countDown-- > 0) {
            System.out.print(status());
            // Old-style:
            // Thread.sleep(100);
            // Java SE5/6-style:
            TimeUnit.MILLISECONDS.sleep(100);
          }
        } catch(InterruptedException e) {
          System.err.println("Interrupted");
        }
      }
      public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
          exec.execute(new SleepingTask());
        exec.shutdown();
      }
    } /* Output:
    #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
    *///:~
    

    优先级

    • 优先级较低的线程仅仅是执行的效率较低。
    • 可以用setPriority来设置优先级。
    public class SimplePriorities implements Runnable {
      private int countDown = 5;
      private volatile double d; // No optimization
      private int priority;
      public SimplePriorities(int priority) {
        this.priority = priority;
      }
      public String toString() {
        return Thread.currentThread() + ": " + countDown;
      }
      public void run() {
        Thread.currentThread().setPriority(priority);
        while(true) {
          // An expensive, interruptable operation:
          for(int i = 1; i < 100000; i++) {
            d += (Math.PI + Math.E) / (double)i;
            if(i % 1000 == 0)
              Thread.yield();
          }
          System.out.println(this);
          if(--countDown == 0) return;
        }
      }
      public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
          exec.execute(
            new SimplePriorities(Thread.MIN_PRIORITY));
        exec.execute(
            new SimplePriorities(Thread.MAX_PRIORITY));
        exec.shutdown();
      }
    } /* Output: (70% match)
    Thread[pool-1-thread-6,10,main]: 5
    Thread[pool-1-thread-6,10,main]: 4
    Thread[pool-1-thread-6,10,main]: 3
    Thread[pool-1-thread-6,10,main]: 2
    Thread[pool-1-thread-6,10,main]: 1
    Thread[pool-1-thread-3,1,main]: 5
    Thread[pool-1-thread-2,1,main]: 5
    Thread[pool-1-thread-1,1,main]: 5
    Thread[pool-1-thread-5,1,main]: 5
    Thread[pool-1-thread-4,1,main]: 5
    ...
    *///:~
    

    让步

    • 如果可以让别的线程使用CPU了,可以通过调用yield()方法来做出。

    后台线程

    • 在程序运行时在后台提供的一种通用线程。
    • 执意要也有任何非后台线程运行,程序就不会终止。
    • setDaemon()能把它设置为后台线程。
    • 后台进程将在不执行finally的情况下终止run方法

    编码变体

    • start()在构造器中调用
    class InnerRunnable1 {
      private int countDown = 5;
      private Inner inner;
      private class Inner implements Runnable {
        Thread t;
        Inner(String name) {
          t = new Thread(this, name);
          t.start();
        }
        public void run() {
          try {
            while(true) {
              print(this);
              if(--countDown == 0) return;
              TimeUnit.MILLISECONDS.sleep(10);
            }
          } catch(InterruptedException e) {
            print("sleep() interrupted");
          }
        }
        public String toString() {
          return t.getName() + ": " + countDown;
        }
      }
      public InnerRunnable1(String name) {
        inner = new Inner(name);
      }
    }
    

    加入一个线程

    • 用join(),效果是等待一段时间直到第二个线程结束。
    class Sleeper extends Thread {
      private int duration;
      public Sleeper(String name, int sleepTime) {
        super(name);
        duration = sleepTime;
        start();
      }
      public void run() {
        try {
          sleep(duration);
        } catch(InterruptedException e) {
          print(getName() + " was interrupted. " +
            "isInterrupted(): " + isInterrupted());
          return;
        }
        print(getName() + " has awakened");
      }
    }
    
    class Joiner extends Thread {
      private Sleeper sleeper;
      public Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
      }
      public void run() {
       try {
          sleeper.join();
        } catch(InterruptedException e) {
          print("Interrupted");
        }
        print(getName() + " join completed");
      }
    }
    
    public class Joining {
      public static void main(String[] args) {
        Sleeper
          sleepy = new Sleeper("Sleepy", 1500),
          grumpy = new Sleeper("Grumpy", 1500);
        Joiner
          dopey = new Joiner("Dopey", sleepy),
          doc = new Joiner("Doc", grumpy);
        grumpy.interrupt();
      }
    } /* Output:
    Grumpy was interrupted. isInterrupted(): false
    Doc join completed
    Sleepy has awakened
    Dopey join completed
    *///:~
    

    3、共享受限资源

    • 防止冲突方法付就是当资源被有一个任务使用时,在其上加锁。
    • 这种机制常常称为互斥量
    • Java以提供关键字synchronized形式,防止资源冲突。当任务要执行被synchronized关键字保护代码片段将检查锁是否可用。
    • 共享资源一般是以对象形式存在的内存片段。
    • 所有对象都自动含有单一的锁,当对象上调用任意synchronized方法的时候,此对象都被加锁,这时该对象上的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。
    • 在使用并发时,域设置为private很重要,否则synchronized关键字就不能防止其他任务直接访问域。
    • 只有首先获得了锁的任务才能允许继续获取多个锁。
    • 针对每个类,也有一个锁,所以synchronized stattic方法可以在类的范围内防止对static数据的访问。
    • 如果你正在写一个变量,它可能接下来被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,必须使用同步。
    public class
    SynchronizedEvenGenerator extends IntGenerator {
      private int currentEvenValue = 0;
      public synchronized int next() {
        ++currentEvenValue;
        Thread.yield(); // Cause failure faster
        ++currentEvenValue;
        return currentEvenValue;
      }
      public static void main(String[] args) {
        EvenChecker.test(new SynchronizedEvenGenerator());
      }
    } ///:~
    
    • Lock对象需要被显示地创建、锁定和释放。
    • finally里要有unlock()
    • return放在try中,确保unlock()不会过早发生。
    public class MutexEvenGenerator extends IntGenerator {
      private int currentEvenValue = 0;
      private Lock lock = new ReentrantLock();
      public int next() {
        lock.lock();
        try {
          ++currentEvenValue;
          Thread.yield(); // Cause failure faster
          ++currentEvenValue;
          return currentEvenValue;
        } finally {
          lock.unlock();
        }
      }
      public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
      }
    } ///:~
    
    • ReentrantLock允许你尝试着获取但最终未获取锁,这样如果其他人已经获取了这个锁,你就可以决定离开去执行一些事,而不是等待到锁释放。

    原子性与易变性

    • 原子操作是不能被线程调度机制中断的操作。
    • 依赖原子性是棘手且危险的。
    • long double的操作不是原子的,因为会分离为32位执行。但使用volatile关键字就会获得原子性。
    • volatile关键字确保了应用中的可视性,以内不同人物对应的状态有不同的视图,一些值也许还存在本地处理器缓存中。volatile域会立即被写入到主存中,读操作就发生在主存中。
    • 同步也会导致主存刷新,如果域完全由synchronized保护,就不必使用volatile
    • 使用volatile而不用synchronized唯一安全的情况是类中只有一个可变的域。

    原子类

    • 机器级别上的原子性
    • AtomicInteger、AtomicLong、AtomicReference
    • 通常锁更安全一些。
    public class AtomicIntegerTest implements Runnable {
      private AtomicInteger i = new AtomicInteger(0);
      public int getValue() { return i.get(); }
      private void evenIncrement() { i.addAndGet(2); }
      public void run() {
        while(true)
          evenIncrement();
      }
      public static void main(String[] args) {
        new Timer().schedule(new TimerTask() {
          public void run() {
            System.err.println("Aborting");
            System.exit(0);
          }
        }, 5000); // Terminate after 5 seconds
        ExecutorService exec = Executors.newCachedThreadPool();
        AtomicIntegerTest ait = new AtomicIntegerTest();
        exec.execute(ait);
        while(true) {
          int val = ait.getValue();
          if(val % 2 != 0) {
            System.out.println(val);
            System.exit(0);
          }
        }
      }
    } ///:~
    

    临界区

    • 希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。通过这种方式分离出来的代码被称为临街区。
    • 使用synchronized,被用来指定某个对象。
    • 使用同步控制快进行同步,对象不加锁的时间更长。
    class Pair { // Not thread-safe
      private int x, y;
      public Pair(int x, int y) {
        this.x = x;
        this.y = y;
      }
      public Pair() { this(0, 0); }
      public int getX() { return x; }
      public int getY() { return y; }
      public void incrementX() { x++; }
      public void incrementY() { y++; }
      public String toString() {
        return "x: " + x + ", y: " + y;
      }
      public class PairValuesNotEqualException
      extends RuntimeException {
        public PairValuesNotEqualException() {
          super("Pair values not equal: " + Pair.this);
        }
      }
      // Arbitrary invariant -- both variables must be equal:
      public void checkState() {
        if(x != y)
          throw new PairValuesNotEqualException();
      }
    }
    
    // Protect a Pair inside a thread-safe class:
    abstract class PairManager {
      AtomicInteger checkCounter = new AtomicInteger(0);
      protected Pair p = new Pair();
      private List<Pair> storage =
        Collections.synchronizedList(new ArrayList<Pair>());
      public synchronized Pair getPair() {
        // Make a copy to keep the original safe:
        return new Pair(p.getX(), p.getY());
      }
      // Assume this is a time consuming operation
      protected void store(Pair p) {
        storage.add(p);
        try {
          TimeUnit.MILLISECONDS.sleep(50);
        } catch(InterruptedException ignore) {}
      }
      public abstract void increment();
    }
    
    // Synchronize the entire method:
    class PairManager1 extends PairManager {
      public synchronized void increment() {
        p.incrementX();
        p.incrementY();
        store(getPair());
      }
    }
    
    // Use a critical section:
    class PairManager2 extends PairManager {
      public void increment() {
        Pair temp;
        synchronized(this) {
          p.incrementX();
          p.incrementY();
          temp = getPair();
        }
        store(temp);
      }
    }
    
    class PairManipulator implements Runnable {
      private PairManager pm;
      public PairManipulator(PairManager pm) {
        this.pm = pm;
      }
      public void run() {
        while(true)
          pm.increment();
      }
      public String toString() {
        return "Pair: " + pm.getPair() +
          " checkCounter = " + pm.checkCounter.get();
      }
    }
    
    class PairChecker implements Runnable {
      private PairManager pm;
      public PairChecker(PairManager pm) {
        this.pm = pm;
      }
      public void run() {
        while(true) {
          pm.checkCounter.incrementAndGet();
          pm.getPair().checkState();
        }
      }
    }
    
    public class CriticalSection {
      // Test the two different approaches:
      static void
      testApproaches(PairManager pman1, PairManager pman2) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PairManipulator
          pm1 = new PairManipulator(pman1),
          pm2 = new PairManipulator(pman2);
        PairChecker
          pcheck1 = new PairChecker(pman1),
          pcheck2 = new PairChecker(pman2);
        exec.execute(pm1);
        exec.execute(pm2);
        exec.execute(pcheck1);
        exec.execute(pcheck2);
        try {
          TimeUnit.MILLISECONDS.sleep(500);
        } catch(InterruptedException e) {
          System.out.println("Sleep interrupted");
        }
        System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
        System.exit(0);
      }
      public static void main(String[] args) {
        PairManager
          pman1 = new PairManager1(),
          pman2 = new PairManager2();
        testApproaches(pman1, pman2);
      }
    } /* Output: (Sample)
    pm1: Pair: x: 15, y: 15 checkCounter = 272565
    pm2: Pair: x: 16, y: 16 checkCounter = 3956974
    *///:~
    

    在其他对象上同步

    • 如果必须在另一个对象上同步,必须确保所有相关任务都是在这同一个对象上同步。

    线程本地存储

    • 使用相同变量的每个不同的线程都创建不同的存储。
    • ThreadLocal对象通常当作静态域存储。在创建ThreadLocal时,你只能通过get()和set()方法访问对象内容。
    class Accessor implements Runnable {
      private final int id;
      public Accessor(int idn) { id = idn; }
      public void run() {
        while(!Thread.currentThread().isInterrupted()) {
          ThreadLocalVariableHolder.increment();
          System.out.println(this);
          Thread.yield();
        }
      }
      public String toString() {
        return "#" + id + ": " +
          ThreadLocalVariableHolder.get();
      }
    }
    
    public class ThreadLocalVariableHolder {
      private static ThreadLocal<Integer> value =
        new ThreadLocal<Integer>() {
          private Random rand = new Random(47);
          protected synchronized Integer initialValue() {
            return rand.nextInt(10000);
          }
        };
      public static void increment() {
        value.set(value.get() + 1);
      }
      public static int get() { return value.get(); }
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
          exec.execute(new Accessor(i));
        TimeUnit.SECONDS.sleep(3);  // Run for a while
        exec.shutdownNow();         // All Accessors will quit
      }
    } /* Output: (Sample)
    #0: 9259
    #1: 556
    #2: 6694
    #3: 1862
    #4: 962
    #0: 9260
    #1: 557
    #2: 6695
    #3: 1863
    #4: 963
    ...
    *///:~
    

    4、终结任务

    在阻塞时终结

    线程状态:

    1. 新建
    2. 就绪。这种状态下,只要把时间片分配给线程,线程就可以运行。
    3. 阻塞。线程能够运行,担忧条件组织它运行。调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入就绪。
    4. 死亡。通常方式是从run方法返回。
      进入阻塞状态:
    5. sleep()
    6. 通过wait()挂起,直到线程得到了notify()或notifyAll()。
    7. 任务在等待某个输入输出完成。
    8. 任务试图在某个对象上调用其同步控制方法,但是锁不可用。

    中断

    • 在任务的run()方法中间打断,更像是抛出异常,因此Java线程中的这种类型的异常中断中用到了异常。
    • Thread类包含interrupt()方法,这个方法将设置线程的中断状态。
    • 如果一个线程已经阻塞或者试图执行一个阻塞操作,设置这个线程中断状态将抛出InterruptedException。
    • 当抛出该异常或者该任务调用Thread.interrupted(),中断状态将被复位。
    • IO和Synchronized块上的等待是不可中断的。中断,就是已经在等东西,你给它打断了。
    class SleepBlocked implements Runnable {
      public void run() {
        try {
          TimeUnit.SECONDS.sleep(100);
        } catch(InterruptedException e) {
          print("InterruptedException");
        }
        print("Exiting SleepBlocked.run()");
      }
    }
    
    class IOBlocked implements Runnable {
      private InputStream in;
      public IOBlocked(InputStream is) { in = is; }
      public void run() {
        try {
          print("Waiting for read():");
          in.read();
        } catch(IOException e) {
          if(Thread.currentThread().isInterrupted()) {
            print("Interrupted from blocked I/O");
          } else {
            throw new RuntimeException(e);
          }
        }
        print("Exiting IOBlocked.run()");
      }
    }
    
    class SynchronizedBlocked implements Runnable {
      public synchronized void f() {
        while(true) // Never releases lock
          Thread.yield();
      }
      public SynchronizedBlocked() {
        new Thread() {
          public void run() {
            f(); // Lock acquired by this thread
          }
        }.start();
      }
      public void run() {
        print("Trying to call f()");
        f();
        print("Exiting SynchronizedBlocked.run()");
      }
    }
    
    public class Interrupting {
      private static ExecutorService exec =
        Executors.newCachedThreadPool();
      static void test(Runnable r) throws InterruptedException{
        Future<?> f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        print("Interrupting " + r.getClass().getName());
        f.cancel(true); // Interrupts if running
        print("Interrupt sent to " + r.getClass().getName());
      }
      public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        print("Aborting with System.exit(0)");
        System.exit(0); // ... since last 2 interrupts failed
      }
    } /* Output: (95% match)
    Interrupting SleepBlocked
    InterruptedException
    Exiting SleepBlocked.run()
    Interrupt sent to SleepBlocked
    Waiting for read():
    Interrupting IOBlocked
    Interrupt sent to IOBlocked
    Trying to call f()
    Interrupting SynchronizedBlocked
    Interrupt sent to SynchronizedBlocked
    Aborting with System.exit(0)
    *///:~
    
    • 在ReentrantLock上阻塞的任务具备可以被中断的能力。
    class BlockedMutex {
      private Lock lock = new ReentrantLock();
      public BlockedMutex() {
        // Acquire it right away, to demonstrate interruption
        // of a task blocked on a ReentrantLock:
        lock.lock();
      }
      public void f() {
        try {
          // This will never be available to a second task
          lock.lockInterruptibly(); // Special call
          print("lock acquired in f()");
        } catch(InterruptedException e) {
          print("Interrupted from lock acquisition in f()");
        }
      }
    }
    
    class Blocked2 implements Runnable {
      BlockedMutex blocked = new BlockedMutex();
      public void run() {
        print("Waiting for f() in BlockedMutex");
        blocked.f();
        print("Broken out of blocked call");
      }
    }
    
    public class Interrupting2 {
      public static void main(String[] args) throws Exception {
        Thread t = new Thread(new Blocked2());
        t.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Issuing t.interrupt()");
        t.interrupt();
      }
    } /* Output:
    Waiting for f() in BlockedMutex
    Issuing t.interrupt()
    Interrupted from lock acquisition in f()
    Broken out of blocked call
    *///:~
    

    检查中断

    • 中断发生的唯一时刻是在任务要进入到阻塞操作中或者已经在阻塞操作内部时。
    • 如果调用interrupt()以停止某个任务,那么在run()循环碰巧没有任何阻塞调用,需要其他方式退出。
    • 可以调用interrupted()来检查中断状态,可以清除中断状态。
      https://blog.csdn.net/qq_39682377/article/details/81449451
    class NeedsCleanup {
      private final int id;
      public NeedsCleanup(int ident) {
        id = ident;
        print("NeedsCleanup " + id);
      }
      public void cleanup() {
        print("Cleaning up " + id);
      }
    }
    
    class Blocked3 implements Runnable {
      private volatile double d = 0.0;
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // point1
            NeedsCleanup n1 = new NeedsCleanup(1);
            // Start try-finally immediately after definition
            // of n1, to guarantee proper cleanup of n1:
            try {
              print("Sleeping");
              TimeUnit.SECONDS.sleep(1);
              // point2
              NeedsCleanup n2 = new NeedsCleanup(2);
              // Guarantee proper cleanup of n2:
              try {
                print("Calculating");
                // A time-consuming, non-blocking operation:
                for(int i = 1; i < 2500000; i++)
                  d = d + (Math.PI + Math.E) / d;
                print("Finished time-consuming operation");
              } finally {
                n2.cleanup();
              }
            } finally {
              n1.cleanup();
            }
          }
          print("Exiting via while() test");
        } catch(InterruptedException e) {
          print("Exiting via InterruptedException");
        }
      }
    }
    
    public class InterruptingIdiom {
      public static void main(String[] args) throws Exception {
        if(args.length != 1) {
          print("usage: java InterruptingIdiom delay-in-mS");
          System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
      }
    } /* Output: (Sample)
    NeedsCleanup 1
    Sleeping
    NeedsCleanup 2
    Calculating
    Finished time-consuming operation
    Cleaning up 2
    Cleaning up 1
    NeedsCleanup 1
    Sleeping
    Cleaning up 1
    Exiting via InterruptedException
    *///:~
    

    5、线程之间的协作

    • 多个任务一起工作去解决某个问题。在这些问题中,某些部分必须在其他部分被解决之前解决。
    • 关键是任务之间的握手。使用基础特性:互斥。确保只有一个任务可以相应某个信号。
    • 在互斥之上,为任务添加一种途径,可将自身挂起,知道某些外部条件发生变化。

    wait()与notifyAll()

    • wait()会在等待外部世界陈才产生变化的时候将任务挂起,并且只有在notify()或notifyAll()发生时,才会被唤醒并间抽查所产生的变化。
    • sleep()并没有释放锁,但调用wait()会释放锁。
    • 调用wait()、notify()和notifyAll()的任务在调用这些方法前必须拥有对象的所。
    class Car {
      private boolean waxOn = false;
      public synchronized void waxed() {
        waxOn = true; // Ready to buff
        notifyAll();
      }
      public synchronized void buffed() {
        waxOn = false; // Ready for another coat of wax
        notifyAll();
      }
      public synchronized void waitForWaxing()
      throws InterruptedException {
        while(waxOn == false)
          wait();
      }
      public synchronized void waitForBuffing()
      throws InterruptedException {
        while(waxOn == true)
          wait();
      }
    }
    
    class WaxOn implements Runnable {
      private Car car;
      public WaxOn(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            printnb("Wax On! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.waxed();
            car.waitForBuffing();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax On task");
      }
    }
    
    class WaxOff implements Runnable {
      private Car car;
      public WaxOff(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            car.waitForWaxing();
            printnb("Wax Off! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.buffed();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax Off task");
      }
    }
    
    public class WaxOMatic {
      public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
      }
    } /* Output: (95% match)
    Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
    Ending Wax On task
    Exiting via interrupt
    Ending Wax Off task
    *///:~
    
    • 必须使用一个检查感兴趣条件的while循环包围wait(),因为有可能有多个任务出于相同原因等待一个锁,唤起一个,其他还得继续等;任务唤起,某个其他任务已经做出改变,使得这个任务不能执行;某些任务出于不同原因在等待你的对象上的锁。
      错失的信号
      错失信号
    • 如果刚好somecondition满足了,到了Point1,这个时候切换了,T1发了个notify(),T2就错过这个notify()了。
    • 正确的办法是这样


    notify与notifyAll

    • 使用notify()而不是notifyAll()是一种优化。使用notify()必须保证被唤醒的是恰当的任务。为了使用notify(),所有任务必须等待相同的条件。
    • notifyAll()将唤醒所有正在等待的任务,唤醒的是等待这个锁的任务。

    生产者与消费者

    class Meal {
      private final int orderNum;
      public Meal(int orderNum) { this.orderNum = orderNum; }
      public String toString() { return "Meal " + orderNum; }
    }
    
    class WaitPerson implements Runnable {
      private Restaurant restaurant;
      public WaitPerson(Restaurant r) { restaurant = r; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            synchronized(this) {
              while(restaurant.meal == null)
                wait(); // ... for the chef to produce a meal
            }
            print("Waitperson got " + restaurant.meal);
            synchronized(restaurant.chef) {
              restaurant.meal = null;
              restaurant.chef.notifyAll(); // Ready for another
            }
          }
        } catch(InterruptedException e) {
          print("WaitPerson interrupted");
        }
      }
    }
    
    class Chef implements Runnable {
      private Restaurant restaurant;
      private int count = 0;
      public Chef(Restaurant r) { restaurant = r; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            synchronized(this) {
              while(restaurant.meal != null)
                wait(); // ... for the meal to be taken
            }
            if(++count == 10) {
              print("Out of food, closing");
              restaurant.exec.shutdownNow();
            }
            printnb("Order up! ");
            synchronized(restaurant.waitPerson) {
              restaurant.meal = new Meal(count);
              restaurant.waitPerson.notifyAll();
            }
            TimeUnit.MILLISECONDS.sleep(100);
          }
        } catch(InterruptedException e) {
          print("Chef interrupted");
        }
      }
    }
    
    public class Restaurant {
      Meal meal;
      ExecutorService exec = Executors.newCachedThreadPool();
      WaitPerson waitPerson = new WaitPerson(this);
      Chef chef = new Chef(this);
      public Restaurant() {
        exec.execute(chef);
        exec.execute(waitPerson);
      }
      public static void main(String[] args) {
        new Restaurant();
      }
    } /* Output:
    Order up! Waitperson got Meal 1
    Order up! Waitperson got Meal 2
    Order up! Waitperson got Meal 3
    Order up! Waitperson got Meal 4
    Order up! Waitperson got Meal 5
    Order up! Waitperson got Meal 6
    Order up! Waitperson got Meal 7
    Order up! Waitperson got Meal 8
    Order up! Waitperson got Meal 9
    Out of food, closing
    WaitPerson interrupted
    Order up! Chef interrupted
    *///:~
    
    • 就像前面提到的,服务员锁了自己之后就在wait(),wait()是对服务员产生的,所以notify()要获取服务员的锁。厨师也一样。
    • shuntdonwNow将向所有ExecutorService启动的任务发送interrup()
    • 厨师、服务员都在Restaurant类里面。自己也持有同一个restaurant对象。

    显式的Lock和Condition

    class Car {
      private Lock lock = new ReentrantLock();
      private Condition condition = lock.newCondition();
      private boolean waxOn = false;
      public void waxed() {
        lock.lock();
        try {
          waxOn = true; // Ready to buff
          condition.signalAll();
        } finally {
          lock.unlock();
        }
      }
      public void buffed() {
        lock.lock();
        try {
          waxOn = false; // Ready for another coat of wax
          condition.signalAll();
        } finally {
          lock.unlock();
        }
      }
      public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
          while(waxOn == false)
            condition.await();
        } finally {
          lock.unlock();
        }
      }
      public void waitForBuffing() throws InterruptedException{
        lock.lock();
        try {
          while(waxOn == true)
            condition.await();
        } finally {
          lock.unlock();
        }
      }
    }
    
    class WaxOn implements Runnable {
      private Car car;
      public WaxOn(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            printnb("Wax On! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.waxed();
            car.waitForBuffing();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax On task");
      }
    }
    
    class WaxOff implements Runnable {
      private Car car;
      public WaxOff(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            car.waitForWaxing();
            printnb("Wax Off! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.buffed();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax Off task");
      }
    }
    
    public class WaxOMatic2 {
      public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* Output: (90% match)
    Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
    Ending Wax Off task
    Exiting via interrupt
    Ending Wax On task
    *///:~
    

    生产者-消费者队列

    • 使用同步队列来解决任务写作问题,队列在任何时候只允许一个任务插入或移出元素。
    • 如果消费者任务试图从队列中获取对象,而队列为空,那么这些队列可以挂起消费者任务,并且当有更多元素时可以恢复消费者任务。
    • java.util.concurrent.BlockingQueue

    制作吐司的例子

    class Toast {
      public enum Status { DRY, BUTTERED, JAMMED }
      private Status status = Status.DRY;
      private final int id;
      public Toast(int idn) { id = idn; }
      public void butter() { status = Status.BUTTERED; }
      public void jam() { status = Status.JAMMED; }
      public Status getStatus() { return status; }
      public int getId() { return id; }
      public String toString() {
        return "Toast " + id + ": " + status;
      }
    }
    
    class ToastQueue extends LinkedBlockingQueue<Toast> {}
    
    class Toaster implements Runnable {
      private ToastQueue toastQueue;
      private int count = 0;
      private Random rand = new Random(47);
      public Toaster(ToastQueue tq) { toastQueue = tq; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            TimeUnit.MILLISECONDS.sleep(
              100 + rand.nextInt(500));
            // Make toast
            Toast t = new Toast(count++);
            print(t);
            // Insert into queue
            toastQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Toaster interrupted");
        }
        print("Toaster off");
      }
    }
    
    // Apply butter to toast:
    class Butterer implements Runnable {
      private ToastQueue dryQueue, butteredQueue;
      public Butterer(ToastQueue dry, ToastQueue buttered) {
        dryQueue = dry;
        butteredQueue = buttered;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = dryQueue.take();
            t.butter();
            print(t);
            butteredQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Butterer interrupted");
        }
        print("Butterer off");
      }
    }
    
    // Apply jam to buttered toast:
    class Jammer implements Runnable {
      private ToastQueue butteredQueue, finishedQueue;
      public Jammer(ToastQueue buttered, ToastQueue finished) {
        butteredQueue = buttered;
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = butteredQueue.take();
            t.jam();
            print(t);
            finishedQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Jammer interrupted");
        }
        print("Jammer off");
      }
    }
    
    // Consume the toast:
    class Eater implements Runnable {
      private ToastQueue finishedQueue;
      private int counter = 0;
      public Eater(ToastQueue finished) {
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = finishedQueue.take();
            // Verify that the toast is coming in order,
            // and that all pieces are getting jammed:
            if(t.getId() != counter++ ||
               t.getStatus() != Toast.Status.JAMMED) {
              print(">>>> Error: " + t);
              System.exit(1);
            } else
              print("Chomp! " + t);
          }
        } catch(InterruptedException e) {
          print("Eater interrupted");
        }
        print("Eater off");
      }
    }
    
    public class ToastOMatic {
      public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue(),
                   butteredQueue = new ToastQueue(),
                   finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    

    6、死锁

    死锁的条件:

    1. 互斥条件。使用的资源中至少一个是不能共享的。
    2. 至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。
    3. 资源不能被任务抢占,任务必须把资源释放当做普通时间。
    4. 必须有循环等待。
    • 防止死锁最容易的方法是破坏第四个条件。
    • 哲学家问题,筷子是共享的资源,只要让其中一个哲学家先拿起左边的,后拿起右边的,其他人先拿右边的,再拿左边的,就可以解决死锁。
    public class FixedDiningPhilosophers {
      public static void main(String[] args) throws Exception {
        int ponder = 5;
        if(args.length > 0)
          ponder = Integer.parseInt(args[0]);
        int size = 5;
        if(args.length > 1)
          size = Integer.parseInt(args[1]);
        ExecutorService exec = Executors.newCachedThreadPool();
        Chopstick[] sticks = new Chopstick[size];
        for(int i = 0; i < size; i++)
          sticks[i] = new Chopstick();
        for(int i = 0; i < size; i++)
          if(i < (size-1))
            exec.execute(new Philosopher(
              sticks[i], sticks[i+1], i, ponder));
          else
            exec.execute(new Philosopher(
              sticks[0], sticks[i], i, ponder));
        if(args.length == 3 && args[2].equals("timeout"))
          TimeUnit.SECONDS.sleep(5);
        else {
          System.out.println("Press 'Enter' to quit");
          System.in.read();
        }
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    

    7、新类库中的构件

    CountDownLatch

    • 同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
    • 任何在这个对象上调用wait()的方法都将阻塞,直至这个技术到达0。其他任务在结束工作时可以对该对象调用countDown()来减小计数。
    • CountDownLatch被设计为只处罚一次,计数不能被充值。
    • 对await()的调用会被阻塞,直至技术值到达0。
    class TaskPortion implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private static Random rand = new Random(47);
      private final CountDownLatch latch;
      TaskPortion(CountDownLatch latch) {
        this.latch = latch;
      }
      public void run() {
        try {
          doWork();
          latch.countDown();
        } catch(InterruptedException ex) {
          // Acceptable way to exit
        }
      }
      public void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        print(this + "completed");
      }
      public String toString() {
        return String.format("%1$-3d ", id);
      }
    }
    
    // Waits on the CountDownLatch:
    class WaitingTask implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private final CountDownLatch latch;
      WaitingTask(CountDownLatch latch) {
        this.latch = latch;
      }
      public void run() {
        try {
          latch.await();
          print("Latch barrier passed for " + this);
        } catch(InterruptedException ex) {
          print(this + " interrupted");
        }
      }
      public String toString() {
        return String.format("WaitingTask %1$-3d ", id);
      }
    }
    
    public class CountDownLatchDemo {
      static final int SIZE = 100;
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        // All must share a single CountDownLatch object:
        CountDownLatch latch = new CountDownLatch(SIZE);
        for(int i = 0; i < 10; i++)
          exec.execute(new WaitingTask(latch));
        for(int i = 0; i < SIZE; i++)
          exec.execute(new TaskPortion(latch));
        print("Launched all tasks");
        exec.shutdown(); // Quit when all tasks complete
      }
    } /* (Execute to see output) *///:~
    

    CyclicBarrier

    • 可多此次重用
    • 赛马仿真
    class Horse implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private int strides = 0;
      private static Random rand = new Random(47);
      private static CyclicBarrier barrier;
      public Horse(CyclicBarrier b) { barrier = b; }
      public synchronized int getStrides() { return strides; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            synchronized(this) {
              strides += rand.nextInt(3); // Produces 0, 1 or 2
            }
            barrier.await();
          }
        } catch(InterruptedException e) {
          // A legitimate way to exit
        } catch(BrokenBarrierException e) {
          // This one we want to know about
          throw new RuntimeException(e);
        }
      }
      public String toString() { return "Horse " + id + " "; }
      public String tracks() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++)
          s.append("*");
        s.append(id);
        return s.toString();
      }
    }
    
    public class HorseRace {
      static final int FINISH_LINE = 75;
      private List<Horse> horses = new ArrayList<Horse>();
      private ExecutorService exec =
        Executors.newCachedThreadPool();
      private CyclicBarrier barrier;
      public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() {
          public void run() {
            StringBuilder s = new StringBuilder();
            for(int i = 0; i < FINISH_LINE; i++)
              s.append("="); // The fence on the racetrack
            print(s);
            for(Horse horse : horses)
              print(horse.tracks());
            for(Horse horse : horses)
              if(horse.getStrides() >= FINISH_LINE) {
                print(horse + "won!");
                exec.shutdownNow();
                return;
              }
            try {
              TimeUnit.MILLISECONDS.sleep(pause);
            } catch(InterruptedException e) {
              print("barrier-action sleep interrupted");
            }
          }
        });
        for(int i = 0; i < nHorses; i++) {
          Horse horse = new Horse(barrier);
          horses.add(horse);
          exec.execute(horse);
        }
      }
      public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        if(args.length > 0) { // Optional argument
          int n = new Integer(args[0]);
          nHorses = n > 0 ? n : nHorses;
        }
        if(args.length > 1) { // Optional argument
          int p = new Integer(args[1]);
          pause = p > -1 ? p : pause;
        }
        new HorseRace(nHorses, pause);
      }
    } /* (Execute to see output) *///:~
    
    

    DelayQueue

    • 无界的BlockingQueue,用于放置实现了Delayed接口的对象。其中对象智能在到期时才能从队列取走。
    • 队头的对象就是等待最长的对象。
    • List<DelayTask>存放任务创建顺序,getDelay()返回延迟时间。
    class DelayedTask implements Runnable, Delayed {
      private static int counter = 0;
      private final int id = counter++;
      private final int delta;
      private final long trigger;
      protected static List<DelayedTask> sequence =
        new ArrayList<DelayedTask>();
      public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() +
          NANOSECONDS.convert(delta, MILLISECONDS);
        sequence.add(this);
      }
      public long getDelay(TimeUnit unit) {
        return unit.convert(
          trigger - System.nanoTime(), NANOSECONDS);
      }
      public int compareTo(Delayed arg) {
        DelayedTask that = (DelayedTask)arg;
        if(trigger < that.trigger) return -1;
        if(trigger > that.trigger) return 1;
        return 0;
      }
      public void run() { printnb(this + " "); }
      public String toString() {
        return String.format("[%1$-4d]", delta) +
          " Task " + id;
      }
      public String summary() {
        return "(" + id + ":" + delta + ")";
      }
      public static class EndSentinel extends DelayedTask {
        private ExecutorService exec;
        public EndSentinel(int delay, ExecutorService e) {
          super(delay);
          exec = e;
        }
        public void run() {
          for(DelayedTask pt : sequence) {
            printnb(pt.summary() + " ");
          }
          print();
          print(this + " Calling shutdownNow()");
          exec.shutdownNow();
        }
      }
    }
    
    class DelayedTaskConsumer implements Runnable {
      private DelayQueue<DelayedTask> q;
      public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
      }
      public void run() {
        try {
          while(!Thread.interrupted())
            q.take().run(); // Run task with the current thread
        } catch(InterruptedException e) {
          // Acceptable way to exit
        }
        print("Finished DelayedTaskConsumer");
      }
    }
    
    public class DelayQueueDemo {
      public static void main(String[] args) {
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue =
          new DelayQueue<DelayedTask>();
        // Fill with tasks that have random delays:
        for(int i = 0; i < 20; i++)
          queue.put(new DelayedTask(rand.nextInt(5000)));
        // Set the stopping point
        queue.add(new DelayedTask.EndSentinel(5000, exec));
        exec.execute(new DelayedTaskConsumer(queue));
      }
    } /* Output:
    [128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
    [5000] Task 20 Calling shutdownNow()
    Finished DelayedTaskConsumer
    *///:~
    

    PriorityBlockingQueue

    • 具有可阻塞的读取操作。

    ScheduledExecutor

    • 通过使用schedule()或scheduleAtFixedRate(),可以将Runnable对象设置为在将来某个时刻执行。

    Semaphore

    • 允许n个任务同时访问这个资源。
    public class Pool<T> {
      private int size;
      private List<T> items = new ArrayList<T>();
      private volatile boolean[] checkedOut;
      private Semaphore available;
      public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);
        // Load pool with objects that can be checked out:
        for(int i = 0; i < size; ++i)
          try {
            // Assumes a default constructor:
            items.add(classObject.newInstance());
          } catch(Exception e) {
            throw new RuntimeException(e);
          }
      }
      public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
      }
      public void checkIn(T x) {
        if(releaseItem(x))
          available.release();
      }
      private synchronized T getItem() {
        for(int i = 0; i < size; ++i)
          if(!checkedOut[i]) {
            checkedOut[i] = true;
            return items.get(i);
          }
        return null; // Semaphore prevents reaching here
      }
      private synchronized boolean releaseItem(T item) {
        int index = items.indexOf(item);
        if(index == -1) return false; // Not in the list
        if(checkedOut[index]) {
          checkedOut[index] = false;
          return true;
        }
        return false; // Wasn't checked out
      }
    } ///:~
    

    Excharger

    • 两个任务之间交换对象的栅栏。
    • 任务进入这些栅栏时,它们各自拥有一个对象,离开时,它们都有由之前对象持有的对象。
    • 典型场景是,一个任务创建对象,这些对象生产代价高昂,而另一个任务在消费这些对象,通过这种方式,可以有更多的对象在被创建的同时被消费。
    class ExchangerProducer<T> implements Runnable {
      private Generator<T> generator;
      private Exchanger<List<T>> exchanger;
      private List<T> holder;
      ExchangerProducer(Exchanger<List<T>> exchg,
      Generator<T> gen, List<T> holder) {
        exchanger = exchg;
        generator = gen;
        this.holder = holder;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            for(int i = 0; i < ExchangerDemo.size; i++)
              holder.add(generator.next());
            // Exchange full for empty:
            holder = exchanger.exchange(holder);
          }
        } catch(InterruptedException e) {
          // OK to terminate this way.
        }
      }
    }
    
    class ExchangerConsumer<T> implements Runnable {
      private Exchanger<List<T>> exchanger;
      private List<T> holder;
      private volatile T value;
      ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
        exchanger = ex;
        this.holder = holder;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            holder = exchanger.exchange(holder);
            for(T x : holder) {
              value = x; // Fetch out value
              holder.remove(x); // OK for CopyOnWriteArrayList
            }
          }
        } catch(InterruptedException e) {
          // OK to terminate this way.
        }
        System.out.println("Final value: " + value);
      }
    }
    
    public class ExchangerDemo {
      static int size = 10;
      static int delay = 5; // Seconds
      public static void main(String[] args) throws Exception {
        if(args.length > 0)
          size = new Integer(args[0]);
        if(args.length > 1)
          delay = new Integer(args[1]);
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
        List<Fat>
          producerList = new CopyOnWriteArrayList<Fat>(),
          consumerList = new CopyOnWriteArrayList<Fat>();
        exec.execute(new ExchangerProducer<Fat>(xc,
          BasicGenerator.create(Fat.class), producerList));
        exec.execute(
          new ExchangerConsumer<Fat>(xc,consumerList));
        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
      }
    } /* Output: (Sample)
    Final value: Fat id: 29999
    *///:~
    

    8、仿真

    • SynchronousQueue没有内部容量的阻塞队列,每个put()都需要等待一个take()
    • Order由顾客给服务员然后再给厨师;Plate是厨师给服务员再给顾客。
    • 顾客持有服务员对象,有一个placeSetting队列,可以获取菜。
    • 服务员持有一个filledOrders,是做好的菜存放的地方。点单是放restaurant的BlockingQueue里
      各种队列 https://blog.csdn.net/Student111w/article/details/118887900
    // This is given to the waiter, who gives it to the chef:
    class Order { // (A data-transfer object)
      private static int counter = 0;
      private final int id = counter++;
      private final Customer customer;
      private final WaitPerson waitPerson;
      private final Food food;
      public Order(Customer cust, WaitPerson wp, Food f) {
        customer = cust;
        waitPerson = wp;
        food = f;
      }
      public Food item() { return food; }
      public Customer getCustomer() { return customer; }
      public WaitPerson getWaitPerson() { return waitPerson; }
      public String toString() {
        return "Order: " + id + " item: " + food +
          " for: " + customer +
          " served by: " + waitPerson;
      }
    }
    
    // This is what comes back from the chef:
    class Plate {
      private final Order order;
      private final Food food;
      public Plate(Order ord, Food f) {
        order = ord;
        food = f;
      }
      public Order getOrder() { return order; }
      public Food getFood() { return food; }
      public String toString() { return food.toString(); }
    }
    
    class Customer implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private final WaitPerson waitPerson;
      // Only one course at a time can be received:
      private SynchronousQueue<Plate> placeSetting =
        new SynchronousQueue<Plate>();
      public Customer(WaitPerson w) { waitPerson = w; }
      public void
      deliver(Plate p) throws InterruptedException {
        // Only blocks if customer is still
        // eating the previous course:
        placeSetting.put(p);
      }
      public void run() {
        for(Course course : Course.values()) {
          Food food = course.randomSelection();
          try {
            waitPerson.placeOrder(this, food);
            // Blocks until course has been delivered:
            print(this + "eating " + placeSetting.take());
          } catch(InterruptedException e) {
            print(this + "waiting for " +
              course + " interrupted");
            break;
          }
        }
        print(this + "finished meal, leaving");
      }
      public String toString() {
        return "Customer " + id + " ";
      }
    }
    
    class WaitPerson implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private final Restaurant restaurant;
      BlockingQueue<Plate> filledOrders =
        new LinkedBlockingQueue<Plate>();
      public WaitPerson(Restaurant rest) { restaurant = rest; }
      public void placeOrder(Customer cust, Food food) {
        try {
          // Shouldn't actually block because this is
          // a LinkedBlockingQueue with no size limit:
          restaurant.orders.put(new Order(cust, this, food));
        } catch(InterruptedException e) {
          print(this + " placeOrder interrupted");
        }
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until a course is ready
            Plate plate = filledOrders.take();
            print(this + "received " + plate +
              " delivering to " +
              plate.getOrder().getCustomer());
            plate.getOrder().getCustomer().deliver(plate);
          }
        } catch(InterruptedException e) {
          print(this + " interrupted");
        }
        print(this + " off duty");
      }
      public String toString() {
        return "WaitPerson " + id + " ";
      }
    }
    
    class Chef implements Runnable {
      private static int counter = 0;
      private final int id = counter++;
      private final Restaurant restaurant;
      private static Random rand = new Random(47);
      public Chef(Restaurant rest) { restaurant = rest; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until an order appears:
            Order order = restaurant.orders.take();
            Food requestedItem = order.item();
            // Time to prepare order:
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
            Plate plate = new Plate(order, requestedItem);
            order.getWaitPerson().filledOrders.put(plate);
          }
        } catch(InterruptedException e) {
          print(this + " interrupted");
        }
        print(this + " off duty");
      }
      public String toString() { return "Chef " + id + " "; }
    }
    
    class Restaurant implements Runnable {
      private List<WaitPerson> waitPersons =
        new ArrayList<WaitPerson>();
      private List<Chef> chefs = new ArrayList<Chef>();
      private ExecutorService exec;
      private static Random rand = new Random(47);
      BlockingQueue<Order>
        orders = new LinkedBlockingQueue<Order>();
      public Restaurant(ExecutorService e, int nWaitPersons,
        int nChefs) {
        exec = e;
        for(int i = 0; i < nWaitPersons; i++) {
          WaitPerson waitPerson = new WaitPerson(this);
          waitPersons.add(waitPerson);
          exec.execute(waitPerson);
        }
        for(int i = 0; i < nChefs; i++) {
          Chef chef = new Chef(this);
          chefs.add(chef);
          exec.execute(chef);
        }
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // A new customer arrives; assign a WaitPerson:
            WaitPerson wp = waitPersons.get(
              rand.nextInt(waitPersons.size()));
            Customer c = new Customer(wp);
            exec.execute(c);
            TimeUnit.MILLISECONDS.sleep(100);
          }
        } catch(InterruptedException e) {
          print("Restaurant interrupted");
        }
        print("Restaurant closing");
      }
    }
    
    public class RestaurantWithQueues {
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Restaurant restaurant = new Restaurant(exec, 5, 2);
        exec.execute(restaurant);
        if(args.length > 0) // Optional argument
          TimeUnit.SECONDS.sleep(new Integer(args[0]));
        else {
          print("Press 'Enter' to quit");
          System.in.read();
        }
        exec.shutdownNow();
      }
    } /* Output: (Sample)
    WaitPerson 0 received SPRING_ROLLS delivering to Customer 1
    Customer 1 eating SPRING_ROLLS
    WaitPerson 3 received SPRING_ROLLS delivering to Customer 0
    Customer 0 eating SPRING_ROLLS
    WaitPerson 0 received BURRITO delivering to Customer 1
    Customer 1 eating BURRITO
    WaitPerson 3 received SPRING_ROLLS delivering to Customer 2
    Customer 2 eating SPRING_ROLLS
    WaitPerson 1 received SOUP delivering to Customer 3
    Customer 3 eating SOUP
    WaitPerson 3 received VINDALOO delivering to Customer 0
    Customer 0 eating VINDALOO
    WaitPerson 0 received FRUIT delivering to Customer 1
    ...
    *///:~
    

    9、性能

    各类互斥技术

    • 使用Lock通常会比synchronized更高效。
    • 但synchronized写法简洁,应该以synchronized入口,只有性能调优时才替换Lock对象。
    • Atomic只有在非常简单的情况下才有效

    免锁容器

    • CopyOnWriteArrayList,写入将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读操作可以安全地执行。
    • 只要你主要从免锁容器中读取,就会比synchronized快很多
    • ConcurrentHashMap和ConcurrentLinkedQueue 容器中只有部分内容而不是整个容器可以被复制和修改。任何修改完成前,读取者看不到他们。

    乐观加锁

    • 实际上没有使用互斥,在计算完成准备更新Atomic对象时,需要使用compareAndSet()方法。如果和旧值比发现值不一样,那操作就会失败。
    public class FastSimulation {
      static final int N_ELEMENTS = 100000;
      static final int N_GENES = 30;
      static final int N_EVOLVERS = 50;
      static final AtomicInteger[][] GRID =
        new AtomicInteger[N_ELEMENTS][N_GENES];
      static Random rand = new Random(47);
      static class Evolver implements Runnable {
        public void run() {
          while(!Thread.interrupted()) {
            // Randomly select an element to work on:
            int element = rand.nextInt(N_ELEMENTS);
            for(int i = 0; i < N_GENES; i++) {
              int previous = element - 1;
              if(previous < 0) previous = N_ELEMENTS - 1;
              int next = element + 1;
              if(next >= N_ELEMENTS) next = 0;
              int oldvalue = GRID[element][i].get();
              // Perform some kind of modeling calculation:
              int newvalue = oldvalue +
                GRID[previous][i].get() + GRID[next][i].get();
              newvalue /= 3; // Average the three values
              if(!GRID[element][i]
                .compareAndSet(oldvalue, newvalue)) {
                // Policy here to deal with failure. Here, we
                // just report it and ignore it; our model
                // will eventually deal with it.
                print("Old value changed from " + oldvalue);
              }
            }
          }
        }
      }
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < N_ELEMENTS; i++)
          for(int j = 0; j < N_GENES; j++)
            GRID[i][j] = new AtomicInteger(rand.nextInt(1000));
        for(int i = 0; i < N_EVOLVERS; i++)
          exec.execute(new Evolver());
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    

    ReadWriteLock

    • ReadWriteLock使得你可以同时有多个读取者,只要它们都不试图写入即可。
    • 如果写锁已经被其他任务持有,那么任何读取者都不能访问,直至这个写锁被释放位置。
    • 能否提高程序性能是不确定的,取决与数据读取频率、被修改的频率、读取和写入的消耗时间、多少线程竞争等因素

    相关文章

      网友评论

          本文标题:Java编程思想(二十) 并发

          本文链接:https://www.haomeiwen.com/subject/tyalbrtx.html