diff --git a/core/Kafka/Internal/StreamsRebalanceListener.cs b/core/Kafka/Internal/StreamsRebalanceListener.cs index 39b51e01..00d21f22 100644 --- a/core/Kafka/Internal/StreamsRebalanceListener.cs +++ b/core/Kafka/Internal/StreamsRebalanceListener.cs @@ -34,7 +34,7 @@ public void PartitionsAssigned(IConsumer consumer, List consumer, List foreach (var innerE in e.InnerExceptions) { logger.Log(LogLevel.Error, innerE, $"{logPrefix}Error during initializing internal topics"); - SetState(State.PENDING_SHUTDOWN); SetState(State.ERROR); } diff --git a/core/Metrics/StreamMetricsRegistry.cs b/core/Metrics/StreamMetricsRegistry.cs index 547703d5..510e8077 100644 --- a/core/Metrics/StreamMetricsRegistry.cs +++ b/core/Metrics/StreamMetricsRegistry.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; @@ -15,15 +16,15 @@ namespace Streamiz.Kafka.Net.Metrics /// public class StreamMetricsRegistry { - private IDictionary sensors = new Dictionary(); + private ConcurrentDictionary sensors = new(); private IList clientLevelSensors = new List(); - private IDictionary> threadLevelSensors = new Dictionary>(); - private IDictionary> taskLevelSensors = new Dictionary>(); - private IDictionary> nodeLevelSensors = new Dictionary>(); - private IDictionary> storeLevelSensors = new Dictionary>(); - private IDictionary> librdkafkaSensors = new Dictionary>(); + private ConcurrentDictionary> threadLevelSensors = new(); + private ConcurrentDictionary> taskLevelSensors = new(); + private ConcurrentDictionary> nodeLevelSensors = new(); + private ConcurrentDictionary> storeLevelSensors = new(); + private ConcurrentDictionary> librdkafkaSensors = new(); - private IDictionary> threadScopeSensors = new Dictionary>(); + private ConcurrentDictionary> threadScopeSensors = new(); private readonly string clientId; private readonly MetricsRecordingLevel recordingLevel; @@ -274,7 +275,7 @@ internal void 
RemoveThreadSensors(string threadId) { sensors.RemoveAll(sensorKeys); threadScopeSensors[threadId].RemoveAll(sensorKeys); - threadLevelSensors.Remove(key); + threadLevelSensors.TryRemove(key, out _); } } } @@ -324,7 +325,7 @@ internal void RemoveTaskSensors(string threadId, string taskId) { sensors.RemoveAll(sensorKeys); threadScopeSensors[threadId].RemoveAll(sensorKeys); - taskLevelSensors.Remove(key); + taskLevelSensors.TryRemove(key, out _); } } } @@ -376,7 +377,7 @@ internal void RemoveNodeSensors(string threadId, string taskId, string processor { sensors.RemoveAll(sensorKeys); threadScopeSensors[threadId].RemoveAll(sensorKeys); - nodeLevelSensors.Remove(key); + nodeLevelSensors.TryRemove(key, out _); } } } @@ -428,7 +429,7 @@ internal void RemoveStoreSensors(string threadId, string taskId, string storeNam { sensors.RemoveAll(sensorKeys); threadScopeSensors[threadId].RemoveAll(sensorKeys); - storeLevelSensors.Remove(key); + storeLevelSensors.TryRemove(key, out _); } } } @@ -475,7 +476,7 @@ internal void RemoveLibrdKafkaSensors(string threadId, string librdKafkaClientId { sensors.RemoveAll(sensorKeys); threadScopeSensors[threadId].RemoveAll(sensorKeys); - librdkafkaSensors.Remove(key); + librdkafkaSensors.TryRemove(key, out _); } } } @@ -568,7 +569,7 @@ private T GetSensor( if (!TestMetricsRecordingLevel(metricsRecordingLevel)) sensor.NoRunnable = true; - sensors.Add(name, sensor); + sensors.TryAdd(name, sensor); return sensor; } @@ -579,7 +580,7 @@ private void AddSensorThreadScope(string threadId, string fullSensorName) if (threadScopeSensors.TryGetValue(threadId, out var list)) list.Add(fullSensorName); else - threadScopeSensors.Add(threadId, new List { fullSensorName }); + threadScopeSensors.TryAdd(threadId, new List { fullSensorName }); } internal IEnumerable GetThreadScopeSensor(string threadId) diff --git a/core/Processors/DefaultTopicManager.cs b/core/Processors/DefaultTopicManager.cs index 0cdcbd4b..4d76dd02 100644 --- 
a/core/Processors/DefaultTopicManager.cs +++ b/core/Processors/DefaultTopicManager.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -51,6 +52,9 @@ async Task> Run() var topicsNewCreated = new List(); var topicsToCreate = new List(); + var clusterMetadata = AdminClient.GetMetadata(timeout); + log.LogDebug($"Metadata cluster : {clusterMetadata}"); + // 1. get source topic partition // 2. check if changelog exist, : // 2.1 - if yes and partition number exactly same; continue; @@ -59,6 +63,8 @@ async Task> Run() foreach (var t in topics) { var metadata = AdminClient.GetMetadata(t.Key, timeout); + log.LogDebug($"Metadata topic {t.Key} : {metadata}"); + var numberPartitions = GetNumberPartitionForTopic(metadata, t.Key); if (numberPartitions == 0) { @@ -128,14 +134,21 @@ async Task> Run() { ++i; _e = e; - log.LogDebug( + log.LogInformation( "Error when creating all internal topics: {Message}. Maybe an another instance of your application just created them. 
(try: {Try}, max retry : {MaxTry})", e.Message, i + 1, maxRetry); + + WaitRandomly((int)timeout.TotalSeconds); } } throw new StreamsException(_e); } + + private static void WaitRandomly(int maxSecondsWaited) + { + Thread.Sleep(RandomGenerator.GetInt32(Math.Max(1, maxSecondsWaited)) * 1000); + } public void Dispose() => AdminClient.Dispose(); diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/ApacheKafkaBuilder.cs b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/ApacheKafkaBuilder.cs index e84b6ea0..bbbf5c0c 100644 --- a/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/ApacheKafkaBuilder.cs +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/ApacheKafkaBuilder.cs @@ -64,6 +64,7 @@ protected sealed override ApacheKafkaBuilder Init() .WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller") .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER") .WithEnvironment("KAFKA_NODE_ID", "1") + .WithEnvironment("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false") .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs index 9b97c71f..54bfe1f7 100644 --- a/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs @@ -7,6 +7,8 @@ using System.Threading.Tasks; using Confluent.Kafka; using Confluent.Kafka.Admin; +using Streamiz.Kafka.Net.IntegrationTests.Seed; +using Streamiz.Kafka.Net.SerDes; using Testcontainers.Kafka; namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures @@ -18,7 +20,7 @@ public class KafkaFixture public KafkaFixture() { container = new ApacheKafkaBuilder() - .WithImage(ApacheKafkaBuilder.APACHE_KAFKA_NATIVE_IMAGE_NAME) + //.WithImage(ApacheKafkaBuilder.APACHE_KAFKA_NATIVE_IMAGE_NAME)
.WithPortBinding(9092) .WithName("kafka-streamiz-integration-tests") .Build(); } @@ -98,6 +100,44 @@ internal bool ConsumeUntil(string topic, int size, long timeoutMs) return sizeCompleted; } + internal Task ProduceContinuously( + string topic, + ISeeder keySeeder, ISeeder valueSeeder, + Action alterValueAfterGeneration, + ISerDes keySerdes, ISerDes valueSerdes, + TimeSpan pauseBetweenEachIteration, + KafkaStats stats, + CancellationToken token) + { + var task = Task.Run(async () => + { + using var producer = new ProducerBuilder(ProducerProperties).Build(); + while (!token.IsCancellationRequested) + { + var key = keySeeder.SeedOnce(); + var value = valueSeeder.SeedOnce(); + alterValueAfterGeneration(key, value); + + var keyBytes = keySerdes.Serialize(key, new SerializationContext(MessageComponentType.Key, topic)); + var valueBytes = valueSerdes.Serialize(value, new SerializationContext(MessageComponentType.Value, topic)); + + ++stats.SentMessages; + var result = await producer.ProduceAsync(topic, new Message() + { + Key = keyBytes, + Value = valueBytes + }); + + if (result.Status == PersistenceStatus.Persisted) + ++stats.MessagesCorrectlyPersisted; + + await Task.Delay(pauseBetweenEachIteration); + } + }, token); + + return task; + } + internal async Task> Produce(string topic, string key, byte[] bytes) { using var producer = new ProducerBuilder(ProducerProperties).Build(); @@ -156,13 +196,28 @@ public async Task CreateTopic(string name, int partitions = 1) using IAdminClient client = builder.Build(); var metadata = client.GetMetadata(name, TimeSpan.FromSeconds(10)); - if(metadata.Topics.Any(t => t.Topic.Contains(name))) + + var tpm = metadata.Topics + .FirstOrDefault(t => t.Topic.Contains(name) && !t.Error.IsBrokerError); + + if(tpm != null) await client.DeleteTopicsAsync(new List { name }); - await client.CreateTopicsAsync(new List{new TopicSpecification + try + { + await client.CreateTopicsAsync(new List + { + new() + { + Name = name, + NumPartitions = 
partitions + } + }); + } + catch (CreateTopicsException e) { - Name = name, - NumPartitions = partitions - }}); + Console.WriteLine(e.Message); + /* nothing */ + } } public Task DisposeAsync() => container.DisposeAsync().AsTask(); diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaStats.cs b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaStats.cs new file mode 100644 index 00000000..fb309cb8 --- /dev/null +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaStats.cs @@ -0,0 +1,8 @@ +namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures; + +public class KafkaStats +{ + public int SentMessages { get; set; } + public int MessagesCorrectlyPersisted { get; set; } + +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/LargeConcurrencyTests.cs b/test/Streamiz.Kafka.Net.IntegrationTests/LargeConcurrencyTests.cs new file mode 100644 index 00000000..32ced46c --- /dev/null +++ b/test/Streamiz.Kafka.Net.IntegrationTests/LargeConcurrencyTests.cs @@ -0,0 +1,191 @@ +using System; +using System.Collections.Generic; +using System.Reflection.Metadata; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Bogus; +using Confluent.Kafka; +using ICSharpCode.SharpZipLib.Tar; +using NUnit.Framework; +using Streamiz.Kafka.Net.IntegrationTests.Fixtures; +using Streamiz.Kafka.Net.IntegrationTests.Seed; +using Streamiz.Kafka.Net.Processors.Public; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; + +namespace Streamiz.Kafka.Net.IntegrationTests; + +public sealed class LargeConcurrencyTests +{ + private class KeySeeder : ISeeder + { + private readonly Faker faker; + + public KeySeeder() + { + faker = new Faker(); + } + + public string SeedOnce() + => $"sess_{faker.Random.AlphaNumeric(2)}"; + + } + + private class SessionSeeder : ISeeder + { + private readonly Faker faker; + + public SessionSeeder() + { + faker = new Faker("en") + .RuleFor(s => s.Id, f => f.Random.Guid().ToString()) + 
.RuleFor(s => s.Date, f => f.Date.Recent(30)) + .RuleFor(s => s.SessionId, f => $"sess_{f.Random.AlphaNumeric(2)}") + .RuleFor(s => s.User, f => f.Internet.UserName()); + } + + public SessionState SeedOnce() + => faker.Generate(); + } + + private class SessionProcessor : ITransformer + { + private IKeyValueStore store1; + private IKeyValueStore store2; + + public void Init(ProcessorContext context) + { + store1 = (IKeyValueStore)context.GetStateStore("store1"); + store2 = (IKeyValueStore)context.GetStateStore("store2"); + } + + public Record Process(Record record) + { + store1.Put(record.Key, record.Value); + store2.Put(record.Key, Encoding.UTF8.GetBytes(record.Value.User)); + return record; + } + + public void Close() + { + + } + } + + private class SessionState + { + public string Id { get; set; } + public DateTime Date { get; set; } + public string SessionId { get; set; } + public string User { get; set; } + } + + private KafkaFixture kafkaFixture; + + [SetUp] + public void Setup() + { + kafkaFixture = new KafkaFixture(); + Console.WriteLine("Starting"); + kafkaFixture.InitializeAsync().Wait(TimeSpan.FromMinutes(5)); + Console.WriteLine("Started"); + } + + [TearDown] + public void TearDown() + { + Console.WriteLine("Pending shutdown"); + kafkaFixture.DisposeAsync().Wait(TimeSpan.FromMinutes(5)); + Console.WriteLine("Shutdown"); + } + + [Test] + [NonParallelizable] + public async Task TestSimpleTopology() + { + var tokenSource = new CancellationTokenSource(); + var config = new StreamConfig> + { + ApplicationId = $"test-concurrency-{Guid.NewGuid().ToString()}", + BootstrapServers = kafkaFixture.BootstrapServers, + AutoOffsetReset = AutoOffsetReset.Earliest, + NumStreamThreads = Math.Min(Environment.ProcessorCount, 4), + Guarantee = ProcessingGuarantee.AT_LEAST_ONCE, + CommitIntervalMs = StreamConfig.EOS_DEFAULT_COMMIT_INTERVAL_MS * 10 * 5, // 5 seconds + AllowAutoCreateTopics = false, + CompressionType = CompressionType.Lz4 + }; + + await 
kafkaFixture.CreateTopic("source", 30); + await kafkaFixture.CreateTopic("output", 30); + + var builder = new StreamBuilder(); + + var sourceStream = builder + .Stream("source"); + + var store1Builder = Stores.KeyValueStoreBuilder( + Stores.PersistentKeyValueStore("store1"), + new StringSerDes(), + new JsonSerDes()) + .WithLoggingEnabled(new Dictionary()) // Changelog for EOS + .WithCachingEnabled(); // Reduce write amplification + + builder.AddStateStore(store1Builder); + + var store2Builder = Stores.KeyValueStoreBuilder( + Stores.PersistentKeyValueStore("store2"), + new StringSerDes(), + new ByteArraySerDes()) + .WithLoggingEnabled(new Dictionary()); + + builder.AddStateStore(store2Builder); + + sourceStream.Transform( + TransformerBuilder.New() + .Transformer() + .Build(), null, "store1", "store2") + .To>("output"); + + KafkaStream.State stream1State = KafkaStream.State.CREATED, stream2State = KafkaStream.State.CREATED; + var stream1 = new KafkaStream(builder.Build(), config); + stream1.StateChanged += (_old, _new) => stream1State = _new; + var stream2 = new KafkaStream(builder.Build(), config); + stream2.StateChanged += (_old, _new) => stream2State = _new; + + var stats = new KafkaStats(); + var task = kafkaFixture.ProduceContinuously( + "source", + new KeySeeder(), + new SessionSeeder(), + (key, session) => session.SessionId = key, + new StringSerDes(), + new JsonSerDes(), + TimeSpan.FromMilliseconds(1), + stats, + tokenSource.Token); + + await stream1.StartAsync(); + Thread.Sleep(TimeSpan.FromSeconds(4)); + + await stream2.StartAsync(); + Thread.Sleep(TimeSpan.FromSeconds(4)); + + stream2.Dispose(); + Thread.Sleep(TimeSpan.FromSeconds(4)); + + Assert.AreNotEqual(KafkaStream.State.ERROR ,stream1State); + Assert.AreNotEqual(KafkaStream.State.ERROR ,stream2State); + + await tokenSource.CancelAsync(); + await task; + + var result = kafkaFixture.ConsumeUntil("output", stats.MessagesCorrectlyPersisted, 1000 * 120); + + stream1.Dispose(); + stream2.Dispose(); + 
Assert.IsTrue(result); + } +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Seed/ISeeder.cs b/test/Streamiz.Kafka.Net.IntegrationTests/Seed/ISeeder.cs new file mode 100644 index 00000000..e789eddd --- /dev/null +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Seed/ISeeder.cs @@ -0,0 +1,6 @@ +namespace Streamiz.Kafka.Net.IntegrationTests.Seed; + +public interface ISeeder +{ + public T SeedOnce(); +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj b/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj index 079a2d4a..ab9b8664 100644 --- a/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj +++ b/test/Streamiz.Kafka.Net.IntegrationTests/Streamiz.Kafka.Net.IntegrationTests.csproj @@ -7,6 +7,7 @@ + all runtime; build; native; contentfiles; analyzers; buildtransitive