美文网首页
java.util.concurrent概览

java.util.concurrent概览

作者: 传葱 | 来源:发表于2019-02-18 22:00 被阅读0次

前言

  • java.util.concurrent提供了许多提高并发读写性能的工具类,合格的开发者应该能够熟练使用这些开发工具,辅助开发,主要看一下怎么使用的。

核心类

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

Executor

  • 这个接口定义表达一个执行任务的对象
public class InvokeExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }

    public static void main(String[] args) {
        executor();
    }

    public static void executor() {
        //替代专门create一个线程
        Executor executor = new InvokeExecutor();
        executor.execute(() -> {
            System.out.println("hello,world!");
        });

        executor.execute(() -> {
            System.out.println("lalal");
        });
    }
}
  • new 一个Executor可以执行多个不同的任务

ExecutorService

  • 这个类一般用于线程池,异步执行线程,内存的queue中存储task,线程调度
public class ExecutorServiceTest {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    public void executor() {
        executor.submit(() -> {
            new Task();
        });

        executor.submit(() -> {
            new Task1();
        });

    }

    public static void main(String[] args) {
        ExecutorServiceTest executorServiceTest = new ExecutorServiceTest();
        executorServiceTest.executor();
    }

}

class Task implements Runnable {

    @Override
    public void run() {

    }
}

class Task1 implements Runnable {

    @Override
    public void run() {

    }
}
  • 具体线程调度交给线程池来解决
  • 看下Termination方法:shutDown(),所有task执行完毕之后关闭线程池,释放资源;shutdownNow(),立即关闭线程池,释放资源,强制所有线程停止。

ScheduledExecutorService

  • 这个和上个ExecutorService很像,但是ScheduledExecutorService会延迟执行任务
public class ScheduleTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ScheduleTest scheduleTest = new ScheduleTest();
        scheduleTest.executor();
    }

    public void executor() throws ExecutionException, InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        Future<String> future = scheduledExecutorService.schedule(() -> {
            return "hello, world!";
        }, 1,  TimeUnit.SECONDS);

        System.out.println(future.get());

        //上一个任务执行完成后delay delay长的时间开始执行下个任务
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            System.out.println("lalal");
        }, 1, 2 ,TimeUnit.SECONDS);

        //间隔perid开始执行新的任务
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("lalal");
        }, 1, 2 ,TimeUnit.SECONDS);

//      scheduledExecutorService.shutdown();
    }
}
  • scheduledExecutorService.schedule:很好理解,delay长的时间后开始执行任务,执行一次,比如这里是1秒后执行任务;
  • scheduledExecutorService.scheduleAtFixedRate:1秒后执行任务,此后每次间隔两秒执行任务;
  • scheduledExecutorService.scheduleWithFixedDelay:1秒后执行任务,任务执行完成后,间隔两秒执行下一个任务

Future

  • Future一般用来获得异步执行的结果。可以用来判断异步操作是否执行完毕,获取计算的结果。
public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
 
    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}


if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
  • 看一下用法,isDone,但是没有cancelled,这个时候,可以安全的获取future存储的value。
try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
  • 上面的用法,可以设置等待的timeOut,超过这个时间,抛出TimeoutException。

CountDownLatch

  • jdk5提供的,可以阻塞一队线程,一直到某个操作发生
  • counter(Integer type);
  • counter不断decrements,一直到减为0,这个时候其它的线程都会释放。
  • 比如一个电子门,上午7点开始,超过8个人开门之后,大门完全打开。

CyclicBarrier

  • 这个工具和CountDownLatch很像,但是CountDownLatch减为0的时候就不能重复使用了;
  • CyclicBarrier可以重复使用,线程之间互相等待,一直到所有的线程准备就绪。
  • 使用:校车需要等所有的小朋友都上车后才会启动。
public class Task implements Runnable {
 
    private CyclicBarrier barrier;
 
    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
 
    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
 
}


public void start() {
 
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });
 
    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 
 
    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}
  • T1, T2, T3互相等待,CyclicBarrier初始化为0,满足条件需要增长到3,每当一个线程await,增1。
  • cyclicBarrier.isBroken():所有线程都进入等待状态.

相关文章

网友评论

      本文标题:java.util.concurrent概览

      本文链接:https://www.haomeiwen.com/subject/urlreqtx.html