Skip to content

Commit 2fbabe2

Browse files
committed
feat: Enhance matching service with subscription and task dispatching capabilities; add system workers for cleanup, retries, and timers
Signed-off-by: Soeren Magnus Olesen <soeren@molesen.ch>
1 parent a487e3b commit 2fbabe2

19 files changed

Lines changed: 659 additions & 97 deletions

docs/PHASE1_PROGRESS.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ src/Odin.Persistence/
302302
### 3. Execution Engine Services
303303

304304
- **Objective**: Stand up History and Matching services that orchestrate shard ownership, task queues, and replay.
305-
- **Status**: 🧱 Infrastructure groundwork (schemas, shard repository, Go helpers) in place; service implementations pending.
306-
- **Immediate Focus**: Build HistoryService event pipelines, MatchingService task dispatch leveraging Hugo `TaskQueueChannelAdapter<T>`, and system workers (timer/retry/cleanup).
305+
- **Status**: ✅ History service persists/replays events; matching dispatch spins through `TaskQueueChannelAdapter<T>` with system workers running timers, retries, and cleanup over shared queues.
306+
- **Immediate Focus**: Broaden persistence coverage (timer/retry repositories) and integrate the services with Control Plane APIs.
307307

308308
### 4. Frontend & APIs
309309

@@ -342,9 +342,9 @@ src/Odin.Persistence/
342342

343343
### Priority 3: Execution Engine
344344

345-
- [ ] Build HistoryService with event persistence and replay
346-
- [ ] Build `MatchingService` with `TaskQueueChannelAdapter<T>`
347-
- [ ] Implement system workers (TimerWorker, RetryWorker, CleanupWorker)
345+
- [x] Build HistoryService with event persistence and replay
346+
- [x] Build `MatchingService` with `TaskQueueChannelAdapter<T>`
347+
- [x] Implement system workers (TimerWorker, RetryWorker, CleanupWorker)
348348

349349
### Priority 4: APIs
350350

src/Odin.ExecutionEngine.Matching/MatchingService.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Text.Json;
12
using System.Threading;
23
using Hugo;
34
using Microsoft.Extensions.Logging;
@@ -14,10 +15,12 @@ namespace Odin.ExecutionEngine.Matching;
1415
/// </summary>
1516
public sealed class MatchingService(
1617
ITaskQueueRepository taskQueueRepository,
17-
ILogger<MatchingService> logger) : IMatchingService
18+
ILogger<MatchingService> logger,
19+
JsonSerializerOptions? serializerOptions = null) : IMatchingService
1820
{
1921
private readonly ITaskQueueRepository _taskQueueRepository = taskQueueRepository;
2022
private readonly ILogger<MatchingService> _logger = logger;
23+
private readonly JsonSerializerOptions _serializerOptions = serializerOptions ?? JsonOptions.Default;
2124

2225
/// <summary>
2326
/// Enqueues a workflow or activity task.
@@ -253,6 +256,32 @@ public async Task<Result<int>> ReclaimExpiredLeasesAsync(
253256
Error.From($"Reclaim failed: {ex.Message}", OdinErrorCodes.TaskQueueError));
254257
}
255258
}
259+
260+
public Task<MatchingSubscription> SubscribeAsync(
261+
string queueName,
262+
string workerIdentity,
263+
CancellationToken cancellationToken = default)
264+
{
265+
var dispatcher = new TaskQueueDispatcher(
266+
queueName,
267+
workerIdentity,
268+
_taskQueueRepository,
269+
_serializerOptions,
270+
_logger);
271+
272+
if (cancellationToken.CanBeCanceled)
273+
{
274+
#pragma warning disable CS4014
275+
cancellationToken.Register(state =>
276+
{
277+
var d = (TaskQueueDispatcher)state!;
278+
_ = d.DisposeAsync();
279+
}, dispatcher, useSynchronizationContext: false);
280+
#pragma warning restore CS4014
281+
}
282+
283+
return Task.FromResult(new MatchingSubscription(dispatcher));
284+
}
256285
}
257286

258287
/// <summary>
@@ -291,6 +320,11 @@ Task<Result<QueueStats>> GetQueueStatsAsync(
291320

292321
Task<Result<int>> ReclaimExpiredLeasesAsync(
293322
CancellationToken cancellationToken = default);
323+
324+
Task<MatchingSubscription> SubscribeAsync(
325+
string queueName,
326+
string workerIdentity,
327+
CancellationToken cancellationToken = default);
294328
}
295329

