Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions core/Crosscutting/DictionaryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,28 @@ public static IDictionary<K, V> ToDictionary<K, V>(this IEnumerable<KeyValuePair
/// <summary>
/// Builds a dictionary from <paramref name="source"/>, resolving duplicate keys.
/// When <paramref name="mergeOperator"/> is null the last element wins (plain overwrite);
/// otherwise the previously stored value and the new value are combined with the operator.
/// </summary>
/// <typeparam name="T">Source element type</typeparam>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <param name="source">Sequence to index</param>
/// <param name="keySelector">Extracts the key from an element</param>
/// <param name="elementSelector">Extracts the value from an element</param>
/// <param name="mergeOperator">Optional merge function: (previous value, new value) => stored value</param>
/// <returns>The materialized dictionary</returns>
public static IDictionary<K, V> ToUpdateDictionary<T, K, V>(
    this IEnumerable<T> source,
    Func<T, K> keySelector,
    Func<T, V> elementSelector,
    Func<V, V, V> mergeOperator = null)
{
    var dictionary = new Dictionary<K, V>();
    foreach (var element in source)
    {
        var key = keySelector(element);
        var value = elementSelector(element);
        // Merge only when an operator is provided AND the key was already seen;
        // in every other case the new value is simply stored (overwrite semantics).
        dictionary[key] = mergeOperator != null && dictionary.TryGetValue(key, out V previousValue)
            ? mergeOperator(previousValue, value)
            : value;
    }

    return dictionary;
}
Expand Down
18 changes: 15 additions & 3 deletions core/Processors/DefaultStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ namespace Streamiz.Kafka.Net.Processors
/// <typeparam name="V">Value record type</typeparam>
public class DefaultStreamPartitioner<K, V> : IStreamPartitioner<K, V>
{
// When true, the sink partition is always recalculated (Partition.Any) instead of
// reusing the source partition. Populated from the stream configuration in Initialize.
private bool _resuffleKey;

/// <summary>
/// Initialize the current partitioner.
/// </summary>
/// <param name="config">Global stream configuration</param>
public void Initialize(IStreamConfig config)
{
    // Cache the flag once; it is read on every subsequent call to Partition(...).
    _resuffleKey = config.DefaultPartitionerResuffleEveryKey;
}

/// <summary>
/// Function used to determine how records are distributed among partitions of the topic
/// </summary>
Expand All @@ -20,9 +31,10 @@ public class DefaultStreamPartitioner<K, V> : IStreamPartitioner<K, V>
/// <returns>Return the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise</returns>
public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions)
{
    // When every-key reshuffling is enabled, never reuse the source partition:
    // let the producer recalculate it for the destination topic.
    if (_resuffleKey)
        return Confluent.Kafka.Partition.Any;

    // Otherwise, keep the source partition only when the destination topic
    // has at least as many partitions as the source partition index requires.
    return sourcePartition.Value < numPartitions
        ? sourcePartition
        : Confluent.Kafka.Partition.Any;
}
}
}
6 changes: 6 additions & 0 deletions core/Processors/IStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ namespace Streamiz.Kafka.Net.Processors
{
public interface IStreamPartitioner<K, V>
{
/// <summary>
/// Initialize the current partitioner with the stream configuration.
/// Invoked by the sink processor during its initialization (see SinkProcessor.Init),
/// so the implementation can cache any configuration it needs.
/// </summary>
/// <param name="config">Global stream configuration</param>
void Initialize(IStreamConfig config);

/// <summary>
/// Function used to determine how records are distributed among partitions of the topic
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions core/Processors/Internal/WrapperStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public WrapperStreamPartitioner(Func<string, K, V, Partition, int, Partition> pa
_partitioner = partitioner;
}

/// <summary>
/// Initialize the current partitioner. No-op for this wrapper.
/// </summary>
/// <param name="config">Global stream configuration (unused)</param>
public void Initialize(IStreamConfig config)
{
    // Intentionally empty: the user-supplied partitioning delegate is
    // self-contained and needs nothing from the configuration.
}

public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions)
=> _partitioner(topic, key, value, sourcePartition, numPartitions);
}
Expand Down
1 change: 1 addition & 0 deletions core/Processors/SinkProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public override void Init(ProcessorContext context)

Key?.Initialize(context.SerDesContext);
Value?.Initialize(context.SerDesContext);
partitioner?.Initialize(context.Configuration);

base.Init(context);
}
Expand Down
11 changes: 8 additions & 3 deletions core/Processors/StreamTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,15 @@ private void RegisterSensors()
restorationRecordsSendsor = TaskMetrics.RestorationRecordsSensor(threadId, Id, streamMetricsRegistry);
}

