diff --git a/CHANGELOG.md b/CHANGELOG.md index c6c86809..be131c26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,12 +18,17 @@ Changes: (multi-section, as produced when individual attacks append to the same file) and any subdirectory-per-attack layout. Registered in the attack factory as `"meta"`. +* Refactor: Name `InstanceBasedAttack`'s default floating-point matching tolerance as the module-level constant `INSTANCE_MATCH_ATOL = 1e-8` ([#454](https://github.com/AI-SDC/SACRO-ML/issues/454)). `StructuralAttack` is intentionally not changed because it uses exact `np.unique` equality on deterministic `predict_proba` outputs and does not need a tolerance. * Feat: `QMIAAttack`: membership inference attack via quantile regression (Bertran et al., NeurIPS 2023, arXiv:2307.03694). Trains a histogram-based quantile regressor (`HistGradientBoostingRegressor`) on non-member hinge scores to learn per-sample membership thresholds. A sample is predicted as a member when its observed score exceeds the predicted threshold at quantile level (1 - alpha). No shadow models or architecture knowledge required. Registered in the attack factory as `"qmia"`. +* Refactor: move `unwrap_model` from `InstanceBasedAttack` to `sacroml.attacks.utils` + so it can be reused by other attacks that need to split a scikit-learn `Pipeline` + into its final estimator and preprocessing stages + ([#455](https://github.com/AI-SDC/SACRO-ML/issues/455)). * Fix: `StructuralAttack` now respects the `report_individual` flag. Per-record `record_level_results` and `attack_metrics["individual"]` are only populated when the flag is set to `True`, matching the behaviour of `LIRAAttack` and `QMIAAttack`. 
diff --git a/sacroml/attacks/factory.py b/sacroml/attacks/factory.py index e3cfd7b7..62b7d5f9 100644 --- a/sacroml/attacks/factory.py +++ b/sacroml/attacks/factory.py @@ -6,6 +6,7 @@ from sacroml.attacks.attack import Attack from sacroml.attacks.attribute_attack import AttributeAttack +from sacroml.attacks.instance_based_attack import InstanceBasedAttack from sacroml.attacks.likelihood_attack import LIRAAttack from sacroml.attacks.meta_attack import MetaAttack from sacroml.attacks.qmia_attack import QMIAAttack @@ -19,6 +20,7 @@ registry: dict[str, type[Attack]] = { "attribute": AttributeAttack, + "instance_based": InstanceBasedAttack, "lira": LIRAAttack, "meta": MetaAttack, "qmia": QMIAAttack, diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py new file mode 100644 index 00000000..0e16b03b --- /dev/null +++ b/sacroml/attacks/instance_based_attack.py @@ -0,0 +1,519 @@ +"""Instance-based model attack. + +Detects when instance-based models (SVM, kNN) store training data as part +of their model parameters (support vectors or neighbors), confirming a +concrete data leakage pathway. 
+ +This module provides the `InstanceBasedAttack` class, which: +- Checks if a model is an instance-based type (SVM or kNN) +- Extracts the stored instances (support vectors or neighbors) +- Compares them to the training data to confirm data leakage +- Reports matching examples and available mitigations +""" + +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass, field + +import numpy as np +from fpdf import FPDF +from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor +from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM + +from sacroml.attacks import report +from sacroml.attacks.attack import Attack +from sacroml.attacks.target import Target +from sacroml.attacks.utils import unwrap_model + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SVM_TYPES = (SVC, NuSVC, SVR, NuSVR, OneClassSVM) +KNN_TYPES = (KNeighborsClassifier, KNeighborsRegressor) + +N_EXAMPLES = 10 # default number of matching examples included in the report + +INSTANCE_MATCH_ATOL: float = 1e-8 +"""Absolute tolerance for matching stored instances to training rows. + +Used by :func:`numpy.allclose` so that stored support vectors (or kNN +neighbours) that differ from the original training row only by floating +point rounding (~1e-16 per element) still count as matches. + +Kept local to this module rather than a shared constants module: see +issue #454. ``StructuralAttack`` does not use a numerical tolerance +because its equivalence classes come from :func:`numpy.unique` on +deterministic ``predict_proba`` outputs, where identical inputs produce +bit-identical outputs and exact equality is the right semantics. +""" + +_INTRODUCTION = ( + "This report provides the results of an instance-based model data " + "leakage check. Some model types -- notably Support Vector Machines " + "(SVM) and k-Nearest Neighbours (kNN) -- store training data points " + "as part of their fitted model parameters. 
SVM models store 'support " + "vectors' (a subset of training records that define the decision " + "boundary), while kNN models store the entire training dataset. " + "When such a model is released from a Trusted Research Environment " + "(TRE), these stored data points can be directly extracted, " + "constituting a concrete data leakage risk.\n This attack extracts " + "any stored instances from the model, compares them against the " + "original training data, and reports whether matches are found." +) + +_GLOSSARY = { + "Support Vectors": ( + "In SVM models, support vectors are the training data points that " + "lie closest to the decision boundary. These are stored verbatim " + "inside the fitted model and can be extracted directly." + ), + "kNN Storage": ( + "k-Nearest Neighbours models store the entire training dataset " + "internally, as predictions are made by finding the k closest " + "stored points to a new input." + ), + "DP Variant": ( + "A differentially private variant of a model adds calibrated " + "noise to break the direct link between stored model parameters " + "and the original training data, mitigating the leakage risk." + ), + "Storage Fraction": ( + "The proportion of training data points stored inside the model. " + "For SVM this is typically a subset; for kNN this is 1.0 (all data)." + ), + "Match Fraction": ( + "The proportion of stored instances that exactly match a training " + "data point. A non-zero match fraction confirms data leakage." 
@dataclass
class InstanceBasedAttackResults:
    """Results of an instance-based model attack.

    Every field except ``model_type`` has a "nothing found" default, so the
    early-exit paths of the attack only need to fill in what they know.
    """

    # Class name of the estimator that was inspected.
    model_type: str
    # True when the estimator is one of the SVM or kNN types.
    is_instance_based: bool = False
    # True when the estimator is a DPSVC instance.
    is_dp_safe: bool = False
    # Number of rows extracted from the model (support vectors or kNN data).
    n_stored_instances: int = 0
    # Number of rows in the training data the model was compared against.
    n_training_samples: int = 0
    # n_stored_instances / n_training_samples (0.0 when there is no data).
    storage_fraction: float = 0.0
    # Number of training records flagged as stored in the model.
    n_matched: int = 0
    # Number of stored rows that were compared against the training data.
    n_checked: int = 0
    # n_matched / n_stored_instances (0.0 when nothing is stored).
    match_fraction: float = 0.0
    # True when at least one training record was matched.
    data_leakage_confirmed: bool = False
    # Human-readable mitigation suggestions shown in the report.
    mitigations: list[str] = field(default_factory=list)
    # Optional extra information, e.g. an "error" entry when the comparison
    # could not be carried out.
    details: dict | None = None


@dataclass
class InstanceBasedRecordLevelResults:
    """Per-training-record outcomes for an instance-based attack.

    Indexed by training record (length == n_training_samples), consistent
    with StructuralAttack's individual block. A value of 1 means that
    training record is stored verbatim inside the model; 0 means it is not.
    """

    individual_risk: list[int]  # 1 if training record is stored in model, else 0
def _compare_instances(
    self,
    stored_instances: np.ndarray,
    stored_indices: np.ndarray | None,
    X_train: np.ndarray,
) -> tuple[int, InstanceBasedRecordLevelResults]:
    """Compare stored model instances against training data.

    Each stored row is first checked against the training row its index
    points at (cheap, and correct for fitted SVM/kNN models). Only when
    that check is unavailable or fails is the whole training set searched,
    and that search is a single vectorised comparison rather than a
    Python-level loop over training rows.

    Parameters
    ----------
    stored_instances : np.ndarray
        Data points stored inside the model, one per row.
    stored_indices : np.ndarray or None
        Indices of stored instances into the original training data.
        Entry ``i`` is the claimed training index of ``stored_instances[i]``.
    X_train : np.ndarray
        The training data to compare against, one record per row.

    Returns
    -------
    n_matched : int
        Number of distinct training records found stored in the model,
        i.e. ``sum(individual_risk)``. Several stored rows that resolve to
        the same training record are counted once.
    record_level_results : InstanceBasedRecordLevelResults
        One entry per training record: 1 if that record is stored in
        the model, 0 otherwise.

    Notes
    -----
    Matching uses ``self.atol`` as the absolute tolerance. The relative
    tolerance is left at NumPy's default, so it still contributes for
    large-magnitude values.
    """
    n_train = len(X_train)
    individual_risk = np.zeros(n_train, dtype=int)
    n_indices = 0 if stored_indices is None else len(stored_indices)

    for i, stored_row in enumerate(stored_instances):
        match_index = -1

        # Try index-based direct comparison first
        if i < n_indices:
            idx = int(stored_indices[i])
            if 0 <= idx < n_train and np.allclose(
                stored_row, X_train[idx], atol=self.atol
            ):
                match_index = idx

        # Fallback: compare against every training row at once. The
        # argument order (stored row first) matches the per-row
        # np.allclose call above, so the tolerance is applied identically.
        if match_index == -1 and n_train > 0:
            row_matches = (
                np.isclose(stored_row, X_train, atol=self.atol)
                .reshape(n_train, -1)
                .all(axis=1)
            )
            hits = np.flatnonzero(row_matches)
            if hits.size:
                # Keep the first matching training row, as a linear scan would.
                match_index = int(hits[0])

        if match_index != -1:
            individual_risk[match_index] = 1

    n_matched = int(individual_risk.sum())
    record_level_results = InstanceBasedRecordLevelResults(
        individual_risk=individual_risk.tolist()
    )
    return n_matched, record_level_results
def _attack(self, target: Target) -> dict:
    """Run the instance-based model attack.

    The model is unwrapped from any ``Pipeline``, classified as SVM / kNN /
    other, and, when it is instance-based and fitted, its stored rows are
    compared against the (preprocessed) training data. Every path ends by
    building the report and handing it to ``_write_report``.

    Parameters
    ----------
    target : Target
        The target object containing the model and data.

    Returns
    -------
    dict
        Attack report dictionary.
    """
    # Per-record results are only produced by the full comparison path.
    # Clear any value left by a previous run so that a reused attack object
    # cannot report another target's records on an early-exit path.
    self.record_level_results = None

    def finish() -> dict:
        """Build the report from ``self.results``, write it, and return it."""
        output = self._make_report(target)
        self._write_report(output)
        return output

    raw_model, preprocessor = unwrap_model(target.model.model)
    model_type = type(raw_model).__name__

    is_svm = isinstance(raw_model, SVM_TYPES)
    is_knn = isinstance(raw_model, KNN_TYPES)

    # Lazy import to avoid circular dependency
    from sacroml.safemodel.classifiers.dp_svc import DPSVC  # noqa: PLC0415

    is_dp_safe = isinstance(raw_model, DPSVC)

    X_train = target.X_train
    # If model was inside a Pipeline with preprocessing, transform
    # X_train to the same space as the stored instances
    if preprocessor is not None:
        X_train = preprocessor.transform(X_train)
    n_training = len(X_train)

    if not (is_svm or is_knn):
        logger.info(
            "Model type %s is not instance-based, no data leakage risk "
            "from stored instances.",
            model_type,
        )
        self.results = InstanceBasedAttackResults(
            model_type=model_type,
            n_training_samples=n_training,
        )
        return finish()

    # Fields shared by every result reported for an instance-based model.
    common = {
        "model_type": model_type,
        "is_instance_based": True,
        "is_dp_safe": is_dp_safe,
        "n_training_samples": n_training,
        "mitigations": self._build_mitigations(is_svm, is_knn, is_dp_safe),
    }

    # Extract stored instances
    stored_instances = None
    stored_indices = None

    if is_svm:
        if hasattr(raw_model, "support_vectors_"):
            stored_instances = np.asarray(raw_model.support_vectors_)
            stored_indices = np.asarray(raw_model.support_)
        else:
            logger.warning(
                "SVM model %s does not have support_vectors_ attribute. "
                "It may not be fitted.",
                model_type,
            )

    if is_knn:
        if hasattr(raw_model, "_fit_X"):
            stored_instances = np.asarray(raw_model._fit_X)
            # kNN keeps every training row, in order.
            stored_indices = np.arange(len(stored_instances))
        else:
            logger.warning(
                "kNN model %s does not have _fit_X attribute. "
                "It may not be fitted.",
                model_type,
            )

    if stored_instances is None:
        # Instance-based type, but nothing could be extracted.
        self.results = InstanceBasedAttackResults(**common)
        return finish()

    n_stored = len(stored_instances)
    storage_fraction = n_stored / n_training if n_training > 0 else 0.0

    # Check shape compatibility
    if stored_instances.shape[1] != X_train.shape[1]:
        logger.warning(
            "Feature dimension mismatch: stored instances have %d "
            "features, training data has %d. Cannot compare.",
            stored_instances.shape[1],
            X_train.shape[1],
        )
        self.results = InstanceBasedAttackResults(
            **common,
            n_stored_instances=n_stored,
            storage_fraction=storage_fraction,
            details={"error": "Feature dimension mismatch"},
        )
        return finish()

    # Compare stored instances to training data
    n_matched, self.record_level_results = self._compare_instances(
        stored_instances, stored_indices, X_train
    )

    self.results = InstanceBasedAttackResults(
        **common,
        n_stored_instances=n_stored,
        storage_fraction=storage_fraction,
        n_matched=n_matched,
        n_checked=n_stored,
        match_fraction=n_matched / n_stored if n_stored > 0 else 0.0,
        data_leakage_confirmed=n_matched > 0,
    )
    return finish()
+ """ + metadata = output["metadata"] + metrics = metadata["global_metrics"] + instance_data = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + + pdf = FPDF() + pdf.add_page() + pdf.set_xy(0, 0) + + report.title(pdf, "Instance-Based Model Attack Report") + report.subtitle(pdf, "Introduction") + report.line(pdf, _INTRODUCTION) + + report.subtitle(pdf, "Experiment Summary") + report.line( + pdf, + f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}", + font="courier", + ) + for key, value in metadata["attack_params"].items(): + report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") + + report.subtitle(pdf, "Risk Summary") + for key in ( + "model_type", + "is_instance_based", + "is_dp_safe", + "data_leakage_confirmed", + "n_stored_instances", + "n_training_samples", + "storage_fraction", + "n_matched", + "match_fraction", + ): + value = metrics.get(key, "N/A") + report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") + + # Example matches: show first n_examples training indices flagged as stored. 
def unwrap_model(
    model: BaseEstimator,
) -> tuple[BaseEstimator, Pipeline | None]:
    """Split a scikit-learn model into its final estimator and preprocessing.

    A :class:`sklearn.pipeline.Pipeline` is separated into its last step
    (the estimator) and a new ``Pipeline`` built from all earlier steps (the
    preprocessor), so callers can map raw inputs into the feature space the
    estimator was fitted on. Anything that is not a ``Pipeline`` is handed
    back untouched.

    Parameters
    ----------
    model : BaseEstimator
        A fitted scikit-learn estimator, optionally wrapped in a ``Pipeline``.

    Returns
    -------
    tuple[BaseEstimator, Pipeline | None]
        ``(final_estimator, preprocessor)``. ``preprocessor`` is ``None``
        when ``model`` is not a ``Pipeline`` or has only a single step.
    """
    # Plain estimators have nothing to unwrap.
    if not isinstance(model, Pipeline):
        return model, None

    steps = model.steps
    estimator = steps[-1][1]
    leading_steps = steps[:-1]
    if leading_steps:
        return estimator, Pipeline(leading_steps)
    return estimator, None
make_moons(n_samples=n_samples, noise=0.3, random_state=random_state) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=random_state + ) + model.fit(X_train, y_train) + return Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + + +def _make_target_reg(model, n_samples=100, random_state=42): + """Create a target with a fitted regression model on synthetic data.""" + X, y = make_regression( + n_samples=n_samples, n_features=2, noise=0.1, random_state=random_state + ) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=random_state + ) + model.fit(X_train, y_train) + return Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + + +class TestAttackable: + """Tests for the attackable classmethod.""" + + def test_no_model(self): + """Test attackable returns False with no model.""" + target = Target() + assert not InstanceBasedAttack.attackable(target) + + def test_no_data(self): + """Test attackable returns False with no training data.""" + model = SVC(gamma=0.1) + X, y = make_moons(n_samples=50, noise=0.3, random_state=42) + model.fit(X, y) + target = Target(model=model) + assert not InstanceBasedAttack.attackable(target) + + def test_valid_target(self): + """Test attackable returns True for valid target.""" + target = _make_target_clf(SVC(gamma=0.1)) + assert InstanceBasedAttack.attackable(target) + + def test_non_instance_model_attackable(self): + """Test attackable returns True for non-instance models too.""" + target = _make_target_clf(DecisionTreeClassifier(random_state=42)) + assert InstanceBasedAttack.attackable(target) + + +class TestSVMDetection: + """Tests for SVM model detection and leakage confirmation.""" + + def test_svc_detects_leakage(self): + """Test SVC support vector leakage is detected.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + 
output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + assert output + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "SVC" + assert instance["data_leakage_confirmed"] is True + assert instance["n_matched"] > 0 + assert instance["match_fraction"] > 0 + assert instance["n_stored_instances"] > 0 + assert len(instance["mitigations"]) > 0 + + def test_nusvc_detects_leakage(self): + """Test NuSVC support vector leakage is detected.""" + target = _make_target_clf(NuSVC()) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "NuSVC" + assert instance["data_leakage_confirmed"] is True + + def test_svr_detects_leakage(self): + """Test SVR support vector leakage is detected.""" + target = _make_target_reg(SVR()) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "SVR" + assert instance["data_leakage_confirmed"] is True + + +class TestKNNDetection: + """Tests for kNN model detection and leakage confirmation.""" + + def test_knn_detects_leakage(self): + """Test KNeighborsClassifier stores all training data.""" + target = _make_target_clf(KNeighborsClassifier(n_neighbors=3)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert 
instance["is_instance_based"] is True + assert instance["model_type"] == "KNeighborsClassifier" + assert instance["data_leakage_confirmed"] is True + assert instance["storage_fraction"] == pytest.approx(1.0) + assert instance["match_fraction"] == pytest.approx(1.0) + + def test_knn_regressor_leakage(self): + """Test KNeighborsRegressor stores all training data.""" + target = _make_target_reg(KNeighborsRegressor(n_neighbors=3)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "KNeighborsRegressor" + assert instance["data_leakage_confirmed"] is True + assert instance["storage_fraction"] == pytest.approx(1.0) + + +class TestNonInstanceModels: + """Tests for non-instance-based models.""" + + def test_decision_tree_safe(self): + """Test DecisionTree is not flagged as instance-based.""" + target = _make_target_clf(DecisionTreeClassifier(random_state=42)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is False + assert instance["data_leakage_confirmed"] is False + assert instance["n_stored_instances"] == 0 + assert instance["n_matched"] == 0 + + +class TestDPSVC: + """Tests for differentially private SVM variant.""" + + def test_dpsvc_is_dp_safe(self): + """Test DPSVC is detected as DP-safe.""" + X, y = make_moons(n_samples=100, noise=0.3, random_state=42) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + model = DPSVC(eps=10) + model.fit(X_train, y_train) + target = Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, 
class TestPipeline:
    """Tests for Pipeline unwrapping."""

    def test_pipeline_svc(self):
        """Test SVC inside a Pipeline is detected and leakage confirmed."""
        model = Pipeline([("scaler", StandardScaler()), ("svc", SVC(gamma=0.1))])
        # The shared helper fits the pipeline on the same make_moons split
        # used by the other classification tests.
        target = _make_target_clf(model)
        attack = InstanceBasedAttack(
            output_dir="outputs_instance_based", write_report=False
        )
        output = attack.attack(target)

        instance = output["attack_experiment_logger"]["attack_instance_logger"][
            "instance_0"
        ]
        assert instance["is_instance_based"] is True
        # The reported type is the final estimator, not the Pipeline wrapper.
        assert instance["model_type"] == "SVC"
        assert instance["data_leakage_confirmed"] is True
        assert instance["n_matched"] > 0
class TestOutputStructure:
    """Tests for output format and report generation."""

    def test_output_structure(self):
        """Test output dict has required keys and metadata."""
        target = _make_target_clf(SVC(gamma=0.1))
        attack = InstanceBasedAttack(
            output_dir="outputs_instance_based", write_report=False
        )
        output = attack.attack(target)

        assert "log_id" in output
        assert "log_time" in output
        assert "metadata" in output
        assert "attack_experiment_logger" in output

        metadata = output["metadata"]
        assert metadata["attack_name"] == "Instance-Based Model Attack"
        assert "global_metrics" in metadata
        assert "data_leakage_confirmed" in metadata["global_metrics"]

    def test_report_files_created(self, tmp_path):
        """Test JSON and PDF report files are generated."""
        # Write into pytest's per-test temporary directory so the test leaves
        # nothing behind in the working tree and cannot be satisfied by
        # report files left over from an earlier run.
        output_dir = str(tmp_path / "outputs_instance_based")
        target = _make_target_clf(SVC(gamma=0.1))
        attack = InstanceBasedAttack(output_dir=output_dir, write_report=True)
        attack.attack(target)

        assert os.path.exists(os.path.join(output_dir, "report.json"))
        assert os.path.exists(os.path.join(output_dir, "report.pdf"))

    def test_record_level_individual_structure(self):
        """Per-record individual block has one entry per training record."""
        target = _make_target_clf(SVC(gamma=0.1))
        attack = InstanceBasedAttack(
            output_dir="outputs_instance_based",
            write_report=False,
            report_individual=True,
        )
        output = attack.attack(target)

        instance = output["attack_experiment_logger"]["attack_instance_logger"][
            "instance_0"
        ]
        individual = instance["individual"]
        assert "individual_risk" in individual

        n_train = instance["n_training_samples"]
        assert len(individual["individual_risk"]) == n_train
        # Values are 0 or 1.
        assert all(v in (0, 1) for v in individual["individual_risk"])
        # Count of stored training records matches n_matched.
        assert sum(individual["individual_risk"]) == instance["n_matched"]

    def test_report_individual_off_by_default(self):
        """Without report_individual, no per-record block is emitted."""
        target = _make_target_clf(SVC(gamma=0.1))
        attack = InstanceBasedAttack(
            output_dir="outputs_instance_based", write_report=False
        )
        output = attack.attack(target)

        instance = output["attack_experiment_logger"]["attack_instance_logger"][
            "instance_0"
        ]
        assert "individual" not in instance
        assert "example_matches" not in instance

    def test_empty_target_returns_empty(self):
        """Test attack on empty target returns empty dict."""
        target = Target()
        attack = InstanceBasedAttack(
            output_dir="outputs_instance_based", write_report=False
        )
        output = attack.attack(target)
        assert output == {}
1.0], [0.0, 1.0], [1.0, 0.0]]) + y_train = np.array([0, 1, 0, 1]) + model = SVC(gamma=0.1).fit(X_train, y_train) + target = Target( + model=model, + X_train=np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]), + y_train=np.array([0, 1]), + X_test=np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]), + y_test=np.array([0, 1]), + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + caplog.set_level( + logging.WARNING, logger="sacroml.attacks.instance_based_attack" + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["n_matched"] == 0 + assert instance["data_leakage_confirmed"] is False + assert instance["details"]["error"] == "Feature dimension mismatch" + assert any( + "Feature dimension mismatch" in rec.message for rec in caplog.records + ) + + def test_fallback_search_when_indices_wrong(self): + """Wrong support_ index hint forces the fallback brute-force loop.""" + X_train = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]]) + y_train = np.array([0, 1, 0, 1]) + model = SVC(gamma=0.1).fit(X_train, y_train) + model.support_ = np.zeros_like(model.support_) + target = Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_train, + y_test=y_train, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["data_leakage_confirmed"] is True + assert instance["n_matched"] > 0 diff --git a/tests/attacks/test_utils.py b/tests/attacks/test_utils.py new file mode 100644 index 00000000..8364cbba --- /dev/null +++ b/tests/attacks/test_utils.py @@ -0,0 +1,58 @@ +"""Tests for sacroml.attacks.utils helper functions.""" + +from __future__ import annotations + +import numpy as np +from sklearn.linear_model import LogisticRegression +from 
class TestUnwrapModel:
    """Tests for ``unwrap_model``.

    Each test calls ``unwrap_model`` and unpacks its result as an
    ``(estimator, preprocessor)`` pair.
    """

    def test_non_pipeline_returns_model_and_none(self):
        """A plain estimator is returned unchanged with no preprocessor."""
        model = SVC(gamma=0.1)
        estimator, preprocessor = unwrap_model(model)
        assert estimator is model
        assert preprocessor is None

    def test_single_step_pipeline_returns_final_step_only(self):
        """A one-step Pipeline yields its final estimator and no preprocessor."""
        final = LogisticRegression()
        pipe = Pipeline([("clf", final)])
        estimator, preprocessor = unwrap_model(pipe)
        assert estimator is final
        assert preprocessor is None

    def test_multi_step_pipeline_splits_preprocessor_from_estimator(self):
        """A multi-step Pipeline yields the final step and a Pipeline of the rest."""
        scaler = StandardScaler()
        final = LogisticRegression()
        pipe = Pipeline([("scaler", scaler), ("clf", final)])

        estimator, preprocessor = unwrap_model(pipe)

        assert estimator is final
        assert isinstance(preprocessor, Pipeline)
        assert [name for name, _ in preprocessor.steps] == ["scaler"]
        # Identity, not equality: the step object itself must be reused.
        assert preprocessor.steps[0][1] is scaler

    def test_multi_step_preprocessor_transforms_input(self):
        """The returned preprocessor can transform inputs end-to-end."""
        rng = np.random.default_rng(0)
        X = rng.normal(size=(20, 3))
        y = rng.integers(0, 2, size=20)

        pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())])
        pipe.fit(X, y)

        _, preprocessor = unwrap_model(pipe)
        # Fail with a clear assertion rather than an AttributeError on None.
        assert preprocessor is not None
        transformed = preprocessor.transform(X)

        assert transformed.shape == X.shape
        np.testing.assert_allclose(transformed.mean(axis=0), 0, atol=1e-8)
        # StandardScaler divides by the population standard deviation, which is
        # also what ndarray.std() computes by default, so the result is 1 up to
        # rounding. A loose tolerance here would let a mis-scaled output pass.
        np.testing.assert_allclose(transformed.std(axis=0), 1, atol=1e-8)
pytest_sessionfinish(session, exitstatus): "fit2.tf", "outputs", "output_attribute", + "outputs_instance_based", "output_lira", "output_pytorch", "output_sklearn",