Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions openadmet/models/architecture/chemprop.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from openadmet.models.architecture.model_base import models as model_registry

_METRIC_TO_LOSS = {
"mse": nn.metrics.MSE(),
"mae": nn.metrics.MAE(),
"rmse": nn.metrics.RMSE(),
"mse": nn.metrics.MSE,
"mae": nn.metrics.MAE,
"rmse": nn.metrics.RMSE,
}


Expand Down Expand Up @@ -420,7 +420,7 @@ def build(self, scaler=None):

"""
if not self.estimator:
metric_list = [_METRIC_TO_LOSS[metric] for metric in self.metric_list]
metric_list = [_METRIC_TO_LOSS[metric]() for metric in self.metric_list]

if self.from_chemeleon:
logger.info(
Expand Down
197 changes: 155 additions & 42 deletions openadmet/models/eval/cross_validation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Cross-validation evaluators for regression models."""

import json
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import torch
from functools import partial
from collections import defaultdict
from typing import Any, ClassVar
Expand Down Expand Up @@ -484,6 +487,14 @@ class PytorchLightningRepeatedKFoldCrossValidation(CrossValidationBase):
n_splits: int = Field(5, description="Number of splits for cross-validation")
n_repeats: int = Field(1, description="Number of repeats for cross-validation")
random_state: int = Field(42, description="Random state for reproducibility")
n_jobs: int = Field(
1,
description=(
"Number of parallel jobs for fold execution. n_jobs=1 is serial (default). "
"When n_jobs > 1 with GPU acceleration, all folds share the same GPU — "
"ensure sufficient VRAM for n_jobs models simultaneously."
),
)
_evaluated: bool = False
_driver_type: DriverType = DriverType.LIGHTNING
axes_labels: list[str] = Field(
Expand Down Expand Up @@ -523,6 +534,70 @@ def active_metrics(self):
)
return metrics

@staticmethod
def _run_single_fold(
    fold,
    y_val,
    y_true,
    y_pred,
    fold_train_dataloader,
    fold_val_dataloader,
    fold_train_scaler,
    model,
    trainer_config,
    target_labels,
    n_tasks,
    active_metrics,
    n_jobs,
):
    """
    Train and score one cross-validation fold.

    A fresh model is created from ``model`` via ``make_new()``, built with the
    fold's training scaler, trained through a new ``LightningTrainer`` and then
    used to predict on the fold's validation dataloader. Every metric in
    ``active_metrics`` is evaluated for every task.

    Declared as a static method taking only explicit arguments (no ``self``).

    Parameters
    ----------
    fold
        Index of the fold; used for logging, for the per-fold output
        directory name and (in parallel GPU runs) to choose a device.
    y_val
        Validation targets for this fold; returned unchanged.
    y_true, y_pred
        Forwarded as-is to ``get_t_true_and_t_pred``.
    fold_train_dataloader, fold_val_dataloader
        Dataloaders handed to the trainer and to ``predict``.
    fold_train_scaler
        Scaler passed to ``build`` of the new model.
    model
        Template model; only ``make_new()`` is called on it.
    trainer_config
        Mapping with the keys ``max_epochs``, ``accelerator``, ``devices``,
        ``output_dir``, ``wandb_project``, ``enable_progress_bar``,
        ``inference_mode`` and, optionally, ``num_gpus`` (defaults to 0).
    target_labels
        Sequence indexed by task id giving the label for each task.
    n_tasks
        Expected number of tasks; must equal ``y_pred_fold.shape[1]``.
    active_metrics
        Mapping of metric name to a 3-item sequence whose first item is a
        callable invoked as ``metric(t_true, t_pred)``.
    n_jobs
        Number of parallel jobs configured by the caller.

    Returns
    -------
    tuple
        ``(fold_metrics, y_val, y_pred_fold)`` where ``fold_metrics`` maps
        task label -> metric name -> metric value.

    Raises
    ------
    ValueError
        If the prediction's second dimension differs from ``n_tasks``.

    """
    logger.info(f"Fold {fold} starting")

    fresh_model = model.make_new()
    fresh_model.build(scaler=fold_train_scaler)

    # When several folds run at once on GPU, give each fold a single device
    # chosen round-robin by fold index; in every other case use the devices
    # setting from the configuration untouched.
    gpu_count = trainer_config.get("num_gpus", 0)
    pin_to_gpu = (
        n_jobs > 1 and trainer_config["accelerator"] == "gpu" and gpu_count > 0
    )
    devices_for_fold = (
        [fold % gpu_count] if pin_to_gpu else trainer_config["devices"]
    )

    trainer_kwargs = {
        "max_epochs": trainer_config["max_epochs"],
        "accelerator": trainer_config["accelerator"],
        "devices": devices_for_fold,
        "use_wandb": False,
        "output_dir": trainer_config["output_dir"] / "cv" / f"fold_{str(fold)}",
        "wandb_project": trainer_config["wandb_project"],
        # The progress bar is forced off whenever n_jobs is not 1.
        "enable_progress_bar": (
            trainer_config["enable_progress_bar"] if n_jobs == 1 else False
        ),
        "inference_mode": trainer_config["inference_mode"],
    }
    fold_trainer = LightningTrainer(**trainer_kwargs)
    fold_trainer.model = fresh_model
    fold_trainer.build()

    trained_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader)
    y_pred_fold = trained_model.predict(fold_val_dataloader)

    if n_tasks != y_pred_fold.shape[1]:
        raise ValueError("y_true and y_pred must have the same number of tasks")

    fold_metrics = {}
    for task_idx in range(n_tasks):
        t_true, t_pred = get_t_true_and_t_pred(
            task_idx, y_true, y_pred, y_val, y_pred_fold
        )
        fold_metrics[target_labels[task_idx]] = {
            name: metric_func(t_true, t_pred)
            for name, (metric_func, _, _) in active_metrics.items()
        }

    return fold_metrics, y_val, y_pred_fold

