前面文章介绍了Flink的任务执行流程,每一个operator都会有对应的Task去执行,如果程序中使用了window的话,当程序执行到window的task时就会调用WindowOperator中的实现。
public void processElement(StreamRecord<IN> element) throws Exception {
//根据元素划分窗口
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
//拿到划分的Key
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
//如果是session window 则会有合并的过程
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
//...session window这里不讨论
} else {
//for循环是因为如果是slide window的话一条数据可能属于多个窗口
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
//将数据存入window state
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
//调用trigger的onElement方法,Trigger可以是eventTime、processTime或自定义的trigger
TriggerResult triggerResult = triggerContext.onElement(element);
//如果返回结果是Fire则触发计算
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
//如果同时要清除窗口状态的话则清除
if (triggerResult.isPurge()) {
windowState.clear();
}
//清理定时器
registerCleanupTimer(window);
}
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
//迟到的元素通过side output输出
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
Trigger
不同的trigger实现有所不同,分别看下eventTime和processTime的 EventTimeTrigger
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
判断当前水印是否大于等于窗口结束时间,是的话则返回FIRE,否则将窗口结束时间注册为定时器,返回CONTINUE。
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
直接添加到的优先级队列中。
再看ProcessingTimeTrigger
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
直接注册定时器,返回CONTINUE,因为process time是基于机器系统时间来判断的,跟元素eventtime没有关系。
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
注册定时器差异比较大,先从优先级队列中取出时间最近的定时器,尝试吧当前时间注册进去,如果成功了,判断当前时间是不是比最近的定时器还要早,是的话就取消最近的定时器。 这里涉及java8的方法引用传递的概念,把onProcessingTime的引用传递给registerTimer ProcessingTimeServiceImpl
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
return timerService.registerTimer(timestamp, processingTimeCallbackWrapper.apply(target));
}
SystemProcessingTimeService
/** The executor service that schedules and calls the triggers of this task. */
private final ScheduledThreadPoolExecutor timerService;
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
// delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
// T says we won't see elements in the future with a timestamp smaller or equal to T.
// With processing time, we therefore need to delay firing the timer by one ms.
//延迟调度时间=窗口的结束时间-当前时间+1ms
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
//将上面的回调函数包装成task放到线程池中调度
return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
//起一个调度线程,将回调函数传递进去
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
}
当延迟时间到达时会调用ScheduledTask的run方法
public void run() {
if (serviceStatus.get() != STATUS_ALIVE) {
return;
}
try {
callback.onProcessingTime(nextTimestamp);
} catch (Exception ex) {
exceptionHandler.handleException(ex);
}
nextTimestamp += period;
}
上面的callback就是上面onProcessingTime的引用,当触发 InternalTimerServiceImpl.onProcessingTime() 回调后,会从优先级队列中取出所有符合条件的触发器,并调用 triggerTarget.onProcessingTime(timer)。
private void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);//调用对应trigger的onProcessingTime方法
}
if (timer != null && nextTimer == null) {
//注册下一个定时器
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
}
}
这样就会不断循环下去。 上面是Trigger的过程,当窗口触发计算后,如果还需要清除状态,那么就把状态清除,然后取消当前窗口的定时器。
Evicotr
如果使用了Evictor的话,则会调用EvictingWindowOperator.processElement和上面不同的是emitWindowContents方法
private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
// Work around type system restrictions...
FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
.from(contents)
.transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
@Override
public TimestampedValue<IN> apply(StreamRecord<IN> input) {
return TimestampedValue.from(input);
}
});
evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
FluentIterable<IN> projectedContents = recordsWithTimestamp
.transform(new Function<TimestampedValue<IN>, IN>() {
@Override
public IN apply(TimestampedValue<IN> input) {
return input.getValue();
}
});
processContext.window = triggerContext.window;
userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
//work around to fix FLINK-4369, remove the evicted elements from the windowState.
//this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
windowState.clear();
for (TimestampedValue<IN> record : recordsWithTimestamp) {
windowState.add(record.getStreamRecord());
}
}
在真正窗口处理函数之前和之后分别调用evictBefore和evictAfter方法来做一些过滤。
详细的窗口demo、trigger、evictor、windowFunction的使用样例,参见我的项目 https://github.com/zhuxiaoshang/flink-be-god