Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions mcp/loopy-shell.policy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
root_dir = ".."
timeout_ms = 120000
max_output_bytes = 50000

allowed_commands = [
"ls",
"cat",
"grep",
"find",
"tree",
"du",
"pwd",
"cd",
"echo",
"printf",
"split",
"mv",
"cp",
"ln",
"readlink",
"rm",
"mkdir",
"touch",
"write",
"sed",
"help",
"info",
"head",
"tail",
"wc",
"sort",
]
26 changes: 26 additions & 0 deletions mcp/loopy_mcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import argparse
from pathlib import Path

from policy import load_policy
from server import create_server


def main() -> None:
parser = argparse.ArgumentParser(description="Loopy MCP shell server")
parser.add_argument(
"--policy",
type=Path,
default=None,
help="Path to policy TOML (defaults to $LOOPY_MCP_POLICY or ./loopy-shell.policy.toml)",
)
args = parser.parse_args()

policy = load_policy(args.policy)
mcp = create_server(policy)
mcp.run()


if __name__ == "__main__":
main()
59 changes: 59 additions & 0 deletions mcp/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

from dataclasses import dataclass
import os
from pathlib import Path
import tomli as tomllib


@dataclass(frozen=True)
class ShellPolicy:
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our shellpolicy here is very limited to just the shell commands that we support baesd on README - make suer thats respected

root_dir: Path
timeout_ms: int
max_output_bytes: int
allowed_commands: set[str]


def _load_toml(path: Path) -> dict:
with path.open("rb") as handle:
return tomllib.load(handle)


def _parse_allowed_commands(config: dict) -> set[str]:
commands = config.get("allowed_commands", [])
return {str(cmd).strip() for cmd in commands if str(cmd).strip()}


def _resolve_root_dir(raw: str | None, base_dir: Path) -> Path:
if not raw:
return base_dir.resolve()
root = Path(raw)
if not root.is_absolute():
root = (base_dir / root).resolve()
return root


def load_policy(path: Path | None) -> ShellPolicy:
if path is None:
env_path = os.environ.get("LOOPY_MCP_POLICY")
path = Path(env_path) if env_path else Path.cwd() / "loopy-shell.policy.toml"

if not path.exists():
raise FileNotFoundError(f"policy file not found: {path}")

config = _load_toml(path)
root_dir = _resolve_root_dir(config.get("root_dir"), path.parent)
timeout_ms = int(config.get("timeout_ms", 120000))
max_output_bytes = int(config.get("max_output_bytes", 50000))

allowed_commands = _parse_allowed_commands(config)

if not allowed_commands:
raise ValueError("policy must define allowed_commands")

return ShellPolicy(
root_dir=root_dir,
timeout_ms=timeout_ms,
max_output_bytes=max_output_bytes,
allowed_commands=allowed_commands,
)
16 changes: 16 additions & 0 deletions mcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[project]
name = "loopy-shell-mcp"
version = "0.1.0"
description = "MCP shell server for Loopy"
requires-python = ">=3.10"
dependencies = [
"mcp>=1.0.0,<2",
"tomli>=2.0.0",
]

[project.scripts]
loopy-mcp = "loopy_mcp:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
110 changes: 110 additions & 0 deletions mcp/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from __future__ import annotations

from dataclasses import dataclass
import os
from pathlib import Path
import shlex
import subprocess

from mcp.server.mcpserver import MCPServer

from policy import ShellPolicy


Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will this change now that we have a file backed feature?

@dataclass(frozen=True)
class ShellResult:
stdout: str
stderr: str
exit_code: int
truncated_stdout: bool
truncated_stderr: bool


def _command_name(tokens: list[str]) -> str:
if not tokens:
raise ValueError("empty command")
return Path(tokens[0]).name


def _ensure_allowed(policy: ShellPolicy, name: str) -> None:
if name not in policy.allowed_commands:
raise ValueError(f"command not allowed: {name}")


def _resolve_cwd(policy: ShellPolicy, cwd: str | None) -> Path:
base = policy.root_dir
if cwd:
candidate = Path(cwd)
if not candidate.is_absolute():
candidate = base / candidate
base = candidate
resolved = base.resolve()
if not resolved.is_dir():
raise ValueError(f"cwd is not a directory: {resolved}")
if not resolved.is_relative_to(policy.root_dir):
raise ValueError(f"cwd is outside root_dir: {resolved}")
return resolved


def _truncate(text: str, max_bytes: int) -> tuple[str, bool]:
if max_bytes <= 0:
return "", True
data = text.encode("utf-8")
if len(data) <= max_bytes:
return text, False
return data[:max_bytes].decode("utf-8", errors="ignore"), True


def _run_command(
policy: ShellPolicy,
cmd: str,
cwd: str | None,
timeout_ms: int | None,
) -> ShellResult:
tokens = shlex.split(cmd)
name = _command_name(tokens)
_ensure_allowed(policy, name)

resolved_cwd = _resolve_cwd(policy, cwd)
resolved_env = dict(os.environ)

timeout = policy.timeout_ms / 1000
if timeout_ms is not None:
if timeout_ms <= 0:
raise ValueError("timeout_ms must be positive")
timeout = min(timeout_ms / 1000, timeout)

result = subprocess.run(
tokens,
cwd=resolved_cwd,
env=resolved_env,
text=True,
capture_output=True,
timeout=timeout,
check=False,
)

stdout, truncated_stdout = _truncate(result.stdout, policy.max_output_bytes)
stderr, truncated_stderr = _truncate(result.stderr, policy.max_output_bytes)

return ShellResult(
stdout=stdout,
stderr=stderr,
exit_code=result.returncode,
truncated_stdout=truncated_stdout,
truncated_stderr=truncated_stderr,
)


def create_server(policy: ShellPolicy) -> MCPServer:
mcp = MCPServer("loopy-shell")

@mcp.tool()
def shell_run(
cmd: str,
cwd: str | None = None,
timeout_ms: int | None = None,
) -> ShellResult:
return _run_command(policy, cmd, cwd, timeout_ms)

return mcp