def evaluate(
self,
model=None,
Expand Down Expand Up @@ -615,7 +690,7 @@ def evaluate(
X_all, y_all, groups, self.n_splits, self.n_repeats, self.random_state
)

cv = iter(zip(train_inds, test_inds))
cv_list = list(zip(train_inds, test_inds))

self.data = {
"shape": [self.n_splits, self.n_repeats],
Expand All @@ -637,70 +712,108 @@ def evaluate(
t_label = target_labels[task_id]
self._metric_data[t_label] = defaultdict(list)

for fold, (fold_train_ids, fold_val_ids) in enumerate(cv):
logger.info(f"Fold {fold}")

# Featurize all folds serially — RDKit is not thread-safe
fold_inputs = []
for fold, (fold_train_ids, fold_val_ids) in enumerate(cv_list):
X_train = X_all[fold_train_ids]
y_train = y_all[fold_train_ids]
X_val = X_all[fold_val_ids]
y_val = y_all[fold_val_ids]

# print shapes of matrices
logger.debug(f"X_train shape: {X_train.shape}")
logger.debug(f"y_train shape: {y_train.shape}")
logger.debug(f"X_val shape: {X_val.shape}")
logger.debug(f"y_val shape: {y_val.shape}")

# Create a new featurizer and model for each fold
fold_featurizer = featurizer.make_new()

fold_train_dataloader, _, fold_train_scaler, _ = fold_featurizer.featurize(
X_train, y_train
)

fold_val_dataloader, _, _, _ = fold_featurizer.featurize(X_val, y_val)
fold_model = model.make_new()
fold_model.build(scaler=fold_train_scaler)

fold_trainer = LightningTrainer(
max_epochs=trainer.max_epochs,
accelerator=trainer.accelerator,
devices=trainer.devices,
use_wandb=False,
output_dir=trainer.output_dir / "cv" / f"fold_{str(fold)}",
wandb_project=trainer.wandb_project,
fold_inputs.append(
(
fold,
y_val,
fold_train_dataloader,
fold_val_dataloader,
fold_train_scaler,
)
)

# Pass model to trainer
fold_trainer.model = fold_model
fold_trainer.build()

# Pass the dataloaders to the trainer
fold_model = fold_trainer.train(fold_train_dataloader, fold_val_dataloader)
# evaluate the model
y_pred_fold = fold_model.predict(
fold_val_dataloader,
accelerator=trainer.accelerator,
devices=trainer.devices,
)
num_gpus = torch.cuda.device_count() if trainer.accelerator == "gpu" else 0

trainer_config = {
"max_epochs": trainer.max_epochs,
"accelerator": trainer.accelerator,
"devices": trainer.devices,
"output_dir": trainer.output_dir,
"wandb_project": trainer.wandb_project,
"enable_progress_bar": trainer.enable_progress_bar,
"inference_mode": trainer.inference_mode,
"num_gpus": num_gpus,
}

# calculate the mean and confidence interval for each metric
# loop over tasks and calculate the statistics
if not (n_tasks == y_pred_fold.shape[1]):
raise ValueError("y_true and y_pred must have the same number of tasks")
if self.n_jobs > 1 and trainer.accelerator == "gpu":
logger.info(
f"Distributing {len(cv_list)} CV folds across {num_gpus} GPU(s) "
f"with n_jobs={self.n_jobs}. Each fold is assigned to GPU "
f"fold % {num_gpus}. Ensure sufficient VRAM per GPU."
)

for task_id in range(n_tasks):
t_true, t_pred = get_t_true_and_t_pred(
task_id, y_true, y_pred, y_val, y_pred_fold
if self.n_jobs == 1:
fold_results = [
self._run_single_fold(
fold,
y_val,
y_true,
y_pred,
fold_train_dl,
fold_val_dl,
fold_train_scaler,
model,
trainer_config,
target_labels,
n_tasks,
self.active_metrics,
self.n_jobs,
)
t_label = target_labels[task_id]
for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs
]
else:
spawn_ctx = mp.get_context("spawn")
with ProcessPoolExecutor(
max_workers=self.n_jobs, mp_context=spawn_ctx
) as pool:
futures = [
pool.submit(
PytorchLightningRepeatedKFoldCrossValidation._run_single_fold,
fold,
y_val,
y_true,
y_pred,
fold_train_dl,
fold_val_dl,
fold_train_scaler,
model,
trainer_config,
target_labels,
n_tasks,
self.active_metrics,
self.n_jobs,
)
for fold, y_val, fold_train_dl, fold_val_dl, fold_train_scaler in fold_inputs
]
fold_results = [f.result() for f in futures]

for metric_name, metric_data in self.active_metrics.items():
metric_func, is_scipy_metric, _ = metric_data
value = metric_func(t_true, t_pred)
for fold_metrics, y_val, y_pred_fold in fold_results:
for t_label, metrics in fold_metrics.items():
for metric_name, value in metrics.items():
self._metric_data[t_label][metric_name].append(value)

logger.info(f"Fold {fold} complete")
# y_val and y_pred_fold from the last fold are used for the regression plot
_, y_val, y_pred_fold = fold_results[-1]

logger.info(f"All {len(cv_list)} folds complete")

# now we have the metric data for each task, calculate the mean and confidence interval
for t_label in target_labels:
Expand Down
7 changes: 5 additions & 2 deletions openadmet/models/trainer/lightning.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,23 @@ class LightningTrainer(TrainerBase):

max_epochs: int = 20
accelerator: str = "gpu"
devices: int = 1
devices: int | list[int] = 1
use_wandb: bool = False
output_dir: Path = None
wandb_project: str = "openadmet-testing"
early_stopping: bool = False
early_stopping_patience: int = 10
early_stopping_mode: str = "min"
early_stopping_min_delta: float = 0.001
enable_progress_bar: bool = True
gradient_clip_val: float = 0.0
precision: int = 32
accumulate_grad_batches: int = 1
deterministic: bool = False
fast_dev_run: bool = False
limit_train_batches: float = 1.0
limit_val_batches: float = 1.0
inference_mode: bool = False

wandb_logger: Any = None
_logger: Any
Expand Down Expand Up @@ -157,7 +159,7 @@ def build(self, no_val: bool = False):
# Initialize the PyTorch Lightning trainer
self._trainer = pl.Trainer(
logger=self._logger,
enable_progress_bar=True,
enable_progress_bar=self.enable_progress_bar,
accelerator=self.accelerator,
devices=self.devices, # Use GPU if available
max_epochs=self.max_epochs, # number of epochs to train for
Expand All @@ -169,6 +171,7 @@ def build(self, no_val: bool = False):
fast_dev_run=self.fast_dev_run,
limit_train_batches=self.limit_train_batches,
limit_val_batches=self.limit_val_batches,
inference_mode=self.inference_mode,
)

def train(self, train_dataloader, val_dataloader):
Expand Down
Loading