Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions app/scheduler/claim_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,31 @@ def get_runs(task_id: str, limit: int = 20, db_path: Path | None = None) -> list
conn.close()


def delete_runs(task_id: str, db_path: Path | None = None) -> int:
"""Delete all task-run records for a given task ID.

Returns the number of deleted rows. Safe to call when no DB or table
exists (returns 0). Idempotent — subsequent calls return 0.
"""
path = db_path or _default_db_path()
if not path.exists():
return 0
conn = _connect(path)
try:
_ensure_schema(conn)
cursor = conn.execute(
"DELETE FROM task_runs WHERE task_id = ?",
(task_id,),
)
conn.commit()
return cursor.rowcount
finally:
conn.close()


__all__ = [
"complete_run",
"delete_runs",
"get_runs",
"try_claim",
]
26 changes: 25 additions & 1 deletion app/scheduler/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from filelock import FileLock

from app.constants import OPENSRE_HOME_DIR
from app.scheduler.claim_store import _DB_FILENAME, delete_runs
from app.scheduler.types import ScheduledTask

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,7 +80,13 @@ def add_task(task: ScheduledTask, store_path: Path | None = None) -> ScheduledTa


def remove_task(task_id: str, store_path: Path | None = None) -> bool:
"""Remove a task by ID. Returns True if found and removed."""
"""Remove a task by ID and cascade-delete its run records.

Returns True if the task was found and removed from the JSON store.
Cascade deletion of ``TaskRun`` records in the SQLite claim store is
best-effort — a warning is logged on failure but the return value
reflects only the JSON-store result.
"""
path = store_path or _default_store_path()
lock = FileLock(_lock_path(path))
with lock:
Expand All @@ -89,6 +96,23 @@ def remove_task(task_id: str, store_path: Path | None = None) -> bool:
if len(raw) == original_len:
return False
_save_raw(path, raw)

# Cascade: remove orphaned TaskRun records from the SQLite claim store.
# Derive the DB path from the same directory as the JSON store.
db_path = path.with_name(_DB_FILENAME)
try:
deleted = delete_runs(task_id, db_path)
if deleted:
logger.info("Cascade-deleted %d run(s) for removed task %s", deleted, task_id)
except Exception: # noqa: BLE001
logger.warning(
"Failed to cascade-delete runs for task %s (DB: %s); "
"orphaned runs may remain",
task_id,
db_path,
exc_info=True,
)

return True


Expand Down
32 changes: 31 additions & 1 deletion tests/scheduler/test_claim_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from app.scheduler.claim_store import complete_run, get_runs, try_claim
from app.scheduler.claim_store import complete_run, delete_runs, get_runs, try_claim
from app.scheduler.types import TaskStatus


Expand Down Expand Up @@ -92,6 +92,36 @@ def test_get_runs_empty(self, db_path: Path) -> None:
runs = get_runs("nonexistent", db_path=db_path)
assert runs == []

def test_delete_runs_removes_only_matching_task(self, db_path: Path) -> None:
try_claim("task1", "2026-01-01T09:00", db_path=db_path)
try_claim("task2", "2026-01-01T09:00", db_path=db_path)
assert len(get_runs("task1", db_path=db_path)) == 1
assert len(get_runs("task2", db_path=db_path)) == 1

deleted = delete_runs("task1", db_path=db_path)
assert deleted == 1

# task1 runs are gone
assert get_runs("task1", db_path=db_path) == []
# task2 runs are untouched
assert len(get_runs("task2", db_path=db_path)) == 1

def test_delete_runs_idempotent(self, db_path: Path) -> None:
try_claim("task1", "2026-01-01T09:00", db_path=db_path)
assert delete_runs("task1", db_path=db_path) == 1
assert delete_runs("task1", db_path=db_path) == 0

def test_delete_runs_empty_db(self, db_path: Path) -> None:
assert delete_runs("nonexistent", db_path=db_path) == 0

def test_delete_runs_deletes_multiple_runs(self, db_path: Path) -> None:
for i in range(3):
fire_time = f"2026-01-01T{i:02d}:00"
try_claim("task1", fire_time, db_path=db_path)

assert delete_runs("task1", db_path=db_path) == 3
assert get_runs("task1", db_path=db_path) == []


class TestConcurrency:
"""Verify the UNIQUE constraint prevents double-posting."""
Expand Down
51 changes: 51 additions & 0 deletions tests/scheduler/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest

from app.scheduler.claim_store import get_runs, try_claim
from app.scheduler.store import add_task, get_task, list_tasks, remove_task, update_task
from app.scheduler.types import Provider, ScheduledTask, TaskKind

Expand All @@ -15,6 +16,10 @@ def store_path(tmp_path: Path) -> Path:
return tmp_path / "scheduler_tasks.json"


def _db_path(store_path: Path) -> Path:
return store_path.with_name("scheduler.db")


class TestStore:
def test_list_empty(self, store_path: Path) -> None:
tasks = list_tasks(store_path)
Expand Down Expand Up @@ -62,6 +67,52 @@ def test_remove_task(self, store_path: Path) -> None:
assert remove_task(task.id, store_path) is True
assert list_tasks(store_path) == []

def test_remove_task_cascade_deletes_runs(self, store_path: Path) -> None:
"""Removing a task must also remove its TaskRun records."""
task = ScheduledTask(
kind=TaskKind.DAILY_SUMMARY,
cron="0 9 * * *",
provider=Provider.TELEGRAM,
chat_id="-100",
)
add_task(task, store_path)

db_path = _db_path(store_path)
try_claim(task.id, "2026-01-01T09:00", db_path=db_path)
try_claim(task.id, "2026-01-01T10:00", db_path=db_path)

assert remove_task(task.id, store_path) is True

assert get_runs(task.id, db_path=db_path) == []

def test_remove_task_cascade_does_not_affect_other_tasks(self, store_path: Path) -> None:
"""Removing one task's runs must not delete another task's runs."""
task_a = ScheduledTask(
id="task-a",
kind=TaskKind.DAILY_SUMMARY,
cron="0 9 * * *",
provider=Provider.TELEGRAM,
chat_id="-100",
)
task_b = ScheduledTask(
id="task-b",
kind=TaskKind.WEEKLY_AUDIT,
cron="0 8 * * 1",
provider=Provider.SLACK,
chat_id="C123",
)
add_task(task_a, store_path)
add_task(task_b, store_path)

db_path = _db_path(store_path)
try_claim("task-a", "2026-01-01T09:00", db_path=db_path)
try_claim("task-b", "2026-01-01T09:00", db_path=db_path)

assert remove_task("task-a", store_path) is True

assert get_runs("task-a", db_path=db_path) == []
assert len(get_runs("task-b", db_path=db_path)) == 1

def test_remove_nonexistent(self, store_path: Path) -> None:
assert remove_task("nonexistent", store_path) is False

Expand Down