MongoDB按照天数或小时聚合
需求
最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库*用户后续查询.
涉及到的技术栈分别为:Spring Boot
,MongoDB,Morphia
.
数据模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@Data
@Builder
@Entity(value = "rawDevStatus" , noClassnameStored = true )
// 设备状态索引
@Indexes({
// 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
@ Index (fields = @Field( "time" ), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
@ Index (fields = {@Field( "userId" ), @Field(value = "time" , type = IndexType. DESC )})
})
public class RawDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
private String userId;
private Instant time ;
@Embedded( "points" )
List<Point> protocolPoints;
@Data
@AllArgsConstructor
public static class Point {
/**
* 协议类型
*/
private Protocol protocol;
/**
* 设备总数
*/
private Integer total;
/**
* 设备在线数目
*/
private Integer onlineNum;
/**
* 处于启用状态设备数目
*/
private Integer enableNum;
}
}
|
上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@Data
@Builder
@Entity(value = "aggregationDevStatus" , noClassnameStored = true )
@Indexes({
@ Index (fields = @Field( "expireAt" ), options = @IndexOptions(expireAfterSeconds = 0)),
@ Index (fields = {@Field( "userId" ), @Field(value = "time" , type = IndexType. DESC )})
})
public class AggregationDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
/**
* 用户ID
*/
private String userId;
/**
* 设备总数
*/
private Double total;
/**
* 设备在线数目
*/
private Double onlineNum;
/**
* 处于启用状态设备数目
*/
private Double enableNum;
/**
* 聚合类型(按照小时还是按照天聚合)
*/
@Property( "aggDuration" )
private AggregationDuration aggregationDuration;
private Instant time ;
/**
* 动态设置文档过期时间
*/
private Instant expireAt;
}
|
上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.
聚合操作符介绍
聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.
此次聚合主要涉及以下操作:
•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.
原始聚合语句
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
db.getCollection( 'raw_dev_status' ).aggregate([
{$match:
{
time :{$gte: ISODate( "2019-06-27T00:00:00Z" )},
}
},
{$unwind: "$points" },
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z" , date : "$time" } }
}
},
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp" , format: "%Y:%m:%dT%H:%M:%SZ" , } }
}
},
{$ group :
{
_id:{user_id: '$userId' , cal_time: '$groupTime' },
devTotal:{ '$avg' : '$points.total' },
onlineTotal:{ '$avg' : '$points.onlineNum' },
enableTotal:{ '$avg' : '$points.enableNum' }
}
},
])
|
上述代码是按小时聚合数据,以下来逐步介绍处理思路:
(1) $match
根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.
(2) $unwind
raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;
(3) $project
1
2
3
4
5
6
|
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z" , date : "$time" } }
}
}
|
选择需要输出的数据,分别为:userId,points
以及tmp.
需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.
如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z
即可满足要求.
(4) $project
1
2
3
4
5
6
|
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp" , format: "%Y:%m:%dT%H:%M:%SZ" , } }
}
}
|
因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.
(5) $group
对聚合结果进行分类操作,并生成最终输出结果.
1
2
3
4
5
6
7
8
9
10
11
12
|
{$ group :
{
# 根据_id进行分组操作,依据是`user_id`以及`$groupTime`
_id:{user_id: '$userId' , cal_time: '$groupTime' },
# 求设备总数平均值
devTotal:{ '$avg' : '$points.total' },
# 求设备在线数平均值
onlineTotal:{ '$avg' : '$points.onlineNum' },
# ...
enableTotal:{ '$avg' : '$points.enableNum' }
}
}
|
代码编写
此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/**
* 创建聚合条件
*
* @param pastTime 过去时间段
* @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
* @ return 聚合条件
*/
private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
return datastore.createAggregation(RawDevStatus.class)
.match(query.field( "time" ).greaterThanOrEq(pastTime))
.unwind( "points" , new UnwindOptions().preserveNullAndEmptyArrays( false ))
.match(query.field( "points.protocol" ).equal( "ALL" ))
.project(Projection.projection( "userId" ),
Projection.projection( "points" ),
Projection.projection( "convertTime" ,
Projection.expression( "$dateToString" ,
new BasicDBObject( "format" , dateToString)
.append( "date" , "$time" ))
)
)
.project(Projection.projection( "userId" ),
Projection.projection( "points" ),
Projection.projection( "convertTime" ,
Projection.expression( "$dateFromString" ,
new BasicDBObject( "format" , stringToDate)
.append( "dateString" , "$convertTime" ))
)
)
. group (
Group .id( Group . grouping ( "userId" ), Group . grouping ( "convertTime" )),
Group . grouping ( "total" , Group .average( "points.total" )),
Group . grouping ( "onlineNum" , Group .average( "points.onlineNum" )),
Group . grouping ( "enableNum" , Group .average( "points.enableNum" ))
);
}
/**
* 获取聚合结果
*
* @param pipeline 聚合条件
* @ return 聚合结果
*/
private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
List<AggregationMidDevStatus> statuses = new ArrayList<>();
Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse( true ).build());
while (resultIterator.hasNext()) {
statuses. add (resultIterator. next ());
}
return statuses;
}
//......................................................................................
// 获取聚合结果(省略若干代码)
AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
if (CollectionUtils.isEmpty(midStatuses)) {
log.warn( "Can not get dev status aggregation result." );
return ;
}
|
总结
以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!
原文链接:https://www.cnblogs.com/jason1990/archive/2019/07/31/11269658.html