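Using RxJava

Author: 进击的大东 | Published 2018-06-03 21:54

Using RxJava on Android

First, we need to understand the observer pattern.

Here is an example in code. Suppose you have three girlfriends and you notify them whenever your status changes.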
    // Observer interface (the girlfriend)
    public interface Observer {
        String getName();

        void update(String state);
    }

The concrete girlfriend class stores her name and prints every new status she receives.

    public class GFObserver implements Observer {
        private final String name;

        public GFObserver(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public void update(String state) {
            System.out.println(name + " received a message: boyfriend's latest status is " + state);
        }
    }

// Define an abstract Subject base class that adds and removes girlfriends and notifies them of your status

    import java.util.ArrayList;
    import java.util.List;

    public abstract class Subject {
        /** The girlfriends (observers) currently attached. */
        private final List<Observer> observers = new ArrayList<>();

        /**
         * A new girlfriend (add an observer).
         * @param observer the observer to add
         */
        public void attach(Observer observer) {
            observers.add(observer);
            System.out.println("I have a new girlfriend; her name is " + observer.getName());
        }

        /**
         * A breakup (remove an observer).
         * @param observer the observer to remove
         */
        public void detach(Observer observer) {
            observers.remove(observer);
            System.out.println("I broke up with " + observer.getName());
        }

        /**
         * Notify every girlfriend of the new status.
         * @param state the new status
         */
        public void notifyObserver(String state) {
            for (Observer observer : observers) {
                observer.update(state);
            }
        }
    }

// Implement the subject

    public class MyStateSubject extends Subject {
        public void change(String state) {
            notifyObserver(state);
        }
    }

// Call it from main

    public class ClientMain {
        public static void main(String[] args) {
            // The girlfriends are the observers
            Observer observer = new GFObserver("罗小丽");
            Observer observer2 = new GFObserver("赵美丽");
            Observer observer3 = new GFObserver("Ammey");
            // You are the observed subject; notify the girlfriends whenever your status changes
            MyStateSubject subject = new MyStateSubject();
            subject.attach(observer); // on Android this is like registering a listener
            subject.attach(observer2);
            subject.attach(observer3);
            // On Android this is like a click event firing; button clicks follow the observer pattern
            subject.change("getting a massage");

            subject.detach(observer);
            subject.change("at a bar");
        }
    }
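In the example above, the girlfriends are the observers and you are the observed subject. attach establishes the subscription, and change notifies the observers of a status update. That is a simple example of the observer pattern.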

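How the observer pattern maps onto RxJava

  • Observable: the observed party (the Subject)
  • Observer/Subscriber: the observer
  • subscribe: the subscription. It corresponds to our attach; in our example we call change after attaching to send an event, whereas in RxJava events start being emitted automatically once the subscription is made
  • An Observable and an Observer are connected through the subscribe() method
RxJava API documentation: http://reactivex.io/RxJava/2.x/javadoc/
RxJava Wiki home: https://github.com/ReactiveX/RxJava/wiki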
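
To make the "events are emitted automatically once you subscribe" point concrete, here is a minimal plain-Java sketch. ColdSource and its subscribe method are hypothetical names made up for this illustration; this is not how RxJava is implemented, only the same idea in miniature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable: it holds a recipe for emitting
// items and runs that recipe every time someone subscribes.
final class ColdSource<T> {
    private final Consumer<Consumer<T>> recipe;

    ColdSource(Consumer<Consumer<T>> recipe) {
        this.recipe = recipe;
    }

    // Subscribing both registers the observer and triggers the emission,
    // so no separate change()/notify call is needed.
    void subscribe(Consumer<T> observer) {
        recipe.accept(observer);
    }
}

final class ColdSourceDemo {
    // Subscribes once and collects everything the source emits.
    static List<String> run() {
        ColdSource<String> source = new ColdSource<>(observer -> {
            observer.accept("hello");
            observer.accept("world");
        });
        List<String> received = new ArrayList<>();
        source.subscribe(received::add);
        return received;
    }
}
```

Compared with the Subject example, there is no explicit change call here: the act of subscribing is what starts the events flowing.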

Basic usage

1. Using RxJava to watch EditText input changes
    RxTextView.textChanges(mEditSearch)
            .debounce(200, TimeUnit.MILLISECONDS)
            // The text-change listener must be registered on the main thread
            .subscribeOn(AndroidSchedulers.mainThread())
            .filter(new Func1<CharSequence, Boolean>() {
                @Override
                public Boolean call(CharSequence charSequence) {
                    // Filter the input: keep it only if it is non-blank and contains "a"
                    String text = charSequence.toString().trim();
                    return text.length() > 0 && text.contains("a");
                }
            })
            // debounce emits on a computation thread, so move to an io thread
            // before running the search (for example a network request)
            .observeOn(Schedulers.io())
            // switchMap rather than flatMap: results of an outdated query are dropped
            // (in 2.x, Func1 becomes Function and the returned type is ObservableSource)
            .switchMap(new Func1<CharSequence, Observable<List<String>>>() {
                @Override
                public Observable<List<String>> call(CharSequence charSequence) {
                    Log.d("SearchResult", "switchMap==" + charSequence);
                    // Stand-in for the real search
                    List<String> list = new ArrayList<>();
                    list.add("abc");
                    list.add("ada");

                    return Observable.just(list);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<String>>() { // in 2.x, Action1 becomes Consumer
                @Override
                public void call(List<String> strings) {
                    Log.d("SearchResult", "list=" + strings);
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
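
To see what debounce(200, TimeUnit.MILLISECONDS) does to a burst of keystrokes, here is a rough plain-Java sketch that works on recorded timestamps instead of real time. DebounceSketch is a hypothetical helper written for this article, not part of RxJava, and it only approximates the operator: an event is kept when the next one arrives at least one window later, and the last event is always kept.

```java
import java.util.ArrayList;
import java.util.List;

final class DebounceSketch {
    // timestamps must be sorted ascending and have the same length as values.
    static List<String> debounce(long[] timestamps, String[] values, long windowMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = i == values.length - 1;
            // Keep the event only if the input then stays quiet for a full window
            if (isLast || timestamps[i + 1] - timestamps[i] >= windowMillis) {
                out.add(values[i]);
            }
        }
        return out;
    }
}
```

With keystrokes at 0, 50, 100, 400 and 450 ms and a 200 ms window, only the text typed at 100 ms and the final text at 450 ms would go on to the search step.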
2. Using RxJava to listen for button clicks (debounce or throttleFirst can be used to stop a button being clicked repeatedly within a short time)
    RxView.clicks(mButton)
            .throttleFirst(1, TimeUnit.SECONDS)
            .subscribe(new Observer<Void>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onNext(Void aVoid) {
                    Log.d("Example0Activity", "button clicked");
                }
            });
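
The effect of throttleFirst(1, TimeUnit.SECONDS) can be sketched in plain Java over a list of click times. ThrottleFirstSketch is a hypothetical helper for illustration only, and it assumes non-negative, ascending timestamps in milliseconds: a click is accepted if it is the first one, or if at least one full window has passed since the last accepted click.

```java
import java.util.ArrayList;
import java.util.List;

final class ThrottleFirstSketch {
    static List<Long> throttleFirst(long[] clickTimes, long windowMillis) {
        List<Long> accepted = new ArrayList<>();
        long lastAccepted = -1; // -1 means nothing has been accepted yet
        for (long t : clickTimes) {
            if (lastAccepted < 0 || t - lastAccepted >= windowMillis) {
                accepted.add(t);
                lastAccepted = t;
            }
            // Otherwise the click falls inside the window and is ignored
        }
        return accepted;
    }
}
```

For clicks at 0, 300, 999, 1000, 1500 and 2100 ms with a 1000 ms window, only the clicks at 0, 1000 and 2100 ms get through.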
3. Merging local and network data with RxJava
    private Observable<List<Course>> getDatasFromLocal() {
        List<Course> list = new ArrayList<>();
        list.add(new Course("1", "菜鸟商城"));
        list.add(new Course("2", "菜鸟新闻"));
        return Observable.just(list);
    }

    private Observable<List<Course>> getDatasFromNetWork() {
        // Fetch the data from the network
        return mApi.getCourses().subscribeOn(Schedulers.io());
    }

    // Merge both sources when the button is clicked
    public void click(View view) {
        Observable.merge(getDatasFromLocal(), getDatasFromNetWork())
                .subscribe(new Subscriber<List<Course>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(List<Course> courses) {
                        for (Course course : courses) {
                            Log.d("Example1Activity", "course=" + course);
                        }
                    }
                });
    }
4. Implementing a "send verification code" countdown with RxJava
    int count = 60;

    public void click(View view) {
        Observable.interval(0, 1, TimeUnit.SECONDS)
                .take(count + 1) // count + 1 ticks, so the countdown runs from count down to 0
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        // Turn the increasing tick into a decreasing countdown value
                        return count - aLong;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // update the UI on the main thread
                .doOnSubscribe(new Action0() { // the button cannot be clicked during the countdown
                    @Override
                    public void call() {
                        mButton.setTextColor(Color.BLACK);
                        mButton.setEnabled(false);
                    }
                })
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        Log.d("Example2Activity", "onCompleted");
                        mButton.setEnabled(true);
                        mButton.setText("Send verification code");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(Long num) {
                        Log.d("Example2Activity", "onNext" + num);
                        mButton.setText(num + " s remaining");
                    }
                });
    }
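
If the take(count + 1) and map steps look odd, the same arithmetic can be checked without any timers. The sketch below uses a plain LongStream in place of interval; CountdownSketch is a hypothetical name used only for this illustration.

```java
import java.util.stream.LongStream;

final class CountdownSketch {
    // Mirrors take(count + 1) followed by map(tick -> count - tick):
    // ticks 0..count become the values count..0.
    static long[] remainingSeconds(int count) {
        return LongStream.rangeClosed(0, count)
                .map(tick -> count - tick)
                .toArray();
    }
}
```

Calling remainingSeconds(60) gives 61 values, starting at 60 and ending at 0, which is why the example takes count + 1 items rather than count.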

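Building a Picasso-like image loader with a three-level cache using RxJava

1. Looking through the API, we find that the concat operator lets us pick the nearest data source across the three cache levels: if a cache has the data we read it from there, and if not we fetch it from the network.
  • The pseudocode below implements the three-level cache. Clicking the button checks memory and then disk for data; if either has it we use it, otherwise we go to the network.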
    // Get data from the memory cache

    private Observable<String> getMemoryObservable() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("memory"); // if this value were empty, we would fall through to disk
                subscriber.onCompleted();
            }
        });
    }

// Get data from disk

    private Observable<String> getDiskObservable() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("disk"); // if this value were empty, we would fall through to the network
                subscriber.onCompleted();
            }
        });
    }

// Get data from the network

    private Observable<String> getNetWorkObservable() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("network");
                subscriber.onCompleted();
            }
        });
    }

// Click the button to read from the three-level cache

    // First obtain the Observables for the two caches and the network
    final Observable<String> memoryObservable = getMemoryObservable();
    final Observable<String> diskObservable = getDiskObservable();
    final Observable<String> netWorkObservable = getNetWorkObservable();
    RxView.clicks(mButton).subscribe(new Action1<Void>() {
        @Override
        public void call(Void aVoid) {
            Observable.concat(memoryObservable, diskObservable, netWorkObservable)
                    // Take only the first non-empty item; an empty item means
                    // falling through to the next data source
                    .first(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            return !TextUtils.isEmpty(s);
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            // Log which source the data came from
                            Log.d("activity_example3", "get data from--" + s);
                        }
                    });
        }
    });
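
The "ask each source in order and stop at the first non-empty answer" idea behind concat plus first can be sketched in plain Java with suppliers. This is only an analogy under simplifying assumptions (synchronous, one value per source); TieredLookup is a hypothetical name and nothing here is RxJava API.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

final class TieredLookup {
    // Queries the sources in order (memory, disk, network, ...) and returns
    // the first non-empty value. Later sources are never queried once an
    // earlier one has answered.
    static String firstNonEmpty(List<Supplier<String>> sources) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        // Like first(...) on a stream with no matching item, this is an error case
        throw new NoSuchElementException("no source produced data");
    }
}
```

The point to notice is laziness: if the disk source answers, the network supplier is never invoked, which is the behaviour we want from a cache hierarchy.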
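2. Look at how Picasso packages its API as a fluent chain, then move the RxJava three-level cache code into a small framework: a request creator loads and caches images and exposes this to callers, and a chained API wraps it. Finally, call it to test the result.
  • Define a cache base class that declares methods for reading from and writing to a data source, and that wraps what it reads as an Image and delivers it to the UI

    public abstract class CacheObservable {
        // Read from this data source on an io thread and deliver the result on the main thread
        public Observable<Image> getImage(final String url) {
            return Observable.create(new Observable.OnSubscribe<Image>() {
                @Override
                public void call(Subscriber<? super Image> subscriber) {
                    if (!subscriber.isUnsubscribed()) {
                        // Image is our own wrapper class; it is null on a cache miss
                        Image image = getDataFromCache(url);
                        subscriber.onNext(image);
                        subscriber.onCompleted();
                    }
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }

        // Read the data from this source
        public abstract Image getDataFromCache(String url);

        // Store the data in this source
        public abstract void putDataToCache(Image image);
    }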
  • The Image wrapper class

    public class Image {
        private String url;    // image URL
        private Bitmap bitmap; // decoded image

        public Image(String url, Bitmap bitmap) {
            this.url = url;
            this.bitmap = bitmap;
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        public Bitmap getBitmap() {
            return bitmap;
        }

        public void setBitmap(Bitmap bitmap) {
            this.bitmap = bitmap;
        }
    }
  • Memory cache

    public class MemoryCacheObservble extends CacheObservable {
        // Use one eighth of the maximum heap, measured in KB
        int maxMemory = (int) (Runtime.getRuntime().maxMemory() / 1024);
        int cacheSize = maxMemory / 8;
        LruCache<String, Bitmap> mLruCache = new LruCache<String, Bitmap>(cacheSize) {
            @Override
            protected int sizeOf(String key, Bitmap value) {
                // Size of one bitmap in KB
                return value.getRowBytes() * value.getHeight() / 1024;
            }
        };

        @Override
        public Image getDataFromCache(String url) {
            Bitmap bitmap = mLruCache.get(url);
            if (bitmap != null) {
                return new Image(url, bitmap);
            }
            return null;
        }

        @Override
        public void putDataToCache(Image image) {
            mLruCache.put(image.getUrl(), image.getBitmap());
        }
    }
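
For readers who have not met an LRU (least recently used) cache before, here is a small plain-Java sketch of the eviction idea using LinkedHashMap in access order. SimpleLruCache is a hypothetical class for illustration. Unlike the LruCache used above, which limits the total size reported by sizeOf, this sketch simply limits the number of entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    SimpleLruCache(int maxEntries) {
        // accessOrder = true: get() moves an entry to the "most recent" end
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after each insertion; returning true drops
    // the least recently used entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

With a capacity of 2, putting "a" and "b", reading "a", and then putting "c" evicts "b", because "a" was touched more recently.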
    
  • Disk cache

    public class DiskCacheObservble extends CacheObservable {
        private DiskLruCache mDiskLruCache;
        private Context mContext;
        private int maxSize = 20 * 1024 * 1024; // 20 MB

        public DiskCacheObservble(Context context) {
            mContext = context;
            initDiskLruCache();
        }

        @Override
        public Image getDataFromCache(String url) {
            Bitmap bitmap = getDataFromDiskLruCache(url);
            if (bitmap != null) {
                return new Image(url, bitmap);
            }
            return null;
        }

        @Override
        public void putDataToCache(final Image image) {
            // Write to disk on an io thread
            Observable.create(new Observable.OnSubscribe<Image>() {
                @Override
                public void call(Subscriber<? super Image> subscriber) {
                    putDataToDiskLrucache(image);
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io()).subscribe();
        }

        private void initDiskLruCache() {
            try {
                File cacheDir = DiskCacheUtil.getDiskCacheDir(this.mContext, "image_cache");
                if (!cacheDir.exists()) {
                    cacheDir.mkdirs();
                }
                int versionCode = DiskCacheUtil.getAppVersion(mContext);
                mDiskLruCache = DiskLruCache.open(cacheDir, versionCode, 1, maxSize);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void putDataToDiskLrucache(Image image) {
            try {
                // Step 1: compute the unique key for the image to cache
                String key = DiskCacheUtil.getMd5String(image.getUrl());
                // Step 2: get the DiskLruCache Editor for that key
                DiskLruCache.Editor editor = mDiskLruCache.edit(key);
                if (editor != null) {
                    // Step 3: get an OutputStream from the Editor and download the image into it
                    OutputStream outputStream = editor.newOutputStream(0);
                    boolean isSuccessful = download(image.getUrl(), outputStream);
                    if (isSuccessful) {
                        editor.commit();
                    } else {
                        editor.abort();
                    }
                    mDiskLruCache.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private Bitmap getDataFromDiskLruCache(String url) {
            FileInputStream fileInputStream = null;
            try {
                final String key = DiskCacheUtil.getMd5String(url);
                // Look up the cache entry for this key
                DiskLruCache.Snapshot snapshot = mDiskLruCache.get(key);
                if (snapshot == null) {
                    return null;
                }
                fileInputStream = (FileInputStream) snapshot.getInputStream(0);
                FileDescriptor fileDescriptor = fileInputStream.getFD();
                // Decode the data into a Bitmap
                return BitmapFactory.decodeFileDescriptor(fileDescriptor);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Decoding has finished by now, so the stream can always be closed
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            return null;
        }

        private boolean download(String urlString, OutputStream outputStream) {
            HttpURLConnection urlConnection = null;
            BufferedOutputStream out = null;
            BufferedInputStream in = null;
            try {
                final URL url = new URL(urlString);
                urlConnection = (HttpURLConnection) url.openConnection();
                in = new BufferedInputStream(urlConnection.getInputStream(), 8 * 1024);
                out = new BufferedOutputStream(outputStream, 8 * 1024);
                int b;
                while ((b = in.read()) != -1) {
                    out.write(b);
                }
                return true;
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (urlConnection != null) {
                    urlConnection.disconnect();
                }
                try {
                    if (out != null) {
                        out.close();
                    }
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return false;
        }
    }
    
  • This code uses the DiskLruCache.java class from libcore.io.DiskLruCache, which can be found on GitHub

  • The DiskCacheUtil helper class (cache directory, app version, cache keys)

    public class DiskCacheUtil {
        /**
         * Returns the app's cache directory for the given name.
         * @param context    any Context
         * @param uniqueName name of the sub-directory
         * @return the cache directory (not necessarily created yet)
         */
        public static File getDiskCacheDir(Context context, String uniqueName) {
            File cacheDir = null;
            if (Environment.MEDIA_MOUNTED.equals(Environment.getExternalStorageState())
                    || !Environment.isExternalStorageRemovable()) {
                // May still be null if external storage is unavailable
                cacheDir = context.getExternalCacheDir();
            }
            if (cacheDir == null) {
                cacheDir = context.getCacheDir();
            }
            return new File(cacheDir, uniqueName);
        }

        /**
         * Returns the app's version code.
         * @param context any Context
         * @return the version code, or 1 if it cannot be read
         */
        public static int getAppVersion(Context context) {
            try {
                PackageInfo info = context.getPackageManager().getPackageInfo(context.getPackageName(), 0);
                return info.versionCode;
            } catch (PackageManager.NameNotFoundException e) {
                e.printStackTrace();
            }
            return 1;
        }

        // Hash the URL with MD5 so it can be used as a cache key
        public static String getMd5String(String key) {
            String cacheKey;
            try {
                final MessageDigest mDigest = MessageDigest.getInstance("MD5");
                mDigest.update(key.getBytes());
                cacheKey = bytesToHexString(mDigest.digest());
            } catch (NoSuchAlgorithmException e) {
                cacheKey = String.valueOf(key.hashCode());
            }
            return cacheKey;
        }

        private static String bytesToHexString(byte[] bytes) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < bytes.length; i++) {
                String hex = Integer.toHexString(0xFF & bytes[i]);
                if (hex.length() == 1) {
                    sb.append("0"); // pad to two hex digits per byte
                }
                sb.append(hex);
            }
            return sb.toString();
        }
    }
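
Why hash the URL at all? A URL contains characters such as ':' and '/' that are awkward in a file name, while an MD5 hex string is always 32 lowercase hex characters. The stand-alone sketch below shows the same key derivation in plain Java; CacheKeys is a hypothetical name, and using UTF-8 explicitly is my own choice here rather than something the code above does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class CacheKeys {
    // Returns the MD5 digest of the input as a 32-character lowercase hex string.
    static String md5Hex(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // two hex digits per byte
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // Same fallback as DiskCacheUtil: use the string's hash code
            return String.valueOf(input.hashCode());
        }
    }
}
```

For example, md5Hex("abc") is 900150983cd24fb0d6963f7d28e17f72, and any image URL likewise maps to a fixed-length key.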
    
  • Fetch the image from the network

    public class NetWorkObservble extends CacheObservable {
        @Override
        public Image getDataFromCache(String url) {
            Bitmap bitmap = downLoadImage(url);
            if (bitmap != null) {
                return new Image(url, bitmap);
            }
            return null;
        }

        @Override
        public void putDataToCache(Image image) {
            // Nothing to store: the network is the source, not a cache
        }

        private Bitmap downLoadImage(String url) {
            Bitmap bitmap = null;
            InputStream inputStream = null;
            try {
                URLConnection con = new URL(url).openConnection();
                inputStream = con.getInputStream();
                bitmap = BitmapFactory.decodeStream(inputStream);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            return bitmap;
        }
    }
    
  • Create the RequestCreator class, which loads and caches images and exposes this to callers

    public class RequestCreator {
        private MemoryCacheObservble mMemoryCacheObservble;
        private DiskCacheObservble mDiskCacheObservble;
        private NetWorkObservble mNetWorkObservble;

        public RequestCreator(Context context) {
            mMemoryCacheObservble = new MemoryCacheObservble();
            mDiskCacheObservble = new DiskCacheObservble(context);
            mNetWorkObservble = new NetWorkObservble();
        }

        public rx.Observable<Image> getImageFromMemory(String url) {
            return mMemoryCacheObservble.getImage(url).filter(new Func1<Image, Boolean>() {
                @Override
                public Boolean call(Image image) {
                    return image != null;
                }
            });
        }

        public rx.Observable<Image> getImageFromDisk(String url) {
            return mDiskCacheObservble.getImage(url).filter(new Func1<Image, Boolean>() {
                @Override
                public Boolean call(Image image) {
                    return image != null;
                }
            }).doOnNext(new Action1<Image>() {
                @Override
                public void call(Image image) {
                    // A disk hit is copied up into the memory cache
                    mMemoryCacheObservble.putDataToCache(image);
                }
            });
        }

        public rx.Observable<Image> getImageFromNetWork(String url) {
            return mNetWorkObservble.getImage(url)
                    .filter(new Func1<Image, Boolean>() {
                        @Override
                        public Boolean call(Image image) {
                            return image != null;
                        }
                    })
                    .doOnNext(new Action1<Image>() {
                        @Override
                        public void call(Image image) {
                            // A network result is stored in both caches
                            mDiskCacheObservble.putDataToCache(image);
                            mMemoryCacheObservble.putDataToCache(image);
                        }
                    });
        }
    }
    
  • Wrap everything in a fluent (chained) API

    public class RxImageLoader {
        static volatile RxImageLoader singleton; // volatile is needed for double-checked locking
        private String mUrl;
        private RequestCreator mRequestCreator;

        private RxImageLoader(Builder builder) {
            mRequestCreator = new RequestCreator(builder.mContext);
        }

        public static RxImageLoader with(Context context) {
            if (singleton == null) {
                synchronized (RxImageLoader.class) {
                    if (singleton == null) {
                        singleton = new Builder(context).build();
                    }
                }
            }
            return singleton;
        }

        public RxImageLoader load(String url) {
            this.mUrl = url;
            return this;
        }

        public void into(final ImageView imageView) {
            Observable.concat(mRequestCreator.getImageFromMemory(mUrl),
                    mRequestCreator.getImageFromDisk(mUrl),
                    mRequestCreator.getImageFromNetWork(mUrl))
                    // Take only the first non-null image
                    .first(new Func1<Image, Boolean>() {
                        @Override
                        public Boolean call(Image image) {
                            return image != null;
                        }
                    })
                    .subscribe(new Observer<Image>() {
                        @Override
                        public void onCompleted() {
                        }

                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }

                        @Override
                        public void onNext(Image image) {
                            imageView.setImageBitmap(image.getBitmap());
                        }
                    });
        }

        // Builder pattern
        public static class Builder {
            private Context mContext;

            public Builder(Context context) {
                // Keep the application context so the singleton does not hold on to an Activity
                this.mContext = context.getApplicationContext();
            }

            public RxImageLoader build() {
                return new RxImageLoader(this);
            }
        }
    }
    
  • Put a button and an ImageView on the screen, and click the button to load an image and test the loader

    RxView.clicks(mButton).subscribe(new Action1<Void>() {
        @Override
        public void call(Void aVoid) {
            RxImageLoader.with(Example3Activity.this)
                    .load("https://www.baidu.com/img/bd_logo1.png?where=super")
                    .into(iv);
        }
    });
