第7章 聚合

如果你有数据存储在MongoDB中,你想做的可能就不仅仅是将数据提取出来那么简单了;你可能希望对数据进行分析并加以利用。本章介绍MongoDB提供的聚合工具:

  • 聚合框架;
  • MapReduce;
  • 几个简单聚合命令:countdistinctgroup

7.1 聚合框架

使用聚合框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(pipeline),用于对一连串的文档进行处理。这些构件包括筛选(filtering)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。

例如,有一个保存着杂志文章的集合,你可能希望找出发表文章最多的那个作者。假设每篇文章被保存为MongoDB中的一个文档,可以按照如下步骤创建管道。

  • 将每个文章文档中的作者投射出来。
  • 将作者按照名字排序,统计每个名字出现的次数。
  • 将作者按照名字出现次数降序排列。
  • 将返回结果限制为前5个。

这里面的每一步都对应聚合框架中的一个操作符:

  • {"$project" : {"author" : 1}}

这样可以将"author"从每个文档中投射出来。

这个语法与查询中的字段选择器比较像:可以通过指定"fieldname" : 1选择需要投射的字段,或者通过指定"fieldname":0排除不需要的字段。执行完这个"$project"操作之后,结果集中的每个文档都会以{"_id" : id, "author" : "authorName"}这样的形式表示。这些结果只会在内存中存在,不会被写入磁盘。

  • {"$group" : {"_id" : "$author", "count" : {"$sum" : 1}}}

这样就会将作者按照名字排序,某个作者的名字每出现一次,就会对这个作者的"count"加1。

这里首先指定了需要进行分组的字段"author"。这是由"_id" : "$author"指定的。可以将这个操作想象为:这个操作执行完后,每个作者只对应一个结果文档,所以"author"就成了文档的唯一标识符("_id")。

第二个字段的意思是为分组内每个文档的"count"字段加1。注意,新加入的文档中并不会有"count"字段;这"$group"创建的一个新字段。

执行完这一步之后,结果集中的每个文档会是这样的结构:{"_id" : "authorName", "count" : articleCount}

  • {"$sort" : {"count" : -1}}

这个操作会对结果集中的文档根据"count"字段进行降序排列。

  • {"$limit" : 5}

    这个操作将最终的返回结果限制为当前结果中的前5个文档。

在MongoDB中实际运行时,要将这些操作分别传给aggregate()函数:

  1. > db.articles.aggregate({"$project" : {"author" : 1}},
  2. ... {"$group" : {"_id" : "$author", "count" : {"$sum" : 1}}},
  3. ... {"$sort" : {"count" : -1}},
  4. ... {"$limit" : 5})
  5. {
  6. "result" : [
  7. {
  8. "_id" : "R. L. Stine",
  9. "count" : 430
  10. },
  11. {
  12. "_id" : "Edgar Wallace",
  13. "count" : 175
  14. },
  15. {
  16. "_id" : "Nora Roberts",
  17. "count" : 145
  18. },
  19. {
  20. "_id" : "Erle Stanley Gardner",
  21. "count" : 140
  22. },
  23. {
  24. "_id" : "Agatha Christie",
  25. "count" : 85
  26. }
  27. ],
  28. "ok" : 1
  29. }

aggregate()会返回一个文档数组,其中的内容是发表文章最多的5个作者。

第7章 聚合 - 图1如果管道没有给出预期的结果,就需要进行调试,调试时,可以先只指定第一个管道操作符。如果这时得到了预期结果,那就再指定第二个管道操作符。以前面的例子来说,首先要试着只使用"$project"操作符进行聚合;如果这个操作符的结果是有效的,就再添加"$group"操作符;如果结果还是有效的,就再添加"$sort";最后再添加"$limit"操作符。这样就可以逐步定位到造成问题的操作符。

本书写作时,聚合框架还不能对集合进行写入操作,因此所有结果必须返回给客户端。所以,聚合的结果必须要限制在16 MB以内(MongoDB支持的最大响应消息大小)。

7.2 管道操作符

每个操作符都会接受一连串的文档,对这些文档做一些类型转换,最后将转换后的文档作为结果传递给下一个操作符(对于最后一个管道操作符,是将结果返回给客户端)。

不同的管道操作符可以按任意顺序组合在一起使用,而且可以被重复任意多次。例如,可以先做"$match",然后做"$group",然后再做"$match"(与之前的"$match"匹配不同的查询条件)。

7.2.1 $match

$match用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。例如,如果想对Oregon(俄勒冈州,简写为OR)的用户做统计,就可以使用{$match : {"state" : "OR"}}"$match"可以使用所有常规的查询操作符("$gt""$lt""$in"等)。有一个例外需要注意:不能在"$match"中使用地理空间操作符。

通常,在实际使用中应该尽可能将"$match"放在管道的前面位置。这样做有两个好处:一是可以快速将不需要的文档过滤掉,以减少管道的工作量;二是如果在投射和分组之前执行"$match",查询可以使用索引。

7.2.2 $project

相对于“普通”的查询而言,管道中的投射操作更加强大。使用"$project"可以从子文档中提取字段,可以重命名字段,还可以在这些字段上进行一些有意思的操作。

最简单的一个"$project"操作是从文档中选择想要的字段。可以指定包含或者不包含一个字段,它的语法与查询中的第二个参数类似。如果在原来的集合上执行下面的代码,返回的结果文档中只包含一个"author"字段。

  1. > db.articles.aggregate({"$project" : {"author" : 1, "_id" : 0}})

默认情况下,如果文档中存在"_id"字段,这个字段就会被返回("_id"字段可以被一些管道操作符移除,也可能已经被之前的投射操作给移除了)。可以使用上面的代码将"_id"从结果文档中移除。包含字段和排除字段的规则与常规查询中的语法一致。

