feature: Add option to configure custom azure servicebus processor options#140
feature: Add option to configure custom azure servicebus processor options#140jwdb wants to merge 1 commit into
Conversation
Greptile SummaryThis PR adds a Confidence Score: 3/5Not safe to merge without addressing the AutoCompleteMessages = false gap — that option will silently dead-letter all messages. One P1 logic bug (manual message completion is architecturally impossible with the current Worker design) prevents the advertised feature from working; two P2 issues (wrong doc comment, single-instance options for all workers) are non-blocking. Worker.cs needs ProcessMessageEventArgs to be forwarded or the AutoCompleteMessages use-case to be documented as unsupported.
|
| Filename | Overview |
|---|---|
| src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Features/AzureServiceBusFeature.cs | Adds ProcessorOptionsFactory property (singleton-registered) and an incorrect XML doc comment referencing the wrong type; all workers share one ServiceBusProcessorOptions instance. |
| src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Services/Worker.cs | Injects ServiceBusProcessorOptions from DI; but ProcessMessageEventArgs is not forwarded to workflows, making AutoCompleteMessages = false effectively broken. |
| src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Options/AzureServiceBusOptions.cs | Adds using Azure.Messaging.ServiceBus import — trivial, no issues. |
Sequence Diagram
sequenceDiagram
participant App as Application Startup
participant Feature as AzureServiceBusFeature
participant DI as DI Container
participant WM as WorkerManager
participant W as Worker
participant SB as ServiceBusClient
App->>Feature: Apply()
Feature->>DI: AddSingleton(ProcessorOptionsFactory)
Feature->>DI: AddSingleton WorkerManager
App->>WM: StartWorkerAsync(queueOrTopic, subscription)
WM->>DI: ActivatorUtilities.CreateInstance Worker
DI-->>WM: ServiceBusClient, ServiceBusProcessorOptions singleton, IServiceScopeFactory
WM->>W: new Worker(queueOrTopic, subscription, client, processorOptions, ...)
W->>SB: CreateProcessor(queueOrTopic, processorOptions)
SB-->>W: ServiceBusProcessor
W->>W: subscribe ProcessMessageAsync and ProcessErrorAsync
W->>SB: StartProcessingAsync()
Note over W,SB: On message received
SB->>W: OnMessageReceivedAsync(ProcessMessageEventArgs args)
W->>W: InvokeWorkflowsAsync(args.Message, args.CancellationToken)
Note right of W: args with CompleteMessageAsync etc. is discarded
Comments Outside Diff (1)
-
src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Services/Worker.cs, line 63 (link)AutoCompleteMessages = falseunusable —ProcessMessageEventArgsnot forwardedThe PR description explicitly lists "not autocomplete messages (but manually in the workflow)" as a supported scenario, but
OnMessageReceivedAsynconly passesargs.Messageandargs.CancellationTokentoInvokeWorkflowsAsync— theProcessMessageEventArgsobject (which exposesCompleteMessageAsync,AbandonMessageAsync,DeadLetterMessageAsync, etc.) is discarded. There are no workflow activities to complete messages either. WithAutoCompleteMessages = false, every received message will be left unsettled, its lock will eventually expire, and the message will be redelivered until the dead-letter threshold is reached.Prompt To Fix With AI
This is a comment left during a code review. Path: src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Services/Worker.cs Line: 63 Comment: **`AutoCompleteMessages = false` unusable — `ProcessMessageEventArgs` not forwarded** The PR description explicitly lists "not autocomplete messages (but manually in the workflow)" as a supported scenario, but `OnMessageReceivedAsync` only passes `args.Message` and `args.CancellationToken` to `InvokeWorkflowsAsync` — the `ProcessMessageEventArgs` object (which exposes `CompleteMessageAsync`, `AbandonMessageAsync`, `DeadLetterMessageAsync`, etc.) is discarded. There are no workflow activities to complete messages either. With `AutoCompleteMessages = false`, every received message will be left unsettled, its lock will eventually expire, and the message will be redelivered until the dead-letter threshold is reached. How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Services/Worker.cs:63
**`AutoCompleteMessages = false` unusable — `ProcessMessageEventArgs` not forwarded**
The PR description explicitly lists "not autocomplete messages (but manually in the workflow)" as a supported scenario, but `OnMessageReceivedAsync` only passes `args.Message` and `args.CancellationToken` to `InvokeWorkflowsAsync` — the `ProcessMessageEventArgs` object (which exposes `CompleteMessageAsync`, `AbandonMessageAsync`, `DeadLetterMessageAsync`, etc.) is discarded. There are no workflow activities to complete messages either. With `AutoCompleteMessages = false`, every received message will be left unsettled, its lock will eventually expire, and the message will be redelivered until the dead-letter threshold is reached.
### Issue 2 of 3
src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Features/AzureServiceBusFeature.cs:40-42
**Incorrect XML doc comment references wrong type**
The summary says "A delegate to create a `<see cref="AzureServiceBusOptions"/>` instance" but the property creates a `ServiceBusProcessorOptions` instance, not an `AzureServiceBusOptions`.
### Issue 3 of 3
src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Features/AzureServiceBusFeature.cs:76
**Singleton `ServiceBusProcessorOptions` shared across all workers**
`ProcessorOptionsFactory` is registered as a singleton, so every `Worker` instance — across all queues and topics — receives the exact same `ServiceBusProcessorOptions` object. If a user needs different concurrency, prefetch, or timeout settings per queue/topic, this design prevents it. Consider documenting this limitation or changing the approach (e.g., a `Func<string, string?, ServiceBusProcessorOptions>` keyed by queue/subscription).
Reviews (1): Last reviewed commit: "feature: Add option to configure custom ..." | Re-trigger Greptile
| /// A delegate to create a <see cref="AzureServiceBusOptions"/> instance. | ||
| /// </summary> | ||
| public Func<IServiceProvider, ServiceBusProcessorOptions> ProcessorOptionsFactory { get; set; } = _ => new ServiceBusProcessorOptions(); |
There was a problem hiding this comment.
Incorrect XML doc comment references wrong type
The summary says "A delegate to create a <see cref="AzureServiceBusOptions"/> instance" but the property creates a ServiceBusProcessorOptions instance, not an AzureServiceBusOptions.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Features/AzureServiceBusFeature.cs
Line: 40-42
Comment:
**Incorrect XML doc comment references wrong type**
The summary says "A delegate to create a `<see cref="AzureServiceBusOptions"/>` instance" but the property creates a `ServiceBusProcessorOptions` instance, not an `AzureServiceBusOptions`.
How can I resolve this? If you propose a fix, please make it concise.| Services | ||
| .AddSingleton(ServiceBusAdministrationClientFactory) | ||
| .AddSingleton(ServiceBusClientFactory) | ||
| .AddSingleton(ProcessorOptionsFactory) |
There was a problem hiding this comment.
Singleton
ServiceBusProcessorOptions shared across all workers
ProcessorOptionsFactory is registered as a singleton, so every Worker instance — across all queues and topics — receives the exact same ServiceBusProcessorOptions object. If a user needs different concurrency, prefetch, or timeout settings per queue/topic, this design prevents it. Consider documenting this limitation or changing the approach (e.g., a Func<string, string?, ServiceBusProcessorOptions> keyed by queue/subscription).
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/modules/servicebus/Elsa.ServiceBus.AzureServiceBus/Features/AzureServiceBusFeature.cs
Line: 76
Comment:
**Singleton `ServiceBusProcessorOptions` shared across all workers**
`ProcessorOptionsFactory` is registered as a singleton, so every `Worker` instance — across all queues and topics — receives the exact same `ServiceBusProcessorOptions` object. If a user needs different concurrency, prefetch, or timeout settings per queue/topic, this design prevents it. Consider documenting this limitation or changing the approach (e.g., a `Func<string, string?, ServiceBusProcessorOptions>` keyed by queue/subscription).
How can I resolve this? If you propose a fix, please make it concise.
This allows the user to configure the processor for concurrent calls, prefetching, to not autocomplete messages (but manually in the workflow for example) or reduce or increase the timeouts.