diff --git a/.gitignore b/.gitignore index 9132dc1f86..2114c08132 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,5 @@ build/* # AI Agent files AGENTS.md + +spec.md diff --git a/arc/job/adapter.py b/arc/job/adapter.py index aecaf065a1..977aa9df5b 100644 --- a/arc/job/adapter.py +++ b/arc/job/adapter.py @@ -82,6 +82,7 @@ class JobEnum(str, Enum): - gan # Generative adversarial networks, https://doi.org/10.1063/5.0055094 """ # ESS + ase = 'ase' cfour = 'cfour' gaussian = 'gaussian' mockter = 'mockter' diff --git a/arc/job/adapters/__init__.py b/arc/job/adapters/__init__.py index 1d0949a935..6c0821c54f 100644 --- a/arc/job/adapters/__init__.py +++ b/arc/job/adapters/__init__.py @@ -1,4 +1,5 @@ import arc.job.adapters.common +import arc.job.adapters.ase import arc.job.adapters.cfour import arc.job.adapters.gaussian import arc.job.adapters.mockter diff --git a/arc/job/adapters/ase.py b/arc/job/adapters/ase.py new file mode 100644 index 0000000000..6f30983c40 --- /dev/null +++ b/arc/job/adapters/ase.py @@ -0,0 +1,241 @@ +""" +An adapter for executing ASE (Atomic Simulation Environment) jobs +""" + +import datetime +import os +import subprocess +from typing import TYPE_CHECKING, List, Optional, Tuple, Union + +from arc.common import get_logger, read_yaml_file, save_yaml_file +from arc.job.adapter import JobAdapter +from arc.job.adapters.common import _initialize_adapter +from arc.job.factory import register_job_adapter +from arc.imports import settings +from arc.settings.settings import ARC_PYTHON, find_executable + +if TYPE_CHECKING: + from arc.level import Level + from arc.species.species import ARCSpecies + from arc.reaction import ARCReaction + +logger = get_logger() + +# Default mapping if not yet fully defined in settings.py +DEFAULT_ASE_ENV = { + 'torchani': 'TANI_PYTHON', + 'xtb': 'XTB_PYTHON', +} + +class ASEAdapter(JobAdapter): + """ + A generic adapter for ASE (Atomic Simulation Environment) jobs. + Supports multiple calculators and environments. 
+ """ + def __init__(self, + project: str, + project_directory: str, + job_type: Union[List[str], str], + args: Optional[dict] = None, + bath_gas: Optional[str] = None, + checkfile: Optional[str] = None, + conformer: Optional[int] = None, + constraints: Optional[List[Tuple[List[int], float]]] = None, + cpu_cores: Optional[str] = None, + dihedral_increment: Optional[float] = None, + dihedrals: Optional[List[float]] = None, + directed_scan_type: Optional[str] = None, + ess_settings: Optional[dict] = None, + ess_trsh_methods: Optional[List[str]] = None, + execution_type: Optional[str] = None, + fine: bool = False, + initial_time: Optional[Union[datetime.datetime, str]] = None, + irc_direction: Optional[str] = None, + job_id: Optional[int] = None, + job_memory_gb: float = 14.0, + job_name: Optional[str] = None, + job_num: Optional[int] = None, + job_server_name: Optional[str] = None, + job_status: Optional[List[Union[dict, str]]] = None, + level: Optional['Level'] = None, + max_job_time: Optional[float] = None, + run_multi_species: bool = False, + reactions: Optional[List['ARCReaction']] = None, + rotor_index: Optional[int] = None, + server: Optional[str] = None, + server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, + species: Optional[List['ARCSpecies']] = None, + testing: bool = False, + times_rerun: int = 0, + torsions: Optional[List[List[int]]] = None, + tsg: Optional[int] = None, + xyz: Optional[dict] = None, + ): + + self.job_adapter = 'ase' + self.execution_type = execution_type or 'incore' + self.incore_capacity = 100 + + self.sp = None + self.opt_xyz = None + self.freqs = None + + self.args = args or dict() + self.python_executable = self.get_python_executable() + self.script_path = os.path.join(os.path.dirname(__file__), 'scripts', 'ase_script.py') + + _initialize_adapter(obj=self, + is_ts=False, + project=project, + project_directory=project_directory, + job_type=job_type, + args=args, + 
bath_gas=bath_gas, + checkfile=checkfile, + conformer=conformer, + constraints=constraints, + cpu_cores=cpu_cores, + dihedral_increment=dihedral_increment, + dihedrals=dihedrals, + directed_scan_type=directed_scan_type, + ess_settings=ess_settings, + ess_trsh_methods=ess_trsh_methods, + fine=fine, + initial_time=initial_time, + irc_direction=irc_direction, + job_id=job_id, + job_memory_gb=job_memory_gb, + job_name=job_name, + job_num=job_num, + job_server_name=job_server_name, + job_status=job_status, + level=level, + max_job_time=max_job_time, + run_multi_species=run_multi_species, + reactions=reactions, + rotor_index=rotor_index, + server=server, + server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, + species=species, + testing=testing, + times_rerun=times_rerun, + torsions=torsions, + tsg=tsg, + xyz=xyz, + ) + + def get_python_executable(self) -> str: + """ + Identify the correct Python executable based on the calculator. + """ + calc = self.args.get('keyword', {}).get('calculator', '').lower() + env_mapping = settings.get('ASE_CALCULATORS_ENV', DEFAULT_ASE_ENV) + env_var_name = env_mapping.get(calc) + + if env_var_name and env_var_name in settings: + exe = settings[env_var_name] + if exe: + return exe + + # Fallback to calculator-specific env if it exists + found_exe = find_executable(f'{calc}_env') + if found_exe: + return found_exe + + return ARC_PYTHON or 'python' + + def write_input_file(self) -> None: + """ + Write the input file for ase_script.py. + """ + input_dict = { + 'job_type': self.job_type, + 'xyz': self.xyz, + 'charge': self.charge, + 'multiplicity': self.multiplicity, + 'constraints': self.constraints, + 'settings': self.args.get('keyword', {}), + } + save_yaml_file(os.path.join(self.local_path, 'input.yml'), input_dict) + + def execute_incore(self) -> None: + """ + Execute the job incore. 
+ """ + self.write_input_file() + cmd = [self.python_executable, self.script_path, '--yml_path', self.local_path] + process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if process.returncode != 0: + logger.error(f"ASE job failed incore:\n{process.stderr}") + self.parse_results() + + def execute_queue(self) -> None: + """ + Execute a job to the server's queue. + """ + self.write_input_file() + self.write_submit_script() + self.set_files() + if self.server_adapter is not None: + for file_dict in self.files_to_upload: + self.server_adapter.upload_file(remote_path=file_dict['remote'], + local_path=file_dict['local']) + self.server_adapter.submit_job(self.remote_path) + + def set_files(self) -> None: + """ + Set files to be uploaded and downloaded. + """ + # 1. Upload + if self.execution_type != 'incore': + self.files_to_upload.append(self.get_file_property_dictionary(file_name='submit.sh')) + self.files_to_upload.append(self.get_file_property_dictionary(file_name='input.yml')) + self.files_to_upload.append(self.get_file_property_dictionary(file_name='ase_script.py', + local=self.script_path)) + # 2. Download + self.files_to_download.append(self.get_file_property_dictionary(file_name='output.yml')) + + def set_additional_file_paths(self) -> None: + """ + Set additional file paths specific for the adapter. + """ + pass + + def set_input_file_memory(self) -> None: + """ + Set the input_file_memory attribute. + """ + pass + + def write_submit_script(self) -> None: + """ + Write the submission script. + """ + remote_script_path = os.path.join(self.remote_path, 'ase_script.py') + command = f"{self.python_executable} {remote_script_path} --yml_path {self.remote_path}" + content = f"#!/bin/bash\n\n{command}\n" + with open(os.path.join(self.local_path, 'submit.sh'), 'w') as f: + f.write(content) + + def parse_results(self) -> None: + """ + Parse the output.yml generated by ase_script.py. 
+ """ + out_path = os.path.join(self.local_path, 'output.yml') + if os.path.isfile(out_path): + results = read_yaml_file(out_path) + self.electronic_energy = results.get('sp') + self.xyz_out = results.get('opt_xyz') or results.get('xyz') + self.frequencies = results.get('freqs') + self.hessian = results.get('hessian') + self.normal_modes = results.get('modes') + self.reduced_masses = results.get('reduced_masses') + self.force_constants = results.get('force_constants') + if 'error' in results: + logger.error(f"ASE job error: {results['error']}") + +register_job_adapter('ase', ASEAdapter) diff --git a/arc/job/adapters/ase_test.py b/arc/job/adapters/ase_test.py new file mode 100644 index 0000000000..6da4b4d321 --- /dev/null +++ b/arc/job/adapters/ase_test.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +# encoding: utf-8 + +""" +This module contains unit tests for the arc.job.adapters.ase module. +These tests verify IO and logic without executing the external ASE script in CI. +""" + +import os +import shutil +import unittest +from unittest.mock import patch +import numpy as np + +from arc.common import ARC_TESTING_PATH, read_yaml_file, save_yaml_file +from arc.job.adapters.ase import ASEAdapter +from arc.species.species import ARCSpecies +from arc.job.adapters.scripts.ase_script import to_kJmol, numpy_vibrational_analysis + + +class TestASEAdapter(unittest.TestCase): + """ + Contains unit tests for the ASEAdapter class and ase_script utility functions. + """ + + @classmethod + def setUpClass(cls): + """ + A method that is run before all unit tests in this class. 
+ """ + cls.maxDiff = None + cls.project_directory = os.path.join(ARC_TESTING_PATH, 'test_ASEAdapter') + if not os.path.exists(cls.project_directory): + os.makedirs(cls.project_directory) + + xyz = {'symbols': ('O', 'H', 'H'), + 'isotopes': (16, 1, 1), + 'coords': ((0.0, 0.0, 0.0), + (0.0, 0.75, 0.58), + (0.0, -0.75, 0.58))} + + cls.job_1 = ASEAdapter(execution_type='incore', + job_type='sp', + project='test_1', + project_directory=os.path.join(cls.project_directory, 'test_1'), + species=[ARCSpecies(label='H2O', xyz=xyz)], + args={'keyword': {'calculator': 'torchani', 'model': 'ANI2x'}}, + testing=True) + + cls.job_2 = ASEAdapter(execution_type='queue', + job_type='opt', + project='test_2', + project_directory=os.path.join(cls.project_directory, 'test_2'), + species=[ARCSpecies(label='H2O', xyz=xyz)], + args={'keyword': {'calculator': 'xtb', 'method': 'GFN2-xTB'}}, + testing=True) + + cls.job_1.local_path = os.path.join(cls.project_directory, 'test_1') + cls.job_2.local_path = os.path.join(cls.project_directory, 'test_2') + cls.job_2.remote_path = '/path/to/remote' + os.makedirs(cls.job_1.local_path, exist_ok=True) + os.makedirs(cls.job_2.local_path, exist_ok=True) + + def test_get_python_executable(self): + """Test resolving the python executable environment""" + with patch('arc.job.adapters.ase.settings', {'TANI_PYTHON': '/path/to/tani_python'}): + exe = self.job_1.get_python_executable() + self.assertEqual(exe, '/path/to/tani_python') + + with patch('arc.job.adapters.ase.settings', {'XTB_PYTHON': '/path/to/xtb_python'}): + exe = self.job_2.get_python_executable() + self.assertEqual(exe, '/path/to/xtb_python') + + def test_write_input_file(self): + """Test writing the YAML input file for the ASE script""" + self.job_1.write_input_file() + input_path = os.path.join(self.job_1.local_path, 'input.yml') + self.assertTrue(os.path.isfile(input_path)) + data = read_yaml_file(input_path) + self.assertEqual(data['job_type'], 'sp') + 
self.assertEqual(data['settings']['calculator'], 'torchani') + self.assertEqual(data['settings']['model'], 'ANI2x') + self.assertEqual(data['xyz']['symbols'], ('O', 'H', 'H')) + + def test_write_submit_script(self): + """Test writing the submission script for queue execution""" + self.job_2.python_executable = '/fake/python' + self.job_2.write_submit_script() + submit_path = os.path.join(self.job_2.local_path, 'submit.sh') + self.assertTrue(os.path.isfile(submit_path)) + with open(submit_path, 'r') as f: + content = f.read() + self.assertIn('/fake/python', content) + self.assertIn('--yml_path /path/to/remote', content) + self.assertIn('ase_script.py', content) + + def test_set_files(self): + """Test properly assigning upload and download files""" + self.job_2.set_files() + self.assertTrue(any('submit.sh' in f['local'] for f in self.job_2.files_to_upload)) + self.assertTrue(any('input.yml' in f['local'] for f in self.job_2.files_to_upload)) + self.assertTrue(any('ase_script.py' in f['local'] for f in self.job_2.files_to_upload)) + self.assertTrue(any('output.yml' in f['local'] for f in self.job_2.files_to_download)) + + def test_parse_results(self): + """Test parsing dummy output YAML back into object attributes""" + output_data = { + 'sp': -76.0, + 'opt_xyz': {'symbols': ('O', 'H', 'H'), 'coords': ((0.0, 0.0, 0.0), (0.0, 0.76, 0.59), (0.0, -0.76, 0.59))}, + 'freqs': [1500.0, 3600.0, 3700.0], + 'modes': [[[0.0, 0.0, 0.1]]], + 'reduced_masses': [1.0, 1.0, 1.0], + 'force_constants': [1.0, 2.0, 3.0] + } + save_yaml_file(os.path.join(self.job_1.local_path, 'output.yml'), output_data) + self.job_1.parse_results() + self.assertEqual(self.job_1.electronic_energy, -76.0) + self.assertEqual(self.job_1.frequencies, [1500.0, 3600.0, 3700.0]) + self.assertEqual(self.job_1.force_constants, [1.0, 2.0, 3.0]) + self.assertIsNotNone(self.job_1.xyz_out) + self.assertAlmostEqual(self.job_1.xyz_out['coords'][1][1], 0.76) + + def test_to_kJmol(self): + """Test utility conversion 
function to_kJmol""" + self.assertAlmostEqual(to_kJmol(1.0), 96.48533, places=5) + self.assertAlmostEqual(to_kJmol(27.21138), 2625.499015202655, places=5) + + def test_numpy_vibrational_analysis(self): + """Test fallback numpy vibrational analysis directly""" + masses = np.array([16.0, 1.0, 1.0]) + n_atoms = len(masses) + # Create a hessian with some very small eigenvalues (for translations/rotations) + # and some large ones. + hessian = np.zeros((3 * n_atoms, 3 * n_atoms)) + for i in range(6, 9): + hessian[i, i] = 10.0 + + results = numpy_vibrational_analysis(masses, hessian) + self.assertIn('freqs', results) + self.assertIn('modes', results) + self.assertIn('force_constants', results) + self.assertIn('reduced_masses', results) + + # nonlinear (len > 2), filters out first 6 modes + self.assertEqual(len(results['freqs']), 3) + self.assertEqual(len(results['modes']), 3) + self.assertEqual(len(results['force_constants']), 3) + self.assertEqual(len(results['reduced_masses']), 3) + + @classmethod + def tearDownClass(cls): + """ + A function that is run ONCE after all unit tests in this class. + Delete all project directories created during these unit tests + """ + shutil.rmtree(cls.project_directory, ignore_errors=True) + + +if __name__ == '__main__': + unittest.main(testRunner=unittest.TextTestRunner(verbosity=2)) diff --git a/arc/job/adapters/common.py b/arc/job/adapters/common.py index 7ad8495713..12979279ef 100644 --- a/arc/job/adapters/common.py +++ b/arc/job/adapters/common.py @@ -74,7 +74,7 @@ } all_families_ts_adapters = [] -adapters_that_do_not_require_a_level_arg = ['xtb', 'torchani'] +adapters_that_do_not_require_a_level_arg = ['xtb', 'torchani', 'ase'] # Default is "queue", "pipe" will be called whenever needed. So just list 'incore'. 
default_incore_adapters = ['autotst', 'gcn', 'heuristics', 'kinbot', 'psi4', 'xtb', 'xtb_gsm', 'torchani', 'openbabel'] diff --git a/arc/job/adapters/scripts/ase_script.py b/arc/job/adapters/scripts/ase_script.py new file mode 100644 index 0000000000..7189571434 --- /dev/null +++ b/arc/job/adapters/scripts/ase_script.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +# encoding: utf-8 + +""" +A standalone script to run ASE (Atomic Simulation Environment) jobs. +Standardizes interaction with various calculators. +""" + +import argparse +import math +import os +import sys +import yaml +import numpy as np + +from ase import Atoms +from ase.constraints import FixInternals +from ase.optimize import BFGS, LBFGS, GPMin +from ase.optimize.sciopt import SciPyFminBFGS, SciPyFminCG +from ase.vibrations import Vibrations + +# Constants matched to ASE internal units (3.23.0+) for exact numerical matching +c = 299792458.0 +e = 1.6021766208e-19 +amu = 1.66053904e-27 +pi = math.pi +h = 6.62607015e-34 +E_h = 4.3597447222071e-18 # Hartree in Joules +N_A = 6.02214076e23 + + +def to_kJmol(energy_ev: float) -> float: + """ + Convert ASE default (eV) to kJ/mol. + """ + return energy_ev * e * N_A / 1000.0 + + +def read_yaml_file(path: str): + """ + Read a YAML file. + """ + with open(path, 'r') as f: + return yaml.load(stream=f, Loader=yaml.FullLoader) + + +def save_yaml_file(path: str, content: dict): + """ + Save a YAML file. + """ + def string_representer(dumper, data): + if len(data.splitlines()) > 1: + return dumper.represent_scalar(tag='tag:yaml.org,2002:str', value=data, style='|') + return dumper.represent_scalar(tag='tag:yaml.org,2002:str', value=data) + yaml.add_representer(str, string_representer) + with open(path, 'w') as f: + f.write(yaml.dump(data=content)) + + +def get_calculator(calc_config: dict, charge: int = 0, multiplicity: int = 1): + """ + Initialize the ASE calculator based on settings. 
+ """ + name = calc_config.get('calculator', '').lower() + kwargs = calc_config.get('calculator_kwargs', {}) + + if name == 'torchani': + import torch + import torchani + model_name = calc_config.get('model', 'ANI2x') + device = torch.device(calc_config.get('device', 'cpu')) + if model_name.lower() == 'ani1ccx': + model = torchani.models.ANI1ccx(periodic_table_index=True).to(device) + elif model_name.lower() == 'ani1x': + model = torchani.models.ANI1x(periodic_table_index=True).to(device) + else: + model = torchani.models.ANI2x(periodic_table_index=True).to(device) + return model.ase() + + elif name == 'xtb': + from xtb.ase.calculator import XTB + if 'charge' not in kwargs: + kwargs['charge'] = charge + if 'uhf' not in kwargs: + kwargs['uhf'] = multiplicity - 1 + return XTB(**kwargs) + + elif name == 'mopac': + from ase.calculators.mopac import MOPAC + return MOPAC(**kwargs) + + from ase.calculators.calculator import get_calculator_class + try: + calc_class = get_calculator_class(name) + return calc_class(**kwargs) + except Exception as exc: + print(f"Could not load ASE calculator '{name}': {exc}") + sys.exit(1) + + +def apply_constraints(atoms: Atoms, constraints_data: list): + """ + Apply internal constraints to the Atoms object. + """ + if not constraints_data: + return + bonds, angles, dihedrals = list(), list(), list() + for constraint in constraints_data: + indices = constraint[0] + if len(indices) == 2: + bonds.append([constraint[1], indices]) + elif len(indices) == 3: + angles.append([constraint[1], indices]) + elif len(indices) == 4: + dihedrals.append([constraint[1], indices]) + atoms.set_constraint(FixInternals(bonds=bonds, angles_deg=angles, dihedrals_deg=dihedrals)) + + +def numpy_vibrational_analysis(masses: np.ndarray, hessian: np.ndarray): + """ + Computing vibrational wavenumbers, modes, reduced masses, and force constants from Hessian. + NumPy implementation following physical constants and ASE units. 
+ Logic follows TorchANI and ASE VibrationsData standards. + + Args: + masses: (n_atoms,) array of atomic masses in AMU. + hessian: (3*n_atoms, 3*n_atoms) array in eV/A^2. + + Returns: + dict: Containing freqs, modes, force_constants, reduced_masses. + """ + # 1. Mass-weighted Hessian + # inv_sqrt_mass: (3*n_atoms,) + inv_sqrt_mass = (1.0 / np.sqrt(masses)).repeat(3) + # H_mw = M^-1/2 * H * M^-1/2 + mass_scaled_hessian = hessian * inv_sqrt_mass[:, np.newaxis] * inv_sqrt_mass[np.newaxis, :] + + # 2. Diagonalize + eigenvalues, eigenvectors = np.linalg.eigh(mass_scaled_hessian) + + # 3. Frequencies (cm^-1) + # Factor to convert sqrt(eV / (A^2 * AMU)) to cm^-1 + # nu = 1/(2*pi*c*100) * sqrt(e * 10^20 / amu) + freq_factor = (1.0 / (2.0 * pi * c * 100.0)) * np.sqrt((e * 1.0e20) / amu) + + freqs = [] + for eig in eigenvalues: + if eig >= 0: + f = freq_factor * np.sqrt(eig) + else: + # ARC convention: imaginary frequencies are represented as negative real numbers + f = -freq_factor * np.sqrt(-eig) + freqs.append(float(f)) + + # 4. Normal Modes (MDU: Mass Deweighted Unnormalized in TorchANI / Standard in ASE) + # These modes are normalized such that sum_i m_i * |v_i|^2 = 1 + # eigenvectors.T has modes as rows + mw_normalized = eigenvectors.T + md_unnormalized = mw_normalized * inv_sqrt_mass[np.newaxis, :] + + # 5. Reduced Masses (AMU) + # Formula from ASE/TorchANI: mu_n = 1 / sum_i |v_{n,i}|^2 + # where v are the mass-weighted normalized modes calculated above. + norm_sq = np.sum(np.square(np.abs(md_unnormalized)), axis=1) + rmasses = 1.0 / norm_sq + + # 6. Force Constants (mDyne/A) + # k_n = mu_n * omega_n^2 + # Conversion factor from eV/A^2 to mDyne/A is e * 10^-2 * 10^20 = e * 10^18 ? 
+ # 1 eV/A^2 = 16.021766 N/m = 0.16021766 mDyne/A + # eigenvalue (eV/(A^2*AMU)) * rmass (AMU) = k (eV/A^2) + fconst_factor = e * 1.0e18 + fconstants = eigenvalues * rmasses * fconst_factor + + # MDN modes (Mass Deweighted Normalized) for output + # normalized such that sum_i |v_i|^2 = 1 + norm_factors = 1.0 / np.sqrt(norm_sq) + md_normalized = md_unnormalized * norm_factors[:, np.newaxis] + + # Filter out translations and rotations (first 6 modes for non-linear, 5 for linear) + # Most ESS only report 3N-6 / 3N-5 modes. + # We'll filter modes with very small magnitude if they are in the first 6. + # Sorting by magnitude ensures we catch the smallest ones. + indices = np.argsort(np.abs(freqs)) + + # Threshold for considering a mode as a translation/rotation (cm^-1) + rot_trans_threshold = 10.0 + + num_to_filter = 6 if len(masses) > 2 else 5 if len(masses) == 2 else 0 + filtered_indices = [] + for i in range(len(freqs)): + if i < num_to_filter and abs(freqs[indices[i]]) < rot_trans_threshold: + continue + filtered_indices.append(indices[i]) + + # Sort back the remaining indices by their original order (which is by eigenvalue) + # but we'll return them sorted by frequency value (imaginary first, then increasing real) + final_indices = sorted(filtered_indices, key=lambda i: freqs[i]) + + return { + 'freqs': [freqs[i] for i in final_indices], + 'modes': md_normalized[final_indices].reshape(len(final_indices), -1, 3).tolist(), + 'force_constants': [fconstants[i].tolist() for i in final_indices], + 'reduced_masses': [rmasses[i].tolist() for i in final_indices], + 'hessian': hessian.tolist() + } + + +def run_vibrational_analysis(atoms: Atoms, settings: dict): + """ + Perform vibrational analysis and return frequencies, modes, and other properties. 
+ """ + if settings.get('calculator', '').lower() == 'torchani': + try: + import torch + import torchani + device = torch.device(settings.get('device', 'cpu')) + model_name = settings.get('model', 'ANI2x') + if model_name.lower() == 'ani1ccx': + model = torchani.models.ANI1ccx(periodic_table_index=True).to(device) + elif model_name.lower() == 'ani1x': + model = torchani.models.ANI1x(periodic_table_index=True).to(device) + else: + model = torchani.models.ANI2x(periodic_table_index=True).to(device) + + species = torch.tensor(atoms.get_atomic_numbers(), device=device, dtype=torch.long).unsqueeze(0) + coordinates = torch.from_numpy(atoms.get_positions()).unsqueeze(0).requires_grad_(True) + masses = torchani.utils.get_atomic_masses(species) + energies = model.double()((species, coordinates)).energies + hessian = torchani.utils.hessian(coordinates, energies=energies) + freqs, modes, force_constants, reduced_masses = torchani.utils.vibrational_analysis(masses, hessian, mode_type='MDN') + + return { + 'freqs': (freqs.cpu().numpy().tolist() if hasattr(freqs, 'cpu') else freqs.numpy().tolist()), + 'hessian': hessian.cpu().numpy().tolist() if hasattr(hessian, 'cpu') else hessian.tolist(), + 'modes': modes.cpu().numpy().tolist() if hasattr(modes, 'cpu') else modes.tolist(), + 'force_constants': force_constants.cpu().numpy().tolist() if hasattr(force_constants, 'cpu') else force_constants.tolist(), + 'reduced_masses': reduced_masses.cpu().numpy().tolist() if hasattr(reduced_masses, 'cpu') else reduced_masses.tolist() + } + except Exception: + pass + + vib = Vibrations(atoms, name='vib_tmp', nfree=4) + vib.run() + vib_data = vib.get_vibrations() + try: + hessian = vib_data.get_hessian_2d() + except AttributeError: + hessian = vib_data.get_hessian() + if len(hessian.shape) == 4: + n_atoms = hessian.shape[0] + hessian = hessian.reshape(3 * n_atoms, 3 * n_atoms) + masses = atoms.get_masses() + vib.clean() + + return numpy_vibrational_analysis(masses, hessian) + + +def main(): + """ 
+ Main execution logic. + """ + parser = argparse.ArgumentParser() + parser.add_argument('--yml_path', type=str, default='input.yml') + args = parser.parse_args() + + input_path = os.path.abspath(args.yml_path) + if os.path.isdir(input_path): + input_path = os.path.join(input_path, 'input.yml') + + try: + input_dict = read_yaml_file(input_path) + except Exception as exc: + print(f"Error reading input file: {exc}") + return + + job_type = input_dict.get('job_type') + xyz = input_dict.get('xyz') + settings = input_dict.get('settings', {}) + charge = input_dict.get('charge', 0) + multiplicity = input_dict.get('multiplicity', 1) + + atoms = Atoms(symbols=xyz['symbols'], positions=xyz['coords']) + calc = get_calculator(settings, charge, multiplicity) + atoms.calc = calc + + apply_constraints(atoms, input_dict.get('constraints')) + + output = {} + + def save_current_geometry(out_dict, atoms_obj, input_xyz): + out_dict['opt_xyz'] = { + 'coords': tuple(map(tuple, atoms_obj.get_positions().tolist())), + 'symbols': input_xyz['symbols'], + 'isotopes': input_xyz.get('isotopes') or tuple([None] * len(input_xyz['symbols'])) + } + + if job_type in ['sp', 'opt', 'conf_opt', 'freq', 'optfreq', 'directed_scan']: + output['sp'] = to_kJmol(atoms.get_potential_energy()) + + if job_type in ['opt', 'conf_opt', 'optfreq', 'directed_scan']: + fmax = float(settings.get('fmax', 0.001)) + steps = int(settings.get('steps', 1000)) + engine_name = settings.get('optimizer', 'BFGS').lower() + + engine_dict = { + 'bfgs': BFGS, 'lbfgs': LBFGS, 'gpmin': GPMin, + 'scipyfminbfgs': SciPyFminBFGS, 'scipyfmincg': SciPyFminCG, + 'sella': None, + } + if engine_name == 'sella': + from sella import Sella + opt_class = Sella + else: + opt_class = engine_dict.get(engine_name, BFGS) + opt = opt_class(atoms, logfile=os.path.join(os.path.dirname(input_path), 'opt.log')) + + try: + opt.run(fmax=fmax, steps=steps) + save_current_geometry(output, atoms, xyz) + output['sp'] = to_kJmol(atoms.get_potential_energy()) + 
except Exception as exc: + output['error'] = f"Optimization failed: {exc}" + save_current_geometry(output, atoms, xyz) + else: + # For non-optimization jobs, still save the geometry + save_current_geometry(output, atoms, xyz) + + if job_type in ['freq', 'optfreq']: + try: + freq_results = run_vibrational_analysis(atoms, settings) + output.update(freq_results) + except Exception as exc: + output['error'] = output.get('error', '') + f" Frequency calculation failed: {exc}" + + output_path = os.path.join(os.path.dirname(input_path), 'output.yml') + save_yaml_file(output_path, output) + + +if __name__ == '__main__': + main() diff --git a/arc/level.py b/arc/level.py index 50d2062d2a..8b107d661b 100644 --- a/arc/level.py +++ b/arc/level.py @@ -342,6 +342,8 @@ def deduce_software(self, Args: job_type (str, optional): An ARC job type, assists in determining the software. """ + if self.software is not None and job_type is None: + return # OneDMin if job_type == 'onedmin': diff --git a/arc/main_test.py b/arc/main_test.py index 8251079306..ba9c75fd39 100644 --- a/arc/main_test.py +++ b/arc/main_test.py @@ -77,7 +77,8 @@ def test_as_dict(self): 'method': 'wb97xd', 'method_type': 'dft', 'software': 'gaussian'}, - 'ess_settings': {'cfour': ['local'], + 'ess_settings': {'ase': ['local'], + 'cfour': ['local'], 'gaussian': ['local', 'server2'], 'gcn': ['local'], 'mockter': ['local'], diff --git a/arc/parser/adapters/yaml.py b/arc/parser/adapters/yaml.py index 5828ec9e33..8cd733f16f 100644 --- a/arc/parser/adapters/yaml.py +++ b/arc/parser/adapters/yaml.py @@ -42,7 +42,7 @@ def parse_geometry(self) -> Optional[Dict[str, tuple]]: Returns: Optional[Dict[str, tuple]] The cartesian geometry. 
""" - for key in ['xyz', 'opt_xyz']: + for key in ['opt_xyz', 'xyz']: if key in self.data.keys(): return self.data[key] if isinstance(self.data[key], dict) else str_to_xyz(self.data[key]) return None @@ -54,7 +54,7 @@ def parse_frequencies(self) -> Optional['np.ndarray']: Returns: Optional[np.ndarray] The parsed frequencies (in cm^-1). """ - freqs = self.data.get('freqs') + freqs = self.data.get('freqs') or self.data.get('frequencies') return np.array(freqs, dtype=np.float64) if freqs else None def parse_normal_mode_displacement(self) -> Tuple[Optional['np.ndarray'], Optional['np.ndarray']]: @@ -64,8 +64,8 @@ def parse_normal_mode_displacement(self) -> Tuple[Optional['np.ndarray'], Option Returns: Tuple[Optional['np.ndarray'], Optional['np.ndarray']] The frequencies (in cm^-1) and the normal mode displacements. """ - freqs = self.data.get('freqs') - modes = self.data.get('modes') + freqs = self.data.get('freqs') or self.data.get('frequencies') + modes = self.data.get('modes') or self.data.get('normal_modes') if freqs and modes: return ( np.array(freqs, dtype=np.float64) if freqs is not None else None, @@ -85,12 +85,12 @@ def parse_t1(self) -> Optional[float]: def parse_e_elect(self) -> Optional[float]: """ - Parse the electronic energy from an sp job output file. + Parse the electronic energy from the YAML file. Returns: Optional[float] The electronic energy in kJ/mol. 
""" - energy = self.data.get('sp') + energy = self.data.get('sp') or self.data.get('energy') return energy def parse_zpe_correction(self) -> Optional[float]: diff --git a/arc/parser/parser.py b/arc/parser/parser.py index c417f3c2d0..fa0b7ade19 100644 --- a/arc/parser/parser.py +++ b/arc/parser/parser.py @@ -114,6 +114,9 @@ def parse_xyz_from_file(log_file_path: str) -> Optional[Dict[str, tuple]]: splits = line.split() if len(splits) == 2 and all([s.isdigit() for s in splits]): start_parsing = True + elif file_extension in ['.yml', '.yaml']: + ess_adapter = ess_factory(log_file_path=log_file_path, ess_adapter='yaml') + xyz = ess_adapter.parse_geometry() else: record = False for line in lines: diff --git a/arc/settings/settings.py b/arc/settings/settings.py index 2a14fd4a2a..41aebb8aa6 100644 --- a/arc/settings/settings.py +++ b/arc/settings/settings.py @@ -83,10 +83,11 @@ 'torchani': 'local', 'openbabel': 'local', 'orca_neb': 'local', + 'ase': 'local', } # Electronic structure software ARC may access (use lowercase): -supported_ess = ['cfour', 'gaussian', 'mockter', 'molpro', 'orca', 'qchem', 'terachem', 'onedmin', 'xtb', 'torchani', 'openbabel'] +supported_ess = ['cfour', 'gaussian', 'mockter', 'molpro', 'orca', 'qchem', 'terachem', 'onedmin', 'xtb', 'torchani', 'openbabel', 'ase'] # TS methods to try when appropriate for a reaction (other than user guesses which are always allowed): ts_adapters = ['heuristics', 'AutoTST', 'GCN', 'xtb_gsm', 'orca_neb'] @@ -157,7 +158,8 @@ 'HTCondor': 'hours', } -input_filenames = {'cfour': 'ZMAT', +input_filenames = {'ase': 'input.yml', + 'cfour': 'ZMAT', 'gaussian': 'input.gjf', 'mockter': 'input.yml', 'molpro': 'input.in', @@ -169,7 +171,8 @@ 'xtb': 'input.sh', } -output_filenames = {'cfour': 'output.out', +output_filenames = {'ase': 'output.yml', + 'cfour': 'output.out', 'gaussian': 'input.log', 'gcn': 'output.yml', 'mockter': 'output.yml', @@ -258,6 +261,15 @@ 'level': 'wb97xd/def2tzvp', } +ase_default_options_dict = 
{'optimizer': 'BFGS', + 'fmax': 0.001, + 'steps': 1000, + } + +ASE_CALCULATORS_ENV = {'torchani': 'TANI_PYTHON', + 'xtb': 'XTB_PYTHON', + } + valid_chars = "-_[]=.,%s%s" % (string.ascii_letters, string.digits) # A scan with better resolution (lower number here) takes more time to compute, @@ -307,8 +319,8 @@ ARC_FAMILIES_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'data', 'families') # Default environment names for sister repos -TS_GCN_PYTHON, TANI_PYTHON, AUTOTST_PYTHON, ARC_PYTHON, XTB, OB_PYTHON, RMG_PYTHON, RMG_PATH, RMG_DB_PATH = \ - None, None, None, None, None, None, None, None, None +TS_GCN_PYTHON, TANI_PYTHON, AUTOTST_PYTHON, ARC_PYTHON, XTB, XTB_PYTHON, OB_PYTHON, RMG_PYTHON, RMG_PATH, RMG_DB_PATH = \ + None, None, None, None, None, None, None, None, None, None home = os.getenv("HOME") or os.path.expanduser("~") @@ -345,10 +357,12 @@ def find_executable(env_name, executable_name='python'): return None TANI_PYTHON = find_executable('tani_env') +SELLA_PYTHON = find_executable('sella_env') OB_PYTHON = find_executable('ob_env') TS_GCN_PYTHON = find_executable('ts_gcn') AUTOTST_PYTHON = find_executable('tst_env') ARC_PYTHON = find_executable('arc_env') +XTB_PYTHON = find_executable('xtb_env') RMG_ENV_NAME = 'rmg_env' RMG_PYTHON = find_executable('rmg_env') XTB = find_executable('xtb_env', 'xtb')