UDN-企业互联网技术人气社区

板块导航

浏览  : 1002
回复  : 0

[讨论交流] 浅谈RxJava与2.0的新特性

[复制链接]
舞操的头像 楼主
发表于 2016-9-3 17:23:05 | 显示全部楼层 |阅读模式
  简介

  说起 RxJava ,相信诸多 Android 开发者都不会陌生。作为一个知名的响应式编程库,从前年开始逐渐变得火热,从小众到被众多 Android 开发者们广泛引入与流传,其在 GitHub 的仓库截止笔者写这篇文章时,已经有16400+个 star 。甚至有一些大牛专门为 Android 写了 RxJava 的适配库,如

  RxAndroid

  RxBinding

  RxLifecycle

  为什么 RxJava 如此受到 Android 开发者们的欢迎。我想不外乎两个原因。 1. 异步 2. 链式操作

  异步

  对 Android 线程有所了解的朋友都知道, Android的 UI 绘制 与 事件响应是在主线程的,为了保证界面的流畅性,很多耗时操作如读写数据库、读写文件、请求网络,我们都会挪到异步线程去完成,再回调到主线程。当然在4.0以后主线程直接就不允许请求网络了。

  在过去没有 RxJava 的时候,开发者一般都是通过 AsyncTask , Thread ,更好些的就是通过线程池来完成这些任务。而有了 RxJava 以后,简简单单的一句话就可以随意的切换线程,简直不用太舒服。

  最典型的 RxJava 中的Observable类,提供了2个函数, 分别是subscribeOn与observeOn。前者可以切换被观察时的线程(如果说数据发射的线程不够严谨,数据并非一定在观察时发射的,尤其是开发者自定义OnSubscribe时),后者可以切换数据被消费时的线程。

  举一个切换线程的例子:

  1.   Log.i("debug", Thread.currentThread().getName());

  2.   Observable.empty()

  3.   .doOnCompleted(new Action0() {

  4.   @Override

  5.   public void call() {

  6.   Log.i("debug", Thread.currentThread().getName());

  7.   }

  8.   })

  9.   .subscribeOn(Schedulers.io())

  10.   .observeOn(AndroidSchedulers.mainThread())

  11.   .doOnCompleted(new Action0() {

  12.   @Override

  13.   public void call() {

  14.   Log.i("debug", Thread.currentThread().getName());

  15.   }

  16.   })

  17.   .subscribe();
复制代码


  这里我们没有任何数据,就仅仅发射了一个onComplete,但是在切换线程的代码中,我们增加了onComplte时要额外执行的代码,输出结果如下:

 
  1.  08-27 10:47:41.173 6741-6741/com.dieyidezui.rxjavademo I/debug: main

  2.   08-27 10:47:41.201 6741-6762/com.dieyidezui.rxjavademo I/debug: RxIoScheduler-2

  3.   08-27 10:47:41.217 6741-6741/com.dieyidezui.rxjavademo I/debug: main
复制代码


  这仅仅是简单的例子, RxJava 提供了很多便捷的操作符供我们使用,如map、filter、flatMap、merge、concat等。可见当熟练使用后对我们的编程效率确实有很大帮助。尤其是 MVP 模式, RxJava 与之结合可谓是”天作之合”。

  链式操作

  上面笔者演示的代码其实就是 RxJava 的典型使用方式:

  发射数据源

  中间操作

  处理结果

  其中中间操作包含诸多用法, 如果切换线程,变换数据等。

  为什么我说链式操作很好。第一,链式逻辑替代深度回调逻辑,容易编写,不易出 BUG 。第二,RxJava 提供诸多了整体处理数据的操作符,非常实用。第三,配合 Java8 的 lambda 表达式,使代码简短优雅。

  好了,对 RxJava 的介绍就此为止了。进阶用法、原理剖析以后会有专门的文章。对 RxJava 不熟悉的同学,建议先去看一下官方的 wiki 。链接:https://github.com/ReactiveX/RxJava/wiki

  RxJava2.0

  前天, RxJava终于发布了2.0 RC1 版本,一直关注于此的笔者立刻就进去尝鲜了。结合官方的介绍,笔者总结并翻译了一些与 1.x 的异同与大家分享。

  包名与MAVEN依赖

  首先要说的就是 RxJava 2和1是互相独立的。因此包名与 maven 的依赖也是不一样的,就类似于 OkHttp 3与2一样。 RxJava 2.x的依赖是全新的io.reactivex.rxjava2:rxjava:2.x.y,并且类处于该io.reactivex包名下,而不再是rx。

  接口变化

  RxJava2 是遵循 Reactive Streams Specification 的规范完成的,新的特性依赖其提供的4个基础接口。分别是

  Publisher

  Subscriber

  Subscription

  Processor

  Flowable与Observable

  新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。

  举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

  而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。

  Single、Completable

  Single 与 Completable 都基于新的 Reactive Streams 的思想重新设计了接口,主要是消费者的接口, 现在他们是这样的:

  1.   interface SingleObserver {

  2.   void onSubscribe(Disposable d);

  3.   void onSuccess(T value);

  4.   void onError(Throwable error);

  5.   }

  6.   interface CompletableObserver {

  7.   void onSubscribe(Disposable d);

  8.   void onComplete();

  9.   void onError(Throwable error);

  10.   }
