美文网首页
Rxjava简单学习

Rxjava简单学习

作者: 番茄tomato | 来源:发表于2019-10-10 18:26 被阅读0次

看简书上大神的文章总算是理解了rxjava
链接:https://www.jianshu.com/p/464fa025229e
总共写了十篇文章
目前看到第四篇 根据每篇都写了一个test代码 备注后边加上了对于的哪篇
其中还有网络请求的代码 api接口和bean 可以看看我的文章
https://www.jianshu.com/p/1ae93acffa64
都是用的同样的

import android.util.Log;
import org.reactivestreams.Subscriber;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
import swust.yuqiaodan.tomatoapp.MyRxjavaStudy.service.APIService;
import swust.yuqiaodan.tomatoapp.MyRxjavaStudy.service.WeatherBean;
// https://www.jianshu.com/p/464fa025229e
public class RxjavaTest {
    public static void main(String[] args) {
        test6();
    }

    //最简单的使用<一>
    public static void test1() {
        //最简单的使用

        //建立观察者
        Observer<String> observer = new Observer<String>() {
            //开始订阅被观察者的动作
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            //执行完一次操作
            @Override
            public void onNext(String s) {
                System.out.println("Item: " + s);
            }

            //出错
            @Override
            public void onError(Throwable e) {
                System.out.println("Error");
            }

            //完成所有操作
            @Override
            public void onComplete() {
                System.out.println("Complete");
            }


        };

/**************建立被观察者  三种方式 *****************/

        //第一种 正常方式
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                //被观察者 我需要在这里的订阅时间中做些事情
                //比如在这里发出网络请求 最终得到一个数据比如str="Hello"
                //然后在onNext中吧这个str发送给观察者 观察者可以在主线程进行下一步处理

                emitter.onNext("Hello");
                emitter.onNext("My");
                emitter.onNext("Rxjava");
                //这里发出已经完成所有操作
                emitter.onComplete();
            }
        });

        //第二种 just简化方式  将传入的参数依次发送出来
        //Observable observable = Observable.just("Hello", "Hi", "Aloha");

        //第三种 fromArray简化方式 将传入的数组或 Iterable 拆分成具体对象后,依次发出来
/*        String[] words = {"Hello", "Hi", "Aloha"};
        Observable observable = Observable.fromArray(words);*/

        //建立订阅关系
        observable.subscribe(observer);
    }

    //将Test1中的代码精简化<一>
    public static void test2() {
        //将Test1中的代码精简化
        String[] words = {"Hello", "Hi", "Aloha"};
        Observable.fromArray(words).subscribe(new Observer<String>() {
            //开始订阅被观察者的动作
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            //执行完一次操作
            @Override
            public void onNext(String s) {
                System.out.println("Item: " + s);

            }

            //出错
            @Override
            public void onError(Throwable e) {
                System.out.println("Error");
            }

            //完成所有操作
            @Override
            public void onComplete() {
                System.out.println("Complete");
            }
        });


    }

    //在test2中加入线程调度Scheduler<二>
    public static void test3() {
        Observable.just("Hello", "Hi", "Aloha")
                .subscribeOn(Schedulers.io())// 指定 subscribe() 发生在 IO 线程
                .observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
                .subscribe(new Observer<String>() {
                    //开始订阅被观察者的动作
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    //执行完一次操作
                    @Override
                    public void onNext(String s) {
                        System.out.println("Item: " + s);
                    }

                    //出错
                    @Override
                    public void onError(Throwable e) {
                        System.out.println("Error");
                    }

                    //完成所有操作
                    @Override
                    public void onComplete() {
                        System.out.println("Complete");
                    }
                });

    }

    //在test3的基础上加入map的使用<三>
    public static void test4() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                //比如我这里说明了是将int转化为String
                //我需要在这里边进行相应的处理 把被观察者传递过来的对象转换为我想要的对象
                String str = "This is result " + String.valueOf(integer);
                return str;
            }
        }).subscribe(new Consumer<String>() {//《一》 Consumer也是一个观察者对象 但是它只在乎上游的noNext方法 不会在乎错误处理等等
            @Override
            public void accept(String s) throws Exception {

                System.out.println(s);
            }
        });
    }

    //在test3的基础上 结合retrofit的使用<二>
    public static void test5() {
        Retrofit retrofit = new Retrofit
                .Builder()
                .baseUrl("http://apis.juhe.cn/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//加入rxjava适配
                .build();

        retrofit.create(APIService.class)
                .getWeather("成都", "19fd22d1c21e46438c7e62f87087e2ef")
                .subscribe(new Observer<WeatherBean>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(WeatherBean weatherBean) {
                        System.out.println(weatherBean.getResult().getCity());
                        System.out.println(weatherBean.getResult().getRealtime().getInfo());
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("请求错误");
                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }

    //在test4的基础上学习FlatMap的使用<三>
    public static void test6() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                    return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

}

Rxjava单独使用

1.Rxjava简单的开启一个异步线程
                    Observable.create(new Observable.OnSubscribe<Object>() {
                        @Override
                        public void call(Subscriber<? super Object> subscriber) {

                            /**
                             * 在这里可以完成任何异步操作
                             *
                             * 如果不需要回调的话 就只管操作完成就可以了
                             *
                             * 但是需要回调的话
                             * 可以在完成某操作后使用subscriber.onNext或者onError等等
                             * 来进行回调
                             *
                             * */

                        }//.subscribeOn可以指定上边的操作在任何线程进行 
                        // 比如耗时的文件读取就在io,但是要显示dialog则需要在main线程
                        //可以从任何线程中开启异步线程
                    }).subscribeOn(Schedulers.io())
                            .subscribe();//如果不需要回调可以不设置订阅者
2. Rxjava简单计时器
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Consumer;
    //延迟五秒后隐藏界面提示
    public void hintDismissByTime() {
        Observable.timer(5, TimeUnit.SECONDS)//计时5秒
                .observeOn(AndroidSchedulers.mainThread())//订阅在主线程 因为后续要对界面进行操作(也可以在其他线程)
                .subscribe(
                        new Consumer<Long>() {
                            @Override
                            public void accept(Long aLong) throws Exception {
//如果订阅在主线程 则可以 操作界面控件等等
                                //collectQueryResult.setVisibility(View.INVISIBLE);
                            }
                        }

                );
    }

相关文章

网友评论

      本文标题:Rxjava简单学习

      本文链接:https://www.haomeiwen.com/subject/jlbapctx.html