UDN-企业互联网技术人气社区

板块导航

浏览  : 1054
回复  : 0

[MongoDB] Mongodb map-reduce实战

[复制链接]
巡山霉少女的头像 楼主
发表于 2016-4-27 20:01:14 | 显示全部楼层 |阅读模式

  Mongodb也支持map-reduce,作为聚合运算的一种方式。

  mongodb的聚合(Aggregation)总共有3种方式:

  Aggregation pipeline

  map-reduce function

  single purpose aggregation methods

  第1、3种都比较好理解,因为可以类比到我们熟悉的关系型数据库。第一种类似于group by查找,第三种类似于使用使用关系型数据库中自带的如count,distinct等函数做的简单的聚合查找。

  第二种其实也不复杂,让我通过最近项目中一个实际应用的例子,来向你展示mongodb map-reduce的应用,你就会发现他的强大之处,往往用来解决第1、3种聚合方式都不能解决的问题,这也是mongodb相比于传统关系型数据库在数据查找方面的过人之处。

  这个项目是面向建材行业的,用MEAN实现。其中有个需求:在项目表中分页查找出每个小区的名字和里面项目的数量,最近3天新增的和修改的项目数量,最近一次修改时间,并按最近修改时间排序(项目就是小区中要装修的某套房子的信息)。

  简单分析下,第三种用简单的count、distinct函数肯定处理不了这个问题,第一种方式虽然可以按照小区对项目分组,但是分组后统计近三天的新增和修改数量则无法实现,因为分组后只能进行简单的表达式运算,不能实现复杂的业务逻辑。还好mongodb还提供了最终杀器——map-reduce。

  首先,map函数

 
7.png


  用communityName(小区名)和areaId(省市区id)联合作为key,可以唯一确定一个小区(不同的城市可能有名字一样的小区,所以需要这两个字段联合做key)。value对象中把数据库中的crate_date,update_date传入reduce函数,用来计算最近3天的新增和修改情况,areaName(省市区名)用来给前端显示,count默认为1,用来做小区内项目总数的计数。这样就完成了按照不同小区的归类。

  之后,reduce函数

6.png


  在这里我们接收两个参数,分别是map函数传来的key和根据key归类的value数组values。我们遍历这个数组,把count累加就得到了小区内项目总数,然后简单的比较操作,找到最近一次更新的项目的更新时间。最后我们累计出最近3天的新增和修改数量(用update_date字段加3天和当前时间比较,另外如果update_date字段和create_date字段值相同,表示是新增的,不同则是修改的)。把结果封装到result对象返回。

  OK,貌似搞定了,实则不然。因为有些小区恰好只有1个项目,这样的记录通过map函数之后就不会再经过reduce函数处理了,这样会导致最终返回的json数组中的对象属性不一致。因此我们还需要再过滤一下,以便得到结构一致的结果。代码如下:

5.png


  不管有没有经过reduce函数,最终都会调用finalize函数,如果没有经过reduce,那么reduceVal传入的将是map函数的value对象我们通过 !reducedVal.latestUpdateDate && reducedVal.create_date && reducedVal.update_date

  这3个条件就可以判断出来,那么我们要做的就是把它转成reduce函数中的result对象,这样最终数组中的元素格式就一致了。

  引入查找条件:

4.png


  option对象是查找条件,这里就不展开了。mongo会先根据查找条件找出结果集,再进行map-reduce聚合。

  最后我们将查找结果放到results数组里面。

  接下来还有些善后工作:

  排序:(悲剧,需求是根据最近更新时间排序,这个是reduce中算出来的。没办法利用数据库的排序和分页了, 用lodash吧)

3.png


  分页:

2.png


  最后,由于mongodb保存的时间都是用的GMT时区,我们必须转一下时区,并且format日期格式。

