Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions agent/scanners/openssl_cnf.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,12 @@ def _check_legacy_provider(
)


def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]:
"""
Parse openssl.cnf and flag insecure TLS/OpenSSL configuration settings.
Checks: weak MinProtocol (TLSv1.0/1.1), low CipherString SECLEVEL,
weak elliptic curves, and activation of the legacy provider.
Full implementation: task S4.
"""
conf_path = _resolve_conf_path(path)
if conf_path is None:
logger.info("scan_openssl_cnf: openssl.cnf not found, skipping")
return []

try:
content = conf_path.read_text(encoding="utf-8", errors="ignore")
except OSError as exc:
logger.warning("scan_openssl_cnf: cannot read %s: %s", conf_path, exc)
return []

sections = _parse_config(content)
def parse_openssl_cnf_text(
text: str, virtual_path: str = "openssl.cnf"
) -> list[OpensslCnfFinding]:
"""Parse openssl.cnf text and flag insecure TLS/OpenSSL configuration settings."""
path = Path(virtual_path)
sections = _parse_config(text)
findings: list[OpensslCnfFinding] = []

for section in ("system_default_sect", "default_sect"):
Expand All @@ -296,26 +283,48 @@ def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]:

min_protocol = data.get("minprotocol")
if min_protocol:
finding = _check_min_protocol(conf_path, section, min_protocol)
finding = _check_min_protocol(path, section, min_protocol)
if finding is not None:
findings.append(finding)

cipher_string = data.get("cipherstring")
if cipher_string:
finding = _check_cipher_string(conf_path, section, cipher_string)
finding = _check_cipher_string(path, section, cipher_string)
if finding is not None:
findings.append(finding)

curves = data.get("curves")
if curves:
finding = _check_curves(conf_path, section, curves)
finding = _check_curves(path, section, curves)
if finding is not None:
findings.append(finding)

legacy_finding = _check_legacy_provider(sections, conf_path)
legacy_finding = _check_legacy_provider(sections, path)
if legacy_finding is not None:
findings.append(legacy_finding)

return findings


def scan_openssl_cnf(path: Path | None = None) -> list[OpensslCnfFinding]:
"""
Parse openssl.cnf and flag insecure TLS/OpenSSL configuration settings.
Checks: weak MinProtocol (TLSv1.0/1.1), low CipherString SECLEVEL,
weak elliptic curves, and activation of the legacy provider.
Full implementation: task S4.
"""
conf_path = _resolve_conf_path(path)
if conf_path is None:
logger.info("scan_openssl_cnf: openssl.cnf not found, skipping")
return []

try:
content = conf_path.read_text(encoding="utf-8", errors="ignore")
except OSError as exc:
logger.warning("scan_openssl_cnf: cannot read %s: %s", conf_path, exc)
return []

findings = parse_openssl_cnf_text(content, str(conf_path))
logger.info(
"scan_openssl_cnf: found %d finding(s) in %s",
len(findings),
Expand Down
159 changes: 80 additions & 79 deletions agent/scanners/sshd_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,48 +120,85 @@ class SshdConfigFinding:
]


def _parse_sshd_config(path: Path) -> tuple[dict[str, str], bool]:
"""
Parse the global section of an sshd_config file.

Returns (directives, has_match_block) where:
- directives : dict of lowercase directive name → raw value string
- has_match_block : True when at least one Match block was encountered

Stops at the first Match line — directives inside Match blocks are not
parsed (POC limitation documented in issue #9).
Inline comments (trailing #…) are stripped from values.
"""
def _parse_sshd_config_directives(text: str) -> tuple[dict[str, str], bool]:
"""Parse sshd_config text into (directives, has_match_block)."""
directives: dict[str, str] = {}
has_match_block = False
try:
with path.open(encoding="utf-8") as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
# Stop parsing global directives at the first Match block.
if _MATCH_BLOCK_RE.match(line):
has_match_block = True
break
# Strip inline comment from the value (e.g. "Ciphers aes256-ctr # strong").
line = _INLINE_COMMENT_RE.sub("", line).strip()
if not line:
continue
parts = re.split(r"[\s=]+", line, 1)
if len(parts) == 2:
directive = parts[0].lower()
value = parts[1].strip()
# Concatenate repeated directives (e.g. multiple MACs lines).
if directive in directives:
directives[directive] += f",{value}"
else:
directives[directive] = value
except OSError as exc:
logger.warning("Failed to parse sshd_config at %s: %s", path, exc)
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
# Stop parsing global directives at the first Match block.
if _MATCH_BLOCK_RE.match(line):
has_match_block = True
break
# Strip inline comment from the value (e.g. "Ciphers aes256-ctr # strong").
line = _INLINE_COMMENT_RE.sub("", line).strip()
if not line:
continue
parts = re.split(r"[\s=]+", line, 1)
if len(parts) == 2:
directive = parts[0].lower()
value = parts[1].strip()
# Concatenate repeated directives (e.g. multiple MACs lines).
if directive in directives:
directives[directive] += f",{value}"
else:
directives[directive] = value
return directives, has_match_block


