UDN-企业互联网技术人气社区

板块导航

浏览  : 1586
回复  : 0

[讨论交流] RxJava系列2(基本概念及使用介绍)

[复制链接]
舞操的头像 楼主
发表于 2017-1-6 10:39:43 | 显示全部楼层 |阅读模式

  前言

  上一篇的示例代码中大家一定发现了Observable这个类。从纯Java的观点看,Observable类源自于经典的观察者模式。RxJava的异步实现正是基于观察者模式来实现的,而且是一种扩展的观察者模式。

  观察者模式

  观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。

  观察者模式很适合下面这些场景中的任何一个:

  当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。

  当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。

  当一个变化的对象通知那些无需推断具体类型的对象时。

  通常一个观察者模式的类图是这样的:

2.png


  如果你对观察者模式bu很了解,那么强烈建议你先去学习下。关于观察者模式的详细介绍可以参考我之前的文章:设计模式之观察者模式

  扩展的观察者模式

  在RxJava中主要有4个角色:

  Observable

  Subject

  Observer

  Subscriber

  Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应于观察者模式中的观察者。Subscriber其实是一个实现了Observer的抽象类,后面我们分析源码的时候也会介绍到。Subject比较复杂,以后再分析。

  上一篇文章中我们说到RxJava中有个关键概念:事件。观察者Observer和被观察者Observable通过subscribe()方法实现订阅关系。从而Observable 可以在需要的时候发出事件来通知Observer。

  RxJava如何使用

  我自己在学习一种新技术的时候通常喜欢先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我们现在就来说说RxJava到底该怎么用。

  第一步:创建观察者Observer

  1. Observer<Object> observer = new Observer<Object>() {

  2.     @Override
  3.     public void onCompleted() {

  4.     }

  5.     @Override
  6.     public void onError(Throwable e) {

  7.     }

  8.     @Override
  9.     public void onNext(Object s) {

  10.     }
  11. };
复制代码


  这么简单,一个观察者Observer创建了!

  大兄弟你等等...,你之前那篇观察者模式中不是说观察者只提供一个update方法的吗?这特么怎么有三个?!!

  少年勿急,且听我慢慢道来。在普通的观察者模式中观察者一般只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。而在RxJava中的观察者Observer提供了:onNext()、 onCompleted()和onError()三个方法。还记得吗?开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就相当于普通观察者模式中的update

  RxJava中添加了普通观察者模式缺失的三个功能:

  RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;

  当事件处理出现异常时框架自动触发onError()方法;

  同时Observables支持链式调用,从而避免了回调嵌套的问题。

  第二步:创建被观察者Observable

  Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。

  1. Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {         
  2.     @Override
  3.     public void call(Subscriber<? super Object> subscriber) {

  4.     }
  5. });
复制代码


  除了create(),just()和from()同样可以创建Observable。看看下面两个例子:

  just(T...)将传入的参数依次发送

  1. Observable observable = Observable.just("One", "Two", "Three");
  2. //上面这行代码会依次调用
  3. //onNext("One");
  4. //onNext("Two");
  5. //onNext("Three");
  6. //onCompleted();
复制代码

   
  from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送

  1. String[] parameters = {"One", "Two", "Three"};
  2. Observable observable = Observable.from(parameters);
  3. //上面这行代码会依次调用
  4. //onNext("One");
  5. //onNext("Two");
  6. //onNext("Three");
  7. //onCompleted();
复制代码


  第三步:被观察者Observable订阅观察者Observer(ps:你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者)

  有了观察者和被观察者,我们就可以通过subscribe()l,就像这来实现二者的订阅关系了。

  1. observable.subscribe(observer);
复制代码

  连在一起写就是这样:

  1. Observable.create(new Observable.OnSubscribe<Integer>() {

  2.     @Override
  3.     public void call(Subscriber<? super Integer> subscriber) {
  4.         for (int i = 0; i < 5; i++) {
  5.             subscriber.onNext(i);
  6.         }
  7.         subscriber.onCompleted();
  8.     }

  9. }).subscribe(new Observer<Integer>() {

  10.     @Override
  11.     public void onCompleted() {
  12.         System.out.println("onCompleted");
  13.     }

  14.     @Override
  15.     public void onError(Throwable e) {
  16.         System.out.println("onError");
  17.     }

  18.     @Override
  19.     public void onNext(Integer item) {
  20.         System.out.println("Item is " + item);
  21.     }
  22. });
