12 changes: 12 additions & 0 deletions .github/workflows/run_tests_linux.yml
@@ -75,6 +75,18 @@ jobs:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Setup environment
        uses: conda-incubator/setup-miniconda@v3
        with:
          miniforge-version: latest
          activate-environment: bioptim
          environment-file: environment.yml

      - name: Print conda info
        run: |
          conda info
          conda list

      - name: Install extra dependencies
        run: |
          sudo apt install -y python3-pip
6 changes: 1 addition & 5 deletions .gitignore
@@ -109,7 +109,7 @@ c_generated_code
*.gv.pdf

# Bioptim files
bioptim/sandbox/
sandbox/
*.pkl
*.mtx
*.casadi
@@ -120,10 +120,6 @@ _l4c_generated/
# Mac dev
*.DS_store

*.png

sandbox/

# Ignore all npy files in tests folder except those in tests/shard6
/tests/*.npy
!/tests/shard6/v_bounds_max.npy
2 changes: 1 addition & 1 deletion .gitmodules
@@ -1,3 +1,3 @@
[submodule "external/acados"]
	path = external/acados
	url = https://github.com/acados/acados.git
	url = https://github.com/acados/acados.git
4 changes: 2 additions & 2 deletions bioptim/__init__.py
@@ -166,7 +166,7 @@
from .dynamics.ode_solvers import OdeSolver, OdeSolverBase
from .gui.online_callback_server import PlottingServer
from .gui.plot import CustomPlot
from .interfaces import Solver
from .interfaces import Solver, CasadiFunctionInterface
from .limits.constraints import ConstraintFcn, ConstraintList, Constraint, ParameterConstraintList
from .limits.fatigue_path_conditions import FatigueBounds, FatigueInitialGuess
from .limits.multinode_constraint import MultinodeConstraintFcn, MultinodeConstraintList, MultinodeConstraint
@@ -196,7 +196,7 @@
    OnlineOptim,
    ContactType,
)
from .misc.mapping import BiMappingList, BiMapping, Mapping, SelectionMapping, Dependency
from .misc.mapping import BiMappingList, BiMapping, Mapping, NodeMapping, NodeMappingList, SelectionMapping, Dependency
from .models.biorbd.biorbd_model import BiorbdModel
from .models.biorbd.external_forces import ExternalForceSetTimeSeries, ExternalForceSetVariables
from .models.biorbd.holonomic_biorbd_model import HolonomicBiorbdModel
9 changes: 9 additions & 0 deletions bioptim/examples/__main__.py
@@ -40,6 +40,7 @@
("Custom Bounds", "custom_bounds.py"),
("Custom constraint", "custom_constraint.py"),
("Custom initial guess", "custom_initial_guess.py"),
("Custom non casadi dynamics", "custom_non_casadi_dynamics.py"),
("Custom objectives", "custom_objectives.py"),
("Custom parameters", "custom_parameters.py"),
("Custom phase transitions", "custom_phase_transitions.py"),
@@ -134,6 +135,14 @@
                ]
            ),
        ),
        (
            "deep_neural_network",
            OrderedDict(
                [
                    ("pytorch ocp", "pytorch_ocp.py"),
                ]
            ),
        ),
    ]
)

281 changes: 281 additions & 0 deletions bioptim/examples/toy_examples/deep_neural_network/pytorch_ocp.py
@@ -0,0 +1,281 @@
"""
This example is similar to the getting_started/pendulum.py example, but the dynamics are computed using a deep neural
network driven by PyTorch. For this example to work, we create a neural network model that trains against the biorbd
library to predict the acceleration of the pendulum. Obviously, one can replace the prediction model with any other
pytorch model. The class TorchModel is thereafter used to wrap the model using L4casadi to be used in the bioptim framework,
if one needs a more flexible way to define the dynamics, objective functions or constraints, they can start from that
class and modify it to their needs.

Extra information:
All information regarding the installation, limitations, credits and known problems can be found in the header of the
TorchModel class.
"""

import os
from typing import Self

import biorbd
from bioptim import OptimalControlProgram, DynamicsFcn, BoundsList, Dynamics, Objective, ObjectiveFcn, Solver
from bioptim.models.torch.torch_model import TorchModel
import numpy as np
import torch


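# Early-stopping helper: training halts once the validation loss has not improved by at least
# `min_delta` for `patience` consecutive epochs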
class EarlyStopper:
    def __init__(self, patience=1, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_validation_loss = float("inf")

    def early_stop(self, validation_loss):
        if validation_loss < self.min_validation_loss:
            self.min_validation_loss = validation_loss
            self.counter = 0
        elif validation_loss > (self.min_validation_loss + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False


class NeuralNetworkModel(torch.nn.Module):
    def __init__(self, layer_node_count: tuple[int, ...], dropout_probability: float):
        super(NeuralNetworkModel, self).__init__()
        activations = torch.nn.GELU()

        # Initialize the layers of the neural network
        self._size_in = layer_node_count[0]
        self._size_out = layer_node_count[-1]
        first_and_hidden_layers_node_count = layer_node_count[:-1]
        layers = torch.nn.ModuleList()
        for i in range(len(first_and_hidden_layers_node_count) - 1):
            layers.append(
                torch.nn.Linear(first_and_hidden_layers_node_count[i], first_and_hidden_layers_node_count[i + 1])
            )
            layers.append(activations)
            layers.append(torch.nn.Dropout(dropout_probability))
        layers.append(torch.nn.Linear(first_and_hidden_layers_node_count[-1], layer_node_count[-1]))

        self._forward_model = torch.nn.Sequential(*layers)
        self._forward_model.to(self.get_torch_device())

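        # Adam optimizer with a fixed initial learning rate; the Huber loss penalizes large
        # residuals linearly, making the fit less sensitive to outlier accelerations than MSE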
        self._optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        self._loss_function = torch.nn.HuberLoss()

        # Put the model in evaluation mode
        self.eval()

    @property
    def size_in(self) -> int:
        return self._size_in

    @property
    def size_out(self) -> int:
        return self._size_out

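    # The network is evaluated one sample (row) at a time and the results are stacked;
    # each row of x is one [q, qdot, tau] input vector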
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        output = torch.Tensor(x.shape[0], self._forward_model[-1].out_features)
        for i, data in enumerate(x):
            output[i, :] = self._forward_model(data)
        return output.to(self.get_torch_device())

    @staticmethod
    def get_torch_device() -> torch.device:
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def train_me(
        self, training_data: list[torch.Tensor], validation_data: list[torch.Tensor], max_epochs: int = 100
    ) -> None:
        # ReduceLROnPlateau lowers the learning rate when the validation loss stops improving
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self._optimizer, mode="min", factor=0.1, patience=20, min_lr=1e-8
        )

        early_stopper = EarlyStopper(patience=20, min_delta=1e-5)
        for i in range(max_epochs):
            self._perform_epoch_training(targets=training_data)
            validation_loss = self._perform_epoch_training(targets=validation_data, only_compute=True)
            scheduler.step(validation_loss)  # Adjust/reduce learning rate

            # Check if the training should stop
            print(f"Validation loss: {validation_loss} (epoch: {i})")
            if early_stopper.early_stop(validation_loss):
                print("Early stopping")
                break

    def save_me(self, path: str) -> None:
        layer_node_count = tuple(
            [model.in_features for model in self._forward_model if isinstance(model, torch.nn.Linear)]
            + [self._forward_model[-1].out_features]
        )

        dropout_probability = tuple([model.p for model in self._forward_model if isinstance(model, torch.nn.Dropout)])
        if len(dropout_probability) == 0:
            dropout_probability = 0
        elif len(dropout_probability) > 1:
            # make sure that the dropout probability is the same for all layers
            if not all(prob == dropout_probability[0] for prob in dropout_probability):
                raise ValueError("Different dropout probabilities for different layers")
            dropout_probability = dropout_probability[0]
        else:
            dropout_probability = dropout_probability[0]

        dico = {
            "layer_node_count": layer_node_count,
            "dropout_probability": dropout_probability,
            "state_dict": self.state_dict(),
        }
        if not os.path.isdir(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))
        torch.save(dico, path)

    @classmethod
    def load_me(cls, path: str) -> Self:
        data = torch.load(path, weights_only=True)
        inputs = {
            "layer_node_count": data["layer_node_count"],
            "dropout_probability": data["dropout_probability"],
        }
        model = cls(**inputs)
        model.load_state_dict(data["state_dict"])
        return model

    def _perform_epoch_training(
        self,
        targets: list[torch.Tensor],
        only_compute: bool = False,
    ) -> float:

        # Perform the predictions
        if only_compute:
            with torch.no_grad():
                all_predictions = self(targets[0])
                all_targets = targets[1]

        else:
            # Put the model in training mode
            self.train()

            # Since training updates the model after each prediction, we must loop over the samples one at a time
            all_predictions = torch.tensor([]).to(self.get_torch_device())
            all_targets = torch.tensor([]).to(self.get_torch_device())
            for inputs, target in zip(*targets):
                self._optimizer.zero_grad()

                # Get the predictions and targets
                output = self(inputs[None, :])

                # Compute the loss and update the weights
                current_loss = self._loss_function.forward(output, target[None, :])
                current_loss.backward()  # Backpropagation
                self._optimizer.step()  # Updating weights

                # Populate the return values
                all_predictions = torch.cat((all_predictions, output))
                all_targets = torch.cat((all_targets, target[None, :]))

            # Put back the model in evaluation mode
            self.eval()

        # Mean absolute error between predictions and targets over the epoch
        epoch_accuracy = (all_predictions - all_targets).abs().mean().item()
        return epoch_accuracy


def prepare_ocp(
    model: torch.nn.Module,
    final_time: float,
    n_shooting: int,
) -> OptimalControlProgram:

    # Dynamics
    dynamics = Dynamics(DynamicsFcn.TORQUE_DRIVEN)
    torch_model = TorchModel(torch_model=model)

    # Path bounds
    x_bounds = BoundsList()
    x_bounds["q"] = [-3.14 * 1.5] * torch_model.nb_q, [3.14 * 1.5] * torch_model.nb_q
    x_bounds["q"][:, [0, -1]] = 0  # Start and end at 0...
    x_bounds["q"][1, -1] = 3.14  # ...but end with pendulum 180 degrees rotated
    x_bounds["qdot"] = [-3.14 * 10.0] * torch_model.nb_qdot, [3.14 * 10.0] * torch_model.nb_qdot
    x_bounds["qdot"][:, [0, -1]] = 0  # Start and end without any velocity

    # Define control path bounds
    u_bounds = BoundsList()
    u_bounds["tau"] = [-100] * torch_model.nb_tau, [100] * torch_model.nb_tau
    u_bounds["tau"][1, :] = 0  # The second joint is passive and cannot be actively rotated

    # Add objective functions
    objective_functions = Objective(ObjectiveFcn.Lagrange.MINIMIZE_CONTROL, key="tau")

    return OptimalControlProgram(
        torch_model,
        dynamics,
        n_shooting,
        final_time,
        x_bounds=x_bounds,
        u_bounds=u_bounds,
        objective_functions=objective_functions,
        use_sx=False,
    )


def generate_dataset(biorbd_model: biorbd.Model, data_point_count: int) -> list[torch.Tensor]:
    q_ranges = np.array(
        [[[q_range.min(), q_range.max()] for q_range in segment.QRanges()] for segment in biorbd_model.segments()]
    ).squeeze()
    qdot_ranges = np.array(
        [
            [[qdot_range.min(), qdot_range.max()] for qdot_range in segment.QdotRanges()]
            for segment in biorbd_model.segments()
        ]
    ).squeeze()
    tau_ranges = np.array([-100, 100] * biorbd_model.nbGeneralizedTorque()).reshape(-1, 2)
    tau_ranges[1, :] = 0
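    # The second degree of freedom is kept passive (zero torque), matching u_bounds in
    # prepare_ocp, so the training data covers the same regime as the OCP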

    q = torch.rand(data_point_count, biorbd_model.nbQ()) * (q_ranges[:, 1] - q_ranges[:, 0]) + q_ranges[:, 0]
    qdot = (
        torch.rand(data_point_count, biorbd_model.nbQdot()) * (qdot_ranges[:, 1] - qdot_ranges[:, 0])
        + qdot_ranges[:, 0]
    )
    tau = (
        torch.rand(data_point_count, biorbd_model.nbGeneralizedTorque()) * (tau_ranges[:, 1] - tau_ranges[:, 0])
        + tau_ranges[:, 0]
    )

    q = q.to(torch.float)
    qdot = qdot.to(torch.float)
    tau = tau.to(torch.float)

    qddot = torch.zeros(data_point_count, biorbd_model.nbQddot())
    for i in range(data_point_count):
        qddot[i, :] = torch.tensor(
            biorbd_model.ForwardDynamics(np.array(q[i, :]), np.array(qdot[i, :]), np.array(tau[i, :])).to_array()
        )

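    # Inputs: concatenated [q, qdot, tau] rows; targets: the corresponding accelerations qddot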
    return [torch.cat((q, qdot, tau), dim=1), qddot]


def main():
    # --- Prepare a predictive model --- #
    force_new_training = False
    biorbd_model = biorbd.Model("../getting_started/models/pendulum.bioMod")
    training_data = generate_dataset(biorbd_model, data_point_count=30000)
    validation_data = generate_dataset(biorbd_model, data_point_count=3000)

    if force_new_training or not os.path.isfile("models/trained_pendulum_model.pt"):
        model = NeuralNetworkModel(layer_node_count=(6, 8, 2), dropout_probability=0.2)
        model.train_me(training_data, validation_data, max_epochs=300)
        model.save_me("models/trained_pendulum_model.pt")
    else:
        model = NeuralNetworkModel.load_me("models/trained_pendulum_model.pt")

    ocp = prepare_ocp(model=model, final_time=1, n_shooting=40)

    # --- Solve the ocp --- #
    ocp.solve(Solver.IPOPT(show_online_optim=True))


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions bioptim/interfaces/__init__.py
@@ -2,6 +2,7 @@
from .fratrop_options import FATROP
from .acados_options import ACADOS
from .sqp_options import SQP_METHOD
from .casadi_function_interface import CasadiFunctionInterface


class Solver: