feat: scheduled pipelines + dependency cascade fix #80
Add SchedulePipeline MCP tool and CLI support for creating recurring or one-time schedules that trigger multi-step pipelines. Each trigger creates fresh tasks with linear dependencies (step N depends on step N-1).

Bug fixes:
- Dependency failure cascade: failed/cancelled upstream tasks now cascade cancellation to dependents instead of incorrectly unblocking them
- Queue handler race condition: fast-path dependencyState check prevents blocked tasks from being enqueued before dependency rows are written

Features:
- SchedulePipeline MCP tool (2-20 steps, cron/one-time, per-step agent)
- CLI: `--pipeline --step` flags for schedule create
- CancelSchedule: optional cancelTasks flag for in-flight pipeline tasks
- ListSchedules: isPipeline/stepCount indicators
- GetSchedule: pipelineSteps in response
- Database migration 8: pipeline_steps, pipeline_task_ids columns
- 188 new tests (1,467 total), zero regressions
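The linear wiring described above (step N depends on step N-1) can be sketched roughly as follows; the types and helper names here are illustrative, not the PR's actual API:

```typescript
// Sketch only: illustrates the linear dependency chain each trigger creates.
// PipelineStep, TaskRequest, and makeId are hypothetical stand-ins.
interface PipelineStep {
  prompt: string;
  agent?: string;
}

interface TaskRequest {
  id: string;
  prompt: string;
  dependsOn: string[];
}

function buildPipelineTasks(
  steps: PipelineStep[],
  makeId: (i: number) => string,
): TaskRequest[] {
  const tasks: TaskRequest[] = [];
  for (let i = 0; i < steps.length; i++) {
    tasks.push({
      id: makeId(i),
      prompt: steps[i].prompt,
      // Step N depends on step N-1; step 0 has no dependencies and is the
      // only task that becomes runnable immediately.
      dependsOn: i === 0 ? [] : [makeId(i - 1)],
    });
  }
  return tasks;
}
```

Because only step 0 is ever unblocked at creation time, the cascade and queue-handler fixes below directly determine whether steps 1..N-1 ever run.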
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Scheduler
    participant ScheduleHandler
    participant TaskRepo
    participant ScheduleRepo
    participant EventBus
    participant DependencyHandler
    participant QueueHandler
    Scheduler->>ScheduleHandler: ScheduleTriggered (pipeline schedule)
    ScheduleHandler->>ScheduleHandler: resolveAfterScheduleTaskId()
    loop For each step i
        ScheduleHandler->>TaskRepo: save(task_i, dependsOn=[task_{i-1}])
        TaskRepo-->>ScheduleHandler: ok(task_i)
    end
    ScheduleHandler->>ScheduleRepo: recordTriggeredExecution(lastTaskId, allTaskIds)
    ScheduleHandler->>ScheduleRepo: updateScheduleAfterTrigger()
    Note over ScheduleHandler: ⚠️ If update fails here, N tasks orphaned with no cleanup
    loop Emit TaskDelegated per step
        ScheduleHandler->>EventBus: emit TaskDelegated(task_i)
        Note over ScheduleHandler: Step 0 failure → cancel all tasks + return error
        EventBus->>QueueHandler: TaskDelegated(task_0)
        QueueHandler->>QueueHandler: dependencyState==='blocked'? fast-path skip
        QueueHandler->>EventBus: emit TaskQueued(task_0)
    end
    ScheduleHandler->>EventBus: emit ScheduleExecuted(lastTaskId)
    Note over EventBus,DependencyHandler: When task_0 completes...
    EventBus->>DependencyHandler: TaskCompleted(task_0)
    DependencyHandler->>DependencyHandler: getDependencies(task_1) — check for failed/cancelled deps
    alt All deps completed
        DependencyHandler->>EventBus: emit TaskUnblocked(task_1)
    else Any dep failed/cancelled
        DependencyHandler->>EventBus: emit TaskCancellationRequested(task_1)
    end
```
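The alt block at the bottom of the diagram reduces to a small decision over dependency states. A minimal sketch (status names assumed, not taken from the codebase):

```typescript
// Illustrative only: the DependencyHandler's unblock/cancel decision for a
// dependent task, as described in the diagram's alt block. Status names are
// assumptions, not the PR's actual enum.
type DepStatus = 'pending' | 'in_progress' | 'completed' | 'failed' | 'cancelled';

function resolveDependent(depStatuses: DepStatus[]): 'unblock' | 'cancel' | 'wait' {
  // Any failed or cancelled dependency cascades cancellation (the bug fix:
  // previously such tasks were incorrectly unblocked).
  if (depStatuses.some((s) => s === 'failed' || s === 'cancelled')) return 'cancel';
  // Only when every dependency completed does the dependent become runnable.
  if (depStatuses.every((s) => s === 'completed')) return 'unblock';
  return 'wait';
}
```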
Last reviewed commit: bd56703
```typescript
this.logger.info('Injected afterSchedule dependency on pipeline step 0', {
  scheduleId,
  afterScheduleId: schedule.afterScheduleId,
  dependsOnTaskId: latestExecution.taskId,
```
HIGH: Duplicated afterScheduleId resolution logic
The handlePipelineTrigger method contains inline afterScheduleId resolution (approximately lines 327-345) that duplicates the extracted resolveAfterScheduleDependency helper (lines 455-483). The single-task path correctly calls the shared helper, but the pipeline path re-implements the same business logic:
- Fetch execution history
- Get latest execution
- Check if task exists and is in terminal state
- Return undefined or the task ID
Impact: Two places to maintain the same logic. If afterScheduleId resolution rules change (e.g., checking multiple executions, different terminal state semantics), both paths must be updated independently.
Fix: Refactor resolveAfterScheduleDependency to return the resolved TaskId | undefined instead of a modified template, so both handleSingleTaskTrigger and handlePipelineTrigger consume the same primitive:
```typescript
private async resolveAfterScheduleTaskId(afterScheduleId: ScheduleId): Promise<TaskId | undefined> {
  const historyResult = await this.scheduleRepo.getExecutionHistory(afterScheduleId, 1);
  if (!historyResult.ok || historyResult.value.length === 0) return undefined;
  const latestExecution = historyResult.value[0];
  if (!latestExecution.taskId) return undefined;
  const depTaskResult = await this.taskRepo.findById(latestExecution.taskId);
  if (!depTaskResult.ok || !depTaskResult.value || isTerminalState(depTaskResult.value.status)) {
    return undefined;
  }
  return latestExecution.taskId;
}
```

Flagged by: Architecture, Complexity, Consistency reviews
## Code Review Summary: PR #80 - Scheduled Pipelines

Status: CHANGES_REQUESTED

This PR introduces scheduled pipelines with dependency failure cascade fixes. The feature is well-structured overall, but there are 8 HIGH severity and 5 MEDIUM severity blocking issues that should be addressed before merge. Below is a deduplicated summary based on comprehensive reviews from 10 reviewers (Architecture, Complexity, Consistency, Database, Documentation, Performance, Regression, Security, Tests, TypeScript).

### BLOCKING ISSUES

#### Critical

1. TASK-DEPENDENCIES.md contradicts cascade behavior
#### HIGH Issues

2. Duplicated afterScheduleId resolution logic
```typescript
private async resolveAfterScheduleTaskId(afterScheduleId: ScheduleId): Promise<TaskId | undefined> {
  const historyResult = await this.scheduleRepo.getExecutionHistory(afterScheduleId, 1);
  if (!historyResult.ok || historyResult.value.length === 0) return undefined;
  const latestExecution = historyResult.value[0];
  if (!latestExecution.taskId) return undefined;
  const depTaskResult = await this.taskRepo.findById(latestExecution.taskId);
  if (!depTaskResult.ok || !depTaskResult.value || isTerminalState(depTaskResult.value.status)) return undefined;
  return latestExecution.taskId;
}
```

3. createSchedule not using validateScheduleTiming
4. Pipeline task creation loop not wrapped in transaction
```typescript
const txResult = await this.taskRepo.transaction(async (txRepo) => {
  for (let i = 0; i < steps.length; i++) {
    // ... build task ...
    const saveResult = await txRepo.save(task);
    if (!saveResult.ok) return saveResult;
    savedTasks.push(task);
  }
  return ok(undefined);
});
if (!txResult.ok) {
  await this.recordFailedExecution(...);
  return txResult;
}
```

5. MCP adapter tests use simulate helpers instead of real code
6. Missing release notes for v0.6.0

7. No JSDoc on cancelSchedule updated signature

```typescript
/**
 * @param cancelTasks - If true, also cancel in-flight tasks from the latest execution
 */
```

8. No JSDoc on createScheduledPipeline
#### MEDIUM Issues

9. Pipeline cleanup bypasses event system
10. Validation duplication in CLI

11. Non-null assertion on pipelineSteps

```typescript
// Option A: Accept as parameter
private async handlePipelineTrigger(
  schedule: Schedule,
  triggeredAt: number,
  steps: readonly PipelineStepRequest[]
): Promise<Result<void>> { ... }

// Option B: Local guard
if (!schedule.pipelineSteps || schedule.pipelineSteps.length === 0) {
  return err(new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline requires steps'));
}
const steps = schedule.pipelineSteps; // TypeScript narrows correctly
```

12. Missing Zod validation on pipeline_task_ids JSON parse
```typescript
const PipelineTaskIdsSchema = z.array(z.string().min(1));
if (data.pipeline_task_ids) {
  try {
    const parsed = JSON.parse(data.pipeline_task_ids);
    const validated = PipelineTaskIdsSchema.parse(parsed);
    pipelineTaskIds = validated.map((id) => TaskId(id));
  } catch {
    pipelineTaskIds = undefined;
  }
}
```

13. No service-level test for cancelSchedule with cancelTasks=true
### RECOMMENDATION

CHANGES_REQUESTED - The blocking issues should be resolved before merge:

Priority 1 (Critical/High - Required):

Priority 2 (Medium - Should Fix):

### POSITIVE OBSERVATIONS
### Files Requiring Changes

Claude Code - Code Review for Backbeat PR #80
…k in handleSchedulePipeline The handleSchedulePipeline handler used undefined as the nextRunAt fallback, while all other schedule handlers (handleScheduleTask, handleListSchedules, handleGetSchedule) used null. This caused inconsistent JSON serialization — undefined omits the field entirely, while null includes it explicitly as "nextRunAt": null. Co-Authored-By: Claude <noreply@anthropic.com>
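The serialization difference this commit fixes is easy to demonstrate: JSON.stringify drops undefined-valued properties entirely but serializes null ones explicitly.

```typescript
// undefined-valued properties are omitted from the serialized output...
const omitted = JSON.stringify({ scheduleId: 's1', nextRunAt: undefined });
// ...while null-valued properties appear explicitly.
const explicit = JSON.stringify({ scheduleId: 's1', nextRunAt: null });

console.log(omitted);  // {"scheduleId":"s1"}
console.log(explicit); // {"scheduleId":"s1","nextRunAt":null}
```

This is why a handler that falls back to undefined produces a response shape inconsistent with handlers that fall back to null.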
…hedule Replace ~80 lines of inline timing validation in createSchedule with a call to the existing validateScheduleTiming helper. The helper's JSDoc already claims it is shared between createSchedule and createScheduledPipeline, but createSchedule was never refactored to use it. Now both methods follow the same pattern. Co-Authored-By: Claude <noreply@anthropic.com>
Replace bare `as string[]` type assertion with PipelineTaskIdsSchema Zod validation, matching the pattern used by pipeline_steps. This ensures malformed JSON (e.g., non-string elements, empty strings) is caught at the database boundary rather than silently propagated. Co-Authored-By: Claude <noreply@anthropic.com>
…lease notes TASK-DEPENDENCIES.md contradicted running code: the v0.6.0 cascade cancellation behavior (failed/cancelled deps auto-cancel dependents) was not reflected in the Event Flow diagram, Error Handling examples, or Best Practices section. Updated all sections to match the actual DependencyHandler implementation. Created missing RELEASE_NOTES_v0.6.0.md covering scheduled pipelines, dependency cascade fix, queue handler race condition fix, migration 8, SchedulePipeline MCP tool, and CLI pipeline flags. Co-Authored-By: Claude <noreply@anthropic.com>
…type-safety - Extract resolveAfterScheduleTaskId() returning TaskId | undefined, replacing the old resolveAfterScheduleDependency() that returned a modified task template. Both single-task and pipeline paths now call the same shared helper, eliminating ~18 lines of duplicated logic. - Pass pipelineSteps as a typed parameter (NonNullable) to handlePipelineTrigger, removing the non-null assertion (!). - Add ARCHITECTURE EXCEPTION comment to pipeline cleanup code explaining why direct taskRepo.update() is correct (no events emitted yet). - Add TODO for async transaction wrapping (better-sqlite3 limitation). Co-Authored-By: Claude <noreply@anthropic.com>
…rage Add 3 tests for the cancelTasks branch in ScheduleManagerService.cancelSchedule(): - Pipeline execution with pipelineTaskIds emits TaskCancellationRequested per task - Single taskId fallback when no pipelineTaskIds present - cancelTasks=false does not emit TaskCancellationRequested Co-Authored-By: Claude <noreply@anthropic.com>
```typescript
if (cancelTasks) {
  const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId, 1);
  if (historyResult.ok && historyResult.value.length > 0) {
    const latestExecution = historyResult.value[0];
    const taskIds = latestExecution.pipelineTaskIds ?? (latestExecution.taskId ? [latestExecution.taskId] : []);
    for (const taskId of taskIds) {
      const cancelResult = await this.eventBus.emit('TaskCancellationRequested', {
        taskId,
        reason: `Schedule ${scheduleId} cancelled`,
      });
      if (!cancelResult.ok) {
        this.logger.warn('Failed to cancel pipeline task', {
          taskId,
          scheduleId,
          error: cancelResult.error.message,
        });
      }
    }
    this.logger.info('Cancelled in-flight pipeline tasks', {
      scheduleId,
      taskCount: taskIds.length,
    });
  }
}
```
cancelTasks only covers the single latest execution
getExecutionHistory(scheduleId, 1) limits the lookup to one record. For CRON schedules with short intervals and slow multi-step pipelines, it is possible for a second trigger to fire before the first pipeline finishes. In that situation --cancel-tasks / cancelTasks: true will cancel the newly-created tasks from the latest run but leave the still-running tasks from the previous run untouched.
Consider fetching a small window (e.g., the last 5 executions) and deduplicating task IDs before cancelling:
```typescript
const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId, 5);
if (historyResult.ok) {
  const taskIdSet = new Set<TaskId>();
  for (const execution of historyResult.value) {
    if (execution.pipelineTaskIds) {
      execution.pipelineTaskIds.forEach((id) => taskIdSet.add(id));
    } else if (execution.taskId) {
      taskIdSet.add(execution.taskId);
    }
  }
  for (const taskId of taskIdSet) { ... }
}
```

At a minimum, the doc-comment / tool description should mention that only the most recent execution is targeted.
Chained schedules (afterScheduleId) resolve the predecessor's execution.taskId to check if it's terminal. Storing firstTaskId caused the chain to fire when step 1 completed, not the full pipeline. Now stores lastTaskId so chaining waits for pipeline completion.
…ascade check When getDependencies returned an error, code fell through to the unblock path — potentially unblocking a task whose dependency actually failed. Now logs a warning and skips the task instead.
…ectory validatePath() resolves relative/symlink paths to absolute, but the validation loop discarded the result — schedule stored the original un-normalized path. Now builds normalizedSteps array with resolved paths from validatePath().
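The fix described here amounts to keeping the validator's return value instead of discarding it. A minimal sketch, where `validatePath` is a hypothetical stand-in (the PR's real helper also resolves symlinks and enforces allowed roots):

```typescript
import * as path from 'node:path';

// Hypothetical stand-in for the PR's validatePath(): here we only
// normalize to an absolute path.
function validatePath(p: string): string {
  return path.resolve(p);
}

interface PipelineStep {
  prompt: string;
  workingDirectory?: string;
}

// Build a new array carrying the resolved paths, so the schedule stores
// the normalized path rather than the original un-normalized input.
function normalizeSteps(steps: PipelineStep[]): PipelineStep[] {
  return steps.map((step) =>
    step.workingDirectory
      ? { ...step, workingDirectory: validatePath(step.workingDirectory) }
      : step,
  );
}
```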
…iledExecution Helper hardcoded "Failed to create task: " prefix. Pipeline callsite already passed "Pipeline failed at step N: ..." — resulting in double-wrapping in the audit trail. Moved prefix to single-task callsite, helper now passes errorMessage directly.
Field name implied task IDs but carried a boolean. Renamed to cancelTasksRequested to accurately describe its semantics.
…ernary Replace two identical 7-line ternary chains in schedule create (single-task and pipeline modes) with the existing toMissedRunPolicy() from schedule-manager. Guards with undefined check to preserve CLI pass-through semantics.
…eline error Step 0 is the only task that becomes runnable — all later steps block on it. If its TaskDelegated emit fails, the pipeline is orphaned forever. Now cancels all saved tasks instead of continuing best-effort. Also warn when positional prompt words are silently ignored in --pipeline mode, and fix Biome formatting in schedule-manager test.
Prevents silently discarding pipeline steps when --pipeline is omitted. Now exits with a helpful error message pointing to correct usage.
```typescript
// Update schedule state
const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt);
if (!updateResult.ok) return updateResult;
```
updateScheduleAfterTrigger failure orphans all N pipeline tasks
If updateScheduleAfterTrigger fails (e.g. a transient SQLite error), all N pipeline tasks have already been saved to the DB and the execution has been recorded as 'triggered' via recordTriggeredExecution. But because the return happens before the TaskDelegated emit loop, no task is ever delegated:
- Step 0 is stuck in `pending` — never enqueued, never picked up by a worker.
- Steps 1–N are `blocked` on step 0 forever — their dependencies will never resolve.
- The execution record shows `'triggered'` with all task IDs, even though nothing will run.
For CRON schedules this is especially bad: because the scheduleRepo.update() call that advances nextRunAt failed, the schedule's nextRunAt is still in the past. On the very next scheduler tick findDue() picks it up again, handlePipelineTrigger runs again, and a new batch of orphaned tasks accumulates. This loop repeats indefinitely until updateScheduleAfterTrigger finally succeeds — potentially creating hundreds of ghost task sets.
Consider adding a cleanup block before returning:
```typescript
const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt);
if (!updateResult.ok) {
  // Clean up saved tasks so they don't accumulate as orphans
  for (const savedTask of savedTasks) {
    await this.taskRepo.update(savedTask.id, { status: TaskStatus.CANCELLED });
  }
  return updateResult;
}
```

The same gap exists in handleSingleTaskTrigger (one orphaned task), but it is far less severe than up to 20 tasks accumulating with every tick.
```typescript
} catch {
  // Non-fatal: log but don't fail
  pipelineTaskIds = undefined;
}
```
Comment says "log" but no logging actually happens
The inline comment reads "Non-fatal: log but don't fail", but SQLiteScheduleRepository has no logger, so silently setting pipelineTaskIds = undefined is the actual behavior. This matters operationally: when cancelTasks: true is used, cancelSchedule falls back to only the single taskId from the execution record (the last task ID), leaving all intermediate pipeline tasks running — and there's no log entry to indicate that the pipelineTaskIds field was silently dropped.
Note the asymmetry with rowToSchedule, which throws on a corrupt pipeline_steps column. Since a corrupt pipelineTaskIds has a real functional impact (incomplete cancellation), it warrants at minimum a console.warn or a dedicated error-logging hook:
```typescript
} catch (e) {
  // Non-fatal: don't fail execution history reads, but warn so operators can
  // detect DB corruption before it silently breaks --cancel-tasks.
  console.warn(`[schedule-repository] Failed to parse pipeline_task_ids for execution ${data.id}:`, e);
  pipelineTaskIds = undefined;
}
```
## Summary

- `SchedulePipeline` MCP tool and `--pipeline --step` CLI flags for creating cron or one-time schedules that trigger multi-step pipelines (2–20 steps). Each trigger creates fresh tasks with linear dependencies.
- Fast-path `dependencyState` check prevents blocked tasks from being enqueued before dependency rows are written to DB.

### Changes (21 files, +2180 / -183)
Bug fixes:
- `dependency-handler.ts` — cascade cancellation on failed/cancelled upstream
- `queue-handler.ts` — fast-path blocked task check via `dependencyState`

Core:
- `domain.ts` — `pipelineSteps` on Schedule, `ScheduledPipelineCreateRequest`
- `interfaces.ts` — `createScheduledPipeline()`, `cancelTasks` on cancel
- `database.ts` — migration 8: `pipeline_steps`, `pipeline_task_ids` columns
- `schedule-repository.ts` — JSON round-trip with Zod validation
- `schedule-manager.ts` — `createScheduledPipeline()`, shared timing validation, `cancelTasks` support
- `schedule-handler.ts` — `handlePipelineTrigger()` with partial save failure cleanup

Adapters & CLI:
- `mcp-adapter.ts` — `SchedulePipeline` tool, enhanced `CancelSchedule`/`ListSchedules`/`GetSchedule`
- `schedule.ts` — `--pipeline --step` flags, `--cancel-tasks` on cancel

Tests (188 new, 1,467 total):
Docs:
Closes #78
## Test plan

- `npm run test:handlers` — 115 passed (dependency cascade + queue handler + schedule handler pipeline)
- `npm run test:services` — 141 passed (schedule manager createScheduledPipeline)
- `npm run test:implementations` — 302 passed (schedule repo pipeline round-trip)
- `npm run test:adapters` — 55 passed (SchedulePipeline tool, CancelSchedule cancelTasks)
- `npm run test:cli` — 150 passed (`--pipeline --step`, `--cancel-tasks`)
- `npm run test:all` — 1,467 tests, zero regressions
- `npm run build` — clean