美文网首页鸿蒙Harmoney
Harmony鸿蒙 专属RxHarmony

Harmony鸿蒙 专属RxHarmony

作者: h2coder | 来源:发表于2021-07-18 14:50 被阅读0次

前言

鸿蒙也支持Java语言开发,所以也可以使用RxJava,RxJava除了有众多操作符外,还有便捷的线程切换,例如子线程执行完耗时操作,在主线程中更新UI

Android中,与RxJava配套的有RxAndroid,提供了主线程Scheduler调度器,而鸿蒙也有主线程的概念,API也比较类似,按道理是可以按葫芦画瓢,实现一个RxHarmony

代码原理

要写一个RxRxHarmony库,必须先了解RxAndroid,了解后,才能理解怎么实现。原理可以参考这篇文章,RxAndroid 源码分析

大概原理就是,RxAndroid使用Handler把异步任务发消息到主线程处理,实现线程切换。而RxHarmony则使用EventHandler发消息到主线程,原理基本一致

类结构

  • RxHarmonyPlugins,工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
  • HarmonySchedulers,线程调度器工厂
  • EventHandlerScheduler,Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
  • MainThreadDisposable,主线程Disposable实现类,一般配合RxBinding使用

RxHarmonyPlugins

/**
 * 工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
 */
public final class RxHarmonyPlugins {
    private static volatile Function<Callable<Scheduler>, Scheduler> onInitMainThreadHandler;
    private static volatile Function<Scheduler, Scheduler> onMainThreadHandler;

    /**
     * 工具类,隐藏构造方法,如果被反射,抛出异常
     */
    private RxHarmonyPlugins() {
        throw new AssertionError("No instances.");
    }

    /**
     * 初始化主线程调度器
     *
     * @param scheduler 默认的调度器,被Callable回调包裹
     * @return 要被应用的主线程调度器
     */
    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
        //判空
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        //如果有通过setInitMainThreadSchedulerHandler()方法设置了,hook回调函数,则通过hook回调函数进行处理,再返回调度器
        Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
        if (f == null) {
            //没有设置,判空后,再返回
            return callRequireNonNull(scheduler);
        }
        //设置了,调用设置的回调函数,进行处理
        return applyRequireNonNull(f, scheduler);
    }

    /**
     * 设置主线程调度器hook回调函数
     */
    public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
        onMainThreadHandler = handler;
    }

    /**
     * 处理传入的Scheduler调度器
     */
    public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        Function<Scheduler, Scheduler> f = onMainThreadHandler;
        if (f == null) {
            return scheduler;
        }
        return apply(f, scheduler);
    }

    /**
     * 获取hook回调函数,可能为null
     */
    public static Function<Callable<Scheduler>, Scheduler> getInitMainThreadSchedulerHandler() {
        return onInitMainThreadHandler;
    }

    /**
     * 设置hook回调函数,可以对设置的调度器进行处理,再返回
     */
    public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
        onInitMainThreadHandler = handler;
    }

    /**
     * 返回配置hook回调函数
     *
     * @return 返回hook回调函数,可能为null
     */
    public static Function<Scheduler, Scheduler> getOnMainThreadSchedulerHandler() {
        return onMainThreadHandler;
    }

    /**
     * 重置所有配置
     */
    public static void reset() {
        setInitMainThreadSchedulerHandler(null);
        setMainThreadSchedulerHandler(null);
    }

    /**
     * 判空获取的Scheduler调度器,非null,则返回,为null则抛异常
     */
    static Scheduler callRequireNonNull(Callable<Scheduler> s) {
        try {
            Scheduler scheduler = s.call();
            if (scheduler == null) {
                throw new NullPointerException("Scheduler Callable returned null");
            }
            return scheduler;
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }

    /**
     * 调用传入的Function回调函数,对Scheduler进行处理
     *
     * @param f 回调函数
     * @param s Scheduler调度器
     * @return 要应用的调度器
     */
    static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
        Scheduler scheduler = apply(f, s);
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    }

    /**
     * 调用回调函数对目标对象进行处理
     */
    static <T, R> R apply(Function<T, R> f, T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }
}

HarmonySchedulers

/**
 * HarmonyOS,线程调度器工厂
 */
public final class HarmonySchedulers {
    /**
     * 主线程调度器
     */
    private static final Scheduler MAIN_THREAD =
            RxHarmonyPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);

    /**
     * 单例,保存主线程调度器
     */
    private static final class MainHolder {
        static final Scheduler DEFAULT = new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(EventRunner.current()));
    }

    /**
     * 工具类,隐藏构造方法,如果被反射,抛出异常
     */
    private HarmonySchedulers() {
        throw new AssertionError("No instances.");
    }

    /**
     * 获取主线程调度器
     */
    public static Scheduler mainThread() {
        return RxHarmonyPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /**
     * 可以指定EventRunner,类似Android中的Looper轮训器,所以调度器可以绑定在非主线程中
     */
    public static Scheduler from(EventRunner eventRunner) {
        //判空
        if (eventRunner == null) throw new NullPointerException("looper == null");
        //根据配置,创建对应的主线程调度器
        return new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(eventRunner));
    }
}
  • EventHandlerScheduler
/**
 * Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
 */
public class EventHandlerScheduler extends Scheduler {
    private final WithIdEventHandler eventHandler;

    /**
     * 构造方法,保存传入的EventHandler实例
     */
    EventHandlerScheduler(WithIdEventHandler eventHandler) {
        this.eventHandler = eventHandler;
    }

