Skip to content

Commit d2d222a

Browse files
committed
Add metrics support
1 parent 0553092 commit d2d222a

16 files changed

Lines changed: 476 additions & 45 deletions

samples/kafkaBasic/Consumer/ConsumerWorker.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@ internal sealed class ConsumerWorker(IConsumer<string, string> consumer, ILogger
99
{
1010
protected override Task ExecuteAsync(CancellationToken stoppingToken)
1111
{
12+
long i = 0;
1213
return Task.Factory.StartNew(() =>
1314
{
1415
consumer.Subscribe("topic");
1516
while (!stoppingToken.IsCancellationRequested)
1617
{
1718
var result = consumer.Consume();
18-
logger.LogInformation($"Received message '{result.Message.Value}'.");
19+
20+
i++;
21+
if (i % 1000 == 0)
22+
{
23+
logger.LogInformation($"Received {i} messages. current offset is '{result.Offset}'");
24+
}
1925
}
2026
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
2127
}

samples/kafkaBasic/Consumer/appsettings.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
"Aspire": {
1111
"Kafka": {
1212
"Consumer": {
13-
"AutoOffsetReset": "Earliest",
14-
"GroupId": "aspire"
13+
"Config": {
14+
"AutoOffsetReset": "Earliest",
15+
"GroupId": "aspire"
16+
}
1517
}
1618
}
1719
}

samples/kafkaBasic/Producer/ProducerWorker.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@ internal sealed class ProducerWorker(IProducer<string, string> producer, ILogger
99
{
1010
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
1111
{
12-
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
1312
long i = 0;
14-
while (await timer.WaitForNextTickAsync(stoppingToken))
13+
while (!stoppingToken.IsCancellationRequested)
1514
{
16-
var message = new Message<string, string> { Value = $"Hello, World! {i}" };
17-
await producer.ProduceAsync("topic", message, stoppingToken);
18-
logger.LogInformation($"Sent message '{message.Value}'");
19-
i++;
15+
for (int j = 0; j < 1000; j++, i++)
16+
{
17+
var message = new Message<string, string> { Value = $"Hello, World! {i}" };
18+
producer.Produce("topic", message);
19+
}
20+
21+
producer.Flush(TimeSpan.FromSeconds(10));
22+
23+
logger.LogInformation("Sent 1000 messages, waiting 10 s");
24+
25+
await Task.Delay(10000, stoppingToken);
2026
}
2127
}
2228
}

samples/kafkaBasic/Producer/appsettings.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
"Azure": "Warning"
77
}
88
},
9-
109
"Aspire": {
1110
"Kafka": {
1211
"Producer": {
13-
"Acks": "All"
12+
"Config": {
13+
"Acks": "All"
14+
}
1415
}
1516
}
1617
}
Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using Aspire.Kafka;
5+
using System.Text.Json;
46
using Aspire.Kafka.Consumer;
57
using Confluent.Kafka;
68
using Microsoft.Extensions.Configuration;
79
using Microsoft.Extensions.DependencyInjection;
10+
using Microsoft.Extensions.DependencyInjection.Extensions;
11+
using Microsoft.Extensions.Logging;
812

913
namespace Microsoft.Extensions.Hosting;
1014

@@ -19,68 +23,109 @@ public static class AspireKafkaConsumerExtensions
1923
/// Registers <see cref="IConsumer{TKey,TValue}"/> as a singleton in the services provided by the <paramref name="builder"/>.
2024
/// </summary>
2125
/// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
22-
/// <param name="connectionName">A name used to retrieve the connection string from the ConnectionStrings configuration section.</param> ///
26+
/// <param name="connectionName">A name used to retrieve the connection string from the ConnectionStrings configuration section.</param>
27+
/// <param name="configureKafkaConsumerSettings"></param>
2328
/// <param name="configureConsumerBuilder">A method used for customizing the <see cref="ConsumerBuilder{TKey,TValue}"/>.</param>
2429
/// <param name="configureConsumerConfig">An optional method that can be used for customizing the <see cref="ConsumerConfig"/>. It's invoked after the settings are read from the configuration.</param>
2530
/// <remarks>Reads the configuration from "Aspire:Kafka:Consumer" section.</remarks>
26-
public static void AddKafkaConsumer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder = null, Action<ConsumerConfig>? configureConsumerConfig = null)
27-
=> AddKafkaConsumer(builder, DefaultConfigSectionName, configureConsumerBuilder, configureConsumerConfig, connectionName, serviceKey: null);
31+
public static void AddKafkaConsumer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaConsumerSettings>? configureKafkaConsumerSettings = null, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder = null, Action<ConsumerConfig>? configureConsumerConfig = null)
32+
=> AddKafkaConsumer(builder, DefaultConfigSectionName, configureKafkaConsumerSettings, configureConsumerBuilder, configureConsumerConfig, connectionName, serviceKey: null);
2833

2934
/// <summary>
3035
/// Registers <see cref="IConsumer{TKey,TValue}"/> as a keyed singleton for the given <paramref name="name"/> in the services provided by the <paramref name="builder"/>.
3136
/// </summary>
3237
/// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
3338
/// <param name="name">The name of the component, which is used as the <see cref="ServiceDescriptor.ServiceKey"/> of the service and also to retrieve the connection string from the ConnectionStrings configuration section.</param>
39+
/// <param name="configureKafkaConsumerSettings"></param>
3440
/// <param name="configureConsumerBuilder">A method used for customizing the <see cref="ConsumerBuilder{TKey,TValue}"/>.</param>
3541
/// <param name="configureConsumerConfig">An optional method that can be used for customizing the <see cref="ConsumerConfig"/>. It's invoked after the settings are read from the configuration.</param>
3642
/// <remarks>Reads the configuration from "Aspire:Kafka:Consumer:{name}" section.</remarks>
37-
public static void AddKeyedKafkaConsumer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder = null, Action<ConsumerConfig>? configureConsumerConfig = null)
43+
public static void AddKeyedKafkaConsumer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaConsumerSettings>? configureKafkaConsumerSettings = null, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder = null, Action<ConsumerConfig>? configureConsumerConfig = null)
3844
{
3945
ArgumentException.ThrowIfNullOrEmpty(name);
4046

41-
AddKafkaConsumer(builder, $"{DefaultConfigSectionName}:{name}", configureConsumerBuilder, configureConsumerConfig, connectionName: name, serviceKey: name);
47+
AddKafkaConsumer(builder, $"{DefaultConfigSectionName}:{name}", configureKafkaConsumerSettings, configureConsumerBuilder, configureConsumerConfig, connectionName: name, serviceKey: name);
4248
}
4349

