Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions gateway/platforms/qqbot/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,16 @@ async def _handle_c2c_message(
return
if not self._is_dm_allowed(user_openid):
return
source = self.build_source(
chat_id=user_openid,
user_id=user_openid,
chat_type="dm",
)
allow_attachment_processing = True
if self.gateway_runner:
authorizer = getattr(self.gateway_runner, "_is_user_authorized", None)
if callable(authorizer) and not authorizer(source):
allow_attachment_processing = False

text = content
attachments_raw = d.get("attachments")
Expand Down Expand Up @@ -1156,11 +1166,17 @@ async def _handle_c2c_message(
)

# Process all attachments uniformly (images, voice, files)
att_result = await self._process_attachments(attachments_raw)
image_urls = att_result["image_urls"]
image_media_types = att_result["image_media_types"]
voice_transcripts = att_result["voice_transcripts"]
attachment_info = att_result["attachment_info"]
if allow_attachment_processing:
att_result = await self._process_attachments(attachments_raw)
image_urls = att_result["image_urls"]
image_media_types = att_result["image_media_types"]
voice_transcripts = att_result["voice_transcripts"]
attachment_info = att_result["attachment_info"]
else:
image_urls = []
image_media_types = []
voice_transcripts = []
attachment_info = ""

# Append voice transcripts to the text body
if voice_transcripts:
Expand Down Expand Up @@ -1195,11 +1211,7 @@ async def _handle_c2c_message(

self._chat_type_map[user_openid] = "c2c"
event = MessageEvent(
source=self.build_source(
chat_id=user_openid,
user_id=user_openid,
chat_type="dm",
),
source=source,
text=text,
message_type=self._detect_message_type(image_urls, image_media_types),
raw_message=d,
Expand Down
11 changes: 8 additions & 3 deletions gateway/platforms/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
DEFAULT_PORT = 8644
_INSECURE_NO_AUTH = "INSECURE_NO_AUTH"
_DYNAMIC_ROUTES_FILENAME = "webhook_subscriptions.json"
_ENV_PLACEHOLDER_RE = re.compile(r"^\$\{[A-Za-z_][A-Za-z0-9_]*\}$")

# Hostnames/IP literals that only serve connections originating on the same
# machine. Anything else is treated as a public bind for safety-rail purposes.
Expand Down Expand Up @@ -590,24 +591,28 @@ def _validate_signature(
self, request: "web.Request", body: bytes, secret: str
) -> bool:
"""Validate webhook signature (GitHub, GitLab, generic HMAC-SHA256)."""
secret_text = secret.strip()
if _ENV_PLACEHOLDER_RE.fullmatch(secret_text):
return False

# GitHub: X-Hub-Signature-256 = sha256=<hex>
gh_sig = request.headers.get("X-Hub-Signature-256", "")
if gh_sig:
expected = "sha256=" + hmac.new(
secret.encode(), body, hashlib.sha256
secret_text.encode(), body, hashlib.sha256
).hexdigest()
return hmac.compare_digest(gh_sig, expected)

# GitLab: X-Gitlab-Token = <plain secret>
gl_token = request.headers.get("X-Gitlab-Token", "")
if gl_token:
return hmac.compare_digest(gl_token, secret)
return hmac.compare_digest(gl_token, secret_text)

# Generic: X-Webhook-Signature = <hex HMAC-SHA256>
generic_sig = request.headers.get("X-Webhook-Signature", "")
if generic_sig:
expected = hmac.new(
secret.encode(), body, hashlib.sha256
secret_text.encode(), body, hashlib.sha256
).hexdigest()
return hmac.compare_digest(generic_sig, expected)

Expand Down
1 change: 1 addition & 0 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5461,6 +5461,7 @@ def _is_user_authorized(self, source: SessionSource) -> bool:
# Check pairing store (always checked, regardless of allowlists)
platform_name = source.platform.value if source.platform else ""
auth_user_id = user_id
team_id = str(getattr(source, "guild_id", "") or "").strip()
if source.platform == Platform.WECOM_CALLBACK and source.chat_id:
auth_user_id = source.chat_id
pairing_check_ids = [auth_user_id]
Expand Down
65 changes: 65 additions & 0 deletions tests/tools/test_mcp_surrogate_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Regression tests for UTF-8-safe MCP JSON serialization."""

import asyncio
from types import SimpleNamespace


class _AsyncLock:
async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
return False


def test_mcp_json_dumps_preserves_cjk_but_escapes_lone_surrogates():
from tools.mcp_tool import _mcp_json_dumps

payload = _mcp_json_dumps({"result": "中文\ud800"})

assert "中文" in payload
assert "\ud800" not in payload
assert "\\ud800" in payload
payload.encode("utf-8")


def test_mcp_tool_handler_returns_utf8_encodable_result_for_surrogate_text(monkeypatch):
import tools.mcp_tool as mcp_tool

class FakeSession:
async def call_tool(self, tool_name, arguments):
return SimpleNamespace(
isError=False,
content=[SimpleNamespace(text="tool output \ud800")],
)

fake_server = SimpleNamespace(session=FakeSession(), _rpc_lock=_AsyncLock())
monkeypatch.setitem(mcp_tool._servers, "surrogate-server", fake_server)
monkeypatch.setattr(
mcp_tool,
"_run_on_mcp_loop",
lambda coro, timeout=None: asyncio.run(coro),
)

handler = mcp_tool._make_tool_handler("surrogate-server", "demo", 1.0)
result = handler({})

assert "\ud800" not in result
assert "\\ud800" in result
result.encode("utf-8")


def test_sampling_tool_arguments_are_utf8_encodable():
from tools.mcp_tool import SamplingHandler

handler = SamplingHandler("surrogate-server", {})
tool_use = SimpleNamespace(name="demo", input={"value": "中文\ud800"}, id="call_1")
message = SimpleNamespace(role="assistant", content=[tool_use], content_as_list=[tool_use])

converted = handler._convert_messages(SimpleNamespace(messages=[message]))
arguments = converted[0]["tool_calls"][0]["function"]["arguments"]

assert "中文" in arguments
assert "\ud800" not in arguments
assert "\\ud800" in arguments
arguments.encode("utf-8")
69 changes: 47 additions & 22 deletions tools/mcp_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,31 @@
logger = logging.getLogger(__name__)


def _utf8_safe_text(text: str) -> str:
"""Return text that can always be encoded as UTF-8."""
return text.encode("utf-8", errors="backslashreplace").decode("utf-8")
Comment on lines +95 to +97
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To avoid unnecessary string allocations and encoding/decoding overhead for valid UTF-8 strings (which represent the vast majority of cases), we can perform a fast-path check using a try...except UnicodeEncodeError block. If the string is already valid UTF-8, we can return it directly.

Suggested change
def _utf8_safe_text(text: str) -> str:
"""Return text that can always be encoded as UTF-8."""
return text.encode("utf-8", errors="backslashreplace").decode("utf-8")
def _utf8_safe_text(text: str) -> str:
"""Return text that can always be encoded as UTF-8."""
try:
text.encode("utf-8")
return text
except UnicodeEncodeError:
return text.encode("utf-8", errors="backslashreplace").decode("utf-8")



def _sanitize_json_value(value: Any) -> Any:
"""Preserve valid Unicode while escaping lone surrogates in JSON data."""
if isinstance(value, str):
return _utf8_safe_text(value)
if isinstance(value, dict):
return {
_sanitize_json_value(key): _sanitize_json_value(item)
for key, item in value.items()
}
if isinstance(value, (list, tuple)):
return [_sanitize_json_value(item) for item in value]
return value


def _mcp_json_dumps(value: Any, **kwargs: Any) -> str:
"""Serialize MCP-controlled data without emitting invalid UTF-8 text."""
kwargs.setdefault("ensure_ascii", False)
return json.dumps(_sanitize_json_value(value), **kwargs)
Comment on lines +114 to +117
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Recursively traversing and copying the entire JSON object structure in _sanitize_json_value on every serialization call introduces significant CPU and memory overhead, especially for large payloads (e.g., file contents or resource lists). Since lone surrogates are extremely rare, we can optimize this by attempting a fast-path json.dumps first, and only falling back to sanitization if encoding the resulting payload fails with a UnicodeEncodeError.

Suggested change
def _mcp_json_dumps(value: Any, **kwargs: Any) -> str:
"""Serialize MCP-controlled data without emitting invalid UTF-8 text."""
kwargs.setdefault("ensure_ascii", False)
return json.dumps(_sanitize_json_value(value), **kwargs)
def _mcp_json_dumps(value: Any, **kwargs: Any) -> str:
"""Serialize MCP-controlled data without emitting invalid UTF-8 text."""
kwargs.setdefault("ensure_ascii", False)
payload = json.dumps(value, **kwargs)
try:
payload.encode("utf-8")
return payload
except UnicodeEncodeError:
return json.dumps(_sanitize_json_value(value), **kwargs)



# ---------------------------------------------------------------------------
# Stdio subprocess stderr redirection
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -669,7 +694,7 @@ def _convert_messages(self, params) -> List[dict]:
"type": "function",
"function": {
"name": tu.name,
"arguments": json.dumps(tu.input, ensure_ascii=False) if isinstance(tu.input, dict) else str(tu.input),
"arguments": _mcp_json_dumps(tu.input) if isinstance(tu.input, dict) else _utf8_safe_text(str(tu.input)),
},
})
msg_dict: dict = {"role": msg.role, "tool_calls": tc_list}
Expand Down Expand Up @@ -1820,7 +1845,7 @@ async def _recover():
# needs_reauth error. Bumps the circuit breaker so the model stops
# retrying the tool.
_bump_server_error(server_name)
return json.dumps({
return _mcp_json_dumps({
"error": (
f"MCP server '{server_name}' requires re-authentication. "
f"Run `hermes mcp login {server_name}` (or delete the tokens "
Expand Down Expand Up @@ -2080,7 +2105,7 @@ def _run_on_mcp_loop(coro, timeout: float = 30):

def _interrupted_call_result() -> str:
"""Standardized JSON error for a user-interrupted MCP tool call."""
return json.dumps({
return _mcp_json_dumps({
"error": "MCP call interrupted: user sent a new message"
}, ensure_ascii=False)

Expand Down Expand Up @@ -2178,7 +2203,7 @@ def _handler(args: dict, **kwargs) -> str:
age = time.monotonic() - opened_at
if age < _CIRCUIT_BREAKER_COOLDOWN_SEC:
remaining = max(1, int(_CIRCUIT_BREAKER_COOLDOWN_SEC - age))
return json.dumps({
return _mcp_json_dumps({
"error": (
f"MCP server '{server_name}' is unreachable after "
f"{_server_error_counts[server_name]} consecutive "
Expand All @@ -2193,7 +2218,7 @@ def _handler(args: dict, **kwargs) -> str:
server = _servers.get(server_name)
if not server or not server.session:
_bump_server_error(server_name)
return json.dumps({
return _mcp_json_dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)

Expand All @@ -2206,7 +2231,7 @@ async def _call():
for block in (result.content or []):
if hasattr(block, "text"):
error_text += block.text
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
error_text or "MCP tool returned an error"
)
Expand Down Expand Up @@ -2240,12 +2265,12 @@ async def _call():
structured = getattr(result, "structuredContent", None)
if structured is not None:
if text_result:
return json.dumps({
return _mcp_json_dumps({
"result": text_result,
"structuredContent": structured,
}, ensure_ascii=False)
return json.dumps({"result": structured}, ensure_ascii=False)
return json.dumps({"result": text_result}, ensure_ascii=False)
return _mcp_json_dumps({"result": structured}, ensure_ascii=False)
return _mcp_json_dumps({"result": text_result}, ensure_ascii=False)

def _call_once():
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
Expand Down Expand Up @@ -2290,7 +2315,7 @@ def _call_once():
"MCP tool %s/%s call failed: %s",
server_name, tool_name, exc,
)
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {_exc_str(exc)}"
)
Expand All @@ -2306,7 +2331,7 @@ def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
return _mcp_json_dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)

Expand All @@ -2325,7 +2350,7 @@ async def _call():
if hasattr(r, "mimeType") and r.mimeType:
entry["mimeType"] = r.mimeType
resources.append(entry)
return json.dumps({"resources": resources}, ensure_ascii=False)
return _mcp_json_dumps({"resources": resources}, ensure_ascii=False)

def _call_once():
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
Expand All @@ -2348,7 +2373,7 @@ def _call_once():
logger.error(
"MCP %s/list_resources failed: %s", server_name, exc,
)
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {_exc_str(exc)}"
)
Expand All @@ -2366,7 +2391,7 @@ def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
return _mcp_json_dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)

Expand All @@ -2385,7 +2410,7 @@ async def _call():
parts.append(block.text)
elif hasattr(block, "blob"):
parts.append(f"[binary data, {len(block.blob)} bytes]")
return json.dumps({"result": "\n".join(parts) if parts else ""}, ensure_ascii=False)
return _mcp_json_dumps({"result": "\n".join(parts) if parts else ""}, ensure_ascii=False)

def _call_once():
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
Expand All @@ -2408,7 +2433,7 @@ def _call_once():
logger.error(
"MCP %s/read_resource failed: %s", server_name, exc,
)
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {_exc_str(exc)}"
)
Expand All @@ -2424,7 +2449,7 @@ def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
return _mcp_json_dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)

Expand All @@ -2448,7 +2473,7 @@ async def _call():
for a in p.arguments
]
prompts.append(entry)
return json.dumps({"prompts": prompts}, ensure_ascii=False)
return _mcp_json_dumps({"prompts": prompts}, ensure_ascii=False)

def _call_once():
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
Expand All @@ -2471,7 +2496,7 @@ def _call_once():
logger.error(
"MCP %s/list_prompts failed: %s", server_name, exc,
)
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {_exc_str(exc)}"
)
Expand All @@ -2489,7 +2514,7 @@ def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
return _mcp_json_dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)

Expand Down Expand Up @@ -2519,7 +2544,7 @@ async def _call():
resp = {"messages": messages}
if hasattr(result, "description") and result.description:
resp["description"] = result.description
return json.dumps(resp, ensure_ascii=False)
return _mcp_json_dumps(resp, ensure_ascii=False)

def _call_once():
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
Expand All @@ -2542,7 +2567,7 @@ def _call_once():
logger.error(
"MCP %s/get_prompt failed: %s", server_name, exc,
)
return json.dumps({
return _mcp_json_dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {_exc_str(exc)}"
)
Expand Down
Loading