Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
__pycache__
src/_assembled.html
src/web_ui_gz.h
dist/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ build/
build_web.py ← assembles template + tool files → gzip C header
dev_server.py ← HTTP + mock WebSocket dev server, auto-reload
tools/
userial/ ← RS-232 serial monitor/sender
userial/ ← Dual-UART serial monitor/injector/passthrough bridge
wifiscan/ ← WiFi network scanner
```

Expand Down
210 changes: 189 additions & 21 deletions build/dev_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,114 @@
_assembled_html = ""
_last_hash = ""
_mock_data = {}
_ws_url_rewrite = None


def _preview_ascii(data: bytes) -> str:
out = []
for value in data[:32]:
if 32 <= value < 127:
out.append(chr(value))
elif value == 13:
out.append("\\r")
elif value == 10:
out.append("\\n")
elif value == 9:
out.append("\\t")
else:
out.append(".")
return "".join(out)


def _preview_hex(data: bytes) -> str:
return " ".join(f"{value:02X}" for value in data[:32])


def _parse_hex_bytes(raw: str):
raw = raw or ""
out = bytearray()
i = 0
while i < len(raw):
ch = raw[i]
if ch in " ,\t\r\n":
i += 1
continue
if ch not in "0123456789abcdefABCDEF":
return None
token = ch
i += 1
if i < len(raw) and raw[i] in "0123456789abcdefABCDEF":
token += raw[i]
i += 1
if i < len(raw) and raw[i] not in " ,\t\r\n":
return None
out.append(int(token, 16))
return bytes(out)


def _normalize_mock_state():
status = _mock_data.setdefault("status", {"type": "status"})
settings = _mock_data.setdefault("settings", {"type": "settings"})
history = _mock_data.setdefault("history", {"type": "history", "items": []})

status.setdefault("rxBytes", 0)
status.setdefault("txBytes", 0)
status.setdefault("uptime", 0)
status.setdefault("heap", 180000)
status.setdefault("baud", 9600)
status.setdefault("databits", 8)
status.setdefault("parity", "N")
status.setdefault("stopbits", 1)
status.setdefault("config", "8N1")
status.setdefault("passthroughMode", "both")
status.setdefault("wifiMode", 0)
status.setdefault("mdnsHost", "tool")
status.setdefault("mdnsActive", True)
status.setdefault("apIP", "192.168.4.1")

ports = status.setdefault("ports", [
{"id": "A", "label": "Channel A", "rxPin": 4, "txPin": 5, "rxBytes": 0, "txBytes": 0},
{"id": "B", "label": "Channel B", "rxPin": 6, "txPin": 7, "rxBytes": 0, "txBytes": 0},
])
for port in ports:
port.setdefault("label", f"Channel {port.get('id', 'A')}")
port.setdefault("rxPin", 4 if port.get("id") == "A" else 6)
port.setdefault("txPin", 5 if port.get("id") == "A" else 7)
port.setdefault("rxBytes", 0)
port.setdefault("txBytes", 0)

settings.update({
"type": "settings",
"ssid": settings.get("ssid", "ToolAP"),
"apHasPass": settings.get("apHasPass", True),
"wifiMode": settings.get("wifiMode", status["wifiMode"]),
"staSSID": settings.get("staSSID", ""),
"staHasPass": settings.get("staHasPass", False),
"mdnsHost": settings.get("mdnsHost", status["mdnsHost"]),
"baud": settings.get("baud", status["baud"]),
"databits": settings.get("databits", status["databits"]),
"parity": settings.get("parity", status["parity"]),
"stopbits": settings.get("stopbits", status["stopbits"]),
"config": settings.get("config", status["config"]),
"passthroughMode": settings.get("passthroughMode", status["passthroughMode"]),
"ports": ports,
})
history.setdefault("type", "history")
history.setdefault("items", [])


def _port_state(port_id: str):
port_id = (port_id or "A").upper()
for port in _mock_data["status"]["ports"]:
if port["id"] == port_id:
return port
return _mock_data["status"]["ports"][0]


def _append_history(entry):
items = _mock_data.setdefault("history", {"type": "history", "items": []})["items"]
items.append(entry)
del items[:-80]

# ---------------------------------------------------------------------------
# File watching
Expand Down Expand Up @@ -74,6 +182,11 @@ def _rebuild_if_needed():
if current != _last_hash:
_last_hash = current
_assembled_html = assemble(_base_dir, _tool_dir)
if _ws_url_rewrite:
_assembled_html = _assembled_html.replace(
"ws://' + location.host + '/ws",
_ws_url_rewrite
)
# Inject auto-reload snippet
reload_script = """
<script>
Expand Down Expand Up @@ -182,6 +295,7 @@ def _load_mock_data():
"items": []
}
}
_normalize_mock_state()


async def _mock_ws_handler(websocket):
Expand Down Expand Up @@ -211,24 +325,88 @@ async def _mock_ws_handler(websocket):
}))
elif cmd in ("send", "sendAscii"):
data = msg.get("data", "")
port = _port_state(msg.get("port", "A"))
if msg.get("crlf"):
data += "\r\n"
payload = data.encode("utf-8", "ignore")
port["txBytes"] += len(payload)
_mock_data["status"]["txBytes"] = sum(item["txBytes"] for item in _mock_data["status"]["ports"])
_append_history({
"ts": int(time.time() * 1000),
"port": port["id"],
"dir": "TX",
"hex": _preview_hex(payload),
"ascii": _preview_ascii(payload),
})
await websocket.send(json.dumps({
"type": "sent",
"len": len(data),
"hex": " ".join(f"{ord(c):02X}" for c in data[:32]),
"ascii": data[:32],
"total": 256 + len(data)
"port": port["id"],
"len": len(payload),
"hex": _preview_hex(payload),
"ascii": _preview_ascii(payload),
"total": _mock_data["status"]["txBytes"],
"portTxBytes": port["txBytes"],
"ts": int(time.time() * 1000)
}))
elif cmd == "sendHex":
port = _port_state(msg.get("port", "A"))
payload = _parse_hex_bytes(msg.get("data", ""))
if payload is None:
await websocket.send(json.dumps({"type": "error", "msg": "Invalid hex payload"}))
continue
port["txBytes"] += len(payload)
_mock_data["status"]["txBytes"] = sum(item["txBytes"] for item in _mock_data["status"]["ports"])
_append_history({
"ts": int(time.time() * 1000),
"port": port["id"],
"dir": "TX",
"hex": _preview_hex(payload),
"ascii": _preview_ascii(payload),
})
await websocket.send(json.dumps({
"type": "sent",
"len": 4,
"hex": msg.get("data", "00"),
"ascii": "....",
"total": 260
"port": port["id"],
"len": len(payload),
"hex": _preview_hex(payload),
"ascii": _preview_ascii(payload),
"total": _mock_data["status"]["txBytes"],
"portTxBytes": port["txBytes"],
"ts": int(time.time() * 1000)
}))
elif cmd == "setSerial":
baud = int(msg.get("baud", _mock_data["status"]["baud"]))
databits = int(msg.get("databits", _mock_data["status"]["databits"]))
parity = str(msg.get("parity", _mock_data["status"]["parity"]))
stopbits = int(msg.get("stopbits", _mock_data["status"]["stopbits"]))
config = f"{databits}{parity}{stopbits}"
for section in ("status", "settings"):
_mock_data[section]["baud"] = baud
_mock_data[section]["databits"] = databits
_mock_data[section]["parity"] = parity
_mock_data[section]["stopbits"] = stopbits
_mock_data[section]["config"] = config
await websocket.send(json.dumps({
"type": "serialConfig",
"baud": baud,
"databits": databits,
"parity": parity,
"stopbits": stopbits,
"config": config
}))
elif cmd == "setPassthrough":
mode = msg.get("mode", "both")
_mock_data["status"]["passthroughMode"] = mode
_mock_data["settings"]["passthroughMode"] = mode
await websocket.send(json.dumps({"type": "passthroughConfig", "mode": mode}))
elif cmd == "savesettings" or cmd == "savewifi":
await websocket.send(json.dumps({"type": "saved"}))
elif cmd == "clearHistory":
for port in _mock_data["status"]["ports"]:
port["rxBytes"] = 0
port["txBytes"] = 0
_mock_data["status"]["rxBytes"] = 0
_mock_data["status"]["txBytes"] = 0
_mock_data.setdefault("history", {"type": "history", "items": []})["items"] = []
await websocket.send(json.dumps({"type": "cleared"}))
else:
print(f"[dev] Unhandled WS command: {cmd}")
Expand All @@ -253,7 +431,7 @@ async def _run_ws_server(host, port):
# ---------------------------------------------------------------------------

def main():
global _base_dir, _tool_dir
global _base_dir, _tool_dir, _ws_url_rewrite

parser = argparse.ArgumentParser(description="Toolbox Base dev server")
parser.add_argument("--base-dir", type=Path, default=Path(__file__).resolve().parent.parent,
Expand All @@ -267,6 +445,8 @@ def main():
_base_dir = args.base_dir.resolve()
_tool_dir = args.tool_dir.resolve()
http_port = args.port
ws_port = args.ws_port or (http_port + 1)
_ws_url_rewrite = f"ws://' + location.hostname + ':{ws_port}/ws"

if not (_base_dir / "web" / "template.html").exists():
print(f"Error: template.html not found in {_base_dir / 'web'}")
Expand All @@ -278,18 +458,6 @@ def main():
_load_mock_data()
_rebuild_if_needed()

# The trick: we serve HTTP and WS on the same port by using a combined approach
# HTTP serves the page, WS runs on the same port with /ws path
# We'll use separate ports for simplicity: HTTP on --port, WS on --port+1
ws_port = args.ws_port or (http_port + 1)

# Patch the assembled HTML to point WS to the dev server port
global _assembled_html
_assembled_html = _assembled_html.replace(
"ws://' + location.host + '/ws",
f"ws://' + location.hostname + ':{ws_port}/ws"
)

# Start HTTP server in a thread
httpd = HTTPServer(("0.0.0.0", http_port), DevHandler)
http_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
Expand Down
Loading
Loading