diff --git a/openadmet/models/architecture/chemprop.py b/openadmet/models/architecture/chemprop.py index cb17553c..bb3868fa 100644 --- a/openadmet/models/architecture/chemprop.py +++ b/openadmet/models/architecture/chemprop.py @@ -18,9 +18,9 @@ from openadmet.models.architecture.model_base import models as model_registry _METRIC_TO_LOSS = { - "mse": nn.metrics.MSE(), - "mae": nn.metrics.MAE(), - "rmse": nn.metrics.RMSE(), + "mse": nn.metrics.MSE, + "mae": nn.metrics.MAE, + "rmse": nn.metrics.RMSE, } @@ -420,7 +420,7 @@ def build(self, scaler=None): """ if not self.estimator: - metric_list = [_METRIC_TO_LOSS[metric] for metric in self.metric_list] + metric_list = [_METRIC_TO_LOSS[metric]() for metric in self.metric_list] if self.from_chemeleon: logger.info( diff --git a/openadmet/models/eval/cross_validation.py b/openadmet/models/eval/cross_validation.py index b20068c6..10fde721 100644 --- a/openadmet/models/eval/cross_validation.py +++ b/openadmet/models/eval/cross_validation.py @@ -1,6 +1,9 @@ """Cross-validation evaluators for regression models.""" import json +import multiprocessing as mp +from concurrent.futures import ProcessPoolExecutor +import torch from functools import partial from collections import defaultdict from typing import Any, ClassVar @@ -484,6 +487,14 @@ class PytorchLightningRepeatedKFoldCrossValidation(CrossValidationBase): n_splits: int = Field(5, description="Number of splits for cross-validation") n_repeats: int = Field(1, description="Number of repeats for cross-validation") random_state: int = Field(42, description="Random state for reproducibility") + n_jobs: int = Field( + 1, + description=( + "Number of parallel jobs for fold execution. n_jobs=1 is serial (default). " + "When n_jobs > 1 with GPU acceleration, all folds share the same GPU — " + "ensure sufficient VRAM for n_jobs models simultaneously." + ), + ) _evaluated: bool = False _driver_type: DriverType = DriverType.LIGHTNING axes_labels: list[str] = Field( @@ -523,6 +534,70 @@ def active_metrics(self): ) return metrics + @staticmethod + def _run_single_fold( + fold, + y_val, + y_true, + y_pred, + fold_train_dataloader, + fold_val_dataloader, + fold_train_scaler, + model, + trainer_config, + target_labels, + n_tasks, + active_metrics, + n_jobs, + ): + logger.info(f"Fold {fold} starting") + + fold_model = model.make_new() + fold_model.build(scaler=fold_train_scaler) + + # On multi-GPU parallel runs, pin each fold to a specific GPU so folds + # on different GPUs train simultaneously. Falls back to the configured + # devices for n_jobs=1 or CPU runs. + num_gpus = trainer_config.get("num_gpus", 0) + if n_jobs > 1 and trainer_config["accelerator"] == "gpu" and num_gpus > 0: + effective_devices = [fold % num_gpus] + else: + effective_devices = trainer_config["devices"] + + fold_trainer = LightningTrainer( + max_epochs=trainer_config["max_epochs"], + accelerator=trainer_config["accelerator"], + devices=effective_devices, + use_wandb=False, + output_dir=trainer_config["output_dir"] / "cv" / f"fold_{str(fold)}", + wandb_project=trainer_config["wandb_project"], + enable_progress_bar=trainer_config["enable_progress_bar"] + if n_jobs == 1 + else False, + inference_mode=trainer_config["inference_mode"], + ) + fold_trainer.model = fold_model + fold_trainer.build() + + fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader) + y_pred_fold = fold_model.predict(fold_val_dataloader) + + if not (n_tasks == y_pred_fold.shape[1]): + raise ValueError("y_true and y_pred must have the same number of tasks") + + fold_metrics = {} + for task_id in range(n_tasks): + t_true, t_pred = get_t_true_and_t_pred( + task_id, y_true, y_pred, y_val, y_pred_fold + ) + t_label = target_labels[task_id] + fold_metrics[t_label] = {} + for metric_name, metric_data in active_metrics.items(): + metric_func, _, _ = metric_data + fold_metrics[t_label][metric_name] = metric_func(t_true, t_pred) + + return fold_metrics, y_val, y_pred_fold + def evaluate( self, model=None, @@ -615,7 +690,7 @@ def evaluate( X_all, y_all, groups, self.n_splits, self.n_repeats, self.random_state ) - cv = iter(zip(train_inds, test_inds)) + cv_list = list(zip(train_inds, test_inds)) self.data = { "shape": [self.n_splits, self.n_repeats], @@ -637,70 +712,108 @@ def evaluate( t_label = target_labels[task_id] self._metric_data[t_label] = defaultdict(list) - for fold, (fold_train_ids, fold_val_ids) in enumerate(cv): - logger.info(f"Fold {fold}") - + # Featurize all folds serially — RDKit is not thread-safe + fold_inputs = [] + for fold, (fold_train_ids, fold_val_ids) in enumerate(cv_list): X_train = X_all[fold_train_ids] y_train = y_all[fold_train_ids] X_val = X_all[fold_val_ids] y_val = y_all[fold_val_ids] - # print shapes of matrices logger.debug(f"X_train shape: {X_train.shape}") logger.debug(f"y_train shape: {y_train.shape}") logger.debug(f"X_val shape: {X_val.shape}") logger.debug(f"y_val shape: {y_val.shape}") - # Create a new featurizer and model for each fold fold_featurizer = featurizer.make_new() - fold_train_dataloader, _, fold_train_scaler, _ = fold_featurizer.featurize( X_train, y_train ) - fold_val_dataloader, _, _, _ = fold_featurizer.featurize(X_val, y_val) - fold_model = model.make_new() - fold_model.build(scaler=fold_train_scaler) - - fold_trainer = LightningTrainer( - max_epochs=trainer.max_epochs, - accelerator=trainer.accelerator, - devices=trainer.devices, - use_wandb=False, - output_dir=trainer.output_dir / "cv" / f"fold_{str(fold)}", - wandb_project=trainer.wandb_project, + fold_inputs.append( + ( + fold, + y_val, + fold_train_dataloader, + fold_val_dataloader, + fold_train_scaler, + ) ) - # Pass model to trainer - fold_trainer.model = fold_model - fold_trainer.build() - - # Pass the dataloaders to the trainer - fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader) - # evaluate the model - y_pred_fold = fold_model.predict( - fold_val_dataloader, - accelerator=trainer.accelerator, - devices=trainer.devices, - ) + num_gpus = torch.cuda.device_count() if trainer.accelerator == "gpu" else 0 + + trainer_config = { + "max_epochs": trainer.max_epochs, + "accelerator": trainer.accelerator, + "devices": trainer.devices, + "output_dir": trainer.output_dir, + "wandb_project": trainer.wandb_project, + "enable_progress_bar": trainer.enable_progress_bar, + "inference_mode": trainer.inference_mode, + "num_gpus": num_gpus, + } - # calculate the mean and confidence interval for each metric - # loop over tasks and calculate the statistics - if not (n_tasks == y_pred_fold.shape[1]): - raise ValueError("y_true and y_pred must have the same number of tasks") + if self.n_jobs > 1 and trainer.accelerator == "gpu": + logger.info( + f"Distributing {len(cv_list)} CV folds across {num_gpus} GPU(s) " + f"with n_jobs={self.n_jobs}. Each fold is assigned to GPU " + f"fold % {num_gpus}. Ensure sufficient VRAM per GPU." + ) - for task_id in range(n_tasks): - t_true, t_pred = get_t_true_and_t_pred( - task_id, y_true, y_pred, y_val, y_pred_fold + if self.n_jobs == 1: + fold_results = [ + self._run_single_fold( + fold, + y_val, + y_true, + y_pred, + fold_train_dl, + fold_val_dl, + fold_train_scaler, + model, + trainer_config, + target_labels, + n_tasks, + self.active_metrics, + self.n_jobs, ) - t_label = target_labels[task_id] + for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs + ] + else: + spawn_ctx = mp.get_context("spawn") + with ProcessPoolExecutor( + max_workers=self.n_jobs, mp_context=spawn_ctx + ) as pool: + futures = [ + pool.submit( + PytorchLightningRepeatedKFoldCrossValidation._run_single_fold, + fold, + y_val, + y_true, + y_pred, + fold_train_dl, + fold_val_dl, + fold_train_scaler, + model, + trainer_config, + target_labels, + n_tasks, + self.active_metrics, + self.n_jobs, + ) + for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs + ] + fold_results = [f.result() for f in futures] - for metric_name, metric_data in self.active_metrics.items(): - metric_func, is_scipy_metric, _ = metric_data - value = metric_func(t_true, t_pred) + for fold_metrics, y_val, y_pred_fold in fold_results: + for t_label, metrics in fold_metrics.items(): + for metric_name, value in metrics.items(): self._metric_data[t_label][metric_name].append(value) - logger.info(f"Fold {fold} complete") + # y_val and y_pred_fold from the last fold are used for the regression plot + _, y_val, y_pred_fold = fold_results[-1] + + logger.info(f"All {len(cv_list)} folds complete") # now we have the metric data for each task, calculate the mean and confidence interval for t_label in target_labels: diff --git a/openadmet/models/trainer/lightning.py b/openadmet/models/trainer/lightning.py index 2f89cac2..fc1ce855 100644 --- a/openadmet/models/trainer/lightning.py +++ b/openadmet/models/trainer/lightning.py @@ -71,7 +71,7 @@ class LightningTrainer(TrainerBase): max_epochs: int = 20 accelerator: str = "gpu" - devices: int = 1 + devices: int | list[int] = 1 use_wandb: bool = False output_dir: Path = None wandb_project: str = "openadmet-testing" @@ -79,6 +79,7 @@ class LightningTrainer(TrainerBase): early_stopping_patience: int = 10 early_stopping_mode: str = "min" early_stopping_min_delta: float = 0.001 + enable_progress_bar: bool = True gradient_clip_val: float = 0.0 precision: int = 32 accumulate_grad_batches: int = 1 @@ -86,6 +87,7 @@ class LightningTrainer(TrainerBase): fast_dev_run: bool = False limit_train_batches: float = 1.0 limit_val_batches: float = 1.0 + inference_mode: bool = False wandb_logger: Any = None _logger: Any @@ -157,7 +159,7 @@ def build(self, no_val: bool = False): # Initialize the PyTorch Lightning trainer self._trainer = pl.Trainer( logger=self._logger, - enable_progress_bar=True, + enable_progress_bar=self.enable_progress_bar, accelerator=self.accelerator, devices=self.devices, # Use GPU if available max_epochs=self.max_epochs, # number of epochs to train for @@ -169,6 +171,7 @@ def build(self, no_val: bool = False): fast_dev_run=self.fast_dev_run, limit_train_batches=self.limit_train_batches, limit_val_batches=self.limit_val_batches, + inference_mode=self.inference_mode, ) def train(self, train_dataloader, val_dataloader):