UDN-企业互联网技术人气社区

板块导航

浏览  : 1174
回复  : 0

[讨论交流] 谜之RxJava (二) —— Magic Lift

[复制链接]
呵呵燕的头像 楼主
发表于 2016-11-27 14:22:33 | 显示全部楼层 |阅读模式
  回顾

  上一篇文章 讲了Observable、OnSubscribe和Subscriber之间的关系。 我们知道,Observable的具体工作都是在OnSubscribe中完成的。从这个类名我们也知道,如果生成了一个Observable对象,而不进行subscribe,那么什么都不会发生!

  OK,RxJava最让人兴奋的就是它有各种各样的操作符,什么map呀,flatMap呀各种,我们今天要知其然知其所以然,那么他们是如何实现功能的呢?

  例子

  1.   Observable.create(new Observable.OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber subscriber) {

  4.   subscriber.onNext("hello");

  5.   }

  6.   })

  7.   .map(new Func1() {

  8.   @Override

  9.   public String call(String s) {

  10.   return s + "word";

  11.   }

  12.   })

  13.   .subscribe(new Subscriber() {

  14.   @Override

  15.   public void onCompleted() {

  16.   }

  17.   @Override

  18.   public void onError(Throwable e) {

  19.   }

  20.   @Override

  21.   public void onNext(String s) {

  22.   Log.d("rx", s);

  23.   }

  24.   });
复制代码


  lift

  我们先看下进行链式调用map之后,发生了什么。

  1.   public final Observable map(Func1 func) {

  2.   return lift(new OperatorMap(func));

  3.   }
复制代码


  对,就是调用了lift函数!,然后把我们的转换器(Transfomer,我好想翻译成变形金刚)传入进去,看下它做了什么事。

  1.   public final Observable lift(final Operator operator) {

  2.   return new Observable(new OnSubscribe() {

  3.   @Override

  4.   public void call(Subscriber o) {

  5.   try {

  6.   Subscriber st = hook.onLift(operator).call(o);

  7.   try {

  8.   // new Subscriber created and being subscribed with so 'onStart' it

  9.   st.onStart();

  10.   onSubscribe.call(st);

  11.   } catch (Throwable e) {

  12.   // localized capture of errors rather than it skipping all operators

  13.   // and ending up in the try/catch of the subscribe method which then

  14.   // prevents onErrorResumeNext and other similar approaches to error handling

  15.   if (e instanceof OnErrorNotImplementedException) {

  16.   throw (OnErrorNotImplementedException) e;

  17.   }

  18.   st.onError(e);

  19.   }

  20.   } catch (Throwable e) {

  21.   if (e instanceof OnErrorNotImplementedException) {

  22.   throw (OnErrorNotImplementedException) e;

  23.   }

  24.   // if the lift function failed all we can do is pass the error to the final Subscriber

  25.   // as we don't have the operator available to us

  26.   o.onError(e);

  27.   }

  28.   }

  29.   });

  30.   }
复制代码


  来,我来简化一下

  1.   public final Observable lift(final Operator operator) {

  2.   return new Observable(...);

  3.   }
复制代码


  返回了一个新的Observable对象,这才是重点! 这种链式调用看起来特别熟悉?有没有像JavaScript中的Promise/A,在then中返回一个Promise对象进行链式调用?

  OK,那么我们要看下它是如何工作的啦。

  在map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable$2,OK,我们这里调用subscribe,完整的就是Observable$2.subscribe,继续看到subscribe里,重要的几个调用:

  1.   hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

  2.   return hook.onSubscribeReturn(subscriber);
复制代码

  注意注意 ! 这里的observable是Observable$2!!也就是说,这里的onSubscribe是,lift中定义的!!

  OK,我们追踪下去,回到lift的定义中。

  1.   return new Observable(new OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber o) {

  4.   try {

  5.   Subscriber st = hook.onLift(operator).call(o);

  6.   try {

  7.   // new Subscriber created and being subscribed with so 'onStart' it

  8.   st.onStart();

  9.   onSubscribe.call(st); //请注意我!! 这个onSubscribe是原始的OnSubScribe对象!!

  10.   } catch (Throwable e) {

  11.   // localized capture of errors rather than it skipping all operators

  12.   // and ending up in the try/catch of the subscribe method which then

  13.   // prevents onErrorResumeNext and other similar approaches to error handling

  14.   if (e instanceof OnErrorNotImplementedException) {

  15.   throw (OnErrorNotImplementedException) e;

  16.   }

  17.   st.onError(e);

  18.   }

  19.   } catch (Throwable e) {

  20.   if (e instanceof OnErrorNotImplementedException) {

  21.   throw (OnErrorNotImplementedException) e;

  22.   }

  23.   // if the lift function failed all we can do is pass the error to the final Subscriber

  24.   // as we don't have the operator available to us

  25.   o.onError(e);

  26.   }

  27.   }

  28.   });
复制代码


  一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个Observable的onSubScribe!而不是Observable$2中的onSubscribe,OK,谨记这一点之后,看看

  1.   Subscriber st = hook.onLift(operator).call(o);
复制代码

  这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法

  1.   @Override

  2.   public Subscriber call(final Subscriber o) {

  3.   return new Subscriber(o) {

  4.   @Override

  5.   public void onCompleted() {

  6.   o.onCompleted();

  7.   }

  8.   @Override

  9.   public void onError(Throwable e) {

  10.   o.onError(e);

  11.   }

  12.   @Override

  13.   public void onNext(T t) {

  14.   try {

  15.   o.onNext(transformer.call(t));

  16.   } catch (Throwable e) {

  17.   Exceptions.throwIfFatal(e);

  18.   onError(OnErrorThrowable.addValueAsLastCause(e, t));

  19.   }

  20.   }

  21.   };

  22.   }
复制代码


  没错,对传入的Subscriber做了一个代理,把转换后的值传入。

  这样就生成了一个代理的Subscriber,

  最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用。。

  也就是

  1.   @Override

  2.   public void call(Subscriber subscriber) {

  3.   //此处的subscriber就是被map包裹(wrapper)后的对象。

  4.   subscriber.onNext("hello");

  5.   }
复制代码


  然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。

  这时候要盗下扔物线大大文章的图

e.gif


  还不明白的各位,可以自己写一个Demo试一下。

  下一章讲下RxJava中很重要的线程切换。

  【迷之RxJava(三)—— 线程切换】

原文作者: Gemini 来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部