Skip to content

Commit 070be2f

Browse files
fix(SerialManager): strictly gate polling on IDN in all paths; clear registry (IDN + status_chN) on disconnect/reconnect failure; add monitoring tests to ensure no polling without IDN
- Gate monitor_connections polling on registry IDN - Clear IDN and all status_chN on empty/exception polls and reconnection failures - Add tests: no polling when IDN missing; polling starts only after IDN; empty poll clears IDN and halts polling; identify not called while IDN present Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 16812ff commit 070be2f

3 files changed

Lines changed: 332 additions & 28 deletions

File tree

benchmesh-serial-service/src/benchmesh_service/serial_manager.py

Lines changed: 131 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from .logger import setup_logger
1111

1212
logger = logging.getLogger(__name__)
13+
IDENTIFY_INTERVAL = 1.0
14+
1315

1416
MANIFEST_ALIASES = {
1517
'tenma_psu': 'tenma_72',
@@ -72,6 +74,33 @@ def _get_channel_count(dev: dict) -> int:
7274
return max(1, ch)
7375
except Exception:
7476
return 1
77+
78+
79+
def _get_poll_interval(dev: dict) -> float:
80+
try:
81+
driver_key = dev.get("driver")
82+
manifest = _load_manifest(driver_key) if driver_key else None
83+
if not isinstance(manifest, dict):
84+
return 2.0
85+
models = manifest.get("models") or {}
86+
model_key = dev.get("model")
87+
model_cfg = None
88+
if model_key and isinstance(models.get(model_key), dict):
89+
model_cfg = models.get(model_key)
90+
elif isinstance(models, dict) and models:
91+
model_cfg = next(iter(models.values()))
92+
if not isinstance(model_cfg, dict):
93+
return 2.0
94+
pooling = model_cfg.get("pooling") or []
95+
for entry in pooling:
96+
try:
97+
if entry.get("method") == "poll_status":
98+
return float(entry.get("interval", 2.0))
99+
except Exception:
100+
continue
101+
return 2.0
102+
except Exception:
103+
return 2.0
75104
def _load_driver_class(driver_key: str):
76105
"""Load a driver class given its key.
77106
@@ -142,6 +171,7 @@ def __init__(self, config_source: Any):
142171
self.dev_threads: Dict[str, threading.Thread] = {}
143172
self.registry: Dict[str, Dict[str, Any]] = {d.get('id'): {} for d in self.devices if d.get('id')}
144173
self.dev_channels: Dict[str, int] = {}
174+
self.dev_poll_interval: Dict[str, float] = {}
145175
self._last_registry_log: float = 0.0
146176

147177
self.establish_connections()
@@ -184,6 +214,20 @@ def _update_registry(self, dev_id: str, key: str, value: Any):
184214
self.registry[dev_id] = {}
185215
self.registry[dev_id][key] = value
186216

217+
def _clear_disconnected_registry(self, dev_id: str):
218+
data = self.registry.get(dev_id)
219+
if not isinstance(data, dict):
220+
self.registry[dev_id] = {}
221+
return
222+
# Remove IDN and all per-channel status entries
223+
if 'IDN' in data:
224+
del data['IDN']
225+
for k in [k for k in list(data.keys()) if isinstance(k, str) and k.startswith('status_ch')]:
226+
try:
227+
del data[k]
228+
except Exception:
229+
pass
230+
187231
def monitor_connections(self):
188232
print("Starting connection monitor thread.")
189233
while self.keep_running:
@@ -196,30 +240,47 @@ def monitor_connections(self):
196240
drv = self.connections.get(device_id)
197241
is_open = getattr(getattr(drv, 't', None), 'is_open', False) if drv else False
198242

199-
if is_open:
200-
# Poll device status every 2 seconds (legacy monitor path)
243+
if is_open and self.registry.get(device_id, {}).get('IDN'):
244+
# Poll device status per manifest interval
245+
poll_iv = self.dev_poll_interval.get(device_id)
246+
if not poll_iv:
247+
poll_iv = _get_poll_interval(dev)
248+
self.dev_poll_interval[device_id] = poll_iv
201249
last_poll = self.last_probe.get(device_id, 0.0)
202-
if now - last_poll >= 2.0:
250+
if now - last_poll >= poll_iv:
203251
try:
204252
# Determine channel count for this device and poll per channel
205253
ch_count = self.dev_channels.get(device_id)
206254
if not ch_count:
207255
ch_count = _get_channel_count(dev)
208256
self.dev_channels[device_id] = ch_count
209257
# poll each channel and store under status_ch{n}
258+
polled_any = False
210259
for ch in range(1, max(1, ch_count)+1):
211260
try:
212-
if hasattr(drv, 'poll_status'):
213-
status = drv.poll_status(ch)
214-
else:
215-
status = {}
216-
key = f'status_ch{ch}'
217-
self._update_registry(device_id, key, status)
261+
status = drv.poll_status(ch) if hasattr(drv, 'poll_status') else {}
218262
except Exception as _e:
219-
raise
220-
self.last_ok[device_id] = now
221-
self.last_probe[device_id] = now
222-
logger.debug("Polled status for %s -> %s", device_id, status)
263+
status = {}
264+
# If status is falsy (empty dict/None), treat as link drop
265+
if not status:
266+
# Link considered down: clear IDN and per-channel statuses, stop polling
267+
self._clear_disconnected_registry(device_id)
268+
polled_any = False
269+
break
270+
key = f'status_ch{ch}'
271+
self._update_registry(device_id, key, status)
272+
polled_any = True
273+
if polled_any:
274+
self.last_ok[device_id] = now
275+
self.last_probe[device_id] = now
276+
logger.debug("Polled status for %s (channels=%s)", device_id, ch_count)
277+
else:
278+
# Consider link down: close and mark disconnected; do not update last_probe to allow immediate identify
279+
try:
280+
drv.close()
281+
except Exception:
282+
pass
283+
self.connections[device_id] = None
223284
except Exception:
224285
# any error -> mark disconnected and allow reconnect on schedule
225286
try:
@@ -229,11 +290,27 @@ def monitor_connections(self):
229290
self.connections[device_id] = None
230291
self.last_probe[device_id] = now
231292
# If not open or marked None -> try to reconnect every 2 seconds
232-
if not is_open or self.connections.get(device_id) is None:
293+
# If link not open, try to (re)identify at a fixed cadence without calling other methods
294+
if not is_open or self.connections.get(device_id) is None or not self.registry.get(device_id, {}).get('IDN'):
233295
last_attempt = self.last_open_attempt.get(device_id, 0.0)
234-
if now - last_attempt >= 2.0:
296+
if now - last_attempt >= IDENTIFY_INTERVAL:
235297
self.last_open_attempt[device_id] = now
236-
self.reconnect(dev)
298+
try:
299+
# Attempt reconnect if driver missing or closed
300+
if not is_open or self.connections.get(device_id) is None:
301+
self.reconnect(dev)
302+
# If we have a driver and transport is open, try identify only
303+
drv = self.connections.get(device_id)
304+
is_open = getattr(getattr(drv, 't', None), 'is_open', False) if drv else False
305+
if drv and is_open:
306+
ident = self._try_identify(drv)
307+
if ident:
308+
self._update_registry(device_id, 'IDN', ident)
309+
self.last_ok[device_id] = now
310+
self.last_probe[device_id] = now
311+
except Exception:
312+
# ignore, will retry
313+
pass
237314

238315
# Periodically dump registry at DEBUG
239316
if now - self._last_registry_log >= 5.0:
@@ -258,33 +335,55 @@ def _device_worker(self, dev_id: str):
258335
continue
259336
drv = self.connections.get(dev_id)
260337
is_open = getattr(getattr(drv, 't', None), 'is_open', False) if drv else False
261-
if is_open:
262-
# Poll device status every 2 seconds
338+
if is_open and self.registry.get(dev_id, {}).get('IDN'):
339+
# Poll device status per manifest interval
340+
poll_iv = self.dev_poll_interval.get(dev_id)
341+
if not poll_iv:
342+
poll_iv = _get_poll_interval(dev)
343+
self.dev_poll_interval[dev_id] = poll_iv
263344
last_poll = self.last_probe.get(dev_id, 0.0)
264-
if now - last_poll >= 2.0:
345+
if now - last_poll >= poll_iv:
265346
try:
266347
# Determine channel count and poll each channel
267348
ch_count = self.dev_channels.get(dev_id)
268349
if not ch_count:
269350
ch_count = _get_channel_count(dev)
270351
self.dev_channels[dev_id] = ch_count
352+
polled_any = False
271353
for ch in range(1, max(1, ch_count)+1):
272-
st = {}
273-
if hasattr(drv, 'poll_status'):
274-
st = drv.poll_status(ch)
354+
try:
355+
st = drv.poll_status(ch) if hasattr(drv, 'poll_status') else {}
356+
except Exception:
357+
st = {}
358+
if not st:
359+
self._clear_disconnected_registry(dev_id)
360+
polled_any = False
361+
break
275362
key = f"status_ch{ch}"
276363
self._update_registry(dev_id, key, st)
277-
self.last_ok[dev_id] = now
278-
self.last_probe[dev_id] = now
279-
logger.debug("Polled status for %s (channels=%s)", dev_id, ch_count)
364+
polled_any = True
365+
if polled_any:
366+
self.last_ok[dev_id] = now
367+
self.last_probe[dev_id] = now
368+
logger.debug("Polled status for %s (channels=%s)", dev_id, ch_count)
369+
else:
370+
# Consider link down: clear IDN and per-channel statuses, close and mark disconnected
371+
self._clear_disconnected_registry(dev_id)
372+
try:
373+
drv.close()
374+
except Exception:
375+
pass
376+
self.connections[dev_id] = None
280377
except Exception as e:
281378
logger.warning("Status poll failed for %s: %s; marking disconnected", dev_id, e)
379+
# On poll exception, clear IDN and per-channel statuses, close, and mark disconnected
380+
self._clear_disconnected_registry(dev_id)
282381
try:
283382
drv.close()
284383
except Exception:
285384
pass
286385
self.connections[dev_id] = None
287-
self.last_probe[dev_id] = now
386+
# do not update last_probe to allow immediate identify attempts
288387
if not is_open or self.connections.get(dev_id) is None:
289388
last_attempt = self.last_open_attempt.get(dev_id, 0.0)
290389
if now - last_attempt >= 2.0:
@@ -323,6 +422,10 @@ def reconnect(self, device_or_id):
323422
except Exception:
324423
pass
325424

425+
# If link not open or reconnection failed -> clear registry to reflect disconnected state
426+
if self.connections.get(device_id) is None:
427+
self._clear_disconnected_registry(device_id)
428+
326429
# Create driver instance using manifest EOL settings and config serial params
327430
try:
328431
driver_key = dev['driver']
@@ -374,6 +477,8 @@ def identify(self):
374477
except Exception as e:
375478
self.logger.error(f"Reconnection failed for {dev.get('name', device_id)}: {e}")
376479
self.connections[device_id] = None
480+
# Ensure registry reflects disconnected state
481+
self._clear_disconnected_registry(device_id)
377482
return None
378483

379484
def start(self):

benchmesh-serial-service/tests/test_serial_manager_concurrency.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ def stub(channel=1):
7676
return {"ok": True, "dev": dev_id}
7777
return stub
7878
drv.poll_status = _make_stub(d['id']) # type: ignore[attr-defined]
79-
# Force immediate poll on next loop by resetting last_probe after start/identify
79+
# Provide IDN to enable polling and force immediate poll on next loop
80+
m.registry[d['id']]['IDN'] = 'FAKE,IDN'
81+
m.dev_poll_interval[d['id']] = 0.1
8082
m.last_probe[d['id']] = 0.0
8183

8284
# Wait for first poll to occur for each device
@@ -138,11 +140,17 @@ def failing_poll(channel=1):
138140
raise RuntimeError('boom')
139141
drv.poll_status = failing_poll # type: ignore[attr-defined]
140142
# Ensure the device worker tries immediately but prevent instant reconnect
143+
# Mark as identified so worker will poll and then drop on failure
144+
m.registry[dev_id]['IDN'] = 'FAKE,IDN'
145+
141146
m.last_probe[dev_id] = 0.0
147+
# speed up poll interval for test determinism
148+
m.dev_poll_interval[dev_id] = 0.1
149+
142150
m.last_open_attempt[dev_id] = time.time()
143151

144152
# Allow worker loop to run and drop connection
145-
time.sleep(0.8)
153+
time.sleep(1.1)
146154
assert m.connections[dev_id] is None, "Expected connection to be dropped after poll failure"
147155

148156
# Now install a reconnect spy and allow immediate reconnect

0 commit comments

Comments
 (0)