UDN-企业互联网技术人气社区

板块导航

浏览  : 916
回复  : 0

[讨论交流] RxJava深入

[复制链接]
呵呵燕的头像 楼主
发表于 2016-11-27 14:13:32 | 显示全部楼层 |阅读模式
  RxJava源码解析

  地图平铺时间表应用

  RxJava源码解析

  观察者模式实现

  首先,我们先看看RxJava中各个对象的名称:

  可观察到的(被观察者),观察者(通常看到的是订户),订阅方法subscrib()。

  首先,创建一个“被观察者”(可观察)

  1.   Observable observable = Observable.create(new Observable.OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber subscriber) {

  4.   subscriber.onNext("Hello");

  5.   subscriber.onNext("Hi");

  6.   subscriber.onNext("RxJava");

  7.   subscriber.onCompleted();

  8.   }

  9.   });
复制代码


  其中创建的参数是OnSubscribe对象,OnSubscribe的作用相当于一个计划表,当观察到与oberver创建订阅关系的时候,这个OnSubscribe的计划(电话)就会被调用和执行。当然这个计划里面以认购(即观察者)为参数,从而做到通知观察者的操作。

  下面,就是创建一个观察者观察者(也就是用户)

  1.   Subscriber subscriber = new Subscriber() {

  2.   @Override

  3.   public void onNext(String s) {

  4.   Log.d(tag, "Item: " + s);

  5.   }

  6.   @Override

  7.   public void onCompleted() {

  8.   Log.d(tag, "Completed!");

  9.   }

  10.   @Override

  11.   public void onError(Throwable e) {

  12.   Log.d(tag, "Error!");

  13.   }

  14.   };
复制代码


  这里可以重写onNext,的onComplete,onError的等方法实现观察者对于被观察者作出的反应。

  最后,通过认购()方法,将两者关联起来,也就是订阅:

  
  1. observable.subscribe(subscriber);
复制代码


  当然,我们一般的写法是从observable.create开始一整个链式操作,上面这样写的话更加清晰一些。

  下面我们来看一下核心操作订阅的源码:

  1.   static Subscription subscribe(Subscriber subscriber, Observable observable) {

  2.   // validate and proceed

  3.   /**省略**/

  4.   // new Subscriber so onStart it

  5.   subscriber.onStart();

  6.   /*

  7.   * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls

  8.   * to user code from within an Observer"

  9.   */

  10.   // if not already wrapped

  11.   if (!(subscriber instanceof SafeSubscriber)) {

  12.   // assign to `observer` so we return the protected version

  13.   subscriber = new SafeSubscriber(subscriber);

  14.   }

  15.   // The code below is exactly the same an unsafeSubscribe but not used because it would

  16.   // add a significant depth to already huge call stacks.

  17.   try {

  18.   // allow the hook to intercept and/or decorate

  19.   RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

  20.   return RxJavaHooks.onObservableReturn(subscriber);

  21.   }

  22.   /**省略**/

  23.   }
复制代码


  可以看到,核心的一条语句是“RxJavaHooks.onObservableStart(观察到,observable.onSubscribe).CALL(用户);”

  其中调用(订户)就是我们在上面创建OnSubscribe对象重写的调用方法,那么很明显,前面的RxJavaHooks.onObservableStart(观察到,observable.onSubscribe)就是返回了OnSubscribe对象,具体可以看下面这段代码:

  1.   @Deprecated

  2.   public OnSubscribe onSubscribeStart(Observable observableInstance, final OnSubscribe onSubscribe) {

  3.   // pass through by default

  4.   return onSubscribe;

  5.   }
复制代码


  到这里的代码逻辑都非常清晰,当订阅函数订阅被调用的时候,OnSubscribe对象就会执行调用方法,开始事件的发生,然后以用户为参数,开始事件的消费,消费主要在onNext,的onComplete,onerror的中实现。

  图()源码

  地图是RxJava中用的比较多的操作符,先看看它的使用:

  1.   Observable.create(new Observable.OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber subscriber) {

  4.   subscriber.onNext("hello");

  5.   }

  6.   })

  7.   .map(new Func1() {

  8.   @Override

  9.   public String call(String s) {

  10.   return s + " rx";

  11.   }

  12.   })

  13.   .subscribe(new Subscriber() {

  14.   @Override

  15.   public void onCompleted() {

  16.   }

  17.   @Override

  18.   public void onError(Throwable e) {

  19.   }

  20.   @Override

  21.   public void onNext(String s) {

  22.   showToast(s);

  23.   }

  24.   });
复制代码


  上面这段代码,增加了一个地图操作,里面的参数是FUN1的子类,实现了把字符串加上“RX”的操作,那么可以猜到最终onNext输出的结果是“你好RX”。我们来看看它是如何实现的:

  下面是地图()函数的源码:

  1.   public final Observable map(Func1 func) {

  2.   return create(new OnSubscribeMap(this, func));

  3.   }
