diff --git a/src/matkit/cli.py b/src/matkit/cli.py index 5636dca..7edbe6d 100644 --- a/src/matkit/cli.py +++ b/src/matkit/cli.py @@ -1,12 +1,5 @@ import click import json -from matkit.graspa import graspa -from matkit.raspa2 import raspa2 -from matkit.tobacco import create_linker_from_smiles -from matkit.zeopp import zeopp - - -from matkit.graspa_sycl import graspa_sycl @click.group() @@ -49,12 +42,13 @@ def graspa_cli(): @click.option("--cycles", default=1000, help="Number of cycles.") def graspa_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): """Setup gRASPA simulation files.""" + from matkit.graspa import setup_simulation + # Convert adsorbates tuple to list of dicts - # as expected by graspa.setup_simulation adsorbate_list = [{"MoleculeName": ads} for ads in adsorbate] try: - graspa.setup_simulation( + setup_simulation( cif=cif, outpath=outdir, adsorbates=adsorbate_list, @@ -68,6 +62,65 @@ def graspa_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): click.echo(f"Error setting up gRASPA simulation: {e}", err=True) +@graspa_cli.command("batch-setup") +@click.option( + "--cif-dir", + required=True, + type=click.Path(exists=True, file_okay=False), + help="Directory containing CIF files.", +) +@click.option( + "--outdir", + required=True, + type=click.Path(), + help="Base output directory for simulation files.", +) +@click.option( + "--adsorbate", + required=True, + multiple=True, + help="Adsorbate molecule name (e.g. CO2). Can be specified multiple times.", +) +@click.option( + "--temp", + required=True, + multiple=True, + type=float, + help="Temperature in Kelvin. Can be specified multiple times.", +) +@click.option( + "--pressure", + required=True, + multiple=True, + type=float, + help="Pressure in Pa. Can be specified multiple times.", +) +@click.option("--cutoff", default=12.8, help="Cutoff radius in Angstrom.") +@click.option("--cycles", default=1000, help="Number of MC cycles.") +def graspa_batch_setup( + cif_dir, outdir, adsorbate, temp, pressure, cutoff, cycles +): + """Set up gRASPA simulations for all CIF x T x P.""" + from matkit.graspa import setup_batch + + adsorbate_list = [{"MoleculeName": ads} for ads in adsorbate] + + try: + manifest = setup_batch( + cif_dir=cif_dir, + outpath=outdir, + adsorbates=adsorbate_list, + temperatures=list(temp), + pressures=list(pressure), + cutoff=cutoff, + n_cycle=cycles, + ) + click.echo(f"Set up {len(manifest)} simulations in {outdir}") + click.echo(f"Manifest written to {outdir}/simulations.jsonl") + except Exception as e: + click.echo(f"Error setting up batch simulations: {e}", err=True) + + @graspa_cli.command("analyze") @click.option( "--path", @@ -83,8 +136,10 @@ def graspa_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): ) def graspa_analyze(path, unit): """Analyze gRASPA simulation results.""" + from matkit.graspa import get_output_data + try: - result = graspa.get_output_data(path, unit=unit) + result = get_output_data(path, unit=unit) click.echo(json.dumps(result, indent=2)) except Exception as e: click.echo(f"Error analyzing gRASPA results: {e}", err=True) @@ -119,8 +174,10 @@ def graspa_sycl_cli(): @click.option("--cycles", default=1000, help="Number of cycles.") def graspa_sycl_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): """Setup gRASPA SYCL simulation files.""" + from matkit.graspa_sycl import setup_simulation + try: - graspa_sycl.setup_simulation( + setup_simulation( cif=cif, outpath=outdir, adsorbate=adsorbate, @@ -155,8 +212,10 @@ def graspa_sycl_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): @click.option("--fname", default="raspa.log", help="Output filename.") def graspa_sycl_analyze(path, unit, adsorbate, fname): """Analyze gRASPA SYCL simulation results.""" + from matkit.graspa_sycl import get_output_data + try: - result = graspa_sycl.get_output_data( + result = get_output_data( output_path=path, unit=unit, adsorbate=adsorbate, output_fname=fname ) click.echo(json.dumps(result, indent=2)) @@ -194,9 +253,10 @@ def raspa2_cli(): @click.option("--cycles", default=1000, help="Number of cycles.") def raspa2_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): """Setup RASPA2 simulation files.""" + from matkit.raspa2 import setup_input_simulation + try: - # raspa2.setup_input_simulation takes a list of cifs - raspa2.setup_input_simulation( + setup_input_simulation( cifs=list(cif), outpath=outdir, adsorbate=adsorbate, @@ -225,8 +285,10 @@ def raspa2_setup(cif, outdir, adsorbate, temp, pressure, cutoff, cycles): ) def raspa2_analyze(path, unit): """Analyze RASPA2 simulation results.""" + from matkit.raspa2 import get_output_data + try: - result = raspa2.get_output_data(path, unit=unit) + result = get_output_data(path, unit=unit) click.echo(json.dumps(result, indent=2)) except Exception as e: click.echo(f"Error analyzing RASPA2 results: {e}", err=True) @@ -252,8 +314,10 @@ def tobacco_cli(): @click.option("--out", default="final_output.cif", help="Output CIF filename.") def tobacco_create(smiles, site, out): """Create a linker CIF from SMILES.""" + from matkit.tobacco import create_linker + try: - create_linker_from_smiles.create_linker( + create_linker( smiles=smiles, connection_sites=list(site), output_cif=out ) except Exception as e: @@ -541,6 +605,454 @@ def plot_selectivity_cmd( click.echo(f"Error: {e}", err=True) +# ========================================== +# MLIP COMMANDS +# ========================================== +@main.group("mlip") +def mlip_cli(): + """Commands for ML interatomic potential calculations.""" + pass + + +@mlip_cli.command("mace-opt") +@click.option( + "--fname", + required=True, + type=click.Path(exists=True), + help="Input structure file (CIF, XYZ, POSCAR, etc.).", +) +@click.option( + "--run-type", + default="geo_opt", + type=click.Choice(["geo_opt", "cell_opt", "geo_opt_cell_opt"]), + help="Optimization type.", +) +@click.option("--steps", default=1000, help="Max optimization steps.") +@click.option( + "--fmax", + default=1e-3, + help="Force convergence criterion (eV/A).", +) +@click.option( + "--model", + default="medium", + type=click.Choice(["small", "medium", "large"]), + help="MACE-MP model size.", +) +@click.option( + "--device", + default="cpu", + type=click.Choice(["cpu", "cuda"]), + help="Compute device.", +) +@click.option( + "--dispersion/--no-dispersion", + default=True, + help="Include D3 dispersion corrections.", +) +@click.option( + "--default-dtype", + default="float64", + type=click.Choice(["float32", "float64"]), + help="Floating point precision.", +) +@click.option( + "--write-traj", + is_flag=True, + help="Write ASE trajectory files.", +) +@click.option( + "--output", + default=None, + type=click.Path(), + help="Output filename (auto-generated if omitted).", +) +def mace_opt_cmd( + fname, + run_type, + steps, + fmax, + model, + device, + dispersion, + default_dtype, + write_traj, + output, +): + """Run MACE-MP geometry/cell optimization.""" + try: + from matkit.mlip import run_opt_mace + + run_opt_mace( + fname=fname, + run_type=run_type, + steps=steps, + fmax=fmax, + model=model, + device=device, + dispersion=dispersion, + default_dtype=default_dtype, + write_traj=write_traj, + output_fname=output, + ) + click.echo("MACE-MP optimization complete.") + except ImportError: + click.echo( + "Error: mace-torch required. " + "Install with: pip install matkit[mlip]", + err=True, + ) + except Exception as e: + click.echo(f"Error: {e}", err=True) + + +@mlip_cli.command("uma-opt") +@click.option( + "--fname", + required=True, + type=click.Path(exists=True), + help="Input structure file.", +) +@click.option( + "--run-type", + default="geo_opt", + type=click.Choice(["geo_opt", "cell_opt", "geo_opt_cell_opt"]), + help="Optimization type.", +) +@click.option("--steps", default=1000, help="Max optimization steps.") +@click.option( + "--fmax", + default=1e-3, + help="Force convergence criterion (eV/A).", +) +@click.option( + "--model", + default="uma-s-1p2", + help="UMA model name (e.g. uma-s-1p2, uma-m-1p1).", +) +@click.option( + "--task-name", + default="omat", + type=click.Choice(["omat", "oc20", "oc22", "oc25", "omol", "odac", "omc"]), + help="Task head for domain-specific prediction.", +) +@click.option( + "--device", + default="cpu", + type=click.Choice(["cpu", "cuda"]), + help="Compute device.", +) +@click.option( + "--write-traj", + is_flag=True, + help="Write ASE trajectory files.", +) +@click.option( + "--output", + default=None, + type=click.Path(), + help="Output filename (auto-generated if omitted).", +) +def uma_opt_cmd( + fname, + run_type, + steps, + fmax, + model, + task_name, + device, + write_traj, + output, +): + """Run UMA geometry/cell optimization.""" + try: + from matkit.mlip.uma import run_opt_uma + + run_opt_uma( + fname=fname, + run_type=run_type, + steps=steps, + fmax=fmax, + model=model, + task_name=task_name, + device=device, + write_traj=write_traj, + output_fname=output, + ) + click.echo("UMA optimization complete.") + except ImportError: + click.echo( + "Error: fairchem-core required. " + "Install with: pip install matkit[uma]", + err=True, + ) + except Exception as e: + click.echo(f"Error: {e}", err=True) + + +@mlip_cli.command("uma-sp") +@click.option( + "--fname", + required=True, + type=click.Path(exists=True), + help="Input structure file.", +) +@click.option( + "--model", + default="uma-s-1p2", + help="UMA model name.", +) +@click.option( + "--task-name", + default="omat", + type=click.Choice(["omat", "oc20", "oc22", "oc25", "omol", "odac", "omc"]), + help="Task head for domain-specific prediction.", +) +@click.option( + "--device", + default="cpu", + type=click.Choice(["cpu", "cuda"]), + help="Compute device.", +) +def uma_sp_cmd(fname, model, task_name, device): + """Run UMA single-point energy calculation.""" + try: + from matkit.mlip.uma import run_sp_uma + + result = run_sp_uma( + fname=fname, + model=model, + task_name=task_name, + device=device, + ) + click.echo(json.dumps(result, indent=2)) + except ImportError: + click.echo( + "Error: fairchem-core required. " + "Install with: pip install matkit[uma]", + err=True, + ) + except Exception as e: + click.echo(f"Error: {e}", err=True) + + +@mlip_cli.command("uma-md") +@click.option( + "--fname", + required=True, + type=click.Path(exists=True), + help="Input structure file.", +) +@click.option( + "--model", + default="uma-s-1p2", + help="UMA model name.", +) +@click.option( + "--task-name", + default="omat", + type=click.Choice(["omat", "oc20", "oc22", "oc25", "omol", "odac", "omc"]), + help="Task head for domain-specific prediction.", +) +@click.option( + "--device", + default="cpu", + type=click.Choice(["cpu", "cuda"]), + help="Compute device.", +) +@click.option( + "--temperature", + default=300.0, + help="Target temperature in Kelvin.", +) +@click.option( + "--timestep", + default=1.0, + help="MD timestep in femtoseconds.", +) +@click.option("--steps", default=1000, help="Number of MD steps.") +@click.option( + "--ensemble", + default="nvt", + type=click.Choice(["nve", "nvt"]), + help="MD ensemble.", +) +@click.option( + "--friction", + default=0.01, + help="Langevin friction coefficient (NVT only).", +) +@click.option( + "--write-traj", + is_flag=True, + help="Write ASE trajectory file.", +) +@click.option( + "--output", + default=None, + type=click.Path(), + help="Output filename (auto-generated if omitted).", +) +@click.option( + "--log-interval", + default=10, + help="Steps between log entries.", +) +def uma_md_cmd( + fname, + model, + task_name, + device, + temperature, + timestep, + steps, + ensemble, + friction, + write_traj, + output, + log_interval, +): + """Run UMA molecular dynamics.""" + try: + from matkit.mlip.uma import run_md_uma + + run_md_uma( + fname=fname, + model=model, + task_name=task_name, + device=device, + temperature=temperature, + timestep=timestep, + steps=steps, + ensemble=ensemble, + friction=friction, + write_traj=write_traj, + output_fname=output, + log_interval=log_interval, + ) + click.echo("UMA MD complete.") + except ImportError: + click.echo( + "Error: fairchem-core required. " + "Install with: pip install matkit[uma]", + err=True, + ) + except Exception as e: + click.echo(f"Error: {e}", err=True) + + +@mlip_cli.command("uma-opt-batch") +@click.option( + "--input", + "input_path", + required=True, + type=click.Path(exists=True), + help="Input CIF file or directory of CIF files.", +) +@click.option( + "--outdir", + required=True, + type=click.Path(), + help="Output directory for CIFs and results.jsonl.", +) +@click.option( + "--model", + "models", + multiple=True, + default=["uma-s-1p2"], + help="UMA model name(s). Can be specified multiple times.", +) +@click.option( + "--run-type", + "run_types", + multiple=True, + default=["geo_opt"], + type=click.Choice(["geo_opt", "cell_opt", "geo_opt_cell_opt"]), + help="Optimization type(s). Can be specified multiple times.", +) +@click.option( + "--task-name", + default="omat", + type=click.Choice(["omat", "oc20", "oc22", "oc25", "omol", "odac", "omc"]), + help="Task head for domain-specific prediction.", +) +@click.option("--steps", default=1000, help="Max optimization steps.") +@click.option( + "--fmax", + default=1e-3, + help="Force convergence criterion (eV/A) for geometry.", +) +@click.option( + "--fmax-cell", + default=None, + type=float, + help="Force convergence for cell opt. Defaults to --fmax.", +) +@click.option( + "--num-gpus", + default=None, + type=int, + help="Number of GPUs. Auto-detected if omitted.", +) +@click.option( + "--device", + default="cuda", + type=click.Choice(["cpu", "cuda"]), + help="Compute device.", +) +@click.option( + "--write-traj", + is_flag=True, + help="Write ASE trajectory files.", +) +@click.option( + "--overwrite", + is_flag=True, + help="Overwrite existing output files.", +) +def uma_opt_batch_cmd( + input_path, + outdir, + models, + run_types, + task_name, + steps, + fmax, + fmax_cell, + num_gpus, + device, + write_traj, + overwrite, +): + """Run UMA optimization in batch across multiple GPUs.""" + try: + from matkit.mlip.uma import run_opt_uma_batch + + result_path = run_opt_uma_batch( + input_path=input_path, + output_dir=outdir, + models=list(models), + run_types=list(run_types), + task_name=task_name, + steps=steps, + fmax=fmax, + fmax_cell=fmax_cell, + num_gpus=num_gpus, + device=device, + write_traj=write_traj, + overwrite=overwrite, + ) + click.echo(f"Results written to: {result_path}") + except ImportError: + click.echo( + "Error: fairchem-core required. " + "Install with: pip install matkit[uma]", + err=True, + ) + except Exception as e: + click.echo(f"Error: {e}", err=True) + + # ========================================== # ZEOPP COMMANDS # ========================================== @@ -602,11 +1114,22 @@ def zeopp_cli(): type=click.Path(), help="Output directory for result files.", ) -def zeopp_run(cif, analysis, probe_radius, chan_radius, num_samples, - ha, radii, network_path, outdir): +def zeopp_run( + cif, + analysis, + probe_radius, + chan_radius, + num_samples, + ha, + radii, + network_path, + outdir, +): """Run Zeo++ analysis on a CIF structure.""" + from matkit.zeopp import run_zeopp + try: - result = zeopp.run_zeopp( + result = run_zeopp( cif=cif, analyses=list(analysis), probe_radius=probe_radius, @@ -637,9 +1160,11 @@ def zeopp_run(cif, analysis, probe_radius, chan_radius, num_samples, ) def zeopp_analyze(path, analysis): """Parse existing Zeo++ output files.""" + from matkit.zeopp import get_output_data + try: analyses = list(analysis) if analysis else None - result = zeopp.get_output_data(path, analyses=analyses) + result = get_output_data(path, analyses=analyses) click.echo(json.dumps(result, indent=2)) except Exception as e: click.echo(f"Error analyzing Zeo++ results: {e}", err=True) @@ -704,9 +1229,16 @@ def zeopp_analyze(path, analysis): help="Max parallel processes. Default: CPU count.", ) def zeopp_run_batch( - cif_dir, outdir, analysis, probe_radius, - chan_radius, num_samples, ha, radii, - network_path, max_workers, + cif_dir, + outdir, + analysis, + probe_radius, + chan_radius, + num_samples, + ha, + radii, + network_path, + max_workers, ): """Run Zeo++ on all CIFs in a directory. @@ -719,8 +1251,10 @@ def zeopp_run_batch( matkit zeopp run-batch --cif-dir cifs/ --outdir out/ \\ --analysis res --analysis sa --max-workers 32 """ + from matkit.zeopp import run_batch + try: - summary = zeopp.run_batch( + summary = run_batch( cif_dir=cif_dir, output_dir=outdir, analyses=list(analysis), diff --git a/src/matkit/graspa/__init__.py b/src/matkit/graspa/__init__.py index 8391d99..06f59d6 100644 --- a/src/matkit/graspa/__init__.py +++ b/src/matkit/graspa/__init__.py @@ -1,11 +1,13 @@ from matkit.graspa.graspa import ( setup_simulation, + setup_batch, get_output_data, generate_component_blocks, ) __all__ = [ "setup_simulation", + "setup_batch", "get_output_data", "generate_component_blocks", ] diff --git a/src/matkit/graspa/graspa.py b/src/matkit/graspa/graspa.py index da5a98b..b9be117 100644 --- a/src/matkit/graspa/graspa.py +++ b/src/matkit/graspa/graspa.py @@ -1,6 +1,11 @@ +from __future__ import annotations + +import json +from itertools import product from pathlib import Path import shutil from matkit.utils.unitcell_calculator import calculate_cell_size +from matkit.types import GRASPAResult from ase.io import read as ase_read @@ -9,7 +14,7 @@ def get_output_data( unit: str = "mol/kg", output_fname: str = "raspa.log", eos: bool = False, -) -> dict: +) -> GRASPAResult: """Parse gRASPA simulation output and extract uptake data. Args: @@ -229,3 +234,76 @@ def setup_simulation( f.write(template) return True + + +def setup_batch( + cif_dir: str, + outpath: str, + adsorbates: list[dict], + temperatures: list[float], + pressures: list[float], + cutoff: float = 12.8, + n_cycle: int = 1000, + template_dir: str = "template", +) -> list[dict]: + """Set up gRASPA simulations for all CIF x T x P. + + Discovers all .cif files in cif_dir and creates a simulation directory + for each (CIF, temperature, pressure) combination using setup_simulation(). + Writes a simulations.jsonl manifest to outpath. + + Args: + cif_dir: Directory containing input CIF files. + outpath: Base output directory for all simulation directories. + adsorbates: List of adsorbate dicts with 'MoleculeName' key. + temperatures: List of simulation temperatures in Kelvin. + pressures: List of simulation pressures in Pascals. + cutoff: Van der Waals cutoff radius in Angstrom. + n_cycle: Number of Monte Carlo cycles. + template_dir: Template subdirectory name under files/. + + Returns: + List of manifest dicts, each with keys: sim_dir, cif, + temperature, pressure, adsorbates. + + Raises: + ValueError: If cif_dir does not exist or contains no .cif files. + """ + cif_path = Path(cif_dir) + out_path = Path(outpath) + + if not cif_path.is_dir(): + raise ValueError(f"CIF directory does not exist: {cif_dir}") + + cif_files = sorted(cif_path.glob("*.cif")) + if not cif_files: + raise ValueError(f"No .cif files found in {cif_dir}") + + manifest = [] + for cif, temp, pres in product(cif_files, temperatures, pressures): + sim_dir = out_path / cif.stem / f"T{temp}_P{pres:g}" + setup_simulation( + cif=str(cif), + outpath=str(sim_dir), + adsorbates=adsorbates, + temperature=temp, + pressure=pres, + cutoff=cutoff, + n_cycle=n_cycle, + template_dir=template_dir, + ) + entry = { + "sim_dir": str(sim_dir), + "cif": cif.name, + "temperature": temp, + "pressure": pres, + "adsorbates": [ad["MoleculeName"] for ad in adsorbates], + } + manifest.append(entry) + + manifest_path = out_path / "simulations.jsonl" + with manifest_path.open("w") as f: + for entry in manifest: + f.write(json.dumps(entry) + "\n") + + return manifest diff --git a/src/matkit/mlip/__init__.py b/src/matkit/mlip/__init__.py index 784f493..83231a6 100644 --- a/src/matkit/mlip/__init__.py +++ b/src/matkit/mlip/__init__.py @@ -1,3 +1,25 @@ -from matkit.mlip.mace_opt import run_opt_mace +__all__ = [] -__all__ = ["run_opt_mace"] +try: + from matkit.mlip.mace_opt import run_opt_mace + + __all__ += ["run_opt_mace"] +except ImportError: + pass + +try: + from matkit.mlip.uma import ( + run_opt_uma, + run_opt_uma_batch, + run_sp_uma, + run_md_uma, + ) + + __all__ += [ + "run_opt_uma", + "run_opt_uma_batch", + "run_sp_uma", + "run_md_uma", + ] +except ImportError: + pass diff --git a/src/matkit/mlip/uma.py b/src/matkit/mlip/uma.py new file mode 100644 index 0000000..8659595 --- /dev/null +++ b/src/matkit/mlip/uma.py @@ -0,0 +1,529 @@ +from pathlib import Path + +from ase.io import read as ase_read +from ase.io import write as ase_write +from ase.optimize import BFGS + +try: + from ase.filters import ExpCellFilter +except ImportError: + try: + from ase.constraints import ExpCellFilter + except ImportError: + from ase.filters import FrechetCellFilter as ExpCellFilter +from ase.md.langevin import Langevin +from ase.md.verlet import VelocityVerlet +from ase import units + + +def _create_calculator( + model: str = "uma-s-1p2", + task_name: str = "omat", + device: str = "cpu", +): + """Create a FAIRChemCalculator from a pretrained UMA model. + + Args: + model: UMA model name (e.g. 'uma-s-1p2', 'uma-m-1p1'). + task_name: Task head name ('omat', 'oc20', 'oc22', 'oc25', + 'omol', 'odac', 'omc'). + device: Compute device ('cpu' or 'cuda'). + + Returns: + Configured FAIRChemCalculator instance. + """ + from fairchem.core import pretrained_mlip, FAIRChemCalculator + + predictor = pretrained_mlip.get_predict_unit(model, device=device) + return FAIRChemCalculator(predictor, task_name=task_name) + + +def _run_opt_uma_with_calc( + atoms, + calc, + run_type: str = "geo_opt", + steps: int = 1000, + fmax: float = 1e-3, + fmax_cell: float | None = None, + write_traj: bool = False, + traj_prefix: str | None = None, +) -> dict: + """Core optimization logic with a pre-built calculator. + + Args: + atoms: ASE Atoms object to optimize (modified in place). + calc: Pre-built ASE calculator. + run_type: 'geo_opt', 'cell_opt', or 'geo_opt_cell_opt'. + steps: Maximum optimization steps per stage. + fmax: Force convergence criterion (eV/A) for geometry. + fmax_cell: Force convergence for cell optimization. + Falls back to fmax if None. + write_traj: Whether to write trajectory files. + traj_prefix: Base path for trajectory files (no + extension). Required if write_traj is True. + + Returns: + Dict with 'converged' (bool), 'final_energy' (float), + 'n_steps' (int). + """ + atoms.calc = calc + if fmax_cell is None: + fmax_cell = fmax + + total_steps = 0 + + if run_type == "geo_opt": + traj = f"{traj_prefix}.traj" if write_traj else None + dyn = BFGS(atoms, trajectory=traj) + converged = dyn.run(fmax=fmax, steps=steps) + total_steps = dyn.nsteps + elif run_type == "cell_opt": + ecf = ExpCellFilter(atoms) + traj = f"{traj_prefix}.traj" if write_traj else None + dyn = BFGS(ecf, trajectory=traj) + converged = dyn.run(fmax=fmax_cell, steps=steps) + total_steps = dyn.nsteps + elif run_type == "geo_opt_cell_opt": + traj1 = f"{traj_prefix}_geo_opt.traj" if write_traj else None + dyn1 = BFGS(atoms, trajectory=traj1) + dyn1.run(fmax=fmax, steps=steps) + total_steps += dyn1.nsteps + + ecf = ExpCellFilter(atoms) + traj2 = f"{traj_prefix}_cell_opt.traj" if write_traj else None + dyn2 = BFGS(ecf, trajectory=traj2) + converged = dyn2.run(fmax=fmax_cell, steps=steps) + total_steps += dyn2.nsteps + else: + raise ValueError( + f"run_type '{run_type}' is not supported. " + "Options: 'geo_opt', 'cell_opt', " + "'geo_opt_cell_opt'." + ) + + return { + "converged": bool(converged), + "final_energy": float(atoms.get_potential_energy()), + "n_steps": total_steps, + } + + +def run_opt_uma( + fname: str, + run_type: str = "geo_opt", + steps: int = 1000, + fmax: float = 1e-3, + model: str = "uma-s-1p2", + task_name: str = "omat", + device: str = "cpu", + write_traj: bool = False, + output_fname: str = None, +) -> None: + """Run geometry and/or cell optimization using a UMA model. + + Args: + fname: Path to input structure file (CIF, XYZ, POSCAR, + etc.). + run_type: Type of optimization - 'geo_opt', 'cell_opt', + or 'geo_opt_cell_opt'. + steps: Maximum number of optimization steps. + fmax: Force convergence criterion in eV/Angstrom. + model: UMA model name ('uma-s-1p2', 'uma-m-1p1'). + task_name: Task head ('omat', 'oc20', 'oc22', 'oc25', + 'omol', 'odac', 'omc'). + device: Compute device ('cpu' or 'cuda'). + write_traj: Whether to write ASE trajectory files. + output_fname: Output filename (auto-generated if None). + """ + atoms = ase_read(fname) + calc = _create_calculator(model, task_name, device) + + fpath = Path(fname) + filename = str(fpath.with_suffix("")) + ext = fpath.suffix + + if output_fname is None: + output_fname = f"{filename}_opt_{model}{ext}" + + _run_opt_uma_with_calc( + atoms, + calc, + run_type=run_type, + steps=steps, + fmax=fmax, + write_traj=write_traj, + traj_prefix=filename, + ) + + ase_write(output_fname, atoms) + + +def run_sp_uma( + fname: str, + model: str = "uma-s-1p2", + task_name: str = "omat", + device: str = "cpu", +) -> dict: + """Run single-point energy/forces/stress calculation with UMA. + + Args: + fname: Path to input structure file. + model: UMA model name ('uma-s-1p2', 'uma-m-1p1'). + task_name: Task head ('omat', 'oc20', 'oc22', 'oc25', + 'omol', 'odac', 'omc'). + device: Compute device ('cpu' or 'cuda'). + + Returns: + Dict with 'energy' (float), 'forces' (list), + and 'stress' (list or None). + """ + atoms = ase_read(fname) + atoms.calc = _create_calculator(model, task_name, device) + + result = { + "energy": atoms.get_potential_energy(), + "forces": atoms.get_forces().tolist(), + } + + try: + result["stress"] = atoms.get_stress().tolist() + except Exception: + result["stress"] = None + + return result + + +def run_md_uma( + fname: str, + model: str = "uma-s-1p2", + task_name: str = "omat", + device: str = "cpu", + temperature: float = 300.0, + timestep: float = 1.0, + steps: int = 1000, + ensemble: str = "nvt", + friction: float = 0.01, + write_traj: bool = False, + output_fname: str = None, + log_interval: int = 10, +) -> None: + """Run molecular dynamics using a UMA model. + + Args: + fname: Path to input structure file. + model: UMA model name ('uma-s-1p2', 'uma-m-1p1'). + task_name: Task head ('omat', 'oc20', 'oc22', 'oc25', + 'omol', 'odac', 'omc'). + device: Compute device ('cpu' or 'cuda'). + temperature: Target temperature in Kelvin. + timestep: MD timestep in femtoseconds. + steps: Number of MD steps. + ensemble: 'nve' (VelocityVerlet) or 'nvt' (Langevin). + friction: Langevin friction coefficient (for NVT only). + write_traj: Whether to write ASE trajectory file. + output_fname: Output filename for final structure + (auto-generated if None). + log_interval: Steps between log entries. + """ + atoms = ase_read(fname) + atoms.calc = _create_calculator(model, task_name, device) + + fpath = Path(fname) + filename = str(fpath.with_suffix("")) + ext = fpath.suffix + + if output_fname is None: + output_fname = f"{filename}_md_{model}{ext}" + + traj = f"{filename}_md.traj" if write_traj else None + + if ensemble == "nvt": + dyn = Langevin( + atoms, + timestep=timestep * units.fs, + temperature_K=temperature, + friction=friction, + trajectory=traj, + loginterval=log_interval, + ) + elif ensemble == "nve": + dyn = VelocityVerlet( + atoms, + timestep=timestep * units.fs, + trajectory=traj, + loginterval=log_interval, + ) + else: + raise ValueError( + f"ensemble '{ensemble}' is not supported. Options: 'nve', 'nvt'." + ) + + dyn.run(steps=steps) + ase_write(output_fname, atoms) + + +def _gpu_worker( + gpu_id, + job_queue, + result_queue, + task_name, + steps, + fmax, + fmax_cell, + output_dir, + write_traj, + overwrite, + device, +): + """Worker process pinned to a single GPU. + + Must be module-level for multiprocessing spawn pickling. + Caches calculators per (model, task_name) to avoid + reloading models between jobs on the same GPU. + """ + import os + + if device == "cuda": + os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + os.environ.setdefault("OMP_NUM_THREADS", "1") + + calc_cache = {} + outdir = Path(output_dir) + outdir.mkdir(parents=True, exist_ok=True) + + while True: + job = job_queue.get() + if job is None: + break + + input_file, model, run_type = job + stem = Path(input_file).stem + tag = f"{stem}__{run_type}__{model}" + output_cif = outdir / f"{tag}.cif" + + if output_cif.exists() and not overwrite: + result_queue.put( + { + "structure": stem, + "model": model, + "run_type": run_type, + "task_name": task_name, + "status": "skipped", + "output_file": str(output_cif), + "converged": None, + "final_energy": None, + "n_steps": None, + "error_message": None, + } + ) + continue + + try: + cache_key = (model, task_name) + if cache_key not in calc_cache: + calc_cache[cache_key] = _create_calculator( + model, task_name, device + ) + calc = calc_cache[cache_key] + + atoms = ase_read(input_file) + + traj_dir = outdir / "trajectories" + traj_prefix = None + if write_traj: + traj_dir.mkdir(parents=True, exist_ok=True) + traj_prefix = str(traj_dir / tag) + + result = _run_opt_uma_with_calc( + atoms, + calc, + run_type=run_type, + steps=steps, + fmax=fmax, + fmax_cell=fmax_cell, + write_traj=write_traj, + traj_prefix=traj_prefix, + ) + + ase_write(str(output_cif), atoms) + + result_queue.put( + { + "structure": stem, + "model": model, + "run_type": run_type, + "task_name": task_name, + "status": "success", + "output_file": str(output_cif), + "converged": result["converged"], + "final_energy": result["final_energy"], + "n_steps": result["n_steps"], + "error_message": None, + } + ) + print( + f"[GPU {gpu_id}] Done: {tag} " + f"(E={result['final_energy']:.4f}, " + f"converged={result['converged']})" + ) + except Exception as e: + result_queue.put( + { + "structure": stem, + "model": model, + "run_type": run_type, + "task_name": task_name, + "status": "failure", + "output_file": None, + "converged": None, + "final_energy": None, + "n_steps": None, + "error_message": str(e), + } + ) + print(f"[GPU {gpu_id}] ERROR: {tag}: {e}") + + +def run_opt_uma_batch( + input_path: str, + output_dir: str, + models: list | str = "uma-s-1p2", + run_types: list | str = "geo_opt", + task_name: str = "omat", + steps: int = 1000, + fmax: float = 1e-3, + fmax_cell: float | None = None, + num_gpus: int | None = None, + device: str = "cuda", + write_traj: bool = False, + overwrite: bool = False, +) -> str: + """Run UMA optimization in batch across multiple GPUs. + + Creates a Cartesian product of (input_files x models x + run_types) and distributes jobs across GPU workers using + a shared queue. Each worker caches calculators to avoid + reloading models between jobs. + + Args: + input_path: Path to a single structure file or a + directory of CIF files. + output_dir: Directory for output CIFs and results.jsonl. + models: Model name(s). String or list of strings. + run_types: Run type(s). String or list of strings. + Options: 'geo_opt', 'cell_opt', 'geo_opt_cell_opt'. + task_name: FAIRChem task head. + steps: Maximum optimization steps per stage. + fmax: Force convergence criterion (eV/A) for geometry. + fmax_cell: Force convergence for cell optimization. + Falls back to fmax if None. + num_gpus: Number of GPUs. Auto-detected if None. + device: 'cuda' or 'cpu'. + write_traj: Write ASE trajectory files. + overwrite: Overwrite existing output files. + + Returns: + Path to the results.jsonl file. + """ + import itertools + import json + from multiprocessing import get_context + + # Normalize inputs + if isinstance(models, str): + models = [models] + if isinstance(run_types, str): + run_types = [run_types] + + input_p = Path(input_path) + if input_p.is_dir(): + input_files = sorted(str(f) for f in input_p.glob("*.cif")) + if not input_files: + raise FileNotFoundError(f"No CIF files found in: {input_path}") + else: + input_files = [str(input_p)] + + jobs = list(itertools.product(input_files, models, run_types)) + if not jobs: + print("No jobs to run.") + return "" + + # Auto-detect GPU count + if num_gpus is None: + if device == "cuda": + try: + import torch + + num_gpus = torch.cuda.device_count() + except ImportError: + num_gpus = 1 + if num_gpus == 0: + num_gpus = 1 + else: + num_gpus = 1 + + n_workers = min(num_gpus, len(jobs)) + + print( + f"Running {len(jobs)} jobs across {n_workers} workers (device={device})" + ) + + ctx = get_context("spawn") + job_queue = ctx.Queue() + result_queue = ctx.Queue() + + for job in jobs: + job_queue.put(job) + for _ in range(n_workers): + job_queue.put(None) + + workers = [] + for gpu_id in range(n_workers): + proc = ctx.Process( + target=_gpu_worker, + args=( + gpu_id, + job_queue, + result_queue, + task_name, + steps, + fmax, + fmax_cell, + output_dir, + write_traj, + overwrite, + device, + ), + ) + proc.start() + workers.append(proc) + + for proc in workers: + proc.join() + + # Collect results + results = [] + while not result_queue.empty(): + results.append(result_queue.get()) + + results.sort(key=lambda r: (r["structure"], r["model"], r["run_type"])) + + outdir = Path(output_dir) + outdir.mkdir(parents=True, exist_ok=True) + summary_path = outdir / "results.jsonl" + + success = sum(1 for r in results if r["status"] == "success") + skipped = sum(1 for r in results if r["status"] == "skipped") + failed = len(results) - success - skipped + + with open(summary_path, "w", encoding="utf-8") as f: + for rec in results: + f.write(json.dumps(rec) + "\n") + + print( + f"Batch complete: {success} success, {skipped} skipped, " + f"{failed} failure out of {len(results)} jobs" + ) + print(f"Results: {summary_path}") + + return str(summary_path) diff --git a/src/matkit/tobacco/create_linker_from_smiles.py b/src/matkit/tobacco/create_linker_from_smiles.py index 9f27d21..5787189 100644 --- a/src/matkit/tobacco/create_linker_from_smiles.py +++ b/src/matkit/tobacco/create_linker_from_smiles.py @@ -269,10 +269,7 @@ def smiles_to_cif(smiles, output_prefix="L1"): d = struct.get_distance(i, j, mic=True) jlabel = f"{struct[j].symbol}{j + 1}" dist = round(d, 4) - bond_block += ( - f"{label:<6}{jlabel:<6}" - f"{dist:<8}. S\n" - ) + bond_block += f"{label:<6}{jlabel:<6}{dist:<8}. S\n" with open(final_cif, "w", encoding="utf-8") as f: f.write( diff --git a/src/matkit/types.py b/src/matkit/types.py new file mode 100644 index 0000000..2df5b3d --- /dev/null +++ b/src/matkit/types.py @@ -0,0 +1,66 @@ +"""Typed return value contracts for matkit functions. + +These TypedDicts document the dict shapes returned by matkit's +core functions. They are purely for type-checking and IDE support +-- no runtime validation is performed. +""" + +from __future__ import annotations + +from typing import Optional, TypedDict + + +class GRASPAResult(TypedDict): + """Return type for ``matkit.graspa.get_output_data``.""" + + success: bool + uptake: Optional[float] + error: Optional[float] + unit: str + qst: Optional[float] + error_qst: Optional[float] + qst_unit: str + calc_time_in_s: Optional[float] + + +class GRASPASyclResult(TypedDict): + """Return type for ``matkit.graspa_sycl.get_output_data``.""" + + success: bool + uptake: float + error: float + unit: str + calc_time_in_s: Optional[float] + + +class RASPA2Result(TypedDict): + """Return type for ``matkit.raspa2.get_output_data``.""" + + success: bool + uptake: float + error: float + unit: str + calc_time_in_s: Optional[int] + + +class ZeoppResult(TypedDict): + """Return type for ``matkit.zeopp.run_zeopp``.""" + + success: bool + results: dict + error: Optional[str] + + +class UMABatchResult(TypedDict): + """Record type for ``matkit.mlip.run_opt_uma_batch`` results.""" + + structure: str + model: str + run_type: str + task_name: str + status: str # "success", "failure", or "skipped" + output_file: Optional[str] + converged: Optional[bool] + final_energy: Optional[float] + n_steps: Optional[int] + error_message: Optional[str] diff --git a/src/matkit/utils/remove_solvent.py b/src/matkit/utils/remove_solvent.py index a8377ce..b7083f2 100644 --- a/src/matkit/utils/remove_solvent.py +++ b/src/matkit/utils/remove_solvent.py @@ -17,16 +17,20 @@ def remove_solvent(path_to_cif, output_path, mass_ratio=0.8, skin=0.3): atoms = ase_read(path_to_cif) cutOff = neighborlist.natural_cutoffs(atoms) - neighborList = neighborlist.NeighborList(cutOff, - self_interaction=False, - bothways=True, - skin=skin) + neighborList = neighborlist.NeighborList( + cutOff, self_interaction=False, bothways=True, skin=skin + ) neighborList.update(atoms) G = nx.Graph() for k in range(len(atoms)): - tup = (k, {"element": f"{atoms.get_chemical_symbols()[k]}", - "pos": atoms.get_positions()[k]}) + tup = ( + k, + { + "element": f"{atoms.get_chemical_symbols()[k]}", + "pos": atoms.get_positions()[k], + }, + ) G.add_nodes_from([tup]) for k in range(len(atoms)): @@ -48,11 +52,9 @@ def remove_solvent(path_to_cif, output_path, mass_ratio=0.8, skin=0.3): if index == max_index: continue else: - if mass/massG[max_index] < mass_ratio: + if mass / massG[max_index] < mass_ratio: for n in Gcc[index]: solvent_indice.append(n) ions_index = sorted(solvent_indice, reverse=True) del atoms[ions_index] - ase_write(f'{output_path}', atoms) - - + ase_write(f"{output_path}", atoms) diff --git a/src/matkit/zeopp/zeopp.py b/src/matkit/zeopp/zeopp.py index 6891ead..c65e37d 100644 --- a/src/matkit/zeopp/zeopp.py +++ b/src/matkit/zeopp/zeopp.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor import json from pathlib import Path @@ -5,6 +7,8 @@ import subprocess import tempfile +from matkit.types import ZeoppResult + VALID_ANALYSES = {"res", "sa", "vol", "psd", "chan"} @@ -291,7 +295,7 @@ def run_zeopp( radii_file: str | None = None, network_path: str | None = None, output_dir: str | None = None, -) -> dict: +) -> ZeoppResult: """Run Zeo++ network binary on a CIF structure file. Builds a single command combining all requested analysis flags @@ -327,15 +331,11 @@ def run_zeopp( raise FileNotFoundError(f"CIF file does not exist: {cif}") if radii_file is None: - radii_file = str( - Path(__file__).parent / "files" / "UFF.rad" - ) + radii_file = str(Path(__file__).parent / "files" / "UFF.rad") radii_path = Path(radii_file) if not radii_path.exists(): - raise FileNotFoundError( - f"Radii file does not exist: {radii_file}" - ) + raise FileNotFoundError(f"Radii file does not exist: {radii_file}") if analyses is None: analyses = ["res"] @@ -376,20 +376,32 @@ def run_zeopp( if analysis == "res": cmd.extend(["-res"]) elif analysis == "sa": - cmd.extend([ - "-sa", str(probe_radius), - str(chan_radius), str(num_samples), - ]) + cmd.extend( + [ + "-sa", + str(probe_radius), + str(chan_radius), + str(num_samples), + ] + ) elif analysis == "vol": - cmd.extend([ - "-vol", str(probe_radius), - str(chan_radius), str(num_samples), - ]) + cmd.extend( + [ + "-vol", + str(probe_radius), + str(chan_radius), + str(num_samples), + ] + ) elif analysis == "psd": - cmd.extend([ - "-psd", str(probe_radius), - str(chan_radius), str(num_samples), - ]) + cmd.extend( + [ + "-psd", + str(probe_radius), + str(chan_radius), + str(num_samples), + ] + ) elif analysis == "chan": cmd.extend(["-chan", str(probe_radius)]) cmd.append(str(cif_dest)) @@ -476,15 +488,11 @@ def run_batch( """ cif_path = Path(cif_dir) if not cif_path.exists(): - raise FileNotFoundError( - f"CIF directory does not exist: {cif_dir}" - ) + raise FileNotFoundError(f"CIF directory does not exist: {cif_dir}") cif_files = sorted(cif_path.glob("*.cif")) if not cif_files: - raise FileNotFoundError( - f"No CIF files found in: {cif_dir}" - ) + raise FileNotFoundError(f"No CIF files found in: {cif_dir}") out_path = Path(output_dir) out_path.mkdir(parents=True, exist_ok=True) @@ -511,9 +519,7 @@ def _process_one(cif_file: Path) -> dict: "structure": stem, "status": "success", } - record.update( - _flatten_results(result["results"]) - ) + record.update(_flatten_results(result["results"])) return record except Exception as e: return { @@ -524,10 +530,7 @@ def _process_one(cif_file: Path) -> dict: results = [] with ThreadPoolExecutor(max_workers) as pool: - futures = { - pool.submit(_process_one, cif): cif - for cif in cif_files - } + futures = {pool.submit(_process_one, cif): cif for cif in cif_files} for fut in futures: results.append(fut.result())