Apache Flink: Window Functions and the beginning of time
In the WindowAssigner, an element gets assigned to one or more TimeWindow instances. In the case of a sliding event time window, this happens in SlidingEventTimeWindows#assignWindows [1].
In the case of a window with size=5 and slide=1, an element with timestamp=0 gets assigned to the following windows:
- window(start=0, end=5)
- window(start=-1, end=4)
- window(start=-2, end=3)
- window(start=-3, end=2)
- window(start=-4, end=1)
In one picture:
```
size = 5, slide = 1

                        element
                        |
                        v
t=[ 0,5[  window 1      xxxxx
t=[-1,4[  window 2     xxxxx
t=[-2,3[  window 3    xxxxx
t=[-3,2[  window 4   xxxxx
t=[-4,1[  window 5  xxxxx

time (-4 to +4)     432101234
                        ^
                        beginning of time
```
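To make the assignment above concrete, here is a minimal plain-Scala sketch of the loop that produces these windows. It does not use Flink: `TimeWindowSketch`, `lastWindowStart` and `assignSlidingWindows` are hypothetical names, the window offset is assumed to be 0, and `Math.floorMod` is used so that negative timestamps are handled correctly. It is meant to mirror the logic of `SlidingEventTimeWindows#assignWindows`, not to reproduce its source.

```scala
// Hypothetical stand-in for Flink's TimeWindow; covers [start, end[.
final case class TimeWindowSketch(start: Long, end: Long)

// Start of the latest window that contains `timestamp` (offset assumed to be 0).
// Math.floorMod keeps the result correct for negative timestamps as well.
def lastWindowStart(timestamp: Long, slide: Long): Long =
  timestamp - Math.floorMod(timestamp, slide)

// Every window of length `size`, advanced by `slide`, that contains `timestamp`,
// walking backwards from the latest window start.
def assignSlidingWindows(timestamp: Long, size: Long, slide: Long): List[TimeWindowSketch] = {
  val lastStart = lastWindowStart(timestamp, slide)
  Iterator
    .iterate(lastStart)(_ - slide)
    .takeWhile(start => start > timestamp - size)
    .map(start => TimeWindowSketch(start, start + size))
    .toList
}

assignSlidingWindows(timestamp = 0, size = 5, slide = 1).foreach(println)
```

Running it prints the five windows listed above, four of which start before t=0.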
Is there a way to tell Flink that there is a beginning of time and that, before it, there are no windows? If not, where should I start looking to change that? In the above case, Flink should have only one window (t=[0,5[ window 1) for the first element, like this:
```
size = 5, slide = 1

                        element
                        |
                        v
t=[ 0,5[  window 1      xxxxx
t=[ 1,6[  window 2       xxxxx
t=[ 2,7[  window 3        xxxxx
t=[ 3,8[  window 4         xxxxx
t=[ 4,9[  window 5          xxxxx

time (-4 to +8)     4321012345678
                        ^
                        beginning of time
```
This restriction should stop having any effect once the number of windows per element reaches size / slide (here 5). From then on, as in the case above, every element is inside 5 windows.
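The requested behaviour can be sketched in the same plain-Scala style by dropping every window that starts before a given beginning of time. This is a hypothetical illustration, not a Flink feature: `TimeWindowSketch`, `assignSlidingWindows`, `assignBoundedSlidingWindows` and `beginningOfTime` are made-up names, the window offset is assumed to be 0, and the beginning of time is assumed to be t=0.

```scala
// Hypothetical stand-in for Flink's TimeWindow; covers [start, end[.
final case class TimeWindowSketch(start: Long, end: Long)

// Unbounded sliding-window assignment (offset assumed to be 0).
def assignSlidingWindows(timestamp: Long, size: Long, slide: Long): List[TimeWindowSketch] = {
  val lastStart = timestamp - Math.floorMod(timestamp, slide)
  Iterator
    .iterate(lastStart)(_ - slide)
    .takeWhile(start => start > timestamp - size)
    .map(start => TimeWindowSketch(start, start + size))
    .toList
}

// Same assignment, but no window may start before the beginning of time.
def assignBoundedSlidingWindows(
    timestamp: Long, size: Long, slide: Long, beginningOfTime: Long): List[TimeWindowSketch] =
  assignSlidingWindows(timestamp, size, slide).filter(_.start >= beginningOfTime)

// With beginningOfTime = 0 the number of windows per element grows until it
// reaches size / slide = 5, after which the bound no longer changes anything.
for (t <- 0L to 6L) {
  val windows = assignBoundedSlidingWindows(t, size = 5, slide = 1, beginningOfTime = 0)
  println(s"t=$t -> ${windows.size} window(s): ${windows.mkString(", ")}")
}
```

For t = 0 to 6 it reports 1, 2, 3, 4, 5, 5 and 5 windows per element, so the bound only matters for the first size / slide - 1 time units.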
Footnotes:
[1] org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows
At the moment, there is no way to specify a valid time interval for a Flink job. This might be a little bit problematic, given that you might want to apply your job to historic data as well.
What you can do, though, is manually filter out the windows that start before the beginning of time:
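```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val startTime = 1
val windowLength = 2
val slide = 1

// (value, timestamp) pairs; the second field is used as the event timestamp
val input = env
  .fromElements((1, 1), (2, 2), (3, 3))
  .assignAscendingTimestamps(x => x._2)

val windowed = input
  .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
  .apply { (window: TimeWindow, iterable: Iterable[(Int, Int)], collector: Collector[Int]) =>
    if (window.getStart >= startTime) {
      // sum the values of windows that start at or after the beginning of time
      collector.collect(iterable.map(_._1).reduce(_ + _))
    } else {
      // discard windows that start before the beginning of time
    }
  }

windowed.print()
env.execute()
```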
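To see which windows the job above keeps, the following plain-Scala simulation (no Flink involved) assigns the same three elements to sliding windows of length 2 and slide 1, drops windows that start before `startTime = 1`, and sums the remaining ones. It is only a model of the job's semantics, assuming every window eventually fires exactly once; `TimeWindowSketch` and `assignSlidingWindows` are hypothetical helpers, the window offset is assumed to be 0, and the order in which Flink actually prints results may differ.

```scala
// Hypothetical stand-in for Flink's TimeWindow; covers [start, end[.
final case class TimeWindowSketch(start: Long, end: Long)

// Sliding-window assignment with offset 0 (not Flink's implementation).
def assignSlidingWindows(timestamp: Long, size: Long, slide: Long): List[TimeWindowSketch] = {
  val lastStart = timestamp - Math.floorMod(timestamp, slide)
  Iterator
    .iterate(lastStart)(_ - slide)
    .takeWhile(start => start > timestamp - size)
    .map(start => TimeWindowSketch(start, start + size))
    .toList
}

val startTime = 1L
val windowLength = 2L
val slide = 1L
val input = Seq((1, 1L), (2, 2L), (3, 3L)) // (value, event timestamp)

// Pair each value with every window it falls into, group by window, keep only
// windows starting at or after startTime, and sum their values (sorted by end).
val sums: List[(TimeWindowSketch, Int)] = input
  .flatMap { case (value, ts) => assignSlidingWindows(ts, windowLength, slide).map(w => (w, value)) }
  .groupBy(_._1)
  .toList
  .filter { case (window, _) => window.start >= startTime }
  .map { case (window, pairs) => (window, pairs.map(_._2).sum) }
  .sortBy(_._1.end)

sums.foreach { case (w, sum) => println(s"[${w.start}, ${w.end}[ -> $sum") }
```

It reports [1, 3[ -> 3, [2, 4[ -> 5 and [3, 5[ -> 3; the window [0, 2[, which would contain only the element with timestamp 1, is the one removed by the `window.getStart >= startTime` check.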