Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/kai/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,12 @@ async def _notify_if_queued(update: Update, chat_id: int) -> bool:
)

# Safety-net timeout for acquiring the per-chat lock (seconds). If the
# wall-clock timeout in claude.py doesn't fire for some reason, this
# prevents a stuck interaction from blocking all future messages. Set
# longer than claude.py's wall-clock limit (timeout_seconds * 5) so the
# inner timeout fires first when the process is emitting frequent lines.
# Note: the wall-clock check runs between readline calls, so it can
# overshoot by up to one readline timeout (timeout_seconds * 3 = 6 min
# at defaults). Effective worst case is ~16 min, not 10.
_LOCK_ACQUIRE_TIMEOUT = 660 # 11 minutes
# idle timeout in claude.py doesn't fire for some reason, this prevents
# a stuck interaction from blocking all future messages indefinitely.
# Set generously: the idle timer in claude.py is the real safety net
# (fires after timeout_seconds * 5 of silence); this is a last-resort
# backstop for interactions that run legitimately long with active output.
_LOCK_ACQUIRE_TIMEOUT = 3600 # 1 hour


async def _acquire_lock_or_kill(
Expand Down
34 changes: 19 additions & 15 deletions src/kai/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,23 +575,22 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) ->
return

accumulated_text = ""
# Wall-clock limit for the entire interaction. The per-readline
# timeout below catches dead processes (no output for 6 min), but
# a process stuck in a tool-use loop emits progress events that
# reset the readline timer indefinitely. This outer limit catches
# that case: if the total interaction exceeds N minutes regardless
# of output, kill the process.
interaction_start = time.monotonic()
max_interaction_seconds = self.timeout_seconds * 5 # 10 min at default 120s
# Idle-activity timeout for the interaction. Resets every time the
# process emits a line of output. If the process goes silent for
# this long, it is likely dead or wedged. The per-readline timeout
# below (timeout_seconds * 3) is the primary dead-process detector;
# this is a secondary safety net measured across the whole interaction.
last_activity = time.monotonic()
max_idle_seconds = self.timeout_seconds * 5 # 10 min of silence at default 120s
try:
while True:
# Check wall-clock limit before each readline
elapsed = time.monotonic() - interaction_start
if elapsed > max_interaction_seconds:
# Check idle timeout before each readline
idle_seconds = time.monotonic() - last_activity
if idle_seconds > max_idle_seconds:
log.error(
"Interaction exceeded wall-clock limit (%.0fs > %ds)",
elapsed,
max_interaction_seconds,
"Interaction idle timeout (%.0fs with no output, limit %ds)",
idle_seconds,
max_idle_seconds,
)
await self._kill()
yield StreamEvent(
Expand All @@ -600,7 +599,7 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) ->
response=ClaudeResponse(
success=False,
text=accumulated_text,
error="Claude interaction timed out (too long)",
error="Claude interaction timed out (no output)",
),
)
return
Expand All @@ -619,6 +618,11 @@ async def _send_locked(self, prompt: str | list, chat_id: int | None = None) ->
)
return

# Reset idle timer - process is alive and producing output.
# Do NOT reset on empty line (EOF); that means the process died.
if line:
last_activity = time.monotonic()

if not line:
# Process died unexpectedly
log.error("Claude process EOF")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2458,7 +2458,7 @@ async def test_timeout_kills_claude(self):
await held_lock.acquire()

try:
# Patch the timeout to something tiny so the test doesn't wait 11 min
# Patch the timeout to something tiny so the test doesn't wait 1 hour
with patch("kai.bot._LOCK_ACQUIRE_TIMEOUT", 0.05):
result = await _acquire_lock_or_kill(chat_id, claude, update)
finally:
Expand Down
115 changes: 88 additions & 27 deletions tests/test_claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,52 +863,69 @@ async def test_timeout(self):
assert "timed out" in events[-1].response.error.lower()

@pytest.mark.asyncio
async def test_wall_clock_timeout(self):
"""Interaction killed when total elapsed time exceeds wall-clock limit.

Simulates a tool-use loop: the process keeps emitting assistant
events (resetting the per-readline timeout) but the overall
interaction exceeds timeout_seconds * 5. The wall-clock guard
should fire and kill the process.
async def test_idle_timeout(self):
"""Interaction killed when process goes silent longer than idle limit.

