美文网首页
rxJava&rxAndroid-实际运用(1)

rxJava&rxAndroid-实际运用(1)

作者: ZzzRicardo_Yue | 来源:发表于2016-12-01 11:07 被阅读0次

    整理自:
    http://blog.chinaunix.net/uid-20771867-id-5187376.html


    1、使用subscriber的如下方式来创建Obserable

    Observable.create(new Observable.OnSubscribe< ArrayList<Song> >() {
                @Override
                public void call(Subscriber<? super ArrayList<Song>> subscriber){···}
    

    Subscriber的onNext方法不会自动执行,需要在call方法中手动调用

    Observable observable = Observable.create(new Observable.OnSubscribe< ArrayList<Song> >() {
    
                @Override
                public void call(Subscriber<? super ArrayList<Song>> subscriber)
                {
                    SongScanInteractor songScanInteractor=new SongScanInteractor(mContext);
                    songs=songScanInteractor.scanSong();
    
                    subscriber.onNext(songs);
                    subscriber.onCompleted();
                }
            });
    

    2、应当在Obserable中设置当Subscriber取消对Obserable的监听之后,Obserable不再调用Subscriber的onNext()方法(即发送消息),如下有一段代码示意

    private Observable<Integer> createObserver() {
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    if (!subscriber.isUnsubscribed()) {
                        for (int i = 0; i < 5; i++) {
                            int temp = new Random().nextInt(10);
                            if (temp > 8) {
                                //if value>8, we make an error
                                subscriber.onError(new Throwable("value >8"));
                                break;
                            } else {
                                subscriber.onNext(temp);
                            }
                            // on error,complete the job
                            if (i == 4) {
                                subscriber.onCompleted();
                            }
                        }
                    }
                }
            });
        }
    

    3、Obserable发送的信号阻塞了Subscriber的信号处理,导致只有信号发送无信息处理

    现在发现Observable的一个特性,那就是Observable不间断发送信号(这里体现为手动调用onNext()),Subscriber的onNext()方法根本不会得到执行,因为来不及执行(我是这么理解的),所以我们需要加上Thread.sleep(400);这样的代码减缓Obserable发送请求的频率。
    同时我还发现,如果这个while(!subscriber.isUnsubscribed())里面的条件一直设为true,即写成while(true),当你把绑定的subscriber解绑之后再与该Obserable绑定,Obserable的onNext()方法依然无法得到执行,与上述不加Thread.sleep(400);的情况是一样的,即没有信号处理,只有信号发送。
    我的理解是,在解绑这段时间里Obserable不断发出的信号没有处理一直被积压,所以自然新加入的Subsciber自然没有能力处理这些积压的发送信号,所以瘫痪了···

    Observable observable=Observable.create(new Observable.OnSubscribe<double[]>()
            {
                @Override
                public void call(Subscriber<? super double[]> subscriber)
                {
                    while(!subscriber.isUnsubscribed())
                    {
                        try
                        {
                            Thread.sleep(400);
                        } catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        double[] doubles =new double[2];
                        doubles[0] = DeviceUtils.getDeviceWidth(mContext);  //屏幕总长
                        doubles[1] =getMusicCurPos()/getDuration();   //歌曲播放比例
    
                        subscriber.onNext(doubles);
                        Log.d("PlayerProgress","onNext");
                    }
                }
            });
    

    相关文章

      网友评论

          本文标题:rxJava&rxAndroid-实际运用(1)

          本文链接:https://www.haomeiwen.com/subject/yolxmttx.html