也可以将投射过的字段进行重命名。例如,可以将每个用户文档的"_id"在返回结果中重命名为"userId"

  1. > db.users.aggregate({"$project" : {"userId" : "$_id", "_id" : 0}})
  2. {
  3. "result" : [
  4. {
  5. "userId" : ObjectId("50e4b32427b160e099ddbee7")
  6. },
  7. {
  8. "userId" : ObjectId("50e4b32527b160e099ddbee8")
  9. }
  10. ...
  11. ],
  12. "ok" : 1
  13. }

这里的"$fieldname"语法是为了在聚合框架中引用fieldname字段(上面的例子中是"_id")的值。例如,"$age"会被替换为"age"字段的内容(可能是数值,也可能是字符串),"$tags.3"会被替换为tags数组中的第4个元素。所以,上面例子中的"$_id"会被替换为进入管道的每个文档的"_id"字段的值。

注意,必须明确指定将"_id"排除,否则这个字段的值会被返回两次:一次被标为"userId",一次被标为"_id"。可以使用这种技术生成字段的多个副本,以便在之后的"$group"中使用。

在对字段进行重命名时,MongoDB并不会记录字段的历史名称。因此,如果在"originalFieldname"字段上有一个索引,聚合框架无法在下面的排序操作中使用这个索引,尽管人眼一下子就能看出下面代码中的"newFieldname""originalFieldname"表示同一个字段。

  1. > db.articles.aggregate({"$project" : {"newFieldname" : "$originalFieldname"}},
  2. ... {"$sort" : {"newFieldname" : 1}})

所以,应该尽量在修改字段名称之前使用索引。

1. 管道表达式

最简单的"$project"表达式是包含和排除字段,以及字段名称("$fieldname")。但是,还有一些更强大的选项。也可以使用表达式(expression)将多个字面量和变量组合在一个值中使用。

在聚合框架中有几个表达式可用来组合或者进行任意深度的嵌套,以便创建复杂的表达式。

2. 数学表达式(mathematical expression)

算术表达式可用于操作数值。指定一组数值,就可以使用这个表达式进行操作了。例如,下面的表达式会将"salary""bonus"字段的值相加。

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "totalPay" : {
  5. ... "$add" : ["$salary", "$bonus"]
  6. ... }
  7. ... }
  8. ... })

可以将多个表达式嵌套在一起组成更复杂的表达式。假设我们想要从总金额中扣除为401(k)〔1〕缴纳的金额。可以使用"$subtract"表达式:

1 401(k)是美国的一种养老金计划。——译者注

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "totalPay" : {
  5. ... "$subtract" : [{"$add" : ["$salary", "$bonus"]}, "$401k"]
  6. ... }
  7. ... }
  8. ... })

表达式可以进行任意层次的嵌套。

下面是每个操作符的语法:

"$add" : [expr1[, expr2, …, exprN]]

这个操作符接受一个或多个表达式作为参数,将这些表达式相加。

"$subtract" : [expr1, expr2]

接受两个表达式作为参数,用第一个表达式减去第二个表达式作为结果。

"$multiply" : [expr1[, expr2, …, exprN]]

接受一个或者多个表达式,并且将它们相乘。

"$divide" : [expr1, expr2]

接受两个表达式,用第一个表达式除以第二个表达式的商作为结果。

"$mod" : [expr1, expr2]

接受两个表达式,将第一个表达式除以第二个表达式得到的余数作为结果。

3. 日期表达式(date expression)

许多聚合都是基于时间的:上周发生了什么?上个月发生了什么?过去一年间发生了什么?因此,聚合框架中包含了一些用于提取日期信息的表达式:"$year"“$month”"$week""$dayOfMonth""$dayOfWeek""$dayOfYear""$hour""$minute""$second"。只能对日期类型的字段进行日期操作,不能对数值类型字段做日期操作。

每种日期类型的操作都是类似的:接受一个日期表达式,返回一个数值。下面的代码会返回每个雇员入职的月份:

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "hiredIn" : {"$month" : "$hireDate"}
  5. ... }
  6. ... })

也可以使用字面量日期。下面的代码会计算出每个雇员在公司内的工作时间:

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "tenure" : {
  5. ... "$subtract" : [{"$year" : new Date()}, {"$year" : "$hireDate"}]
  6. ... }
  7. ... }
  8. ... })

4. 字符串表达式(string expression)

也有一些基本的字符串操作可以使用,它们的签名如下所示:

"$substr" : [expr, startOffset, numToReturn]

其中第一个参数expr必须是个字符串,这个操作会截取这个字符串的子串(从第startOffset字节开始的numToReturn字节,注意,是字节,不是字符。在多字节编码中尤其要注意这一点)expr必须是字符串。

"$concat" : [expr1[, expr2, …, exprN]]

将给定的表达式(或者字符串)连接在一起作为返回结果。

"$toLower" : expr

参数expr必须是个字符串值,这个操作返回expr的小写形式。

"$toUpper" : expr

参数expr必须是个字符串值,这个操作返回expr的大写形式。

改变字符大小写的操作,只保证对罗马字符有效。

下面是一个生成 j.doe@example.com格式的email地址的例子。它提取"$firstname"的第一个字符,将其与多个常量字符串和"$lastname"连接成一个字符串:

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "email" : {
  5. ... "$concat" : [
  6. ... {"$substr" : ["$firstName", 0, 1]},
  7. ... ".",
  8. ... "$lastName",
  9. ... "@example.com"
  10. ... ]
  11. ... }
  12. ... }
  13. ... })

5. 逻辑表达式(logical expression)

