UDN-企业互联网技术人气社区

板块导航

浏览  : 886
回复  : 0

[讨论交流] RxJava几个容易误解出错的地方

[复制链接]
哥屋恩的头像 楼主
发表于 2016-10-9 16:04:10 | 显示全部楼层 |阅读模式
  关于defer的用途

  defer操作符与create、just、from等操作符一样,是创建Observable的操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。

  这里有个不好理解的地方.

  先看下面的例子:

  1.   static class SomeType {

  2.   private String value ="default";

  3.   public void setValue(String value) { this.value = value; }

  4.   public Observable valueObservable() {

  5.   return Observable.just(value);

  6.   }

  7.   public Observable valueObservableByDefer() {

  8.   return Observable.defer(()->Observable.just(value));

  9.   }

  10.   }
复制代码


  value默认值为"default",在subscribe前设置value值为"some value".

  如下代码, 如果我们使用instance.valueObservable()获取observable, 输出值为"default",而不是"Some value":

  1.   SomeType instance = new SomeType();

  2.   Observable value = instance.valueObservable();

  3.   instance.setValue("Some Value");

  4.   value.subscribe(System.out::println);
复制代码


  如下代码, 如果我们使用instance.valueObservableByDefer()获取observable, 输出值为"Some value":

  1.   SomeType instance = new SomeType();

  2.   Observable value = instance.valueObservableByDefer();

  3.   instance.setValue("Some Value");

  4.   value.subscribe(System.out::println);
复制代码


  valueObservableByDefer和valueObservable的区别是前者使用了defer.

  上面的例子能帮我们更好的理解下面这段话:

  The defer Observer allows you to defer or delay emitting items from an Observable until such time as an Observer subscribes to the Observable. This allows an Observer to easily obtain updates or a refreshed version of the sequence.

  可以简单的理解为defer创建的Observable是在subscribe调用时才创建的.

  操作到底在哪个线程执行

  平常我们通过subscribeOn指定执行的线程.比如下面, doSomething()就是在指定线程里执行的:

  
  1. Observable.create(s->{

  2.   s.onNext(doSomething());

  3.   s.onCompleted();

  4.   }).subscribeOn(Schedulers.io()).subscribe(System.out::println);
复制代码


  但是下面doSomething切是在当前线程而不是io线程执行的, 执行完成后,获取的值再传入just:

  
  1. Observable.just(doSomething())

  2.   .subscribeOn(Schedulers.io()).subscribe(System.out::println);
复制代码


  如果要在指定线程执行,可以采用上面Observable.create的做法, 也可以使用如下defer:

  
  1. Observable.defer(()->Observable.just(doSomething()))

  2.   .subscribeOn(Schedulers.io()).subscribe(System.out::println);
复制代码


  和时间相关的几个操作符是在computation Scheduler里

  执行的

  一般情况下,如果不指定执行线程,都在当前线程执行的,但是和时间相关的几个操作符,比如timer,delay,interval,它们都是在computation Scheduler里执行的.

  当然,它们也提供了对应的函数指定线程.

  比如delay,下面第一个是在computation Scheduler里执行的, 第二个可以指定scheduler:

  
  1. public final Observable delay(long delay, TimeUnit unit);

  2.   public final Observable delay(long delay, TimeUnit unit, Scheduler scheduler);
复制代码


  By default this variant of delay operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to delay

  Schedulers.immediate和Schedulers.trampoline区别

  Schedulers.immediate和Schedulers.trampoline都在当前线程执行,区别是前者马上执行,后者是等队列中的其他任务完成后才执行.

  最明显的例子是,当你要执行一个任务,而任务中有要执行另一个任务时, 使用这两种方式就能明显看出区别.

  下面的例子比较容易看出区别.

  看下面代码任务outerAction中又启一个innerAction任务:

  1.   static void testScheduler(Scheduler scheduler){

  2.   Worker worker = scheduler.createWorker();

  3.   Action0 leafAction = () -> System.out.println("----leafAction.");

  4.   Action0 innerAction = () ->

  5.   {

  6.   System.out.println("--innerAction start.");

  7.   worker.schedule(leafAction);

  8.   System.out.println("--innerAction end.");

  9.   };

  10.   Action0 outerAction = () ->

  11.   {

  12.   System.out.println("outer start.");

  13.   worker.schedule(innerAction);

  14.   System.out.println("outer end.");

  15.   };

  16.   worker.schedule(outerAction);

  17.   }
