Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/Kafka/Internal/StreamsRebalanceListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void PartitionsAssigned(IConsumer<byte[], byte[]> consumer, List<TopicPar
{
if (Thread.IsRunning)
{
log.LogInformation($"New partitions assign requested : {string.Join(",", partitions)}");
log.LogInformation($"[{Thread.Name}] - New partitions assign requested : {string.Join(",", partitions)}");

DateTime start = DateTime.Now;
manager.RebalanceInProgress = true;
Expand All @@ -44,7 +44,7 @@ public void PartitionsAssigned(IConsumer<byte[], byte[]> consumer, List<TopicPar
manager.RebalanceInProgress = false;

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine($"[{Thread.Name}] - Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine(
$"\tCurrently assigned active tasks: {string.Join(",", this.manager.ActiveTaskIds)}");
log.LogInformation(sb.ToString());
Expand Down
1 change: 0 additions & 1 deletion core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ await Task.Factory.StartNew(() =>
foreach (var innerE in e.InnerExceptions)
{
logger.Log(LogLevel.Error, innerE, $"{logPrefix}Error during initializing internal topics");
SetState(State.PENDING_SHUTDOWN);
SetState(State.ERROR);
}

Expand Down
29 changes: 15 additions & 14 deletions core/Metrics/StreamMetricsRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
Expand All @@ -15,15 +16,15 @@ namespace Streamiz.Kafka.Net.Metrics
/// </summary>
public class StreamMetricsRegistry
{
private IDictionary<string, Sensor> sensors = new Dictionary<string, Sensor>();
private ConcurrentDictionary<string, Sensor> sensors = new();
private IList<string> clientLevelSensors = new List<string>();
private IDictionary<string, IList<string>> threadLevelSensors = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> taskLevelSensors = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> nodeLevelSensors = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> storeLevelSensors = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> librdkafkaSensors = new Dictionary<string, IList<string>>();
private ConcurrentDictionary<string, IList<string>> threadLevelSensors = new();
private ConcurrentDictionary<string, IList<string>> taskLevelSensors = new();
private ConcurrentDictionary<string, IList<string>> nodeLevelSensors = new();
private ConcurrentDictionary<string, IList<string>> storeLevelSensors = new();
private ConcurrentDictionary<string, IList<string>> librdkafkaSensors = new();

private IDictionary<string, IList<string>> threadScopeSensors = new Dictionary<string, IList<string>>();
private ConcurrentDictionary<string, IList<string>> threadScopeSensors = new();

private readonly string clientId;
private readonly MetricsRecordingLevel recordingLevel;
Expand Down Expand Up @@ -274,7 +275,7 @@ internal void RemoveThreadSensors(string threadId)
{
sensors.RemoveAll(sensorKeys);
threadScopeSensors[threadId].RemoveAll(sensorKeys);
threadLevelSensors.Remove(key);
threadLevelSensors.TryRemove(key, out _);
}
}
}
Expand Down Expand Up @@ -324,7 +325,7 @@ internal void RemoveTaskSensors(string threadId, string taskId)
{
sensors.RemoveAll(sensorKeys);
threadScopeSensors[threadId].RemoveAll(sensorKeys);
taskLevelSensors.Remove(key);
taskLevelSensors.TryRemove(key, out _);
}
}
}
Expand Down Expand Up @@ -376,7 +377,7 @@ internal void RemoveNodeSensors(string threadId, string taskId, string processor
{
sensors.RemoveAll(sensorKeys);
threadScopeSensors[threadId].RemoveAll(sensorKeys);
nodeLevelSensors.Remove(key);
nodeLevelSensors.TryRemove(key, out _);
}
}
}
Expand Down Expand Up @@ -428,7 +429,7 @@ internal void RemoveStoreSensors(string threadId, string taskId, string storeNam
{
sensors.RemoveAll(sensorKeys);
threadScopeSensors[threadId].RemoveAll(sensorKeys);
storeLevelSensors.Remove(key);
storeLevelSensors.TryRemove(key, out _);
}
}
}
Expand Down Expand Up @@ -475,7 +476,7 @@ internal void RemoveLibrdKafkaSensors(string threadId, string librdKafkaClientId
{
sensors.RemoveAll(sensorKeys);
threadScopeSensors[threadId].RemoveAll(sensorKeys);
librdkafkaSensors.Remove(key);
librdkafkaSensors.TryRemove(key, out _);
}
}
}
Expand Down Expand Up @@ -568,7 +569,7 @@ private T GetSensor<T>(
if (!TestMetricsRecordingLevel(metricsRecordingLevel))
sensor.NoRunnable = true;

sensors.Add(name, sensor);
sensors.TryAdd(name, sensor);
return sensor;

}
Expand All @@ -579,7 +580,7 @@ private void AddSensorThreadScope(string threadId, string fullSensorName)
if (threadScopeSensors.TryGetValue(threadId, out var list))
list.Add(fullSensorName);
else
threadScopeSensors.Add(threadId, new List<string> { fullSensorName });
threadScopeSensors.TryAdd(threadId, new List<string> { fullSensorName });
}

