Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Elsa.Common.Multitenancy;
using Elsa.Scheduling.Quartz.Jobs;
using Quartz;

namespace Elsa.Scheduling.Quartz;
Expand All @@ -18,4 +19,27 @@ internal static class JobExecutionExtensions

return await tenantFinder.FindByIdAsync(tenantId, context.CancellationToken);
}

/// <summary>
/// Executes delete job if allowed
/// </summary>
/// <param name="context">The Quartz job execution context.</param>
/// <param name="jobKey">The Quartz job key.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task DeleteJob(this IJobExecutionContext context, JobKey jobKey, CancellationToken cancellationToken = default)
{
if (IsJobAllowedToBeDeleted(jobKey.Name))
await context.Scheduler.DeleteJob(jobKey, cancellationToken);
}

/// <summary>
/// Checks if the job is allowed to be deleted by name
/// </summary>
/// <param name="jobName">Name of the job to check</param>
/// <returns>False if the job is one of the required ones, otherwise true</returns>
private static bool IsJobAllowedToBeDeleted(string jobName)
{
return jobName != nameof(ResumeWorkflowJob)
&& jobName != nameof(RunWorkflowJob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task Execute(IJobExecutionContext context)
catch (Exception e)
{
logger.LogError(e, "An error occurred while resuming workflow instance {WorkflowInstanceId}", workflowInstanceId);
await context.Scheduler.DeleteJob(context.JobDetail.Key, cancellationToken);
await context.DeleteJob(context.JobDetail.Key, cancellationToken);
Comment thread
kk-nuv marked this conversation as resolved.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task Execute(IJobExecutionContext context)
catch (WorkflowGraphNotFoundException e)
{
logger.LogWarning(e, "Could not find workflow graph for workflow definition handle {WorkflowDefinitionHandle}", startRequest.WorkflowDefinitionHandle);
await context.Scheduler.DeleteJob(context.JobDetail.Key, cancellationToken);
await context.Scheduler.UnscheduleJob(context.Trigger.Key, cancellationToken);
}
Comment thread
kk-nuv marked this conversation as resolved.
Comment thread
kk-nuv marked this conversation as resolved.
catch (Exception e) when (transientExceptionDetector.IsTransient(e))
{
Expand All @@ -68,7 +68,7 @@ public async Task Execute(IJobExecutionContext context)
catch (Exception e)
{
logger.LogError(e, "An error occurred while starting workflow {WorkflowDefinitionHandle} with correlation ID {CorrelationId}", startRequest.WorkflowDefinitionHandle, startRequest.CorrelationId);
await context.Scheduler.DeleteJob(context.JobDetail.Key, cancellationToken);
await context.DeleteJob(context.JobDetail.Key, cancellationToken);
}
Comment thread
kk-nuv marked this conversation as resolved.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ public static class QuartzJobTestHelper
/// Creates a mock job execution context with the specified job data.
/// </summary>
public static (IJobExecutionContext Context, Mock<QuartzScheduler> Scheduler) CreateJobExecutionContext(
IDictionary<string, object> jobData)
IDictionary<string, object> jobData,
string? jobKeyName = null)
{
var jobDataMap = new JobDataMap(jobData);
var jobKey = new JobKey("test-job");
var jobKey = new JobKey(jobKeyName ?? "test-job");
var triggerKey = new TriggerKey("test-trigger");

var jobDetail = new Mock<IJobDetail>();
Expand Down Expand Up @@ -130,6 +131,18 @@ public void VerifyRescheduled() =>
/// </summary>
public void VerifyDeleted() =>
scheduler.Verify(s => s.DeleteJob(It.IsAny<JobKey>(), It.IsAny<CancellationToken>()), Times.Once);

/// <summary>
/// Verifies that the scheduler did not delete a job.
/// </summary>
public void VerifyNotDeleted() =>
scheduler.Verify(s => s.DeleteJob(It.IsAny<JobKey>(), It.IsAny<CancellationToken>()), Times.Never);

/// <summary>
/// Verifies that the scheduler unscheduled a job exactly once.
/// </summary>
public void VerifyUnscheduled() =>
scheduler.Verify(s => s.UnscheduleJob(It.IsAny<TriggerKey>(), It.IsAny<CancellationToken>()), Times.Once);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,22 @@ public async Task Execute_TransientException_ReschedulesJob(Type exceptionType,
}

[Theory]
[InlineData(typeof(InvalidOperationException))]
[InlineData(typeof(ArgumentException))]
public async Task Execute_NonTransientException_DeletesJob(Type exceptionType)
[InlineData(typeof(InvalidOperationException), null)]
[InlineData(typeof(ArgumentException), null)]
[InlineData(typeof(InvalidOperationException), "ResumeWorkflowJob")]
[InlineData(typeof(ArgumentException), "ResumeWorkflowJob")]
public async Task Execute_NonTransientException_DeletesJob(Type exceptionType, string? jobKeyName)
{
var (context, scheduler) = CreateJobExecutionContext();
var (context, scheduler) = CreateJobExecutionContext(jobKeyName: jobKeyName);
_transientDetector.SetupIsTransient(false);
_workflowRuntime.SetupCreateClientThrows((Exception)Activator.CreateInstance(exceptionType)!);

await _job.Execute(context);

scheduler.VerifyDeleted();
if (string.IsNullOrEmpty(jobKeyName))
scheduler.VerifyDeleted();
else
scheduler.VerifyNotDeleted();
}

[Fact]
Expand Down Expand Up @@ -121,7 +126,8 @@ public async Task Execute_WithActivityHandle_DeserializesCorrectly()
Assert.Equal("activity-123", capturedRequest.ActivityHandle?.ActivityId);
}

private static (IJobExecutionContext, Mock<QuartzScheduler>) CreateJobExecutionContext(string? activityHandle = null)
private static (IJobExecutionContext, Mock<QuartzScheduler>) CreateJobExecutionContext(string? activityHandle = null,
string? jobKeyName = null)
{
var jobData = new Dictionary<string, object>
{
Expand All @@ -132,6 +138,6 @@ private static (IJobExecutionContext, Mock<QuartzScheduler>) CreateJobExecutionC
if (activityHandle != null)
jobData.Add("ActivityHandle", activityHandle);

return QuartzJobTestHelper.CreateJobExecutionContext(jobData);
return QuartzJobTestHelper.CreateJobExecutionContext(jobData, jobKeyName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task Execute_WorkflowGraphNotFound_DeletesJob()

await _job.Execute(context);

scheduler.VerifyDeleted();
scheduler.VerifyUnscheduled();
}

[Theory]
Expand All @@ -83,17 +83,22 @@ public async Task Execute_TransientException_ReschedulesJob(Type exceptionType,
}

[Theory]
[InlineData(typeof(InvalidOperationException))]
[InlineData(typeof(ArgumentException))]
public async Task Execute_NonTransientException_DeletesJob(Type exceptionType)
[InlineData(typeof(InvalidOperationException), null)]
[InlineData(typeof(ArgumentException), null)]
[InlineData(typeof(InvalidOperationException), "RunWorkflowJob")]
[InlineData(typeof(ArgumentException), "RunWorkflowJob")]
public async Task Execute_NonTransientException_DeletesJob(Type exceptionType, string? jobKeyName)
{
var (context, scheduler) = CreateJobExecutionContext();
var (context, scheduler) = CreateJobExecutionContext(jobKeyName);
_transientDetector.SetupIsTransient(false);
_workflowStarter.SetupStartWorkflowThrows((Exception)Activator.CreateInstance(exceptionType)!);

await _job.Execute(context);

scheduler.VerifyDeleted();
if (string.IsNullOrEmpty(jobKeyName))
scheduler.VerifyDeleted();
else
scheduler.VerifyNotDeleted();
}

[Fact]
Expand All @@ -111,11 +116,11 @@ public async Task Execute_UsesCorrectWorkflowDefinitionHandle()
Assert.Equal("workflow-def-123", capturedRequest.WorkflowDefinitionHandle.DefinitionVersionId);
}

private static (IJobExecutionContext, Mock<QuartzScheduler>) CreateJobExecutionContext() =>
private static (IJobExecutionContext, Mock<QuartzScheduler>) CreateJobExecutionContext(string? jobKeyName = null) =>
QuartzJobTestHelper.CreateJobExecutionContext(new Dictionary<string, object>
{
{ "DefinitionVersionId", "workflow-def-123" },
{ "CorrelationId", "corr-123" },
{ "TriggerActivityId", "trigger-123" }
});
}, jobKeyName);
}
Loading