Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 327 additions & 0 deletions engine/src/db/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,39 @@ impl super::Db {
)?)
}

/// Count "fruitless" tasks for a given Linear issue and pipeline stage (RIG-405).
///
/// A fruitless task is one that made no progress:
/// - `status = 'failed'` (crashed or timed out), OR
/// - `status = 'completed'` but produced **no effects** in the effects table
/// (e.g. Qwen agent completes with empty output — no MoveIssue, CreatePr, etc.)
///
/// Used by `should_skip_due_to_failures()` to detect infinite spawn loops where
/// agents complete successfully but make zero progress. Pure `count_failed_tasks`
/// misses this case because the task status is "completed", not "failed".
pub fn count_fruitless_tasks_for_issue_stage(
&self,
issue_id: &str,
stage: &str,
) -> Result<i64> {
Ok(self.conn.query_row(
"SELECT COUNT(*) FROM tasks t
WHERE t.issue_identifier = ?1
AND t.pipeline_stage = ?2
AND (
t.status = 'failed'
OR (
t.status = 'completed'
AND NOT EXISTS (
SELECT 1 FROM effects e WHERE e.task_id = t.id
)
)
)",
params![issue_id, stage],
|row| row.get(0),
)?)
}

/// Count all attempts (completed + failed) for a given Linear issue and pipeline stage.
/// Used as a general circuit breaker (RIG-309). Retry cap (RIG-338) uses
/// `count_failed_tasks_for_issue_stage` instead to avoid capping successful verdicts.
Expand Down Expand Up @@ -362,6 +395,41 @@ impl super::Db {
Ok(result)
}

/// Get the `finished_at` timestamp of the most recent fruitless task for an issue+stage.
/// A fruitless task is one that failed OR completed without producing any effects.
/// Returns `None` if no fruitless tasks exist or if `finished_at` is NULL.
/// Used by the poller cooldown (RIG-405) — mirrors `count_fruitless_tasks_for_issue_stage`
/// but returns the latest timestamp instead of a count.
pub fn last_fruitless_task_time_for_issue_stage(
&self,
issue_id: &str,
stage: &str,
) -> Result<Option<String>> {
let result: Option<String> = self
.conn
.query_row(
"SELECT finished_at FROM tasks t
WHERE t.issue_identifier = ?1
AND t.pipeline_stage = ?2
AND (
t.status = 'failed'
OR (
t.status = 'completed'
AND NOT EXISTS (
SELECT 1 FROM effects e WHERE e.task_id = t.id
)
)
)
AND t.finished_at IS NOT NULL
ORDER BY t.finished_at DESC
LIMIT 1",
params![issue_id, stage],
|row| row.get(0),
)
.ok();
Ok(result)
}

// --- PR Reviewed ---

pub fn is_pr_reviewed(&self, pr_key: &str) -> Result<bool> {
Expand Down Expand Up @@ -1286,4 +1354,263 @@ mod tests {
.unwrap();
assert_eq!(result, Some("2026-04-01T10:00:00".to_string()));
}

// ─── RIG-405: last_fruitless_task_time_for_issue_stage ────────────────

#[test]
fn last_fruitless_time_none_when_no_tasks() {
let db = Db::open_in_memory().unwrap();
let result = db
.last_fruitless_task_time_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert!(result.is_none());
}

#[test]
fn last_fruitless_time_returns_failed_task() {
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-010");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-010", Status::Failed).unwrap();
db.update_task_field("20260409-010", "finished_at", "2026-04-09T10:00:00")
.unwrap();

let result = db
.last_fruitless_task_time_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(result, Some("2026-04-09T10:00:00".to_string()));
}

#[test]
fn last_fruitless_time_returns_completed_without_effects() {
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-011");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-011", Status::Completed)
.unwrap();
db.update_task_field("20260409-011", "finished_at", "2026-04-09T11:00:00")
.unwrap();

let result = db
.last_fruitless_task_time_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(
result,
Some("2026-04-09T11:00:00".to_string()),
"completed task with no effects should be fruitless"
);
}

#[test]
fn last_fruitless_time_ignores_completed_with_effects() {
use crate::models::{Effect, EffectStatus, EffectType};
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-012");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-012", Status::Completed)
.unwrap();
db.update_task_field("20260409-012", "finished_at", "2026-04-09T12:00:00")
.unwrap();

