Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion gcm/monitoring/slurm/parsing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import logging
import re
import string
from datetime import timedelta
Expand All @@ -16,6 +17,8 @@
first_of,
)

logger = logging.getLogger(__name__)


def parse_cpus_alloc(v: str) -> int:
return int(v.split("/")[0])
Expand Down Expand Up @@ -75,6 +78,48 @@ def parse_gres_or_tres(v: str) -> int:
return parse_tres(v)


def mb_to_bytes(value: int) -> int:
"""Convert megabytes to bytes."""
return value * 1_000_000


def parse_memory_to_bytes(value: str) -> int:
"""Parse a memory value to bytes.

Handles plain integers (assumed MB, Slurm default) and suffixed
strings (M, G, T, P). This is the single robust entry point for
all memory parsing.
"""
if not value or value == "0":
return 0

suffix = value[-1]
if suffix.isdigit():
# Plain integer = MB (Slurm default unit)
return int(value) * 1_000_000

multipliers = {
"M": 1_000_000,
"G": 1_000_000_000,
"T": 1_000_000_000_000,
"P": 1_000_000_000_000_000,
}
if suffix not in multipliers:
raise ValueError(f"Unrecognized suffix in {value}")
return int(float(value[:-1]) * multipliers[suffix])


def maybe_parse_memory_to_bytes(v: str) -> int | None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use this decorator from 74 @log_error(name, return_on_error=0), so no need to import logger

"""Parse memory to bytes, returning None for non-parseable values (N/A, empty, etc)."""
if not v or v in {"N/A", "(null)"}:
return None
try:
return parse_memory_to_bytes(v)
except (ValueError, TypeError):
logger.error(f"Failed to parse memory value: {v!r}")
return None