1.png


  OK,这才算真正完成了。完整代码如下:

  1.   this.getClueGroupByCommunity = function(option, page, size) {
  2.     return new Promise(function (resolve, reject) {
  3.         co(function*() {
  4.             try {
  5.                 var results = yield self.model.mapReduce({
  6.                     map: function () {
  7.                         var key = {
  8.                             communityName: this.address.communityName,
  9.                             areaId: this.address.areaId
  10.                         };
  11.                         var value = {
  12.                             count: 1,
  13.                             create_date: this.create_date,
  14.                             update_date: this.update_date,
  15.                             areaName: this.address.areaName
  16.                         };
  17.                         emit(key, value);
  18.                     },
  19.                     reduce: function (key, values) {
  20.                         var result = {
  21.                             count: 0,
  22.                             latestUpdateDate: '',
  23.                             added: 0,
  24.                             updated: 0,
  25.                             areaName: values[0].areaName
  26.                         };
  27.                         for (var i = 0; i < values.length; i++) {
  28.                             //project count
  29.                             result.count += values[i].count;

  30.                             //latest update date
  31.                             if (!result.latestUpdateDate || values[i].update_date > result.latestUpdateDate) {
  32.                                 result.latestUpdateDate = values[i].update_date;
  33.                             }

  34.                             //added and updated
  35.                             var endDate = new Date(values[i].update_date);
  36.                             endDate.setDate(endDate.getDate() + 3);
  37.                             if (endDate.getTime() >= new Date().getTime()) {
  38.                                 if (values[i].update_date.getTime() == values[i].create_date.getTime()) {
  39.                                     result.added += 1;
  40.                                 } else {
  41.                                     result.updated += 1;
  42.                                 }
  43.                             }
  44.                         }
  45.                         return result;
  46.                     },
  47.                     finalize: function (key, reducedVal) {
  48.                         if (!reducedVal.latestUpdateDate && reducedVal.create_date && reducedVal.update_date) {//mapped but not reduced
  49.                             var fixedReduceVal = {};
  50.                             fixedReduceVal.count = 1;
  51.                             fixedReduceVal.latestUpdateDate = reducedVal.update_date;
  52.                             fixedReduceVal.added = 0;
  53.                             fixedReduceVal.updated = 0;

  54.                             //added and updated
  55.                             var endDate = new Date(reducedVal.update_date);
  56.                             endDate.setDate(endDate.getDate() + 3);
  57.                             if (endDate.getTime() >= new Date().getTime()) {
  58.                                 if (reducedVal.update_date.getTime() == reducedVal.create_date.getTime()) {
  59.                                     fixedReduceVal.added = 1;
  60.                                 } else {
  61.                                     fixedReduceVal.updated = 1;
  62.                                 }
  63.                             }
  64.                             fixedReduceVal.areaName = reducedVal.areaName;
  65.                             return fixedReduceVal;
  66.                         } else {
  67.                             return reducedVal;
  68.                         }
  69.                     },
  70.                     query: option
  71.                 });

  72.                 //sort
  73.                 var sortedResult = _.sortByOrder(results, ['value.latestUpdateDate'], ['desc']);

  74.                 //page
  75.                 var start = (page - 1) * size;
  76.                 var end = start + size;
  77.                 var length = sortedResult.length;
  78.                 if (end < length) {
  79.                     sortedResult = _.take(sortedResult, end);
  80.                     sortedResult = _.takeRight(sortedResult, size);
  81.                 } else if (start < length && length <= end) {
  82.                     sortedResult = _.takeRight(sortedResult, (length - start));
  83.                 } else {
  84.                     sortedResult = [];
  85.                 }


  86.                 //date format
  87.                 for (var i = 0; i < sortedResult.length; i++) {
  88.                     sortedResult[i].value.latestUpdateDate = moment.tz(sortedResult[i].value.latestUpdateDate, 'Asia/Shanghai').format('YYYY-MM-DD');
  89.                 }

  90.                 return resolve({data: sortedResult, total: length.toString()});
  91.             } catch (e) {
  92.                 return reject(e);
  93.             }
  94.         });

  95.     });
  96. };
复制代码


  对结果稍加封装,最终测试环境API返回:

 
  1.  { "code": 200, "status": "ok", "data": { "communities": [ { "id": { "communityName": "爱家金河湾", "areaId": "559a144eb7ed7c6002221bc2" }, "value": { "count": "2", "latestUpdateDate": "2015-12-07", "added": "0", "updated": "0", "areaName": "江苏省无锡市崇安区" } }, { "id": { "communityName": "测试小区", "areaId": "55c05ca0a6d3b5c017a9f430" }, "value": { "count": "3", "latestUpdateDate": "2015-12-01", "added": "0", "updated": "0", "areaName": "上海市浦东新区" } }, { "id": { "communityName": "世茂首府", "areaId": "559a144eb7ed7c6002221bc2" }, "value": { "count": "1", "latestUpdateDate": "2015-11-23", "added": "0", "updated": "0", "areaName": "江苏省无锡市崇安区" } }, { "id": { "communityName": "保利达江湾城", "areaId": "559a144eb7ed7c6002221bc2" }, "value": { "count": "6", "latestUpdateDate": "2015-11-19", "added": "0", "updated": "0", "areaName": "江苏省无锡市崇安区" } }, { "id": { "communityName": "太湖锦绣园", "areaId": "55c051a5f1b19ef80b415698" }, "value": { "count": "5", "latestUpdateDate": "2015-11-18", "added": "0", "updated": "0", "areaName": "江苏省苏州市吴中区" } }, { "id": { "communityName": "江南和院", "areaId": "55b9d697e520079014d29fd7" }, "value": { "count": "1", "latestUpdateDate": "2015-11-18", "added": "0", "updated": "0", "areaName": "江苏省无锡市宜兴市" } }, { "id": { "communityName": "和泽佳苑", "areaId": "559a149fa78aca0431281187" }, "value": { "count": "1", "latestUpdateDate": "2015-11-12", "added": "0", "updated": "0", "areaName": "江苏省无锡市锡山区" } }, { "id": { "communityName": "华夏青城", "areaId": "559a149fa78aca0431281187" }, "value": { "count": "1", "latestUpdateDate": "2015-11-12", "added": "0", "updated": "0", "areaName": "江苏省无锡市锡山区" } }, { "id": { "communityName": "冠达豪景东苑", "areaId": "559a149fa78aca0431281187" }, "value": { "count": "1", "latestUpdateDate": "2015-11-12", "added": "0", "updated": "0", "areaName": "江苏省无锡市锡山区" } }, { "id": { "communityName": "橄榄湾", "areaId": "5599d741f84b0094180b857e" }, "value": { "count": "1", "latestUpdateDate": "2015-11-02", "added": "0", "updated": "0", "areaName": "江苏省南京市鼓楼区" } } ], "page": { "page": "1", "size": "10", "total": "12" } } }
复制代码


  最后,mongodb的官方文档也指出Aggregation pipeline会提供更好的性能,所以在对实时性要求较高的场景尽量不要用map-reduce。就像hadoop的map-reduce作业一般也是用于非实时数据分析一样。

原文作者:佚名 来源:开发者头条
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部