美文网首页
关于RxJava1.0最友好的文章(源码学习)

关于RxJava1.0最友好的文章(源码学习)

作者: 闫鹏飞写字的地方 | 来源:发表于2018-04-03 15:45 被阅读48次
    1.学习简单的RxJava的流程
    学习简单的RXJAVA流程.gif
    /*
     *  同步情况下了解Rxjava的运行
     */
    public class NormalRxActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        static String str ="一二三四五\n 上山打老虎\n 老虎一发威\n 武松就发怵\n";
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mEdit.setText(str);
    
    
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    start();
                    break;
            }
        }
    
        private void start() {
            //创建被观察者
            Observable observable = createObservable();
            //创建观察者
            Subscriber subscriber = createSubscriber();
    
            mText.append("开始订阅,准备观察...\n");
            //事实上,observable不止可以订阅subscriber,也可以订阅ActionX()
            observable.subscribe(subscriber);
    
            //就像现在这样
    //        observable.subscribe(new Action1<String>() {
    //            @Override
    //            public void call(String s) {
    //                //Action1也就意味着,只能传入一个参数 ----> String s,同理Action0,Action2....,
    //                //在这个call方法中传入了onNext()的参数,相当于代替了onNext方法,但是就不能监听onComplete,onError方法了
    //                mText.append("执行观察者中的onNext()...\n");
    //                mText.append(s+"...\n");
    //            }
    //        });
        }
    
        private Subscriber createSubscriber() {
            //创建观察者
            Subscriber subscriber=new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    mText.append("执行观察者中的onCompleted()...\n");
                    mText.append("订阅完毕,结束观察...\n");
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
                @Override
                public void onNext(String s) {
                    mText.append("执行观察者中的onNext()...\n");
                    mText.append(s+"...\n");
                }
    
            };
            return  subscriber;
        }
    
        private Observable createObservable(){
            //创建被观察者,这是最正常的创建方法
            Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
    
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("一二三四五");
                    subscriber.onNext("上山打老虎");
                    subscriber.onNext("老虎一发威");
                    subscriber.onNext("武松就发怵");
                    subscriber.onCompleted();
    
                }
            });
    
            //想要图方便,可以这样创建
            //from(T[])
    //        String [] kk={"一二三四五","上山打老虎","老虎一发威","武松就发怵"};
    //        Observable observable=Observable.from(kk);
    
            //或者这样
            //just(T...)
    //        Observable observable=Observable.just("一二三四五","上山打老虎","老虎一发威","武松就发怵");
    
            return observable;
        }
    }
    
    2.操作符--MAP
    操作符--MAP.gif
    public class RxMapActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        private Integer [] number={1,2,3,4,5,6};
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mBtn.setText("判断数组中的小于3的数");
            mEdit.setText("输入Integer(int):1,2,3,4,5,6 \n"+"\n"+"输出:type:true/false \n");
    
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    start();
                    break;
            }
        }
    
        private void start() {
            mText.append("\n 输入参数: 1,2,3,4,5,6 \n");
            Observable.from(number)           //之前提到的创建Observable方法
                      .map(new Func1<Integer, Boolean>() {
    
                          @Override
                          public Boolean call(Integer integer) {
                              mText.append("\n\n map()  Integer--->Boolean");
                              return (integer<3);
                          }
                      })
                      .subscribe(new Action1<Boolean>() {
                        @Override
                        public void call(Boolean aBoolean) {
                            mText.append("\n观察到输出结果:\n");
                            mText.append(aBoolean.toString());
                        }
                    });
        }
    
    }
    
    3.Rxjava的线程调度
    RXJAVA线程调度.gif
    public class RxSchuderActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        private LinearLayout mLinearlayout;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mLinearlayout= (LinearLayout) findViewById(R.id.linearlayout);
            mBtn.setText("从资源文件中获取图片,然后展示出来");
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    start();
                    break;
            }
        }
    static  StringBuffer sb=null;
        private void start() {
            sb=new StringBuffer();
            Observable.create(new Observable.OnSubscribe<Drawable>(){
    
                @Override
                public void call(Subscriber<? super Drawable> subscriber) {
                   sb.append(" Observable.create(): 线程: "+Thread.currentThread().getName()+"\n\n");
                    Drawable dd=getResources().getDrawable(R.mipmap.gril);
                    subscriber.onNext(dd);
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io())
              .observeOn(Schedulers.newThread())
              .map(new Func1<Drawable, ImageView>() {
                  @Override
                  public ImageView call(Drawable drawable) {
                      sb.append("map():  drawable -->imageview 的线程: "+Thread.currentThread().getName()+"\n\n");
                      ImageView img=new ImageView(RxSchuderActivity.this);
                      LinearLayout.LayoutParams params= new LinearLayout.LayoutParams(LinearLayout.LayoutParams.WRAP_CONTENT, LinearLayout.LayoutParams.WRAP_CONTENT);
                      img.setLayoutParams(params);
                      img.setImageDrawable(drawable);
                      return img;
                  }
              }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<ImageView>() {
                    @Override
                    public void call(ImageView imageView) {
                        sb.append("call(): 线程: "+Thread.currentThread().getName()+"\n");
                        mText.setText(sb);
                        mLinearlayout.addView(imageView);
    
                    }
                });
        }
    }
    
    4.操作符--flatMap
    操作符-FLATMAP.gif
    public class RxFlatMapActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
            initData();
        }
        private SchoolClass[] mSchoolClasses=new SchoolClass[2];
    
        private void initData() {
            Student[] student=new Student[5];
            for(int i=0;i<5;i++){
                Student s=new Student("二狗"+i,"17");
                student[i]=s;
            }
            mSchoolClasses[0]=new SchoolClass(student);
    
            Student[] student2=new Student[5];
            for(int i=0;i<5;i++){
                Student s=new Student("小明"+i,"27");
                student2[i]=s;
            }
            mSchoolClasses[1]=new SchoolClass(student2);
    
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
    
            mEdit.setText("打印一个学校所有班级所有学生姓名");
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    strat();
                    break;
            }
        }
        public SchoolClass[] getSchoolClass(){
            return  mSchoolClasses;
        }
        private void strat() {
            Observable.from(getSchoolClass())
                    .flatMap(new Func1<SchoolClass, Observable<Student>>() {
                        @Override
                        public Observable<Student> call(SchoolClass schoolClass) {
                            //将Student列表使用from方法一个一个发出去
                            return Observable.from(schoolClass.getStudents());
                        }
                    })
                    .subscribe(new Action1<Student>() {
                        @Override
                        public void call(Student student) {
                            mText.append("打印单个学生信息:\n");
                            mText.append("name:"+student.name+"    age: "+student.age+"\n");
                        }
                    });
        }
    }
    
    class SchoolClass{
        Student[] stud;
        public SchoolClass(Student[] s){
            this.stud=s;
        }
        public Student[] getStudents(){
            return  stud;
        }
    }
    
    class Student{
        String name;
        String age;
        public Student(String name,String age){
            this.name=name;
            this.age=age;
        }
    }
    
    5.操作符--合并
    操作符-合并.gif
    public class RxMergeActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mEdit.setText("两个任务并发进行,全部处理完毕之后在更新数据");
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    start();
                    break;
            }
        }
    
        private void start() {
            Observable obs1=Observable.create(new Observable.OnSubscribe<String>(){
    
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    try {
                        Thread.sleep(500);
                        subscriber.onNext(" aaa");
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).subscribeOn(Schedulers.newThread());
    
            Observable obs2=Observable.create(new Observable.OnSubscribe<String>(){
    
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    try {
                        Thread.sleep(1500);
                        subscriber.onNext("bbb");
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).subscribeOn(Schedulers.newThread());
    
            Observable.merge(obs1,obs2)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<String>() {
                        StringBuffer sb=new StringBuffer();
                        @Override
                        public void onCompleted() {
                            mText.append("两个任务都处理完毕!!\n");
                            mText.append("更新数据:"+sb+"\n");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
                            sb.append( s+",");
                            mText.append("得到一个数据:"+s+"\n");
                        }
                    });
        }
    }
    
    6.基于Rxjava的Binding
    基于RXJAVA的BINGING.gif
    public class RxBindingActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private EditText mEdit;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout3);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (EditText) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mEdit.setHint("输入含有1的数字,下方才会出现提示");
            mText.setText("提示数据:\n");
    
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
            //用来监听edittext输入,同时匹配输入数数据来提示
            RxTextView.textChanges(mEdit)
                    //在一次事件发生后的一段时间内没有新操作,则发出这次事件
                    .debounce(500,TimeUnit.MILLISECONDS)
                    //转换线程
                    .observeOn(Schedulers.newThread())
                    //通过输入的数据,来匹配"数据库"中的数据从而提示。。
                    .map(new Func1<CharSequence, List<String>>() {
                        List<String> list= new ArrayList<>();
                        @Override
                        public List<String> call(CharSequence charSequence) {
    
                            if (charSequence.toString().contains("1")){
                                for (int i=0;i<5;i++){
                                    list.add("11"+i);
                                }
                            }
                            return list;
                        }
                    })
                    //由于我不想要listl列表,所以使用了flatMap来分解成一个一个的数据发送
                    .flatMap(new Func1<List<String>, Observable<String>>() {
                        @Override
                        public Observable<String> call(List<String> strings) {
    
                            return Observable.from(strings);
                        }
                    })
                    //这里切换成主线程,不然没法操作组件
                    .observeOn(AndroidSchedulers.mainThread())
                    //这里做一些过滤动作
                    .filter(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            return !mText.getText().toString().contains(s);
                        }
                    })
                    //订阅
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            //这里展示提示数据
                            mText.append(s + "\n");
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            Log.w("DDDDDDDD",throwable.getMessage().toString());
                        }
                    });
            mBtn.setText("连续点击防误触");
            RxView.clicks(mBtn)
                    //防误触(设定点击后500ms内丢弃新事件,或者说点击后500ms毫秒无响应)
                    .throttleFirst(500, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Void>() {
                        //这就相当于OnClickListener中的OnClick方法回调
                        @Override
                        public void call(Void aVoid) {
                           mText.append("\n 防误触 测试  \n");
                        }
                    });
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    mText.append("lalal\n");
                    break;
            }
        }
    }
    
    7.操作符--filter
    操作符-FILTER.gif
    public class RxFilterActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
    
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
            mEdit.setText("输入1-10,过滤掉能被2整除的数");
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
    
                    start();
                    break;
            }
        }
    
        private void start() {
            Integer[] integers={1,2,3,4,5,6,7,8,9,10};
            Observable.from(integers)
                    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer%2!=0;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            mText.append(integer.toString()+",");
                        }
                    });
        }
    }
    
    8.操作符--take,doOnNext
    操作符-TAKE DOONNEXT.gif
    public class RxTakeActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private TextView mEdit;
        private Integer [] number={1,2,3,4,5,6,7,8,9,10};
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
    
            mEdit.setText("输出[1,2,3,4,5,6,7,8,9,10]中第三个和第四个奇数,\n\ntake(i) 取前i个事件 \ntakeLast(i) 取后i个事件 \ndoOnNext(Action1) 每次观察者中的onNext调用之前调用");
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    if(mText.getText().toString()!=null ||mText.getText().toString().length()>0){
                        mText.setText("");
                    }
                    start();
                    break;
            }
        }
    
        private void start() {
            Observable.from(number)
                      .filter(new Func1<Integer, Boolean>() {
                          @Override
                          public Boolean call(Integer integer) {
                              return integer%2!=0;
                          }
                      })
                        //取前四个
                        .take(4)
                        //取前四个中的后两个
                        .takeLast(2)
                        .doOnNext(new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                mText.append("before onNext()\n");
                            }
                        })
                        .subscribe(new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                mText.append("onNext()--->"+integer+"\n");
                            }
                        });
        }
    }
    
    9.操作符--Interval,取消订阅
    操作符-INTERVAL 取消订阅.gif
    public class RxTimerActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private Button mBtnCancal;
        private TextView mEdit;
        private Subscription mSubscription=null;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout2);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mBtnCancal= (Button) findViewById(R.id.button_cancal);
            mEdit.setText("定时器,每一秒发送打印一个数字   \n\ninterval(1, TimeUnit.SECONDS)  创建一个每隔一秒发送一次事件的对象");
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
            mBtnCancal.setOnClickListener(this);
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    start();
                    break;
                case R.id.button_cancal:
                        //取消订阅
                         if (mSubscription!=null && !mSubscription.isUnsubscribed()){
                             mSubscription.unsubscribe();
                         }
                        break;
            }
        }
    
        private void start() {
            //interval()是运行在computation Scheduler线程中的,因此需要转到主线程
            mSubscription=Observable.interval(1, TimeUnit.SECONDS)
                        .observeOn(AndroidSchedulers.mainThread())
                      .subscribe(new Action1<Long>() {
                          @Override
                          public void call(Long aLong) {
                              mText.setText(aLong+"");
                          }
                      });
        }
    }
    
    10.操作符--toSortedList
    操作符--toSortedList.gif
    public class RxSortActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private Button mBtnCancal;
        private TextView mEdit;
        private Subscription mSubscription=null;
        private Integer [] words={1,3,5,2,34,7,5,86,23,43};
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mBtn.setText("开始排序");
    //        mBtnCancal= (Button) findViewById(R.id.button_cancal);
            mEdit.setText("为给定数据列表排序:1,3,5,2,34,7,5,86,23,43   \n\ntoSortedList() :为事件中的数据排序" );
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    //        mBtnCancal.setOnClickListener(this);
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    start();
                    break;
    
            }
        }
    
        private void start() {
            //
            Observable.from(words)
                      .toSortedList()
                       .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                           @Override
                           public Observable<Integer> call(List<Integer> strings) {
                               return Observable.from(strings);
                           }
                       })
                      .subscribe(new Action1<Integer>() {
                          @Override
                          public void call(Integer strings) {
                              mText.append(strings+"\n");
                          }
                      });
        }
    }
    
    11.操作符--connect
    操作符--connect.gif
    public class RxConnetActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private Button bntNormol;
        private TextView mEdit;
        private Subscription mSubscription=null;
        private Integer [] integer={1,2,3,4,5,6};
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout2);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            bntNormol= (Button) findViewById(R.id.button_cancal);
            mBtn.setText("正常情况下");
    
            bntNormol.setText("connect模式");
    
            mEdit.setText("Observable发送事件1-6,两个观察者同时观察这个Observable \n要求:每发出一个事件,观察者A和观察者都会收到,而不是先把所有的时间发送A,然后再发送给B  \n\n" );
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
            bntNormol.setOnClickListener(this);
    
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    mText.setText("");
                    normol();
                    break;
                case R.id.button_cancal:
                    mText.setText("");
                    start();
                    break;
    
            }
        }
    
        private void normol() {
            Observable  observable= Observable.from(integer);
            Action1 a1=new Action1<Integer>(){
                @Override
                public void call(Integer o) {
                    mText.append("观察者A  收到:  "+o+"\n");
                }
            };
            Action1 a2=new Action1<Integer>(){
                @Override
                public void call(Integer o) {
                    mText.append("观察者B  收到:  "+o+"\n");
                }
            };
    
            observable.subscribe(a1);
            observable.subscribe(a2);
    
        }
    
        private void start() {
    
            ConnectableObservable  observable= Observable.from(integer)
                                                        .publish();//将一个Observable转换为一个可连接的Observable
    
            Action1 a1=new Action1<Integer>(){
                @Override
                public void call(Integer o) {
                    mText.append("观察者A  收到:  "+o+"\n");
                }
            };
            Action1 a2=new Action1<Integer>(){
                @Override
                public void call(Integer o) {
                    mText.append("观察者B  收到:  "+o+"\n");
                }
            };
    
            observable.subscribe(a1);
            observable.subscribe(a2);
            observable.connect();
    
        }
    }
    
    12.操作符--
    操作符--.gif
    public class TimestampActivity extends AppCompatActivity implements View.OnClickListener{
    
        private TextView mText;
        private Button mBtn;
        private Button mBtnCancal;
        private TextView mEdit;
        private Subscription mSubscription=null;
        private Integer [] words={1,3,5,2,34,7,5,86,23,43};
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.layout1);
            initView();
        }
    
        private void initView() {
            mText= (TextView) findViewById(R.id.text1);
            mEdit= (TextView) findViewById(R.id.edit1);
            mBtn= (Button) findViewById(R.id.button);
            mBtn.setText("开始");
    //        mBtnCancal= (Button) findViewById(R.id.button_cancal);
            mEdit.setText("为给定数据列表:1,3,5,2,34,7,5,86,23,43中每一个数据加上一个时间戳   \n\ntimestamp() :为每个事件加上一个时间戳" );
            mBtn.setOnClickListener(this);
            mText.setOnClickListener(this);
            mEdit.setOnClickListener(this);
    //        mBtnCancal.setOnClickListener(this);
        }
    
        @Override
        public void onClick(View view) {
            switch (view.getId()){
                case R.id.text1:
                    break;
                case R.id.edit1:
                    break;
                case R.id.button:
                    mText.setText("");
                    start();
                    break;
    
            }
        }
    
        private void start() {
            //
    //        Observable.from(words)
    //                  .toSortedList()
    //                   .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
    //                       @Override
    //                       public Observable<Integer> call(List<Integer> strings) {
    //                           return Observable.from(strings);
    //                       }
    //                   })
    //                  .subscribe(new Action1<Integer>() {
    //                      @Override
    //                      public void call(Integer strings) {
    //                          mText.append(strings+"\n");
    //                      }
    //                  });
            Observable.from(words)
                    .timestamp()
    //                .timestamp(Schedulers.io()) 可指定线程环境,如果指定到子线程,请在最后切换成主线程
                    .subscribe(new Action1<Timestamped<Integer>>() {
                        @Override
                        public void call(Timestamped<Integer> integerTimestamped) {
    
                            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
                            mText.append("value: "+integerTimestamped.getValue()+"       time:   ");
                            mText.append(sdf.format(new Date(integerTimestamped.getTimestampMillis()))+"\n");
    
                        }
                    });
        }
    }
    
    

    相关文章

      网友评论

          本文标题:关于RxJava1.0最友好的文章(源码学习)

          本文链接:https://www.haomeiwen.com/subject/qqtchftx.html