美文网首页
JAVA并发编程

JAVA并发编程

作者: hongzhenw | 来源:发表于2021-01-26 17:59 被阅读0次

[TOC]

Semaphore

此类的主要作用就是限制线程并发的数量,如果不限制并发的数量,则CPU的资源很快就被耗尽,每个线程执行的任务是相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量还是非常有必要的。

1,类Smaphore的构造函数参数permits是许可的意思,代表同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码。

Semaphore semaphore = new Semaphore(1);

2,无参方法acquire()的作用是使用1个许可,是减法操作。

semaphore.acquire()

3,类Smaphore的构造函数参数permits>1时,该类并不能保证线程的安全性,因为还有可能出现多个线程共同访问实例变量,导致出现脏数据的情况。

Semaphore semaphore = new Semaphore(2);

4,有参方法acquire(int permits)的作用是每调用1次此方法,就是用x个许可。代码中有10个许可,每次执行semaphore.acquire(2),代码耗时掉2个,所以10/2=5,说明同一时间只有5个线程允许执行acquire()和release之间的代码。

Semaphore semaphore = new Semaphore(10);
semaphore.acquire(2);

5,多次调用Semaphore类的release()和release(int)方法时,还可以动态增加permits的个数。

private static void test2() {
        Semaphore semaphore = new Semaphore(5);
        try {
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            System.out.println(semaphore.availablePermits());
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            System.out.println(semaphore.availablePermits());
            semaphore.release(4);
            System.out.println(semaphore.availablePermits());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

运行结果:

0
6
10

此实验说明构造参数new Semaphore(5)中的5并不是最终的许可数量,仅仅是初始的状态值。
6,方法acquireUninterruptibly()的作用是使等待进入的acquire()方法的线程,不允许被中断。

semaphore.acquireUninterruptibly();

7,availablePermits()返回此Semaphore对象中当前可用的许可数,此方法通常用于调试,因为许可的数量有可能实时在改变,并不是固定的数量。
8,drainPermits()可获取并返回立即可用的所有许可个数,并且将可用许可置0。
9,方法getQueueLength()的作用是取得等待许可的线程个数。

System.out.println("还有大约"+semaphore.getQueuelength()+"个线程在等待);

10,方法hasQueuedThreads()的作用是判断有没有线程在等待这个许可。

System.out.println("是否有线程正在等待信号量呢?"+semaphore.hasQueuedThreads());

11,公平信号量是获得锁的顺序与线程启动的顺序有关,也就是先启动的线程优先获得许可,但不代表100%地获得信号量,仅仅是在概率上能得到保证。

Semaphore semaphore = new Semaphore(1,true)

12,无参方法tryAcquire()的作用是尝试的获取1个许可,如果获取不到则返回false,此方法通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使线程不至于在同步处一直持续等待的状态,如果if语句不成立,则线程会继续走else语句,程序会继续向下运行。

semaphore.tryAcquire();

13,有参方法tryAcquire(int permits)的作用是尝试的获得x个许可,如果获取不到则返回false。

semaphore.tryAcquire(3);

14,有参方法tryAcquire(long timeout,TimeUnit unit)的作用是在指定的时间内尝试的获得1个许可,如果获取不到则返回false。

semaphore.tryAcquire(3,TimeUnit.SECONDS);

15,有参方法tryAcquire(int permits,long timeout,TimeUnit unit)的作用是在指定的时间内尝试的获得x个许可,如果获取不到则返回false。

semaphore.tryAcquire(3,3,TimeUnit.SECONDS);

Exchanger

此类可以使2个线程之间传输数据,它比生产者/消费者模式使用的wait/notify要更加的方便。

1,类Exchanger中的exchange()方法具有阻塞的特点,也就是此方法被调用后等待其它线程来取得数据,如果没有其它线程取得数据,则一直阻塞等待。

Exchanger<String> exchanger = new Exchanger<>();
exchanger.exchange("中国人A");

2,方法exchange()传递数据

Exchanger<String> exchanger = new Exchanger<>();
ThreadA a = new ThreadA(exchanger);
ThreadB b = new ThreadB(exchanger);
a.start();
b.start();

3,方法exchange(V x, long timeout,TimeUnit unit)在指定的时间内没有其它线程获取数据,则出现超时异常。

exchanger.exchange("中国人A",5,TimeUnit.SECONDS);

CountDownLatch

该类使用效果是给定一个计数,当使用这个CountDownLatch类的线程判断计数不为0时,则呈wait状态,如果为0时则继续运行。

实现等待与继续运行的效果分别需要使用await()和countDown()方法来进行。调用await()方法时判断计数是否为0,如果不为0则呈等待状态。其它线程可以调用countDown()方法将计数减1,当计数减到为0时,呈等待的线程继续运行。而方法getCount()就是获得当前的计数个数。

1,方法await()时呈等待状态。

CountDownLatch down = new CountDownLatch(1);
down.await();

2,方法await(long timeout,TimeUnit unit)的作用使线程在指定的最大时间单位内进入waiting状态,如果超过这个时间则自动唤醒,程序继续向下运行。

CountDownLatch down = new CountDownLatch(1);
down.await(3,TimeUnit.SECONDS);

3,方法getCount()获取当前计数的值

CountDownLatch latch = new CountDownLatch(3);
System.err.println("a:" + latch.getCount());
latch.countDown();
System.err.println("b:" + latch.getCount());
latch.countDown();
System.err.println("c:" + latch.getCount());
latch.countDown();
System.err.println("d:" + latch.getCount());
latch.countDown();
System.err.println("e:" + latch.getCount());
latch.countDown();
System.err.println("f:" + latch.getCount());

运行结果:

a:3
b:2
c:1
d:0
e:0
f:0

CyclicBarrier

不仅有CountDownLatch所具有的功能,还可以实现屏障等待的功能,也就是阶段性同步,它在使用上的意义在于可以循环地实现线程要一起做任务的目标,而不是像类CountDownLatch一样,仅仅支持一次线程与同步点阻塞的特性。

该类允许一组线程互相等待,直到到达某个公共屏障点,这些线程必须实时地互相等待,这种情况下就可以使用CyclicBarrier类的来方便地实现这样的功能。CyclicBarrier类的功能屏障点可以重用,所以类的名称中有“cyclic循环”的单词。

CyclicBarrier与CountDownLatch在功能上有些相似,但在细节上还是有一些区别。

  • CountDownLatch作用:一个线程或多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行。
  • CyclicBarrier的作用:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待,所以对于CyclicBarrier来说,重点是“多个线程之间”任何一个线程没有完成任务,则所有的线程都必须等待。

1,方法getNumberWaiting()的作用是获得有几个线程已经到达屏障点。

CyclicBarrier barrier = new CyclicBarrier(3);

H03Thread d1 = new H03Thread(barrier);
d1.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

H03Thread d2 = new H03Thread(barrier);
d2.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

H03Thread d3 = new H03Thread(barrier);
d3.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

H03Thread d4 = new H03Thread(barrier);
d4.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

H03Thread d5 = new H03Thread(barrier);
d5.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

H03Thread d6 = new H03Thread(barrier);
d6.start();
Thread.sleep(500);
System.err.println(barrier.getNumberWaiting());

运行结果:

1
2
0
1
2
0

2,方法isBroken()查询此屏障是否处于损坏状态。

try {
    if (Thread.currentThread().getName().equals("Thread-2")) {
        Thread.sleep(5000);
        Thread.currentThread().interrupt();
    }
    barrier.await();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (BrokenBarrierException e) {
    e.printStackTrace();
}

其中Thread-2线程进入了InterruptedException的catch语句块,其它线程进入BrokenBarrierException的catch语句块。

类CycicBarrier对于线程的中断interrupte处理会使用全有或全无的破坏模型,意思是如果有一个线程由于中断或超时提前离开了屏障点,其它所有在屏障点等待的线程也会抛出BrokenBarrierException或InterruptedException异常,并且离开屏障点。

3,方法await(long timeout,TimeUnit unit)的功能是如果在指定的时间内达到parties的数量,则程序继续向下运行,否则如果出现超时,则抛出TimeOutException异常。

barrier.await(3 ,TimeUnit.SECONDS);

4,方法getParties()的作用取得parties个数。

System.err.println("屏障个数:" + barrier.getParties());
System.err.println("在屏障处等待的个数:" + barrier.getNumberWaiting());

5,方法reset()的作用是重置屏障。

CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
    @Override
    public void run() {
        System.err.println("全都到了");
    }
});
ThreadA ta = new ThreadA(barrier);
ThreadB tb = new ThreadB(barrier);
ta.start();
tb.start();

barrier.reset();

由于parties是3个,因此屏障被重置后,2个线程出现Broken异常。

Phaser

通过使用CyclicBarrier类解决了CountDownLatch类的种种缺点,但不可否认的是,CyclicBarrier类还是有一些自身上的缺陷,比如不可以动态添加parties计数,调用一次await()方法仅占用1个parties计数,所以在JDK1.7中新增加了一个名称为Phase的类来解决这样的问题。

1,方法arriveAndawaitAdvance()的作用与CountDownLatch类中的await()方法大体一样,通过从方法的名称解释来看,arrive是到达的意思,wait是等待的意思,而advance是前进、促进的意思,所以执行这个方法的作用就是当前线程已经到达屏障,在此等待一段时间,等条件满足后继续向下一个屏障继续执行。

Phaser phaser = new Phaser(2);
phaser.arriveAndAwaitAdvance();

2,方法arriveAndDeregister()的作用是使当前线程退出,并且使parties值减1。

Phaser phaser = new Phaser(2);
phaser.arriveAndDeregister();

3,方法getPhase()获取的是已经到达第几个屏障。

System.err.println(phaser.getPhase());

4,方法onAdvance()的作用是通过新的屏障时被调用。

Phaser phaser = new Phaser(2) {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.err.println("main phase:" + phase + " registeredParties:" + registeredParties);
        return true;
        // 返回true不等待了,Phaser呈无效/销毁的状态
        // 返回false则Phaser继续工作
    }
};

5,方法getRegisteredPharties()获得注册的parties数量。每次执行一次方法register()就动态添加一个parties值。

Phaser phaser = new Phaser(5);
System.err.println("a:" + phaser.getRegisteredParties());

phaser.register();
System.err.println("b:" + phaser.getRegisteredParties());

phaser.register();
System.err.println("c:" + phaser.getRegisteredParties());

phaser.register();
System.err.println("d:" + phaser.getRegisteredParties());

运行结果:

a:5
b:6
c:7
d:8

6,方法bulkRegister()可以批量增加parties数量。

Phaser phaser = new Phaser(10);
System.err.println("a:" + phaser.getRegisteredParties());

phaser.bulkRegister(10);
System.err.println("b:" + phaser.getRegisteredParties());

phaser.bulkRegister(10);
System.err.println("c:" + phaser.getRegisteredParties());

phaser.bulkRegister(10);
System.err.println("d:" + phaser.getRegisteredParties());

运行结果:

a:10
b:20
c:30
d:40

7,方法getArrivedParties()获得已经被使用的parties个数。方法getUnarrivedParties()获得未被使用的parties个数。

System.err.println("已到达:" + phaser.getArrivedParties());
System.err.println("未到达:" + phaser.getUnarrivedParties());

8,方法arrive()的作用是使parties值加1,并且不在屏障处等待,直接向下面的代码继续运行,并且Phaser类有计数重置功能。

Phaser phaser = new Phaser(2) {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.err.println(phase + " " + registeredParties);
        return super.onAdvance(phase, registeredParties);
    }
};
System.err.println(phaser.getPhase() + " =a= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =b= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =c= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =d= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =e= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =f= " + phaser.getArrivedParties());
phaser.arrive();
System.err.println(phaser.getPhase() + " =g= " + phaser.getArrivedParties());