有一些逻辑表达式可以用于控制语句。

下面是几个比较表达式。

"$cmp" : [expr1, expr2]

比较expr1expr2。如果expr1等于expr2,返回0;如果expr1 < expr2,返回一个负数;如果expr1 >expr2,返回一个正数。

"$strcasecmp" : [string1, string2]

比较string1string2,区分大小写。只对罗马字符组成的字符串有效。

"$eq"/"$ne"/"$gt"/"$gte"/"$lt"/"$lte" : [expr1, expr2]

expr1expr2执行相应的比较操作,返回比较的结果(truefalse)。

下面是几个布尔表达式。

"$and" : [expr1[, expr2, …, exprN]]

如果所有表达式的值都是true,那就返回true,否则返回false

"$or" : [expr1[, expr2, …, exprN]]

只要有任意表达式的值为true,就返回true,否则返回false

"$not" : expr

expr取反。

还有两个控制语句。

"$cond" : [booleanExpr, trueExpr, falseExpr]

如果booleanExpr的值是true,那就返回trueExpr,否则返回falseExpr

"$ifNull" : [expr, replacementExpr]

如果exprnull,返回replacementExpr,否则返回expr

通过这些操作符,就可以在聚合中使用更复杂的逻辑,可以对不同数据执行不同的代码,得到不同的结果。

管道对于输入数据的形式有特定要求,所以这些操作符在传入数据时要特别注意。算术操作符必须接受数值,日期操作符必须接受日期,字符串操作符必须接受字符串,如果有字符缺失,这些操作符就会报错。如果你的数据集不一致,可以通过这个条件来检测缺失的值,并且进行填充。

6. 一个提取的例子

假如有个教授想通过某种比较复杂的计算为学生打分:出勤率占10%,日常测验成绩占30%,期末考试占60%(如果是老师最宠爱的学生,那么分数就是100)。可以使用如下代码:

  1. > db.students.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "grade" : {
  5. ... "$cond" : [
  6. ... "$teachersPet",
  7. ... 100, // if
  8. ... { // else
  9. ... "$add" : [
  10. ... {"$multiply" : [.1, "$attendanceAvg"]},
  11. ... {"$multiply" : [.3, "$quizzAvg"]},
  12. ... {"$multiply" : [.6, "$testAvg"]}
  13. ... ]
  14. ... }
  15. ... ]
  16. ... }
  17. ... }
  18. ... })

7.2.3 $group

$group操作可以将文档依据特定字段的不同值进行分组。下面是几个分组的例子。

  • 如果我们以分钟作为计量单位,希望找出每天的平均湿度,就可以根据"day"字段进行分组。
  • 如果有一个学生集合,希望按照分数等级将学生分为多个组,可以根据"grade"字段进行分组。
  • 如果有一个用户集合,希望知道每个城市有多少用户,可以根据"state""city"两个字段对集合进行分组,每个"city"/"state"对对应一个分组。不应该只根据"city"字段进行分组,因为不同的州可能拥有相同名字的城市。

如果选定了需要进行分组的字段,就可以将选定的字段传递给"$group"函数的"_id"字段。对于上面的例子,相应的代码如下:

  • {"$group" : {"_id" : "$day"}}
  • {"$group" : {"_id" : "$grade"}}
  • {"$group" : {"_id" : {"state" : "$state", "city" : "$city"}}}

如果执行这些代码,结果集中每个分组对应一个只有一个字段(分组键)的文档。例如,按学生分数等级进行分组的结果可能是:{"result" : [{"_id" : "A+"}, {"_id" : "A"}, {"_id" : "A-"}, …, {"_id" : "F"}], "ok" : 1}。通过上面这些代码,可以得到特定字段中每一个不同的值,但是所有例子都要求基于这些分组进行一些计算。因此,可以添加一些字段,使用分组操作符对每个分组中的文档做一些计算。

1. 分组操作符

这些分组操作符允许对每个分组进行计算,得到相应的结果。7.1节介绍过"$sum"分组操作符的作用:分组中每出现一个文档,它就对计算结果加1,这样便可以得到每个分组中的文档数量。

2. 算术操作符

有两个操作符可以用于对数值类型字段的值进行计算:"$sum""$average"

  • "$sum" : value

    对于分组中的每一个文档,将value与计算结果相加。注意,上面的例子中使用了一个字面量数字1,但是这里也可以使用比较复杂的值。例如,如果有一个集合,其中的内容是各个国家的销售数据,使用下面的代码就可以得到每个国家的总收入:

  1. > db.sales.aggregate(
  2. ... {
  3. ... "$group" : {
  4. ... "_id" : "$country",
  5. ... "totalRevenue" : {"$sum" : "$revenue"}
  6. ... }
  7. ... })
  • "$avg" : value

返回每个分组的平均值。

例如,下面的代码会返回每个国家的平均收入,以及每个国家的销量:

  1. > db.sales.aggregate(
  2. ... {
  3. ... "$group" : {
  4. ... "_id" : "$country",
  5. ... "totalRevenue" : {"$avg" : "$revenue"},
  6. ... "numSales" : {"$sum" : 1}
  7. ... }
  8. ... })

3. 极值操作符(extreme operator)

下面的四个操作符可用于得到数据集合中的“边缘”值。

  • "$max" : expr 返回分组内的最大值。

  • "$min" : expr

    返回分组内的最小值。

  • "$first" : expr 返回分组的第一个值,忽略后面所有值。只有排序之后,明确知道数据顺序时这个操作才有意义。

  • "$last" : expr

    "$first"相反,返回分组的最后一个值。

