Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 244 additions & 52 deletions tests/test_vibesec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@

import pytest

from scanner.cli.vibesec import _collect_files, _print_scan_results, _scan_file, cmd_init, cmd_scan
from scanner.cli.vibesec import (
_collect_files,
_print_scan_results,
_scan_file,
cmd_init,
cmd_scan,
)

MOCK_RULES = [
{
Expand Down Expand Up @@ -81,7 +87,9 @@ def test_scan_file_with_findings(tmp_path):
@patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES)
def test_scan_file_with_multiple_findings(tmp_path):
test_file = tmp_path / "unsafe_multiple.js"
test_file.write_text("const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n")
test_file.write_text(
"const key = MOCK_SECRET_KEY;\n// TODO: fix auth checks here\n"
)

findings = _scan_file(test_file, tmp_path)
rule_ids = [f["rule_id"] for f in findings]
Expand Down Expand Up @@ -119,26 +127,34 @@ def test_scan_file_rule_cache_invalidates_when_scan_rules_change(tmp_path):
test_file = tmp_path / "unsafe.py"
test_file.write_text("FIRST_TOKEN\nSECOND_TOKEN\n")

first_rules = [{
"id": "first",
"pattern": re.compile(r"FIRST_TOKEN"),
"severity": "HIGH",
"message": "first token",
"extensions": [".py"],
}]
second_rules = [{
"id": "second",
"pattern": re.compile(r"SECOND_TOKEN"),
"severity": "HIGH",
"message": "second token",
"extensions": [".py"],
}]
first_rules = [
{
"id": "first",
"pattern": re.compile(r"FIRST_TOKEN"),
"severity": "HIGH",
"message": "first token",
"extensions": [".py"],
}
]
second_rules = [
{
"id": "second",
"pattern": re.compile(r"SECOND_TOKEN"),
"severity": "HIGH",
"message": "second token",
"extensions": [".py"],
}
]

with patch("scanner.cli.vibesec.SCAN_RULES", first_rules):
assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["first"]
assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [
"first"
]

with patch("scanner.cli.vibesec.SCAN_RULES", second_rules):
assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == ["second"]
assert [finding["rule_id"] for finding in _scan_file(test_file, tmp_path)] == [
"second"
]


def test_collect_files():
Expand All @@ -156,7 +172,9 @@ def test_collect_files():
(base_path / "package.lock").touch()

collected_files = list(_collect_files(base_path))
collected_rel_paths = {f.relative_to(base_path).as_posix() for f in collected_files}
collected_rel_paths = {
f.relative_to(base_path).as_posix() for f in collected_files
}

assert collected_rel_paths == {"src/main.py", "src/utils.js", "README.md"}
assert "node_modules/index.js" not in collected_rel_paths
Expand All @@ -171,7 +189,9 @@ def test_collect_files_skips_file_symlink(tmp_path):
link = tmp_path / "linked.py"
_create_symlink(target, link)

collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)}
collected_rel_paths = {
f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)
}

assert "target.py" in collected_rel_paths
assert "linked.py" not in collected_rel_paths
Expand All @@ -184,7 +204,9 @@ def test_collect_files_skips_dir_symlink(tmp_path):
link = tmp_path / "linked_dir"
_create_symlink(real_dir, link, target_is_directory=True)

collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)}
collected_rel_paths = {
f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)
}

assert "real/nested.py" in collected_rel_paths
assert "linked_dir/nested.py" not in collected_rel_paths
Expand All @@ -207,7 +229,9 @@ def test_collect_files_handles_cyclic_symlink(tmp_path):
_create_symlink(dir_b, dir_a / "to_b", target_is_directory=True)
_create_symlink(dir_a, dir_b / "to_a", target_is_directory=True)

collected_rel_paths = {f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)}
collected_rel_paths = {
f.relative_to(tmp_path).as_posix() for f in _collect_files(tmp_path)
}

assert collected_rel_paths == {"a/a.py", "b/b.py"}