运行结果:

0 =a= 0
0 =b= 1
0 2
1 =c= 0
1 =d= 1
1 2
2 =e= 0
2 =f= 1
2 2
3 =g= 0

方法arrive()的功能是使getArrivedParties()计数加1,不等待其它线程到达屏障。在控制台中多次出现getArrivedParties()=0的运行结果,所以可以分析出Phaser类在到达屏障点后计数能被重置。

9,方法awaitAdvance(int Phase)的作用是:如果传入参数phase值和当前getPhase()方法返回值一样,则在屏障处等待,否则继续向下面运行,有些类似于旁观者的作用,当观察的条件满足了就等待(旁观),如果条件不满足,则程序向下继续运行。

phaser.awaitAdvance(1); // 跨栏的栏数,并不参与parties计数的操作,仅仅具有判断的功能

10,方法awaitAdvanceInterruptibly(int)是可中断的。

try {
    phaser.awaitAdvanceInterruptibly(0);
} catch (InterruptedException e) {
    System.err.println(e.toString());
}

控制台出现异常,线程被中断了。注意:当线程执行的跨栏数不符合指定的参数值时,则继续执行下面的代码。

11,方法awaitAdvanceInterruptibly(int, long,TimeUnit)的作用是在指定的跨栏数等待最大的单位时间,如果在指定的时间内,栏数未变,则出现异常,否则继续向下运行。

