UDN-企业互联网技术人气社区

板块导航

浏览  : 604
回复  : 0

[讨论交流] 谜之RxJava (三)—— 线程切换

[复制链接]
呵呵燕的头像 楼主
发表于 2016-11-27 14:20:17 | 显示全部楼层 |阅读模式
  Rxjava -- 一个异步库

  RxJava最迷人的是什么?

  答案就是把异步序列写到一个工作流里!和JavaScript的Promise/A如出一辙。

  OK,在java中做异步的事情在我们传统理解过来可不方便,而且,如果要让异步按照我们的工作流来,就更困难了。

  但是在RxJava中,我们只要调用调用

  subscribOn()和observeOn()就能切换我们的工作线程,是不是让小伙伴都惊呆了?

  然后结合RxJava的Operator,写异步的时候,想切换线程就是一行代码的事情,整个workflow还非常清晰:

  1.   Observable.create()

  2.   // do something on io thread

  3.   .work() // work.. work..

  4.   .subscribeOn(Schedulers.io())

  5.   // observeOn android main thread

  6.   .observeOn(AndroidSchedulers.mainThread())

  7.   .subscribe();
复制代码


  我们再也不用去写什么见鬼的new Thread和Handler了,在这么几行代码里,我们实现了在io线程上做我们的工作(work),在main线程上,更新UI

  Subscribe On

  先看下subscribeOn干了什么

  1.   public final Observable subscribeOn(Scheduler scheduler) {

  2.   if (this instanceof ScalarSynchronousObservable) {

  3.   return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);

  4.   }

  5.   return nest().lift(new OperatorSubscribeOn(scheduler));

  6.   }
复制代码


  啊,原来也是个lift,就是从一个Observable生成另外一个Observable咯,这个nest是干嘛用?

  1.   public final Observable> nest() {

  2.   return just(this);

  3.   }
复制代码


  这里返回类型告诉我们,它是产生一个Observable>

  讲到这里,会有点晕,先记着这个,然后我们看OperatorSubscribeOn这个操作符,

  构造函数是

  1.   public OperatorSubscribeOn(Scheduler scheduler) {

  2.   this.scheduler = scheduler;

  3.   }

  4.   OK,这里保存了scheduler对象,然后就是我们前一章说过的转换方法。

  5.   @Override

  6.   public Subscriber> call(final Subscriber subscriber) {

  7.   final Worker inner = scheduler.createWorker();

  8.   subscriber.add(inner);

  9.   return new Subscriber>(subscriber) {

  10.   @Override

  11.   public void onCompleted() {

  12.   // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext

  13.   }

  14.   @Override

  15.   public void onError(Throwable e) {

  16.   subscriber.onError(e);

  17.   }

  18.   @Override

  19.   public void onNext(final Observable o) {

  20.   inner.schedule(new Action0() {

  21.   @Override

  22.   public void call() {

  23.   final Thread t = Thread.currentThread();

  24.   o.unsafeSubscribe(new Subscriber(subscriber) {

  25.   @Override

  26.   public void onCompleted() {

  27.   subscriber.onCompleted();

  28.   }

  29.   @Override

  30.   public void onError(Throwable e) {

  31.   subscriber.onError(e);

  32.   }

  33.   @Override

  34.   public void onNext(T t) {

  35.   subscriber.onNext(t);

  36.   }

  37.   @Override

  38.   public void setProducer(final Producer producer) {

  39.   subscriber.setProducer(new Producer() {

  40.   @Override

  41.   public void request(final long n) {

  42.   if (Thread.currentThread() == t) {

  43.   // don't schedule if we're already on the thread (primarily for first setProducer call)

  44.   // see unit test 'testSetProducerSynchronousRequest' for more context on this

  45.   producer.request(n);

  46.   } else {

  47.   inner.schedule(new Action0() {

  48.   @Override

  49.   public void call() {

  50.   producer.request(n);

  51.   }

  52.   });

  53.   }

  54.   }

  55.   });

  56.   }

  57.   });

  58.   }

  59.   });

  60.   }

  61.   };

  62.   }
复制代码


  让人纠结的类模板

  看完这段又臭又长的,先深呼吸一口气,我们慢慢分析下。

  首先要注意RxJava里面最让人头疼的模板问题,那么OperatorMap这个类的声明是

 
  1.  public final class OperatorMap implements Operator
