From 4cbb4cc1b9b159ecead4731f1b36e774d094ed90 Mon Sep 17 00:00:00 2001 From: Sushruti Mishra Date: Sun, 5 Apr 2026 20:48:28 -0400 Subject: [PATCH 1/3] chore: Added unit tests --- .gitignore | 1 + tests/__init__.py | 0 tests/conftest.py | 81 ++++++++++ tests/test_config_loader.py | 234 ++++++++++++++++++++++++++++ tests/test_idea_manager.py | 292 +++++++++++++++++++++++++++++++++++ tests/test_pipeline_state.py | 120 ++++++++++++++ tests/test_security.py | 194 +++++++++++++++++++++++ 7 files changed, 922 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_config_loader.py create mode 100644 tests/test_idea_manager.py create mode 100644 tests/test_pipeline_state.py create mode 100644 tests/test_security.py diff --git a/.gitignore b/.gitignore index 1499251..4a082f6 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ venv/ env/ ENV/ .venv +.coverage # IDE .vscode/ diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d64a584 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,81 @@ +"""Shared fixtures for NeuriCo tests.""" + +import sys +from pathlib import Path + +import pytest +import yaml + +# Add src/ to path so core modules are importable +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + + +@pytest.fixture +def tmp_config_dir(tmp_path): + """Create a temp directory with a valid domains.yaml config.""" + config_dir = tmp_path / "config" + config_dir.mkdir() + + # Minimal domains config matching the structure of config/domains.yaml + domains_config = { + "default_domain": "artificial_intelligence", + "domains": { + "artificial_intelligence": { + "name": "Artificial Intelligence", + "description": "AI research", + "has_template": True, + }, + "machine_learning": { + "name": "Machine Learning", + "description": "ML research", + "has_template": True, + }, + "data_science": { + "name": "Data Science", + "description": "Data analysis", + "has_template": False, + }, + }, + "validation": {"allow_unknown": True, "warn_missing_template": True}, + } + + with open(config_dir / "domains.yaml", "w") as f: + yaml.dump(domains_config, f) + + return config_dir + + +@pytest.fixture +def tmp_ideas_dir(tmp_path): + """Create a temp directory structure for idea storage.""" + ideas_dir = tmp_path / "ideas" + ideas_dir.mkdir() + return ideas_dir + + +@pytest.fixture +def sample_idea_spec(): + """Return a valid idea specification dict with all optional fields populated.""" + return { + "idea": { + "title": "Test ML Experiment", + "domain": "machine_learning", + "hypothesis": "Fine-tuning with curriculum learning improves convergence speed", + "expected_outputs": [ + {"type": "metrics", "format": "json", "fields": ["accuracy", "loss"]} + ], + "evaluation_criteria": ["Convergence speed improvement > 10%"], + } + } + + +@pytest.fixture +def minimal_idea_spec(): + """Return a minimal valid idea specification (only required fields).""" + return { + "idea": { + "title": "Minimal Test Idea", + "domain": "artificial_intelligence", + "hypothesis": "This is a sufficiently long hypothesis for testing purposes", + } + } diff --git a/tests/test_config_loader.py b/tests/test_config_loader.py new file mode 100644 index 0000000..d8077f4 --- /dev/null +++ b/tests/test_config_loader.py @@ -0,0 +1,234 @@ +"""Tests for core.config_loader module.""" + +import os +import yaml +import pytest +from unittest.mock import patch + +from core.config_loader import ConfigLoader, normalize_domain, get_valid_domains, get_default_domain + + +@pytest.fixture(autouse=True) +def reset_singleton(): + """Reset ConfigLoader singleton between tests.""" + ConfigLoader._instance = None + ConfigLoader._cache = {} + yield + ConfigLoader._instance = None + ConfigLoader._cache = {} + + +@pytest.fixture +def loader(tmp_config_dir): + """Return a ConfigLoader pointing at the tmp config directory.""" + loader = ConfigLoader() + loader.config_dir = tmp_config_dir + loader.project_root = tmp_config_dir.parent + return loader + + +class TestLoadConfig: + # Verify a valid YAML file is loaded and parsed into a dict + def test_loads_valid_yaml(self, loader): + config = loader.load_config("domains") + assert "domains" in config + assert "artificial_intelligence" in config["domains"] + + # Verify FileNotFoundError is raised for a missing config file + def test_missing_config_raises(self, loader): + with pytest.raises(FileNotFoundError): + loader.load_config("nonexistent") + + # Verify second call returns the same cached object (no disk read) + def test_caches_on_second_call(self, loader): + first = loader.load_config("domains") + second = loader.load_config("domains") + assert first is second + + # Verify reload=True bypasses cache and picks up on-disk changes + def test_reload_bypasses_cache(self, loader, tmp_config_dir): + first = loader.load_config("domains") + + # Modify the file on disk + config_path = tmp_config_dir / "domains.yaml" + updated = first.copy() + updated["default_domain"] = "data_science" + with open(config_path, "w") as f: + yaml.dump(updated, f) + + reloaded = loader.load_config("domains", reload=True) + assert reloaded["default_domain"] == "data_science" + + +class TestSingletonBehavior: + # Verify __new__ returns the same instance (singleton pattern) + def test_two_instances_are_same_object(self): + a = ConfigLoader() + b = ConfigLoader() + assert a is b + + # Verify cache is shared across singleton references + def test_shared_cache(self, tmp_config_dir): + a = ConfigLoader() + a.config_dir = tmp_config_dir + a.load_config("domains") + + b = ConfigLoader() + # b should see a's cached value without needing config_dir set + assert "domains" in b._cache + + +class TestDomainHelpers: + # Verify get_valid_domains returns domain keys from config + def test_get_valid_domains(self, loader): + domains = loader.get_valid_domains() + assert "machine_learning" in domains + assert "artificial_intelligence" in domains + + # Verify known domain returns True, unknown returns False + def test_is_domain_valid(self, loader): + assert loader.is_domain_valid("machine_learning") is True + assert loader.is_domain_valid("underwater_basket_weaving") is False + + # Verify default domain matches the config file value + def test_get_default_domain(self, loader): + assert loader.get_default_domain() == "artificial_intelligence" + + # Verify display name is pulled from config's 'name' field + def test_get_domain_display_name(self, loader): + assert loader.get_domain_display_name("machine_learning") == "Machine Learning" + + # Verify unknown domain falls back to title-cased slug + def test_get_domain_display_name_fallback(self, loader): + name = loader.get_domain_display_name("unknown_domain") + assert name == "Unknown Domain" + + # Verify has_template flag is read correctly (True and False cases) + def test_domain_has_template(self, loader): + assert loader.domain_has_template("artificial_intelligence") is True + assert loader.domain_has_template("data_science") is False + + # Verify allow_unknown setting is read from validation config + def test_should_allow_unknown_domains(self, loader): + assert loader.should_allow_unknown_domains() is True + + +class TestConvenienceFunctions: + # Verify module-level get_valid_domains() returns domains from config + def test_get_valid_domains(self, loader): + domains = get_valid_domains() + assert "machine_learning" in domains + assert "artificial_intelligence" in domains + + # Verify module-level get_default_domain() returns the default from config + def test_get_default_domain(self, loader): + assert get_default_domain() == "artificial_intelligence" + + +class TestWorkspaceConfig: + # Verify workspace.yaml is loaded when it exists + def test_loads_workspace_yaml(self, loader, tmp_config_dir): + workspace_cfg = {"workspace": {"parent_dir": "/custom/path", "auto_create": False}} + with open(tmp_config_dir / "workspace.yaml", "w") as f: + yaml.dump(workspace_cfg, f) + + config = loader.get_workspace_config() + assert config["workspace"]["parent_dir"] == "/custom/path" + + # Verify fallback to workspace.yaml.example when workspace.yaml is missing + + def test_falls_back_to_template(self, loader, tmp_config_dir): + template_cfg = {"workspace": {"parent_dir": "from_template", "auto_create": True}} + with open(tmp_config_dir / "workspace.yaml.example", "w") as f: + yaml.dump(template_cfg, f) + + config = loader.get_workspace_config() + assert config["workspace"]["parent_dir"] == "from_template" + + # Verify hardcoded defaults when neither yaml nor template exists + def test_falls_back_to_defaults_when_no_files(self, loader): + config = loader.get_workspace_config() + assert config["workspace"]["parent_dir"] == "workspaces" + assert config["workspace"]["auto_create"] is True + + # Verify workspace config is cached after first load + def test_caches_workspace_config(self, loader, tmp_config_dir): + template_cfg = {"workspace": {"parent_dir": "cached"}} + with open(tmp_config_dir / "workspace.yaml.example", "w") as f: + yaml.dump(template_cfg, f) + + first = loader.get_workspace_config() + second = loader.get_workspace_config() + assert first is second + + # Verify auto_create flag is read from workspace config + def test_should_auto_create_workspace(self, loader, tmp_config_dir): + cfg = {"workspace": {"parent_dir": "ws", "auto_create": False}} + with open(tmp_config_dir / "workspace.yaml", "w") as f: + yaml.dump(cfg, f) + assert loader.should_auto_create_workspace() is False + + +class TestGetWorkspaceParentDir: + # Verify NEURICO_WORKSPACE env var takes highest priority (Docker override) + def test_env_var_override(self, loader): + with patch.dict(os.environ, {"NEURICO_WORKSPACE": "/docker/workspace"}): + result = loader.get_workspace_parent_dir() + assert str(result) == "/docker/workspace" + + # Verify absolute path from config is used as-is + def test_absolute_path_from_config(self, loader, tmp_config_dir): + cfg = {"workspace": {"parent_dir": "/absolute/workspaces"}} + with open(tmp_config_dir / "workspace.yaml", "w") as f: + yaml.dump(cfg, f) + + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("NEURICO_WORKSPACE", None) + result = loader.get_workspace_parent_dir() + assert str(result) == "/absolute/workspaces" + + # Verify relative path is resolved against project root + def test_relative_path_resolves_to_project_root(self, loader, tmp_config_dir): + cfg = {"workspace": {"parent_dir": "my_workspaces"}} + with open(tmp_config_dir / "workspace.yaml", "w") as f: + yaml.dump(cfg, f) + + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("NEURICO_WORKSPACE", None) + result = loader.get_workspace_parent_dir() + assert result == loader.project_root / "my_workspaces" + + # Verify ${VAR} syntax in config is substituted from environment + def test_env_var_substitution_in_config(self, loader, tmp_config_dir): + cfg = {"workspace": {"parent_dir": "${MY_CUSTOM_DIR}"}} + with open(tmp_config_dir / "workspace.yaml", "w") as f: + yaml.dump(cfg, f) + + with patch.dict(os.environ, {"MY_CUSTOM_DIR": "/from/env"}, clear=False): + os.environ.pop("NEURICO_WORKSPACE", None) + result = loader.get_workspace_parent_dir() + assert str(result) == "/from/env" + + +class TestNormalizeDomain: + # Verify a valid domain is returned unchanged + def test_valid_domain_passes_through(self, loader): + assert normalize_domain("machine_learning") == "machine_learning" + + # Verify unknown domain falls back to default when allow_unknown is True + def test_unknown_domain_falls_back_to_default(self, loader): + result = normalize_domain("quantum_computing") + assert result == "artificial_intelligence" + + # Verify unknown domain is returned as-is when allow_unknown is False + def test_unknown_domain_no_fallback_when_disallowed(self, loader, tmp_config_dir): + config_path = tmp_config_dir / "domains.yaml" + with open(config_path) as f: + config = yaml.safe_load(f) + config["validation"]["allow_unknown"] = False + with open(config_path, "w") as f: + yaml.dump(config, f) + loader.load_config("domains", reload=True) + + result = normalize_domain("quantum_computing") + assert result == "quantum_computing" diff --git a/tests/test_idea_manager.py b/tests/test_idea_manager.py new file mode 100644 index 0000000..0e18275 --- /dev/null +++ b/tests/test_idea_manager.py @@ -0,0 +1,292 @@ +"""Tests for core.idea_manager module.""" + +import pytest +import yaml +from unittest.mock import patch, MagicMock + +from core.idea_manager import IdeaManager + + +@pytest.fixture +def manager(tmp_ideas_dir, tmp_config_dir): + """Return an IdeaManager using temp directories with mocked ConfigLoader.""" + mock_loader = MagicMock() + mock_loader.get_valid_domains.return_value = [ + "artificial_intelligence", "machine_learning", "data_science" + ] + mock_loader.should_allow_unknown_domains.return_value = True + mock_loader.get_default_domain.return_value = "artificial_intelligence" + + with patch("core.idea_manager.ConfigLoader", return_value=mock_loader): + mgr = IdeaManager(ideas_dir=tmp_ideas_dir) + # Store mock so tests can reconfigure it + mgr._mock_loader = mock_loader + return mgr + + +class TestValidateIdea: + # Verify a fully populated idea spec passes validation with no errors + def test_valid_idea_passes(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert result["valid"] is True + assert result["errors"] == [] + + # Verify spec without top-level 'idea' key is rejected immediately + def test_missing_top_level_idea_key(self, manager): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea({"title": "oops"}) + assert result["valid"] is False + assert any("Missing top-level 'idea' key" in e for e in result["errors"]) + + # Verify missing required field 'title' produces an error + def test_missing_title(self, manager): + spec = {"idea": {"domain": "machine_learning", "hypothesis": "A long enough hypothesis here"}} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is False + assert any("title" in e for e in result["errors"]) + + # Verify missing required field 'domain' produces an error + def test_missing_domain(self, manager): + spec = {"idea": {"title": "Test", "hypothesis": "A long enough hypothesis here"}} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is False + assert any("domain" in e for e in result["errors"]) + + # Verify missing required field 'hypothesis' produces an error + def test_missing_hypothesis(self, manager): + spec = {"idea": {"title": "Test", "domain": "machine_learning"}} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is False + assert any("hypothesis" in e for e in result["errors"]) + + # Verify hypothesis under 20 chars triggers a warning (not an error) + def test_short_hypothesis_warning(self, manager): + spec = {"idea": {"title": "Test", "domain": "machine_learning", "hypothesis": "Short"}} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is True + assert any("short" in w.lower() for w in result["warnings"]) + + # Verify unknown domain produces a warning when allow_unknown is True + def test_unknown_domain_warns(self, manager): + spec = { + "idea": { + "title": "Test", + "domain": "underwater_basket_weaving", + "hypothesis": "A long enough hypothesis for testing", + } + } + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is True + assert any("Unknown domain" in w for w in result["warnings"]) + + # Verify unknown domain produces an error when allow_unknown is False + def test_unknown_domain_errors_when_disallowed(self, manager): + manager._mock_loader.should_allow_unknown_domains.return_value = False + spec = { + "idea": { + "title": "Test", + "domain": "unknown_field", + "hypothesis": "A sufficiently long hypothesis for testing", + } + } + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(spec) + assert result["valid"] is False + assert any("Invalid domain" in e for e in result["errors"]) + + # Verify invalid compute constraint value is rejected + def test_invalid_compute_constraint(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["constraints"] = {"compute": "quantum"} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert result["valid"] is False + assert any("compute" in e.lower() for e in result["errors"]) + + # Verify expected_outputs that isn't a list produces an error + def test_expected_outputs_not_a_list(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["expected_outputs"] = "not_a_list" + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert result["valid"] is False + assert any("expected_outputs must be a list" in e for e in result["errors"]) + + # Verify empty expected_outputs list triggers a warning (agent decides outputs) + def test_expected_outputs_empty_warns(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["expected_outputs"] = [] + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert result["valid"] is True + assert any("empty" in w for w in result["warnings"]) + + # Verify output entries missing 'type' and 'format' fields produce errors + def test_expected_output_missing_type_and_format(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["expected_outputs"] = [{"description": "results"}] + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert result["valid"] is False + assert any("missing 'type'" in e for e in result["errors"]) + assert any("missing 'format'" in e for e in result["errors"]) + + # Verify omitting expected_outputs entirely triggers an informational warning + def test_no_expected_outputs_warns(self, manager, minimal_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(minimal_idea_spec) + assert any("No expected_outputs" in w for w in result["warnings"]) + + # Verify non-integer time_limit produces an error + def test_time_limit_not_integer(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["constraints"] = {"time_limit": "fast"} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert any("time_limit must be an integer" in e for e in result["errors"]) + + # Verify time_limit under 60s triggers a "very short" warning + def test_time_limit_too_short_warns(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["constraints"] = {"time_limit": 30} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert any("very short" in w for w in result["warnings"]) + + # Verify time_limit over 24h triggers a "very long" warning + def test_time_limit_too_long_warns(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["constraints"] = {"time_limit": 100000} + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert any("very long" in w for w in result["warnings"]) + + # Verify evaluation_criteria that isn't a list produces an error + def test_evaluation_criteria_not_a_list(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["evaluation_criteria"] = "just a string" + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert any("evaluation_criteria must be a list" in e for e in result["errors"]) + + # Verify empty evaluation_criteria list triggers a warning + def test_evaluation_criteria_empty_warns(self, manager, sample_idea_spec): + sample_idea_spec["idea"]["evaluation_criteria"] = [] + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + result = manager.validate_idea(sample_idea_spec) + assert any("No evaluation criteria" in w for w in result["warnings"]) + + +class TestSubmitIdea: + # Verify submit writes a YAML file to submitted/ with correct metadata + def test_creates_yaml_file(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + idea_id = manager.submit_idea(sample_idea_spec) + + idea_file = manager.submitted_dir / f"{idea_id}.yaml" + assert idea_file.exists() + + with open(idea_file) as f: + saved = yaml.safe_load(f) + assert saved["idea"]["title"] == "Test ML Experiment" + assert saved["idea"]["metadata"]["status"] == "submitted" + + # Verify submitting an invalid idea raises ValueError + def test_invalid_idea_raises(self, manager): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + with pytest.raises(ValueError, match="validation failed"): + manager.submit_idea({"idea": {}}) + + +class TestGenerateIdeaId: + # Verify generated ID contains a sanitized (lowercase, underscored) title + def test_id_contains_sanitized_title(self, manager, sample_idea_spec): + idea_id = manager._generate_idea_id(sample_idea_spec) + assert "test_ml_experiment" in idea_id + + # Verify generated ID ends with an 8-char hex hash for uniqueness + def test_id_contains_hash(self, manager, sample_idea_spec): + idea_id = manager._generate_idea_id(sample_idea_spec) + # ID format: {safe_title}_{timestamp}_{hash8} + parts = idea_id.rsplit("_", 1) + assert len(parts[-1]) == 8 + + +class TestIdeaLifecycle: + # Verify a submitted idea can be retrieved by its ID + def test_submit_and_retrieve(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + idea_id = manager.submit_idea(sample_idea_spec) + retrieved = manager.get_idea(idea_id) + assert retrieved is not None + assert retrieved["idea"]["title"] == "Test ML Experiment" + + # Verify status update moves the YAML file between directories + def test_update_status_moves_file(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + idea_id = manager.submit_idea(sample_idea_spec) + assert (manager.submitted_dir / f"{idea_id}.yaml").exists() + + manager.update_status(idea_id, "in_progress") + assert not (manager.submitted_dir / f"{idea_id}.yaml").exists() + assert (manager.in_progress_dir / f"{idea_id}.yaml").exists() + + # Verify invalid status string raises ValueError + def test_update_status_invalid_raises(self, manager): + with pytest.raises(ValueError, match="Invalid status"): + manager.update_status("fake_id", "invalid_status") + + # Verify get_idea returns None for an ID that doesn't exist + def test_get_idea_returns_none_for_missing(self, manager): + assert manager.get_idea("nonexistent_id_12345") is None + + # Verify update_status returns False when the idea ID is not found + def test_update_status_returns_false_for_missing(self, manager): + assert manager.update_status("nonexistent_id_12345", "in_progress") is False + + # Verify list_ideas filters by status and returns correct summaries + def test_list_ideas_returns_submitted(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + manager.submit_idea(sample_idea_spec) + ideas = manager.list_ideas(status="submitted") + assert len(ideas) == 1 + assert ideas[0]["title"] == "Test ML Experiment" + + # Verify list_ideas with status=None returns ideas across all directories + def test_list_ideas_all_statuses(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + idea_id = manager.submit_idea(sample_idea_spec) + manager.update_status(idea_id, "in_progress") + ideas = manager.list_ideas(status=None) + assert len(ideas) == 1 + assert ideas[0]["status"] == "in_progress" + + # Verify list_ideas filters correctly for in_progress and completed + def test_list_ideas_by_in_progress_and_completed(self, manager, sample_idea_spec): + with patch("core.idea_manager.ConfigLoader", return_value=manager._mock_loader): + idea_id = manager.submit_idea(sample_idea_spec) + manager.update_status(idea_id, "in_progress") + + assert len(manager.list_ideas(status="in_progress")) == 1 + assert len(manager.list_ideas(status="completed")) == 0 + + manager.update_status(idea_id, "completed") + assert len(manager.list_ideas(status="in_progress")) == 0 + assert len(manager.list_ideas(status="completed")) == 1 + + # Verify update_status creates metadata dict if idea was saved without one + def test_update_status_creates_metadata(self, manager, tmp_ideas_dir): + # Manually write an idea file without metadata + idea_file = manager.submitted_dir / "no_meta.yaml" + idea_file.write_text(yaml.dump({"idea": {"title": "No Metadata Idea"}})) + + result = manager.update_status("no_meta", "in_progress") + assert result is True + + moved_file = manager.in_progress_dir / "no_meta.yaml" + with open(moved_file) as f: + saved = yaml.safe_load(f) + assert saved["idea"]["metadata"]["status"] == "in_progress" + + # Verify list_ideas rejects invalid status strings + def test_list_ideas_invalid_status_raises(self, manager): + with pytest.raises(ValueError, match="Invalid status"): + manager.list_ideas(status="archived") diff --git a/tests/test_pipeline_state.py b/tests/test_pipeline_state.py new file mode 100644 index 0000000..2d05a63 --- /dev/null +++ b/tests/test_pipeline_state.py @@ -0,0 +1,120 @@ +"""Tests for PipelineState from core.pipeline_orchestrator.""" + +import json + +import pytest + +from core.pipeline_orchestrator import PipelineState + + +@pytest.fixture +def state(tmp_path): + """Return a fresh PipelineState using a temp work directory.""" + return PipelineState(tmp_path) + + +class TestInitialState: + # Verify fresh state has an empty stages dict + def test_fresh_state_has_no_stages(self, state): + assert state.state["stages"] == {} + + # Verify fresh state is not marked completed + def test_fresh_state_not_completed(self, state): + assert state.state["completed"] is False + + # Verify fresh state has no current stage set + def test_fresh_state_no_current_stage(self, state): + assert state.state["current_stage"] is None + + # Verify state file is written to disk on initialization + def test_state_file_created(self, state): + assert state.state_file.exists() + + +class TestStartStage: + # Verify starting a stage sets status to in_progress and updates current_stage + def test_marks_stage_in_progress(self, state): + state.start_stage("resource_finder") + assert state.state["stages"]["resource_finder"]["status"] == "in_progress" + assert state.state["current_stage"] == "resource_finder" + + # Verify started_at timestamp is recorded + def test_sets_started_at(self, state): + state.start_stage("resource_finder") + assert state.state["stages"]["resource_finder"]["started_at"] is not None + + +class TestCompleteStage: + # Verify successful completion sets status, success flag, and outputs + def test_success(self, state): + state.start_stage("resource_finder") + state.complete_stage("resource_finder", success=True, outputs={"papers": 5}) + + stage = state.state["stages"]["resource_finder"] + assert stage["status"] == "completed" + assert stage["success"] is True + assert stage["outputs"] == {"papers": 5} + assert state.state["current_stage"] is None + + # Verify failed completion sets status to 'failed' with success=False + def test_failure(self, state): + state.start_stage("experiment_runner") + state.complete_stage("experiment_runner", success=False) + + stage = state.state["stages"]["experiment_runner"] + assert stage["status"] == "failed" + assert stage["success"] is False + + # Verify completing a stage that was never started still works + def test_complete_without_start(self, state): + state.complete_stage("ad_hoc", success=True) + assert state.state["stages"]["ad_hoc"]["status"] == "completed" + + +class TestMarkCompleted: + # Verify mark_completed sets the pipeline-level completed flag and timestamp + def test_marks_pipeline_completed(self, state): + state.mark_completed() + assert state.state["completed"] is True + assert "completed_at" in state.state + + +class TestStageQueries: + # Verify get_stage_status returns None for unknown stages, correct status otherwise + def test_get_stage_status(self, state): + assert state.get_stage_status("resource_finder") is None + state.start_stage("resource_finder") + assert state.get_stage_status("resource_finder") == "in_progress" + + # Verify is_stage_completed returns True only after successful completion + def test_is_stage_completed(self, state): + assert state.is_stage_completed("resource_finder") is False + state.start_stage("resource_finder") + state.complete_stage("resource_finder", success=True) + assert state.is_stage_completed("resource_finder") is True + + # Verify a failed stage is not considered "completed" + def test_failed_stage_not_considered_completed(self, state): + state.start_stage("resource_finder") + state.complete_stage("resource_finder", success=False) + assert state.is_stage_completed("resource_finder") is False + + +class TestPersistence: + # Verify state survives a new PipelineState instance reading from the same directory + def test_state_persists_to_disk(self, tmp_path): + state1 = PipelineState(tmp_path) + state1.start_stage("resource_finder") + state1.complete_stage("resource_finder", success=True, outputs={"count": 3}) + + # Load from disk via new instance + state2 = PipelineState(tmp_path) + assert state2.is_stage_completed("resource_finder") is True + assert state2.state["stages"]["resource_finder"]["outputs"] == {"count": 3} + + # Verify the state file on disk is valid JSON + def test_state_file_is_valid_json(self, state): + state.start_stage("test") + with open(state.state_file) as f: + data = json.load(f) + assert "stages" in data diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..b5d1512 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,194 @@ +"""Tests for core.security module.""" + +from core.security import get_safe_env, sanitize_text, sanitize_log_file, sanitize_logs_directory + +class TestSanitizeText: + # Verify all OpenAI key formats (project, org, OpenRouter, bare) are redacted + def test_redacts_openai_keys(self): + cases = [ + ("sk-proj-abc123DEF456ghi789JKL012", "[REDACTED_OPENAI_PROJECT_KEY]"), + ("sk-or-v1-abc123DEF456ghi789JKL012", "[REDACTED_OPENROUTER_KEY]"), + ("sk-or-abc123DEF456ghi789JKL012mno", "[REDACTED_OPENAI_ORG_KEY]"), + ("sk-" + "A" * 48, "[REDACTED_OPENAI_KEY]"), + ] + for key, expected_redaction in cases: + result = sanitize_text(f"key is {key}") + assert key not in result, f"Key {key[:15]}... was not redacted" + assert expected_redaction in result, f"Expected {expected_redaction} for {key[:15]}..." + + # Verify Anthropic sk-ant- prefix keys are redacted + def test_redacts_anthropic_key(self): + text = "key is sk-ant-abc123DEF456ghi789JKL012" + result = sanitize_text(text) + assert "sk-ant-" not in result + assert "[REDACTED_ANTHROPIC_KEY]" in result + + # Verify all GitHub token formats (PAT, OAuth, App, Refresh, fine-grained) are redacted + def test_redacts_github_tokens(self): + suffix = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ab" + cases = [ + (f"ghp_{suffix}", "[REDACTED_GITHUB_PAT]"), + (f"gho_{suffix}", "[REDACTED_GITHUB_OAUTH]"), + (f"ghs_{suffix}", "[REDACTED_GITHUB_APP]"), + (f"ghr_{suffix}", "[REDACTED_GITHUB_REFRESH]"), + ("github_pat_ABCDEFGHIJ0123456789ab", "[REDACTED_GITHUB_FINE_GRAINED]"), + ] + for key, expected_redaction in cases: + result = sanitize_text(f"token is {key}") + assert key not in result, f"Token {key[:15]}... was not redacted" + assert expected_redaction in result, f"Expected {expected_redaction} for {key[:15]}..." + + # Verify AWS access key IDs (AKIA prefix) are redacted + def test_redacts_aws_access_key(self): + text = "key is AKIAIOSFODNN7EXAMPLE" + result = sanitize_text(text) + assert "AKIA" not in result + assert "[REDACTED_AWS_ACCESS_KEY]" in result + + # Verify Google/Gemini API keys (AIza prefix) are redacted + def test_redacts_google_api_key(self): + text = "key is AIzaSyD-example-key-that-is-long-enough-00" + result = sanitize_text(text) + assert "AIza" not in result + assert "[REDACTED_GOOGLE_KEY]" in result + + # Verify KEY=value assignments are redacted for all tracked env var names + def test_redacts_env_var_assignments(self): + env_vars = [ + "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GITHUB_TOKEN", + "GEMINI_API_KEY", "GOOGLE_API_KEY", "OPENROUTER_KEY", + ] + for var in env_vars: + result = sanitize_text(f"{var}=some-secret-value") + assert "some-secret-value" not in result, f"{var} assignment value not redacted" + assert f"{var}=[REDACTED]" in result, f"{var} not replaced with [REDACTED]" + + # Verify export KEY=value assignments are also caught + def test_redacts_export_env_assignments(self): + env_vars = [ + "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GITHUB_TOKEN", + "GEMINI_API_KEY", "GOOGLE_API_KEY", "OPENROUTER_KEY", + ] + for var in env_vars: + result = sanitize_text(f"export {var}=some-secret-value") + assert "some-secret-value" not in result, f"export {var} value not redacted" + + # Verify normal text without secrets passes through unchanged + def test_preserves_normal_text(self): + text = "This is a normal log line with no secrets." + assert sanitize_text(text) == text + + # Verify short strings starting with "sk" aren't false-positived + def test_preserves_short_sk_prefix(self): + text = "the sketch is ready" + assert sanitize_text(text) == text + + +class TestGetSafeEnv: + # Verify known sensitive keys (OPENAI, ANTHROPIC, etc.) are stripped from env + def test_removes_sensitive_keys(self): + env = { + "PATH": "/usr/bin", + "OPENAI_API_KEY": "sk-secret", + "HOME": "/home/user", + "ANTHROPIC_API_KEY": "sk-ant-secret", + } + safe = get_safe_env(env) + assert "OPENAI_API_KEY" not in safe + assert "ANTHROPIC_API_KEY" not in safe + + # Verify non-sensitive keys are preserved untouched + def test_keeps_non_sensitive_keys(self): + env = { + "PATH": "/usr/bin", + "HOME": "/home/user", + "LANG": "en_US.UTF-8", + } + safe = get_safe_env(env) + assert safe == env + + # Verify empty env dict returns empty dict without error + def test_empty_env(self): + assert get_safe_env({}) == {} + + +class TestSanitizeLogFile: + # Verify a log file containing secrets from all key patterns is fully redacted + def test_sanitizes_file_with_secrets(self, tmp_path): + github_suffix = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ab" + log_file = tmp_path / "test.log" + log_file.write_text( + "OpenAI project: sk-proj-abc123DEF456ghi789JKL012\n" + "OpenRouter: sk-or-v1-abc123DEF456ghi789JKL012\n" + "OpenAI org: sk-or-abc123DEF456ghi789JKL012mno\n" + f"OpenAI bare: sk-{'A' * 48}\n" + "Anthropic: sk-ant-abc123DEF456ghi789JKL012\n" + f"GitHub PAT: ghp_{github_suffix}\n" + f"GitHub OAuth: gho_{github_suffix}\n" + f"GitHub App: ghs_{github_suffix}\n" + f"GitHub Refresh: ghr_{github_suffix}\n" + "GitHub fine-grained: github_pat_ABCDEFGHIJ0123456789ab\n" + "Google: AIzaSyD-example-key-that-is-long-enough-00\n" + "AWS: AKIAIOSFODNN7EXAMPLE\n" + ) + + modified = sanitize_log_file(log_file) + assert modified is True + + content = log_file.read_text() + expected_redactions = [ + "[REDACTED_OPENAI_PROJECT_KEY]", + "[REDACTED_OPENROUTER_KEY]", + "[REDACTED_OPENAI_ORG_KEY]", + "[REDACTED_OPENAI_KEY]", + "[REDACTED_ANTHROPIC_KEY]", + "[REDACTED_GITHUB_PAT]", + "[REDACTED_GITHUB_OAUTH]", + "[REDACTED_GITHUB_APP]", + "[REDACTED_GITHUB_REFRESH]", + "[REDACTED_GITHUB_FINE_GRAINED]", + "[REDACTED_GOOGLE_KEY]", + "[REDACTED_AWS_ACCESS_KEY]", + ] + for redaction in expected_redactions: + assert redaction in content, f"{redaction} not found in sanitized file" + + # Verify clean files are not rewritten (returns False) + def test_no_modification_when_clean(self, tmp_path): + log_file = tmp_path / "clean.log" + log_file.write_text("Nothing sensitive here.\n") + + modified = sanitize_log_file(log_file) + assert modified is False + + # Verify missing files are handled gracefully (returns False) + def test_nonexistent_file_returns_false(self, tmp_path): + modified = sanitize_log_file(tmp_path / "missing.log") + assert modified is False + + +class TestSanitizeLogsDirectory: + # Verify .log, .jsonl, and .txt files are sanitized but other extensions are ignored + def test_sanitizes_multiple_file_types(self, tmp_path): + (tmp_path / "run.log").write_text("key: sk-proj-abc123DEF456ghi789JKL012\n") + (tmp_path / "transcript.jsonl").write_text('{"key": "sk-ant-abc123DEF456ghi789JKL012"}\n') + (tmp_path / "notes.txt").write_text("OPENAI_API_KEY=mysecret\n") + # .py file should be ignored (not a log pattern) + (tmp_path / "script.py").write_text("sk-proj-abc123DEF456ghi789JKL012\n") + + count = sanitize_logs_directory(tmp_path) + assert count == 3 + assert "sk-proj-" in (tmp_path / "script.py").read_text() + + # Verify directory with only clean log files returns zero modifications + def test_returns_zero_for_clean_directory(self, tmp_path): + (tmp_path / "clean.log").write_text("No secrets here.\n") + assert sanitize_logs_directory(tmp_path) == 0 + + # Verify nonexistent directory is handled gracefully + def test_returns_zero_for_nonexistent_directory(self, tmp_path): + assert sanitize_logs_directory(tmp_path / "nope") == 0 + + # Verify empty directory returns zero without error + def test_returns_zero_for_empty_directory(self, tmp_path): + assert sanitize_logs_directory(tmp_path) == 0 From 2ff47ceaadff356cfb48031143765d9e68e5b9f0 Mon Sep 17 00:00:00 2001 From: Sushruti Mishra Date: Mon, 6 Apr 2026 22:04:10 -0400 Subject: [PATCH 2/3] chore: added integration tests --- .../test_pipeline_orchestrator_integration.py | 484 ++++++++++++++++++ 1 file changed, 484 insertions(+) create mode 100644 tests/test_pipeline_orchestrator_integration.py diff --git a/tests/test_pipeline_orchestrator_integration.py b/tests/test_pipeline_orchestrator_integration.py new file mode 100644 index 0000000..30face1 --- /dev/null +++ b/tests/test_pipeline_orchestrator_integration.py @@ -0,0 +1,484 @@ +"""Integration tests for ResearchPipelineOrchestrator from core.pipeline_orchestrator.""" + +import json +import subprocess +from contextlib import ExitStack +from pathlib import Path + +import pytest +from unittest.mock import patch, MagicMock + +from core.pipeline_orchestrator import ResearchPipelineOrchestrator, CLI_COMMANDS + + +@pytest.fixture +def idea_spec(): + """Return a minimal idea spec for pipeline tests.""" + return { + "idea": { + "title": "Test Research", + "domain": "machine_learning", + "hypothesis": "Testing the pipeline orchestrator end to end", + } + } + + +@pytest.fixture +def orchestrator(tmp_path): + """Return orchestrator with a temp work dir and .neurico dir pre-created.""" + work_dir = tmp_path / "workspace" + work_dir.mkdir() + (work_dir / ".neurico").mkdir() + return ResearchPipelineOrchestrator(work_dir=work_dir, templates_dir=tmp_path / "templates") + + +def _mock_resource_finder_success(**kwargs): + """Fake run_resource_finder that always succeeds.""" + return {"success": True, "outputs": {"papers": 3}} + + +def _mock_resource_finder_failure(**kwargs): + """Fake run_resource_finder that always fails.""" + return {"success": False, "error": "no papers found"} + + +def _experiment_patches(cli_cmd="sh -c cat"): + """Context manager stack that mocks all experiment runner dependencies. + + Patches: run_resource_finder, PromptGenerator, generate_instructions, + and overrides CLI_COMMANDS to use a simple shell command instead of real AI tools. + Uses 'sh -c ' so extra flags appended by the orchestrator are ignored. + """ + mock_pg = MagicMock() + mock_pg.return_value.generate_research_prompt.return_value = "fake research prompt" + mock_gen_inst = MagicMock(return_value="fake session instructions\n") + + stack = ExitStack() + stack.enter_context(patch.dict(CLI_COMMANDS, {"claude": cli_cmd, "codex": cli_cmd, "gemini": cli_cmd})) + stack.enter_context(patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_success)) + stack.enter_context(patch("core.pipeline_orchestrator.generate_instructions", mock_gen_inst)) + stack.enter_context(patch("templates.prompt_generator.PromptGenerator", mock_pg)) + return stack + + +class TestRunPipelineFullFlow: + # Verify full pipeline succeeds when both stages succeed (resource finder + experiment runner) + def test_full_pipeline_success(self, orchestrator, idea_spec): + with _experiment_patches(): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["success"] is True + assert results["stages"]["resource_finder"]["success"] is True + assert results["stages"]["experiment_runner"]["success"] is True + + # Verify pipeline results file was written + results_file = orchestrator.work_dir / ".neurico" / "pipeline_results.json" + assert results_file.exists() + saved = json.loads(results_file.read_text()) + assert saved["success"] is True + + # Verify pipeline state is marked completed after successful full run + def test_pipeline_state_completed_after_success(self, orchestrator, idea_spec): + with _experiment_patches(): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert orchestrator.state.state["completed"] is True + assert orchestrator.state.is_stage_completed("resource_finder") + assert orchestrator.state.is_stage_completed("experiment_runner") + + +class TestSkipResourceFinder: + # Verify skip_resource_finder=True skips stage 1 and still runs experiment runner + def test_skips_resource_finder(self, orchestrator, idea_spec): + with _experiment_patches(): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["success"] is True + assert results["stages"]["resource_finder"]["skipped"] is True + assert results["stages"]["experiment_runner"]["success"] is True + + # Verify resource_finder state is marked completed even when skipped + def test_state_marked_completed_when_skipped(self, orchestrator, idea_spec): + with _experiment_patches(): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert orchestrator.state.is_stage_completed("resource_finder") + + +class TestResourceFinderFailure: + # Verify pipeline stops and returns failure when resource finder fails + def test_stops_pipeline_on_failure(self, orchestrator, idea_spec): + with patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_failure): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["success"] is False + assert results["stages"]["resource_finder"]["success"] is False + # Experiment runner should never have run + assert "experiment_runner" not in results["stages"] + + # Verify state reflects the failed resource_finder stage + def test_state_reflects_failure(self, orchestrator, idea_spec): + with patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_failure): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert orchestrator.state.get_stage_status("resource_finder") == "failed" + + +class TestHumanReviewPause: + # Verify pipeline continues when human approves (inputs "yes") + def test_approved_continues_to_experiment(self, orchestrator, idea_spec): + with _experiment_patches(), \ + patch("builtins.input", return_value="yes"): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + pause_after_resources=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["success"] is True + assert results["stages"]["human_review"]["approved"] is True + assert results["stages"]["experiment_runner"]["success"] is True + + # Verify pipeline stops when human rejects (inputs "no") + def test_rejected_stops_pipeline(self, orchestrator, idea_spec): + with patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_success), \ + patch("builtins.input", return_value="no"): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + pause_after_resources=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["success"] is False + assert results["stages"]["human_review"]["approved"] is False + # Experiment runner should never have run + assert "experiment_runner" not in results["stages"] + + +class TestExperimentRunnerSubprocess: + # Verify experiment runner creates log and transcript files + def test_creates_log_files(self, orchestrator, idea_spec): + with _experiment_patches(): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + logs_dir = orchestrator.work_dir / "logs" + assert (logs_dir / "execution_claude.log").exists() + assert (logs_dir / "execution_claude_transcript.jsonl").exists() + assert (logs_dir / "research_prompt.txt").exists() + assert (logs_dir / "session_instructions.txt").exists() + + # Verify session instructions are written to stdin of the subprocess (captured in log via cat) + def test_session_instructions_piped_to_process(self, orchestrator, idea_spec): + with _experiment_patches(): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + # cat echoes stdin to stdout, so the log should contain the session instructions + log_content = (orchestrator.work_dir / "logs" / "execution_claude.log").read_text() + assert "fake session instructions" in log_content + + # Verify nonzero return code from subprocess marks experiment as failed + def test_nonzero_exit_code_fails(self, orchestrator, idea_spec): + with _experiment_patches("sh -c false"): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["stages"]["experiment_runner"]["success"] is False + assert results["stages"]["experiment_runner"]["return_code"] != 0 + + # Verify provider-specific permission flags are applied (codex --yolo, claude --dangerously-skip-permissions) + def test_permission_flags_by_provider(self, orchestrator, idea_spec): + providers_and_flags = [ + ("claude", "--dangerously-skip-permissions"), + ("codex", "--yolo"), + ("gemini", "--yolo"), + ] + + for provider, expected_flag in providers_and_flags: + with _experiment_patches("sh -c echo"), \ + patch("subprocess.Popen", wraps=subprocess.Popen) as mock_popen: + try: + orchestrator.run_pipeline( + idea=idea_spec, + provider=provider, + skip_resource_finder=True, + full_permissions=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + except Exception: + pass # echo may not behave perfectly, we just check the command + + call_args = mock_popen.call_args[0][0] + cmd_str = " ".join(call_args) + assert expected_flag in cmd_str, f"Expected {expected_flag} for {provider}, got: {cmd_str}" + + +class TestExperimentRunnerTimeout: + # Verify subprocess timeout is handled and returns timeout error + # Mocks process.wait() to raise TimeoutExpired since the orchestrator's readline + # loop blocks until stdout closes, making real timeouts unreliable in tests + def test_timeout_returns_error(self, orchestrator, idea_spec): + mock_process = MagicMock() + mock_process.stdin = MagicMock() + mock_process.stdout.readline.return_value = "" + mock_process.wait.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=2) + mock_process.kill = MagicMock() + + with _experiment_patches(), \ + patch("subprocess.Popen", return_value=mock_process): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=2, + ) + + assert results["stages"]["experiment_runner"]["success"] is False + assert results["stages"]["experiment_runner"]["error"] == "timeout" + mock_process.kill.assert_called_once() + + +class TestResumePipeline: + # Verify resume skips resource_finder when it's already completed + def test_resumes_from_experiment_runner(self, orchestrator, idea_spec): + # Manually mark resource_finder as complete + orchestrator.state.start_stage("resource_finder") + orchestrator.state.complete_stage("resource_finder", success=True) + + with _experiment_patches(): + results = orchestrator.resume_pipeline( + idea=idea_spec, + provider="claude", + ) + + assert results["success"] is True + # Resource finder should have been skipped (not re-run) + assert results["stages"]["resource_finder"]["skipped"] is True + + # Verify resume returns immediately when all stages are already completed + def test_resume_when_already_complete(self, orchestrator, idea_spec): + orchestrator.state.start_stage("resource_finder") + orchestrator.state.complete_stage("resource_finder", success=True) + orchestrator.state.start_stage("experiment_runner") + orchestrator.state.complete_stage("experiment_runner", success=True) + + results = orchestrator.resume_pipeline(idea=idea_spec) + assert results["resumed"] is False + assert results["message"] == "Pipeline already complete" + + +class TestGetPipelineStatus: + # Verify status reflects no stages run on a fresh orchestrator + def test_fresh_status(self, orchestrator): + status = orchestrator.get_pipeline_status() + assert status["completed"] is False + assert status["current_stage"] is None + assert status["stages"] == {} + + # Verify status reflects in-progress stage + def test_in_progress_status(self, orchestrator): + orchestrator.state.start_stage("resource_finder") + status = orchestrator.get_pipeline_status() + assert status["current_stage"] == "resource_finder" + assert status["stages"]["resource_finder"]["status"] == "in_progress" + + # Verify status reflects completed pipeline + def test_completed_status(self, orchestrator, idea_spec): + with _experiment_patches(): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + status = orchestrator.get_pipeline_status() + assert status["completed"] is True + + +class TestResultsPersistence: + # Verify pipeline_results.json is written even when pipeline fails + def test_results_saved_on_failure(self, orchestrator, idea_spec): + with patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_failure): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + results_file = orchestrator.work_dir / ".neurico" / "pipeline_results.json" + assert results_file.exists() + saved = json.loads(results_file.read_text()) + assert saved["success"] is False + + # Verify work_dir is recorded in the results + def test_work_dir_in_results(self, orchestrator, idea_spec): + with patch("core.pipeline_orchestrator.run_resource_finder", _mock_resource_finder_failure): + results = orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert results["work_dir"] == str(orchestrator.work_dir) + + +class TestTemplatesDirAutoDetect: + # Verify templates_dir defaults to project_root/templates when not provided + def test_auto_detects_templates_dir(self, tmp_path): + work_dir = tmp_path / "workspace" + work_dir.mkdir() + (work_dir / ".neurico").mkdir() + + orch = ResearchPipelineOrchestrator(work_dir=work_dir) + assert orch.templates_dir == Path(__file__).parent.parent / "templates" + + +class TestResourceFinderException: + # Verify exception in run_resource_finder propagates and records failure in state + def test_exception_propagates(self, orchestrator, idea_spec): + def _exploding_resource_finder(**kwargs): + raise RuntimeError("connection lost") + + with patch("core.pipeline_orchestrator.run_resource_finder", _exploding_resource_finder), \ + pytest.raises(RuntimeError, match="connection lost"): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert orchestrator.state.get_stage_status("resource_finder") == "failed" + + +class TestPipelineLevelException: + # Verify exceptions in run_pipeline are caught, recorded, and re-raised + def test_exception_saves_results_and_reraises(self, orchestrator, idea_spec): + def _exploding_resource_finder(**kwargs): + raise RuntimeError("total failure") + + with patch("core.pipeline_orchestrator.run_resource_finder", _exploding_resource_finder), \ + pytest.raises(RuntimeError, match="total failure"): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + # Results file should still be saved (in finally block) + results_file = orchestrator.work_dir / ".neurico" / "pipeline_results.json" + assert results_file.exists() + saved = json.loads(results_file.read_text()) + assert saved["error"] == "total failure" + + +class TestExperimentRunnerException: + # Verify generic exception in experiment runner is caught, state updated, and re-raised + def test_exception_propagates(self, orchestrator, idea_spec): + mock_pg = MagicMock() + mock_pg.return_value.generate_research_prompt.side_effect = RuntimeError("template broken") + + with patch("core.pipeline_orchestrator.generate_instructions", MagicMock()), \ + patch("templates.prompt_generator.PromptGenerator", mock_pg), \ + pytest.raises(RuntimeError, match="template broken"): + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + assert orchestrator.state.get_stage_status("experiment_runner") == "failed" + + +class TestScribeMode: + # Verify use_scribe=True uses 'scribe' command and sets SCRIBE_RUN_DIR env var + def test_scribe_command_and_env(self, orchestrator, idea_spec): + mock_pg = MagicMock() + mock_pg.return_value.generate_research_prompt.return_value = "fake prompt" + mock_gen_inst = MagicMock(return_value="fake instructions\n") + mock_process = MagicMock() + mock_process.stdin = MagicMock() + mock_process.stdout.readline.return_value = "" + mock_process.wait.return_value = 0 + + with patch("core.pipeline_orchestrator.generate_instructions", mock_gen_inst), \ + patch("templates.prompt_generator.PromptGenerator", mock_pg), \ + patch("subprocess.Popen", return_value=mock_process) as mock_popen: + orchestrator.run_pipeline( + idea=idea_spec, + provider="claude", + skip_resource_finder=True, + use_scribe=True, + resource_finder_timeout=10, + experiment_runner_timeout=10, + ) + + # Check command starts with 'scribe' + call_args = mock_popen.call_args + cmd_list = call_args[0][0] + assert cmd_list[0] == "scribe", f"Expected scribe command, got: {cmd_list}" + + # Check SCRIBE_RUN_DIR is set in env + env = call_args[1]["env"] + assert env["SCRIBE_RUN_DIR"] == str(orchestrator.work_dir) From 434538806d312acf03c059b8c7452a1429ce1f47 Mon Sep 17 00:00:00 2001 From: Sushruti Mishra Date: Mon, 6 Apr 2026 22:16:32 -0400 Subject: [PATCH 3/3] chore: updated readme for tests --- README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/README.md b/README.md index 1b0c6e3..c73696a 100644 --- a/README.md +++ b/README.md @@ -410,6 +410,29 @@ Paper-finder starts automatically in Docker — no extra setup needed. +## Testing + +Install dev dependencies and run the test suite: + +```bash +# Install uv if you don't have it +curl -LsSf https://astral.sh/uv/install.sh | sh + +# Install project with dev dependencies +uv sync --dev + +# Run all tests +uv run pytest tests/ -v + +# Run unit tests only +uv run pytest tests/test_security.py tests/test_config_loader.py tests/test_idea_manager.py tests/test_pipeline_state.py -v + +# Run integration tests only +uv run pytest tests/test_pipeline_orchestrator_integration.py -v +``` + +No API keys or external services are required — all tests use temporary directories and mock data. + ## Documentation - **[docs/WORKFLOW.md](docs/WORKFLOW.md)** - Complete workflow guide