diff --git a/core/Crosscutting/DictionaryExtensions.cs b/core/Crosscutting/DictionaryExtensions.cs index 23dbe790..228ecc05 100644 --- a/core/Crosscutting/DictionaryExtensions.cs +++ b/core/Crosscutting/DictionaryExtensions.cs @@ -81,13 +81,28 @@ public static IDictionary ToDictionary(this IEnumerable ToUpdateDictionary( this IEnumerable source, Func keySelector, - Func elementSelector) + Func elementSelector, + Func mergeOperator = null) { var dictonary = new Dictionary(); foreach (var element in source) { var key = keySelector(element); - dictonary[key] = elementSelector(element); + if (mergeOperator == null) + { + dictonary[key] = elementSelector(element); + } + else + { + if (dictonary.TryGetValue(key, out V previousValue)) + { + dictonary[key] = mergeOperator(previousValue, elementSelector(element)); + } + else + { + dictonary[key] = elementSelector(element); + } + } } return dictonary; diff --git a/core/Processors/DefaultStreamPartitioner.cs b/core/Processors/DefaultStreamPartitioner.cs index 3345b6ac..8edcad0c 100644 --- a/core/Processors/DefaultStreamPartitioner.cs +++ b/core/Processors/DefaultStreamPartitioner.cs @@ -9,6 +9,17 @@ namespace Streamiz.Kafka.Net.Processors /// Value record type public class DefaultStreamPartitioner : IStreamPartitioner { + private bool _resuffleKey; + + /// + /// Initialize the current partitioner. 
+ /// + /// Global stream configuration + public void Initialize(IStreamConfig config) + { + _resuffleKey = config.DefaultPartitionerResuffleEveryKey; + } + /// /// Function used to determine how records are distributed among partitions of the topic /// @@ -20,9 +31,10 @@ public class DefaultStreamPartitioner : IStreamPartitioner /// Return the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions) { - if (sourcePartition.Value <= numPartitions - 1) - return sourcePartition; - return Confluent.Kafka.Partition.Any; + return !_resuffleKey + && sourcePartition.Value <= numPartitions - 1 ? + sourcePartition + : Confluent.Kafka.Partition.Any; } } } \ No newline at end of file diff --git a/core/Processors/IStreamPartitioner.cs b/core/Processors/IStreamPartitioner.cs index c29d5318..d32386f4 100644 --- a/core/Processors/IStreamPartitioner.cs +++ b/core/Processors/IStreamPartitioner.cs @@ -4,6 +4,12 @@ namespace Streamiz.Kafka.Net.Processors { public interface IStreamPartitioner { + /// + /// Initialize the current partitioner + /// + /// Global stream configuration + void Initialize(IStreamConfig config); + /// /// Function used to determine how records are distributed among partitions of the topic /// diff --git a/core/Processors/Internal/WrapperStreamPartitioner.cs b/core/Processors/Internal/WrapperStreamPartitioner.cs index 608c8ec2..65945ba2 100644 --- a/core/Processors/Internal/WrapperStreamPartitioner.cs +++ b/core/Processors/Internal/WrapperStreamPartitioner.cs @@ -12,6 +12,11 @@ public WrapperStreamPartitioner(Func pa _partitioner = partitioner; } + public void Initialize(IStreamConfig config) + { + + } + public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions) => _partitioner(topic, key, value, sourcePartition, numPartitions); } diff --git 
a/core/Processors/SinkProcessor.cs b/core/Processors/SinkProcessor.cs index 9470a9f6..6596ba6d 100644 --- a/core/Processors/SinkProcessor.cs +++ b/core/Processors/SinkProcessor.cs @@ -46,6 +46,7 @@ public override void Init(ProcessorContext context) Key?.Initialize(context.SerDesContext); Value?.Initialize(context.SerDesContext); + partitioner?.Initialize(context.Configuration); base.Init(context); } diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 9dd388c8..615ef609 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -100,10 +100,15 @@ private void RegisterSensors() restorationRecordsSendsor = TaskMetrics.RestorationRecordsSensor(threadId, Id, streamMetricsRegistry); } - private IDictionary CheckpointableOffsets + // for testing + internal IDictionary CheckpointableOffsets => collector.CollectorOffsets - .Union(consumedOffsets.AsEnumerable()) - .ToDictionary(); + .Union(consumedOffsets.AsEnumerable()) + .ToUpdateDictionary( + tp => tp.Key, + tp => tp.Value, + (previousOffset, currentOffset) + => currentOffset > previousOffset ? currentOffset : previousOffset); private IEnumerable GetPartitionsWithOffset() { diff --git a/core/StreamConfig.cs b/core/StreamConfig.cs index aac2aec8..52505e89 100644 --- a/core/StreamConfig.cs +++ b/core/StreamConfig.cs @@ -347,6 +347,14 @@ public interface IStreamConfig : ICloneable /// Timeout for QueryWatermarkOffsets operations when restoring state stores. (Default: 5 seconds) /// TimeSpan QueryWatermarkOffsetsTimeout { get; set; } + + /// + /// Enable the automatic resuffle with the default stream partitioner. (Default: true) + /// If the value is true, when sending to a topic, the partition is always recalculated based on the destination topic's partition count. 
+ /// If the value is false, the default partitioner checks if the destination topic has the same or more partitions as the source; if so, the source partition is used, even if the key has changed; otherwise the partition is recalculated. + /// Check + /// + bool DefaultPartitionerResuffleEveryKey { get; set; } #endregion @@ -500,6 +508,7 @@ public int GetHashCode(KeyValuePair obj) private const string logProcessingSummaryCst = "log.processing.summary"; private const string stateStoreCacheMaxBytesCst = "statestore.cache.max.bytes"; private const string queryWatermarkOffsetsTimeoutMsCst = "query.watermark.offsets.timeout.ms"; + private const string defaultPartitionerResuffleEveryKeyCst = "default.partitioner.resuffle.every.key"; /// /// Default commit interval in milliseconds when exactly once is not enabled @@ -2463,6 +2472,7 @@ public StreamConfig(IDictionary properties, ILoggerFactory logg MaxDegreeOfParallelism = 8; DefaultStateStoreCacheMaxBytes = 5 * 1024 * 1024; QueryWatermarkOffsetsTimeout = TimeSpan.FromSeconds(5); + DefaultPartitionerResuffleEveryKey = true; _consumerConfig = new ConsumerConfig(); _producerConfig = new ProducerConfig(); @@ -2965,6 +2975,20 @@ public TimeSpan QueryWatermarkOffsetsTimeout get => configProperties[queryWatermarkOffsetsTimeoutMsCst]; set => configProperties.AddOrUpdate(queryWatermarkOffsetsTimeoutMsCst, value); } + + + /// + /// Enable the automatic resuffle with the default stream partitioner. (Default: true) + /// If the value is true, when sending to a topic, the partition is always recalculated based on the destination topic's partition count. + /// If the value is false, the default partitioner checks if the destination topic has the same or more partitions as the source; if so, the source partition is used, even if the key has changed; otherwise the partition is recalculated. 
+ /// Check + /// + [StreamConfigProperty("" + defaultPartitionerResuffleEveryKeyCst)] + public bool DefaultPartitionerResuffleEveryKey + { + get => configProperties[defaultPartitionerResuffleEveryKeyCst]; + set => configProperties.AddOrUpdate(defaultPartitionerResuffleEveryKeyCst, value); + } /// /// Get the configs to the diff --git a/test/Streamiz.Kafka.Net.Tests/Helpers/DictionaryExtensionsTests.cs b/test/Streamiz.Kafka.Net.Tests/Helpers/DictionaryExtensionsTests.cs new file mode 100644 index 00000000..bb56c671 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Helpers/DictionaryExtensionsTests.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; +using NUnit.Framework; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Tests.Helpers; + +public class DictionaryExtensionsTests +{ + [Test] + public void ToUpdateDictionaryWithMergeOperatorTest() + { + var dictionary = new List> + { + new("a", 1L), + new("b", 1L), + new("c", 1L), + new("d", 1L), + new("a", 2L), + new("b", 3L), + }; + + var results = dictionary.ToUpdateDictionary( + kv => kv.Key, + kv => kv.Value, + (@old, @new) => @new); + + Assert.AreEqual(2L, results["a"]); + Assert.AreEqual(3L, results["b"]); + Assert.AreEqual(1L, results["c"]); + Assert.AreEqual(1L, results["d"]); + } +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/StreamTaskTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/StreamTaskTests.cs index 7e1181e4..bec123fd 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/StreamTaskTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/StreamTaskTests.cs @@ -455,5 +455,101 @@ public void StreamTaskWrittingCheckpoint() Directory.Delete(config.StateDir, true); } + + [Test] + public void StreamTaskCheckpointOffsetTest() + { + var config = new StreamConfig(); + config.ApplicationId = "test-app"; + config.StateDir = Path.Combine(".", Guid.NewGuid().ToString()); + + var serdes = new StringSerDes(); + var builder = new StreamBuilder(); + + builder + 
.Stream("topic", new StringSerDes(), new StringSerDes()) + .To("topic", new StringSerDes(), new StringSerDes()); + + TaskId id = new TaskId {Id = 0, Partition = 0}; + var topology = builder.Build(); + topology.Builder.RewriteTopology(config); + + var processorTopology = topology.Builder.BuildTopology(id); + + var supplier = new SyncKafkaSupplier(); + var consumer = supplier.GetConsumer(config.ToConsumerConfig(), null); + + var streamsProducer = new StreamsProducer( + config, + "thread-0", + Guid.NewGuid(), + supplier, + "log-prefix"); + + var tp = new TopicPartition("topic", 0); + StreamTask task = new StreamTask( + "thread-0", + id, + new List + { + tp , + }, + processorTopology, + consumer, + config, + supplier, + streamsProducer, + new MockChangelogRegister() + , new StreamMetricsRegistry()); + + + void AddRecord(string key, string value, TopicPartition tp, long offset) + { + task.AddRecord( new ConsumeResult { + Message = new Message + { + Key = serdes.Serialize(key, new SerializationContext()), + Value = serdes.Serialize(value, new SerializationContext()) + }, + TopicPartitionOffset = new TopicPartitionOffset( tp, offset) + }); + } + + task.InitializeStateStores(); + task.InitializeTopology(); + task.RestorationIfNeeded(); + task.CompleteRestoration(); + + AddRecord("key1", "value1", tp, 0L); + AddRecord("key2", "value2", tp,3L); + AddRecord("key3", "value3", tp,4L); + AddRecord("key4", "value4", tp,1L); + + Assert.IsTrue(task.CanProcess(DateTime.Now.GetMilliseconds())); + + while (task.CanProcess(DateTime.Now.GetMilliseconds())) + Assert.IsTrue(task.Process()); + + var checkpointOffsets = task.CheckpointableOffsets; + + Assert.AreEqual(3L, checkpointOffsets[tp]); + + AddRecord("key5", "value5", tp,5L); + + Assert.IsTrue(task.CanProcess(DateTime.Now.GetMilliseconds())); + + while (task.CanProcess(DateTime.Now.GetMilliseconds())) + Assert.IsTrue(task.Process()); + + var checkpointOffsets2 = task.CheckpointableOffsets; + + Assert.AreEqual(5L, 
checkpointOffsets2[tp]); + + task.Suspend(); + task.Close(false); + + if(Directory.Exists(config.StateDir)) + Directory.Delete(config.StateDir, true); + } } } \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs index 9c4833e2..a431c7fc 100644 --- a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs @@ -15,6 +15,11 @@ public class KStreamToTests { private class MyStreamPartitioner : IStreamPartitioner { + public void Initialize(IStreamConfig config) + { + + } + public Partition Partition(string topic, string key, string value, Partition sourcePartition, int numPartitions) { switch (key)