// Highest known offset per topic-partition, merging the producer collector's
// offsets with the consumed offsets. Internal (instead of private) for testing.
internal IDictionary<TopicPartition, long> CheckpointableOffsets
    => collector.CollectorOffsets
        .Union(consumedOffsets.AsEnumerable())
        .ToUpdateDictionary(
            kv => kv.Key,
            kv => kv.Value,
            // On a duplicate topic-partition, keep the larger offset so a
            // stale entry can never regress the checkpoint.
            (previous, current) => current > previous ? current : previous);

private IEnumerable<TopicPartitionOffset> GetPartitionsWithOffset()
{
Expand Down
24 changes: 24 additions & 0 deletions core/StreamConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,14 @@ public interface IStreamConfig : ICloneable<IStreamConfig>
/// Timeout for QueryWatermarkOffsets operations when restoring state stores. (Default: 5 seconds)
/// </summary>
TimeSpan QueryWatermarkOffsetsTimeout { get; set; }

/// <summary>
/// Enable automatic reshuffling with the default stream partitioner. (Default: true)
/// When true, the sink partition is always recalculated based on the destination topic's partition count.
/// When false, the default partitioner reuses the source partition whenever the destination topic has at
/// least as many partitions as the source (even if the key has changed); otherwise the partition is recalculated.
/// Check <see cref="DefaultStreamPartitioner{K,V}"/>
/// </summary>
bool DefaultPartitionerResuffleEveryKey { get; set; }

#endregion

Expand Down Expand Up @@ -500,6 +508,7 @@ public int GetHashCode(KeyValuePair<string, string> obj)
private const string logProcessingSummaryCst = "log.processing.summary";
private const string stateStoreCacheMaxBytesCst = "statestore.cache.max.bytes";
private const string queryWatermarkOffsetsTimeoutMsCst = "query.watermark.offsets.timeout.ms";
private const string defaultPartitionerResuffleEveryKeyCst = "default.partitioner.resuffle.every.key";

/// <summary>
/// Default commit interval in milliseconds when exactly once is not enabled
Expand Down Expand Up @@ -2463,6 +2472,7 @@ public StreamConfig(IDictionary<string, dynamic> properties, ILoggerFactory logg
MaxDegreeOfParallelism = 8;
DefaultStateStoreCacheMaxBytes = 5 * 1024 * 1024;
QueryWatermarkOffsetsTimeout = TimeSpan.FromSeconds(5);
DefaultPartitionerResuffleEveryKey = true;

_consumerConfig = new ConsumerConfig();
_producerConfig = new ProducerConfig();
Expand Down Expand Up @@ -2965,6 +2975,20 @@ public TimeSpan QueryWatermarkOffsetsTimeout
get => configProperties[queryWatermarkOffsetsTimeoutMsCst];
set => configProperties.AddOrUpdate(queryWatermarkOffsetsTimeoutMsCst, value);
}


/// <summary>
/// Enable automatic reshuffling with the default stream partitioner. (Default: true)
/// When true, the sink partition is always recalculated based on the destination topic's partition count.
/// When false, the default partitioner reuses the source partition whenever the destination topic has at
/// least as many partitions as the source (even if the key has changed); otherwise the partition is recalculated.
/// Check <see cref="DefaultStreamPartitioner{K,V}"/>
/// </summary>
[StreamConfigProperty("" + defaultPartitionerResuffleEveryKeyCst)]
public bool DefaultPartitionerResuffleEveryKey
{
    get => configProperties[defaultPartitionerResuffleEveryKeyCst];
    set => configProperties.AddOrUpdate(defaultPartitionerResuffleEveryKeyCst, value);
}

/// <summary>
/// Get the configs to the <see cref="IProducer{TKey, TValue}"/>
Expand Down
32 changes: 32 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Helpers/DictionaryExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Collections.Generic;
using NUnit.Framework;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Tests.Helpers;

public class DictionaryExtensionsTests
{
    /// <summary>
    /// Without a merge operator, ToUpdateDictionary keeps the last value seen
    /// for a duplicate key (plain overwrite semantics).
    /// </summary>
    [Test]
    public void ToUpdateDictionaryWithoutMergeOperatorTest()
    {
        var source = new List<KeyValuePair<string, long>>
        {
            new("a", 1L),
            new("a", 2L),
            new("b", 1L),
        };

        var results = source.ToUpdateDictionary(
            kv => kv.Key,
            kv => kv.Value);

        Assert.AreEqual(2L, results["a"]);
        Assert.AreEqual(1L, results["b"]);
    }

    /// <summary>
    /// With a merge operator, duplicate keys are combined through the operator.
    /// A summing operator is used so this test fails if the operator is ignored:
    /// a last-wins operator could not tell merging apart from plain overwriting.
    /// </summary>
    [Test]
    public void ToUpdateDictionaryWithMergeOperatorTest()
    {
        var dictionary = new List<KeyValuePair<string, long>>
        {
            new("a", 1L),
            new("b", 1L),
            new("c", 1L),
            new("d", 1L),
            new("a", 2L),
            new("b", 3L),
        };

        var results = dictionary.ToUpdateDictionary(
            kv => kv.Key,
            kv => kv.Value,
            (previous, current) => previous + current);

        Assert.AreEqual(3L, results["a"]);
        Assert.AreEqual(4L, results["b"]);
        Assert.AreEqual(1L, results["c"]);
        Assert.AreEqual(1L, results["d"]);
    }
}
96 changes: 96 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Private/StreamTaskTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -455,5 +455,101 @@ public void StreamTaskWrittingCheckpoint()

Directory.Delete(config.StateDir, true);
}

