diff --git a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs index aa22988eb..5d6595550 100644 --- a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs +++ b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs @@ -23,7 +23,7 @@ public class DefinitionLoader : IDefinitionLoader // ParsingConfig to allow access to commonly used .NET methods like object.Equals private static readonly ParsingConfig ParsingConfig = new ParsingConfig { - AllowNewToEvaluateAnyType = true, + AllowNewToEvaluateAnyType = false, AreContextKeywordsEnabled = true }; @@ -62,7 +62,15 @@ public DefinitionLoader(IWorkflowRegistry registry, ITypeResolver typeResolver) public WorkflowDefinition LoadDefinition(string source, Func deserializer) { + if (string.IsNullOrWhiteSpace(source)) + throw new ArgumentNullException(nameof(source)); + if (deserializer == null) + throw new ArgumentNullException(nameof(deserializer)); + var sourceObj = deserializer(source); + if (sourceObj == null) + throw new InvalidOperationException("Deserialization returned null."); + var def = Convert(sourceObj); _registry.RegisterWorkflow(def); return def; @@ -110,11 +118,17 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty { containerType = typeof(WorkflowStep<>).MakeGenericType(stepType); - targetStep = (containerType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep); + var ctor = containerType.GetConstructor(new Type[] { }); + if (ctor == null) + throw new InvalidOperationException($"Type '{containerType.FullName}' does not have a parameterless constructor."); + targetStep = ctor.Invoke(null) as WorkflowStep; } else { - targetStep = stepType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep; + var ctor = stepType.GetConstructor(new Type[] { }); + if (ctor == null) + throw new InvalidOperationException($"Type '{stepType.FullName}' does not have a parameterless constructor."); + targetStep = ctor.Invoke(null) as WorkflowStep; if (targetStep != null) stepType = targetStep.BodyType; } @@ -122,15 +136,25 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty if (nextStep.Saga) { containerType = typeof(SagaContainer<>).MakeGenericType(stepType); - targetStep = (containerType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep); + var sagaCtor = containerType.GetConstructor(new Type[] { }); + if (sagaCtor == null) + throw new InvalidOperationException($"Type '{containerType.FullName}' does not have a parameterless constructor."); + targetStep = sagaCtor.Invoke(null) as WorkflowStep; } if (!string.IsNullOrEmpty(nextStep.CancelCondition)) { var cancelExprType = typeof(Expression<>).MakeGenericType(typeof(Func<,>).MakeGenericType(dataType, typeof(bool))); var dataParameter = Expression.Parameter(dataType, "data"); - var cancelExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter }, typeof(bool), TransformExpression(nextStep.CancelCondition)); - targetStep.CancelCondition = cancelExpr; + try + { + var cancelExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter }, typeof(bool), TransformExpression(nextStep.CancelCondition)); + targetStep.CancelCondition = cancelExpr; + } + catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException) + { + throw new WorkflowDefinitionLoadException($"Error parsing cancel condition expression '{nextStep.CancelCondition}' for step '{nextStep.Id}': {ex.Message}", ex); + } } targetStep.Id = i; @@ -252,7 +276,15 @@ private void AttachOutputs(StepSourceV1 source, Type dataType, Type stepType, Wo foreach (var output in source.Outputs) { var stepParameter = Expression.Parameter(stepType, "step"); - var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { stepParameter }, typeof(object), TransformExpression(output.Value)); + LambdaExpression sourceExpr; + try + { + sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { stepParameter }, typeof(object), TransformExpression(output.Value)); + } + catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException) + { + throw new WorkflowDefinitionLoadException($"Error parsing output expression '{output.Value}': {ex.Message}", ex); + } var dataParameter = Expression.Parameter(dataType, "data"); @@ -288,7 +320,15 @@ private void AttachDirectlyOutput(KeyValuePair output, WorkflowS Action acn = (pStep, pData) => { - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object resolvedValue; + try + { + resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); + } + catch (TargetInvocationException ex) + { + throw new InvalidOperationException($"Error evaluating expression for output property.", ex.InnerException ?? ex); + } propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key }); }; @@ -340,8 +380,24 @@ private void AttachNestedOutput(KeyValuePair output, WorkflowSte Action acn = (pStep, pData) => { var targetExpr = Expression.Lambda(memberExpression, dataParameter); - object data = targetExpr.Compile().DynamicInvoke(pData); - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object data; + try + { + data = targetExpr.Compile().DynamicInvoke(pData); + } + catch (TargetInvocationException ex) + { + throw new InvalidOperationException($"Error evaluating expression for nested output property.", ex.InnerException ?? ex); + } + object resolvedValue; + try + { + resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); + } + catch (TargetInvocationException ex) + { + throw new InvalidOperationException($"Error evaluating expression for nested output property.", ex.InnerException ?? ex); + } propertyInfo.SetValue(data, resolvedValue, new object[] { items[1] }); }; @@ -354,7 +410,7 @@ private void AttachNestedOutput(KeyValuePair output, WorkflowSte { targetProperty = Expression.Property(targetProperty, propertyName); } - catch + catch (ArgumentException) { targetProperty = null; break; @@ -379,7 +435,15 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste foreach (var nextStep in source.SelectNextStep) { - var sourceDelegate = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, outcomeParameter }, typeof(object), TransformExpression(nextStep.Value)).Compile(); + Delegate sourceDelegate; + try + { + sourceDelegate = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, outcomeParameter }, typeof(object), TransformExpression(nextStep.Value)).Compile(); + } + catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException) + { + throw new WorkflowDefinitionLoadException($"Error parsing select next step expression '{nextStep.Value}': {ex.Message}", ex); + } Expression> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome)); step.Outcomes.Add(new ExpressionOutcome(sourceExpr) { @@ -396,13 +460,38 @@ private Type FindType(string name) private static Action BuildScalarInputAction(KeyValuePair input, ParameterExpression dataParameter, ParameterExpression contextParameter, ParameterExpression environmentVarsParameter, PropertyInfo stepProperty) { var expr = System.Convert.ToString(input.Value); - var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(expr)); + LambdaExpression sourceExpr; + try + { + sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(expr)); + } + catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException) + { + throw new WorkflowDefinitionLoadException($"Error parsing input expression '{expr}' for property '{input.Key}': {ex.Message}", ex); + } void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) { - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables()); + object resolvedValue; + try + { + resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables()); + } + catch (TargetInvocationException ex) + { + throw new InvalidOperationException($"Error evaluating expression for step property.", ex.InnerException ?? ex); + } if (stepProperty.PropertyType.IsEnum) - stepProperty.SetValue(pStep, Enum.Parse(stepProperty.PropertyType, (string)resolvedValue, true)); + { + try + { + stepProperty.SetValue(pStep, Enum.Parse(stepProperty.PropertyType, resolvedValue?.ToString() ?? string.Empty, true)); + } + catch (ArgumentException ex) + { + throw new InvalidOperationException($"Invalid enum value '{resolvedValue}' for property '{stepProperty.Name}' of type '{stepProperty.PropertyType.Name}'.", ex); + } + } else { if ((resolvedValue != null) && (stepProperty.PropertyType.IsAssignableFrom(resolvedValue.GetType()))) @@ -430,7 +519,15 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) if (prop.Name.StartsWith("@")) { var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(prop.Value.ToString())); - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables()); + object resolvedValue; + try + { + resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables()); + } + catch (TargetInvocationException ex) + { + throw new InvalidOperationException($"Error evaluating expression for step property.", ex.InnerException ?? ex); + } subobj.Remove(prop.Name); subobj.Add(prop.Name.TrimStart('@'), JToken.FromObject(resolvedValue)); } diff --git a/src/WorkflowCore.DSL/Services/Deserializers.cs b/src/WorkflowCore.DSL/Services/Deserializers.cs index 4958f6f09..ee3fa71c4 100644 --- a/src/WorkflowCore.DSL/Services/Deserializers.cs +++ b/src/WorkflowCore.DSL/Services/Deserializers.cs @@ -9,7 +9,8 @@ public static class Deserializers { private static Serializer yamlSerializer = new Serializer(); - public static Func Json = (source) => JsonConvert.DeserializeObject(source); + public static Func Json = (source) => + JsonConvert.DeserializeObject(source, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None }); public static Func Yaml = (source) => yamlSerializer.DeserializeInto(source, new DefinitionSourceV1()); } diff --git a/src/WorkflowCore.DSL/Services/TypeResolver.cs b/src/WorkflowCore.DSL/Services/TypeResolver.cs index 992d38948..57b998ab6 100644 --- a/src/WorkflowCore.DSL/Services/TypeResolver.cs +++ b/src/WorkflowCore.DSL/Services/TypeResolver.cs @@ -9,7 +9,14 @@ public class TypeResolver : ITypeResolver { public Type FindType(string name) { - return Type.GetType(name, true, true); + try + { + return Type.GetType(name, true, true); + } + catch (TypeLoadException ex) + { + throw new InvalidOperationException($"Could not resolve type '{name}'. Ensure the type exists and the assembly is referenced.", ex); + } } } } diff --git a/src/WorkflowCore.Testing/JsonWorkflowTest.cs b/src/WorkflowCore.Testing/JsonWorkflowTest.cs index 4e8f4281b..cda115ba5 100644 --- a/src/WorkflowCore.Testing/JsonWorkflowTest.cs +++ b/src/WorkflowCore.Testing/JsonWorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class JsonWorkflowTest : IDisposable protected IDefinitionLoader DefinitionLoader; protected IWorkflowRegistry Registry; protected List UnhandledStepErrors = new List(); + private ServiceProvider _serviceProvider; protected virtual void Setup() { @@ -25,16 +26,16 @@ protected virtual void Setup() services.AddLogging(); ConfigureServices(services); - var serviceProvider = services.BuildServiceProvider(); + _serviceProvider = services.BuildServiceProvider(); //config logging - var loggerFactory = serviceProvider.GetService(); + var loggerFactory = _serviceProvider.GetService(); //loggerFactory.AddConsole(LogLevel.Debug); - PersistenceProvider = serviceProvider.GetService(); - DefinitionLoader = serviceProvider.GetService(); - Registry = serviceProvider.GetService(); - Host = serviceProvider.GetService(); + PersistenceProvider = _serviceProvider.GetService(); + DefinitionLoader = _serviceProvider.GetService(); + Registry = _serviceProvider.GetService(); + Host = _serviceProvider.GetService(); Host.OnStepError += Host_OnStepError; Host.Start(); } @@ -104,6 +105,7 @@ protected TData GetData(string workflowId) public void Dispose() { Host.Stop(); + _serviceProvider?.Dispose(); } } diff --git a/src/WorkflowCore.Testing/WorkflowTest.cs b/src/WorkflowCore.Testing/WorkflowTest.cs index bf0eb97ab..d7189e492 100644 --- a/src/WorkflowCore.Testing/WorkflowTest.cs +++ b/src/WorkflowCore.Testing/WorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class WorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected List UnhandledStepErrors = new List(); + private ServiceProvider _serviceProvider; protected virtual void Setup() { @@ -25,10 +26,10 @@ protected virtual void Setup() services.AddLogging(); ConfigureServices(services); - var serviceProvider = services.BuildServiceProvider(); + _serviceProvider = services.BuildServiceProvider(); - PersistenceProvider = serviceProvider.GetService(); - Host = serviceProvider.GetService(); + PersistenceProvider = _serviceProvider.GetService(); + Host = _serviceProvider.GetService(); Host.RegisterWorkflow(); Host.OnStepError += Host_OnStepError; Host.Start(); @@ -119,6 +120,7 @@ protected TData GetData(string workflowId) public void Dispose() { Host.Stop(); + _serviceProvider?.Dispose(); } } diff --git a/src/WorkflowCore/Exceptions/WorkflowDefinitionLoadException.cs b/src/WorkflowCore/Exceptions/WorkflowDefinitionLoadException.cs index 6cc35d6f0..7b5257cfa 100644 --- a/src/WorkflowCore/Exceptions/WorkflowDefinitionLoadException.cs +++ b/src/WorkflowCore/Exceptions/WorkflowDefinitionLoadException.cs @@ -8,5 +8,10 @@ public WorkflowDefinitionLoadException(string message) : base (message) { } + + public WorkflowDefinitionLoadException(string message, Exception innerException) + : base(message, innerException) + { + } } } diff --git a/src/WorkflowCore/Services/ActivityController.cs b/src/WorkflowCore/Services/ActivityController.cs index 491f9c47e..194507e3d 100644 --- a/src/WorkflowCore/Services/ActivityController.cs +++ b/src/WorkflowCore/Services/ActivityController.cs @@ -30,14 +30,18 @@ public async Task GetPendingActivity(string activityName, strin var endTime = _dateTimeProvider.UtcNow.Add(timeout ?? TimeSpan.Zero); var firstPass = true; EventSubscription subscription = null; + bool lockAcquired = false; while ((subscription == null && _dateTimeProvider.UtcNow < endTime) || firstPass) { if (!firstPass) await Task.Delay(100); subscription = await _subscriptionRepository.GetFirstOpenSubscription(Event.EventTypeActivity, activityName, _dateTimeProvider.UtcNow); if (subscription != null) - if (!await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None)) + { + lockAcquired = await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None); + if (!lockAcquired) subscription = null; + } firstPass = false; } if (subscription == null) @@ -51,7 +55,7 @@ public async Task GetPendingActivity(string activityName, strin Token = token.Encode(), ActivityName = subscription.EventKey, Parameters = subscription.SubscriptionData, - TokenExpiry = new DateTime(DateTime.MaxValue.Ticks, DateTimeKind.Utc) + TokenExpiry = DateTime.SpecifyKind(DateTime.MaxValue, DateTimeKind.Utc) }; if (!await _subscriptionRepository.SetSubscriptionToken(subscription.Id, result.Token, workerId, result.TokenExpiry)) @@ -61,7 +65,8 @@ public async Task GetPendingActivity(string activityName, strin } finally { - await _lockProvider.ReleaseLock($"sub:{subscription.Id}"); + if (lockAcquired) + await _lockProvider.ReleaseLock($"sub:{subscription.Id}"); } } diff --git a/src/WorkflowCore/Services/BackgroundTasks/IndexConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/IndexConsumer.cs index 29565e647..2947b1967 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/IndexConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/IndexConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -14,7 +15,7 @@ internal class IndexConsumer : QueueConsumer, IBackgroundTask private readonly ISearchIndex _searchIndex; private readonly ObjectPool _persistenceStorePool; private readonly ILogger _logger; - private readonly Dictionary _errorCounts = new Dictionary(); + private readonly ConcurrentDictionary _errorCounts = new ConcurrentDictionary(); protected override QueueType Queue => QueueType.Index; protected override bool EnableSecondPasses => true; @@ -35,23 +36,12 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance WorkflowActivity.Enrich(workflow, "index"); await _searchIndex.IndexWorkflow(workflow); - lock (_errorCounts) - { - _errorCounts.Remove(itemId); - } + _errorCounts.TryRemove(itemId, out _); } catch (Exception e) { Logger.LogWarning(default(EventId), $"Error indexing workfow - {itemId} - {e.Message}"); - var errCount = 0; - lock (_errorCounts) - { - if (!_errorCounts.ContainsKey(itemId)) - _errorCounts.Add(itemId, 0); - - _errorCounts[itemId]++; - errCount = _errorCounts[itemId]; - } + var errCount = _errorCounts.AddOrUpdate(itemId, 1, (key, count) => count + 1); if (errCount < 5) { @@ -65,10 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance return; } - lock (_errorCounts) - { - _errorCounts.Remove(itemId); - } + _errorCounts.TryRemove(itemId, out _); Logger.LogError(default(EventId), e, $"Unable to index workfow - {itemId} - {e.Message}"); } diff --git a/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs index 8295ea983..0d9ff1645 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs @@ -58,7 +58,10 @@ public virtual void Stop() _cancellationTokenSource.Cancel(); if (DispatchTask != null) { - DispatchTask.Wait(); + if (!DispatchTask.Wait(TimeSpan.FromSeconds(30))) + { + Logger.LogWarning("Dispatch task did not complete within 30 seconds during shutdown"); + } DispatchTask = null; } } @@ -89,6 +92,7 @@ private async Task Execute() if (item == null) { activity?.Dispose(); + activity = null; if (!QueueProvider.IsDequeueBlocking) await Task.Delay(Options.IdleTime, cancelToken); continue; @@ -107,6 +111,7 @@ private async Task Execute() if (!EnableSecondPasses) await QueueProvider.QueueWork(item, Queue); activity?.Dispose(); + activity = null; continue; } @@ -159,9 +164,9 @@ private async Task Execute() { await Task.WhenAll(tasksToAwait); } - catch + catch (Exception ex) { - // Individual task exceptions are already logged in ExecuteItem + Logger.LogWarning(ex, "One or more queued tasks failed"); } } } diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index c7d13138b..bc564bcee 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -96,7 +96,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks; if (workflow.NextExecution.Value < readAheadTicks) { - new Task(() => FutureQueue(workflow, cancellationToken)).Start(); + _ = FutureQueue(workflow, cancellationToken); } else { @@ -160,7 +160,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi } } - private async void FutureQueue(WorkflowInstance workflow, CancellationToken cancellationToken) + private async Task FutureQueue(WorkflowInstance workflow, CancellationToken cancellationToken) { try { diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index cb69c8791..89e0edd25 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -78,7 +78,7 @@ public async Task GetWorkflowInstance(string Id, CancellationT { lock (_instances) { - return _instances.First(x => x.Id == Id); + return _instances.FirstOrDefault(x => x.Id == Id); } } @@ -149,8 +149,9 @@ public async Task TerminateSubscription(string eventSubscriptionId, Cancellation { lock (_subscriptions) { - var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); - _subscriptions.Remove(sub); + var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId); + if (sub != null) + _subscriptions.Remove(sub); } } @@ -158,7 +159,7 @@ public Task GetSubscription(string eventSubscriptionId, Cance { lock (_subscriptions) { - var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); + var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId); return Task.FromResult(sub); } } @@ -177,7 +178,9 @@ public Task SetSubscriptionToken(string eventSubscriptionId, string token, { lock (_subscriptions) { - var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); + var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId); + if (sub == null) + return Task.FromResult(false); sub.ExternalToken = token; sub.ExternalWorkerId = workerId; sub.ExternalTokenExpiry = expiry; @@ -190,7 +193,9 @@ public Task ClearSubscriptionToken(string eventSubscriptionId, string token, Can { lock (_subscriptions) { - var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); + var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId); + if (sub == null) + throw new InvalidOperationException($"Subscription {eventSubscriptionId} not found."); if (sub.ExternalToken != token) throw new InvalidOperationException(); sub.ExternalToken = null; @@ -271,7 +276,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default) public async Task PersistErrors(IEnumerable errors, CancellationToken _ = default) { - lock (errors) + lock (_errors) { _errors.AddRange(errors); } diff --git a/src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs b/src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs index 007b51efb..b83ec46ee 100644 --- a/src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs +++ b/src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -9,7 +10,7 @@ namespace WorkflowCore.Services { public class SingleNodeEventHub : ILifeCycleEventHub { - private ICollection> _subscribers = new HashSet>(); + private readonly ConcurrentBag> _subscribers = new ConcurrentBag>(); private readonly ILogger _logger; public SingleNodeEventHub(ILoggerFactory loggerFactory) @@ -19,9 +20,9 @@ public SingleNodeEventHub(ILoggerFactory loggerFactory) public Task PublishNotification(LifeCycleEvent evt) { - Task.Run(() => + return Task.Run(() => { - foreach (var subscriber in _subscribers) + foreach (var subscriber in _subscribers.ToArray()) { try { @@ -33,7 +34,6 @@ public Task PublishNotification(LifeCycleEvent evt) } } }); - return Task.CompletedTask; } public void Subscribe(Action action) @@ -48,7 +48,7 @@ public Task Start() public Task Stop() { - _subscribers.Clear(); + while (_subscribers.TryTake(out _)) { } return Task.CompletedTask; } } diff --git a/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs b/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs index 56f63b550..69bccaea3 100644 --- a/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs @@ -48,7 +48,12 @@ public Task Stop() } public void Dispose() - { + { + foreach (var queue in _queues.Values) + { + queue.CompleteAdding(); + queue.Dispose(); + } } } diff --git a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs index d8990c160..4ae16b040 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs @@ -99,8 +99,8 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP nextCompensationPointer.Active = false; nextCompensationPointer.Status = PointerStatus.PendingPredecessor; nextCompensationPointer.PredecessorId = compensationPointer.Id; - compensationPointer = nextCompensationPointer; } + compensationPointer = nextCompensationPointer; workflow.ExecutionPointers.Add(nextCompensationPointer); siblingPointer.Status = PointerStatus.Compensated; diff --git a/src/WorkflowCore/Services/GreyList.cs b/src/WorkflowCore/Services/GreyList.cs index 85b67fbc8..31b3cb647 100644 --- a/src/WorkflowCore/Services/GreyList.cs +++ b/src/WorkflowCore/Services/GreyList.cs @@ -36,7 +36,7 @@ public bool Contains(string id) var result = start > (_dateTimeProvider.Now.AddMinutes(-1 * TTL)); if (!result) - _list.TryRemove(id, out var _); + _list.TryRemove(id, out var _); // Benign race: entry may have been refreshed between check and remove return result; } diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index da3e9cd85..116c6bcd7 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -210,7 +210,12 @@ private void ProcessAfterExecutionIteration(WorkflowInstance workflow, WorkflowD foreach (var pointer in pointers) { var step = workflowDef.Steps.FindById(pointer.StepId); - step?.AfterWorkflowIteration(workflowResult, workflowDef, workflow, pointer); + if (step == null) + { + _logger.LogWarning("Step {StepId} not found in workflow definition {WorkflowId}", pointer.StepId, workflowDef.Id); + continue; + } + step.AfterWorkflowIteration(workflowResult, workflowDef, workflow, pointer); } } diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index d2a09ec70..6f3195cbf 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -79,7 +79,7 @@ public Task PublishEvent(string eventName, string eventKey, object eventData, Da public void Start() { - StartAsync(CancellationToken.None).Wait(); + StartAsync(CancellationToken.None).GetAwaiter().GetResult(); } public async Task StartAsync(CancellationToken cancellationToken) @@ -116,7 +116,7 @@ public async Task StartAsync(CancellationToken cancellationToken) public void Stop() { - StopAsync(CancellationToken.None).Wait(); + StopAsync(CancellationToken.None).GetAwaiter().GetResult(); } public async Task StopAsync(CancellationToken cancellationToken) diff --git a/src/WorkflowCore/Services/WorkflowRegistry.cs b/src/WorkflowCore/Services/WorkflowRegistry.cs index beed19c0e..bd3903f08 100644 --- a/src/WorkflowCore/Services/WorkflowRegistry.cs +++ b/src/WorkflowCore/Services/WorkflowRegistry.cs @@ -23,32 +23,29 @@ public WorkflowDefinition GetDefinition(string workflowId, int? version = null) { if (version.HasValue) { - if (!_registry.ContainsKey($"{workflowId}-{version}")) - return default; - return _registry[$"{workflowId}-{version}"]; + _registry.TryGetValue($"{workflowId}-{version}", out var result); + return result; } else { - if (!_lastestVersion.ContainsKey(workflowId)) - return default; - return _lastestVersion[workflowId]; + _lastestVersion.TryGetValue(workflowId, out var result); + return result; } } public void DeregisterWorkflow(string workflowId, int version) { - if (!_registry.ContainsKey($"{workflowId}-{version}")) - return; - lock (_registry) { - _registry.TryRemove($"{workflowId}-{version}", out var _); - if (_lastestVersion[workflowId].Version == version) + if (!_registry.TryRemove($"{workflowId}-{version}", out var _)) + return; + if (_lastestVersion.TryGetValue(workflowId, out var current) && current.Version == version) { _lastestVersion.TryRemove(workflowId, out var _); - - var latest = _registry.Values.Where(x => x.Id == workflowId).OrderByDescending(x => x.Version).FirstOrDefault(); - if (latest != default) + var latest = _registry.Values.Where(x => x.Id == workflowId) + .OrderByDescending(x => x.Version) + .FirstOrDefault(); + if (latest != null) _lastestVersion[workflowId] = latest; } } diff --git a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ChatCompletionService.cs b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ChatCompletionService.cs index e740fbcf1..a0d9c3f3b 100644 --- a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ChatCompletionService.cs +++ b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ChatCompletionService.cs @@ -136,17 +136,21 @@ private ChatRequestMessage ConvertToSdkMessage(ConversationMessage message) } /// - /// Ensures tool call ID is valid (max 40 characters per API requirement) + /// Ensures tool call ID is valid (max 40 characters per API requirement). + /// Warning: truncation may cause mismatched tool call IDs between request and response. /// - private static string EnsureValidToolCallId(string id) + private string EnsureValidToolCallId(string id) { if (string.IsNullOrEmpty(id)) { - return "call_" + Guid.NewGuid().ToString("N").Substring(0, 24); + var generated = "call_" + Guid.NewGuid().ToString("N").Substring(0, 24); + _logger.LogWarning("Tool call ID was null or empty, generated replacement: {GeneratedId}", generated); + return generated; } if (id.Length > 40) { + _logger.LogWarning("Tool call ID truncated from {OriginalLength} to 40 characters: {OriginalId}", id.Length, id); return id.Substring(0, 40); } diff --git a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/InMemoryConversationStore.cs b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/InMemoryConversationStore.cs index c6fca1b75..36bb6f252 100644 --- a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/InMemoryConversationStore.cs +++ b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/InMemoryConversationStore.cs @@ -26,25 +26,34 @@ public Task GetThreadAsync(string threadId) public Task GetOrCreateThreadAsync(string workflowInstanceId, string executionPointerId) { var key = $"{workflowInstanceId}:{executionPointerId}"; - - if (_workflowThreadMap.TryGetValue(key, out var threadId)) + + var threadId = _workflowThreadMap.GetOrAdd(key, k => { - if (_threads.TryGetValue(threadId, out var existingThread)) + var thread = new ConversationThread { - return Task.FromResult(existingThread); - } + WorkflowInstanceId = workflowInstanceId, + ExecutionPointerId = executionPointerId + }; + _threads[thread.Id] = thread; + return thread.Id; + }); + + if (_threads.TryGetValue(threadId, out var existingThread)) + { + return Task.FromResult(existingThread); } - var thread = new ConversationThread + // The thread was removed or the mapping is stale; recreate and update the mapping. + var newThread = new ConversationThread { WorkflowInstanceId = workflowInstanceId, ExecutionPointerId = executionPointerId }; - _threads[thread.Id] = thread; - _workflowThreadMap[key] = thread.Id; + _threads[newThread.Id] = newThread; + _workflowThreadMap[key] = newThread.Id; - return Task.FromResult(thread); + return Task.FromResult(newThread); } public Task SaveThreadAsync(ConversationThread thread) diff --git a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ToolRegistry.cs b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ToolRegistry.cs index ffe113698..d2fa6eadc 100644 --- a/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ToolRegistry.cs +++ b/src/extensions/WorkflowCore.AI.AzureFoundry/Services/ToolRegistry.cs @@ -39,6 +39,8 @@ public void Register(IAgentTool tool) public void Register() where T : IAgentTool { + if (_serviceProvider == null) + throw new InvalidOperationException("ServiceProvider is required to register tools by type. Provide a non-null IServiceProvider in the constructor."); var tool = _serviceProvider.GetRequiredService(); Register(tool); _toolTypes[tool.Name] = typeof(T); @@ -51,6 +53,8 @@ public IAgentTool GetTool(string name) if (_toolTypes.TryGetValue(name, out var type)) { + if (_serviceProvider == null) + throw new InvalidOperationException("ServiceProvider is required to resolve tools by type."); tool = (IAgentTool)_serviceProvider.GetRequiredService(type); _tools[name] = tool; return tool; diff --git a/src/extensions/WorkflowCore.WebAPI/Controllers/EventsController.cs b/src/extensions/WorkflowCore.WebAPI/Controllers/EventsController.cs index 4930aa22d..9608ce059 100644 --- a/src/extensions/WorkflowCore.WebAPI/Controllers/EventsController.cs +++ b/src/extensions/WorkflowCore.WebAPI/Controllers/EventsController.cs @@ -23,6 +23,11 @@ public EventsController(IWorkflowHost workflowHost, ILoggerFactory loggerFactory [HttpPost("{eventName}/{eventKey}")] public async Task Post(string eventName, string eventKey, [FromBody]object eventData) { + if (string.IsNullOrEmpty(eventName)) + return BadRequest("Event name is required"); + if (string.IsNullOrEmpty(eventKey)) + return BadRequest("Event key is required"); + await _workflowHost.PublishEvent(eventName, eventKey, eventData); return Ok(); } diff --git a/src/extensions/WorkflowCore.WebAPI/Controllers/WorkflowsController.cs b/src/extensions/WorkflowCore.WebAPI/Controllers/WorkflowsController.cs index 3c88df3fb..3af7b5283 100644 --- a/src/extensions/WorkflowCore.WebAPI/Controllers/WorkflowsController.cs +++ b/src/extensions/WorkflowCore.WebAPI/Controllers/WorkflowsController.cs @@ -31,6 +31,10 @@ public WorkflowsController(IWorkflowHost workflowHost, IWorkflowRegistry registr [HttpGet] public async Task Get(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) { + if (skip < 0) skip = 0; + if (take <= 0) take = 10; + if (take > 1000) take = 1000; + var result = await _workflowStore.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take); return Json(result.ToList()); } @@ -38,6 +42,9 @@ public async Task Get(WorkflowStatus? status, string type, DateTi [HttpGet("{id}")] public async Task Get(string id) { + if (string.IsNullOrEmpty(id)) + return BadRequest("Workflow id is required"); + var result = await _workflowStore.GetWorkflowInstance(id); return Json(result); } @@ -49,7 +56,7 @@ public async Task Post(string id, int? version, string reference, string workflowId = null; var def = _registry.GetDefinition(id, version); if (def == null) - return BadRequest(String.Format("Workflow defintion {0} for version {1} not found", id, version)); + return NotFound(String.Format("Workflow definition of {0} not found", id)); if ((data != null) && (def.DataType != null)) { var dataStr = JsonConvert.SerializeObject(data); diff --git a/src/providers/WorkflowCore.LockProviders.SqlServer/SqlLockProvider.cs b/src/providers/WorkflowCore.LockProviders.SqlServer/SqlLockProvider.cs index 8bb0cf5dc..0b3a98946 100644 --- a/src/providers/WorkflowCore.LockProviders.SqlServer/SqlLockProvider.cs +++ b/src/providers/WorkflowCore.LockProviders.SqlServer/SqlLockProvider.cs @@ -77,10 +77,10 @@ public async Task AcquireLock(string Id, CancellationToken cancellationTok return false; } } - catch (Exception ex) + catch (Exception) { connection.Close(); - throw ex; + throw; } } finally @@ -97,8 +97,8 @@ public async Task ReleaseLock(string Id) { try { - SqlConnection connection = null; - connection = _locks[Id]; + if (!_locks.TryGetValue(Id, out var connection)) + return; if (connection == null) return; diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 22ba680f5..28a147d2e 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -105,7 +105,7 @@ public async Task GetWorkflowInstance(string Id, CancellationT .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .Include(wf => wf.ExecutionPointers) - .FirstAsync(x => x.InstanceId == uid, cancellationToken); + .FirstOrDefaultAsync(x => x.InstanceId == uid, cancellationToken); if (raw == null) return null; @@ -238,7 +238,7 @@ public async Task GetEvent(string id, CancellationToken cancellationToken { Guid uid = new Guid(id); var raw = await db.Set() - .FirstAsync(x => x.EventId == uid, cancellationToken); + .FirstOrDefaultAsync(x => x.EventId == uid, cancellationToken); if (raw == null) return null; @@ -271,7 +271,10 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo var existingEntity = await db.Set() .Where(x => x.EventId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstOrDefaultAsync(cancellationToken); + + if (existingEntity == null) + return; existingEntity.IsProcessed = true; await db.SaveChangesAsync(cancellationToken); @@ -305,7 +308,10 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation var existingEntity = await db.Set() .Where(x => x.EventId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstOrDefaultAsync(cancellationToken); + + if (existingEntity == null) + return; existingEntity.IsProcessed = false; await db.SaveChangesAsync(cancellationToken); diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Persistence.MongoDB/ServiceCollectionExtensions.cs index f03205c3a..9c7b8be59 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/ServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ -using MongoDB.Driver; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; using System; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -23,7 +24,7 @@ public static WorkflowOptions UseMongoDB( configureClient?.Invoke(mongoClientSettings); var client = new MongoClient(mongoClientSettings); var db = client.GetDatabase(databaseName); - return new MongoPersistenceProvider(db); + return new MongoPersistenceProvider(db, sp.GetRequiredService()); }); options.Services.AddTransient(sp => { @@ -49,7 +50,7 @@ public static WorkflowOptions UseMongoDB( options.UsePersistence(sp => { var db = createDatabase(sp); - return new MongoPersistenceProvider(db); + return new MongoPersistenceProvider(db, sp.GetRequiredService()); }); options.Services.AddTransient(sp => { diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index a72340d68..4fc59f7b6 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -9,6 +9,7 @@ using MongoDB.Bson.Serialization.Conventions; using MongoDB.Bson.Serialization.Serializers; using MongoDB.Driver.Linq; +using Microsoft.Extensions.Logging; using WorkflowCore.Interface; using WorkflowCore.Models; using System.Threading; @@ -19,10 +20,12 @@ public class MongoPersistenceProvider : IPersistenceProvider { internal const string WorkflowCollectionName = "wfc.workflows"; private readonly IMongoDatabase _database; + private readonly ILogger _logger; - public MongoPersistenceProvider(IMongoDatabase database) + public MongoPersistenceProvider(IMongoDatabase database, ILoggerFactory logFactory) { _database = database; + _logger = logFactory.CreateLogger(); } static MongoPersistenceProvider() @@ -86,9 +89,12 @@ static MongoPersistenceProvider() .SetIgnoreExtraElements(true); } - static bool indexesCreated = false; + private static bool indexesCreated = false; + private static readonly object _indexLock = new object(); static void CreateIndexes(MongoPersistenceProvider instance) { + lock (_indexLock) + { if (!indexesCreated) { instance.WorkflowInstances.Indexes.CreateOne(new CreateIndexModel( @@ -128,6 +134,7 @@ static void CreateIndexes(MongoPersistenceProvider instance) indexesCreated = true; } + } } private IMongoCollection WorkflowInstances => _database.GetCollection(WorkflowCollectionName); @@ -162,9 +169,17 @@ public async Task PersistWorkflow(WorkflowInstance workflow, List x.CommandName == command.CommandName && x.Data == command.Data); } - catch (Exception) + catch (Exception ex) { - //TODO: add logger + _logger.LogError(ex, "Error processing scheduled command {CommandName}", command.CommandName); } } } diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs index 3e1d5e2ea..9b5e8da27 100644 --- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs @@ -17,7 +17,8 @@ public class RavendbPersistenceProvider : IPersistenceProvider { internal const string WorkflowCollectionName = "wfc.workflows"; private readonly IDocumentStore _database; - static bool indexesCreated = false; + private static bool indexesCreated = false; + private static readonly object _indexLock = new object(); public bool SupportsScheduledCommands => false; @@ -29,16 +30,19 @@ public RavendbPersistenceProvider(IDocumentStore database) static void CreateIndexes(RavendbPersistenceProvider instance) { - if (!indexesCreated) + lock (_indexLock) { - /* - // create the indexes here based on assemby of classes in the file 'RavenDbIndexes.cs' - IndexCreation.CreateIndexes(typeof(WorkflowInstances_Id).Assembly, instance._database); - IndexCreation.CreateIndexes(typeof(EventSubscriptions_Id).Assembly, instance._database); - IndexCreation.CreateIndexes(typeof(Events_Id).Assembly, instance._database); - IndexCreation.CreateIndexes(typeof(ExecutionErrors_Id).Assembly, instance._database); - */ - indexesCreated = true; + if (!indexesCreated) + { + /* + // create the indexes here based on assemby of classes in the file 'RavenDbIndexes.cs' + IndexCreation.CreateIndexes(typeof(WorkflowInstances_Id).Assembly, instance._database); + IndexCreation.CreateIndexes(typeof(EventSubscriptions_Id).Assembly, instance._database); + IndexCreation.CreateIndexes(typeof(Events_Id).Assembly, instance._database); + IndexCreation.CreateIndexes(typeof(ExecutionErrors_Id).Assembly, instance._database); + */ + indexesCreated = true; + } } } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs index 0863f1393..2bbf6b0fb 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs @@ -122,19 +122,20 @@ public async Task Start() _cancellationTokenSource = new CancellationTokenSource(); - _heartbeatTask = new Task(SendHeartbeat); - _heartbeatTask.Start(); + _heartbeatTask = Task.Run(() => SendHeartbeat()); } - public Task Stop() + public async Task Stop() { _cancellationTokenSource.Cancel(); - _heartbeatTask.Wait(); - _heartbeatTask = null; - return Task.CompletedTask; + if (_heartbeatTask != null) + { + await _heartbeatTask; + _heartbeatTask = null; + } } - private async void SendHeartbeat() + private async Task SendHeartbeat() { while (!_cancellationTokenSource.IsCancellationRequested) { diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index b22eb955e..dab5b5d34 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -406,7 +406,7 @@ public Task PersistErrors(IEnumerable errors, CancellationToken public void EnsureStoreExists() { - _provisioner.ProvisionTables().Wait(); + _provisioner.ProvisionTables().GetAwaiter().GetResult(); } public async Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs index 5c89f7837..843db4f92 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs @@ -31,8 +31,7 @@ public KinesisStreamConsumer(AmazonKinesisClient kinesisClient, IKinesisTracker _tracker = tracker; _lockManager = lockManager; _client = kinesisClient; - _processTask = new Task(Process); - _processTask.Start(); + _processTask = Task.Run(() => Process()); _dateTimeProvider = dateTimeProvider; } @@ -55,7 +54,7 @@ public async Task Subscribe(string appName, string stream, Action action } } - private async void Process() + private async Task Process() { while (!_cancelToken.IsCancellationRequested) { diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs b/src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs index 6780a77ad..987c673bb 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs @@ -110,7 +110,12 @@ public Task Stop() return Task.CompletedTask; } - private async void RenewLeases(object state) + private void RenewLeases(object state) + { + _ = RenewLeasesAsync(); + } + + private async Task RenewLeasesAsync() { _logger.LogDebug("Renewing active leases"); if (_mutex.WaitOne()) diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs index 1554d8bed..f1b95b199 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs @@ -1,5 +1,4 @@ -using Newtonsoft.Json; -using RabbitMQ.Client; +using RabbitMQ.Client; using System; using System.Linq; using System.Text; @@ -19,7 +18,6 @@ public class RabbitMQProvider : IQueueProvider private readonly IServiceProvider _serviceProvider; private IConnection _connection = null; - private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }; public bool IsDequeueBlocking => false; @@ -46,7 +44,7 @@ public async Task QueueWork(string id, QueueType queue) } finally { - await channel.CloseAsync(200, "OK", abort: false, CancellationToken.None); + await channel.DisposeAsync(); } } @@ -80,12 +78,13 @@ await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), } finally { - await channel.CloseAsync(200, "OK", abort: false, CancellationToken.None); + await channel.DisposeAsync(); } } public void Dispose() { + // GetAwaiter().GetResult() is the best available sync-over-async pattern for IDisposable.Dispose() if (_connection != null) { if (_connection.IsOpen) diff --git a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProvider.cs b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProvider.cs index 7a6044bbd..11436df0e 100644 --- a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProvider.cs @@ -74,7 +74,7 @@ public async Task Stop() public void Dispose() { - Stop().Wait(); + Stop().GetAwaiter().GetResult(); } /// @@ -89,8 +89,7 @@ public async Task QueueWork(string id, QueueType queue) if (string.IsNullOrEmpty(id)) throw new ArgumentNullException(nameof(id), "Param id must not be null"); - SqlConnection cn = new SqlConnection(_connectionString); - try + using (var cn = new SqlConnection(_connectionString)) { await cn.OpenAsync(); var par = _config.GetByQueue(queue); @@ -103,10 +102,6 @@ await _sqlCommandExecutor.ExecuteCommandAsync(cn, null, _queueWorkCommand, new SqlParameter("@RequestMessage", id) ); } - finally - { - cn.Close(); - } } /// @@ -118,21 +113,23 @@ await _sqlCommandExecutor.ExecuteCommandAsync(cn, null, _queueWorkCommand, /// Next id from queue, null if no message arrives in one second. public async Task DequeueWork(QueueType queue, CancellationToken cancellationToken) { - SqlConnection cn = new SqlConnection(_connectionString); - try + using (var cn = new SqlConnection(_connectionString)) { await cn.OpenAsync(cancellationToken); var par = _config.GetByQueue(queue); - var sql = _dequeueWorkCommand.Replace("{queueName}", par.QueueName); + var sql = _dequeueWorkCommand.Replace("{queueName}", SanitizeIdentifier(par.QueueName)); var msg = await _sqlCommandExecutor.ExecuteScalarAsync(cn, null, sql); return msg is DBNull ? null : (string)msg; - - } - finally - { - cn.Close(); } } + + private static string SanitizeIdentifier(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Queue name cannot be null or empty.", nameof(name)); + // Escape any ']' characters to prevent breaking out of the delimited identifier + return name.Replace("]", "]]"); + } } } \ No newline at end of file diff --git a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs index cfed8a4c3..531952f09 100644 --- a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs +++ b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs @@ -31,40 +31,38 @@ public SqlServerQueueProviderMigrator(string connectionString, IQueueConfigProvi public async Task MigrateDbAsync() { - var cn = new SqlConnection(_connectionString); - await cn.OpenAsync(); - var tx = cn.BeginTransaction(); - try + using (var cn = new SqlConnection(_connectionString)) { - var queueConfigurations = new[] + await cn.OpenAsync(); + var tx = cn.BeginTransaction(); + try { - _configProvider.GetByQueue(QueueType.Workflow), - _configProvider.GetByQueue(QueueType.Event), - _configProvider.GetByQueue(QueueType.Index) - }; + var queueConfigurations = new[] + { + _configProvider.GetByQueue(QueueType.Workflow), + _configProvider.GetByQueue(QueueType.Event), + _configProvider.GetByQueue(QueueType.Index) + }; - foreach (var item in queueConfigurations) - { - await CreateMessageType(cn, tx, item.MsgType); + foreach (var item in queueConfigurations) + { + await CreateMessageType(cn, tx, item.MsgType); - await CreateContract(cn, tx, item.ContractName, item.MsgType); + await CreateContract(cn, tx, item.ContractName, item.MsgType); - await CreateQueue(cn, tx, item.QueueName); + await CreateQueue(cn, tx, item.QueueName); - await CreateService(cn, tx, item.InitiatorService, item.QueueName, item.ContractName); - await CreateService(cn, tx, item.TargetService, item.QueueName, item.ContractName); - } + await CreateService(cn, tx, item.InitiatorService, item.QueueName, item.ContractName); + await CreateService(cn, tx, item.TargetService, item.QueueName, item.ContractName); + } - tx.Commit(); - } - catch - { - tx.Rollback(); - throw; - } - finally - { - cn.Close(); + tx.Commit(); + } + catch + { + tx.Rollback(); + throw; + } } } @@ -76,7 +74,7 @@ private async Task CreateService(SqlConnection cn, SqlTransaction tx, string nam if (!string.IsNullOrEmpty(existing)) return; - await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE SERVICE [{name}] ON QUEUE [{queueName}]([{contractName}]);"); + await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE SERVICE [{SanitizeIdentifier(name)}] ON QUEUE [{SanitizeIdentifier(queueName)}]([{SanitizeIdentifier(contractName)}]);"); } private async Task CreateQueue(SqlConnection cn, SqlTransaction tx, string queueName) @@ -87,7 +85,7 @@ private async Task CreateQueue(SqlConnection cn, SqlTransaction tx, string queue if (!string.IsNullOrEmpty(existing)) return; - await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE QUEUE [{queueName}];"); + await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE QUEUE [{SanitizeIdentifier(queueName)}];"); } private async Task CreateContract(SqlConnection cn, SqlTransaction tx, string contractName, string messageName) @@ -98,7 +96,7 @@ private async Task CreateContract(SqlConnection cn, SqlTransaction tx, string co if (!string.IsNullOrEmpty(existing)) return; - await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE CONTRACT [{contractName}] ( [{messageName}] SENT BY INITIATOR);"); + await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE CONTRACT [{SanitizeIdentifier(contractName)}] ( [{SanitizeIdentifier(messageName)}] SENT BY INITIATOR);"); } private async Task CreateMessageType(SqlConnection cn, SqlTransaction tx, string message) @@ -109,11 +107,18 @@ private async Task CreateMessageType(SqlConnection cn, SqlTransaction tx, string if (!string.IsNullOrEmpty(existing)) return; - await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE MESSAGE TYPE [{message}] VALIDATION = NONE;"); + await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"CREATE MESSAGE TYPE [{SanitizeIdentifier(message)}] VALIDATION = NONE;"); } #endregion + private static string SanitizeIdentifier(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Identifier cannot be null or empty.", nameof(name)); + return name.Replace("]", "]]"); + } + public async Task CreateDbAsync() { var builder = new SqlConnectionStringBuilder(_connectionString); @@ -123,10 +128,9 @@ public async Task CreateDbAsync() var masterCnStr = masterBuilder.ToString(); bool dbPresente; - var cn = new SqlConnection(masterCnStr); - await cn.OpenAsync(); - try + using (var cn = new SqlConnection(masterCnStr)) { + await cn.OpenAsync(); var cmd = cn.CreateCommand(); cmd.CommandText = "select name from sys.databases where name = @dbname"; cmd.Parameters.AddWithValue("@dbname", builder.InitialCatalog); @@ -140,38 +144,32 @@ public async Task CreateDbAsync() await createCmd.ExecuteNonQueryAsync(); } } - finally - { - cn.Close(); - } await EnableBroker(masterCnStr, builder.InitialCatalog); } private async Task EnableBroker(string masterCn, string db) { - var cn = new SqlConnection(masterCn); - await cn.OpenAsync(); + using (var cn = new SqlConnection(masterCn)) + { + await cn.OpenAsync(); - var isBrokerEnabled = await _sqlCommandExecutor.ExecuteScalarAsync(cn, null, @"select is_broker_enabled from sys.databases where name = @name", new SqlParameter("@name", db)); + var isBrokerEnabled = await _sqlCommandExecutor.ExecuteScalarAsync(cn, null, @"select is_broker_enabled from sys.databases where name = @name", new SqlParameter("@name", db)); - if (isBrokerEnabled) - return; + if (isBrokerEnabled) + return; - var tx = cn.BeginTransaction(); - try - { - await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"ALTER DATABASE [{db}] SET ENABLE_BROKER;"); - tx.Commit(); - } - catch - { - tx.Rollback(); - throw; - } - finally - { - cn.Close(); + var tx = cn.BeginTransaction(); + try + { + await _sqlCommandExecutor.ExecuteCommandAsync(cn, tx, $"ALTER DATABASE [{SanitizeIdentifier(db)}] SET ENABLE_BROKER;"); + tx.Commit(); + } + catch + { + tx.Rollback(); + throw; + } } } } diff --git a/test/Directory.Build.props b/test/Directory.Build.props index cdb7e1676..cd70cf018 100644 --- a/test/Directory.Build.props +++ b/test/Directory.Build.props @@ -7,7 +7,7 @@ - + diff --git a/test/Docker.Testify/Docker.Testify.csproj b/test/Docker.Testify/Docker.Testify.csproj index a83648df4..ba7966809 100644 --- a/test/Docker.Testify/Docker.Testify.csproj +++ b/test/Docker.Testify/Docker.Testify.csproj @@ -1,7 +1,7 @@  - + diff --git a/test/Docker.Testify/DockerSetup.cs b/test/Docker.Testify/DockerSetup.cs index 71bb76e26..790bcc5ff 100644 --- a/test/Docker.Testify/DockerSetup.cs +++ b/test/Docker.Testify/DockerSetup.cs @@ -32,9 +32,9 @@ public abstract class DockerSetup : IDisposable protected DockerSetup() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - docker = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")).CreateClient(); + docker = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")).CreateClient(new System.Version(1, 45)); else - docker = new DockerClientConfiguration(new Uri("unix:///var/run/docker.sock")).CreateClient(); + docker = new DockerClientConfiguration(new Uri("unix:///var/run/docker.sock")).CreateClient(new System.Version(1, 45)); ExternalPort = GetFreePort(); diff --git a/test/WorkflowCore.Tests.Elasticsearch/WorkflowCore.Tests.Elasticsearch.csproj b/test/WorkflowCore.Tests.Elasticsearch/WorkflowCore.Tests.Elasticsearch.csproj index d5bae6c71..0e3402428 100644 --- a/test/WorkflowCore.Tests.Elasticsearch/WorkflowCore.Tests.Elasticsearch.csproj +++ b/test/WorkflowCore.Tests.Elasticsearch/WorkflowCore.Tests.Elasticsearch.csproj @@ -1,11 +1,11 @@  - net6.0 + net8.0 - + diff --git a/test/WorkflowCore.Tests.MongoDB/MongoPersistenceProviderFixture.cs b/test/WorkflowCore.Tests.MongoDB/MongoPersistenceProviderFixture.cs index fd6546aca..2c5ad735d 100644 --- a/test/WorkflowCore.Tests.MongoDB/MongoPersistenceProviderFixture.cs +++ b/test/WorkflowCore.Tests.MongoDB/MongoPersistenceProviderFixture.cs @@ -1,4 +1,5 @@ -using MongoDB.Driver; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; using WorkflowCore.Interface; using WorkflowCore.Persistence.MongoDB.Services; using WorkflowCore.UnitTests; @@ -22,7 +23,7 @@ protected override IPersistenceProvider Subject { var client = new MongoClient(MongoDockerSetup.ConnectionString); var db = client.GetDatabase(nameof(MongoPersistenceProviderFixture)); - var provider = new MongoPersistenceProvider(db); + var provider = new MongoPersistenceProvider(db, new LoggerFactory()); provider.EnsureStoreExists(); return provider; } diff --git a/test/WorkflowCore.Tests.MongoDB/WorkflowCore.Tests.MongoDB.csproj b/test/WorkflowCore.Tests.MongoDB/WorkflowCore.Tests.MongoDB.csproj index 0bffc02e8..64cd9edfc 100644 --- a/test/WorkflowCore.Tests.MongoDB/WorkflowCore.Tests.MongoDB.csproj +++ b/test/WorkflowCore.Tests.MongoDB/WorkflowCore.Tests.MongoDB.csproj @@ -21,7 +21,7 @@ - + diff --git a/test/WorkflowCore.Tests.MySQL/WorkflowCore.Tests.MySQL.csproj b/test/WorkflowCore.Tests.MySQL/WorkflowCore.Tests.MySQL.csproj index bdb4029e7..7ac48eb75 100644 --- a/test/WorkflowCore.Tests.MySQL/WorkflowCore.Tests.MySQL.csproj +++ b/test/WorkflowCore.Tests.MySQL/WorkflowCore.Tests.MySQL.csproj @@ -1,11 +1,11 @@ - net6.0;net8.0 + net8.0 - + diff --git a/test/WorkflowCore.Tests.PostgreSQL/DockerSetup.cs b/test/WorkflowCore.Tests.PostgreSQL/DockerSetup.cs index 8548bf85d..a29173855 100644 --- a/test/WorkflowCore.Tests.PostgreSQL/DockerSetup.cs +++ b/test/WorkflowCore.Tests.PostgreSQL/DockerSetup.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading.Tasks; +using System.Threading.Tasks; using Squadron; using Xunit; @@ -13,7 +12,6 @@ public class PostgresDockerSetup : IAsyncLifetime public PostgresDockerSetup() { - AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true); _postgreSqlResource = new PostgreSqlResource(); } diff --git a/test/WorkflowCore.Tests.PostgreSQL/WorkflowCore.Tests.PostgreSQL.csproj b/test/WorkflowCore.Tests.PostgreSQL/WorkflowCore.Tests.PostgreSQL.csproj index 53a91aa9a..0ab8bc505 100644 --- a/test/WorkflowCore.Tests.PostgreSQL/WorkflowCore.Tests.PostgreSQL.csproj +++ b/test/WorkflowCore.Tests.PostgreSQL/WorkflowCore.Tests.PostgreSQL.csproj @@ -7,7 +7,7 @@ false false false - net6.0;net8.0 + net8.0 @@ -21,7 +21,7 @@ - + diff --git a/test/WorkflowCore.Tests.Redis/WorkflowCore.Tests.Redis.csproj b/test/WorkflowCore.Tests.Redis/WorkflowCore.Tests.Redis.csproj index f6a8aaed1..6349f0ed3 100644 --- a/test/WorkflowCore.Tests.Redis/WorkflowCore.Tests.Redis.csproj +++ b/test/WorkflowCore.Tests.Redis/WorkflowCore.Tests.Redis.csproj @@ -1,12 +1,12 @@ - net6.0 + net8.0 - - + + diff --git a/test/WorkflowCore.Tests.SqlServer/WorkflowCore.Tests.SqlServer.csproj b/test/WorkflowCore.Tests.SqlServer/WorkflowCore.Tests.SqlServer.csproj index 11b6c00e2..ac6e7ffb8 100644 --- a/test/WorkflowCore.Tests.SqlServer/WorkflowCore.Tests.SqlServer.csproj +++ b/test/WorkflowCore.Tests.SqlServer/WorkflowCore.Tests.SqlServer.csproj @@ -1,11 +1,11 @@  - net6.0;net8.0;net9.0 + net8.0;net9.0 - +