美文网首页
关于RxJava1.0最友好的文章(源码学习)

关于RxJava1.0最友好的文章(源码学习)

作者: 闫鹏飞写字的地方 | 来源:发表于2018-04-03 15:45 被阅读48次
1.学习简单的RxJava的流程
学习简单的RXJAVA流程.gif
/*
 *  同步情况下了解Rxjava的运行
 */
public class NormalRxActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    static String str ="一二三四五\n 上山打老虎\n 老虎一发威\n 武松就发怵\n";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mEdit.setText(str);


        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                    mText.setText("");
                }
                start();
                break;
        }
    }

    private void start() {
        //创建被观察者
        Observable observable = createObservable();
        //创建观察者
        Subscriber subscriber = createSubscriber();

        mText.append("开始订阅,准备观察...\n");
        //事实上,observable不止可以订阅subscriber,也可以订阅ActionX()
        observable.subscribe(subscriber);

        //就像现在这样
//        observable.subscribe(new Action1<String>() {
//            @Override
//            public void call(String s) {
//                //Action1也就意味着,只能传入一个参数 ----> String s,同理Action0,Action2....,
//                //在这个call方法中传入了onNext()的参数,相当于代替了onNext方法,但是就不能监听onComplete,onError方法了
//                mText.append("执行观察者中的onNext()...\n");
//                mText.append(s+"...\n");
//            }
//        });
    }

    private Subscriber createSubscriber() {
        //创建观察者
        Subscriber subscriber=new Subscriber<String>() {
            @Override
            public void onCompleted() {
                mText.append("执行观察者中的onCompleted()...\n");
                mText.append("订阅完毕,结束观察...\n");
            }

            @Override
            public void onError(Throwable e) {

            }
            @Override
            public void onNext(String s) {
                mText.append("执行观察者中的onNext()...\n");
                mText.append(s+"...\n");
            }

        };
        return  subscriber;
    }

    private Observable createObservable(){
        //创建被观察者,这是最正常的创建方法
        Observable observable = Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("一二三四五");
                subscriber.onNext("上山打老虎");
                subscriber.onNext("老虎一发威");
                subscriber.onNext("武松就发怵");
                subscriber.onCompleted();

            }
        });

        //想要图方便,可以这样创建
        //from(T[])
//        String [] kk={"一二三四五","上山打老虎","老虎一发威","武松就发怵"};
//        Observable observable=Observable.from(kk);

        //或者这样
        //just(T...)
//        Observable observable=Observable.just("一二三四五","上山打老虎","老虎一发威","武松就发怵");

        return observable;
    }
}
2.操作符--MAP
操作符--MAP.gif
public class RxMapActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    private Integer [] number={1,2,3,4,5,6};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mBtn.setText("判断数组中的小于3的数");
        mEdit.setText("输入Integer(int):1,2,3,4,5,6 \n"+"\n"+"输出:type:true/false \n");

        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                    mText.setText("");
                }
                start();
                break;
        }
    }

    private void start() {
        mText.append("\n 输入参数: 1,2,3,4,5,6 \n");
        Observable.from(number)           //之前提到的创建Observable方法
                  .map(new Func1<Integer, Boolean>() {

                      @Override
                      public Boolean call(Integer integer) {
                          mText.append("\n\n map()  Integer--->Boolean");
                          return (integer<3);
                      }
                  })
                  .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        mText.append("\n观察到输出结果:\n");
                        mText.append(aBoolean.toString());
                    }
                });
    }

}
3.Rxjava的线程调度
RXJAVA线程调度.gif
public class RxSchuderActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    private LinearLayout mLinearlayout;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mLinearlayout= (LinearLayout) findViewById(R.id.linearlayout);
        mBtn.setText("从资源文件中获取图片,然后展示出来");
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString().length()>0){
                    mText.setText("");
                }
                start();
                break;
        }
    }