/// <summary>
/// Verifies that a stream task's checkpointable offsets keep the highest offset
/// per topic-partition, even when records arrive with out-of-order offsets:
/// a late record with a lower offset must not regress the checkpoint.
/// </summary>
[Test]
public void StreamTaskCheckpointOffsetTest()
{
    var config = new StreamConfig<StringSerDes, StringSerDes>();
    config.ApplicationId = "test-app";
    // Unique state dir per run so parallel/repeated runs don't collide.
    config.StateDir = Path.Combine(".", Guid.NewGuid().ToString());

    var serdes = new StringSerDes();
    var builder = new StreamBuilder();

    // Minimal pass-through topology: read from "topic" and write back to "topic".
    builder
        .Stream("topic", new StringSerDes(), new StringSerDes())
        .To("topic", new StringSerDes(), new StringSerDes());

    TaskId id = new TaskId {Id = 0, Partition = 0};
    var topology = builder.Build();
    topology.Builder.RewriteTopology(config);

    var processorTopology = topology.Builder.BuildTopology(id);

    // In-memory Kafka supplier: no real broker involved.
    var supplier = new SyncKafkaSupplier();
    var consumer = supplier.GetConsumer(config.ToConsumerConfig(), null);

    var streamsProducer = new StreamsProducer(
        config,
        "thread-0",
        Guid.NewGuid(),
        supplier,
        "log-prefix");

    var tp = new TopicPartition("topic", 0);
    StreamTask task = new StreamTask(
        "thread-0",
        id,
        new List<TopicPartition>
        {
            tp ,
        },
        processorTopology,
        consumer,
        config,
        supplier,
        streamsProducer,
        new MockChangelogRegister()
        , new StreamMetricsRegistry());


    // Local helper: enqueue one consume result on the task at the given offset.
    void AddRecord(string key, string value, TopicPartition tp, long offset)
    {
        task.AddRecord( new ConsumeResult<byte[], byte[]> {
            Message = new Message<byte[], byte[]>
            {
                Key = serdes.Serialize(key, new SerializationContext()),
                Value = serdes.Serialize(value, new SerializationContext())
            },
            TopicPartitionOffset = new TopicPartitionOffset( tp, offset)
        });
    }

    task.InitializeStateStores();
    task.InitializeTopology();
    task.RestorationIfNeeded();
    task.CompleteRestoration();

    // Offsets deliberately out of order: 0, 3, 4, then a late 1.
    AddRecord("key1", "value1", tp, 0L);
    AddRecord("key2", "value2", tp,3L);
    AddRecord("key3", "value3", tp,4L);
    AddRecord("key4", "value4", tp,1L);

    Assert.IsTrue(task.CanProcess(DateTime.Now.GetMilliseconds()));

    // Drain the queue completely.
    while (task.CanProcess(DateTime.Now.GetMilliseconds()))
        Assert.IsTrue(task.Process());

    var checkpointOffsets = task.CheckpointableOffsets;

    // The late offset-1 record must not pull the checkpoint below 3.
    // NOTE(review): the exact value 3 (rather than 4) presumably comes from how
    // collector/consumed offsets are merged — confirm against CheckpointableOffsets.
    Assert.AreEqual(3L, checkpointOffsets[tp]);

    AddRecord("key5", "value5", tp,5L);

    Assert.IsTrue(task.CanProcess(DateTime.Now.GetMilliseconds()));

    while (task.CanProcess(DateTime.Now.GetMilliseconds()))
        Assert.IsTrue(task.Process());

    var checkpointOffsets2 = task.CheckpointableOffsets;

    // A genuinely newer record advances the checkpoint.
    Assert.AreEqual(5L, checkpointOffsets2[tp]);

    task.Suspend();
    task.Close(false);

    // Clean up the per-run state directory.
    if(Directory.Exists(config.StateDir))
        Directory.Delete(config.StateDir, true);
}
}
}
5 changes: 5 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public class KStreamToTests
{
private class MyStreamPartitioner : IStreamPartitioner<string, string>
{
/// <summary>
/// No-op initialization: this test partitioner routes purely on the record key
/// and does not depend on the stream configuration.
/// </summary>
public void Initialize(IStreamConfig config)
{
    // Intentionally empty.
}

public Partition Partition(string topic, string key, string value, Partition sourcePartition, int numPartitions)
{
switch (key)
Expand Down
Loading