Expand Down Expand Up @@ -243,14 +267,16 @@ def test_print_scan_results_empty(capsys):


def test_print_scan_results_critical(capsys):
findings = [{
"severity": "CRITICAL",
"file": "app/page.tsx",
"line": 10,
"rule_id": "VSEC-001",
"message": "Found a critical issue",
"snippet": "const secret = 'abc';",
}]
findings = [
{
"severity": "CRITICAL",
"file": "app/page.tsx",
"line": 10,
"rule_id": "VSEC-001",
"message": "Found a critical issue",
"snippet": "const secret = 'abc';",
}
]
_print_scan_results(findings, 2)
captured = capsys.readouterr()

Expand All @@ -260,18 +286,23 @@ def test_print_scan_results_critical(capsys):
assert "Code: const secret = 'abc';" in captured.out
assert "🔴 1 critical" in captured.out
assert "❌ Critical issues found. Fix before deploying." in captured.out
assert "💡 Run 'vibesec review' to get an AI prompt for fixing these issues." in captured.out
assert (
"💡 Run 'vibesec review' to get an AI prompt for fixing these issues."
in captured.out
)


def test_print_scan_results_high(capsys):
findings = [{
"severity": "HIGH",
"file": "app/api/route.ts",
"line": 5,
"rule_id": "VSEC-002",
"message": "Found a high issue",
"snippet": "export async function GET() {}",
}]
findings = [
{
"severity": "HIGH",
"file": "app/api/route.ts",
"line": 5,
"rule_id": "VSEC-002",
"message": "Found a high issue",
"snippet": "export async function GET() {}",
}
]
_print_scan_results(findings, 3)
captured = capsys.readouterr()

Expand All @@ -281,14 +312,16 @@ def test_print_scan_results_high(capsys):


def test_print_scan_results_warnings_only(capsys):
findings = [{
"severity": "WARNING",
"file": "utils.ts",
"line": 1,
"rule_id": "VSEC-003",
"message": "Found a warning",
"snippet": "console.log(data);",
}]
findings = [
{
"severity": "WARNING",
"file": "utils.ts",
"line": 1,
"rule_id": "VSEC-003",
"message": "Found a warning",
"snippet": "console.log(data);",
}
]
_print_scan_results(findings, 1)
captured = capsys.readouterr()

Expand All @@ -299,10 +332,38 @@ def test_print_scan_results_warnings_only(capsys):

def test_print_scan_results_sorting(capsys):
findings = [
{"severity": "INFO", "file": "info.ts", "line": 1, "rule_id": "VSEC-004", "message": "Info message", "snippet": "info"},
{"severity": "CRITICAL", "file": "crit.ts", "line": 2, "rule_id": "VSEC-001", "message": "Crit message", "snippet": "crit"},
{"severity": "HIGH", "file": "high.ts", "line": 3, "rule_id": "VSEC-002", "message": "High message", "snippet": "high"},
{"severity": "WARNING", "file": "warn.ts", "line": 4, "rule_id": "VSEC-003", "message": "Warn message", "snippet": "warn"},
{
"severity": "INFO",
"file": "info.ts",
"line": 1,
"rule_id": "VSEC-004",
"message": "Info message",
"snippet": "info",
},
{
"severity": "CRITICAL",
"file": "crit.ts",
"line": 2,
"rule_id": "VSEC-001",
"message": "Crit message",
"snippet": "crit",
},
{
"severity": "HIGH",
"file": "high.ts",
"line": 3,
"rule_id": "VSEC-002",
"message": "High message",
"snippet": "high",
},
{
"severity": "WARNING",
"file": "warn.ts",
"line": 4,
"rule_id": "VSEC-003",
"message": "Warn message",
"snippet": "warn",
},
]
_print_scan_results(findings, 4)
out = capsys.readouterr().out
Expand Down Expand Up @@ -359,7 +420,10 @@ def test_cmd_init_claude_code_skip(tmp_path, monkeypatch, capsys):
cmd_init(Args(tool="claude-code"))

