From fd4c4a7f88fa96b0228a25a911d08b6a88669428 Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Wed, 27 May 2026 14:47:09 -0500 Subject: [PATCH 1/3] Add single/mixture mode to gRASPA isotherm config Add a mode field (single or mixture) to the YAML config to distinguish between running adsorbates as independent single-component isotherms vs running them together as a mixture simulation with mole fractions. --- scripts/graspa/isotherm_config.yaml | 4 +++ scripts/graspa/setup_isotherms.py | 53 +++++++++++++++++++++++++---- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/scripts/graspa/isotherm_config.yaml b/scripts/graspa/isotherm_config.yaml index 3c0f364..1eac358 100644 --- a/scripts/graspa/isotherm_config.yaml +++ b/scripts/graspa/isotherm_config.yaml @@ -4,6 +4,10 @@ cif_dir: ./cifs outdir: ./batch_results +# mode: single — each adsorbate runs independently (separate sims) +# mode: mixture — all adsorbates run together in one sim (requires MolFraction) +mode: single + adsorbates: [CO2, N2] temperatures: [283, 293, 313, 333] diff --git a/scripts/graspa/setup_isotherms.py b/scripts/graspa/setup_isotherms.py index f499538..f0328fe 100644 --- a/scripts/graspa/setup_isotherms.py +++ b/scripts/graspa/setup_isotherms.py @@ -3,6 +3,10 @@ Usage: python setup_isotherms.py python setup_isotherms.py # defaults to isotherm_config.yaml + +Supports two modes via the 'mode' field in the YAML config: + - single: each adsorbate runs as independent single-component sims + - mixture: all adsorbates run together in one sim (requires MolFraction) """ import sys @@ -38,11 +42,10 @@ def main(): factor = PRESSURE_TO_PA[pressure_unit] pressures_pa = [p * factor for p in cfg["pressures"]] - adsorbates = [{"MoleculeName": name} for name in cfg["adsorbates"]] - + mode = cfg.get("mode", "single") print(f"Config: {config_path}") + print(f"Mode: {mode}") print(f"CIF dir: {cfg['cif_dir']}") - print(f"Adsorbates: {cfg['adsorbates']}") print(f"Temperatures: {cfg['temperatures']}") print( f"Pressures: {len(pressures_pa)} points " @@ -50,10 +53,8 @@ def main(): f"{cfg.get('pressure_unit', 'Pa')})" ) - manifest = setup_batch( + common_kwargs = dict( cif_dir=cfg["cif_dir"], - outpath=cfg["outdir"], - adsorbates=adsorbates, temperatures=cfg["temperatures"], pressures=pressures_pa, cutoff=cfg.get("cutoff", 12.8), @@ -61,7 +62,45 @@ def main(): max_workers=cfg.get("max_workers"), ) - print(f"\nSet up {len(manifest)} simulations in {cfg['outdir']}") + if mode == "single": + print(f"Adsorbates: {cfg['adsorbates']} (each runs independently)") + total = 0 + for ads_name in cfg["adsorbates"]: + ads_outdir = str(Path(cfg["outdir"]) / ads_name) + adsorbates = [{"MoleculeName": ads_name}] + manifest = setup_batch( + outpath=ads_outdir, + adsorbates=adsorbates, + template_dir=cfg.get("template_dir", "template"), + **common_kwargs, + ) + print(f" {ads_name}: {len(manifest)} simulations in {ads_outdir}") + total += len(manifest) + print(f"\nTotal: {total} simulations") + + elif mode == "mixture": + adsorbates = [] + for ad in cfg["adsorbates"]: + entry = {"MoleculeName": ad["name"]} + for k, v in ad.items(): + if k != "name": + entry[k] = v + adsorbates.append(entry) + names = [ad["MoleculeName"] for ad in adsorbates] + print(f"Adsorbates: {names} (mixture)") + + template_dir = cfg.get("template_dir", "template_mixture_isotherm") + manifest = setup_batch( + outpath=cfg["outdir"], + adsorbates=adsorbates, + template_dir=template_dir, + **common_kwargs, + ) + print(f"\nSet up {len(manifest)} simulations in {cfg['outdir']}") + + else: + raise ValueError(f"Unknown mode '{mode}'. Use 'single' or 'mixture'.") + print(f"Manifest: {cfg['outdir']}/simulations.jsonl") From dafd4ab545215cf1a0ace9d42cf5387a6d5496b3 Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Thu, 28 May 2026 12:06:00 -0500 Subject: [PATCH 2/3] Extract shared template utilities to reduce duplication across simulation modules Add copy_template() and render_template() in matkit/utils/template.py and refactor graspa, raspa2, and graspa_sycl setup functions to use them instead of duplicated copy-and-substitute logic. --- src/matkit/graspa/graspa.py | 56 ++++++++++++--------------- src/matkit/graspa_sycl/graspa_sycl.py | 54 ++++++++++---------------- src/matkit/raspa2/raspa2.py | 56 ++++++++++----------------- src/matkit/utils/__init__.py | 9 ++++- src/matkit/utils/template.py | 36 +++++++++++++++++ 5 files changed, 109 insertions(+), 102 deletions(-) create mode 100644 src/matkit/utils/template.py diff --git a/src/matkit/graspa/graspa.py b/src/matkit/graspa/graspa.py index 1d9d408..915f701 100644 --- a/src/matkit/graspa/graspa.py +++ b/src/matkit/graspa/graspa.py @@ -1,14 +1,17 @@ from __future__ import annotations import json +import shutil from concurrent.futures import ThreadPoolExecutor from itertools import product from pathlib import Path -import shutil -from matkit.utils.unitcell_calculator import calculate_cell_size -from matkit.types import GRASPAResult + from ase.io import read as ase_read +from matkit.types import GRASPAResult +from matkit.utils.template import copy_template, render_template +from matkit.utils.unitcell_calculator import calculate_cell_size + def get_output_data( output_path: str, @@ -191,21 +194,15 @@ def setup_simulation( FileNotFoundError: If the CIF file does not exist. """ outdir = Path(outpath) - outdir.mkdir(parents=True, exist_ok=True) cifpath = Path(cif) if not cifpath.exists(): raise FileNotFoundError(f"CIF file does not exist: {cif}") cifname = cifpath.stem - shutil.copy(cifpath, outdir / f"{cifname}.cif") - # Copy template files template_path = Path(__file__).parent / "files" / template_dir - for item in template_path.iterdir(): - if item.is_dir(): - shutil.copytree(item, outdir, dirs_exist_ok=True) - else: - shutil.copy2(item, outdir) + copy_template(template_path, outdir) + shutil.copy(cifpath, outdir / f"{cifname}.cif") # Use pre-computed cell size or read CIF if cell_size is not None: @@ -214,31 +211,26 @@ def setup_simulation( atoms = ase_read(cifpath) uc_x, uc_y, uc_z = calculate_cell_size(atoms) - # Read template and replace placeholders input_path = outdir / "simulation.input" - with input_path.open("r") as f: - template = f.read() - - subs = { - "NCYCLE": str(n_cycle), - "TEMPERATURE": str(temperature), - "PRESSURE": str(pressure), - "CUTOFF": str(cutoff), - "CIFFILE": cifname, - "UC_X": str(uc_x), - "UC_Y": str(uc_y), - "UC_Z": str(uc_z), - } - - for key, val in subs.items(): - template = template.replace(key, val) + render_template( + input_path, + { + "NCYCLE": str(n_cycle), + "TEMPERATURE": str(temperature), + "PRESSURE": str(pressure), + "CUTOFF": str(cutoff), + "CIFFILE": cifname, + "UC_X": str(uc_x), + "UC_Y": str(uc_y), + "UC_Z": str(uc_z), + }, + ) # Replace component block placeholder component_block = generate_component_blocks(adsorbates) - template = template.replace("__COMPONENTS__", component_block) - # Write final input - with input_path.open("w") as f: - f.write(template) + content = input_path.read_text() + content = content.replace("__COMPONENTS__", component_block) + input_path.write_text(content) return True diff --git a/src/matkit/graspa_sycl/graspa_sycl.py b/src/matkit/graspa_sycl/graspa_sycl.py index cab63f8..efe1b85 100644 --- a/src/matkit/graspa_sycl/graspa_sycl.py +++ b/src/matkit/graspa_sycl/graspa_sycl.py @@ -1,8 +1,11 @@ -from pathlib import Path import shutil -from matkit.utils.unitcell_calculator import calculate_cell_size +from pathlib import Path + from ase.io import read as ase_read +from matkit.utils.template import copy_template, render_template +from matkit.utils.unitcell_calculator import calculate_cell_size + _file_dir = Path(__file__).parent / "files" / "template" @@ -33,45 +36,30 @@ def setup_simulation( FileNotFoundError: If the CIF file does not exist. """ outdir = Path(outpath) - outdir.mkdir(parents=True, exist_ok=True) cifpath = Path(cif) if not cifpath.exists(): raise FileNotFoundError(f"CIF file does not exist: {cif}") cifname = cifpath.stem - for item in _file_dir.iterdir(): - if item.is_dir(): - shutil.copytree(item, outdir, dirs_exist_ok=True) - else: - shutil.copy2(item, outdir) + copy_template(_file_dir, outdir) shutil.copy(cif, outdir) - # Editing input file. + atoms = ase_read(cif) - [uc_x, uc_y, uc_z] = calculate_cell_size(atoms) - - with ( - open(f"{outdir}/simulation.input", "r") as f_in, - open(f"{outdir}/simulation.input.tmp", "w") as f_out, - ): - for line in f_in: - if "NCYCLE" in line: - line = line.replace("NCYCLE", str(n_cycle)) - if "ADSORBATE" in line: - line = line.replace("ADSORBATE", adsorbate) - if "TEMPERATURE" in line: - line = line.replace("TEMPERATURE", str(temperature)) - if "PRESSURE" in line: - line = line.replace("PRESSURE", str(pressure)) - if "UC_X UC_Y UC_Z" in line: - line = line.replace("UC_X UC_Y UC_Z", f"{uc_x} {uc_y} {uc_z}") - if "CUTOFF" in line: - line = line.replace("CUTOFF", str(cutoff)) - if "CIFFILE" in line: - line = line.replace("CIFFILE", cifname) - f_out.write(line) - - shutil.move(f"{outdir}/simulation.input.tmp", f"{outdir}/simulation.input") + uc_x, uc_y, uc_z = calculate_cell_size(atoms) + + render_template( + outdir / "simulation.input", + { + "NCYCLE": str(n_cycle), + "ADSORBATE": adsorbate, + "TEMPERATURE": str(temperature), + "PRESSURE": str(pressure), + "UC_X UC_Y UC_Z": f"{uc_x} {uc_y} {uc_z}", + "CUTOFF": str(cutoff), + "CIFFILE": cifname, + }, + ) return True diff --git a/src/matkit/raspa2/raspa2.py b/src/matkit/raspa2/raspa2.py index 4ce0b3a..505e454 100644 --- a/src/matkit/raspa2/raspa2.py +++ b/src/matkit/raspa2/raspa2.py @@ -1,8 +1,11 @@ -from pathlib import Path import shutil -from matkit.utils.unitcell_calculator import calculate_cell_size +from pathlib import Path + from ase.io import read as ase_read +from matkit.utils.template import copy_template, render_template +from matkit.utils.unitcell_calculator import calculate_cell_size + _file_dir = Path(__file__).parent / "files" / "template" @@ -44,42 +47,23 @@ def setup_input_simulation( cifname = cifpath.stem outdir = outpath / cifname - outdir.mkdir(parents=True, exist_ok=True) - for item in _file_dir.iterdir(): - if item.is_dir(): - shutil.copytree(item, outdir, dirs_exist_ok=True) - else: - shutil.copy2(item, outdir) + copy_template(_file_dir, outdir) shutil.copy(cif, outdir) - # Editing input file. + atoms = ase_read(cif) - [uc_x, uc_y, uc_z] = calculate_cell_size(atoms) - - with ( - open(f"{outdir}/simulation.input", "r") as f_in, - open(f"{outdir}/simulation.input.tmp", "w") as f_out, - ): - for line in f_in: - if "NCYCLE" in line: - line = line.replace("NCYCLE", str(n_cycle)) - if "ADSORBATE" in line: - line = line.replace("ADSORBATE", adsorbate) - if "TEMPERATURE" in line: - line = line.replace("TEMPERATURE", str(temperature)) - if "PRESSURE" in line: - line = line.replace("PRESSURE", str(pressure)) - if "UC_X UC_Y UC_Z" in line: - line = line.replace( - "UC_X UC_Y UC_Z", f"{uc_x} {uc_y} {uc_z}" - ) - if "CUTOFF" in line: - line = line.replace("CUTOFF", str(cutoff)) - if "CIFFILE" in line: - line = line.replace("CIFFILE", cifname) - f_out.write(line) - - shutil.move( - f"{outdir}/simulation.input.tmp", f"{outdir}/simulation.input" + uc_x, uc_y, uc_z = calculate_cell_size(atoms) + + render_template( + outdir / "simulation.input", + { + "NCYCLE": str(n_cycle), + "ADSORBATE": adsorbate, + "TEMPERATURE": str(temperature), + "PRESSURE": str(pressure), + "UC_X UC_Y UC_Z": f"{uc_x} {uc_y} {uc_z}", + "CUTOFF": str(cutoff), + "CIFFILE": cifname, + }, ) return True diff --git a/src/matkit/utils/__init__.py b/src/matkit/utils/__init__.py index 79ef568..3434198 100644 --- a/src/matkit/utils/__init__.py +++ b/src/matkit/utils/__init__.py @@ -1,5 +1,12 @@ from matkit.utils.unitcell_calculator import calculate_cell_size from matkit.utils.remove_solvent import remove_solvent from matkit.utils.cifsampler import sample_cifs +from matkit.utils.template import copy_template, render_template -__all__ = ["calculate_cell_size", "remove_solvent", "sample_cifs"] +__all__ = [ + "calculate_cell_size", + "copy_template", + "remove_solvent", + "render_template", + "sample_cifs", +] diff --git a/src/matkit/utils/template.py b/src/matkit/utils/template.py new file mode 100644 index 0000000..0b7e62f --- /dev/null +++ b/src/matkit/utils/template.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + + +def copy_template(template_dir: Path, output_dir: Path) -> None: + """Copy all files and directories from a template directory. + + Args: + template_dir: Source directory containing template files. + output_dir: Destination directory (created if needed). + """ + output_dir.mkdir(parents=True, exist_ok=True) + for item in template_dir.iterdir(): + if item.is_dir(): + shutil.copytree(item, output_dir, dirs_exist_ok=True) + else: + shutil.copy2(item, output_dir) + + +def render_template(file_path: Path, substitutions: dict[str, str]) -> None: + """Apply placeholder substitutions to a file in-place. + + Reads the file, replaces each key in *substitutions* with its + value using ``str.replace``, and writes the result back. + + Args: + file_path: Path to the file to modify. + substitutions: Mapping of placeholder strings to their + replacement values. + """ + content = file_path.read_text() + for key, val in substitutions.items(): + content = content.replace(key, val) + file_path.write_text(content) From e60ebc769436977a99cd99e0f5aff3cd75839176 Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Thu, 28 May 2026 12:06:12 -0500 Subject: [PATCH 3/3] Replace print() with logging and add --verbose flag to CLI Replace all print() calls in library modules with logging.getLogger(__name__) calls at appropriate levels. Add -v/--verbose flag to the CLI (default WARNING, -v for INFO, -vv for DEBUG). Fix mace_opt.py to raise ValueError instead of returning "Error" string from a -> None function. --- src/matkit/cli.py | 20 ++++++++- src/matkit/mlip/mace_opt.py | 17 ++++---- src/matkit/mlip/uma.py | 41 ++++++++++++------- src/matkit/raspa3/raspa3.py | 5 ++- .../tobacco/create_linker_from_smiles.py | 9 ++-- src/matkit/zeopp/zeopp.py | 11 +++-- 6 files changed, 72 insertions(+), 31 deletions(-) diff --git a/src/matkit/cli.py b/src/matkit/cli.py index 966b352..7eaa788 100644 --- a/src/matkit/cli.py +++ b/src/matkit/cli.py @@ -1,11 +1,27 @@ +import logging + import click import json @click.group() -def main(): +@click.option( + "-v", + "--verbose", + count=True, + help="Increase verbosity (-v for info, -vv for debug).", +) +def main(verbose): """MatKit CLI: A modular toolkit for molecular simulations.""" - pass + level = logging.WARNING + if verbose == 1: + level = logging.INFO + elif verbose >= 2: + level = logging.DEBUG + logging.basicConfig( + level=level, + format="%(name)s: %(message)s", + ) # ========================================== diff --git a/src/matkit/mlip/mace_opt.py b/src/matkit/mlip/mace_opt.py index d2389b2..bcf874e 100644 --- a/src/matkit/mlip/mace_opt.py +++ b/src/matkit/mlip/mace_opt.py @@ -1,10 +1,13 @@ +import logging from pathlib import Path -from mace.calculators import mace_mp +from ase.constraints import ExpCellFilter from ase.io import read as ase_read from ase.io import write as ase_write from ase.optimize import BFGS -from ase.constraints import ExpCellFilter +from mace.calculators import mace_mp + +logger = logging.getLogger(__name__) def run_opt_mace( @@ -78,13 +81,13 @@ def run_opt_mace( dyn.run(fmax=fmax, steps=steps) else: - print( - (f"run_type {run_type} is not supported."), - ("Options are 'geo_opt', cell_opt' and 'geo_opt_cell_opt'"), + raise ValueError( + f"run_type '{run_type}' is not supported. " + "Options are 'geo_opt', 'cell_opt', and 'geo_opt_cell_opt'." ) - return "Error" ase_write(output_fname, atoms) except Exception as e: - print(e) + logger.error("MACE optimization failed: %s", e) + raise diff --git a/src/matkit/mlip/uma.py b/src/matkit/mlip/uma.py index 8659595..43e8d88 100644 --- a/src/matkit/mlip/uma.py +++ b/src/matkit/mlip/uma.py @@ -1,7 +1,11 @@ +import logging from pathlib import Path +from ase import units from ase.io import read as ase_read from ase.io import write as ase_write +from ase.md.langevin import Langevin +from ase.md.verlet import VelocityVerlet from ase.optimize import BFGS try: @@ -11,9 +15,8 @@ from ase.constraints import ExpCellFilter except ImportError: from ase.filters import FrechetCellFilter as ExpCellFilter -from ase.md.langevin import Langevin -from ase.md.verlet import VelocityVerlet -from ase import units + +logger = logging.getLogger(__name__) def _create_calculator( @@ -361,10 +364,12 @@ def _gpu_worker( "error_message": None, } ) - print( - f"[GPU {gpu_id}] Done: {tag} " - f"(E={result['final_energy']:.4f}, " - f"converged={result['converged']})" + logger.info( + "[GPU %d] Done: %s (E=%.4f, converged=%s)", + gpu_id, + tag, + result["final_energy"], + result["converged"], ) except Exception as e: result_queue.put( @@ -381,7 +386,7 @@ def _gpu_worker( "error_message": str(e), } ) - print(f"[GPU {gpu_id}] ERROR: {tag}: {e}") + logger.error("[GPU %d] ERROR: %s: %s", gpu_id, tag, e) def run_opt_uma_batch( @@ -445,7 +450,7 @@ def run_opt_uma_batch( jobs = list(itertools.product(input_files, models, run_types)) if not jobs: - print("No jobs to run.") + logger.warning("No jobs to run.") return "" # Auto-detect GPU count @@ -464,8 +469,11 @@ def run_opt_uma_batch( n_workers = min(num_gpus, len(jobs)) - print( - f"Running {len(jobs)} jobs across {n_workers} workers (device={device})" + logger.info( + "Running %d jobs across %d workers (device=%s)", + len(jobs), + n_workers, + device, ) ctx = get_context("spawn") @@ -520,10 +528,13 @@ def run_opt_uma_batch( for rec in results: f.write(json.dumps(rec) + "\n") - print( - f"Batch complete: {success} success, {skipped} skipped, " - f"{failed} failure out of {len(results)} jobs" + logger.info( + "Batch complete: %d success, %d skipped, %d failure out of %d jobs", + success, + skipped, + failed, + len(results), ) - print(f"Results: {summary_path}") + logger.info("Results: %s", summary_path) return str(summary_path) diff --git a/src/matkit/raspa3/raspa3.py b/src/matkit/raspa3/raspa3.py index 0522894..0d26b51 100644 --- a/src/matkit/raspa3/raspa3.py +++ b/src/matkit/raspa3/raspa3.py @@ -1,6 +1,9 @@ import json +import logging from pathlib import Path +logger = logging.getLogger(__name__) + def parse_raspa2_pseudo_atom(filepath): pseudo_atoms = [] @@ -126,5 +129,5 @@ def save_force_field(pseudo_fp, force_field_fp, outpath): with open(output_file, "w") as f: json.dump(combined, f, indent=2) - print(f"Saved combined force field to {output_file}") + logger.info("Saved combined force field to %s", output_file) return combined diff --git a/src/matkit/tobacco/create_linker_from_smiles.py b/src/matkit/tobacco/create_linker_from_smiles.py index 5787189..d0c3017 100644 --- a/src/matkit/tobacco/create_linker_from_smiles.py +++ b/src/matkit/tobacco/create_linker_from_smiles.py @@ -1,10 +1,13 @@ +import logging import subprocess from pathlib import Path -import numpy as np import networkx as nx -from ase.io import read as ase_read, write as ase_write +import numpy as np from ase import neighborlist +from ase.io import read as ase_read, write as ase_write + +logger = logging.getLogger(__name__) # Periodic table list PT = { @@ -329,7 +332,7 @@ def create_linker( Path(initial_cif).unlink(missing_ok=True) Path("temp.cif").unlink(missing_ok=True) - print(f"Successfully created linker: {output_cif}") + logger.info("Successfully created linker: %s", output_cif) # === Run the Workflow === diff --git a/src/matkit/zeopp/zeopp.py b/src/matkit/zeopp/zeopp.py index c65e37d..592da41 100644 --- a/src/matkit/zeopp/zeopp.py +++ b/src/matkit/zeopp/zeopp.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from concurrent.futures import ThreadPoolExecutor import json from pathlib import Path @@ -9,6 +10,8 @@ from matkit.types import ZeoppResult +logger = logging.getLogger(__name__) + VALID_ANALYSES = {"res", "sa", "vol", "psd", "chan"} @@ -547,9 +550,11 @@ def _process_one(cif_file: Path) -> dict: total = len(results) failed = total - success - print( - f"Zeo++ batch complete: {success} success, " - f"{failed} failure out of {total} structures" + logger.info( + "Zeo++ batch complete: %d success, %d failure out of %d structures", + success, + failed, + total, ) return str(summary_path)