phaser.awaitAdvanceInterruptibly(0, 5, TimeUnit.SECONDS);

若5秒之后phaser阶段值没有发生改变,则会出现异常。

12,方法forceTermination()使Phaser对象的屏障功能失效,而方法isTerminated()是判断Phaser对象是否已经呈销毁状态。

phaser.forceTermination();

注意:CyclicBarrier类的reset()方法执行时却出现异常。

Executors

接口Executor仅仅是一种规范,是一种声明,是一种定义,并没有实现任何的功能,所以大多数情况下,需要使用接口的实现类来完成指定的功能。

1,使用newCachedThreadPool()方法创建的是无边界线程池,可以进行线程自动回收。所谓的“无界线程池”就是池中存放线程个数是理论上的Integer.MAX_VALUE最大值。

ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(() -> {
    System.err.println(Thread.currentThread() + " a1 " + System.currentTimeMillis());
    try {
        Thread.sleep(1000);
        System.err.println("a");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.println(Thread.currentThread() + " a2 " + System.currentTimeMillis());
});

运行结果:

Thread[pool-1-thread-1,5,main] a1 1611660061705
a
Thread[pool-1-thread-1,5,main] a2 1611660062708

2,方法newCachedThreadPool(ThreadFactory),池中的Thread类可以自行定制。

public class E02Factory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("定制的线程名称:");
        return thread;
    }
}

