Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 32 additions & 38 deletions src/agent_scan/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,59 +58,53 @@ async def inspect_pipeline(
home_dirs_with_users = get_readable_home_directories(all_users=inspect_args.all_users)
all_usernames: list[str] = [username for _path, username in home_dirs_with_users]

# fetch clients to inspect
# fetch clients to inspect, building scan_path_results for paths that don't exist
scan_path_results: list[ScanPathResult] = []
clients_to_inspect: list[ClientToInspect] = []
if inspect_args.paths:
clients_to_inspect = [
cti
for path in inspect_args.paths
for cti in await client_to_inspect_from_path(path, True, home_dirs_with_users, inspect_args.scan_skills)
]
for path in inspect_args.paths:
ctis = await client_to_inspect_from_path(path, True, home_dirs_with_users, inspect_args.scan_skills)
if ctis:
clients_to_inspect.extend(ctis)
else:
scan_path_results.append(
ScanPathResult(
path=path,
client=path,
servers=[],
issues=[],
labels=[],
error=ScanError(
message="File or folder not found", is_failure=False, category="file_not_found"
),
)
)
else:
clients_to_inspect = [
cti
for client in get_well_known_clients()
for cti in await get_mcp_config_per_client(client, home_dirs_with_users)
]
for client in get_well_known_clients():
ctis = await get_mcp_config_per_client(client, home_dirs_with_users)
if ctis:
clients_to_inspect.extend(ctis)
else:
logger.info(f"Client {client.name} does not exist on this machine. {client.client_exists_paths}")

# Only report usernames where an agent was detected in their home directory.
# When no usernames were associated with detected agents:
# - Discovery mode with --scan-all-users: fall back to all readable usernames.
# - Otherwise (explicit paths or single-user mode): fall back to the current OS user only,
# to avoid disclosing unrelated usernames on the machine.
detected_usernames: list[str] = sorted(
{cti.username for cti in clients_to_inspect if cti is not None and cti.username is not None}
)
detected_usernames: list[str] = sorted({cti.username for cti in clients_to_inspect if cti.username is not None})
if detected_usernames:
scanned_usernames = detected_usernames
elif not inspect_args.paths and inspect_args.all_users:
scanned_usernames = all_usernames
else:
scanned_usernames = [getpass.getuser()]
# inspect
scan_path_results: list[ScanPathResult] = []
for i, client_to_inspect in enumerate(clients_to_inspect):
if client_to_inspect is None and inspect_args.paths:
scan_path_results.append(
ScanPathResult(
path=inspect_args.paths[i],
client=inspect_args.paths[i],
servers=[],
issues=[],
labels=[],
error=ScanError(message="File or folder not found", is_failure=False, category="file_not_found"),
)
)
continue
elif client_to_inspect is None:
logger.info(
f"Client {get_well_known_clients()[i].name} does not exist os this machine. {get_well_known_clients()[i].client_exists_paths}"
)
continue
else:
inspected_client = await inspect_client(
client_to_inspect, inspect_args.timeout, inspect_args.tokens, inspect_args.scan_skills
)
scan_path_results.append(inspected_client_to_scan_path_result(inspected_client))
for client_to_inspect in clients_to_inspect:
inspected_client = await inspect_client(
client_to_inspect, inspect_args.timeout, inspect_args.tokens, inspect_args.scan_skills
)
scan_path_results.append(inspected_client_to_scan_path_result(inspected_client))
return scan_path_results, scanned_usernames


Expand Down
3 changes: 3 additions & 0 deletions src/agent_scan/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ def print_scan_result(
full_description: bool = False,
args=None,
) -> None:
if not result:
rich.print("No MCP client configurations found on this machine.")
return
if not internal_issues:
for res in result:
res.issues = [issue for issue in res.issues if issue.code not in ["W003", "W004", "W005", "W006"]]
Expand Down
4 changes: 0 additions & 4 deletions src/agent_scan/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ async def upload(
scan_context: Optional dict containing scan context metadata to include in upload
scanned_usernames: List of usernames detected during the scan. When provided, used instead of the current OS username.
"""
if not results:
logger.info("No scan results to upload")
return

additional_headers = additional_headers or {}

user_info = ScanUserInfo(
Expand Down
24 changes: 24 additions & 0 deletions tests/unit/test_control_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,30 @@ async def test_upload_exponential_backoff():
assert sleep_calls == [1, 2] # Exponential: 1s, 2s


@pytest.mark.asyncio
async def test_upload_sends_payload_when_results_empty():
"""
When results is empty (no MCP configs on machine), upload should still POST
with user_info so the control server can register the machine.
"""
mock_http_response = AsyncMock(status=200)
mock_http_response.json.return_value = []

mock_post_context_manager = AsyncMock()
mock_post_context_manager.__aenter__.return_value = mock_http_response

with patch("agent_scan.upload.aiohttp.ClientSession.post") as mock_post_method:
mock_post_method.return_value = mock_post_context_manager

await upload([], "https://control.mcp.scan", "test@example.com", scanned_usernames=["alice"])

mock_post_method.assert_called_once()
payload = json.loads(mock_post_method.call_args.kwargs["data"])
assert payload["scan_path_results"] == []
assert payload["scan_user_info"]["identifier"] == "test@example.com"
assert payload["scan_user_info"]["username"] == ["alice"]


@pytest.mark.asyncio
async def test_upload_does_not_retry_on_unexpected_error():
"""
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/test_inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,49 @@ async def test_inspect_pipeline_deduplicates_usernames_across_clients():
shutil.rmtree(tmp)


@pytest.mark.asyncio
async def test_inspect_pipeline_no_clients_returns_empty_results():
"""When no MCP clients are installed, inspect_pipeline should return empty scan_path_results."""
tmp = tempfile.mkdtemp()
try:
home_dirs = [(Path(tmp) / "alice", "alice")]
(Path(tmp) / "alice").mkdir()

candidate = CandidateClient(
name="nonexistent-client",
client_exists_paths=["~/.nonexistent-client"],
mcp_config_paths=[],
skills_dir_paths=[],
)

with (
patch("agent_scan.pipelines.get_readable_home_directories", return_value=home_dirs),
patch("agent_scan.pipelines.get_well_known_clients", return_value=[candidate]),
):
args = InspectArgs(timeout=10, tokens=[], paths=[])
results, _ = await inspect_pipeline(args)

assert results == []
finally:
shutil.rmtree(tmp)


@pytest.mark.asyncio
async def test_inspect_pipeline_missing_explicit_path_returns_file_not_found_error():
"""When an explicit path doesn't exist, inspect_pipeline should return a file_not_found error result."""
with (
patch("agent_scan.pipelines.get_readable_home_directories", return_value=[]),
patch("agent_scan.pipelines.client_to_inspect_from_path", new_callable=AsyncMock, return_value=[]),
):
args = InspectArgs(timeout=10, tokens=[], paths=["/nonexistent/path.json"])
results, _ = await inspect_pipeline(args)

assert len(results) == 1
assert results[0].path == "/nonexistent/path.json"
assert results[0].error is not None
assert results[0].error.category == "file_not_found"


@pytest.mark.asyncio
async def test_inspect_pipeline_paths_mode_does_not_leak_all_usernames():
"""When using --paths, scanned_usernames should not fall back to all readable usernames."""
Expand Down
Loading