diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 615ef609..7b7560eb 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -395,7 +395,7 @@ public override TaskScheduled RegisterScheduleTask(TimeSpan interval, Punctuatio return ScheduleTask(0L, interval, punctuationType, punctuator); case PunctuationType.PROCESSING_TIME: // align punctuation to now, punctuate after interval has elapsed - return ScheduleTask(DateTime.Now.GetMilliseconds() + (long)interval.TotalMilliseconds, interval, punctuationType, punctuator); + return ScheduleTask(DateTime.Now.GetMilliseconds(), interval, punctuationType, punctuator); default: return null; } diff --git a/test/Streamiz.Kafka.Net.Tests/Private/TaskScheduledTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/TaskScheduledTests.cs index 66fc3fac..cf2b6ad1 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/TaskScheduledTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/TaskScheduledTests.cs @@ -23,12 +23,24 @@ public class TaskScheduledTests { class MySystemProcessor : Net.Processors.Public.IProcessor { + private readonly long scheduledMs; + + public MySystemProcessor() + :this(10) + { + } + + public MySystemProcessor(long scheduledMs) + { + this.scheduledMs = scheduledMs; + } + public long count { get; set; } public void Init(ProcessorContext context) { context.Schedule( - TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(scheduledMs), PunctuationType.PROCESSING_TIME, (now) => { ++count; @@ -187,6 +199,53 @@ private ConsumeResult CreateRecord(string topic, int partition, Offset = offset }; } + + [Test] + public void FixIssue457() + { + var config = new StreamConfig(); + config.ApplicationId = "test-punctuator"; + + var builder = new StreamBuilder(); + builder + .Stream("topic") + .Process( + new ProcessorBuilder() + .Processor(100) + .Build()); + + var topology = builder.Build(); + + var supplier = new SyncKafkaSupplier(); + var consumer = supplier.GetConsumer(config.ToConsumerConfig(), null); + var restoreConsumer = supplier.GetRestoreConsumer(config.ToConsumerConfig()); + + var storeChangelogReader = + new StoreChangelogReader(config, restoreConsumer, "thread-0", new StatestoreRestoreManager(null),new StreamMetricsRegistry()); + var streamsProducer = new StreamsProducer( + config, + "thread-0", + Guid.NewGuid(), + supplier, + ""); + + var taskCreator = new TaskCreator(topology.Builder, config, "thread-0", supplier, + storeChangelogReader, new StreamMetricsRegistry()); + var taskManager = new TaskManager(topology.Builder, taskCreator, + supplier.GetAdmin(config.ToAdminConfig("admin")), consumer, storeChangelogReader, streamsProducer); + + taskManager.CreateTasks( + new List { + new("topic", 0) + }); + taskManager.TryToCompleteRestoration(); + // See : https://github.com/LGouellec/streamiz/issues/457 + Thread.Sleep(100); + Assert.AreEqual(1, taskManager.Punctuate()); + + taskManager.Close(); + } + [Test] public void StandardSystemTimePunctuator()