3,方法newFixedThreadPool(int)创建的是有界线程池,也就是池中的线程个数可以指定最大数量。

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
    pool.execute(new E03Thread(i));
}
for (int i = 0; i < 3; i++) {
    pool.execute(new E03Thread(i));
}

4,方法newFixedThreadPool(int,ThreadFactory)定制线程工厂

5,方法newSingleThreadExecutor()可以创建单一线程池,单一线程池可以实现以队列的方式来执行任务。

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
    executor.execute(new E05Thread(i));
}

6,方法newSingleThreadExecutor(ThreadFactory)定制线程工厂

ThreadPoolExector

类ThreadPoolExector可以非常方便的创建线程池对象,而不需要程序员设计大量的new实例化Thread相关代码。

1,类ThreadPoolExector最常使用的构造方法是:

ThreadPoolExector(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)

参数解释如下:

  • corePoolSize:池中所保存的线程数,包括空闲线程,也就是核心池的大小。
  • maximumPoolSize:池中允许的最大线程数。
  • keepAliveTime:当线程数量大于corePoolSize值时,在没有超过指定的时间内是不从线程池中将空闲线程删除的,如果超过此时间单位,则删除。
  • unit:keepAliveTime参数的时间单位
  • workQueue:执行前用于保持任务的队列,此队列仅保持由execute方法提交的Runnable任务。

为了更好地理解这些参数在使用上的一些关系,可以将他们进行详细话的注释:

  • A代表execute(runnable)欲之行的runnable的数量
  • B代表corePoolSize
  • C代表maximumPoolSize
  • D代表A-B(假设A>=B)
  • E代表new LinkedBlockingDeque()队列,无构造参数
  • F代表SynchronousQueue队列
  • G代表keepAliveTime

构造方法中5个参数之间都有使用上的关系,在使用线程池的过程中大部分会出现如下5种过程:

  • 如果A<=B,那么马上创建线程运行这个任务,并不放入扩展队列Queue中,其他功能参数忽略
  • 如果A>B&&A<=C&&E,则C和G参数忽略,并把D放入E中等待被执行
  • 如果A>B&&A<=C&&F,则C和G参数有效,并且马上创建线程运行这些任务,而不把D放入F中,D执行完任务后在指定时间后发生超时时将D进行清除。
  • 如果A>B&&A>C&&E,则C和G参数忽略,并把D放入到E中等待被执行
  • 如果A>B&&A>C&&F,则处理C的任务,其他任务则不再处理抛出异常。
  • BlockingQueue只是一个接口,常用的实现类有LinkedBlockingQueue和ArrayBlockingQueue。用LinkedBlockingQueue的好处在于没有大小限制,优点是队列容量非常大,所以执行execute()不会抛出异常,而线程池中运行的线程数也永远不会超过corePoolSize值,因为其他多余的线程被放入LinkedBlockingQueue队列中,keepAliveTime参数也就没有意义了。

    当线程数量大于corePoolSize值时,在没有超过指定的时间内是不从线程池中将空闲线程删除的,如果超过此时间单位,则删除。如果为0则任务执行完毕后立即从队列中删除。

    2,方法getCorePoolSize()获取线程池中保存的core线程数。

System.err.println(executor.getCorePoolSize())

3,方法getMaximumPoolSize()获取线程池中最大的线程数。

System.err.println(executor.getMaximumPoolSize())

4,方法shutdown()的作用是使当前未执行完的线程继续执行,而不再添加新的任务task,还有shutdown方法不会阻塞,调用后,主线程main就马上结束了,而线程池会继续运行直到所有任务(正在执行和队列中的任务)执行完才会停止。如果不调用shutdown()方法,那么线程池会一直保持下去,以便随时执行被添加的新task任务。

