UDN-企业互联网技术人气社区

板块导航

浏览  : 1020
回复  : 0

[讨论交流] RxJava系列番外篇:一个RxJava解决复杂业务逻辑的案例

[复制链接]
舞操的头像 楼主
发表于 2017-1-6 09:41:58 | 显示全部楼层 |阅读模式

  业务场景

  拿MinimalistWeather这个开源的天气App来举例:

  进入App首页后,首先我们需要从数据库中获取当前城市的天气数据,如果数据库中存在天气数据则在UI页面上展示天气数据;如果数据库中未存储当前城市的天气数据,或者已存储的天气数据的发布时间相比现在已经超过了一小时,并且网络属于连接状态则调用API从服务端获取天气数据。如果获取到到的天气数据发布时间和当前数据库中的天气数据发布时间一致则丢弃掉从服务端获取到的天气数据,如果不一致则更新数据库并且在页面上展示最新的天气信息。(同时天气数据源是可配置的,可选择是小米天气数据源还是Know天气数据源)

  解决方案

  首先我们需要创建一个从数据库获取天气数据的Observable observableForGetWeatherFromDB,同时我们也需要创建一个从API获取天气数据的Observable observableForGetWeatherFromNetWork;为了在无网络状态下免于创建observableForGetWeatherFromNetWork我们在这之前需要首先判断下网络状态。最后使用contact操作符将两个Observable合并,同时使用distinct和takeUntil操作符来过滤筛选数据以符合业务需求,然后结合subscribeOn和observeOn做线程切换。上述这一套复杂的业务逻辑如果使用传统编码方式将是极其复杂的。下面我们来看看使用RxJava如何清晰简洁的来实现这个复杂的业务:

  1.   Observable observableForGetWeatherData;

  2.   //首先创建一个从数据库获取天气数据的Observable

  3.   Observable observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe() {

  4.   @Override

  5.   public void call(Subscriber subscriber) {

  6.   try {

  7.   Weather weather = weatherDao.queryWeather(cityId);

  8.   subscriber.onNext(weather);

  9.   subscriber.onCompleted();

  10.   } catch (SQLException e) {

  11.   throw Exceptions.propagate(e);

  12.   }

  13.   }

  14.   });

  15.   if (!NetworkUtils.isNetworkConnected(context)) {

  16.   observableForGetWeatherData = observableForGetWeatherFromDB;

  17.   } else {

  18.   //接着创建一个从网络获取天气数据的Observable

  19.   Observable observableForGetWeatherFromNetWork = null;

  20.   switch (configuration.getDataSourceType()) {

  21.   case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:

  22.   observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)

  23.   .map(new Func1() {

  24.   @Override

  25.   public Weather call(KnowWeather knowWeather) {

  26.   return new KnowWeatherAdapter(knowWeather).getWeather();

  27.   }

  28.   });

  29.   break;

  30.   case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:

  31.   observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)

  32.   .map(new Func1() {

  33.   @Override

  34.   public Weather call(MiWeather miWeather) {

  35.   return new MiWeatherAdapter(miWeather).getWeather();

  36.   }

  37.   });

  38.   break;

  39.   }

  40.   assert observableForGetWeatherFromNetWork != null;

  41.   observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork

  42.   .doOnNext(new Action1() {

  43.   @Override

  44.   public void call(Weather weather) {

  45.   Schedulers.io().createWorker().schedule(() -> {

  46.   try {

  47.   weatherDao.insertOrUpdateWeather(weather);

  48.   } catch (SQLException e) {

  49.   throw Exceptions.propagate(e);

  50.   }

  51.   });

  52.   }

  53.   });

  54.   //使用concat操作符将两个Observable合并

  55.   observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)

  56.   .filter(new Func1() {

  57.   @Override

  58.   public Boolean call(Weather weather) {

  59.   return weather != null && !TextUtils.isEmpty(weather.getCityId());

  60.   }

  61.   })

  62.   .distinct(new Func1() {

  63.   @Override

  64.   public Long call(Weather weather) {

  65.   return weather.getRealTime().getTime();//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉

  66.   }

  67.   })

  68.   .takeUntil(new Func1() {

  69.   @Override

  70.   public Boolean call(Weather weather) {

  71.   return System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000;//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流

  72.   }

  73.   });

  74.   }

  75.   observableForGetWeatherData.subscribeOn(Schedulers.io())

  76.   .observeOn(AndroidSchedulers.mainThread())

  77.   .subscribe(new Action1() {

  78.   @Override

  79.   public void call(Weather weather) {

  80.   displayWeatherInformation();

  81.   }

  82.   }, new Action1() {

  83.   @Override

  84.   public void call(Throwable throwable) {

  85.   Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show();

  86.   }

  87.   });
