美文网首页Android知识@IT·互联网程序员
RxBus学习之旅--从入门到提高

RxBus学习之旅--从入门到提高

作者: sugaryaruan | 来源:发表于2016-08-13 10:57 被阅读2084次

    在公司的技术分享会上,做了关于RxBus的学习分享,记录如下:

    一.RxBus与RxJava

    一次RxJava调用过程可以划分为以下环节:

    • 创建观察内容 (片段1)
    • 数据处理/映射(片段2)
    • 选择线程(片段3)
    • 订阅(片段4,片段5)
    • 完成/错误处理(片段6)

    示例代码:

    Observable
                    // 片段1
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onStart();
                        String trim = mainEd.getText().toString().trim();
                        subscriber.onNext(trim);
                        subscriber.onError(new Throwable());
                        subscriber.onCompleted();
                    }
                }) 
                // 片段2
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        return s + " sugarya";
                    }
                })
                // 片段3
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())  
                // 片段4
                .subscribe(
                        // 片段5
                        new Subscriber<String>() {
    
                    @Override
                    public void onStart() {
                        super.onStart();
                    }
    
                    //片段6
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(String s) {
                        mainTv2.setText(s);
                    }
                });
    

    二.RxBus与接口回调

    一次完整的接口回调,包括四个步骤:

    • 接口定义
    • 接口调用
    • 接口实现
    • 接口注入

    RxBus的使用过程,就是一个接口回调的过程。

    接口定义,在RxJava定义好了。

    上面示例代码片段1里的suscriber.onNext(),onStart(),onError(),onComplete()对应接口调用。

    代码片段5,片段6这些是接口实现

    注入的过程是调用suscribe()方法订阅的过程

    三.RxBus源码分析

    RxBus的代码实现如下:

    public class RxBus {
    
    private static volatile RxBus instance;
    private final Subject<Object, Object> _bus;
    
    
    private RxBus() {
        _bus = new SerializedSubject<>(PublishSubject.create());
    }
    
    public static RxBus getInstance() {
        if (null == instance) {
            synchronized (RxBus.class) {
                if (null == instance) {
                    instance = new RxBus();
                }
            }
        }
        return instance;
    }
    
    public void send(Object object) {
        try{
            _bus.onNext(object);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    
    public boolean hasObservers() {
        return _bus.hasObservers();
    }
    
    
    
    private <T> Observable<T> toObservable(final Class<T> type) {
        return _bus.ofType(type);//filter + cast
    }
    
    
    
    
    
    
    
    public <T> Subscription toSubscription(final Class<T> type, Observer<T> observer) {
        return toObservable(type).subscribe(observer);
    }
    
    public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1) {
        return toObservable(type).subscribe(action1);
    }
    
    public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1, Action1<Throwable> errorAction1) {
        return toObservable(type).subscribe(action1,errorAction1);
    }
    }
    

    接下来对上述代码做些简要分析:

    volatile

    • 保证instance可见性
    • 禁止指令重排

    这里涉及到Java内存模型,相关资料: 传送门

    SerializedSubject

    • SerializedSubject extends Subject extends Observable implements Observer,既是观察内容,又是观察者,起到桥梁/数据转发的作用

    • 保证多线程安全,Subject 当作一个 Subscriber 使用,从多个线程中调用它的onNext方法(包括其它的on系列方法)

    PublishSubject

    主题,RxJava里有四种主题

    • PublishSubject
    • BehaviorSubject
    • ReplaySubject
    • AsyncSubject

    PublishSubject的含义是:在订阅者订阅的时间点之后的数据发送给观察者

    ofType操作符 = filter操作符 + cast操作符

    • filter只有符合过滤条件的数据才会被“发射”
    • cast将一个Observable转换成指定类型的Observable

    CompositeSubscription

    该对象作为subscription的容器,方便统一取消订阅

    四.RxBus异常处理

    当RxBus在执行过程中,任意环节发生了错误异常,订阅关系就会被取消。之后再次发送,将无法执行订阅后的回调。

    做了一个数组越界的错误来演示

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        ButterKnife.bind(this);
        onRxBus();
    }
    
    
    private void onRxBus() {
        int[] array = new int[2];
        RxBus.getInstance().toSubscription(Integer.class, new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable e) {
                //onRxBus();
                Log.e(TAG, "mSubscriber17 onError: " + e.toString());
            }
    
            @Override
            public void onNext(Integer integer) {
                array[integer] = integer;
                Log.e(TAG, "mSubscriber17 onNext:" + integer);
            }
        });
    }
    
    @OnClick(R.id.btn_main17)
    void onClick17() {
        RxBus.getInstance().send(2);
    }
    

    这时候点击button按钮,数组越界异常,第二次再点击Button发送消息,没有响应了。这里,RxBus,确切的说是RxJava捕获到错误异常,就会取消订阅关系。

    E/MainActivity: mSubscriber17 onError: java.lang.ArrayIndexOutOfBoundsException: length=2; index=2
    

    解决的思路:

    • 使用try-catch捕获异常,不让异常被RxJava捕获
    • 在onError里重新订阅。

    接下来说说第二种方法,具体怎么操作,其实就是在onError方法里,重新执行一遍订阅,执行上述注释掉的代码onRxBus,就解决问题了。

    五.小结

    上述其实是这次技术分享大纲,技术分享准备功课前,刷了下面的文章,要对RxBus有更细致的学习和了解,可以阅读:(推荐)

    谢三弟系列

    RxBus简单实现

    RxBus深入源码解析

    RxJava里onError异常处理

    Yokey系列

    RxJava实现事件总线

    RxBus异常处理

    RxBus实现Sticky事件(粘性订阅)

    其他

    RxBus从基础实现到升级——打造属于自己的RxBus

    EventBus和RxBus实现和性能比较

    相关文章

      网友评论

        本文标题:RxBus学习之旅--从入门到提高

        本文链接:https://www.haomeiwen.com/subject/pvwesttx.html