Skip to content

Commit db4de96

Browse files
Merge pull request #46 from keyboardstaff/ws-rework
refactor: extract constants, deduplicate ack pattern
2 parents bb17cdf + 4e222e2 commit db4de96

2 files changed

Lines changed: 69 additions & 57 deletions

File tree

helpers/state_monitor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
build_snapshot_from_request,
1616
)
1717
from helpers.ws import ConnectionIdentity, ConnectionNotFoundError, _ws_debug_enabled, ws_debug
18+
from helpers.ws_manager import STATE_PUSH_EVENT
1819

1920
if TYPE_CHECKING: # pragma: no cover - hints only
2021
from helpers.ws_manager import WsManager
@@ -292,7 +293,7 @@ async def _flush_push(self, identity: ConnectionIdentity) -> None:
292293
await manager.emit_to(
293294
namespace,
294295
sid,
295-
"state_push",
296+
STATE_PUSH_EVENT,
296297
payload,
297298
handler_id=handler_id,
298299
)

helpers/ws_manager.py

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,16 @@ class _HandlerExecution:
233233
DIAGNOSTIC_EVENT = "ws_dev_console_event"
234234
LIFECYCLE_CONNECT_EVENT = "ws_lifecycle_connect"
235235
LIFECYCLE_DISCONNECT_EVENT = "ws_lifecycle_disconnect"
236+
STATE_PUSH_EVENT = "state_push"
237+
SERVER_RESTART_EVENT = "server_restart"
238+
239+
# Error codes returned by _build_error_result
240+
ERR_NO_HANDLERS = "NO_HANDLERS"
241+
ERR_HANDLER_ERROR = "HANDLER_ERROR"
242+
ERR_INVALID_FILTER = "INVALID_FILTER"
243+
ERR_INVALID_EVENT = "INVALID_EVENT"
244+
ERR_CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
245+
ERR_TIMEOUT = "TIMEOUT"
236246

237247