复制代码


  使用Schedulers.immediate():

  testScheduler(Schedulers.immediate());

  输出:

  1.   outer start.

  2.   --innerAction start.

  3.   ----leafAction.

  4.   --innerAction end.

  5.   outer end.
复制代码


  使用Schedulers.trampoline():

  testScheduler(Schedulers.trampoline());

  输出:

  1.   outer start.

  2.   outer end.

  3.   --innerAction start.

  4.   --innerAction end.

  5.   ----leafAction.
复制代码


  使用默认值XXXOrDefault

  如果我们使用某种操作符获取某个数据,有可能数据根本就不存在, 则时候会引发NoSuchElementException或者IndexOutOfBoundsException等异常, 这时候可以采用相应的OrDefault版本,提供默认值.

  比如下面, 我们发射三个数据,然后忽略三个,再去取第一个,其实已经没有第一个了, 这时候会发生NoSuchElementException, onError被调用,打印"java.util.NoSuchElementException: Sequence contains no elements":

 
  1.  Observable.just(1,2,3).skip(3).first().subscribe(System.out::println

  2.   ,e->System.out.println(e.toString())

  3.   ,()->System.out.println("onCompleted"));
复制代码


  输出:

 
  1.  java.util.NoSuchElementException: Sequence contains no elements
复制代码


  如果使用firstOrDefault,输出默认值,且onCompleted会被调用.

  1.   Observable.just(1,2,3).skip(3).firstOrDefault(-100).subscribe(System.out::println

  2.   ,e->System.out.println(e.toString())

  3.   ,()->System.out.println("onCompleted"));
复制代码


  输出:

 
  1.  -100

  2.   onCompleted
复制代码


  有OrDefault版本的函数有如下:

  elementAtOrDefault, firstOrDefault, lastOrDefault, singleOrDefault

  zip函数的推迟功能

  combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

  我们知道, zip将多个Observable相应位置上的数据(用item更贴切)按指定的函数合成一个结果,然后重新形成一个新的Observable.

  比如Observable1发射了第一个数据,那么必须等到Observable2发射了第一个数据,才能按指定的函数生成新的Observable的第一个数据.

  利用这个特性,我们可以做一些推迟的工作.

  比如如下, Observable1是一个执行5毫秒后发送一个数据的Observable,Observable2是执行doSomething()后产生的结果:

  1.   Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)

  2.   ,Observable.just(doSomething()), (x,y)->y)

  3.   .subscribe(System.out::println));
复制代码


  上面的做法其实目的就是讲doSomething()产生的数据推迟5毫秒发射出来,但是要注意, doSomething()这个动作是马上执行的,而不是推迟5毫秒执行的,推迟的是数据的发射.

  以前刚接触时我误以为是doSomething()也推迟了,其实不是的.

  XXDelayError

  我们知道, 在进行一些合并操作时,如果碰到某个Observable发送了Error事件,则操作就会终止. 这时候如果需要先暂时忽略错误,将相应的操作进行完后再将发送Error事件,测可以用该方法对应的DelayError版本的方法.

  举个例子, Observable.concat是将几个Observable的数据合并, 如下所示,第一个Observable除了发射数据外,还会发射一个Error,如果使用concat, 则无法合并第二个Observable的内容, 具体如下:

  1.   Observable obs1 = Observable.create(s->{

  2.   s.onNext("a1");

  3.   s.onNext("a2");

  4.   s.onNext("a3");

  5.   s.onError(new Throwable("error from obs1"));

  6.   });

  7.   Observable obs2 = Observable.just("b1","b2","b3");
复制代码


  使用concat:

  1.   Observable.concat(obs1,obs2).subscribe(System.out::println

  2.   ,e->System.out.println(e.getMessage())

  3.   ,()->System.out.println("onCompleted"));
复制代码


  输出如下:

  1.   a1

  2.   a2

  3.   a3

  4.   error from obs1
复制代码


  改用concatDelayError,则Error会推迟到最后获得, 能获取到其他的数据, 具体如下:

  1.   Observable.concatDelayError(Arrays.asList(obs1,obs2)).subscribe(System.out::println

  2.   ,e->System.out.println(e.getMessage())

  3.   ,()->System.out.println("onCompleted"));
复制代码


  输出如下:

 
  1.  a1

  2.   a2

  3.   a3

  4.   b1

  5.   b2

  6.   b3

  7.   error from obs1
复制代码


  很多函数都有提供DelayError版本的方法, 比如:

  combineLatestDelayError,concatDelayError, mergeDelayError, concatMapDelayError, switchMapDelayError, switchOnNextDelayError.

原文作者:lluo2010  来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部