- 第7章 聚合
- 7.1 聚合框架
- 7.2 管道操作符
- 1. 管道表达式
- 2. 数学表达式(mathematical expression)
- 3. 日期表达式(date expression)
- 4. 字符串表达式(string expression)
- 5. 逻辑表达式(logical expression)
- 6. 一个提取的例子
- 1. 分组操作符
- 2. 算术操作符
- 3. 极值操作符(extreme operator)
- 4. 数组操作符
- 5. 分组行为
- 7.3 MapReduce
- 1.
finalize
函数 - 2. 保存结果集合
- 3. 对文档子集执行MapReduce
- 4. 使用作用域
- 5. 获得更多的输出
- 7.4 聚合命令
- 1. 使用完成器
- 2. 将函数作为键使用
第7章 聚合
如果你有数据存储在MongoDB中,你想做的可能就不仅仅是将数据提取出来那么简单了;你可能希望对数据进行分析并加以利用。本章介绍MongoDB提供的聚合工具:
- 聚合框架;
- MapReduce;
- 几个简单聚合命令:
count
、distinct
和group
。
7.1 聚合框架
使用聚合框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(pipeline),用于对一连串的文档进行处理。这些构件包括筛选(filtering)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。
例如,有一个保存着杂志文章的集合,你可能希望找出发表文章最多的那个作者。假设每篇文章被保存为MongoDB中的一个文档,可以按照如下步骤创建管道。
- 将每个文章文档中的作者投射出来。
- 将作者按照名字排序,统计每个名字出现的次数。
- 将作者按照名字出现次数降序排列。
- 将返回结果限制为前5个。
这里面的每一步都对应聚合框架中的一个操作符:
{"$project" : {"author" : 1}}
这样可以将"author"
从每个文档中投射出来。
这个语法与查询中的字段选择器比较像:可以通过指定"fieldname" : 1
选择需要投射的字段,或者通过指定"fieldname":0
排除不需要的字段。执行完这个"$project"
操作之后,结果集中的每个文档都会以{"_id" : id, "author" : "authorName"}
这样的形式表示。这些结果只会在内存中存在,不会被写入磁盘。
{"$group" : {"_id" : "$author", "count" : {"$sum" : 1}}}
这样就会将作者按照名字排序,某个作者的名字每出现一次,就会对这个作者的"count"
加1。
这里首先指定了需要进行分组的字段"author"
。这是由"_id" : "$author"
指定的。可以将这个操作想象为:这个操作执行完后,每个作者只对应一个结果文档,所以"author"
就成了文档的唯一标识符("_id"
)。
第二个字段的意思是为分组内每个文档的"count"
字段加1。注意,新加入的文档中并不会有"count"
字段;这"$group"
创建的一个新字段。
执行完这一步之后,结果集中的每个文档会是这样的结构:{"_id" : "authorName", "count" : articleCount}
。
{"$sort" : {"count" : -1}}
这个操作会对结果集中的文档根据"count"
字段进行降序排列。
{"$limit" : 5}
这个操作将最终的返回结果限制为当前结果中的前5个文档。
在MongoDB中实际运行时,要将这些操作分别传给aggregate()
函数:
> db.articles.aggregate({"$project" : {"author" : 1}},
... {"$group" : {"_id" : "$author", "count" : {"$sum" : 1}}},
... {"$sort" : {"count" : -1}},
... {"$limit" : 5})
{
"result" : [
{
"_id" : "R. L. Stine",
"count" : 430
},
{
"_id" : "Edgar Wallace",
"count" : 175
},
{
"_id" : "Nora Roberts",
"count" : 145
},
{
"_id" : "Erle Stanley Gardner",
"count" : 140
},
{
"_id" : "Agatha Christie",
"count" : 85
}
],
"ok" : 1
}
aggregate()
会返回一个文档数组,其中的内容是发表文章最多的5个作者。
如果管道没有给出预期的结果,就需要进行调试,调试时,可以先只指定第一个管道操作符。如果这时得到了预期结果,那就再指定第二个管道操作符。以前面的例子来说,首先要试着只使用
"$project"
操作符进行聚合;如果这个操作符的结果是有效的,就再添加"$group"
操作符;如果结果还是有效的,就再添加"$sort"
;最后再添加"$limit"
操作符。这样就可以逐步定位到造成问题的操作符。
本书写作时,聚合框架还不能对集合进行写入操作,因此所有结果必须返回给客户端。所以,聚合的结果必须要限制在16 MB以内(MongoDB支持的最大响应消息大小)。
7.2 管道操作符
每个操作符都会接受一连串的文档,对这些文档做一些类型转换,最后将转换后的文档作为结果传递给下一个操作符(对于最后一个管道操作符,是将结果返回给客户端)。
不同的管道操作符可以按任意顺序组合在一起使用,而且可以被重复任意多次。例如,可以先做"$match"
,然后做"$group"
,然后再做"$match"
(与之前的"$match"
匹配不同的查询条件)。
7.2.1 $match
$match
用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。例如,如果想对Oregon(俄勒冈州,简写为OR)的用户做统计,就可以使用{$match : {"state" : "OR"}}
。"$match"
可以使用所有常规的查询操作符("$gt"
、"$lt"
、"$in"
等)。有一个例外需要注意:不能在"$match"
中使用地理空间操作符。
通常,在实际使用中应该尽可能将"$match"
放在管道的前面位置。这样做有两个好处:一是可以快速将不需要的文档过滤掉,以减少管道的工作量;二是如果在投射和分组之前执行"$match"
,查询可以使用索引。
7.2.2 $project
相对于“普通”的查询而言,管道中的投射操作更加强大。使用"$project"
可以从子文档中提取字段,可以重命名字段,还可以在这些字段上进行一些有意思的操作。
最简单的一个"$project"
操作是从文档中选择想要的字段。可以指定包含或者不包含一个字段,它的语法与查询中的第二个参数类似。如果在原来的集合上执行下面的代码,返回的结果文档中只包含一个"author"
字段。
> db.articles.aggregate({"$project" : {"author" : 1, "_id" : 0}})
默认情况下,如果文档中存在"_id"
字段,这个字段就会被返回("_id"
字段可以被一些管道操作符移除,也可能已经被之前的投射操作给移除了)。可以使用上面的代码将"_id"
从结果文档中移除。包含字段和排除字段的规则与常规查询中的语法一致。
也可以将投射过的字段进行重命名。例如,可以将每个用户文档的"_id"
在返回结果中重命名为"userId"
:
> db.users.aggregate({"$project" : {"userId" : "$_id", "_id" : 0}})
{
"result" : [
{
"userId" : ObjectId("50e4b32427b160e099ddbee7")
},
{
"userId" : ObjectId("50e4b32527b160e099ddbee8")
}
...
],
"ok" : 1
}
这里的"$fieldname"
语法是为了在聚合框架中引用fieldname
字段(上面的例子中是"_id"
)的值。例如,"$age"
会被替换为"age"
字段的内容(可能是数值,也可能是字符串),"$tags.3"
会被替换为tags
数组中的第4个元素。所以,上面例子中的"$_id"
会被替换为进入管道的每个文档的"_id"
字段的值。
注意,必须明确指定将"_id"
排除,否则这个字段的值会被返回两次:一次被标为"userId"
,一次被标为"_id"
。可以使用这种技术生成字段的多个副本,以便在之后的"$group"
中使用。
在对字段进行重命名时,MongoDB并不会记录字段的历史名称。因此,如果在"originalFieldname"
字段上有一个索引,聚合框架无法在下面的排序操作中使用这个索引,尽管人眼一下子就能看出下面代码中的"newFieldname"
与"originalFieldname"
表示同一个字段。
> db.articles.aggregate({"$project" : {"newFieldname" : "$originalFieldname"}},
... {"$sort" : {"newFieldname" : 1}})
所以,应该尽量在修改字段名称之前使用索引。
1. 管道表达式
最简单的"$project"
表达式是包含和排除字段,以及字段名称("$fieldname"
)。但是,还有一些更强大的选项。也可以使用表达式(expression)将多个字面量和变量组合在一个值中使用。
在聚合框架中有几个表达式可用来组合或者进行任意深度的嵌套,以便创建复杂的表达式。
2. 数学表达式(mathematical expression)
算术表达式可用于操作数值。指定一组数值,就可以使用这个表达式进行操作了。例如,下面的表达式会将"salary"
和"bonus"
字段的值相加。
> db.employees.aggregate(
... {
... "$project" : {
... "totalPay" : {
... "$add" : ["$salary", "$bonus"]
... }
... }
... })
可以将多个表达式嵌套在一起组成更复杂的表达式。假设我们想要从总金额中扣除为401(k)〔1〕缴纳的金额。可以使用"$subtract"
表达式:
1 401(k)是美国的一种养老金计划。——译者注
> db.employees.aggregate(
... {
... "$project" : {
... "totalPay" : {
... "$subtract" : [{"$add" : ["$salary", "$bonus"]}, "$401k"]
... }
... }
... })
表达式可以进行任意层次的嵌套。
下面是每个操作符的语法:
"$add" : [expr1[, expr2, …, exprN]]
这个操作符接受一个或多个表达式作为参数,将这些表达式相加。
"$subtract" : [expr1, expr2]
接受两个表达式作为参数,用第一个表达式减去第二个表达式作为结果。
"$multiply" : [expr1[, expr2, …, exprN]]
接受一个或者多个表达式,并且将它们相乘。
"$divide" : [expr1, expr2]
接受两个表达式,用第一个表达式除以第二个表达式的商作为结果。
"$mod" : [expr1, expr2]
接受两个表达式,将第一个表达式除以第二个表达式得到的余数作为结果。
3. 日期表达式(date expression)
许多聚合都是基于时间的:上周发生了什么?上个月发生了什么?过去一年间发生了什么?因此,聚合框架中包含了一些用于提取日期信息的表达式:"$year"
、“$month”
、"$week"
、"$dayOfMonth"
、"$dayOfWeek"
、"$dayOfYear"
、"$hour"
、"$minute"
和"$second"
。只能对日期类型的字段进行日期操作,不能对数值类型字段做日期操作。
每种日期类型的操作都是类似的:接受一个日期表达式,返回一个数值。下面的代码会返回每个雇员入职的月份:
> db.employees.aggregate(
... {
... "$project" : {
... "hiredIn" : {"$month" : "$hireDate"}
... }
... })
也可以使用字面量日期。下面的代码会计算出每个雇员在公司内的工作时间:
> db.employees.aggregate(
... {
... "$project" : {
... "tenure" : {
... "$subtract" : [{"$year" : new Date()}, {"$year" : "$hireDate"}]
... }
... }
... })
4. 字符串表达式(string expression)
也有一些基本的字符串操作可以使用,它们的签名如下所示:
"$substr" : [expr, startOffset, numToReturn]
其中第一个参数expr
必须是个字符串,这个操作会截取这个字符串的子串(从第startOffset
字节开始的numToReturn
字节,注意,是字节,不是字符。在多字节编码中尤其要注意这一点)expr
必须是字符串。
"$concat" : [expr1[, expr2, …, exprN]]
将给定的表达式(或者字符串)连接在一起作为返回结果。
"$toLower" : expr
参数expr
必须是个字符串值,这个操作返回expr
的小写形式。
"$toUpper" : expr
参数expr
必须是个字符串值,这个操作返回expr
的大写形式。
改变字符大小写的操作,只保证对罗马字符有效。
下面是一个生成 j.doe@example.com格式的email地址的例子。它提取"$firstname"
的第一个字符,将其与多个常量字符串和"$lastname"
连接成一个字符串:
> db.employees.aggregate(
... {
... "$project" : {
... "email" : {
... "$concat" : [
... {"$substr" : ["$firstName", 0, 1]},
... ".",
... "$lastName",
... "@example.com"
... ]
... }
... }
... })
5. 逻辑表达式(logical expression)
有一些逻辑表达式可以用于控制语句。
下面是几个比较表达式。
"$cmp" : [expr1, expr2]
比较expr1
和expr2
。如果expr1
等于expr2
,返回0;如果expr1
< expr2
,返回一个负数;如果expr1
>expr2
,返回一个正数。
"$strcasecmp" : [string1, string2]
比较string1
和string2
,区分大小写。只对罗马字符组成的字符串有效。
"$eq"/"$ne"/"$gt"/"$gte"/"$lt"/"$lte" : [expr1, expr2]
对expr1
和expr2
执行相应的比较操作,返回比较的结果(true
或false
)。
下面是几个布尔表达式。
"$and" : [expr1[, expr2, …, exprN]]
如果所有表达式的值都是true
,那就返回true
,否则返回false
。
"$or" : [expr1[, expr2, …, exprN]]
只要有任意表达式的值为true
,就返回true
,否则返回false
。
"$not" : expr
对expr
取反。
还有两个控制语句。
"$cond" : [booleanExpr, trueExpr, falseExpr]
如果booleanExpr
的值是true
,那就返回trueExpr
,否则返回falseExpr
。
"$ifNull" : [expr, replacementExpr]
如果expr
是null
,返回replacementExpr
,否则返回expr
。
通过这些操作符,就可以在聚合中使用更复杂的逻辑,可以对不同数据执行不同的代码,得到不同的结果。
管道对于输入数据的形式有特定要求,所以这些操作符在传入数据时要特别注意。算术操作符必须接受数值,日期操作符必须接受日期,字符串操作符必须接受字符串,如果有字符缺失,这些操作符就会报错。如果你的数据集不一致,可以通过这个条件来检测缺失的值,并且进行填充。
6. 一个提取的例子
假如有个教授想通过某种比较复杂的计算为学生打分:出勤率占10%,日常测验成绩占30%,期末考试占60%(如果是老师最宠爱的学生,那么分数就是100)。可以使用如下代码:
> db.students.aggregate(
... {
... "$project" : {
... "grade" : {
... "$cond" : [
... "$teachersPet",
... 100, // if
... { // else
... "$add" : [
... {"$multiply" : [.1, "$attendanceAvg"]},
... {"$multiply" : [.3, "$quizzAvg"]},
... {"$multiply" : [.6, "$testAvg"]}
... ]
... }
... ]
... }
... }
... })
7.2.3 $group
$group
操作可以将文档依据特定字段的不同值进行分组。下面是几个分组的例子。
- 如果我们以分钟作为计量单位,希望找出每天的平均湿度,就可以根据
"day"
字段进行分组。 - 如果有一个学生集合,希望按照分数等级将学生分为多个组,可以根据
"grade"
字段进行分组。 - 如果有一个用户集合,希望知道每个城市有多少用户,可以根据
"state"
和"city"
两个字段对集合进行分组,每个"city"/"state"
对对应一个分组。不应该只根据"city"
字段进行分组,因为不同的州可能拥有相同名字的城市。
如果选定了需要进行分组的字段,就可以将选定的字段传递给"$group"
函数的"_id"
字段。对于上面的例子,相应的代码如下:
{"$group" : {"_id" : "$day"}}
{"$group" : {"_id" : "$grade"}}
{"$group" : {"_id" : {"state" : "$state", "city" : "$city"}}}
如果执行这些代码,结果集中每个分组对应一个只有一个字段(分组键)的文档。例如,按学生分数等级进行分组的结果可能是:{"result" : [{"_id" : "A+"}, {"_id" : "A"}, {"_id" : "A-"}, …, {"_id" : "F"}], "ok" : 1}
。通过上面这些代码,可以得到特定字段中每一个不同的值,但是所有例子都要求基于这些分组进行一些计算。因此,可以添加一些字段,使用分组操作符对每个分组中的文档做一些计算。
1. 分组操作符
这些分组操作符允许对每个分组进行计算,得到相应的结果。7.1节介绍过"$sum"
分组操作符的作用:分组中每出现一个文档,它就对计算结果加1,这样便可以得到每个分组中的文档数量。
2. 算术操作符
有两个操作符可以用于对数值类型字段的值进行计算:"$sum"
和"$average"
。
"$sum" : value
对于分组中的每一个文档,将
value
与计算结果相加。注意,上面的例子中使用了一个字面量数字1,但是这里也可以使用比较复杂的值。例如,如果有一个集合,其中的内容是各个国家的销售数据,使用下面的代码就可以得到每个国家的总收入:
> db.sales.aggregate(
... {
... "$group" : {
... "_id" : "$country",
... "totalRevenue" : {"$sum" : "$revenue"}
... }
... })
"$avg" : value
返回每个分组的平均值。
例如,下面的代码会返回每个国家的平均收入,以及每个国家的销量:
> db.sales.aggregate(
... {
... "$group" : {
... "_id" : "$country",
... "totalRevenue" : {"$avg" : "$revenue"},
... "numSales" : {"$sum" : 1}
... }
... })
3. 极值操作符(extreme operator)
下面的四个操作符可用于得到数据集合中的“边缘”值。
"$max" : expr
返回分组内的最大值。"$min" : expr
返回分组内的最小值。
"$first" : expr
返回分组的第一个值,忽略后面所有值。只有排序之后,明确知道数据顺序时这个操作才有意义。"$last" : expr
与
"$first"
相反,返回分组的最后一个值。
"$max"
和"$min"
会查看每一个文档,以便得到极值。因此,如果数据是无序的,这两个操作符也可以有效工作;如果数据是有序的,这两个操作符就会有些浪费。假设有一个存有学生考试成绩的数据集,需要找到其中的最高分与最低分:
> db.scores.aggregate(
... {
... "$group" : {
... "_id" : "$grade",
... "lowestScore" : {"$min" : "$score"},
... "highestScore" : {"$max" : "$score"}
... }
... })
另一方面,如果数据集是按照希望的字段排序过的,那么"$first"
和"$last"
操作符就会非常有用。下面的代码与上面的代码可以得到同样的结果:
> db.scores.aggregate(
... {
... "$sort" : {"score" : 1}
... },
... {
... "$group" : {
... "_id" : "$grade",
... "lowestScore" : {"$first" : "$score"},
... "highestScore" : {"$last" : "$score"}
... }
... })
如果数据是排过序的,那么$first
和$last
会比$min
和$max
效率更高。如果不准备对数据进行排序,那么直接使用$min
和$max
会比先排序再使用$first
和$last
效率更高。
4. 数组操作符
有两个操作符可以进行数组操作。
"$addToSet" : expr
如果当前数组中不包含expr
,那就将它添加到数组中。在返回结果集中,每个元素最多只出现一次,而且元素的顺序是不确定的。
"$push" : expr
不管
expr
是什么值,都将它添加到数组中。返回包含所有值的数组。
5. 分组行为
有两个操作符不能用前面介绍的流式工作方式对文档进行处理,"$group"
是其中之一。大部分操作符的工作方式都是流式的,只要有新文档进入,就可以对新文档进行处理,但是"$group"
必须要等收到所有的文档之后,才能对文档进行分组,然后才能将各个分组发送给管道中的下一个操作符。这意味着,在分片的情况下,"$group"
会先在每个分片上执行,然后各个分片上的分组结果会被发送到mongos再进行最后的统一分组,剩余的管道工作也都是在mongos(而不是在分片)上运行的。
7.2.4 $unwind
拆分(unwind)可以将数组中的每一个值拆分为单独的文档。例如,如果有一篇拥有多条评论的博客文章,可以使用$unwind
将每条评论拆分为一个独立的文档:
> db.blog.findOne()
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : [
{
"author" : "mark",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "Nice post"
},
{
"author" : "bill",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "I agree"
}
]
}
> db.blog.aggregate({"$unwind" : "$comments"})
{
"results" :
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : {
"author" : "mark",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "Nice post"
}
},
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : {
"author" : "bill",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "I agree"
}
}
],
"ok" : 1
}
如果希望在查询中得到特定的子文档,这个操作符就会非常有用:先使用"$unwind"
得到所有子文档,再使用"$match"
得到想要的文档。例如,如果要得到特定用户的所有评论(只需要得到评论,不需要返回评论所属的文章),使用普通的查询是不可能做到的。但是,通过提取、拆分、匹配,就很容易了:
> db.blog.aggregate({"$project" : {"comments" : "$comments"}},
... {"$unwind" : "$comments"},
... {"$match" : {"comments.author" : "Mark"}})
由于最后得到的结果仍然是一个"comments"
子文档,所以你可能希望再做一次投射,以便让输出结果更优雅。
7.2.5 $sort
可以根据任何字段(或者多个字段)进行排序,与在普通查询中的语法相同。如果要对大量的文档进行排序,强烈建议在管道的第一阶段进行排序,这时的排序操作可以使用索引。否则,排序过程就会比较慢,而且会占用大量内存。
可以在排序中使用文档中实际存在的字段,也可以使用在投射时重命名的字段:
> db.employees.aggregate(
... {
... "$project" : {
... "compensation" : {
... "$add" : ["$salary", "$bonus"]
... },
... "name" : 1
... }
... },
... {
... "$sort" : {"compensation" : -1, "name" : 1}
... })
这个例子会对员工排序,最终的结果是按照报酬从高到低,姓名从A到Z的顺序排列。
排序方向可以是1(升序)和-1(降序)。
与前面讲过的"$group
"一样,"$sort"
也是一个无法使用流式工作方式的操作符。"$sort"
也必须要接收到所有文档之后才能进行排序。在分片环境下,先在各个分片上进行排序,然后将各个分片的排序结果发送到mongos做进一步处理。
7.2.6 $limit
$limit
会接受一个数字n,返回结果集中的前n个文档。
7.2.7 $skip
$skip
也是接受一个数字n,丢弃结果集中的前n个文档,将剩余文档作为结果返回。在“普通”查询中,如果需要跳过大量的数据,那么这个操作符的效率会很低。在聚合中也是如此,因为它必须要先匹配到所有需要跳过的文档,然后再将这些文档丢弃。
7.2.8 使用管道
应该尽量在管道的开始阶段(执行"$project"
、"$group"
或者"$unwind"
操作之前)就将尽可能多的文档和字段过滤掉。管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试对操作进行排序,以便能够有效使用索引。
MongoDB不允许单一的聚合操作占用过多的系统内存:如果MongoDB发现某个聚合操作占用了20%以上的内存,这个操作就会直接输出错误。允许将输出结果利用管道放入一个集合中是为了方便以后使用(这样可以将所需的内存减至最小)。
如果能够通过"$match"
操作迅速减小结果集的大小,就可以使用管道进行实时聚合。由于管道会不断包含更多的文档,会越来越复杂,所以几乎不可能实时得到管道的操作结果。
7.3 MapReduce
MapReduce是聚合工具中的明星,它非常强大、非常灵活。有些问题过于复杂,无法使用聚合框架的查询语言来表达,这时可以使用MapReduce。MapReduce使用JavaScript作为“查询语言”,因此它能够表达任意复杂的逻辑。然而,这种强大是有代价的:MapReduce非常慢,不应该用在实时的数据分析中。
MapReduce能够在多台服务器之间并行执行。它会将一个大问题分割为多个小问题,将各个小问题发送到不同的机器上,每台机器只负责完成一部分工作。所有机器都完成时,再将这些零碎的解决方案合并为一个完整的解决方案。
MapReduce需要几个步骤。最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么“无作为”,要么“产生一些键和X个值”。然后就是中间环节,称作洗牌(shuffle),按照键分组,并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值也就是最终结果。
下面会多举几个MapReduce的例子,这个工具非常强大,但也有点复杂。
7.3.1 示例1:找出集合中的所有键
用MapReduce来解决这个问题有点大材小用,不过还是一种了解其机制的不错的方式。要是已经知道MapReduce的原理,则直接跳到本节最后,看看MongoDB中MapReduce的使用注意事项。
MongoDB会假设你的模式是动态的,所以并不跟踪记录每个文档中的键。通常找到集合中所有文档所有键的最好方式就是用MapReduce。在本例中,会记录每个键出现了多少次。内嵌文档中的键就不计算了,但给map
函数做个简单修改就能实现这个功能了。
在映射环节,我们希望得到集合中每个文档的所有键。map
函数使用特别的emit
函数“返回”要处理的值。emit
会给MapReduce一个键(类似于前面$group
所使用的键)和一个值。这里用emit
将文档某个键的计数(count)返回({count : 1}
)。我们想为每个键单独计数,所以为文档中的每个键调用一次emit
。this
就是当前映射文档的引用:
> map = function() {
... for (var key in this) {
... emit(key, {count : 1});
... }};
这样就有了许许多多{count : 1}
文档,每一个都与集合中的一个键相关。这种由一个或多个{count : 1}
文档组成的数组,会传递给reduce
函数。reduce
函数有两个参数,一个是key
,也就是emit
返回的第一个值,还有另外一个数组,由一个或者多个与键对应的{count : 1}
文档组成。
> reduce = function(key, emits) {
... total = 0;
... for (var i in emits) {
... total += emits[i].count;
... }
... return {"count" : total};
... }
reduce
一定要能够在之前的map
阶段或者前一个reduce
阶段的结果上反复执行。所以reduce
返回的文档必须能作为reduce
的第二个参数的一个元素。例如,x
键映射到了3个文档{count : 1,id : 1}
、{count : 1,id : 2}
和{count : 1,id : 3}
,其中id
键只用于区分不同的文档。MongoDB可能会这样调用reduce
:
> r1 = reduce("x", [{count : 1, id : 1}, {count : 1, id : 2}])
{count : 2}
> r2 = reduce("x", [{count : 1, id : 3}])
{count : 1}
> reduce("x", [r1, r2])
{count : 3}
不能认为第二个参数总是初始文档之一(比如{count:1}
)或者长度固定。reduce
应该能处理emit
文档和其他reduce
返回结果的各种组合。
总之,MapReduce函数可能会是下面这样:
> mr = db.runCommand({"mapreduce" : "foo", "map" : map, "reduce" : reduce})
{
"result" : "tmp.mr.mapreduce_1266787811_1",
"timeMillis" : 12,
"counts" : {
"input" : 6
"emit" : 14
"output" : 5
},
"ok" : true
}
MapReduce返回的文档包含很多与操作有关的元信息。
"result" : "tmp.mr.mapreduce_1266787811_1"
这是存放MapReduce结果的集合名。这是个临时集合,MapReduce的连接关闭后它就被自动删除了。本章稍后会介绍如何指定一个好一点的名字以及将结果集合持久化。
"timeMillis" : 12
操作花费的时间,单位是毫秒。
"counts" : { … }
这个内嵌文档主要用作调试,其中包含3个键。
"input" : 6
发送到map
函数的文档个数。
"emit" : 14
在
map
函数中emit
被调用的次数。"output" : 5
结果集合中的文档数量。
对结果集合进行查询会发现原有集合的所有键及其计数:
> db[mr.result].find()
{ "_id" : "_id", "value" : { "count" : 6 } }
{ "_id" : "a", "value" : { "count" : 4 } }
{ "_id" : "b", "value" : { "count" : 2 } }
{ "_id" : "x", "value" : { "count" : 1 } }
{ "_id" : "y", "value" : { "count" : 1 } }
这个结果集中的每个"_id"
对应原集合中的一个键,"value"
键的值就是reduce
的最终结果。
7.3.2 示例2:网页分类
假设有个网站,人们可以提交其他网页的链接,比如reddit(http://www.reddit.com)。提交者可以给这个链接添加标签,表明主题,比如politics、geek或者icanhascheezburger。可以用MapReduce找出哪个主题最为热门,热门与否由最近的投票决定。
首先,建立一个map
函数,发出(emit)标签和一个基于流行度和新旧程度的值。
map = function() {
for (var i in this.tags) {
var recency = 1/(new Date() - this.date);
var score = recency * this.score;
emit(this.tags[i], {"urls" : [this.url], "score" : score});
}
};
现在就化简同一个标签的所有值,以得到这个标签的分数:
reduce = function(key, emits) {
var total = {urls : [], score : 0}
for (var i in emits) {
emits[i].urls.forEach(function(url) {
total.urls.push(url);
}
total.score += emits[i].score;
}
return total;
};
最终的集合包含每个标签的URL列表和表示该标签流行程度的分数。
7.3.3 MongoDB和MapReduce
前面两个例子只用到了mapreduce
、map
和reduce
键。这3个键是必需的,但是MapReduce命令还有很多可选的键。
"finalize" : function
可以将
reduce
的结果发送给这个键,这是整个处理过程的最后一步。"keeptemp" : boolean
如果为值为
true
,那么在连接关闭时会将临时结果集合保存下来,否则不保存。"out" : string
输出集合的名称。如果设置了这选项,系统会自动设置
keeptemp : true
。"query" : document
在发往
map
函数前,先用指定条件过滤文档。"sort" : document
在发往
map
前先给文档排序(与limit
一同使用非常有用)。"limit" : integer
发往
map
函数的文档数量的上限。"scope" : document
可以在JavaScript代码中使用的变量。
"verbose" : boolean
是否记录详细的服务器日志。
1. finalize
函数
和group
命令一样,MapReduce也可以使用finalize
函数作为参数。它会在最后一个reduce
输出结果后执行,然后将结果存到临时集合中。
返回体积比较大的结果集对MapReduce不是什么大不了的事情,因为它不像group
那样有4 MB的限制。然而,信息总是要传递出去的,通常来说,finalize
是计算平均数、裁剪数组、清除多余信息的好时机。
2. 保存结果集合
默认情况下,Mongo会在执行MapReduce时创建一个临时集合,集合名是系统选的一个不太常用的名字,将"mr"
、执行MapReduce的集合名、时间戳以及数据库作业ID,用“.”连成一个字符串,这就是临时集合的名字。结果产生形如mr.stuff.18234210220.2这样的名字。MongoDB会在调用的连接关闭时自动销毁这个集合(也可以在用完之后手动删除)。如果希望保存这个集合,就要将keeptemp
选项指定为true
。
如果要经常使用这个临时集合,你可能想给它起个好点的名字。利用out
选项(该选项接受字符串作为参数)就可以为临时集合指定一个易读易懂的名字。如果用了out
选项,就不必指定keeptemp : true
了,因为指定out
选项时系统会将keeptemp
设置为true
。即便你取了一个非常好的名字,MongoDB也会在MapReduce的中间过程使用自动生成的集合名。处理完成后,会自动将临时集合的名字更改为你指定的集合名,这个重命名的过程是原子性的。也就是说,如果多次对同一个集合调用MapReduce,也不会在操作中遇到集合不完整的情况。
MapReduce产生的集合就是一个普通的集合,在这个集合上执行MapReduce完全没有问题,或者在前一个MapReduce的结果上执行MapReduce也没有问题,如此往复直到无穷都没问题!
3. 对文档子集执行MapReduce
有时需要对集合的一部分执行MapReduce。只需在传给map
函数前使用查询对文档进行过滤就好了。
每个传递给map
函数的文档都要先反序列化,从BSON对象转换为JavaScript对象,这个过程非常耗时。如果事先知道只需要对集合的一部分文档执行MapReduce,那么在map
之前先对文档进行过滤可以极大地提高map
速度。可以通过"query"
、"limit"
和"sort"
等键对文档进行过滤。
"query"
键的值是一个查询文档。通常查询返回的结果会传递给map
函数。例如,有一个做跟踪分析的应用程序,现在我们需要上周的总结摘要,只要使用如下命令对上周的文档执行MapReduce就好了:
> db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
"query" : {"date" : {"$gt" : week_ago}}})
sort
选项和limit
一起使用时通常能够发挥非常大的作用。limit
也可以单独使用,用来截取一部分文档发送给map
函数。
如果在上个例子中想分析最近10 000个页面的访问次数(而不是最近一周的),就可以使用limit
和sort
:
> db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
"limit" : 10000, "sort" : {"date" : -1}})
query
、limit
、sort
可以随意组合,但是如果不使用limit
的话,sort
就不能有效发挥作用。
4. 使用作用域
MapReduce可以为map
、reduce
、finalize
函数都采用一种代码类型。但多数语言里,可以指定传递代码的作用域。然而MapReduce会忽略这个作用域。它有自己的作用域键"scope"
,如果想在MapReduce中使用客户端的值,则必须使用这个参数。可以用“变量名 : 值”这样的普通文档来设置该选项,然后在map
、reduce
和finalize
函数中就能使用了。作用域在这些函数内部是不变的。例如,上一节的例子使用1/(newDate() - this.date)
计算页面的新旧程度。可以将当前日期作为作用域的一部分传递进去:
> db.runCommand({"mapreduce" : "webpages", "map" : map, "reduce" : reduce,
"scope" : {now : new Date()}})
这样,在map
函数中就能计算1/(now - this.date)
了。
5. 获得更多的输出
还有个用于调试的详细输出选项。如果想看看MapReduce的运行过程,可以将"verbose"
指定为true
。
也可以用print
把map
、reduce
、finalize
过程中的信息输出到服务器日志上。
7.4 聚合命令
MongoDB为在集合上执行基本的聚合任务提供了一些命令。这些命令在聚合框架出现之前就已经存在了,现在(大多数情况下)已经被聚合框架取代。然而,复杂的group
操作可能仍然需要使用JavaScript,count
和distinct
操作可以被简化为普通命令,不需要使用聚合框架。
7.4.1 count
count
是最简单的聚合工具,用于返回集合中的文档数量:
> db.foo.count()
0
> db.foo.insert({"x" : 1})
> db.foo.count()
1
不论集合有多大,count
都会很快返回总的文档数量。
也可以给count
传递一个查询文档,Mongo会计算查询结果的数量:
> db.foo.insert({"x" : 2})
> db.foo.count()
2
> db.foo.count({"x" : 1})
1
对分页显示来说总数非常必要:“共439个,目前显示0~10个”。但是,增加查询条件会使count
变慢。count
可以使用索引,但是索引并没有足够的元数据供count
使用,所以不如直接使用查询来得快。
7.4.2 distinct
distinct
用来找出给定键的所有不同值。使用时必须指定集合和键。
> db.runCommand({"distinct" : "people", "key" : "age"})
假设集合中有如下文档:
{"name" : "Ada", "age" : 20}
{"name" : "Fred", "age" : 35}
{"name" : "Susan", "age" : 60}
{"name" : "Andy", "age" : 35}
如果对"age"
键使用distinct
,会得到所有不同的年龄:
> db.runCommand({"distinct" : "people", "key" : "age"})
{"values" : [20, 35, 60], "ok" : 1}
这里还有一个常见问题:有没有办法获得集合里面所有不同的键呢?MongoDB并没有直接提供这样的功能,但是可以用MapReduce(详见7.3节)自己写一个。
7.4.3 group
使用group
可以执行更复杂的聚合。先选定分组所依据的键,而后MongoDB就会将集合依据选定键的不同值分成若干组。然后可以对每一个分组内的文档进行聚合,得到一个结果文档。
如果你熟悉SQL,那么这个
group
和SQL中的GROUP BY
差不多。
假设现在有个跟踪股票价格的站点。从上午10点到下午4点每隔几分钟就会更新某只股票的价格,并保存在MongoDB中。现在报表程序要获得近30天的收盘价。用group
就可以轻松办到。
股价集合中包含数以千计如下形式的文档:
{"day" : "2010/10/03", "time" : "10/3/2010 03:57:01 GMT-400", "price" : 4.23}
{"day" : "2010/10/04", "time" : "10/4/2010 11:28:39 GMT-400", "price" : 4.27}
{"day" : "2010/10/03", "time" : "10/3/2010 05:00:23 GMT-400", "price" : 4.10}
{"day" : "2010/10/06", "time" : "10/6/2010 05:27:58 GMT-400", "price" : 4.30}
{"day" : "2010/10/04", "time" : "10/4/2010 08:34:50 GMT-400", "price" : 4.01}
注意,由于精度的问题,实际使用中不要将金额以浮点数的方式存储,这个例子只是为了简便才这么做。
我们需要的结果列表中应该包含每天的最后交易时间和价格,就像下面这样:
[
{"time" : "10/3/2010 05:00:23 GMT-400", "price" : 4.10},
{"time" : "10/4/2010 11:28:39 GMT-400", "price" : 4.27},
{"time" : "10/6/2010 05:27:58 GMT-400", "price" : 4.30}
]
先把集合按照"day"
字段进行分组,然后在每个分组中查找"time"
值最大的文档,将其添加到结果集中就完成了。整个过程如下所示:
> db.runCommand({"group" : {
... "ns" : "stocks",
... "key" : "day",
... "initial" : {"time" : 0},
... "$reduce" : function(doc, prev) {
... if (doc.time > prev.time) {
... prev.price = doc.price;
... prev.time = doc.time;
... }
... }}})
把这个命令分解开看看。
"ns" : "stocks"
指定要进行分组的集合。
"key" : "day"
指定文档分组依据的键。这里就是
"day"
键。所有"day"
值相同的文档被分到一组。"initial" : {"time" : 0}
每一组
reduce
函数调用中的初始"time"
值,会作为初始文档传递给后续过程。每一组的所有成员都会使用这个累加器,所以它的任何变化都可以保存下来。"$reduce" : function(doc, prev) { … }
这个函数会在集合内的每个文档上执行。系统会传递两个参数:当前文档和累加器文档(本组当前的结果)。本例中,想让reduce
函数比较当前文档的时间和累加器的时间。如果当前文档的时间更晚一些,则将累加器的日期和价格替换为当前文档的值。别忘了,每一组都有一个独立的累加器,所以不必担心不同日期的命令会使用同一个累加器。
在问题一开始的描述中,就提到只要最近30天的股价。然而,我们在这里迭代了整个集合。这就是要添加"condition"
的原因,因为这样就可以只对必要的文档进行处理。
> db.runCommand({"group" : {
... "ns" : "stocks",
... "key" : "day",
... "initial" : {"time" : 0},
... "$reduce" : function(doc, prev) {
... if (doc.time > prev.time) {
... prev.price = doc.price;
... prev.time = doc.time;
... }},
... "condition" : {"day" : {"$gt" : "2010/09/30"}}
... }})
有些参考资料提及
"cond"
键或者"q"
键,其实和"condition"
键是完全一样的(就是表达力不如"condition"
好)。
最后就会返回一个包含30个文档的数组,其实每个文档都是一个分组。每组都包含分组依据的键(这里就是"day" : string
)以及这组最终的prev
值。如果有的文档不存在指定用于分组的键,这些文档会被单独分为一组,缺失的键会使用"day : null"
这样的形式。在"condition"
中加入"day" : {"$exists" : true}
就可以排除不包含指定用于分组的键的文档。group
命令同时返回了用到的文档总数和"key"
的不同值数量:
> db.runCommand({"group" : {...}})
{
"retval" :
[
{
"day" : "2010/10/04",
"time" : "Mon Oct 04 2010 11:28:39 GMT-0400 (EST)"
"price" : 4.27
},
...
],
"count" : 734,
"keys" : 30,
"ok" : 1
}
这里每组的"price"
都是显式设置的,"time"
先由初始化器设置,然后在迭代中进行更新。"day"
是默认被加进去的,因为用于分组的键会默认加入到每个"retval"
内嵌文档中。要是不想在结果集中看到这个键,可以用完成器将累加器文档变为任何想要的形态,甚至变换成非文档(例如数字或字符串)。
1. 使用完成器
完成器(finalizer)用于精简从数据库传到用户的数据,这个步骤非常重要,因为group
命令的输出结果需要能够通过单次数据库响应返回给用户。为进一步说明,这里举个博客的例子,其中每篇文章都有多个标签(tag)。现在要找出每天最热门的标签。可以(再一次)按天分组,得到每一个标签的计数。就像下面这样:
> db.posts.group({
... "key" : {"day" : true},
... "initial" : {"tags" : {}},
... "$reduce" : function(doc, prev) {
... for (i in doc.tags) {
... if (doc.tags[i] in prev.tags) {
... prev.tags[doc.tags[i]]++;
... } else {
... prev.tags[doc.tags[i]] = 1;
... }
... }
... }})
得到的结果如下所示:
[
{"day" : "2010/01/12", "tags" : {"nosql" : 4, "winter" : 10, "sledding" : 2}},
{"day" : "2010/01/13", "tags" : {"soda" : 5, "php" : 2}},
{"day" : "2010/01/14", "tags" : {"python" : 6, "winter" : 4, "nosql": 15}}
]
接着可以在客户端找出"tags"
文档中出现次数最多的标签。然而,向客户端发送每天所有的标签文档需要许多额外的开销——每天所有的键/值对都被传送给用户,而我们需要的仅仅是一个字符串。这也就是group
有一个可选的"finalize"
键的原因。"finalize"
可以包含一个函数,在每组结果传递到客户端之前调用一次。可以使用"finalize"
函数将不需要的内容从结果集中移除:
> db.runCommand({"group" : {
... "ns" : "posts",
... "key" : {"day" : true},
... "initial" : {"tags" : {}},
... "$reduce" : function(doc, prev) {
... for (i in doc.tags) {
... if (doc.tags[i] in prev.tags) {
... prev.tags[doc.tags[i]]++;
... } else {
... prev.tags[doc.tags[i]] = 1;
... }
... },
... "finalize" : function(prev) {
... var mostPopular = 0;
... for (i in prev.tags) {
... if (prev.tags[i] > mostPopular) {
... prev.tag = i;
... mostPopular = prev.tags[i];
... }
... }
... delete prev.tags
... }}})
现在,我们就得到了想要的信息,服务器返回的内容可能如下:
[
{"day" : "2010/01/12", "tag" : "winter"},
{"day" : "2010/01/13", "tag" : "soda"},
{"day" : "2010/01/14", "tag" : "nosql"}
]
finalize
可以对传递进来的参数进行修改,也可以返回一个新值。
2. 将函数作为键使用
有时分组所依据的条件可能会非常复杂,而不是单个键。比如要使用group
计算每个类别有多少篇博客文章(每篇文章只属于一个类别)。由于不同作者的风格不同,填写分类名称时可能有人使用大写也有人使用小写。所以,如果要是按类别名来分组,最后“MongoDB”和“mongodb”就是两个完全不同的组。为了消除这种大小写的影响,就要定义一个函数来决定文档分组所依据的键。
定义分组函数就要用到$keyf
键(注意不是"key"
),使用"$keyf"
的group
命令如下所示:
> db.posts.group({"ns" : "posts",
... "$keyf" : function(x) { return x.category.toLowerCase(); },
... "initializer" : ... })
有了"$keyf"
,就能依据各种复杂的条件进行分组了。