diff --git a/agent/scanners/openssl_cnf.py b/agent/scanners/openssl_cnf.py index a152ee1..46239e6 100644 --- a/agent/scanners/openssl_cnf.py +++ b/agent/scanners/openssl_cnf.py @@ -268,25 +268,12 @@ def _check_legacy_provider( ) -def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]: - """ - Parse openssl.cnf and flag insecure TLS/OpenSSL configuration settings. - Checks: weak MinProtocol (TLSv1.0/1.1), low CipherString SECLEVEL, - weak elliptic curves, and activation of the legacy provider. - Full implementation: task S4. - """ - conf_path = _resolve_conf_path(path) - if conf_path is None: - logger.info("scan_openssl_cnf: openssl.cnf not found, skipping") - return [] - - try: - content = conf_path.read_text(encoding="utf-8", errors="ignore") - except OSError as exc: - logger.warning("scan_openssl_cnf: cannot read %s: %s", conf_path, exc) - return [] - - sections = _parse_config(content) +def parse_openssl_cnf_text( + text: str, virtual_path: str = "openssl.cnf" +) -> list[OpensslCnfFinding]: + """Parse openssl.cnf text and flag insecure TLS/OpenSSL configuration settings.""" + path = Path(virtual_path) + sections = _parse_config(text) findings: list[OpensslCnfFinding] = [] for section in ("system_default_sect", "default_sect"): @@ -296,26 +283,48 @@ def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]: min_protocol = data.get("minprotocol") if min_protocol: - finding = _check_min_protocol(conf_path, section, min_protocol) + finding = _check_min_protocol(path, section, min_protocol) if finding is not None: findings.append(finding) cipher_string = data.get("cipherstring") if cipher_string: - finding = _check_cipher_string(conf_path, section, cipher_string) + finding = _check_cipher_string(path, section, cipher_string) if finding is not None: findings.append(finding) curves = data.get("curves") if curves: - finding = _check_curves(conf_path, section, curves) + finding = _check_curves(path, section, curves) if finding is not None: findings.append(finding) - legacy_finding = _check_legacy_provider(sections, conf_path) + legacy_finding = _check_legacy_provider(sections, path) if legacy_finding is not None: findings.append(legacy_finding) + return findings + + +def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]: + """ + Parse openssl.cnf and flag insecure TLS/OpenSSL configuration settings. + Checks: weak MinProtocol (TLSv1.0/1.1), low CipherString SECLEVEL, + weak elliptic curves, and activation of the legacy provider. + Full implementation: task S4. + """ + conf_path = _resolve_conf_path(path) + if conf_path is None: + logger.info("scan_openssl_cnf: openssl.cnf not found, skipping") + return [] + + try: + content = conf_path.read_text(encoding="utf-8", errors="ignore") + except OSError as exc: + logger.warning("scan_openssl_cnf: cannot read %s: %s", conf_path, exc) + return [] + + findings = parse_openssl_cnf_text(content, str(conf_path)) logger.info( "scan_openssl_cnf: found %d finding(s) in %s", len(findings), diff --git a/agent/scanners/sshd_config.py b/agent/scanners/sshd_config.py index 83dd24f..d0bdbca 100644 --- a/agent/scanners/sshd_config.py +++ b/agent/scanners/sshd_config.py @@ -120,48 +120,85 @@ class SshdConfigFinding: ] -def _parse_sshd_config(path: Path) -> tuple[dict[str, str], bool]: - """ - Parse the global section of an sshd_config file. - - Returns (directives, has_match_block) where: - - directives : dict of lowercase directive name → raw value string - - has_match_block : True when at least one Match block was encountered - - Stops at the first Match line — directives inside Match blocks are not - parsed (POC limitation documented in issue #9). - Inline comments (trailing #…) are stripped from values. - """ +def _parse_sshd_config_directives(text: str) -> tuple[dict[str, str], bool]: + """Parse sshd_config text into (directives, has_match_block).""" directives: dict[str, str] = {} has_match_block = False - try: - with path.open(encoding="utf-8") as f: - for raw_line in f: - line = raw_line.strip() - if not line or line.startswith("#"): - continue - # Stop parsing global directives at the first Match block. - if _MATCH_BLOCK_RE.match(line): - has_match_block = True - break - # Strip inline comment from the value (e.g. "Ciphers aes256-ctr # strong"). - line = _INLINE_COMMENT_RE.sub("", line).strip() - if not line: - continue - parts = re.split(r"[\s=]+", line, 1) - if len(parts) == 2: - directive = parts[0].lower() - value = parts[1].strip() - # Concatenate repeated directives (e.g. multiple MACs lines). - if directive in directives: - directives[directive] += f",{value}" - else: - directives[directive] = value - except OSError as exc: - logger.warning("Failed to parse sshd_config at %s: %s", path, exc) + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + # Stop parsing global directives at the first Match block. + if _MATCH_BLOCK_RE.match(line): + has_match_block = True + break + # Strip inline comment from the value (e.g. "Ciphers aes256-ctr # strong"). + line = _INLINE_COMMENT_RE.sub("", line).strip() + if not line: + continue + parts = re.split(r"[\s=]+", line, 1) + if len(parts) == 2: + directive = parts[0].lower() + value = parts[1].strip() + # Concatenate repeated directives (e.g. multiple MACs lines). + if directive in directives: + directives[directive] += f",{value}" + else: + directives[directive] = value return directives, has_match_block +def parse_sshd_config_text( + text: str, virtual_path: str = "sshd_config" +) -> list[SshdConfigFinding]: + """Parse sshd_config text and emit one finding per weak crypto directive.""" + directives, has_match = _parse_sshd_config_directives(text) + findings: list[SshdConfigFinding] = [] + + if has_match: + findings.append(SshdConfigFinding( + source="sshd_config", + algorithm="Match-block", + severity="info", + name="Match", + key_size=None, + expiration_date=None, + store_or_path=virtual_path, + detail="sshd_config contains Match blocks — scoped configuration not analysed", + recommendation="Manual audit recommended for directives inside Match blocks", + recommendation_url=None, + risk_score=0.0, + )) + + for directive_key, weak_map in _WEAK_BY_DIRECTIVE.items(): + if directive_key not in directives: + continue + raw_value = directives[directive_key] + directive_display = _DIRECTIVE_DISPLAY.get(directive_key, directive_key) + rec, rec_url = _RECOMMENDATIONS.get(directive_key, ("Manual review recommended", None)) + + for token in raw_value.split(","): + algo = token.strip().lower() + if algo not in weak_map: + continue + severity, score = weak_map[algo] + findings.append(SshdConfigFinding( + source="sshd_config", + algorithm=algo, + severity=severity, + name=directive_display, + key_size=None, + expiration_date=None, + store_or_path=virtual_path, + detail=f"{directive_display} = {raw_value}", + recommendation=rec, + recommendation_url=rec_url, + risk_score=round(score * _CONFIG_TEMPORAL_FACTOR, 1), + )) + + return findings + + def scan_sshd_config(paths_to_scan: list[Path] | None = None) -> list[SshdConfigFinding]: """ Scan sshd_config files and emit one finding per weak crypto directive. @@ -202,48 +239,12 @@ def scan_sshd_config(paths_to_scan: list[Path] | None = None) -> list[SshdConfig continue logger.debug("Parsing sshd_config: %s", file) - parsed, has_match = _parse_sshd_config(file) - - if has_match: - findings.append(SshdConfigFinding( - source="sshd_config", - algorithm="Match-block", - severity="info", - name="Match", - key_size=None, - expiration_date=None, - store_or_path=str(file), - detail="sshd_config contains Match blocks — scoped configuration not analysed", - recommendation="Manual audit recommended for directives inside Match blocks", - recommendation_url=None, - risk_score=0.0, - )) - - for directive_key, weak_map in _WEAK_BY_DIRECTIVE.items(): - if directive_key not in parsed: - continue - raw_value = parsed[directive_key] - directive_display = _DIRECTIVE_DISPLAY.get(directive_key, directive_key) - rec, rec_url = _RECOMMENDATIONS.get(directive_key, ("Manual review recommended", None)) - - for token in raw_value.split(","): - algo = token.strip().lower() - if algo not in weak_map: - continue - severity, score = weak_map[algo] - findings.append(SshdConfigFinding( - source="sshd_config", - algorithm=algo, - severity=severity, - name=directive_display, - key_size=None, - expiration_date=None, - store_or_path=str(file), - detail=f"{directive_display} = {raw_value}", - recommendation=rec, - recommendation_url=rec_url, - risk_score=round(score * _CONFIG_TEMPORAL_FACTOR, 1), - )) + try: + text = file.read_text(encoding="utf-8") + except OSError as exc: + logger.warning("Failed to parse sshd_config at %s: %s", file, exc) + continue + findings.extend(parse_sshd_config_text(text, str(file))) logger.info("scan_sshd_config: found %d finding(s)", len(findings)) return findings \ No newline at end of file diff --git a/agent/tests/import/configs/sshd_config b/agent/tests/import/configs/sshd_config new file mode 100644 index 0000000..fe24c0d --- /dev/null +++ b/agent/tests/import/configs/sshd_config @@ -0,0 +1,6 @@ +# Test fixture — sshd_config with weak crypto directives +# Expected findings: 3 (diffie-hellman-group1-sha1 critical, 3des-cbc critical, hmac-sha1 high) +Protocol 2 +KexAlgorithms diffie-hellman-group1-sha1,curve25519-sha256 +Ciphers 3des-cbc,aes256-gcm@openssh.com +MACs hmac-sha1,hmac-sha2-256-etm@openssh.com diff --git a/agent/tests/import/configs/sshd_config_clean b/agent/tests/import/configs/sshd_config_clean new file mode 100644 index 0000000..844c799 --- /dev/null +++ b/agent/tests/import/configs/sshd_config_clean @@ -0,0 +1,7 @@ +# Test fixture — sshd_config with strong crypto directives only +# Expected findings: none +Protocol 2 +KexAlgorithms curve25519-sha256,diffie-hellman-group16-sha512 +Ciphers aes256-gcm@openssh.com,chacha20-poly1305@openssh.com +MACs hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com +HostKeyAlgorithms ssh-ed25519 diff --git a/backend/config_parser.py b/backend/config_parser.py new file mode 100644 index 0000000..36d788f --- /dev/null +++ b/backend/config_parser.py @@ -0,0 +1,392 @@ +""" +config_parser.py — Parse sshd_config and openssl.cnf files uploaded via the API. + +Mirrors the detection logic from agent/scanners/sshd_config.py and +agent/scanners/openssl_cnf.py, but works on bytes and returns list[dict] +compatible with repository.create_findings_bulk(). +""" +from __future__ import annotations + +import logging +import re +from pathlib import Path + +logger = logging.getLogger(__name__) + +_NIST_PQC_URL = "https://csrc.nist.gov/projects/post-quantum-cryptography" +_OPENSSH_DOCS_URL = "https://www.openssh.com/security.html" +_RFC8996_URL = "https://www.rfc-editor.org/rfc/rfc8996" +_OPENSSL_DOCS_URL = "https://www.openssl.org/docs/man3.0/man5/config.html" + +# Temporal factor for config findings: no expiration date → same as temporal_factor(None) = 0.8 +_CONFIG_TEMPORAL_FACTOR = 0.8 + +# --------------------------------------------------------------------------- +# sshd_config — weak algorithm tables (mirrors agent/scanners/sshd_config.py) +# --------------------------------------------------------------------------- + +_WEAK_KEXALGO: dict[str, tuple[str, int]] = { + "diffie-hellman-group1-sha1": ("critical", 100), + "diffie-hellman-group14-sha1": ("high", 80), +} + +_WEAK_CIPHERS: dict[str, tuple[str, int]] = { + "3des-cbc": ("critical", 100), + "blowfish-cbc": ("high", 85), + "cast128-cbc": ("high", 85), + "arcfour": ("critical", 100), + "arcfour128": ("high", 90), + "arcfour256": ("high", 85), +} + +_WEAK_MACS: dict[str, tuple[str, int]] = { + "hmac-md5": ("critical", 100), + "hmac-sha1": ("high", 80), + "hmac-md5-96": ("critical", 100), + "hmac-sha1-96": ("high", 80), +} + +_WEAK_HOSTKEYALGOS: dict[str, tuple[str, int]] = { + "ssh-rsa": ("high", 80), +} + +_WEAK_BY_DIRECTIVE: dict[str, dict[str, tuple[str, int]]] = { + "kexalgorithms": _WEAK_KEXALGO, + "ciphers": _WEAK_CIPHERS, + "macs": _WEAK_MACS, + "hostkeyalgorithms": _WEAK_HOSTKEYALGOS, +} + +_DIRECTIVE_DISPLAY: dict[str, str] = { + "kexalgorithms": "KexAlgorithms", + "ciphers": "Ciphers", + "macs": "MACs", + "hostkeyalgorithms": "HostKeyAlgorithms", +} + +_SSHD_RECOMMENDATIONS: dict[str, tuple[str, str | None]] = { + "kexalgorithms": ( + "Migrate to sntrup761x25519-sha512@openssh.com (hybrid PQC) or curve25519-sha256", + _NIST_PQC_URL, + ), + "ciphers": ( + "Use AES-256-GCM (aes256-gcm@openssh.com) or chacha20-poly1305@openssh.com", + _OPENSSH_DOCS_URL, + ), + "macs": ( + "Use ETM MACs: hmac-sha2-256-etm@openssh.com or hmac-sha2-512-etm@openssh.com", + _OPENSSH_DOCS_URL, + ), + "hostkeyalgorithms": ( + "Migrate to Ed25519 host keys (ssh-ed25519)", + _NIST_PQC_URL, + ), +} + +_MATCH_BLOCK_RE = re.compile(r"^Match\s+", re.IGNORECASE) +_INLINE_COMMENT_RE = re.compile(r"\s*#.*$") + +# --------------------------------------------------------------------------- +# openssl.cnf — detection helpers (mirrors agent/scanners/openssl_cnf.py) +# --------------------------------------------------------------------------- + +_SECLEVEL_RE = re.compile(r"@seclevel\s*=\s*(\d)", re.IGNORECASE) +_WEAK_CURVE_TOKENS = ("secp160", "secp192", "prime192", "secp224", "prime224") + + +# --------------------------------------------------------------------------- +# Auto-detection +# --------------------------------------------------------------------------- + +def detect_config_type(filename: str) -> str | None: + """Detect config type from filename: 'sshd' or 'openssl', or None if unknown.""" + name = Path(filename).name.lower() + if "sshd_config" in name: + return "sshd" + if "openssl" in name and Path(filename).suffix.lower() in (".cnf", ".conf"): + return "openssl" + return None + + +# --------------------------------------------------------------------------- +# sshd_config parser +# --------------------------------------------------------------------------- + +def _parse_sshd_directives(text: str) -> tuple[dict[str, str], bool]: + directives: dict[str, str] = {} + has_match_block = False + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if _MATCH_BLOCK_RE.match(line): + has_match_block = True + break + line = _INLINE_COMMENT_RE.sub("", line).strip() + if not line: + continue + parts = re.split(r"[\s=]+", line, 1) + if len(parts) == 2: + directive = parts[0].lower() + value = parts[1].strip() + if directive in directives: + directives[directive] += f",{value}" + else: + directives[directive] = value + return directives, has_match_block + + +def parse_sshd_config(content: bytes, filename: str) -> list[dict]: + """Parse sshd_config bytes and return findings as list[dict].""" + text = content.decode("utf-8", errors="ignore") + store_or_path = f"uploaded:{filename}" + directives, has_match = _parse_sshd_directives(text) + findings: list[dict] = [] + + if has_match: + findings.append({ + "source": "import_config", + "algorithm": "Match-block", + "severity": "info", + "name": "Match", + "key_size": None, + "expiration_date": None, + "store_or_path": store_or_path, + "detail": "sshd_config contains Match blocks — scoped configuration not analysed", + "recommendation": "Manual audit recommended for directives inside Match blocks", + "recommendation_url": None, + "risk_score": 0.0, + }) + + for directive_key, weak_map in _WEAK_BY_DIRECTIVE.items(): + if directive_key not in directives: + continue + raw_value = directives[directive_key] + directive_display = _DIRECTIVE_DISPLAY.get(directive_key, directive_key) + rec, rec_url = _SSHD_RECOMMENDATIONS.get(directive_key, ("Manual review recommended", None)) + + for token in raw_value.split(","): + algo = token.strip().lower() + if algo not in weak_map: + continue + severity, score = weak_map[algo] + findings.append({ + "source": "import_config", + "algorithm": algo, + "severity": severity, + "name": directive_display, + "key_size": None, + "expiration_date": None, + "store_or_path": store_or_path, + "detail": f"{directive_display} = {raw_value}", + "recommendation": rec, + "recommendation_url": rec_url, + "risk_score": round(score * _CONFIG_TEMPORAL_FACTOR, 1), + }) + + logger.debug("parse_sshd_config: %d finding(s) in %r", len(findings), filename) + return findings + + +# --------------------------------------------------------------------------- +# openssl.cnf parser +# --------------------------------------------------------------------------- + +def _parse_openssl_sections(text: str) -> dict[str, dict[str, str]]: + sections: dict[str, dict[str, str]] = {} + current: str | None = None + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or line.startswith(";"): + continue + if line.startswith("[") and line.endswith("]"): + current = line[1:-1].strip().lower() + sections.setdefault(current, {}) + continue + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip().lower() + value = value.strip() + if current is None: + current = "default" + sections.setdefault(current, {}) + sections[current][key] = value + return sections + + +def _openssl_finding( + *, + store_or_path: str, + section: str, + key: str, + value: str, + algorithm: str, + severity: str, + recommendation: str, + recommendation_url: str | None = None, +) -> dict: + return { + "source": "import_config", + "algorithm": algorithm, + "severity": severity, + "name": value, + "key_size": None, + "expiration_date": None, + "store_or_path": store_or_path, + "detail": f"[{section}] {key} = {value}", + "recommendation": recommendation, + "recommendation_url": recommendation_url, + "risk_score": None, + } + + +def _check_min_protocol(store_or_path: str, section: str, value: str) -> dict | None: + normalized = value.strip().lower().replace(" ", "") + if normalized in {"tlsv1", "tlsv1.0"}: + severity, algorithm = "high", "TLS-TLSv1.0" + elif normalized == "tlsv1.1": + severity, algorithm = "medium", "TLS-TLSv1.1" + else: + return None + return _openssl_finding( + store_or_path=store_or_path, section=section, key="MinProtocol", value=value, + algorithm=algorithm, severity=severity, + recommendation="Set MinProtocol to TLSv1.2 or TLSv1.3", + recommendation_url=_RFC8996_URL, + ) + + +def _check_cipher_string(store_or_path: str, section: str, value: str) -> dict | None: + normalized = value.strip().lower() + match = _SECLEVEL_RE.search(normalized) + if match: + level = int(match.group(1)) + if level <= 0: + severity = "critical" + elif level == 1: + severity = "high" + else: + return None + return _openssl_finding( + store_or_path=store_or_path, section=section, key="CipherString", value=value, + algorithm=f"OpenSSL-SECLEVEL-{level}", severity=severity, + recommendation="Raise SECLEVEL to 2 or higher", + recommendation_url=_OPENSSL_DOCS_URL, + ) + + weak_tokens = ("low", "export", "null", "md5", "rc4", "des") + matched_token = next((t for t in weak_tokens if t in normalized), None) + if matched_token: + return _openssl_finding( + store_or_path=store_or_path, section=section, key="CipherString", value=value, + algorithm=f"OpenSSL-{matched_token.upper()}", severity="high", + recommendation="Remove weak ciphers or set SECLEVEL to 2 or higher", + recommendation_url=_OPENSSL_DOCS_URL, + ) + + if "aes128" in normalized or "aes-128" in normalized: + return _openssl_finding( + store_or_path=store_or_path, section=section, key="CipherString", value=value, + algorithm="AES-128", severity="low", + recommendation=( + "Prefer AES-256 ciphers (e.g. AES256-GCM-SHA384) to maintain " + "128-bit effective security against Grover's quantum algorithm" + ), + recommendation_url=_OPENSSL_DOCS_URL, + ) + + return None + + +def _check_curves(store_or_path: str, section: str, value: str) -> dict | None: + candidates = [part.strip() for part in re.split(r"[,:\s]+", value) if part.strip()] + weak = [c for c in candidates if any(tok in c.lower() for tok in _WEAK_CURVE_TOKENS)] + if not weak: + return None + severity = "high" if any("160" in c or "192" in c for c in weak) else "medium" + return _openssl_finding( + store_or_path=store_or_path, section=section, key="Curves", value=", ".join(weak), + algorithm=f"EC-{weak[0]}", severity=severity, + recommendation="Remove curves below 256-bit security", + recommendation_url=_OPENSSL_DOCS_URL, + ) + + +def _check_legacy_provider( + sections: dict[str, dict[str, str]], store_or_path: str +) -> dict | None: + default_section = sections.get("default") + init_section = None + if default_section: + init_section = default_section.get("openssl_conf") + if not init_section and "openssl_init" in sections: + init_section = "openssl_init" + if not init_section: + return None + init_config = sections.get(init_section) + if not init_config: + return None + provider_section_name = init_config.get("providers") + if not provider_section_name: + return None + provider_section = sections.get(provider_section_name) + if not provider_section: + return None + legacy_ref = provider_section.get("legacy") + if not legacy_ref: + return None + legacy_section = sections.get(legacy_ref) + if not legacy_section: + return None + activate = legacy_section.get("activate", "").strip().lower() + if activate not in {"1", "yes", "true", "on"}: + return None + return _openssl_finding( + store_or_path=store_or_path, + section=provider_section_name, + key="LegacyProvider", + value="legacy", + algorithm="OpenSSL-LegacyProvider", + severity="high", + recommendation="Disable the legacy provider unless strictly required", + recommendation_url=_OPENSSL_DOCS_URL, + ) + + +def parse_openssl_cnf(content: bytes, filename: str) -> list[dict]: + """Parse openssl.cnf bytes and return findings as list[dict].""" + text = content.decode("utf-8", errors="ignore") + store_or_path = f"uploaded:{filename}" + sections = _parse_openssl_sections(text) + findings: list[dict] = [] + + for section in ("system_default_sect", "default_sect"): + data = sections.get(section) + if not data: + continue + + min_protocol = data.get("minprotocol") + if min_protocol: + f = _check_min_protocol(store_or_path, section, min_protocol) + if f is not None: + findings.append(f) + + cipher_string = data.get("cipherstring") + if cipher_string: + f = _check_cipher_string(store_or_path, section, cipher_string) + if f is not None: + findings.append(f) + + curves = data.get("curves") + if curves: + f = _check_curves(store_or_path, section, curves) + if f is not None: + findings.append(f) + + legacy_finding = _check_legacy_provider(sections, store_or_path) + if legacy_finding is not None: + findings.append(legacy_finding) + + logger.debug("parse_openssl_cnf: %d finding(s) in %r", len(findings), filename) + return findings diff --git a/backend/models.py b/backend/models.py index 4d69905..89bbceb 100644 --- a/backend/models.py +++ b/backend/models.py @@ -88,10 +88,4 @@ class FindingRead(BaseModel): model_config = {"from_attributes": True} -# --------------------------------------------------------------------------- -# Import payloads -# --------------------------------------------------------------------------- -class ImportConfigPayload(BaseModel): - config_type: str = Field(..., pattern="^(sshd|openssl)$") - config_content: str = Field(..., description="Raw config file content") diff --git a/backend/routers/imports.py b/backend/routers/imports.py index 2365492..232fa2f 100644 --- a/backend/routers/imports.py +++ b/backend/routers/imports.py @@ -5,9 +5,10 @@ from sqlalchemy.orm import Session from cert_parser import parse_certificate_file +from config_parser import detect_config_type, parse_openssl_cnf, parse_sshd_config from ssh_parser import parse_ssh_public_key from database import get_db -from models import FindingRead, ImportConfigPayload +from models import FindingRead import repository logger = logging.getLogger(__name__) @@ -87,12 +88,40 @@ async def import_ssh( @router.post("/config", response_model=list[FindingRead], status_code=201) async def import_config( - payload: ImportConfigPayload, db: Session = Depends(get_db) + file: UploadFile = File(...), + db: Session = Depends(get_db), ) -> list[FindingRead]: """ - Parse an sshd_config or openssl.cnf file and return the findings. - Full implementation in task I4 / IM3. + Parse an uploaded sshd_config or openssl.cnf file and return the findings. + The config type is auto-detected from the filename + (sshd_config → sshd, openssl*.cnf → openssl). """ - logger.info("import_config: received request") - # TODO (I4/IM3): parse config, extract crypto directives, persist findings - raise HTTPException(status_code=501, detail="Not implemented yet") + content = await file.read(_MAX_CERT_SIZE + 1) + if len(content) > _MAX_CERT_SIZE: + raise HTTPException(status_code=413, detail="File too large (max 5 MB)") + + filename = file.filename or "uploaded_config" + config_type = detect_config_type(filename) + if config_type is None: + raise HTTPException( + status_code=400, + detail="Cannot detect config type from filename. " + "Use a file named 'sshd_config' or 'openssl*.cnf'.", + ) + + try: + if config_type == "sshd": + finding_dicts = parse_sshd_config(content, filename) + else: + finding_dicts = parse_openssl_cnf(content, filename) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + + scan = repository.create_scan(db, hostname="import", mode="import") + findings = repository.create_findings_bulk(db, scan.id, finding_dicts) + + logger.info( + "import_config: persisted %d finding(s) from %r (scan_id=%d, type=%s)", + len(findings), filename, scan.id, config_type, + ) + return findings diff --git a/backend/tests/test_import_endpoints.py b/backend/tests/test_import_endpoints.py index 606eede..168fe70 100644 --- a/backend/tests/test_import_endpoints.py +++ b/backend/tests/test_import_endpoints.py @@ -33,8 +33,10 @@ sys.path.insert(0, str(BACKEND_DIR)) # Fixture directories -CERTS_DIR = REPO_ROOT / "agent" / "tests" / "import" / "certs" -SSH_DIR = REPO_ROOT / "agent" / "tests" / "ssh" +CERTS_DIR = REPO_ROOT / "agent" / "tests" / "import" / "certs" +SSH_DIR = REPO_ROOT / "agent" / "tests" / "ssh" +CONFIGS_DIR = REPO_ROOT / "agent" / "tests" / "import" / "configs" +OPENSSL_FIXTURES_DIR = REPO_ROOT / "agent" / "tests" / "fixtures" # --------------------------------------------------------------------------- # Override the database dependency with an in-memory SQLite DB @@ -94,6 +96,14 @@ def post_ssh(filename: str) -> dict: return client.post("/import/ssh", files=files) +def post_config(filename: str, fixture_dir=None): + if fixture_dir is None: + fixture_dir = CONFIGS_DIR + path = fixture_dir / filename + files = {"file": (filename, path.read_bytes())} + return client.post("/import/config", files=files) + + # =========================================================================== # /import/cert — PEM # =========================================================================== @@ -313,3 +323,95 @@ def test_agent_wrapper_imports_same_object(self): from qready_scoring.algo_severity import ALGO_SEVERITY as shared from scoring.algo_severity import ALGO_SEVERITY as agent # noqa: F401 assert shared is agent, "agent wrapper must re-export the shared dict, not a copy" + + +# =========================================================================== +# /import/config — sshd_config +# =========================================================================== + +class TestImportConfigSshd: + def test_status_201(self): + r = post_config("sshd_config") + assert r.status_code == 201 + + def test_returns_findings(self): + findings = post_config("sshd_config").json() + assert isinstance(findings, list) + assert len(findings) >= 1 + + def test_source_is_import_config(self): + findings = post_config("sshd_config").json() + assert all(f["source"] == "import_config" for f in findings) + + def test_store_or_path_has_uploaded_prefix(self): + findings = post_config("sshd_config").json() + assert all(f["store_or_path"].startswith("uploaded:") for f in findings) + + def test_severity_is_valid_enum(self): + findings = post_config("sshd_config").json() + valid = {"critical", "high", "medium", "low", "info"} + for f in findings: + assert f["severity"] in valid + + def test_weak_kexalgo_detected(self): + algos = [f["algorithm"] for f in post_config("sshd_config").json()] + assert "diffie-hellman-group1-sha1" in algos + + def test_weak_cipher_detected(self): + algos = [f["algorithm"] for f in post_config("sshd_config").json()] + assert "3des-cbc" in algos + + def test_clean_config_returns_empty_list(self): + r = post_config("sshd_config_clean") + assert r.status_code == 201 + assert r.json() == [] + + +# =========================================================================== +# /import/config — openssl.cnf +# =========================================================================== + +class TestImportConfigOpenssl: + def test_status_201_critical(self): + r = post_config("test_openssl_critical.cnf", fixture_dir=OPENSSL_FIXTURES_DIR) + assert r.status_code == 201 + + def test_critical_finding_detected(self): + findings = post_config("test_openssl_critical.cnf", fixture_dir=OPENSSL_FIXTURES_DIR).json() + assert len(findings) >= 1 + assert any(f["severity"] == "critical" for f in findings) + + def test_seclevel_zero_algorithm(self): + algos = [f["algorithm"] for f in + post_config("test_openssl_critical.cnf", fixture_dir=OPENSSL_FIXTURES_DIR).json()] + assert "OpenSSL-SECLEVEL-0" in algos + + def test_source_is_import_config(self): + findings = post_config("test_openssl_critical.cnf", fixture_dir=OPENSSL_FIXTURES_DIR).json() + assert all(f["source"] == "import_config" for f in findings) + + def test_store_or_path_has_uploaded_prefix(self): + findings = post_config("test_openssl_critical.cnf", fixture_dir=OPENSSL_FIXTURES_DIR).json() + assert all(f["store_or_path"].startswith("uploaded:") for f in findings) + + def test_strong_config_returns_empty_list(self): + r = post_config("test_openssl_strong.cnf", fixture_dir=OPENSSL_FIXTURES_DIR) + assert r.status_code == 201 + assert r.json() == [] + + +# =========================================================================== +# /import/config — edge cases +# =========================================================================== + +class TestImportConfigEdgeCases: + def test_unknown_filename_returns_400(self): + files = {"file": ("unknown_file.txt", b"some content")} + r = client.post("/import/config", files=files) + assert r.status_code == 400 + + def test_empty_sshd_config_returns_empty_list(self): + files = {"file": ("sshd_config", b"")} + r = client.post("/import/config", files=files) + assert r.status_code == 201 + assert r.json() == []