Skip to content

Commit 0d720b0

Browse files
committed
Add Aspire.Confluent.Kafka component
apply pr suggestions apply pr suggestions apply pr suggestions
1 parent e6588b9 commit 0d720b0

57 files changed

Lines changed: 3553 additions & 3 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Aspire.sln

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,20 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.MongoDB.Driver.Tests
172172
EndProject
173173
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConfigurationSchemaGenerator.Tests", "tests\ConfigurationSchemaGenerator.Tests\ConfigurationSchemaGenerator.Tests.csproj", "{00FEA181-84C9-42A7-AC81-29A9F176A1A0}"
174174
EndProject
175+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "KafkaBasic", "KafkaBasic", "{587D0C62-D596-4676-8081-3EFC72946D32}"
176+
EndProject
177+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "samples\KafkaBasic\Consumer\Consumer.csproj", "{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}"
178+
EndProject
179+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.AppHost", "samples\KafkaBasic\KafkaBasic.AppHost\KafkaBasic.AppHost.csproj", "{51577092-DAC9-424E-A2E5-CE51BC58D827}"
180+
EndProject
181+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.ServiceDefaults", "samples\KafkaBasic\KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj", "{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}"
182+
EndProject
183+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "samples\KafkaBasic\Producer\Producer.csproj", "{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}"
184+
EndProject
185+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka", "src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj", "{174E0507-3BB0-4CDC-829E-9CA75DA66473}"
186+
EndProject
187+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka.Tests", "tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj", "{A8CB331A-1247-41D9-8118-538E5A2CC9DF}"
188+
EndProject
175189
Global
176190
GlobalSection(SolutionConfigurationPlatforms) = preSolution
177191
Debug|Any CPU = Debug|Any CPU
@@ -458,6 +472,30 @@ Global
458472
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
459473
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
460474
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Release|Any CPU.Build.0 = Release|Any CPU
475+
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
476+
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
477+
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
478+
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Release|Any CPU.Build.0 = Release|Any CPU
479+
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
480+
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Debug|Any CPU.Build.0 = Debug|Any CPU
481+
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Release|Any CPU.ActiveCfg = Release|Any CPU
482+
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Release|Any CPU.Build.0 = Release|Any CPU
483+
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
484+
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
485+
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
486+
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Release|Any CPU.Build.0 = Release|Any CPU
487+
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
488+
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
489+
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Release|Any CPU.ActiveCfg = Release|Any CPU
490+
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Release|Any CPU.Build.0 = Release|Any CPU
491+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
492+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.Build.0 = Debug|Any CPU
493+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.ActiveCfg = Release|Any CPU
494+
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.Build.0 = Release|Any CPU
495+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
496+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
497+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
498+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.Build.0 = Release|Any CPU
461499
EndGlobalSection
462500
GlobalSection(SolutionProperties) = preSolution
463501
HideSolutionNode = FALSE
@@ -537,6 +575,13 @@ Global
537575
{20A5A907-A135-4735-B4BF-E13514F360E3} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
538576
{E592E447-BA3C-44FA-86C1-EBEDC864A644} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
539577
{00FEA181-84C9-42A7-AC81-29A9F176A1A0} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
578+
{587D0C62-D596-4676-8081-3EFC72946D32} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
579+
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4} = {587D0C62-D596-4676-8081-3EFC72946D32}
580+
{51577092-DAC9-424E-A2E5-CE51BC58D827} = {587D0C62-D596-4676-8081-3EFC72946D32}
581+
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5} = {587D0C62-D596-4676-8081-3EFC72946D32}
582+
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B} = {587D0C62-D596-4676-8081-3EFC72946D32}
583+
{174E0507-3BB0-4CDC-829E-9CA75DA66473} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
584+
{A8CB331A-1247-41D9-8118-538E5A2CC9DF} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
540585
EndGlobalSection
541586
GlobalSection(ExtensibilityGlobals) = postSolution
542587
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}

