UDN-企业互联网技术人气社区

板块导航

浏览  : 908
回复  : 0

[讨论交流] 谜之RxJava (一) —— 最基本的观察者模式

[复制链接]
呵呵燕的头像 楼主
发表于 2016-11-27 14:15:20 | 显示全部楼层 |阅读模式
  最近在Android界,最火的framework大概就是RxJava了。

  扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJava的过程中受益匪浅。经过阅读这篇文章后,我们来看下RxJava的源码,揭开它神秘的面纱。

  这里准备分几篇文章写,为了能让自己有个喘口气的机会。

  先来上个最最简单的,经典的Demo。

  Demo

  1.   Observable.create(new Observable.OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber subscriber) {

  4.   subscriber.onNext("hello");

  5.   }

  6.   }).subscribe(new Subscriber() {

  7.   @Override

  8.   public void onCompleted() {

  9.   }

  10.   @Override

  11.   public void onError(Throwable e) {

  12.   }

  13.   @Override

  14.   public void onNext(String s) {

  15.   Log.d("rx", s);

  16.   }

  17.   });
复制代码


  这段代码产生的最终结果就是在Log里会出现hello。

  看下这段代码的具体流程吧。

  这里有2个函数create和subscribe,我们看看create里面干了啥。

  OnSubscribe对象

  1.   public final static Observable create(OnSubscribe f) {

  2.   return new Observable(hook.onCreate(f));

  3.   }

  4.   // constructor

  5.   protected Observable(OnSubscribe f) {

  6.   this.onSubscribe = f;

  7.   }
复制代码


  这里的hook是一个默认实现,里面不做任何事,就是返回f。我们看见create只是给Observable的onSubscribe赋值了我们定义的OnSubscribe。

  Subscriber对象

  来看下subscribe这个函数做了什么事

  1.   public final Subscription subscribe(Subscriber subscriber) {

  2.   return Observable.subscribe(subscriber, this);

  3.   }

  4.   private static Subscription subscribe(Subscriber subscriber, Observable observable) {

  5.   // validate and proceed

  6.   if (subscriber == null) {

  7.   throw new IllegalArgumentException("observer can not be null");

  8.   }

  9.   if (observable.onSubscribe == null) {

  10.   throw new IllegalStateException("onSubscribe function can not be null.");

  11.   /*

  12.   * the subscribe function can also be overridden but generally that's not the appropriate approach

  13.   * so I won't mention that in the exception

  14.   */

  15.   }

  16.   // new Subscriber so onStart it

  17.   subscriber.onStart();

  18.   /*

  19.   * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls

  20.   * to user code from within an Observer"

  21.   */

  22.   // if not already wrapped

  23.   if (!(subscriber instanceof SafeSubscriber)) {

  24.   // assign to `observer` so we return the protected version

  25.   subscriber = new SafeSubscriber(subscriber);

  26.   }

  27.   // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.

  28.   try {

  29.   // allow the hook to intercept and/or decorate

  30.   hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

  31.   return hook.onSubscribeReturn(subscriber);

  32.   } catch (Throwable e) {

  33.   // special handling for certain Throwable/Error/Exception types

  34.   Exceptions.throwIfFatal(e);

  35.   // if an unhandled error occurs executing the onSubscribe we will propagate it

  36.   try {

  37.   subscriber.onError(hook.onSubscribeError(e));

  38.   } catch (OnErrorNotImplementedException e2) {

  39.   // special handling when onError is not implemented ... we just rethrow

  40.   throw e2;

  41.   } catch (Throwable e2) {

  42.   // if this happens it means the onError itself failed (perhaps an invalid function implementation)

  43.   // so we are unable to propagate the error correctly and will just throw

  44.   RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);

  45.   // TODO could the hook be the cause of the error in the on error handling.

  46.   hook.onSubscribeError(r);

  47.   // TODO why aren't we throwing the hook's return value.

  48.   throw r;

  49.   }

  50.   return Subscriptions.unsubscribed();

  51.   }

  52.   }
复制代码


  我们看到,这里我们的subscriber被SafeSubscriber包裹了一层。

  1.   if (!(subscriber instanceof SafeSubscriber)) {

  2.   // assign to `observer` so we return the protected version

  3.   subscriber = new SafeSubscriber(subscriber);

  4.   }
复制代码


  然后开始执行工作流

  1.   hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

  2.   return hook.onSubscribeReturn(subscriber);
复制代码


  默认的hook只是返回我们之前定义的onSubscribe,这里调用的call方法就是我们在外面定义的

  1.   new Observable.OnSubscribe() {

  2.   @Override

  3.   public void call(Subscriber subscriber) {

  4.   subscriber.onNext("hello");

  5.   }

  6.   })

  7.   我们调用传入的subscriber对象的onNext方法,这里的subscriber是SafeSubscriber

  8.   在SafeScriber中

  9.   public void onNext(T args) {

  10.   try {

  11.   if (!done) {

  12.   actual.onNext(args);

  13.   }

  14.   } catch (Throwable e) {

  15.   // we handle here instead of another method so we don't add stacks to the frame

  16.   // which can prevent it from being able to handle StackOverflow

  17.   Exceptions.throwIfFatal(e);

  18.   // handle errors if the onNext implementation fails, not just if the Observable fails

  19.   onError(e);

  20.   }

  21.   }
复制代码


  actual就是我们自己定义的subscriber。 原来SafeSubscriber只是为了帮我们处理好异常,以及防止工作流的重复。

  这是RxJava最最基本的工作流,让我们认识到他是怎么工作的。之后我们来讲讲其中的细节和其他神奇的内容。

  【谜之RxJava (二) —— Magic Lift】

原文作者:Gemini 来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部