复制代码


  Subscriber

  对比一下 Subscriber :

  1.   public interface Subscriber {

  2.   public void onSubscribe(Subscription s);

  3.   public void onNext(T t);

  4.   public void onError(Throwable t);

  5.   public void onComplete();

  6.   }
复制代码


  我们会发现和以前不一样的是多了一个onSubscribe的方法,Subscription如下:

  Subscription

   
  1.  public interface Subscription {

  2.   public void request(long n);

  3.   public void cancel();

  4.   }
复制代码


  熟悉 RxJava 1.x 的朋友能发现, 新的Subscription更像是综合了旧的Producer与Subscription的综合体。他既可以向上游请求数据,又可以打断并释放资源。而旧的Subscription在这里因为名字被占,而被重新命名成了Disposable

  Disposable

  
  1.   public interface Disposable {

  2.   void dispose();

  3.   boolean isDisposed();

  4.   }
复制代码


  这里最大的不同就是这个onSubscribe,根据 Specification, 这个函数一定是第一个被调用的, 然后就会传给调用方一个Subscription,通过这种方式组织新的背压关系。当我们消费数据时,可以通过Subscription对象,自己决定请求数据。

  这里就可以解释上面的非阻塞的背压。旧的阻塞式的背压,就是根据下游的消费速度,中游可以选择阻塞住等待下游的消费,随后向上游请求数据。而新的非阻塞就不在有中间阻塞的过程,由下游自己决定取多少,还有背压策略,如抛弃最新、抛弃最旧、缓存、抛异常等。

  而新的接口带来的新的调用方式与旧的也不太一样, subscribe后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的Subscriber对象, 并且同时提供了DefaultSubscriber, ResourceSubscriber,DisposableSubscriber,让他们提供了Disposable接口, 可以完成和以前类似的代码:

 
  1.  ResourceSubscriber subscriber = new ResourceSubscriber() {

  2.   @Override

  3.   public void onStart() {

  4.   request(Long.MAX_VALUE);

  5.   }

  6.   @Override

  7.   public void onNext(Integer t) {

  8.   System.out.println(t);

  9.   }

  10.   @Override

  11.   public void onError(Throwable t) {

  12.   t.printStackTrace();

  13.   }

  14.   @Override

  15.   public void onComplete() {

  16.   System.out.println("Done");

  17.   }

  18.   };

  19.   Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);

  20.   subscriber.dispose();
复制代码


  收回 create 方法权限

  在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。

  并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等。

  由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现FlowableOnSubscribe接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:

  1.   Flowable.create((FlowableEmitter emitter) -> {

  2.   emitter.onNext(1);

  3.   emitter.onNext(2);

  4.   emitter.onComplete();

  5.   }, BackpressureStrategy.BUFFER);
复制代码

  Functions可以抛出异常

  新的ActionX、FunctionX的方法声明都增加了一个throws Exception,这带来了显而易见的好处,现在我们可以这样写:

 
  1.  Flowable.just("file.txt")

  2.   .map(name -> Files.readLines(name))

  3.   .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
复制代码


  而在以前是不行的, 因为Files.readLines(name)会显式的抛出一个IOException。这样对 lambda 更加友好,而不必再去 try catch 。

  Scheduler可以直接schedule

  在以前是必须要先createWorker,用 Worker 对象去 shedule, 现在可以直接在Scheduler用这些方法:

 
  1.  public abstract class Scheduler {

  2.   public Disposable scheduleDirect(Runnable task) { ... }

  3.   public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }

  4.   public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay,

  5.   long period, TimeUnit unit) { ... }

  6.   public long now(TimeUnit unit) { ... }

  7.   // ... rest is the same: lifecycle methods, worker creation

  8.   }
复制代码


  这算是一个小优化,方便开发者使用。

  Observable的一些继承并入了Flowable中

  如ConnectableObservable、BlockObservable等,这样可以直接在Flowable中写出这样的代码:

  
  1. List list = Flowable.range(1, 100).toList().blockingFirst();
复制代码


  其他修改

  还有一些普通开发者不太在意的修改:

  hook方式变化,现在可以通过提供接口在 runtime hook

  部分在 1.x 中 被标记@Beta、@Experimental的操作符现在合并到正式版里了

  由于类结构的变动,一些类名的变化

  等其他变动。

  结语

  RxJava 作为开源的经典之作,笔者一直都有所关注。后续笔者会继续为大家带来 RxJava 的源码解析与进阶使用系列等。感谢大家的阅读,如有不知之处,欢迎讨论交流。

原文作者:佚名 来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部