美文网首页
Phaser并发阶段任务的运行

Phaser并发阶段任务的运行

作者: yincb | 来源:发表于2019-04-30 01:31 被阅读0次

Phaser是一个更强大的、更复杂的同步辅助类,可以代替CyclicBarrier CountDownLatch的功能,但是比他们更强大。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才能进行下一步。
当我们有并发任务并且需要分解成几步执行的时候,这种机制就非常适合。
CyclicBarrier CountDownLatch 只能在构造时指定参与量,而phaser可以动态的增减参与量。
phaser 使用说明:
使用phaser.arriveAndAwaitAdvance(); //等待参与者达到指定数量,才开始运行下面的代码
使用phaser.arriveAndDeregister(); //注销当前线程,该线程就不会进入休眠状态,也会从phaser的数量中减少
模拟代替CountDownLatch功能,只需要当前线程arriveAndAwaitAdvance()之后运行需要的代码之后,就arriveAndDeregister()取消当前线程的注册。
phaser有一个重大特性,就是不必对它的方法进行异常处理。置于休眠的线程不会响应中断事件,不会抛出interruptedException异常, 只有一个方法会响应:AwaitAdvanceInterruptibly(int phaser).
其他api

arrive():这个方法通知phase对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段,必须小心使用这个方法,因为它不会与其他线程同步。
awaitAdvance(int phase):如果传入的阶段参数与当前阶段一致,这个方法会将当前线程至于休眠,直到这个阶段的所有参与者都运行完成。如果传入的阶段参数与当前阶段不一致,这个方法立即返回。
awaitAdvanceInterruptibly(int phaser):这个方法跟awaitAdvance(int phase)一样,不同处是:该访问将会响应线程中断。会抛出interruptedException异常
将参与者注册到phaser中:
register():将一个新的参与者注册到phase中,这个新的参与者将被当成没有执完本阶段的线程。
bulkRegister(int parties):将指定数目的参与者注册到phaser中,所有这些新的参与者都将被当成没有执行完本阶段的线程。
减少参与者
只提供了一个方法来减少参与者:arriveAndDeregister():告知phaser对应的线程已经完成了当前阶段,并它不会参与到下一阶段的操作中。
强制终止
当一个phaser么有参与者的时候,它就处于终止状态,使用forceTermination()方法来强制phaser进入终止状态,不管是否存在未注册的参与线程,当一个线程出现错误时,强制终止phaser是很有意义的。
当phaser处于终止状态的时候,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一个负数,而不再是一个正值了,如果知道phaser可能会被终止,就需要验证这些方法的值,以确定phaser是不是被终止了。
被终止的phaser不会保证参与者的同步。

example

public class PhaserExample1 {

    private static Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        final Phaser phaser = new Phaser();
        IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);

        phaser.register();
        phaser.arriveAndAwaitAdvance();
        System.out.println("all thread finish work.");
    }

    static class Task extends Thread{

        private final Phaser phaser;

        Task(Phaser phaser) {
            this.phaser = phaser;
            phaser.register();
            start();
        }

        @Override
        public void run() {
            try {
                System.out.println("The worker ["+getName()+"] start");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println("The worker ["+getName()+"] end.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            phaser.arriveAndAwaitAdvance();
        }
    }
}

运行结果:
The worker [Thread-0] start
The worker [Thread-2] start
The worker [Thread-1] start
The worker [Thread-4] start
The worker [Thread-3] start
The worker [Thread-3] end.
The worker [Thread-0] end.
The worker [Thread-1] end.
The worker [Thread-4] end.
The worker [Thread-2] end.
all thread finish work.

public class PhaserExample2 {

    private final static Random random = new Random(System.currentTimeMillis());

    /**
     * running
     *
     * bicycle
     *
     * long jump
     * @param args
     */

    public static void main(String[] args) {
        final Phaser phaser = new Phaser(5);
        for (int i = 0; i < 6; i++) {
            new Athletes(i,phaser).start();
        }
    }

    static class Athletes extends Thread{
        private final int no;
        private final Phaser phaser;

        Athletes(int no, Phaser phaser) {
            this.no = no;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(no + " running start.");
            sleep(random.nextInt(5));
            System.out.println(no + " running end.");
            phaser.arriveAndAwaitAdvance();

            System.out.println(no + " bicycle start.");
            sleep(random.nextInt(5));
            System.out.println(no + " bicycle end.");
            phaser.arriveAndAwaitAdvance();

            System.out.println(no + " long jump start.");
            sleep(random.nextInt(5));
            System.out.println(no + " long jump end.");
            phaser.arriveAndAwaitAdvance();
        }

        private void sleep(int time){
            try {
                TimeUnit.SECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

相关文章

  • Phaser并发阶段任务的运行

    Phaser是一个更强大的、更复杂的同步辅助类,可以代替CyclicBarrier CountDownLatch的...

  • Phaser

    Phaser更像是CountDownLatch和CyclicBarrier的结合体。Phaser是按照不同的阶段来...

  • Java 并发特性 延迟队列/StampedLock/并发累加器

    Java 并发特性 延迟队列/StampedLock/并发累加器/Bit Set/Phaser 1.延迟队列 在 ...

  • JVM七大垃圾回收器之 CMS

    注释:浮动垃圾是并发清理阶段。CMS在并发清理阶段用户线程还是在运行着,伴随着程序运行自然还会有新的垃圾不断产生,...

  • go 进程和线程

    进程: 并发运行:并发运行是多个任务被同时发起运行,但同一时刻这些任务只能有一个处于运行状态。这取决于cpu核心...

  • 并发,并行 | 同步,异步 | 阻塞,非阻塞

    并发:一个处理器同时处理多个任务,在多个任务间快速切换,实现多任务同时运行的效果。称为并发 并行:多个任务同时进行...

  • 多任务

    1.多任务:在同一时间,执行多个任务。 2.并发与并行区别 并发:任务轮流交替运行 并行:同一时刻同时运行。 3....

  • 并发一:JAVA并发模型

    一、并发 并发程序是指在运行中有两个及以上的任务同时在处理,与之相关的概念并行,是指在运行中有两个及以上的任务同...

  • 并发和并行的区别

    并发指的是系统能够同时处理多个任务。并行指的是系统能够同时运行多个任务。 处理未必是运行。多个任务可以先存下来,然...

  • Java并发之Executor(返回结果处理)

    运行多个任务并处理第一个结果 运行多个任务并处理所有结果 运行多个任务并处理第一个结果 并发编程常见的问题,就是当...

网友评论

      本文标题:Phaser并发阶段任务的运行

      本文链接:https://www.haomeiwen.com/subject/fsbpnqtx.html