From 5f1198661d38e1400f40d29982fee3bb0feb65fb Mon Sep 17 00:00:00 2001 From: Thorsten Thiel Date: Wed, 25 Sep 2024 22:29:11 +0200 Subject: [PATCH 1/2] Add postgres integration test --- .github/workflows/dotnet.yml | 9 + .../Fluss.PostgreSQL.IntegrationTest.csproj | 34 ++ .../PostgreSQLTest.cs | 457 ++++++++++++++++++ .../appsettings.json | 5 + .../docker-compose.yaml | 9 + src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj | 1 - .../PostgreSQLEventRepository.cs | 15 +- .../ServiceCollectionExtensions.cs | 4 +- src/Fluss.sln | 6 + src/Fluss/Fluss.csproj | 1 - 10 files changed, 531 insertions(+), 10 deletions(-) create mode 100644 src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj create mode 100644 src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs create mode 100644 src/Fluss.PostgreSQL.IntegrationTest/appsettings.json create mode 100644 src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 9c857b8..d03c80f 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -45,6 +45,15 @@ jobs: - name: Test run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build working-directory: ./src/Fluss.UnitTest + - name: Start PostgreSQL + run: docker compose up -d + working-directory: ./src/Fluss.PostgreSQL.IntegrationTest + - name: Test PostgreSQL + run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build + working-directory: ./src/Fluss.PostgreSQL.IntegrationTest + - name: Stop PostgreSQL + run: docker compose down + working-directory: ./src/Fluss.PostgreSQL.IntegrationTest - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v4 env: diff --git a/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj new file mode 100644 index 0000000..b801c77 --- /dev/null +++ b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj @@ -0,0 +1,34 @@ + + + + net9.0;net8.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + + + + + + PreserveNewest + + + diff --git a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs new file mode 100644 index 0000000..502e4d0 --- /dev/null +++ b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs @@ -0,0 +1,457 @@ +using Fluss.Events; +using Fluss.Upcasting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Npgsql; +using Xunit.Abstractions; + +namespace Fluss.PostgreSQL.IntegrationTest; + +public class PostgreSQLTest : IAsyncLifetime +{ + private readonly ITestOutputHelper _testOutputHelper; + private string _dbName = null!; + private string _managementConnectionString = null!; + private string _connectionString = null!; + + public PostgreSQLTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + public async Task InitializeAsync() + { + var config = new ConfigurationBuilder() + .AddJsonFile("appsettings.json", false, false) + .Build(); + + _managementConnectionString = config.GetConnectionString("DefaultConnection")!; + _dbName = "test" + Guid.NewGuid().ToString().Replace("-", ""); + + await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString); + await npgsqlConnection.OpenAsync(); + + await using var command = new NpgsqlCommand("CREATE DATABASE " + _dbName, npgsqlConnection); + await command.ExecuteNonQueryAsync(); + + _connectionString = new NpgsqlConnectionStringBuilder(_managementConnectionString) + { + Database = _dbName + }.ConnectionString; + } + + [Fact] + public async Task SimpleTest() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + var sp = sc.BuildServiceProvider(); + + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().WaitForFinish(); + + var eventRepository = sp.GetRequiredService(); + await eventRepository.Publish([ + new EventEnvelope + { + Event = new TestEvent(42), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + var version = await eventRepository.GetLatestVersion(); + Assert.Equal(0, version); + + var events = await eventRepository.GetEvents(-1, 0); + Assert.Single(events); + + Assert.Equal(new TestEvent(42), events[0].Span[0].Event); + } + + [Fact] + public async Task TestGetRawEvents() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + var sp = sc.BuildServiceProvider(); + + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().WaitForFinish(); + + var eventRepository = sp.GetRequiredService(); + var baseEventRepository = sp.GetRequiredService(); + + // Publish some events + await eventRepository.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + }, + new EventEnvelope + { + Event = new TestEvent(2), + Version = 1, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + // Get raw events + var rawEvents = await baseEventRepository.GetRawEvents(); + var eventList = rawEvents.ToList(); + + Assert.Equal(2, eventList.Count); + Assert.Equal(0, eventList[0].Version); + Assert.Equal(1, eventList[1].Version); + } + + [Fact] + public async Task TestReplaceEvent() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + var sp = sc.BuildServiceProvider(); + + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().WaitForFinish(); + + var baseEventRepository = (PostgreSQLEventRepository)sp.GetRequiredService(); + + // Publish an event + await baseEventRepository.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + // Replace the event + var newEvent = new RawEventEnvelope + { + Version = 0, + At = DateTimeOffset.UtcNow, + By = null, + RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + }; + + await baseEventRepository.ReplaceEvent(0, [newEvent]); + + // Verify the event was replaced + var events = await baseEventRepository.GetEvents(-1, 0); + Assert.Single(events); + Assert.Equal(new TestEvent(2), events[0].Span[0].Event); + } + + [Fact] + public async Task TestReplaceEventWithMultiple() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + var sp = sc.BuildServiceProvider(); + + await sp.GetRequiredService().StartAsync(default); + await sp.GetRequiredService().StartAsync(default); + + await sp.GetRequiredService().WaitForFinish(); + + var baseEventRepository = sp.GetRequiredService(); + + // Publish an initial event + await baseEventRepository.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + // Replace the event with multiple events + var newEvents = new List + { + new RawEventEnvelope + { + Version = 0, + At = DateTimeOffset.UtcNow, + By = null, + RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + }, + new RawEventEnvelope + { + Version = 1, + At = DateTimeOffset.UtcNow, + By = null, + RawEvent = JObject.FromObject(new TestEvent(3), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + } + }; + + await baseEventRepository.ReplaceEvent(0, newEvents); + + // Verify the events were replaced + var events = await baseEventRepository.GetEvents(-1, 1); + Assert.Equal(2, events[0].Length); + Assert.Equal(new TestEvent(2), events[0].Span[0].Event); + Assert.Equal(new TestEvent(3), events[0].Span[1].Event); + } + + [Fact] + public async Task TestUpcaster() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + await using (var sp = sc.BuildServiceProvider()) + { + var migrator = sp.GetRequiredService(); + + await migrator.StartAsync(default); + await migrator.WaitForFinish(); + + var repository = sp.GetRequiredService(); + await repository.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + } + + sc.AddUpcaster(); + + await using (var sp = sc.BuildServiceProvider()) + { + var migrator = sp.GetRequiredService(); + var upcaster = sp.GetRequiredService(); + + await migrator.StartAsync(default); + await migrator.WaitForFinish(); + await upcaster.StartAsync(default); + await upcaster.ExecuteTask!; + + var repository = sp.GetRequiredService(); + var events = await repository.GetEvents(-1, 0); + Assert.Single(events); + Assert.Equal(new TestEvent2(1), events[0].Span[0].Event); + } + } + + [Fact] + public async Task TestNewEventsSubscription() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPostgresEventSourcingRepository(_connectionString); + + await using var sp = sc.BuildServiceProvider(); + + var migrator = sp.GetRequiredService(); + await migrator.StartAsync(default); + await migrator.WaitForFinish(); + + var repository = sp.GetRequiredService(); + + var eventRaised = new TaskCompletionSource(); + + void Handler(object? sender, EventArgs args) + { + try + { + eventRaised.SetResult(true); + } + catch (Exception) + { + // ignored + } + } + + repository.NewEvents += Handler; + + try + { + await repository.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + // Wait for the event to be raised or timeout after 5 seconds + var eventRaisedTask = eventRaised.Task; + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5)); + + var completedTask = await Task.WhenAny(eventRaisedTask, timeoutTask); + + Assert.Equal(eventRaisedTask, completedTask); + Assert.True(await eventRaisedTask, "NewEvents event was not raised"); + } + finally + { + repository.NewEvents -= Handler; + } + + // Test removing the event handler + var secondEventRaised = new TaskCompletionSource(); + repository.NewEvents += (_, _) => + { + try + { + secondEventRaised.SetResult(true); + } + catch (Exception) + { + // ignored + } + }; + + await repository.Publish([ + new EventEnvelope + { + Event = new TestEvent(2), + Version = 1, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + var secondEventRaisedTask = secondEventRaised.Task; + var secondTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5)); + + var secondCompletedTask = await Task.WhenAny(secondEventRaisedTask, secondTimeoutTask); + + Assert.Equal(secondEventRaisedTask, secondCompletedTask); + Assert.True(await secondEventRaisedTask, "NewEvents event was not raised after removing a handler"); + } + + [Fact] + public async Task TestDatabaseNotificationForwarding() + { + var sc1 = new ServiceCollection(); + sc1.AddEventSourcing(); + sc1.AddPostgresEventSourcingRepository(_connectionString); + + var sc2 = new ServiceCollection(); + sc2.AddEventSourcing(); + sc2.AddPostgresEventSourcingRepository(_connectionString); + + await using var sp1 = sc1.BuildServiceProvider(); + await using var sp2 = sc2.BuildServiceProvider(); + + var migrator = sp1.GetRequiredService(); + await migrator.StartAsync(default); + await migrator.WaitForFinish(); + + var repository1 = sp1.GetRequiredService(); + var repository2 = sp2.GetRequiredService(); + + var eventRaised1 = new TaskCompletionSource(); + var eventRaised2 = new TaskCompletionSource(); + + repository1.NewEvents += (_, _) => + { + try + { + eventRaised1.TrySetResult(true); + } + catch (Exception) + { + // ignored + } + }; + repository2.NewEvents += (_, _) => + { + try + { + eventRaised2.TrySetResult(true); + } + catch (Exception) + { + // ignored + } + }; + + // Publish an event using the first repository + await repository1.Publish([ + new EventEnvelope + { + Event = new TestEvent(1), + Version = 0, + At = DateTimeOffset.UtcNow, + By = null + } + ]); + + // Wait for both event handlers to be triggered or timeout after 5 seconds + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5)); + var allTasks = await Task.WhenAny( + Task.WhenAll(eventRaised1.Task, eventRaised2.Task), + timeoutTask + ); + + Assert.NotEqual(timeoutTask, allTasks); + Assert.True(await eventRaised1.Task, "NewEvents event was not raised on the first repository"); + Assert.True(await eventRaised2.Task, "NewEvents event was not raised on the second repository"); + } + + public async Task DisposeAsync() + { + await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString); + await npgsqlConnection.OpenAsync(); + + await using var command = new NpgsqlCommand($"DROP DATABASE {_dbName} WITH (FORCE)", npgsqlConnection); + await command.ExecuteNonQueryAsync(); + } + + public record TestEvent(int Test) : Event; + public record TestEvent2(int Test) : Event; + + public record TestEventUpcaster : IUpcaster + { + public IEnumerable? Upcast(JObject eventJson) + { + var eventType = eventJson["$type"]?.Value(); + if (eventType == typeof(TestEvent).AssemblyQualifiedName) + { + var eventJson2 = new JObject(eventJson) + { + ["$type"] = typeof(TestEvent2).AssemblyQualifiedName + }; + + return + [ + eventJson2 + ]; + } + + return null; + } + } +} diff --git a/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json new file mode 100644 index 0000000..c644e43 --- /dev/null +++ b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json @@ -0,0 +1,5 @@ +{ + "ConnectionStrings": { + "DefaultConnection": "Host=localhost;Port=5432;Database=postgres;Username=postgres;Password=postgres" + } +} \ No newline at end of file diff --git a/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml new file mode 100644 index 0000000..2113418 --- /dev/null +++ b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml @@ -0,0 +1,9 @@ +services: + database: + image: postgres:15 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + ports: + - "5432:5432" \ No newline at end of file diff --git a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj index eee5d38..66ec37d 100644 --- a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj +++ b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj @@ -11,7 +11,6 @@ https://github.com/atmina/fluss git MIT - true diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs index 2a0fe74..c764a8a 100644 --- a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs +++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs @@ -1,6 +1,7 @@ using System.Collections.ObjectModel; using System.Data; using System.Diagnostics; +using System.Text.Json; using Fluss.Events; using Fluss.Exceptions; using Newtonsoft.Json; @@ -13,17 +14,17 @@ namespace Fluss.PostgreSQL; public partial class PostgreSQLEventRepository : IBaseEventRepository { private readonly NpgsqlDataSource dataSource; + internal static readonly JsonSerializerSettings JsonSerializerSettings = new() + { + TypeNameHandling = TypeNameHandling.All, + TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full, + MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead + }; public PostgreSQLEventRepository(PostgreSQLConfig config) { var dataSourceBuilder = new NpgsqlDataSourceBuilder(config.ConnectionString); - dataSourceBuilder.UseJsonNet(settings: new JsonSerializerSettings - { - TypeNameHandling = TypeNameHandling.All, - TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full, - MetadataPropertyHandling = - MetadataPropertyHandling.ReadAhead // While this is marked as a performance hit, profiling approves - }); + dataSourceBuilder.UseJsonNet(settings: JsonSerializerSettings); dataSource = dataSourceBuilder.Build(); } diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs index f7ff5d4..629e695 100644 --- a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs +++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs @@ -7,6 +7,7 @@ [assembly: InternalsVisibleTo("Fluss.UnitTest")] +[assembly: InternalsVisibleTo("Fluss.PostgreSQL.IntegrationTest")] namespace Fluss.PostgreSQL; @@ -28,7 +29,8 @@ public static IServiceCollection AddPostgresEventSourcingRepository(this IServic .AddSingleton(new PostgreSQLConfig(connectionString)) .AddSingleton() .AddHostedService(sp => sp.GetRequiredService()) - .AddHostedService(); + .AddSingleton() + .AddHostedService(sp => sp.GetRequiredService()); } } diff --git a/src/Fluss.sln b/src/Fluss.sln index 0ae3d82..12cb03d 100644 --- a/src/Fluss.sln +++ b/src/Fluss.sln @@ -14,6 +14,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.Regen", "Fluss.Regen\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{970AB461-4D44-4B99-AACA-7DAA569D4C34}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.PostgreSQL.IntegrationTest", "Fluss.PostgreSQL.IntegrationTest\Fluss.PostgreSQL.IntegrationTest.csproj", "{3210592D-7B76-4D90-BAAC-7B9CBF7E580D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -48,5 +50,9 @@ Global {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Debug|Any CPU.Build.0 = Debug|Any CPU {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.ActiveCfg = Release|Any CPU {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.Build.0 = Release|Any CPU + {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj index 0c42f56..fc920a9 100644 --- a/src/Fluss/Fluss.csproj +++ b/src/Fluss/Fluss.csproj @@ -11,7 +11,6 @@ https://github.com/atmina/fluss git MIT - true From f4f4d669ba9313522f27721b67da7253b3144892 Mon Sep 17 00:00:00 2001 From: Thorsten Thiel Date: Sun, 6 Oct 2024 11:48:44 +0200 Subject: [PATCH 2/2] WIP [skip-ci] --- src/Benchmark/Bench.cs | 6 +-- .../PostgreSQLTest.cs | 16 +++--- src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj | 3 -- .../PostgreSQLEventRepository.cs | 27 ++++------ .../PostgreSQLEventRepositorySubscriptions.cs | 2 +- .../Core/Events/EventSerializerTest.cs | 18 +++++++ .../Upcasting/EventUpcasterServiceTest.cs | 51 +++++++++++-------- .../Core/Upcasting/UpcasterSorterTest.cs | 14 ++--- src/Fluss.UnitTest/Regen/RegenTests.cs | 6 +-- src/Fluss/Events/Event.cs | 12 ----- src/Fluss/Events/EventEnvelope.cs | 5 +- src/Fluss/Events/EventSerializer.cs | 37 ++++++++++++++ src/Fluss/Events/InMemoryEventRepository.cs | 8 ++- src/Fluss/Fluss.csproj | 1 - src/Fluss/Upcasting/IUpcaster.cs | 4 +- 15 files changed, 123 insertions(+), 87 deletions(-) create mode 100644 src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs create mode 100644 src/Fluss/Events/EventSerializer.cs diff --git a/src/Benchmark/Bench.cs b/src/Benchmark/Bench.cs index 4574084..1a0eb4d 100644 --- a/src/Benchmark/Bench.cs +++ b/src/Benchmark/Bench.cs @@ -11,13 +11,13 @@ namespace Benchmark; -[GitJob("879573d", baseline: true, id: "0_before")] -[SimpleJob(id: "1_after")] +[GitJob("HEAD", baseline: true, id: "0_before", iterationCount: 10)] +[SimpleJob(id: "1_after", iterationCount: 10)] [RPlotExporter] [MemoryDiagnoser] public class Bench { - [Params("postgres", "in-memory")] + [Params("postgres")] public string StorageType { get; set; } = "in-memory"; [IterationSetup] diff --git a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs index 502e4d0..0046131 100644 --- a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs +++ b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs @@ -1,4 +1,6 @@ -using Fluss.Events; +using System.Text.Json; +using System.Text.Json.Nodes; +using Fluss.Events; using Fluss.Upcasting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -150,7 +152,7 @@ await baseEventRepository.Publish([ Version = 0, At = DateTimeOffset.UtcNow, By = null, - RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + RawEvent = EventSerializer.Serialize(new TestEvent(2)) }; await baseEventRepository.ReplaceEvent(0, [newEvent]); @@ -196,14 +198,14 @@ await baseEventRepository.Publish([ Version = 0, At = DateTimeOffset.UtcNow, By = null, - RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + RawEvent = EventSerializer.Serialize(new TestEvent(2)) }, new RawEventEnvelope { Version = 1, At = DateTimeOffset.UtcNow, By = null, - RawEvent = JObject.FromObject(new TestEvent(3), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings)) + RawEvent = EventSerializer.Serialize(new TestEvent(3)) } }; @@ -435,12 +437,12 @@ public record TestEvent2(int Test) : Event; public record TestEventUpcaster : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) + public IEnumerable? Upcast(JsonObject eventJson) { - var eventType = eventJson["$type"]?.Value(); + var eventType = eventJson["$type"]!.GetValue(); if (eventType == typeof(TestEvent).AssemblyQualifiedName) { - var eventJson2 = new JObject(eventJson) + var eventJson2 = new JsonObject(eventJson) { ["$type"] = typeof(TestEvent2).AssemblyQualifiedName }; diff --git a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj index 66ec37d..f513885 100644 --- a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj +++ b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj @@ -1,5 +1,4 @@  - net8.0;net9.0 enable @@ -17,7 +16,6 @@ - @@ -27,5 +25,4 @@ - diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs index c764a8a..1551fe0 100644 --- a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs +++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs @@ -1,11 +1,9 @@ using System.Collections.ObjectModel; using System.Data; using System.Diagnostics; -using System.Text.Json; +using System.Text.Json.Nodes; using Fluss.Events; using Fluss.Exceptions; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using Npgsql; using NpgsqlTypes; @@ -13,19 +11,14 @@ namespace Fluss.PostgreSQL; public partial class PostgreSQLEventRepository : IBaseEventRepository { - private readonly NpgsqlDataSource dataSource; - internal static readonly JsonSerializerSettings JsonSerializerSettings = new() - { - TypeNameHandling = TypeNameHandling.All, - TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full, - MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead - }; + private readonly NpgsqlDataSource _dataSource; + + public PostgreSQLEventRepository(PostgreSQLConfig config) { var dataSourceBuilder = new NpgsqlDataSourceBuilder(config.ConnectionString); - dataSourceBuilder.UseJsonNet(settings: JsonSerializerSettings); - dataSource = dataSourceBuilder.Build(); + _dataSource = dataSourceBuilder.Build(); } private async ValueTask Publish(IReadOnlyList envelopes, Func eventExtractor, @@ -35,7 +28,7 @@ private async ValueTask Publish(IReadOnlyList envelopes, F activity?.SetTag("EventSourcing.EventRepository", nameof(PostgreSQLEventRepository)); // await using var connection has the side-effect that our connection passed from the outside is also disposed, so we split this up. - await using var freshConnection = dataSource.OpenConnection(); + await using var freshConnection = _dataSource.OpenConnection(); var connection = conn ?? freshConnection; activity?.AddEvent(new ActivityEvent("Connection open")); @@ -90,7 +83,7 @@ public async ValueTask Publish(IReadOnlyList envelopes) private async ValueTask WithReader(long fromExclusive, long toInclusive, Func> action) { - await using var connection = dataSource.OpenConnection(); + await using var connection = _dataSource.OpenConnection(); await using var cmd = new NpgsqlCommand( """ @@ -150,7 +143,7 @@ public async ValueTask> GetRawEvents() Version = reader.GetInt64(0), At = reader.GetDateTime(1), By = reader.IsDBNull(2) ? null : reader.GetGuid(2), - RawEvent = reader.GetFieldValue(3), + RawEvent = reader.GetFieldValue(3), }); } @@ -162,7 +155,7 @@ public async ValueTask ReplaceEvent(long at, IEnumerable newEn { var envelopes = newEnvelopes.ToList(); - await using var connection = dataSource.OpenConnection(); + await using var connection = _dataSource.OpenConnection(); await using var transaction = connection.BeginTransaction(); await using var deleteCommand = @@ -232,7 +225,7 @@ public async ValueTask GetLatestVersion() private async Task GetLatestVersionImpl() { - await using var connection = dataSource.OpenConnection(); + await using var connection = _dataSource.OpenConnection(); await using var cmd = new NpgsqlCommand("""SELECT MAX("Version") FROM "Events";""", connection); await cmd.PrepareAsync(); diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs index ad0fcca..ce5172b 100644 --- a/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs +++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs @@ -33,7 +33,7 @@ private async Task InitializeTrigger() } _triggerInitialized = true; - await using var listenConnection = dataSource.CreateConnection(); + await using var listenConnection = _dataSource.CreateConnection(); await listenConnection.OpenAsync(_cancellationTokenSource.Token); listenConnection.Notification += (_, _) => diff --git a/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs b/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs new file mode 100644 index 0000000..8d4ee61 --- /dev/null +++ b/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs @@ -0,0 +1,18 @@ +using Fluss.Events; + +namespace Fluss.UnitTest.Core.Events; + +public class EventSerializerTest +{ + [Fact] + public void Serialize_Deserialize_TestEvent1() + { + var testEvent = new TestEvent("Value"); + var serialized = EventSerializer.Serialize(testEvent); + var deserialized = EventSerializer.Deserialize(serialized); + + Assert.Equal(testEvent, deserialized); + } + + public record TestEvent(string Value) : Event; +} \ No newline at end of file diff --git a/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs b/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs index 22d898c..0572c94 100644 --- a/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs +++ b/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Nodes; using Fluss.Events; using Fluss.Upcasting; using Microsoft.Extensions.Logging; @@ -10,14 +11,14 @@ public class EventUpcasterServiceTest { private static RawEventEnvelope GetRawTestEvent1Envelope(int version) { - var jObject = new TestEvent1("Value").ToJObject(); + var jObject = EventSerializer.Serialize(new TestEvent1("Value")); return new RawEventEnvelope { Version = version, RawEvent = jObject }; } private static RawEventEnvelope GetRawTestEvent2Envelope(int version) { - var jObject = new TestEvent2("Value2").ToJObject(); + var jObject = EventSerializer.Serialize(new TestEvent2("Value2")); return new RawEventEnvelope { Version = version, RawEvent = jObject }; } @@ -62,7 +63,7 @@ public async Task SingleEventsAreUpcast() repo => repo.ReplaceEvent( It.IsAny(), It.Is>( - newEvents => newEvents.SingleOrDefault()!.RawEvent["Property1"]!.ToObject() == "Upcast" + newEvents => newEvents.SingleOrDefault()!.RawEvent["Property1"]!.GetValue() == "Upcast" ) ), Times.Exactly(4) @@ -101,7 +102,7 @@ public async Task UpcastsAreChainable() repo => repo.ReplaceEvent( It.IsAny(), It.Is>( - newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Value" + newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Value" ) ), Times.Exactly(4) @@ -111,7 +112,7 @@ public async Task UpcastsAreChainable() repo => repo.ReplaceEvent( It.IsAny(), It.Is>( - newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Upcast-Value" + newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Upcast-Value" ) ), Times.Exactly(4) @@ -121,7 +122,7 @@ public async Task UpcastsAreChainable() repo => repo.ReplaceEvent( It.IsAny(), It.Is>( - newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Upcast-Value2") + newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Upcast-Value2") ), Times.Once ); @@ -155,19 +156,21 @@ record TestEvent2(string Property2) : Event; internal class NoopUpcast : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) => null; + public IEnumerable? Upcast(JsonObject eventJson) => null; } internal class SingleEventUpcast : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) + public IEnumerable? Upcast(JsonObject eventJson) { - var type = eventJson.GetValue("$type")?.ToObject(); + var type = eventJson["$type"]!.GetValue(); if (type != typeof(TestEvent1).AssemblyQualifiedName) return null; - var clone = (JObject)eventJson.DeepClone(); - clone["Property1"] = "Upcast"; + var clone = new JsonObject(eventJson) + { + ["Property1"] = "Upcast" + }; return [clone]; } @@ -175,9 +178,9 @@ internal class SingleEventUpcast : IUpcaster internal class MultiEventUpcast : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) + public IEnumerable? Upcast(JsonObject eventJson) { - var type = eventJson.GetValue("$type")?.ToObject(); + var type = eventJson["$type"]!.GetValue(); if (type != typeof(TestEvent1).AssemblyQualifiedName) return null; @@ -187,16 +190,18 @@ internal class MultiEventUpcast : IUpcaster internal class ChainedEventUpcast : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) + public IEnumerable? Upcast(JsonObject eventJson) { - var type = eventJson.GetValue("$type")?.ToObject(); + var type = eventJson["$type"]!.GetValue(); if (type != typeof(TestEvent1).AssemblyQualifiedName) return null; - var clone = (JObject)eventJson.DeepClone(); - clone["Property2"] = clone["Property1"]; + var clone = new JsonObject(eventJson) + { + ["Property2"] = eventJson["Property1"], + ["$type"] = typeof(TestEvent2).AssemblyQualifiedName + }; clone.Remove("Property1"); - clone["$type"] = typeof(TestEvent2).AssemblyQualifiedName; return [clone]; } @@ -205,14 +210,16 @@ internal class ChainedEventUpcast : IUpcaster [DependsOn(typeof(ChainedEventUpcast))] internal class ChainedEventUpcast2 : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) + public IEnumerable? Upcast(JsonObject eventJson) { - var type = eventJson.GetValue("$type")?.ToObject(); + var type = eventJson["$type"]!.GetValue(); if (type != typeof(TestEvent2).AssemblyQualifiedName) return null; - var clone = (JObject)eventJson.DeepClone(); - clone["Property2"] = "Upcast-" + clone["Property2"]!.ToObject(); + var clone = new JsonObject(eventJson) + { + ["Property2"] = "Upcast-" + eventJson["Property2"]!.GetValue() + }; return [clone]; } diff --git a/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs b/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs index f2a8f73..e5aeca6 100644 --- a/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs +++ b/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs @@ -1,5 +1,5 @@ +using System.Text.Json.Nodes; using Fluss.Upcasting; -using Newtonsoft.Json.Linq; namespace Fluss.UnitTest.Core.Upcasting; @@ -50,34 +50,34 @@ public void ThrowsWhenMissingDependencies() internal class ExampleUpcasterNoDeps : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } internal class ExampleUpcasterDeps1 : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } [DependsOn(typeof(ExampleUpcasterDeps1), typeof(ExampleUpcasterNoDeps))] internal class ExampleUpcasterDeps2 : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } [DependsOn(typeof(ExampleUpcasterDeps1), typeof(ExampleUpcasterDeps2))] internal class ExampleUpcasterDeps3 : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } [DependsOn(typeof(ExampleUpcasterCyclic2))] internal class ExampleUpcasterCyclic1 : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } [DependsOn(typeof(ExampleUpcasterCyclic1))] internal class ExampleUpcasterCyclic2 : IUpcaster { - public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException(); + public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException(); } diff --git a/src/Fluss.UnitTest/Regen/RegenTests.cs b/src/Fluss.UnitTest/Regen/RegenTests.cs index 5465e55..0dc776d 100644 --- a/src/Fluss.UnitTest/Regen/RegenTests.cs +++ b/src/Fluss.UnitTest/Regen/RegenTests.cs @@ -1,7 +1,6 @@ using Fluss.Regen; using Microsoft.CodeAnalysis; using Microsoft.CodeAnalysis.CSharp; -using Newtonsoft.Json.Linq; namespace Fluss.UnitTest.Regen; @@ -179,12 +178,12 @@ public Task GenerateForUpcaster() """ using Fluss; using Fluss.Upcasting; - using Newtonsoft.Json.Linq; + using System.Text.Json; namespace TestNamespace; public class TestUpcaster : IUpcaster { - public IEnumerable? Upcast(JObject eventJson) { + public IEnumerable? Upcast(JsonObject eventJson) { return null; } } @@ -226,7 +225,6 @@ public static GeneratorDriverRunResult GenerateFor(string source) [ MetadataReference.CreateFromFile(typeof(object).Assembly.Location), MetadataReference.CreateFromFile(typeof(UnitOfWork).Assembly.Location), - MetadataReference.CreateFromFile(typeof(JObject).Assembly.Location), ]); var runResult = driver.RunGenerators(compilation).GetRunResult(); diff --git a/src/Fluss/Events/Event.cs b/src/Fluss/Events/Event.cs index 09b7257..91597ae 100644 --- a/src/Fluss/Events/Event.cs +++ b/src/Fluss/Events/Event.cs @@ -1,17 +1,5 @@ -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; - namespace Fluss.Events; public interface Event { } - -public static class EventExtension -{ - public static JObject ToJObject(this Event @event) - { - var serializer = new JsonSerializer { TypeNameHandling = TypeNameHandling.All, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full }; - return JObject.FromObject(@event, serializer); - } -} diff --git a/src/Fluss/Events/EventEnvelope.cs b/src/Fluss/Events/EventEnvelope.cs index d464065..84f575b 100644 --- a/src/Fluss/Events/EventEnvelope.cs +++ b/src/Fluss/Events/EventEnvelope.cs @@ -1,5 +1,4 @@ -using System.Runtime.InteropServices; -using Newtonsoft.Json.Linq; +using System.Text.Json.Nodes; namespace Fluss.Events; @@ -13,7 +12,7 @@ public abstract record Envelope public sealed record RawEventEnvelope : Envelope { - public required JObject RawEvent { get; init; } + public required JsonObject RawEvent { get; init; } } public record EventEnvelope : Envelope diff --git a/src/Fluss/Events/EventSerializer.cs b/src/Fluss/Events/EventSerializer.cs new file mode 100644 index 0000000..675c74b --- /dev/null +++ b/src/Fluss/Events/EventSerializer.cs @@ -0,0 +1,37 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; + +namespace Fluss.Events; + +public static class EventSerializer +{ + public class FooBar : JsonConverter + { + + public override Event? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } + + public override void Write(Utf8JsonWriter writer, Event value, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } + } + + public static JsonSerializerOptions Options { get; } = new() + { + Converters = { new FooBar() }, + }; + + public static JsonObject Serialize(Event @event) + { + return JsonSerializer.Deserialize(JsonSerializer.Serialize(@event, Options))!; + } + + public static Event Deserialize(JsonObject json) + { + return (Event)JsonSerializer.Deserialize(json.ToString(), typeof(Event))!; + } +} \ No newline at end of file diff --git a/src/Fluss/Events/InMemoryEventRepository.cs b/src/Fluss/Events/InMemoryEventRepository.cs index aa423e4..ecd2ea2 100644 --- a/src/Fluss/Events/InMemoryEventRepository.cs +++ b/src/Fluss/Events/InMemoryEventRepository.cs @@ -1,7 +1,6 @@ using System.Collections.ObjectModel; using Collections.Pooled; using Fluss.Exceptions; -using Newtonsoft.Json; namespace Fluss.Events; @@ -46,7 +45,7 @@ public ValueTask> GetRawEvents() { var rawEnvelopes = _events.Select(envelope => { - var rawEvent = envelope.Event.ToJObject(); + var rawEvent = EventSerializer.Serialize(envelope.Event); return new RawEventEnvelope { @@ -71,11 +70,10 @@ public ValueTask ReplaceEvent(long at, IEnumerable newEvents) } _events.RemoveAt(checkedAt); - var serializer = new JsonSerializer { TypeNameHandling = TypeNameHandling.All }; - + var eventEnvelopes = events.Select(envelope => { - var @event = envelope.RawEvent.ToObject(serializer); + var @event = EventSerializer.Deserialize(envelope.RawEvent); if (@event is null) throw new Exception("Failed to convert raw event to Event"); diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj index fc920a9..e9f6558 100644 --- a/src/Fluss/Fluss.csproj +++ b/src/Fluss/Fluss.csproj @@ -20,7 +20,6 @@ - diff --git a/src/Fluss/Upcasting/IUpcaster.cs b/src/Fluss/Upcasting/IUpcaster.cs index 9b06c9e..f3ea90d 100644 --- a/src/Fluss/Upcasting/IUpcaster.cs +++ b/src/Fluss/Upcasting/IUpcaster.cs @@ -1,11 +1,11 @@ using System.Collections.Immutable; -using Newtonsoft.Json.Linq; +using System.Text.Json.Nodes; namespace Fluss.Upcasting; public interface IUpcaster { - public IEnumerable? Upcast(JObject eventJson); + public IEnumerable? Upcast(JsonObject eventJson); } [AttributeUsage(AttributeTargets.Class)]