assert claude_file.read_text() == "VibeSec existing rules\n"
assert "CLAUDE.md already contains VibeSec rules — skipping." in capsys.readouterr().out
assert (
"CLAUDE.md already contains VibeSec rules — skipping."
in capsys.readouterr().out
)


def test_cmd_init_windsurf(tmp_path, monkeypatch, capsys):
Expand Down Expand Up @@ -403,6 +467,7 @@ def test_cmd_init_supabase_stack(tmp_path, monkeypatch, capsys):

def test_sanitize_terminal_output():
from scanner.cli.vibesec import _sanitize_terminal_output

# Test normal strings
assert _sanitize_terminal_output("normal string") == "normal string"
assert _sanitize_terminal_output("tabs\tare\tallowed") == "tabs\tare\tallowed"
Expand All @@ -416,3 +481,130 @@ def test_sanitize_terminal_output():

# Test non-strings
assert _sanitize_terminal_output(None) is None
def test_collect_files_scandir_error(tmp_path):
test_dir = tmp_path / "testdir"
test_dir.mkdir()

with patch("os.scandir", side_effect=PermissionError("Permission denied")):
assert list(_collect_files(test_dir)) == []

with patch("os.scandir", side_effect=OSError("OS error")):
assert list(_collect_files(test_dir)) == []


def test_collect_files_entry_error(tmp_path):
test_dir = tmp_path / "testdir"
test_dir.mkdir()
(test_dir / "file.py").touch()

import os

original_scandir = os.scandir

def mocked_scandir(path):
it = original_scandir(path)

class MockIterator:
def __enter__(self):
return self

def __exit__(self, *args):
pass

def __iter__(self):
for entry in it:

class MockEntry:
@property
def name(self):
return entry.name

@property
def path(self):
return entry.path

def is_symlink(self):
raise PermissionError("Denied")

def is_dir(self, **kwargs):
return False

def is_file(self, **kwargs):
return False

yield MockEntry()

return MockIterator()

with patch("os.scandir", side_effect=mocked_scandir):
assert list(_collect_files(test_dir)) == []


def test_scan_file_lstat_error(tmp_path):
test_file = tmp_path / "unsafe.ts"
test_file.write_text("const key = 'x';\n")

with patch("os.lstat", side_effect=PermissionError("Permission denied")):
assert _scan_file(test_file, tmp_path) == []

with patch("os.lstat", side_effect=OSError("OS error")):
assert _scan_file(test_file, tmp_path) == []


def test_scan_file_large_file_skip(tmp_path):
test_file = tmp_path / "unsafe.ts"
test_file.write_text("const key = 'x';\n")

import os

original_lstat = os.lstat

def mocked_lstat(path):
st = original_lstat(path)

class MockStat:
@property
def st_mode(self):
return st.st_mode

@property
def st_size(self):
return 10 * 1024 * 1024 + 1

return MockStat()

with patch("os.lstat", side_effect=mocked_lstat):
assert _scan_file(test_file, tmp_path) == []


@patch("scanner.cli.vibesec.SCAN_RULES", MOCK_RULES)
def test_scan_file_read_error_during_iteration(tmp_path):
test_file = tmp_path / "unsafe.ts"
test_file.write_text("const key = MOCK_SECRET_KEY;\n")

class MockFile:
def __init__(self, *args, **kwargs):
pass

def __enter__(self):
return self

def __exit__(self, *args):
pass

def __iter__(self):
yield "const key = MOCK_SECRET_KEY;\n"
raise OSError("OS error mid-read")

with patch("pathlib.Path.open", side_effect=MockFile):
findings = _scan_file(test_file, tmp_path)
assert len(findings) == 1
assert findings[0]["rule_id"] == "mock-secret"


def test_scan_file_no_applicable_rules(tmp_path):
test_file = tmp_path / "unsafe.xyz"
test_file.write_text("const key = MOCK_SECRET_KEY;\n")

with patch("scanner.cli.vibesec._get_applicable_rules", return_value=[]):
assert _scan_file(test_file, tmp_path) == []
Loading