238248
class WsManager:
@@ -559,13 +569,12 @@ async def process_client_event(
559569
handler_payload["correlationId"] = correlation_id
560570

561571
if not handlers:
562-
error = self._build_error_result(
572+
return self._ack_error(
563573
handler_id=self._identifier,
564-
code="NO_HANDLERS",
574+
code=ERR_NO_HANDLERS,
565575
message="No handlers available after security filtering",
566576
correlation_id=correlation_id,
567577
)
568-
return {"correlationId": correlation_id, "results": [error]}
569578

570579
with self.lock:
571580
info = self.connections.get((namespace, sid))
@@ -638,7 +647,7 @@ def _collect_results(
638647
results.append(
639648
self._build_error_result(
640649
handler_id=handler.identifier,
641-
code="HANDLER_ERROR",
650+
code=ERR_HANDLER_ERROR,
642651
message="Internal server error",
643652
details=str(value),
644653
correlation_id=correlation_id,
@@ -722,7 +731,7 @@ async def handle_connect(
722731
await self.emit_to(
723732
namespace,
724733
sid,
725-
"server_restart",
734+
SERVER_RESTART_EVENT,
726735
{
727736
"emittedAt": self._timestamp(),
728737
"runtimeId": runtime.get_runtime_id(),
@@ -829,121 +838,102 @@ async def route_event(
829838
include_meta_raw, "includeHandlers"
830839
)
831840
except ValueError as exc:
832-
error = self._build_error_result(
841+
return self._ack_error(
833842
handler_id=handler_id or self._identifier,
834-
code="INVALID_FILTER",
843+
code=ERR_INVALID_FILTER,
835844
message=str(exc),
836845
correlation_id=correlation_id,
846+
ack=ack,
837847
)
838-
if ack:
839-
ack({"correlationId": correlation_id, "results": [error]})
840-
return {"correlationId": correlation_id, "results": [error]}
841848

842849
try:
843850
exclude_meta = self._normalize_handler_filter(
844851
exclude_meta_raw, "excludeHandlers"
845852
)
846853
except ValueError as exc:
847-
error = self._build_error_result(
854+
return self._ack_error(
848855
handler_id=handler_id or self._identifier,
849-
code="INVALID_FILTER",
856+
code=ERR_INVALID_FILTER,
850857
message=str(exc),
851858
correlation_id=correlation_id,
859+
ack=ack,
852860
)
853-
payload_error = {"correlationId": correlation_id, "results": [error]}
854-
if ack:
855-
ack(payload_error)
856-
return payload_error
857861

858862
if exclude_meta_raw is not None and not allow_exclude:
859-
error = self._build_error_result(
863+
return self._ack_error(
860864
handler_id=handler_id or self._identifier,
861-
code="INVALID_FILTER",
865+
code=ERR_INVALID_FILTER,
862866
message="excludeHandlers is not supported for this operation",
863867
correlation_id=correlation_id,
868+
ack=ack,
864869
)
865-
if ack:
866-
ack({"correlationId": correlation_id, "results": [error]})
867-
return {"correlationId": correlation_id, "results": [error]}
868870

869871
if include_handlers is not None and include_meta is not None:
870872
if include_handlers != include_meta:
871-
error = self._build_error_result(
873+
return self._ack_error(
872874
handler_id=handler_id or self._identifier,
873-
code="INVALID_FILTER",
875+
code=ERR_INVALID_FILTER,
874876
message="Conflicting includeHandlers filters supplied",
875877
correlation_id=correlation_id,
878+
ack=ack,
876879
)
877-
if ack:
878-
ack({"correlationId": correlation_id, "results": [error]})
879-
return {"correlationId": correlation_id, "results": [error]}
880880

881881
if allow_exclude and exclude_handlers is not None and exclude_meta is not None:
882882
if exclude_handlers != exclude_meta:
883-
error = self._build_error_result(
883+
return self._ack_error(
884884
handler_id=handler_id or self._identifier,
885-
code="INVALID_FILTER",
885+
code=ERR_INVALID_FILTER,
886886
message="Conflicting excludeHandlers filters supplied",
887887
correlation_id=correlation_id,
888+
ack=ack,
888889
)
889-
if ack:
890-
ack({"correlationId": correlation_id, "results": [error]})
891-
return {"correlationId": correlation_id, "results": [error]}
892890

893891
include = include_handlers or include_meta
894892
exclude = exclude_handlers or (exclude_meta if allow_exclude else None)
895893

896894
try:
897895
validate_event_type(event_type)
898896
except (TypeError, ValueError) as exc:
899-
error = self._build_error_result(
897+
return self._ack_error(
900898
handler_id=handler_id or self._identifier,
901-
code="INVALID_EVENT",
899+
code=ERR_INVALID_EVENT,
902900
message=str(exc),
903901
correlation_id=correlation_id,
902+
ack=ack,
904903
)
905-
if ack:
906-
ack({"correlationId": correlation_id, "results": [error]})
907-
return {"correlationId": correlation_id, "results": [error]}
908904

909905
registered = self.handlers.get(namespace, [])
910906
if not registered:
911907
PrintStyle.warning(f"No handlers registered for namespace '{namespace}'")
912-
error = self._build_error_result(
908+
return self._ack_error(
913909
handler_id=handler_id or self._identifier,
914-
code="NO_HANDLERS",
910+
code=ERR_NO_HANDLERS,
915911
message=f"No handler for namespace '{namespace}'",
916912
correlation_id=correlation_id,
913+
ack=ack,
917914
)
918-
if ack:
919-
ack({"correlationId": correlation_id, "results": [error]})
920-
return {"correlationId": correlation_id, "results": [error]}
921915

922916
try:
923917
selected_handlers, _ = self._select_handlers(
924918
namespace, include=include, exclude=exclude
925919
)
926920
except ValueError as exc:
927-
error = self._build_error_result(
921+
return self._ack_error(
928922
handler_id=handler_id or self._identifier,
929-
code="INVALID_FILTER",
923+
code=ERR_INVALID_FILTER,
930924
message=str(exc),
931925
correlation_id=correlation_id,
926+
ack=ack,
932927
)
933-
if ack:
934-
ack({"correlationId": correlation_id, "results": [error]})
935-
return {"correlationId": correlation_id, "results": [error]}
936928

937929
if not selected_handlers:
938-
error = self._build_error_result(
930+
return self._ack_error(
939931
handler_id=handler_id or self._identifier,
940-
code="NO_HANDLERS",
932+
code=ERR_NO_HANDLERS,
941933
message=f"No handler for '{event_type}' after applying filters",
942934
correlation_id=correlation_id,
935+
ack=ack,
943936
)
944-
if ack:
945-
ack({"correlationId": correlation_id, "results": [error]})
946-
return {"correlationId": correlation_id, "results": [error]}
947937

948938
with self.lock:
949939
info = self.connections.get((namespace, sid))
@@ -1011,7 +1001,7 @@ async def request_for_sid(
10111001
"results": [
10121002
self._build_error_result(
10131003
handler_id=handler_id or self._identifier,
1014-
code="CONNECTION_NOT_FOUND",
1004+
code=ERR_CONNECTION_NOT_FOUND,
10151005
message=f"Connection '{sid}' not found in namespace '{namespace}'",
10161006
correlation_id=correlation_id,
10171007
)
@@ -1040,7 +1030,7 @@ async def _invoke() -> dict[str, Any]:
10401030
"results": [
10411031
self._build_error_result(
10421032
handler_id=handler_id or self._identifier,
1043-
code="TIMEOUT",
1033+
code=ERR_TIMEOUT,
10441034
message="Request timeout",
10451035
correlation_id=correlation_id,
10461036
)
@@ -1073,7 +1063,7 @@ async def route_event_all(
10731063
except ValueError as exc:
10741064
error = self._build_error_result(
10751065
handler_id=handler_id or self._identifier,
1076-
code="INVALID_FILTER",
1066+
code=ERR_INVALID_FILTER,
10771067
message=str(exc),
10781068
correlation_id=correlation_id,
10791069
)
@@ -1090,7 +1080,7 @@ async def route_event_all(
10901080
elif exclude_meta is not None and exclude_combined != exclude_meta:
10911081
error = self._build_error_result(
10921082
handler_id=handler_id or self._identifier,
1093-
code="INVALID_FILTER",
1083+
code=ERR_INVALID_FILTER,
10941084
message="Conflicting excludeHandlers filters supplied",
10951085
correlation_id=correlation_id,
10961086
)
@@ -1155,7 +1145,7 @@ async def _dispatch() -> dict[str, Any]:
11551145
"results": [
11561146
self._build_error_result(
11571147
handler_id=handler_id or self._identifier,
1158-
code="TIMEOUT",
1148+
code=ERR_TIMEOUT,
11591149
message="Request timeout",
11601150
correlation_id=correlation_id,
11611151
)
@@ -1459,6 +1449,27 @@ def _build_error_result(
14591449
result["durationMs"] = round(duration_ms, 4)
14601450
return result
14611451

1452+
def _ack_error(
1453+
self,
1454+
*,
1455+
handler_id: str | None = None,
1456+
code: str,
1457+
message: str,
1458+
correlation_id: str | None = None,
1459+
ack: Callable | None = None,
1460+
) -> dict[str, Any]:
1461+
"""Build an error response, optionally invoke the ack callback, and return."""
1462+
error = self._build_error_result(
1463+
handler_id=handler_id,
1464+
code=code,
1465+
message=message,
1466+
correlation_id=correlation_id,
1467+
)
1468+
response = {"correlationId": correlation_id, "results": [error]}
1469+
if ack:
1470+
ack(response)
1471+
return response
1472+
14621473
# Session tracking helpers (single-user defaults)
14631474
def get_sids_for_user(self, user: str | None = None) -> list[ConnectionIdentity]:
14641475
"""Return connection identities for a user; single-user default returns all."""

0 commit comments

Comments
 (0)