let effect = Effect {
id: 0,
dedup_key: "20260409-012:create-pr".to_string(),
task_id: "20260409-012".to_string(),
issue_id: "RIG-405".to_string(),
effect_type: EffectType::CreatePr,
payload: serde_json::json!({}),
blocking: true,
status: EffectStatus::Pending,
attempts: 0,
max_attempts: 3,
created_at: "2026-04-09T12:00:00".to_string(),
next_retry_at: None,
executed_at: None,
error: None,
};
db.insert_effects(&[effect]).unwrap();

let result = db
.last_fruitless_task_time_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert!(
result.is_none(),
"completed task with effects must NOT be fruitless"
);
}

#[test]
fn last_fruitless_time_returns_most_recent() {
let db = Db::open_in_memory().unwrap();

// Failed task at 10:00
let mut t1 = make_test_task("20260409-013");
t1.issue_identifier = "RIG-405".to_string();
t1.pipeline_stage = "engineer".to_string();
db.insert_task(&t1).unwrap();
db.set_task_status("20260409-013", Status::Failed).unwrap();
db.update_task_field("20260409-013", "finished_at", "2026-04-09T10:00:00")
.unwrap();

// Fruitless completion at 11:00
let mut t2 = make_test_task("20260409-014");
t2.issue_identifier = "RIG-405".to_string();
t2.pipeline_stage = "engineer".to_string();
db.insert_task(&t2).unwrap();
db.set_task_status("20260409-014", Status::Completed)
.unwrap();
db.update_task_field("20260409-014", "finished_at", "2026-04-09T11:00:00")
.unwrap();

let result = db
.last_fruitless_task_time_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(
result,
Some("2026-04-09T11:00:00".to_string()),
"should return the most recent fruitless task time"
);
}

// ─── RIG-405: count_fruitless_tasks_for_issue_stage ─────────────────

#[test]
fn fruitless_count_zero_when_no_tasks() {
let db = Db::open_in_memory().unwrap();
let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(count, 0);
}

#[test]
fn fruitless_count_includes_failed_tasks() {
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-001");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-001", Status::Failed).unwrap();

let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(count, 1, "failed task must count as fruitless");
}

#[test]
fn fruitless_count_includes_completed_without_effects() {
// Simulates Qwen completing with empty output (no effects produced).
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-002");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-002", Status::Completed)
.unwrap();

let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(
count, 1,
"completed task with no effects must count as fruitless"
);
}

#[test]
fn fruitless_count_excludes_completed_with_effects() {
// A completed task that produced at least one effect is NOT fruitless.
use crate::models::{Effect, EffectStatus, EffectType};
let db = Db::open_in_memory().unwrap();

let mut t = make_test_task("20260409-003");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "engineer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-003", Status::Completed)
.unwrap();

// Insert an effect for this task (e.g. engineer created a PR)
let effect = Effect {
id: 0,
dedup_key: "20260409-003:create-pr".to_string(),
task_id: "20260409-003".to_string(),
issue_id: "RIG-405".to_string(),
effect_type: EffectType::CreatePr,
payload: serde_json::json!({}),
blocking: true,
status: EffectStatus::Pending,
attempts: 0,
max_attempts: 3,
created_at: "2026-04-09T10:00:00".to_string(),
next_retry_at: None,
executed_at: None,
error: None,
};
db.insert_effects(&[effect]).unwrap();

let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(
count, 0,
"completed task with effects must NOT count as fruitless"
);
}

#[test]
fn fruitless_count_filters_by_stage() {
let db = Db::open_in_memory().unwrap();

// Failed task for "reviewer" stage
let mut t = make_test_task("20260409-004");
t.issue_identifier = "RIG-405".to_string();
t.pipeline_stage = "reviewer".to_string();
db.insert_task(&t).unwrap();
db.set_task_status("20260409-004", Status::Failed).unwrap();

// Engineer stage should see 0
let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(count, 0, "must filter by stage name");
}

#[test]
fn fruitless_count_excludes_running_and_pending() {
let db = Db::open_in_memory().unwrap();

// Pending task
let mut t1 = make_test_task("20260409-005");
t1.issue_identifier = "RIG-405".to_string();
t1.pipeline_stage = "engineer".to_string();
db.insert_task(&t1).unwrap();

// Running task
let mut t2 = make_test_task("20260409-006");
t2.issue_identifier = "RIG-405".to_string();
t2.pipeline_stage = "engineer".to_string();
db.insert_task(&t2).unwrap();
db.set_task_status("20260409-006", Status::Running).unwrap();

let count = db
.count_fruitless_tasks_for_issue_stage("RIG-405", "engineer")
.unwrap();
assert_eq!(
count, 0,
"pending/running tasks must not count as fruitless"
);
}
}
Loading
Loading