"$max""$min"会查看每一个文档,以便得到极值。因此,如果数据是无序的,这两个操作符也可以有效工作;如果数据是有序的,这两个操作符就会有些浪费。假设有一个存有学生考试成绩的数据集,需要找到其中的最高分与最低分:

  1. > db.scores.aggregate(
  2. ... {
  3. ... "$group" : {
  4. ... "_id" : "$grade",
  5. ... "lowestScore" : {"$min" : "$score"},
  6. ... "highestScore" : {"$max" : "$score"}
  7. ... }
  8. ... })

另一方面,如果数据集是按照希望的字段排序过的,那么"$first""$last"操作符就会非常有用。下面的代码与上面的代码可以得到同样的结果:

  1. > db.scores.aggregate(
  2. ... {
  3. ... "$sort" : {"score" : 1}
  4. ... },
  5. ... {
  6. ... "$group" : {
  7. ... "_id" : "$grade",
  8. ... "lowestScore" : {"$first" : "$score"},
  9. ... "highestScore" : {"$last" : "$score"}
  10. ... }
  11. ... })

如果数据是排过序的,那么$first$last会比$min$max效率更高。如果不准备对数据进行排序,那么直接使用$min$max会比先排序再使用$first$last效率更高。

4. 数组操作符

有两个操作符可以进行数组操作。

  • "$addToSet" : expr

如果当前数组中不包含expr ,那就将它添加到数组中。在返回结果集中,每个元素最多只出现一次,而且元素的顺序是不确定的。

  • "$push" : expr

    不管expr是什么值,都将它添加到数组中。返回包含所有值的数组。

5. 分组行为

有两个操作符不能用前面介绍的流式工作方式对文档进行处理,"$group"是其中之一。大部分操作符的工作方式都是流式的,只要有新文档进入,就可以对新文档进行处理,但是"$group"必须要等收到所有的文档之后,才能对文档进行分组,然后才能将各个分组发送给管道中的下一个操作符。这意味着,在分片的情况下,"$group"会先在每个分片上执行,然后各个分片上的分组结果会被发送到mongos再进行最后的统一分组,剩余的管道工作也都是在mongos(而不是在分片)上运行的。

7.2.4 $unwind

拆分(unwind)可以将数组中的每一个值拆分为单独的文档。例如,如果有一篇拥有多条评论的博客文章,可以使用$unwind将每条评论拆分为一个独立的文档:

  1. > db.blog.findOne()
  2. {
  3. "_id" : ObjectId("50eeffc4c82a5271290530be"),
  4. "author" : "k",
  5. "post" : "Hello, world!",
  6. "comments" : [
  7. {
  8. "author" : "mark",
  9. "date" : ISODate("2013-01-10T17:52:04.148Z"),
  10. "text" : "Nice post"
  11. },
  12. {
  13. "author" : "bill",
  14. "date" : ISODate("2013-01-10T17:52:04.148Z"),
  15. "text" : "I agree"
  16. }
  17. ]
  18. }
  19. > db.blog.aggregate({"$unwind" : "$comments"})
  20. {
  21. "results" :
  22. {
  23. "_id" : ObjectId("50eeffc4c82a5271290530be"),
  24. "author" : "k",
  25. "post" : "Hello, world!",
  26. "comments" : {
  27. "author" : "mark",
  28. "date" : ISODate("2013-01-10T17:52:04.148Z"),
  29. "text" : "Nice post"
  30. }
  31. },
  32. {
  33. "_id" : ObjectId("50eeffc4c82a5271290530be"),
  34. "author" : "k",
  35. "post" : "Hello, world!",
  36. "comments" : {
  37. "author" : "bill",
  38. "date" : ISODate("2013-01-10T17:52:04.148Z"),
  39. "text" : "I agree"
  40. }
  41. }
  42. ],
  43. "ok" : 1
  44. }

如果希望在查询中得到特定的子文档,这个操作符就会非常有用:先使用"$unwind"得到所有子文档,再使用"$match"得到想要的文档。例如,如果要得到特定用户的所有评论(只需要得到评论,不需要返回评论所属的文章),使用普通的查询是不可能做到的。但是,通过提取、拆分、匹配,就很容易了:

  1. > db.blog.aggregate({"$project" : {"comments" : "$comments"}},
  2. ... {"$unwind" : "$comments"},
  3. ... {"$match" : {"comments.author" : "Mark"}})

由于最后得到的结果仍然是一个"comments"子文档,所以你可能希望再做一次投射,以便让输出结果更优雅。

7.2.5 $sort

可以根据任何字段(或者多个字段)进行排序,与在普通查询中的语法相同。如果要对大量的文档进行排序,强烈建议在管道的第一阶段进行排序,这时的排序操作可以使用索引。否则,排序过程就会比较慢,而且会占用大量内存。

可以在排序中使用文档中实际存在的字段,也可以使用在投射时重命名的字段:

  1. > db.employees.aggregate(
  2. ... {
  3. ... "$project" : {
  4. ... "compensation" : {
  5. ... "$add" : ["$salary", "$bonus"]
  6. ... },
  7. ... "name" : 1
  8. ... }
  9. ... },
  10. ... {
  11. ... "$sort" : {"compensation" : -1, "name" : 1}
  12. ... })

这个例子会对员工排序,最终的结果是按照报酬从高到低,姓名从A到Z的顺序排列。

排序方向可以是1(升序)和-1(降序)。

与前面讲过的"$group"一样,"$sort"也是一个无法使用流式工作方式的操作符。"$sort"也必须要接收到所有文档之后才能进行排序。在分片环境下,先在各个分片上进行排序,然后将各个分片的排序结果发送到mongos做进一步处理。

7.2.6 $limit

$limit会接受一个数字n,返回结果集中的前n个文档。

7.2.7 $skip