4450
private static void AddKafkaConsumer<TKey, TValue>(
4551
IHostApplicationBuilder builder,
4652
string configurationSectionName,
53+
Action<KafkaConsumerSettings>? configureKafkaConsumerSettings,
4754
Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder,
4855
Action<ConsumerConfig>? configureConsumerConfig,
4956
string connectionName,
5057
string? serviceKey)
5158
{
5259
ArgumentNullException.ThrowIfNull(builder);
5360

54-
var configSection = builder.Configuration.GetSection(configurationSectionName);
55-
5661
ConsumerConfig config = new();
62+
// First get the config from the config section
63+
var configSection = builder.Configuration.GetSection($"{configurationSectionName}:Config");
5764
configSection.Bind(config);
65+
// Then override with the optional configureProducerConfig
66+
configureConsumerConfig?.Invoke(config);
5867

68+
KafkaConsumerSettings settings = new();
69+
// First get the settings from the config section
70+
var kafkaProducerSettingsSection = builder.Configuration.GetSection($"{configurationSectionName}");
71+
kafkaProducerSettingsSection.Bind(settings);
5972
if (builder.Configuration.GetConnectionString(connectionName) is string connectionString)
6073
{
61-
config.BootstrapServers = connectionString;
74+
settings.ConnectionString = connectionString;
6275
}
76+
// Then override with the optional configureProducerSettings
77+
configureKafkaConsumerSettings?.Invoke(settings);
6378

64-
configureConsumerConfig?.Invoke(config);
79+
// Apply the settings to the config
80+
settings.Apply(config);
6581

66-
ConsumerBuilder<TKey, TValue> CreateConsumerBuilder()
82+
ConsumerBuilder<TKey, TValue> CreateConsumerBuilder(IServiceProvider serviceProvider)
6783
{
6884
ConsumerBuilder<TKey, TValue> consumerBuilder = new(config);
6985
configureConsumerBuilder?.Invoke(consumerBuilder);
86+
if (settings.Metrics)
87+
{
88+
consumerBuilder.SetStatisticsHandler(BuildStatisticsHandler<TKey, TValue>(serviceProvider));
89+
}
7090
return consumerBuilder;
7191
}
7292

73-
ConsumerConnectionFactory<TKey, TValue> consumerConnectionFactory = new(CreateConsumerBuilder(), config);
74-
7593
if (serviceKey is null)
7694
{
77-
builder.Services.AddSingleton<ConsumerConnectionFactory<TKey, TValue>>(_ => consumerConnectionFactory);
95+
builder.Services.AddSingleton<ConsumerConnectionFactory<TKey, TValue>>(sp => new(CreateConsumerBuilder(sp), config));
7896
builder.Services.AddSingleton<IConsumer<TKey, TValue>>(sp => sp.GetRequiredService<ConsumerConnectionFactory<TKey, TValue>>().Create());
7997
}
8098
else
8199
{
82-
builder.Services.AddKeyedSingleton<ConsumerConnectionFactory<TKey, TValue>>(serviceKey, (sp, key) => consumerConnectionFactory);
100+
builder.Services.AddKeyedSingleton<ConsumerConnectionFactory<TKey, TValue>>(serviceKey, (sp, key) => new(CreateConsumerBuilder(sp), config));
83101
builder.Services.AddKeyedSingleton<IConsumer<TKey, TValue>>(serviceKey, (sp, key) => sp.GetRequiredKeyedService<ConsumerConnectionFactory<TKey, TValue>>(key).Create());
84102
}
103+
104+
if (settings.Metrics)
105+
{
106+
builder.Services.TryAddSingleton<ConsumerMetrics>();
107+
builder.Services.AddOpenTelemetry()
108+
.WithMetrics(metricBuilderProvider => metricBuilderProvider.AddMeter(ConsumerMetrics.MeterName));
109+
}
110+
}
111+
112+
private static Action<IConsumer<TKey, TValue>, string> BuildStatisticsHandler<TKey, TValue>(IServiceProvider sp)
113+
{
114+
ConsumerMetrics metrics = sp.GetRequiredService<ConsumerMetrics>();
115+
return (_, json) =>
116+
{
117+
if (string.IsNullOrEmpty(json))
118+
{
119+
return;
120+
}
121+
122+
Statistics? statistics = JsonSerializer.Deserialize(json, typeof(Statistics), StatisticsJsonSerializerContext.Default) as Statistics;
123+
if (statistics == null)
124+
{
125+
return;
126+
}
127+
128+
metrics.Update(statistics);
129+
};
85130
}
86131
}