static  StringBuffer sb=null;
    private void start() {
        sb=new StringBuffer();
        Observable.create(new Observable.OnSubscribe<Drawable>(){

            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
               sb.append(" Observable.create(): 线程: "+Thread.currentThread().getName()+"\n\n");
                Drawable dd=getResources().getDrawable(R.mipmap.gril);
                subscriber.onNext(dd);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .map(new Func1<Drawable, ImageView>() {
              @Override
              public ImageView call(Drawable drawable) {
                  sb.append("map():  drawable -->imageview 的线程: "+Thread.currentThread().getName()+"\n\n");
                  ImageView img=new ImageView(RxSchuderActivity.this);
                  LinearLayout.LayoutParams params= new LinearLayout.LayoutParams(LinearLayout.LayoutParams.WRAP_CONTENT, LinearLayout.LayoutParams.WRAP_CONTENT);
                  img.setLayoutParams(params);
                  img.setImageDrawable(drawable);
                  return img;
              }
          }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<ImageView>() {
                @Override
                public void call(ImageView imageView) {
                    sb.append("call(): 线程: "+Thread.currentThread().getName()+"\n");
                    mText.setText(sb);
                    mLinearlayout.addView(imageView);

                }
            });
    }
}
4.操作符--flatMap
操作符-FLATMAP.gif
public class RxFlatMapActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
        initData();
    }
    private SchoolClass[] mSchoolClasses=new SchoolClass[2];

    private void initData() {
        Student[] student=new Student[5];
        for(int i=0;i<5;i++){
            Student s=new Student("二狗"+i,"17");
            student[i]=s;
        }
        mSchoolClasses[0]=new SchoolClass(student);

        Student[] student2=new Student[5];
        for(int i=0;i<5;i++){
            Student s=new Student("小明"+i,"27");
            student2[i]=s;
        }
        mSchoolClasses[1]=new SchoolClass(student2);

    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);

        mEdit.setText("打印一个学校所有班级所有学生姓名");
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString().length()>0){
                    mText.setText("");
                }
                strat();
                break;
        }
    }
    public SchoolClass[] getSchoolClass(){
        return  mSchoolClasses;
    }
    private void strat() {
        Observable.from(getSchoolClass())
                .flatMap(new Func1<SchoolClass, Observable<Student>>() {
                    @Override
                    public Observable<Student> call(SchoolClass schoolClass) {
                        //将Student列表使用from方法一个一个发出去
                        return Observable.from(schoolClass.getStudents());
                    }
                })
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        mText.append("打印单个学生信息:\n");
                        mText.append("name:"+student.name+"    age: "+student.age+"\n");
                    }
                });
    }
}

class SchoolClass{
    Student[] stud;
    public SchoolClass(Student[] s){
        this.stud=s;
    }
    public Student[] getStudents(){
        return  stud;
    }
}

