美文网首页
RxJava 线程的调度

RxJava 线程的调度

作者: 翻滚吧王咸鱼 | 来源:发表于2018-06-08 10:44 被阅读0次

1.Android为例, 一个Activity的所有动作默认都是在主线程中运行的, 所有耗时操作要在线程实现. 所以在开发的时候,经常来来切换线程. 导致项目的代码的难以维护. 而Rxjava 很好的解决这个线程切换.
在RxJava 在切换线程时用到了两个方法 subscribeOn() 和 observeOn() 下面来分别解释一下这两个方法

subscribeOn:

影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用.
observeOn:
影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 ####observeOn;

默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

写一个栗子来实践一下.

    //创建一个被观察者
        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("---->", "我在什么线程" + Thread.currentThread().getName());
                Log.d("---->", "发射数据" + 1);
                emitter.onNext(1);

            }
        });

            //创建观察者

        Observer<Integer>   observer =      new    Observer<Integer>(){
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d("---->", "我在什么线程" + Thread.currentThread().getName());

                Log.d("---->","结果"+integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        //创建联系
        integerObservable.subscribe(observer);

在主线程中分别创建被观察者和观察者, 然后将他们连接在一起, 同时分别打印出它们所在的线程, 运行结果为:

06-01 20:13:40.100 1441-1441/view.dome.com.rxjavadome D/---->: 我在什么线程main
    发射数据1
    我在什么线程main
    结果1

说明默认是在同一个线程工作.
这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI.

当使用了 RXJava 里面

  //创建联系
        integerObservable
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);

结果就不一样了

06-01 20:25:00.210 1657-1671/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
    发射数据1
06-01 20:25:00.330 1657-1657/view.dome.com.rxjavadome D/---->: 我在什么线程main
    结果1

发现 第一次进那个是新的线程,而结果是的数据在还在主线,
当我在subscribeOn 里面多次切换线程, observeOn 在里面也多次切换线程会是怎么样.

   //创建联系
        integerObservable
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .subscribe(observer);

结果:

06-01 20:38:23.930 1756-1771/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
    发射数据1
06-01 20:38:24.070 1756-1772/view.dome.com.rxjavadome D/---->: 我在什么线程RxCachedThreadScheduler-2
    结果1

发现 只是最后的一个线程起作用了,

当我们把observeOn 里面的线程位置换一下,看一下

  integerObservable
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(observer);

结果

06-01 20:41:42.510 1823-1838/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
    发射数据1
06-01 20:41:42.630 1823-1823/view.dome.com.rxjavadome D/---->: 我在什么线程main
    结果1

Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

在Retrofit2.0 跟RxJava 结合使用.

  1. gradle 的依赖添加
       // Android 支持 Rxjava
    implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
    // Retrofit库
    implementation 'com.squareup.retrofit2:retrofit:2.4.0'
    // Okhttp库
    implementation 'com.squareup.okhttp3:okhttp:3.10.0'
    //Gson解析
    implementation'com.squareup.retrofit2:converter-gson:2.4.0'
    // 此处一定要注意使用RxJava2的版本
    implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    //Log
    implementation 'com.squareup.okhttp3:logging-interceptor:3.9.0'

2.数据结构.

{
    "status":1,
    "content":{
        "from":"en-EU",
        "to":"zh-CN",
        "vendor":"tencent",
        "out":"嗨世界",
        "ciba_use":"来自机器翻译。",
        "ciba_out":"",
        "err_no":0
    }
}

编写 数据Bean

package view.dome.com.rxjavadome.bean;


/**
 * 数据bean
 */
public class Translation {
    
    /**
     * status : 1
     * content : {"from":"en-EU","to":"zh-CN","vendor":"tencent","out":"嗨世界","ciba_use":"来自机器翻译。","ciba_out":"","err_no":0}
     */

    private int status;
    private ContentBean content;
    
    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public ContentBean getContent() {
        return content;
    }

    public void setContent(ContentBean content) {
        this.content = content;
    }


    public static class ContentBean {
        /**
         * from : en-EU
         * to : zh-CN
         * vendor : tencent
         * out : 嗨世界
         * ciba_use : 来自机器翻译。
         * ciba_out :
         * err_no : 0
         */

        private String from;
        private String to;
        private String vendor;
        private String out;
        private String ciba_use;
        private String ciba_out;
        private int err_no;

        public String getFrom() {
            return from;
        }

        public void setFrom(String from) {
            this.from = from;
        }

        public String getTo() {
            return to;
        }

        public void setTo(String to) {
            this.to = to;
        }

        public String getVendor() {
            return vendor;
        }

        public void setVendor(String vendor) {
            this.vendor = vendor;
        }

        public String getOut() {
            return out;
        }

        public void setOut(String out) {
            this.out = out;
        }

        public String getCiba_use() {
            return ciba_use;
        }

        public void setCiba_use(String ciba_use) {
            this.ciba_use = ciba_use;
        }

        public String getCiba_out() {
            return ciba_out;
        }

        public void setCiba_out(String ciba_out) {
            this.ciba_out = ciba_out;
        }

        public int getErr_no() {
            return err_no;
        }

        public void setErr_no(int err_no) {
            this.err_no = err_no;
        }
    }
}

3.写Observable<..>接口形式

public interface GetRequest_Interface {
    @GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
    Observable<Translation> getCall();
    // 注解里传入 网络请求 的部分URL地址
    // Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
    // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
    // 采用Observable<...>接口
    // getCall()是接受网络请求数据的方法
}

4.编写Retrofit跟Rx结合使用

 //定制OkHttp
        OkHttpClient.Builder httpClientBuilder = new OkHttpClient
                .Builder();
        if (BuildConfig.DEBUG) {//发布版本不再打印
            // 日志显示级别
            HttpLoggingInterceptor.Level level= HttpLoggingInterceptor.Level.BODY;
            //新建log拦截器
            HttpLoggingInterceptor loggingInterceptor=new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
                @Override
                public void log(String message) {
                Log.e("Tag",message);
                }
            });
            loggingInterceptor.setLevel(level);
            //OkHttp进行添加拦截器loggingInterceptor
            httpClientBuilder.addInterceptor(loggingInterceptor);

        }
        OkHttpClient client = httpClientBuilder.build();
        Retrofit retrofit = new Retrofit.Builder() //创建retrofit
                .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
                .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                .client(client) //添加拦截器
                .build();

        //  创建 网络请求接口 的实例
        GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
        //采用Observable<...>形式 对 网络请求 进行封装
        Observable<Translation> observable = request.getCall();

        observable.subscribeOn(Schedulers.io()) //在子线程请求网络
                .observeOn(AndroidSchedulers.mainThread()) //在主线更新数据
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d("----->", "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Translation translation) {
                        Log.d("----->", "对返回的数据进行处理");

                        if (translation.getStatus()==1){   //成功

                            Translation.ContentBean content = translation.getContent();
                            if (content!=null){
                                String ciba_out = content.getOut();
                                Toast.makeText(MainActivity.this,ciba_out,Toast.LENGTH_SHORT).show();
                            }
                        }else {
                            Toast.makeText(MainActivity.this,"请求失败",Toast.LENGTH_SHORT).show();

                        }

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("----->", "请求失败"+e.toString());

                    }

                    @Override
                    public void onComplete() {
                        Log.d("----->", "请求成功");
                    }
                });

相关文章

网友评论

      本文标题:RxJava 线程的调度

      本文链接:https://www.haomeiwen.com/subject/mowxsftx.html