Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Processors/StreamTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public override TaskScheduled RegisterScheduleTask(TimeSpan interval, Punctuatio
return ScheduleTask(0L, interval, punctuationType, punctuator);
case PunctuationType.PROCESSING_TIME:
// align punctuation to now, punctuate after interval has elapsed
return ScheduleTask(DateTime.Now.GetMilliseconds() + (long)interval.TotalMilliseconds, interval, punctuationType, punctuator);
return ScheduleTask(DateTime.Now.GetMilliseconds(), interval, punctuationType, punctuator);
default:
return null;
}
Expand Down
61 changes: 60 additions & 1 deletion test/Streamiz.Kafka.Net.Tests/Private/TaskScheduledTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,24 @@ public class TaskScheduledTests
{
class MySystemProcessor : Net.Processors.Public.IProcessor<string, string>
{
private readonly long scheduledMs;

public MySystemProcessor()
:this(10)
{
}

public MySystemProcessor(long scheduledMs)
{
this.scheduledMs = scheduledMs;
}

public long count { get; set; }

public void Init(ProcessorContext<string, string> context)
{
context.Schedule(
TimeSpan.FromMilliseconds(10),
TimeSpan.FromMilliseconds(scheduledMs),
PunctuationType.PROCESSING_TIME,
(now) => {
++count;
Expand Down Expand Up @@ -187,6 +199,53 @@ private ConsumeResult<byte[], byte[]> CreateRecord(string topic, int partition,
Offset = offset
};
}

[Test]
public void FixIssue457()
{
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-punctuator";

var builder = new StreamBuilder();
builder
.Stream<string, string>("topic")
.Process(
new ProcessorBuilder<string, string>()
.Processor<MySystemProcessor>(100)
.Build());

var topology = builder.Build();

var supplier = new SyncKafkaSupplier();
var consumer = supplier.GetConsumer(config.ToConsumerConfig(), null);
var restoreConsumer = supplier.GetRestoreConsumer(config.ToConsumerConfig());

var storeChangelogReader =
new StoreChangelogReader(config, restoreConsumer, "thread-0", new StatestoreRestoreManager(null),new StreamMetricsRegistry());
var streamsProducer = new StreamsProducer(
config,
"thread-0",
Guid.NewGuid(),
supplier,
"");

var taskCreator = new TaskCreator(topology.Builder, config, "thread-0", supplier,
storeChangelogReader, new StreamMetricsRegistry());
var taskManager = new TaskManager(topology.Builder, taskCreator,
supplier.GetAdmin(config.ToAdminConfig("admin")), consumer, storeChangelogReader, streamsProducer);

taskManager.CreateTasks(
new List<TopicPartition> {
new("topic", 0)
});
taskManager.TryToCompleteRestoration();
// See : https://github.com/LGouellec/streamiz/issues/457
Thread.Sleep(100);
Assert.AreEqual(1, taskManager.Punctuate());

taskManager.Close();
}


[Test]
public void StandardSystemTimePunctuator()
Expand Down
Loading