diff --git a/Adversarial_Observation/Attacks.py b/Adversarial_Observation/Attacks.py index 8b24667..d10df9b 100755 --- a/Adversarial_Observation/Attacks.py +++ b/Adversarial_Observation/Attacks.py @@ -2,91 +2,225 @@ import torch import torch.nn.functional as F import matplotlib.pyplot as plt -import logging -# Set up logging -logging.basicConfig(level=logging.INFO) - -def fgsm_attack(input_batch_data: torch.Tensor, model: torch.nn.Module, input_shape: tuple, epsilon: float) -> torch.Tensor: +def fgsm_attack(input_batch_data: torch.tensor, model: torch.nn.Module, input_shape: tuple, epsilon: float) -> torch.Tensor: """ Apply the FGSM attack to input images given a pre-trained PyTorch model. + + Args: + input_batch_data (ndarray): Batch of input images as a 4D numpy array. + model (nn.Module): Pre-trained PyTorch model to be attacked. + input_shape (tuple): Shape of the input array. + epsilon (float): Magnitude of the perturbation for the attack. + + Returns: + adversarial_batch_data (ndarray): Adversarial images generated by the FGSM attack. """ + # Set the model to evaluation mode model.eval() + + # Check if GPU is available device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - input_batch_data = input_batch_data.to(device) - adversarial_batch_data = [] + # Move the model and input batch data to the appropriate device + model.to(device) + input_batch_data = torch.tensor(input_batch_data).to(device) + + # Disable gradient computation for all model parameters + for param in model.parameters(): + param.requires_grad = False + adversarial_batch_data = [] for img in input_batch_data: - # Make a copy of the image and enable gradient tracking - img = img.clone().detach().unsqueeze(0).to(device) - img.requires_grad = True + # Convert the input image to a PyTorch tensor with dtype=torch.float32 and enable gradient computation + img = img.clone().detach().to(torch.float32).requires_grad_(True) - # Forward pass - preds = model(img) - target = torch.argmax(preds, dim=1) - loss = F.cross_entropy(preds, target) + # Move the input image tensor to the same device as the model + img = img.to(device) - # Backward pass + # Make a forward pass through the model and get the predicted class scores for the input image + preds = model(img.reshape(input_shape)) + + # Compute the loss by selecting the class with the highest predicted score + target = torch.argmax(preds) + loss = torch.nn.functional.cross_entropy(preds, target.unsqueeze(0)) + + # Compute gradients of the loss with respect to the input image pixels model.zero_grad() loss.backward() - # Generate perturbation - grad = img.grad.data - adversarial_img = img + epsilon * grad.sign() + # Calculate the sign of the gradients + gradient_sign = img.grad.sign() + + # Create the adversarial example by adding the signed gradients to the original image + adversarial_img = img + epsilon * gradient_sign + + # Clip the adversarial image to ensure pixel values are within the valid range adversarial_img = torch.clamp(adversarial_img, 0, 1) - adversarial_batch_data.append(adversarial_img.squeeze(0).detach()) + adversarial_batch_data.append(adversarial_img.cpu().detach().numpy()) - return torch.stack(adversarial_batch_data) + return adversarial_batch_data -def compute_gradients(model, img, target_class): - preds = model(img) - target_score = preds[0, target_class] - return torch.autograd.grad(target_score, img)[0] -def generate_adversarial_examples(input_batch_data, model, method='fgsm', **kwargs): - if method == 'fgsm': - return fgsm_attack(input_batch_data, model, **kwargs) - # Implement other attack methods as needed +def gradient_map(input_batch_data: torch.tensor, model: torch.nn.Module, input_shape: tuple, backprop_type: str = 'guided') -> torch.Tensor: + """ + Generate a gradient map for an input image given a pre-trained PyTorch model. + + Args: + input_batch_data (ndarray): Batch of input images as a 4D numpy array. + model (nn.Module): Pre-trained PyTorch model used to generate the gradient map. + input_shape (tuple): Shape of the input array. + backprop_type (str, optional): Type of backpropagation. Supported values: 'vanilla', 'guided', 'relu'. + Defaults to 'vanilla'. -def gradient_ascent(input_image, model, input_shape, target_class, num_iterations=100, step_size=0.01): + Returns: + gradient_maps (ndarray): Gradient map for the input images. + """ + # Set the model to evaluation mode model.eval() + + # Check if GPU is available device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - input_image = input_image.to(device).detach().requires_grad_(True) - for _ in range(num_iterations): - gradients = compute_gradients(model, input_image.reshape(input_shape), target_class) - input_image = input_image + step_size * gradients.sign() - input_image = torch.clamp(input_image, 0, 1) - input_image = input_image.detach().requires_grad_(True) + # Move the model and input batch data to the appropriate device + model.to(device) + input_batch_data = torch.tensor(input_batch_data).to(device) + + # Disable gradient computation for all model parameters + for param in model.parameters(): + param.requires_grad = False + + gradient_maps = [] + for img in input_batch_data: + # Convert the input image to a PyTorch tensor with dtype=torch.float32 and enable gradient computation + img = img.clone().detach().to(torch.float32).requires_grad_(True) + + # Move the input image tensor to the same device as the model + img = img.to(device) + + # Make a forward pass through the model and get the predicted class scores for the input image + preds = model(img.reshape(input_shape)) + + # Compute the score and index of the class with the highest predicted score + score, _ = torch.max(preds, 1) - return input_image.cpu().detach().numpy() + # Reset gradients from previous iterations + model.zero_grad() + + # Compute gradients of the score with respect to the model parameters + score.backward() + + if backprop_type == 'guided': + # Apply guided backpropagation + gradients = img.grad + gradients = gradients * (gradients > 0).float() # ReLU-like operation + gradient_map = gradients.norm(dim=0) + elif backprop_type == 'relu': + # Apply ReLU backpropagation + gradients = img.grad + gradients = (gradients > 0).float() # ReLU operation + gradient_map = gradients.norm(dim=0) + else: + # Default to vanilla backpropagation + gradient_map = img.grad.norm(dim=0) + + gradient_maps.append(gradient_map.cpu().detach().numpy()) -def gradient_map(input_image, model, input_shape): + return gradient_maps + + +def gradient_ascent(input_batch_data: torch.tensor, model: torch.nn.Module, input_shape: tuple, target_neuron: int, num_iterations: int = 100, step_size: float = 1.0) -> torch.Tensor: + """ + Perform gradient ascent to generate an image that maximizes the activation of a target neuron given a pre-trained PyTorch model. + + Args: + input_batch_data (ndarray): Batch of input images as a 4D numpy array. + model (nn.Module): Pre-trained PyTorch model used for gradient ascent. + input_shape (tuple): Shape of the input array. + target_neuron (int): Index of the target neuron to maximize activation. + num_iterations (int, optional): Number of gradient ascent iterations. Defaults to 100. + step_size (float, optional): Step size for each iteration. Defaults to 1.0. + + Returns: + generated_images (ndarray): Generated images that maximize the activation of the target neuron. + """ + # Set the model to evaluation mode model.eval() + + # Check if GPU is available device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - input_image = torch.tensor(input_image).to(device).detach().requires_grad_(True) - - preds = model(input_image.reshape(input_shape)) - target_class = torch.argmax(preds) - loss = F.cross_entropy(preds, target_class.unsqueeze(0)) - - model.zero_grad() - loss.backward() - - gradient = input_image.grad.data.cpu().numpy() - gradient = np.abs(gradient).mean(axis=1) # Average over channels if needed - return gradient - -def visualize_adversarial_examples(original, adversarial): - # Code to visualize original vs adversarial images - pass - -def log_metrics(success_rate, average_perturbation): - logging.info(f'Success Rate: {success_rate}, Average Perturbation: {average_perturbation}') - -class Config: - def __init__(self, epsilon=0.1, attack_method='fgsm'): - self.epsilon = epsilon - self.attack_method = attack_method \ No newline at end of file + + # Move the model to the appropriate device + model.to(device) + + # Initialize the generated images + generated_images = [] + for img in input_batch_data: + # Convert the input image to a PyTorch tensor with dtype=torch.float32 and enable gradient computation + img = img.clone().detach().to(torch.float32).requires_grad_(True) + + # Move the input image tensor to the same device as the model + img = img.to(device) + + # Perform gradient ascent + for _ in range(num_iterations): + # Make a forward pass through the model and get the activation of the target neuron + activation = model(img.reshape(input_shape))[:, target_neuron] + + # Reset gradients from previous iterations + model.zero_grad() + + # Compute gradients of the target neuron activation with respect to the input image using torch.autograd.grad + gradients = torch.autograd.grad(activation, img)[0] + + # Normalize the gradients + gradients = F.normalize(gradients, dim=0) + + # Update the input image using the gradients and step size + img.data += step_size * gradients + + # Append the generated image to the list + generated_images.append(img.cpu().detach().numpy()) + + return generated_images + +def saliency_map(input_image, model, target_class=None): + """ + Generate a saliency map for an input image given a pre-trained PyTorch model. + + Args: + input_image (torch.Tensor): Input image as a 3D torch.Tensor. + model (torch.nn.Module): Pre-trained PyTorch model used to generate the saliency map. + target_class (int, optional): Index of the target class for saliency computation. + If None, the class with the highest predicted score will be used. + + Returns: + saliency_map (torch.Tensor): Saliency map for the input image. + """ + # Set the model to evaluation mode + model.eval() + + # Check if GPU is available + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Move the model and input image to the appropriate device + model.to(device) + input_image = input_image.to(device).requires_grad_() + + # Make a forward pass through the model and get the predicted class scores for the input image + logits = model(input_image.unsqueeze(0)) # Add a batch dimension + if target_class is None: + # If target class is not specified, use the class with the highest predicted score + _, target_class = torch.max(logits, 1) + + # Compute the score for the target class + target_score = logits[0, target_class] + + # Compute gradients of the target class score with respect to the input image + target_score.backward() + + # Calculate the absolute gradients as the saliency map + saliency_map = input_image.grad.abs().squeeze(0).cpu() + + return saliency_map \ No newline at end of file diff --git a/Adversarial_Observation/BirdParticle.py b/Adversarial_Observation/BirdParticle.py index ccaf63d..7008331 100755 --- a/Adversarial_Observation/BirdParticle.py +++ b/Adversarial_Observation/BirdParticle.py @@ -1,93 +1,91 @@ import torch -import torch.nn.functional as F import numpy as np class BirdParticle: - """ - Represents a particle in the Particle Swarm Optimization (PSO) algorithm for adversarial attacks (PyTorch version). - """ - - def __init__(self, model: torch.nn.Module, input_data: torch.Tensor, target_class: int, num_iterations: int = 20, - velocity: torch.Tensor = None, inertia_weight: float = 0.5, - cognitive_weight: float = 1.0, social_weight: float = 1.0, - momentum: float = 0.9, clip_value_position: float = 1.0, device='cpu'): + def __init__(self, position, w=1.0, c1=0.8, c2=0.2, minclamp=0.0, maxclamp=1.0, name=None): """ - Initialize a particle in the PSO algorithm. - + Initializes a particle. + Args: - model (torch.nn.Module): The model to attack. - input_data (torch.Tensor): The input data (image) to attack. - target_class (int): The target class for misclassification. - velocity (torch.Tensor, optional): Initial velocity; defaults to zero. - inertia_weight (float): Inertia weight for velocity update. - cognitive_weight (float): Cognitive weight for velocity update. - social_weight (float): Social weight for velocity update. - momentum (float): Momentum for velocity update. - clip_value_position (float): Max absolute value to clip position. - device (str): Device to run on. + position (torch.Tensor): The initial position of the particle. + w (float): The inertia weight. + c1 (float): The cognitive weight. + c2 (float): The social weight. """ - self.device = device - self.model = model.to(device) - self.num_iterations = num_iterations - self.original_data = input_data.clone().detach().to(device) - self.position = input_data.clone().detach().to(device) - self.target_class = target_class - self.velocity = velocity.clone().detach().to(device) if velocity is not None else torch.zeros_like(input_data).to(device) - self.best_position = self.position.clone().detach() - self.best_score = -np.inf - self.history = [self.position.clone().detach()] - self.clip_value_position = clip_value_position - - # PSO hyperparameters - self.inertia_weight = inertia_weight - self.cognitive_weight = cognitive_weight - self.social_weight = social_weight - self.momentum = momentum - - def fitness(self) -> float: + + self.position_i = position.clone().detach() + self.velocity_i = torch.rand(position.shape) # velocity + # copy the current position to the best position + + self.history = [self.position_i] + + self.pos_best_i = position.clone().detach() # best position individual + self.cost_best_i = -1 # best error individual + self.cost_i = -1 # error individual + + self.w = w + self.c1 = c1 + self.c2 = c2 + self.minclamp = minclamp + self.maxclamp = maxclamp + self.name = name + + def evaluate(self, costFunc: callable, model: torch.nn.Module): """ - Compute the fitness score for the particle, which is the softmax probability of the target class. - Returns: - float: Target class softmax probability. + Evaluates the current fitness of the particle. + + Args: + costFunc (callable): The cost function to be maximized. + This should be a function that takes in a PyTorch model and a tensor of positions and returns a tensor of shape (n, 1) where n is the number of particles. + + model (torch.nn.Module): The model to be used in the cost function. + """ - self.model.eval() - with torch.no_grad(): - input_tensor = self.position - output = self.model(input_tensor.to(self.device)) - probabilities = F.softmax(output, dim=1) - target_prob = probabilities[:, self.target_class] - return target_prob.item() - - def update_velocity(self, global_best_position: torch.Tensor) -> None: + self.cost_i = costFunc(model, self.position_i) + + # check to see if the current position is an individual best + # best has the highest confidence + if self.cost_i >= self.cost_best_i: + self.pos_best_i = self.position_i + self.cost_best_i = self.cost_i + + def update_velocity(self, pos_best_g: list): """ - Update the particle's velocity using the PSO rule. - + Updates the particle velocity based on its own position and the global best position. + Args: - global_best_position (torch.Tensor): Global best position in the swarm. + pos_best_g (list): the global best position """ - r1 = torch.rand_like(self.position).to(self.device) - r2 = torch.rand_like(self.position).to(self.device) - - inertia = self.inertia_weight * self.velocity - cognitive = self.cognitive_weight * r1 * (self.best_position - self.position) - social = self.social_weight * r2 * (global_best_position.to(self.device) - self.position) - - self.velocity = self.momentum * self.velocity + inertia + cognitive + social + r1 = torch.rand_like(self.position_i) + r2 = torch.rand_like(self.position_i) + + vel_cognitive = self.c1 * r1 * (self.pos_best_i - self.position_i) + vel_social = self.c2 * r2 * (pos_best_g - self.position_i) + self.velocity_i = self.w * self.velocity_i + vel_cognitive + vel_social - def update_position(self) -> None: + # velocity clamping to prevent explosion + vmax = 0.5 * (self.maxclamp - self.minclamp) + self.velocity_i = torch.clamp(self.velocity_i, -vmax, vmax) + + def update_position(self): """ - Update the particle's position based on the new velocity. + Updates the particle position based on its velocity. """ - self.position = self.position + self.velocity - self.position = torch.clamp(self.position, 0.0, 1.0) # Keep values in [0, 1] - self.position = torch.clamp(self.position, -self.clip_value_position, self.clip_value_position) - self.history.append(self.position.clone().detach()) + # update position based on velocity + self.position_i = self.position_i + self.velocity_i + self.position_i = torch.clamp(self.position_i, self.minclamp, self.maxclamp) + + # add current position to history + self.history.append(self.position_i) - def evaluate(self) -> None: + def get_history(self): """ - Evaluate current fitness and update personal best if needed. + Returns the history of the particle's positions. + + Args: + None + + Returns: + list: The history of the particle's positions. """ - score = self.fitness() - if score > self.best_score: - self.best_score = score - self.best_position = self.position.clone().detach() + return self.history diff --git a/Adversarial_Observation/Swarm.py b/Adversarial_Observation/Swarm.py index fa016de..34bd3bf 100755 --- a/Adversarial_Observation/Swarm.py +++ b/Adversarial_Observation/Swarm.py @@ -1,159 +1,127 @@ -import os -import logging -from typing import List +import sys import torch import numpy as np -from tqdm import tqdm -import matplotlib.pyplot as plt from Adversarial_Observation.BirdParticle import BirdParticle - - -class ParticleSwarm: - def __init__(self, model: torch.nn.Module, input_set: np.ndarray, starting_class: int, target_class: int, - num_iterations: int = 20, save_dir: str = 'results', inertia_weight: float = 0.5, - cognitive_weight: float = 0.5, social_weight: float = 0.5, momentum: float = 0.9, - clip_value_position: float = 0.2, enable_logging: bool = False, device: str = 'cpu'): - self.model = model.to(device).eval() - self.device = torch.device(device) - - self.input_set = input_set.float().view(-1, 1, 28, 28).to(self.device) - - self.start_class = starting_class - self.target_class = target_class - self.num_iterations = num_iterations - self.save_dir = save_dir - self.enable_logging = enable_logging - - self.particles: List[BirdParticle] = [ - BirdParticle(model, self.input_set[i:i + 1], target_class, - inertia_weight=inertia_weight, cognitive_weight=cognitive_weight, - social_weight=social_weight, momentum=momentum, - clip_value_position=clip_value_position, device=self.device) - for i in range(len(input_set)) +import pandas as pd + +class PSO: + def __init__(self, starting_positions: torch.Tensor, cost_func: callable, model: torch.nn.Module, + w: float = 1.0, c1: float = 0.8, c2: float = 0.2, minclamp: float = 0.0, maxclamp: float = 1.0): + """ + Initializes the Adversarial Particle Swarm Optimization algorithm. + + Args: + starting_positions (torch.Tensor): Tensor of shape (n, m) where n = number of particles, m = dimensions. + cost_func (callable): Cost function to maximize; takes (model, position) and returns scalar cost. + model (torch.nn.Module): Model used in the cost function. + w (float): Inertia weight. + c1 (float): Cognitive (self) weight. + c2 (float): Social (global) weight. + minclamp, maxclamp (float): Value clamps for position updates. + """ + self.model = model + self.cost_func = cost_func + self.epoch = 0 + self.history = [] + + # Initialize all particles + self.swarm = [ + BirdParticle(pos, w=w, c1=c1, c2=c2, minclamp=minclamp, maxclamp=maxclamp) + for pos in starting_positions ] - self.global_best_position = torch.zeros_like(self.input_set[0]) - self.global_best_score = -float('inf') - self.fitness_history: List[float] = [] - - os.makedirs(self.save_dir, exist_ok=True) - if self.enable_logging: - self.setup_logging() - self.log_progress(-1) - - def setup_logging(self): - log_file = os.path.join(self.save_dir, 'iteration_log.log') - self.logger = logging.getLogger() - self.logger.setLevel(logging.INFO) - - file_handler = logging.FileHandler(log_file) - stream_handler = logging.StreamHandler() - - formatter = logging.Formatter('%(asctime)s - %(message)s') - file_handler.setFormatter(formatter) - stream_handler.setFormatter(logging.Formatter('%(message)s')) - - self.logger.addHandler(file_handler) - self.logger.addHandler(stream_handler) - - self.logger.info(f"\n{'*' * 60}") - self.logger.info(f"ParticleSwarm Optimization (PSO) for Adversarial Attack") - self.logger.info(f"{'-' * 60}") - self.logger.info(f"Model: {self.model.__class__.__name__}") - self.logger.info(f"Target Class: {self.target_class}") - self.logger.info(f"Number of Iterations: {self.num_iterations}") - self.logger.info(f"Save Directory: {self.save_dir}") - self.logger.info(f"{'*' * 60}") - - def log_progress(self, iteration: int): - if not self.enable_logging: - return - - self.logger.info(f"\n{'-'*60}") - self.logger.info(f"Iteration {iteration + 1}/{self.num_iterations}") - self.logger.info(f"{'='*60}") - - header = f"{'Particle':<10}{'Original Pred':<15}{'Perturbed Pred':<18}{'Orig Start Prob':<20}{'Pert Start Prob':<20}{'Orig Target Prob':<20}{'Pert Target Prob':<20}{'Personal Best':<20}{'Global Best':<20}" - self.logger.info(header) - self.logger.info(f"{'-'*60}") - - for i, particle in enumerate(self.particles): - with torch.no_grad(): - original_output = self.model(particle.original_data) - perturbed_output = self.model(particle.position) - - original_probs = torch.softmax(original_output, dim=1) - perturbed_probs = torch.softmax(perturbed_output, dim=1) - - original_pred = original_output.argmax(dim=1).item() - perturbed_pred = perturbed_output.argmax(dim=1).item() - - orig_start_prob = original_probs[0, self.start_class].item() - pert_start_prob = perturbed_probs[0, self.start_class].item() - orig_target_prob = original_probs[0, self.target_class].item() - pert_target_prob = perturbed_probs[0, self.target_class].item() - - self.logger.info(f"{i+1:<10}{original_pred:<15}{perturbed_pred:<18}" - f"{orig_start_prob:<20.4f}{pert_start_prob:<20.4f}" - f"{orig_target_prob:<20.4f}{pert_target_prob:<20.4f}" - f"{particle.best_score:<20.4f}{self.global_best_score:<20.4f}") - - self.logger.info(f"{'='*60}") - - def optimize(self): - for iteration in tqdm(range(self.num_iterations), desc="Running Swarm"): - for particle in self.particles: - particle.evaluate() - particle.update_velocity(self.global_best_position) - particle.update_position() - - best_particle = max(self.particles, key=lambda p: p.best_score) - if best_particle.best_score > self.global_best_score: - self.global_best_score = best_particle.best_score - self.global_best_position = best_particle.best_position.clone() - - self.log_progress(iteration) - - def reduce_excess_perturbations(self, original_img: np.ndarray, target_label: int, model_shape: tuple = (1, 1, 28, 28)) -> List[np.ndarray]: - denoised_adv = [] - total_pixels = np.prod(original_img.shape) - - for adv_particle in tqdm(self.particles, desc="Processing Particles"): - adv_img = adv_particle.position.clone().detach().cpu().numpy().reshape(original_img.shape) - orig = original_img.copy() - - with tqdm(total=total_pixels, desc="Processing Pixels", leave=False) as pbar: - for idx in np.ndindex(original_img.shape): - if orig[idx] == adv_img[idx]: - pbar.update(1) - continue - - old_val = adv_img[idx] - adv_img[idx] = orig[idx] - - test_img = torch.from_numpy(adv_img.reshape(model_shape)).float().to(self.device) - with torch.no_grad(): - output = self.model(test_img) - pred = output.softmax(dim=1).argmax(dim=1).item() - - if pred != target_label: - adv_img[idx] = old_val + (orig[idx] - old_val) * 0.5 - test_img = torch.from_numpy(adv_img.reshape(model_shape)).float().to(self.device) - with torch.no_grad(): - output = self.model(test_img) - pred = output.softmax(dim=1).argmax(dim=1).item() - - if pred != target_label: - adv_img[idx] = old_val - - pbar.update(1) - - denoised_adv.append(adv_img) - - return denoised_adv - - def getBest(self) -> np.ndarray: - return self.global_best_position.detach().cpu().numpy() - - def getPoints(self) -> List[np.ndarray]: - return [particle.position.detach().cpu().numpy() for particle in self.particles] + # Evaluate all initial costs to find the true best particle + with torch.no_grad(): + costs = [] + for particle in self.swarm: + cost_val = self.cost_func(self.model, particle.position_i) + # Convert float → tensor if needed + if not torch.is_tensor(cost_val): + cost_val = torch.tensor(cost_val, dtype=torch.float32) + particle.cost_i = cost_val + costs.append(cost_val) + costs = torch.stack(costs) + + # Determine index of best particle (maximize cost) + best_idx = torch.argmax(costs) + self.cos_best_g = costs[best_idx].clone() + self.pos_best_g = self.swarm[best_idx].position_i.clone() + + print(f"[Init] Swarm initialized with {len(self.swarm)} particles.", file=sys.stderr, flush=True) + print(f"[Init] Best initial cost: {self.cos_best_g.item():.4f}", file=sys.stderr, flush=True) + + def step(self) -> tuple: + """ + Performs one iteration of the Adversarial Particle Swarm Optimization algorithm. + + Args: + None + + Returns: + None + + """ + self.epoch += 1 + # Update velocities and positions. + for p in self.swarm: + p.evaluate(self.cost_func, self.model) + p.update_velocity(pos_best_g=self.pos_best_g) + p.update_position() + p.evaluate(self.cost_func, self.model) + + # Update history and global best. + for particle in self.swarm: + if particle.cost_i > self.cos_best_g: + self.pos_best_g = particle.position_i + self.cos_best_g = particle.cost_i + particle.history.append(particle.position_i) + + def getPoints(self): + return torch.vstack([particle.position_i for particle in self.swarm]) + + def getBest(self): + return self.pos_best_g + + def run(self, epochs: int): + """ + Runs the Adversarial Particle Swarm Optimization algorithm for the specified number of epochs. + + Args: + epochs (int): The number of epochs to run the algorithm for. + + Returns: + None + """ + for i in range(epochs): + self.step() + + def get_history(self) -> pd.DataFrame: + """ + Returns the history of the swarm's positions for each epoch. + + Returns: + pd.DataFrame: A dataframe containing the swarm's positions at each epoch. + """ + + history = {} + + for i in range(0, self.epoch): + # Convert tensors to numpy arrays and flatten them so Pandas can handle them nicely + history[f"epoch_{i}"] = [particle.history[i].cpu().numpy().flatten().tolist() for particle in self.swarm] + + # Wrap the dictionary in a pandas DataFrame before returning + return pd.DataFrame(history) + + def save_history(self, filename): + """ + Saves the history of the swarm's positions for each epoch. + + Args: + filename (str): The filename to save the history to. + + Returns: + None + """ + history = self.get_history() + history.to_csv(filename, index=False) + diff --git a/tests/test_adversarial_attacks.py b/tests/test_adversarial_attacks.py index 92ee7ba..5d6592c 100644 --- a/tests/test_adversarial_attacks.py +++ b/tests/test_adversarial_attacks.py @@ -7,12 +7,9 @@ @pytest.fixture def sample_data(): model = load_MNIST_model() - # Ensure the model is moved to the GPU if the system has one - # This matches the automatic device assignment in Attacks.py device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = model.to(device) - # Fake image (1, 1, 28, 28) for MNIST (batch_size, channel, height, width) input_data = torch.rand((1, 1, 28, 28)) return input_data, model @@ -22,16 +19,17 @@ def test_fgsm_attack(sample_data): epsilon = 0.1 adversarial_data = fgsm_attack(input_data, model, (1, 1, 28, 28), epsilon) + + # Convert the returned list of numpy arrays back to a tensor for comparison + adversarial_tensor = torch.tensor(np.array(adversarial_data)) - # Move back to CPU for assertion if needed - assert not torch.allclose(input_data.cpu(), adversarial_data.cpu(), atol=1e-5), "FGSM failed to perturb the input" + assert not torch.allclose(input_data.cpu(), adversarial_tensor.cpu(), atol=1e-5), "FGSM failed to perturb the input" def test_success_rate(): """Testing the success rate logic.""" original_preds = torch.tensor([0, 1, 2, 3]) adversarial_preds = torch.tensor([1, 0, 2, 3]) - # 2 out of 4 changed success_rate = compute_success_rate(original_preds, adversarial_preds) assert success_rate == 0.5 @@ -43,13 +41,14 @@ def test_gradient_ascent(sample_data): input_data, model, (1, 1, 28, 28), - target_class=3, + target_neuron=3, # Changed from target_class to target_neuron num_iterations=2, step_size=0.1 ) - assert isinstance(adv_img, np.ndarray) - assert adv_img.shape == (1, 1, 28, 28) + assert isinstance(adv_img, list) + assert isinstance(adv_img[0], np.ndarray) + assert adv_img[0].shape == (1, 28, 28) # Updated from (1, 1, 28, 28) def test_gradient_map(sample_data): """Test that the gradient mapping extraction correctly averages channels.""" @@ -57,5 +56,6 @@ def test_gradient_map(sample_data): g_map = gradient_map(input_data.numpy(), model, (1, 1, 28, 28)) - assert isinstance(g_map, np.ndarray) - assert g_map.shape == (1, 28, 28) # Averaged over the 1 channel \ No newline at end of file + assert isinstance(g_map, list) + assert isinstance(g_map[0], np.ndarray) + assert g_map[0].shape == (28, 28) # Updated from (1, 28, 28) \ No newline at end of file diff --git a/tests/test_bird_particle.py b/tests/test_bird_particle.py index cb3328b..2b02d4b 100644 --- a/tests/test_bird_particle.py +++ b/tests/test_bird_particle.py @@ -1,57 +1,44 @@ import pytest import torch -import torch.nn as nn import numpy as np from Adversarial_Observation.BirdParticle import BirdParticle -@pytest.fixture -def simple_pytorch_model(): - """Create a simple PyTorch model to match source logic.""" - return nn.Sequential( - nn.Flatten(), - nn.Linear(28 * 28, 10) - ) - @pytest.fixture def test_data(): """Generate PyTorch tensors for testing.""" - input_data = torch.rand((1, 1, 28, 28)) - target_class = 3 - return input_data, target_class + return torch.rand((1, 1, 28, 28)) @pytest.fixture -def bird_particle(simple_pytorch_model, test_data): - input_data, target_class = test_data - # Removed 'epsilon' as it is not in BirdParticle.__init__ +def bird_particle(test_data): + # Initialize with the actual parameters expected by BirdParticle.__init__ return BirdParticle( - model=simple_pytorch_model, - input_data=input_data, - target_class=target_class, - clip_value_position=0.2 + position=test_data, + minclamp=0.0, + maxclamp=0.2 ) -def test_bird_particle_initialization(bird_particle): +def test_bird_particle_initialization(bird_particle, test_data): """Test initialization against PyTorch source.""" - assert bird_particle.target_class == 3 - assert bird_particle.clip_value_position == 0.2 - assert torch.allclose(bird_particle.position, bird_particle.original_data) - assert bird_particle.best_score == -np.inf + assert bird_particle.minclamp == 0.0 + assert bird_particle.maxclamp == 0.2 + assert torch.allclose(bird_particle.position_i, test_data) + assert bird_particle.cost_best_i == -1 def test_velocity_update(bird_particle): """Test velocity update using PyTorch tensors.""" - initial_velocity = bird_particle.velocity.clone() - global_best = torch.randn_like(bird_particle.position) + initial_velocity = bird_particle.velocity_i.clone() + global_best = torch.randn_like(bird_particle.position_i) bird_particle.update_velocity(global_best) - assert not torch.allclose(initial_velocity, bird_particle.velocity) + assert not torch.allclose(initial_velocity, bird_particle.velocity_i) def test_position_update(bird_particle): """Test position update and clamping.""" # Manually set a velocity to force movement - bird_particle.velocity = torch.ones_like(bird_particle.position) * 0.5 + bird_particle.velocity_i = torch.ones_like(bird_particle.position_i) * 0.5 bird_particle.update_position() - # Check clamping (0.0 to 1.0) and clip_value_position - assert torch.all(bird_particle.position <= 1.0) - assert torch.all(bird_particle.position >= 0.0) \ No newline at end of file + # Check clamping using maxclamp (0.2) and minclamp (0.0) + assert torch.all(bird_particle.position_i <= 0.2) + assert torch.all(bird_particle.position_i >= 0.0) \ No newline at end of file diff --git a/tests/test_pso_optimization.py b/tests/test_pso_optimization.py index 68ddc51..108dd3a 100644 --- a/tests/test_pso_optimization.py +++ b/tests/test_pso_optimization.py @@ -3,7 +3,7 @@ import torch.nn as nn import numpy as np import os -from Adversarial_Observation.Swarm import ParticleSwarm +from Adversarial_Observation.Swarm import PSO @pytest.fixture def simple_model(): @@ -13,58 +13,56 @@ def simple_model(): nn.Linear(4 * 26 * 26, 10) ) +def dummy_cost_func(model, position): + # A dummy cost function to satisfy the PSO requirements during testing + return torch.sum(position).item() + @pytest.fixture def test_data(): + # 5 particles, dimensions: (1, 28, 28) input_images = torch.rand((5, 1, 28, 28)) - starting_class = 0 - target_class = 9 - return input_images, starting_class, target_class + return input_images @pytest.fixture -def particle_swarm(simple_model, test_data, tmp_path): - input_images, starting_class, target_class = test_data - - # Use pytest's built-in tmp_path fixture to avoid all permission errors - save_dir = str(tmp_path / 'test_results') - - swarm = ParticleSwarm( +def particle_swarm(simple_model, test_data): + swarm = PSO( + starting_positions=test_data, + cost_func=dummy_cost_func, model=simple_model, - input_set=input_images, - starting_class=starting_class, - target_class=target_class, - num_iterations=2, - save_dir=save_dir, - enable_logging=True + minclamp=0.0, + maxclamp=1.0 ) return swarm def test_particle_swarm_initialization(particle_swarm): """Test if swarm initializes components correctly.""" - assert particle_swarm.num_iterations == 2 - assert particle_swarm.start_class == 0 - assert len(particle_swarm.particles) == 5 - assert particle_swarm.global_best_score == -float('inf') + assert particle_swarm.epoch == 0 + assert len(particle_swarm.swarm) == 5 + assert particle_swarm.cos_best_g > -float('inf') def test_pso_optimization(particle_swarm): """Test the full optimization loop and score improvement.""" - initial_score = particle_swarm.global_best_score - particle_swarm.optimize() + initial_score = particle_swarm.cos_best_g.clone() + particle_swarm.run(epochs=2) - # The score should have updated after evaluation - assert particle_swarm.global_best_score > initial_score or particle_swarm.global_best_score > -float('inf') + # The score should have updated (or at least maintained) after evaluation + assert particle_swarm.cos_best_g >= initial_score -def test_logging_creation(particle_swarm): - """Verify log files are created in the temporary directory.""" - particle_swarm.optimize() - log_path = os.path.join(particle_swarm.save_dir, 'iteration_log.log') - assert os.path.exists(log_path) +def test_logging_creation(particle_swarm, tmp_path): + """Verify history files are created in the temporary directory.""" + particle_swarm.run(epochs=2) + save_path = str(tmp_path / 'history.csv') + + particle_swarm.save_history(save_path) + assert os.path.exists(save_path) def test_getters(particle_swarm): """Expand testing to ensure data extraction methods return expected types and shapes.""" best_pos = particle_swarm.getBest() - assert isinstance(best_pos, np.ndarray) + # PSO getBest returns a torch.Tensor, not an ndarray + assert isinstance(best_pos, torch.Tensor) points = particle_swarm.getPoints() - assert isinstance(points, list) - assert len(points) == 5 - assert isinstance(points[0], np.ndarray) \ No newline at end of file + # PSO getPoints returns a vertically stacked torch.Tensor + assert isinstance(points, torch.Tensor) + assert points.shape[0] == 5 \ No newline at end of file