class Student{
    String name;
    String age;
    public Student(String name,String age){
        this.name=name;
        this.age=age;
    }
}
5.操作符--合并
操作符-合并.gif
public class RxMergeActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mEdit.setText("两个任务并发进行,全部处理完毕之后在更新数据");
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString().length()>0){
                    mText.setText("");
                }
                start();
                break;
        }
    }

    private void start() {
        Observable obs1=Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(500);
                    subscriber.onNext(" aaa");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.newThread());

        Observable obs2=Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(1500);
                    subscriber.onNext("bbb");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.newThread());

        Observable.merge(obs1,obs2)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    StringBuffer sb=new StringBuffer();
                    @Override
                    public void onCompleted() {
                        mText.append("两个任务都处理完毕!!\n");
                        mText.append("更新数据:"+sb+"\n");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        sb.append( s+",");
                        mText.append("得到一个数据:"+s+"\n");
                    }
                });
    }
}
6.基于Rxjava的Binding
基于RXJAVA的BINGING.gif
public class RxBindingActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private EditText mEdit;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout3);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (EditText) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mEdit.setHint("输入含有1的数字,下方才会出现提示");
        mText.setText("提示数据:\n");

        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
        //用来监听edittext输入,同时匹配输入数数据来提示
        RxTextView.textChanges(mEdit)
                //在一次事件发生后的一段时间内没有新操作,则发出这次事件
                .debounce(500,TimeUnit.MILLISECONDS)
                //转换线程
                .observeOn(Schedulers.newThread())
                //通过输入的数据,来匹配"数据库"中的数据从而提示。。
                .map(new Func1<CharSequence, List<String>>() {
                    List<String> list= new ArrayList<>();
                    @Override
                    public List<String> call(CharSequence charSequence) {

                        if (charSequence.toString().contains("1")){
                            for (int i=0;i<5;i++){
                                list.add("11"+i);
                            }
                        }
                        return list;
                    }
                })
                //由于我不想要listl列表,所以使用了flatMap来分解成一个一个的数据发送
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {

                        return Observable.from(strings);
                    }
                })
                //这里切换成主线程,不然没法操作组件
                .observeOn(AndroidSchedulers.mainThread())
                //这里做一些过滤动作
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return !mText.getText().toString().contains(s);
                    }
                })
                //订阅
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        //这里展示提示数据
                        mText.append(s + "\n");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.w("DDDDDDDD",throwable.getMessage().toString());
                    }
                });
        mBtn.setText("连续点击防误触");
        RxView.clicks(mBtn)
                //防误触(设定点击后500ms内丢弃新事件,或者说点击后500ms毫秒无响应)
                .throttleFirst(500, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Void>() {
                    //这就相当于OnClickListener中的OnClick方法回调
                    @Override
                    public void call(Void aVoid) {
                       mText.append("\n 防误触 测试  \n");
                    }
                });

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                mText.append("lalal\n");
                break;
        }
    }
}
7.操作符--filter
操作符-FILTER.gif
public class RxFilterActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);

        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
        mEdit.setText("输入1-10,过滤掉能被2整除的数");
    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:

                start();
                break;
        }
    }

    private void start() {
        Integer[] integers={1,2,3,4,5,6,7,8,9,10};
        Observable.from(integers)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer%2!=0;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        mText.append(integer.toString()+",");
                    }
                });
    }
}
8.操作符--take,doOnNext
操作符-TAKE DOONNEXT.gif
public class RxTakeActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private TextView mEdit;
    private Integer [] number={1,2,3,4,5,6,7,8,9,10};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);

        mEdit.setText("输出[1,2,3,4,5,6,7,8,9,10]中第三个和第四个奇数,\n\ntake(i) 取前i个事件 \ntakeLast(i) 取后i个事件 \ndoOnNext(Action1) 每次观察者中的onNext调用之前调用");
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                    mText.setText("");
                }
                start();
                break;
        }
    }

    private void start() {
        Observable.from(number)
                  .filter(new Func1<Integer, Boolean>() {
                      @Override
                      public Boolean call(Integer integer) {
                          return integer%2!=0;
                      }
                  })
                    //取前四个
                    .take(4)
                    //取前四个中的后两个
                    .takeLast(2)
                    .doOnNext(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            mText.append("before onNext()\n");
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            mText.append("onNext()--->"+integer+"\n");
                        }
                    });
    }
}
9.操作符--Interval,取消订阅
操作符-INTERVAL 取消订阅.gif
public class RxTimerActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private Button mBtnCancal;
    private TextView mEdit;
    private Subscription mSubscription=null;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout2);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mBtnCancal= (Button) findViewById(R.id.button_cancal);
        mEdit.setText("定时器,每一秒发送打印一个数字   \n\ninterval(1, TimeUnit.SECONDS)  创建一个每隔一秒发送一次事件的对象");
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
        mBtnCancal.setOnClickListener(this);
    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                start();
                break;
            case R.id.button_cancal:
                    //取消订阅
                     if (mSubscription!=null && !mSubscription.isUnsubscribed()){
                         mSubscription.unsubscribe();
                     }
                    break;
        }
    }

    private void start() {
        //interval()是运行在computation Scheduler线程中的,因此需要转到主线程
        mSubscription=Observable.interval(1, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(new Action1<Long>() {
                      @Override
                      public void call(Long aLong) {
                          mText.setText(aLong+"");
                      }
                  });
    }
}
10.操作符--toSortedList
操作符--toSortedList.gif
public class RxSortActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private Button mBtnCancal;
    private TextView mEdit;
    private Subscription mSubscription=null;
    private Integer [] words={1,3,5,2,34,7,5,86,23,43};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mBtn.setText("开始排序");
