美文网首页
rxAndroid小结

rxAndroid小结

作者: 源来是你啊 | 来源:发表于2017-08-03 22:48 被阅读0次

部分参考了作者 飞奔的蚂蚁

初识rxAndroid

1.rxAndroid涉及对象:订阅者(观察者),被观察者以及订阅事件


订阅者与观察者

被观察者可以理解为数据源,它可以向订阅者(观察者)发送数据消息和消息,而订阅者则通过被观察者发送的消息做出相应的动作。而rxAndroid本身也提供了线程切换的API,可以分别指定订阅者和被观察者的线程。
每一段rxAndroid代码都从被观察者开始

observables 被观察者(事件源)
subscribers 观察者 订阅者
schedulers 子线程、主线程切换的调度 器,
schedulers.newThread() 在子线程中执行,
schedulers.mainThread()在主线程中执行,
schedulers.io(),访问网络和数据操作的线程执行

核心代码

Observable.subscribe(Subcribe);

其中被观察者通过call回调subscriber.onNext(数据)和onComplete()传递数据,而subcscriber实现onNext(数据){....},加上线程控制从而实现异步任务处理。

下面举个栗子:从网络获取json数据:
package com.example.rxtest;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import butterknife.BindView;
import butterknife.ButterKnife;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;


public class MainActivity extends AppCompatActivity {


    private static final String TAG = "MainActivity";


    @BindView(R.id.btn_search)
    Button btnSearch;
    @BindView(R.id.tv_result)
    TextView tvResult;

    String requestUrl = "http://www.cnblogs.com/whoislcj/p/5520384.html";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        ButterKnife.bind(this);

        btnSearch.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                createSubscription();
            }
        });

    }

    private void createSubscription() {
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                String s = loadText();
                subscriber.onNext(s);
                subscriber.onCompleted();
            }
        })//被观察者
                .subscribeOn(Schedulers.io())//指定上面的被观察者在io线程(阶级数据、访问网络)
                .observeOn(AndroidSchedulers.mainThread())//指定下面的订阅者的线程(UI线程)
                .subscribe(new Subscriber<String>() {//订阅者
                    @Override
                    public void onStart() {
                        tvResult.setText("开始加载..");
                    }

                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        tvResult.setText("加载出错" + e.getMessage());

                    }

                    @Override
                    public void onNext(String s) {
                        //被观察者onnext传递过来的消息
                        tvResult.setText(s);
                    }
                });
    }

    private String loadText() {
        // 新建一个URL对象
        URL url = null;
        String result = null;

        try {
            url = new URL(requestUrl);

            // 打开一个HttpURLConnection连接
            HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
            // 设置连接主机超时时间
            urlConn.setConnectTimeout(5 * 1000);
            //设置从主机读取数据超时
            urlConn.setReadTimeout(5 * 1000);
            // 设置是否使用缓存  默认是true
            urlConn.setUseCaches(true);
            // 设置为Get请求
            urlConn.setRequestMethod("GET");
            //urlConn设置请求头信息
            //设置请求中的媒体类型信息。
            urlConn.setRequestProperty("Content-Type", "application/json");
            //设置客户端与服务连接类型
            urlConn.addRequestProperty("Connection", "Keep-Alive");
            // 开始连接
            urlConn.connect();
            // 判断请求是否成功
            if (urlConn.getResponseCode() == 200) {
                // 获取返回的数据
                result = streamToString(urlConn.getInputStream());
                Log.e(TAG, "Get方式请求成功,result--->" + result);
            } else {
                Log.e(TAG, "Get方式请求失败");
            }
            // 关闭连接
            urlConn.disconnect();
        } catch (Exception e) {
            Log.e(TAG, e.toString());
        }

        return result;
    }

    private String streamToString(InputStream inputStream) {

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));

        StringBuffer buffer = new StringBuffer();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null){
                buffer.append(line);
            }

            bufferedReader.close();

        } catch (IOException e) {
            e.printStackTrace();
        }

        return buffer.toString();
    }

}

如图


rxAndroid简单示例

rxAndroid操作符

map操作符

map 用来操作observable和最终的subscriber之间修改observable发出事件的类型,比如发出类型为int类型,最终为String类型,中间用map来转换

from操作符

将其他种类的对象和数据类型转换为Observable
当你使用Observable时,如果你要处理的数据都可以转换成展现为Observables,而不是需要使用Observables和其他类型的数据

/**
     * 使用在被观察者,返回的对象一般都是数值类型
     */
    public static void from(){
        Integer [] items={1,2,3,4,5,6,7,8,9};
        Observable observable = Observable.from(items);
        observable.subscribe(new Action1() {
            @Override public void call(Object o) {
                Log.i("adu",o.toString());
            }
        });
    }
interval操作符

interval操作符既可以延迟执行一段逻辑

/**
     * 指定某一时刻进行数据发送
     */
    public static void interval(){
        Integer [] items = {1,2,3,4,5};
        Observable observable = Observable.interval(items,1, TimeUnit.SECONDS);//每隔一秒发送数据
        observable.subscribe(new Action1() {
            @Override public void call(Object o) {
                Log.i("adu",o.toString());
            }
        });
    }
just

获取输入数据, 直接分发, 更加简洁, 省略其他回调.

/**
     * 处理数组集合
     */
    public static void just(){
        Integer[] items1={1,2,3,4,5};
        Integer[] items2={4,5,1,6,0};
        Observable observable = Observable.just(items1,items2);
        observable.subscribe(new Subscriber<Integer[]>() {
            @Override public void onCompleted() {
                Log.i("adu","onCompleted");
            }
            @Override public void onError(Throwable e) {
                Log.i("adu","onError");
            }
            @Override public void onNext(Integer[] integers) {
                for (int i= 0;i<integers.length;i++){
                    Log.i("adu","onNext==》》"+integers[i]);//先打印items1再打印items2
                }
            }
        });
    }
range操作符

range操作符的作用根据出入的初始值n和数目m发射一系列大于等于n的m个值。

/**
     * 指定输出数据的范围
     */
    public static void range(){
        Observable observable = Observable.range(23,4);
        observable.subscribe(new Subscriber<Integer>() {
            @Override public void onCompleted() {
                Log.i("adu","onCompleted");
            }
            @Override public void onError(Throwable e) {
                Log.i("adu","onError");
            }
            @Override public void onNext(Integer integer) {
                Log.i("adu","onNext==》》"+integer);
            }
        });
    }
flter操作符

数据过滤器,比如得到大于或者小于某个数的值

/**
     * 过滤某些条件
     */
    public static void filter(){
        Observable observable = Observable.just(1,2,3,4,5,6,7,8);
        observable.filter(new Func1<Integer,Boolean>() {
            @Override public Boolean call(Integer integer) {
                return integer<5;
            }
        }).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
            @Override public void onCompleted() {
                Log.i("adu","onCompleted");
            }
            @Override public void onError(Throwable e) {
                Log.i("adu","onError");
            }
            @Override public void onNext(Integer integer) {
                Log.i("adu","inNext==》》"+integer);
            }
        });
    }
take操作符

获取数据前几位值

takelast操作符

获取数据后几位值

deitinct

对一个值只处理一次

等等操作,详情参考rxAndroid操作

CompositeSubscription

用于订阅者的管理,一般在baseActivity中创建,将所有订阅者添加到CompositeSubcription中集中管理.

add()添加订阅者
clear()清理订阅者,可以再次使用
unsubscribe()取消订阅者,再也不能使用

相关文章

网友评论

      本文标题:rxAndroid小结

      本文链接:https://www.haomeiwen.com/subject/bvyslxtx.html