diff --git a/CHANGELOG.md b/CHANGELOG.md index c9b8bc3d..b4e01e6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,12 +3,30 @@ ## [Unreleased] Changes: +* Feat: `MetaAttack`: aggregate per-record vulnerability across multiple privacy attacks (LiRA, + QMIA, Structural) into a unified vulnerability DataFrame with within-attack (mean, std, + consistency) and cross-attack (arithmetic/geometric MIA mean, structural flag, total + vulnerability count) aggregation. Supports three operating modes via `behaviour`: + `'run_all'` (fresh execution), `'use_existing_only'` (collate from pre-existing + `report.json` files without re-running — critical for attacks such as LiRA that may + take weeks on large model grids), and `'fill_missing'` (run only attacks not already + present). Outputs `vulnerability_matrix.csv` alongside the standard JSON report. + By default appends the MetaAttack section to an existing `report_dir/report.json` + (set `keep_separate=True` for a standalone file). PDF report includes a bar chart + of records grouped by the number of attacks flagging them. `use_existing_only` + and `fill_missing` scan both the canonical single-file `report_dir/report.json` + (multi-section, as produced when individual attacks append to the same file) + and any subdirectory-per-attack layout. Registered in the attack factory as + `"meta"`. * Feat: `QMIAAttack`: membership inference attack via quantile regression (Bertran et al., NeurIPS 2023, arXiv:2307.03694). Trains a histogram-based quantile regressor (`HistGradientBoostingRegressor`) on non-member hinge scores to learn per-sample membership thresholds. A sample is predicted as a member when its observed score exceeds the predicted threshold at quantile level (1 - alpha). No shadow models or architecture knowledge required. Registered in the attack factory as `"qmia"`. +* Fix: `StructuralAttack` now respects the `report_individual` flag. 
Per-record + `record_level_results` and `attack_metrics["individual"]` are only populated when the + flag is set to `True`, matching the behaviour of `LIRAAttack` and `QMIAAttack`. ## Version 1.4.3 (Jan 29, 2026) diff --git a/README.md b/README.md index f635b355..6e64f365 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,29 @@ Run the full benchmark comparing QMIA against WorstCase and LiRA: python examples/sklearn/benchmark_qmia_full.py ``` +## MetaAttack: Unified Per-Record Vulnerability Aggregation + +`MetaAttack` runs multiple privacy attacks (LiRA, QMIA, Structural) on the same target and aggregates their per-record results into a single vulnerability DataFrame. Three operating modes are supported via the `behaviour` parameter: + +* **`'run_all'`** (default) — run every specified attack from scratch. +* **`'use_existing_only'`** — read per-record scores from pre-existing `report.json` files without re-running anything. Useful when expensive attacks such as LiRA have already been run. +* **`'fill_missing'`** — load existing results and run only the attacks not yet present. + +```python +from sacroml.attacks.meta_attack import MetaAttack +from sacroml.attacks.target import Target + +target = Target(model=model, X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test) +meta = MetaAttack( + attacks=[("lira", {}), ("qmia", {}), ("structural", {})], + behaviour="run_all", # alternatives: "use_existing_only", "fill_missing" + output_dir="output_meta", +) +meta.attack(target) +``` + +The vulnerability matrix is saved as `vulnerability_matrix.csv` in `output_dir`. + ## Documentation See [API documentation](https://ai-sdc.github.io/SACRO-ML/). diff --git a/examples/sklearn/meta_attack_example.py b/examples/sklearn/meta_attack_example.py new file mode 100644 index 00000000..077f2c31 --- /dev/null +++ b/examples/sklearn/meta_attack_example.py @@ -0,0 +1,104 @@ +"""Example: run a MetaAttack combining QMIA and structural attacks. 
+ +Trains a RandomForest on synthetic data, wraps it in a Target, then +runs MetaAttack to produce a cross-attack vulnerability DataFrame. + +Usage:: + + python examples/sklearn/meta_attack_example.py +""" + +import logging + +from sklearn.datasets import make_classification +from sklearn.ensemble import RandomForestClassifier +from sklearn.model_selection import train_test_split + +from sacroml.attacks.meta_attack import MetaAttack +from sacroml.attacks.target import Target + +logging.basicConfig(level=logging.INFO) + +output_dir = "output_meta_attack" + +if __name__ == "__main__": + # --- Prepare target --- + X, y = make_classification( + n_samples=300, + n_features=10, + n_informative=5, + n_classes=2, + random_state=42, + ) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.4, stratify=y, random_state=42 + ) + + model = RandomForestClassifier(n_estimators=100, random_state=42) + model.fit(X_train, y_train) + + target = Target( + model=model, + dataset_name="synthetic", + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + X_train_orig=X_train, + y_train_orig=y_train, + X_test_orig=X_test, + y_test_orig=y_test, + ) + for idx in range(X.shape[1]): + target.add_feature(f"feature_{idx}", [idx], "float") + + # --- Run MetaAttack --- + meta = MetaAttack( + attacks=[ + ("qmia", {}, 2), # QMIA with 2 repetitions + ("structural", {}), # Structural (single run) + ], + behaviour="run_all", # alternatives: "use_existing_only", "fill_missing" + mia_threshold=0.5, + output_dir=output_dir, + ) + meta.attack(target) + + # --- Inspect results --- + df = meta.vulnerability_df + + print("\n=== Vulnerability Matrix (first 10 records) ===") + print(df.head(10).to_string()) + + print("\n=== Summary Statistics ===") + n_train = int(df["is_member"].sum()) + n_test = len(df) - n_train + print(f"Training records: {n_train}") + print(f"Test records: {n_test}") + + # MIA vulnerability + if "qmia_vuln" in df.columns: + n_qmia = 
int(df["qmia_vuln"].sum()) + print(f"QMIA vulnerable: {n_qmia}") + + # Structural vulnerability (training records only) + if "struct_vuln" in df.columns: + train_df = df[df["is_member"] == 1] + n_struct = int(train_df["struct_vuln"].sum()) + print(f"Struct vulnerable: {n_struct} (of {n_train} training)") + + # Records vulnerable to all attacks + max_attacks = int(df["n_vulnerable"].max()) + n_all = int((df["n_vulnerable"] == max_attacks).sum()) + print(f"Vulnerable to all: {n_all} (flagged by {max_attacks} attacks)") + + # Top-10 most vulnerable training records by MIA mean + if "mia_mean" in df.columns: + top10 = df[df["is_member"] == 1].nlargest(10, "mia_mean")[ + ["mia_mean", "mia_gmean", "n_vulnerable"] + ] + print("\n=== Top 10 Most Vulnerable Training Records ===") + print(top10.to_string()) + + print(f"\nReport saved to: {output_dir}/") + print(f"CSV saved to: {output_dir}/vulnerability_matrix.csv") diff --git a/sacroml/attacks/constants.py b/sacroml/attacks/constants.py new file mode 100644 index 00000000..0c804a78 --- /dev/null +++ b/sacroml/attacks/constants.py @@ -0,0 +1,34 @@ +"""Shared numerical and default-value constants for the attacks package. + +Centralising these here avoids duplication across attack modules and makes +the *why* of each magic number visible at a glance. + +Notes +----- +A separate :data:`sacroml.attacks.utils.EPS` (``1e-16``) and an identical +``EPS`` in :mod:`sacroml.attacks.likelihood_attack` are kept independently +for now because they predate this module and migrating them is a wider +refactor. A follow-up PR can converge those onto a single constant defined +here once the call sites have been audited. +""" + +from __future__ import annotations + +EPS_META: float = 1e-10 +"""Tolerance added before ``log()`` in geometric-mean aggregation. 
+ +Looser than :data:`sacroml.attacks.utils.EPS` (``1e-16``) because the +geometric mean of MIA scores in :class:`~sacroml.attacks.meta_attack.MetaAttack` +does not need the same precision as normal-distribution CDF/PDF +calculations and benefits from a value comfortably above floating-point +denormals. +""" + +DEFAULT_MIA_THRESHOLD: float = 0.5 +"""Default cutoff above which a per-record membership-inference score is +flagged as vulnerable. + +Used as the ``mia_threshold`` default for +:class:`~sacroml.attacks.meta_attack.MetaAttack` so the value can be +referenced symbolically from tests, examples, and documentation. +""" diff --git a/sacroml/attacks/factory.py b/sacroml/attacks/factory.py index b818e133..e3cfd7b7 100644 --- a/sacroml/attacks/factory.py +++ b/sacroml/attacks/factory.py @@ -7,6 +7,7 @@ from sacroml.attacks.attack import Attack from sacroml.attacks.attribute_attack import AttributeAttack from sacroml.attacks.likelihood_attack import LIRAAttack +from sacroml.attacks.meta_attack import MetaAttack from sacroml.attacks.qmia_attack import QMIAAttack from sacroml.attacks.structural_attack import StructuralAttack from sacroml.attacks.target import Target @@ -19,6 +20,7 @@ registry: dict[str, type[Attack]] = { "attribute": AttributeAttack, "lira": LIRAAttack, + "meta": MetaAttack, "qmia": QMIAAttack, "structural": StructuralAttack, "worstcase": WorstCaseAttack, diff --git a/sacroml/attacks/meta_attack.py b/sacroml/attacks/meta_attack.py new file mode 100644 index 00000000..400f1bd0 --- /dev/null +++ b/sacroml/attacks/meta_attack.py @@ -0,0 +1,826 @@ +"""Meta-attack: aggregate per-record vulnerability across multiple privacy attacks. + +Runs multiple privacy attacks (LiRA, QMIA, Structural) on the same Target, +extracts per-record vulnerability scores from each, and aggregates them into +a unified pandas DataFrame with two-level aggregation: + + Level 1 — within-attack: mean, std, and consistency across repeated runs. 
+ Level 2 — cross-attack: arithmetic/geometric mean of MIA scores, + binary structural flag, and total vulnerability count. + +Supports three operating modes via the *behaviour* parameter: + + ``'run_all'`` (default) + Run every specified attack from scratch. + + ``'use_existing_only'`` + Read per-record scores from existing ``report.json`` files in + *report_dir*; no new attacks are executed. Use when attacks were + already run (possibly at great computational cost) and you only want + to collate their results. + + ``'fill_missing'`` + Load any attacks already present in *report_dir* and run only those + not yet found. Saves redundant computation when some attacks have + been run but others have not. + +Reference: AI-SDC/SACRO-ML#428 +""" + +from __future__ import annotations + +import contextlib +import copy +import json +import logging +import os + +import numpy as np +import pandas as pd +from fpdf import FPDF + +from sacroml import metrics +from sacroml.attacks.attack import Attack +from sacroml.attacks.constants import DEFAULT_MIA_THRESHOLD, EPS_META +from sacroml.attacks.target import Target + +logger = logging.getLogger(__name__) + + +SUPPORTED_ATTACKS: set[str] = {"lira", "qmia", "structural"} +"""Attacks that expose per-record vulnerability scores.""" + +MIA_ATTACKS: set[str] = {"lira", "qmia"} +"""Subset of supported attacks that produce membership-inference scores.""" + +BEHAVIOUR_RUN_ALL: str = "run_all" +BEHAVIOUR_USE_EXISTING: str = "use_existing_only" +BEHAVIOUR_FILL_MISSING: str = "fill_missing" + +# Maps the human-readable attack_name stored in report metadata → factory key. +# Keys must match the __str__() return value of each corresponding attack class. +# Values must be a subset of SUPPORTED_ATTACKS. 
+_REPORT_NAME_TO_KEY: dict[str, str] = { + "LiRA Attack": "lira", + "QMIA Attack": "qmia", + "Structural Attack": "structural", +} + +_MIA_SCORE_FIELDS: dict[str, str] = { + "lira": "score", + "qmia": "member_prob", +} +"""Maps factory key → field name inside ``attack_metrics[N]["individual"]``. + +Used only by :meth:`MetaAttack._extract_mia_scores` (the live-attack path). +The disk-reading path (:meth:`MetaAttack._extract_scores_from_report`) uses +the same field names but looks them up directly rather than via this mapping. +""" + + +class MetaAttack(Attack): + """Aggregate per-record vulnerability across multiple privacy attacks. + + Parameters + ---------- + attacks : list[tuple] + Each entry is ``(name, params)`` or ``(name, params, n_reps)``. + *name* must be one of :data:`SUPPORTED_ATTACKS`. + *params* is a dict of keyword arguments forwarded to the sub-attack + constructor. *n_reps* (default 1) is the number of independent + repetitions; useful for stochastic attacks like LiRA. + behaviour : str + ``'run_all'`` (default), ``'use_existing_only'``, or + ``'fill_missing'``. See module docstring for details. + report_dir : str or None + Directory to scan for existing attack ``report.json`` files when + *behaviour* is ``'use_existing_only'`` or ``'fill_missing'``. + Defaults to *output_dir* when not provided. + mia_threshold : float + Score above which a record is flagged as MIA-vulnerable. + k_threshold : int or None + k-anonymity value below which a record is structurally vulnerable. + ``None`` reads the default from the ACRO risk-appetite config. + output_dir : str + Directory for all outputs (sub-attack subdirectories, report, CSV). + write_report : bool + Whether to write JSON report and CSV to disk. + keep_separate : bool + Controls JSON output location. ``False`` (default) appends the + MetaAttack section to ``{report_dir}/report.json`` so it joins any + sub-attack reports already there, matching the project convention. 
+ ``True`` writes a separate ``{output_dir}/report.json`` like the + base class. The PDF follows the JSON output location; the CSV + (``vulnerability_matrix.csv``) is always written to *output_dir*. + """ + + def __init__( + self, + attacks: list[tuple | list], + behaviour: str = "run_all", + report_dir: str | None = None, + mia_threshold: float = DEFAULT_MIA_THRESHOLD, + k_threshold: int | None = None, + output_dir: str = "outputs", + write_report: bool = True, + keep_separate: bool = False, + ) -> None: + super().__init__(output_dir=output_dir, write_report=write_report) + # MetaAttack does not use shadow models; remove the empty directory + # created by the base class so the output directory stays clean. + with contextlib.suppress(OSError): + os.rmdir(self.shadow_path) + + self.attacks: list[tuple[str, dict, int]] = self._parse_attacks(attacks) + + valid = { + BEHAVIOUR_RUN_ALL, + BEHAVIOUR_USE_EXISTING, + BEHAVIOUR_FILL_MISSING, + } + if behaviour not in valid: + raise ValueError( + f"Unknown behaviour: {behaviour!r}. Expected one of {sorted(valid)}." + ) + self.behaviour: str = behaviour + self.report_dir: str = report_dir if report_dir is not None else output_dir + self.keep_separate: bool = keep_separate + + self.mia_threshold: float = mia_threshold + + if k_threshold is None: + from acro import ACRO # noqa: PLC0415 + + self.k_threshold: int = ACRO("default").config["safe_threshold"] + else: + self.k_threshold = k_threshold + + self.vulnerability_df: pd.DataFrame | None = None + + unknown = set(_REPORT_NAME_TO_KEY.values()) - SUPPORTED_ATTACKS + if unknown: + raise RuntimeError( + f"_REPORT_NAME_TO_KEY references unsupported attacks: {unknown}. " + "Update SUPPORTED_ATTACKS or fix the mapping."
+ ) + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + @staticmethod + def _parse_attacks(attacks: list[tuple | list]) -> list[tuple[str, dict, int]]: + """Normalise and validate the *attacks* specification. + + Accepts 2-tuples ``(name, params)`` — *n_reps* defaults to 1 — or + 3-tuples ``(name, params, n_reps)``. + + Raises + ------ + ValueError + If a tuple has the wrong length, if *name* is not in + :data:`SUPPORTED_ATTACKS`, or if *n_reps* is not a positive + integer. + """ + if not attacks: + raise ValueError("attacks must contain at least one entry.") + + specs: list[tuple[str, dict, int]] = [] + for entry in attacks: + if len(entry) == 2: + name, params = entry + n_reps = 1 + elif len(entry) == 3: + name, params, n_reps = entry + else: + raise ValueError( + f"Expected (name, params) or (name, params, n_reps), " + f"got entry of length {len(entry)}: {entry}" + ) + + if name not in SUPPORTED_ATTACKS: + raise ValueError( + f"Unsupported attack: '{name}'. MetaAttack requires " + f"per-record scores. Supported: " + f"{sorted(SUPPORTED_ATTACKS)}" + ) + + if not isinstance(n_reps, int) or n_reps < 1: + raise ValueError(f"n_reps must be a positive integer, got {n_reps!r}") + + specs.append((name, dict(params), n_reps)) + return specs + + # ------------------------------------------------------------------ + # Abstract method implementations + # ------------------------------------------------------------------ + + @classmethod + def attackable(cls, target: Target) -> bool: + """Return whether *target* can be assessed with the meta-attack.""" + return target.has_model() and target.has_data() + + def _attack(self, target: Target) -> dict: + """Run sub-attacks (or read existing) and aggregate per-record vulnerabilities. + + Behaviour is controlled by ``self.behaviour``: + + - ``'run_all'``: run every attack fresh. 
+ - ``'use_existing_only'``: scan *report_dir* for report.json files; + extract scores without running any new attack. + - ``'fill_missing'``: load existing results from *report_dir*, + run only those not already present. + + Returns an empty dict ``{}`` when no scores are available — this can + happen when no valid ``report.json`` files are found in + ``'use_existing_only'`` mode, or when all sub-attacks fail. + """ + # Step 1: Load existing results when not running entirely from scratch. + existing_mia: dict[str, list[list[float]]] = {} + existing_struct: dict[str, list[dict]] = {} + + if self.behaviour != BEHAVIOUR_RUN_ALL: + existing_mia, existing_struct = self._scan_existing_reports() + + # Step 2: Populate score dicts — start from existing, then run new ones. + mia_scores: dict[str, list[list[float]]] = dict(existing_mia) + structural_scores: dict[str, list[dict]] = dict(existing_struct) + + if self.behaviour != BEHAVIOUR_USE_EXISTING: + self._run_new_attacks( + target, existing_mia, existing_struct, mia_scores, structural_scores + ) + + if not mia_scores and not structural_scores: + logger.warning("No vulnerability scores collected; returning empty report.") + return {} + + if target.X_train is None or target.X_test is None: + logger.warning( + "Target is missing X_train or X_test; returning empty report." + ) + return {} + + n_train = len(target.X_train) + n_test = len(target.X_test) + self.vulnerability_df = self._build_dataframe( + n_train, n_test, mia_scores, structural_scores + ) + self._compute_global_metrics(n_train, n_test) + + output = self._make_report(target) + self._write_report(output) + return output + + # ------------------------------------------------------------------ + # Existing-report scanning + # ------------------------------------------------------------------ + + def _scan_existing_reports( + self, + ) -> tuple[dict[str, list[list[float]]], dict[str, list[dict]]]: + """Scan *report_dir* for cached attack scores. 
+ + Supports two on-disk layouts: + + 1. **Canonical single-file layout**, ``{report_dir}/report.json``, + where each individual attack has appended its own + ``"AttackName_"`` section via :class:`GenerateJSONModule`. + This is the layout produced when LiRA, QMIA, and Structural are + run separately with the same ``output_dir``. + 2. **Subdirectory-per-attack layout**, ``{report_dir}/{subdir}/report.json``, + where each sub-attack has its own ``report.json``. + + Both layouts are scanned, so a mixed setup also works. The attack + type is identified from the ``metadata.attack_name`` field; individual + per-record scores are extracted from + ``attack_experiment_logger["attack_instance_logger"]``. + + Returns + ------- + tuple[dict, dict] + ``(mia_scores, structural_scores)`` with the same structure used + internally by :meth:`_attack`. + """ + mia_scores: dict[str, list[list[float]]] = {} + structural_scores: dict[str, list[dict]] = {} + + if not os.path.isdir(self.report_dir): + logger.warning("report_dir %r does not exist.", self.report_dir) + return mia_scores, structural_scores + + # Layout 1: top-level canonical report.json + top_level = os.path.join(self.report_dir, "report.json") + if os.path.isfile(top_level): + self._extract_from_report_file(top_level, mia_scores, structural_scores) + + # Layout 2: subdirectory-per-attack + try: + entries = sorted(os.scandir(self.report_dir), key=lambda e: e.name) + except OSError as exc: + logger.warning( + "Cannot scan report_dir %r: %s; skipping.", self.report_dir, exc + ) + return mia_scores, structural_scores + + for entry in entries: + if not entry.is_dir(): + continue + sub_report = os.path.join(entry.path, "report.json") + if not os.path.isfile(sub_report): + continue + self._extract_from_report_file(sub_report, mia_scores, structural_scores) + + return mia_scores, structural_scores + + def _extract_from_report_file( + self, + report_path: str, + mia_scores: dict[str, list[list[float]]], + structural_scores: dict[str,
list[dict]], + ) -> None: + """Parse one ``report.json`` file, accumulating scores in place. + + Iterates every top-level ``"AttackName_"`` section, identifies + the attack via :data:`_REPORT_NAME_TO_KEY`, and extends the matching + dict (``mia_scores`` or ``structural_scores``). Unrecognised + attack names are skipped with a debug log; unreadable files are + skipped with a warning. + """ + try: + with open(report_path) as fh: + report_data = json.load(fh) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("Could not read %s (%s); skipping.", report_path, exc) + return + + for attack_data in report_data.values(): + if not isinstance(attack_data, dict): + continue + attack_name = attack_data.get("metadata", {}).get("attack_name", "") + key = _REPORT_NAME_TO_KEY.get(attack_name) + if key is None: + logger.debug( + "Unrecognised attack_name %r in %s; skipping.", + attack_name, + report_path, + ) + continue + + scores = self._extract_scores_from_report(attack_data, key) + if scores is None: + continue + + if key in MIA_ATTACKS: + mia_scores.setdefault(key, []).extend(scores) # type: ignore[arg-type] + else: + structural_scores.setdefault(key, []).extend(scores) # type: ignore[arg-type] + + logger.info("Loaded existing %s results from %s.", key, report_path) + + def _extract_scores_from_report( # noqa: C901 + self, report_data: dict, key: str + ) -> list[list[float]] | list[dict] | None: + """Extract per-record scores from a parsed report dict. + + Parameters + ---------- + report_data : dict + A single attack entry from a parsed ``report.json`` file — the dict + value under one ``'AttackName_'`` top-level key. Expected keys: + ``'metadata'``, ``'attack_experiment_logger'``. + key : str + Factory key (``'lira'``, ``'qmia'``, or ``'structural'``). 
+ + Returns + ------- + list[list[float]] | list[dict] | None + One entry per instance found in the report, in the format expected + by :meth:`_build_dataframe`, or ``None`` when no individual scores + are present. + """ + try: + logger_key = "attack_instance_logger" + instances = report_data["attack_experiment_logger"][logger_key] + if not isinstance(instances, dict): + raise TypeError(f"Expected dict, got {type(instances).__name__}") + except (KeyError, TypeError) as exc: + logger.warning( + "Unexpected report structure for %s (%s); skipping.", key, exc + ) + return None + + collected: list = [] + for inst in instances.values(): + if not isinstance(inst, dict): + continue + individual = inst.get("individual") + if individual is None: + continue + + if key == "lira": + raw = individual.get("score") + if raw is not None: + try: + collected.append([max(0.0, min(1.0, float(s))) for s in raw]) + except (TypeError, ValueError) as exc: + logger.warning( + "Non-numeric lira score in report (%s); skipping.", exc + ) + elif key == "qmia": + raw = individual.get("member_prob") + if raw is not None: + try: + collected.append([max(0.0, min(1.0, float(s))) for s in raw]) + except (TypeError, ValueError) as exc: + logger.warning( + "Non-numeric qmia score in report (%s); skipping.", exc + ) + elif key == "structural": + k = individual.get("k_anonymity") + cd = individual.get("class_disclosure") + sg = individual.get("smallgroup_risk") + if k is not None and cd is not None and sg is not None: + collected.append( + { + "k_anonymity": k, + "class_disclosure": cd, + "smallgroup_risk": sg, + } + ) + + if not collected: + logger.warning( + "No individual scores found for %s in report; " + "ensure the attack was run with report_individual=True.", + key, + ) + return None + + return collected + + # ------------------------------------------------------------------ + # Sub-attack execution + # ------------------------------------------------------------------ + + def _run_new_attacks( + 
self, + target: Target, + existing_mia: dict[str, list], + existing_struct: dict[str, list], + mia_scores: dict[str, list], + structural_scores: dict[str, list], + ) -> None: + """Execute sub-attacks that are not already present and populate score dicts. + + When ``behaviour`` is ``'fill_missing'``, attacks found in *existing_mia* + or *existing_struct* are skipped. Structural attacks with ``n_reps > 1`` + are clamped to a single run (a warning is logged) because they are + deterministic. + """ + for name, params, n_reps in self.attacks: + if self.behaviour == BEHAVIOUR_FILL_MISSING and ( + name in existing_mia or name in existing_struct + ): + logger.info( + "Skipping %s - already present in %r.", name, self.report_dir + ) + continue + + effective_n_reps = n_reps + if name == "structural" and n_reps > 1: + logger.warning( + "Structural attack is deterministic; n_reps=%d requested " + "but all repetitions will be identical. Running once only.", + n_reps, + ) + effective_n_reps = 1 + + for rep in range(effective_n_reps): + logger.info("Running %s (rep %d/%d)", name, rep + 1, effective_n_reps) + attack_obj = self._run_sub_attack(name, params, target, rep) + if attack_obj is None: + continue + + if name in MIA_ATTACKS: + scores = self._extract_mia_scores(attack_obj, name) + if scores is not None: + mia_scores.setdefault(name, []).append(scores) + else: + scores_struct = self._extract_structural_scores(attack_obj) + if scores_struct is not None: + structural_scores.setdefault(name, []).append(scores_struct) + + def _run_sub_attack( + self, + name: str, + params: dict, + target: Target, + run_idx: int, + ) -> Attack | None: + """Create, execute, and return a single sub-attack instance. + + Returns ``None`` and logs a warning if the sub-attack produces no + results, rather than raising an exception. 
+ """ + from sacroml.attacks.factory import create_attack # noqa: PLC0415 + + sub_params = copy.deepcopy(params) + + sub_params["report_individual"] = True + + sub_dir = os.path.join(self.output_dir, f"{name}_run{run_idx}") + sub_params["output_dir"] = sub_dir + sub_params["write_report"] = False + + try: + attack_obj = create_attack(name, **sub_params) + result = attack_obj.attack(target) + except (RuntimeError, ValueError, OSError, TypeError, AssertionError) as exc: + logger.error( + "Sub-attack '%s' (run %d) failed with %s: %s", + name, + run_idx, + type(exc).__name__, + exc, + exc_info=True, + ) + return None + if not result: + logger.warning( + "Sub-attack '%s' (run %d) produced no results; skipping.", + name, + run_idx, + ) + return None + return attack_obj + + # ------------------------------------------------------------------ + # Score extraction + # ------------------------------------------------------------------ + + @staticmethod + def _extract_mia_scores(attack_obj: Attack, name: str) -> list[float] | None: + """Return per-record membership scores from a completed MIA attack. + + Returns ``None`` and logs a warning when individual scores are absent, + rather than raising an exception. + """ + field = _MIA_SCORE_FIELDS[name] + + for metrics_dict in attack_obj.attack_metrics: + scores = metrics_dict.get("individual", {}).get(field) + if scores is None: + continue + try: + return [max(0.0, min(1.0, float(s))) for s in scores] + except (TypeError, ValueError) as exc: + logger.warning( + "%s attack has non-numeric individual scores (%s); skipping.", + name, + exc, + ) + return None + + logger.warning( + "%s attack did not produce individual scores. " + "Ensure report_individual=True was set.", + name, + ) + return None + + @staticmethod + def _extract_structural_scores(attack_obj: Attack) -> dict | None: + """Return per-record structural risk indicators, or ``None`` on failure. 
+ + Reads directly from the ``record_level_results`` dataclass, which is + only populated when the attack ran with ``report_individual=True``. + Returns ``None`` and logs a warning when results are unavailable. + """ + rlr = getattr(attack_obj, "record_level_results", None) + if rlr is None: + logger.warning("Structural attack has no record_level_results; skipping.") + return None + return { + "k_anonymity": rlr.k_anonymity, + "class_disclosure": rlr.class_disclosure, + "smallgroup_risk": rlr.smallgroup_risk, + } + + # ------------------------------------------------------------------ + # DataFrame construction + # ------------------------------------------------------------------ + + def _build_dataframe( + self, + n_train: int, + n_test: int, + mia_scores: dict[str, list[list[float]]], + structural_scores: dict[str, list[dict]], + ) -> pd.DataFrame: + """Assemble the per-record vulnerability DataFrame.""" + n_total = n_train + n_test + data: dict[str, list] = {} + + data["is_member"] = [1] * n_train + [0] * n_test + + # --- Level 1: within-attack aggregation --- + + mia_mean_cols: list[str] = [] + + for name, reps in mia_scores.items(): + scores_array = np.array(reps) # shape: (n_reps, n_total) + + col_mean = f"{name}_mean" + col_std = f"{name}_std" + col_cons = f"{name}_consistency" + col_vuln = f"{name}_vuln" + + data[col_mean] = np.mean(scores_array, axis=0).tolist() + data[col_std] = np.std(scores_array, axis=0).tolist() + data[col_cons] = np.mean(scores_array > self.mia_threshold, axis=0).tolist() + data[col_vuln] = [m > self.mia_threshold for m in data[col_mean]] + + mia_mean_cols.append(col_mean) + + for _, reps in structural_scores.items(): + if len(reps) == 1: + k_vals = reps[0]["k_anonymity"] + cd_vals = reps[0]["class_disclosure"] + sg_vals = reps[0]["smallgroup_risk"] + else: + k_stack = np.array([r["k_anonymity"] for r in reps]) + cd_stack = np.array([r["class_disclosure"] for r in reps]) + sg_stack = np.array([r["smallgroup_risk"] for r in
reps]) + + k_vals = np.round(np.mean(k_stack, axis=0)).astype(int).tolist() + cd_vals = (np.mean(cd_stack, axis=0) > 0.5).tolist() + sg_vals = (np.mean(sg_stack, axis=0) > 0.5).tolist() + + nan_pad = [float("nan")] * n_test + none_pad = [None] * n_test + + data["struct_k"] = list(k_vals) + nan_pad + data["struct_cd"] = list(cd_vals) + none_pad + data["struct_sg"] = list(sg_vals) + none_pad + data["struct_vuln"] = [ + (k < self.k_threshold or cd or sg) + for k, cd, sg in zip(k_vals, cd_vals, sg_vals, strict=True) + ] + none_pad + + # --- Level 2: cross-attack aggregation --- + + if mia_mean_cols: + mia_means = np.column_stack([data[col] for col in mia_mean_cols]) + + data["mia_mean"] = np.mean(mia_means, axis=1).tolist() + data["mia_gmean"] = np.exp( + np.mean(np.log(mia_means + EPS_META), axis=1) + ).tolist() + + vuln_cols = [c for c in data if c.endswith("_vuln")] + n_vuln = np.zeros(n_total) + for col in vuln_cols: + vals = data[col] + for i, v in enumerate(vals): + if v: + n_vuln[i] += 1 + data["n_vulnerable"] = n_vuln.astype(int).tolist() + + df = pd.DataFrame(data) + df.index = pd.Index([f"record_{i}" for i in range(n_total)], name="record") + + logger.info( + "Vulnerability matrix: %d records, %d columns", len(df), len(df.columns) + ) + return df + + # ------------------------------------------------------------------ + # Global metrics and reporting + # ------------------------------------------------------------------ + + def _compute_global_metrics(self, n_train: int, n_test: int) -> None: + """Compute meta-attack global metrics from the vulnerability DataFrame.""" + if self.vulnerability_df is None: + raise RuntimeError( + "_compute_global_metrics called before vulnerability_df was built." 
def _write_report(self, output: dict) -> None:
    """Write JSON report, PDF, and vulnerability matrix CSV.

    Nothing is written when ``self.write_report`` is false.

    By default, append the MetaAttack section to
    ``{report_dir}/report.json`` so it joins any sub-attack reports
    already there. With ``keep_separate=True``, fall back to the base
    class behaviour and write a standalone ``{output_dir}/report.json``.
    The CSV always lands in ``{output_dir}/vulnerability_matrix.csv``.

    Parameters
    ----------
    output : dict
        Report dictionary handed to the JSON/PDF writers.
    """
    if not self.write_report:
        return

    if self.keep_separate:
        super()._write_report(output)
    else:
        self._write_to_report_dir(output)

    if self.vulnerability_df is None:
        return

    csv_path = os.path.join(self.output_dir, "vulnerability_matrix.csv")
    try:
        # The default branch above only creates report_dir, so make sure the
        # CSV's own directory exists instead of relying on a side effect.
        os.makedirs(self.output_dir, exist_ok=True)
        self.vulnerability_df.to_csv(csv_path)
    except OSError as exc:
        # Best effort: a failed CSV export must not abort the attack run.
        logger.error(
            "Failed to write vulnerability matrix to %s: %s",
            csv_path,
            exc,
            exc_info=True,
        )
    else:
        logger.info("Saved vulnerability matrix to %s", csv_path)
def _draw_n_vulnerable_histogram(n_vulnerable: list, output_dir: str) -> str:
    """Draw a bar chart of records grouped by number of attacks flagging them.

    Parameters
    ----------
    n_vulnerable : list
        Per-record count of attacks that flagged each record.
    output_dir : str
        Directory in which to save the temporary PNG (created if missing).

    Returns
    -------
    str
        Path to the saved PNG.
    """
    os.makedirs(output_dir, exist_ok=True)
    dest = os.path.join(output_dir, "_meta_n_vulnerable.png")
    max_n = max(n_vulnerable) if n_vulnerable else 0
    # One unit-wide bin per count value 0..max_n.
    bins = list(range(max_n + 2))
    fig, ax = plt.subplots(figsize=(6, 4))
    try:
        ax.hist(
            n_vulnerable,
            bins=bins,
            color="#2e5cb8",
            edgecolor="white",
            align="left",
        )
        ax.set_xlabel("Number of attacks flagging the record")
        ax.set_ylabel("Number of records")
        ax.set_xticks(list(range(max_n + 1)))
        fig.tight_layout()
        fig.savefig(dest)
    finally:
        # Close the figure even if drawing or saving fails.
        plt.close(fig)
    return dest


def create_meta_report(output: dict) -> FPDF:
    """Make a MetaAttack PDF report.

    Includes title, attack parameters, global metrics, a per-sub-attack
    summary, and a bar chart of records grouped by the number of attacks
    that flagged them.

    Parameters
    ----------
    output : dict
        MetaAttack output dictionary, with ``metadata`` and
        ``attack_experiment_logger`` keys.

    Returns
    -------
    fpdf.FPDF
        Populated FPDF document.
    """
    metadata: dict = output["metadata"]
    instance = output["attack_experiment_logger"]["attack_instance_logger"][
        "instance_0"
    ]
    sub_attacks: dict = instance.get("sub_attacks", {})
    individual: dict = instance.get("individual", {})
    attack_params: dict = metadata.get("attack_params", {})
    # Scratch location for the chart image; "outputs" when not recorded.
    output_dir: str = attack_params.get("output_dir", "outputs")

    pdf = FPDF()
    pdf.add_page()
    pdf.set_xy(0, 0)
    title(pdf, "Meta Attack Report")

    subtitle(pdf, "Metadata")
    line(
        pdf,
        f"{'sacroml_version':>30s}: {str(metadata.get('sacroml_version', '')):30s}",
        font="courier",
    )
    for key, value in attack_params.items():
        line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    subtitle(pdf, "Global metrics")
    for key, value in metadata.get("global_metrics", {}).items():
        line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    if sub_attacks:
        subtitle(pdf, "Sub-attack summary")
        for name, sub in sub_attacks.items():
            auc = sub.get("AUC")
            # Print "N/A" when a sub-attack entry has no finite numeric AUC.
            if isinstance(auc, (int, float)) and np.isfinite(auc):
                auc_str = f"{auc:.4f}"
            else:
                auc_str = "N/A"
            line(
                pdf,
                f"{name:>30s}: AUC={auc_str}, n_reps={sub.get('n_reps', 1)}",
                font="courier",
            )

    n_vulnerable = individual.get("n_vulnerable")
    if n_vulnerable:
        chart_path = _draw_n_vulnerable_histogram(n_vulnerable, output_dir)
        try:
            pdf.add_page()
            subtitle(pdf, "Records by number of attacks flagging them")
            pdf.image(chart_path, x=None, y=None, w=0, h=120, type="", link="")
        finally:
            # Remove the temporary chart even if embedding it fails.
            if os.path.exists(chart_path):
                os.remove(chart_path)

    return pdf
None self.report_individual = report_individual # Load risk appetite from ACRO config @@ -477,11 +478,12 @@ def _attack(self, target: Target) -> dict: class_disclosure_risk=global_cd, smallgroup_risk=global_small, ) - self.record_level_results = StructuralRecordLevelResults( - k_anonymity=record_level_kval, - class_disclosure=record_level_cd, - smallgroup_risk=record_level_small, - ) + if self.report_individual: + self.record_level_results = StructuralRecordLevelResults( + k_anonymity=record_level_kval, + class_disclosure=record_level_cd, + smallgroup_risk=record_level_small, + ) output = self._make_report(target) @@ -678,7 +680,8 @@ def _construct_metadata(self) -> None: self.attack_metrics = {} for key, val in asdict(self.results).items(): self.attack_metrics[key] = val - self.attack_metrics["individual"] = asdict(self.record_level_results) + if self.report_individual and self.record_level_results: + self.attack_metrics["individual"] = asdict(self.record_level_results) def _get_attack_metrics_instances(self) -> dict: """Return attack metrics. Required by the Attack base class. 
diff --git a/tests/attacks/test_meta_attack.py b/tests/attacks/test_meta_attack.py new file mode 100644 index 00000000..0c675433 --- /dev/null +++ b/tests/attacks/test_meta_attack.py @@ -0,0 +1,1272 @@ +"""Test MetaAttack.""" + +from __future__ import annotations + +import json +import logging +import os +from types import SimpleNamespace + +import pandas as pd +import pytest +from sklearn.datasets import make_classification +from sklearn.ensemble import RandomForestClassifier +from sklearn.model_selection import train_test_split + +from sacroml.attacks import meta_attack as ma +from sacroml.attacks.meta_attack import MetaAttack +from sacroml.attacks.target import Target + +# ------------------------------------------------------------------ +# Fixtures +# ------------------------------------------------------------------ + + +@pytest.fixture(name="meta_target") +def fixture_meta_target() -> Target: + """Return a binary tabular target suitable for MetaAttack tests.""" + X, y = make_classification( + n_samples=200, + n_features=8, + n_informative=4, + n_redundant=0, + n_repeated=0, + n_classes=2, + class_sep=1.25, + random_state=7, + ) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.4, stratify=y, random_state=7 + ) + + model = RandomForestClassifier(n_estimators=50, random_state=7) + model.fit(X_train, y_train) + + target = Target( + model=model, + dataset_name="meta_test", + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + X_train_orig=X_train, + y_train_orig=y_train, + X_test_orig=X_test, + y_test_orig=y_test, + ) + for idx in range(X.shape[1]): + target.add_feature(f"V{idx}", [idx], "float") + return target + + +# ------------------------------------------------------------------ +# Validation tests +# ------------------------------------------------------------------ + + +def test_meta_unsupported_attack(): + """MetaAttack should reject attacks without per-record scores.""" + with pytest.raises(ValueError, 
match="Unsupported attack"): + MetaAttack( + attacks=[("worstcase", {})], + k_threshold=10, + ) + + +def test_meta_invalid_tuple(): + """MetaAttack should reject tuples that are not length 2 or 3.""" + with pytest.raises(ValueError, match="got entry of length 1"): + MetaAttack( + attacks=[("lira",)], + k_threshold=10, + ) + + +def test_meta_empty_attacks(): + """MetaAttack should reject an empty attacks list.""" + with pytest.raises(ValueError, match="at least one entry"): + MetaAttack( + attacks=[], + k_threshold=10, + ) + + +def test_meta_invalid_n_reps(): + """MetaAttack should reject n_reps < 1.""" + with pytest.raises(ValueError, match="positive integer"): + MetaAttack( + attacks=[("lira", {}, 0)], + k_threshold=10, + ) + + +# ------------------------------------------------------------------ +# Integration tests +# ------------------------------------------------------------------ + + +def test_meta_basic_qmia_structural(meta_target, tmp_path): + """MetaAttack with QMIA + structural should produce a valid DataFrame.""" + meta = MetaAttack( + attacks=[("qmia", {}), ("structural", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + + output = meta.attack(meta_target) + + # Report structure + assert output["metadata"]["attack_name"] == "Meta Attack" + + # DataFrame shape: n_train + n_test rows + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + df = meta.vulnerability_df + assert isinstance(df, pd.DataFrame) + assert len(df) == n_train + n_test + + # Expected columns present + assert "is_member" in df.columns + assert "qmia_mean" in df.columns + assert "qmia_vuln" in df.columns + assert "struct_k" in df.columns + assert "struct_vuln" in df.columns + assert "mia_mean" in df.columns + assert "mia_gmean" in df.columns + assert "n_vulnerable" in df.columns + + +def test_meta_structural_nan_for_test_records(meta_target, tmp_path): + """Structural columns should be NaN/None for test (non-member) records.""" + 
def test_meta_repeated_runs(meta_target, tmp_path):
    """Repeated runs should yield well-formed std and consistency columns."""
    meta = MetaAttack(
        attacks=[("qmia", {}, 2)],
        output_dir=str(tmp_path / "meta"),
        write_report=False,
        k_threshold=10,
    )
    meta.attack(meta_target)

    df = meta.vulnerability_df
    assert "qmia_std" in df.columns
    # A standard deviation across repetitions can never be negative.
    assert (df["qmia_std"].dropna() >= 0).all()
    # Consistency should be in [0, 1]
    assert df["qmia_consistency"].between(0.0, 1.0).all()
    # Consistency is the fraction of repetitions above the threshold, so with
    # two repetitions it can only be 0, 0.5 or 1.
    assert df["qmia_consistency"].isin([0.0, 0.5, 1.0]).all()
