diff --git a/gcm/monitoring/slurm/parsing.py b/gcm/monitoring/slurm/parsing.py index bcedc35..e37a40b 100644 --- a/gcm/monitoring/slurm/parsing.py +++ b/gcm/monitoring/slurm/parsing.py @@ -1,5 +1,6 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. +import logging import re import string from datetime import timedelta @@ -16,6 +17,8 @@ first_of, ) +logger = logging.getLogger(__name__) + def parse_cpus_alloc(v: str) -> int: return int(v.split("/")[0]) @@ -75,6 +78,48 @@ def parse_gres_or_tres(v: str) -> int: return parse_tres(v) +def mb_to_bytes(value: int) -> int: + """Convert megabytes to bytes.""" + return value * 1_000_000 + + +def parse_memory_to_bytes(value: str) -> int: + """Parse a memory value to bytes. + + Handles plain integers (assumed MB, Slurm default) and suffixed + strings (M, G, T, P). This is the single robust entry point for + all memory parsing. + """ + if not value or value == "0": + return 0 + + suffix = value[-1] + if suffix.isdigit(): + # Plain integer = MB (Slurm default unit) + return int(value) * 1_000_000 + + multipliers = { + "M": 1_000_000, + "G": 1_000_000_000, + "T": 1_000_000_000_000, + "P": 1_000_000_000_000_000, + } + if suffix not in multipliers: + raise ValueError(f"Unrecognized suffix in {value}") + return int(float(value[:-1]) * multipliers[suffix]) + + +def maybe_parse_memory_to_bytes(v: str) -> int | None: + """Parse memory to bytes, returning None for non-parseable values (N/A, empty, etc).""" + if not v or v in {"N/A", "(null)"}: + return None + try: + return parse_memory_to_bytes(v) + except (ValueError, TypeError): + logger.error(f"Failed to parse memory value: {v!r}") + return None + + def convert_memory_to_mb(value: str) -> int: """Helper function to convert memory values with units to MB. @@ -89,7 +134,8 @@ def convert_memory_to_mb(value: str) -> int: suffix = value[-1] if suffix.isdigit(): - return int(int(value) * 1e-6) + # Plain integer = MB (Slurm default unit) + return int(value) if suffix == "P": multiplier = 1_000_000_000 diff --git a/gcm/schemas/slurm/sinfo_node.py b/gcm/schemas/slurm/sinfo_node.py index 8a000ad..4f54ca0 100644 --- a/gcm/schemas/slurm/sinfo_node.py +++ b/gcm/schemas/slurm/sinfo_node.py @@ -2,8 +2,8 @@ # All rights reserved. from dataclasses import dataclass -from gcm.monitoring.coerce import maybe_int from gcm.monitoring.slurm.parsing import ( + maybe_parse_memory_to_bytes, parse_cpus_alloc, parse_cpus_idle, parse_cpus_other, @@ -38,8 +38,8 @@ class NodeData(DerivedCluster): CPUS_IDLE: int = parsed_field(parser=parse_cpus_idle, field_name="CPUS(A/I/O/T)") CPUS_OTHER: int = parsed_field(parser=parse_cpus_other, field_name="CPUS(A/I/O/T)") CPUS_TOTAL: int = parsed_field(parser=parse_cpus_total, field_name="CPUS(A/I/O/T)") - FREE_MEM: int | None = parsed_field(parser=maybe_int) - MEMORY: int | None = parsed_field(parser=maybe_int) + FREE_MEM: int | None = parsed_field(parser=maybe_parse_memory_to_bytes) + MEMORY: int | None = parsed_field(parser=maybe_parse_memory_to_bytes) NUM_GPUS: int = parsed_field(parser=parse_gres, field_name="GRES") USER: str = parsed_field(parser=str) REASON: str = parsed_field(parser=str) diff --git a/gcm/schemas/slurm/squeue.py b/gcm/schemas/slurm/squeue.py index d2a4088..212552b 100644 --- a/gcm/schemas/slurm/squeue.py +++ b/gcm/schemas/slurm/squeue.py @@ -6,7 +6,7 @@ from gcm.monitoring.coerce import maybe_float, maybe_int from gcm.monitoring.slurm.nodelist_parsers import nodelist from gcm.monitoring.slurm.parsing import ( - convert_memory_to_mb, + maybe_parse_memory_to_bytes, parse_gres_or_tres, parse_value_from_tres, ) @@ -27,7 +27,9 @@ class JobData(DerivedCluster): JOBID_RAW: str = parsed_field(parser=str, field_name="JOBID") NAME: str = parsed_field(parser=str) TIME_LIMIT: str = parsed_field(parser=str, field_name="TIMELIMIT") - MIN_MEMORY: int = parsed_field(parser=convert_memory_to_mb, field_name="MINMEMORY") + MIN_MEMORY: int | None = parsed_field( + parser=maybe_parse_memory_to_bytes, field_name="MINMEMORY" + ) COMMAND: str = parsed_field(parser=str) PRIORITY: float | None = parsed_field(parser=maybe_float) STATE: str = parsed_field(parser=str) diff --git a/gcm/tests/test_parsers.py b/gcm/tests/test_parsers.py index 90f13a3..9bd8ae1 100644 --- a/gcm/tests/test_parsers.py +++ b/gcm/tests/test_parsers.py @@ -13,7 +13,14 @@ range_expression, range_expression_element, ) -from gcm.monitoring.slurm.parsing import parse_gres, parse_tres, parse_value_from_tres +from gcm.monitoring.slurm.parsing import ( + maybe_parse_memory_to_bytes, + mb_to_bytes, + parse_gres, + parse_memory_to_bytes, + parse_tres, + parse_value_from_tres, +) from gcm.monitoring.utils.parsing.combinators import ( at_least_one, at_least_zero, @@ -542,5 +549,66 @@ def test_parse_gpu_from_tres_bad(s: str, exc: Type[Exception]) -> None: parse_value_from_tres(s, "gres/gpu") +@pytest.mark.parametrize( + "value, expected", + [ + (0, 0), + (1, 1_000_000), + (500_000, 500_000_000_000), + (1_524_000, 1_524_000_000_000), + ], +) +def test_mb_to_bytes(value: int, expected: int) -> None: + assert mb_to_bytes(value) == expected + + +@pytest.mark.parametrize( + "value, expected", + [ + ("0", 0), + ("100", 100_000_000), + ("500000", 500_000_000_000), + ("500M", 500_000_000), + ("60G", 60_000_000_000), + ("10.5G", 10_500_000_000), + ("1T", 1_000_000_000_000), + ("1P", 1_000_000_000_000_000), + ], +) +def test_parse_memory_to_bytes(value: str, expected: int) -> None: + assert parse_memory_to_bytes(value) == expected + + +@pytest.mark.parametrize( + "value, expected", + [ + ("0", 0), + ("100", 100_000_000), + ("500M", 500_000_000), + ("60G", 60_000_000_000), + ("N/A", None), + ("invalid", None), + ("", None), + ], +) +def test_maybe_parse_memory_to_bytes(value: str, expected: int | None) -> None: + assert maybe_parse_memory_to_bytes(value) == expected + + +@pytest.mark.parametrize( + "s, expected", + [ + ("cpu=1,mem=10G,node=1,billing=2", 10_000), + ("cpu=320,mem=1280.50G,node=20,billing=3040,gres/gpu=160", 1_280_500), + ("cpu=5200,mem=32500000M,node=65,billing=17487,gres/gpu=520", 32_500_000), + ("cpu=24,node=1,billing=112,gres/gpu=2", 0), + ("", 0), + ], +) +def test_parse_mem_from_tres(s: str, expected: int) -> None: + """parse_value_from_tres returns MB for memory. Callers wrap with mb_to_bytes.""" + assert parse_value_from_tres(s, "mem") == expected + + if __name__ == "__main__": unittest.main() diff --git a/gcm/tests/test_slurm.py b/gcm/tests/test_slurm.py index 91345b6..3a8aa15 100644 --- a/gcm/tests/test_slurm.py +++ b/gcm/tests/test_slurm.py @@ -83,7 +83,7 @@ class TestSlurmCliClient: JOBID_RAW="42953598", NAME="run1", TIME_LIMIT="3-00:00:00", - MIN_MEMORY=60000, + MIN_MEMORY=60_000_000_000, COMMAND="/test/run.sh", PRIORITY=0.00017546257008, STATE="RUNNING", @@ -127,7 +127,7 @@ class TestSlurmCliClient: JOBID_RAW="42956774", NAME="run3", TIME_LIMIT="3-00:00:00", - MIN_MEMORY=60000, + MIN_MEMORY=60_000_000_000, COMMAND="/test/run.sh", PRIORITY=0.00000595580787, STATE="RUNNING", @@ -208,7 +208,7 @@ class TestSlurmCliClient: JOBID_RAW="22783212", NAME="run4", TIME_LIMIT="1:00:00", - MIN_MEMORY=10500, + MIN_MEMORY=10_500_000_000, COMMAND="/test/run.sh", PRIORITY=0.00018553552222, STATE="PENDING", @@ -256,7 +256,7 @@ class TestSlurmCliClient: JOBID_RAW="42271120", NAME="run5", TIME_LIMIT="3-00:00:00", - MIN_MEMORY=1000000, + MIN_MEMORY=1_000_000_000_000, COMMAND="/test/run.sh", PRIORITY=0.00012484216134, STATE="PENDING", diff --git a/gcm/tests/test_slurm_job_monitor.py b/gcm/tests/test_slurm_job_monitor.py index 7b3765a..138e1c0 100644 --- a/gcm/tests/test_slurm_job_monitor.py +++ b/gcm/tests/test_slurm_job_monitor.py @@ -174,7 +174,7 @@ def test_cli(tmp_path: Path) -> None: "CPUS_IDLE": 0, "CPUS_OTHER": 80, "CPUS_TOTAL": 80, - "MEMORY": 500_000, + "MEMORY": 500_000_000_000, "NUM_GPUS": 8, "num_rows": 4, "collection_unixtime": FakeClock().unixtime(), @@ -194,7 +194,7 @@ def test_cli(tmp_path: Path) -> None: "CPUS_IDLE": 0, "CPUS_OTHER": 80, "CPUS_TOTAL": 80, - "MEMORY": 500_000, + "MEMORY": 500_000_000_000, "NUM_GPUS": 8, "num_rows": 4, "collection_unixtime": FakeClock().unixtime(), @@ -214,8 +214,8 @@ def test_cli(tmp_path: Path) -> None: "CPUS_IDLE": 0, "CPUS_OTHER": 0, "CPUS_TOTAL": 80, - "FREE_MEM": 210_288, - "MEMORY": 500_000, + "FREE_MEM": 210_288_000_000, + "MEMORY": 500_000_000_000, "NUM_GPUS": 8, "num_rows": 4, "collection_unixtime": FakeClock().unixtime(), @@ -235,8 +235,8 @@ def test_cli(tmp_path: Path) -> None: "CPUS_IDLE": 0, "CPUS_OTHER": 0, "CPUS_TOTAL": 80, - "FREE_MEM": 449_512, - "MEMORY": 500_000, + "FREE_MEM": 449_512_000_000, + "MEMORY": 500_000_000_000, "NUM_GPUS": 2, "num_rows": 4, "collection_unixtime": FakeClock().unixtime(), @@ -304,7 +304,7 @@ def test_cli(tmp_path: Path) -> None: "collection_unixtime": FakeClock().unixtime(), "GPUS_REQUESTED": 1, "MIN_CPUS": 1, - "MIN_MEMORY": 60000, + "MIN_MEMORY": 60_000_000_000, "CPUS": 1, "NODES": 1, "TRES_GPUS_ALLOCATED": 1, @@ -349,7 +349,7 @@ def test_cli(tmp_path: Path) -> None: "collection_unixtime": 1668197951, "GPUS_REQUESTED": 8, "MIN_CPUS": 80, - "MIN_MEMORY": 60000, + "MIN_MEMORY": 60_000_000_000, "CPUS": 2560, "NODES": 32, "TRES_GPUS_ALLOCATED": 256, @@ -429,7 +429,7 @@ def test_cli(tmp_path: Path) -> None: "collection_unixtime": FakeClock().unixtime(), "GPUS_REQUESTED": 0, "MIN_CPUS": 1, - "MIN_MEMORY": 10500, + "MIN_MEMORY": 10_500_000_000, "CPUS": 1, "NODES": 1, "TRES_GPUS_ALLOCATED": 0, @@ -475,7 +475,7 @@ def test_cli(tmp_path: Path) -> None: "collection_unixtime": FakeClock().unixtime(), "GPUS_REQUESTED": 8, "MIN_CPUS": 16, - "MIN_MEMORY": 1000000, + "MIN_MEMORY": 1_000_000_000_000, "CPUS": 320, "NODES": 20, "TRES_GPUS_ALLOCATED": 160,