From 405ad994bef68093b7c401df95778ad0ad60a316 Mon Sep 17 00:00:00 2001 From: Frank Graziano <7840487+frankgraziano@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:38:19 -0400 Subject: [PATCH 1/4] Add MCP ecosystem enum and scanner infrastructure Introduce MCP (Model Context Protocol) as a first-class ecosystem in guarddog. This adds the ECOSYSTEM.MCP enum value, config file discovery for 11 MCP clients (Claude Desktop, Claude Code, Cursor, VS Code, Windsurf, Cline, Roo Code, Continue, Codex, Gemini CLI, Copilot CLI), a normalized data model (MCPServerConfig / MCPConfigFile / MCPInventory), and two scanner classes: MCPConfigScanner (package scanner) and MCPDiscoveryScanner (project scanner). Discovery scopes recursive globs to known VS Code extension and config directories to avoid expensive walks over the entire home directory. Co-Authored-By: Claude Opus 4.6 --- guarddog/ecosystems.py | 3 + guarddog/scanners/__init__.py | 14 +- guarddog/scanners/mcp/__init__.py | 7 + guarddog/scanners/mcp/discovery.py | 156 ++++++++++++++++ guarddog/scanners/mcp/models.py | 65 +++++++ guarddog/scanners/mcp/parsers/__init__.py | 27 +++ guarddog/scanners/mcp/parsers/base.py | 172 ++++++++++++++++++ guarddog/scanners/mcp/parsers/claude_code.py | 43 +++++ .../scanners/mcp/parsers/claude_desktop.py | 42 +++++ guarddog/scanners/mcp/parsers/cline.py | 42 +++++ guarddog/scanners/mcp/parsers/codex.py | 43 +++++ guarddog/scanners/mcp/parsers/continue_dev.py | 71 ++++++++ guarddog/scanners/mcp/parsers/copilot_cli.py | 43 +++++ guarddog/scanners/mcp/parsers/cursor.py | 43 +++++ guarddog/scanners/mcp/parsers/gemini_cli.py | 47 +++++ guarddog/scanners/mcp/parsers/roo_code.py | 45 +++++ guarddog/scanners/mcp/parsers/vscode.py | 43 +++++ guarddog/scanners/mcp/parsers/windsurf.py | 43 +++++ guarddog/scanners/mcp_config_scanner.py | 71 ++++++++ guarddog/scanners/mcp_project_scanner.py | 91 +++++++++ 20 files changed, 1105 insertions(+), 6 deletions(-) create mode 100644 guarddog/scanners/mcp/__init__.py create mode 100644 guarddog/scanners/mcp/discovery.py create mode 100644 guarddog/scanners/mcp/models.py create mode 100644 guarddog/scanners/mcp/parsers/__init__.py create mode 100644 guarddog/scanners/mcp/parsers/base.py create mode 100644 guarddog/scanners/mcp/parsers/claude_code.py create mode 100644 guarddog/scanners/mcp/parsers/claude_desktop.py create mode 100644 guarddog/scanners/mcp/parsers/cline.py create mode 100644 guarddog/scanners/mcp/parsers/codex.py create mode 100644 guarddog/scanners/mcp/parsers/continue_dev.py create mode 100644 guarddog/scanners/mcp/parsers/copilot_cli.py create mode 100644 guarddog/scanners/mcp/parsers/cursor.py create mode 100644 guarddog/scanners/mcp/parsers/gemini_cli.py create mode 100644 guarddog/scanners/mcp/parsers/roo_code.py create mode 100644 guarddog/scanners/mcp/parsers/vscode.py create mode 100644 guarddog/scanners/mcp/parsers/windsurf.py create mode 100644 guarddog/scanners/mcp_config_scanner.py create mode 100644 guarddog/scanners/mcp_project_scanner.py diff --git a/guarddog/ecosystems.py b/guarddog/ecosystems.py index 66ba5e558..bfc3ac216 100644 --- a/guarddog/ecosystems.py +++ b/guarddog/ecosystems.py @@ -8,6 +8,7 @@ class ECOSYSTEM(Enum): GITHUB_ACTION = "github-action" EXTENSION = "extension" RUBYGEMS = "rubygems" + MCP = "mcp" def get_friendly_name(ecosystem: ECOSYSTEM) -> str: @@ -24,5 +25,7 @@ def get_friendly_name(ecosystem: ECOSYSTEM) -> str: return "Extension" case ECOSYSTEM.RUBYGEMS: return "RubyGems" + case ECOSYSTEM.MCP: + return "MCP" case _: return ecosystem.value diff --git a/guarddog/scanners/__init__.py b/guarddog/scanners/__init__.py index 29747a20b..5ecccb02c 100644 --- a/guarddog/scanners/__init__.py +++ b/guarddog/scanners/__init__.py @@ -11,21 +11,21 @@ from .extension_scanner import ExtensionScanner from .rubygems_package_scanner import RubyGemsPackageScanner from .rubygems_project_scanner import RubyGemsRequirementsScanner +from .mcp_config_scanner import MCPConfigScanner +from .mcp_project_scanner import MCPDiscoveryScanner from .scanner import PackageScanner, ProjectScanner from ..ecosystems import ECOSYSTEM def get_package_scanner(ecosystem: ECOSYSTEM) -> Optional[PackageScanner]: """ - Return a `PackageScanner` for the given ecosystem or `None` if it - is not yet supported. + Return a `PackageScanner` for the given ecosystem or `None` if it is not yet supported. Args: ecosystem (ECOSYSTEM): The ecosystem of the desired scanner Returns: Optional[PackageScanner]: The result of the scanner request - """ match ecosystem: case ECOSYSTEM.PYPI: @@ -40,20 +40,20 @@ def get_package_scanner(ecosystem: ECOSYSTEM) -> Optional[PackageScanner]: return ExtensionScanner() case ECOSYSTEM.RUBYGEMS: return RubyGemsPackageScanner() + case ECOSYSTEM.MCP: + return MCPConfigScanner() return None def get_project_scanner(ecosystem: ECOSYSTEM) -> Optional[ProjectScanner]: """ - Return a `ProjectScanner` for the given ecosystem or `None` if - it is not yet supported. + Return a `ProjectScanner` for the given ecosystem or `None` if it is not yet supported. Args: ecosystem (ECOSYSTEM): The ecosystem of the desired scanner Returns: Optional[ProjectScanner]: The result of the scanner request - """ match ecosystem: case ECOSYSTEM.PYPI: @@ -68,4 +68,6 @@ def get_project_scanner(ecosystem: ECOSYSTEM) -> Optional[ProjectScanner]: return None # we're not including dependency scanning for this PR case ECOSYSTEM.RUBYGEMS: return RubyGemsRequirementsScanner() + case ECOSYSTEM.MCP: + return MCPDiscoveryScanner() return None diff --git a/guarddog/scanners/mcp/__init__.py b/guarddog/scanners/mcp/__init__.py new file mode 100644 index 000000000..b03a59ac8 --- /dev/null +++ b/guarddog/scanners/mcp/__init__.py @@ -0,0 +1,7 @@ +from .models import MCPConfigFile, MCPInventory, MCPServerConfig + +__all__ = [ + "MCPConfigFile", + "MCPInventory", + "MCPServerConfig", +] diff --git a/guarddog/scanners/mcp/discovery.py b/guarddog/scanners/mcp/discovery.py new file mode 100644 index 000000000..d3cac5d54 --- /dev/null +++ b/guarddog/scanners/mcp/discovery.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import logging +import os +from pathlib import Path + +from guarddog.scanners.mcp.models import MCPConfigFile, MCPInventory +from guarddog.scanners.mcp.parsers import ( + ClaudeCodeParser, + ClaudeDesktopParser, + ClineParser, + CodexParser, + ContinueParser, + CopilotCLIParser, + CursorParser, + GeminiCLIParser, + RooCodeParser, + VSCodeParser, + WindsurfParser, +) + +log = logging.getLogger("guarddog") + + +PARSERS = [ + ClaudeDesktopParser(), + ClaudeCodeParser(), + CursorParser(), + VSCodeParser(), + WindsurfParser(), + ClineParser(), + RooCodeParser(), + ContinueParser(), + CodexParser(), + GeminiCLIParser(), + CopilotCLIParser(), +] + + +def _candidate_paths(root: str) -> list[str]: + root_path = Path(root) + candidates: set[str] = set() + + if root_path.is_file(): + return [str(root_path.resolve())] + + # Project/workspace candidates + project_patterns = [ + ".mcp.json", + ".claude.json", + ".cursor/mcp.json", + ".vscode/mcp.json", + ".roo/mcp.json", + ".gemini/settings.json", + ".continue/mcpServers/*.json", + ".continue/mcpServers/*.yaml", + ".continue/mcpServers/*.yml", + ] + for pattern in project_patterns: + candidates.update(str(p.resolve()) for p in root_path.glob(pattern)) + + # User config candidates only when scanning home-ish paths + home = Path.home() + resolved_root = str(root_path.resolve()) + resolved_home = str(home.resolve()) + scan_user_space = ( + resolved_root == resolved_home + or resolved_root.startswith(resolved_home + os.sep) + ) + if scan_user_space: + user_candidates = [ + home / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json", + home / ".claude.json", + home / ".cursor" / "mcp.json", + home / ".codeium" / "windsurf" / "mcp_config.json", + home / ".codex" / "config.toml", + home / ".gemini" / "settings.json", + home / ".copilot" / "mcp-config.json", + ] + for candidate in user_candidates: + if candidate.exists(): + candidates.add(str(candidate.resolve())) + + # Cline / Roo Code / Windsurf settings in known VS Code-style dirs. + # Avoid expensive ** globs over all of $HOME; instead target the + # well-known extension-host directories where globalStorage lives. + _vscode_dirs = [ + home / ".vscode" / "extensions", + home / ".vscode-server" / "extensions", + home / ".cursor" / "extensions", + ] + _config_dirs = [ + home / ".config", + home / "AppData" / "Roaming", + ] + for d in _config_dirs: + if d.is_dir(): + for p in d.glob("**/cline_mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for p in d.glob("**/mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for d in _vscode_dirs: + if d.is_dir(): + for p in d.glob("**/globalStorage/**/cline_mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for p in d.glob("**/globalStorage/**/mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + + return sorted(candidates) + + +def parse_mcp_config_file(path: str) -> MCPConfigFile | None: + for parser in PARSERS: + if parser.matches(path): + try: + return parser.parse(path) + except Exception as exc: + log.debug("Failed to parse %s with %s: %s", path, parser.client_name, exc) + return None + return None + + +def discover_mcp_configs(path: str) -> list[str]: + candidates = _candidate_paths(path) + log.info("Discovering MCP configs under %s ...", path) + log.info("Found %d candidate config file(s)", len(candidates)) + for c in candidates: + log.debug(" candidate: %s", c) + return candidates + + +def discover_and_parse_mcp_configs(path: str) -> MCPInventory: + config_files: list[MCPConfigFile] = [] + + for candidate in discover_mcp_configs(path): + parsed = parse_mcp_config_file(candidate) + if parsed is not None: + server_count = len(parsed.servers) + log.info( + "Parsed %s (%s, %d server(s))", + candidate, parsed.client, server_count, + ) + config_files.append(parsed) + else: + log.debug("Skipped %s (no matching parser or parse error)", candidate) + + total_servers = sum(len(cf.servers) for cf in config_files) + log.info( + "Discovery complete: %d config file(s), %d server(s) total", + len(config_files), total_servers, + ) + return MCPInventory(config_files=config_files) diff --git a/guarddog/scanners/mcp/models.py b/guarddog/scanners/mcp/models.py new file mode 100644 index 000000000..bb2d4d602 --- /dev/null +++ b/guarddog/scanners/mcp/models.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from typing import Any + + +@dataclass +class MCPServerConfig: + client: str + scope: str + source_path: str + server_name: str + transport: str = "unknown" + command: str | None = None + args: list[str] = field(default_factory=list) + url: str | None = None + env: dict[str, str | None] = field(default_factory=dict) + cwd: str | None = None + headers: dict[str, str | None] = field(default_factory=dict) + annotations: dict[str, Any] = field(default_factory=dict) + trust: dict[str, Any] = field(default_factory=dict) + raw: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +@dataclass +class MCPConfigFile: + file_path: str + client: str + scope: str + servers: list[MCPServerConfig] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "file_path": self.file_path, + "client": self.client, + "scope": self.scope, + "servers": [server.to_dict() for server in self.servers], + } + + +@dataclass +class MCPInventory: + config_files: list[MCPConfigFile] = field(default_factory=list) + + @property + def servers(self) -> list[MCPServerConfig]: + return [ + server + for config_file in self.config_files + for server in config_file.servers + ] + + def to_dict(self) -> dict[str, Any]: + config_dicts = [config_file.to_dict() for config_file in self.config_files] + # Reuse the already-serialised server dicts instead of serialising twice. + all_servers = [ + srv for cf in config_dicts for srv in cf.get("servers", []) + ] + return { + "config_files": config_dicts, + "servers": all_servers, + } diff --git a/guarddog/scanners/mcp/parsers/__init__.py b/guarddog/scanners/mcp/parsers/__init__.py new file mode 100644 index 000000000..e90bcf55f --- /dev/null +++ b/guarddog/scanners/mcp/parsers/__init__.py @@ -0,0 +1,27 @@ +from .base import MCPConfigParser +from .claude_desktop import ClaudeDesktopParser +from .claude_code import ClaudeCodeParser +from .cursor import CursorParser +from .vscode import VSCodeParser +from .windsurf import WindsurfParser +from .cline import ClineParser +from .roo_code import RooCodeParser +from .continue_dev import ContinueParser +from .codex import CodexParser +from .gemini_cli import GeminiCLIParser +from .copilot_cli import CopilotCLIParser + +__all__ = [ + "MCPConfigParser", + "ClaudeDesktopParser", + "ClaudeCodeParser", + "CursorParser", + "VSCodeParser", + "WindsurfParser", + "ClineParser", + "RooCodeParser", + "ContinueParser", + "CodexParser", + "GeminiCLIParser", + "CopilotCLIParser", +] diff --git a/guarddog/scanners/mcp/parsers/base.py b/guarddog/scanners/mcp/parsers/base.py new file mode 100644 index 000000000..01534bc91 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/base.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import json +import os +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any + +from guarddog.scanners.mcp.models import MCPConfigFile, MCPServerConfig + +try: + import yaml # type: ignore +except Exception: # pragma: no cover + yaml = None + + +class MCPConfigParser(ABC): + client_name = "unknown" + + @abstractmethod + def matches(self, path: str) -> bool: + raise NotImplementedError + + @abstractmethod + def parse(self, path: str) -> MCPConfigFile: + raise NotImplementedError + + def _read_text(self, path: str) -> str: + return Path(path).read_text(encoding="utf-8") + + def _load_json(self, path: str) -> dict[str, Any]: + data = json.loads(self._read_text(path)) + if not isinstance(data, dict): + raise ValueError(f"Expected JSON object in {path}") + return data + + def _load_toml(self, path: str) -> dict[str, Any]: + with open(path, "rb") as f: + data = __import__("tomllib").load(f) + if not isinstance(data, dict): + raise ValueError(f"Expected TOML object in {path}") + return data + + def _load_yaml(self, path: str) -> dict[str, Any]: + if yaml is None: + raise RuntimeError( + "PyYAML is required to parse YAML MCP configs but is not installed" + ) + data = yaml.safe_load(self._read_text(path)) or {} + if not isinstance(data, dict): + raise ValueError(f"Expected YAML object in {path}") + return data + + def _normalize_env(self, value: Any) -> dict[str, str | None]: + if not isinstance(value, dict): + return {} + + result: dict[str, str | None] = {} + for key, env_value in value.items(): + if env_value is None: + result[str(key)] = None + elif isinstance(env_value, (str, int, float, bool)): + result[str(key)] = str(env_value) + else: + result[str(key)] = json.dumps(env_value, sort_keys=True) + return result + + def _normalize_headers(self, value: Any) -> dict[str, str | None]: + return self._normalize_env(value) + + def _normalize_args(self, value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(v) for v in value] + if isinstance(value, str): + return [value] + return [str(value)] + + def _infer_transport( + self, + *, + command: str | None = None, + url: str | None = None, + transport: str | None = None, + ) -> str: + if transport: + normalized = str(transport).strip().lower() + if normalized in {"stdio", "http", "https", "sse", "streamable-http"}: + return "http" if normalized == "https" else normalized + return normalized + + if url: + lower_url = url.lower() + if lower_url.startswith(("http://", "https://")): + return "http" + if lower_url.startswith("sse://"): + return "sse" + + if command: + return "stdio" + + return "unknown" + + def _scope_from_path(self, path: str) -> str: + normalized = path.replace("\\", "/").lower() + project_markers = [ + "/.vscode/", + "/.cursor/", + "/.continue/", + "/.roo/", + "/.gemini/", + "/.mcp.json", + ] + if any(marker in normalized for marker in project_markers): + return "project" + home = str(Path.home()).replace("\\", "/").lower() + if normalized.startswith(home): + return "user" + return "unknown" + + def _make_server( + self, + *, + source_path: str, + server_name: str, + command: str | None = None, + args: Any = None, + url: str | None = None, + env: Any = None, + cwd: str | None = None, + headers: Any = None, + transport: str | None = None, + annotations: dict[str, Any] | None = None, + trust: dict[str, Any] | None = None, + raw: dict[str, Any] | None = None, + scope: str | None = None, + ) -> MCPServerConfig: + normalized_scope = scope or self._scope_from_path(source_path) + return MCPServerConfig( + client=self.client_name, + scope=normalized_scope, + source_path=source_path, + server_name=server_name, + transport=self._infer_transport( + command=command, + url=url, + transport=transport, + ), + command=command, + args=self._normalize_args(args), + url=url, + env=self._normalize_env(env), + cwd=cwd, + headers=self._normalize_headers(headers), + annotations=annotations or {}, + trust=trust or {}, + raw=raw or {}, + ) + + def _make_config_file( + self, + path: str, + servers: list[MCPServerConfig], + scope: str | None = None, + ) -> MCPConfigFile: + return MCPConfigFile( + file_path=os.path.abspath(path), + client=self.client_name, + scope=scope or self._scope_from_path(path), + servers=servers, + ) diff --git a/guarddog/scanners/mcp/parsers/claude_code.py b/guarddog/scanners/mcp/parsers/claude_code.py new file mode 100644 index 000000000..4d9cf2767 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/claude_code.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class ClaudeCodeParser(MCPConfigParser): + client_name = "claude_code" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.claude.json") or normalized.endswith("/.mcp.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("mcp_servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + ) + ) + + scope = "project" if path.replace("\\", "/").lower().endswith("/.mcp.json") else "user" + return self._make_config_file(path, servers, scope=scope) diff --git a/guarddog/scanners/mcp/parsers/claude_desktop.py b/guarddog/scanners/mcp/parsers/claude_desktop.py new file mode 100644 index 000000000..bfb874ca3 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/claude_desktop.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class ClaudeDesktopParser(MCPConfigParser): + client_name = "claude_desktop" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("claude_desktop_config.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", {}) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + ) + ) + + return self._make_config_file(path, servers, scope="user") diff --git a/guarddog/scanners/mcp/parsers/cline.py b/guarddog/scanners/mcp/parsers/cline.py new file mode 100644 index 000000000..21d792a99 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/cline.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class ClineParser(MCPConfigParser): + client_name = "cline" + + def matches(self, path: str) -> bool: + return path.replace("\\", "/").lower().endswith("cline_mcp_settings.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="user", + ) + ) + + return self._make_config_file(path, servers, scope="user") diff --git a/guarddog/scanners/mcp/parsers/codex.py b/guarddog/scanners/mcp/parsers/codex.py new file mode 100644 index 000000000..9693dedb4 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/codex.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class CodexParser(MCPConfigParser): + client_name = "codex" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.codex/config.toml") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_toml(path) + servers = [] + + mcp_servers = data.get("mcp_servers", {}) + if isinstance(mcp_servers, dict): + for server_name, server_cfg in mcp_servers.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="user", + ) + ) + + return self._make_config_file(path, servers, scope="user") diff --git a/guarddog/scanners/mcp/parsers/continue_dev.py b/guarddog/scanners/mcp/parsers/continue_dev.py new file mode 100644 index 000000000..d706a09d8 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/continue_dev.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import os +from pathlib import Path + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class ContinueParser(MCPConfigParser): + client_name = "continue" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return "/.continue/mcpservers/" in normalized and normalized.endswith((".json", ".yaml", ".yml")) + + def parse(self, path: str) -> MCPConfigFile: + suffix = Path(path).suffix.lower() + if suffix == ".json": + data = self._load_json(path) + else: + data = self._load_yaml(path) + + servers = [] + + if "mcpServers" in data and isinstance(data["mcpServers"], dict): + for server_name, server_cfg in data["mcpServers"].items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="project", + ) + ) + else: + server_name = ( + data.get("name") + or data.get("server") + or Path(path).stem + ) + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=data.get("command"), + args=data.get("args"), + url=data.get("url"), + env=data.get("env"), + cwd=data.get("cwd"), + headers=data.get("headers"), + transport=data.get("transport"), + annotations=data.get("annotations"), + trust=data.get("trust"), + raw=data, + scope="project", + ) + ) + + return self._make_config_file(path, servers, scope="project") diff --git a/guarddog/scanners/mcp/parsers/copilot_cli.py b/guarddog/scanners/mcp/parsers/copilot_cli.py new file mode 100644 index 000000000..a4dce672a --- /dev/null +++ b/guarddog/scanners/mcp/parsers/copilot_cli.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class CopilotCLIParser(MCPConfigParser): + client_name = "copilot_cli" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.copilot/mcp-config.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="user", + ) + ) + + return self._make_config_file(path, servers, scope="user") diff --git a/guarddog/scanners/mcp/parsers/cursor.py b/guarddog/scanners/mcp/parsers/cursor.py new file mode 100644 index 000000000..66a52cade --- /dev/null +++ b/guarddog/scanners/mcp/parsers/cursor.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class CursorParser(MCPConfigParser): + client_name = "cursor" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.cursor/mcp.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", {}) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="project", + ) + ) + + return self._make_config_file(path, servers, scope="project") diff --git a/guarddog/scanners/mcp/parsers/gemini_cli.py b/guarddog/scanners/mcp/parsers/gemini_cli.py new file mode 100644 index 000000000..fcabeb3f5 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/gemini_cli.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class GeminiCLIParser(MCPConfigParser): + client_name = "gemini_cli" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.gemini/settings.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("mcp_servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + scope = "project" if "/.gemini/" in path.replace("\\", "/").lower() and not path.startswith(str(os.path.expanduser("~"))) else "user" + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope=scope, + ) + ) + + normalized = os.path.abspath(path).replace("\\", "/").lower() + home = os.path.expanduser("~").replace("\\", "/").lower() + scope = "user" if normalized.startswith(home) and normalized.endswith("/.gemini/settings.json") else "project" + return self._make_config_file(path, servers, scope=scope) diff --git a/guarddog/scanners/mcp/parsers/roo_code.py b/guarddog/scanners/mcp/parsers/roo_code.py new file mode 100644 index 000000000..e52d22410 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/roo_code.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class RooCodeParser(MCPConfigParser): + client_name = "roo_code" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("mcp_settings.json") or normalized.endswith("/.roo/mcp.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + scope = "project" if path.replace("\\", "/").lower().endswith("/.roo/mcp.json") else "user" + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope=scope, + ) + ) + + scope = "project" if path.replace("\\", "/").lower().endswith("/.roo/mcp.json") else "user" + return self._make_config_file(path, servers, scope=scope) diff --git a/guarddog/scanners/mcp/parsers/vscode.py b/guarddog/scanners/mcp/parsers/vscode.py new file mode 100644 index 000000000..6fe76f93e --- /dev/null +++ b/guarddog/scanners/mcp/parsers/vscode.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class VSCodeParser(MCPConfigParser): + client_name = "vscode" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.vscode/mcp.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("servers", data.get("mcpServers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url") or server_cfg.get("serverUrl"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport") or server_cfg.get("type"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="project", + ) + ) + + return self._make_config_file(path, servers, scope="project") diff --git a/guarddog/scanners/mcp/parsers/windsurf.py b/guarddog/scanners/mcp/parsers/windsurf.py new file mode 100644 index 000000000..eaaad3784 --- /dev/null +++ b/guarddog/scanners/mcp/parsers/windsurf.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +from guarddog.scanners.mcp.models import MCPConfigFile +from guarddog.scanners.mcp.parsers.base import MCPConfigParser + + +class WindsurfParser(MCPConfigParser): + client_name = "windsurf" + + def matches(self, path: str) -> bool: + normalized = path.replace("\\", "/").lower() + return normalized.endswith("/.codeium/windsurf/mcp_config.json") + + def parse(self, path: str) -> MCPConfigFile: + data = self._load_json(path) + servers_obj = data.get("mcpServers", data.get("servers", {})) + servers = [] + + if isinstance(servers_obj, dict): + for server_name, server_cfg in servers_obj.items(): + if not isinstance(server_cfg, dict): + continue + servers.append( + self._make_server( + source_path=os.path.abspath(path), + server_name=str(server_name), + command=server_cfg.get("command"), + args=server_cfg.get("args"), + url=server_cfg.get("url"), + env=server_cfg.get("env"), + cwd=server_cfg.get("cwd"), + headers=server_cfg.get("headers"), + transport=server_cfg.get("transport"), + annotations=server_cfg.get("annotations"), + trust=server_cfg.get("trust"), + raw=server_cfg, + scope="user", + ) + ) + + return self._make_config_file(path, servers, scope="user") diff --git a/guarddog/scanners/mcp_config_scanner.py b/guarddog/scanners/mcp_config_scanner.py new file mode 100644 index 000000000..94f2e5b7c --- /dev/null +++ b/guarddog/scanners/mcp_config_scanner.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import logging +import os +import typing + +from guarddog.analyzer.analyzer import Analyzer +from guarddog.ecosystems import ECOSYSTEM +from guarddog.scanners.mcp.discovery import discover_and_parse_mcp_configs +from guarddog.scanners.scanner import PackageScanner, noop + +log = logging.getLogger("guarddog") + + +class MCPConfigScanner(PackageScanner): + """ + Local-only scanner for MCP config files/directories. + """ + + def __init__(self) -> None: + super().__init__(Analyzer(ECOSYSTEM.MCP)) + + def scan_local( + self, + path, + rules=None, + callback: typing.Callable[[dict], None] = noop, + ) -> dict: + log.info("Scanning MCP configs at %s", os.path.abspath(path)) + inventory = discover_and_parse_mcp_configs(path) + return self._scan_inventory(path, inventory, rules, callback) + + def _scan_inventory( + self, + path, + inventory, + rules=None, + callback: typing.Callable[[dict], None] = noop, + ) -> dict: + """Run metadata analysis on a pre-parsed MCPInventory (avoids re-discovery).""" + if rules is not None: + rules = set(rules) + + payload = inventory.to_dict() + + num_rules = len(rules) if rules else len(self.analyzer.metadata_ruleset) + log.info("Running %d metadata rule(s) against %d server(s) ...", + num_rules, len(inventory.servers)) + + result = self.analyzer.analyze_metadata( + path=os.path.abspath(path), + info=payload, + rules=rules, + name=os.path.basename(path), + version=None, + ) + + log.info("Scan complete: %d issue(s) found", result.get("issues", 0)) + + result["path"] = os.path.abspath(path) + result["inventory"] = payload + callback(result) + return result + + def download_and_get_package_info( + self, + directory: str, + package_name: str, + version=None, + ) -> tuple[dict, str]: + raise NotImplementedError("Remote MCP scans are not supported") diff --git a/guarddog/scanners/mcp_project_scanner.py b/guarddog/scanners/mcp_project_scanner.py new file mode 100644 index 000000000..f843c3edf --- /dev/null +++ b/guarddog/scanners/mcp_project_scanner.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import logging +import os +import typing +from dataclasses import dataclass +from typing import List + +from guarddog.scanners.mcp.discovery import discover_and_parse_mcp_configs, discover_mcp_configs +from guarddog.scanners.mcp.models import MCPConfigFile, MCPInventory +from guarddog.scanners.mcp_config_scanner import MCPConfigScanner +from guarddog.scanners.scanner import Dependency, DependencyFile, DependencyVersion, ProjectScanner, noop + +log = logging.getLogger("guarddog") + + +@dataclass +class MCPDependencyFile(DependencyFile): + dependencies: List[Dependency] + + +class MCPDiscoveryScanner(ProjectScanner): + """ + Project scanner that discovers MCP configs under a repo/workspace path and + analyzes each config file locally. + """ + + def __init__(self) -> None: + super().__init__(MCPConfigScanner()) + + def parse_requirements(self, raw_requirements: str) -> List[Dependency]: + # Not used for MCP because parsing is file-format specific. + return [] + + def find_requirements(self, directory: str) -> list[str]: + return discover_mcp_configs(directory) + + def _dependency_files_from_inventory( + self, + config_files: list[MCPConfigFile], + ) -> list[DependencyFile]: + dep_files: list[DependencyFile] = [] + + for config_file in config_files: + dependencies = [ + Dependency( + name=server.server_name, + versions={DependencyVersion(version=server.transport, location=0)}, + ) + for server in config_file.servers + ] + dep_files.append( + MCPDependencyFile( + file_path=config_file.file_path, + dependencies=dependencies, + ) + ) + + return dep_files + + def scan_local( + self, + path, + rules=None, + callback: typing.Callable[[dict], None] = noop, + ) -> tuple[list[DependencyFile], list[dict]]: + log.info("Verifying MCP configs under %s", os.path.abspath(path)) + inventory = discover_and_parse_mcp_configs(path) + dep_files = self._dependency_files_from_inventory(inventory.config_files) + + results: list[dict] = [] + total = len(inventory.config_files) + for idx, config_file in enumerate(inventory.config_files, 1): + log.info("[%d/%d] Scanning %s ...", idx, total, config_file.file_path) + # Build a single-file inventory so the config scanner can reuse + # the already-parsed data instead of re-discovering and re-parsing. + single = MCPInventory(config_files=[config_file]) + result = self.package_scanner._scan_inventory( + config_file.file_path, single, rules=rules, + ) + shaped = { + "dependency": config_file.file_path, + "version": None, + "result": result, + } + callback(shaped) + results.append(shaped) + + total_issues = sum(r["result"].get("issues", 0) for r in results) + log.info("Verify complete: scanned %d config file(s), %d total issue(s)", total, total_issues) + return dep_files, results From 1e42c2124212d95ac3e4f1e1bfc8a96fb7fcefdc Mon Sep 17 00:00:00 2001 From: Frank Graziano <7840487+frankgraziano@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:38:38 -0400 Subject: [PATCH 2/4] Add MCP metadata detectors Add 7 security rules for MCP configuration analysis: - inline-secret-in-mcp-config: hard-coded credentials in env/headers - plaintext-http-mcp: insecure HTTP endpoints - arbitrary-shell-launcher: shell wrapper execution (bash, sh, cmd, etc.) - shared-project-mcp-config: project-scoped configs shared via VCS - floating-package-launcher: unpinned npx/uvx/pipx/docker packages - dangerous-tool-surface: server names suggesting risky capabilities - overbroad-filesystem-access: broad paths (/, ~, .ssh, .aws) in args Register the rules in the metadata detector registry so they are available via 'guarddog mcp list-rules' and 'guarddog mcp scan'. Co-Authored-By: Claude Opus 4.6 --- guarddog/analyzer/metadata/__init__.py | 14 +- guarddog/analyzer/metadata/mcp.py | 328 +++++++++++++++++++++++++ 2 files changed, 337 insertions(+), 5 deletions(-) create mode 100644 guarddog/analyzer/metadata/mcp.py diff --git a/guarddog/analyzer/metadata/__init__.py b/guarddog/analyzer/metadata/__init__.py index 86bfbedd0..f4d000fd3 100644 --- a/guarddog/analyzer/metadata/__init__.py +++ b/guarddog/analyzer/metadata/__init__.py @@ -1,23 +1,27 @@ from guarddog.analyzer.metadata.detector import Detector -from guarddog.analyzer.metadata.npm import NPM_METADATA_RULES -from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES -from guarddog.analyzer.metadata.go import GO_METADATA_RULES -from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES -from guarddog.analyzer.metadata.rubygems import RUBYGEMS_METADATA_RULES from guarddog.ecosystems import ECOSYSTEM def get_metadata_detectors(ecosystem: ECOSYSTEM) -> dict[str, Detector]: match (ecosystem): case ECOSYSTEM.PYPI: + from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES return PYPI_METADATA_RULES case ECOSYSTEM.NPM: + from guarddog.analyzer.metadata.npm import NPM_METADATA_RULES return NPM_METADATA_RULES case ECOSYSTEM.GO: + from guarddog.analyzer.metadata.go import GO_METADATA_RULES return GO_METADATA_RULES case ECOSYSTEM.GITHUB_ACTION: + from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES return GITHUB_ACTION_METADATA_RULES case ECOSYSTEM.EXTENSION: return {} # No metadata detectors for extensions currently case ECOSYSTEM.RUBYGEMS: + from guarddog.analyzer.metadata.rubygems import RUBYGEMS_METADATA_RULES return RUBYGEMS_METADATA_RULES + case ECOSYSTEM.MCP: + from guarddog.analyzer.metadata.mcp import MCP_METADATA_RULES + return MCP_METADATA_RULES + return {} diff --git a/guarddog/analyzer/metadata/mcp.py b/guarddog/analyzer/metadata/mcp.py new file mode 100644 index 000000000..67526d698 --- /dev/null +++ b/guarddog/analyzer/metadata/mcp.py @@ -0,0 +1,328 @@ +from __future__ import annotations + +import re +from typing import Any, Iterable + +from guarddog.analyzer.metadata.detector import Detector + +_GUARDDOG_DOCS_BASE = "https://github.com/DataDog/guarddog/wiki/MCP-Rules" + +SECRET_KEY_RE = re.compile( + r"(api[_-]?key|token|secret|password|passwd|authorization|auth|cookie|session)", + re.IGNORECASE, +) + +SECRET_VALUE_RE = re.compile( + r"(?i)(sk-[a-z0-9]{16,}|ghp_[a-z0-9]{20,}|github_pat_[a-z0-9_]{20,}|bearer\s+[a-z0-9._-]{10,})" +) + +DANGEROUS_NAME_RE = re.compile( + r"(shell|exec|run|delete|write|push|deploy|ssh|kubectl|terraform|sql|browser)", + re.IGNORECASE, +) + + +def _servers(package_info: dict[str, Any] | None) -> list[dict[str, Any]]: + if not isinstance(package_info, dict): + return [] + servers = package_info.get("servers", []) + return [server for server in servers if isinstance(server, dict)] + + +def _is_placeholder(value: str | None) -> bool: + if value is None: + return False + return ( + "${" in value + or value.startswith("$") + or value.startswith("%") + or (value.startswith("{") and value.endswith("}")) + ) + + +def _iter_secret_candidates(server: dict[str, Any]) -> Iterable[tuple[str, str]]: + env = server.get("env", {}) + headers = server.get("headers", {}) + for section_name, section in (("env", env), ("headers", headers)): + if not isinstance(section, dict): + continue + for key, value in section.items(): + if value is None: + continue + value_str = str(value) + if _is_placeholder(value_str): + continue + yield f"{section_name}.{key}", value_str + + +class InlineSecretInMCPConfig(Detector): + RULE_NAME = "inline-secret-in-mcp-config" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects inline secrets in MCP config env vars or headers", + help_url=f"{_GUARDDOG_DOCS_BASE}#inline-secret-in-mcp-config", + verbose_description=( + "Hard-coded credentials in MCP configuration files are exposed to " + "anyone with read access to the config. Secrets in env vars or headers " + "should be referenced via environment variable expansion (e.g. " + "${API_KEY}) or a secrets manager rather than stored as plaintext values. " + "Leaked API keys and tokens can lead to unauthorized access to external " + "services." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + server_name = server.get("server_name", "") + for field_name, value in _iter_secret_candidates(server): + if SECRET_KEY_RE.search(field_name) or SECRET_VALUE_RE.search(value): + return ( + True, + f"MCP server '{server_name}' contains an inline secret in '{field_name}'", + ) + return (False, None) + + +class PlaintextHTTPMCP(Detector): + RULE_NAME = "plaintext-http-mcp" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers using plaintext HTTP", + help_url=f"{_GUARDDOG_DOCS_BASE}#plaintext-http-mcp", + verbose_description=( + "MCP servers configured with http:// endpoints transmit tool calls, " + "responses, and any embedded credentials in cleartext. A network-level " + "attacker can intercept or modify traffic. Use https:// to ensure TLS " + "encryption for all MCP transport." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + url = server.get("url") + if isinstance(url, str) and url.lower().startswith("http://"): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses insecure HTTP endpoint '{url}'", + ) + return (False, None) + + +class ArbitraryShellLauncher(Detector): + RULE_NAME = "arbitrary-shell-launcher" + + _SHELL_COMMANDS = { + "bash", "sh", "zsh", "cmd", "powershell", "pwsh", + "fish", "ksh", "csh", "tcsh", "dash", + } + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers launched through a shell wrapper", + help_url=f"{_GUARDDOG_DOCS_BASE}#arbitrary-shell-launcher", + verbose_description=( + "Launching an MCP server through a shell interpreter (e.g. bash -c '...') " + "allows arbitrary command execution and makes it difficult to audit what " + "actually runs. The shell may expand variables, follow pipes, or execute " + "additional commands. Prefer invoking the server binary directly with " + "explicit arguments." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + command = str(server.get("command") or "").lower() + args = [str(arg).lower() for arg in server.get("args", [])] + + if command in self._SHELL_COMMANDS: + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched via shell command '{command}'", + ) + + dangerous_flags = {"-c", "/c", "-command", "-encodedcommand"} + if any(arg in dangerous_flags for arg in args): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses shell execution flags in args", + ) + + return (False, None) + + +class SharedProjectMCPConfig(Detector): + RULE_NAME = "shared-project-mcp-config" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects project-scoped MCP configuration likely to be shared in a repository", + help_url=f"{_GUARDDOG_DOCS_BASE}#shared-project-mcp-config", + verbose_description=( + "Project-scoped MCP config files (e.g. .mcp.json, .cursor/mcp.json) " + "are typically committed to version control and shared with all " + "collaborators and CI. A malicious contributor could add or modify server " + "entries to exfiltrate data or run arbitrary code on other developers' " + "machines. Review project MCP configs carefully during code review and " + "consider whether they should be in .gitignore." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + source_path = str(server.get("source_path") or "") + normalized = source_path.replace("\\", "/").lower() + if any( + normalized.endswith(marker) + for marker in ( + "/.mcp.json", + "/.claude.json", + "/.cursor/mcp.json", + "/.vscode/mcp.json", + "/.roo/mcp.json", + ) + ): + return ( + True, + f"MCP config '{source_path}' is project-scoped and may be shared with collaborators or CI", + ) + return (False, None) + + +class FloatingPackageLauncher(Detector): + RULE_NAME = "floating-package-launcher" + + _VERSION_PIN_RE = re.compile(r"@[\d]") + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects unpinned launchers such as npx, uvx, pipx, or docker latest", + help_url=f"{_GUARDDOG_DOCS_BASE}#floating-package-launcher", + verbose_description=( + "Package launchers like npx, uvx, and pipx resolve packages at runtime. " + "Without an explicit version pin, the resolved package can change between " + "runs. An attacker who compromises a package or publishes a typosquat can " + "execute arbitrary code the next time the MCP server starts. Pin all " + "packages to a specific version (e.g. npx some-package@1.2.3)." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + command = str(server.get("command") or "").lower() + args = [str(arg) for arg in server.get("args", [])] + args_lower = [a.lower() for a in args] + rendered = " ".join([command, *args_lower]).strip() + + if command == "npx" and ("@latest" in rendered or "-y" in args_lower): + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched with floating npx package resolution", + ) + + if command in {"uvx", "pipx"}: + if not any(self._VERSION_PIN_RE.search(a) for a in args): + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched through '{command}' without an explicit pinned package version", + ) + + if command == "docker" and any(":latest" in arg for arg in args_lower): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses a docker image pinned to ':latest'", + ) + + return (False, None) + + +class DangerousToolSurface(Detector): + RULE_NAME = "dangerous-tool-surface" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP server names suggesting exec, write, admin, or automation capabilities", + help_url=f"{_GUARDDOG_DOCS_BASE}#dangerous-tool-surface", + verbose_description=( + "MCP servers whose name or command suggests destructive or privileged " + "operations (shell, exec, delete, deploy, ssh, kubectl, etc.) present a " + "higher risk surface. If an AI agent is granted access to such a server, " + "a prompt-injection or misconfiguration could lead to unintended system " + "changes. Verify that the server is necessary and scope its permissions " + "to the minimum required." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + server_name = str(server.get("server_name") or "") + command = str(server.get("command") or "") + if DANGEROUS_NAME_RE.search(server_name) or DANGEROUS_NAME_RE.search(command): + return ( + True, + f"MCP server '{server_name}' exposes a potentially high-risk tool surface", + ) + + return (False, None) + + +class OverbroadFilesystemAccess(Detector): + RULE_NAME = "overbroad-filesystem-access" + + _HIGH_RISK_PATTERNS = [ + re.compile(r"(?:^|\s)/$|(?:^|\s)/\s"), # bare root / + re.compile(r"(?:^|\s)~(?:\s|/|$)"), # bare tilde + re.compile(r"(?:^|[\s/])\.ssh(?:\s|/|$)"), # .ssh dir + re.compile(r"(?:^|[\s/])\.aws(?:\s|/|$)"), # .aws dir + re.compile(r"(?:^|[\s/])\.config/gcloud(?:\s|/|$)"), # gcloud config + re.compile(r"(?:^|\s)/root(?:\s|/|$)"), # /root + re.compile(r"(?:^|\s)/home(?:\s|/|$)", re.IGNORECASE), # /home + re.compile(r"(?:^|\s)/users(?:\s|/|$)", re.IGNORECASE), # /users + ] + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers configured with broad filesystem scope", + help_url=f"{_GUARDDOG_DOCS_BASE}#overbroad-filesystem-access", + verbose_description=( + "MCP servers that receive access to broad or sensitive filesystem paths " + "(/, ~, /home, .ssh, .aws) can read credentials, private keys, or modify " + "system files if the server is compromised or the AI agent is manipulated. " + "Scope filesystem arguments to the narrowest directory required for the " + "task (e.g. the current project directory)." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + args = [str(arg) for arg in server.get("args", [])] + cwd = str(server.get("cwd") or "") + haystack = " ".join(args + [cwd]) + + for pattern in self._HIGH_RISK_PATTERNS: + if pattern.search(haystack): + return ( + True, + f"MCP server '{server.get('server_name', '')}' appears to target a broad or sensitive filesystem scope", + ) + + return (False, None) + + +MCP_METADATA_RULES = { + InlineSecretInMCPConfig.RULE_NAME: InlineSecretInMCPConfig(), + PlaintextHTTPMCP.RULE_NAME: PlaintextHTTPMCP(), + ArbitraryShellLauncher.RULE_NAME: ArbitraryShellLauncher(), + SharedProjectMCPConfig.RULE_NAME: SharedProjectMCPConfig(), + FloatingPackageLauncher.RULE_NAME: FloatingPackageLauncher(), + DangerousToolSurface.RULE_NAME: DangerousToolSurface(), + OverbroadFilesystemAccess.RULE_NAME: OverbroadFilesystemAccess(), +} From 3627fca1a5f31ff027e58b7992e31c1a1bf56ebf Mon Sep 17 00:00:00 2001 From: Frank Graziano <7840487+frankgraziano@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:39:05 -0400 Subject: [PATCH 3/4] Add --verbose flag, finding citations, and lazy rule loading Extend the Detector base class with optional help_url and verbose_description fields. The human-readable reporter now shows a 'ref:' link for each metadata finding, and a 'why:' explanation when --verbose is passed. The --verbose flag is threaded through all CLI scan/verify paths and all reporter interfaces. Also introduce _LazyRulesChoice in the CLI to defer metadata detector instantiation until rule names are actually needed. This prevents eager imports that triggered network requests (e.g. rubygems cache refresh) on every CLI invocation, even for unrelated ecosystems. Co-Authored-By: Claude Opus 4.6 --- guarddog/analyzer/metadata/detector.py | 10 +++- guarddog/cli.py | 83 +++++++++++++++++++------- guarddog/reporters/__init__.py | 11 +++- guarddog/reporters/human_readable.py | 54 +++++++++++++++-- guarddog/reporters/json.py | 11 +++- guarddog/reporters/sarif.py | 3 +- 6 files changed, 138 insertions(+), 34 deletions(-) diff --git a/guarddog/analyzer/metadata/detector.py b/guarddog/analyzer/metadata/detector.py index 360e4e4d0..cd908adab 100644 --- a/guarddog/analyzer/metadata/detector.py +++ b/guarddog/analyzer/metadata/detector.py @@ -5,9 +5,17 @@ class Detector: RULE_NAME = "" - def __init__(self, name: str, description: str) -> None: + def __init__( + self, + name: str, + description: str, + help_url: Optional[str] = None, + verbose_description: Optional[str] = None, + ) -> None: self.name = name self.description = description + self.help_url = help_url + self.verbose_description = verbose_description # returns (ruleMatches, message) @abstractmethod diff --git a/guarddog/cli.py b/guarddog/cli.py index e3793c561..a5721adfd 100644 --- a/guarddog/cli.py +++ b/guarddog/cli.py @@ -39,31 +39,64 @@ def common_options(fn): is_flag=True, help="Exit with a non-zero status code if at least one issue is identified", )(fn) + fn = click.option( + "--verbose", + default=False, + is_flag=True, + help="Show detailed explanations and reference links for each finding", + )(fn) fn = click.argument("target")(fn) return fn -def legacy_rules_options(fn): - ALL_RULES = reduce( - lambda a, b: a | b, - map( - lambda e: set(r.id for r in get_sourcecode_rules(e)) - | set(get_metadata_detectors(e).keys()), - [e for e in ECOSYSTEM], - ), - ) +class _LazyRulesChoice(click.Choice): + """Defers computation of rule sets until first access. + + This avoids eagerly instantiating metadata detectors at import time + (which can trigger network requests for cache refresh). + """ + + def __init__(self, ecosystems=None): + super().__init__([], case_sensitive=False) + self._resolved = False + self._ecosystems = ecosystems + + def _resolve(self): + if not self._resolved: + targets = self._ecosystems or list(ECOSYSTEM) + self.choices = sorted( + reduce( + lambda a, b: a | b, + map( + lambda e: set(r.id for r in get_sourcecode_rules(e)) + | set(get_metadata_detectors(e).keys()), + targets, + ), + ) + ) + self._resolved = True + + def get_metavar(self, param): + self._resolve() + return super().get_metavar(param) + def convert(self, value, param, ctx): + self._resolve() + return super().convert(value, param, ctx) + + +def legacy_rules_options(fn): fn = click.option( "-r", "--rules", multiple=True, - type=click.Choice(ALL_RULES, case_sensitive=False), + type=_LazyRulesChoice(), )(fn) fn = click.option( "-x", "--exclude-rules", multiple=True, - type=click.Choice(ALL_RULES, case_sensitive=False), + type=_LazyRulesChoice(), )(fn) return fn @@ -147,7 +180,8 @@ def _get_rule_param( def _verify( - path, rules, exclude_rules, output_format, exit_non_zero_on_finding, ecosystem + path, rules, exclude_rules, output_format, exit_non_zero_on_finding, ecosystem, + verbose=False, ): """Verify a requirements.txt file @@ -171,6 +205,7 @@ def _verify( rule_names=rule_docs, scan_results=results, ecosystem=ecosystem, + verbose=verbose, ) sys.stdout.write(stdout) @@ -190,6 +225,7 @@ def _scan( output_format, exit_non_zero_on_finding, ecosystem: ECOSYSTEM, + verbose=False, ): """Scan a package @@ -223,7 +259,7 @@ def _scan( sys.exit(1) reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format)) - stdout, stderr = reporter.render_scan(result) + stdout, stderr = reporter.render_scan(result, ecosystem=ecosystem, verbose=verbose) sys.stdout.write(stdout) sys.stderr.write(stderr) @@ -260,18 +296,18 @@ def __init__(self, ecosystem: ECOSYSTEM): self.ecosystem = ecosystem def rule_options(fn): - rules = _get_all_rules(self.ecosystem) + lazy_choice = _LazyRulesChoice(ecosystems=[self.ecosystem]) fn = click.option( "-r", "--rules", multiple=True, - type=click.Choice(rules, case_sensitive=False), + type=lazy_choice, )(fn) fn = click.option( "-x", "--exclude-rules", multiple=True, - type=click.Choice(rules, case_sensitive=False), + type=lazy_choice, )(fn) return fn @@ -286,6 +322,7 @@ def scan_ecosystem( exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _scan( target, @@ -295,6 +332,7 @@ def scan_ecosystem( output_format, exit_non_zero_on_finding, self.ecosystem, + verbose=verbose, ) @click.command("verify", help=f"Verify a given {self.ecosystem.name} package") @@ -302,7 +340,8 @@ def scan_ecosystem( @verify_options @rule_options def verify_ecosystem( - target, rules, exclude_rules, output_format, exit_non_zero_on_finding + target, rules, exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _verify( target, @@ -311,6 +350,7 @@ def verify_ecosystem( output_format, exit_non_zero_on_finding, self.ecosystem, + verbose=verbose, ) @click.command( @@ -333,14 +373,15 @@ def list_rules_ecosystem(): @common_options @verify_options @legacy_rules_options -def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding): - return verify( +def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding, verbose): + return _verify( target, rules, exclude_rules, output_format, exit_non_zero_on_finding, ECOSYSTEM.PYPI, + verbose=verbose, ) @@ -349,7 +390,8 @@ def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding @scan_options @legacy_rules_options def scan( - target, version, rules, exclude_rules, output_format, exit_non_zero_on_finding + target, version, rules, exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _scan( target, @@ -359,6 +401,7 @@ def scan( output_format, exit_non_zero_on_finding, ECOSYSTEM.PYPI, + verbose=verbose, ) diff --git a/guarddog/reporters/__init__.py b/guarddog/reporters/__init__.py index f906ab7eb..df713ca17 100644 --- a/guarddog/reporters/__init__.py +++ b/guarddog/reporters/__init__.py @@ -1,5 +1,5 @@ from guarddog.scanners.scanner import DependencyFile -from typing import List +from typing import List, Optional from guarddog.ecosystems import ECOSYSTEM @@ -9,7 +9,11 @@ class BaseReporter: """ @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results. """ @@ -20,7 +24,8 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: """ Report the scans results. diff --git a/guarddog/reporters/human_readable.py b/guarddog/reporters/human_readable.py index 40c717136..ebcb08022 100644 --- a/guarddog/reporters/human_readable.py +++ b/guarddog/reporters/human_readable.py @@ -1,10 +1,26 @@ from termcolor import colored +from typing import List, Optional + from guarddog.reporters import BaseReporter -from typing import List from guarddog.scanners.scanner import DependencyFile from guarddog.ecosystems import ECOSYSTEM +def _get_detector_metadata(ecosystem: Optional[ECOSYSTEM], rule_name: str): + """Look up help_url and verbose_description for a rule, if available.""" + if ecosystem is None: + return None, None + try: + from guarddog.analyzer.metadata import get_metadata_detectors + detectors = get_metadata_detectors(ecosystem) + detector = detectors.get(rule_name) + if detector is not None: + return getattr(detector, "help_url", None), getattr(detector, "verbose_description", None) + except Exception: + pass + return None, None + + class HumanReadableReporter(BaseReporter): """ HumanReadableReporter is a class that formats and prints scan results in a human-readable format. @@ -31,7 +47,12 @@ def print_errors(identifier: str, results: dict) -> str: return "\n".join(lines) @staticmethod - def print_scan_results(identifier: str, results: dict) -> str: + def print_scan_results( + identifier: str, + results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> str: def _format_code_line_for_output(code) -> str: return " " + colored( @@ -72,6 +93,16 @@ def _format_code_line_for_output(code) -> str: lines.append( colored(finding, None, attrs=["bold"]) + ": " + description ) + # Add citation / help link + help_url, verbose_desc = _get_detector_metadata(ecosystem, finding) + if help_url: + lines.append( + " " + colored("ref:", "cyan") + " " + help_url + ) + if verbose and verbose_desc: + lines.append( + " " + colored("why:", "cyan") + " " + verbose_desc + ) lines.append("") elif isinstance(description, list): # semgrep rule result: source_code_findings = description @@ -95,7 +126,11 @@ def _format_code_line_for_output(code) -> str: return "\n".join(lines) @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results in a human-readable format. @@ -104,7 +139,10 @@ def render_scan(scan_results: dict) -> tuple[str, str]: """ return ( HumanReadableReporter.print_scan_results( - identifier=scan_results["package"], results=scan_results + identifier=scan_results["package"], + results=scan_results, + ecosystem=ecosystem, + verbose=verbose, ), HumanReadableReporter.print_errors( identifier=scan_results["package"], results=scan_results @@ -116,13 +154,17 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: return ( "\n".join( [ HumanReadableReporter.print_scan_results( - identifier=s["dependency"], results=s["result"] + identifier=s["dependency"], + results=s["result"], + ecosystem=ecosystem, + verbose=verbose, ) for s in scan_results ] diff --git a/guarddog/reporters/json.py b/guarddog/reporters/json.py index efb75eb1d..7b3d31c49 100644 --- a/guarddog/reporters/json.py +++ b/guarddog/reporters/json.py @@ -1,5 +1,5 @@ import json -from typing import List +from typing import List, Optional from guarddog.scanners.scanner import DependencyFile from guarddog.ecosystems import ECOSYSTEM @@ -12,12 +12,17 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: return json.dumps(scan_results), "" @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results in a json format. diff --git a/guarddog/reporters/sarif.py b/guarddog/reporters/sarif.py index 456f59e3c..35141cb85 100644 --- a/guarddog/reporters/sarif.py +++ b/guarddog/reporters/sarif.py @@ -20,7 +20,8 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: """ Report the scans results in the SARIF format. From 8eb8d7a95ac1d7f26439bccda9a209b0b375e228 Mon Sep 17 00:00:00 2001 From: Frank Graziano <7840487+frankgraziano@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:39:22 -0400 Subject: [PATCH 4/4] Add tests for MCP detectors and config scanner Add 48 unit tests covering all 7 MCP metadata detectors, including positive detection, negative/safe cases, edge cases (placeholders, substrings, scoped paths), and the rule registry. Add 5 integration tests for MCPConfigScanner.scan_local() using temporary .mcp.json configs to verify end-to-end detection of inline secrets, plaintext HTTP, shell launchers, and benign configs. Co-Authored-By: Claude Opus 4.6 --- tests/analyzer/metadata/test_mcp_detectors.py | 241 ++++++++++++++++++ tests/core/test_mcp_config_scanner.py | 87 +++++++ 2 files changed, 328 insertions(+) create mode 100644 tests/analyzer/metadata/test_mcp_detectors.py create mode 100644 tests/core/test_mcp_config_scanner.py diff --git a/tests/analyzer/metadata/test_mcp_detectors.py b/tests/analyzer/metadata/test_mcp_detectors.py new file mode 100644 index 000000000..9a586538c --- /dev/null +++ b/tests/analyzer/metadata/test_mcp_detectors.py @@ -0,0 +1,241 @@ +import pytest + +from guarddog.analyzer.metadata.mcp import ( + MCP_METADATA_RULES, + ArbitraryShellLauncher, + DangerousToolSurface, + FloatingPackageLauncher, + InlineSecretInMCPConfig, + OverbroadFilesystemAccess, + PlaintextHTTPMCP, + SharedProjectMCPConfig, +) + + +def _make_info(*servers): + return {"servers": list(servers)} + + +def _server(**kwargs): + base = {"server_name": "test-server", "source_path": "/tmp/mcp.json"} + base.update(kwargs) + return base + + +class TestInlineSecretInMCPConfig: + detector = InlineSecretInMCPConfig() + + def test_detects_secret_key_in_env(self): + info = _make_info(_server(env={"API_KEY": "sk-abc123def456ghij"})) + matched, msg = self.detector.detect(info) + assert matched + assert "inline secret" in msg + + def test_detects_secret_value_pattern(self): + info = _make_info(_server(env={"MY_VAR": "ghp_abcdefghijklmnopqrstuvwx"})) + matched, msg = self.detector.detect(info) + assert matched + + def test_detects_secret_in_headers(self): + info = _make_info(_server(headers={"Authorization": "Bearer my-token-value123"})) + matched, msg = self.detector.detect(info) + assert matched + + def test_ignores_placeholder_env(self): + info = _make_info(_server(env={"API_KEY": "${API_KEY}"})) + matched, _ = self.detector.detect(info) + assert not matched + + def test_ignores_safe_env(self): + info = _make_info(_server(env={"LOG_LEVEL": "debug"})) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_servers(self): + matched, _ = self.detector.detect({"servers": []}) + assert not matched + + +class TestPlaintextHTTPMCP: + detector = PlaintextHTTPMCP() + + def test_detects_http_url(self): + info = _make_info(_server(url="http://example.com/mcp")) + matched, msg = self.detector.detect(info) + assert matched + assert "HTTP" in msg + + def test_allows_https_url(self): + info = _make_info(_server(url="https://example.com/mcp")) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_url(self): + info = _make_info(_server(command="npx", args=["some-server"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestArbitraryShellLauncher: + detector = ArbitraryShellLauncher() + + @pytest.mark.parametrize("shell", ["bash", "sh", "zsh", "cmd", "powershell", "pwsh"]) + def test_detects_shell_command(self, shell): + info = _make_info(_server(command=shell, args=["-c", "echo hello"])) + matched, msg = self.detector.detect(info) + assert matched + assert shell in msg.lower() or "shell" in msg.lower() + + def test_detects_shell_flag_in_args(self): + info = _make_info(_server(command="node", args=["-c", "some-code"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_allows_normal_command(self): + info = _make_info(_server(command="npx", args=["@modelcontextprotocol/server"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestSharedProjectMCPConfig: + detector = SharedProjectMCPConfig() + + @pytest.mark.parametrize( + "path", + [ + "/repo/.mcp.json", + "/repo/.cursor/mcp.json", + "/repo/.vscode/mcp.json", + "/repo/.roo/mcp.json", + ], + ) + def test_detects_project_scoped_config(self, path): + info = _make_info(_server(source_path=path)) + matched, msg = self.detector.detect(info) + assert matched + assert "project-scoped" in msg + + def test_allows_user_scoped_config(self): + info = _make_info( + _server(source_path="/Users/me/Library/Application Support/Claude/config.json") + ) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestFloatingPackageLauncher: + detector = FloatingPackageLauncher() + + def test_detects_npx_latest(self): + info = _make_info(_server(command="npx", args=["@modelcontextprotocol/server@latest"])) + matched, msg = self.detector.detect(info) + assert matched + assert "npx" in msg + + def test_detects_npx_dash_y(self): + info = _make_info(_server(command="npx", args=["-y", "some-package"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_detects_uvx(self): + info = _make_info(_server(command="uvx", args=["some-package"])) + matched, msg = self.detector.detect(info) + assert matched + assert "uvx" in msg + + def test_detects_pipx(self): + info = _make_info(_server(command="pipx", args=["run", "some-package"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_detects_docker_latest(self): + info = _make_info(_server(command="docker", args=["run", "myimage:latest"])) + matched, msg = self.detector.detect(info) + assert matched + assert "docker" in msg + + def test_allows_pinned_npx(self): + info = _make_info(_server(command="npx", args=["some-package@1.2.3"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestDangerousToolSurface: + detector = DangerousToolSurface() + + @pytest.mark.parametrize( + "name", + ["shell-executor", "exec-server", "run-command", "delete-files", "ssh-tunnel"], + ) + def test_detects_dangerous_server_name(self, name): + info = _make_info(_server(server_name=name)) + matched, msg = self.detector.detect(info) + assert matched + assert "high-risk" in msg + + def test_detects_dangerous_command(self): + info = _make_info(_server(server_name="safe-name", command="kubectl")) + matched, msg = self.detector.detect(info) + assert matched + + def test_allows_safe_name(self): + info = _make_info(_server(server_name="weather-api", command="node")) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestOverbroadFilesystemAccess: + detector = OverbroadFilesystemAccess() + + @pytest.mark.parametrize("path", ["~", ".ssh", ".aws"]) + def test_detects_broad_path_in_args(self, path): + info = _make_info(_server(args=["--dir", path])) + matched, msg = self.detector.detect(info) + assert matched + assert "broad" in msg or "sensitive" in msg + + @pytest.mark.parametrize("path", ["/", "/root", "/home", "/users"]) + def test_detects_broad_absolute_path(self, path): + info = _make_info(_server(args=[path])) + matched, msg = self.detector.detect(info) + assert matched + assert "broad" in msg or "sensitive" in msg + + def test_detects_broad_cwd(self): + info = _make_info(_server(cwd="/")) + matched, _ = self.detector.detect(info) + assert matched + + def test_allows_scoped_path(self): + """A specific project directory should not trigger.""" + info = _make_info(_server(args=["--dir", "/opt/myapp/data"], cwd="/opt/myapp")) + matched, _ = self.detector.detect(info) + assert not matched + + def test_allows_safe_path(self): + info = _make_info(_server(args=["--port", "8080"], cwd=None)) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_false_positive_on_substring(self): + """Paths like /opt/dot.ssh-backup should not trigger the .ssh rule.""" + info = _make_info(_server(args=["/opt/dot.ssh-backup"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestMCPMetadataRulesRegistry: + def test_all_seven_rules_registered(self): + assert len(MCP_METADATA_RULES) == 7 + + def test_rule_names(self): + expected = { + "inline-secret-in-mcp-config", + "plaintext-http-mcp", + "arbitrary-shell-launcher", + "shared-project-mcp-config", + "floating-package-launcher", + "dangerous-tool-surface", + "overbroad-filesystem-access", + } + assert set(MCP_METADATA_RULES.keys()) == expected diff --git a/tests/core/test_mcp_config_scanner.py b/tests/core/test_mcp_config_scanner.py new file mode 100644 index 000000000..102aea76f --- /dev/null +++ b/tests/core/test_mcp_config_scanner.py @@ -0,0 +1,87 @@ +import json +import os +import tempfile + +from guarddog.scanners.mcp_config_scanner import MCPConfigScanner + + +def _write_mcp_json(directory, servers): + """Write a project-scoped .mcp.json config that discovery will find.""" + config = {"mcpServers": servers} + path = os.path.join(directory, ".mcp.json") + with open(path, "w") as f: + json.dump(config, f) + return path + + +def test_scan_local_detects_inline_secret(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "risky-server": { + "command": "node", + "args": ["server.js"], + "env": {"API_KEY": "sk-abcdef1234567890"}, + } + }) + result = scanner.scan_local(tmpdir) + assert "issues" in result + assert result["issues"] > 0 + assert "inline-secret-in-mcp-config" in result["results"] + assert result["results"]["inline-secret-in-mcp-config"] is not None + + +def test_scan_local_detects_plaintext_http(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "http-server": { + "url": "http://example.com/mcp", + } + }) + result = scanner.scan_local(tmpdir) + assert result["issues"] > 0 + assert result["results"]["plaintext-http-mcp"] is not None + + +def test_scan_local_detects_shell_launcher(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "shell-server": { + "command": "bash", + "args": ["-c", "python server.py"], + } + }) + result = scanner.scan_local(tmpdir) + assert result["issues"] > 0 + assert result["results"]["arbitrary-shell-launcher"] is not None + + +def test_scan_local_benign_config(): + """A .mcp.json is project-scoped so shared-project-mcp-config always fires. + Verify that no *other* rules trigger for an otherwise benign config.""" + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "safe-server": { + "command": "node", + "args": ["./server.js"], + "env": {"LOG_LEVEL": "info"}, + } + }) + result = scanner.scan_local(tmpdir) + assert "issues" in result + # shared-project-mcp-config fires because .mcp.json is project-scoped + findings = {k for k, v in result["results"].items() if v is not None} + assert "inline-secret-in-mcp-config" not in findings + assert "plaintext-http-mcp" not in findings + assert "arbitrary-shell-launcher" not in findings + + +def test_scan_local_empty_directory(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + result = scanner.scan_local(tmpdir) + assert "issues" in result + assert result["issues"] == 0