美文网首页
RxJava 中的设计模式(三)代理模式之切换线程实现

RxJava 中的设计模式(三)代理模式之切换线程实现

作者: 蓝笔头 | 来源:发表于2021-06-27 13:11 被阅读0次

    代理模式介绍

    在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。

    • 意图:为其他对象提供一种代理以控制对这个对象的访问。
    • 何时使用:想在访问一个类时做一些控制。
    • 如何解决:增加中间层。

    注意事项:

    实现

    UML 类图
    public interface Service {
        void handle();
    }
    
    @Slf4j
    public class RealService implements Service{
        @Override
        public void handle() {
            log.info("I am the RealService");
        }
    }
    
    @Slf4j
    public class ProxyService implements Service {
        private Service realService;
    
        public ProxyService(Service realService) {
            this.realService = realService;
        }
    
        @Override
        public void handle() {
            new Thread(() -> {
                this.realService.handle();
            }).start();
        }
    }
    

    测试:

    public class Main {
    
        public static void main(String[] args) {
            Service realService = new RealService();
            realService.handle();
    
            ProxyService proxyService = new ProxyService(realService);
            proxyService.handle();
        }
    }
    

    输出结果:

    10:53:10.375 [main] INFO org.company.pattern.proxy.RealService - I am the RealService
    10:53:10.425 [Thread-0] INFO org.company.pattern.proxy.RealService - I am the RealService
    

    RxJava 切换线程实现

    通过代理模式实现 RxJava 中的线程切换代码如下文所示。

    订阅时(SubscribeOn)

    订阅时,即在调用 subscribe() 方法时,切换线程。

    1)实现调度器 Schedulers

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Schedulers {
        private static Scheduler IO = new IoScheduler("IO");
        private static Scheduler NEW_THREAD = new NewThreadScheduler();
    
        public static Scheduler io() {
            return IO;
        }
    
        public static Scheduler newThread() {
            return NEW_THREAD;
        }
    
        interface Scheduler {
            void schedule(Runnable run);
        }
    
        static class NamedThreadFactory extends AtomicInteger implements ThreadFactory {
            private String prefix;
    
            public NamedThreadFactory(String prefix) {
                this.prefix = prefix;
            }
    
            @Override
            public Thread newThread(Runnable r) {
                StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
                String name = nameBuilder.toString();
                return new Thread(r, name);
            }
        }
    
        static class NewThreadScheduler implements Scheduler {
            private static final NamedThreadFactory threadFactory = new NamedThreadFactory("New-Thread");
    
            @Override
            public void schedule(Runnable run) {
                Thread thread = threadFactory.newThread(run);
                thread.start();
            }
        }
    
        static class IoScheduler implements Scheduler {
            private static final NamedThreadFactory threadFactory = new NamedThreadFactory("IO");
            private ExecutorService executorService;
    
            public IoScheduler(String tag) {
                executorService = Executors.newFixedThreadPool(1, threadFactory);
            }
    
            @Override
            public void schedule(Runnable run) {
                executorService.submit(run);
            }
        }
    }
    

    2)实现 ObservableSubscribeOn

    @Slf4j
    public class ObservableSubscribeOn<T> extends Observable<T>{
        private Observable<T> upstream;
        private Schedulers.Scheduler scheduler;
    
        public ObservableSubscribeOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
            this.upstream = upstream;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(Observer<T> observer) {
            scheduler.schedule(() -> {
                log.info("schedule is called.");
                this.upstream.subscribe(observer);
            });
        }
    }
    

    代理模式分析:

    • upstream 字段所引用的 Observable 对象可以看作是上文的 RealService
    • ObservableSubscribeOn 可以看作是上文的 ProxyService

    注意:这里代理的的是 Observable

    观察时(ObserveOn)

    观察时,即事件处理时(调用 ObserveronNext() 方法)。
    这个时候也可以切换线程。

    1)Schedulers.Scheduler 新增 createWorker() 方法。

    public class Schedulers {
        ...
        interface Scheduler {
            ...
            ExecutorService createWorker();
        }
    
        static class NewThreadScheduler implements Scheduler {
            ...
            @Override
            public ExecutorService createWorker() {
                return Executors.newFixedThreadPool(1, threadFactory);
            }
        }
    
        static class IoScheduler implements Scheduler {
            ...
            @Override
            public ExecutorService createWorker() {
                return Executors.newFixedThreadPool(1, threadFactory);
            }
        }
    }
    

    因为一个事件流中可以包含多个事件数据,所以要使用 Worker(线程池)的形式执行事件监听处理(ObserveronNext() 方法)。

    2)实现 ObservableObserveOn

    public class ObservableObserveOn<T> extends Observable<T> {
        private Observable<T> upstream;
        private Schedulers.Scheduler scheduler;
    
        public ObservableObserveOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
            this.upstream = upstream;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(Observer observer) {
            ObserveOnObserver<T> observeOnObserver = new ObserveOnObserver(observer, this.scheduler);
            this.upstream.subscribe(observeOnObserver);
        }
    
        public static class ObserveOnObserver<T> implements Observer<T> {
            private Observer<T> downstream;
            private ExecutorService worker;
    
            public ObserveOnObserver(Observer<T> downstream, Schedulers.Scheduler scheduler) {
                this.downstream = downstream;
                this.worker = scheduler.createWorker();
            }
    
            @Override
            public void onNext(T value) {
                this.worker.submit(() -> {
                    this.downstream.onNext(value);
                });
            }
        }
    }
    

    代理模式分析:

    • downstream 字段所引用的 Observer 对象可以看作是上文的 RealService
    • ObserveOnObserver 可以看作是上文的 ProxyService

    注意:这里代理的的是 Observer

    测试代码如下所示:

    @Slf4j
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
            Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(Emitter<Integer> emitter) {
                            log.info("emitter begin");
                            emitter.onNext(1);
                            emitter.onNext(2);
                            emitter.onNext(3);
                        }
                    })
                    .map(v -> v + 10) // 11 12 13
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onNext(Integer o) {
                            log.info("onNext {}", o);
                        }
                    });
        }
    }
    

    输出如下所示:

    17:17:51.623 [IO-1] INFO org.company.rxjava.pattern.ObservableSubscribeOn - schedule is called.
    17:17:51.639 [IO-1] INFO org.company.rxjava.pattern.Main2 - emitter begin
    17:17:51.642 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 11
    17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 12
    17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 13
    

    从输出中可以得知,订阅(subscribe()) 在 [IO-0] 线程中执行,事件处理(onNext())在 [New-Thread-[worker]-0] 线程中执行。

    说明实现符合预期。

    总结

    • ObservableSubscribeOn 中用代理模式代理了 upstream 字段引用的 Observable 对象。
    • ObserveOnObserver 中用代理模式代理了 downstream 字段所引用的 Observer 对象。

    参考

    相关文章

      网友评论

          本文标题:RxJava 中的设计模式(三)代理模式之切换线程实现

          本文链接:https://www.haomeiwen.com/subject/fecxultx.html