$skip也是接受一个数字n,丢弃结果集中的前n个文档,将剩余文档作为结果返回。在“普通”查询中,如果需要跳过大量的数据,那么这个操作符的效率会很低。在聚合中也是如此,因为它必须要先匹配到所有需要跳过的文档,然后再将这些文档丢弃。

7.2.8 使用管道

应该尽量在管道的开始阶段(执行"$project""$group"或者"$unwind"操作之前)就将尽可能多的文档和字段过滤掉。管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试对操作进行排序,以便能够有效使用索引。

MongoDB不允许单一的聚合操作占用过多的系统内存:如果MongoDB发现某个聚合操作占用了20%以上的内存,这个操作就会直接输出错误。允许将输出结果利用管道放入一个集合中是为了方便以后使用(这样可以将所需的内存减至最小)。

如果能够通过"$match"操作迅速减小结果集的大小,就可以使用管道进行实时聚合。由于管道会不断包含更多的文档,会越来越复杂,所以几乎不可能实时得到管道的操作结果。

7.3 MapReduce

MapReduce是聚合工具中的明星,它非常强大、非常灵活。有些问题过于复杂,无法使用聚合框架的查询语言来表达,这时可以使用MapReduce。MapReduce使用JavaScript作为“查询语言”,因此它能够表达任意复杂的逻辑。然而,这种强大是有代价的:MapReduce非常慢,不应该用在实时的数据分析中。

MapReduce能够在多台服务器之间并行执行。它会将一个大问题分割为多个小问题,将各个小问题发送到不同的机器上,每台机器只负责完成一部分工作。所有机器都完成时,再将这些零碎的解决方案合并为一个完整的解决方案。

MapReduce需要几个步骤。最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么“无作为”,要么“产生一些键和X个值”。然后就是中间环节,称作洗牌(shuffle),按照键分组,并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值也就是最终结果。

下面会多举几个MapReduce的例子,这个工具非常强大,但也有点复杂。

7.3.1 示例1:找出集合中的所有键

用MapReduce来解决这个问题有点大材小用,不过还是一种了解其机制的不错的方式。要是已经知道MapReduce的原理,则直接跳到本节最后,看看MongoDB中MapReduce的使用注意事项。

MongoDB会假设你的模式是动态的,所以并不跟踪记录每个文档中的键。通常找到集合中所有文档所有键的最好方式就是用MapReduce。在本例中,会记录每个键出现了多少次。内嵌文档中的键就不计算了,但给map函数做个简单修改就能实现这个功能了。

在映射环节,我们希望得到集合中每个文档的所有键。map函数使用特别的emit函数“返回”要处理的值。emit会给MapReduce一个键(类似于前面$group所使用的键)和一个值。这里用emit将文档某个键的计数(count)返回({count : 1})。我们想为每个键单独计数,所以为文档中的每个键调用一次emitthis就是当前映射文档的引用:

  1. > map = function() {
  2. ... for (var key in this) {
  3. ... emit(key, {count : 1});
  4. ... }};

这样就有了许许多多{count : 1}文档,每一个都与集合中的一个键相关。这种由一个或多个{count : 1}文档组成的数组,会传递给reduce函数。reduce函数有两个参数,一个是key,也就是emit返回的第一个值,还有另外一个数组,由一个或者多个与键对应的{count : 1}文档组成。

  1. > reduce = function(key, emits) {
  2. ... total = 0;
  3. ... for (var i in emits) {
  4. ... total += emits[i].count;
  5. ... }
  6. ... return {"count" : total};
  7. ... }

reduce一定要能够在之前的map阶段或者前一个reduce阶段的结果上反复执行。所以reduce返回的文档必须能作为reduce的第二个参数的一个元素。例如,x键映射到了3个文档{count : 1,id : 1}{count : 1,id : 2}{count : 1,id : 3},其中id键只用于区分不同的文档。MongoDB可能会这样调用reduce

  1. > r1 = reduce("x", [{count : 1, id : 1}, {count : 1, id : 2}])
  2. {count : 2}
  3. > r2 = reduce("x", [{count : 1, id : 3}])
  4. {count : 1}
  5. > reduce("x", [r1, r2])
  6. {count : 3}

不能认为第二个参数总是初始文档之一(比如{count:1})或者长度固定。reduce应该能处理emit文档和其他reduce返回结果的各种组合。

总之,MapReduce函数可能会是下面这样:

  1. > mr = db.runCommand({"mapreduce" : "foo", "map" : map, "reduce" : reduce})
  2. {
  3. "result" : "tmp.mr.mapreduce_1266787811_1",
  4. "timeMillis" : 12,
  5. "counts" : {
  6. "input" : 6
  7. "emit" : 14
  8. "output" : 5
  9. },
  10. "ok" : true
  11. }

MapReduce返回的文档包含很多与操作有关的元信息。

  • "result" : "tmp.mr.mapreduce_1266787811_1"

    这是存放MapReduce结果的集合名。这是个临时集合,MapReduce的连接关闭后它就被自动删除了。本章稍后会介绍如何指定一个好一点的名字以及将结果集合持久化。

  • "timeMillis" : 12

操作花费的时间,单位是毫秒。

  • "counts" : { … }

    这个内嵌文档主要用作调试,其中包含3个键。

    • "input" : 6

发送到map函数的文档个数。

  • "emit" : 14

    map函数中emit被调用的次数。

  • "output" : 5

    结果集合中的文档数量。

对结果集合进行查询会发现原有集合的所有键及其计数:

  1. > db[mr.result].find()
  2. { "_id" : "_id", "value" : { "count" : 6 } }
  3. { "_id" : "a", "value" : { "count" : 4 } }
  4. { "_id" : "b", "value" : { "count" : 2 } }
  5. { "_id" : "x", "value" : { "count" : 1 } }
  6. { "_id" : "y", "value" : { "count" : 1 } }

