UDN-企业互联网技术人气社区

板块导航

浏览  : 639
回复  : 0

[讨论交流] 谜之RxJava(四)—— Retrofit和RxJava的基情

[复制链接]
呵呵燕的头像 楼主
发表于 2016-11-27 21:22:19 | 显示全部楼层 |阅读模式
  概述

  前文回顾: 谜之RxJava (三)—— 线程切换

  今天来介绍下和RxJava搭配使用的好基友,就是我们的Retrofit啦,Retrofit使用动态代理的机制,为我们提供了一个简要的使用方法来获取网络上的资料,现在更新到2.0.0-beta2了(因为是beta,我也碰到一些坑,期待作者发布下一个版本)。

  Retrofit不算是一个网络库,它应该算是封装了okhttp,然后为我们提供了一个友好的接口的一个工具库吧。

  我们今天着重来介绍下RxJavaCallAdapterFactory这个类。用过的朋友们都知道,它是用来把Retrofit转成RxJava可用的适配类。

  RxJavaCallAdapterFactory

  OK,首先看下声明

 
  1.  public final class RxJavaCallAdapterFactory implements CallAdapter.Factory
复制代码

  CallAdapter.Factory是Retrofit这个库中的接口,用来给我们自定义去解析我们自己想要的类型用的。

  举个栗子:

  1.   @GET("/aaa")

  2.   Observable getQuestionNewestList();
复制代码


  这么一个接口,retrofit本身是无法识别Observable然后去工作的,如果没有这个适配器就根本无法工作,因此我们的适配器的作用,就是生成我们想要的Observable。

  看下它的实现。

  1.   @Override

  2.   public CallAdapter get(Type returnType, Annotation[] annotations, Retrofit retrofit) {

  3.   Class rawType = Utils.getRawType(returnType);

  4.   boolean isSingle = "rx.Single".equals(rawType.getCanonicalName());

  5.   if (rawType != Observable.class && !isSingle) {

  6.   return null;

  7.   }

  8.   if (!(returnType instanceof ParameterizedType)) {

  9.   String name = isSingle ? "Single" : "Observable";

  10.   throw new IllegalStateException(name + " return type must be parameterized"

  11.   + " as " + name + " or " + name + "");

  12.   }

  13.   CallAdapter> callAdapter = getCallAdapter(returnType);

  14.   if (isSingle) {

  15.   // Add Single-converter wrapper from a separate class. This defers classloading such that

  16.   // regular Observable operation can be leveraged without relying on this unstable RxJava API.

  17.   return SingleHelper.makeSingle(callAdapter);

  18.   }

  19.   return callAdapter;

  20.   }
复制代码


  这里代码的意思就是说,如果你返回的不是Observable这种类型,我就不干!

  如果是的话,那我再来详细看下模板类是哪个,也就是getCallAdapter接口干的事。

  1.   private CallAdapter> getCallAdapter(Type returnType) {

  2.   Type observableType = Utils.getSingleParameterUpperBound((ParameterizedType) returnType);

  3.   Class rawObservableType = Utils.getRawType(observableType);

  4.   if (rawObservableType == Response.class) {

  5.   if (!(observableType instanceof ParameterizedType)) {

  6.   throw new IllegalStateException("Response must be parameterized"

  7.   + " as Response or Response");

  8.   }

  9.   Type responseType = Utils.getSingleParameterUpperBound((ParameterizedType) observableType);

  10.   return new ResponseCallAdapter(responseType);

  11.   }

  12.   if (rawObservableType == Result.class) {

  13.   if (!(observableType instanceof ParameterizedType)) {

  14.   throw new IllegalStateException("Result must be parameterized"

  15.   + " as Result or Result");

  16.   }

  17.   Type responseType = Utils.getSingleParameterUpperBound((ParameterizedType) observableType);

  18.   return new ResultCallAdapter(responseType);

  19.   }

  20.   return new SimpleCallAdapter(observableType);

  21.   }
复制代码


  这里告诉我们,除了Observable和Observable需要不同的Adapter处理外,其他的都让SimpleCallAdapter处理。

  OK,我们就不看别的,直捣黄龙,看SimpleCallAdapter!

  SimpleCallAdapter 创建Observable的类

  1.   static final class SimpleCallAdapter implements CallAdapter> {

  2.   private final Type responseType;

  3.   SimpleCallAdapter(Type responseType) {

  4.   this.responseType = responseType;

  5.   }

  6.   @Override public Type responseType() {

  7.   return responseType;

  8.   }

  9.   @Override public Observable adapt(Call call) {

  10.   return Observable.create(new CallOnSubscribe<>(call)) //

  11.   .flatMap(new Func1, Observable>() {

  12.   @Override public Observable call(Response response) {

  13.   if (response.isSuccess()) {

  14.   return Observable.just(response.body());

  15.   }

  16.   return Observable.error(new HttpException(response));

  17.   }

  18.   });

  19.   }

  20.   }
复制代码


  这里总算看见我们熟悉的Observable接口咯,原来是自己定义了一个OnSubscribe,然后把Response通过flatMap转为我们想要的对象了。 这里同时也判断请求是否成功,进入Observable的工作流里了。

  好,我们最终可以看下CallOnSubscribe干了啥

  1.   static final class CallOnSubscribe implements Observable.OnSubscribe> {

  2.   private final Call originalCall;

  3.   private CallOnSubscribe(Call originalCall) {

  4.   this.originalCall = originalCall;

  5.   }

  6.   @Override public void call(final Subscriber> subscriber) {

  7.   // Since Call is a one-shot type, clone it for each new subscriber.

  8.   final Call call = originalCall.clone();

  9.   // Attempt to cancel the call if it is still in-flight on unsubscription.

  10.   subscriber.add(Subscriptions.create(new Action0() {

  11.   @Override public void call() {

  12.   call.cancel();

  13.   }

  14.   }));

  15.   if (subscriber.isUnsubscribed()) {

  16.   return;

  17.   }

  18.   try {

  19.   Response response = call.execute();

  20.   if (!subscriber.isUnsubscribed()) {

  21.   subscriber.onNext(response);

  22.   }

  23.   } catch (Throwable t) {

  24.   Exceptions.throwIfFatal(t);

  25.   if (!subscriber.isUnsubscribed()) {

  26.   subscriber.onError(t);

  27.   }

  28.   return;

  29.   }

  30.   if (!subscriber.isUnsubscribed()) {

  31.   subscriber.onCompleted();

  32.   }

  33.   }

  34.   }
复制代码


  这里其实蛮简单的,call是retrofit对okhttp的一个代理,是一个同步网络请求,在这里就是典型的对网络进行数据请求,完了放到subscriber的onNext里,完成网络请求。我们可以看下,它把unsubscribe,也就是取消请求的情况处理的挺好。

  1.   subscriber.add(Subscriptions.create(new Action0() {

  2.   @Override public void call() {

  3.   call.cancel();

  4.   }

  5.   }));
复制代码


  这段代码是给subscribe增加一个unsubscribe的事件。 也就是请求完成的时候,会自动对call进行一个终止,也就是http的close行为。

  前方高能

  今天被坑到这里很久,我们对API调用了observeOn(MainThread)之后,线程会跑在主线程上,包括onComplete也是,unsubscribe也在主线程,然后如果这时候调用call.cancel会导致NetworkOnMainThreadException,所以一定要在retrofit的API调用ExampleAPI.subscribeOn(io).observeOn(MainThread)之后加一句unsubscribeOn(io)。

  完整的就是ExampleAPI.subscribeOn(io).observeOn(MainThread).unsubscribeOn(io)。

原文作者:Gemini 来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部