Directory.Packages.props

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.0" />
3838
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.0" />
3939
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.0" />
40+
<PackageVersion Include="AspNetCore.HealthChecks.Kafka" Version="8.0.0" />
4041
<PackageVersion Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
4142
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.0" />
4243
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.0" />
@@ -64,6 +65,7 @@
6465
<PackageVersion Include="Microsoft.Extensions.Primitives" Version="$(MicrosoftExtensionsPrimitivesPackageVersion)" />
6566
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
6667
<!-- external dependencies -->
68+
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
6769
<PackageVersion Include="Dapr.AspNetCore" Version="1.12.0" />
6870
<PackageVersion Include="DnsClient" Version="1.7.0" />
6971
<PackageVersion Include="Grpc.AspNetCore" Version="2.59.0" />
@@ -104,5 +106,7 @@
104106
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Workloads" Version="8.0.0-beta.23564.4" />
105107
<PackageVersion Include="Microsoft.Signed.Wix" Version="1.0.0-v3.14.0.5722" />
106108
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Installers" Version="8.0.0-beta.23564.4" />
109+
<!-- unit test dependencies -->
110+
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
107111
</ItemGroup>
108112
</Project>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Worker">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
<ItemGroup>
10+
<PackageReference Include="Microsoft.Extensions.Hosting" />
11+
</ItemGroup>
12+
<ItemGroup>
13+
<ProjectReference Include="..\..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
14+
<ProjectReference Include="..\KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj" />
15+
</ItemGroup>
16+
</Project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Confluent.Kafka;
5+
6+
namespace Consumer;
7+
8+
internal sealed class ConsumerWorker(IConsumer<Ignore, string> consumer, ILogger<ConsumerWorker> logger) : BackgroundService
9+
{
10+
protected override Task ExecuteAsync(CancellationToken stoppingToken)
11+
{
12+
long i = 0;
13+
return Task.Factory.StartNew(async () =>
14+
{
15+
consumer.Subscribe("topic");
16+
while (!stoppingToken.IsCancellationRequested)
17+
{
18+
ConsumeResult<Ignore, string>? result = default;
19+
try
20+
{
21+
result = consumer.Consume(TimeSpan.FromSeconds(1));
22+
}
23+
catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
24+
{
25+
await Task.Delay(100);
26+
continue;
27+
}
28+
29+
i++;
30+
if (i % 1000 == 0)
31+
{
32+
logger.LogInformation($"Received {i} messages. current offset is '{result!.Offset}'");
33+
}
34+
}
35+
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
36+
}
37+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Confluent.Kafka;
5+
using Consumer;
6+
7+
var builder = Host.CreateApplicationBuilder(args);
8+
9+
builder.AddServiceDefaults();
10+
11+
builder.AddKafkaConsumer<Ignore, string>("kafka");
12+
13+
builder.Services.AddHostedService<ConsumerWorker>();
14+
15+
builder.Build().Run();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft.Hosting.Lifetime": "Information"
6+
}
7+
}
8+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft.Hosting.Lifetime": "Information",
6+
"Azure": "Warning"
7+
}
8+
},
9+
"Aspire": {
10+
"Confluent": {
11+
"Kafka": {
12+
"Consumer": {
13+
"Config": {
14+
"AutoOffsetReset": "Earliest",
15+
"GroupId": "aspire"
16+
}
17+
}
18+
}
19+
}
20+
}
21+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<Project>
2+
3+
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />
4+
5+
<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
6+
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.props" />
7+
8+
</Project>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<Project>
2+
3+
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />
4+
5+
<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
6+
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.targets" />
7+
8+
</Project>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
<IsAspireHost>true</IsAspireHost>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="..\..\..\src\Aspire.Hosting\Aspire.Hosting.csproj" />
13+
<ProjectReference Include="..\Consumer\Consumer.csproj" />
14+
<ProjectReference Include="..\Producer\Producer.csproj" />
15+
</ItemGroup>
16+
17+
</Project>

0 commit comments

Comments
 (0)