executor.shutdown();

5,方法shutdownNow()的作用是中断所有的任务task,并且抛出InterruptedException异常,前提是在Runnable中使用if(Thread.currentThread().isInterrupted())语句来判断当前线程的中断状态,而未执行的线程不再执行,也就是从执行队列中清除。如果没if(Thread.currentThread().isInterrupted())语句及抛出异常的代码,则池中正在运行的线程直到执行完毕,而未执行的线程不再执行,也从执行队列中清除。

executor.shutdownNow();

6,方法isShutdown()的作用是判断线程池是否已经关闭。

System.err.println(executor.isShutdown());

备注:只要调用shutdown()方法,isShutdown()方法的返回值就是true。
7,如果正在执行的程序处于shuwdown或shutdownNow之后处于正在终止,但尚未完全终止的过程中,调用方法isTerminating()则返回true。此方法可以比喻成,门是否正在关闭,门彻底关闭时,线程池也就关闭了。

System.err.println(executor.isTerminating());

8,如果线程池关闭后,也就是所有任务都已完成,则方法isTerminated()返回true。此方法可以比喻成,门是否已经关闭。

System.err.println(executor.isTerminated());

备注:方法shutdown()或shutdownNow()的功能是发出一个关闭大门的命令,方法isShutdown()是判断这个关闭大门的命令发出或未发出。方法isTerminaing()代表大门是否正在关闭进行中,而isTerminated()方法判断大门是否已经关闭了。

9,方法awaitTermination(long timeout,TimeUnit unit)的作用就是查看在指定的时间之间,线程池是否已经终止工作,也就是最多等待多少时间后去判断线程池是否已经终止工作。此方法需要有shutdown()方法的配合。

executor.shutdonw();
executor.awaitTermination(10,TimeUnit.SECONDS);
// 最多等待10秒,也就是阻塞10秒

备注:方法awaitTermination()被执行时,如果池中有任务在被执行时,则调用awaitTermination()方法出现阻塞,等待指定的时间,如果没有任务时则不再阻塞。如果awaitTermination()正在阻塞的过程中任务执行完毕,则awaitTermination()取消阻塞继续执行后面的代码。

10,方法ThreadPoolExector(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory factory)定制线程工厂。除了自定义ThreadFactory外,还可以使用setThreadFactory()方法来设置自定义ThreadFactory。

executor.setThreadFactory

11,在使用自定义线程工厂时,线程如果出现异常完全可以自定义处理的。

public class L06Factory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("定制名称");
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("自定义处理异常启用:" + t.getId());
            e.printStackTrace();
        });
        return thread;
    }
}

12,方法setRejectedExecutionHandler()和getRejectedExecutionHandler()的作用是可以处理任务被拒绝执行时的行为。

public class L07RejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        L07Thread thread = (L07Thread) r;
        System.err.println(thread.getName() + " 被拒绝了");
        executor.shutdown();
    }
}

13,方法allowsCoreThreadTimeOut()和allowsCoreThreadTimeOut(boolean value)的作用是配置核心线程是否有超时效果。

executor.allowCoreThreadTimeOut(true);

14,方法prestartCoreThread()每调用一次就创建一个核心线程,
返回值为boolean,含义是是否启动了。

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
System.err.println("a1 getPoolSize:" + executor.getPoolSize());

System.err.println("b1:" + executor.prestartCoreThread());
System.err.println("a2 getPoolSize:" + executor.getPoolSize());

System.err.println("b2:" + executor.prestartCoreThread());
System.err.println("a3 getPoolSize:" + executor.getPoolSize());

System.err.println("b3:" + executor.prestartCoreThread());//无效代码
System.err.println("b4:" + executor.prestartCoreThread());//无效代码
System.err.println("b5:" + executor.prestartCoreThread());//无效代码

System.err.println("a4 getPoolSize:" + executor.getPoolSize());
executor.shutdown();

运行结果:

a1 getPoolSize:0
b1:true
a2 getPoolSize:1
b2:true
a3 getPoolSize:2
b3:false
b4:false
b5:false
a4 getPoolSize:2

15,方法prestartAllCoreThread()作用是启动全部核心线程,返回值是启动核心线程的数量。

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
System.err.println("a1 getPoolSize:" + executor.getPoolSize());
System.err.println(executor.prestartAllCoreThreads());
System.err.println(executor.prestartAllCoreThreads());//再次启动无效
System.err.println(executor.prestartAllCoreThreads());//再次启动无效
System.err.println("a2 getPoolSize:" + executor.getPoolSize());
System.err.println("a3 getPoolSize:" + executor.getPoolSize());
executor.shutdown();