def parse_sshd_config_text(
text: str, virtual_path: str = "sshd_config"
) -> list[SshdConfigFinding]:
"""Parse sshd_config text and emit one finding per weak crypto directive."""
directives, has_match = _parse_sshd_config_directives(text)
findings: list[SshdConfigFinding] = []

if has_match:
findings.append(SshdConfigFinding(
source="sshd_config",
algorithm="Match-block",
severity="info",
name="Match",
key_size=None,
expiration_date=None,
store_or_path=virtual_path,
detail="sshd_config contains Match blocks — scoped configuration not analysed",
recommendation="Manual audit recommended for directives inside Match blocks",
recommendation_url=None,
risk_score=0.0,
))

for directive_key, weak_map in _WEAK_BY_DIRECTIVE.items():
if directive_key not in directives:
continue
raw_value = directives[directive_key]
directive_display = _DIRECTIVE_DISPLAY.get(directive_key, directive_key)
rec, rec_url = _RECOMMENDATIONS.get(directive_key, ("Manual review recommended", None))

for token in raw_value.split(","):
algo = token.strip().lower()
if algo not in weak_map:
continue
severity, score = weak_map[algo]
findings.append(SshdConfigFinding(
source="sshd_config",
algorithm=algo,
severity=severity,
name=directive_display,
key_size=None,
expiration_date=None,
store_or_path=virtual_path,
detail=f"{directive_display} = {raw_value}",
recommendation=rec,
recommendation_url=rec_url,
risk_score=round(score * _CONFIG_TEMPORAL_FACTOR, 1),
))

return findings


def scan_sshd_config(paths_to_scan: list[Path] | None = None) -> list[SshdConfigFinding]:
"""
Scan sshd_config files and emit one finding per weak crypto directive.
Expand Down Expand Up @@ -202,48 +239,12 @@ def scan_sshd_config(paths_to_scan: list[Path] | None = None) -> list[SshdConfig
continue

logger.debug("Parsing sshd_config: %s", file)
parsed, has_match = _parse_sshd_config(file)

if has_match:
findings.append(SshdConfigFinding(
source="sshd_config",
algorithm="Match-block",
severity="info",
name="Match",
key_size=None,
expiration_date=None,
store_or_path=str(file),
detail="sshd_config contains Match blocks — scoped configuration not analysed",
recommendation="Manual audit recommended for directives inside Match blocks",
recommendation_url=None,
risk_score=0.0,
))

for directive_key, weak_map in _WEAK_BY_DIRECTIVE.items():
if directive_key not in parsed:
continue
raw_value = parsed[directive_key]
directive_display = _DIRECTIVE_DISPLAY.get(directive_key, directive_key)
rec, rec_url = _RECOMMENDATIONS.get(directive_key, ("Manual review recommended", None))

for token in raw_value.split(","):
algo = token.strip().lower()
if algo not in weak_map:
continue
severity, score = weak_map[algo]
findings.append(SshdConfigFinding(
source="sshd_config",
algorithm=algo,
severity=severity,
name=directive_display,
key_size=None,
expiration_date=None,
store_or_path=str(file),
detail=f"{directive_display} = {raw_value}",
recommendation=rec,
recommendation_url=rec_url,
risk_score=round(score * _CONFIG_TEMPORAL_FACTOR, 1),
))
try:
text = file.read_text(encoding="utf-8")
except OSError as exc:
logger.warning("Failed to parse sshd_config at %s: %s", file, exc)
continue
findings.extend(parse_sshd_config_text(text, str(file)))

logger.info("scan_sshd_config: found %d finding(s)", len(findings))
return findings
6 changes: 6 additions & 0 deletions agent/tests/import/configs/sshd_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Test fixture — sshd_config with weak crypto directives
# Expected findings: 3 (diffie-hellman-group1-sha1 critical, 3des-cbc critical, hmac-sha1 high)
Protocol 2
KexAlgorithms diffie-hellman-group1-sha1,curve25519-sha256
Ciphers 3des-cbc,aes256-gcm@openssh.com
MACs hmac-sha1,hmac-sha2-256-etm@openssh.com
7 changes: 7 additions & 0 deletions agent/tests/import/configs/sshd_config_clean
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Test fixture — sshd_config with strong crypto directives only
# Expected findings: none
Protocol 2
KexAlgorithms curve25519-sha256,diffie-hellman-group16-sha512
Ciphers aes256-gcm@openssh.com,chacha20-poly1305@openssh.com
MACs hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com
HostKeyAlgorithms ssh-ed25519
Loading