美文网首页
并发核心框架:3.Phaser的使用

并发核心框架:3.Phaser的使用

作者: 81bad73e9053 | 来源:发表于2016-09-08 22:52 被阅读271次

    CyclicBarrier的缺点:
    1.不可以动态的添加parties计数
    2.调用一次await方法仅仅占用一个parties计数

    1.Phaser堆计数的操作是加法操作

    2.arriveAndAwaitAdvance()方法测试

    执行这个方法的作用是当前线程已经达到屏障,在此等待一段时间,等待条件满足后继续向下一个屏障执行
    Phaser具有设置多屏障的功能

    public class PrintTools {
    
        public static Phaser phaser;
    
        public static void methodA() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
    
            System.out.println(Thread.currentThread().getName() + " A2 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A2   end="
                    + System.currentTimeMillis());
        }
    
        public static void methodB() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis());
                Thread.sleep(5000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis());
    
                System.out.println(Thread.currentThread().getName() + " A2 begin="
                        + System.currentTimeMillis());
                Thread.sleep(5000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A2   end="
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    
    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            PrintTools.methodA();
        }
    
    }
    
    
    public class ThreadB extends Thread {
    
        private Phaser phaser;
    
        public ThreadB(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            PrintTools.methodA();
        }
    
    }
    
    public class ThreadC extends Thread {
    
        private Phaser phaser;
    
        public ThreadC(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            PrintTools.methodB();
        }
    
    }
    
    
    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            PrintTools.phaser = phaser;
    
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
    
            ThreadB b = new ThreadB(phaser);
            b.setName("B");
            b.start();
    
            ThreadC c = new ThreadC(phaser);
            c.setName("C");
            c.start();
        }
    }
    运行结果:
    A A1 begin=1473729059256
    B A1 begin=1473729059256
    C A1 begin=1473729059258
    C A1   end=1473729064258
    C A2 begin=1473729064258
    B A1   end=1473729064258
    B A2 begin=1473729064258
    A A1   end=1473729064258
    A A2 begin=1473729064258
    C A2   end=1473729069258
    A A2   end=1473729069258
    B A2   end=1473729069258
    

    3.arriveAndAwaitAdvance()方法测试2

    修改上面代码

    public class PrintTools {
    
        public static Phaser phaser;
    
        public static void methodA() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
    
            System.out.println(Thread.currentThread().getName() + " A2 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A2   end="
                    + System.currentTimeMillis());
        }
    
        public static void methodB() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis());
                Thread.sleep(5000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    运行结果:
    A A1 begin=1473729358613
    B A1 begin=1473729358613
    C A1 begin=1473729358614
    B A1   end=1473729363615
    C A1   end=1473729363615
    B A2 begin=1473729363615
    A A1   end=1473729363615
    A A2 begin=1473729363615
    
    

    4.arriveAndDeregister方法测试

    public class PrintTools {
    
        public static Phaser phaser;
    
        public static void methodA() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
    
            System.out.println(Thread.currentThread().getName() + " A2 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A2   end="
                    + System.currentTimeMillis());
        }
    
        public static void methodB() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis());
                Thread.sleep(5000);
                System.out.println("A:" + phaser.getRegisteredParties());
                phaser.arriveAndDeregister();
                System.out.println("B:" + phaser.getRegisteredParties());
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    运行结果:
    A A1 begin=1473729632834
    B A1 begin=1473729632834
    C A1 begin=1473729632835
    A:3
    B:2
    C A1   end=1473729637835
    B A1   end=1473729637835
    A A1   end=1473729637835
    A A2 begin=1473729637835
    B A2 begin=1473729637835
    A A2   end=1473729637835
    B A2   end=1473729637835
    

    5 getPhase和onAdvance方法

    5.1getPhase方法的作用:获取已经到达的是第几个屏障

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println("A  begin");
            phaser.arriveAndAwaitAdvance();
            System.out.println("A    end phase value=" + phaser.getPhase());
    
            System.out.println("A  begin");
            phaser.arriveAndAwaitAdvance();
            System.out.println("A    end phase value=" + phaser.getPhase());
    
            System.out.println("A  begin");
            phaser.arriveAndAwaitAdvance();
            System.out.println("A    end phase value=" + phaser.getPhase());
    
            System.out.println("A  begin");
            phaser.arriveAndAwaitAdvance();
            System.out.println("A    end phase value=" + phaser.getPhase());
    
        }
    
    }
    
    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(1);
            ThreadA a = new ThreadA(phaser);
            a.start();
        }
    }
    运行结果:
    A  begin
    A    end phase value=1
    A  begin
    A    end phase value=2
    A  begin
    A    end phase value=3
    A  begin
    A    end phase value=4
    
    

    修改代码

    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(2);
            ThreadA a = new ThreadA(phaser);
            a.start();
        }
    }
    
    运行结果:
    a begin 
    然后阻塞
    

    5.2onAdvance在通过新的屏障时被调用

    public class MyService {
        private Phaser phaser;
    
        public MyService(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void testMethod() {
            try {
                System.out.println("A  begin ThreadName="
                        + Thread.currentThread().getName()
                        + "                              "
                        + System.currentTimeMillis());
                if (Thread.currentThread().getName().equals("B")) {
                    Thread.sleep(5000);
                }
                phaser.arriveAndAwaitAdvance();
                System.out.println("A    end  ThreadName="
                        + Thread.currentThread().getName() + " end phase value="
                        + phaser.getPhase() + " " + System.currentTimeMillis());
                // ////////
                System.out.println("B  begin ThreadName="
                        + Thread.currentThread().getName()
                        + "                              "
                        + System.currentTimeMillis());
                if (Thread.currentThread().getName().equals("B")) {
                    Thread.sleep(5000);
                }
                phaser.arriveAndAwaitAdvance();
                System.out.println("B    end  ThreadName="
                        + Thread.currentThread().getName() + " end phase value="
                        + phaser.getPhase() + " " + System.currentTimeMillis());
                // ////////
                System.out.println("C  begin ThreadName="
                        + Thread.currentThread().getName()
                        + "                              "
                        + System.currentTimeMillis());
                if (Thread.currentThread().getName().equals("B")) {
                    Thread.sleep(5000);
                }
                phaser.arriveAndAwaitAdvance();
                System.out.println("C    end  ThreadName="
                        + Thread.currentThread().getName() + " end phase value="
                        + phaser.getPhase() + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    
    public class ThreadA extends Thread {
    
        private MyService myService;
    
        public ThreadA(MyService myService) {
            super();
            this.myService = myService;
        }
    
        public void run() {
            myService.testMethod();
    
        }
    
    }
    
    
    public class ThreadB extends Thread {
    
        private MyService myService;
    
        public ThreadB(MyService myService) {
            super();
            this.myService = myService;
        }
    
        public void run() {
            myService.testMethod();
    
        }
    
    }
    
    
    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(2) {
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out .println(" onAdvance 被调用!");
                    return false;
                    // 返回true不等待了,Phaser呈无效/销毁的状态
                    // 返回false则Phaser继续工作
                }
            };
    
            MyService service = new MyService(phaser);
    
            ThreadA a = new ThreadA(service);
            a.setName("A");
            a.start();
            ThreadB b = new ThreadB(service);
            b.setName("B");
            b.start();
        }
    }
    运行结果:
    
    A  begin ThreadName=A                              1473747797710
    A  begin ThreadName=B                              1473747797711
     onAdvance 被调用!
    A    end  ThreadName=B end phase value=1 1473747802712
    A    end  ThreadName=A end phase value=1 1473747802712
    B  begin ThreadName=B                              1473747802712
    B  begin ThreadName=A                              1473747802712
     onAdvance 被调用!
    B    end  ThreadName=B end phase value=2 1473747807712
    B    end  ThreadName=A end phase value=2 1473747807712
    C  begin ThreadName=A                              1473747807712
    C  begin ThreadName=B                              1473747807712
     onAdvance 被调用!
    C    end  ThreadName=B end phase value=3 1473747812712
    C    end  ThreadName=A end phase value=3 1473747812712
    
    

    6 getRegisterParties和register方法

    getRegisterParties获取注册的parties数量
    register:动态的添加一个parties值

    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(5);
            System.out.println(phaser.getRegisteredParties());
    
            phaser.register();
            System.out.println(phaser.getRegisteredParties());
    
            phaser.register();
            System.out.println(phaser.getRegisteredParties());
    
            phaser.register();
            System.out.println(phaser.getRegisteredParties());
    
            phaser.register();
            System.out.println(phaser.getRegisteredParties());
        }
    }
    运行结果:
    5
    6
    7
    8
    9
    

    7.bulkRegister方法

    批量增加parties数量

    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(10);
            System.out.println(phaser.getRegisteredParties());
    
            phaser.bulkRegister(10);
            System.out.println(phaser.getRegisteredParties());
    
            phaser.bulkRegister(10);
            System.out.println(phaser.getRegisteredParties());
    
            phaser.bulkRegister(10);
            System.out.println(phaser.getRegisteredParties());
    
            phaser.bulkRegister(10);
            System.out.println(phaser.getRegisteredParties());
        }
    }
    
    运行结果:
    10
    20
    30
    40
    50
    

    8. getArrivedParties和getUnArrivedParties方法

    getArrivedParties获得已经被使用的parties数量
    getUnarrivedParites获取未被使用的parties个数

    public class MyThread extends Thread {
    
        private Phaser phaser;
    
        public MyThread(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
        }
    
    }
    
    
    public class Run {
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new Phaser(7);
            MyThread[] myThreadArray = new MyThread[5];
            for (int i = 0; i < myThreadArray.length; i++) {
                myThreadArray[i] = new MyThread(phaser);
                myThreadArray[i].setName("Thread" + (i + 1));
                myThreadArray[i].start();
            }
            Thread.sleep(2000);
            System.out.println("已到达:" + phaser.getArrivedParties());
            System.out.println("未到达:" + phaser.getUnarrivedParties());
    
        }
    }
    运行结果:
    Thread1 A1 begin=1473748522279
    Thread2 A1 begin=1473748522279
    Thread5 A1 begin=1473748522280
    Thread4 A1 begin=1473748522280
    Thread3 A1 begin=1473748522280
    已到达:5
    未到达:2
    
    

    9. arrive方法测试1

    arrive方法的作用:使parties值加1,并且不在屏障处等待,直接向下面的代码继续运行,并且Phaser具有计数重置的功能

    public class Run {
    
        public static void main(String[] args) {
            Phaser phaser = new Phaser(2) {
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("到达了未通过!phase=" + phase
                            + " registeredParties=" + registeredParties);
                    return super.onAdvance(phase, registeredParties);
                };
            };
            System.out.println("A1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("A1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
    
            System.out.println("A2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("A2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            // //////////////
    
            System.out.println("B1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("B1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
    
            System.out.println("B2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("B2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            // //////////////
            System.out.println("C1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("C1 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            System.out.println("C2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            phaser.arrive();
            System.out.println("C2 getPhase=" + phaser.getPhase()
                    + " getRegisteredParties=" + phaser.getRegisteredParties()
                    + " getArrivedParties=" + phaser.getArrivedParties());
            // //////////////
    
        }
    
    }
    运行结果:
    A1 getPhase=0 getRegisteredParties=2 getArrivedParties=0
    A1 getPhase=0 getRegisteredParties=2 getArrivedParties=1
    A2 getPhase=0 getRegisteredParties=2 getArrivedParties=1
    到达了未通过!phase=0 registeredParties=2
    A2 getPhase=1 getRegisteredParties=2 getArrivedParties=0
    B1 getPhase=1 getRegisteredParties=2 getArrivedParties=0
    B1 getPhase=1 getRegisteredParties=2 getArrivedParties=1
    B2 getPhase=1 getRegisteredParties=2 getArrivedParties=1
    到达了未通过!phase=1 registeredParties=2
    B2 getPhase=2 getRegisteredParties=2 getArrivedParties=0
    C1 getPhase=2 getRegisteredParties=2 getArrivedParties=0
    C1 getPhase=2 getRegisteredParties=2 getArrivedParties=1
    C2 getPhase=2 getRegisteredParties=2 getArrivedParties=1
    到达了未通过!phase=2 registeredParties=2
    C2 getPhase=3 getRegisteredParties=2 getArrivedParties=0
    

    10. arrive方法测试2

    public class MyService {
    
        public Phaser phaser;
    
        public MyService(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void testMethodA() {
            try {
                System.out.println(Thread.currentThread().getName() + " begin A1 "
                        + System.currentTimeMillis());
                Thread.sleep(3000);
                System.out.println(phaser.getArrivedParties());
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + "   end A1 "
                        + System.currentTimeMillis());
    
                System.out.println(Thread.currentThread().getName() + " begin A2 "
                        + System.currentTimeMillis());
                Thread.sleep(3000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + "   end A2 "
                        + System.currentTimeMillis());
    
                System.out.println(Thread.currentThread().getName() + " begin A3 "
                        + System.currentTimeMillis());
                Thread.sleep(3000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + "   end A3 "
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void testMethodB() {
            System.out.println(Thread.currentThread().getName() + " begin A1 "
                    + System.currentTimeMillis());
            phaser.arrive();
            System.out.println(Thread.currentThread().getName() + "   end A1 "
                    + System.currentTimeMillis());
    
            System.out.println(Thread.currentThread().getName() + " begin A2 "
                    + System.currentTimeMillis());
            phaser.arrive();
            System.out.println(Thread.currentThread().getName() + "   end A2 "
                    + System.currentTimeMillis());
    
            System.out.println(Thread.currentThread().getName() + " begin A3 "
                    + System.currentTimeMillis());
            phaser.arrive();
            System.out.println(Thread.currentThread().getName() + "   end A3 "
                    + System.currentTimeMillis());
        }
    
    }
    
    
    
    public class ThreadA extends Thread {
    
        private MyService myService;
    
        public ThreadA(MyService myService) {
            super();
            this.myService = myService;
        }
    
        @Override
        public void run() {
            myService.testMethodA();
        }
    
    }
    
    public class ThreadB extends Thread {
    
        private MyService myService;
    
        public ThreadB(MyService myService) {
            super();
            this.myService = myService;
        }
    
        @Override
        public void run() {
            myService.testMethodA();
        }
    
    }
    
    
    public class ThreadC extends Thread {
    
        private MyService myService;
    
        public ThreadC(MyService myService) {
            super();
            this.myService = myService;
        }
    
        @Override
        public void run() {
            myService.testMethodB();
        }
    
    }
    
    public class Run {
    
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            MyService service = new MyService(phaser);
    
            ThreadA a = new ThreadA(service);
            a.setName("A");
            a.start();
    
            ThreadB b = new ThreadB(service);
            b.setName("B");
            b.start();
    
            ThreadC c = new ThreadC(service);
            c.setName("C");
            c.start();
    
        }
    
    }
    运行结果:arrive方法达到之后就归零,所以AB继续等待
    A begin A1 1473749324848
    B begin A1 1473749324849
    C begin A1 1473749324850
    C   end A1 1473749324850
    C begin A2 1473749324850
    C   end A2 1473749324850
    C begin A3 1473749324850
    C   end A3 1473749324850
    0 
    1
    
    

    11 awaitAndAdvance(phase)方法

    awaitAndAdvance(phase):如果传入的phase的值和当前getPhase方法返回值一致,就再屏障处等待,否则继续向下运行
    awaitAndAdvance(phase)并不参与计数,仅仅具有判断的功能

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
        }
    
    }
    
    public class ThreadB extends Thread {
    
        private Phaser phaser;
    
        public ThreadB(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
        }
    
    }
    
    
    public class ThreadC extends Thread {
    
        private Phaser phaser;
    
        public ThreadC(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
                Thread.sleep(3000);
                phaser.awaitAdvance(0);// 跨栏的栏数
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    
    public class ThreadD extends Thread {
    
        private Phaser phaser;
    
        public ThreadD(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
                Thread.sleep(5000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis()+" getPhase"+phaser.getPhase());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
    
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
    
            ThreadB b = new ThreadB(phaser);
            b.setName("B");
            b.start();
    
            ThreadC c = new ThreadC(phaser);
            c.setName("C");
            c.start();
    
            ThreadD d = new ThreadD(phaser);
            d.setName("D");
            d.start();
        }
    }
    运行结果:
    A A1 begin=1473749992110 getPhase0
    B A1 begin=1473749992111 getPhase0
    C A1 begin=1473749992112 getPhase0
    D A1 begin=1473749992113 getPhase0
    D A1   end=1473749997114 getPhase1
    A A1   end=1473749997114 getPhase1
    C A1   end=1473749997114 getPhase1
    B A1   end=1473749997114 getPhase1
    

    12 awaitAdvanceinterruptibly(int)测试1

    awaitAdvanceinterruptibly方法是不可中断的

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis()+" getPhase "+phaser.getPhase());
            phaser.awaitAdvance(0);
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
        }
    
    }
    
    public class Run {
        public static void main(String[] args) {
            try {
                Phaser phaser = new Phaser(3);
                ThreadA a = new ThreadA(phaser);
                a.setName("A");
                a.start();
                Thread.sleep(5000);
                a.interrupt();
                System.out.println("中断了c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    运行结果:
    A A1 begin=1473750345591 getPhase 0
    中断了c
    

    12awaitAdvanceinterruptibly方法测试2

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis());
                phaser.awaitAdvanceInterruptibly(0);// 符合栏数就wait
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                System.out.println("进入catch");
                e.printStackTrace();
            }
        } 
    }
    
    
    
    public class Run {
        public static void main(String[] args) {
            try {
                Phaser phaser = new Phaser(3);
                ThreadA a = new ThreadA(phaser);
                a.setName("A");
                a.start();
                Thread.sleep(5000);
                a.interrupt();
                System.out.println("中断了c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    运行结果:
    A A1 begin=1473750452441
    中断了c
    进入catch
    java.lang.InterruptedException
        at java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Unknown Source)
        at extthread.ThreadA.run(ThreadA.java:18)
    

    14 awaitAdvanceinterruptibly方法测试3

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin="
                        + System.currentTimeMillis());
                phaser.awaitAdvanceInterruptibly(10);// 不符合栏数就继续运行
                System.out.println(Thread.currentThread().getName() + " A1   end="
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                System.out.println("进入catch");
                e.printStackTrace();
            }
        }
    
    }
    
    
    public class Run {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
        }
    }
    运行结果:
    A A1 begin=1473750622169
    A A1   end=1473750622169
    

    15 awaitAdvanceinterruptibly(phase,timeout,timeunit)方法测试4

    如果在指定时间内没变,则出现异常,否则继续向下运行

    
    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " begin "
                        + System.currentTimeMillis());
                phaser.awaitAdvanceInterruptibly(0, 5, TimeUnit.SECONDS);
                System.out.println(Thread.currentThread().getName() + "   end "
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("InterruptedException e");
            } catch (TimeoutException e) {
                e.printStackTrace();
                System.out.println("TimeoutException e");
            }
        }
    
    }
    
    
    public class Run1 {
    
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
        }
    
    }
    运行结果:
    A begin 1473750743978
    java.util.concurrent.TimeoutExceptionTimeoutException e
    
        at java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Unknown Source)
        at extthread.ThreadA.run(ThreadA.java:21)
    
    public class Run2 {
    
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new Phaser(3);
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
            Thread.sleep(1000);
            phaser.arrive();
            Thread.sleep(1000);
            phaser.arrive();
            Thread.sleep(1000);
            phaser.arrive();
            System.out.println(System.currentTimeMillis());
        }
    
    }
    运行结果:
    A begin 1473750909777
    1473750912777
    A   end 1473750912777
    
    
    public class Run3 {
    
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new Phaser(3);
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
            Thread.sleep(1000);
            a.interrupt();
        }
    
    }
    运行结果:
    A begin 1473750949798
    java.lang.InterruptedException
    InterruptedException e
        at java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Unknown Source)
        at extthread.ThreadA.run(ThreadA.java:21)
    

    16 forceTermination和isTerminated方法

    forceTermination的功能:使Phaser对象的屏障功能失效
    isTerminated的功能:判断Phaser对象是否已经呈销毁状态

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
        }
    
    }
    
    public class ThreadB extends Thread {
    
        private Phaser phaser;
    
        public ThreadB(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
        }
    }
    
    
    public class Run1 {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            ThreadA a = new ThreadA(phaser);
            a.setName("A");
            a.start();
            ThreadB b = new ThreadB(phaser);
            b.setName("B");
            b.start();
        }
    }
    
    运行结果:
    A A1 begin=1473751201581
    B A1 begin=1473751201583
    
    
    public class Run2 {
        public static void main(String[] args) {
            try {
                Phaser phaser = new Phaser(3);
                ThreadA a = new ThreadA(phaser);
                a.setName("A");
                a.start();
                ThreadB b = new ThreadB(phaser);
                b.setName("B");
                b.start();
                Thread.sleep(1000);
                phaser.forceTermination();
                System.out.println(phaser.isTerminated());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    运行结果:
    A A1 begin=1473751225751
    B A1 begin=1473751225752
    true
    A A1   end=1473751226752
    B A1   end=1473751226752
    
    

    17控制Phaser类的运行时机

    public class ThreadA extends Thread {
    
        private Phaser phaser;
    
        public ThreadA(Phaser phaser) {
            super();
            this.phaser = phaser;
        }
    
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin="
                    + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1   end="
                    + System.currentTimeMillis());
        }
    
    }
    
    public class Run1 {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
            for (int i = 0; i < 3; i++) {
                ThreadA t = new ThreadA(phaser);
                t.start();
            }
        }
    }
    
    public class Run2 {
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new Phaser(3);
            phaser.register();
            for (int i = 0; i < 3; i++) {
                ThreadA t = new ThreadA(phaser);
                t.start();
            }
            Thread.sleep(5000);
            phaser.arriveAndDeregister();
        }
    }
    

    相关文章

      网友评论

          本文标题:并发核心框架:3.Phaser的使用

          本文链接:https://www.haomeiwen.com/subject/kaqeettx.html