diff --git a/mcp/loopy-shell.policy.toml b/mcp/loopy-shell.policy.toml new file mode 100644 index 0000000..2298c8a --- /dev/null +++ b/mcp/loopy-shell.policy.toml @@ -0,0 +1,32 @@ +root_dir = ".." +timeout_ms = 120000 +max_output_bytes = 50000 + +allowed_commands = [ + "ls", + "cat", + "grep", + "find", + "tree", + "du", + "pwd", + "cd", + "echo", + "printf", + "split", + "mv", + "cp", + "ln", + "readlink", + "rm", + "mkdir", + "touch", + "write", + "sed", + "help", + "info", + "head", + "tail", + "wc", + "sort", +] diff --git a/mcp/loopy_mcp.py b/mcp/loopy_mcp.py new file mode 100644 index 0000000..64a846a --- /dev/null +++ b/mcp/loopy_mcp.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +from policy import load_policy +from server import create_server + + +def main() -> None: + parser = argparse.ArgumentParser(description="Loopy MCP shell server") + parser.add_argument( + "--policy", + type=Path, + default=None, + help="Path to policy TOML (defaults to $LOOPY_MCP_POLICY or ./loopy-shell.policy.toml)", + ) + args = parser.parse_args() + + policy = load_policy(args.policy) + mcp = create_server(policy) + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/mcp/policy.py b/mcp/policy.py new file mode 100644 index 0000000..09daab8 --- /dev/null +++ b/mcp/policy.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from dataclasses import dataclass +import os +from pathlib import Path +import tomli as tomllib + + +@dataclass(frozen=True) +class ShellPolicy: + root_dir: Path + timeout_ms: int + max_output_bytes: int + allowed_commands: set[str] + + +def _load_toml(path: Path) -> dict: + with path.open("rb") as handle: + return tomllib.load(handle) + + +def _parse_allowed_commands(config: dict) -> set[str]: + commands = config.get("allowed_commands", []) + return {str(cmd).strip() for cmd in commands if str(cmd).strip()} + + +def _resolve_root_dir(raw: str | None, base_dir: Path) -> Path: + if not raw: + return base_dir.resolve() + root = Path(raw) + if not root.is_absolute(): + root = (base_dir / root).resolve() + return root + + +def load_policy(path: Path | None) -> ShellPolicy: + if path is None: + env_path = os.environ.get("LOOPY_MCP_POLICY") + path = Path(env_path) if env_path else Path.cwd() / "loopy-shell.policy.toml" + + if not path.exists(): + raise FileNotFoundError(f"policy file not found: {path}") + + config = _load_toml(path) + root_dir = _resolve_root_dir(config.get("root_dir"), path.parent) + timeout_ms = int(config.get("timeout_ms", 120000)) + max_output_bytes = int(config.get("max_output_bytes", 50000)) + + allowed_commands = _parse_allowed_commands(config) + + if not allowed_commands: + raise ValueError("policy must define allowed_commands") + + return ShellPolicy( + root_dir=root_dir, + timeout_ms=timeout_ms, + max_output_bytes=max_output_bytes, + allowed_commands=allowed_commands, + ) diff --git a/mcp/pyproject.toml b/mcp/pyproject.toml new file mode 100644 index 0000000..302a4f8 --- /dev/null +++ b/mcp/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "loopy-shell-mcp" +version = "0.1.0" +description = "MCP shell server for Loopy" +requires-python = ">=3.10" +dependencies = [ + "mcp>=1.0.0,<2", + "tomli>=2.0.0", +] + +[project.scripts] +loopy-mcp = "loopy_mcp:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/mcp/server.py b/mcp/server.py new file mode 100644 index 0000000..eafe3d6 --- /dev/null +++ b/mcp/server.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from dataclasses import dataclass +import os +from pathlib import Path +import shlex +import subprocess + +from mcp.server.mcpserver import MCPServer + +from policy import ShellPolicy + + +@dataclass(frozen=True) +class ShellResult: + stdout: str + stderr: str + exit_code: int + truncated_stdout: bool + truncated_stderr: bool + + +def _command_name(tokens: list[str]) -> str: + if not tokens: + raise ValueError("empty command") + return Path(tokens[0]).name + + +def _ensure_allowed(policy: ShellPolicy, name: str) -> None: + if name not in policy.allowed_commands: + raise ValueError(f"command not allowed: {name}") + + +def _resolve_cwd(policy: ShellPolicy, cwd: str | None) -> Path: + base = policy.root_dir + if cwd: + candidate = Path(cwd) + if not candidate.is_absolute(): + candidate = base / candidate + base = candidate + resolved = base.resolve() + if not resolved.is_dir(): + raise ValueError(f"cwd is not a directory: {resolved}") + if not resolved.is_relative_to(policy.root_dir): + raise ValueError(f"cwd is outside root_dir: {resolved}") + return resolved + + +def _truncate(text: str, max_bytes: int) -> tuple[str, bool]: + if max_bytes <= 0: + return "", True + data = text.encode("utf-8") + if len(data) <= max_bytes: + return text, False + return data[:max_bytes].decode("utf-8", errors="ignore"), True + + +def _run_command( + policy: ShellPolicy, + cmd: str, + cwd: str | None, + timeout_ms: int | None, +) -> ShellResult: + tokens = shlex.split(cmd) + name = _command_name(tokens) + _ensure_allowed(policy, name) + + resolved_cwd = _resolve_cwd(policy, cwd) + resolved_env = dict(os.environ) + + timeout = policy.timeout_ms / 1000 + if timeout_ms is not None: + if timeout_ms <= 0: + raise ValueError("timeout_ms must be positive") + timeout = min(timeout_ms / 1000, timeout) + + result = subprocess.run( + tokens, + cwd=resolved_cwd, + env=resolved_env, + text=True, + capture_output=True, + timeout=timeout, + check=False, + ) + + stdout, truncated_stdout = _truncate(result.stdout, policy.max_output_bytes) + stderr, truncated_stderr = _truncate(result.stderr, policy.max_output_bytes) + + return ShellResult( + stdout=stdout, + stderr=stderr, + exit_code=result.returncode, + truncated_stdout=truncated_stdout, + truncated_stderr=truncated_stderr, + ) + + +def create_server(policy: ShellPolicy) -> MCPServer: + mcp = MCPServer("loopy-shell") + + @mcp.tool() + def shell_run( + cmd: str, + cwd: str | None = None, + timeout_ms: int | None = None, + ) -> ShellResult: + return _run_command(policy, cmd, cwd, timeout_ms) + + return mcp