diff --git a/tests/scripts/ci/__init__.py b/tests/scripts/ci/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/scripts/ci/test_opencode_review_normalize_output.py b/tests/scripts/ci/test_opencode_review_normalize_output.py deleted file mode 100644 index f7f8c18..0000000 --- a/tests/scripts/ci/test_opencode_review_normalize_output.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -import pytest -from unittest.mock import patch - -import sys -from pathlib import Path - -# Add project root to path so we can import scripts -sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) - -from scripts.ci.opencode_review_normalize_output import iter_json_objects - - -def test_iter_json_objects_valid_json(): - # Test valid JSON string without prose - text = '{"key": "value"}' - result = iter_json_objects(text) - # The current implementation will find the main json, then scan for `{` - # and find it again. - assert result == [{"key": "value"}, {"key": "value"}] - - -def test_iter_json_objects_invalid_json_with_prose(): - # Test JSON string with surrounding prose - text = 'Here is some text: {"key": "value"} and more text.' - result = iter_json_objects(text) - assert result == [{"key": "value"}] - - -def test_iter_json_objects_json_decode_error_in_try_block(): - # Test error path where json.loads raises JSONDecodeError - # We mock json.loads to force the exception - text = '{"key": "value"}' - with patch( - "json.loads", side_effect=json.JSONDecodeError("Expecting value", "", 0) - ): - result = iter_json_objects(text) - assert result == [{"key": "value"}] - - -def test_iter_json_objects_json_decode_error_in_loop(): - # Test error path where decoder.raw_decode raises JSONDecodeError - # e.g., an incomplete JSON object - text = 'Here is a broken { "key": ' - result = iter_json_objects(text) - assert result == [] diff --git a/tests/scripts/ci/test_pr_review_merge_scheduler.py b/tests/scripts/ci/test_pr_review_merge_scheduler.py new file mode 100644 index 0000000..d286f39 --- /dev/null +++ b/tests/scripts/ci/test_pr_review_merge_scheduler.py @@ -0,0 +1,19 @@ +import pytest +from scripts.ci.pr_review_merge_scheduler import split_repo + +def test_split_repo_valid(): + assert split_repo("owner/name") == ("owner", "name") + assert split_repo("owner/name/extra") == ("owner", "name/extra") + +def test_split_repo_invalid(): + with pytest.raises(ValueError, match="repo must be owner/name, got 'owner'"): + split_repo("owner") + + with pytest.raises(ValueError, match="repo must be owner/name, got '/name'"): + split_repo("/name") + + with pytest.raises(ValueError, match="repo must be owner/name, got 'owner/'"): + split_repo("owner/") + + with pytest.raises(ValueError, match="repo must be owner/name, got '/'"): + split_repo("/") diff --git a/tests/test_vibesec.py b/tests/test_vibesec.py index 3fb044c..a5a41f9 100644 --- a/tests/test_vibesec.py +++ b/tests/test_vibesec.py @@ -5,13 +5,7 @@ import pytest -from scanner.cli.vibesec import ( - _collect_files, - _print_scan_results, - _scan_file, - cmd_init, - cmd_scan, -) +from scanner.cli.vibesec import _collect_files, _print_scan_results, _scan_file, cmd_init, cmd_scan MOCK_RULES = [ { @@ -87,9 +81,7 @@ def test_scan_file_with_findings(tmp_path): @patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES) def test_scan_file_with_multiple_findings(tmp_path): test_file = tmp_path / "unsafe_multiple.js" - test_file.write_text( - "const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n" - ) + test_file.write_text("const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n") findings = _scan_file(test_file, tmp_path) rule_ids = [f["rule_id"] for f in findings] @@ -127,34 +119,26 @@ def test_scan_file_rule_cache_invalidates_when_scan_rules_change(tmp_path): test_file = tmp_path / "unsafe.py" test_file.write_text("FIRST_TOKEN\nSECOND_TOKEN\n") - first_rules = [ - { - "id": "first", - "pattern": re.compile(r"FIRST_TOKEN"), - "severity": "HIGH", - "message": "first token", - "extensions": [".py"], - } - ] - second_rules = [ - { - "id": "second", - "pattern": re.compile(r"SECOND_TOKEN"), - "severity": "HIGH", - "message": "second token", - "extensions": [".py"], - } - ] + first_rules = [{ + "id": "first", + "pattern": re.compile(r"FIRST_TOKEN"), + "severity": "HIGH", + "message": "first token", + "extensions": [".py"], + }] + second_rules = [{ + "id": "second", + "pattern": re.compile(r"SECOND_TOKEN"), + "severity": "HIGH", + "message": "second token", + "extensions": [".py"], + }] with patch("scanner.cli.vibesec.SCAN_RULES", first_rules): - assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [ - "first" - ] + assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["first"] with patch("scanner.cli.vibesec.SCAN_RULES", second_rules): - assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [ - "second" - ] + assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["second"] def test_collect_files(): @@ -172,9 +156,7 @@ def test_collect_files(): (base_path / "package.lock").touch() collected_files = list(_collect_files(base_path)) - collected_rel_paths = { - f.relative_to(base_path).as_posix() for f in collected_files - } + collected_rel_paths = {f.relative_to(base_path).as_posix() for f in collected_files} assert collected_rel_paths == {"src/main.py", "src/utils.js", "README.md"} assert "node_modules/index.js" not in collected_rel_paths @@ -189,9 +171,7 @@ def test_collect_files_skips_file_symlink(tmp_path): link = tmp_path / "linked.py" _create_symlink(target, link) - collected_rel_paths = { - f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) - } + collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} assert "target.py" in collected_rel_paths assert "linked.py" not in collected_rel_paths @@ -204,9 +184,7 @@ def test_collect_files_skips_dir_symlink(tmp_path): link = tmp_path / "linked_dir" _create_symlink(real_dir, link, target_is_directory=True) - collected_rel_paths = { - f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) - } + collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} assert "real/nested.py" in collected_rel_paths assert "linked_dir/nested.py" not in collected_rel_paths @@ -229,9 +207,7 @@ def test_collect_files_handles_cyclic_symlink(tmp_path): _create_symlink(dir_b, dir_a / "to_b", target_is_directory=True) _create_symlink(dir_a, dir_b / "to_a", target_is_directory=True) - collected_rel_paths = { - f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path) - } + collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)} assert collected_rel_paths == {"a/a.py", "b/b.py"} @@ -267,16 +243,14 @@ def test_print_scan_results_empty(capsys): def test_print_scan_results_critical(capsys): - findings = [ - { - "severity": "CRITICAL", - "file": "app/page.tsx", - "line": 10, - "rule_id": "VSEC-001", - "message": "Found a critical issue", - "snippet": "const secret = 'abc';", - } - ] + findings = [{ + "severity": "CRITICAL", + "file": "app/page.tsx", + "line": 10, + "rule_id": "VSEC-001", + "message": "Found a critical issue", + "snippet": "const secret = 'abc';", + }] _print_scan_results(findings, 2) captured = capsys.readouterr() @@ -286,23 +260,18 @@ def test_print_scan_results_critical(capsys): assert "Code: const secret = 'abc';" in captured.out assert "🔴 1 critical" in captured.out assert "❌ Critical issues found. Fix before deploying." in captured.out - assert ( - "💡 Run 'vibesec review' to get an AI prompt for fixing these issues." - in captured.out - ) + assert "💡 Run 'vibesec review' to get an AI prompt for fixing these issues." in captured.out def test_print_scan_results_high(capsys): - findings = [ - { - "severity": "HIGH", - "file": "app/api/route.ts", - "line": 5, - "rule_id": "VSEC-002", - "message": "Found a high issue", - "snippet": "export async function GET() {}", - } - ] + findings = [{ + "severity": "HIGH", + "file": "app/api/route.ts", + "line": 5, + "rule_id": "VSEC-002", + "message": "Found a high issue", + "snippet": "export async function GET() {}", + }] _print_scan_results(findings, 3) captured = capsys.readouterr() @@ -312,16 +281,14 @@ def test_print_scan_results_high(capsys): def test_print_scan_results_warnings_only(capsys): - findings = [ - { - "severity": "WARNING", - "file": "utils.ts", - "line": 1, - "rule_id": "VSEC-003", - "message": "Found a warning", - "snippet": "console.log(data);", - } - ] + findings = [{ + "severity": "WARNING", + "file": "utils.ts", + "line": 1, + "rule_id": "VSEC-003", + "message": "Found a warning", + "snippet": "console.log(data);", + }] _print_scan_results(findings, 1) captured = capsys.readouterr() @@ -332,38 +299,10 @@ def test_print_scan_results_warnings_only(capsys): def test_print_scan_results_sorting(capsys): findings = [ - { - "severity": "INFO", - "file": "info.ts", - "line": 1, - "rule_id": "VSEC-004", - "message": "Info message", - "snippet": "info", - }, - { - "severity": "CRITICAL", - "file": "crit.ts", - "line": 2, - "rule_id": "VSEC-001", - "message": "Crit message", - "snippet": "crit", - }, - { - "severity": "HIGH", - "file": "high.ts", - "line": 3, - "rule_id": "VSEC-002", - "message": "High message", - "snippet": "high", - }, - { - "severity": "WARNING", - "file": "warn.ts", - "line": 4, - "rule_id": "VSEC-003", - "message": "Warn message", - "snippet": "warn", - }, + {"severity": "INFO", "file": "info.ts", "line": 1, "rule_id": "VSEC-004", "message": "Info message", "snippet": "info"}, + {"severity": "CRITICAL", "file": "crit.ts", "line": 2, "rule_id": "VSEC-001", "message": "Crit message", "snippet": "crit"}, + {"severity": "HIGH", "file": "high.ts", "line": 3, "rule_id": "VSEC-002", "message": "High message", "snippet": "high"}, + {"severity": "WARNING", "file": "warn.ts", "line": 4, "rule_id": "VSEC-003", "message": "Warn message", "snippet": "warn"}, ] _print_scan_results(findings, 4) out = capsys.readouterr().out @@ -420,10 +359,7 @@ def test_cmd_init_claude_code_skip(tmp_path, monkeypatch, capsys): cmd_init(Args(tool="claude-code")) assert claude_file.read_text() == "VibeSec existing rules\n" - assert ( - "CLAUDE.md already contains VibeSec rules — skipping." - in capsys.readouterr().out - ) + assert "CLAUDE.md already contains VibeSec rules — skipping." in capsys.readouterr().out def test_cmd_init_windsurf(tmp_path, monkeypatch, capsys): @@ -467,7 +403,6 @@ def test_cmd_init_supabase_stack(tmp_path, monkeypatch, capsys): def test_sanitize_terminal_output(): from scanner.cli.vibesec import _sanitize_terminal_output - # Test normal strings assert _sanitize_terminal_output("normal string") == "normal string" assert _sanitize_terminal_output("tabs\tare\tallowed") == "tabs\tare\tallowed" @@ -481,130 +416,3 @@ def test_sanitize_terminal_output(): # Test non-strings assert _sanitize_terminal_output(None) is None -def test_collect_files_scandir_error(tmp_path): - test_dir = tmp_path / "testdir" - test_dir.mkdir() - - with patch("os.scandir", side_effect=PermissionError("Permission denied")): - assert list(_collect_files(test_dir)) == [] - - with patch("os.scandir", side_effect=OSError("OS error")): - assert list(_collect_files(test_dir)) == [] - - -def test_collect_files_entry_error(tmp_path): - test_dir = tmp_path / "testdir" - test_dir.mkdir() - (test_dir / "file.py").touch() - - import os - - original_scandir = os.scandir - - def mocked_scandir(path): - it = original_scandir(path) - - class MockIterator: - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - def __iter__(self): - for entry in it: - - class MockEntry: - @property - def name(self): - return entry.name - - @property - def path(self): - return entry.path - - def is_symlink(self): - raise PermissionError("Denied") - - def is_dir(self, **kwargs): - return False - - def is_file(self, **kwargs): - return False - - yield MockEntry() - - return MockIterator() - - with patch("os.scandir", side_effect=mocked_scandir): - assert list(_collect_files(test_dir)) == [] - - -def test_scan_file_lstat_error(tmp_path): - test_file = tmp_path / "unsafe.ts" - test_file.write_text("const key = 'x';\n") - - with patch("os.lstat", side_effect=PermissionError("Permission denied")): - assert _scan_file(test_file, tmp_path) == [] - - with patch("os.lstat", side_effect=OSError("OS error")): - assert _scan_file(test_file, tmp_path) == [] - - -def test_scan_file_large_file_skip(tmp_path): - test_file = tmp_path / "unsafe.ts" - test_file.write_text("const key = 'x';\n") - - import os - - original_lstat = os.lstat - - def mocked_lstat(path): - st = original_lstat(path) - - class MockStat: - @property - def st_mode(self): - return st.st_mode - - @property - def st_size(self): - return 10 * 1024 * 1024 + 1 - - return MockStat() - - with patch("os.lstat", side_effect=mocked_lstat): - assert _scan_file(test_file, tmp_path) == [] - - -@patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES) -def test_scan_file_read_error_during_iteration(tmp_path): - test_file = tmp_path / "unsafe.ts" - test_file.write_text("const key = MOCK_SECRET_KEY;\n") - - class MockFile: - def __init__(self, *args, **kwargs): - pass - - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - def __iter__(self): - yield "const key = MOCK_SECRET_KEY;\n" - raise OSError("OS error mid-read") - - with patch("pathlib.Path.open", side_effect=MockFile): - findings = _scan_file(test_file, tmp_path) - assert len(findings) == 1 - assert findings[0]["rule_id"] == "mock-secret" - - -def test_scan_file_no_applicable_rules(tmp_path): - test_file = tmp_path / "unsafe.xyz" - test_file.write_text("const key = MOCK_SECRET_KEY;\n") - - with patch("scanner.cli.vibesec._get_applicable_rules", return_value=[]): - assert _scan_file(test_file, tmp_path) == []