美文网首页
Java核心技术(卷I) 20、并发

Java核心技术(卷I) 20、并发

作者: kaiker | 来源:发表于2021-05-23 20:29 被阅读0次

    1、线程的例子

    public interface Runnable {
      void run();
    }
    
    // 构造一个Runnable对象
    Runnable r = () -> {task code};
    
    // 构造一个Thread对象
    var t = new Thread(r);
    
    // 启动
    t.start();
    

    2、线程状态

    线程状态

    新建线程

    • 线程还没开始运行的时候,状态是新建, new Thread(r)

    可运行线程

    • 一旦调用start方法,线程就处于可运行状态
    • 一个线程开始运行,不一定始终保持运行,线程调度的细节依赖于操作系统提供的服务
    • 桌面及服务器操作系统都适用抢占式调度

    阻塞和等待线程

    • 当一个线程试图获取一个内部的对象锁,而这个锁目前被其他线程占有,就会被堵塞
    • 线程等待另一个线程通知调度器出现一个条件,会进入等待状态
    • 设置了超时参数的,调用这些方法会让线程进入计时等待,这一状态会一直保持到超时满或者接收到适合的通知

    终止线程

    • run方法正常退出,线程自然终止
    • 由于未捕获的异常终止了run方法,线程意外终止

    3、线程属性

    中断线程

    • interrupt方法可以用来请求终止一个线程
    • 如果线程被阻塞,再调用interrupt,会被InterruptException异常中断
    • 中断一个线程只是要引起它的注意,被中断的线程可以决定如何响应中断
    • 如果每次工作迭代后都调用sleep方法,会清除中断状态,并抛出InterruptException
    Runnable r = () -> {
      try {
        while (xx) {
          do something
          Thread.sleep(delay);
        }
      } catch(InterruptedException e) {
        // thread was interrupted during sleep;
      } finally {
        cleanup
      }
      // exiting the run method terminates the thread
    }
    

    守护线程

    • t.setDaemon(true)可以把一个线程转换为守护线程
    • 守护线程会为其他线程提供服务
    • 当只剩下守护线程,虚拟机就会退出

    未捕获异常的处理器

    4、同步

    两个或以上线程需要共享同一数据的存取,这种情况被称为竞争状态

    锁对象

    pulic class bank{
    
    private Lock bankLock;
    
    public void transfer(int from, int to, double amount) throws InterruptedException
       {
          bankLock.lock();
          try
          {
             while (accounts[from] < amount)
                sufficientFunds.await();
             System.out.print(Thread.currentThread());
             accounts[from] -= amount;
             System.out.printf(" %10.2f from %d to %d", amount, from, to);
             accounts[to] += amount;
             System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
             sufficientFunds.signalAll();
          }
          finally
          {
             bankLock.unlock(); // 必须放在finnal里
          }
       }
    }
    
    • 这种锁被称为重入(reentrant)锁,线程可以反复获得已拥有的锁。
    • 锁有一个持有计数来跟踪对lock方法的嵌套调用。

    条件对象

    • 使用条件对象来管理那些已经获得了一个锁却不能做有用工作的线程
    public Bank(int n, double initialBalance)
       {
          accounts = new double[n];
          Arrays.fill(accounts, initialBalance);
          bankLock = new ReentrantLock();
          sufficientFunds = bankLock.newCondition(); // 用来表示资金状态
       }
    
    • 调用await()后,线程暂停,放弃锁
    • 当另一个线程完成转账,需要调用signAll(),就会重新激活所有线程
    • sign是随机解锁一个,如果解锁这个没继续sign存在风险

    synchronized关键字

    • 如果一个方法声明有synchronized,那么对象的所将保护整个方法
    • 这样会有一个内部锁生成,而且只有一个关联条件,用wait和notifyall来阻塞和通知
    public synchronized void transfer(int from, int to, double amount) throws InterruptedException
       {
          while (accounts[from] < amount)
             wait();
          System.out.print(Thread.currentThread());
          accounts[from] -= amount;
          System.out.printf(" %10.2f from %d to %d", amount, from, to);
          accounts[to] += amount;
          System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
          notifyAll();
       }
    
    • 静态方法也可以声明为同步,如果调用这样一个方法,它会获得相关类对象的内部锁,如果锁定,其他线程没有办法调用这个类的任何同步方法
    • 内部锁和条件限制:1、不能中断一个正在尝试获得锁的线程;2、不能指定尝试获得锁时的超时时间;3、只有一个条件

    同步块

    public class Bank{
      private var lock = new Object();
      public void transfer(int from, int to, int amount) {
        synchronized(lock) {
          amount[from] -= amount;
          amount[to] += amount;
        }
      }
    }
    

    volatile

    • 不能提供原子性
    • 编译器和虚拟机就知道该字段可能被另一个线程并发更新

    https://www.cnblogs.com/dolphin0520/p/3920373.html

    原子性

    • java.util.concurrent.atomic包中有很多类使用机器指令保证操作原子性
    • AtomicInteger、LongAdder、LongAccumulator等

    线程局部变量

    • 使用ThreadLocal辅助类为各个线程提供各自的实例,可以用来避免共享变量
    • public static final ThreadLocal<SimpleDateFormat> dateFromat = ThreadLocal.withInital(()->new SimpleDateFormat("yyyy-MM-dd"))

    5、线程安全的集合

    • 提供锁可以保护共享的数据结构,但如果最开始就使用线程安全的集合就更好

    阻塞队列

    • 生产者向队列插入元素,消费者线程则获取元素
    • 当队列已满,或者想从空队列移出元素,阻塞队列将导致线程阻塞
    public class BlockingQueueTest
    {
       private static final int FILE_QUEUE_SIZE = 10;
       private static final int SEARCH_THREADS = 100;
       private static final File DUMMY = new File("");
       private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
    
       public static void main(String[] args)
       {
          try (Scanner in = new Scanner(System.in)) 
          {
             System.out.print("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
             String directory = in.nextLine();
             System.out.print("Enter keyword (e.g. volatile): ");
             String keyword = in.nextLine();
              
             Runnable enumerator = () -> {
                try
                {
                   enumerate(new File(directory));
                   queue.put(DUMMY);
                }
                catch (InterruptedException e)
                {
                }            
             };
             
             new Thread(enumerator).start();
             for (int i = 1; i <= SEARCH_THREADS; i++) {
                Runnable searcher = () -> {
                   try
                   {
                      boolean done = false;
                      while (!done)
                      {
                         File file = queue.take();
                         if (file == DUMMY)
                         {
                            queue.put(file);
                            done = true;
                         }
                         else search(file, keyword);
                      }
                   }
                   catch (IOException e)
                   {
                      e.printStackTrace();
                   }
                   catch (InterruptedException e)
                   {
                   }               
                };
                new Thread(searcher).start();
             }
          }
       }
       
       /**
        * Recursively enumerates all files in a given directory and its subdirectories.
        * @param directory the directory in which to start
        */
       public static void enumerate(File directory) throws InterruptedException
       {
          File[] files = directory.listFiles();
          for (File file : files)
          {
             if (file.isDirectory()) enumerate(file);
             else queue.put(file);
          }
       }
       
       /**
        * Searches a file for a given keyword and prints all matching lines.
        * @param file the file to search
        * @param keyword the keyword to search for
        */
       public static void search(File file, String keyword) throws IOException
       {
          try (Scanner in = new Scanner(file, "UTF-8"))
          {
             int lineNumber = 0;
             while (in.hasNextLine())
             {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword)) 
                   System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
             }
          }
       }
    }
    

    映射、集、队列

    • ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue
    • 允许并发地访问数据结构的不同部分尽可能减少竞争

    映射条目原子更新

    • map.compute(word, (k,v)-> v == null ? 1: v+ 1);可以完成原子更新
    • map.merge(word, 1L, Long::sum);

    并发散列映射的批操作

    • 并发散列批操作,即使有其他线程在处理映射,这些操作也可以安全执行。search reduce forEach map.search(threshold, (k,v)->v>1000?v:null);

    并发集视图

    • 并发集视图Set<String> words = ConcurrentHashMap.<String>newKetSet();

    写数组拷贝

    • CopyOnWriteArrayList和CopyOnWriteArraySet是线程安全集合,其中所有更改器会建立底层数组的一个副本

    并行数组算法

    • Arrays.parallelSort可以对一个基本类型值或对象的数组排序
    • Arrays.parallelSetAll用一个函数计算得到的值填充一个数组

    6、任务和线程池

    构造一个新的线程开销比较大,涉及操作系统交互,如果程序中创建了大量生命期短的线程,可以使用线程池。线程池中包含很多准备运行的线程。

    Callable与Future

    • Callable类似Runnable,有返回值,返回的是一个类型为V的对象的异步计算
    public interface Callable<V>{
      V call() throws Exception;
    }
    
    • Future保存异步计算结果,Future对象的所有者在结果计算好之后就可以获得结果
    V get()
    V get(long timeout, TimeUnit unit)
    void cancel(boolean mayInterrupt)
    boolean isCancelled()
    boolean isDone()
    
    Callable<Integer> task = ...;
    var futureTask = new FutureTask<Integer>(task);
    var t = new Thread(futureTask);
    t.start();
    Integer result = task.get();
    

    执行器

    https://blog.csdn.net/suifeng3051/article/details/49443835

    • 执行器类用于构造线程池


      执行器工厂方法
    • 可以通过这些方法将Runnable或Callable提交给ExecutorService:
    Future<T> submit(Callable<T> task)
    Future<?> submit(Runnable task)
    Future<T> submit(Runnable task, T result)
    
    • 使用连接池所做的工作:1、调用Executors类的静态方法newCachedThreadPool\newFixedThreadPool;2、调用submit;3、保存Future对象;4、当不提交任务的时候调用shutdown

    控制任务组

    • shutdownNoe方法可取消所有任务
    • invokeAny方法提交一个Callable对象集合中的所有对象,并返回某个已完成任务的结果,在只获得其中一个结果的情况下可用
    • invokeAll方法提交一个Callable对象集合中的所有对象,这个方法会阻塞,直到所有任务都完成,返回表示所有任务答案的一个Futrue对象列表

    fork-join

    • 可能对每个处理器内核分别使用一个线程,完成计算密集型任务
    if (problemSize < threshold) solve problem
    else {
        break problem into subproblems
        recursively solve each subproblem
       combine the results
    }
    
    class Counter extends RecursiveTask<Integer>
    {
       public static final int THRESHOLD = 1000;
       private double[] values;
       private int from;
       private int to;
       private DoublePredicate filter;
    
       public Counter(double[] values, int from, int to, DoublePredicate filter)
       {
          this.values = values;
          this.from = from;
          this.to = to;
          this.filter = filter;
       }
    
       protected Integer compute()
       {
          if (to - from < THRESHOLD)
          {
             int count = 0;
             for (int i = from; i < to; i++)
             {
                if (filter.test(values[i])) count++;
             }
             return count;
          }
          else
          {
             int mid = (from + to) / 2;
             Counter first = new Counter(values, from, mid, filter);
             Counter second = new Counter(values, mid, to, filter);
             invokeAll(first, second);
             return first.join() + second.join();
          }
       }
    }
    

    7、异步计算

    可完成Future

    • CompletableFuture类实现了Future接口,需要注册一个回调,一旦结果可用,就会在某个线程中利用该结果调用这个回调
    • 通过这种方式,无须阻塞就可以在结果可用时对结果进行处理
    CompletableFuture<String> f = ...;
    f.thenAccept(s -> Process the result string s);
    
    • 要想异步运行任务并得到CompletableFutrue,不要把它直接提交给执行器服务,而应当调用静态方法CompletableFuture.supplyAsync
    public CompletableFuture<String> readPage(URL url) {
      return CompletableFuture.supplyAsync(() -> {
        try {return new String(url.openStream().readAllBytes(), "UTF-8");}
        catch(IOException e) {throw new UncheckedIOException(e);}
      }, excutor);
      })
    }
    
    • 处理CompletableFuture完成:得到一个结果,或者有一个未捕获的异常f.whenComplete(s,t) -> {if (t==null) { process the result s} else {process the Throwable t;}}

    组合可完成Future

    • CompletableFuture类提供了一种机制,将异步任务组合为一个处理管线
    • 返回一个future,结果可用时,对future结果应用f,第二个调用在另一个线程中运行f
    CompletableFuture<U> future.thenApply(f);
    
    • 还有exceptionally等


      为CompletableFuture<T>对象增加一个动作
      组合多个future

    8、进程

    • Process类在一个单独的操作系统进程中执行一个命令,允许我们与标准输入、输出、错误流交互
    • ProcessBuilder类允许我们配置Process对象

    https://www.cnblogs.com/dolphin0520/p/3913517.html

    相关文章

      网友评论

          本文标题:Java核心技术(卷I) 20、并发

          本文链接:https://www.haomeiwen.com/subject/oyesdltx.html