From 36d60fc4a8669ccd8c4e474714ad683590cc3c3e Mon Sep 17 00:00:00 2001 From: Tanmay Gupta Date: Sat, 31 Jan 2026 18:00:54 -0800 Subject: [PATCH 1/2] add standalone mcp shell server --- mcp/loopy-shell.policy.toml | 25 +++++++ mcp/loopy_mcp.py | 26 ++++++++ mcp/policy.py | 72 ++++++++++++++++++++ mcp/pyproject.toml | 16 +++++ mcp/server.py | 130 ++++++++++++++++++++++++++++++++++++ 5 files changed, 269 insertions(+) create mode 100644 mcp/loopy-shell.policy.toml create mode 100644 mcp/loopy_mcp.py create mode 100644 mcp/policy.py create mode 100644 mcp/pyproject.toml create mode 100644 mcp/server.py diff --git a/mcp/loopy-shell.policy.toml b/mcp/loopy-shell.policy.toml new file mode 100644 index 0000000..8476d84 --- /dev/null +++ b/mcp/loopy-shell.policy.toml @@ -0,0 +1,25 @@ +root_dir = ".." +timeout_ms = 120000 +max_output_bytes = 50000 + +[allow] +commands = [ + "git", + "ls", + "pwd", + "rg", + "cat", + "python", + "uv", + "make", + "node", + "npm", + "pnpm", + "bun", +] + +[deny] +commands = ["rm", "shutdown", "reboot", "mkfs", "dd"] + +[env] +allowlist = ["PATH", "HOME", "LANG", "LC_ALL"] diff --git a/mcp/loopy_mcp.py b/mcp/loopy_mcp.py new file mode 100644 index 0000000..64a846a --- /dev/null +++ b/mcp/loopy_mcp.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +from policy import load_policy +from server import create_server + + +def main() -> None: + parser = argparse.ArgumentParser(description="Loopy MCP shell server") + parser.add_argument( + "--policy", + type=Path, + default=None, + help="Path to policy TOML (defaults to $LOOPY_MCP_POLICY or ./loopy-shell.policy.toml)", + ) + args = parser.parse_args() + + policy = load_policy(args.policy) + mcp = create_server(policy) + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/mcp/policy.py b/mcp/policy.py new file mode 100644 index 0000000..7284de4 --- /dev/null +++ b/mcp/policy.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from dataclasses import dataclass +import os +from pathlib import Path +import tomli as tomllib + + +@dataclass(frozen=True) +class ShellPolicy: + root_dir: Path + timeout_ms: int + max_output_bytes: int + allow_commands: set[str] + deny_commands: set[str] + env_allowlist: set[str] + + +def _load_toml(path: Path) -> dict: + with path.open("rb") as handle: + return tomllib.load(handle) + + +def _parse_commands(config: dict, key: str) -> set[str]: + value = config.get(key, {}) + commands = value.get("commands", []) + return {str(cmd).strip() for cmd in commands if str(cmd).strip()} + + +def _parse_env_allowlist(config: dict) -> set[str]: + env_config = config.get("env", {}) + allowlist = env_config.get("allowlist", []) + return {str(name).strip() for name in allowlist if str(name).strip()} + + +def _resolve_root_dir(raw: str | None, base_dir: Path) -> Path: + if not raw: + return base_dir.resolve() + root = Path(raw) + if not root.is_absolute(): + root = (base_dir / root).resolve() + return root + + +def load_policy(path: Path | None) -> ShellPolicy: + if path is None: + env_path = os.environ.get("LOOPY_MCP_POLICY") + path = Path(env_path) if env_path else Path.cwd() / "loopy-shell.policy.toml" + + if not path.exists(): + raise FileNotFoundError(f"policy file not found: {path}") + + config = _load_toml(path) + root_dir = _resolve_root_dir(config.get("root_dir"), path.parent) + timeout_ms = int(config.get("timeout_ms", 120000)) + max_output_bytes = int(config.get("max_output_bytes", 50000)) + + allow_commands = _parse_commands(config, "allow") + deny_commands = _parse_commands(config, "deny") + env_allowlist = _parse_env_allowlist(config) + + if not allow_commands: + raise ValueError("policy must define allow.commands") + + return ShellPolicy( + root_dir=root_dir, + timeout_ms=timeout_ms, + max_output_bytes=max_output_bytes, + allow_commands=allow_commands, + deny_commands=deny_commands, + env_allowlist=env_allowlist, + ) diff --git a/mcp/pyproject.toml b/mcp/pyproject.toml new file mode 100644 index 0000000..302a4f8 --- /dev/null +++ b/mcp/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "loopy-shell-mcp" +version = "0.1.0" +description = "MCP shell server for Loopy" +requires-python = ">=3.10" +dependencies = [ + "mcp>=1.0.0,<2", + "tomli>=2.0.0", +] + +[project.scripts] +loopy-mcp = "loopy_mcp:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/mcp/server.py b/mcp/server.py new file mode 100644 index 0000000..32e6fb8 --- /dev/null +++ b/mcp/server.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from dataclasses import dataclass +import os +from pathlib import Path +import shlex +import subprocess + +from mcp.server.mcpserver import MCPServer + +from policy import ShellPolicy + + +@dataclass(frozen=True) +class ShellResult: + stdout: str + stderr: str + exit_code: int + truncated_stdout: bool + truncated_stderr: bool + + +def _command_name(tokens: list[str]) -> str: + if not tokens: + raise ValueError("empty command") + return Path(tokens[0]).name + + +def _ensure_allowed(policy: ShellPolicy, name: str) -> None: + if name in policy.deny_commands: + raise ValueError(f"command denied: {name}") + if name not in policy.allow_commands: + raise ValueError(f"command not allowed: {name}") + + +def _resolve_cwd(policy: ShellPolicy, cwd: str | None) -> Path: + base = policy.root_dir + if cwd: + candidate = Path(cwd) + if not candidate.is_absolute(): + candidate = base / candidate + base = candidate + resolved = base.resolve() + if not resolved.is_dir(): + raise ValueError(f"cwd is not a directory: {resolved}") + if not resolved.is_relative_to(policy.root_dir): + raise ValueError(f"cwd is outside root_dir: {resolved}") + return resolved + + +def _build_env(policy: ShellPolicy, extra: dict[str, str] | None) -> dict[str, str]: + env = { + name: value + for name, value in os.environ.items() + if name in policy.env_allowlist + } + if not extra: + return env + + for name in extra: + if name not in policy.env_allowlist: + raise ValueError(f"env var not allowed: {name}") + env.update({name: str(value) for name, value in extra.items()}) + return env + + +def _truncate(text: str, max_bytes: int) -> tuple[str, bool]: + if max_bytes <= 0: + return "", True + data = text.encode("utf-8") + if len(data) <= max_bytes: + return text, False + return data[:max_bytes].decode("utf-8", errors="ignore"), True + + +def _run_command( + policy: ShellPolicy, + cmd: str, + cwd: str | None, + env: dict[str, str] | None, + timeout_ms: int | None, +) -> ShellResult: + tokens = shlex.split(cmd) + name = _command_name(tokens) + _ensure_allowed(policy, name) + + resolved_cwd = _resolve_cwd(policy, cwd) + resolved_env = _build_env(policy, env) + + timeout = policy.timeout_ms / 1000 + if timeout_ms is not None: + if timeout_ms <= 0: + raise ValueError("timeout_ms must be positive") + timeout = min(timeout_ms / 1000, timeout) + + result = subprocess.run( + tokens, + cwd=resolved_cwd, + env=resolved_env, + text=True, + capture_output=True, + timeout=timeout, + check=False, + ) + + stdout, truncated_stdout = _truncate(result.stdout, policy.max_output_bytes) + stderr, truncated_stderr = _truncate(result.stderr, policy.max_output_bytes) + + return ShellResult( + stdout=stdout, + stderr=stderr, + exit_code=result.returncode, + truncated_stdout=truncated_stdout, + truncated_stderr=truncated_stderr, + ) + + +def create_server(policy: ShellPolicy) -> MCPServer: + mcp = MCPServer("loopy-shell") + + @mcp.tool() + def shell_run( + cmd: str, + cwd: str | None = None, + env: dict[str, str] | None = None, + timeout_ms: int | None = None, + ) -> ShellResult: + return _run_command(policy, cmd, cwd, env, timeout_ms) + + return mcp From a3d743a6ffc5462c2d09041ff119c810135f0520 Mon Sep 17 00:00:00 2001 From: Tanmay Gupta Date: Sat, 31 Jan 2026 18:12:07 -0800 Subject: [PATCH 2/2] simplify mcp policy --- mcp/loopy-shell.policy.toml | 43 +++++++++++++++++++++---------------- mcp/policy.py | 27 ++++++----------------- mcp/server.py | 26 +++------------------- 3 files changed, 35 insertions(+), 61 deletions(-) diff --git a/mcp/loopy-shell.policy.toml b/mcp/loopy-shell.policy.toml index 8476d84..2298c8a 100644 --- a/mcp/loopy-shell.policy.toml +++ b/mcp/loopy-shell.policy.toml @@ -2,24 +2,31 @@ root_dir = ".." timeout_ms = 120000 max_output_bytes = 50000 -[allow] -commands = [ - "git", +allowed_commands = [ "ls", - "pwd", - "rg", "cat", - "python", - "uv", - "make", - "node", - "npm", - "pnpm", - "bun", + "grep", + "find", + "tree", + "du", + "pwd", + "cd", + "echo", + "printf", + "split", + "mv", + "cp", + "ln", + "readlink", + "rm", + "mkdir", + "touch", + "write", + "sed", + "help", + "info", + "head", + "tail", + "wc", + "sort", ] - -[deny] -commands = ["rm", "shutdown", "reboot", "mkfs", "dd"] - -[env] -allowlist = ["PATH", "HOME", "LANG", "LC_ALL"] diff --git a/mcp/policy.py b/mcp/policy.py index 7284de4..09daab8 100644 --- a/mcp/policy.py +++ b/mcp/policy.py @@ -11,9 +11,7 @@ class ShellPolicy: root_dir: Path timeout_ms: int max_output_bytes: int - allow_commands: set[str] - deny_commands: set[str] - env_allowlist: set[str] + allowed_commands: set[str] def _load_toml(path: Path) -> dict: @@ -21,18 +19,11 @@ def _load_toml(path: Path) -> dict: return tomllib.load(handle) -def _parse_commands(config: dict, key: str) -> set[str]: - value = config.get(key, {}) - commands = value.get("commands", []) +def _parse_allowed_commands(config: dict) -> set[str]: + commands = config.get("allowed_commands", []) return {str(cmd).strip() for cmd in commands if str(cmd).strip()} -def _parse_env_allowlist(config: dict) -> set[str]: - env_config = config.get("env", {}) - allowlist = env_config.get("allowlist", []) - return {str(name).strip() for name in allowlist if str(name).strip()} - - def _resolve_root_dir(raw: str | None, base_dir: Path) -> Path: if not raw: return base_dir.resolve() @@ -55,18 +46,14 @@ def load_policy(path: Path | None) -> ShellPolicy: timeout_ms = int(config.get("timeout_ms", 120000)) max_output_bytes = int(config.get("max_output_bytes", 50000)) - allow_commands = _parse_commands(config, "allow") - deny_commands = _parse_commands(config, "deny") - env_allowlist = _parse_env_allowlist(config) + allowed_commands = _parse_allowed_commands(config) - if not allow_commands: - raise ValueError("policy must define allow.commands") + if not allowed_commands: + raise ValueError("policy must define allowed_commands") return ShellPolicy( root_dir=root_dir, timeout_ms=timeout_ms, max_output_bytes=max_output_bytes, - allow_commands=allow_commands, - deny_commands=deny_commands, - env_allowlist=env_allowlist, + allowed_commands=allowed_commands, ) diff --git a/mcp/server.py b/mcp/server.py index 32e6fb8..eafe3d6 100644 --- a/mcp/server.py +++ b/mcp/server.py @@ -27,9 +27,7 @@ def _command_name(tokens: list[str]) -> str: def _ensure_allowed(policy: ShellPolicy, name: str) -> None: - if name in policy.deny_commands: - raise ValueError(f"command denied: {name}") - if name not in policy.allow_commands: + if name not in policy.allowed_commands: raise ValueError(f"command not allowed: {name}") @@ -48,22 +46,6 @@ def _resolve_cwd(policy: ShellPolicy, cwd: str | None) -> Path: return resolved -def _build_env(policy: ShellPolicy, extra: dict[str, str] | None) -> dict[str, str]: - env = { - name: value - for name, value in os.environ.items() - if name in policy.env_allowlist - } - if not extra: - return env - - for name in extra: - if name not in policy.env_allowlist: - raise ValueError(f"env var not allowed: {name}") - env.update({name: str(value) for name, value in extra.items()}) - return env - - def _truncate(text: str, max_bytes: int) -> tuple[str, bool]: if max_bytes <= 0: return "", True @@ -77,7 +59,6 @@ def _run_command( policy: ShellPolicy, cmd: str, cwd: str | None, - env: dict[str, str] | None, timeout_ms: int | None, ) -> ShellResult: tokens = shlex.split(cmd) @@ -85,7 +66,7 @@ def _run_command( _ensure_allowed(policy, name) resolved_cwd = _resolve_cwd(policy, cwd) - resolved_env = _build_env(policy, env) + resolved_env = dict(os.environ) timeout = policy.timeout_ms / 1000 if timeout_ms is not None: @@ -122,9 +103,8 @@ def create_server(policy: ShellPolicy) -> MCPServer: def shell_run( cmd: str, cwd: str | None = None, - env: dict[str, str] | None = None, timeout_ms: int | None = None, ) -> ShellResult: - return _run_command(policy, cmd, cwd, env, timeout_ms) + return _run_command(policy, cmd, cwd, timeout_ms) return mcp