296330
/// <summary>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Threading.Channels;
2+
3+
namespace Odin.ExecutionEngine.Matching;
4+
5+
public sealed class MatchingSubscription : IAsyncDisposable
6+
{
7+
private readonly TaskQueueDispatcher _dispatcher;
8+
9+
internal MatchingSubscription(TaskQueueDispatcher dispatcher)
10+
{
11+
_dispatcher = dispatcher;
12+
}
13+
14+
public ChannelReader<MatchingTask> Reader => _dispatcher.Reader;
15+
16+
public ValueTask DisposeAsync() => _dispatcher.DisposeAsync();
17+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Hugo;
2+
using Odin.Contracts;
3+
using Odin.Sdk;
4+
using static Hugo.Go;
5+
using Unit = Hugo.Go.Unit;
6+
7+
namespace Odin.ExecutionEngine.Matching;
8+
9+
public sealed class MatchingTask
10+
{
11+
private readonly Func<CancellationToken, Task<Result<Unit>>> _complete;
12+
private readonly Func<string, bool, CancellationToken, Task<Result<Unit>>> _fail;
13+
private readonly Func<CancellationToken, Task<Result<TaskLease>>> _heartbeat;
14+
15+
internal MatchingTask(
16+
TaskDispatchItem dispatchItem,
17+
Func<CancellationToken, Task<Result<Unit>>> complete,
18+
Func<string, bool, CancellationToken, Task<Result<Unit>>> fail,
19+
Func<CancellationToken, Task<Result<TaskLease>>> heartbeat)
20+
{
21+
WorkflowTask = dispatchItem.WorkflowTask;
22+
Lease = dispatchItem.Lease;
23+
_complete = complete;
24+
_fail = fail;
25+
_heartbeat = heartbeat;
26+
}
27+
28+
public WorkflowTask WorkflowTask { get; }
29+
30+
public TaskLease Lease { get; }
31+
32+
public Task<Result<Unit>> CompleteAsync(CancellationToken cancellationToken = default)
33+
=> _complete(cancellationToken);
34+
35+
public Task<Result<Unit>> FailAsync(string reason, bool requeue, CancellationToken cancellationToken = default)
36+
=> _fail(reason, requeue, cancellationToken);
37+
38+
public Task<Result<TaskLease>> HeartbeatAsync(CancellationToken cancellationToken = default)
39+
=> _heartbeat(cancellationToken);
40+
}

src/Odin.ExecutionEngine.Matching/Odin.ExecutionEngine.Matching.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ProjectReference Include="../Odin.Contracts/Odin.Contracts.csproj" />
1111
<ProjectReference Include="../Odin.Core/Odin.Core.csproj" />
1212
<ProjectReference Include="../Odin.Persistence/Odin.Persistence.csproj" />
13+
<ProjectReference Include="../Odin.Sdk/Odin.Sdk.csproj" />
1314
</ItemGroup>
1415

1516
</Project>
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using System.Text.Json;
2+
using Hugo;
3+
using Microsoft.Extensions.Logging;
4+
using Odin.Contracts;
5+
using Odin.Core;
6+
using Odin.Persistence.Interfaces;
7+
using Odin.Sdk;
8+
using static Hugo.Go;
9+
10+
namespace Odin.ExecutionEngine.Matching;
11+
12+
internal sealed record TaskDispatchItem(
13+
TaskLease Lease,
14+
WorkflowTask WorkflowTask,
15+
Func<CancellationToken, Task<Result<Unit>>> CompleteAsync,
16+
Func<string, bool, CancellationToken, Task<Result<Unit>>> FailAsync,
17+
Func<CancellationToken, Task<Result<TaskLease>>> HeartbeatAsync)
18+
{
19+
public static Result<TaskDispatchItem> Create(
20+
TaskLease lease,
21+
ITaskQueueRepository repository,
22+
JsonSerializerOptions serializerOptions,
23+
ILogger logger)
24+
{
25+
WorkflowTask workflowTask;
26+
try
27+
{
28+
workflowTask = DeserializeWorkflowTask(lease.Task, serializerOptions);
29+
}
30+
catch (Exception ex)
31+
{
32+
logger.LogError(ex, "Failed to deserialize workflow task payload for lease {LeaseId}", lease.LeaseId);
33+
return Result.Fail<TaskDispatchItem>(Error.FromException(ex));
34+
}
35+
36+
async Task<Result<Unit>> CompleteAsync(CancellationToken cancellationToken)
37+
=> await repository.CompleteAsync(lease.LeaseId, cancellationToken).ConfigureAwait(false);
38+
39+
async Task<Result<Unit>> FailAsync(string reason, bool requeue, CancellationToken cancellationToken)
40+
=> await repository.FailAsync(lease.LeaseId, reason, requeue, cancellationToken).ConfigureAwait(false);
41+
42+
async Task<Result<TaskLease>> HeartbeatAsync(CancellationToken cancellationToken)
43+
=> await repository.HeartbeatAsync(lease.LeaseId, cancellationToken).ConfigureAwait(false);
44+
45+
return Result.Ok(new TaskDispatchItem(lease, workflowTask, CompleteAsync, FailAsync, HeartbeatAsync));
46+
}
47+
48+
private static WorkflowTask DeserializeWorkflowTask(TaskQueueItem item, JsonSerializerOptions serializerOptions)
49+
{
50+
var json = item.TaskData.RootElement.GetRawText();
51+
var workflowTask = JsonSerializer.Deserialize<WorkflowTask>(json, serializerOptions);
52+
if (workflowTask is null)
53+
{
54+
throw new InvalidOperationException("Workflow task payload could not be deserialized.");
55+
}
56+
57+
return workflowTask;
58+
}
59+
}

0 commit comments

Comments
 (0)