    /**
     * 调度方法
     *
     * @param run   要执行的异步任务
     * @param delay 延时时间
     * @param unit  时间单位
     * @return 返回Disposable实例,被对任务进行取消
     */
    @NonNull
    @Override
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //判空
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        //执行hook回调函数,如果没有设置,会返回传入的Runnable实例
        run = RxJavaPlugins.onSchedule(run);

        //对异步任务进行包装,ScheduledRunnable实现了Runnable和Disposable接口,可以对异步任务进行取消
        ScheduledRunnable scheduled = new ScheduledRunnable(eventHandler, run);

        //获取一个InnerEvent消息
        InnerEvent innerEvent = InnerEvent.get(scheduled);
        //发送消息到主线程
        eventHandler.sendEvent(innerEvent, unit.toMillis(delay));
        //返回任务,外部可以对该任务进行取消
        return scheduled;
    }

    /**
     * 创建Worker实例,会创建HandlerWorker实例并返回
     */
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventHandlerWorker(eventHandler);
    }

    /**
     * Worker子类
     */
    private static final class EventHandlerWorker extends Worker {
        private final WithIdEventHandler handler;

        /**
         * 是否被切断标志位
         */
        private volatile boolean disposed;

        /**
         * 参数生成器,用于在 dispose() 方法中,移除该Worker调度的任务
         */
        private static final AtomicLong paramsCreator = new AtomicLong();
        /**
         * 参数
         */
        private final long params;

        EventHandlerWorker(WithIdEventHandler eventHandler) {
            this.handler = eventHandler;
            params = paramsCreator.incrementAndGet();
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            //判空
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            //如果被切断,直接返回
            if (disposed) {
                return Disposables.disposed();
            }

            //执行hook函数,如果没有设置,则会返回传入的Runnable对象
            run = RxJavaPlugins.onSchedule(run);

            //对异步任务进行包装
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            //获取一个InnerEvent消息
            InnerEvent innerEvent = InnerEvent.get(scheduled);
            //保存异步任务到事件中,在EventHandler的processEvent()回调时,进行处理
            innerEvent.object = this;
            //保存Worker的参数到消息,在dispose()中,移除消息
            innerEvent.param = params;

            //发送消息到主线程进行执行
            handler.sendEvent(innerEvent, unit.toMillis(delay));

            //再次检查是否被切断,如果被切断,则取消任务,直接返回
            if (disposed) {
                handler.removeTask(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

        @Override
        public void dispose() {
            //被切断了,记录标志位,移除任务
            disposed = true;
            //通过 param 参数移除该 Worker 调度,单未执行的 InnerEvent
            handler.removeEvent(handler.id, params);
        }

        @Override
        public boolean isDisposed() {
            //返回切断标志位
            return disposed;
        }
    }

    /**
     * 对异步任务进行包装,实现Disposable接口,提供可取消任务的功能
     */
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final EventHandler handler;
        private final Runnable delegate;

        /**
         * 切断标志
         */
        private volatile boolean disposed;

        ScheduledRunnable(EventHandler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                //执行异步任务
                delegate.run();
            } catch (Throwable t) {
                //如果出现异常,交给hook回调函数处理
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            //被切断,移除任务
            handler.removeTask(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            //返回是否被切断
            return disposed;
        }
    }

    /**
     * EventHandler 带一个消息Id
     */
    static class WithIdEventHandler extends EventHandler {
        private static final AtomicInteger idCreator = new AtomicInteger();
        /**
         * 每个EventHandler都配置一个id,通过这个EventHandler发送的事件,它的eventId都统一为这个id
         */
        private final int id;

        public WithIdEventHandler(EventRunner runner) throws IllegalArgumentException {
            super(runner);
            //生成Id
            id = idCreator.incrementAndGet();
        }

        @Override
        protected void processEvent(InnerEvent event) {
            super.processEvent(event);
            //非当前Handler的事件不处理
            if (event.eventId != id) {
                return;
            }
            Object obj = event.object;
            if (obj instanceof Runnable) {
                ((Runnable) obj).run();
            }
        }
    }
}

MainThreadDisposable

public abstract class MainThreadDisposable implements Disposable {
    //工具方法,可以检查是否是主线程,非主线程会抛出一个IllegalStateException异常
    public static void verifyMainThread() {
        if (EventRunner.current() != EventRunner.getMainEventRunner()) {
            throw new IllegalStateException(
                "Expected to be called on the main thread but was " + Thread.currentThread().getName());
        }
    }

    //原子变量,保证多线程安全
    private final AtomicBoolean unsubscribed = new AtomicBoolean();

    @Override
    public final boolean isDisposed() {
        //返回是否切换
        return unsubscribed.get();
    }

    @Override
    public final void dispose() {
        //被切断,确保只能切断一次,原子标志只能设置一次,从false变true
        if (unsubscribed.compareAndSet(false, true)) {
            //主线程,直接切断,非主线程则通过主线程调度器发送任务进行切断
            if (EventRunner.current() == EventRunner.getMainEventRunner()) {
                onDispose();
            } else {
                //非主线程,通过调度器,在主线程中切断
                HarmonySchedulers.mainThread().scheduleDirect(this::onDispose);
            }
        }
    }

    protected abstract void onDispose();
}

相关文章

网友评论

    本文标题:Harmony鸿蒙 专属RxHarmony

    本文链接:https://www.haomeiwen.com/subject/orcapltx.html