我们使用.keyBy(“itemId”)对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一起计算要高效地多。aggregate()方法的第一个参数用于这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** COUNT 统计的聚合函数实现,每出现一条记录加一 */ publicstaticclassCountAggimplementsAggregateFunction<UserBehavior, Long, Long> { @Override public Long createAccumulator(){ return0L; } @Override public Long add(UserBehavior userBehavior, Long acc){ return acc + 1; } @Override public Long getResult(Long acc){ return acc; } @Override public Long merge(Long acc1, Long acc2){ return acc1 + acc2; } }
.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出。