src/Components/Aspire.Kafka.Consumer/ConsumerConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public ConsumerConnectionFactory(ConsumerBuilder<TKey, TValue> consumerBuilder,
2424
}
2525
_consumerBuilder = consumerBuilder;
2626
}
27+
2728
public ConsumerConfig Config => _consumerConfig;
2829

2930
public IConsumer<TKey, TValue> Create() => _consumerBuilder.Build();
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
// Licensed to the .NET Foundation under one or more agreements.
5+
// The .NET Foundation licenses this file to you under the MIT license.
6+
7+
using System.Diagnostics;
8+
using System.Diagnostics.Metrics;
9+
10+
namespace Aspire.Kafka.Consumer;
11+
12+
internal sealed class ConsumerMetrics
13+
{
14+
public const string MeterName = "Aspire.Kafka.Consumer";
15+
16+
private readonly Meter _meter;
17+
private Statistics? _statistics;
18+
19+
private Measurement<long> _replyQueueMeasurement;
20+
private readonly ObservableGauge<long> _replyQueue;
21+
22+
private Measurement<long> _messageCountMeasurement;
23+
private readonly ObservableGauge<long> _messageCount;
24+
private Measurement<long> _messageSizeMeasurement;
25+
private readonly ObservableGauge<long> _messageSize;
26+
27+
private readonly Counter<long> _tx;
28+
private readonly Counter<long> _txBytes;
29+
private readonly Counter<long> _txMessages;
30+
private readonly Counter<long> _txMessageBytes;
31+
32+
private readonly Counter<long> _rx;
33+
private readonly Counter<long> _rxBytes;
34+
private readonly Counter<long> _rxMessages;
35+
private readonly Counter<long> _rxMessageBytes;
36+
37+
private readonly IMeterFactory _meterFactory;
38+
39+
public ConsumerMetrics(IMeterFactory meterFactory)
40+
{
41+
_meterFactory = meterFactory;
42+
_meter = _meterFactory.Create(MeterName);
43+
44+
_replyQueue = _meter.CreateObservableGauge<long>(Counters.ReplyQueue, () => _replyQueueMeasurement);
45+
_messageCount = _meter.CreateObservableGauge<long>(Counters.MessageCount, () => _messageCountMeasurement);
46+
_messageSize = _meter.CreateObservableGauge<long>(Counters.MessageSize, () => _messageSizeMeasurement);
47+
48+
_tx = _meter.CreateCounter<long>(Counters.Tx);
49+
_txBytes = _meter.CreateCounter<long>(Counters.TxBytes);
50+
_txMessages = _meter.CreateCounter<long>(Counters.TxMessages);
51+
_txMessageBytes = _meter.CreateCounter<long>(Counters.TxMessageBytes);
52+
_rx = _meter.CreateCounter<long>(Counters.Rx);
53+
_rxBytes = _meter.CreateCounter<long>(Counters.RxBytes);
54+
_rxMessages = _meter.CreateCounter<long>(Counters.RxMessages);
55+
_rxMessageBytes = _meter.CreateCounter<long>(Counters.RxMessageBytes);
56+
}
57+
58+
internal void Update(Statistics statistics)
59+
{
60+
TagList tags = new()
61+
{
62+
{ Tags.ClientId, statistics.ClientId },
63+
{ Tags.Type, statistics.Type },
64+
{ Tags.Name, statistics.Name }
65+
};
66+
67+
_replyQueueMeasurement = new Measurement<long>(statistics.ReplyQueue, tags);
68+
_messageCountMeasurement = new Measurement<long>(statistics.MessageCount, tags);
69+
_messageSizeMeasurement = new Measurement<long>(statistics.MessageSize, tags);
70+
71+
_tx.Add(statistics.Tx - _statistics?.Tx ?? 0, tags);
72+
_txBytes.Add(statistics.TxBytes - _statistics?.TxBytes ?? 0, tags);
73+
_txMessages.Add(statistics.TxMessages - _statistics?.TxMessages ?? 0, tags);
74+
_txMessageBytes.Add(statistics.TxMessageBytes - _statistics?.TxMessageBytes ?? 0, tags);
75+
_rx.Add(statistics.Rx - _statistics?.Rx ?? 0, tags);
76+
_rxBytes.Add(statistics.RxBytes - _statistics?.RxBytes ?? 0, tags);
77+
_rxMessages.Add(statistics.RxMessages - _statistics?.RxMessages ?? 0, tags);
78+
_rxMessageBytes.Add(statistics.RxMessageBytes - _statistics?.RxMessageBytes ?? 0, tags);
79+
80+
_statistics = statistics;
81+
}
82+
83+
public static class Counters
84+
{
85+
public const string ReplyQueue = "replyq";
86+
public const string MessageCount = "msg_cnt";
87+
public const string MessageSize = "msg_size";
88+
public const string Tx = "tx";
89+
public const string TxBytes = "tx_bytes";
90+
public const string Rx = "rx";
91+
public const string RxBytes = "rx_bytes";
92+
public const string TxMessages = "txmsgs";
93+
public const string TxMessageBytes = "txmsg_bytes";
94+
public const string RxMessages = "rxmsgs";
95+
public const string RxMessageBytes = "rxmsg_bytes";
96+
}
97+
98+
public static class Tags
99+
{
100+
public const string ClientId = "client_id";
101+
public const string Type = "type";
102+
public const string Name = "name";
103+
}
104+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Confluent.Kafka;
5+
6+
namespace Aspire.Kafka.Consumer;
7+
8+
public class KafkaConsumerSettings
9+
{
10+
public string? ConnectionString { get; set; }
11+
12+
public bool Metrics { get; set; } = true;
13+
14+
internal void Apply(ConsumerConfig config)
15+
{
16+
if (ConnectionString is not null)
17+
{
18+
config.BootstrapServers = ConnectionString;
19+
}
20+
21+
if (Metrics)
22+
{
23+
config.StatisticsIntervalMs = 1000;
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)