diff --git a/tests/test_vibesec.py b/tests/test_vibesec.py index a5a41f9..3fb044c 100644 --- a/tests/test_vibesec.py +++ b/tests/test_vibesec.py @@ -5,7 +5,13 @@ import pytest -from scanner.cli.vibesec import _collect_files, _print_scan_results, _scan_file, cmd_init, cmd_scan +from scanner.cli.vibesec import ( + _collect_files, + _print_scan_results, + _scan_file, + cmd_init, + cmd_scan, +) MOCK_RULES = [ { @@ -81,7 +87,9 @@ def test_scan_file_with_findings(tmp_path): @patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES) def test_scan_file_with_multiple_findings(tmp_path): test_file = tmp_path / "unsafe_multiple.js" - test_file.write_text("const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n") + test_file.write_text( + "const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n" + ) findings = _scan_file(test_file, tmp_path) rule_ids = [f["rule_id"] for f in findings] @@ -119,26 +127,34 @@ def test_scan_file_rule_cache_invalidates_when_scan_rules_change(tmp_path): test_file = tmp_path / "unsafe.py" test_file.write_text("FIRST_TOKEN\nSECOND_TOKEN\n") - first_rules = [{ - "id": "first", - "pattern": re.compile(r"FIRST_TOKEN"), - "severity": "HIGH", - "message": "first token", - "extensions": [".py"], - }] - second_rules = [{ - "id": "second", - "pattern": re.compile(r"SECOND_TOKEN"), - "severity": "HIGH", - "message": "second token", - "extensions": [".py"], - }] + first_rules = [ + { + "id": "first", + "pattern": re.compile(r"FIRST_TOKEN"), + "severity": "HIGH", + "message": "first token", + "extensions": [".py"], + } + ] + second_rules = [ + { + "id": "second", + "pattern": re.compile(r"SECOND_TOKEN"), + "severity": "HIGH", + "message": "second token", + "extensions": [".py"], + } + ] with patch("scanner.cli.vibesec.SCAN_RULES", first_rules): - assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["first"] + assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [ + "first" + ] with patch("scanner.cli.vibesec.SCAN_RULES", second_rules): - assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["second"] + assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [ + "second" + ] def test_collect_files(): @@ -156,7 +172,9 @@ def test_collect_files(): (base_path / "package.lock").touch() collected_files = list(_collect_files(base_path)) - collected_rel_paths = {f.relative_to(base_path).as_posix() for f in collected_files} + collected_rel_paths = { + f.relative_to(base_path).as_posix() for f in collected_files + } assert collected_rel_paths == {"src/main.py", "src/utils.js", "README.md"} assert "node_modules/index.js" not in collected_rel_paths @@ -171,7 +189,9 @@ def test_collect_files_skips_file_symlink(tmp_path): link = tmp_path / "linked.py" _create_symlink(target, link) - collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} + collected_rel_paths = { + f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) + } assert "target.py" in collected_rel_paths assert "linked.py" not in collected_rel_paths @@ -184,7 +204,9 @@ def test_collect_files_skips_dir_symlink(tmp_path): link = tmp_path / "linked_dir" _create_symlink(real_dir, link, target_is_directory=True) - collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} + collected_rel_paths = { + f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) + } assert "real/nested.py" in collected_rel_paths assert "linked_dir/nested.py" not in collected_rel_paths @@ -207,7 +229,9 @@ def test_collect_files_handles_cyclic_symlink(tmp_path): _create_symlink(dir_b, dir_a / "to_b", target_is_directory=True) _create_symlink(dir_a, dir_b / "to_a", target_is_directory=True) - collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} + collected_rel_paths = { + f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) + } assert collected_rel_paths == {"a/a.py", "b/b.py"} @@ -243,14 +267,16 @@ def test_print_scan_results_empty(capsys): def test_print_scan_results_critical(capsys): - findings = [{ - "severity": "CRITICAL", - "file": "app/page.tsx", - "line": 10, - "rule_id": "VSEC-001", - "message": "Found a critical issue", - "snippet": "const secret = 'abc';", - }] + findings = [ + { + "severity": "CRITICAL", + "file": "app/page.tsx", + "line": 10, + "rule_id": "VSEC-001", + "message": "Found a critical issue", + "snippet": "const secret = 'abc';", + } + ] _print_scan_results(findings, 2) captured = capsys.readouterr() @@ -260,18 +286,23 @@ def test_print_scan_results_critical(capsys): assert "Code: const secret = 'abc';" in captured.out assert "🔴 1 critical" in captured.out assert "❌ Critical issues found. Fix before deploying." in captured.out - assert "💡 Run 'vibesec review' to get an AI prompt for fixing these issues." in captured.out + assert ( + "💡 Run 'vibesec review' to get an AI prompt for fixing these issues." + in captured.out + ) def test_print_scan_results_high(capsys): - findings = [{ - "severity": "HIGH", - "file": "app/api/route.ts", - "line": 5, - "rule_id": "VSEC-002", - "message": "Found a high issue", - "snippet": "export async function GET() {}", - }] + findings = [ + { + "severity": "HIGH", + "file": "app/api/route.ts", + "line": 5, + "rule_id": "VSEC-002", + "message": "Found a high issue", + "snippet": "export async function GET() {}", + } + ] _print_scan_results(findings, 3) captured = capsys.readouterr() @@ -281,14 +312,16 @@ def test_print_scan_results_high(capsys): def test_print_scan_results_warnings_only(capsys): - findings = [{ - "severity": "WARNING", - "file": "utils.ts", - "line": 1, - "rule_id": "VSEC-003", - "message": "Found a warning", - "snippet": "console.log(data);", - }] + findings = [ + { + "severity": "WARNING", + "file": "utils.ts", + "line": 1, + "rule_id": "VSEC-003", + "message": "Found a warning", + "snippet": "console.log(data);", + } + ] _print_scan_results(findings, 1) captured = capsys.readouterr() @@ -299,10 +332,38 @@ def test_print_scan_results_warnings_only(capsys): def test_print_scan_results_sorting(capsys): findings = [ - {"severity": "INFO", "file": "info.ts", "line": 1, "rule_id": "VSEC-004", "message": "Info message", "snippet": "info"}, - {"severity": "CRITICAL", "file": "crit.ts", "line": 2, "rule_id": "VSEC-001", "message": "Crit message", "snippet": "crit"}, - {"severity": "HIGH", "file": "high.ts", "line": 3, "rule_id": "VSEC-002", "message": "High message", "snippet": "high"}, - {"severity": "WARNING", "file": "warn.ts", "line": 4, "rule_id": "VSEC-003", "message": "Warn message", "snippet": "warn"}, + { + "severity": "INFO", + "file": "info.ts", + "line": 1, + "rule_id": "VSEC-004", + "message": "Info message", + "snippet": "info", + }, + { + "severity": "CRITICAL", + "file": "crit.ts", + "line": 2, + "rule_id": "VSEC-001", + "message": "Crit message", + "snippet": "crit", + }, + { + "severity": "HIGH", + "file": "high.ts", + "line": 3, + "rule_id": "VSEC-002", + "message": "High message", + "snippet": "high", + }, + { + "severity": "WARNING", + "file": "warn.ts", + "line": 4, + "rule_id": "VSEC-003", + "message": "Warn message", + "snippet": "warn", + }, ] _print_scan_results(findings, 4) out = capsys.readouterr().out @@ -359,7 +420,10 @@ def test_cmd_init_claude_code_skip(tmp_path, monkeypatch, capsys): cmd_init(Args(tool="claude-code")) assert claude_file.read_text() == "VibeSec existing rules\n" - assert "CLAUDE.md already contains VibeSec rules — skipping." in capsys.readouterr().out + assert ( + "CLAUDE.md already contains VibeSec rules — skipping." + in capsys.readouterr().out + ) def test_cmd_init_windsurf(tmp_path, monkeypatch, capsys): @@ -403,6 +467,7 @@ def test_cmd_init_supabase_stack(tmp_path, monkeypatch, capsys): def test_sanitize_terminal_output(): from scanner.cli.vibesec import _sanitize_terminal_output + # Test normal strings assert _sanitize_terminal_output("normal string") == "normal string" assert _sanitize_terminal_output("tabs\tare\tallowed") == "tabs\tare\tallowed" @@ -416,3 +481,130 @@ def test_sanitize_terminal_output(): # Test non-strings assert _sanitize_terminal_output(None) is None +def test_collect_files_scandir_error(tmp_path): + test_dir = tmp_path / "testdir" + test_dir.mkdir() + + with patch("os.scandir", side_effect=PermissionError("Permission denied")): + assert list(_collect_files(test_dir)) == [] + + with patch("os.scandir", side_effect=OSError("OS error")): + assert list(_collect_files(test_dir)) == [] + + +def test_collect_files_entry_error(tmp_path): + test_dir = tmp_path / "testdir" + test_dir.mkdir() + (test_dir / "file.py").touch() + + import os + + original_scandir = os.scandir + + def mocked_scandir(path): + it = original_scandir(path) + + class MockIterator: + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def __iter__(self): + for entry in it: + + class MockEntry: + @property + def name(self): + return entry.name + + @property + def path(self): + return entry.path + + def is_symlink(self): + raise PermissionError("Denied") + + def is_dir(self, **kwargs): + return False + + def is_file(self, **kwargs): + return False + + yield MockEntry() + + return MockIterator() + + with patch("os.scandir", side_effect=mocked_scandir): + assert list(_collect_files(test_dir)) == [] + + +def test_scan_file_lstat_error(tmp_path): + test_file = tmp_path / "unsafe.ts" + test_file.write_text("const key = 'x';\n") + + with patch("os.lstat", side_effect=PermissionError("Permission denied")): + assert _scan_file(test_file, tmp_path) == [] + + with patch("os.lstat", side_effect=OSError("OS error")): + assert _scan_file(test_file, tmp_path) == [] + + +def test_scan_file_large_file_skip(tmp_path): + test_file = tmp_path / "unsafe.ts" + test_file.write_text("const key = 'x';\n") + + import os + + original_lstat = os.lstat + + def mocked_lstat(path): + st = original_lstat(path) + + class MockStat: + @property + def st_mode(self): + return st.st_mode + + @property + def st_size(self): + return 10 * 1024 * 1024 + 1 + + return MockStat() + + with patch("os.lstat", side_effect=mocked_lstat): + assert _scan_file(test_file, tmp_path) == [] + + +@patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES) +def test_scan_file_read_error_during_iteration(tmp_path): + test_file = tmp_path / "unsafe.ts" + test_file.write_text("const key = MOCK_SECRET_KEY;\n") + + class MockFile: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def __iter__(self): + yield "const key = MOCK_SECRET_KEY;\n" + raise OSError("OS error mid-read") + + with patch("pathlib.Path.open", side_effect=MockFile): + findings = _scan_file(test_file, tmp_path) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "mock-secret" + + +def test_scan_file_no_applicable_rules(tmp_path): + test_file = tmp_path / "unsafe.xyz" + test_file.write_text("const key = MOCK_SECRET_KEY;\n") + + with patch("scanner.cli.vibesec._get_applicable_rules", return_value=[]): + assert _scan_file(test_file, tmp_path) == []