如果你有数据存储在MongoDB中,你想做的可能就不仅仅是将数据提取出来那么简单了;你可能希望对数据进行分析并加以利用,因此本节介绍MongoDB提供的聚合工具。
聚合框架
使用聚合框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(pipeline),用于对一连串的文档进行处理。这些构件包括筛选(filtering)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。
下面是一个例子:
如果有一个test集合,里面有以下文档:
{ "_id" : ObjectId("60445f35b6e04e8e7e360e2e"), "name" : "zbp", "age" : 24, "job" : "programme" }
{ "_id" : ObjectId("60445f48b6e04e8e7e360e2f"), "name" : "zbp1", "age" : 23, "job" : "job1" }
{ "_id" : ObjectId("60445f55b6e04e8e7e360e30"), "name" : "yf", "age" : 25, "job" : "programme" }
{ "_id" : ObjectId("60445f68b6e04e8e7e360e31"), "name" : "ww", "age" : 36, "job" : "manager" }
{ "_id" : ObjectId("60445f79b6e04e8e7e360e32"), "name" : "zbp", "age" : 24, "job" : "job3" }
现在我希望获取job字段统计,获取每种job的数量count并将结果按count倒序排序,只显示5条结果。
test.aggregate(
{"$project":{"job":1}}, // 投射
{"$group":{"_id":"$job", "count":{"$sum":1}}}, //分组
{"$sort":{"count":-1}},
{"$limit":5}
)
结果为:
{ "_id" : "programme", "count" : 2 }
{ "_id" : "job3", "count" : 1 }
{ "_id" : "manager", "count" : 1 }
{ "_id" : "job1", "count" : 1 }
aggregate()就是聚合管道函数。
上面的操作其实很像是linux中的管道操作,这里使用了4个管道操作符($project、$group、$sort和$limit),每个操作符都会接受一连串的文档,对这些文档做一些处理和转换,最后将处理后的文档作为结果传递给下一个操作符(对于最后一个管道操作符,是将结果返回给客户端)。
管道操作符
$match
用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。相当于where和having条件查询,只不过$match是用在聚合中的条件筛选。
例如,上面的例子中,我希望统计年龄在25岁以下的job的情况,就可以使用:
test.aggregate(
{"$match":{"age":{"$lt":25}}},
{"$project":{"job":1}},
{"$group":{"_id":"$job", "count":{"$sum":1}}}, {"$sort":{"count":-1}}, {"$limit":5}
);
在实际使用中应该尽可能将"$match"放在管道的前面位置。这样做有两个好处:
一是可以快速将不需要的文档过滤掉,以减少管道的工作量;
二是如果在投射和分组之前执行"$match"(相当于where,如果是在分组之后执行$match,那么就相当于having),查询可以使用到索引(也就是说如果在分组和投射之后的条件筛选是用不到索引的)。
$project
$project操作(投射操作)是从文档中选择想要的字段。可以指定查询结果中显示或者不显示一个字段,还可以为字段起别名。
例如:
test.aggregate({"$project":{"job":1}});
结果为:
{ "_id" : ObjectId("60445f35b6e04e8e7e360e2e"), "job" : "programme" }
{ "_id" : ObjectId("60445f48b6e04e8e7e360e2f"), "job" : "job1" }
{ "_id" : ObjectId("60445f55b6e04e8e7e360e30"), "job" : "programme" }
{ "_id" : ObjectId("60445f68b6e04e8e7e360e31"), "job" : "manager" }
{ "_id" : ObjectId("60445f79b6e04e8e7e360e32"), "job" : "job3" }
只显示了job字段,没有显示name和age字段。
_id字段是默认显示的,如果希望不显示_id可以这样:
test.aggregate({"$project":{"job":1,"_id":0}})
将投射的字段进行重命名:
test.aggregate({"$project":{"career":"$job","_id":0}});
这里将job字段重命名为career
这里的"$fieldname"语法是为了在聚合框架中引用fieldname字段(上面的例子中是"job")的值。例如,"$age"
会被替换为"age"字段的内容(可能是数值,也可能是字符串),"$tags.3"会被替换为tags数组中的第4个元素。所以,上面例子中的"$job"会被替换为进入管道的每个文档的"job"字段的值。
可以使用这种技术生成字段的多个副本,以便在之后的"$group"中使用。
如果在"job"字段上有一个索引,但是使用了career为job重命名后,聚合框架无法在下面的排序操作中使用这个索引。
所以,应该尽量在修改字段名称之前使用索引(例如将$sort放在$project前)。
除了上面的基本用法,$project还可以接表达式例如:
db.employees.aggregate(
{
"$project" : {
"totalPay" :{
"$add" : ["$salary", "$bonus"]
}
}
})
将salary和bonus字段的值相加得到totalPay字段
数字表达式-加减乘除:
"$add" : [expr1[, expr2, , exprN]]
"$subtract" : [expr1, expr2]
"$multiply" : [expr1[, expr2, , exprN]]
"$divide" : [expr1, expr2]
还能组合使用
db.employees.aggregate(
{
"$project" : {
"totalPay" : {
"$subtract" : [{"$add" : ["$salary", "$bonus"]}, "$wh"]
}
}
})
// salary+bonus-wh
日期表达式
聚合框架中包含了一些用于提取日期信息的表达式:"$year"、“$month”、"$week"、"$dayOfMonth"、"$dayOfWeek"、"$dayOfYear"、"$hour"、"$minute"和"$second"
只能对日期类型(Date类型)的字段进行日期操作,不能对数值类型字段做日期操作。
例如:
db.employees.aggregate(
{
"$project" : {
"hiredIn" : {"$month" : "$hireDate"}
}
})
hireDate字段必须是日期类型,得到的hireIn字段是hireDate字段的月份部分。
又例如:
db.employees.aggregate(
{
"$project" : {
"tenure" : {
"$subtract" : [{"$year" : new Date()}, {"$year" : "$hireDate"}]
}
}
})
用当前时间的年减去hireDate的年。
字符串表达式
// 字符串截取(这里截取的是字节而不是字符)
"$substr" : [expr, startOffset, numToReturn]
// 连接多个字符串字段
"$concat" : [expr1[, expr2, , exprN]]
// 大小写
"$toLower" : expr
"$toUpper" : expr
例如:
db.employees.aggregate(
{
"$project" : {
"email" : {
"$concat" : [
{"$substr" : ["$firstName", 0, 1]},
".",
"$lastName",
"@example.com"
]
}
}
})
其实我建议如果可以的话把数字运算,字符串操作和逻辑运算放在代码中做,而不是交给mongodb中做。
$group
$group操作可以将文档依据特定字段的不同值进行分组。
如果选定了需要进行分组的字段,就可以将选定的字段传递给"$group"函数的"_id"字段。
例如:
{"$group" : {"_id" : "$day"}} // 对day字段分组
{"$group" : {"_id" : "$grade"}} // 对grade字段分组
{"$group" : {"_id" : {"state" : "$state", "city" : "$city"}}} //对state和city字段分组
分组时我们一般会做一些统计的计算
"$sum" 求和
"$average" 均值
db.sales.aggregate(
{
"$group" : {
"_id" : "$country",
"totalRevenue" : {"$sum" : "$revenue"}
}
})
如果使用 "$sum":1,相当于mysql中的count(*),即获取每个分类下的记录行数。
// 最大最小值
"$max" :expr
"$min" :expr
// 返回分组的第一个/最后一个值,忽略后面所有值。只有排序之后,明确知道数据顺序时这个操作才有意义。
"$first" :expr
"$last" :expr
$max"和"$min"会查看每一个文档,以便得到极值。因此,如果数据是无序的,这两个操作符也可以有效工作;如果数据是有序的,这两个操作符就会有些浪费。假设有一个存有学生考试成绩的数据集,需要找到其中的最高分与最低分。
db.scores.aggregate(
{
"$group" : {
"_id" : "$grade",
"lowestScore" : {"$min" : "$score"},
"highestScore" : {"$max" : "$score"}
}
})
另一方面,如果数据集是按照希望的字段排序过的,那么"$first"和"$last"操作符就会非常有用。下面的代码与上面的代码可以得到同样的结果:
db.scores.aggregate(
{
"$sort" : {"score" : 1}
},
{
"$group" : {
"_id" : "$grade",
"lowestScore":{"$first" :"$score"},
"highestScore" : {"$last" : "$score"}
}
})
如果数据是排过序的,那么$first和$last会比$min和$max效率更高。如果不准备对数据进行排序,那么直接使用$min和$max会比先排序再使用$first和$last效率更高。
需要注意:
在分片的情况下"$group"会先在每个分片上执行,然后各个分片上的分组结果会被发送到mongos再进行最后的统一分组,剩余的管道工作(也就是$group之后的管道操作符操作)也都是在mongos(而不是在分片)上运行的。
$unwind
拆分(unwind)可以将数组中的每一个值拆分为单独的文档。例如,如果有一篇拥有多条评论的博客文章,可以使用$unwind将每条评论拆分为一个独立的文档:
> db.blog.findOne()
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : [
{
"author" : "mark",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "Nice post"
},
{
"author" : "bill",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "I agree"
}
]
}
> db.blog.aggregate({"$unwind" : "$comments"})
{
"results" :
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : {
"author" : "mark",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "Nice post"
}
},
{
"_id" : ObjectId("50eeffc4c82a5271290530be"),
"author" : "k",
"post" : "Hello, world!",
"comments" : {
"author" : "bill",
"date" : ISODate("2013-01-10T17:52:04.148Z"),
"text" : "I agree"
}
}
],
"ok" : 1
}
如果希望在查询中得到特定的子文档,这个操作符就会非常有用:先使用"$unwind"得到所有子文档,再使用"$match"得到想要的文档。
例如,如果要得到特定用户的所有评论(只需要得到评论,不需要返回评论所属的文章),使用普通的查询是不可能做到的。但是,通过提取、拆分、匹配,就很容易了。
db.blog.aggregate(
{"$project" : {"comments" : "$comments"}}, // 提取comments字段
{"$unwind" : "$comments"}, // 将comments数组中的每一个元素拆分为一行
{"$match" : {"comments.author" : "Mark"}} // 条件匹配所有评论的作者为Mark的评论
)
如果经常需要查询特定作者的所有评论,建议对comments.author建立嵌套索引,然后在聚合的时候将$match放在最前面,这样可以用到索引:
db.blog.aggregate(
{"$match" : {"comments.author" : "Mark"}} // 条件匹配所有评论的作者为Mark的评论
{"$project" : {"comments" : "$comments"}}, // 提取comments字段
{"$unwind" : "$comments"}, // 将comments数组中的每一个元素拆分为一行
)
$sort
和$match一样,如果在投射$project和分组$group之前使用$sort排序,这时的排序操作可以使用索引,否则,排序过程就会比较慢,而且会占用大量内存。
db.employees.aggregate(
{
"$project" : {
"compensation" : {
"$add" : ["$salary", "$bonus"]
},
"name" : 1
}
},
{
"$sort" : {"compensation" : -1, "name" : 1}
}
)
这个例子会对员工排序,最终的结果是按照报酬从高到低,姓名从A到Z的顺序排列。排序方向可以是1(升序)和-1(降序)。
与前面讲过的"$group"一样,"$sort"在分片环境下,先在各个分片上进行排序,然后将各个分片的排序结果发送到mongos做进一步处理。
$limit
$limit会接受一个数字n,返回结果集中的前n个文档
$skip
$skip也是接受一个数字n,丢弃结果集中的前n个文档,将剩余文档作为结果返回。在“普通”查询中,如果需要跳过大量的数据,那么这个操作符的效率会很低。在聚合中也是如此,因为它必须要先匹配到所有需要跳过的文档,然后再将这些文档丢弃。
应该尽量在管道的开始阶段(执行"$project"、"$group"或者"$unwind"操作之前)就将尽可能多的文档和字段过滤掉($match)。因为管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试对操作进行排序,以便能够有效使用索引。
MongoDB不允许单一的聚合操作占用过多的系统内存:如果MongoDB发现某个聚合操作占用了20%以上的内存(比如由于聚合分组和排序而产生了临时集合,即类似于mysql中的临时表),这个操作就会直接输出错误。允许将输出结果利用管道放入一个集合中是为了方便以后使用(这样可以将所需的内存减至最小)。
如果能够通过"$match"操作迅速减小结果集的大小,就可以使用管道进行实时聚合。由于管道会不断包含更多的文档,会越来越复杂,所以几乎不可能实时得到管道的操作结果。
MapReduce
有些问题过于复杂,无法使用聚合框架的查询语言来表达,这时可以使用MapReduce。MapReduce使用代码块中的操作作为“查询”,因此它能够表达任意复杂的逻辑。然而,这种强大是有代价的:MapReduce非常慢,不应该用在实时的数据分析中。
本节不会对MapReduce做更多的介绍,想了解的朋友可以参考官方文档