UDN-企业互联网技术人气社区

板块导航

浏览  : 937
回复  : 0

[讨论交流] RxJava几种场景的实现

[复制链接]
呵呵燕的头像 楼主
发表于 2016-10-5 17:06:30 | 显示全部楼层 |阅读模式
  1.推迟执行动作

  可以使用timer+map方法实现.代码如下:

  1.   Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{

  2.   return doSomething();

  3.   }).subscribe(System.out::println);

  4.   }
复制代码


  2.推迟发送执行的结果

  这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的.

  这种场景可以使用Observable.zip来实现.

  zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

  对于各个observable相同位置的数据,需要相互等待,也就说,第一个observable第一个位置的数据产生后,要等待第二个observable第一个位置的数据产生,等各个Observable相同位置的数据都产生后,才能按指定规则进行组合.这真是我们要利用的.

  zip有很多种声明,但大致上是一样的,就是传入几个observable,然后指定一个规则,对每个observable对应位置的数据进行处理,产生一个新的数据, 下面是其中一个最简单的:

  public static Observable zip(Observable o1, Observable o2, final Func2 zipFunction);

  用zip实现推送发送执行结果如下:

 
  1.  Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)

  2.   ,Observable.just(doSomething()), (x,y)->y)

  3.   .subscribe(System.out::println));
复制代码


  3.使用defer在指定线程里执行某种动作

  如下面的代码,虽然我们指定了线程的运行方式,但是doSomething()这个函数还是在当前代码调用的线程中执行的.

  1.   Observable.just(doSomething())

  2.   .subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation())

  4.   .subscribe(v->Utils.printlnWithThread(v.toString()););
复制代码


  通常我们采用下面的方法达到目的:

  1.   Observable.create(s->{s.onNext(doSomething());})

  2.   .subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation())

  4.   .subscribe(v->{

  5.   Utils.printlnWithThread(v.toString());

  6.   });
复制代码


  但其实我们采用defer也能达到相同的目的.

  关于defer

  defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。

  声明:

  
  1. public static Observable defer(Func0> observableFactory);
复制代码


  defer的Func0里的Observable是在订阅(subscribe)的时候才创建的.

  作用:

  Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

  也就说observable是在订阅的时候才创建的.

  上面的问题用defer实现:

  1.   Observable.defer(()->Observable.just(doSomething()))

  2.   .subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation())

  4.   .subscribe(v->{Utils.printlnWithThread(v.toString());

  5.   });
复制代码


  4.使用compose不要打断链式结构

  我们经常看到下面的代码:

  1.   Observable.just(doSomething())

  2.   .subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation())

  4.   .subscribe(v->{Utils.printlnWithThread(v.toString());
复制代码


  上面的代码中,subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一样的, 如果我们打算把它统一在某一个地方实现, 我们可以这么写:

  1.   private static Observable applySchedulers(Observable observable) {

  2.   return observable.subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation());

  4.   }
复制代码


  但是这样每次我们需要调用上面的方法, 大致会像下面这样,最外面是一个函数,等于打破了链接结构:

  1.   applySchedulers(Observable.from(someSource).map(new Func1() {

  2.   @Override public Data call(Data data) {

  3.   return manipulate(data);

  4.   }

  5.   })

  6.   ).subscribe(new Action1() {

  7.   @Override public void call(Data data) {

  8.   doSomething(data);

  9.   }

  10.   });
复制代码


  可以使用compose操作符达到不打破链接结构的目的.

  compose的申明如下:

 
  1.  public Observable compose(Transformer transformer);
复制代码


  它的入参是一个Transformer接口,输出是一个Observable. 而Transformer实际上就是一个Func1, Observable>,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable.

  简单的说,compose可以通过指定的转化方式(输入参数transformer),将原来的observable转化为另外一种Observable.

  通过compose, 采用下面方式指定线程方式:

  1.   private static Transformer applySchedulers() {

  2.   return new Transformer() {

  3.   @Override

  4.   public Observable call(Observable observable) {

  5.   return observable.subscribeOn(Schedulers.io())

  6.   .observeOn(Schedulers.computation());

  7.   }

  8.   };

  9.   }

  10.   Observable.just(doSomething()).compose(applySchedulers())

  11.   .subscribe(v->{Utils.printlnWithThread(v.toString());

  12.   });
复制代码


  函数applySchedulers可以使用lambda表达式进一步简化为下面为:

  1.   private static Transformer applySchedulers() {

  2.   return observable->observable.subscribeOn(Schedulers.io())

  3.   .observeOn(Schedulers.computation());

  4.   }
复制代码


  5.按优先级使用不同的执行结果

  上面这个标题估计没表达清楚我想表达的场景. 其实我想表达的场景类似于平常的获取网络数据场景:如果缓存有,从缓存获取,如果没有,再从网络获取.

  这里要求,如果缓存有,不会做从网络获取数据的动作.

  这个可以采用concat+first实现.

  concat将几个Observable合并成一个Observable,返回最终的一个Observable. 而那些数据就像从一个Observable发出来一样. 参数可以是多个Observable,也可以是包含Observalbe的Iterator.

  新的observable内的数据排列按原来concat里的observable顺序排列,即新结果内的数据是按原来的顺序排序的.

  下面是上述需求的实现:

  1.   Observable.concat(getDataFromCache(),getDataFromNetwork()).first()

  2.   .subscribe(v->System.out.println("result:"+v));

  3.   //从缓存获取数据

  4.   private static Observable getDataFromCache(){

  5.   return Observable.create(s -> {

  6.   //dosomething to get data

  7.   int value = new Random().nextInt();

  8.   value = value%2;

  9.   if (value!=0){

  10.   s.onNext("data from cache:"+value); //产生数据

  11.   }

  12.   //s.onError(new Throwable("none"));

  13.   s.onCompleted();

  14.   }

  15.   );

  16.   }

  17.   //从网络获取数据

  18.   private static Observable getDataFromNetwork(){

  19.   return Observable.create(s -> {

  20.   for (int i = 0; i < 10; i++) {

  21.   Utils.println("obs2 generate "+i);

  22.   s.onNext("data from network:" + i); //产生数据

  23.   }

  24.   s.onCompleted();

  25.   }

  26.   );

  27.   }
复制代码


  上面的实现,如果getDataFromCache有数据, getDataFromNetwork这里的代码是不会执行的, 这正是我们想要的.

  上面实现有几个需要注意:

  有可能从两个地方都获取不到数据, 这种场景下使用first会抛出异常NoSuchElementException,如果是这样的场景,需要用firstOrDefault替换上面的first.

  上面getDataFromCache()里,如果没有数据,我们直接调用onCompleted,如果不调用onCompleted,而是调用onError,则上述采用concat是得不到任何结果的.因为concat在收到任何一个error,合并就会停止.所以,如果要用onError, 则需要用concatDelayError替代concat.concatDelayError会先忽略error,将error推迟到最后在处理.

原文作者:lluo2010  来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部