diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index 9c857b8..d03c80f 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -45,6 +45,15 @@ jobs:
- name: Test
run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
working-directory: ./src/Fluss.UnitTest
+ - name: Start PostgreSQL
+ run: docker compose up -d
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
+ - name: Test PostgreSQL
+ run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
+ - name: Stop PostgreSQL
+ run: docker compose down
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
env:
diff --git a/src/Benchmark/Bench.cs b/src/Benchmark/Bench.cs
index 4574084..1a0eb4d 100644
--- a/src/Benchmark/Bench.cs
+++ b/src/Benchmark/Bench.cs
@@ -11,13 +11,13 @@
namespace Benchmark;
-[GitJob("879573d", baseline: true, id: "0_before")]
-[SimpleJob(id: "1_after")]
+[GitJob("HEAD", baseline: true, id: "0_before", iterationCount: 10)]
+[SimpleJob(id: "1_after", iterationCount: 10)]
[RPlotExporter]
[MemoryDiagnoser]
public class Bench
{
- [Params("postgres", "in-memory")]
+ [Params("postgres")]
public string StorageType { get; set; } = "in-memory";
[IterationSetup]
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj
new file mode 100644
index 0000000..b801c77
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj
@@ -0,0 +1,34 @@
+
+
+
+ net9.0;net8.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs
new file mode 100644
index 0000000..0046131
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs
@@ -0,0 +1,459 @@
+using System.Text.Json;
+using System.Text.Json.Nodes;
+using Fluss.Events;
+using Fluss.Upcasting;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using Npgsql;
+using Xunit.Abstractions;
+
+namespace Fluss.PostgreSQL.IntegrationTest;
+
+public class PostgreSQLTest : IAsyncLifetime
+{
+ private readonly ITestOutputHelper _testOutputHelper;
+ private string _dbName = null!;
+ private string _managementConnectionString = null!;
+ private string _connectionString = null!;
+
+ public PostgreSQLTest(ITestOutputHelper testOutputHelper)
+ {
+ _testOutputHelper = testOutputHelper;
+ }
+
+ public async Task InitializeAsync()
+ {
+ var config = new ConfigurationBuilder()
+ .AddJsonFile("appsettings.json", false, false)
+ .Build();
+
+ _managementConnectionString = config.GetConnectionString("DefaultConnection")!;
+ _dbName = "test" + Guid.NewGuid().ToString().Replace("-", "");
+
+ await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString);
+ await npgsqlConnection.OpenAsync();
+
+ await using var command = new NpgsqlCommand("CREATE DATABASE " + _dbName, npgsqlConnection);
+ await command.ExecuteNonQueryAsync();
+
+ _connectionString = new NpgsqlConnectionStringBuilder(_managementConnectionString)
+ {
+ Database = _dbName
+ }.ConnectionString;
+ }
+
+ [Fact]
+ public async Task SimpleTest()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var eventRepository = sp.GetRequiredService();
+ await eventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(42),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ var version = await eventRepository.GetLatestVersion();
+ Assert.Equal(0, version);
+
+ var events = await eventRepository.GetEvents(-1, 0);
+ Assert.Single(events);
+
+ Assert.Equal(new TestEvent(42), events[0].Span[0].Event);
+ }
+
+ [Fact]
+ public async Task TestGetRawEvents()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var eventRepository = sp.GetRequiredService();
+ var baseEventRepository = sp.GetRequiredService();
+
+ // Publish some events
+ await eventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ },
+ new EventEnvelope
+ {
+ Event = new TestEvent(2),
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Get raw events
+ var rawEvents = await baseEventRepository.GetRawEvents();
+ var eventList = rawEvents.ToList();
+
+ Assert.Equal(2, eventList.Count);
+ Assert.Equal(0, eventList[0].Version);
+ Assert.Equal(1, eventList[1].Version);
+ }
+
+ [Fact]
+ public async Task TestReplaceEvent()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var baseEventRepository = (PostgreSQLEventRepository)sp.GetRequiredService();
+
+ // Publish an event
+ await baseEventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Replace the event
+ var newEvent = new RawEventEnvelope
+ {
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = EventSerializer.Serialize(new TestEvent(2))
+ };
+
+ await baseEventRepository.ReplaceEvent(0, [newEvent]);
+
+ // Verify the event was replaced
+ var events = await baseEventRepository.GetEvents(-1, 0);
+ Assert.Single(events);
+ Assert.Equal(new TestEvent(2), events[0].Span[0].Event);
+ }
+
+ [Fact]
+ public async Task TestReplaceEventWithMultiple()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+
+ await sp.GetRequiredService().WaitForFinish();
+
+ var baseEventRepository = sp.GetRequiredService();
+
+ // Publish an initial event
+ await baseEventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Replace the event with multiple events
+ var newEvents = new List
+ {
+ new RawEventEnvelope
+ {
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = EventSerializer.Serialize(new TestEvent(2))
+ },
+ new RawEventEnvelope
+ {
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = EventSerializer.Serialize(new TestEvent(3))
+ }
+ };
+
+ await baseEventRepository.ReplaceEvent(0, newEvents);
+
+ // Verify the events were replaced
+ var events = await baseEventRepository.GetEvents(-1, 1);
+ Assert.Equal(2, events[0].Length);
+ Assert.Equal(new TestEvent(2), events[0].Span[0].Event);
+ Assert.Equal(new TestEvent(3), events[0].Span[1].Event);
+ }
+
+ [Fact]
+ public async Task TestUpcaster()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using (var sp = sc.BuildServiceProvider())
+ {
+ var migrator = sp.GetRequiredService();
+
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository = sp.GetRequiredService();
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+ }
+
+ sc.AddUpcaster();
+
+ await using (var sp = sc.BuildServiceProvider())
+ {
+ var migrator = sp.GetRequiredService();
+ var upcaster = sp.GetRequiredService();
+
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+ await upcaster.StartAsync(default);
+ await upcaster.ExecuteTask!;
+
+ var repository = sp.GetRequiredService();
+ var events = await repository.GetEvents(-1, 0);
+ Assert.Single(events);
+ Assert.Equal(new TestEvent2(1), events[0].Span[0].Event);
+ }
+ }
+
+ [Fact]
+ public async Task TestNewEventsSubscription()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using var sp = sc.BuildServiceProvider();
+
+ var migrator = sp.GetRequiredService();
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository = sp.GetRequiredService();
+
+ var eventRaised = new TaskCompletionSource();
+
+ void Handler(object? sender, EventArgs args)
+ {
+ try
+ {
+ eventRaised.SetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ }
+
+ repository.NewEvents += Handler;
+
+ try
+ {
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Wait for the event to be raised or timeout after 5 seconds
+ var eventRaisedTask = eventRaised.Task;
+ var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+
+ var completedTask = await Task.WhenAny(eventRaisedTask, timeoutTask);
+
+ Assert.Equal(eventRaisedTask, completedTask);
+ Assert.True(await eventRaisedTask, "NewEvents event was not raised");
+ }
+ finally
+ {
+ repository.NewEvents -= Handler;
+ }
+
+ // Test removing the event handler
+ var secondEventRaised = new TaskCompletionSource();
+ repository.NewEvents += (_, _) =>
+ {
+ try
+ {
+ secondEventRaised.SetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(2),
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ var secondEventRaisedTask = secondEventRaised.Task;
+ var secondTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+
+ var secondCompletedTask = await Task.WhenAny(secondEventRaisedTask, secondTimeoutTask);
+
+ Assert.Equal(secondEventRaisedTask, secondCompletedTask);
+ Assert.True(await secondEventRaisedTask, "NewEvents event was not raised after removing a handler");
+ }
+
+ [Fact]
+ public async Task TestDatabaseNotificationForwarding()
+ {
+ var sc1 = new ServiceCollection();
+ sc1.AddEventSourcing();
+ sc1.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sc2 = new ServiceCollection();
+ sc2.AddEventSourcing();
+ sc2.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using var sp1 = sc1.BuildServiceProvider();
+ await using var sp2 = sc2.BuildServiceProvider();
+
+ var migrator = sp1.GetRequiredService();
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository1 = sp1.GetRequiredService();
+ var repository2 = sp2.GetRequiredService();
+
+ var eventRaised1 = new TaskCompletionSource();
+ var eventRaised2 = new TaskCompletionSource();
+
+ repository1.NewEvents += (_, _) =>
+ {
+ try
+ {
+ eventRaised1.TrySetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+ repository2.NewEvents += (_, _) =>
+ {
+ try
+ {
+ eventRaised2.TrySetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+
+ // Publish an event using the first repository
+ await repository1.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Wait for both event handlers to be triggered or timeout after 5 seconds
+ var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+ var allTasks = await Task.WhenAny(
+ Task.WhenAll(eventRaised1.Task, eventRaised2.Task),
+ timeoutTask
+ );
+
+ Assert.NotEqual(timeoutTask, allTasks);
+ Assert.True(await eventRaised1.Task, "NewEvents event was not raised on the first repository");
+ Assert.True(await eventRaised2.Task, "NewEvents event was not raised on the second repository");
+ }
+
+ public async Task DisposeAsync()
+ {
+ await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString);
+ await npgsqlConnection.OpenAsync();
+
+ await using var command = new NpgsqlCommand($"DROP DATABASE {_dbName} WITH (FORCE)", npgsqlConnection);
+ await command.ExecuteNonQueryAsync();
+ }
+
+ public record TestEvent(int Test) : Event;
+ public record TestEvent2(int Test) : Event;
+
+ public record TestEventUpcaster : IUpcaster
+ {
+ public IEnumerable? Upcast(JsonObject eventJson)
+ {
+ var eventType = eventJson["$type"]!.GetValue();
+ if (eventType == typeof(TestEvent).AssemblyQualifiedName)
+ {
+ var eventJson2 = new JsonObject(eventJson)
+ {
+ ["$type"] = typeof(TestEvent2).AssemblyQualifiedName
+ };
+
+ return
+ [
+ eventJson2
+ ];
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json
new file mode 100644
index 0000000..c644e43
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json
@@ -0,0 +1,5 @@
+{
+ "ConnectionStrings": {
+ "DefaultConnection": "Host=localhost;Port=5432;Database=postgres;Username=postgres;Password=postgres"
+ }
+}
\ No newline at end of file
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml
new file mode 100644
index 0000000..2113418
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml
@@ -0,0 +1,9 @@
+services:
+ database:
+ image: postgres:15
+ environment:
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_DB: postgres
+ ports:
+ - "5432:5432"
\ No newline at end of file
diff --git a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
index eee5d38..f513885 100644
--- a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
+++ b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
@@ -1,5 +1,4 @@
-
net8.0;net9.0
enable
@@ -11,14 +10,12 @@
https://github.com/atmina/fluss
git
MIT
- true
-
@@ -28,5 +25,4 @@
-
diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
index 2a0fe74..1551fe0 100644
--- a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
+++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
@@ -1,10 +1,9 @@
using System.Collections.ObjectModel;
using System.Data;
using System.Diagnostics;
+using System.Text.Json.Nodes;
using Fluss.Events;
using Fluss.Exceptions;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
using Npgsql;
using NpgsqlTypes;
@@ -12,19 +11,14 @@ namespace Fluss.PostgreSQL;
public partial class PostgreSQLEventRepository : IBaseEventRepository
{
- private readonly NpgsqlDataSource dataSource;
+ private readonly NpgsqlDataSource _dataSource;
+
+
public PostgreSQLEventRepository(PostgreSQLConfig config)
{
var dataSourceBuilder = new NpgsqlDataSourceBuilder(config.ConnectionString);
- dataSourceBuilder.UseJsonNet(settings: new JsonSerializerSettings
- {
- TypeNameHandling = TypeNameHandling.All,
- TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full,
- MetadataPropertyHandling =
- MetadataPropertyHandling.ReadAhead // While this is marked as a performance hit, profiling approves
- });
- dataSource = dataSourceBuilder.Build();
+ _dataSource = dataSourceBuilder.Build();
}
private async ValueTask Publish(IReadOnlyList envelopes, Func eventExtractor,
@@ -34,7 +28,7 @@ private async ValueTask Publish(IReadOnlyList envelopes, F
activity?.SetTag("EventSourcing.EventRepository", nameof(PostgreSQLEventRepository));
// await using var connection has the side-effect that our connection passed from the outside is also disposed, so we split this up.
- await using var freshConnection = dataSource.OpenConnection();
+ await using var freshConnection = _dataSource.OpenConnection();
var connection = conn ?? freshConnection;
activity?.AddEvent(new ActivityEvent("Connection open"));
@@ -89,7 +83,7 @@ public async ValueTask Publish(IReadOnlyList envelopes)
private async ValueTask WithReader(long fromExclusive, long toInclusive,
Func> action)
{
- await using var connection = dataSource.OpenConnection();
+ await using var connection = _dataSource.OpenConnection();
await using var cmd =
new NpgsqlCommand(
"""
@@ -149,7 +143,7 @@ public async ValueTask> GetRawEvents()
Version = reader.GetInt64(0),
At = reader.GetDateTime(1),
By = reader.IsDBNull(2) ? null : reader.GetGuid(2),
- RawEvent = reader.GetFieldValue(3),
+ RawEvent = reader.GetFieldValue(3),
});
}
@@ -161,7 +155,7 @@ public async ValueTask ReplaceEvent(long at, IEnumerable newEn
{
var envelopes = newEnvelopes.ToList();
- await using var connection = dataSource.OpenConnection();
+ await using var connection = _dataSource.OpenConnection();
await using var transaction = connection.BeginTransaction();
await using var deleteCommand =
@@ -231,7 +225,7 @@ public async ValueTask GetLatestVersion()
private async Task GetLatestVersionImpl()
{
- await using var connection = dataSource.OpenConnection();
+ await using var connection = _dataSource.OpenConnection();
await using var cmd = new NpgsqlCommand("""SELECT MAX("Version") FROM "Events";""", connection);
await cmd.PrepareAsync();
diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs
index ad0fcca..ce5172b 100644
--- a/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs
+++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs
@@ -33,7 +33,7 @@ private async Task InitializeTrigger()
}
_triggerInitialized = true;
- await using var listenConnection = dataSource.CreateConnection();
+ await using var listenConnection = _dataSource.CreateConnection();
await listenConnection.OpenAsync(_cancellationTokenSource.Token);
listenConnection.Notification += (_, _) =>
diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
index f7ff5d4..629e695 100644
--- a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
+++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
@@ -7,6 +7,7 @@
[assembly: InternalsVisibleTo("Fluss.UnitTest")]
+[assembly: InternalsVisibleTo("Fluss.PostgreSQL.IntegrationTest")]
namespace Fluss.PostgreSQL;
@@ -28,7 +29,8 @@ public static IServiceCollection AddPostgresEventSourcingRepository(this IServic
.AddSingleton(new PostgreSQLConfig(connectionString))
.AddSingleton()
.AddHostedService(sp => sp.GetRequiredService())
- .AddHostedService();
+ .AddSingleton()
+ .AddHostedService(sp => sp.GetRequiredService());
}
}
diff --git a/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs b/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs
new file mode 100644
index 0000000..8d4ee61
--- /dev/null
+++ b/src/Fluss.UnitTest/Core/Events/EventSerializerTest.cs
@@ -0,0 +1,18 @@
+using Fluss.Events;
+
+namespace Fluss.UnitTest.Core.Events;
+
+public class EventSerializerTest
+{
+ [Fact]
+ public void Serialize_Deserialize_TestEvent1()
+ {
+ var testEvent = new TestEvent("Value");
+ var serialized = EventSerializer.Serialize(testEvent);
+ var deserialized = EventSerializer.Deserialize(serialized);
+
+ Assert.Equal(testEvent, deserialized);
+ }
+
+ public record TestEvent(string Value) : Event;
+}
\ No newline at end of file
diff --git a/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs b/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs
index 22d898c..0572c94 100644
--- a/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs
+++ b/src/Fluss.UnitTest/Core/Upcasting/EventUpcasterServiceTest.cs
@@ -1,3 +1,4 @@
+using System.Text.Json.Nodes;
using Fluss.Events;
using Fluss.Upcasting;
using Microsoft.Extensions.Logging;
@@ -10,14 +11,14 @@ public class EventUpcasterServiceTest
{
private static RawEventEnvelope GetRawTestEvent1Envelope(int version)
{
- var jObject = new TestEvent1("Value").ToJObject();
+ var jObject = EventSerializer.Serialize(new TestEvent1("Value"));
return new RawEventEnvelope { Version = version, RawEvent = jObject };
}
private static RawEventEnvelope GetRawTestEvent2Envelope(int version)
{
- var jObject = new TestEvent2("Value2").ToJObject();
+ var jObject = EventSerializer.Serialize(new TestEvent2("Value2"));
return new RawEventEnvelope { Version = version, RawEvent = jObject };
}
@@ -62,7 +63,7 @@ public async Task SingleEventsAreUpcast()
repo => repo.ReplaceEvent(
It.IsAny(),
It.Is>(
- newEvents => newEvents.SingleOrDefault()!.RawEvent["Property1"]!.ToObject() == "Upcast"
+ newEvents => newEvents.SingleOrDefault()!.RawEvent["Property1"]!.GetValue() == "Upcast"
)
),
Times.Exactly(4)
@@ -101,7 +102,7 @@ public async Task UpcastsAreChainable()
repo => repo.ReplaceEvent(
It.IsAny(),
It.Is>(
- newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Value"
+ newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Value"
)
),
Times.Exactly(4)
@@ -111,7 +112,7 @@ public async Task UpcastsAreChainable()
repo => repo.ReplaceEvent(
It.IsAny(),
It.Is>(
- newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Upcast-Value"
+ newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Upcast-Value"
)
),
Times.Exactly(4)
@@ -121,7 +122,7 @@ public async Task UpcastsAreChainable()
repo => repo.ReplaceEvent(
It.IsAny(),
It.Is>(
- newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.ToObject() == "Upcast-Value2")
+ newEvents => newEvents.SingleOrDefault()!.RawEvent["Property2"]!.GetValue() == "Upcast-Value2")
),
Times.Once
);
@@ -155,19 +156,21 @@ record TestEvent2(string Property2) : Event;
internal class NoopUpcast : IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson) => null;
+ public IEnumerable? Upcast(JsonObject eventJson) => null;
}
internal class SingleEventUpcast : IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson)
+ public IEnumerable? Upcast(JsonObject eventJson)
{
- var type = eventJson.GetValue("$type")?.ToObject();
+ var type = eventJson["$type"]!.GetValue();
if (type != typeof(TestEvent1).AssemblyQualifiedName) return null;
- var clone = (JObject)eventJson.DeepClone();
- clone["Property1"] = "Upcast";
+ var clone = new JsonObject(eventJson)
+ {
+ ["Property1"] = "Upcast"
+ };
return [clone];
}
@@ -175,9 +178,9 @@ internal class SingleEventUpcast : IUpcaster
internal class MultiEventUpcast : IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson)
+ public IEnumerable? Upcast(JsonObject eventJson)
{
- var type = eventJson.GetValue("$type")?.ToObject();
+ var type = eventJson["$type"]!.GetValue();
if (type != typeof(TestEvent1).AssemblyQualifiedName) return null;
@@ -187,16 +190,18 @@ internal class MultiEventUpcast : IUpcaster
internal class ChainedEventUpcast : IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson)
+ public IEnumerable? Upcast(JsonObject eventJson)
{
- var type = eventJson.GetValue("$type")?.ToObject();
+ var type = eventJson["$type"]!.GetValue();
if (type != typeof(TestEvent1).AssemblyQualifiedName) return null;
- var clone = (JObject)eventJson.DeepClone();
- clone["Property2"] = clone["Property1"];
+ var clone = new JsonObject(eventJson)
+ {
+ ["Property2"] = eventJson["Property1"],
+ ["$type"] = typeof(TestEvent2).AssemblyQualifiedName
+ };
clone.Remove("Property1");
- clone["$type"] = typeof(TestEvent2).AssemblyQualifiedName;
return [clone];
}
@@ -205,14 +210,16 @@ internal class ChainedEventUpcast : IUpcaster
[DependsOn(typeof(ChainedEventUpcast))]
internal class ChainedEventUpcast2 : IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson)
+ public IEnumerable? Upcast(JsonObject eventJson)
{
- var type = eventJson.GetValue("$type")?.ToObject();
+ var type = eventJson["$type"]!.GetValue();
if (type != typeof(TestEvent2).AssemblyQualifiedName) return null;
- var clone = (JObject)eventJson.DeepClone();
- clone["Property2"] = "Upcast-" + clone["Property2"]!.ToObject();
+ var clone = new JsonObject(eventJson)
+ {
+ ["Property2"] = "Upcast-" + eventJson["Property2"]!.GetValue()
+ };
return [clone];
}
diff --git a/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs b/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs
index f2a8f73..e5aeca6 100644
--- a/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs
+++ b/src/Fluss.UnitTest/Core/Upcasting/UpcasterSorterTest.cs
@@ -1,5 +1,5 @@
+using System.Text.Json.Nodes;
using Fluss.Upcasting;
-using Newtonsoft.Json.Linq;
namespace Fluss.UnitTest.Core.Upcasting;
@@ -50,34 +50,34 @@ public void ThrowsWhenMissingDependencies()
internal class ExampleUpcasterNoDeps : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
internal class ExampleUpcasterDeps1 : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
[DependsOn(typeof(ExampleUpcasterDeps1), typeof(ExampleUpcasterNoDeps))]
internal class ExampleUpcasterDeps2 : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
[DependsOn(typeof(ExampleUpcasterDeps1), typeof(ExampleUpcasterDeps2))]
internal class ExampleUpcasterDeps3 : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
[DependsOn(typeof(ExampleUpcasterCyclic2))]
internal class ExampleUpcasterCyclic1 : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
[DependsOn(typeof(ExampleUpcasterCyclic1))]
internal class ExampleUpcasterCyclic2 : IUpcaster
{
- public IEnumerable Upcast(JObject eventJson) => throw new NotImplementedException();
+ public IEnumerable Upcast(JsonObject eventJson) => throw new NotImplementedException();
}
diff --git a/src/Fluss.UnitTest/Regen/RegenTests.cs b/src/Fluss.UnitTest/Regen/RegenTests.cs
index 5465e55..0dc776d 100644
--- a/src/Fluss.UnitTest/Regen/RegenTests.cs
+++ b/src/Fluss.UnitTest/Regen/RegenTests.cs
@@ -1,7 +1,6 @@
using Fluss.Regen;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp;
-using Newtonsoft.Json.Linq;
namespace Fluss.UnitTest.Regen;
@@ -179,12 +178,12 @@ public Task GenerateForUpcaster()
"""
using Fluss;
using Fluss.Upcasting;
- using Newtonsoft.Json.Linq;
+ using System.Text.Json;
namespace TestNamespace;
public class TestUpcaster : IUpcaster {
- public IEnumerable? Upcast(JObject eventJson) {
+ public IEnumerable? Upcast(JsonObject eventJson) {
return null;
}
}
@@ -226,7 +225,6 @@ public static GeneratorDriverRunResult GenerateFor(string source)
[
MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
MetadataReference.CreateFromFile(typeof(UnitOfWork).Assembly.Location),
- MetadataReference.CreateFromFile(typeof(JObject).Assembly.Location),
]);
var runResult = driver.RunGenerators(compilation).GetRunResult();
diff --git a/src/Fluss.sln b/src/Fluss.sln
index 0ae3d82..12cb03d 100644
--- a/src/Fluss.sln
+++ b/src/Fluss.sln
@@ -14,6 +14,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.Regen", "Fluss.Regen\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{970AB461-4D44-4B99-AACA-7DAA569D4C34}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.PostgreSQL.IntegrationTest", "Fluss.PostgreSQL.IntegrationTest\Fluss.PostgreSQL.IntegrationTest.csproj", "{3210592D-7B76-4D90-BAAC-7B9CBF7E580D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -48,5 +50,9 @@ Global
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Debug|Any CPU.Build.0 = Debug|Any CPU
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.ActiveCfg = Release|Any CPU
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
diff --git a/src/Fluss/Events/Event.cs b/src/Fluss/Events/Event.cs
index 09b7257..91597ae 100644
--- a/src/Fluss/Events/Event.cs
+++ b/src/Fluss/Events/Event.cs
@@ -1,17 +1,5 @@
-using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
-
namespace Fluss.Events;
public interface Event
{
}
-
-public static class EventExtension
-{
- public static JObject ToJObject(this Event @event)
- {
- var serializer = new JsonSerializer { TypeNameHandling = TypeNameHandling.All, TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full };
- return JObject.FromObject(@event, serializer);
- }
-}
diff --git a/src/Fluss/Events/EventEnvelope.cs b/src/Fluss/Events/EventEnvelope.cs
index d464065..84f575b 100644
--- a/src/Fluss/Events/EventEnvelope.cs
+++ b/src/Fluss/Events/EventEnvelope.cs
@@ -1,5 +1,4 @@
-using System.Runtime.InteropServices;
-using Newtonsoft.Json.Linq;
+using System.Text.Json.Nodes;
namespace Fluss.Events;
@@ -13,7 +12,7 @@ public abstract record Envelope
public sealed record RawEventEnvelope : Envelope
{
- public required JObject RawEvent { get; init; }
+ public required JsonObject RawEvent { get; init; }
}
public record EventEnvelope : Envelope
diff --git a/src/Fluss/Events/EventSerializer.cs b/src/Fluss/Events/EventSerializer.cs
new file mode 100644
index 0000000..675c74b
--- /dev/null
+++ b/src/Fluss/Events/EventSerializer.cs
@@ -0,0 +1,37 @@
+using System.Text.Json;
+using System.Text.Json.Nodes;
+using System.Text.Json.Serialization;
+
+namespace Fluss.Events;
+
+public static class EventSerializer
+{
+ public class FooBar : JsonConverter
+ {
+
+ public override Event? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(Utf8JsonWriter writer, Event value, JsonSerializerOptions options)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public static JsonSerializerOptions Options { get; } = new()
+ {
+ Converters = { new FooBar() },
+ };
+
+ public static JsonObject Serialize(Event @event)
+ {
+ return JsonSerializer.Deserialize(JsonSerializer.Serialize(@event, Options))!;
+ }
+
+ public static Event Deserialize(JsonObject json)
+ {
+ return (Event)JsonSerializer.Deserialize(json.ToString(), typeof(Event))!;
+ }
+}
\ No newline at end of file
diff --git a/src/Fluss/Events/InMemoryEventRepository.cs b/src/Fluss/Events/InMemoryEventRepository.cs
index aa423e4..ecd2ea2 100644
--- a/src/Fluss/Events/InMemoryEventRepository.cs
+++ b/src/Fluss/Events/InMemoryEventRepository.cs
@@ -1,7 +1,6 @@
using System.Collections.ObjectModel;
using Collections.Pooled;
using Fluss.Exceptions;
-using Newtonsoft.Json;
namespace Fluss.Events;
@@ -46,7 +45,7 @@ public ValueTask> GetRawEvents()
{
var rawEnvelopes = _events.Select(envelope =>
{
- var rawEvent = envelope.Event.ToJObject();
+ var rawEvent = EventSerializer.Serialize(envelope.Event);
return new RawEventEnvelope
{
@@ -71,11 +70,10 @@ public ValueTask ReplaceEvent(long at, IEnumerable newEvents)
}
_events.RemoveAt(checkedAt);
- var serializer = new JsonSerializer { TypeNameHandling = TypeNameHandling.All };
-
+
var eventEnvelopes = events.Select(envelope =>
{
- var @event = envelope.RawEvent.ToObject(serializer);
+ var @event = EventSerializer.Deserialize(envelope.RawEvent);
if (@event is null) throw new Exception("Failed to convert raw event to Event");
diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj
index 0c42f56..e9f6558 100644
--- a/src/Fluss/Fluss.csproj
+++ b/src/Fluss/Fluss.csproj
@@ -11,7 +11,6 @@
https://github.com/atmina/fluss
git
MIT
- true
@@ -21,7 +20,6 @@
-
diff --git a/src/Fluss/Upcasting/IUpcaster.cs b/src/Fluss/Upcasting/IUpcaster.cs
index 9b06c9e..f3ea90d 100644
--- a/src/Fluss/Upcasting/IUpcaster.cs
+++ b/src/Fluss/Upcasting/IUpcaster.cs
@@ -1,11 +1,11 @@
using System.Collections.Immutable;
-using Newtonsoft.Json.Linq;
+using System.Text.Json.Nodes;
namespace Fluss.Upcasting;
public interface IUpcaster
{
- public IEnumerable? Upcast(JObject eventJson);
+ public IEnumerable? Upcast(JsonObject eventJson);
}
[AttributeUsage(AttributeTargets.Class)]