再续RxBus--RxJava实现事件总线

作者: 噬魂Miss | 来源:发表于2017-04-27 10:41 被阅读363次
    大家好,我叫石头

    前言

    事件总线出现的原因:为了使组件之间的通信变得简单,深度解耦!
    说白了就是切断组件之间的直接联系,采用 发布/订阅 的模式(观察者模式


    相信我们很多人都用过EventBus或者Otto来作为我们APP中的事件总线,所以我们会有这样的困惑,RxBus真的能替代EventBus吗?
    那接下来我们就先来分析分析下:

    This project is deprecated in favor of RxJava and RxAndroid. These projects permit the same event-driven programming model as Otto, but they’re more capable and offer better control of threading.
    该项目已被RxJavaRxAndroid取代。Rx类项目允许与Otto类似的事件驱动编程模型,而且能力更强,操作线程更方便。

    Otto已经停止开发了,所以我们只需对比EventBus和RxBus了。

    对于EventBus和RxBus的比较我们要先明白 一个完美的事件总线应该具备哪些功能?

    • 容易订阅事件:事件订阅者只要声明自己就好了,当事件发生时自然会被调到。订阅和取消可以方便绑定到Activity和Fragment的生命周期上。

    • 容易发送事件:事件发送者直接发送就好了,其他的事都不管。

    • 方便的切换线程:有些事必须主线程干,有些事必须非主线程干,所以这个还是要说清楚。

    • 性能:随着应用的成长,总线可能会被重度使用,性能一定要好。

    纠结到底是用EventBus还是RxBus的朋友可以参考这篇文章--RxBus真的能替代EventBus吗?


    接下来我们就开始RxBus之旅了---------------

    一、添加RxJava和RxAndroid依赖

    //RxJava and RxAndroid
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'io.reactivex:rxjava:1.1.0'
    

    顺便说下,我们是用的rxjava1.X的版本,现在也有了rxjava2.x的版本,他们之间有些区别,感兴趣的朋友可以去看看。

    二、建立RxBus类

    import java.util.HashMap;
    
    import rx.Observable;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;
    import rx.subjects.PublishSubject;
    import rx.subjects.SerializedSubject;
    import rx.subscriptions.CompositeSubscription;
    
    /**
     * Created by shitou on 2017/4/26.
     */
    
    public class RxBus {
        private static volatile RxBus mInstance;
         /**
         * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
         */
        private SerializedSubject<Object, Object> mSubject;
        private HashMap<String, CompositeSubscription> mSubscriptionMap;
    
        private RxBus() {
            mSubject = new SerializedSubject<>(PublishSubject.create());
        }
    
        public static RxBus getInstance() {
            if (mInstance == null) {
                synchronized (RxBus.class) {
                    if (mInstance == null) {
                        mInstance = new RxBus();
                    }
                }
            }
            return mInstance;
        }
    
        /**
         * 发送事件
         */
        public void post(Object o) {
            mSubject.onNext(o);
        }
    
        /**
         * 是否已有观察者订阅
         */
        public boolean hasObservers() {
            return mSubject.hasObservers();
        }
    
        /**
         * 一个默认的订阅方法
         */
        public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
            return toObservable(type)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(next, error);
        }
    
        /**
         * 返回指定类型的Observable实例
         */
        public <T> Observable<T> toObservable(final Class<T> type) {
            return mSubject.ofType(type);
        }
    
        /**
         * 保存订阅后的subscription
         */
        public void addSubscription(Object o, Subscription subscription) {
            if (mSubscriptionMap == null) {
                mSubscriptionMap = new HashMap<>();
            }
            String key = o.getClass().getName();
            if (mSubscriptionMap.get(key) != null) {
                mSubscriptionMap.get(key).add(subscription);
            } else {
                CompositeSubscription compositeSubscription = new CompositeSubscription();
                compositeSubscription.add(subscription);
                mSubscriptionMap.put(key, compositeSubscription);
            }
        }
    
        /**
         * 取消订阅
         */
        public void unSubscribe(Object o) {
            if (mSubscriptionMap == null) {
                return;
            }
    
            String key = o.getClass().getName();
            if (!mSubscriptionMap.containsKey(key)){
                return;
            }
            if (mSubscriptionMap.get(key) != null) {
                mSubscriptionMap.get(key).unsubscribe();
            }
    
            mSubscriptionMap.remove(key);
        }
    }
    

    在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,这里我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者
    Rxjava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。

    由于Subject类是非线程安全的,所以我们通过它的子类SerializedSubjectPublishSubject转换成一个线程安全的Subject对象。

    public <T> Observable<T> toObservable(final Class<T> type) {
            return mSubject.ofType(type);
        }
    

    ofType()方法能过滤掉不符合条件的事件类型(比如你的type是EventType1.class,那么就只能输出EventType1.class的类型),然后将满足条件的事件类型通过cast()方法,转换成对应类型的Observable对象,这是在源码中转换的。

    /**
     * 一个默认的订阅方法
     */
        public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
            return toObservable(type)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(next, error);
        }
    

    上面的方法封装了订阅方法,并且指定了执行线程,我们只需要传入type(事件类型)next(成功的Action1)error(错误的Action1),其实你也可以根据你自己的需要封装自己的doSubscribe方法,来简化代码。

    在需要发送事件的地方调用post()方法,它间接的通过mSubject.onNext(o);将事件发送给订阅者。
    同时RxBus提供了addSubscription()unSubscribe()方法,分别来保存订阅时返回的`Subscription对象,以及取消订阅。

    实战一

    主线程(UI线程)发送String类型的事件

    button点击事件代码

    mButton1 = (Button) findViewById(R.id.button);
    mButton1.setOnClickListener(new View.OnClickListener() {
         @Override
         public void onClick(View v) {
             //在主线程中发送String类型的事件
             RxBus.getInstance().post("hello RxBus!");
         }
    });
    

    onCreate中实现下面代码

    Subscription subscription = RxBus.getInstance()
                    .toObservable(String.class)  
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            mTextView.setText("接收的事件内容"+s);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            Log.e(TAG, "error");
                        }
                    });
    

    之后我们可以把subscription对象保存到HashMap<String, CompositeSubscription>集合中去。

     RxBus.getInstance().addSubscription(this,subscription);
    

    这样当我们点击button时,textview就收到了消息。

    最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

    protected void onDestroy() {
            super.onDestroy();
            RxBus.getInstance().unSubscribe(this);
        }
    

    实战二

    在子线程中发送Integer类型的事件

    button点击事件代码

    mButton2 = (Button) findViewById(R.id.button2);
    mButton2.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View v) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            RxBus.getInstance().post(1234);
                        }
                    }).start();
                }
            });
    

    在onCreate中实现下面代码

    Subscription subscription1 = RxBus.getInstance()
                    .doSubscribe(Integer.class, new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            mTextView.setText("接收的事件内容"+integer);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            Log.e(TAG, "error");
                        }
                    });
    

    之后我们可以把subscription对象保存到HashMap<String, CompositeSubscription>集合中去。

    RxBus.getInstance().addSubscription(this,subscription1);
    

    最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

    protected void onDestroy() {
            super.onDestroy();
            RxBus.getInstance().unSubscribe(this);
        }
    

    上面的都是发送的基本数据类型,那么我们能不能发送自己封装的类型呢?答案是:肯定行的!

    实战三

    创建你要发送的事件类

    下面我们来创建一个学生类:StudentEvent

    public class StudentEvent {
        private String id;
        private String name;
    
        public StudentEvent(String id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    

    发送事件

    RxBus.getInstance().post(new StudentEvent("110","小明"));
    

    注册和接收事件

    Subscription subscription2 = RxBus.getInstance()
                    .toObservable(StudentEvent.class)
                    .observeOn(Schedulers.io())
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<StudentEvent>() {
                        @Override
                        public void call(StudentEvent studentEvent) {
                            String id = studentEvent.getId();
                            String name = studentEvent.getName();
                            mTextView.setText("学生的id:"+id+" 名字:"+name);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
    
                        }
                    });
    

    最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

    protected void onDestroy() {
            super.onDestroy();
            RxBus.getInstance().unSubscribe(this);
        }
    

    实战四

    广播中发送事件,订阅方式按照实战一的方式。
    定义一个检测网络状态的广播:

    public class NetworkChangeReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo networkInfo = manager.getActiveNetworkInfo();
            if (networkInfo != null && networkInfo.isAvailable()) {
                RxBus.getInstance().post("网络连接成功");
            } else {
                RxBus.getInstance().post("网络不可用");
            }
        }
    }
    

    在网络可用与不可用时发送提示事件,然后在onCreate()方法中注册广播:

    private void registerReceiver() {
            IntentFilter intentFilter = new IntentFilter();
            intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
            mReceiver = new NetworkChangeReceiver();
            registerReceiver(mReceiver, intentFilter);
        }
    

    最后不要忘了在onDestory()中对广播进行取消注册,以及取消订阅。

    protected void onDestroy() {
            super.onDestroy();
            unregisterReceiver(mReceiver);
            RxBus.getInstance().unSubscribe(this);
    }
    

    到这里我们实现了几种事件传送,但是细心的童鞋可能发现我们在上面的例子中都是先订阅 事件,然后发送 事件(因为我们是用的PublishSubject,PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,这在前面我们提到过),如果我们反过来,先发送了事件,再进行订阅操作,怎么保证发送的事件不丢失呢?也就是EventBus中的StickyEven功能。RxBus--支持Sticky事件里面讲解了Subject的4种实现,有兴趣的朋友可以去看看。

    最后推荐一些RxJava的学习资源:RxJava入门给 Android 开发者的 RxJava 详解

    相关文章

      网友评论

      • 壞蛋:这个写法不就是直接调用接口,,,,当调用到onNext的时候,把所有的接口都拿出来,然后遍历一遍吗。。。。。。跟rxjava有什么关系呢?
      • apkcore:建议支持到rxjava2,毕竟迟早要升上来
        噬魂Miss:@Trash :grin: 过段时间写一篇关于Rxjava2的版本

      本文标题:再续RxBus--RxJava实现事件总线

      本文链接:https://www.haomeiwen.com/subject/hzdizttx.html