美文网首页
RxJava 线程的调度

RxJava 线程的调度

作者: 翻滚吧王咸鱼 | 来源:发表于2018-06-08 10:44 被阅读0次

    1.Android为例, 一个Activity的所有动作默认都是在主线程中运行的, 所有耗时操作要在线程实现. 所以在开发的时候,经常来来切换线程. 导致项目的代码的难以维护. 而Rxjava 很好的解决这个线程切换.
    在RxJava 在切换线程时用到了两个方法 subscribeOn() 和 observeOn() 下面来分别解释一下这两个方法

    subscribeOn:

    影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用.
    observeOn:
    影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 ####observeOn;

    默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

    写一个栗子来实践一下.

        //创建一个被观察者
            Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d("---->", "我在什么线程" + Thread.currentThread().getName());
                    Log.d("---->", "发射数据" + 1);
                    emitter.onNext(1);
    
                }
            });
    
                //创建观察者
    
            Observer<Integer>   observer =      new    Observer<Integer>(){
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d("---->", "我在什么线程" + Thread.currentThread().getName());
    
                    Log.d("---->","结果"+integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            //创建联系
            integerObservable.subscribe(observer);
    

    在主线程中分别创建被观察者和观察者, 然后将他们连接在一起, 同时分别打印出它们所在的线程, 运行结果为:

    06-01 20:13:40.100 1441-1441/view.dome.com.rxjavadome D/---->: 我在什么线程main
        发射数据1
        我在什么线程main
        结果1
    

    说明默认是在同一个线程工作.
    这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI.

    当使用了 RXJava 里面

      //创建联系
            integerObservable
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(observer);
    

    结果就不一样了

    06-01 20:25:00.210 1657-1671/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
        发射数据1
    06-01 20:25:00.330 1657-1657/view.dome.com.rxjavadome D/---->: 我在什么线程main
        结果1
    

    发现 第一次进那个是新的线程,而结果是的数据在还在主线,
    当我在subscribeOn 里面多次切换线程, observeOn 在里面也多次切换线程会是怎么样.

       //创建联系
            integerObservable
                    .subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .observeOn(Schedulers.io())
                    .subscribe(observer);
    

    结果:

    06-01 20:38:23.930 1756-1771/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
        发射数据1
    06-01 20:38:24.070 1756-1772/view.dome.com.rxjavadome D/---->: 我在什么线程RxCachedThreadScheduler-2
        结果1
    

    发现 只是最后的一个线程起作用了,

    当我们把observeOn 里面的线程位置换一下,看一下

      integerObservable
                    .subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
    
                    .subscribe(observer);
    

    结果

    06-01 20:41:42.510 1823-1838/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
        发射数据1
    06-01 20:41:42.630 1823-1823/view.dome.com.rxjavadome D/---->: 我在什么线程main
        结果1
    

    Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    Schedulers.newThread() 代表一个常规的新线程
    AndroidSchedulers.mainThread() 代表Android的主线程

    在Retrofit2.0 跟RxJava 结合使用.

    1. gradle 的依赖添加
           // Android 支持 Rxjava
        implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
        implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
        // Retrofit库
        implementation 'com.squareup.retrofit2:retrofit:2.4.0'
        // Okhttp库
        implementation 'com.squareup.okhttp3:okhttp:3.10.0'
        //Gson解析
        implementation'com.squareup.retrofit2:converter-gson:2.4.0'
        // 此处一定要注意使用RxJava2的版本
        implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
        //Log
        implementation 'com.squareup.okhttp3:logging-interceptor:3.9.0'
    

    2.数据结构.

    {
        "status":1,
        "content":{
            "from":"en-EU",
            "to":"zh-CN",
            "vendor":"tencent",
            "out":"嗨世界",
            "ciba_use":"来自机器翻译。",
            "ciba_out":"",
            "err_no":0
        }
    }
    

    编写 数据Bean

    package view.dome.com.rxjavadome.bean;
    
    
    /**
     * 数据bean
     */
    public class Translation {
        
        /**
         * status : 1
         * content : {"from":"en-EU","to":"zh-CN","vendor":"tencent","out":"嗨世界","ciba_use":"来自机器翻译。","ciba_out":"","err_no":0}
         */
    
        private int status;
        private ContentBean content;
        
        public int getStatus() {
            return status;
        }
    
        public void setStatus(int status) {
            this.status = status;
        }
    
        public ContentBean getContent() {
            return content;
        }
    
        public void setContent(ContentBean content) {
            this.content = content;
        }
    
    
        public static class ContentBean {
            /**
             * from : en-EU
             * to : zh-CN
             * vendor : tencent
             * out : 嗨世界
             * ciba_use : 来自机器翻译。
             * ciba_out :
             * err_no : 0
             */
    
            private String from;
            private String to;
            private String vendor;
            private String out;
            private String ciba_use;
            private String ciba_out;
            private int err_no;
    
            public String getFrom() {
                return from;
            }
    
            public void setFrom(String from) {
                this.from = from;
            }
    
            public String getTo() {
                return to;
            }
    
            public void setTo(String to) {
                this.to = to;
            }
    
            public String getVendor() {
                return vendor;
            }
    
            public void setVendor(String vendor) {
                this.vendor = vendor;
            }
    
            public String getOut() {
                return out;
            }
    
            public void setOut(String out) {
                this.out = out;
            }
    
            public String getCiba_use() {
                return ciba_use;
            }
    
            public void setCiba_use(String ciba_use) {
                this.ciba_use = ciba_use;
            }
    
            public String getCiba_out() {
                return ciba_out;
            }
    
            public void setCiba_out(String ciba_out) {
                this.ciba_out = ciba_out;
            }
    
            public int getErr_no() {
                return err_no;
            }
    
            public void setErr_no(int err_no) {
                this.err_no = err_no;
            }
        }
    }
    

    3.写Observable<..>接口形式

    public interface GetRequest_Interface {
        @GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
        Observable<Translation> getCall();
        // 注解里传入 网络请求 的部分URL地址
        // Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
        // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
        // 采用Observable<...>接口
        // getCall()是接受网络请求数据的方法
    }
    
    

    4.编写Retrofit跟Rx结合使用

     //定制OkHttp
            OkHttpClient.Builder httpClientBuilder = new OkHttpClient
                    .Builder();
            if (BuildConfig.DEBUG) {//发布版本不再打印
                // 日志显示级别
                HttpLoggingInterceptor.Level level= HttpLoggingInterceptor.Level.BODY;
                //新建log拦截器
                HttpLoggingInterceptor loggingInterceptor=new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
                    @Override
                    public void log(String message) {
                    Log.e("Tag",message);
                    }
                });
                loggingInterceptor.setLevel(level);
                //OkHttp进行添加拦截器loggingInterceptor
                httpClientBuilder.addInterceptor(loggingInterceptor);
    
            }
            OkHttpClient client = httpClientBuilder.build();
            Retrofit retrofit = new Retrofit.Builder() //创建retrofit
                    .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
                    .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                    .client(client) //添加拦截器
                    .build();
    
            //  创建 网络请求接口 的实例
            GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
            //采用Observable<...>形式 对 网络请求 进行封装
            Observable<Translation> observable = request.getCall();
    
            observable.subscribeOn(Schedulers.io()) //在子线程请求网络
                    .observeOn(AndroidSchedulers.mainThread()) //在主线更新数据
                    .subscribe(new Observer<Translation>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d("----->", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Translation translation) {
                            Log.d("----->", "对返回的数据进行处理");
    
                            if (translation.getStatus()==1){   //成功
    
                                Translation.ContentBean content = translation.getContent();
                                if (content!=null){
                                    String ciba_out = content.getOut();
                                    Toast.makeText(MainActivity.this,ciba_out,Toast.LENGTH_SHORT).show();
                                }
                            }else {
                                Toast.makeText(MainActivity.this,"请求失败",Toast.LENGTH_SHORT).show();
    
                            }
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d("----->", "请求失败"+e.toString());
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d("----->", "请求成功");
                        }
                    });
    

    相关文章

      网友评论

          本文标题:RxJava 线程的调度

          本文链接:https://www.haomeiwen.com/subject/mowxsftx.html