复制代码


  至此一个完整的RxJava调用就完成了。

  兄台,你叨逼叨叨逼叨的说了一大堆,可是我没搞定你特么到底在干啥啊?!!不急,我现在就来告诉你们到底发生了什么。

  首先我们使用Observable.create()创建了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,一旦我们调用subscribe()订阅后就会自动触发call()方法。call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。我们在call()方法中调用了5次onNext()和1次onCompleted()方法。一套流程周下来以后输出结果就是下面这样的:

  1. Item is 0
  2. Item is 1
  3. Item is 2
  4. Item is 3
  5. Item is 4
  6. onCompleted
复制代码


  看到这里可能你又要说了,大兄弟你别唬我啊!OnSubscribe的call()方法中的参数Subscriber怎么就变成了subscribe()方法中的观察者
Observer?!!!这俩儿货明明看起来就是两个不同的类啊。

  我们先看看Subscriber这个类:

  1. public abstract class Subscriber<T> implements Observer<T>, Subscription {

  2.     ...
  3. }
复制代码


  从源码中我们可以看到,Subscriber是Observer的一个抽象实现类,所以我首先可以肯定的是Subscriber和Observer类型是一致的。接着往下我们看看subscribe()这个方法:

  1. public final Subscription subscribe(final Observer<? super T> observer) {

  2.     //这里的if判断对于我们要分享的问题没有关联,可以先无视
  3.     if (observer instanceof Subscriber) {
  4.         return subscribe((Subscriber<? super T>)observer);
  5.     }
  6.     return subscribe(new Subscriber<T>() {

  7.         @Override
  8.         public void onCompleted() {
  9.             observer.onCompleted();
  10.         }

  11.         @Override
  12.         public void onError(Throwable e) {
  13.             observer.onError(e);
  14.         }

  15.         @Override
  16.         public void onNext(T t) {
  17.             observer.onNext(t);
  18.         }

  19.     });
  20. }
复制代码


  我们看到subscribe()方法内部首先将传进来的Observer做了一层代理,将它转换成了Subscriber。我们再看看这个方法内部的subscribe()方法:

  1. public final Subscription subscribe(Subscriber<? super T> subscriber) {
  2.     return Observable.subscribe(subscriber, this);
  3. }
复制代码


  进一步往下追踪看看return后面这段代码到底做了什么。精简掉其他无关代码后的subscribe(subscriber, this)方法是这样的:

  1. private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

  2.     subscriber.onStart();
  3.     try {
  4.         hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
  5.         return hook.onSubscribeReturn(subscriber);
  6.     } catch (Throwable e) {
  7.         return Subscriptions.unsubscribed();
  8.     }
  9. }
复制代码


  我们重点看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括号内的第二个参数observable.onSubscribe,然后调用了它的call方法。而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。看到这里是不是对RxJava的执行流程清晰了一点呢?这里也建议大家在学习新技术的时候多去翻一翻源码,知其然还要能知其所以然不是吗。

  subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1、Action0;这是一种更简单的回调,只有一个call(T)方法;由于太简单这里就不做详细介绍了!

  异步

  上一篇文章中开篇就讲到RxJava就是来处理异步任务的。但是默认情况下我们在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生产事件就在哪个线程消费事件。那怎么做到异步呢?RxJava为我们提供Scheduler用来做线程调度,我们来看看RxJava提供了哪些Scheduler。

3.png


  同时RxJava还为我们提供了subscribeOn()和observeOn()两个方法来指定Observable和Observer运行的线程。

  1. Observable.from(getCommunitiesFromServer())
  2.             .flatMap(community -> Observable.from(community.houses))
  3.             .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
  4.             .observeOn(AndroidSchedulers.mainThread())
  5.             .subscribe(this::addHouseInformationToScreen);
复制代码


    上面这段代码大家应该有印象吧,没错正是我们上一篇文章中的例子。subscribeOn(http://Schedulers.io())指定了获取小区列表、处理房源信息等一系列事件都是在IO线程中运行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI线程执行。这就做到了在子线程获取房源,主线程展示房源。

  好了,RxJava系列的入门内容我们就聊到这。下一篇我们再继续介绍更多的API以及它们内部的原理。

原文作者:佚名  来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部