运行结果:

a1 getPoolSize:0
2
0
0
a2 getPoolSize:2
a3 getPoolSize:2

16,方法getCompletedTaskCount()的作用是取得已经执行完成的任务数。

System.err.println(executor.getCompletedTaskCount());

17,线程池ThreadPoolExecutor的拒绝策略。线程池中的资源全部被占用的时候,对新添加的task任务有不同的处理策略,在默认的情况下,ThreadPoolExceutor类中有4种不同的处理方法是:

  • AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectedExecutionException异常。
  • CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务。
  • DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理的任务,然后将拒绝的任务添加到等待队列中。
  • DiscardPolicy:当任务添加到线程池中被拒绝时,线程池讲丢弃被拒绝的任务。
  • 18,在线程池ThreadPoolExecutor类中重写方法afterExecute()和beforeExecute()可以对线程池中执行的线程对象实现监控。

public class L08ThreadPoolExecutor extends ThreadPoolExecutor {

    public L08ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.err.println(((L08Thread) r).getName() + " 准备执行");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        System.err.println(((L08Thread) r).getName() + " 执行完毕");
    }
}

19,方法remove(Runnable)可以删除尚未被执行的Runnable任务。

executor.remove(runnable);

使用execute()方法,未运行的任务,可以被remove()方法成功删除,但使用submit()方法提交的任务未被执行时,remove方法却不能删除此任务。

20,多个get方法的测试

  • 方法getActiveCount()的作用是取得有多少个线程正在执行任务。
  • 方法getPoolSize()的作用是获得当前线程池里面有多少个线程,这些线程数包括正在执行的任务的线程,也包括正在休眠的线程。
  • 方法getCompletedTaskCount()的作用是取得有多少个线程已经执行完任务了。
  • 方法getCorePoolSize()的作用是取得构造方法传入的corePoolSize参数值。
  • 方法getMaximumPoolSize()的作用是取得构造方法传入的maximumPoolSize参数值。
  • 方法getTaskCount()的作用是取得有多少个任务发送给了线程池。
  • Future

    在默认情况下,线程Thread对象不具有返回值的功能,如果在需要取得返回值的情况下是极为不方便的,但在java1.5的并发包中可以使用Future和Callable来使线程具有返回值的功能。

    接口Callable和线程功能密不可分,但和Runnable的主要区别为:

    • Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值。
    • Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。

    执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。

    1,方法get()用于获得返回值,方法submit(Callable<T>)可以执行参数为Callable的任务。

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
B01Callable callable = new B01Callable(100);
Future<String> future = executor.submit(callable);
String str = future.get();
System.err.println(str);

2,方法submit()不仅可以传入Callable对象,也可以传入Runnable对象,说明submit()方法支持有返回和无返回值的功能。方法get()具有阻塞特性,而isDone()方法无阻塞特性。

ExecutorService executor = Executors.newCachedThreadPool();
Future<?> future = executor.submit(runnable);
System.err.println(future.get());
System.err.println(future.isDone());

运行结果:

null
true

3,方法submit(Runnable,T result)的第2个参数result可以作为执行结果的返回值,而不需要使用get()方法来进行获得。

ExecutorService pool = Executors.newCachedThreadPool();
Future<UserInfo> future = pool.submit(new B02Thread(userInfo), userInfo);

UserInfo info = future.get();
System.err.println(info.getUsername() + " " + info.getPassword());

4,方法cancel(boolean mayInterruptIfRunning)的参数mayInterruptIfRunning的作用是:如果线程正在运行则是否中断正在运行的线程,在代码中需要使用if(Thread.currentThread().isInterrupted())进行配合。方法isCancelled()的返回值代表发送取消任务的命令是否成功完成。

ExecutorService executor = Executors.newCachedThreadPool();
Future future = executor.submit(new B03Thread());

String str = (String) future.get();
System.err.println(str);

System.err.println(future.cancel(true) + " " + future.isCancelled());

运行结果:

I'm god
false false

备注:从打印结果看,线程任务已经运行完毕,线程对象已经销毁,所以方法cacel()返回值是false,代表发送取消的命令没有成功。

5,方法get(long timeout,TimeUnit uni)的作用是在指定的最大时间内等待获得返回值。

future.get(5,TimeUnit.SECNONDS);

备注:若超时,则会抛出TimeoutException

6,接口RejectedExecutionHandler的主要作用是当线程池关闭后依然有任务要执行时,可以实现一些处理。

