美文网首页
Scheduler (一)

Scheduler (一)

作者: 捞月亮的阿汤哥 | 来源:发表于2021-01-05 14:43 被阅读0次

    本篇文章主要是阅读官网的scheduler文档。原文链接http://reactivex.io/documentation/scheduler.html

    弹珠图

    弹珠图可以很好的描述rxjava的运作流程,建议先看下这篇文章Understanding Marble Diagrams for Reactive Streams

    简单scheduler

    输出结果: RxNewThreadScheduler-1 hello world!

    public class SchedulersWork {
        private static final CountDownLatch latch = new CountDownLatch(1);
    
        public static void main(String[] args) throws InterruptedException {
            Scheduler.Worker worker = Schedulers.newThread().createWorker();
            worker.schedule(SchedulersWork::sayHello);
            latch.await();
            worker.dispose();
        }
    
        public static void sayHello() {
            System.out.println(Thread.currentThread().getName() + " hello world!");
            latch.countDown();
        }
    }
    

    递归scheduler

    输出结果: RxNewThreadScheduler-1 hello world! ...
    循环输出直到dispose worker

    public class RecursiveScheduler {
        public static void main(String[] args) throws InterruptedException {
            Scheduler.Worker worker = Schedulers.newThread().createWorker();
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    sayHello();
                    //递归直到dispose
                    worker.schedule(this);
                }
            });
    
            Thread.sleep(1000);
            worker.dispose();
        }
    
        public static void sayHello() {
            System.out.println(Thread.currentThread().getName() + " hello world!");
        }
    }
    

    ⚠️: 递归调用需要限制递归次数或者主动设置dispose状态,否则会出现死循环。

    检查或者设置dispose状态

    package com.zihao.schedulers;
    
    import io.reactivex.rxjava3.core.Scheduler;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    import java.util.Scanner;
    
    /**
     * 相比较普通的recursive 调度器,使用dispose状态检查
     * 如果不订阅了 就停止并释放资源
     *
     * @author tangzihao
     * @Date 2021/1/3 11:20 上午
     */
    public class CheckDisposeRecursiveScheduler {
        public static void main(String[] args) throws InterruptedException {
            Scheduler.Worker worker = Schedulers.newThread().createWorker();
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    while (!worker.isDisposed()) {
                        sayHello();
                    }
                    System.out.println("worker被终止了。。。");
                }
            });
    
            //主线程通过终端控制worker结束
            Scanner scan = new Scanner(System.in);
            System.out.println("终止worker工作: ");
            if (scan.hasNext()) {
                String str = scan.next();
                if (str.equals("stop")) {
                    worker.dispose();
                }
            }
            scan.close();
        }
    
        public static void sayHello() {
            //do nothing 故意不输出方便终端输入
        }
    }
    

    输出结果

    终止worker工作: 
    stop  //在控制台输入 main线程
    worker被终止了。。。 //worker线程
    

    延迟或周期性调度器

    delayed scheduler

    schedule有三个参数,调用的方法,延迟的时间,时间单位

    public class DelayedAndPeriodicScheduler {
        public static void main(String[] args) throws InterruptedException {
            Scheduler.Worker worker = Schedulers.newThread().createWorker();
            worker.schedule(DelayedAndPeriodicScheduler::sayHello,1, TimeUnit.SECONDS);
            Thread.sleep(2000);
        }
    
        public static void sayHello(){
            System.out.println("hello,world!");
        }
    }
    

    periodic scheduler

    schedulePeriodically有四个参数,调用的方法,延迟的时间,周期性时间,时间单位

    public class DelayedAndPeriodicScheduler {
        public static void main(String[] args) throws InterruptedException {
            periodicScheduler();
        }
    
        public static void periodicScheduler() throws InterruptedException {
            Scheduler.Worker worker = Schedulers.newThread().createWorker();
            worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);
            Thread.sleep(3000);
        }
    
        public static void sayHello() {
            System.out.println("hello,world!");
        }
    }
    

    相关文章

      网友评论

          本文标题:Scheduler (一)

          本文链接:https://www.haomeiwen.com/subject/zdtwoktx.html