复制代码


  上面的代码看起来比较复杂,我们采用Lambda表达式简化下代码:

  1.   Observable observableForGetWeatherData;

  2.   //首先创建一个从数据库获取天气数据的Observable

  3.   Observable observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe() {

  4.   @Override

  5.   public void call(Subscriber subscriber) {

  6.   try {

  7.   Weather weather = weatherDao.queryWeather(cityId);

  8.   subscriber.onNext(weather);

  9.   subscriber.onCompleted();

  10.   } catch (SQLException e) {

  11.   throw Exceptions.propagate(e);

  12.   }

  13.   }

  14.   });

  15.   if (!NetworkUtils.isNetworkConnected(context)) {

  16.   observableForGetWeatherData = observableForGetWeatherFromDB;

  17.   } else {

  18.   //接着创建一个从网络获取天气数据的Observable

  19.   Observable observableForGetWeatherFromNetWork = null;

  20.   switch (configuration.getDataSourceType()) {

  21.   case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:

  22.   observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)

  23.   .map(knowWeather -> new KnowWeatherAdapter(knowWeather).getWeather());

  24.   break;

  25.   case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:

  26.   observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)

  27.   .map(miWeather -> new MiWeatherAdapter(miWeather).getWeather());

  28.   break;

  29.   }

  30.   assert observableForGetWeatherFromNetWork != null;

  31.   observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork

  32.   .doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> {

  33.   try {

  34.   weatherDao.insertOrUpdateWeather(weather);

  35.   } catch (SQLException e) {

  36.   throw Exceptions.propagate(e);

  37.   }

  38.   }));

  39.   //使用concat操作符将两个Observable合并

  40.   observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)

  41.   .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))

  42.   .distinct(weather -> weather.getRealTime().getTime())//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉

  43.   .takeUntil(weather -> System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000);//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流

  44.   }

  45.   observableForGetWeatherData.subscribeOn(Schedulers.io())

  46.   .observeOn(AndroidSchedulers.mainThread())

  47.   .subscribe(weather -> displayWeatherInformation(),

  48.   throwable -> Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show());
复制代码


  小技巧

  在上述的实现中有几点是我们需要注意的:

  为什么我需要在判断网络那块整个if else?这样看起来很不优雅,我们通过RxJava符完全可以实现同样的操作啊!之所以这样做是为了在无网络状况下去创建不必要的Observable observableForGetWeatherFromNetWork;

  更新数据库的操作不应该阻塞更新UI,因此我们在observableForGetWeatherFromNetWork的doOnNext中需要通过http://Schedulers.io().createWorker()去另起一条线程,以此保证更新数据库不会阻塞更新UI的操作。

  有同学可能会问为什么不在doOnNext之后再调用一次observeOn把更新数据库的操作切换到一条新的子线程去操作呢?其实一开始我也是这样做的,后来想想不对。整个Observable的事件传递处理就像是在一条流水线上完成的,虽然我们可以通过observeOn来指定子线程去处理更新数据库的操作,但是只有等这条子线程完成了更新数据库的任务后事件才会继续往后传递,这样就阻塞了更新UI的操作。对此有疑问的同学可以去看看我之前关于RxJava源码分析的文章或者自己动手debug看看。

  问题

  最后给大家留个两个问题:

  上述代码是最佳实现方案吗?还有什么更加合理的做法?

  我们在observableForGetWeatherData中使用distinct和takeUntil过滤筛选天气数据的时候网络请求会不会已经发出去了?这样做还有意义吗?

原文作者:张磊(BaronZhang)   来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部