The idle timeout (timeout_seconds * 5) is a secondary safety net
behind the per-readline timeout (timeout_seconds * 3). Because the
readline timeout is shorter, it normally fires first for a truly
silent process. This test uses a mocked time source to force the
idle check to trigger, verifying the error path works correctly.
"""
claude = _make_claude(timeout_seconds=1) # wall-clock limit = 5s

# Feed assistant events indefinitely (simulating a chatty tool loop)
call_count = 0

async def slow_readline():
nonlocal call_count
call_count += 1
# Each readline returns an assistant event after a short delay.
# After enough calls, the wall-clock limit is exceeded.
await asyncio.sleep(0.3)
return _assistant_event(f"Tool output {call_count}")
claude = _make_claude(timeout_seconds=1) # idle limit = 5s

# Control time progression in kai.claude without affecting asyncio.
# The streaming loop calls time.monotonic() in a fixed pattern:
# 1. Init: last_activity = time.monotonic()
# 2. Idle check (iter 1): time.monotonic() - last_activity
# 3. Reset after readline 1: last_activity = time.monotonic()
# 4. Idle check (iter 2): time.monotonic() - last_activity
# 5. Reset after readline 2: last_activity = time.monotonic()
# 6. Idle check (iter 3): time.monotonic() - last_activity <- jump here
# Calls 1-5 return small values; call 6+ returns 100.0 so the
# idle check sees (100.0 - 0.5) > 5s and fires.
mono_call = [0]

def fake_monotonic():
mono_call[0] += 1
if mono_call[0] <= 5:
return mono_call[0] * 0.1
return 100.0

readline_count = [0]

async def readline_with_output():
readline_count[0] += 1
if readline_count[0] <= 2:
return _assistant_event(f"Output {readline_count[0]}")
# If we get here, the idle timeout didn't fire as expected
pytest.fail("readline reached call 3 - idle timeout did not fire")

proc = _make_mock_proc([])
proc.stdout.readline = slow_readline
proc.stdout.readline = readline_with_output
claude._proc = proc
claude._fresh_session = False

# Patch wait_for to actually respect the real timeout (not mock it)
events = await _collect_events(claude)
with patch("kai.claude.time.monotonic", side_effect=fake_monotonic):
events = await _collect_events(claude)

# Should get the wall-clock timeout error, not a readline timeout
# Should get the idle timeout error
done_event = events[-1]
assert done_event.done is True
assert done_event.response.success is False
assert "too long" in done_event.response.error.lower()
assert "no output" in done_event.response.error.lower()

@pytest.mark.asyncio
async def test_wall_clock_normal_completion_unaffected(self):
"""Normal interactions that complete quickly are not affected by wall-clock limit."""
async def test_idle_timeout_normal_completion_unaffected(self):
"""Normal interactions that complete quickly are not affected by idle timeout."""
proc = _make_mock_proc(
[
_system_event(),
_assistant_event("Hello"),
_result_event("Done"),
]
)
claude = _make_claude(timeout_seconds=1) # wall-clock = 5s
claude = _make_claude(timeout_seconds=1) # idle limit = 5s
claude._proc = proc
claude._fresh_session = False

Expand All @@ -919,6 +936,50 @@ async def test_wall_clock_normal_completion_unaffected(self):
assert done_event.response.success is True
assert done_event.response.error is None

@pytest.mark.asyncio
async def test_active_process_survives_past_old_wall_clock(self):
"""A process that keeps producing output is not killed by the idle timer.

This is the core behavioral change: under the old wall-clock limit,
any interaction exceeding timeout_seconds * 5 was killed regardless
of activity. The idle timer only fires after prolonged silence.
"""
claude = _make_claude(timeout_seconds=1) # idle limit = 5s, old wall-clock = 5s

# Simulate a process that takes longer than timeout_seconds * 5 total
# but keeps producing output (resetting the idle timer each time).
# Uses mocked time so the test runs instantly. Each readline advances
# time by 0.5s; after 20 calls that is 10s total, well past the old
# 5s wall-clock limit. The idle timer never fires because each
# readline resets it to within 0.5s.
sim_time = [0.0]

def advancing_monotonic():
return sim_time[0]

call_count = [0]

async def active_readline():
call_count[0] += 1
if call_count[0] <= 20:
sim_time[0] += 0.5 # advance 0.5s per event
return _assistant_event(f"Working... step {call_count[0]}")
return _result_event("All done")

proc = _make_mock_proc([])
proc.stdout.readline = active_readline
claude._proc = proc
claude._fresh_session = False

with patch("kai.claude.time.monotonic", side_effect=advancing_monotonic):
events = await _collect_events(claude)

# Should complete successfully despite total time > timeout_seconds * 5
done_event = events[-1]
assert done_event.done is True
assert done_event.response.success is True
assert done_event.response.text # Has content

@pytest.mark.asyncio
async def test_eof_with_text(self):
"""EOF with accumulated text yields success=True, error=None."""
Expand Down
Loading