diff --git a/src/simulated_bifurcation/core/ising.py b/src/simulated_bifurcation/core/ising.py index b0a66fd7..3c48a8b4 100644 --- a/src/simulated_bifurcation/core/ising.py +++ b/src/simulated_bifurcation/core/ising.py @@ -24,13 +24,13 @@ from numpy import ndarray from ..optimizer import SimulatedBifurcationEngine, SimulatedBifurcationOptimizer -from .utils import safe_get_device, safe_get_dtype +from .tensor_bearer import TensorBearer # Workaround because `Self` type is only available in Python >= 3.11 SelfIsing = TypeVar("SelfIsing", bound="Ising") -class Ising(object): +class Ising(TensorBearer): """ Internal implementation of the Ising model. @@ -92,13 +92,10 @@ def __init__( dtype: Optional[torch.dtype] = None, device: Optional[Union[str, torch.device]] = None, ) -> None: - self._dtype = safe_get_dtype(dtype) - self._device = safe_get_device(device) - - if isinstance(J, ndarray): - J = torch.from_numpy(J) - if isinstance(h, ndarray): - h = torch.from_numpy(h) + super().__init__(dtype=dtype, device=device) + J = self._safe_get_tensor(J) + if h is not None: + h = self._safe_get_tensor(h) if J.ndim != 2: raise ValueError( @@ -110,27 +107,25 @@ def __init__( f"Expected J to be square, but got {rows} rows and {cols} columns." ) - self._J = J.to(dtype=self._dtype, device=self._device) + self._J = J self._dimension = rows if h is None: - self._h = torch.zeros( - self._dimension, dtype=self._dtype, device=self._device - ) + self._h = torch.zeros(self._dimension, dtype=self.dtype, device=self.device) elif h.shape != (self._dimension,): raise ValueError( f"Expected the shape of h to be {self._dimension}, but got {tuple(h.shape)}." ) else: - self._h = h.to(dtype=self._dtype, device=self._device) + self._h = h self._has_linear_term = not torch.equal( self._h, - torch.zeros(self._dimension, dtype=self._dtype, device=self._device), + torch.zeros(self._dimension, dtype=self.dtype, device=self.device), ) def __neg__(self) -> SelfIsing: - return self.__class__(-self._J, -self._h, self._dtype, self._device) + return self.__class__(-self._J, -self._h, self.dtype, self.device) def as_simulated_bifurcation_tensor(self) -> torch.Tensor: """ @@ -179,8 +174,8 @@ def as_simulated_bifurcation_tensor(self) -> torch.Tensor: if self._has_linear_term: sb_tensor = torch.zeros( (self._dimension + 1, self._dimension + 1), - dtype=self._dtype, - device=self._device, + dtype=self.dtype, + device=self.device, ) sb_tensor[: self._dimension, : self._dimension] = symmetrical_J sb_tensor[: self._dimension, self._dimension] = -self._h @@ -351,6 +346,8 @@ def minimize( verbose, sampling_period, convergence_threshold, + self.dtype, + self.device, ) tensor = self.as_simulated_bifurcation_tensor() spins = optimizer.run_integrator(tensor, early_stopping) diff --git a/src/simulated_bifurcation/core/quadratic_polynomial.py b/src/simulated_bifurcation/core/quadratic_polynomial.py index 1cd4b726..95ad5ce4 100644 --- a/src/simulated_bifurcation/core/quadratic_polynomial.py +++ b/src/simulated_bifurcation/core/quadratic_polynomial.py @@ -28,20 +28,11 @@ from sympy import Poly from .ising import Ising -from .utils import safe_get_device, safe_get_dtype +from .tensor_bearer import TensorBearer from .variable import Variable -INTEGER_REGEX = re.compile("^int[1-9][0-9]*$") -DOMAIN_ERROR = ValueError( - f'Input type must be one of "spin" or "binary", or be a string starting' - f'with "int" and be followed by a positive integer.\n' - f"More formally, it should match the following regular expression.\n" - f"{INTEGER_REGEX}\n" - f'Examples: "int7", "int42", ...' -) - -class QuadraticPolynomial(object): +class QuadraticPolynomial(TensorBearer): """ Internal implementation of a multivariate quadratic polynomial. @@ -134,7 +125,7 @@ class QuadraticPolynomial(object): Maximize this polynomial over {0, 1, ..., 14, 15} x {0, 1, ..., 14, 15} (outputs are located on the GPU) - >>> best_vector, best_value = poly.maximize(domain="int4) + >>> best_vector, best_value = poly.maximize(domain="int4") >>> best_vector tensor([ 0., 15.], device='cuda:0') >>> best_value @@ -157,8 +148,7 @@ def __init__( dtype: Optional[torch.dtype] = None, device: Optional[Union[str, torch.device]] = None, ): - self._dtype = safe_get_dtype(dtype) - self._device = safe_get_device(device) + super().__init__(dtype=dtype, device=device) self.sb_result = None if len(polynomial_data) == 1 and isinstance(polynomial_data[0], Poly): @@ -169,17 +159,17 @@ def __init__( ) dimension = len(polynomial.gens) self._quadratic_coefficients = torch.zeros( - dimension, dimension, dtype=self._dtype, device=self._device + dimension, dimension, dtype=self.dtype, device=self.device ) self._linear_coefficients = torch.zeros( - dimension, dtype=self._dtype, device=self._device + dimension, dtype=self.dtype, device=self.device ) - self._bias = torch.tensor(0.0, dtype=self._dtype, device=self._device) + self._bias = torch.tensor(0.0, dtype=self.dtype, device=self.device) for monom, coeff in polynomial.terms(): coeff = float(coeff) if sum(monom) == 0: self._bias = torch.tensor( - coeff, dtype=self._dtype, device=self._device + coeff, dtype=self.dtype, device=self.device ) elif sum(monom) == 1: self._linear_coefficients[monom.index(1)] = coeff @@ -196,67 +186,58 @@ def __init__( self._quadratic_coefficients = None self._linear_coefficients = None self._bias = None - for tensor_like in polynomial_data: - if isinstance(tensor_like, np.ndarray): - tensor_like = torch.from_numpy(tensor_like) - elif isinstance(tensor_like, (int, float)): - tensor_like = torch.tensor( - tensor_like, dtype=self._dtype, device=self._device - ) - if isinstance(tensor_like, torch.Tensor): - if tensor_like.ndim == 0: - attribute_to_set = "_bias" - elif tensor_like.ndim == 1: - attribute_to_set = "_linear_coefficients" - elif tensor_like.ndim == 2: - attribute_to_set = "_quadratic_coefficients" - rows, cols = tensor_like.shape - if rows != cols: - raise ValueError( - "Provided quadratic coefficients tensor is not square." - ) - else: + for polynomial_data_element in polynomial_data: + # noinspection PyTypeChecker + tensor_like = self._safe_get_tensor(polynomial_data_element) + if tensor_like.ndim == 0: + attribute_to_set = "_bias" + elif tensor_like.ndim == 1: + attribute_to_set = "_linear_coefficients" + elif tensor_like.ndim == 2: + attribute_to_set = "_quadratic_coefficients" + rows, cols = tensor_like.shape + if rows != cols: raise ValueError( - f"Expected a tensor with at most 2 dimensions, got {tensor_like.ndim}." - ) - if getattr(self, attribute_to_set) is not None: - raise ValueError( - f"Providing two tensors for the same degree is ambiguous. Got at least two tensors for degree {tensor_like.ndim}." - ) - else: - if tensor_like.ndim > 0: - if dimension is None: - dimension = tensor_like.shape[0] - elif dimension != tensor_like.shape[0]: - raise ValueError( - f"Inconsistant shape among provided tensors. Expected {dimension} but got {tensor_like.shape[0]}." - ) - setattr( - self, - attribute_to_set, - tensor_like.to(dtype=self._dtype, device=self._device), + "Provided quadratic coefficients tensor is not square." ) else: raise ValueError( - f"Unsupported coefficient tensor type: {type(tensor_like)}. Expected a torch.Tensor or a numpy.ndarray." + f"Expected a tensor with at most 2 dimensions, got {tensor_like.ndim}." + ) + if getattr(self, attribute_to_set) is not None: + raise ValueError( + f"Providing two tensors for the same degree is ambiguous. Got at least two tensors for degree {tensor_like.ndim}." + ) + else: + if tensor_like.ndim > 0: + if dimension is None: + dimension = tensor_like.shape[0] + elif dimension != tensor_like.shape[0]: + raise ValueError( + f"Inconsistent shape among provided tensors. Expected {dimension} but got {tensor_like.shape[0]}." + ) + setattr( + self, + attribute_to_set, + tensor_like, ) if self._quadratic_coefficients is None: self._quadratic_coefficients = torch.zeros( - dimension, dimension, dtype=self._dtype, device=self._device + dimension, dimension, dtype=self.dtype, device=self.device ) if self._linear_coefficients is None: self._linear_coefficients = torch.zeros( - dimension, dtype=self._dtype, device=self._device + dimension, dtype=self.dtype, device=self.device ) if self._bias is None: - self._bias = torch.tensor(0.0, dtype=self._dtype, device=self._device) + self._bias = torch.tensor(0.0, dtype=self.dtype, device=self.device) self._dimension = self._quadratic_coefficients.shape[0] def __call__(self, value: Union[torch.Tensor, np.ndarray]) -> torch.Tensor: if not isinstance(value, torch.Tensor): try: - value = torch.tensor(value, dtype=self._dtype, device=self._device) + value = torch.tensor(value, dtype=self.dtype, device=self.device) except Exception as err: raise TypeError("Input value cannot be cast to Tensor.") from err @@ -368,12 +349,12 @@ def to_ising(self, domain: Union[str, List[str]]) -> Ising: """ variables = self.__get_variables(domain=domain) spin_identity_vector = QuadraticPolynomial.__spin_identity_vector( - variables=variables, dtype=self._dtype, device=self._device + variables=variables, dtype=self.dtype, device=self.device ) spin_weighted_integer_to_binary_matrix = ( spin_identity_vector + 1 ) * QuadraticPolynomial.__integer_to_binary_matrix( - variables=variables, dtype=self._dtype, device=self._device + variables=variables, dtype=self.dtype, device=self.device ) symmetric_quadratic_tensor = ( self._quadratic_coefficients + self._quadratic_coefficients.t() @@ -396,7 +377,7 @@ def to_ising(self, domain: Union[str, List[str]]) -> Ising: -1, ) torch.diag(J)[...] = 0 - return Ising(J, h, self._dtype, self._device) + return Ising(J, h, self.dtype, self.device) def convert_spins( self, optimized_spins: torch.Tensor, domain: Union[str, List[str]] @@ -441,12 +422,12 @@ def convert_spins( """ variables = self.__get_variables(domain=domain) spin_identity_vector = QuadraticPolynomial.__spin_identity_vector( - variables=variables, dtype=self._dtype, device=self._device + variables=variables, dtype=self.dtype, device=self.device ) spin_weighted_integer_to_binary_matrix = ( spin_identity_vector + 1 ) * QuadraticPolynomial.__integer_to_binary_matrix( - variables=variables, dtype=self._dtype, device=self._device + variables=variables, dtype=self.dtype, device=self.device ) return ( None @@ -581,9 +562,8 @@ def __optimize( convergence_threshold=convergence_threshold, timeout=timeout, ) - self.sb_result = self.convert_spins(optimized_spins, domain).to( - dtype=self._dtype, device=self._device - ) + self.sb_result = self._cast_tensor(self.convert_spins(optimized_spins, domain)) + result = self.sb_result.t() evaluation = self(result) if best_only: diff --git a/src/simulated_bifurcation/core/tensor_bearer.py b/src/simulated_bifurcation/core/tensor_bearer.py new file mode 100644 index 00000000..4aa0a812 --- /dev/null +++ b/src/simulated_bifurcation/core/tensor_bearer.py @@ -0,0 +1,57 @@ +from typing import Optional, Union + +import numpy as np +import torch + + +class TensorBearer: + """ + Utility abstract class to use as a parent class for objects relying on tensors. + """ + + def __init__( + self, + dtype: Optional[torch.dtype] = None, + device: Optional[Union[str, torch.device]] = None, + ): + self.__dtype = self.__safe_get_dtype(dtype) + self.__device = self.__safe_get_device(device) + + @property + def dtype(self) -> torch.dtype: + return self.__dtype + + @property + def device(self) -> torch.device: + return self.__device + + @staticmethod + def __safe_get_dtype(dtype: Optional[torch.dtype]) -> torch.dtype: + if dtype is None: + return torch.float32 + elif dtype == torch.float32 or dtype == torch.float64: + return dtype + raise ValueError( + "The Simulated Bifurcation algorithm can only run with a torch.float32 or a torch.float64 dtype." + ) + + @staticmethod + def __safe_get_device(device: Optional[Union[str, torch.device]]) -> torch.device: + return torch.get_default_device() if device is None else torch.device(device) + + def _safe_get_tensor( + self, data: Union[torch.Tensor, np.ndarray, int, float] + ) -> torch.Tensor: + if isinstance(data, torch.Tensor): + return self._cast_tensor(data) + elif isinstance(data, np.ndarray): + return self._cast_tensor(torch.from_numpy(data)) + elif isinstance(data, (int, float)): + return self._cast_tensor(torch.tensor(data)) + else: + raise TypeError( + "Tensors can only be interpreted from NumPy arrays or int/float values." + ) + + def _cast_tensor(self, tensor: torch.Tensor) -> torch.Tensor: + return tensor.to(dtype=self.dtype, device=self.device) diff --git a/src/simulated_bifurcation/core/utils.py b/src/simulated_bifurcation/core/utils.py deleted file mode 100644 index a6db0046..00000000 --- a/src/simulated_bifurcation/core/utils.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Optional, Union - -import torch - - -def safe_get_dtype(dtype: Optional[torch.dtype]) -> torch.dtype: - if dtype is None: - return torch.float32 - elif dtype == torch.float32 or dtype == torch.float64: - return dtype - raise ValueError( - "The Simulated Bifurcation algorithm can only run with a torch.float32 or a torch.float64 dtype." - ) - - -def safe_get_device(device: Optional[Union[str, torch.device]]) -> torch.device: - return torch.get_default_device() if device is None else torch.device(device) diff --git a/src/simulated_bifurcation/core/variable.py b/src/simulated_bifurcation/core/variable.py index 6be6cb98..0e523980 100644 --- a/src/simulated_bifurcation/core/variable.py +++ b/src/simulated_bifurcation/core/variable.py @@ -4,13 +4,11 @@ class Variable: - INTEGER_REGEX = re.compile("^int[1-9][0-9]*$") - DOMAIN_ERROR = ValueError( - 'Domain type must be one of "spin" or "binary", or be a string starting ' - 'with "int" and followed by a positive integer that represents ' - "the number of bits required to encode the values of the domain. " - "More formally, it should match the following regular expression: " - '"^int[1-9][0-9]*$" (ex: "int7", "int42", ...).' + __INTEGER_REGEX = re.compile("^int[1-9][0-9]*$") + __DOMAIN_ERROR_MESSAGE = ( + 'Domain type must be one of "spin" or "binary", or be a string starting with "int" and followed by a positive ' + "integer that represents the number of bits required to encode the values of the domain. More formally, it " + 'should match the following regular expression: "^int[1-9][0-9]*$" (ex: "int7", "int42", ...).' ) def __init__(self, domain_type: OptimizationDomain, encoding_bits: int) -> None: @@ -64,6 +62,6 @@ def from_str(domain: str): return Variable(OptimizationDomain.SPIN, 1) if domain == "binary": return Variable(OptimizationDomain.BINARY, 1) - if Variable.INTEGER_REGEX.match(domain) is None: - raise Variable.DOMAIN_ERROR + if Variable.__INTEGER_REGEX.match(domain) is None: + raise ValueError(Variable.__DOMAIN_ERROR_MESSAGE) return Variable(OptimizationDomain.INTEGER, int(domain[3:])) diff --git a/src/simulated_bifurcation/optimizer/simulated_bifurcation_engine.py b/src/simulated_bifurcation/optimizer/simulated_bifurcation_engine.py index bd2114c4..c23b978d 100644 --- a/src/simulated_bifurcation/optimizer/simulated_bifurcation_engine.py +++ b/src/simulated_bifurcation/optimizer/simulated_bifurcation_engine.py @@ -28,4 +28,4 @@ def get_engine(engine_name: Literal["ballistic", "discrete"]): elif engine_name == "discrete": return SimulatedBifurcationEngine.dSB else: - raise ValueError(f"Unknwown Simulated Bifurcation engine: {engine_name}.") + raise ValueError(f"Unknown Simulated Bifurcation engine: {engine_name}.") diff --git a/src/simulated_bifurcation/optimizer/simulated_bifurcation_optimizer.py b/src/simulated_bifurcation/optimizer/simulated_bifurcation_optimizer.py index d5f74e35..2cd24ceb 100644 --- a/src/simulated_bifurcation/optimizer/simulated_bifurcation_optimizer.py +++ b/src/simulated_bifurcation/optimizer/simulated_bifurcation_optimizer.py @@ -1,32 +1,24 @@ -import logging import warnings from time import time -from typing import Optional, Tuple +from typing import Optional, Union import torch from numpy import minimum from tqdm.auto import tqdm +from ..core.tensor_bearer import TensorBearer from .environment import ENVIRONMENT from .simulated_bifurcation_engine import SimulatedBifurcationEngine from .stop_window import StopWindow from .symplectic_integrator import SymplecticIntegrator -LOGGER = logging.getLogger("simulated_bifurcation_optimizer") -CONSOLE_HANDLER = logging.StreamHandler() -CONSOLE_HANDLER.set_name(logging.WARN) -CONSOLE_HANDLER.setFormatter( - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -) -LOGGER.addHandler(CONSOLE_HANDLER) - class ConvergenceWarning(Warning): def __str__(self) -> str: return "No agent has converged. Returned signs of final positions instead." -class SimulatedBifurcationOptimizer: +class SimulatedBifurcationOptimizer(TensorBearer): """ The Simulated Bifurcation (SB) algorithm relies on Hamiltonian/quantum mechanics to find local minima of @@ -73,7 +65,10 @@ def __init__( verbose: bool, sampling_period: int, convergence_threshold: int, + dtype: torch.dtype, + device: Union[str, torch.device], ) -> None: + super().__init__(dtype=dtype, device=device) # Optimizer setting self.engine = engine self.window = None @@ -99,7 +94,6 @@ def __reset(self, matrix: torch.Tensor, early_stopping: bool) -> None: self.__init_window(matrix, early_stopping) self.__init_quadratic_scale_parameter(matrix) self.run = True - self.step = 0 self.start_time = None self.simulation_time = 0 @@ -138,27 +132,24 @@ def __init_window(self, matrix: torch.Tensor, early_stopping: bool) -> None: def __init_symplectic_integrator(self, matrix: torch.Tensor) -> None: self.symplectic_integrator = SymplecticIntegrator( - (matrix.shape[0], self.agents), + self.agents, + self.time_step, + self.pressure_slope, + self.heat_coefficient, self.engine.activation_function, - matrix.dtype, - matrix.device, + self.heated, + matrix, + self.dtype, + self.device, ) - def _step_update(self) -> None: - self.step += 1 - self.iterations_progress.update() - - def __check_stop(self, early_stopping: bool) -> None: - if early_stopping and self.__do_sampling: + def _check_stop(self, early_stopping: bool) -> None: + if early_stopping and self.__must_sample_spins(): self.run = self.window.must_continue() if not self.run: - LOGGER.info("Optimizer stopped. Reason: all agents converged.") return - if self.step >= self.max_steps: + if self.symplectic_integrator.step >= self.max_steps: self.run = False - LOGGER.info( - "Optimizer stopped. Reason: maximum number of iterations reached." - ) return previous_time = self.simulation_time self.simulation_time = time() - self.start_time @@ -168,50 +159,27 @@ def __check_stop(self, early_stopping: bool) -> None: self.time_progress.update(time_update) if self.simulation_time > self.timeout: self.run = False - LOGGER.info("Optimizer stopped. Reason: computation timeout reached.") return - @property - def __do_sampling(self) -> bool: - return self.step % self.sampling_period == 0 + def __must_sample_spins(self) -> bool: + return self.symplectic_integrator.step % self.sampling_period == 0 def __close_progress_bars(self): self.iterations_progress.close() self.time_progress.close() self.window.progress.close() - def __symplectic_update( - self, - matrix: torch.Tensor, - early_stopping: bool, - ) -> torch.Tensor: + def __symplectic_update(self, early_stopping: bool) -> torch.Tensor: self.start_time = time() try: while self.run: - if self.heated: - momentum_copy = self.symplectic_integrator.momentum.clone() - - ( - momentum_coefficient, - position_coefficient, - quadratic_coefficient, - ) = self.__compute_symplectic_coefficients() - self.symplectic_integrator.step( - momentum_coefficient, - position_coefficient, - quadratic_coefficient, - matrix, - ) - - if self.heated: - self.__heat(momentum_copy) - - self._step_update() - if early_stopping and self.__do_sampling: + self.symplectic_integrator.integration_step() + self.iterations_progress.update() + if early_stopping and self.__must_sample_spins(): sampled_spins = self.symplectic_integrator.sample_spins() self.window.update(sampled_spins) - self.__check_stop(early_stopping) + self._check_stop(early_stopping) except KeyboardInterrupt: warnings.warn( RuntimeWarning( @@ -223,31 +191,12 @@ def __symplectic_update( sampled_spins = self.symplectic_integrator.sample_spins() return sampled_spins - def __heat(self, momentum_copy: torch.Tensor) -> None: - torch.add( - self.symplectic_integrator.momentum, - momentum_copy, - alpha=self.time_step * self.heat_coefficient, - out=self.symplectic_integrator.momentum, - ) - - def __compute_symplectic_coefficients(self) -> Tuple[float, float, float]: - pressure = self.__pressure - position_coefficient = self.time_step - momentum_coefficient = self.time_step * (pressure - 1.0) - quadratic_coefficient = self.time_step * self.quadratic_scale_parameter - return momentum_coefficient, position_coefficient, quadratic_coefficient - - @property - def __pressure(self): - return minimum(self.time_step * self.step * self.pressure_slope, 1.0) - def run_integrator( self, matrix: torch.Tensor, early_stopping: bool ) -> torch.Tensor: """ Runs the Simulated Bifurcation (SB) algorithm. Given an input matrix, - the SB algorithm aims at finding the groud state of the Ising model + the SB algorithm aims at finding the ground state of the Ising model defined from this matrix, i.e. the {-1, +1}-vector that minimizes the Ising energy defined as `-0.5 * ΣΣ J(i,j)x(i)x(j)`, where `J` designates the matrix. @@ -276,7 +225,7 @@ def run_integrator( ): raise ValueError("No stopping criterion provided.") self.__reset(matrix, early_stopping) - spins = self.__symplectic_update(matrix, early_stopping) + spins = self.__symplectic_update(early_stopping) self.__close_progress_bars() return self.get_final_spins(spins, early_stopping) diff --git a/src/simulated_bifurcation/optimizer/stop_window.py b/src/simulated_bifurcation/optimizer/stop_window.py index 1829916b..efcf3c4b 100644 --- a/src/simulated_bifurcation/optimizer/stop_window.py +++ b/src/simulated_bifurcation/optimizer/stop_window.py @@ -3,8 +3,10 @@ import torch from tqdm.auto import tqdm +from ..core.tensor_bearer import TensorBearer -class StopWindow: + +class StopWindow(TensorBearer): """ Optimization tool to monitor spins bifurcation and convergence for the Simulated Bifurcation (SB) algorithm. @@ -20,12 +22,11 @@ def __init__( device: Union[str, torch.device], verbose: bool, ) -> None: + super().__init__(dtype=dtype, device=device) self.ising_tensor = ising_tensor self.n_spins = self.ising_tensor.shape[0] self.n_agents = n_agents self.__init_convergence_threshold(convergence_threshold) - self.dtype = dtype - self.device = device self.__init_tensors() self.__init_energies() self.final_spins = self.__init_spins() @@ -67,7 +68,9 @@ def __init_tensor(self, dtype: torch.dtype) -> torch.Tensor: def __init_energies(self) -> None: self.energies = torch.tensor( - [float("inf") for _ in range(self.n_agents)], device=self.device + [float("inf") for _ in range(self.n_agents)], + dtype=self.dtype, + device=self.device, ) def __init_tensors(self) -> None: diff --git a/src/simulated_bifurcation/optimizer/symplectic_integrator.py b/src/simulated_bifurcation/optimizer/symplectic_integrator.py index 4c4e77fd..92433d22 100644 --- a/src/simulated_bifurcation/optimizer/symplectic_integrator.py +++ b/src/simulated_bifurcation/optimizer/symplectic_integrator.py @@ -1,64 +1,97 @@ from typing import Callable, Tuple import torch +from numpy import minimum +from ..core.tensor_bearer import TensorBearer -class SymplecticIntegrator: + +class SymplecticIntegrator(TensorBearer): """ - Simulates the evolution of spins' momentum and position following - the Hamiltonian quantum mechanics equations that drive the - Simulated Bifurcation (SB) algorithm. + Simulates the evolution of spins' momentum and position following the Hamiltonian quantum mechanics equations that + drive the Simulated Bifurcation (SB) algorithm. """ def __init__( self, - shape: Tuple[int, int], + n_oscillators: int, + time_step: float, + pressure_slope: float, + heat_coefficient: float, activation_function: Callable[[torch.Tensor], torch.Tensor], + heat: bool, + quadratic_tensor: torch.Tensor, dtype: torch.dtype, device: torch.device, ): - self.position = self.__init_oscillator(shape, dtype, device) - self.momentum = self.__init_oscillator(shape, dtype, device) + super().__init__(dtype=dtype, device=device) + n_spins = quadratic_tensor.shape[0] + self.position = self.init_oscillator((n_spins, n_oscillators)) + self.momentum = self.init_oscillator((n_spins, n_oscillators)) + self.time_step = time_step + self.pressure_slope = pressure_slope + self.heat_coefficient = heat_coefficient self.activation_function = activation_function + self.heat = heat + self.quadratic_tensor = self._cast_tensor(quadratic_tensor) + self.quadratic_scale_parameter = ( + 0.5 * (n_spins - 1) ** 0.5 / (torch.sqrt(torch.sum(quadratic_tensor**2))) + ) + self.step = 0 - @staticmethod - def __init_oscillator( - shape: Tuple[int, int], dtype: torch.dtype, device: torch.device - ): - return 2 * torch.rand(size=shape, device=device, dtype=dtype) - 1 + def init_oscillator(self, shape: Tuple[int, int]) -> torch.Tensor: + return 2.0 * torch.rand(size=shape, device=self.device, dtype=self.dtype) - 1.0 - def position_update(self, coefficient: float) -> None: - torch.add(self.position, self.momentum, alpha=coefficient, out=self.position) + def position_update(self) -> None: + torch.add( + self.position, + self.momentum, + alpha=self.time_step, + out=self.position, + ) - def momentum_update(self, coefficient: float) -> None: - torch.add(self.momentum, self.position, alpha=coefficient, out=self.momentum) + def momentum_update(self) -> None: + torch.add( + self.momentum, + self.position, + alpha=self.time_step * (self.get_current_pressure() - 1.0), + out=self.momentum, + ) - def quadratic_momentum_update( - self, coefficient: float, matrix: torch.Tensor - ) -> None: + def quadratic_momentum_update(self) -> None: # do not use out=self.position because of side effects self.momentum = torch.addmm( self.momentum, - matrix, + self.quadratic_tensor, self.activation_function(self.position), - alpha=coefficient, + alpha=self.time_step * self.quadratic_scale_parameter, ) def simulate_inelastic_walls(self) -> None: self.momentum[torch.abs(self.position) > 1.0] = 0.0 torch.clip(self.position, -1.0, 1.0, out=self.position) - def step( - self, - momentum_coefficient: float, - position_coefficient: float, - quadratic_coefficient: float, - matrix: torch.Tensor, - ) -> None: - self.momentum_update(momentum_coefficient) - self.position_update(position_coefficient) - self.quadratic_momentum_update(quadratic_coefficient, matrix) + def simulate_heating(self, momentum_copy: torch.Tensor) -> None: + torch.add( + self.momentum, + momentum_copy, + alpha=self.time_step * self.heat_coefficient, + out=self.momentum, + ) + + def get_current_pressure(self) -> float: + return minimum(self.time_step * self.step * self.pressure_slope, 1.0) + + def integration_step(self) -> None: + if self.heat: + momentum_copy = self.momentum.clone() + self.momentum_update() + self.quadratic_momentum_update() + self.position_update() self.simulate_inelastic_walls() + if self.heat: + self.simulate_heating(momentum_copy) + self.step += 1 def sample_spins(self) -> torch.Tensor: return torch.where(self.position >= 0.0, 1.0, -1.0) diff --git a/tests/core/test_ising.py b/tests/core/test_ising.py index 5ab2f455..1b8bd547 100644 --- a/tests/core/test_ising.py +++ b/tests/core/test_ising.py @@ -42,8 +42,8 @@ def test_init_ising( assert torch.equal(torch.zeros(3, dtype=dtype, device=device), ising._h) assert ising._has_linear_term == use_linear_term assert ising._dimension == 3 - assert ising._dtype == torch.float32 if dtype is None else dtype - assert ising._device == torch.get_default_device() if device is None else device + assert ising.dtype == torch.float32 if dtype is None else dtype + assert ising.device == torch.get_default_device() if device is None else device def test_init_ising_with_non_2_dimensional_J(): @@ -131,9 +131,9 @@ def test_negative_ising( ) assert negative_ising._has_linear_term == use_linear_term assert negative_ising._dimension == 3 - assert negative_ising._dtype == torch.float32 if dtype is None else dtype + assert negative_ising.dtype == torch.float32 if dtype is None else dtype assert ( - negative_ising._device == torch.get_default_device() + negative_ising.device == torch.get_default_device() if device is None else device ) diff --git a/tests/core/test_quadratic_polynomial.py b/tests/core/test_quadratic_polynomial.py index 318ae564..c0ade6ed 100644 --- a/tests/core/test_quadratic_polynomial.py +++ b/tests/core/test_quadratic_polynomial.py @@ -3,7 +3,7 @@ import numpy as np import pytest import torch -from sympy import Poly, poly, symbols +from sympy import poly, symbols from src.simulated_bifurcation.core import QuadraticPolynomial @@ -21,8 +21,8 @@ def test_init_from_poly(dtype: torch.dtype, device: torch.device): ) quadratic_polynomial = QuadraticPolynomial(polynomial, dtype=dtype, device=device) - assert quadratic_polynomial._dtype == dtype - assert quadratic_polynomial._device == device + assert quadratic_polynomial.dtype == dtype + assert quadratic_polynomial.device == device assert quadratic_polynomial._dimension == 3 assert torch.equal( torch.tensor( @@ -55,8 +55,8 @@ def test_init_from_poly_no_bias(dtype: torch.dtype, device: torch.device): ) quadratic_polynomial = QuadraticPolynomial(polynomial, dtype=dtype, device=device) - assert quadratic_polynomial._dtype == dtype - assert quadratic_polynomial._device == device + assert quadratic_polynomial.dtype == dtype + assert quadratic_polynomial.device == device assert quadratic_polynomial._dimension == 3 assert torch.equal( torch.tensor( @@ -87,8 +87,8 @@ def test_init_from_poly_no_degree_1_monoms(dtype: torch.dtype, device: torch.dev polynomial = poly(x**2 + 3 * y**2 - 5 * z**2 + 4 * x * y - 2 * y * z + 6) quadratic_polynomial = QuadraticPolynomial(polynomial, dtype=dtype, device=device) - assert quadratic_polynomial._dtype == dtype - assert quadratic_polynomial._device == device + assert quadratic_polynomial.dtype == dtype + assert quadratic_polynomial.device == device assert quadratic_polynomial._dimension == 3 assert torch.equal( torch.tensor( @@ -119,8 +119,8 @@ def test_init_from_poly_no_degree_2_monoms(dtype: torch.dtype, device: torch.dev polynomial = poly(8 * x - 7 * y + z + 6) quadratic_polynomial = QuadraticPolynomial(polynomial, dtype=dtype, device=device) - assert quadratic_polynomial._dtype == dtype - assert quadratic_polynomial._device == device + assert quadratic_polynomial.dtype == dtype + assert quadratic_polynomial.device == device assert quadratic_polynomial._dimension == 3 assert torch.equal( torch.tensor( @@ -477,7 +477,7 @@ def test_build_polynomial_from_tensor( with pytest.raises( ValueError, - match="Inconsistant shape among provided tensors. Expected 3 but got 2.", + match="Inconsistent shape among provided tensors. Expected 3 but got 2.", ): QuadraticPolynomial( torch.zeros(3, 3, dtype=dtype, device=device), @@ -486,7 +486,7 @@ def test_build_polynomial_from_tensor( with pytest.raises( ValueError, - match="Inconsistant shape among provided tensors. Expected 2 but got 3.", + match="Inconsistent shape among provided tensors. Expected 2 but got 3.", ): QuadraticPolynomial( torch.zeros(2, dtype=dtype, device=device), @@ -496,8 +496,8 @@ def test_build_polynomial_from_tensor( def test_build_polynomial_with_wrong_domain(): with pytest.raises( - ValueError, - match="Unsupported coefficient tensor type: . Expected a torch.Tensor or a numpy.ndarray.", + TypeError, + match="Tensors can only be interpreted from NumPy arrays or int/float values.", ): QuadraticPolynomial("Hello world!") diff --git a/tests/core/test_tensor_bearer.py b/tests/core/test_tensor_bearer.py new file mode 100644 index 00000000..202df019 --- /dev/null +++ b/tests/core/test_tensor_bearer.py @@ -0,0 +1,98 @@ +import pytest +import torch +from numpy import array + +from src.simulated_bifurcation.core.tensor_bearer import TensorBearer + +from ..test_utils import BOOLEANS, DEVICES, DTYPES + +CPU = torch.device("cpu") + + +def test_init_with_default_dtype_and_device(): + tensor_bearer = TensorBearer() + assert torch.float32 == tensor_bearer.dtype + assert CPU == tensor_bearer.device + + +@pytest.mark.parametrize( + "dtype, device, device_as_str", + [ + (dtype, device, device_as_str) + for dtype in DTYPES + for device in DEVICES + for device_as_str in BOOLEANS + ], +) +def test_init_with_allowed_dtype_and_device( + dtype: torch.dtype, device: torch.device, device_as_str: bool +): + tensor_bearer = TensorBearer( + dtype=dtype, device=str(device) if device_as_str else device + ) + assert dtype == tensor_bearer.dtype + assert device == tensor_bearer.device + + +@pytest.mark.parametrize( + "dtype", [torch.float16, torch.int8, torch.int16, torch.int32, torch.int64] +) +def test_init_with_unauthorized_dtype(dtype: torch.dtype): + with pytest.raises( + ValueError, + match="The Simulated Bifurcation algorithm can only run with a torch.float32 or a torch.float64 dtype.", + ): + TensorBearer(dtype=dtype) + + +@pytest.mark.parametrize( + "dtype, device", + [(dtype, device) for dtype in DTYPES for device in DEVICES], +) +def test_convert_data_to_tensor(dtype: torch.dtype, device: torch.device): + tensor_bearer = TensorBearer(dtype=dtype, device=device) + data = [[1.0, 2.0], [3.0, 4.0]] + expected_data_tensor = torch.tensor(data, dtype=dtype, device=device) + expected_number_tensor = torch.tensor(2.0, dtype=dtype, device=device) + + # From tensor + assert torch.equal( + expected_data_tensor, tensor_bearer._safe_get_tensor(expected_data_tensor) + ) + + # From NumPy array + assert torch.equal( + expected_data_tensor, tensor_bearer._safe_get_tensor(array(data)) + ) + + # From single numeric value + assert torch.equal(expected_number_tensor, tensor_bearer._safe_get_tensor(2)) + assert torch.equal(expected_number_tensor, tensor_bearer._safe_get_tensor(2.0)) + + # From "raw" data + with pytest.raises( + TypeError, + match="Tensors can only be interpreted from NumPy arrays or int/float values.", + ): + # noinspection PyTypeChecker + tensor_bearer._safe_get_tensor(data) + + +@pytest.mark.parametrize( + "from_dtype, to_dtype, device", + [ + (from_dtype, to_dtype, device) + for from_dtype in DTYPES + for to_dtype in DTYPES + for device in DEVICES + ], +) +def test_cast_tensor( + from_dtype: torch.dtype, to_dtype: torch.dtype, device: torch.device +): + tensor_bearer = TensorBearer(dtype=to_dtype, device=device) + data = [[1.0, 2.0], [3.0, 4.0]] + assert torch.equal( + torch.tensor(data, dtype=to_dtype, device=device), + tensor_bearer._cast_tensor(torch.tensor(data, dtype=from_dtype, device=device)), + ) diff --git a/tests/core/test_utils.py b/tests/core/test_utils.py deleted file mode 100644 index 2a1d9783..00000000 --- a/tests/core/test_utils.py +++ /dev/null @@ -1,34 +0,0 @@ -import pytest -import torch - -from src.simulated_bifurcation.core.utils import safe_get_device, safe_get_dtype - - -def test_get_default_dtype(): - assert torch.float32 == safe_get_dtype(None) - - -@pytest.mark.parametrize("dtype", [torch.float32, torch.float64]) -def test_get_proper_dtype(dtype: torch.dtype): - assert dtype == safe_get_dtype(dtype) - - -@pytest.mark.parametrize( - "dtype", [torch.float16, torch.int8, torch.int16, torch.int32, torch.int64] -) -def test_get_unauthorized_dtype(dtype: torch.dtype): - with pytest.raises( - ValueError, - match="The Simulated Bifurcation algorithm can only run with a torch.float32 or a torch.float64 dtype.", - ): - safe_get_dtype(dtype) - - -def test_get_default_device(): - assert torch.get_default_device() == safe_get_device(None) - - -def test_get_device(): - cpu = torch.device("cpu") - assert cpu == safe_get_device(cpu) - assert cpu == safe_get_device("cpu") diff --git a/tests/core/test_variable.py b/tests/core/test_variable.py index b58b9f36..62b69338 100644 --- a/tests/core/test_variable.py +++ b/tests/core/test_variable.py @@ -58,11 +58,10 @@ def test_init_integer_variable_from_string(encoding_bits: int): def test_init_integer_variable_from_wrong_string_pattern(): expected_error_message = ( - r"Domain type must be one of \"spin\" or \"binary\", or be a string starting " - r"with \"int\" and followed by a positive integer that represents " - r"the number of bits required to encode the values of the domain\. " - r"More formally, it should match the following regular expression: " - r"\"\^int\[1-9\]\[0-9\]\*\$\" \(ex: \"int7\", \"int42\", \.\.\.\)\." + r"Domain type must be one of \"spin\" or \"binary\", or be a string starting with \"int\" and followed by a " + r"positive integer that represents the number of bits required to encode the values of the domain\. More " + r"formally, it should match the following regular expression: \"\^int\[1-9\]\[0-9\]\*\$\" \(ex: \"int7\", " + r"\"int42\", \.\.\.\)\." ) with pytest.raises(ValueError, match=expected_error_message): Variable.from_str("int0") diff --git a/tests/optimizer/__init__.py b/tests/optimizer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/optimizer/test_optimizer.py b/tests/optimizer/test_optimizer.py index 36672317..996b82a6 100644 --- a/tests/optimizer/test_optimizer.py +++ b/tests/optimizer/test_optimizer.py @@ -113,7 +113,16 @@ def test_set_optimization_environment(): torch.manual_seed(42) set_env(time_step=0.05, pressure_slope=0.005, heat_coefficient=0.1) optimizer = SimulatedBifurcationOptimizer( - 128, 10000, None, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 128, + 10000, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) assert optimizer.heat_coefficient == 0.1 assert optimizer.pressure_slope == 0.005 @@ -125,7 +134,16 @@ def test_set_only_one_optimization_variable(): torch.manual_seed(42) set_env(time_step=0.05) optimizer = SimulatedBifurcationOptimizer( - 128, 10000, None, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 128, + 10000, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) assert optimizer.heat_coefficient == 0.06 assert optimizer.pressure_slope == 0.01 @@ -139,7 +157,16 @@ def test_wrong_value_throws_exception_and_variables_not_updated(): # noinspection PyTypeChecker set_env(heat_coefficient="Hello world!") optimizer = SimulatedBifurcationOptimizer( - 128, 10000, None, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 128, + 10000, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) assert optimizer.heat_coefficient == 0.06 assert optimizer.pressure_slope == 0.01 @@ -159,7 +186,16 @@ def test_timeout(): h = torch.tensor([1, 0, -2], dtype=torch.float32) ising = Ising(J, h) optimizer = SimulatedBifurcationOptimizer( - 128, None, 3.0, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 128, + None, + 3.0, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) optimizer.run_integrator(ising.as_simulated_bifurcation_tensor(), False) assert optimizer.simulation_time > 3.0 @@ -178,7 +214,16 @@ def test_window(): h = torch.tensor([1, 0, -2], dtype=torch.float32) ising = Ising(J, h) optimizer = SimulatedBifurcationOptimizer( - 1, 100000, None, SimulatedBifurcationEngine.bSB, True, True, 1, 1 + 1, + 100000, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 1, + 1, + torch.float32, + torch.device("cpu"), ) optimizer.run_integrator(ising.as_simulated_bifurcation_tensor(), True) @@ -196,10 +241,19 @@ def test_max_steps(): h = torch.tensor([1, 0, -2], dtype=torch.float32) ising = Ising(J, h) optimizer = SimulatedBifurcationOptimizer( - 1, 10, None, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 1, + 10, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) optimizer.run_integrator(ising.as_simulated_bifurcation_tensor(), False) - assert optimizer.step == 10 + assert optimizer.symplectic_integrator.step == 10 def test_no_stop_criterion(): @@ -215,7 +269,16 @@ def test_no_stop_criterion(): h = torch.tensor([1, 0, -2], dtype=torch.float32) ising = Ising(J, h) optimizer = SimulatedBifurcationOptimizer( - 1, None, None, SimulatedBifurcationEngine.bSB, True, True, 50, 50 + 1, + None, + None, + SimulatedBifurcationEngine.bSB, + True, + True, + 50, + 50, + torch.float32, + torch.device("cpu"), ) with pytest.raises(ValueError, match="No stopping criterion provided."): optimizer.run_integrator(ising.as_simulated_bifurcation_tensor(), False) @@ -223,9 +286,9 @@ def test_no_stop_criterion(): def test_keyboard_interrupt(): class SimulatedBifurcationOptimizerTest(SimulatedBifurcationOptimizer): - def _step_update(self) -> None: - super()._step_update() - if self.step >= 1000: + def _check_stop(self, early_stopping: bool) -> None: + super()._check_stop(early_stopping) + if self.symplectic_integrator.step >= 1000: raise KeyboardInterrupt torch.manual_seed(42) @@ -240,7 +303,16 @@ def _step_update(self) -> None: h = torch.tensor([1, 0, -2], dtype=torch.float32) ising = Ising(J, h) optimizer = SimulatedBifurcationOptimizerTest( - 20, 10000, None, SimulatedBifurcationEngine.bSB, True, False, 50, 50 + 20, + 10000, + None, + SimulatedBifurcationEngine.bSB, + True, + False, + 50, + 50, + torch.float32, + torch.device("cpu"), ) with pytest.warns( RuntimeWarning, @@ -250,4 +322,4 @@ def _step_update(self) -> None: assert isinstance(spins, torch.Tensor) assert (4, 20) == tuple(spins.shape) assert torch.all(torch.abs(spins) == 1.0) - assert optimizer.step < 10000 + assert optimizer.symplectic_integrator.step < 10000 diff --git a/tests/optimizer/test_simulated_bifurcation_engine.py b/tests/optimizer/test_simulated_bifurcation_engine.py index 59e565bb..65c38b3d 100644 --- a/tests/optimizer/test_simulated_bifurcation_engine.py +++ b/tests/optimizer/test_simulated_bifurcation_engine.py @@ -13,6 +13,6 @@ def test_simulated_bifurcation_engine(): "discrete" ) with pytest.raises( - ValueError, match="Unknwown Simulated Bifurcation engine: unknown-engine." + ValueError, match="Unknown Simulated Bifurcation engine: unknown-engine." ): SimulatedBifurcationEngine.get_engine("unknown-engine") diff --git a/tests/optimizer/test_symplectic_integrator.py b/tests/optimizer/test_symplectic_integrator.py index a65d5453..53afba90 100644 --- a/tests/optimizer/test_symplectic_integrator.py +++ b/tests/optimizer/test_symplectic_integrator.py @@ -1,288 +1,291 @@ +from typing import Callable + +import pytest import torch from src.simulated_bifurcation.optimizer import SymplecticIntegrator +from ..test_utils import DEVICES, DTYPES + +initial_position = [[-0.7894, -0.4610], [-0.2343, 0.9186], [-0.2191, 0.2018]] +initial_momentum = [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]] +quadratic_tensor = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] -def test_init_ballistic_symplectic_integrator(): + +def init_integrator( + dtype: torch.dtype, + device: torch.device, + activation_function: Callable[[torch.Tensor], torch.Tensor], + heat: bool, +) -> SymplecticIntegrator: symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" + 2, + 0.1, + 0.01, + 0.06, + activation_function, + heat, + torch.tensor(quadratic_tensor, dtype=dtype, device=device), + dtype, + device, ) symplectic_integrator.position = torch.tensor( - [ - [-0.7894, -0.4610], - [-0.2343, 0.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + initial_position, dtype=dtype, device=device ) + symplectic_integrator.momentum = torch.tensor( + initial_momentum, dtype=dtype, device=device + ) + return symplectic_integrator + + +def assert_tensors_equality(expected: torch.Tensor, actual: torch.Tensor): + assert torch.all(torch.isclose(expected, actual, atol=1e-4)) + + +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_sample_spins(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) assert torch.equal( symplectic_integrator.sample_spins(), torch.tensor( - [ - [-1, -1], - [-1, 1], - [-1, 1], - ], - dtype=torch.float32, + [[-1, -1], [-1, 1], [-1, 1]], + dtype=dtype, + device=device, ), ) -def test_init_discrete_symplectic_integrator(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.sign, torch.float32, "cpu" +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_position_update(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) + symplectic_integrator.position_update() + assert_tensors_equality( + torch.tensor( + [[-0.8381, -0.4023], [-0.1461, 0.8452], [-0.1322, 0.2205]], + dtype=dtype, + device=device, + ), + symplectic_integrator.position, ) - symplectic_integrator.position = torch.tensor( - [ - [-0.7894, -0.4610], - [-0.2343, 0.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + + +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_momentum_update(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) + symplectic_integrator.momentum_update() + assert_tensors_equality( + torch.tensor( + [[-0.4080, 0.6334], [0.9049, -0.8255], [0.8911, 0.1670]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) - assert torch.equal( - symplectic_integrator.sample_spins(), + + +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_quadratic_position_update_ballistic(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) + symplectic_integrator.quadratic_momentum_update() + assert_tensors_equality( torch.tensor( - [ - [-1, -1], - [-1, 1], - [-1, 1], - ], - dtype=torch.float32, + [[-0.4949, 0.5956], [0.8579, -0.7170], [0.8299, 0.2121]], + dtype=dtype, + device=device, ), + symplectic_integrator.momentum, ) -def test_position_update(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_quadratic_position_update_discrete(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.sign, False) + symplectic_integrator.quadratic_momentum_update() + assert_tensors_equality( + torch.tensor( + [[-0.5120, 0.6041], [0.8187, -0.7043], [0.7687, 0.2291]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) + + +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_inelastic_walls_simulation(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) symplectic_integrator.position = torch.tensor( - [ - [-0.7894, -0.4610], - [-0.2343, 0.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + [[-2.7894, -0.4610], [-1.2343, 1.9186], [-0.2191, 0.2018]], + dtype=dtype, + device=device, ) symplectic_integrator.momentum = torch.tensor( - [ - [-0.4869, 0.5873], - [0.8815, -0.7336], - [0.8692, 0.1872], - ], - dtype=torch.float32, + [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]], + dtype=dtype, + device=device, ) - symplectic_integrator.position_update(0.2) - assert torch.all( - torch.isclose( - symplectic_integrator.position, - torch.tensor( - [ - [-0.8868, -0.3435], - [-0.0580, 0.7719], - [-0.0453, 0.2392], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + symplectic_integrator.simulate_inelastic_walls() + assert_tensors_equality( + torch.tensor( + [[-1, -0.4610], [-1, 1], [-0.2191, 0.2018]], + dtype=dtype, + device=device, + ), + symplectic_integrator.position, + ) + assert_tensors_equality( + torch.tensor( + [[0, 0.5873], [0, 0], [0.8692, 0.1872]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) -def test_momentum_update(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" - ) +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_full_ballistic_step(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), False) symplectic_integrator.position = torch.tensor( - [ - [-0.7894, -0.4610], - [-0.2343, 0.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + [[-2.7894, -0.4610], [-1.2343, 1.9186], [-0.2191, 0.2018]], + dtype=dtype, + device=device, ) symplectic_integrator.momentum = torch.tensor( - [ - [-0.4869, 0.5873], - [0.8815, -0.7336], - [0.8692, 0.1872], - ], - dtype=torch.float32, + [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]], + dtype=dtype, + device=device, + ) + symplectic_integrator.integration_step() + assert_tensors_equality( + torch.tensor( + [[-1.0000, -0.3960], [-1.0000, 1.0000], [-0.1431, 0.2243]], + dtype=dtype, + device=device, + ), + symplectic_integrator.position, ) - symplectic_integrator.momentum_update(0.2) - assert torch.all( - torch.isclose( - symplectic_integrator.momentum, - torch.tensor( - [ - [-0.6448, 0.4951], - [0.8346, -0.5499], - [0.8254, 0.2276], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + assert_tensors_equality( + torch.tensor( + [[0.0000, 0.6501], [0.0000, 0.0000], [0.7597, 0.2254]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) -def test_quadratic_position_update(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" - ) +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_full_discrete_step(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.sign, False) symplectic_integrator.position = torch.tensor( - [ - [-0.7894, -0.4610], - [-0.2343, 0.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + [[-2.7894, -0.4610], [-1.2343, 1.9186], [-0.2191, 0.2018]], + dtype=dtype, + device=device, ) symplectic_integrator.momentum = torch.tensor( - [ - [-0.4869, 0.5873], - [0.8815, -0.7336], - [0.8692, 0.1872], - ], - dtype=torch.float32, + [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]], + dtype=dtype, + device=device, ) - symplectic_integrator.quadratic_momentum_update( - 0.2, + symplectic_integrator.integration_step() + assert_tensors_equality( torch.tensor( - [ - [0, 0.2, 0.3], - [0.2, 0, 0.1], - [0.3, 0.1, 0], - ], - dtype=torch.float32, + [[-1.0000, -0.3960], [-1.0000, 1.0000], [-0.1400, 0.2227]], + dtype=dtype, + device=device, ), + symplectic_integrator.position, ) - assert torch.all( - torch.isclose( - symplectic_integrator.momentum, - torch.tensor( - [ - [-0.5094, 0.6362], - [0.8455, -0.7480], - [0.8171, 0.1779], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + assert_tensors_equality( + torch.tensor( + [[0.0000, 0.6502], [0.0000, 0.0000], [0.7906, 0.2089]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) -def test_inelastic_walls_simulation(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" - ) +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_full_ballistic_step_with_heating(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.nn.Identity(), True) symplectic_integrator.position = torch.tensor( - [ - [-2.7894, -0.4610], - [-1.2343, 1.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + [[-2.7894, -0.4610], [-1.2343, 1.9186], [-0.2191, 0.2018]], + dtype=dtype, + device=device, ) symplectic_integrator.momentum = torch.tensor( - [ - [-0.4869, 0.5873], - [0.8815, -0.7336], - [0.8692, 0.1872], - ], - dtype=torch.float32, + [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]], + dtype=dtype, + device=device, ) - symplectic_integrator.simulate_inelastic_walls() - assert torch.all( - torch.isclose( - symplectic_integrator.position, - torch.tensor( - [ - [-1, -0.4610], - [-1, 1], - [-0.2191, 0.2018], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + symplectic_integrator.integration_step() + assert_tensors_equality( + torch.tensor( + [[-1.0000, -0.3960], [-1.0000, 1.0000], [-0.1431, 0.2243]], + dtype=dtype, + device=device, + ), + symplectic_integrator.position, ) - assert torch.all( - torch.isclose( - symplectic_integrator.momentum, - torch.tensor( - [ - [0, 0.5873], - [0, 0], - [0.8692, 0.1872], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + assert_tensors_equality( + torch.tensor( + [[-0.0029, 0.6536], [0.0053, -0.0044], [0.7649, 0.2265]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) -def test_full_step(): - symplectic_integrator = SymplecticIntegrator( - (3, 2), torch.nn.Identity(), torch.float32, "cpu" - ) +@pytest.mark.parametrize( + "dtype, device", [(dtype, device) for dtype in DTYPES for device in DEVICES] +) +def test_full_discrete_step_with_heating(dtype: torch.dtype, device: torch.device): + symplectic_integrator = init_integrator(dtype, device, torch.sign, True) symplectic_integrator.position = torch.tensor( - [ - [-2.7894, -0.4610], - [-1.2343, 1.9186], - [-0.2191, 0.2018], - ], - dtype=torch.float32, + [[-2.7894, -0.4610], [-1.2343, 1.9186], [-0.2191, 0.2018]], + dtype=dtype, + device=device, ) symplectic_integrator.momentum = torch.tensor( - [ - [-0.4869, 0.5873], - [0.8815, -0.7336], - [0.8692, 0.1872], - ], - dtype=torch.float32, + [[-0.4869, 0.5873], [0.8815, -0.7336], [0.8692, 0.1872]], + dtype=dtype, + device=device, ) - symplectic_integrator.step( - 0.2, - 0.2, - 0.2, + symplectic_integrator.integration_step() + assert_tensors_equality( torch.tensor( - [ - [0, 0.2, 0.3], - [0.2, 0, 0.1], - [0.3, 0.1, 0], - ], - dtype=torch.float32, + [[-1.0000, -0.3960], [-1.0000, 1.0000], [-0.1400, 0.2227]], + dtype=dtype, + device=device, ), + symplectic_integrator.position, ) - assert torch.all( - torch.isclose( - symplectic_integrator.position, - torch.tensor( - [ - [-1, -0.3620], - [-1, 1], - [-0.0540, 0.2473], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) - ) - assert torch.all( - torch.isclose( - symplectic_integrator.momentum, - torch.tensor( - [ - [0, 0.5839], - [0, 0], - [0.6233, 0.2428], - ], - dtype=torch.float32, - ), - atol=1e-4, - ) + assert_tensors_equality( + torch.tensor( + [[-0.0029, 0.6536], [0.0053, -0.0044], [0.7958, 0.2100]], + dtype=dtype, + device=device, + ), + symplectic_integrator.momentum, ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 6ad4e263..e9890dcc 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,7 @@ import torch +BOOLEANS = [True, False] + DTYPES = [torch.float32, torch.float64] DEVICES = (