From 649900d6ca6f2a67b8d3f8ee1dbf055f42bc3eae Mon Sep 17 00:00:00 2001 From: Maria Castellanos Date: Wed, 29 Apr 2026 12:23:16 -0400 Subject: [PATCH 1/4] implement parallel cv --- openadmet/models/eval/cross_validation.py | 133 +++++++++++++++------- 1 file changed, 90 insertions(+), 43 deletions(-) diff --git a/openadmet/models/eval/cross_validation.py b/openadmet/models/eval/cross_validation.py index b20068c6..9c8d5a3f 100644 --- a/openadmet/models/eval/cross_validation.py +++ b/openadmet/models/eval/cross_validation.py @@ -1,9 +1,11 @@ """Cross-validation evaluators for regression models.""" import json +import threading from functools import partial from collections import defaultdict from typing import Any, ClassVar +from joblib import Parallel, delayed import pandas as pd import numpy as np from loguru import logger @@ -484,6 +486,14 @@ class PytorchLightningRepeatedKFoldCrossValidation(CrossValidationBase): n_splits: int = Field(5, description="Number of splits for cross-validation") n_repeats: int = Field(1, description="Number of repeats for cross-validation") random_state: int = Field(42, description="Random state for reproducibility") + n_jobs: int = Field( + 1, + description=( + "Number of parallel jobs for fold execution. n_jobs=1 is serial (default). " + "When n_jobs > 1 with GPU acceleration, all folds share the same GPU — " + "ensure sufficient VRAM for n_jobs models simultaneously." + ), + ) _evaluated: bool = False _driver_type: DriverType = DriverType.LIGHTNING axes_labels: list[str] = Field( @@ -523,6 +533,60 @@ def active_metrics(self): ) return metrics + @staticmethod + def _run_single_fold( + fold, + y_val, + y_true, + y_pred, + fold_train_dataloader, + fold_val_dataloader, + fold_train_scaler, + model, + trainer, + target_labels, + n_tasks, + active_metrics, + ): + logger.info(f"Fold {fold} starting on thread {threading.current_thread().name}") + + fold_model = model.make_new() + fold_model.build(scaler=fold_train_scaler) + + fold_trainer = LightningTrainer( + max_epochs=trainer.max_epochs, + accelerator=trainer.accelerator, + devices=trainer.devices, + use_wandb=False, + output_dir=trainer.output_dir / "cv" / f"fold_{str(fold)}", + wandb_project=trainer.wandb_project, + ) + fold_trainer.model = fold_model + fold_trainer.build() + + fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader) + y_pred_fold = fold_model.predict( + fold_val_dataloader, + accelerator=trainer.accelerator, + devices=trainer.devices, + ) + + if not (n_tasks == y_pred_fold.shape[1]): + raise ValueError("y_true and y_pred must have the same number of tasks") + + fold_metrics = {} + for task_id in range(n_tasks): + t_true, t_pred = get_t_true_and_t_pred( + task_id, y_true, y_pred, y_val, y_pred_fold + ) + t_label = target_labels[task_id] + fold_metrics[t_label] = {} + for metric_name, metric_data in active_metrics.items(): + metric_func, _, _ = metric_data + fold_metrics[t_label][metric_name] = metric_func(t_true, t_pred) + + return fold_metrics, y_val, y_pred_fold + def evaluate( self, model=None, @@ -615,7 +679,7 @@ def evaluate( X_all, y_all, groups, self.n_splits, self.n_repeats, self.random_state ) - cv = iter(zip(train_inds, test_inds)) + cv_list = list(zip(train_inds, test_inds)) self.data = { "shape": [self.n_splits, self.n_repeats], @@ -637,70 +701,53 @@ def evaluate( t_label = target_labels[task_id] self._metric_data[t_label] = defaultdict(list) - for fold, (fold_train_ids, fold_val_ids) in enumerate(cv): - logger.info(f"Fold {fold}") - + # Featurize all folds serially — RDKit is not thread-safe + fold_inputs = [] + for fold, (fold_train_ids, fold_val_ids) in enumerate(cv_list): X_train = X_all[fold_train_ids] y_train = y_all[fold_train_ids] X_val = X_all[fold_val_ids] y_val = y_all[fold_val_ids] - # print shapes of matrices logger.debug(f"X_train shape: {X_train.shape}") logger.debug(f"y_train shape: {y_train.shape}") logger.debug(f"X_val shape: {X_val.shape}") logger.debug(f"y_val shape: {y_val.shape}") - # Create a new featurizer and model for each fold fold_featurizer = featurizer.make_new() - fold_train_dataloader, _, fold_train_scaler, _ = fold_featurizer.featurize( X_train, y_train ) - fold_val_dataloader, _, _, _ = fold_featurizer.featurize(X_val, y_val) - fold_model = model.make_new() - fold_model.build(scaler=fold_train_scaler) - - fold_trainer = LightningTrainer( - max_epochs=trainer.max_epochs, - accelerator=trainer.accelerator, - devices=trainer.devices, - use_wandb=False, - output_dir=trainer.output_dir / "cv" / f"fold_{str(fold)}", - wandb_project=trainer.wandb_project, + fold_inputs.append( + (fold, y_val, fold_train_dataloader, fold_val_dataloader, fold_train_scaler) ) - # Pass model to trainer - fold_trainer.model = fold_model - fold_trainer.build() - - # Pass the dataloaders to the trainer - fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader) - # evaluate the model - y_pred_fold = fold_model.predict( - fold_val_dataloader, - accelerator=trainer.accelerator, - devices=trainer.devices, + if self.n_jobs > 1 and trainer.accelerator == "gpu": + logger.warning( + f"Running {self.n_jobs} CV folds in parallel on GPU. " + "Each fold loads a separate model — ensure sufficient VRAM." ) - # calculate the mean and confidence interval for each metric - # loop over tasks and calculate the statistics - if not (n_tasks == y_pred_fold.shape[1]): - raise ValueError("y_true and y_pred must have the same number of tasks") - - for task_id in range(n_tasks): - t_true, t_pred = get_t_true_and_t_pred( - task_id, y_true, y_pred, y_val, y_pred_fold - ) - t_label = target_labels[task_id] + fold_results = Parallel(n_jobs=self.n_jobs, prefer="threads")( + delayed(self._run_single_fold)( + fold, y_val, y_true, y_pred, + fold_train_dl, fold_val_dl, fold_train_scaler, + model, trainer, + target_labels, n_tasks, self.active_metrics, + ) + for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs + ) - for metric_name, metric_data in self.active_metrics.items(): - metric_func, is_scipy_metric, _ = metric_data - value = metric_func(t_true, t_pred) + for fold_metrics, y_val, y_pred_fold in fold_results: + for t_label, metrics in fold_metrics.items(): + for metric_name, value in metrics.items(): self._metric_data[t_label][metric_name].append(value) - logger.info(f"Fold {fold} complete") + # y_val and y_pred_fold from the last fold are used for the regression plot + _, y_val, y_pred_fold = fold_results[-1] + + logger.info(f"All {len(cv_list)} folds complete") # now we have the metric data for each task, calculate the mean and confidence interval for t_label in target_labels: From c262028cf5d47ffdd97a9320369c3d03c90831d2 Mon Sep 17 00:00:00 2001 From: Maria Castellanos Date: Wed, 29 Apr 2026 16:51:21 -0400 Subject: [PATCH 2/4] fixing threading pt 2 --- openadmet/models/architecture/chemprop.py | 8 +-- openadmet/models/eval/cross_validation.py | 73 +++++++++++++++-------- openadmet/models/trainer/lightning.py | 5 +- 3 files changed, 57 insertions(+), 29 deletions(-) diff --git a/openadmet/models/architecture/chemprop.py b/openadmet/models/architecture/chemprop.py index cb17553c..bb3868fa 100644 --- a/openadmet/models/architecture/chemprop.py +++ b/openadmet/models/architecture/chemprop.py @@ -18,9 +18,9 @@ from openadmet.models.architecture.model_base import models as model_registry _METRIC_TO_LOSS = { - "mse": nn.metrics.MSE(), - "mae": nn.metrics.MAE(), - "rmse": nn.metrics.RMSE(), + "mse": nn.metrics.MSE, + "mae": nn.metrics.MAE, + "rmse": nn.metrics.RMSE, } @@ -420,7 +420,7 @@ def build(self, scaler=None): """ if not self.estimator: - metric_list = [_METRIC_TO_LOSS[metric] for metric in self.metric_list] + metric_list = [_METRIC_TO_LOSS[metric]() for metric in self.metric_list] if self.from_chemeleon: logger.info( diff --git a/openadmet/models/eval/cross_validation.py b/openadmet/models/eval/cross_validation.py index 9c8d5a3f..5b6b762e 100644 --- a/openadmet/models/eval/cross_validation.py +++ b/openadmet/models/eval/cross_validation.py @@ -1,11 +1,11 @@ """Cross-validation evaluators for regression models.""" import json -import threading +import multiprocessing as mp +from concurrent.futures import ProcessPoolExecutor from functools import partial from collections import defaultdict from typing import Any, ClassVar -from joblib import Parallel, delayed import pandas as pd import numpy as np from loguru import logger @@ -543,33 +543,32 @@ def _run_single_fold( fold_val_dataloader, fold_train_scaler, model, - trainer, + trainer_config, target_labels, n_tasks, active_metrics, + n_jobs, ): - logger.info(f"Fold {fold} starting on thread {threading.current_thread().name}") + logger.info(f"Fold {fold} starting") fold_model = model.make_new() fold_model.build(scaler=fold_train_scaler) fold_trainer = LightningTrainer( - max_epochs=trainer.max_epochs, - accelerator=trainer.accelerator, - devices=trainer.devices, + max_epochs=trainer_config["max_epochs"], + accelerator=trainer_config["accelerator"], + devices=trainer_config["devices"], use_wandb=False, - output_dir=trainer.output_dir / "cv" / f"fold_{str(fold)}", - wandb_project=trainer.wandb_project, + output_dir=trainer_config["output_dir"] / "cv" / f"fold_{str(fold)}", + wandb_project=trainer_config["wandb_project"], + enable_progress_bar=trainer_config["enable_progress_bar"] if n_jobs == 1 else False, + inference_mode=trainer_config["inference_mode"], ) fold_trainer.model = fold_model fold_trainer.build() fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader) - y_pred_fold = fold_model.predict( - fold_val_dataloader, - accelerator=trainer.accelerator, - devices=trainer.devices, - ) + y_pred_fold = fold_model.predict(fold_val_dataloader) if not (n_tasks == y_pred_fold.shape[1]): raise ValueError("y_true and y_pred must have the same number of tasks") @@ -723,21 +722,47 @@ def evaluate( (fold, y_val, fold_train_dataloader, fold_val_dataloader, fold_train_scaler) ) + trainer_config = { + "max_epochs": trainer.max_epochs, + "accelerator": trainer.accelerator, + "devices": trainer.devices, + "output_dir": trainer.output_dir, + "wandb_project": trainer.wandb_project, + "enable_progress_bar": trainer.enable_progress_bar, + "inference_mode": trainer.inference_mode, + } + if self.n_jobs > 1 and trainer.accelerator == "gpu": logger.warning( f"Running {self.n_jobs} CV folds in parallel on GPU. " - "Each fold loads a separate model — ensure sufficient VRAM." + "Each fold spawns its own process with an independent CUDA context — " + "ensure sufficient VRAM for n_jobs models simultaneously." ) - fold_results = Parallel(n_jobs=self.n_jobs, prefer="threads")( - delayed(self._run_single_fold)( - fold, y_val, y_true, y_pred, - fold_train_dl, fold_val_dl, fold_train_scaler, - model, trainer, - target_labels, n_tasks, self.active_metrics, - ) - for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs - ) + if self.n_jobs == 1: + fold_results = [ + self._run_single_fold( + fold, y_val, y_true, y_pred, + fold_train_dl, fold_val_dl, fold_train_scaler, + model, trainer_config, target_labels, n_tasks, + self.active_metrics, self.n_jobs, + ) + for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs + ] + else: + spawn_ctx = mp.get_context("spawn") + with ProcessPoolExecutor(max_workers=self.n_jobs, mp_context=spawn_ctx) as pool: + futures = [ + pool.submit( + PytorchLightningRepeatedKFoldCrossValidation._run_single_fold, + fold, y_val, y_true, y_pred, + fold_train_dl, fold_val_dl, fold_train_scaler, + model, trainer_config, target_labels, n_tasks, + self.active_metrics, self.n_jobs, + ) + for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs + ] + fold_results = [f.result() for f in futures] for fold_metrics, y_val, y_pred_fold in fold_results: for t_label, metrics in fold_metrics.items(): diff --git a/openadmet/models/trainer/lightning.py b/openadmet/models/trainer/lightning.py index 2f89cac2..2bade0c3 100644 --- a/openadmet/models/trainer/lightning.py +++ b/openadmet/models/trainer/lightning.py @@ -79,6 +79,7 @@ class LightningTrainer(TrainerBase): early_stopping_patience: int = 10 early_stopping_mode: str = "min" early_stopping_min_delta: float = 0.001 + enable_progress_bar: bool = True gradient_clip_val: float = 0.0 precision: int = 32 accumulate_grad_batches: int = 1 @@ -86,6 +87,7 @@ class LightningTrainer(TrainerBase): fast_dev_run: bool = False limit_train_batches: float = 1.0 limit_val_batches: float = 1.0 + inference_mode: bool = False wandb_logger: Any = None _logger: Any @@ -157,7 +159,7 @@ def build(self, no_val: bool = False): # Initialize the PyTorch Lightning trainer self._trainer = pl.Trainer( logger=self._logger, - enable_progress_bar=True, + enable_progress_bar=self.enable_progress_bar, accelerator=self.accelerator, devices=self.devices, # Use GPU if available max_epochs=self.max_epochs, # number of epochs to train for @@ -169,6 +171,7 @@ def build(self, no_val: bool = False): fast_dev_run=self.fast_dev_run, limit_train_batches=self.limit_train_batches, limit_val_batches=self.limit_val_batches, + inference_mode=self.inference_mode, ) def train(self, train_dataloader, val_dataloader): From 5a42519ddfe0e91a636b72f7c7433924708be20d Mon Sep 17 00:00:00 2001 From: Maria Castellanos Date: Wed, 29 Apr 2026 17:28:03 -0400 Subject: [PATCH 3/4] Adding multi-gpu parallelization --- openadmet/models/eval/cross_validation.py | 23 ++++++++++++++++++----- openadmet/models/trainer/lightning.py | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/openadmet/models/eval/cross_validation.py b/openadmet/models/eval/cross_validation.py index 5b6b762e..da7da033 100644 --- a/openadmet/models/eval/cross_validation.py +++ b/openadmet/models/eval/cross_validation.py @@ -3,6 +3,7 @@ import json import multiprocessing as mp from concurrent.futures import ProcessPoolExecutor +import torch from functools import partial from collections import defaultdict from typing import Any, ClassVar @@ -554,10 +555,19 @@ def _run_single_fold( fold_model = model.make_new() fold_model.build(scaler=fold_train_scaler) + # On multi-GPU parallel runs, pin each fold to a specific GPU so folds + # on different GPUs train simultaneously. Falls back to the configured + # devices for n_jobs=1 or CPU runs. + num_gpus = trainer_config.get("num_gpus", 0) + if n_jobs > 1 and trainer_config["accelerator"] == "gpu" and num_gpus > 0: + effective_devices = [fold % num_gpus] + else: + effective_devices = trainer_config["devices"] + fold_trainer = LightningTrainer( max_epochs=trainer_config["max_epochs"], accelerator=trainer_config["accelerator"], - devices=trainer_config["devices"], + devices=effective_devices, use_wandb=False, output_dir=trainer_config["output_dir"] / "cv" / f"fold_{str(fold)}", wandb_project=trainer_config["wandb_project"], @@ -722,6 +732,8 @@ def evaluate( (fold, y_val, fold_train_dataloader, fold_val_dataloader, fold_train_scaler) ) + num_gpus = torch.cuda.device_count() if trainer.accelerator == "gpu" else 0 + trainer_config = { "max_epochs": trainer.max_epochs, "accelerator": trainer.accelerator, @@ -730,13 +742,14 @@ def evaluate( "wandb_project": trainer.wandb_project, "enable_progress_bar": trainer.enable_progress_bar, "inference_mode": trainer.inference_mode, + "num_gpus": num_gpus, } if self.n_jobs > 1 and trainer.accelerator == "gpu": - logger.warning( - f"Running {self.n_jobs} CV folds in parallel on GPU. " - "Each fold spawns its own process with an independent CUDA context — " - "ensure sufficient VRAM for n_jobs models simultaneously." + logger.info( + f"Distributing {len(cv_list)} CV folds across {num_gpus} GPU(s) " + f"with n_jobs={self.n_jobs}. Each fold is assigned to GPU " + f"fold % {num_gpus}. Ensure sufficient VRAM per GPU." ) if self.n_jobs == 1: diff --git a/openadmet/models/trainer/lightning.py b/openadmet/models/trainer/lightning.py index 2bade0c3..fc1ce855 100644 --- a/openadmet/models/trainer/lightning.py +++ b/openadmet/models/trainer/lightning.py @@ -71,7 +71,7 @@ class LightningTrainer(TrainerBase): max_epochs: int = 20 accelerator: str = "gpu" - devices: int = 1 + devices: int | list[int] = 1 use_wandb: bool = False output_dir: Path = None wandb_project: str = "openadmet-testing" From ab92b1f2290ef14f8b663790aa43b2c809af2f64 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 29 Apr 2026 21:48:47 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- openadmet/models/eval/cross_validation.py | 50 ++++++++++++++++++----- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/openadmet/models/eval/cross_validation.py b/openadmet/models/eval/cross_validation.py index da7da033..10fde721 100644 --- a/openadmet/models/eval/cross_validation.py +++ b/openadmet/models/eval/cross_validation.py @@ -571,7 +571,9 @@ def _run_single_fold( use_wandb=False, output_dir=trainer_config["output_dir"] / "cv" / f"fold_{str(fold)}", wandb_project=trainer_config["wandb_project"], - enable_progress_bar=trainer_config["enable_progress_bar"] if n_jobs == 1 else False, + enable_progress_bar=trainer_config["enable_progress_bar"] + if n_jobs == 1 + else False, inference_mode=trainer_config["inference_mode"], ) fold_trainer.model = fold_model @@ -729,7 +731,13 @@ def evaluate( ) fold_val_dataloader, _, _, _ = fold_featurizer.featurize(X_val, y_val) fold_inputs.append( - (fold, y_val, fold_train_dataloader, fold_val_dataloader, fold_train_scaler) + ( + fold, + y_val, + fold_train_dataloader, + fold_val_dataloader, + fold_train_scaler, + ) ) num_gpus = torch.cuda.device_count() if trainer.accelerator == "gpu" else 0 @@ -755,23 +763,43 @@ def evaluate( if self.n_jobs == 1: fold_results = [ self._run_single_fold( - fold, y_val, y_true, y_pred, - fold_train_dl, fold_val_dl, fold_train_scaler, - model, trainer_config, target_labels, n_tasks, - self.active_metrics, self.n_jobs, + fold, + y_val, + y_true, + y_pred, + fold_train_dl, + fold_val_dl, + fold_train_scaler, + model, + trainer_config, + target_labels, + n_tasks, + self.active_metrics, + self.n_jobs, ) for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs ] else: spawn_ctx = mp.get_context("spawn") - with ProcessPoolExecutor(max_workers=self.n_jobs, mp_context=spawn_ctx) as pool: + with ProcessPoolExecutor( + max_workers=self.n_jobs, mp_context=spawn_ctx + ) as pool: futures = [ pool.submit( PytorchLightningRepeatedKFoldCrossValidation._run_single_fold, - fold, y_val, y_true, y_pred, - fold_train_dl, fold_val_dl, fold_train_scaler, - model, trainer_config, target_labels, n_tasks, - self.active_metrics, self.n_jobs, + fold, + y_val, + y_true, + y_pred, + fold_train_dl, + fold_val_dl, + fold_train_scaler, + model, + trainer_config, + target_labels, + n_tasks, + self.active_metrics, + self.n_jobs, ) for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs ]