Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def has_more_iterations(self) -> bool:
"""
return self.current_iteration + 1 < self.iterations

def increment_step(self) -> int:
"""Advance the trial counter and return the new value."""
self.step += 1
return self.step

@property
def metric_reporter(self) -> Optional[Type[ReportGenerationStrategy]]:
if not self.reports:
Expand Down
108 changes: 62 additions & 46 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import copy
import logging
import signal
import traceback
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, List, Optional
Expand Down Expand Up @@ -132,64 +133,79 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
return 1

err = 0
for tr in runner.runner.test_scenario.test_runs:
test_run = copy.deepcopy(tr)

agent_type = test_run.test.agent
agent_class = registry.agents_map.get(agent_type)
if agent_class is None:
logging.error(
f"No agent available for type: {agent_type}. Please make sure {agent_type} "
f"is a valid agent type. Available agents: {registry.agents_map.keys()}"
# Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception
# (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below.
run_error: Exception | None = None
try:
for tr in runner.runner.test_scenario.test_runs:
test_run = copy.deepcopy(tr)

agent_type = test_run.test.agent
agent_class = registry.agents_map.get(agent_type)
if agent_class is None:
logging.error(
f"No agent available for type: {agent_type}. Please make sure {agent_type} "
f"is a valid agent type. Available agents: {registry.agents_map.keys()}"
)
err = 1
continue

agent_config_data = test_run.test.agent_config or {}
agent_config = agent_class.get_config_class()(**agent_config_data)
env = CloudAIGymEnv(
test_run=test_run,
runner=runner.runner,
rewards=agent_config.rewards,
)
err = 1
continue
if agent_config.start_action == "first":
logging.info(f"Using deterministic first sweep for the chosen agent: {env.first_sweep}.")

agent_config_data = test_run.test.agent_config or {}
agent_config = agent_class.get_config_class()(**agent_config_data)
env = CloudAIGymEnv(
test_run=test_run,
runner=runner.runner,
rewards=agent_config.rewards,
)
if agent_config.start_action == "first":
logging.info(f"Using deterministic first sweep for the chosen agent: {env.first_sweep}.")

agent = agent_class(env, agent_config)

observation, _ = env.reset()
for _ in range(agent.max_steps):
result = agent.select_action(observation=observation)
if result is None:
break
step, action = result
env.test_run.step = step
logging.info(f"Running step {step} (of {agent.max_steps}) with action {action}")
prev_observation = observation
observation, reward, done, *_ = env.step(action)
agent.update_policy(
{
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_observation,
"action": action,
"done": done,
}
)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
agent = agent_class(env, agent_config)

err |= agent.run()
except Exception as exc:
run_error = exc
logging.exception("DSE job aborted by an unexpected error; generating reports before failing.")

if args.mode == "run":
runner.runner.test_scenario.test_runs = original_test_runs
generate_reports(runner.runner.system, runner.runner.test_scenario, runner.runner.scenario_root)
generate_reports(
runner.runner.system,
runner.runner.test_scenario,
runner.runner.scenario_root,
error=run_error,
)

if run_error is not None:
raise run_error

logging.info("All jobs are complete.")
return err


def generate_reports(system: System, test_scenario: TestScenario, result_dir: Path) -> None:
def _record_run_failure(result_dir: Path, error: BaseException) -> None:
"""Persist an aborting error into the results dir so the failure is documented with the reports."""
failure_path = result_dir / "dse_failure.txt"
tb = "".join(traceback.format_exception(type(error), error, error.__traceback__))
try:
result_dir.mkdir(parents=True, exist_ok=True)
failure_path.write_text(f"DSE job aborted by an unexpected {type(error).__name__}: {error}\n\n{tb}")
logging.info(f"Documented the aborting error in {failure_path}")
except OSError:
logging.exception(f"Failed to write failure report to {failure_path}")


def generate_reports(
system: System,
test_scenario: TestScenario,
result_dir: Path,
error: BaseException | None = None,
) -> None:
registry = Registry()

if error is not None:
_record_run_failure(result_dir, error)

for name, reporter_class in registry.ordered_scenario_reports():
logging.debug(f"Generating report '{name}' ({reporter_class.__name__})")

Expand Down
48 changes: 48 additions & 0 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Literal

Expand Down Expand Up @@ -111,3 +112,50 @@ def update_policy(self, _feedback: Dict[str, Any]) -> None:
feedback (Dict[str, Any]): Feedback information from the environment.
"""
pass

def run(self) -> int:
"""
Orchestrate this agent's exploration over ``self.env``.

Default: a step loop driven by the dispatcher (``select_action`` →
``env.step`` → ``update_policy`` per trial). Agents that drive their
own training loop (e.g. RLlib-based agents calling ``algo.train()``)
override this method.

Failure contract (``handle_dse_job`` consumes the result via
``err |= agent.run()``):

- Return a non-zero code for *recoverable* failures (e.g. a workload run
that failed but should not abort the rest of the sweep). The code is
accumulated and the next ``TestRun`` still executes. Workload-level
failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a
failed metric to ``rewards.metric_failure`` rather than raising, and
``rllib_run`` catches training errors and returns ``rc=1``.
- Raise for *unexpected* failures (framework/agent bugs). Exceptions
propagate out of ``handle_dse_job`` and hard-fail the job so the bug
is surfaced instead of masked as a penalizing reward.

Returns:
int: Process-style return code (``0`` success, non-zero recoverable failure).
"""
observation, _ = self.env.reset()
for _ in range(self.max_steps):
result = self.select_action(observation=observation)
if result is None:
break
step, action = result
logging.info(f"Running step {step} (of {self.max_steps}) with action {action}")
prev_observation = observation
observation, reward, done, *_ = self.env.step(action)
self.update_policy(
{
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_observation,
"action": action,
"done": done,
}
)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
return 0
1 change: 1 addition & 0 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
- done (bool): Whether the episode is done.
- info (dict): Additional info for debugging.
"""
self.test_run.increment_step()
self.test_run = self.test_run.apply_params_set(action)

cached_result = self.get_cached_trajectory_result(action)
Expand Down
14 changes: 10 additions & 4 deletions tests/test_cloudaigym.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ def test_tr_output_path(setup_env: tuple[TestRun, BaseRunner]):
agent = GridSearchAgent(env, GridSearchAgent.get_config_class()())

_, action = agent.select_action()
env.test_run.step = 42
env.test_run.step = 41
env.step(action)

assert env.test_run.output_path.name == "42"
assert env.test_run.output_path.name == "42", (
"CloudAIGymEnv.step() must advance test_run.step before computing output_path; "
"starting at 41, step #42's artifacts must land in dir '42'."
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -417,7 +420,7 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_
env.test_run.current_iteration = 0
env.trajectory = {0: [TrajectoryEntry(step=1, action=cached_action, reward=0.42, observation=[0.84])]}

env.test_run.step = 5
env.test_run.step = 4
obs, reward, done, _info = env.step(cached_action)

runner.run.assert_not_called()
Expand All @@ -426,7 +429,10 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_
assert done is False
rows = env.trajectory[0]
assert len(rows) == 2
assert rows[-1].step == 5
assert rows[-1].step == 5, (
"CloudAIGymEnv.step() advances test_run.step before recording the trajectory row; "
"the cached row must be tagged with the advanced trial index, not the pre-step value."
)
assert rows[-1].reward == 0.42
assert rows[-1].action == cached_action

Expand Down
Loading
Loading