这个结果集中的每个"_id"对应原集合中的一个键,"value"键的值就是reduce的最终结果。

7.3.2 示例2:网页分类

假设有个网站,人们可以提交其他网页的链接,比如reddit(http://www.reddit.com)。提交者可以给这个链接添加标签,表明主题,比如politics、geek或者icanhascheezburger。可以用MapReduce找出哪个主题最为热门,热门与否由最近的投票决定。

首先,建立一个map函数,发出(emit)标签和一个基于流行度和新旧程度的值。

  1. map = function() {
  2. for (var i in this.tags) {
  3. var recency = 1/(new Date() - this.date);
  4. var score = recency * this.score;
  5. emit(this.tags[i], {"urls" : [this.url], "score" : score});
  6. }
  7. };

现在就化简同一个标签的所有值,以得到这个标签的分数:

  1. reduce = function(key, emits) {
  2. var total = {urls : [], score : 0}
  3. for (var i in emits) {
  4. emits[i].urls.forEach(function(url) {
  5. total.urls.push(url);
  6. }
  7. total.score += emits[i].score;
  8. }
  9. return total;
  10. };

最终的集合包含每个标签的URL列表和表示该标签流行程度的分数。

7.3.3 MongoDB和MapReduce

前面两个例子只用到了mapreducemapreduce键。这3个键是必需的,但是MapReduce命令还有很多可选的键。

  • "finalize" : function

    可以将reduce的结果发送给这个键,这是整个处理过程的最后一步。

  • "keeptemp" : boolean

    如果为值为true,那么在连接关闭时会将临时结果集合保存下来,否则不保存。

  • "out" : string

    输出集合的名称。如果设置了这选项,系统会自动设置keeptemp : true

  • "query" : document

    在发往map函数前,先用指定条件过滤文档。

  • "sort" : document

    在发往map前先给文档排序(与limit一同使用非常有用)。

  • "limit" : integer

    发往map函数的文档数量的上限。

  • "scope" : document

    可以在JavaScript代码中使用的变量。

  • "verbose" : boolean

是否记录详细的服务器日志。

1. finalize函数

group命令一样,MapReduce也可以使用finalize函数作为参数。它会在最后一个reduce输出结果后执行,然后将结果存到临时集合中。

返回体积比较大的结果集对MapReduce不是什么大不了的事情,因为它不像group那样有4 MB的限制。然而,信息总是要传递出去的,通常来说,finalize是计算平均数、裁剪数组、清除多余信息的好时机。

2. 保存结果集合

默认情况下,Mongo会在执行MapReduce时创建一个临时集合,集合名是系统选的一个不太常用的名字,将"mr"、执行MapReduce的集合名、时间戳以及数据库作业ID,用“.”连成一个字符串,这就是临时集合的名字。结果产生形如mr.stuff.18234210220.2这样的名字。MongoDB会在调用的连接关闭时自动销毁这个集合(也可以在用完之后手动删除)。如果希望保存这个集合,就要将keeptemp选项指定为true

如果要经常使用这个临时集合,你可能想给它起个好点的名字。利用out选项(该选项接受字符串作为参数)就可以为临时集合指定一个易读易懂的名字。如果用了out选项,就不必指定keeptemp : true了,因为指定out选项时系统会将keeptemp设置为true。即便你取了一个非常好的名字,MongoDB也会在MapReduce的中间过程使用自动生成的集合名。处理完成后,会自动将临时集合的名字更改为你指定的集合名,这个重命名的过程是原子性的。也就是说,如果多次对同一个集合调用MapReduce,也不会在操作中遇到集合不完整的情况。

MapReduce产生的集合就是一个普通的集合,在这个集合上执行MapReduce完全没有问题,或者在前一个MapReduce的结果上执行MapReduce也没有问题,如此往复直到无穷都没问题!

3. 对文档子集执行MapReduce

有时需要对集合的一部分执行MapReduce。只需在传给map函数前使用查询对文档进行过滤就好了。

每个传递给map函数的文档都要先反序列化,从BSON对象转换为JavaScript对象,这个过程非常耗时。如果事先知道只需要对集合的一部分文档执行MapReduce,那么在map之前先对文档进行过滤可以极大地提高map速度。可以通过"query""limit""sort"等键对文档进行过滤。

"query"键的值是一个查询文档。通常查询返回的结果会传递给map函数。例如,有一个做跟踪分析的应用程序,现在我们需要上周的总结摘要,只要使用如下命令对上周的文档执行MapReduce就好了:

  1. > db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
  2. "query" : {"date" : {"$gt" : week_ago}}})

sort选项和limit一起使用时通常能够发挥非常大的作用。limit也可以单独使用,用来截取一部分文档发送给map函数。

如果在上个例子中想分析最近10 000个页面的访问次数(而不是最近一周的),就可以使用limitsort

  1. > db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
  2. "limit" : 10000, "sort" : {"date" : -1}})

querylimitsort可以随意组合,但是如果不使用limit的话,sort就不能有效发挥作用。

4. 使用作用域

MapReduce可以为mapreducefinalize函数都采用一种代码类型。但多数语言里,可以指定传递代码的作用域。然而MapReduce会忽略这个作用域。它有自己的作用域键"scope",如果想在MapReduce中使用客户端的值,则必须使用这个参数。可以用“变量名 : 值”这样的普通文档来设置该选项,然后在mapreducefinalize函数中就能使用了。作用域在这些函数内部是不变的。例如,上一节的例子使用1/(newDate() - this.date)计算页面的新旧程度。可以将当前日期作为作用域的一部分传递进去:

  1. > db.runCommand({"mapreduce" : "webpages", "map" : map, "reduce" : reduce,
  2. "scope" : {now : new Date()}})