复制代码


  创建函数,我们在上一部分的时候已经看到过,生成一个可观察的对象,我们称之为observable2,因为在地图之前,我们已经有一个observable1,按照我们在第一部分的逻辑,后面调用订阅()函数的时候,应该调用了observable2所对应的OnSubscribe对象的调用方法,再仔细看一下上面的创建方法,其实就是OnSubscribeMap的调用方法。

  我们就聚焦OnSubscribeMap这个类来看看它是如何实现修改输出结果的:

  1.   public final class OnSubscribeMap implements OnSubscribe {

  2.   final Observable source;

  3.   final Func1 transformer;

  4.   public OnSubscribeMap(Observable source, Func1 transformer) {

  5.   this.source = source;

  6.   this.transformer = transformer;

  7.   }

  8.   @Override

  9.   public void call(final Subscriber o) {

  10.   MapSubscriber parent = new MapSubscriber(o, transformer);

  11.   o.add(parent);

  12.   source.unsafeSubscribe(parent);

  13.   }

  14.   }
复制代码


  这个类的两个主要函数:

  先看构造函数:源,就是我们上面说的observable1; 变压器,就是上面的变化方法FUNC1。

  再看调用方法,因为上面说过,一执行订阅就会调用这个电话。

  可以看到先新了一个MapSubscriber对象,然后订阅了这个MapSubscriber,是谁订阅了?,是源,就是observable1,那么看看observable1的调用方法执行了什么,执行了subscribe.onNext(“你好”)。

  那么接下来的操作就应该是执行MapSubscriber的onNext(“你好”),于是看看MapSubscriber类的源码:

  1.   static final class MapSubscriber extends Subscriber {

  2.   final Subscriber actual;

  3.   final Func1 mapper;

  4.   boolean done;

  5.   public MapSubscriber(Subscriber actual, Func1 mapper) {

  6.   this.actual = actual;

  7.   this.mapper = mapper;

  8.   }

  9.   @Override

  10.   public void onNext(T t) {

  11.   R result;

  12.   try {

  13.   result = mapper.call(t);

  14.   } catch (Throwable ex) {

  15.   Exceptions.throwIfFatal(ex);

  16.   unsubscribe();

  17.   onError(OnErrorThrowable.addValueAsLastCause(ex, t));

  18.   return;

  19.   }

  20.   actual.onNext(result);

  21.   }

  22.   @Override

  23.   public void onError(Throwable e) {

  24.   if (done) {

  25.   RxJavaHooks.onError(e);

  26.   return;

  27.   }

  28.   done = true;

  29.   actual.onError(e);

  30.   }

  31.   @Override

  32.   public void onCompleted() {

  33.   if (done) {

  34.   return;

  35.   }

  36.   actual.onCompleted();

  37.   }

  38.   @Override

  39.   public void setProducer(Producer p) {

  40.   actual.setProducer(p);

  41.   }

  42.   }
复制代码


  又是分2步看:

  构造函数,实际是订阅的用户,也就是我们自己写的订户;映射器,还是我们自己写的转化函数FUNC1。

  onNext()方法,这里是最后的重点了:调用了mapper.call(T),按照上面传过来的,也就是mapper.call(“你好”),因此结果就是“你好RX”,然后是实际的。 onNext(结果),实际是我们自己写的用户,那么就是showToast(结果)。

  大功告成!

  项目中的应用

  上面简单分析了部分功能的源码,而这只是RxJava的冰山一角,其中最重要的一个特性就是:日程参数化异步数据流,以及地图延伸出来的flatmap,电梯等操作。

  下面就给出我在项目中应用的一个例子:

  1.   addSubscription(APIFactory.instance().getPayInfo(mOrder.getId())

  2.   .observeOn(Schedulers.io())

  3.   .map(new Func1() {

  4.   @Override

  5.   public PayResult call(PayResponse payResponse) {

  6.   return OrderFactory.aliPayThread(

  7.   OrderInfoActivity.this, payResponse.getPayInfo());

  8.   }

  9.   })

  10.   .filter(new Func1() {

  11.   @Override

  12.   public Boolean call(PayResult payResult) {

  13.   return payResult != null &&

  14.   payResult.getResultStatus().equals(OrderFactory.SUCCESS_CODE);

  15.   }

  16.   }).flatMap(new Func1>() {

  17.   @Override

  18.   public Observable call(PayResult payResult) {

  19.   return ApiFactory.instance().payNotify(payResult);

  20.   }

  21.   }).subscribe(new Action1() {

  22.   @Override

  23.   public void call(Order order) {

  24.   initView(order);

  25.   }

  26.   }, toastErrorAction1()));
复制代码


  上述代码是实现了一个支付流程,其中涉及到的3个主要操作:

  网络请求后端对应订单的支付信息(getPayInfo)

  调用支付宝SDK支付(aliPayThread,并且一定要在非UI线程中进行)

  支付成功则网络通知后端,后端返回支付成功的订单信息(payNotify)

  这三个操作,有2个网络请求,1个线程操作,当时没有用RxJava的时候,用了处理程序,线程,支付成功接口,导致代码量很大,嵌套代码很多,两个文件,非常难读。

  现在,用了RxJava,地图,flatmap,observeOn,一个链式操作就搞定了!

  最后,引用一句接收官网的话:“

  上,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流

  ReactiveX不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。 “

原文作者:sunbinqiang 来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部