复制代码


  而Operator这个接口继承Func1

  1.   public interface Func1 extends Function {

  2.   R call(T t);

  3.   }
复制代码


  我们这里不要记T和R,记住传入左边的模板是形参,传入右边的模板是返回值。

  好了,那么这里的call就是从一个T转换成一个Observable的过程了。

  总结一下,我们这一次调用subscribeOn,做了两件事

  1、nest() 为Observable生成了一个Observable>

  2、lift() 对Observalbe>进行一个变化,变回Observable

  因为lift是一个模板函数,它的返回值的类型是参照它的形参来,而他的形参是Operator> 这个结论非常重要!!

  OK,到这里我们已经存储了所有的序列,等着我们调用了。

  调用链

  首先,记录我们在调用这条指令之前的Observable,记为Observable$1

  然后,经过lift生成的Observable记为Observable$2

  好了,现在我们拿到的依然是Observable这个对象,但是它不是原始的Observable$1,要深深记住这一点,它是由lift生成的Observable$2,这时候进行subscribe,那看到首先调用的就是OnSubscribe.call方法,好,直接进入lift当中生成的那个地方。

  我们知道这一层lift的operator就是刚刚的OperatorSubscribOn,那么调用它的call方法,生成的是一个Subscriber>

  1.   Subscriber st = hook.onLift(operator).call(o);

  2.   try {

  3.   // new Subscriber created and being subscribed with so 'onStart' it

  4.   st.onStart();

  5.   onSubscribe.call(st);

  6.   } catch (Throwable e) {

  7.   ...

  8.   }
复制代码


  好,还记得我们调用过nest么?,这里的onSubscribe可是nest上下文中的噢,每一次,到这个地方,这个onSubscribe就是上一层Observable的onSubscribe,即Observable>的onSubscribe,相当于栈弹出了一层。它的call直接在Subscriber的onNext中给出了最开始的Observable,我们这里就要看下刚刚在OperatorSubscribeOn中生成的Subscriber

  1.   new Subscriber>(subscriber) {

  2.   @Override

  3.   public void onCompleted() {

  4.   // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext

  5.   }

  6.   @Override

  7.   public void onError(Throwable e) {

  8.   subscriber.onError(e);

  9.   }

  10.   @Override

  11.   public void onNext(final Observable o) {

  12.   inner.schedule(new Action0() {

  13.   @Override

  14.   public void call() {

  15.   final Thread t = Thread.currentThread();

  16.   o.unsafeSubscribe(new Subscriber(subscriber) {

  17.   @Override

  18.   public void onCompleted() {

  19.   subscriber.onCompleted();

  20.   }

  21.   @Override

  22.   public void onError(Throwable e) {

  23.   subscriber.onError(e);

  24.   }

  25.   @Override

  26.   public void onNext(T t) {

  27.   subscriber.onNext(t);

  28.   }

  29.   });

  30.   }

  31.   });

  32.   }

  33.   }
复制代码


  对,就是它,这里要注意,这里的subscriber就是我们在lift中,传入的o

  
  1. Subscriber st = hook.onLift(operator).call(o);
复制代码


  对,就是它,其实它就是SafeSubscriber。

  回过头,看看刚刚的onNext()方法,inner.schedule() 这个函数,我们可以认为就是postRun()类似的方法,而onNext()中传入的o是我们之前生成的Observable$1,是从Observable.just封装出来的Observable>中产生的,这里调用了Observable$1.unsafeSubscribe方法,我们暂时不关心它和subscribe有什么不同,但是我们知道最终功能是一样的就好了。

  注意它运行时的线程!!在inner这个Worker上!于是它的运行线程已经被改了!!

  好,这里的unsafeSubscribe调用的方法就是调用原先Observable$1.onSubscribe中的call方法:

  这个Observable$1就是我们之前自己定义的Observable了。

  综上所述,如果我们需要我们的Observable$1在一个别的线程上运行的时候,只需要在后面跟一个subscribeOn即可。结合扔物线大大的图如下:

f.png


  总结

  这里逻辑着实不好理解。如果还没有理解的朋友,可以按照我前文说的顺序,细致的看下来,我把逻辑过一遍之后,发现lift的陷阱实在太大,内部类用的风生水起,一不小心,就不知道一个变量的上下文是什么,需要特别小心。

  迷之RxJava(四)—— Retrofit和RxJava的基情

原文作者:佚名  来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部