public class B07RejectedExecutorException implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        B07Thread thread = (B07Thread) r;
        System.err.println(thread.getName() + " 被拒绝了");
    }
}

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
executor.setRejectedExecutionHandler(new B07RejectedExecutorException());

6,方法execute()与submit()的区别

  • 方法execute()没有返回值,而submit()方法可以有返回值
  • 方法execute()在默认的情况下异常直接抛出,不能捕获,但可以通过自定义ThreadFactory的方式进行捕获,而submit()方法在默认的情况下,可以catch ExecutionException捕获异常。

7,Future的缺点:

  • 调用get()方法取得处理的结果值时具有阻塞性
  • 主线程并不能保证首先获得是最先完成任务的返回值

CompletionService

由于Future接口的缺点,JDK1.5提供了CompletionService接口可以解决这个问题。

接口CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理。使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果。

1,方法take()的作用是取得最先完成任务的Future对象,谁执行时间最短谁最先返回。

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
ExecutorCompletionService service = new ExecutorCompletionService<>(executor);

List<K01Thread> lists = new ArrayList<>();
K01Thread a1 = new K01Thread("a1", 5000);
lists.add(a1);
K01Thread a2 = new K01Thread("a2", 4000);
lists.add(a2);
K01Thread a3 = new K01Thread("a3", 3000);
lists.add(a3);
K01Thread a4 = new K01Thread("a4", 2000);
lists.add(a4);
K01Thread a5 = new K01Thread("a5", 1000);
lists.add(a5);

for (int i = 0; i < executor.getCorePoolSize(); i++) {
    K01Thread thread = lists.get(i);
    service.submit(thread);
}

for (int i = 0; i < executor.getCorePoolSize(); i++) {
    System.err.println("打印第" + (i + 1) + "返回值");
    System.err.println(service.take().get());
}

运行结果:

打印第1返回值
返回:a5
打印第2返回值
返回:a4
打印第3返回值
返回:a3
打印第4返回值
返回:a2
打印第5返回值
返回:a1

从打印结果来看,CompletionService完全解决了Future阻塞的特性,也就是使用CompletionService接口后,哪个任务先执行完,哪个任务的返回值就先打印。在CompletionService接口中如果当前没有任务被执行完,则service.take().get()方法还是呈阻塞特性。

2,方法poll()的作用是获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,方法poll()无阻塞的效果。

ExecutorService pool = Executors.newCachedThreadPool();
ExecutorCompletionService<String> service = new ExecutorCompletionService<>(pool);

service.submit(() -> {
    Thread.sleep(3000);
    System.err.println("3s过去了");
    return "hello world";
});

System.err.println(service.poll());

运行结果:

null
3s过去了

从运行结果来看,方法poll()返回的Future为null,因为当前没有任何已完成任务的Future对象,所以返回为null,通过此誓言证明poll()方法不像take()方法具有阻塞效果。

3,方法Future<V> poll(long timeout,TimeUnit unit)的作用是等待指定的timeout时间,在timeout时间之内获取到值时立即向下继续执行,如果超时也立即向下执行。

ExecutorService pool = Executors.newCachedThreadPool();
ExecutorCompletionService<String> service = new ExecutorCompletionService<>(pool);

service.submit(new K02ThreadA());
service.submit(new K02ThreadB());

for (int i = 0; i < 2; i++) {
    System.err.println("main:" + service.poll(6, TimeUnit.SECONDS).get() + " " + System.currentTimeMillis());
}
System.err.println("main finish");
pool.shutdown();

运行结果:

K02ThreadA:1611888770613
K02ThreadB:1611888770614
main:K02ThreadB 1611888771616
main:K02ThreadA 1611888775616
main finish

返回两个值,因为一共等待了12秒。

4,方法Future<V> submit(Runnable task,V result),参数V是submit()方法的返回值。

ExecutorService pool = Executors.newCachedThreadPool();
ExecutorCompletionService<UserInfo> service = new ExecutorCompletionService<>(pool);

UserInfo info = new UserInfo();
Future<UserInfo> future = service.submit(new K04Thread(info), info);
System.err.println(future.get().getUsername() + " " + future.get().getPassword());

运行结果:

hongzhenw 123456

ExecutorService

1,方法invokeAny()取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupte()方法将其他任务中断,所以在这些任务中可以结合if(Thread.currentThread().isInterrupted())代码来决定任务是否继续运行。具有阻塞特性。

2,方法invokeAll()等全部线程任务执行完毕后,取得全部完成任务的结果值。具有阻塞特性。

3,方法invokeAny(Collection tasks, long tiemout,TimeUnit unit)作用是在指定时间内取得第一个先执行完成任务的结果值。