test_meta_report_structure(meta_target, tmp_path): + """JSON report should have the standard nested structure.""" + meta = MetaAttack( + attacks=[("qmia", {}), ("structural", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + output = meta.attack(meta_target) + + assert "log_id" in output + assert "metadata" in output + assert "attack_experiment_logger" in output + + metadata = output["metadata"] + assert metadata["attack_name"] == "Meta Attack" + assert "global_metrics" in metadata + assert "mia_threshold" in metadata["global_metrics"] + assert "k_threshold" in metadata["global_metrics"] + assert "n_vulnerable_all_attacks" in metadata["global_metrics"] + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert "sub_attacks" in instance + assert "individual" in instance + assert "qmia" in instance["sub_attacks"] + assert "structural" in instance["sub_attacks"] + + +def test_meta_factory_integration(meta_target, tmp_path): + """MetaAttack should be invocable via the attack factory.""" + from sacroml.attacks.factory import attack # noqa: PLC0415 + + output = attack( + target=meta_target, + attack_name="meta", + attacks=[("qmia", {})], + output_dir=str(tmp_path / "factory"), + write_report=False, + k_threshold=10, + ) + + assert output["metadata"]["attack_name"] == "Meta Attack" + + +def test_meta_csv_export(meta_target, tmp_path): + """MetaAttack with write_report=True should produce a CSV file.""" + out_dir = str(tmp_path / "meta") + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=out_dir, + write_report=True, + k_threshold=10, + ) + meta.attack(meta_target) + + csv_path = os.path.join(out_dir, "vulnerability_matrix.csv") + assert os.path.isfile(csv_path) + + df_loaded = pd.read_csv(csv_path, index_col=0) + assert len(df_loaded) == len(meta.vulnerability_df) + + +# ------------------------------------------------------------------ +# Behaviour mode tests +# 
------------------------------------------------------------------ + + +def test_meta_invalid_behaviour(): + """MetaAttack should reject an unrecognised behaviour string.""" + with pytest.raises(ValueError, match="Unknown behaviour"): + MetaAttack( + attacks=[("qmia", {})], + behaviour="rerun_everything", + k_threshold=10, + ) + + +def test_meta_use_existing_only(meta_target, tmp_path): + """Use_existing_only reads from pre-existing report.json files.""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + n_total = n_train + n_test + + # Build a minimal mock QMIA report.json in a subdirectory. + # The real format wraps each attack under "AttackName_" (GenerateJSONModule). + scores = [0.6] * n_train + [0.4] * n_test + mock_report = { + "QMIA Attack_test-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": scores}} + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_report, fh) + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + output = meta.attack(meta_target) + + assert output["metadata"]["attack_name"] == "Meta Attack" + df = meta.vulnerability_df + assert isinstance(df, pd.DataFrame) + assert len(df) == n_total + # Training records should be flagged (score 0.6 > threshold 0.5) + assert df.loc[df["is_member"] == 1, "qmia_vuln"].all() + # Test records should not be flagged (score 0.4 <= threshold 0.5) + assert not df.loc[df["is_member"] == 0, "qmia_vuln"].any() + + +def test_meta_use_existing_missing_individual(meta_target, tmp_path): + """Use_existing_only skips reports that lack individual scores.""" + # Report without the 'individual' 
key (uses the real nested on-disk format). + mock_report = { + "QMIA Attack_test-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {} # no 'individual' key + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_report, fh) + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + # No scores collected → empty report + result = meta.attack(meta_target) + assert result == {} + + +def test_meta_fill_missing_skips_present(meta_target, tmp_path): + """Fill_missing should skip attacks already in report_dir.""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + scores = [0.7] * (n_train + n_test) + + mock_qmia = { + "QMIA Attack_test-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": scores}} + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_qmia, fh) + + # Ask for qmia (already present) + structural (missing → will run) + meta = MetaAttack( + attacks=[("qmia", {}), ("structural", {})], + behaviour="fill_missing", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + output = meta.attack(meta_target) + + assert output["metadata"]["attack_name"] == "Meta Attack" + df = meta.vulnerability_df + assert "qmia_mean" in df.columns + assert "struct_k" in df.columns + # QMIA scores must come from the mock (all 0.7), not from a fresh live run. 
+ assert df["qmia_mean"].dropna().between(0.69, 0.71).all() + + +def test_meta_structural_warns_nreps(meta_target, tmp_path, caplog): + """Structural attack with n_reps > 1 should warn and run only once.""" + meta = MetaAttack( + attacks=[("structural", {}, 3)], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + with caplog.at_level(logging.WARNING): + meta.attack(meta_target) + + assert any("deterministic" in msg for msg in caplog.messages) + + df = meta.vulnerability_df + assert df is not None + assert "struct_k" in df.columns + # Only one set of structural scores should be present (single run) + assert df["struct_k"].notna().sum() == len(meta_target.X_train) + + +def test_meta_corrupted_report_json_skipped(meta_target, tmp_path): + """Use_existing_only skips subdirectories whose report.json is not valid JSON.""" + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + fh.write("this is not valid json {{{") + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + # Bad file should be skipped gracefully — result is empty, not a crash. 
+ result = meta.attack(meta_target) + assert result == {} + + +def test_meta_scan_nonexistent_report_dir(meta_target, tmp_path): + """Use_existing_only with a missing report_dir returns an empty result.""" + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=str(tmp_path / "does_not_exist"), + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + result = meta.attack(meta_target) + assert result == {} + + +def test_meta_structural_multiple_reps_averaging(meta_target, tmp_path): + """Structural_scores with multiple reps should be averaged in the DataFrame.""" + n_train = len(meta_target.X_train) + + # Directly call _build_dataframe with two structural reps to exercise averaging. + meta = MetaAttack( + attacks=[("structural", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + # Fabricate two structural reps with different k-anonymity values. + false_train = [False] * n_train + reps = [ + { + "k_anonymity": [2] * n_train, + "class_disclosure": false_train, + "smallgroup_risk": false_train, + }, + { + "k_anonymity": [4] * n_train, + "class_disclosure": false_train, + "smallgroup_risk": false_train, + }, + ] + n_test = len(meta_target.X_test) + df = meta._build_dataframe(n_train, n_test, {}, {"structural": reps}) + + # k values should be averaged: (2 + 4) / 2 = 3 + assert all(v == 3 for v in df["struct_k"].dropna()) + + +# ------------------------------------------------------------------ +# Additional coverage: S4, S5, S6 +# ------------------------------------------------------------------ + + +def test_meta_use_existing_structural(meta_target, tmp_path): + """Use_existing_only loads structural scores from a pre-existing report.json.""" + n_train = len(meta_target.X_train) + + false_train = [False] * n_train + mock_report = { + "Structural Attack_test-uuid": { + "metadata": {"attack_name": "Structural Attack"}, + "attack_experiment_logger": { + 
"attack_instance_logger": { + "instance_0": { + "individual": { + "k_anonymity": [5] * n_train, + "class_disclosure": false_train, + "smallgroup_risk": false_train, + } + } + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "struct_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_report, fh) + + meta = MetaAttack( + attacks=[("structural", {})], + behaviour="use_existing_only", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + output = meta.attack(meta_target) + + assert output["metadata"]["attack_name"] == "Meta Attack" + df = meta.vulnerability_df + assert "struct_k" in df.columns + assert "struct_vuln" in df.columns + # k=5 < k_threshold=10 → all training records should be flagged + assert df.loc[df["is_member"] == 1, "struct_vuln"].all() + + +def test_meta_fill_missing_full_cache_hit(meta_target, tmp_path): + """Fill_missing with all attacks already on disk runs nothing new.""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + scores = [0.8] * (n_train + n_test) + + mock_qmia = { + "QMIA Attack_test-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": scores}} + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_qmia, fh) + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="fill_missing", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + output = meta.attack(meta_target) + + assert output["metadata"]["attack_name"] == "Meta Attack" + df = meta.vulnerability_df + assert "qmia_mean" in df.columns + # All scores came from mock (0.8) — no live 
def test_meta_mia_cross_attack_aggregation(meta_target, tmp_path):
    """Mia_mean and mia_gmean are correct when two MIA attacks run together."""
    import numpy as np  # noqa: PLC0415

    meta = MetaAttack(
        attacks=[("qmia", {}), ("lira", {"n_shadow_models": 10})],
        output_dir=str(tmp_path / "meta"),
        write_report=False,
        k_threshold=10,
    )
    meta.attack(meta_target)

    df = meta.vulnerability_df
    for col in ("qmia_mean", "lira_mean", "mia_mean", "mia_gmean"):
        assert col in df.columns

    # mia_mean must be the arithmetic mean of the two per-attack means.
    expected = (df["qmia_mean"] + df["lira_mean"]) / 2
    assert np.allclose(df["mia_mean"], expected, equal_nan=True)

    # mia_gmean must be the epsilon-stabilised geometric mean of the same
    # two columns: exp(mean(log(score + EPS_META))).
    log_sum = np.log(df["qmia_mean"] + ma.EPS_META) + np.log(
        df["lira_mean"] + ma.EPS_META
    )
    expected_gmean = np.exp(log_sum / 2)
    assert np.allclose(df["mia_gmean"], expected_gmean, equal_nan=True)
tmp_path): + """N_vulnerable_all_attacks counts records flagged by every active attack.""" + # mia_threshold=-1.0 guarantees all QMIA scores (clipped to [0,1]) satisfy > -1.0 + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + mia_threshold=-1.0, + k_threshold=10, + ) + output = meta.attack(meta_target) + n_total = len(meta_target.X_train) + len(meta_target.X_test) + assert output["metadata"]["global_metrics"]["n_vulnerable_all_attacks"] == n_total + + meta2 = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta2"), + write_report=False, + mia_threshold=1.1, + k_threshold=10, + ) + output2 = meta2.attack(meta_target) + assert output2["metadata"]["global_metrics"]["n_vulnerable_all_attacks"] == 0 + + +# ------------------------------------------------------------------ +# I5: struct_vuln via class_disclosure / smallgroup_risk +# ------------------------------------------------------------------ + + +def test_meta_struct_vuln_flagged_by_class_disclosure(meta_target, tmp_path): + """Struct_vuln must be True when class_disclosure=True, even if k >= k_threshold.""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + meta = MetaAttack( + attacks=[("structural", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=5, + ) + # k well above threshold, but class_disclosure triggers the flag + reps = [ + { + "k_anonymity": [10] * n_train, + "class_disclosure": [True] * n_train, + "smallgroup_risk": [False] * n_train, + } + ] + df = meta._build_dataframe(n_train, n_test, {}, {"structural": reps}) + assert df.loc[df["is_member"] == 1, "struct_vuln"].all() + + +def test_meta_struct_vuln_flagged_by_smallgroup_risk(meta_target, tmp_path): + """Struct_vuln must be True when smallgroup_risk=True, even if k >= k_threshold.""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + meta = MetaAttack( + attacks=[("structural", {})], + 
output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=5, + ) + reps = [ + { + "k_anonymity": [10] * n_train, + "class_disclosure": [False] * n_train, + "smallgroup_risk": [True] * n_train, + } + ] + df = meta._build_dataframe(n_train, n_test, {}, {"structural": reps}) + assert df.loc[df["is_member"] == 1, "struct_vuln"].all() + + +# ------------------------------------------------------------------ +# keep_separate / append-to-existing-report.json tests +# ------------------------------------------------------------------ + + +def test_meta_keep_separate_default_writes_to_report_dir(meta_target, tmp_path): + """Default ``keep_separate=False`` writes report.json to ``report_dir``.""" + out_dir = str(tmp_path / "out") + rep_dir = str(tmp_path / "rep") + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=out_dir, + report_dir=rep_dir, + write_report=True, + k_threshold=10, + ) + meta.attack(meta_target) + + assert os.path.isfile(os.path.join(rep_dir, "report.json")) + assert not os.path.isfile(os.path.join(out_dir, "report.json")) + assert os.path.isfile(os.path.join(out_dir, "vulnerability_matrix.csv")) + + +def test_meta_keep_separate_true_writes_to_output_dir(meta_target, tmp_path): + """``keep_separate=True`` writes report.json to ``output_dir`` (base behaviour).""" + out_dir = str(tmp_path / "out") + rep_dir = str(tmp_path / "rep") + os.makedirs(rep_dir, exist_ok=True) + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=out_dir, + report_dir=rep_dir, + write_report=True, + keep_separate=True, + k_threshold=10, + ) + meta.attack(meta_target) + + assert os.path.isfile(os.path.join(out_dir, "report.json")) + assert not os.path.isfile(os.path.join(rep_dir, "report.json")) + + +def test_meta_make_pdf_returns_fpdf(meta_target, tmp_path): + """``_make_pdf`` should return an FPDF instance, not None.""" + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + output = 
meta.attack(meta_target) + pdf = meta._make_pdf(output) + assert pdf is not None + + +def test_meta_pdf_written_to_report_dir_by_default(meta_target, tmp_path): + """With default keep_separate=False, report.pdf lands in report_dir.""" + out_dir = str(tmp_path / "out") + rep_dir = str(tmp_path / "rep") + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=out_dir, + report_dir=rep_dir, + write_report=True, + k_threshold=10, + ) + meta.attack(meta_target) + pdf_path = os.path.join(rep_dir, "report.pdf") + assert os.path.isfile(pdf_path) + assert os.path.getsize(pdf_path) > 0 + + +def test_meta_use_existing_reads_canonical_single_file(meta_target, tmp_path): + """Use_existing_only reads the canonical top-level report.json (multi-section).""" + n_train = len(meta_target.X_train) + n_test = len(meta_target.X_test) + n_total = n_train + n_test + + qmia_scores = [0.7] * n_train + [0.3] * n_test + struct_kvals = [3] * n_train + struct_cd = [False] * n_train + struct_sg = [False] * n_train + + report_dir = tmp_path / "rep" + report_dir.mkdir() + canonical = { + "QMIA Attack_qmia-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": qmia_scores}} + } + }, + }, + "Structural Attack_struct-uuid": { + "metadata": {"attack_name": "Structural Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": { + "individual": { + "k_anonymity": struct_kvals, + "class_disclosure": struct_cd, + "smallgroup_risk": struct_sg, + } + } + } + }, + }, + } + (report_dir / "report.json").write_text(json.dumps(canonical)) + + meta = MetaAttack( + attacks=[("qmia", {}), ("structural", {})], + behaviour="use_existing_only", + report_dir=str(report_dir), + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + meta.attack(meta_target) + + df = meta.vulnerability_df + assert isinstance(df, pd.DataFrame) + assert len(df) == n_total 
+ # QMIA scores were 0.7 for train, > 0.5 threshold ⇒ qmia_vuln True + assert df.loc[df["is_member"] == 1, "qmia_vuln"].all() + # k_anonymity 3 < k_threshold 10 ⇒ struct_vuln True for train + assert df.loc[df["is_member"] == 1, "struct_vuln"].all() + # Test records get NaN in structural columns + assert df.loc[df["is_member"] == 0, "struct_k"].isna().all() + + +def test_meta_appends_to_existing_report_json(meta_target, tmp_path): + """Default mode appends to existing report.json, keeps prior sections.""" + rep_dir = tmp_path / "rep" + rep_dir.mkdir() + existing = { + "LiRA Attack_abc123": { + "metadata": {"attack_name": "LiRA Attack", "log_id": "abc123"}, + "fake_payload": True, + } + } + existing_path = rep_dir / "report.json" + existing_path.write_text(json.dumps(existing)) + + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "out"), + report_dir=str(rep_dir), + write_report=True, + k_threshold=10, + ) + meta.attack(meta_target) + + with open(existing_path) as f: + data = json.load(f) + + assert "LiRA Attack_abc123" in data + assert data["LiRA Attack_abc123"]["fake_payload"] is True + meta_keys = [k for k in data if k.startswith("Meta Attack_")] + assert len(meta_keys) == 1 + + +# ------------------------------------------------------------------ +# Construction edge cases +# ------------------------------------------------------------------ + + +@pytest.fixture(name="bare_meta") +def fixture_bare_meta(tmp_path) -> MetaAttack: + """Return a minimal MetaAttack instance for direct unit testing of helpers.""" + return MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "bare"), + write_report=False, + k_threshold=10, + ) + + +def test_meta_k_threshold_defaults_from_acro(tmp_path): + """Omitting k_threshold reads the default from the ACRO config.""" + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + ) + assert isinstance(meta.k_threshold, int) + assert meta.k_threshold > 0 
+ + +def test_meta_bad_report_name_mapping(tmp_path, monkeypatch): + """A _REPORT_NAME_TO_KEY value outside SUPPORTED_ATTACKS is rejected.""" + monkeypatch.setitem(ma._REPORT_NAME_TO_KEY, "Bogus Attack", "bogus") + with pytest.raises(RuntimeError, match="references unsupported attacks"): + MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + + +# ------------------------------------------------------------------ +# _attack early-exit guards +# ------------------------------------------------------------------ + + +def test_meta_missing_xtrain_returns_empty(meta_target, tmp_path): + """When the target lacks X_train/X_test, _attack returns an empty report.""" + scores = [0.6, 0.4, 0.5] + mock_report = { + "QMIA Attack_test-uuid": { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": scores}} + } + }, + } + } + report_dir = str(tmp_path / "existing") + sub_dir = os.path.join(report_dir, "qmia_run0") + os.makedirs(sub_dir) + with open(os.path.join(sub_dir, "report.json"), "w") as fh: + json.dump(mock_report, fh) + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=report_dir, + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + # Scores are collected, but the target is missing arrays → empty report. 
+ meta_target.X_train = None + meta_target.X_test = None + assert meta._attack(meta_target) == {} + + +def test_meta_subattack_returns_none_yields_empty(meta_target, tmp_path, monkeypatch): + """When every sub-attack returns None, _attack yields an empty report.""" + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "meta"), + write_report=False, + k_threshold=10, + ) + monkeypatch.setattr(meta, "_run_sub_attack", lambda *_a, **_k: None) + assert meta.attack(meta_target) == {} + + +# ------------------------------------------------------------------ +# _scan_existing_reports filesystem edge cases +# ------------------------------------------------------------------ + + +def test_meta_scandir_oserror_handled(meta_target, tmp_path, monkeypatch): + """An OSError from os.scandir is caught and yields an empty result.""" + report_dir = tmp_path / "rep" + report_dir.mkdir() + + def boom(_path): + raise OSError("scandir failed") + + monkeypatch.setattr(ma.os, "scandir", boom) + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=str(report_dir), + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + assert meta.attack(meta_target) == {} + + +def test_meta_subdir_without_report_json_skipped(meta_target, tmp_path): + """A subdirectory lacking report.json is skipped without error.""" + report_dir = tmp_path / "rep" + report_dir.mkdir() + (report_dir / "empty_sub").mkdir() # no report.json inside + + meta = MetaAttack( + attacks=[("qmia", {})], + behaviour="use_existing_only", + report_dir=str(report_dir), + output_dir=str(tmp_path / "meta_out"), + write_report=False, + k_threshold=10, + ) + assert meta.attack(meta_target) == {} + + +# ------------------------------------------------------------------ +# _extract_from_report_file parsing edge cases +# ------------------------------------------------------------------ + + +def 
test_extract_from_report_file_skips_nondict_and_unknown(bare_meta, tmp_path): + """Non-dict sections and unrecognised attack names are skipped.""" + path = tmp_path / "report.json" + data = { + "weird_entry": "not a dict", + "Mystery Attack_x": {"metadata": {"attack_name": "Mystery Attack"}}, + } + path.write_text(json.dumps(data)) + + mia_scores: dict = {} + structural_scores: dict = {} + bare_meta._extract_from_report_file(str(path), mia_scores, structural_scores) + assert mia_scores == {} + assert structural_scores == {} + + +# ------------------------------------------------------------------ +# _extract_scores_from_report branches +# ------------------------------------------------------------------ + + +def test_extract_scores_instances_not_dict(bare_meta): + """A non-dict attack_instance_logger returns None.""" + data = { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": {"attack_instance_logger": ["not", "a", "dict"]}, + } + assert bare_meta._extract_scores_from_report(data, "qmia") is None + + +def test_extract_scores_instance_not_dict(bare_meta): + """A non-dict instance value is skipped, yielding None when none remain.""" + data = { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": {"instance_0": "not a dict"} + }, + } + assert bare_meta._extract_scores_from_report(data, "qmia") is None + + +def test_extract_scores_lira_valid(bare_meta): + """A valid LiRA report yields clamped per-record scores.""" + data = { + "metadata": {"attack_name": "LiRA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"score": [-0.5, 0.5, 2.0]}} + } + }, + } + out = bare_meta._extract_scores_from_report(data, "lira") + assert out == [[0.0, 0.5, 1.0]] + + +def test_extract_scores_lira_non_numeric(bare_meta): + """A non-numeric LiRA score is skipped, yielding None.""" + data = { + "metadata": {"attack_name": "LiRA Attack"}, + 
"attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"score": ["a", "b"]}} + } + }, + } + assert bare_meta._extract_scores_from_report(data, "lira") is None + + +def test_extract_scores_qmia_non_numeric(bare_meta): + """A non-numeric QMIA score is skipped, yielding None.""" + data = { + "metadata": {"attack_name": "QMIA Attack"}, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": {"individual": {"member_prob": ["x", "y"]}} + } + }, + } + assert bare_meta._extract_scores_from_report(data, "qmia") is None + + +# ------------------------------------------------------------------ +# _run_sub_attack failure paths +# ------------------------------------------------------------------ + + +def test_run_sub_attack_handles_exception(bare_meta, meta_target): + """An invalid sub-attack parameter is caught and returns None.""" + result = bare_meta._run_sub_attack( + "qmia", {"definitely_not_a_param": 123}, meta_target, 0 + ) + assert result is None + + +def test_run_sub_attack_empty_result(bare_meta, meta_target, monkeypatch): + """A sub-attack producing no results returns None.""" + from sacroml.attacks import factory # noqa: PLC0415 + + class _Stub: + def attack(self, _target): + return {} + + monkeypatch.setattr(factory, "create_attack", lambda _name, **_kw: _Stub()) + assert bare_meta._run_sub_attack("qmia", {}, meta_target, 0) is None + + +# ------------------------------------------------------------------ +# _extract_mia_scores / _extract_structural_scores helpers +# ------------------------------------------------------------------ + + +def test_extract_mia_scores_skips_missing_then_reads(): + """The first metrics dict without scores is skipped; the next is read.""" + obj = SimpleNamespace( + attack_metrics=[{}, {"individual": {"score": [-1.0, 0.3, 5.0]}}] + ) + assert MetaAttack._extract_mia_scores(obj, "lira") == [0.0, 0.3, 1.0] + + +def test_extract_mia_scores_non_numeric(): + """Non-numeric individual 
MIA scores return None.""" + obj = SimpleNamespace(attack_metrics=[{"individual": {"score": ["a", "b"]}}]) + assert MetaAttack._extract_mia_scores(obj, "lira") is None + + +def test_extract_mia_scores_absent(): + """A MIA attack with no individual scores returns None.""" + obj = SimpleNamespace(attack_metrics=[{}]) + assert MetaAttack._extract_mia_scores(obj, "lira") is None + + +def test_extract_structural_scores_absent(): + """A structural attack without record_level_results returns None.""" + obj = SimpleNamespace() + assert MetaAttack._extract_structural_scores(obj) is None + + +# ------------------------------------------------------------------ +# Defensive guards: vulnerability_df not yet built +# ------------------------------------------------------------------ + + +def test_compute_global_metrics_requires_df(bare_meta): + """_compute_global_metrics raises if vulnerability_df is unset.""" + with pytest.raises(RuntimeError, match="vulnerability_df"): + bare_meta._compute_global_metrics(1, 1) + + +def test_construct_metadata_requires_df(bare_meta): + """_construct_metadata raises if vulnerability_df is unset.""" + with pytest.raises(RuntimeError, match="vulnerability_df"): + bare_meta._construct_metadata() + + +def test_get_attack_metrics_instances_requires_df(bare_meta): + """_get_attack_metrics_instances raises if vulnerability_df is unset.""" + with pytest.raises(RuntimeError, match="vulnerability_df"): + bare_meta._get_attack_metrics_instances() + + +# ------------------------------------------------------------------ +# CSV write failure is logged, not raised +# ------------------------------------------------------------------ + + +def test_meta_csv_write_failure_logged(meta_target, tmp_path, monkeypatch, caplog): + """An OSError while writing the CSV is logged and does not propagate.""" + meta = MetaAttack( + attacks=[("qmia", {})], + output_dir=str(tmp_path / "out"), + report_dir=str(tmp_path / "rep"), + write_report=False, + k_threshold=10, + ) + 
output = meta.attack(meta_target) + + # Re-enable writing, but force the CSV export to fail. + meta.write_report = True + meta.keep_separate = True # write JSON/PDF to output_dir; isolate the CSV failure + + def boom(*_a, **_k): + raise OSError("disk full") + + monkeypatch.setattr(meta.vulnerability_df, "to_csv", boom) + + with caplog.at_level(logging.ERROR): + meta._write_report(output) + + assert any("Failed to write vulnerability matrix" in m for m in caplog.messages) + + +# ------------------------------------------------------------------ +# report.create_meta_report: finite and non-finite sub-attack AUC branches +# ------------------------------------------------------------------ + + +def test_create_meta_report_with_finite_auc(tmp_path): + """Create_meta_report formats a finite sub-attack AUC and draws the chart.""" + from sacroml.attacks import report # noqa: PLC0415 + + output = { + "metadata": { + "sacroml_version": "test", + "attack_params": {"output_dir": str(tmp_path)}, + "global_metrics": {"AUC": 0.9}, + }, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": { + "sub_attacks": {"lira": {"n_reps": 1, "AUC": 0.8321}}, + "individual": {"n_vulnerable": [0, 1, 2, 1, 0]}, + } + } + }, + } + pdf = report.create_meta_report(output) + assert pdf is not None + + +def test_create_meta_report_with_nonfinite_auc(monkeypatch, tmp_path): + """Render 'N/A' when a sub-attack AUC is non-finite or non-numeric.""" + from sacroml.attacks import report # noqa: PLC0415 + + captured: list[str] = [] + original_line = report.line + + def capturing_line(pdf, text, **kwargs): + captured.append(text) + return original_line(pdf, text, **kwargs) + + monkeypatch.setattr(report, "line", capturing_line) + + output = { + "metadata": { + "sacroml_version": "test", + "attack_params": {"output_dir": str(tmp_path)}, + "global_metrics": {"AUC": 0.9}, + }, + "attack_experiment_logger": { + "attack_instance_logger": { + "instance_0": { + "sub_attacks": { + 
"lira_nan": {"n_reps": 1, "AUC": float("nan")}, + "qmia_inf": {"n_reps": 1, "AUC": float("inf")}, + "structural_neg_inf": {"n_reps": 1, "AUC": float("-inf")}, + "non_numeric": {"n_reps": 1, "AUC": "not-a-number"}, + "missing_auc": {"n_reps": 1}, + }, + "individual": {"n_vulnerable": [0, 1, 2, 1, 0]}, + } + } + }, + } + pdf = report.create_meta_report(output) + + assert pdf is not None + na_lines = [text for text in captured if "AUC=N/A" in text] + assert len(na_lines) == 5 diff --git a/tests/attacks/test_structural_attack.py b/tests/attacks/test_structural_attack.py index a1a89913..193bf058 100644 --- a/tests/attacks/test_structural_attack.py +++ b/tests/attacks/test_structural_attack.py @@ -735,3 +735,40 @@ def test_structural_individual_externalised(tmp_path): assert os.path.exists(npz_path) with np.load(npz_path) as data: assert "individual.k_anonymity" in data + + +def test_structural_report_individual_default_off_omits_individual(): + """Default report_individual=False: no per-record block populated.""" + target = get_target("dt", max_depth=1, min_samples_leaf=20, random_state=0) + attack = sa.StructuralAttack() + assert attack.report_individual is False + + output = attack.attack(target) + + assert attack.record_level_results is None + assert "individual" not in attack.attack_metrics + inst = output["attack_experiment_logger"]["attack_instance_logger"]["instance_0"] + assert "individual" not in inst + + +def test_structural_report_individual_on_populates_individual(): + """Report_individual=True populates per-record block, one entry per train row.""" + target = get_target("dt", max_depth=1, min_samples_leaf=20, random_state=0) + n_train = len(target.y_train) + + attack = sa.StructuralAttack(report_individual=True) + output = attack.attack(target) + + assert attack.record_level_results is not None + assert len(attack.record_level_results.k_anonymity) == n_train + assert len(attack.record_level_results.class_disclosure) == n_train + assert 
len(attack.record_level_results.smallgroup_risk) == n_train + + assert "individual" in attack.attack_metrics + inst = output["attack_experiment_logger"]["attack_instance_logger"]["instance_0"] + assert "individual" in inst + assert set(inst["individual"].keys()) == { + "k_anonymity", + "class_disclosure", + "smallgroup_risk", + }