From 7688f6f06a0ab4089c6e9404afa8d917fad4b23a Mon Sep 17 00:00:00 2001 From: Rosa Bulo Date: Fri, 16 Jan 2026 13:19:23 +0100 Subject: [PATCH 1/2] Rosa Bulo (REB) SCMSUITE-- SO107: Fixed bug mdanalysis recipes - Fixed bug in AMSConvenientAnalysisJob classes introduced when made compatible with pisa * atom_indices passed as arguments were no longer used * AMSRDFJob failed always - Added tests for AMSConvenientAnalysisJob classes --- .../plams/recipes/md/trajectoryanalysis.py | 89 +++++-- unit_tests/conftest.py | 2 +- unit_tests/test_mdanalysis.py | 244 ++++++++++++++++++ 3 files changed, 307 insertions(+), 28 deletions(-) create mode 100644 unit_tests/test_mdanalysis.py diff --git a/src/scm/plams/recipes/md/trajectoryanalysis.py b/src/scm/plams/recipes/md/trajectoryanalysis.py index 172c630e6..c448f7d11 100644 --- a/src/scm/plams/recipes/md/trajectoryanalysis.py +++ b/src/scm/plams/recipes/md/trajectoryanalysis.py @@ -15,6 +15,12 @@ __all__ = ["AMSRDFJob", "AMSMSDJob", "AMSMSDResults", "AMSVACFJob", "AMSVACFResults"] +class MDAnalysisSettingsError(Exception): + """ + Error in supplied settings object + """ + + class AMSConvenientAnalysisJob(AMSAnalysisJob): _task = "None" @@ -32,6 +38,7 @@ def __init__(self, previous_job=None, atom_indices=None, **kwargs): # needs to self.previous_job = previous_job self.atom_indices = atom_indices + self._settings_updated = False def _get_max_dt_frames(self, max_correlation_time_fs): if max_correlation_time_fs is None: @@ -57,11 +64,17 @@ def get_input(self): Generate the input file """ self._settings_to_list(self.settings.input, self._task) - if self.atom_indices and self._parent_write_atoms: - section = getattr(self.settings.input, self._task) - for entry in section: - if not (self._has_settings_entry(entry, "Atoms") and self._has_settings_entry(entry.Atoms, "Atom")): - self._add_nonunique_settings_entries(entry.Atoms.Atom, self.atom_indices) + + if not self._settings_updated: + if self.atom_indices and self._parent_write_atoms: + section = getattr(self.settings.input, self._task) + for entry in section: + if not (self._has_settings_entry(entry, "Atoms") and self._has_settings_entry(entry.Atoms, "Atom")): + self._add_nonunique_settings_entries(entry.Atoms, "Atom", self.atom_indices) + else: + msg = "Atom indices cannot be supplied as argument if already present in settings" + raise MDAnalysisSettingsError(msg) + self._settings_updated = True return super().get_input() @staticmethod @@ -100,7 +113,7 @@ def _has_settings_entry(settings, entry): return getattr(settings, entry).value_changed @staticmethod - def _add_nonunique_settings_entries(settings, entries): + def _add_nonunique_settings_entries(settings, key, entries): """ For non-default entries, multiple entries can be supplied @@ -109,9 +122,10 @@ def _add_nonunique_settings_entries(settings, entries): """ if not isinstance(settings, Settings): for i, entry in enumerate(entries): - settings[i] = entry + subsettings = getattr(settings, key) + subsettings[i] = entry else: - settings = entries + settings[key] = entries def _parent_prerun(self): """ @@ -655,15 +669,45 @@ def get_input(self): """ self._settings_to_list(self.settings.input, self._task) - for settings in self.settings.input.RadialDistribution: - if not self._has_settings_entry(settings, "AtomsFrom"): - if self.atom_indices: - settings.AtomsFrom.Atom = self.atom_indices - if not self._has_settings_entry(settings, "AtomsTo"): - if self.atom_indices_to: - settings.Atom = self.atom_indices_to - if not self._has_settings_entry(settings, "Range"): - settings.Range = f"{self.rmin} {self.rmax} {self.rstep}" + if not self._settings_updated: + prevjobs = self.previous_job + if not isinstance(prevjobs, list): + prevjobs = [prevjobs] + main_mol = prevjobs[0].results.get_main_molecule() + + for settings in self.settings.input.RadialDistribution: + # If no atom iindices were provided, and there is nothing in the settings at all, add them + atom_indices = self.atom_indices + atom_indices_to = self.atom_indices_to + if not atom_indices: + if not self._has_settings_entry(settings, "AtomsFrom"): + atom_indices = list(range(1, len(main_mol) + 1)) + if not atom_indices_to: + if not self._has_settings_entry(settings, "AtomsTo"): + atom_indices_to = list(range(1, len(main_mol) + 1)) + + # Add the atom indices only if there were no atom indices in the settings (if elements are present, add indices) + if atom_indices: + atoms_from_present = self._has_settings_entry(settings, "AtomsFrom") + atom_present = atoms_from_present and self._has_settings_entry(settings.AtomsFrom, "Atom") + if not atom_present: + self._add_nonunique_settings_entries(settings.AtomsFrom, "Atom", atom_indices) + else: + msg = "Atom indices cannot be supplied as argument if already present in settings" + raise MDAnalysisSettingsError(msg) + if atom_indices_to: + atoms_to_present = self._has_settings_entry(settings, "AtomsTo") + atom_present = atoms_to_present and self._has_settings_entry(settings.AtomsTo, "Atom") + if not atom_present: + self._add_nonunique_settings_entries(settings.AtomsTo, "Atom", atom_indices_to) + else: + msg = "Atom indices cannot be supplied as argument if already present in settings" + raise MDAnalysisSettingsError(msg) + if not self._has_settings_entry(settings, "Range"): + if isinstance(settings, Settings): + settings.Range = f"{self.rmin} {self.rmax} {self.rstep}" + else: + settings.Range = [self.rmin, self.rmax, self.rstep] return super().get_input() def prerun(self): # noqa F811 @@ -673,16 +717,7 @@ def prerun(self): # noqa F811 self._parent_prerun() self._settings_to_list(self.settings.input, self._task) - prevjobs = self.previous_jobs - if not isinstance(prevjobs, list): - prevjobs = [prevjobs] - main_mol = prevjobs[0].results.get_main_molecule() - - if not self.atom_indices: - self.atom_indices = list(range(1, len(main_mol) + 1)) - if not self.atom_indices_to: - self.atom_indices_to = list(range(1, len(main_mol) + 1)) - # Settings will be adjusted when get_input is called (again). + # Atom_indices will be adjusted when get_input is called (again). def postrun(self): """ diff --git a/unit_tests/conftest.py b/unit_tests/conftest.py index 87ad9f212..c45b7bd11 100644 --- a/unit_tests/conftest.py +++ b/unit_tests/conftest.py @@ -30,7 +30,7 @@ def config(): _finish() -@pytest.fixture +@pytest.fixture(scope="session") def xyz_folder(): """ Returns the path to the XYZ folder diff --git a/unit_tests/test_mdanalysis.py b/unit_tests/test_mdanalysis.py new file mode 100644 index 000000000..4a4882d39 --- /dev/null +++ b/unit_tests/test_mdanalysis.py @@ -0,0 +1,244 @@ +import os +import numpy as np +import pytest + +from scm.plams import Settings +from scm.plams import Molecule +from scm.plams import AMSJob +from scm.plams import AMSMSDJob +from scm.plams import AMSRDFJob + +from test_helpers import skip_if_no_ams_installation +from test_helpers import skip_if_no_scm_pisa + + +@pytest.fixture(scope="session") +def mdjob(tmp_path_factory, xyz_folder): + """ + Create AMSJob for an MD simulation (hopefully once per session) + """ + skip_if_no_ams_installation() + pwd = os.getcwd() + mol = Molecule(xyz_folder / "water_box.xyz") + + dirname = tmp_path_factory.mktemp("data") + os.chdir(dirname.as_posix()) + + s = Settings() + s.input.ams.Task = "MolecularDynamics" + s.input.ReaxFF.ForceField = "Water2017.ff" + s.input.ams.RNGSeed = "1 2 3 4 5 6 7 8 9" + s.input.ams.MolecularDynamics.CalcPressure = "Yes" + s.input.ams.MolecularDynamics.InitialVelocities.Temperature = 300 + s.input.ams.MolecularDynamics.Trajectory.SamplingFreq = 1 + s.input.ams.MolecularDynamics.TimeStep = 0.5 + s.input.ams.MolecularDynamics.NSteps = 200 + + s.runscript.nproc = 1 + os.environ["OMP_NUM_THREADS"] = "1" + job = AMSJob(settings=s, molecule=mol, name="md") + job.run() + os.chdir(pwd) + + return job + + +class TestMSDJob: + """ + Test of the AMS Job representing a MeanSquareDisplacement calculation + """ + + def test_default_use(self, mdjob): + """ + Test plainest use of AMSMSDJob + """ + mol = mdjob.results.get_main_molecule() + oxygens = [i + 1 for i, at in enumerate(mol.atoms) if at.symbol == "O"] + msd_job = AMSMSDJob(mdjob, atom_indices=oxygens, start_time_fit_fs=20) + + # Check the input + txt = msd_job.get_input() + for iat in oxygens: + assert "Atom %i" % (iat) in txt + + # Run the job and check output + msd_job.run() + D = msd_job.results.get_diffusion_coefficient() + assert abs(D - 2.7430808506924798e-08) < 1e-18 + + def test_settings_use(self, mdjob): + """ + Test AMSMSDJob with settings object + """ + s = Settings() + s.input.MeanSquareDisplacement.Atoms.Element = "O" + msd_job = AMSMSDJob(mdjob, settings=s, start_time_fit_fs=20) + + # Check the input + txt = msd_job.get_input() + assert "Element O" in txt + + # Run the job and check output + msd_job.run() + D = msd_job.results.get_diffusion_coefficient() + assert abs(D - 2.7430808506924798e-08) < 1e-18 + + def test_pisa_use(self, mdjob): + """ + Test AMSMSDJob with Pisa object + """ + skip_if_no_scm_pisa() + from scm.input_classes import Analysis + + sets = Analysis() + sets.MeanSquareDisplacement.Atoms.Element = "O" + msd_job = AMSMSDJob(mdjob, settings=sets, start_time_fit_fs=20) + + # Check the input + txt = msd_job.get_input() + assert "Element O" in txt + + # Run the job and check output + msd_job.run() + D = msd_job.results.get_diffusion_coefficient() + assert abs(D - 2.7430808506924798e-08) < 1e-18 + + +class TestRDFJob: + """ + Test of the AMS Job representing a RadialDistribution calculation + """ + + def test_default_use(self, mdjob): + """ + Test plainest use of AMSRDFJob + """ + mol = mdjob.results.get_main_molecule() + nats = len(mol) + rdf_job = AMSRDFJob(mdjob) + + # Check the input + txt = rdf_job.get_input() + for iat in range(nats): + assert "Atom %i" % (iat + 1) in txt + + # Run the job and check output + rdf_job.run() + x, rdf = rdf_job.results.get_rdf() + ind = rdf.argmax() + peak = x[ind] + assert abs(peak - 0.9074074074074073) < 1e-8 + + def test_indices_use(self, mdjob): + """ + Test AMSRDFJob supplying atom indices as arguments + """ + mol = mdjob.results.get_main_molecule() + oxygens = [i + 1 for i, at in enumerate(mol.atoms) if at.symbol == "O"] + rdf_job = AMSRDFJob(mdjob, atom_indices=oxygens, atom_indices_to=oxygens) + + # Check the input + txt = rdf_job.get_input() + for iat in oxygens: + assert "Atom %i" % (iat) in txt + assert not "Atom 2\n" in txt + + # Run the job and check output + rdf_job.run() + x, rdf = rdf_job.results.get_rdf() + ind = rdf.argmax() + peak = x[ind] + assert abs(peak - 2.7407407407407405) < 1e-8 + + def test_settings_use(self, mdjob): + """ + Test AMSRDFJob with a Settings object + """ + mol = mdjob.results.get_main_molecule() + oxygens = [i + 1 for i, at in enumerate(mol.atoms) if at.symbol == "O"] + + s = Settings() + s.input.RadialDistribution.AtomsFrom.Atom = oxygens + s.input.RadialDistribution.AtomsTo.Atom = oxygens + + rdf_job = AMSRDFJob(mdjob, atom_indices=oxygens, atom_indices_to=oxygens, settings=s) + + # Check the input + failed = False + try: + txt = rdf_job.get_input() + except Exception: + failed = True + assert failed + + # Now do this correctly + rdf_job = AMSRDFJob(mdjob, settings=s) + txt = rdf_job.get_input() + for iat in oxygens: + assert "Atom %i" % (iat) in txt + assert not "Atom 2\n" in txt + + # Run the job and check output + rdf_job.run() + x, rdf = rdf_job.results.get_rdf() + ind = rdf.argmax() + peak = x[ind] + assert abs(peak - 2.7407407407407405) < 1e-8 + + def test_pisa_use(self, mdjob): + """ + Test AMSRDFJob with Pisa settings + """ + skip_if_no_scm_pisa() + from scm.input_classes import Analysis + + sets = Analysis() + sets.RadialDistribution.AtomsFrom.Element = "O" + sets.RadialDistribution.AtomsTo.Element = "O" + + rdf_job = AMSRDFJob(mdjob, settings=sets) + txt = rdf_job.get_input() + assert "Element O" in txt + + # Run the job and check output + rdf_job.run() + x, rdf = rdf_job.results.get_rdf() + ind = rdf.argmax() + peak = x[ind] + assert abs(peak - 2.7407407407407405) < 1e-8 + + def test_multiple_rdfs(self, mdjob): + """ + Test using multiple RadialDistribution blocks in the settings + """ + mol = mdjob.results.get_main_molecule() + oxygens = [i + 1 for i, at in enumerate(mol.atoms) if at.symbol == "O"] + + # Create the settings with multiply RadialDistribution blocks + s = Settings() + subsettings = [Settings(), Settings()] + subsettings[0].AtomsFrom.Atom = oxygens + subsettings[0].AtomsTo.Atom = oxygens + s.input.RadialDistribution = subsettings + + # Create the RDFJob, test supplying the job as a list, and then check the input + rdf_job = AMSRDFJob([mdjob], settings=s) + txt = rdf_job.get_input() + for iat in oxygens: + assert "Atom %i" % (iat) in txt + + # Run the job and check output + rdf_job.run() + + # Get the peak from the first RDF + x, rdf = rdf_job.results.get_rdf() + ind = rdf.argmax() + peak = x[ind] + assert abs(peak - 2.7407407407407405) < 1e-8 + + # Get the peak from the second RDF + xy = rdf_job.results.get_xy(i=2) + x = np.array(xy.x[0]) + rdf = np.array(xy.y) + peak = x[rdf.argmax()] + assert abs(peak - 0.9074074074074073) < 1e-8 From 38a9fca919edb0ee7130f92ce544ab4003d366f9 Mon Sep 17 00:00:00 2001 From: Rosa Bulo Date: Tue, 20 Jan 2026 13:59:26 +0100 Subject: [PATCH 2/2] Rosa Bulo (REB) SCMSUITE-- SO107: Added tests for VACF and Viscosity jobs --- .../plams/interfaces/adfsuite/amsanalysis.py | 8 +++- .../plams/recipes/md/trajectoryanalysis.py | 4 +- unit_tests/test_mdanalysis.py | 45 +++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/src/scm/plams/interfaces/adfsuite/amsanalysis.py b/src/scm/plams/interfaces/adfsuite/amsanalysis.py index 6aa5759a5..224ba6dd5 100644 --- a/src/scm/plams/interfaces/adfsuite/amsanalysis.py +++ b/src/scm/plams/interfaces/adfsuite/amsanalysis.py @@ -239,10 +239,14 @@ def get_D(self, i: int = 1) -> Tuple[Optional[float], Optional[str]]: return None, None section = sections[i - 1] plot = self.get_xy(section.split("(")[0], i) - if not plot.properties or "DiffusionCoefficient" not in plot.properties.keys(): + if not plot.properties or "Final" not in plot.properties.keys(): + return None, None + if "Legend" not in plot.properties.keys() or not isinstance(plot.properties["Legend"], str): + return None, None + if "Diffusion" not in plot.properties["Legend"]: return None, None - D = cast(float, plot.properties["DiffusionCoefficient"]) + D = cast(float, plot.properties["Final"]) D_units = plot.y_units return D, D_units diff --git a/src/scm/plams/recipes/md/trajectoryanalysis.py b/src/scm/plams/recipes/md/trajectoryanalysis.py index c448f7d11..3acd1d334 100644 --- a/src/scm/plams/recipes/md/trajectoryanalysis.py +++ b/src/scm/plams/recipes/md/trajectoryanalysis.py @@ -473,7 +473,7 @@ def get_input(self): """ Generate the input file """ - self.settings_to_list() + self._settings_to_list(self.settings.input, self._task) for settings in self.settings.input.AutoCorrelation: settings.Property = "Velocities" @@ -484,7 +484,7 @@ def prerun(self): # noqa F811 Creates final settings """ self._parent_prerun() # trajectory and atom_indices handled - self.settings_to_list() + self._settings_to_list(self.settings.input, self._task) for settings in self.settings.input.AutoCorrelation: if not self._has_settings_entry(settings, "MaxFrame"): diff --git a/unit_tests/test_mdanalysis.py b/unit_tests/test_mdanalysis.py index 4a4882d39..da835b801 100644 --- a/unit_tests/test_mdanalysis.py +++ b/unit_tests/test_mdanalysis.py @@ -7,6 +7,8 @@ from scm.plams import AMSJob from scm.plams import AMSMSDJob from scm.plams import AMSRDFJob +from scm.plams import AMSVACFJob +from scm.plams.recipes.md.trajectoryanalysis import AMSViscosityFromBinLogJob from test_helpers import skip_if_no_ams_installation from test_helpers import skip_if_no_scm_pisa @@ -33,6 +35,8 @@ def mdjob(tmp_path_factory, xyz_folder): s.input.ams.MolecularDynamics.Trajectory.SamplingFreq = 1 s.input.ams.MolecularDynamics.TimeStep = 0.5 s.input.ams.MolecularDynamics.NSteps = 200 + s.input.ams.MolecularDynamics.BinLog.Step = "Yes" + s.input.ams.MolecularDynamics.BinLog.PressureTensor = "Yes" s.runscript.nproc = 1 os.environ["OMP_NUM_THREADS"] = "1" @@ -242,3 +246,44 @@ def test_multiple_rdfs(self, mdjob): rdf = np.array(xy.y) peak = x[rdf.argmax()] assert abs(peak - 0.9074074074074073) < 1e-8 + + +class TestVACFJob: + """ + Test of the AMS Job representing a velocity autocorrelation function calculation + """ + + def test_default_use(self, mdjob): + """ + Test plainest use of AMSVACFJob + """ + mol = mdjob.results.get_main_molecule() + oxygens = [i + 1 for i, at in enumerate(mol.atoms) if at.symbol == "O"] + vacf_job = AMSVACFJob(mdjob, atom_indices=oxygens) + + # Check the input + txt = vacf_job.get_input() + for iat in oxygens: + assert "Atom %i" % (iat) in txt + + # Run the job and check output + vacf_job.run() + D, units = vacf_job.results.get_D() + assert abs(D - 3.2882389727568384e-08) < 1e-18 + + +class TestViscosityFromBinLogJob: + """ + Test of the AMS Job representing a velocity autocorrelation function calculation + """ + + def test_default_use(self, mdjob): + """ + Test plainest use of AMSVACFJob + """ + visc_job = AMSViscosityFromBinLogJob(mdjob) + + # Run the job and check output + visc_job.run() + visc = visc_job.results.get_double_exponential_fit()[-1][-1] + assert abs(visc - 8.85511238380137e-05) < 1e-18