4,方法invokeAll(Collection tasks, long timeout,TimeUnit unit)的作用是全部任务在指定的时间内没有完成,则出现异常。

ScheduledExecutorService

Java中的计划任务Timer工具类提供了以计时器或计划任务的功能来实现按指定时间或时间间隔执行任务,但由于Timer工具类并不是以池pool,而是以队列的方式来管理线程的,所以在高并发的情况下运行效率较低,在新版JDK中提供了ScheduleExectorService对象来解决效率与定时任务的功能。

类ScheduleExectorService的主要作用就是可以将定时任务与线程池功能结合使用。

1,使用Callable接口进行任务延迟运行的实验,具有返回值的功能。

// 调用方法newSingleThreadScheduledExecutor取得一个单任务的计划任务执行池
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<String> scheduleA = executor.schedule(new M01TheadA(), 5, TimeUnit.SECONDS);
ScheduledFuture<String> scheduleB = executor.schedule(new M01TheadB(), 5, TimeUnit.SECONDS);

System.err.println("main1:" + System.currentTimeMillis());
System.err.println("scheduleA:" + scheduleA.get());
System.err.println("scheduleB:" + scheduleB.get());
System.err.println("main2:" + System.currentTimeMillis());

运行结果:

main1:1612164877772
a1:1612164882775
a2:1612164885777
scheduleA:return a
b1:1612164885777
b2:1612164885777
scheduleB:return b
main2:1612164885778

schedule(callable, delay,timeunit)方法中的第2个参数在多个任务中同时消耗时间,并不是一个任务执行完毕后再等待4秒继续执行的效果。

2,使用Runnable接口进行无返回值计划任务实验。

3,使用scheduleAtFixedRate()方法实现周期性执行。

public class M05Thread implements Runnable {

    @Override
    public void run() {
        System.err.println("m05:" + System.currentTimeMillis());
    }
}

ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(new M05Thread(), 1, 2, TimeUnit.SECONDS);

运行结果:

m05:1612168682130
m05:1612168684130
m05:1612168686129
m05:1612168688129
m05:1612168690131
……

注意,scheduleAtFixedRate()方法返回的ScheduledFuture对象无法获得返回值,也就是scheduleAtFixedRate()方法不具有获得返回值的功能,而schedule()方法却可以获得返回值。所以当使用scheduleAtFixedRate()方法实现重复运行任务的效果时,需要结合自定义Runnable接口的实现类,不要使用FutureTask类,因为FutureTask类并不能实现重复运行的效果。

4,方法scheduleWithFixedDelay()的作用是设置多个任务之间固定的运行时间间隔。

public class M05Thread implements Runnable {

    @Override
    public void run() {
        System.err.println("m05:" + System.currentTimeMillis());
    }
}

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

System.err.println("main1:" + System.currentTimeMillis());
executor.scheduleWithFixedDelay(new M05Thread(), 1, 2, TimeUnit.SECONDS);
System.err.println("main2:" + System.currentTimeMillis());

运行结果:

main1:1612169196763
main2:1612169196766
m05:1612169197771
m05:1612169199771
m05:1612169201767
m05:1612169203766
……

5,方法getQueue()作用是取得队列中的任务,而这些任务是未来将要运行的,正在运行的任务不在此队列中。使用scheduleAtFixedRate()和scheduleWithFixedDelay()两个方法实现周期性执行任务时,未来欲执行的任务都是放入此队列中。

6,方法setExecuteExistingDelayedTasksAfterShutdownPolicy()的作用是当对ScheduledThreadPoolExecutor执行了shutdown()方法时,任务是否继续运行,默认值时true,也就是继续运行,当使用setExecuteExistingDelayedTasksAfterShutdownPolicy(false)时任务不再运行。

7,方法setContinueExistingPeriodicTasksAfterShutdownPolicy()传入true的作用是当使用scheduleAtFixedRate()方法或scheduleWithFixedDelay()方法时,如果调用ScheduledThreadPoolExecutor对象的shutdown()方法,任务还会继续运行,传入false时任务不运行,进程销毁。

8,方法cancel(boolean)的作用设定是否取消任务。

scheduledFuture.cancel(true)

9,方法setRemoveOnCancelPolicy(boolean)的作用设定是否将取消后的任务从队列中清除。

scheduledFuture.setRemoveOnCancelPolicy(true);
scheduledFuture.cancel(true);

使用setRemoveOnCancelPolicy(true)和cancel(true),则队列中的任务被删除。

相关文章

网友评论

      本文标题:JAVA并发编程

      本文链接:https://www.haomeiwen.com/subject/vacizktx.html