//        mBtnCancal= (Button) findViewById(R.id.button_cancal);
        mEdit.setText("为给定数据列表排序:1,3,5,2,34,7,5,86,23,43   \n\ntoSortedList() :为事件中的数据排序" );
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
//        mBtnCancal.setOnClickListener(this);
    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                start();
                break;

        }
    }

    private void start() {
        //
        Observable.from(words)
                  .toSortedList()
                   .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                       @Override
                       public Observable<Integer> call(List<Integer> strings) {
                           return Observable.from(strings);
                       }
                   })
                  .subscribe(new Action1<Integer>() {
                      @Override
                      public void call(Integer strings) {
                          mText.append(strings+"\n");
                      }
                  });
    }
}
11.操作符--connect
操作符--connect.gif
public class RxConnetActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private Button bntNormol;
    private TextView mEdit;
    private Subscription mSubscription=null;
    private Integer [] integer={1,2,3,4,5,6};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout2);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        bntNormol= (Button) findViewById(R.id.button_cancal);
        mBtn.setText("正常情况下");

        bntNormol.setText("connect模式");

        mEdit.setText("Observable发送事件1-6,两个观察者同时观察这个Observable \n要求:每发出一个事件,观察者A和观察者都会收到,而不是先把所有的时间发送A,然后再发送给B  \n\n" );
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
        bntNormol.setOnClickListener(this);

    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                mText.setText("");
                normol();
                break;
            case R.id.button_cancal:
                mText.setText("");
                start();
                break;

        }
    }

    private void normol() {
        Observable  observable= Observable.from(integer);
        Action1 a1=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("观察者A  收到:  "+o+"\n");
            }
        };
        Action1 a2=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("观察者B  收到:  "+o+"\n");
            }
        };

        observable.subscribe(a1);
        observable.subscribe(a2);

    }

    private void start() {

        ConnectableObservable  observable= Observable.from(integer)
                                                    .publish();//将一个Observable转换为一个可连接的Observable

        Action1 a1=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("观察者A  收到:  "+o+"\n");
            }
        };
        Action1 a2=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("观察者B  收到:  "+o+"\n");
            }
        };

        observable.subscribe(a1);
        observable.subscribe(a2);
        observable.connect();

    }
}
12.操作符--
操作符--.gif
public class TimestampActivity extends AppCompatActivity implements View.OnClickListener{

    private TextView mText;
    private Button mBtn;
    private Button mBtnCancal;
    private TextView mEdit;
    private Subscription mSubscription=null;
    private Integer [] words={1,3,5,2,34,7,5,86,23,43};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.layout1);
        initView();
    }

    private void initView() {
        mText= (TextView) findViewById(R.id.text1);
        mEdit= (TextView) findViewById(R.id.edit1);
        mBtn= (Button) findViewById(R.id.button);
        mBtn.setText("开始");
//        mBtnCancal= (Button) findViewById(R.id.button_cancal);
        mEdit.setText("为给定数据列表:1,3,5,2,34,7,5,86,23,43中每一个数据加上一个时间戳   \n\ntimestamp() :为每个事件加上一个时间戳" );
        mBtn.setOnClickListener(this);
        mText.setOnClickListener(this);
        mEdit.setOnClickListener(this);
//        mBtnCancal.setOnClickListener(this);
    }

    @Override
    public void onClick(View view) {
        switch (view.getId()){
            case R.id.text1:
                break;
            case R.id.edit1:
                break;
            case R.id.button:
                mText.setText("");
                start();
                break;

        }
    }

    private void start() {
        //
//        Observable.from(words)
//                  .toSortedList()
//                   .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
//                       @Override
//                       public Observable<Integer> call(List<Integer> strings) {
//                           return Observable.from(strings);
//                       }
//                   })
//                  .subscribe(new Action1<Integer>() {
//                      @Override
//                      public void call(Integer strings) {
//                          mText.append(strings+"\n");
//                      }
//                  });
        Observable.from(words)
                .timestamp()
//                .timestamp(Schedulers.io()) 可指定线程环境,如果指定到子线程,请在最后切换成主线程
                .subscribe(new Action1<Timestamped<Integer>>() {
                    @Override
                    public void call(Timestamped<Integer> integerTimestamped) {

                        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
                        mText.append("value: "+integerTimestamped.getValue()+"       time:   ");
                        mText.append(sdf.format(new Date(integerTimestamped.getTimestampMillis()))+"\n");

                    }
                });
    }
}

相关文章

网友评论

      本文标题:关于RxJava1.0最友好的文章(源码学习)

      本文链接:https://www.haomeiwen.com/subject/qqtchftx.html