internal IEnumerable<Sensor> GetThreadScopeSensor(string threadId)
Expand Down
15 changes: 14 additions & 1 deletion core/Processors/DefaultTopicManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -51,6 +52,9 @@ async Task<IEnumerable<string>> Run()
var topicsNewCreated = new List<string>();
var topicsToCreate = new List<string>();

var clusterMetadata = AdminClient.GetMetadata(timeout);
log.LogDebug($"Metadata cluster : {clusterMetadata}");

// 1. get source topic partition
// 2. check if changelog exist, :
// 2.1 - if yes and partition number exactly same; continue;
Expand All @@ -59,6 +63,8 @@ async Task<IEnumerable<string>> Run()
foreach (var t in topics)
{
var metadata = AdminClient.GetMetadata(t.Key, timeout);
log.LogDebug($"Metadata topic {t.Key} : {metadata}");

var numberPartitions = GetNumberPartitionForTopic(metadata, t.Key);
if (numberPartitions == 0)
{
Expand Down Expand Up @@ -128,14 +134,21 @@ async Task<IEnumerable<string>> Run()
{
++i;
_e = e;
log.LogDebug(
log.LogInformation(
"Error when creating all internal topics: {Message}. Maybe an another instance of your application just created them. (try: {Try}, max retry : {MaxTry})",
e.Message, i + 1, maxRetry);

WaitRandomily((int)timeout.TotalSeconds);
}
}

throw new StreamsException(_e);
}

/// <summary>
/// Pauses the current thread for a random number of seconds bounded by
/// <paramref name="maxSecondsWaited"/>, used to desynchronize concurrent
/// application instances that retry internal topic creation at the same time.
/// </summary>
/// <param name="maxSecondsWaited">Upper bound (in seconds) for the random pause.</param>
private static void WaitRandomily(int maxSecondsWaited)
{
    // NOTE(review): assumes RandomGenerator.GetInt32 returns a non-negative
    // value below maxSecondsWaited — confirm against the helper's contract.
    var secondsToWait = RandomGenerator.GetInt32(maxSecondsWaited);
    Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
}

/// <summary>
/// Releases the underlying admin client held by this topic manager.
/// </summary>
public void Dispose()
{
    AdminClient.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected sealed override ApacheKafkaBuilder Init()
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
Expand Down
67 changes: 61 additions & 6 deletions test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Streamiz.Kafka.Net.IntegrationTests.Seed;
using Streamiz.Kafka.Net.SerDes;
using Testcontainers.Kafka;

namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures
Expand All @@ -18,7 +20,7 @@ public class KafkaFixture
public KafkaFixture()
{
container = new ApacheKafkaBuilder()
.WithImage(ApacheKafkaBuilder.APACHE_KAFKA_NATIVE_IMAGE_NAME)
//.WithImage(ApacheKafkaBuilder.APACHE_KAFKA_NATIVE_IMAGE_NAME)
.WithPortBinding(9092)
.WithName("kafka-streamiz-integration-tests")
.Build();
Expand Down Expand Up @@ -98,6 +100,44 @@ internal bool ConsumeUntil(string topic, int size, long timeoutMs)
return sizeCompleted;
}

/// <summary>
/// Continuously produces generated key/value records to <paramref name="topic"/>
/// until <paramref name="token"/> is cancelled, updating <paramref name="stats"/>
/// with the number of messages sent and the number acknowledged as persisted.
/// </summary>
/// <param name="topic">Destination topic name.</param>
/// <param name="keySeeder">Generator invoked once per iteration to produce a record key.</param>
/// <param name="valueSeeder">Generator invoked once per iteration to produce a record value.</param>
/// <param name="alterValueAfterGeneration">Hook applied to each generated (key, value) pair before serialization.</param>
/// <param name="keySerdes">Serializer for keys.</param>
/// <param name="valueSerdes">Serializer for values.</param>
/// <param name="pauseBetweenEachIteration">Delay between two successive produce calls.</param>
/// <param name="stats">Counters mutated by this single produce loop (single writer assumed).</param>
/// <param name="token">Token that stops the production loop.</param>
/// <returns>A task that completes only once the production loop has fully stopped.</returns>
internal Task ProduceContinuously<K, V>(
    string topic,
    ISeeder<K> keySeeder, ISeeder<V> valueSeeder,
    Action<K, V> alterValueAfterGeneration,
    ISerDes<K> keySerdes, ISerDes<V> valueSerdes,
    TimeSpan pauseBetweenEachIteration,
    KafkaStats stats,
    CancellationToken token)
{
    // BUGFIX: Task.Factory.StartNew(async ...) returns Task<Task> — the outer task
    // completes as soon as the async delegate yields, so callers awaiting the returned
    // task were NOT waiting for the loop to finish. Task.Run unwraps async delegates
    // and its task completes only when the loop exits.
    return Task.Run(async () =>
    {
        using var producer = new ProducerBuilder<byte[], byte[]>(ProducerProperties).Build();
        while (!token.IsCancellationRequested)
        {
            var key = keySeeder.SeedOnce();
            var value = valueSeeder.SeedOnce();
            alterValueAfterGeneration(key, value);

            var keyBytes = keySerdes.Serialize(key, new SerializationContext(MessageComponentType.Key, topic));
            var valueBytes = valueSerdes.Serialize(value, new SerializationContext(MessageComponentType.Value, topic));

            ++stats.SentMessages;
            var result = await producer.ProduceAsync(topic, new Message<byte[], byte[]>()
            {
                Key = keyBytes,
                Value = valueBytes
            });

            if (result.Status == PersistenceStatus.Persisted)
                ++stats.MessagesCorrectlyPersisted;

            try
            {
                // BUGFIX: Thread.Sleep inside an async delegate blocked a thread-pool
                // thread and ignored cancellation; Task.Delay is non-blocking and
                // reacts promptly when the token is cancelled.
                await Task.Delay(pauseBetweenEachIteration, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation requested: exit cleanly instead of faulting the task
            }
        }
    }, token);
}

internal async Task<DeliveryResult<string, byte[]>> Produce(string topic, string key, byte[] bytes)
{
using var producer = new ProducerBuilder<string, byte[]>(ProducerProperties).Build();
Expand Down Expand Up @@ -156,13 +196,28 @@ public async Task CreateTopic(string name, int partitions = 1)

using IAdminClient client = builder.Build();
var metadata = client.GetMetadata(name, TimeSpan.FromSeconds(10));
if(metadata.Topics.Any(t => t.Topic.Contains(name)))

var tpm = metadata.Topics
.FirstOrDefault(t => t.Topic.Contains(name) && !t.Error.IsBrokerError);

if(tpm != null)
await client.DeleteTopicsAsync(new List<string> { name });
await client.CreateTopicsAsync(new List<TopicSpecification>{new TopicSpecification
try
{
await client.CreateTopicsAsync(new List<TopicSpecification>
{
new()
{
Name = name,
NumPartitions = partitions
}
});
}
catch (CreateTopicsException e)
{
Name = name,
NumPartitions = partitions
}});
Console.WriteLine(e.Message);
/* nothing */
}
}

public Task DisposeAsync() => container.DisposeAsync().AsTask();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures;

/// <summary>
/// Mutable counters describing the outcome of a continuous produce session
/// driven by the integration-test fixtures.
/// NOTE(review): counters are incremented from a background produce loop —
/// assumes a single writer; confirm before sharing one instance across producers.
/// </summary>
public class KafkaStats
{
    // Explicit backing fields; both counters start at zero.
    private int sentMessages;
    private int messagesCorrectlyPersisted;

    /// <summary>Total number of messages handed to the producer.</summary>
    public int SentMessages
    {
        get => sentMessages;
        set => sentMessages = value;
    }

    /// <summary>Number of messages the broker acknowledged as persisted.</summary>
    public int MessagesCorrectlyPersisted
    {
        get => messagesCorrectlyPersisted;
        set => messagesCorrectlyPersisted = value;
    }
}
Loading
Loading