diff --git a/src/agent_scan/inspect.py b/src/agent_scan/inspect.py index 7b94ce1e..32a60909 100644 --- a/src/agent_scan/inspect.py +++ b/src/agent_scan/inspect.py @@ -1,3 +1,4 @@ +import glob as glob_module import logging import os import traceback @@ -92,10 +93,12 @@ async def get_mcp_config_per_client(client: CandidateClient) -> ClientToInspect # parse skills dirs skills_dirs: dict[str, list[tuple[str, SkillServer]] | FileNotFoundConfig] = {} for skills_dir_path in client.skills_dir_paths: - if os.path.exists(os.path.expanduser(skills_dir_path)): - skills_dirs[skills_dir_path] = inspect_skills_dir(skills_dir_path) - else: + expanded = glob_module.glob(os.path.expanduser(skills_dir_path)) + if not expanded: skills_dirs[skills_dir_path] = FileNotFoundConfig(message=f"Skills dir {skills_dir_path} does not exist") + else: + for resolved_path in expanded: + skills_dirs[resolved_path] = inspect_skills_dir(resolved_path) return ClientToInspect( name=client.name, diff --git a/src/agent_scan/well_known_clients.py b/src/agent_scan/well_known_clients.py index b1a7ac2b..e851f357 100644 --- a/src/agent_scan/well_known_clients.py +++ b/src/agent_scan/well_known_clients.py @@ -1,3 +1,4 @@ +import json import logging import os import re @@ -44,7 +45,11 @@ name="claude code", client_exists_paths=["~/.claude"], mcp_config_paths=["~/.claude.json"], - skills_dir_paths=["~/.claude/skills"], + skills_dir_paths=[ + "~/.claude/skills", + # Glob fallback for plugin skills in case installed_plugins.json is unavailable + "~/.claude/plugins/cache/*/*/*", + ], ), CandidateClient( name="gemini cli", @@ -111,7 +116,11 @@ name="claude code", client_exists_paths=["~/.claude"], mcp_config_paths=["~/.claude.json"], - skills_dir_paths=["~/.claude/skills"], + skills_dir_paths=[ + "~/.claude/skills", + # Installed plugin skills: plugins/cache/{marketplace}/{plugin-name}/{version}/ + "~/.claude/plugins/cache/*/*/*", + ], ), CandidateClient( name="gemini cli", @@ -185,7 +194,11 @@ name="claude code", client_exists_paths=["~/.claude"], mcp_config_paths=["~/.claude.json"], - skills_dir_paths=["~/.claude/skills"], + skills_dir_paths=[ + "~/.claude/skills", + # Installed plugin skills: plugins/cache/{marketplace}/{plugin-name}/{version}/ + "~/.claude/plugins/cache/*/*/*", + ], ), CandidateClient( name="gemini cli", @@ -220,11 +233,108 @@ ] +def discover_cowork_skills_dirs() -> list[str]: + """ + Dynamically discover skill directories created by Claude Cowork (DXT plugin sessions). + + Cowork stores skills in two places that the static well-known-clients list cannot + enumerate at import time because they depend on runtime-generated UUIDs: + + 1. DXT session directories + ~/Library/Application Support/Claude/local-agent-mode-sessions/skills-plugin/ + {dxt-uuid}/ + {session-uuid}/ <- newest mtime = active session + skills/ <- each subdir here is one skill (contains SKILL.md) + + 2. Plugin install registry (~/.claude/plugins/installed_plugins.json) + Each plugin entry has an installPath that may contain a skills/ subdirectory. + + Returns a list of absolute paths to `skills/` directories that can be fed + directly into CandidateClient.skills_dir_paths. + """ + discovered: list[str] = [] + + # ── 1. DXT session skills ──────────────────────────────────────────────── + skills_plugin_base = os.path.expanduser( + "~/Library/Application Support/Claude/local-agent-mode-sessions/skills-plugin" + ) + if os.path.isdir(skills_plugin_base): + for dxt_uuid in os.listdir(skills_plugin_base): + dxt_dir = os.path.join(skills_plugin_base, dxt_uuid) + if not os.path.isdir(dxt_dir): + continue + session_dirs = [ + os.path.join(dxt_dir, s) + for s in os.listdir(dxt_dir) + if os.path.isdir(os.path.join(dxt_dir, s)) + ] + if not session_dirs: + continue + + def _manifest_mtime(d: str) -> float: + m = os.path.join(d, "manifest.json") + return os.path.getmtime(m) if os.path.isfile(m) else os.path.getmtime(d) + + active_session = max(session_dirs, key=_manifest_mtime) + skills_dir = os.path.join(active_session, "skills") + if os.path.isdir(skills_dir): + logger.debug("Cowork DXT skills dir discovered: %s", skills_dir) + discovered.append(skills_dir) + + # ── 2. Plugin-installed skills (installed_plugins.json) ────────────────── + plugins_json = os.path.expanduser("~/.claude/plugins/installed_plugins.json") + if os.path.isfile(plugins_json): + try: + with open(plugins_json) as f: + data = json.load(f) + home = os.path.expanduser("~") + for plugin_key, installs in data.get("plugins", {}).items(): + for install in installs: + raw_path = install.get("installPath", "") + if not raw_path: + continue + # Normalise the path – it may be stored as an absolute path + # under a *different* user home (e.g. from inside the VM) or + # as a session-relative "mnt/.claude/…" prefix. + parts = raw_path.split(os.sep) + if len(parts) >= 3 and parts[1] == "Users": + install_path = os.path.join(home, *parts[3:]) + elif raw_path.startswith("mnt/.claude/"): + install_path = os.path.join(home, ".claude", raw_path[len("mnt/.claude/"):]) + elif os.path.isabs(raw_path): + install_path = raw_path + else: + install_path = os.path.join(os.path.dirname(plugins_json), raw_path) + skills_dir = os.path.join(install_path, "skills") + if os.path.isdir(skills_dir): + logger.debug( + "Cowork plugin skills dir discovered for %s: %s", plugin_key, skills_dir + ) + discovered.append(skills_dir) + except Exception: + logger.exception("Failed to read %s while discovering Cowork skills", plugins_json) + + return discovered + + def get_well_known_clients() -> list[CandidateClient]: if sys.platform == "linux" or sys.platform == "linux2": return LINUX_WELL_KNOWN_CLIENTS elif sys.platform == "darwin": - return MACOS_WELL_KNOWN_CLIENTS + clients: list[CandidateClient] = list(MACOS_WELL_KNOWN_CLIENTS) + cowork_skills_dirs = discover_cowork_skills_dirs() + if cowork_skills_dirs: + clients.append( + CandidateClient( + name="cowork", + client_exists_paths=[ + "~/Library/Application Support/Claude/local-agent-mode-sessions" + ], + mcp_config_paths=[], + skills_dir_paths=cowork_skills_dirs, + ) + ) + return clients elif sys.platform == "win32": return WINDOWS_WELL_KNOWN_CLIENTS else: diff --git a/tests/unit/test_skills_glob.py b/tests/unit/test_skills_glob.py new file mode 100644 index 00000000..42dc9968 --- /dev/null +++ b/tests/unit/test_skills_glob.py @@ -0,0 +1,265 @@ +"""Tests for glob expansion of skills_dir_paths and discover_cowork_skills_dirs.""" + +import json +import os +import tempfile +import time +import uuid + +import pytest + +from agent_scan.inspect import get_mcp_config_per_client +from agent_scan.models import CandidateClient, FileNotFoundConfig +from agent_scan.well_known_clients import discover_cowork_skills_dirs + + +SKILL_MD_CONTENT = """\ +--- +name: test-skill +description: A test skill for unit testing glob expansion. +--- + +# Test Skill + +This skill exists only for testing purposes. +""" + + +def _make_skill_dir(parent: str, skill_name: str = "my-skill") -> str: + """Create a minimal skill directory with a SKILL.md inside parent.""" + skill_path = os.path.join(parent, skill_name) + os.makedirs(skill_path, exist_ok=True) + with open(os.path.join(skill_path, "SKILL.md"), "w", encoding="utf-8") as f: + f.write(SKILL_MD_CONTENT) + return skill_path + + +@pytest.mark.asyncio +async def test_glob_expansion_finds_nested_dxt_skills(): + """Skills under {uuid1}/{uuid2}/skills/ are discovered when the path uses wildcards.""" + with tempfile.TemporaryDirectory() as tmpdir: + dxt_uuid = str(uuid.uuid4()) + session_uuid = str(uuid.uuid4()) + skills_dir = os.path.join(tmpdir, dxt_uuid, session_uuid, "skills") + os.makedirs(skills_dir, exist_ok=True) + _make_skill_dir(skills_dir, "my-dxt-skill") + + glob_pattern = os.path.join(tmpdir, "*", "*", "skills") + # Use a real directory for client_exists_paths + client = CandidateClient( + name="test-client", + client_exists_paths=[tmpdir], + mcp_config_paths=[], + skills_dir_paths=[glob_pattern], + ) + + result = await get_mcp_config_per_client(client) + + assert result is not None + # The resolved path (not the pattern) should be the key + assert glob_pattern not in result.skills_dirs + assert skills_dir in result.skills_dirs + entries = result.skills_dirs[skills_dir] + assert not isinstance(entries, FileNotFoundConfig) + skill_names = [name for name, _ in entries] + assert "my-dxt-skill" in skill_names + + +@pytest.mark.asyncio +async def test_glob_expansion_missing_path_returns_file_not_found(): + """A pattern that matches nothing produces a FileNotFoundConfig entry.""" + with tempfile.TemporaryDirectory() as tmpdir: + glob_pattern = os.path.join(tmpdir, "*", "*", "skills") + client = CandidateClient( + name="test-client", + client_exists_paths=[tmpdir], + mcp_config_paths=[], + skills_dir_paths=[glob_pattern], + ) + + result = await get_mcp_config_per_client(client) + + assert result is not None + assert glob_pattern in result.skills_dirs + assert isinstance(result.skills_dirs[glob_pattern], FileNotFoundConfig) + + +@pytest.mark.asyncio +async def test_glob_expansion_plain_path_still_works(): + """A plain (non-glob) path continues to work as before.""" + with tempfile.TemporaryDirectory() as tmpdir: + skills_dir = os.path.join(tmpdir, "skills") + os.makedirs(skills_dir, exist_ok=True) + _make_skill_dir(skills_dir, "plain-skill") + + client = CandidateClient( + name="test-client", + client_exists_paths=[tmpdir], + mcp_config_paths=[], + skills_dir_paths=[skills_dir], + ) + + result = await get_mcp_config_per_client(client) + + assert result is not None + assert skills_dir in result.skills_dirs + entries = result.skills_dirs[skills_dir] + assert not isinstance(entries, FileNotFoundConfig) + skill_names = [name for name, _ in entries] + assert "plain-skill" in skill_names + + +@pytest.mark.asyncio +async def test_glob_expansion_multiple_matches(): + """Multiple directories matching a glob pattern are all discovered.""" + with tempfile.TemporaryDirectory() as tmpdir: + plugin_pattern = os.path.join(tmpdir, "*", "*", "*") + # Create two plugin skill directories + for marketplace in ("official", "community"): + for plugin in ("plugin-a", "plugin-b"): + version_dir = os.path.join(tmpdir, marketplace, plugin, "1.0.0") + os.makedirs(version_dir, exist_ok=True) + _make_skill_dir(version_dir, "skill") + + client = CandidateClient( + name="test-client", + client_exists_paths=[tmpdir], + mcp_config_paths=[], + skills_dir_paths=[plugin_pattern], + ) + + result = await get_mcp_config_per_client(client) + + assert result is not None + # Pattern itself should not be a key; resolved dirs should be + assert plugin_pattern not in result.skills_dirs + resolved_paths = list(result.skills_dirs.keys()) + assert len(resolved_paths) == 4 # official/{a,b}/1.0.0 + community/{a,b}/1.0.0 + + +# ── discover_cowork_skills_dirs tests ──────────────────────────────────────── + + +def test_discover_cowork_dxt_picks_newest_session(monkeypatch, tmp_path): + """Only the session with the newest manifest.json mtime is returned per DXT UUID.""" + skills_plugin_base = tmp_path / "skills-plugin" + dxt_dir = skills_plugin_base / str(uuid.uuid4()) + + old_session = dxt_dir / "old-session" + new_session = dxt_dir / "new-session" + for session in (old_session, new_session): + (session / "skills").mkdir(parents=True) + (session / "manifest.json").write_text("{}") + + # Make old_session clearly older + old_time = time.time() - 1000 + os.utime(old_session / "manifest.json", (old_time, old_time)) + + _real = os.path.expanduser + monkeypatch.setattr( + "agent_scan.well_known_clients.os.path.expanduser", + lambda p: str(skills_plugin_base) if "local-agent-mode-sessions" in p + else str(tmp_path / "nonexistent_plugins.json") if "installed_plugins.json" in p + else _real(p), + ) + + result = discover_cowork_skills_dirs() + assert len(result) == 1 + assert result[0] == str(new_session / "skills") + + +def test_discover_cowork_dxt_skips_session_without_skills_dir(monkeypatch, tmp_path): + """Sessions whose skills/ dir doesn't exist are skipped.""" + skills_plugin_base = tmp_path / "skills-plugin" + dxt_dir = skills_plugin_base / str(uuid.uuid4()) + session = dxt_dir / "session-no-skills" + session.mkdir(parents=True) + (session / "manifest.json").write_text("{}") + # No skills/ subdir created + + _real = os.path.expanduser + monkeypatch.setattr( + "agent_scan.well_known_clients.os.path.expanduser", + lambda p: str(skills_plugin_base) if "local-agent-mode-sessions" in p + else str(tmp_path / "nonexistent_plugins.json") if "installed_plugins.json" in p + else _real(p), + ) + + result = discover_cowork_skills_dirs() + assert result == [] + + +def test_discover_cowork_plugins_json(monkeypatch, tmp_path): + """Skills discovered via installed_plugins.json installPath are returned.""" + plugin_install = tmp_path / "my-plugin" + skills_dir = plugin_install / "skills" + skills_dir.mkdir(parents=True) + + plugins_json = tmp_path / "installed_plugins.json" + plugins_json.write_text(json.dumps({ + "plugins": { + "my-plugin": [{"installPath": str(plugin_install)}] + } + })) + + _real = os.path.expanduser + + def fake_expanduser(p: str) -> str: + if "local-agent-mode-sessions" in p: + return str(tmp_path / "nonexistent") + if "installed_plugins.json" in p: + return str(plugins_json) + return _real(p) + + monkeypatch.setattr("agent_scan.well_known_clients.os.path.expanduser", fake_expanduser) + + result = discover_cowork_skills_dirs() + assert str(skills_dir) in result + + +def test_discover_cowork_plugins_json_missing_skills_subdir(monkeypatch, tmp_path): + """A plugin installPath without a skills/ subdir is skipped.""" + plugin_install = tmp_path / "no-skills-plugin" + plugin_install.mkdir() + # No skills/ subdir + + plugins_json = tmp_path / "installed_plugins.json" + plugins_json.write_text(json.dumps({ + "plugins": { + "no-skills-plugin": [{"installPath": str(plugin_install)}] + } + })) + + _real = os.path.expanduser + + def fake_expanduser(p: str) -> str: + if "local-agent-mode-sessions" in p: + return str(tmp_path / "nonexistent") + if "installed_plugins.json" in p: + return str(plugins_json) + return _real(p) + + monkeypatch.setattr("agent_scan.well_known_clients.os.path.expanduser", fake_expanduser) + + result = discover_cowork_skills_dirs() + assert result == [] + + +def test_discover_cowork_plugins_json_corrupt(monkeypatch, tmp_path): + """A corrupt installed_plugins.json is handled gracefully (no exception raised).""" + plugins_json = tmp_path / "installed_plugins.json" + plugins_json.write_text("not valid json{{{") + + _real = os.path.expanduser + + def fake_expanduser(p: str) -> str: + if "local-agent-mode-sessions" in p: + return str(tmp_path / "nonexistent") + if "installed_plugins.json" in p: + return str(plugins_json) + return _real(p) + + monkeypatch.setattr("agent_scan.well_known_clients.os.path.expanduser", fake_expanduser) + + result = discover_cowork_skills_dirs() + assert result == []