From 7b963c01817572ef2823b8a4d44b3df6d665e40e Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Fri, 27 Mar 2026 02:07:29 +0000 Subject: [PATCH 01/15] feat: add instance-based model attack to detect training data leakage in SVM/kNN models --- sacroml/attacks/factory.py | 2 + sacroml/attacks/instance_based_attack.py | 553 ++++++++++++++++++++ tests/attacks/test_instance_based_attack.py | 368 +++++++++++++ tests/conftest.py | 1 + 4 files changed, 924 insertions(+) create mode 100644 sacroml/attacks/instance_based_attack.py create mode 100644 tests/attacks/test_instance_based_attack.py diff --git a/sacroml/attacks/factory.py b/sacroml/attacks/factory.py index 701b5eb0..82b0b2b2 100644 --- a/sacroml/attacks/factory.py +++ b/sacroml/attacks/factory.py @@ -5,6 +5,7 @@ import yaml from sacroml.attacks.attribute_attack import AttributeAttack +from sacroml.attacks.instance_based_attack import InstanceBasedAttack from sacroml.attacks.likelihood_attack import LIRAAttack from sacroml.attacks.structural_attack import StructuralAttack from sacroml.attacks.target import Target @@ -16,6 +17,7 @@ registry: dict = { "attribute": AttributeAttack, + "instance_based": InstanceBasedAttack, "lira": LIRAAttack, "structural": StructuralAttack, "worstcase": WorstCaseAttack, diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py new file mode 100644 index 00000000..656dc64e --- /dev/null +++ b/sacroml/attacks/instance_based_attack.py @@ -0,0 +1,553 @@ +"""Instance-based model attack. + +Detects when instance-based models (SVM, kNN) store training data as part +of their model parameters (support vectors or neighbors), confirming a +concrete data leakage pathway. + +This module provides the `InstanceBasedAttack` class, which: +- Checks if a model is an instance-based type (SVM or kNN) +- Extracts the stored instances (support vectors or neighbors) +- Compares them to the training data to confirm data leakage +- Reports matching examples and available mitigations +""" + +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass + +import numpy as np +from fpdf import FPDF +from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor +from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM + +try: + from sklearn.pipeline import Pipeline +except ImportError: # pragma: no cover + Pipeline = None + +from sacroml.attacks import report +from sacroml.attacks.attack import Attack +from sacroml.attacks.target import Target + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SVM_TYPES = (SVC, NuSVC, SVR, NuSVR, OneClassSVM) +KNN_TYPES = (KNeighborsClassifier, KNeighborsRegressor) + +_INTRODUCTION = ( + "This report provides the results of an instance-based model data " + "leakage check. Some model types -- notably Support Vector Machines " + "(SVM) and k-Nearest Neighbours (kNN) -- store training data points " + "as part of their fitted model parameters. SVM models store 'support " + "vectors' (a subset of training records that define the decision " + "boundary), while kNN models store the entire training dataset. " + "When such a model is released from a Trusted Research Environment " + "(TRE), these stored data points can be directly extracted, " + "constituting a concrete data leakage risk.\n This attack extracts " + "any stored instances from the model, compares them against the " + "original training data, and reports whether matches are found." +) + +_GLOSSARY = { + "Support Vectors": ( + "In SVM models, support vectors are the training data points that " + "lie closest to the decision boundary. These are stored verbatim " + "inside the fitted model and can be extracted directly." + ), + "kNN Storage": ( + "k-Nearest Neighbours models store the entire training dataset " + "internally, as predictions are made by finding the k closest " + "stored points to a new input." + ), + "DP Variant": ( + "A differentially private variant of a model adds calibrated " + "noise to break the direct link between stored model parameters " + "and the original training data, mitigating the leakage risk." + ), + "Storage Fraction": ( + "The proportion of training data points stored inside the model. " + "For SVM this is typically a subset; for kNN this is 1.0 (all data)." + ), + "Match Fraction": ( + "The proportion of stored instances that exactly match a training " + "data point. A high match fraction confirms data leakage." + ), +} + + +@dataclass +class InstanceBasedAttackResults: + """Results of an instance-based model attack.""" + + model_type: str + is_instance_based: bool + is_dp_safe: bool + n_stored_instances: int + n_training_samples: int + storage_fraction: float + n_matched: int + n_checked: int + match_fraction: float + example_matches: list[dict] + data_leakage_confirmed: bool + mitigations: list[str] + details: dict | None = None + + +class InstanceBasedAttack(Attack): + """Detect training data stored in instance-based model parameters. + + Instance-based models such as SVM and kNN store training data points + (support vectors or all neighbors) inside the fitted model. This attack + extracts those stored instances, compares them to the training data, and + reports whether the model leaks training data. + """ + + def __init__( + self, + output_dir: str = "outputs", + write_report: bool = True, + n_examples: int = 10, + atol: float = 1e-8, + ) -> None: + """Construct an instance-based model attack. + + Parameters + ---------- + output_dir : str + Name of a directory to write outputs. + write_report : bool + Whether to generate a JSON and PDF report. + n_examples : int + Maximum number of matching examples to include in the report. + atol : float + Absolute tolerance for floating-point comparison when matching + stored instances to training data. + """ + super().__init__(output_dir=output_dir, write_report=write_report) + self.n_examples = n_examples + self.atol = atol + self.results: InstanceBasedAttackResults | None = None + + def __str__(self) -> str: + """Return the name of the attack.""" + return "Instance-Based Model Attack" + + @classmethod + def attackable(cls, target: Target) -> bool: + """Return whether a target can be assessed with this attack. + + Requires a model and training data. Non-instance-based models are + handled gracefully (reported as not applicable). + """ + if not target.has_model(): + logger.info("target.model is missing, cannot proceed") + return False + if not target.has_data(): + logger.info("target data is missing, cannot proceed") + return False + return True + + @staticmethod + def _unwrap_model(model): + """Extract the final estimator and preprocessor from a Pipeline. + + Returns + ------- + tuple + (final_estimator, preprocessor_pipeline_or_None) + If the model is a Pipeline with preprocessing steps, returns + a Pipeline of just the preprocessing steps so X_train can be + transformed to the same space as the stored instances. + """ + if Pipeline is not None and isinstance(model, Pipeline): + final_estimator = model.steps[-1][1] + preprocessor = ( + Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None + ) + return final_estimator, preprocessor + return model, None + + def _compare_instances( + self, + stored_instances: np.ndarray, + stored_indices: np.ndarray | None, + X_train: np.ndarray, + ) -> tuple[int, list[dict]]: + """Compare stored model instances against training data. + + Parameters + ---------- + stored_instances : np.ndarray + Data points stored inside the model. + stored_indices : np.ndarray or None + Indices of stored instances into the original training data. + X_train : np.ndarray + The training data to compare against. + + Returns + ------- + n_matched : int + Number of stored instances that match training data. + example_matches : list[dict] + Details of the first n_examples matches. + """ + n_matched = 0 + example_matches: list[dict] = [] + + for i, stored_row in enumerate(stored_instances): + matched = False + match_index = None + + # Try index-based direct comparison first + if stored_indices is not None and i < len(stored_indices): + idx = int(stored_indices[i]) + if 0 <= idx < len(X_train) and np.allclose( + stored_row, X_train[idx], atol=self.atol + ): + matched = True + match_index = idx + + # Fallback: search through training data + if not matched: + for j in range(len(X_train)): + if np.allclose(stored_row, X_train[j], atol=self.atol): + matched = True + match_index = j + break + + if matched: + n_matched += 1 + if len(example_matches) < self.n_examples: + n_preview = min(5, stored_row.shape[0]) + example_matches.append( + { + "stored_index": i, + "training_index": match_index, + "stored_values": stored_row[:n_preview].tolist(), + "training_values": ( + X_train[match_index][:n_preview].tolist() + if match_index is not None + else None + ), + } + ) + + return n_matched, example_matches + + def _build_mitigations( + self, is_svm: bool, is_knn: bool, is_dp_safe: bool + ) -> list[str]: + """Build the list of available mitigations.""" + mitigations: list[str] = [] + + if is_dp_safe: + mitigations.append( + "This model uses a DP-safe variant. The stored parameters are " + "in a transformed/noisy space and do not directly correspond " + "to training data points." + ) + + if is_svm: + mitigations.append( + "Use a differentially private SVM variant (e.g., DPSVC from " + "sacroml.safemodel) which adds noise to the separating " + "hyperplane in a transformed feature space, breaking the " + "direct link between support vectors and training data." + ) + + if is_knn: + mitigations.append( + "kNN models inherently store all training data. Consider " + "using a model type that does not require storing training " + "instances (e.g., decision tree, random forest, or neural " + "network)." + ) + + mitigations.append( + "By agreement with the TRE, this risk may be deemed 'not " + "relevant' for this particular dataset if the data is already " + "public or low-sensitivity." + ) + + return mitigations + + def _attack(self, target: Target) -> dict: + """Run the instance-based model attack. + + Parameters + ---------- + target : Target + The target object containing the model and data. + + Returns + ------- + dict + Attack report dictionary. + """ + raw_model, preprocessor = self._unwrap_model(target.model.model) + model_type = type(raw_model).__name__ + + is_svm = isinstance(raw_model, SVM_TYPES) + is_knn = isinstance(raw_model, KNN_TYPES) + is_instance_based = is_svm or is_knn + + # Lazy import to avoid circular dependency + from sacroml.safemodel.classifiers.dp_svc import DPSVC # noqa: PLC0415 + + is_dp_safe = isinstance(raw_model, DPSVC) + + X_train = target.X_train + # If model was inside a Pipeline with preprocessing, transform + # X_train to the same space as the stored instances + if preprocessor is not None: + X_train = preprocessor.transform(X_train) + n_training = len(X_train) + + if not is_instance_based: + logger.info( + "Model type %s is not instance-based, no data leakage risk " + "from stored instances.", + model_type, + ) + self.results = InstanceBasedAttackResults( + model_type=model_type, + is_instance_based=False, + is_dp_safe=False, + n_stored_instances=0, + n_training_samples=n_training, + storage_fraction=0.0, + n_matched=0, + n_checked=0, + match_fraction=0.0, + example_matches=[], + data_leakage_confirmed=False, + mitigations=[], + ) + output = self._make_report(target) + self._write_report(output) + return output + + # Extract stored instances + stored_instances = None + stored_indices = None + + if is_svm: + if hasattr(raw_model, "support_vectors_"): + stored_instances = np.asarray(raw_model.support_vectors_) + stored_indices = np.asarray(raw_model.support_) + else: + logger.warning( + "SVM model %s does not have support_vectors_ attribute. " + "It may not be fitted.", + model_type, + ) + + if is_knn: + if hasattr(raw_model, "_fit_X"): + stored_instances = np.asarray(raw_model._fit_X) + stored_indices = np.arange(len(stored_instances)) + else: + logger.warning( + "kNN model %s does not have _fit_X attribute. " + "It may not be fitted.", + model_type, + ) + + if stored_instances is None: + self.results = InstanceBasedAttackResults( + model_type=model_type, + is_instance_based=True, + is_dp_safe=is_dp_safe, + n_stored_instances=0, + n_training_samples=n_training, + storage_fraction=0.0, + n_matched=0, + n_checked=0, + match_fraction=0.0, + example_matches=[], + data_leakage_confirmed=False, + mitigations=self._build_mitigations( + is_svm, is_knn, is_dp_safe + ), + ) + output = self._make_report(target) + self._write_report(output) + return output + + n_stored = len(stored_instances) + + # Check shape compatibility + if stored_instances.shape[1] != X_train.shape[1]: + logger.warning( + "Feature dimension mismatch: stored instances have %d " + "features, training data has %d. Cannot compare.", + stored_instances.shape[1], + X_train.shape[1], + ) + self.results = InstanceBasedAttackResults( + model_type=model_type, + is_instance_based=True, + is_dp_safe=is_dp_safe, + n_stored_instances=n_stored, + n_training_samples=n_training, + storage_fraction=n_stored / n_training if n_training > 0 else 0.0, + n_matched=0, + n_checked=0, + match_fraction=0.0, + example_matches=[], + data_leakage_confirmed=False, + mitigations=self._build_mitigations( + is_svm, is_knn, is_dp_safe + ), + details={"error": "Feature dimension mismatch"}, + ) + output = self._make_report(target) + self._write_report(output) + return output + + # Compare stored instances to training data + n_matched, example_matches = self._compare_instances( + stored_instances, stored_indices, X_train + ) + + storage_fraction = n_stored / n_training if n_training > 0 else 0.0 + match_fraction = n_matched / n_stored if n_stored > 0 else 0.0 + data_leakage_confirmed = n_matched > 0 + + mitigations = self._build_mitigations(is_svm, is_knn, is_dp_safe) + + self.results = InstanceBasedAttackResults( + model_type=model_type, + is_instance_based=True, + is_dp_safe=is_dp_safe, + n_stored_instances=n_stored, + n_training_samples=n_training, + storage_fraction=storage_fraction, + n_matched=n_matched, + n_checked=n_stored, + match_fraction=match_fraction, + example_matches=example_matches, + data_leakage_confirmed=data_leakage_confirmed, + mitigations=mitigations, + ) + + output = self._make_report(target) + self._write_report(output) + return output + + def _construct_metadata(self) -> None: + """Construct the metadata dictionary for reporting.""" + super()._construct_metadata() + if self.results: + self.metadata["global_metrics"] = { + "model_type": self.results.model_type, + "is_instance_based": self.results.is_instance_based, + "is_dp_safe": self.results.is_dp_safe, + "n_stored_instances": self.results.n_stored_instances, + "n_training_samples": self.results.n_training_samples, + "storage_fraction": self.results.storage_fraction, + "n_matched": self.results.n_matched, + "match_fraction": self.results.match_fraction, + "data_leakage_confirmed": self.results.data_leakage_confirmed, + } + + def _get_attack_metrics_instances(self) -> dict: + """Return attack metrics for the report structure.""" + attack_metrics_experiment = {} + if self.results: + attack_metrics_instances = { + "instance_0": asdict(self.results), + } + attack_metrics_experiment["attack_instance_logger"] = ( + attack_metrics_instances + ) + return attack_metrics_experiment + + def _make_pdf(self, output: dict) -> FPDF: + """Create PDF report. + + Returns + ------- + FPDF : A PDF object containing the instance-based attack report. + """ + metadata = output["metadata"] + metrics = metadata["global_metrics"] + instance_data = output["attack_experiment_logger"][ + "attack_instance_logger" + ]["instance_0"] + + pdf = FPDF() + pdf.add_page() + pdf.set_xy(0, 0) + + report.title(pdf, "Instance-Based Model Attack Report") + report.subtitle(pdf, "Introduction") + report.line(pdf, _INTRODUCTION) + + report.subtitle(pdf, "Experiment Summary") + report.line( + pdf, + f"{'sacroml_version':>30s}: " + f"{str(metadata['sacroml_version']):30s}", + font="courier", + ) + for key, value in metadata["attack_params"].items(): + report.line( + pdf, f"{key:>30s}: {str(value):30s}", font="courier" + ) + + report.subtitle(pdf, "Risk Summary") + for key in ( + "model_type", + "is_instance_based", + "is_dp_safe", + "data_leakage_confirmed", + "n_stored_instances", + "n_training_samples", + "storage_fraction", + "n_matched", + "match_fraction", + ): + value = metrics.get(key, "N/A") + report.line( + pdf, f"{key:>30s}: {str(value):30s}", font="courier" + ) + + # Example matches + example_matches = instance_data.get("example_matches", []) + if example_matches: + pdf.add_page() + report.title(pdf, "Example Matches") + report.line( + pdf, + f"Showing {len(example_matches)} example(s) of training " + "data found stored in the model (first 5 feature values):", + ) + for i, match in enumerate(example_matches): + report.line( + pdf, + f" Match {i + 1}: " + f"stored[{match.get('stored_index', '?')}] " + f"= train[{match.get('training_index', '?')}] " + f"values: {match.get('stored_values', [])}", + font="courier", + font_size=9, + ) + + # Mitigations + mitigations = instance_data.get("mitigations", []) + if mitigations: + pdf.add_page() + report.title(pdf, "Available Mitigations") + for i, mitigation in enumerate(mitigations): + report.subtitle(pdf, f"Option {i + 1}") + report.line(pdf, mitigation) + + pdf.add_page() + report.title(pdf, "Glossary") + report._write_dict(pdf, _GLOSSARY) + + return pdf diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py new file mode 100644 index 00000000..c2efce21 --- /dev/null +++ b/tests/attacks/test_instance_based_attack.py @@ -0,0 +1,368 @@ +"""Tests for InstanceBasedAttack.""" + +import os + +import pytest +from sklearn.datasets import make_moons, make_regression +from sklearn.model_selection import train_test_split +from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.svm import SVC, SVR, NuSVC +from sklearn.tree import DecisionTreeClassifier + +from sacroml.attacks.factory import create_attack +from sacroml.attacks.instance_based_attack import InstanceBasedAttack +from sacroml.attacks.target import Target +from sacroml.safemodel.classifiers.dp_svc import DPSVC + + +def _make_target_clf(model, n_samples=100, random_state=42): + """Create a target with a fitted classification model on synthetic data.""" + X, y = make_moons(n_samples=n_samples, noise=0.3, random_state=random_state) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=random_state + ) + model.fit(X_train, y_train) + return Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + + +def _make_target_reg(model, n_samples=100, random_state=42): + """Create a target with a fitted regression model on synthetic data.""" + X, y = make_regression( + n_samples=n_samples, n_features=2, noise=0.1, random_state=random_state + ) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=random_state + ) + model.fit(X_train, y_train) + return Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + + +class TestAttackable: + """Tests for the attackable classmethod.""" + + def test_no_model(self): + """Test attackable returns False with no model.""" + target = Target() + assert not InstanceBasedAttack.attackable(target) + + def test_no_data(self): + """Test attackable returns False with no training data.""" + model = SVC(gamma=0.1) + X, y = make_moons(n_samples=50, noise=0.3, random_state=42) + model.fit(X, y) + target = Target(model=model) + assert not InstanceBasedAttack.attackable(target) + + def test_valid_target(self): + """Test attackable returns True for valid target.""" + target = _make_target_clf(SVC(gamma=0.1)) + assert InstanceBasedAttack.attackable(target) + + def test_non_instance_model_attackable(self): + """Test attackable returns True for non-instance models too.""" + target = _make_target_clf(DecisionTreeClassifier(random_state=42)) + assert InstanceBasedAttack.attackable(target) + + +class TestSVMDetection: + """Tests for SVM model detection and leakage confirmation.""" + + def test_svc_detects_leakage(self): + """Test SVC support vector leakage is detected.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + assert output + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "SVC" + assert instance["data_leakage_confirmed"] is True + assert instance["n_matched"] > 0 + assert instance["match_fraction"] > 0 + assert instance["n_stored_instances"] > 0 + assert len(instance["mitigations"]) > 0 + + def test_nusvc_detects_leakage(self): + """Test NuSVC support vector leakage is detected.""" + target = _make_target_clf(NuSVC()) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "NuSVC" + assert instance["data_leakage_confirmed"] is True + + def test_svr_detects_leakage(self): + """Test SVR support vector leakage is detected.""" + target = _make_target_reg(SVR()) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "SVR" + assert instance["data_leakage_confirmed"] is True + + +class TestKNNDetection: + """Tests for kNN model detection and leakage confirmation.""" + + def test_knn_detects_leakage(self): + """Test KNeighborsClassifier stores all training data.""" + target = _make_target_clf(KNeighborsClassifier(n_neighbors=3)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "KNeighborsClassifier" + assert instance["data_leakage_confirmed"] is True + assert instance["storage_fraction"] == pytest.approx(1.0) + assert instance["match_fraction"] == pytest.approx(1.0) + + def test_knn_regressor_leakage(self): + """Test KNeighborsRegressor stores all training data.""" + target = _make_target_reg(KNeighborsRegressor(n_neighbors=3)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "KNeighborsRegressor" + assert instance["data_leakage_confirmed"] is True + assert instance["storage_fraction"] == pytest.approx(1.0) + + +class TestNonInstanceModels: + """Tests for non-instance-based models.""" + + def test_decision_tree_safe(self): + """Test DecisionTree is not flagged as instance-based.""" + target = _make_target_clf(DecisionTreeClassifier(random_state=42)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is False + assert instance["data_leakage_confirmed"] is False + assert instance["n_stored_instances"] == 0 + assert instance["n_matched"] == 0 + + +class TestDPSVC: + """Tests for differentially private SVM variant.""" + + def test_dpsvc_is_dp_safe(self): + """Test DPSVC is detected as DP-safe.""" + X, y = make_moons(n_samples=100, noise=0.3, random_state=42) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + model = DPSVC(eps=10) + model.fit(X_train, y_train) + target = Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_dp_safe"] is True + assert any("DP-safe" in m for m in instance["mitigations"]) + + +class TestPipeline: + """Tests for Pipeline unwrapping.""" + + def test_pipeline_svc(self): + """Test SVC inside a Pipeline is detected and leakage confirmed.""" + model = Pipeline( + [("scaler", StandardScaler()), ("svc", SVC(gamma=0.1))] + ) + X, y = make_moons(n_samples=100, noise=0.3, random_state=42) + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + model.fit(X_train, y_train) + target = Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["is_instance_based"] is True + assert instance["model_type"] == "SVC" + assert instance["data_leakage_confirmed"] is True + assert instance["n_matched"] > 0 + + +class TestConfiguration: + """Tests for attack configuration and parameters.""" + + def test_n_examples_limit(self): + """Test example matches are capped at n_examples.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", + write_report=False, + n_examples=3, + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert len(instance["example_matches"]) <= 3 + + def test_str_representation(self): + """Test __str__ returns the attack name.""" + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + assert str(attack) == "Instance-Based Model Attack" + + def test_get_params(self): + """Test get_params returns constructor parameters.""" + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", + write_report=False, + n_examples=5, + atol=1e-6, + ) + params = attack.get_params() + assert params["n_examples"] == 5 + assert params["atol"] == 1e-6 + assert params["output_dir"] == "outputs_instance_based" + + def test_factory_registration(self): + """Test attack is registered in the factory.""" + attack = create_attack( + "instance_based", + output_dir="outputs_instance_based", + write_report=False, + ) + assert isinstance(attack, InstanceBasedAttack) + + +class TestOutputStructure: + """Tests for output format and report generation.""" + + def test_output_structure(self): + """Test output dict has required keys and metadata.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + assert "log_id" in output + assert "log_time" in output + assert "metadata" in output + assert "attack_experiment_logger" in output + + metadata = output["metadata"] + assert metadata["attack_name"] == "Instance-Based Model Attack" + assert "global_metrics" in metadata + assert "data_leakage_confirmed" in metadata["global_metrics"] + + def test_report_files_created(self): + """Test JSON and PDF report files are generated.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=True + ) + attack.attack(target) + + assert os.path.exists( + os.path.join("outputs_instance_based", "report.json") + ) + assert os.path.exists( + os.path.join("outputs_instance_based", "report.pdf") + ) + + def test_example_match_structure(self): + """Test example matches contain expected fields.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + matches = instance["example_matches"] + assert len(matches) > 0 + + match = matches[0] + assert "stored_index" in match + assert "training_index" in match + assert "stored_values" in match + assert "training_values" in match + assert isinstance(match["stored_values"], list) + + def test_empty_target_returns_empty(self): + """Test attack on empty target returns empty dict.""" + target = Target() + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + assert output == {} diff --git a/tests/conftest.py b/tests/conftest.py index 120034e6..e43b44f1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,7 @@ def pytest_sessionfinish(session, exitstatus): "fit2.tf", "outputs", "output_attribute", + "outputs_instance_based", "output_lira", "output_pytorch", "output_sklearn", From da9ed7d18492326d7fc97166cf2476957559293f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 02:45:00 +0000 Subject: [PATCH 02/15] style: pre-commit fixes --- sacroml/attacks/instance_based_attack.py | 29 +++++++-------------- tests/attacks/test_instance_based_attack.py | 12 +++------ 2 files changed, 12 insertions(+), 29 deletions(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 656dc64e..26fe75a2 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -164,9 +164,7 @@ def _unwrap_model(model): """ if Pipeline is not None and isinstance(model, Pipeline): final_estimator = model.steps[-1][1] - preprocessor = ( - Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None - ) + preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None return final_estimator, preprocessor return model, None @@ -369,9 +367,7 @@ def _attack(self, target: Target) -> dict: match_fraction=0.0, example_matches=[], data_leakage_confirmed=False, - mitigations=self._build_mitigations( - is_svm, is_knn, is_dp_safe - ), + mitigations=self._build_mitigations(is_svm, is_knn, is_dp_safe), ) output = self._make_report(target) self._write_report(output) @@ -399,9 +395,7 @@ def _attack(self, target: Target) -> dict: match_fraction=0.0, example_matches=[], data_leakage_confirmed=False, - mitigations=self._build_mitigations( - is_svm, is_knn, is_dp_safe - ), + mitigations=self._build_mitigations(is_svm, is_knn, is_dp_safe), details={"error": "Feature dimension mismatch"}, ) output = self._make_report(target) @@ -475,9 +469,9 @@ def _make_pdf(self, output: dict) -> FPDF: """ metadata = output["metadata"] metrics = metadata["global_metrics"] - instance_data = output["attack_experiment_logger"][ - "attack_instance_logger" - ]["instance_0"] + instance_data = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] pdf = FPDF() pdf.add_page() @@ -490,14 +484,11 @@ def _make_pdf(self, output: dict) -> FPDF: report.subtitle(pdf, "Experiment Summary") report.line( pdf, - f"{'sacroml_version':>30s}: " - f"{str(metadata['sacroml_version']):30s}", + f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}", font="courier", ) for key, value in metadata["attack_params"].items(): - report.line( - pdf, f"{key:>30s}: {str(value):30s}", font="courier" - ) + report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") report.subtitle(pdf, "Risk Summary") for key in ( @@ -512,9 +503,7 @@ def _make_pdf(self, output: dict) -> FPDF: "match_fraction", ): value = metrics.get(key, "N/A") - report.line( - pdf, f"{key:>30s}: {str(value):30s}", font="courier" - ) + report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") # Example matches example_matches = instance_data.get("example_matches", []) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index c2efce21..acdbcd22 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -224,9 +224,7 @@ class TestPipeline: def test_pipeline_svc(self): """Test SVC inside a Pipeline is detected and leakage confirmed.""" - model = Pipeline( - [("scaler", StandardScaler()), ("svc", SVC(gamma=0.1))] - ) + model = Pipeline([("scaler", StandardScaler()), ("svc", SVC(gamma=0.1))]) X, y = make_moons(n_samples=100, noise=0.3, random_state=42) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 @@ -330,12 +328,8 @@ def test_report_files_created(self): ) attack.attack(target) - assert os.path.exists( - os.path.join("outputs_instance_based", "report.json") - ) - assert os.path.exists( - os.path.join("outputs_instance_based", "report.pdf") - ) + assert os.path.exists(os.path.join("outputs_instance_based", "report.json")) + assert os.path.exists(os.path.join("outputs_instance_based", "report.pdf")) def test_example_match_structure(self): """Test example matches contain expected fields.""" From 6c10dc3a08cd8e11c5a5cc9f5f8514fe1dd7bfc9 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Tue, 12 May 2026 17:01:14 +0300 Subject: [PATCH 03/15] docs: clarify match fraction glossary to flag any non-zero leakage --- sacroml/attacks/instance_based_attack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 26fe75a2..48ed3bca 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -72,7 +72,7 @@ ), "Match Fraction": ( "The proportion of stored instances that exactly match a training " - "data point. A high match fraction confirms data leakage." + "data point. A non-zero match fraction confirms data leakage." ), } From 635983569c9830e02e7971e62b69b7745cfce92f Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Wed, 13 May 2026 06:31:13 +0300 Subject: [PATCH 04/15] refactor: name magic numbers as N_EXAMPLES and N_FEATURE_PREVIEW constants --- sacroml/attacks/instance_based_attack.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 48ed3bca..e4658171 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -36,6 +36,9 @@ SVM_TYPES = (SVC, NuSVC, SVR, NuSVR, OneClassSVM) KNN_TYPES = (KNeighborsClassifier, KNeighborsRegressor) +N_EXAMPLES = 10 # default number of matching examples included in the report +N_FEATURE_PREVIEW = 10 # number of feature values shown per example match + _INTRODUCTION = ( "This report provides the results of an instance-based model data " "leakage check. Some model types -- notably Support Vector Machines " @@ -109,7 +112,7 @@ def __init__( self, output_dir: str = "outputs", write_report: bool = True, - n_examples: int = 10, + n_examples: int = N_EXAMPLES, atol: float = 1e-8, ) -> None: """Construct an instance-based model attack. @@ -219,7 +222,7 @@ def _compare_instances( if matched: n_matched += 1 if len(example_matches) < self.n_examples: - n_preview = min(5, stored_row.shape[0]) + n_preview = min(N_FEATURE_PREVIEW, stored_row.shape[0]) example_matches.append( { "stored_index": i, From ffa061c7edcfb71af2af417bb27d445848e791f5 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Fri, 15 May 2026 20:36:36 +0300 Subject: [PATCH 05/15] style: add type annotations to _unwrap_model --- sacroml/attacks/instance_based_attack.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index e4658171..77b92535 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -15,6 +15,7 @@ import logging from dataclasses import asdict, dataclass +from typing import Any import numpy as np from fpdf import FPDF @@ -154,7 +155,7 @@ def attackable(cls, target: Target) -> bool: return True @staticmethod - def _unwrap_model(model): + def _unwrap_model(model: Any) -> tuple[Any, Any]: """Extract the final estimator and preprocessor from a Pipeline. Returns From 445a9da193e1afce55262378610ba38364f1e3af Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Mon, 18 May 2026 16:43:07 +0300 Subject: [PATCH 06/15] refactor: tighten _unwrap_model annotations to sklearn types --- sacroml/attacks/instance_based_attack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 77b92535..fc96bbeb 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -15,10 +15,10 @@ import logging from dataclasses import asdict, dataclass -from typing import Any import numpy as np from fpdf import FPDF +from sklearn.base import BaseEstimator from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM @@ -155,7 +155,7 @@ def attackable(cls, target: Target) -> bool: return True @staticmethod - def _unwrap_model(model: Any) -> tuple[Any, Any]: + def _unwrap_model(model: BaseEstimator) -> tuple[BaseEstimator, Pipeline | None]: """Extract the final estimator and preprocessor from a Pipeline. Returns From ea331ab068f142671351fa6b58f15461618f40fc Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Thu, 21 May 2026 14:15:36 +0300 Subject: [PATCH 07/15] refactor: extract INSTANCE_MATCH_ATOL constant for InstanceBasedAttack --- CHANGELOG.md | 1 + sacroml/attacks/instance_based_attack.py | 16 +++++++++++++++- tests/attacks/test_instance_based_attack.py | 11 ++++++++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9b8bc3d..10e5f310 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [Unreleased] Changes: +* Refactor: Name `InstanceBasedAttack`'s default floating-point matching tolerance as the module-level constant `INSTANCE_MATCH_ATOL = 1e-8` ([#454](https://github.com/AI-SDC/SACRO-ML/issues/454)). `StructuralAttack` is intentionally not changed because it uses exact `np.unique` equality on deterministic `predict_proba` outputs and does not need a tolerance. * Feat: `QMIAAttack`: membership inference attack via quantile regression (Bertran et al., NeurIPS 2023, arXiv:2307.03694). Trains a histogram-based quantile regressor (`HistGradientBoostingRegressor`) on non-member hinge scores to learn per-sample diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index fc96bbeb..f55d529a 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -40,6 +40,20 @@ N_EXAMPLES = 10 # default number of matching examples included in the report N_FEATURE_PREVIEW = 10 # number of feature values shown per example match +INSTANCE_MATCH_ATOL: float = 1e-8 +"""Absolute tolerance for matching stored instances to training rows. + +Used by :func:`numpy.allclose` so that stored support vectors (or kNN +neighbours) that differ from the original training row only by floating +point rounding (~1e-16 per element) still count as matches. + +Kept local to this module rather than a shared constants module: see +issue #454. ``StructuralAttack`` does not use a numerical tolerance +because its equivalence classes come from :func:`numpy.unique` on +deterministic ``predict_proba`` outputs, where identical inputs produce +bit-identical outputs and exact equality is the right semantics. +""" + _INTRODUCTION = ( "This report provides the results of an instance-based model data " "leakage check. Some model types -- notably Support Vector Machines " @@ -114,7 +128,7 @@ def __init__( output_dir: str = "outputs", write_report: bool = True, n_examples: int = N_EXAMPLES, - atol: float = 1e-8, + atol: float = INSTANCE_MATCH_ATOL, ) -> None: """Construct an instance-based model attack. diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index acdbcd22..4aeb27f6 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -12,7 +12,10 @@ from sklearn.tree import DecisionTreeClassifier from sacroml.attacks.factory import create_attack -from sacroml.attacks.instance_based_attack import InstanceBasedAttack +from sacroml.attacks.instance_based_attack import ( + INSTANCE_MATCH_ATOL, + InstanceBasedAttack, +) from sacroml.attacks.target import Target from sacroml.safemodel.classifiers.dp_svc import DPSVC @@ -289,6 +292,12 @@ def test_get_params(self): assert params["atol"] == 1e-6 assert params["output_dir"] == "outputs_instance_based" + def test_default_atol_is_module_constant(self): + """Default atol matches INSTANCE_MATCH_ATOL, see issue #454.""" + attack = InstanceBasedAttack(write_report=False) + assert attack.atol == INSTANCE_MATCH_ATOL + assert INSTANCE_MATCH_ATOL == 1e-8 + def test_factory_registration(self): """Test attack is registered in the factory.""" attack = create_attack( From a05adbcd6155c5e6f73ed9d3fcbe556c982b5627 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Thu, 21 May 2026 13:27:44 +0300 Subject: [PATCH 08/15] refactor: move unwrap_model to sacroml.attacks.utils for reuse --- CHANGELOG.md | 4 ++ sacroml/attacks/instance_based_attack.py | 27 +---------- sacroml/attacks/utils.py | 33 ++++++++++++++ tests/attacks/test_utils.py | 58 ++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 25 deletions(-) create mode 100644 tests/attacks/test_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 10e5f310..bc8d893f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ Changes: membership thresholds. A sample is predicted as a member when its observed score exceeds the predicted threshold at quantile level (1 - alpha). No shadow models or architecture knowledge required. Registered in the attack factory as `"qmia"`. +* Refactor: move `unwrap_model` from `InstanceBasedAttack` to `sacroml.attacks.utils` + so it can be reused by other attacks that need to split a scikit-learn `Pipeline` + into its final estimator and preprocessing stages + ([#455](https://github.com/AI-SDC/SACRO-ML/issues/455)). ## Version 1.4.3 (Jan 29, 2026) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index f55d529a..6b861230 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -18,18 +18,13 @@ import numpy as np from fpdf import FPDF -from sklearn.base import BaseEstimator from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM -try: - from sklearn.pipeline import Pipeline -except ImportError: # pragma: no cover - Pipeline = None - from sacroml.attacks import report from sacroml.attacks.attack import Attack from sacroml.attacks.target import Target +from sacroml.attacks.utils import unwrap_model logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -168,24 +163,6 @@ def attackable(cls, target: Target) -> bool: return False return True - @staticmethod - def _unwrap_model(model: BaseEstimator) -> tuple[BaseEstimator, Pipeline | None]: - """Extract the final estimator and preprocessor from a Pipeline. - - Returns - ------- - tuple - (final_estimator, preprocessor_pipeline_or_None) - If the model is a Pipeline with preprocessing steps, returns - a Pipeline of just the preprocessing steps so X_train can be - transformed to the same space as the stored instances. - """ - if Pipeline is not None and isinstance(model, Pipeline): - final_estimator = model.steps[-1][1] - preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None - return final_estimator, preprocessor - return model, None - def _compare_instances( self, stored_instances: np.ndarray, @@ -303,7 +280,7 @@ def _attack(self, target: Target) -> dict: dict Attack report dictionary. """ - raw_model, preprocessor = self._unwrap_model(target.model.model) + raw_model, preprocessor = unwrap_model(target.model.model) model_type = type(raw_model).__name__ is_svm = isinstance(raw_model, SVM_TYPES) diff --git a/sacroml/attacks/utils.py b/sacroml/attacks/utils.py index 9543f1bc..79301267 100644 --- a/sacroml/attacks/utils.py +++ b/sacroml/attacks/utils.py @@ -10,6 +10,7 @@ import numpy as np from scipy.stats import shapiro from sklearn.base import BaseEstimator +from sklearn.pipeline import Pipeline from sacroml.attacks.model import Model from sacroml.attacks.target import Target @@ -278,3 +279,35 @@ def get_class_by_name(class_path: str) -> type[object]: module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, class_name) + + +def unwrap_model( + model: BaseEstimator, +) -> tuple[BaseEstimator, Pipeline | None]: + """Extract the final estimator and preprocessor from a scikit-learn model. + + If ``model`` is a :class:`sklearn.pipeline.Pipeline`, the final step is + returned as the estimator and a new ``Pipeline`` containing the remaining + earlier steps is returned as the preprocessor. This allows callers to + transform inputs into the same feature space the final estimator was + fitted on. If the Pipeline has only one step, no preprocessor exists and + ``None`` is returned in its place. Non-Pipeline models are returned + unchanged with ``None`` as the preprocessor. + + Parameters + ---------- + model : BaseEstimator + A fitted scikit-learn estimator, optionally wrapped in a ``Pipeline``. + + Returns + ------- + tuple[BaseEstimator, Pipeline | None] + ``(final_estimator, preprocessor)`` where ``preprocessor`` is a + Pipeline of all steps except the last, or ``None`` if the input is + not a Pipeline or is a single-step Pipeline. + """ + if isinstance(model, Pipeline): + final_estimator = model.steps[-1][1] + preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None + return final_estimator, preprocessor + return model, None diff --git a/tests/attacks/test_utils.py b/tests/attacks/test_utils.py new file mode 100644 index 00000000..8364cbba --- /dev/null +++ b/tests/attacks/test_utils.py @@ -0,0 +1,58 @@ +"""Tests for sacroml.attacks.utils helper functions.""" + +from __future__ import annotations + +import numpy as np +from sklearn.linear_model import LogisticRegression +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.svm import SVC + +from sacroml.attacks.utils import unwrap_model + + +class TestUnwrapModel: + """Tests for ``unwrap_model``.""" + + def test_non_pipeline_returns_model_and_none(self): + """A plain estimator is returned unchanged with no preprocessor.""" + model = SVC(gamma=0.1) + estimator, preprocessor = unwrap_model(model) + assert estimator is model + assert preprocessor is None + + def test_single_step_pipeline_returns_final_step_only(self): + """A one-step Pipeline yields its final estimator and no preprocessor.""" + final = LogisticRegression() + pipe = Pipeline([("clf", final)]) + estimator, preprocessor = unwrap_model(pipe) + assert estimator is final + assert preprocessor is None + + def test_multi_step_pipeline_splits_preprocessor_from_estimator(self): + """A multi-step Pipeline yields the final step and a Pipeline of the rest.""" + scaler = StandardScaler() + final = LogisticRegression() + pipe = Pipeline([("scaler", scaler), ("clf", final)]) + + estimator, preprocessor = unwrap_model(pipe) + + assert estimator is final + assert isinstance(preprocessor, Pipeline) + assert [name for name, _ in preprocessor.steps] == ["scaler"] + assert preprocessor.steps[0][1] is scaler + + def test_multi_step_preprocessor_transforms_input(self): + """The returned preprocessor can transform inputs end-to-end.""" + rng = np.random.default_rng(0) + X = rng.normal(size=(20, 3)) + y = rng.integers(0, 2, size=20) + + pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())]) + pipe.fit(X, y) + + _, preprocessor = unwrap_model(pipe) + transformed = preprocessor.transform(X) + + np.testing.assert_allclose(transformed.mean(axis=0), 0, atol=1e-8) + np.testing.assert_allclose(transformed.std(axis=0), 1, atol=1e-1) From a7e842fcae49b99584d7606b817ba68d946fe501 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Fri, 22 May 2026 13:55:50 +0300 Subject: [PATCH 09/15] revert: move unwrap_model to utils.py --- CHANGELOG.md | 4 -- sacroml/attacks/instance_based_attack.py | 27 ++++++++++- sacroml/attacks/utils.py | 33 -------------- tests/attacks/test_utils.py | 58 ------------------------ 4 files changed, 25 insertions(+), 97 deletions(-) delete mode 100644 tests/attacks/test_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bc8d893f..10e5f310 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,6 @@ Changes: membership thresholds. A sample is predicted as a member when its observed score exceeds the predicted threshold at quantile level (1 - alpha). No shadow models or architecture knowledge required. Registered in the attack factory as `"qmia"`. -* Refactor: move `unwrap_model` from `InstanceBasedAttack` to `sacroml.attacks.utils` - so it can be reused by other attacks that need to split a scikit-learn `Pipeline` - into its final estimator and preprocessing stages - ([#455](https://github.com/AI-SDC/SACRO-ML/issues/455)). ## Version 1.4.3 (Jan 29, 2026) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 6b861230..f55d529a 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -18,13 +18,18 @@ import numpy as np from fpdf import FPDF +from sklearn.base import BaseEstimator from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM +try: + from sklearn.pipeline import Pipeline +except ImportError: # pragma: no cover + Pipeline = None + from sacroml.attacks import report from sacroml.attacks.attack import Attack from sacroml.attacks.target import Target -from sacroml.attacks.utils import unwrap_model logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -163,6 +168,24 @@ def attackable(cls, target: Target) -> bool: return False return True + @staticmethod + def _unwrap_model(model: BaseEstimator) -> tuple[BaseEstimator, Pipeline | None]: + """Extract the final estimator and preprocessor from a Pipeline. + + Returns + ------- + tuple + (final_estimator, preprocessor_pipeline_or_None) + If the model is a Pipeline with preprocessing steps, returns + a Pipeline of just the preprocessing steps so X_train can be + transformed to the same space as the stored instances. + """ + if Pipeline is not None and isinstance(model, Pipeline): + final_estimator = model.steps[-1][1] + preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None + return final_estimator, preprocessor + return model, None + def _compare_instances( self, stored_instances: np.ndarray, @@ -280,7 +303,7 @@ def _attack(self, target: Target) -> dict: dict Attack report dictionary. """ - raw_model, preprocessor = unwrap_model(target.model.model) + raw_model, preprocessor = self._unwrap_model(target.model.model) model_type = type(raw_model).__name__ is_svm = isinstance(raw_model, SVM_TYPES) diff --git a/sacroml/attacks/utils.py b/sacroml/attacks/utils.py index 79301267..9543f1bc 100644 --- a/sacroml/attacks/utils.py +++ b/sacroml/attacks/utils.py @@ -10,7 +10,6 @@ import numpy as np from scipy.stats import shapiro from sklearn.base import BaseEstimator -from sklearn.pipeline import Pipeline from sacroml.attacks.model import Model from sacroml.attacks.target import Target @@ -279,35 +278,3 @@ def get_class_by_name(class_path: str) -> type[object]: module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, class_name) - - -def unwrap_model( - model: BaseEstimator, -) -> tuple[BaseEstimator, Pipeline | None]: - """Extract the final estimator and preprocessor from a scikit-learn model. - - If ``model`` is a :class:`sklearn.pipeline.Pipeline`, the final step is - returned as the estimator and a new ``Pipeline`` containing the remaining - earlier steps is returned as the preprocessor. This allows callers to - transform inputs into the same feature space the final estimator was - fitted on. If the Pipeline has only one step, no preprocessor exists and - ``None`` is returned in its place. Non-Pipeline models are returned - unchanged with ``None`` as the preprocessor. - - Parameters - ---------- - model : BaseEstimator - A fitted scikit-learn estimator, optionally wrapped in a ``Pipeline``. - - Returns - ------- - tuple[BaseEstimator, Pipeline | None] - ``(final_estimator, preprocessor)`` where ``preprocessor`` is a - Pipeline of all steps except the last, or ``None`` if the input is - not a Pipeline or is a single-step Pipeline. - """ - if isinstance(model, Pipeline): - final_estimator = model.steps[-1][1] - preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None - return final_estimator, preprocessor - return model, None diff --git a/tests/attacks/test_utils.py b/tests/attacks/test_utils.py deleted file mode 100644 index 8364cbba..00000000 --- a/tests/attacks/test_utils.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Tests for sacroml.attacks.utils helper functions.""" - -from __future__ import annotations - -import numpy as np -from sklearn.linear_model import LogisticRegression -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC - -from sacroml.attacks.utils import unwrap_model - - -class TestUnwrapModel: - """Tests for ``unwrap_model``.""" - - def test_non_pipeline_returns_model_and_none(self): - """A plain estimator is returned unchanged with no preprocessor.""" - model = SVC(gamma=0.1) - estimator, preprocessor = unwrap_model(model) - assert estimator is model - assert preprocessor is None - - def test_single_step_pipeline_returns_final_step_only(self): - """A one-step Pipeline yields its final estimator and no preprocessor.""" - final = LogisticRegression() - pipe = Pipeline([("clf", final)]) - estimator, preprocessor = unwrap_model(pipe) - assert estimator is final - assert preprocessor is None - - def test_multi_step_pipeline_splits_preprocessor_from_estimator(self): - """A multi-step Pipeline yields the final step and a Pipeline of the rest.""" - scaler = StandardScaler() - final = LogisticRegression() - pipe = Pipeline([("scaler", scaler), ("clf", final)]) - - estimator, preprocessor = unwrap_model(pipe) - - assert estimator is final - assert isinstance(preprocessor, Pipeline) - assert [name for name, _ in preprocessor.steps] == ["scaler"] - assert preprocessor.steps[0][1] is scaler - - def test_multi_step_preprocessor_transforms_input(self): - """The returned preprocessor can transform inputs end-to-end.""" - rng = np.random.default_rng(0) - X = rng.normal(size=(20, 3)) - y = rng.integers(0, 2, size=20) - - pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())]) - pipe.fit(X, y) - - _, preprocessor = unwrap_model(pipe) - transformed = preprocessor.transform(X) - - np.testing.assert_allclose(transformed.mean(axis=0), 0, atol=1e-8) - np.testing.assert_allclose(transformed.std(axis=0), 1, atol=1e-1) From 1e869be65f12e9d244ef21fe7ad6c8eef8c981c5 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Mon, 25 May 2026 11:35:22 +0300 Subject: [PATCH 10/15] test: cover graceful-degradation paths in InstanceBasedAttack --- tests/attacks/test_instance_based_attack.py | 90 +++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index 4aeb27f6..3fa0a1c0 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -1,7 +1,9 @@ """Tests for InstanceBasedAttack.""" +import logging import os +import numpy as np import pytest from sklearn.datasets import make_moons, make_regression from sklearn.model_selection import train_test_split @@ -369,3 +371,91 @@ def test_empty_target_returns_empty(self): ) output = attack.attack(target) assert output == {} + + +class TestGracefulDegradation: + """Tests for defensive / graceful-degradation paths.""" + + def test_unfitted_knn_warns_and_returns_zero(self, caplog): + """Unfitted kNN: no _fit_X attribute triggers a warning and zero stored.""" + X_train = np.array([[0.0, 0.0], [1.0, 1.0]]) + y_train = np.array([0, 1]) + unfitted = KNeighborsClassifier(n_neighbors=1) + target = Target( + model=unfitted, + X_train=X_train, + y_train=y_train, + X_test=X_train, + y_test=y_train, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + caplog.set_level( + logging.WARNING, logger="sacroml.attacks.instance_based_attack" + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["model_type"] == "KNeighborsClassifier" + assert instance["n_stored_instances"] == 0 + assert instance["data_leakage_confirmed"] is False + assert any("_fit_X" in rec.message for rec in caplog.records) + + def test_feature_dim_mismatch_warns_and_returns(self, caplog): + """Stored support vectors and X_train with different feature counts.""" + X_fit = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]]) + y_fit = np.array([0, 1, 0, 1]) + model = SVC(gamma=0.1).fit(X_fit, y_fit) + X_train_mismatch = np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]) + y_train = np.array([0, 1]) + target = Target( + model=model, + X_train=X_train_mismatch, + y_train=y_train, + X_test=X_train_mismatch, + y_test=y_train, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + caplog.set_level( + logging.WARNING, logger="sacroml.attacks.instance_based_attack" + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["n_matched"] == 0 + assert instance["data_leakage_confirmed"] is False + assert instance["details"]["error"] == "Feature dimension mismatch" + assert any( + "Feature dimension mismatch" in rec.message for rec in caplog.records + ) + + def test_fallback_search_when_indices_wrong(self): + """Wrong support_ index hint forces the fallback brute-force loop.""" + X_train = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]]) + y_train = np.array([0, 1, 0, 1]) + model = SVC(gamma=0.1).fit(X_train, y_train) + model.support_ = np.zeros_like(model.support_) + target = Target( + model=model, + X_train=X_train, + y_train=y_train, + X_test=X_train, + y_test=y_train, + ) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", write_report=False + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + assert instance["data_leakage_confirmed"] is True + assert instance["n_matched"] > 0 From b014644a4affd67ddda1933fd07ff2c5462ccf0f Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Mon, 25 May 2026 11:37:39 +0300 Subject: [PATCH 11/15] style: rename test variables to match ruff pep8-naming allowlist --- tests/attacks/test_instance_based_attack.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index 3fa0a1c0..1c32d7b9 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -406,17 +406,15 @@ def test_unfitted_knn_warns_and_returns_zero(self, caplog): def test_feature_dim_mismatch_warns_and_returns(self, caplog): """Stored support vectors and X_train with different feature counts.""" - X_fit = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]]) - y_fit = np.array([0, 1, 0, 1]) - model = SVC(gamma=0.1).fit(X_fit, y_fit) - X_train_mismatch = np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]) - y_train = np.array([0, 1]) + X_train = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0], [1.0, 0.0]]) + y_train = np.array([0, 1, 0, 1]) + model = SVC(gamma=0.1).fit(X_train, y_train) target = Target( model=model, - X_train=X_train_mismatch, - y_train=y_train, - X_test=X_train_mismatch, - y_test=y_train, + X_train=np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]), + y_train=np.array([0, 1]), + X_test=np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]), + y_test=np.array([0, 1]), ) attack = InstanceBasedAttack( output_dir="outputs_instance_based", write_report=False From 6c5294917aa65e39330b4d813aa870abb15aed7d Mon Sep 17 00:00:00 2001 From: Shamy <110725453+shamykyzer@users.noreply.github.com> Date: Tue, 26 May 2026 18:58:42 +0300 Subject: [PATCH 12/15] refactor: move unwrap_model to sacroml.attacks.utils for reuse (#459) --- CHANGELOG.md | 4 ++ sacroml/attacks/instance_based_attack.py | 27 +---------- sacroml/attacks/utils.py | 33 ++++++++++++++ tests/attacks/test_utils.py | 58 ++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 25 deletions(-) create mode 100644 tests/attacks/test_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 10e5f310..bc8d893f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ Changes: membership thresholds. A sample is predicted as a member when its observed score exceeds the predicted threshold at quantile level (1 - alpha). No shadow models or architecture knowledge required. Registered in the attack factory as `"qmia"`. +* Refactor: move `unwrap_model` from `InstanceBasedAttack` to `sacroml.attacks.utils` + so it can be reused by other attacks that need to split a scikit-learn `Pipeline` + into its final estimator and preprocessing stages + ([#455](https://github.com/AI-SDC/SACRO-ML/issues/455)). ## Version 1.4.3 (Jan 29, 2026) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index f55d529a..6b861230 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -18,18 +18,13 @@ import numpy as np from fpdf import FPDF -from sklearn.base import BaseEstimator from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor from sklearn.svm import SVC, SVR, NuSVC, NuSVR, OneClassSVM -try: - from sklearn.pipeline import Pipeline -except ImportError: # pragma: no cover - Pipeline = None - from sacroml.attacks import report from sacroml.attacks.attack import Attack from sacroml.attacks.target import Target +from sacroml.attacks.utils import unwrap_model logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -168,24 +163,6 @@ def attackable(cls, target: Target) -> bool: return False return True - @staticmethod - def _unwrap_model(model: BaseEstimator) -> tuple[BaseEstimator, Pipeline | None]: - """Extract the final estimator and preprocessor from a Pipeline. - - Returns - ------- - tuple - (final_estimator, preprocessor_pipeline_or_None) - If the model is a Pipeline with preprocessing steps, returns - a Pipeline of just the preprocessing steps so X_train can be - transformed to the same space as the stored instances. - """ - if Pipeline is not None and isinstance(model, Pipeline): - final_estimator = model.steps[-1][1] - preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None - return final_estimator, preprocessor - return model, None - def _compare_instances( self, stored_instances: np.ndarray, @@ -303,7 +280,7 @@ def _attack(self, target: Target) -> dict: dict Attack report dictionary. """ - raw_model, preprocessor = self._unwrap_model(target.model.model) + raw_model, preprocessor = unwrap_model(target.model.model) model_type = type(raw_model).__name__ is_svm = isinstance(raw_model, SVM_TYPES) diff --git a/sacroml/attacks/utils.py b/sacroml/attacks/utils.py index 9543f1bc..79301267 100644 --- a/sacroml/attacks/utils.py +++ b/sacroml/attacks/utils.py @@ -10,6 +10,7 @@ import numpy as np from scipy.stats import shapiro from sklearn.base import BaseEstimator +from sklearn.pipeline import Pipeline from sacroml.attacks.model import Model from sacroml.attacks.target import Target @@ -278,3 +279,35 @@ def get_class_by_name(class_path: str) -> type[object]: module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, class_name) + + +def unwrap_model( + model: BaseEstimator, +) -> tuple[BaseEstimator, Pipeline | None]: + """Extract the final estimator and preprocessor from a scikit-learn model. + + If ``model`` is a :class:`sklearn.pipeline.Pipeline`, the final step is + returned as the estimator and a new ``Pipeline`` containing the remaining + earlier steps is returned as the preprocessor. This allows callers to + transform inputs into the same feature space the final estimator was + fitted on. If the Pipeline has only one step, no preprocessor exists and + ``None`` is returned in its place. Non-Pipeline models are returned + unchanged with ``None`` as the preprocessor. + + Parameters + ---------- + model : BaseEstimator + A fitted scikit-learn estimator, optionally wrapped in a ``Pipeline``. + + Returns + ------- + tuple[BaseEstimator, Pipeline | None] + ``(final_estimator, preprocessor)`` where ``preprocessor`` is a + Pipeline of all steps except the last, or ``None`` if the input is + not a Pipeline or is a single-step Pipeline. + """ + if isinstance(model, Pipeline): + final_estimator = model.steps[-1][1] + preprocessor = Pipeline(model.steps[:-1]) if len(model.steps) > 1 else None + return final_estimator, preprocessor + return model, None diff --git a/tests/attacks/test_utils.py b/tests/attacks/test_utils.py new file mode 100644 index 00000000..8364cbba --- /dev/null +++ b/tests/attacks/test_utils.py @@ -0,0 +1,58 @@ +"""Tests for sacroml.attacks.utils helper functions.""" + +from __future__ import annotations + +import numpy as np +from sklearn.linear_model import LogisticRegression +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.svm import SVC + +from sacroml.attacks.utils import unwrap_model + + +class TestUnwrapModel: + """Tests for ``unwrap_model``.""" + + def test_non_pipeline_returns_model_and_none(self): + """A plain estimator is returned unchanged with no preprocessor.""" + model = SVC(gamma=0.1) + estimator, preprocessor = unwrap_model(model) + assert estimator is model + assert preprocessor is None + + def test_single_step_pipeline_returns_final_step_only(self): + """A one-step Pipeline yields its final estimator and no preprocessor.""" + final = LogisticRegression() + pipe = Pipeline([("clf", final)]) + estimator, preprocessor = unwrap_model(pipe) + assert estimator is final + assert preprocessor is None + + def test_multi_step_pipeline_splits_preprocessor_from_estimator(self): + """A multi-step Pipeline yields the final step and a Pipeline of the rest.""" + scaler = StandardScaler() + final = LogisticRegression() + pipe = Pipeline([("scaler", scaler), ("clf", final)]) + + estimator, preprocessor = unwrap_model(pipe) + + assert estimator is final + assert isinstance(preprocessor, Pipeline) + assert [name for name, _ in preprocessor.steps] == ["scaler"] + assert preprocessor.steps[0][1] is scaler + + def test_multi_step_preprocessor_transforms_input(self): + """The returned preprocessor can transform inputs end-to-end.""" + rng = np.random.default_rng(0) + X = rng.normal(size=(20, 3)) + y = rng.integers(0, 2, size=20) + + pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())]) + pipe.fit(X, y) + + _, preprocessor = unwrap_model(pipe) + transformed = preprocessor.transform(X) + + np.testing.assert_allclose(transformed.mean(axis=0), 0, atol=1e-8) + np.testing.assert_allclose(transformed.std(axis=0), 1, atol=1e-1) From f02fe5b62dbbc238e67b4f4496111eaad35aa6dd Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Fri, 29 May 2026 13:47:38 +0100 Subject: [PATCH 13/15] feat: address review feedback on InstanceBasedAttack - add report_individual option, gated like StructuralAttack so the per-record block only appears under the 'individual' key when set - record all matched instances (n_examples now limits PDF display only) - replace bespoke example_matches with an InstanceBasedRecordLevelResults dataclass of parallel lists, consistent with other attacks - give InstanceBasedAttackResults field defaults to trim the graceful-degradation construction sites --- sacroml/attacks/instance_based_attack.py | 156 +++++++++++--------- tests/attacks/test_instance_based_attack.py | 58 ++++++-- 2 files changed, 126 insertions(+), 88 deletions(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index 6b861230..d3709106 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -14,7 +14,7 @@ from __future__ import annotations import logging -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field import numpy as np from fpdf import FPDF @@ -95,20 +95,36 @@ class InstanceBasedAttackResults: """Results of an instance-based model attack.""" model_type: str - is_instance_based: bool - is_dp_safe: bool - n_stored_instances: int - n_training_samples: int - storage_fraction: float - n_matched: int - n_checked: int - match_fraction: float - example_matches: list[dict] - data_leakage_confirmed: bool - mitigations: list[str] + is_instance_based: bool = False + is_dp_safe: bool = False + n_stored_instances: int = 0 + n_training_samples: int = 0 + storage_fraction: float = 0.0 + n_matched: int = 0 + n_checked: int = 0 + match_fraction: float = 0.0 + data_leakage_confirmed: bool = False + mitigations: list[str] = field(default_factory=list) details: dict | None = None +@dataclass +class InstanceBasedRecordLevelResults: + """Per-stored-instance match outcomes for an instance-based attack. + + Each list is indexed by stored instance (an SVM support vector or a + kNN neighbour) and holds one entry per stored instance, matched or + not. Stored as parallel lists, consistent with how other attacks + expose per-record results, so it can be emitted under the report's + ``individual`` key when ``report_individual`` is set. + """ + + stored_index: list[int] + training_index: list[int] # index into X_train, or -1 if no match + matched: list[bool] + stored_values: list[list[float]] # first N_FEATURE_PREVIEW feature values + + class InstanceBasedAttack(Attack): """Detect training data stored in instance-based model parameters. @@ -124,6 +140,7 @@ def __init__( write_report: bool = True, n_examples: int = N_EXAMPLES, atol: float = INSTANCE_MATCH_ATOL, + report_individual: bool = False, ) -> None: """Construct an instance-based model attack. @@ -134,15 +151,21 @@ def __init__( write_report : bool Whether to generate a JSON and PDF report. n_examples : int - Maximum number of matching examples to include in the report. + Maximum number of matching examples to show in the PDF report. + Does not limit how many matches are recorded; all matches are + kept in the per-record results. atol : float Absolute tolerance for floating-point comparison when matching stored instances to training data. + report_individual : bool + Whether to report metrics for each individual record. """ super().__init__(output_dir=output_dir, write_report=write_report) self.n_examples = n_examples self.atol = atol + self.report_individual: bool = report_individual self.results: InstanceBasedAttackResults | None = None + self.record_level_results: InstanceBasedRecordLevelResults | None = None def __str__(self) -> str: """Return the name of the attack.""" @@ -168,7 +191,7 @@ def _compare_instances( stored_instances: np.ndarray, stored_indices: np.ndarray | None, X_train: np.ndarray, - ) -> tuple[int, list[dict]]: + ) -> tuple[int, InstanceBasedRecordLevelResults]: """Compare stored model instances against training data. Parameters @@ -184,15 +207,21 @@ def _compare_instances( ------- n_matched : int Number of stored instances that match training data. - example_matches : list[dict] - Details of the first n_examples matches. + record_level_results : InstanceBasedRecordLevelResults + Per-stored-instance outcomes, one entry per stored instance + (matched or not). The matched subset is recoverable via the + ``matched`` flags; nothing is truncated here, callers slice + for display. """ n_matched = 0 - example_matches: list[dict] = [] + stored_index: list[int] = [] + training_index: list[int] = [] + matched_flags: list[bool] = [] + stored_values: list[list[float]] = [] for i, stored_row in enumerate(stored_instances): matched = False - match_index = None + match_index = -1 # Try index-based direct comparison first if stored_indices is not None and i < len(stored_indices): @@ -213,22 +242,20 @@ def _compare_instances( if matched: n_matched += 1 - if len(example_matches) < self.n_examples: - n_preview = min(N_FEATURE_PREVIEW, stored_row.shape[0]) - example_matches.append( - { - "stored_index": i, - "training_index": match_index, - "stored_values": stored_row[:n_preview].tolist(), - "training_values": ( - X_train[match_index][:n_preview].tolist() - if match_index is not None - else None - ), - } - ) - - return n_matched, example_matches + + n_preview = min(N_FEATURE_PREVIEW, stored_row.shape[0]) + stored_index.append(i) + training_index.append(match_index) + matched_flags.append(matched) + stored_values.append(stored_row[:n_preview].tolist()) + + record_level_results = InstanceBasedRecordLevelResults( + stored_index=stored_index, + training_index=training_index, + matched=matched_flags, + stored_values=stored_values, + ) + return n_matched, record_level_results def _build_mitigations( self, is_svm: bool, is_knn: bool, is_dp_safe: bool @@ -307,17 +334,7 @@ def _attack(self, target: Target) -> dict: ) self.results = InstanceBasedAttackResults( model_type=model_type, - is_instance_based=False, - is_dp_safe=False, - n_stored_instances=0, n_training_samples=n_training, - storage_fraction=0.0, - n_matched=0, - n_checked=0, - match_fraction=0.0, - example_matches=[], - data_leakage_confirmed=False, - mitigations=[], ) output = self._make_report(target) self._write_report(output) @@ -354,14 +371,7 @@ def _attack(self, target: Target) -> dict: model_type=model_type, is_instance_based=True, is_dp_safe=is_dp_safe, - n_stored_instances=0, n_training_samples=n_training, - storage_fraction=0.0, - n_matched=0, - n_checked=0, - match_fraction=0.0, - example_matches=[], - data_leakage_confirmed=False, mitigations=self._build_mitigations(is_svm, is_knn, is_dp_safe), ) output = self._make_report(target) @@ -385,11 +395,6 @@ def _attack(self, target: Target) -> dict: n_stored_instances=n_stored, n_training_samples=n_training, storage_fraction=n_stored / n_training if n_training > 0 else 0.0, - n_matched=0, - n_checked=0, - match_fraction=0.0, - example_matches=[], - data_leakage_confirmed=False, mitigations=self._build_mitigations(is_svm, is_knn, is_dp_safe), details={"error": "Feature dimension mismatch"}, ) @@ -398,7 +403,7 @@ def _attack(self, target: Target) -> dict: return output # Compare stored instances to training data - n_matched, example_matches = self._compare_instances( + n_matched, self.record_level_results = self._compare_instances( stored_instances, stored_indices, X_train ) @@ -418,7 +423,6 @@ def _attack(self, target: Target) -> dict: n_matched=n_matched, n_checked=n_stored, match_fraction=match_fraction, - example_matches=example_matches, data_leakage_confirmed=data_leakage_confirmed, mitigations=mitigations, ) @@ -447,12 +451,12 @@ def _get_attack_metrics_instances(self) -> dict: """Return attack metrics for the report structure.""" attack_metrics_experiment = {} if self.results: - attack_metrics_instances = { - "instance_0": asdict(self.results), + instance_0 = asdict(self.results) + if self.report_individual and self.record_level_results is not None: + instance_0["individual"] = asdict(self.record_level_results) + attack_metrics_experiment["attack_instance_logger"] = { + "instance_0": instance_0, } - attack_metrics_experiment["attack_instance_logger"] = ( - attack_metrics_instances - ) return attack_metrics_experiment def _make_pdf(self, output: dict) -> FPDF: @@ -500,23 +504,29 @@ def _make_pdf(self, output: dict) -> FPDF: value = metrics.get(key, "N/A") report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") - # Example matches - example_matches = instance_data.get("example_matches", []) - if example_matches: + # Example matches: slice the first n_examples matched records from + # the full per-record results for display only. + rlr = self.record_level_results + if rlr is not None and any(rlr.matched): + matched_positions = [ + k for k, is_match in enumerate(rlr.matched) if is_match + ] + shown = matched_positions[: self.n_examples] pdf.add_page() report.title(pdf, "Example Matches") report.line( pdf, - f"Showing {len(example_matches)} example(s) of training " - "data found stored in the model (first 5 feature values):", + f"Showing {len(shown)} of {len(matched_positions)} matched " + f"instance(s) found stored in the model (first " + f"{N_FEATURE_PREVIEW} feature values):", ) - for i, match in enumerate(example_matches): + for display_i, k in enumerate(shown): report.line( pdf, - f" Match {i + 1}: " - f"stored[{match.get('stored_index', '?')}] " - f"= train[{match.get('training_index', '?')}] " - f"values: {match.get('stored_values', [])}", + f" Match {display_i + 1}: " + f"stored[{rlr.stored_index[k]}] " + f"= train[{rlr.training_index[k]}] " + f"values: {rlr.stored_values[k]}", font="courier", font_size=9, ) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index 1c32d7b9..7da49907 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -259,20 +259,25 @@ def test_pipeline_svc(self): class TestConfiguration: """Tests for attack configuration and parameters.""" - def test_n_examples_limit(self): - """Test example matches are capped at n_examples.""" + def test_n_examples_does_not_cap_recorded_matches(self): + """n_examples limits report display only, not recorded matches.""" target = _make_target_clf(SVC(gamma=0.1)) attack = InstanceBasedAttack( output_dir="outputs_instance_based", write_report=False, - n_examples=3, + n_examples=2, + report_individual=True, ) output = attack.attack(target) instance = output["attack_experiment_logger"]["attack_instance_logger"][ "instance_0" ] - assert len(instance["example_matches"]) <= 3 + individual = instance["individual"] + # Every stored instance is recorded despite the small n_examples. + assert len(individual["matched"]) == instance["n_stored_instances"] + assert instance["n_stored_instances"] > attack.n_examples + assert sum(individual["matched"]) == instance["n_matched"] def test_str_representation(self): """Test __str__ returns the attack name.""" @@ -293,6 +298,7 @@ def test_get_params(self): assert params["n_examples"] == 5 assert params["atol"] == 1e-6 assert params["output_dir"] == "outputs_instance_based" + assert params["report_individual"] is False def test_default_atol_is_module_constant(self): """Default atol matches INSTANCE_MATCH_ATOL, see issue #454.""" @@ -342,8 +348,37 @@ def test_report_files_created(self): assert os.path.exists(os.path.join("outputs_instance_based", "report.json")) assert os.path.exists(os.path.join("outputs_instance_based", "report.pdf")) - def test_example_match_structure(self): - """Test example matches contain expected fields.""" + def test_record_level_individual_structure(self): + """Per-record individual block has parallel lists of equal length.""" + target = _make_target_clf(SVC(gamma=0.1)) + attack = InstanceBasedAttack( + output_dir="outputs_instance_based", + write_report=False, + report_individual=True, + ) + output = attack.attack(target) + + instance = output["attack_experiment_logger"]["attack_instance_logger"][ + "instance_0" + ] + individual = instance["individual"] + for key in ("stored_index", "training_index", "matched", "stored_values"): + assert key in individual + + n_stored = instance["n_stored_instances"] + assert len(individual["stored_index"]) == n_stored + assert len(individual["training_index"]) == n_stored + assert len(individual["matched"]) == n_stored + assert len(individual["stored_values"]) == n_stored + assert isinstance(individual["stored_values"][0], list) + # A non-negative training index appears exactly when matched is True. + for train_idx, is_match in zip( + individual["training_index"], individual["matched"], strict=True + ): + assert (train_idx >= 0) == is_match + + def test_report_individual_off_by_default(self): + """Without report_individual, no per-record block is emitted.""" target = _make_target_clf(SVC(gamma=0.1)) attack = InstanceBasedAttack( output_dir="outputs_instance_based", write_report=False @@ -353,15 +388,8 @@ def test_example_match_structure(self): instance = output["attack_experiment_logger"]["attack_instance_logger"][ "instance_0" ] - matches = instance["example_matches"] - assert len(matches) > 0 - - match = matches[0] - assert "stored_index" in match - assert "training_index" in match - assert "stored_values" in match - assert "training_values" in match - assert isinstance(match["stored_values"], list) + assert "individual" not in instance + assert "example_matches" not in instance def test_empty_target_returns_empty(self): """Test attack on empty target returns empty dict.""" From ca71239d56e61d101399ed4203c54a5fe8d0bdc7 Mon Sep 17 00:00:00 2001 From: shamykyzer Date: Fri, 29 May 2026 14:03:09 +0100 Subject: [PATCH 14/15] style: satisfy pydocstringformatter on test docstring --- tests/attacks/test_instance_based_attack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index 7da49907..919e87c7 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -260,7 +260,7 @@ class TestConfiguration: """Tests for attack configuration and parameters.""" def test_n_examples_does_not_cap_recorded_matches(self): - """n_examples limits report display only, not recorded matches.""" + """Check that n_examples limits report display only, not recorded matches.""" target = _make_target_clf(SVC(gamma=0.1)) attack = InstanceBasedAttack( output_dir="outputs_instance_based", From 048f4992bbecef42cb9b52b5be75de5b0b5e64d8 Mon Sep 17 00:00:00 2001 From: JessUWE Date: Tue, 16 Jun 2026 12:47:32 +0100 Subject: [PATCH 15/15] fix: reindex record-level results by training record in InstanceBasedAttack --- sacroml/attacks/instance_based_attack.py | 74 +++++++-------------- tests/attacks/test_instance_based_attack.py | 31 ++++----- 2 files changed, 35 insertions(+), 70 deletions(-) diff --git a/sacroml/attacks/instance_based_attack.py b/sacroml/attacks/instance_based_attack.py index d3709106..0e16b03b 100644 --- a/sacroml/attacks/instance_based_attack.py +++ b/sacroml/attacks/instance_based_attack.py @@ -33,7 +33,6 @@ KNN_TYPES = (KNeighborsClassifier, KNeighborsRegressor) N_EXAMPLES = 10 # default number of matching examples included in the report -N_FEATURE_PREVIEW = 10 # number of feature values shown per example match INSTANCE_MATCH_ATOL: float = 1e-8 """Absolute tolerance for matching stored instances to training rows. @@ -110,19 +109,14 @@ class InstanceBasedAttackResults: @dataclass class InstanceBasedRecordLevelResults: - """Per-stored-instance match outcomes for an instance-based attack. + """Per-training-record outcomes for an instance-based attack. - Each list is indexed by stored instance (an SVM support vector or a - kNN neighbour) and holds one entry per stored instance, matched or - not. Stored as parallel lists, consistent with how other attacks - expose per-record results, so it can be emitted under the report's - ``individual`` key when ``report_individual`` is set. + Indexed by training record (length == n_training_samples), consistent + with StructuralAttack's individual block. A value of 1 means that + training record is stored verbatim inside the model; 0 means it is not. """ - stored_index: list[int] - training_index: list[int] # index into X_train, or -1 if no match - matched: list[bool] - stored_values: list[list[float]] # first N_FEATURE_PREVIEW feature values + individual_risk: list[int] # 1 if training record is stored in model, else 0 class InstanceBasedAttack(Attack): @@ -206,21 +200,14 @@ def _compare_instances( Returns ------- n_matched : int - Number of stored instances that match training data. + Number of stored instances that match a training record. record_level_results : InstanceBasedRecordLevelResults - Per-stored-instance outcomes, one entry per stored instance - (matched or not). The matched subset is recoverable via the - ``matched`` flags; nothing is truncated here, callers slice - for display. + One entry per training record: 1 if that record is stored in + the model, 0 otherwise. """ - n_matched = 0 - stored_index: list[int] = [] - training_index: list[int] = [] - matched_flags: list[bool] = [] - stored_values: list[list[float]] = [] + individual_risk = np.zeros(len(X_train), dtype=int) for i, stored_row in enumerate(stored_instances): - matched = False match_index = -1 # Try index-based direct comparison first @@ -229,31 +216,21 @@ def _compare_instances( if 0 <= idx < len(X_train) and np.allclose( stored_row, X_train[idx], atol=self.atol ): - matched = True match_index = idx # Fallback: search through training data - if not matched: + if match_index == -1: for j in range(len(X_train)): if np.allclose(stored_row, X_train[j], atol=self.atol): - matched = True match_index = j break - if matched: - n_matched += 1 - - n_preview = min(N_FEATURE_PREVIEW, stored_row.shape[0]) - stored_index.append(i) - training_index.append(match_index) - matched_flags.append(matched) - stored_values.append(stored_row[:n_preview].tolist()) + if match_index != -1: + individual_risk[match_index] = 1 + n_matched = int(individual_risk.sum()) record_level_results = InstanceBasedRecordLevelResults( - stored_index=stored_index, - training_index=training_index, - matched=matched_flags, - stored_values=stored_values, + individual_risk=individual_risk.tolist() ) return n_matched, record_level_results @@ -504,29 +481,24 @@ def _make_pdf(self, output: dict) -> FPDF: value = metrics.get(key, "N/A") report.line(pdf, f"{key:>30s}: {str(value):30s}", font="courier") - # Example matches: slice the first n_examples matched records from - # the full per-record results for display only. + # Example matches: show first n_examples training indices flagged as stored. rlr = self.record_level_results - if rlr is not None and any(rlr.matched): - matched_positions = [ - k for k, is_match in enumerate(rlr.matched) if is_match + if rlr is not None and any(rlr.individual_risk): + matched_train_indices = [ + i for i, risk in enumerate(rlr.individual_risk) if risk ] - shown = matched_positions[: self.n_examples] + shown = matched_train_indices[: self.n_examples] pdf.add_page() report.title(pdf, "Example Matches") report.line( pdf, - f"Showing {len(shown)} of {len(matched_positions)} matched " - f"instance(s) found stored in the model (first " - f"{N_FEATURE_PREVIEW} feature values):", + f"Showing {len(shown)} of {len(matched_train_indices)} training " + f"record(s) found stored verbatim in the model:", ) - for display_i, k in enumerate(shown): + for display_i, train_idx in enumerate(shown): report.line( pdf, - f" Match {display_i + 1}: " - f"stored[{rlr.stored_index[k]}] " - f"= train[{rlr.training_index[k]}] " - f"values: {rlr.stored_values[k]}", + f" Match {display_i + 1}: train[{train_idx}]", font="courier", font_size=9, ) diff --git a/tests/attacks/test_instance_based_attack.py b/tests/attacks/test_instance_based_attack.py index 919e87c7..8f787782 100644 --- a/tests/attacks/test_instance_based_attack.py +++ b/tests/attacks/test_instance_based_attack.py @@ -274,10 +274,9 @@ def test_n_examples_does_not_cap_recorded_matches(self): "instance_0" ] individual = instance["individual"] - # Every stored instance is recorded despite the small n_examples. - assert len(individual["matched"]) == instance["n_stored_instances"] - assert instance["n_stored_instances"] > attack.n_examples - assert sum(individual["matched"]) == instance["n_matched"] + # individual_risk has one entry per training record. + assert len(individual["individual_risk"]) == instance["n_training_samples"] + assert sum(individual["individual_risk"]) == instance["n_matched"] def test_str_representation(self): """Test __str__ returns the attack name.""" @@ -349,7 +348,7 @@ def test_report_files_created(self): assert os.path.exists(os.path.join("outputs_instance_based", "report.pdf")) def test_record_level_individual_structure(self): - """Per-record individual block has parallel lists of equal length.""" + """Per-record individual block has one entry per training record.""" target = _make_target_clf(SVC(gamma=0.1)) attack = InstanceBasedAttack( output_dir="outputs_instance_based", @@ -362,20 +361,14 @@ def test_record_level_individual_structure(self): "instance_0" ] individual = instance["individual"] - for key in ("stored_index", "training_index", "matched", "stored_values"): - assert key in individual - - n_stored = instance["n_stored_instances"] - assert len(individual["stored_index"]) == n_stored - assert len(individual["training_index"]) == n_stored - assert len(individual["matched"]) == n_stored - assert len(individual["stored_values"]) == n_stored - assert isinstance(individual["stored_values"][0], list) - # A non-negative training index appears exactly when matched is True. - for train_idx, is_match in zip( - individual["training_index"], individual["matched"], strict=True - ): - assert (train_idx >= 0) == is_match + assert "individual_risk" in individual + + n_train = instance["n_training_samples"] + assert len(individual["individual_risk"]) == n_train + # Values are 0 or 1. + assert all(v in (0, 1) for v in individual["individual_risk"]) + # Count of stored training records matches n_matched. + assert sum(individual["individual_risk"]) == instance["n_matched"] def test_report_individual_off_by_default(self): """Without report_individual, no per-record block is emitted."""