def convert_memory_to_mb(value: str) -> int:
"""Helper function to convert memory values with units to MB.

Expand All @@ -89,7 +134,8 @@ def convert_memory_to_mb(value: str) -> int:

suffix = value[-1]
if suffix.isdigit():
return int(int(value) * 1e-6)
# Plain integer = MB (Slurm default unit)
return int(value)

if suffix == "P":
multiplier = 1_000_000_000
Expand Down
6 changes: 3 additions & 3 deletions gcm/schemas/slurm/sinfo_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# All rights reserved.
from dataclasses import dataclass

from gcm.monitoring.coerce import maybe_int
from gcm.monitoring.slurm.parsing import (
maybe_parse_memory_to_bytes,
parse_cpus_alloc,
parse_cpus_idle,
parse_cpus_other,
Expand Down Expand Up @@ -38,8 +38,8 @@ class NodeData(DerivedCluster):
CPUS_IDLE: int = parsed_field(parser=parse_cpus_idle, field_name="CPUS(A/I/O/T)")
CPUS_OTHER: int = parsed_field(parser=parse_cpus_other, field_name="CPUS(A/I/O/T)")
CPUS_TOTAL: int = parsed_field(parser=parse_cpus_total, field_name="CPUS(A/I/O/T)")
FREE_MEM: int | None = parsed_field(parser=maybe_int)
MEMORY: int | None = parsed_field(parser=maybe_int)
FREE_MEM: int | None = parsed_field(parser=maybe_parse_memory_to_bytes)
MEMORY: int | None = parsed_field(parser=maybe_parse_memory_to_bytes)
NUM_GPUS: int = parsed_field(parser=parse_gres, field_name="GRES")
USER: str = parsed_field(parser=str)
REASON: str = parsed_field(parser=str)
Expand Down
6 changes: 4 additions & 2 deletions gcm/schemas/slurm/squeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from gcm.monitoring.coerce import maybe_float, maybe_int
from gcm.monitoring.slurm.nodelist_parsers import nodelist
from gcm.monitoring.slurm.parsing import (
convert_memory_to_mb,
maybe_parse_memory_to_bytes,
parse_gres_or_tres,
parse_value_from_tres,
)
Expand All @@ -27,7 +27,9 @@ class JobData(DerivedCluster):
JOBID_RAW: str = parsed_field(parser=str, field_name="JOBID")
NAME: str = parsed_field(parser=str)
TIME_LIMIT: str = parsed_field(parser=str, field_name="TIMELIMIT")
MIN_MEMORY: int = parsed_field(parser=convert_memory_to_mb, field_name="MINMEMORY")
MIN_MEMORY: int | None = parsed_field(
parser=maybe_parse_memory_to_bytes, field_name="MINMEMORY"
)
COMMAND: str = parsed_field(parser=str)
PRIORITY: float | None = parsed_field(parser=maybe_float)
STATE: str = parsed_field(parser=str)
Expand Down
70 changes: 69 additions & 1 deletion gcm/tests/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
range_expression,
range_expression_element,
)
from gcm.monitoring.slurm.parsing import parse_gres, parse_tres, parse_value_from_tres
from gcm.monitoring.slurm.parsing import (
maybe_parse_memory_to_bytes,
mb_to_bytes,
parse_gres,
parse_memory_to_bytes,
parse_tres,
parse_value_from_tres,
)
from gcm.monitoring.utils.parsing.combinators import (
at_least_one,
at_least_zero,
Expand Down Expand Up @@ -542,5 +549,66 @@ def test_parse_gpu_from_tres_bad(s: str, exc: Type[Exception]) -> None:
parse_value_from_tres(s, "gres/gpu")


@pytest.mark.parametrize(
"value, expected",
[
(0, 0),
(1, 1_000_000),
(500_000, 500_000_000_000),
(1_524_000, 1_524_000_000_000),
],
)
def test_mb_to_bytes(value: int, expected: int) -> None:
assert mb_to_bytes(value) == expected


@pytest.mark.parametrize(
"value, expected",
[
("0", 0),
("100", 100_000_000),
("500000", 500_000_000_000),
("500M", 500_000_000),
("60G", 60_000_000_000),
("10.5G", 10_500_000_000),
("1T", 1_000_000_000_000),
("1P", 1_000_000_000_000_000),
],
)
def test_parse_memory_to_bytes(value: str, expected: int) -> None:
assert parse_memory_to_bytes(value) == expected


@pytest.mark.parametrize(
"value, expected",
[
("0", 0),
("100", 100_000_000),
("500M", 500_000_000),
("60G", 60_000_000_000),
("N/A", None),
("invalid", None),
("", None),
],
)
def test_maybe_parse_memory_to_bytes(value: str, expected: int | None) -> None:
assert maybe_parse_memory_to_bytes(value) == expected


@pytest.mark.parametrize(
"s, expected",
[
("cpu=1,mem=10G,node=1,billing=2", 10_000),
("cpu=320,mem=1280.50G,node=20,billing=3040,gres/gpu=160", 1_280_500),
("cpu=5200,mem=32500000M,node=65,billing=17487,gres/gpu=520", 32_500_000),
("cpu=24,node=1,billing=112,gres/gpu=2", 0),
("", 0),
],
)
def test_parse_mem_from_tres(s: str, expected: int) -> None:
"""parse_value_from_tres returns MB for memory. Callers wrap with mb_to_bytes."""
assert parse_value_from_tres(s, "mem") == expected


if __name__ == "__main__":
unittest.main()
8 changes: 4 additions & 4 deletions gcm/tests/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TestSlurmCliClient:
JOBID_RAW="42953598",
NAME="run1",
TIME_LIMIT="3-00:00:00",
MIN_MEMORY=60000,
MIN_MEMORY=60_000_000_000,
COMMAND="/test/run.sh",
PRIORITY=0.00017546257008,
STATE="RUNNING",
Expand Down Expand Up @@ -127,7 +127,7 @@ class TestSlurmCliClient:
JOBID_RAW="42956774",
NAME="run3",
TIME_LIMIT="3-00:00:00",
MIN_MEMORY=60000,
MIN_MEMORY=60_000_000_000,
COMMAND="/test/run.sh",
PRIORITY=0.00000595580787,
STATE="RUNNING",
Expand Down Expand Up @@ -208,7 +208,7 @@ class TestSlurmCliClient:
JOBID_RAW="22783212",
NAME="run4",
TIME_LIMIT="1:00:00",
MIN_MEMORY=10500,
MIN_MEMORY=10_500_000_000,
COMMAND="/test/run.sh",
PRIORITY=0.00018553552222,
STATE="PENDING",
Expand Down Expand Up @@ -256,7 +256,7 @@ class TestSlurmCliClient:
JOBID_RAW="42271120",
NAME="run5",
TIME_LIMIT="3-00:00:00",
MIN_MEMORY=1000000,
MIN_MEMORY=1_000_000_000_000,
COMMAND="/test/run.sh",
PRIORITY=0.00012484216134,
STATE="PENDING",
Expand Down
20 changes: 10 additions & 10 deletions gcm/tests/test_slurm_job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_cli(tmp_path: Path) -> None:
"CPUS_IDLE": 0,
"CPUS_OTHER": 80,
"CPUS_TOTAL": 80,
"MEMORY": 500_000,
"MEMORY": 500_000_000_000,
"NUM_GPUS": 8,
"num_rows": 4,
"collection_unixtime": FakeClock().unixtime(),
Expand All @@ -194,7 +194,7 @@ def test_cli(tmp_path: Path) -> None:
"CPUS_IDLE": 0,
"CPUS_OTHER": 80,
"CPUS_TOTAL": 80,
"MEMORY": 500_000,
"MEMORY": 500_000_000_000,
"NUM_GPUS": 8,
"num_rows": 4,
"collection_unixtime": FakeClock().unixtime(),
Expand All @@ -214,8 +214,8 @@ def test_cli(tmp_path: Path) -> None:
"CPUS_IDLE": 0,
"CPUS_OTHER": 0,
"CPUS_TOTAL": 80,
"FREE_MEM": 210_288,
"MEMORY": 500_000,
"FREE_MEM": 210_288_000_000,
"MEMORY": 500_000_000_000,
"NUM_GPUS": 8,
"num_rows": 4,
"collection_unixtime": FakeClock().unixtime(),
Expand All @@ -235,8 +235,8 @@ def test_cli(tmp_path: Path) -> None:
"CPUS_IDLE": 0,
"CPUS_OTHER": 0,
"CPUS_TOTAL": 80,
"FREE_MEM": 449_512,
"MEMORY": 500_000,
"FREE_MEM": 449_512_000_000,
"MEMORY": 500_000_000_000,
"NUM_GPUS": 2,
"num_rows": 4,
"collection_unixtime": FakeClock().unixtime(),
Expand Down Expand Up @@ -304,7 +304,7 @@ def test_cli(tmp_path: Path) -> None:
"collection_unixtime": FakeClock().unixtime(),
"GPUS_REQUESTED": 1,
"MIN_CPUS": 1,
"MIN_MEMORY": 60000,
"MIN_MEMORY": 60_000_000_000,
"CPUS": 1,
"NODES": 1,
"TRES_GPUS_ALLOCATED": 1,
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_cli(tmp_path: Path) -> None:
"collection_unixtime": 1668197951,
"GPUS_REQUESTED": 8,
"MIN_CPUS": 80,
"MIN_MEMORY": 60000,
"MIN_MEMORY": 60_000_000_000,
"CPUS": 2560,
"NODES": 32,
"TRES_GPUS_ALLOCATED": 256,
Expand Down Expand Up @@ -429,7 +429,7 @@ def test_cli(tmp_path: Path) -> None:
"collection_unixtime": FakeClock().unixtime(),
"GPUS_REQUESTED": 0,
"MIN_CPUS": 1,
"MIN_MEMORY": 10500,
"MIN_MEMORY": 10_500_000_000,
"CPUS": 1,
"NODES": 1,
"TRES_GPUS_ALLOCATED": 0,
Expand Down Expand Up @@ -475,7 +475,7 @@ def test_cli(tmp_path: Path) -> None:
"collection_unixtime": FakeClock().unixtime(),
"GPUS_REQUESTED": 8,
"MIN_CPUS": 16,
"MIN_MEMORY": 1000000,
"MIN_MEMORY": 1_000_000_000_000,
"CPUS": 320,
"NODES": 20,
"TRES_GPUS_ALLOCATED": 160,
Expand Down
Loading