Skip to content

Commit 2d00591

Browse files
bpiwowar and claude committed
fix: enable kill_job and process info from ssh-monitor (#219)
The remote client was sending job_id without task_id, but the server handlers require both, so the kill_job, clean_job, and get_process_info RPCs all failed with TypeError. Add task_id to the params dict in each, and pass through the cpu/memory/threads metrics already computed server-side so ssh-monitor reaches feature parity with the local TUI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5728a8c commit 2d00591

3 files changed

Lines changed: 161 additions & 0 deletions

File tree

src/experimaestro/scheduler/remote/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,7 @@ def kill_job(self, job: BaseJob, perform: bool = False) -> bool:
11371137
return job.state.running()
11381138

11391139
params = {
1140+
"task_id": job.task_id,
11401141
"job_id": job.identifier,
11411142
"experiment_id": getattr(job, "experiment_id", ""),
11421143
"run_id": getattr(job, "run_id", ""),
@@ -1154,6 +1155,7 @@ def clean_job(self, job: BaseJob, perform: bool = False) -> bool:
11541155
return job.state.finished()
11551156

11561157
params = {
1158+
"task_id": job.task_id,
11571159
"job_id": job.identifier,
11581160
"experiment_id": getattr(job, "experiment_id", ""),
11591161
"run_id": getattr(job, "run_id", ""),
@@ -1199,6 +1201,7 @@ def get_process_info(self, job: BaseJob):
11991201
from experimaestro.scheduler.state_provider import ProcessInfo
12001202

12011203
params = {
1204+
"task_id": job.task_id,
12021205
"job_id": job.identifier,
12031206
"experiment_id": getattr(job, "experiment_id", ""),
12041207
"run_id": getattr(job, "run_id", ""),
@@ -1217,6 +1220,9 @@ def get_process_info(self, job: BaseJob):
12171220
pid=result["pid"],
12181221
type=result["type"],
12191222
running=result.get("running", False),
1223+
cpu_percent=result.get("cpu_percent"),
1224+
memory_mb=result.get("memory_mb"),
1225+
num_threads=result.get("num_threads"),
12201226
)
12211227

12221228
def execute_warning_action(

src/experimaestro/scheduler/remote/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ def _handle_get_process_info(self, params: Dict) -> Optional[Dict]:
466466
"pid": pinfo.pid,
467467
"type": pinfo.type,
468468
"running": pinfo.running,
469+
"cpu_percent": pinfo.cpu_percent,
470+
"memory_mb": pinfo.memory_mb,
471+
"num_threads": pinfo.num_threads,
469472
}
470473

471474
def _handle_execute_warning_action(self, params: Dict) -> None:

src/experimaestro/tests/test_remote_state.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,50 @@ def test_handle_clean_job_not_found(self, server_with_mock, mock_state_provider)
780780
assert result["success"] is False
781781
assert "error" in result
782782

783+
def test_handle_get_process_info_returns_metrics(
    self, server_with_mock, mock_state_provider
):
    """The get_process_info handler forwards cpu, memory and thread metrics."""
    from experimaestro.scheduler.state_provider import ProcessInfo

    # Single source of truth: the fields handed to ProcessInfo are exactly
    # the payload the handler is expected to send back.
    expected = {
        "pid": 42,
        "type": "local",
        "running": True,
        "cpu_percent": 12.5,
        "memory_mb": 128.0,
        "num_threads": 4,
    }

    # The mocked state provider resolves the job and reports the canned
    # process information for it.
    mock_state_provider.get_job.return_value = MockJob(
        identifier="job1",
        task_id="task.Test",
        path=Path("/tmp/jobs/job1"),
        state="running",
        starttime=None,
        endtime=None,
        progress=[],
        updated_at="",
    )
    mock_state_provider.get_process_info.return_value = ProcessInfo(**expected)

    request = {"task_id": "task.Test", "job_id": "job1"}
    response = server_with_mock._handle_get_process_info(request)

    assert response == expected
821+
822+
def test_handle_get_process_info_missing_task_id(self, server_with_mock):
    """The handler rejects a request carrying a job_id but no task_id."""
    incomplete_request = {"job_id": "job1"}

    with pytest.raises(TypeError, match="task_id and job_id are required"):
        server_with_mock._handle_get_process_info(incomplete_request)
826+
783827
def test_handle_get_sync_info(self, server_with_mock):
784828
"""Test handling get_sync_info request"""
785829
result = server_with_mock._handle_get_sync_info({})
@@ -1018,6 +1062,114 @@ def test_path_mapping_outside_workspace(self, client):
10181062
assert job.path == Path("/other/path/job123")
10191063

10201064

1065+
class TestClientRPCParams:
    """Verify the client serializes the parameters expected by the server.

    Each test replaces ``client._call_sync`` with a recording stub, so the
    assertions only look at the RPC method and params the client builds and
    at how it interprets the canned response.
    """

    @staticmethod
    def _record_call_sync(client, response):
        """Install a ``_call_sync`` stub on *client* and return its call log.

        The stub stores the ``method`` and ``params`` of the latest call in
        the returned dict and answers every call with *response*.
        """
        captured = {}

        def fake_call_sync(method, params, timeout=None):
            captured["method"] = method
            captured["params"] = params
            return response

        client._call_sync = fake_call_sync
        return captured

    @pytest.fixture
    def client(self, tmp_path):
        """Return an SSH state-provider client whose local paths use tmp_path."""
        from experimaestro.scheduler.remote.client import SSHStateProviderClient

        client = SSHStateProviderClient(
            host="testhost",
            remote_workspace="/remote/workspace",
        )
        # Redirect the client's local directories to pytest's temporary
        # directory (presumably to keep filesystem effects inside the test
        # sandbox -- confirm against the client implementation).
        client._temp_dir = str(tmp_path)
        client.local_cache_dir = tmp_path
        client.workspace_path = tmp_path
        return client

    @pytest.fixture
    def running_job(self):
        """Return a mock job in the "running" state, shared by all RPC tests."""
        return MockJob(
            identifier="job1",
            task_id="task.Test",
            path=Path("/tmp/jobs/job1"),
            state="running",
            starttime=None,
            endtime=None,
            progress=[],
            updated_at="",
        )

    def test_kill_job_includes_task_id(self, client, running_job):
        """Client must send task_id alongside job_id (issue #219)"""
        captured = self._record_call_sync(client, {"success": True})

        assert client.kill_job(running_job, perform=True) is True
        assert captured["method"] == RPCMethod.KILL_JOB
        assert captured["params"]["task_id"] == "task.Test"
        assert captured["params"]["job_id"] == "job1"

    def test_clean_job_includes_task_id(self, client, running_job):
        """Client must send task_id alongside job_id for clean_job (issue #219)"""
        captured = self._record_call_sync(client, {"success": True})

        assert client.clean_job(running_job, perform=True) is True
        assert captured["method"] == RPCMethod.CLEAN_JOB
        assert captured["params"]["task_id"] == "task.Test"
        assert captured["params"]["job_id"] == "job1"

    def test_get_process_info_includes_task_id_and_metrics(self, client, running_job):
        """get_process_info sends task_id and surfaces metric fields (issue #219)"""
        captured = self._record_call_sync(
            client,
            {
                "pid": 42,
                "type": "local",
                "running": True,
                "cpu_percent": 12.5,
                "memory_mb": 128.0,
                "num_threads": 4,
            },
        )

        pinfo = client.get_process_info(running_job)
        assert captured["method"] == RPCMethod.GET_PROCESS_INFO
        assert captured["params"]["task_id"] == "task.Test"
        assert captured["params"]["job_id"] == "job1"
        assert pinfo is not None
        assert pinfo.pid == 42
        assert pinfo.type == "local"
        assert pinfo.running is True
        assert pinfo.cpu_percent == 12.5
        assert pinfo.memory_mb == 128.0
        assert pinfo.num_threads == 4

    def test_get_process_info_old_server_response(self, client, running_job):
        """An old server omitting metric fields still yields a valid ProcessInfo"""
        self._record_call_sync(
            client, {"pid": 42, "type": "slurm", "running": False}
        )

        pinfo = client.get_process_info(running_job)
        assert pinfo is not None
        # The fields every server version sends must still come through.
        assert pinfo.pid == 42
        assert pinfo.type == "slurm"
        assert pinfo.running is False
        # Metrics absent from the response are reported as None.
        assert pinfo.cpu_percent is None
        assert pinfo.memory_mb is None
        assert pinfo.num_threads is None
1171+
1172+
10211173
# =============================================================================
10221174
# Synchronizer Tests
10231175
# =============================================================================

0 commit comments

Comments
 (0)