PROCESSING_TIME punctuations fire at 2x the scheduled interval #457
Description
For PROCESSING_TIME punctuations scheduled via ProcessorContext.Schedule(), the first punctuation fires after approximately 2x the scheduled interval instead of after one interval. Subsequent punctuations fire at the scheduled interval.
For comparison, the STREAM_TIME case works correctly because it uses startTime = 0L, meaning the first punctuation fires as soon as any stream time >= interval is observed.
Expected Behavior
When scheduling a PROCESSING_TIME punctuation with an interval of 100ms, the first punctuation should fire after ~100ms, and subsequent punctuations should fire at ~100ms intervals.
Actual Behavior
The first punctuation fires after ~200ms (2x the interval), and subsequent punctuations fire at ~100ms intervals.
Root Cause
In StreamTask.RegisterScheduleTask(), for PROCESSING_TIME:
case PunctuationType.PROCESSING_TIME:
    // align punctuation to now, punctuate after interval has elapsed
    return ScheduleTask(DateTime.Now.GetMilliseconds() + (long)interval.TotalMilliseconds, interval, punctuationType, punctuator);
The startTime is set to now + interval.
However, TaskScheduled.CanExecute() checks:
internal bool CanExecute(long now)
    => now - lastTime >= interval.TotalMilliseconds;
Where lastTime is initialized to startTime.
So for the first check:
- lastTime = now + interval
- CanExecute check: now - (now + interval) >= interval
- This simplifies to: -interval >= interval = false
The punctuation won't fire until now = startTime + interval, which means:
now = (originalNow + interval) + interval = originalNow + 2*interval
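To make the arithmetic concrete, here is a minimal sketch that simulates the check in isolation. FakeScheduledTask and PunctuationTiming are hypothetical stand-ins written for this issue, not the library's actual classes; they assume only the behavior described above (lastTime starts at startTime, and Execute() sets lastTime = now), with time counted in milliseconds from the moment of scheduling.

```csharp
using System;

// Hypothetical stand-in for the scheduled task described in this issue.
// It is NOT the library's TaskScheduled class; it only mirrors the check quoted above.
public sealed class FakeScheduledTask
{
    private long lastTime;
    private readonly long intervalMs;

    public FakeScheduledTask(long startTime, long intervalMs)
    {
        lastTime = startTime; // lastTime is initialized to startTime
        this.intervalMs = intervalMs;
    }

    // Same comparison as the CanExecute() shown above.
    public bool CanExecute(long now) => now - lastTime >= intervalMs;

    // Assumption taken from the issue text: executing updates lastTime = now.
    public void Execute(long now) => lastTime = now;
}

public static class PunctuationTiming
{
    // Steps a fake wall clock forward 1 ms at a time from the scheduling instant (0)
    // and returns the first `count` times at which the task fires.
    public static long[] FireTimes(long startTime, long intervalMs, int count)
    {
        var task = new FakeScheduledTask(startTime, intervalMs);
        var fired = new long[count];
        int n = 0;
        for (long now = 0; n < count; now++)
        {
            if (task.CanExecute(now))
            {
                task.Execute(now);
                fired[n++] = now;
            }
        }
        return fired;
    }
}
```

With a 100 ms interval, PunctuationTiming.FireTimes(100, 100, 3) models startTime = now + interval and yields 200, 300, 400, while PunctuationTiming.FireTimes(0, 100, 3) models startTime = now and yields 100, 200, 300.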
Impact
- Any processor using PROCESSING_TIME punctuations experiences a delayed first punctuation
- This affects buffering transformers, windowed operations, and any time-sensitive processing
- The issue is subtle because subsequent punctuations fire at the correct interval (after Execute() updates lastTime = now)
Suggestion
Change the startTime for PROCESSING_TIME to be the current time (not now + interval):
case PunctuationType.PROCESSING_TIME:
    // Set startTime to now, so first punctuation fires after 'interval' has elapsed
    // CanExecute checks: now - lastTime >= interval
    // With lastTime = now, the first check becomes: (now + interval) - now >= interval = true
    return ScheduleTask(GetWallClockTime(), interval, punctuationType, punctuator);
Additional Context
This issue was discovered while I was implementing a way to advance wall-clock time, analogous to Java Kafka Streams' advanceWallClockTime, for unit testing PROCESSING_TIME punctuations. It became obvious once time was controlled explicitly in tests.
Environment
- Streamiz.Kafka.Net version: 1.7.1 (and likely all prior versions)
- .NET version: 8.0