这样,在map函数中就能计算1/(now - this.date)了。

5. 获得更多的输出

还有个用于调试的详细输出选项。如果想看看MapReduce的运行过程,可以将"verbose"指定为true

也可以用printmapreducefinalize过程中的信息输出到服务器日志上。

7.4 聚合命令

MongoDB为在集合上执行基本的聚合任务提供了一些命令。这些命令在聚合框架出现之前就已经存在了,现在(大多数情况下)已经被聚合框架取代。然而,复杂的group操作可能仍然需要使用JavaScript,countdistinct操作可以被简化为普通命令,不需要使用聚合框架。

7.4.1 count

count是最简单的聚合工具,用于返回集合中的文档数量:

  1. > db.foo.count()
  2. 0
  3. > db.foo.insert({"x" : 1})
  4. > db.foo.count()
  5. 1

不论集合有多大,count都会很快返回总的文档数量。

也可以给count传递一个查询文档,Mongo会计算查询结果的数量:

  1. > db.foo.insert({"x" : 2})
  2. > db.foo.count()
  3. 2
  4. > db.foo.count({"x" : 1})
  5. 1

对分页显示来说总数非常必要:“共439个,目前显示0~10个”。但是,增加查询条件会使count变慢。count可以使用索引,但是索引并没有足够的元数据供count使用,所以不如直接使用查询来得快。

7.4.2 distinct

distinct用来找出给定键的所有不同值。使用时必须指定集合和键。

  1. > db.runCommand({"distinct" : "people", "key" : "age"})

假设集合中有如下文档:

  1. {"name" : "Ada", "age" : 20}
  2. {"name" : "Fred", "age" : 35}
  3. {"name" : "Susan", "age" : 60}
  4. {"name" : "Andy", "age" : 35}

如果对"age"键使用distinct,会得到所有不同的年龄:

  1. > db.runCommand({"distinct" : "people", "key" : "age"})
  2. {"values" : [20, 35, 60], "ok" : 1}

这里还有一个常见问题:有没有办法获得集合里面所有不同的呢?MongoDB并没有直接提供这样的功能,但是可以用MapReduce(详见7.3节)自己写一个。

7.4.3 group

使用group可以执行更复杂的聚合。先选定分组所依据的键,而后MongoDB就会将集合依据选定键的不同值分成若干组。然后可以对每一个分组内的文档进行聚合,得到一个结果文档。

第7章 聚合 - 图2如果你熟悉SQL,那么这个group和SQL中的GROUP BY差不多。

假设现在有个跟踪股票价格的站点。从上午10点到下午4点每隔几分钟就会更新某只股票的价格,并保存在MongoDB中。现在报表程序要获得近30天的收盘价。用group就可以轻松办到。

股价集合中包含数以千计如下形式的文档:

  1. {"day" : "2010/10/03", "time" : "10/3/2010 03:57:01 GMT-400", "price" : 4.23}
  2. {"day" : "2010/10/04", "time" : "10/4/2010 11:28:39 GMT-400", "price" : 4.27}
  3. {"day" : "2010/10/03", "time" : "10/3/2010 05:00:23 GMT-400", "price" : 4.10}
  4. {"day" : "2010/10/06", "time" : "10/6/2010 05:27:58 GMT-400", "price" : 4.30}
  5. {"day" : "2010/10/04", "time" : "10/4/2010 08:34:50 GMT-400", "price" : 4.01}

第7章 聚合 - 图3注意,由于精度的问题,实际使用中不要将金额以浮点数的方式存储,这个例子只是为了简便才这么做。

我们需要的结果列表中应该包含每天的最后交易时间和价格,就像下面这样:

  1. [
  2. {"time" : "10/3/2010 05:00:23 GMT-400", "price" : 4.10},
  3. {"time" : "10/4/2010 11:28:39 GMT-400", "price" : 4.27},
  4. {"time" : "10/6/2010 05:27:58 GMT-400", "price" : 4.30}
  5. ]

先把集合按照"day"字段进行分组,然后在每个分组中查找"time"值最大的文档,将其添加到结果集中就完成了。整个过程如下所示:

  1. > db.runCommand({"group" : {
  2. ... "ns" : "stocks",
  3. ... "key" : "day",
  4. ... "initial" : {"time" : 0},
  5. ... "$reduce" : function(doc, prev) {
  6. ... if (doc.time > prev.time) {
  7. ... prev.price = doc.price;
  8. ... prev.time = doc.time;
  9. ... }
  10. ... }}})

把这个命令分解开看看。

  • "ns" : "stocks"

    指定要进行分组的集合。

  • "key" : "day"

    指定文档分组依据的键。这里就是"day"键。所有"day"值相同的文档被分到一组。

  • "initial" : {"time" : 0}

    每一组reduce函数调用中的初始"time"值,会作为初始文档传递给后续过程。每一组的所有成员都会使用这个累加器,所以它的任何变化都可以保存下来。

  • "$reduce" : function(doc, prev) { … }

这个函数会在集合内的每个文档上执行。系统会传递两个参数:当前文档和累加器文档(本组当前的结果)。本例中,想让reduce函数比较当前文档的时间和累加器的时间。如果当前文档的时间更晚一些,则将累加器的日期和价格替换为当前文档的值。别忘了,每一组都有一个独立的累加器,所以不必担心不同日期的命令会使用同一个累加器。

在问题一开始的描述中,就提到只要最近30天的股价。然而,我们在这里迭代了整个集合。这就是要添加"condition"的原因,因为这样就可以只对必要的文档进行处理。

  1. > db.runCommand({"group" : {
  2. ... "ns" : "stocks",
  3. ... "key" : "day",
  4. ... "initial" : {"time" : 0},
  5. ... "$reduce" : function(doc, prev) {
  6. ... if (doc.time > prev.time) {
  7. ... prev.price = doc.price;
  8. ... prev.time = doc.time;
  9. ... }},
  10. ... "condition" : {"day" : {"$gt" : "2010/09/30"}}
  11. ... }})

第7章 聚合 - 图4有些参考资料提及"cond"键或者"q"键,其实和"condition"键是完全一样的(就是表达力不如"condition"好)。

最后就会返回一个包含30个文档的数组,其实每个文档都是一个分组。每组都包含分组依据的键(这里就是"day" : string)以及这组最终的prev值。如果有的文档不存在指定用于分组的键,这些文档会被单独分为一组,缺失的键会使用"day : null"这样的形式。在"condition"中加入"day" : {"$exists" : true}就可以排除不包含指定用于分组的键的文档。group命令同时返回了用到的文档总数和"key"的不同值数量:

  1. > db.runCommand({"group" : {...}})
  2. {
  3. "retval" :
  4. [
  5. {
  6. "day" : "2010/10/04",
  7. "time" : "Mon Oct 04 2010 11:28:39 GMT-0400 (EST)"
  8. "price" : 4.27
  9. },
  10. ...
  11. ],
  12. "count" : 734,
  13. "keys" : 30,
  14. "ok" : 1
  15. }

这里每组的"price"都是显式设置的,"time"先由初始化器设置,然后在迭代中进行更新。"day"是默认被加进去的,因为用于分组的键会默认加入到每个"retval"内嵌文档中。要是不想在结果集中看到这个键,可以用完成器将累加器文档变为任何想要的形态,甚至变换成非文档(例如数字或字符串)。

1. 使用完成器

完成器(finalizer)用于精简从数据库传到用户的数据,这个步骤非常重要,因为group命令的输出结果需要能够通过单次数据库响应返回给用户。为进一步说明,这里举个博客的例子,其中每篇文章都有多个标签(tag)。现在要找出每天最热门的标签。可以(再一次)按天分组,得到每一个标签的计数。就像下面这样:

  1. > db.posts.group({
  2. ... "key" : {"day" : true},
  3. ... "initial" : {"tags" : {}},
  4. ... "$reduce" : function(doc, prev) {
  5. ... for (i in doc.tags) {
  6. ... if (doc.tags[i] in prev.tags) {
  7. ... prev.tags[doc.tags[i]]++;
  8. ... } else {
  9. ... prev.tags[doc.tags[i]] = 1;
  10. ... }
  11. ... }
  12. ... }})

得到的结果如下所示:

  1. [
  2. {"day" : "2010/01/12", "tags" : {"nosql" : 4, "winter" : 10, "sledding" : 2}},
  3. {"day" : "2010/01/13", "tags" : {"soda" : 5, "php" : 2}},
  4. {"day" : "2010/01/14", "tags" : {"python" : 6, "winter" : 4, "nosql": 15}}
  5. ]

接着可以在客户端找出"tags"文档中出现次数最多的标签。然而,向客户端发送每天所有的标签文档需要许多额外的开销——每天所有的键/值对都被传送给用户,而我们需要的仅仅是一个字符串。这也就是group有一个可选的"finalize"键的原因。"finalize"可以包含一个函数,在每组结果传递到客户端之前调用一次。可以使用"finalize"函数将不需要的内容从结果集中移除:

  1. > db.runCommand({"group" : {
  2. ... "ns" : "posts",
  3. ... "key" : {"day" : true},
  4. ... "initial" : {"tags" : {}},
  5. ... "$reduce" : function(doc, prev) {
  6. ... for (i in doc.tags) {
  7. ... if (doc.tags[i] in prev.tags) {
  8. ... prev.tags[doc.tags[i]]++;
  9. ... } else {
  10. ... prev.tags[doc.tags[i]] = 1;
  11. ... }
  12. ... },
  13. ... "finalize" : function(prev) {
  14. ... var mostPopular = 0;
  15. ... for (i in prev.tags) {
  16. ... if (prev.tags[i] > mostPopular) {
  17. ... prev.tag = i;
  18. ... mostPopular = prev.tags[i];
  19. ... }
  20. ... }
  21. ... delete prev.tags
  22. ... }}})

现在,我们就得到了想要的信息,服务器返回的内容可能如下:

  1. [
  2. {"day" : "2010/01/12", "tag" : "winter"},
  3. {"day" : "2010/01/13", "tag" : "soda"},
  4. {"day" : "2010/01/14", "tag" : "nosql"}
  5. ]

finalize可以对传递进来的参数进行修改,也可以返回一个新值。

2. 将函数作为键使用

有时分组所依据的条件可能会非常复杂,而不是单个键。比如要使用group计算每个类别有多少篇博客文章(每篇文章只属于一个类别)。由于不同作者的风格不同,填写分类名称时可能有人使用大写也有人使用小写。所以,如果要是按类别名来分组,最后“MongoDB”和“mongodb”就是两个完全不同的组。为了消除这种大小写的影响,就要定义一个函数来决定文档分组所依据的键。

定义分组函数就要用到$keyf键(注意不是"key"),使用"$keyf"group命令如下所示:

  1. > db.posts.group({"ns" : "posts",
  2. ... "$keyf" : function(x) { return x.category.toLowerCase(); },
  3. ... "initializer" : ... })

有了"$keyf",就能依据各种复杂的条件进行分组了。