From 2134878be0bf115a2ac6b92f519d53f16debcb74 Mon Sep 17 00:00:00 2001 From: Mark Kuckian Date: Thu, 30 Apr 2026 13:02:21 +0300 Subject: [PATCH 1/3] fix: bound parallel-load thread pool to avoid recursive deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `load_dependency_chain` recursively submits child loads to a shared executor and blocks on `f.result()` while still holding a worker slot. Python's default `ThreadPoolExecutor()` caps workers at min(32, cpu_count + 4), so a sufficiently deep + wide dep tree can starve waiting for slots and deadlock under `parallel_load=True`. Pick `max_workers=256` — high enough that the recursive pattern works for any realistic project, low enough that a runaway tree doesn't fork-bomb the OS. Co-Authored-By: Claude Opus 4.7 (1M context) --- mama/dependency_chain.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mama/dependency_chain.py b/mama/dependency_chain.py index e048fd2..79e34ae 100644 --- a/mama/dependency_chain.py +++ b/mama/dependency_chain.py @@ -410,9 +410,15 @@ def get_build_dir_defines(build_dir): def load_dependency_chain(root: BuildDependency): """ This is main entrypoint for building the dependency chain. - All dependencies must be resolved at this stage + All dependencies must be resolved at this stage. + + With parallel_load=True, parents submit child loads to this executor and + then block on their futures while still holding a worker slot. The default + ThreadPoolExecutor() is bounded (~min(32, cpu_count+4)) so a moderately + deep dep tree can starve waiting for slots. We pick a max_workers high + enough that this doesn't happen for any realistic project. """ - with concurrent.futures.ThreadPoolExecutor() as e: + with concurrent.futures.ThreadPoolExecutor(max_workers=256) as e: def load_dependency(dep: BuildDependency): if dep.already_loaded: return dep.should_rebuild From 32cdca50ccff32d39bff88d5b0cd07e7d114adde Mon Sep 17 00:00:00 2001 From: Mark Kuckian Date: Thu, 30 Apr 2026 13:02:50 +0300 Subject: [PATCH 2/3] feat: add ssh connection multiplexing helper for parallel git fetches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When mama runs `update` against many private git repositories on the same SSH host (e.g. github.com), each `git fetch` opens a fresh SSH connection and pays the full auth cost. This adds a helper module that lets a single auth'd master socket carry many concurrent git ops. Design rules (so we never break a user's existing setup): * Probe each host with `ssh -G user@host` and read effective config. * Only add `-o ControlMaster/ControlPath/ControlPersist` flags for hosts where the user has not already configured multiplexing. Same for `ServerAliveInterval/ServerAliveCountMax` keepalives. * Pre-warm one master per host with `ssh -fN` BEFORE concurrent fetches, then poll `ssh -O check` until the ControlPath socket is bound. Without this, N parallel fetches all race to BECOME the master and trigger N parallel auths — the exact thing multiplexing is meant to avoid. * If the pre-warm fails (auth declined, network blip, MFA timeout) we strip the multiplex options and let each fetch make its own simple connection — better than a thundering-herd auth. * Track masters we started; clean them up at exit. Pre-existing masters the user manages stay untouched. * Never touch ssh-agent: no IdentityAgent, no IdentityFile, no SSH_AUTH_SOCK manipulation. The user's keys and agent stay exactly as configured. `GIT_SSH_COMMAND` points at a small wrapper (`mama_ssh.py`) so per-host options can be applied just-in-time. A single global GIT_SSH_COMMAND can't encode per-host decisions, so the wrapper does it on each invocation. The wrapper is only installed when at least one host actually needs added options — for users with their config fully tuned this is a no-op. New build_config flags: * `parallel_max=N` (default 20) — caps concurrent git fetches via a semaphore, so 20 simultaneous fetches don't overwhelm a single master socket or the remote's anti-abuse limits. * `serial` — opt out of any auto-parallel-on-update behaviour. Tests cover URL parsing edge cases (Windows paths, IPv6, scheme rejection), ssh-G output parsing, options-decision logic (only add what the user hasn't), wrapper arg parsing, prewarm-failure-strips-multiplex-opts, and 50 threads racing on `ensure_master_for_url` for the same host probing exactly once. Co-Authored-By: Claude Opus 4.7 (1M context) --- mama/build_config.py | 6 + mama/utils/mama_ssh.py | 94 ++++ mama/utils/ssh_multiplex.py | 461 ++++++++++++++++++ .../test_ssh_multiplex/test_ssh_multiplex.py | 311 ++++++++++++ 4 files changed, 872 insertions(+) create mode 100755 mama/utils/mama_ssh.py create mode 100644 mama/utils/ssh_multiplex.py create mode 100644 tests/test_ssh_multiplex/test_ssh_multiplex.py diff --git a/mama/build_config.py b/mama/build_config.py index 6994033..71d699c 100644 --- a/mama/build_config.py +++ b/mama/build_config.py @@ -117,6 +117,8 @@ def __init__(self, args): self.convenient_install = [] ## Workspace and parsing self.parallel_load = False ## Whether to load dependencies in parallel? + self.serial_load = False ## If True, override the auto-parallel-on-update behaviour + self.parallel_max = 20 ## Cap concurrent git fetches (avoids hammering the SSH master) self.global_workspace = False if System.windows: self.workspaces_root = util.normalized_path(os.getenv('HOMEPATH')) @@ -160,6 +162,10 @@ def parse_args(self, args: List[str]): elif arg == 'silent': self.print = False elif arg == 'verbose': self.verbose = True elif arg == 'parallel': self.parallel_load = True + elif arg == 'serial': self.serial_load = True + elif arg.startswith('parallel_max='): + try: self.parallel_max = max(1, int(arg.split('=', 1)[1])) + except (ValueError, IndexError): pass elif arg == 'all': self.target = 'all' elif arg == 'test': self.test = ' ' # no test arguments elif arg == 'start': self.start = ' ' # no start arguments diff --git a/mama/utils/mama_ssh.py b/mama/utils/mama_ssh.py new file mode 100755 index 0000000..b6baf6c --- /dev/null +++ b/mama/utils/mama_ssh.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +""" +GIT_SSH_COMMAND wrapper for mama. + +Git invokes us as if we were ssh: + mama_ssh.py [ssh-args] [-p port] [-l user] host command-on-remote... + +We figure out the destination, run `ssh -G` once to read the user's effective +config for that host, decide which `-o` options need adding (multiplexing, +keepalives) WITHOUT overriding anything the user already has, then exec ssh +with the augmented args. + +This wrapper is invoked many times (once per git op); it must be cheap. We +cache per-host options under $XDG_CACHE_HOME so repeated invocations during +the same mama run don't re-probe. + +If anything goes wrong we still exec ssh with the original args — never +break the build because of multiplexing setup. +""" + +from __future__ import annotations + +import json +import os +import sys +import time + +# Allow running as a standalone script without a parent package context. +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +if __package__ in (None, ''): + sys.path.insert(0, os.path.dirname(_THIS_DIR)) # add mama/ parent to path + from utils import ssh_multiplex # type: ignore +else: + from . import ssh_multiplex + + +_CACHE_TTL_SECONDS = 60 * 30 # 30 minutes per-host options cache + + +def _cache_path() -> str: + base = os.environ.get('XDG_CACHE_HOME') or os.path.expanduser('~/.cache') + d = os.path.join(base, 'mama') + os.makedirs(d, exist_ok=True) + return os.path.join(d, 'ssh_host_options.json') + + +def _load_cache() -> dict: + try: + with open(_cache_path(), 'r') as f: + return json.load(f) + except (OSError, ValueError): + return {} + + +def _save_cache(cache: dict) -> None: + path = _cache_path() + tmp = path + '.tmp' + try: + with open(tmp, 'w') as f: + json.dump(cache, f) + os.replace(tmp, path) + except OSError: + pass + + +def _options_for(user: str, host: str, port: str | None) -> list[str]: + key = ssh_multiplex.host_key(user, host, port) + cache = _load_cache() + entry = cache.get(key) + if entry and time.time() - entry.get('ts', 0) < _CACHE_TTL_SECONDS: + return list(entry.get('opts', [])) + probe = ssh_multiplex.probe_ssh_config(user, host, port) + opts, _ = ssh_multiplex.options_to_add(probe) + cache[key] = {'ts': time.time(), 'opts': opts} + _save_cache(cache) + return opts + + +def main(argv: list[str]) -> int: + args = argv[1:] + extra: list[str] = [] + parsed = ssh_multiplex.parse_host_from_ssh_args(args) + if parsed is not None: + user, host, port = parsed + try: + extra = _options_for(user, host, port) + except Exception: + extra = [] + final = ['ssh', *extra, *args] + os.execvp('ssh', final) + + +if __name__ == '__main__': + raise SystemExit(main(sys.argv)) diff --git a/mama/utils/ssh_multiplex.py b/mama/utils/ssh_multiplex.py new file mode 100644 index 0000000..c1c0206 --- /dev/null +++ b/mama/utils/ssh_multiplex.py @@ -0,0 +1,461 @@ +""" +SSH connection multiplexing for git operations. + +When mama runs `update` against many private git repositories on the same SSH +host (e.g. github.com), each `git fetch` opens a fresh SSH connection and pays +the full auth cost. Multiplexing lets a single auth'd master socket carry many +parallel git ops. + +Design rules: +* Probe via `ssh -G user@host` to read the user's effective config. We never + override settings the user has already configured (ControlMaster, + ControlPath, ServerAliveInterval, ServerAliveCountMax). Our `-o` flags are + added only for keys the user has not set. +* `GIT_SSH_COMMAND` points at a small wrapper (`mama_ssh.py`) so per-host + options can be applied just-in-time. A single global GIT_SSH_COMMAND can't + encode per-host decisions, so the wrapper does it on each invocation. +* Pre-warm one master per host with `ssh -fN` BEFORE we kick off parallel git + ops, so 20 concurrent fetches don't trigger 20 concurrent auths. +* Track which masters we started; clean them up at exit. Pre-existing masters + the user manages are left alone. +* Never touch ssh-agent: no IdentityAgent, no IdentityFile, no manipulation of + SSH_AUTH_SOCK. The user's keys and agent stay exactly as configured. +""" + +from __future__ import annotations + +import atexit +import os +import re +import shlex +import subprocess +import sys +import threading +from urllib.parse import urlparse + + +DEFAULT_MAX_CONCURRENT_FETCHES = 20 + +_OUR_CONTROL_DIR = os.path.expanduser('~/.ssh/cm') +_OUR_CONTROL_PATH = os.path.join(_OUR_CONTROL_DIR, '%C') + +_DEFAULT_KEEPALIVE_INTERVAL = '60' +_DEFAULT_KEEPALIVE_COUNT = '3' +_DEFAULT_CONTROL_PERSIST = '10m' + +# Marker so the wrapper knows we set GIT_SSH_COMMAND ourselves and didn't +# inherit it from the user. +_OWNED_ENV = 'MAMA_SSH_MUX_OWNED' + + +# Module state ------------------------------------------------------------- + +_state_lock = threading.Lock() +_per_host_locks: dict[str, threading.Lock] = {} +_warmed: dict[str, dict] = {} # host_key -> info dict +_fetch_semaphore: threading.Semaphore | None = None +_atexit_registered = False +_verbose = False + + +# URL parsing -------------------------------------------------------------- + +# scp-style git URL: [user@]host:path (the path must NOT start with //, that +# would be ssh://). We anchor on a colon that isn't followed by //. +_SCP_RE = re.compile(r'^(?:(?P[^@/\s]+)@)?(?P[^:/\s]+):(?!//)') + + +def parse_ssh_endpoint(url: str) -> tuple[str, str, str | None] | None: + """ + Return (user, host, port_or_None) for an SSH-using git URL, or None. + + Accepts: + git@github.com:user/repo.git -> ('git', 'github.com', None) + ssh://git@host:2222/user/repo.git -> ('git', 'host', '2222') + Rejects: + https://github.com/user/repo.git + /path/to/local/repo + file:///... + C:/foo (Windows path, not scp-style) + host: (no path after colon) + """ + if not url: + return None + if url.startswith('ssh://'): + try: + p = urlparse(url) + except ValueError: + return None + if not p.hostname: + return None + port = str(p.port) if p.port else None + return (p.username or 'git', p.hostname, port) + # Reject anything that has a non-ssh scheme. + if '://' in url: + return None + # Reject Windows-style absolute paths: a single drive letter followed by + # `:` and then `/` or `\`. Git itself doesn't treat these as scp URLs. + if len(url) >= 3 and url[1] == ':' and url[0].isalpha() and url[2] in ('/', '\\'): + return None + m = _SCP_RE.match(url) + if not m: + return None + host = m.group('host') + # Require the colon to be followed by a non-empty path component; + # `host:` with nothing after isn't a real git URL. + if m.end() >= len(url): + return None + # Bracketed IPv6 in scp-form (`git@[::1]:repo`) is not supported by git + # itself; punt to None rather than report a bogus host. + if '[' in host or ']' in host: + return None + return (m.group('user') or 'git', host, None) + + +def host_key(user: str, host: str, port: str | None) -> str: + return f'{user}@{host}:{port or ""}' + + +# ssh -G probe ------------------------------------------------------------- + +def probe_ssh_config(user: str, host: str, port: str | None, + timeout: float = 5.0) -> dict[str, str]: + """ + Run `ssh -G [-p port] user@host` and return effective config (lower-cased + keys). Empty dict on failure — probe must never block the build. + """ + cmd = ['ssh', '-G'] + if port: + cmd += ['-p', port] + cmd += [f'{user}@{host}'] + try: + cp = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + return {} + if cp.returncode != 0: + return {} + out: dict[str, str] = {} + for line in cp.stdout.splitlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split(None, 1) + if len(parts) == 2: + # `ssh -G` prints each key only once; keep first. + key = parts[0].lower() + if key not in out: + out[key] = parts[1] + return out + + +def is_multiplex_configured(probe: dict[str, str]) -> bool: + """User has both ControlMaster (yes/auto/ask/autoask) AND a ControlPath.""" + cm = probe.get('controlmaster', 'no').lower() + cp = probe.get('controlpath', 'none').lower() + return cm not in ('no', 'false', '') and cp not in ('none', '', 'no') + + +def _is_keepalive_configured(probe: dict[str, str]) -> bool: + interval = probe.get('serveraliveinterval', '0') + return interval not in ('0', '', None) + + +def options_to_add(probe: dict[str, str]) -> tuple[list[str], bool]: + """ + Return (-o args, we_own_master). `we_own_master` is True when we are the + one configuring multiplex (and therefore responsible for pre-warming and + cleaning it up). False if the user already has multiplex configured. + """ + opts: list[str] = [] + we_own_master = False + if not is_multiplex_configured(probe): + we_own_master = True + os.makedirs(_OUR_CONTROL_DIR, mode=0o700, exist_ok=True) + opts += [ + '-oControlMaster=auto', + f'-oControlPath={_OUR_CONTROL_PATH}', + f'-oControlPersist={_DEFAULT_CONTROL_PERSIST}', + ] + if not _is_keepalive_configured(probe): + opts += [ + f'-oServerAliveInterval={_DEFAULT_KEEPALIVE_INTERVAL}', + f'-oServerAliveCountMax={_DEFAULT_KEEPALIVE_COUNT}', + ] + return opts, we_own_master + + +# Per-host setup ----------------------------------------------------------- + +def _host_lock(key: str) -> threading.Lock: + with _state_lock: + lk = _per_host_locks.get(key) + if lk is None: + lk = threading.Lock() + _per_host_locks[key] = lk + return lk + + +def set_verbose(v: bool) -> None: + global _verbose + _verbose = bool(v) + + +def _log(msg: str) -> None: + if _verbose: + sys.stderr.write(f'[ssh-mux] {msg}\n') + + +def ensure_master_for_url(url: str) -> None: + """ + Idempotent. Probes the host's SSH config and, if multiplexing isn't + already set up by the user, opens a master connection and remembers it + for cleanup. Sets GIT_SSH_COMMAND so subsequent git ops use our wrapper. + + Safe to call concurrently from multiple threads. Blocks the FIRST caller + per host while the master is being established; subsequent callers return + immediately. + """ + ep = parse_ssh_endpoint(url) + if ep is None: + return + user, host, port = ep + key = host_key(user, host, port) + + if key in _warmed: + return + + with _host_lock(key): + if key in _warmed: + return + + probe = probe_ssh_config(user, host, port) + opts, we_own_master = options_to_add(probe) + + if we_own_master: + master_up = _start_master(user, host, port, opts) + if not master_up: + # Pre-warm failed (auth declined, network blip, host key + # prompt, MFA timeout). If we left ControlMaster/ControlPath + # in opts, every subsequent fetch would race to BECOME the + # master and we'd trigger N concurrent auths instead of one — + # the exact thing multiplexing is meant to prevent. Strip + # the multiplex flags so each fetch makes its own simple + # connection. Keepalives are still useful and stay. + opts = [o for o in opts + if not (o.startswith('-oControlMaster=') + or o.startswith('-oControlPath=') + or o.startswith('-oControlPersist='))] + we_own_master = False + + with _state_lock: + _warmed[key] = { + 'user': user, 'host': host, 'port': port, + 'opts': opts, 'we_own_master': we_own_master, + } + # Only install the GIT_SSH_COMMAND wrapper when there's something for + # it to do. If the user has every host configured to their liking the + # wrapper would only add a fork+exec per git op for no benefit. + if opts: + _set_git_ssh_command() + if we_own_master: + _ensure_atexit() + + +def _master_control_args(opts: list[str]) -> list[str]: + """The subset of options needed to address a master on the same socket.""" + return [o for o in opts + if o.startswith('-oControlPath=') or o.startswith('-oControlPersist=')] + + +def _start_master(user: str, host: str, port: str | None, opts: list[str]) -> bool: + """ + Open a master in the background with `ssh -fN` and verify it's listening + via `ssh -O check`. Returns True only if the master is confirmed ready — + callers should downgrade to non-multiplexed mode on False so concurrent + fetches don't all race to be the master and trigger N parallel auths. + """ + cmd = ['ssh', '-fN'] + # Force ControlMaster=yes for the master itself; replace any =auto. + cmd += [o for o in opts if not o.startswith('-oControlMaster=')] + cmd += ['-oControlMaster=yes'] + if port: + cmd += ['-p', port] + cmd += [f'{user}@{host}'] + _log(f'opening master: {" ".join(shlex.quote(c) for c in cmd)}') + try: + # 30s is generous for password/2FA prompts. -fN backgrounds AFTER auth + # but BEFORE the ControlPath socket is bound, so we still need to poll. + cp = subprocess.run(cmd, timeout=30, capture_output=True, text=True) + if cp.returncode != 0: + _log(f'master start rc={cp.returncode} stderr={cp.stderr.strip()}') + return False + except subprocess.TimeoutExpired: + _log(f'master start timed out for {host}') + return False + except (OSError, FileNotFoundError) as e: + _log(f'master start failed for {host}: {e}') + return False + + return _wait_master_ready(user, host, port, opts) + + +def _wait_master_ready(user: str, host: str, port: str | None, + opts: list[str], deadline_s: float = 5.0) -> bool: + """ + Poll `ssh -O check` until the master responds or we hit the deadline. + `ssh -fN` returns as soon as auth+fork happen, but the ControlPath socket + can take a brief moment to bind. Without this poll the first racing + fetches see "no socket yet" and each open their own connection. + """ + import time as _t + check = ['ssh', '-Ocheck'] + _master_control_args(opts) + if port: + check += ['-p', port] + check += [f'{user}@{host}'] + end = _t.monotonic() + deadline_s + delay = 0.05 + while _t.monotonic() < end: + try: + cp = subprocess.run(check, timeout=2, capture_output=True, text=True) + except (subprocess.TimeoutExpired, OSError): + return False + if cp.returncode == 0: + return True + _t.sleep(delay) + delay = min(delay * 2, 0.5) + _log(f'master never became ready for {host}') + return False + + +def cleanup_masters() -> None: + """Run `ssh -O exit` for masters we started. Don't touch user-owned ones.""" + with _state_lock: + snapshot = list(_warmed.values()) + for info in snapshot: + if not info.get('we_own_master'): + continue + cmd = ['ssh', '-Oexit'] + _master_control_args(info['opts']) + if info.get('port'): + cmd += ['-p', info['port']] + cmd += [f'{info["user"]}@{info["host"]}'] + try: + subprocess.run(cmd, timeout=5, capture_output=True) + except Exception: + pass + + +def _ensure_atexit() -> None: + global _atexit_registered + if not _atexit_registered: + atexit.register(cleanup_masters) + _atexit_registered = True + + +def _set_git_ssh_command() -> None: + # If user already set GIT_SSH_COMMAND we leave it alone — they've made an + # explicit choice. Our wrapper would override that. + if os.environ.get('GIT_SSH_COMMAND') and os.environ.get(_OWNED_ENV) != '1': + return + wrapper = wrapper_script_path() + os.environ['GIT_SSH_COMMAND'] = ( + shlex.quote(sys.executable) + ' ' + shlex.quote(wrapper) + ) + os.environ[_OWNED_ENV] = '1' + + +def wrapper_script_path() -> str: + return os.path.join(os.path.dirname(__file__), 'mama_ssh.py') + + +# Concurrent-fetch semaphore ----------------------------------------------- + +def init_fetch_semaphore(max_concurrent: int = DEFAULT_MAX_CONCURRENT_FETCHES) -> None: + """Initialise the global semaphore that caps concurrent git fetches.""" + global _fetch_semaphore + n = max(1, int(max_concurrent)) + with _state_lock: + if _fetch_semaphore is None: + _fetch_semaphore = threading.Semaphore(n) + + +class _NullCM: + def __enter__(self): return self + def __exit__(self, *exc): return False + + +class _SemCM: + def __init__(self, sem: threading.Semaphore): + self._sem = sem + def __enter__(self): + self._sem.acquire() + return self + def __exit__(self, *exc): + self._sem.release() + return False + + +def fetch_slot(): + """ + Context manager that holds a slot in the fetch semaphore. No-op if + `init_fetch_semaphore` has not been called (e.g. for non-parallel runs). + """ + sem = _fetch_semaphore + return _SemCM(sem) if sem is not None else _NullCM() + + +# Wrapper helpers (used by mama_ssh.py) ------------------------------------ + +# OpenSSH options that take an argument. If unsure, keep this conservative — +# being wrong only means we may misidentify the host position; we still +# fall back to a no-decoration exec. +_SSH_OPTS_WITH_ARG = { + '-b', '-B', '-c', '-D', '-E', '-e', '-F', '-I', '-i', '-J', '-L', '-l', + '-m', '-O', '-o', '-p', '-Q', '-R', '-S', '-W', '-w', +} + + +def parse_host_from_ssh_args(argv: list[str]) -> tuple[str, str, str | None] | None: + """ + Walk argv mirroring OpenSSH's getopt_long, return (user, host, port) of + the destination. Returns None if we can't identify the host. + """ + user = None + port = None + host = None + i = 0 + while i < len(argv): + a = argv[i] + if a == '--': + i += 1 + break + if a.startswith('-') and len(a) > 1: + short = a[:2] + if short in _SSH_OPTS_WITH_ARG: + if len(a) == 2: + i += 2 # value is in next arg + else: + i += 1 # value is glued (e.g. -oFoo=bar, -p2222) + if short == '-p' and len(a) > 2: + port = a[2:] + if short == '-l' and len(a) > 2: + user = a[2:] + if short == '-p' and len(a) == 2 and i - 1 < len(argv): + port = argv[i - 1] + if short == '-l' and len(a) == 2 and i - 1 < len(argv): + user = argv[i - 1] + continue + # boolean flag + i += 1 + continue + # First non-flag is the destination + host = a + if '@' in host: + user_part, host = host.split('@', 1) + if user is None: + user = user_part + break + if not host: + return None + if user is None: + user = 'git' + return (user, host, port) diff --git a/tests/test_ssh_multiplex/test_ssh_multiplex.py b/tests/test_ssh_multiplex/test_ssh_multiplex.py new file mode 100644 index 0000000..f0fc53c --- /dev/null +++ b/tests/test_ssh_multiplex/test_ssh_multiplex.py @@ -0,0 +1,311 @@ +"""Unit tests for mama.utils.ssh_multiplex pure-logic helpers. + +These cover: +* URL -> (user, host, port) parsing for SSH and non-SSH URLs. +* ssh-G probe output -> options decision: ControlMaster/ControlPath added + only when the user has not already configured multiplexing. +* GIT_SSH_COMMAND wrapper arg parsing. + +Network-touching paths (probe, prewarm) are mocked. +""" +from __future__ import annotations + +import os +import sys +from unittest import mock + +import pytest + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) +from mama.utils import ssh_multiplex as sm # noqa: E402 + + +class TestParseSshEndpoint: + def test_scp_form(self): + assert sm.parse_ssh_endpoint('git@github.com:foo/bar.git') == ('git', 'github.com', None) + + def test_scp_form_user_other_than_git(self): + assert sm.parse_ssh_endpoint('alice@host.example:proj.git') == ('alice', 'host.example', None) + + def test_scp_form_no_user(self): + # Falls back to default 'git' user. + assert sm.parse_ssh_endpoint('host.example:proj.git') == ('git', 'host.example', None) + + def test_ssh_url_with_port(self): + assert sm.parse_ssh_endpoint('ssh://git@host:2222/foo/bar.git') == ('git', 'host', '2222') + + def test_ssh_url_no_user(self): + assert sm.parse_ssh_endpoint('ssh://host/foo/bar.git') == ('git', 'host', None) + + def test_https_rejected(self): + assert sm.parse_ssh_endpoint('https://github.com/foo/bar.git') is None + + def test_http_rejected(self): + assert sm.parse_ssh_endpoint('http://github.com/foo/bar.git') is None + + def test_file_url_rejected(self): + assert sm.parse_ssh_endpoint('file:///srv/repos/foo.git') is None + + def test_local_path_rejected(self): + assert sm.parse_ssh_endpoint('/srv/repos/foo.git') is None + + def test_relative_path_rejected(self): + # 'foo/bar.git' has no colon — not scp-style, no scheme. + assert sm.parse_ssh_endpoint('foo/bar.git') is None + + def test_empty_url(self): + assert sm.parse_ssh_endpoint('') is None + assert sm.parse_ssh_endpoint(None) is None + + def test_windows_path_rejected(self): + # Windows drive paths must NOT be treated as scp-form. + assert sm.parse_ssh_endpoint('C:/foo/bar') is None + assert sm.parse_ssh_endpoint('D:\\repos\\proj') is None + + def test_host_with_no_path_rejected(self): + # `host:` with nothing after isn't a real git URL. + assert sm.parse_ssh_endpoint('git@host:') is None + + def test_bracketed_ipv6_rejected(self): + # git itself doesn't treat scp-form bracketed IPv6 as a URL. + assert sm.parse_ssh_endpoint('git@[::1]:repo.git') is None + + +class TestIsMultiplexConfigured: + def test_no_controlmaster_no_controlpath(self): + assert not sm.is_multiplex_configured({'controlmaster': 'no', 'controlpath': 'none'}) + + def test_controlmaster_auto_with_path(self): + assert sm.is_multiplex_configured({'controlmaster': 'auto', 'controlpath': '~/.ssh/cm/%C'}) + + def test_controlmaster_yes_with_path(self): + assert sm.is_multiplex_configured({'controlmaster': 'yes', 'controlpath': '/tmp/sock'}) + + def test_controlmaster_set_but_path_none(self): + # ControlPath=none means no socket -> not multiplexed even with master=auto. + assert not sm.is_multiplex_configured({'controlmaster': 'auto', 'controlpath': 'none'}) + + def test_path_set_but_no_master(self): + assert not sm.is_multiplex_configured({'controlmaster': 'no', 'controlpath': '/tmp/sock'}) + + def test_empty_probe(self): + # When ssh -G fails the probe is empty; treat as "not configured". + assert not sm.is_multiplex_configured({}) + + +class TestOptionsToAdd: + def test_user_has_full_config(self): + probe = { + 'controlmaster': 'auto', + 'controlpath': '~/.ssh/sockets/%C', + 'serveraliveinterval': '30', + 'serveralivecountmax': '5', + } + opts, we_own = sm.options_to_add(probe) + assert opts == [], 'should add nothing when user has everything' + assert we_own is False + + def test_user_has_nothing(self, tmp_path, monkeypatch): + # Avoid mkdir on the user's actual ~/.ssh/cm. + monkeypatch.setattr(sm, '_OUR_CONTROL_DIR', str(tmp_path / 'cm')) + monkeypatch.setattr(sm, '_OUR_CONTROL_PATH', str(tmp_path / 'cm' / '%C')) + probe = {'controlmaster': 'no', 'controlpath': 'none'} + opts, we_own = sm.options_to_add(probe) + assert we_own is True + assert any(o.startswith('-oControlMaster=') for o in opts) + assert any(o.startswith('-oControlPath=') for o in opts) + assert any(o.startswith('-oControlPersist=') for o in opts) + assert any(o.startswith('-oServerAliveInterval=') for o in opts) + assert any(o.startswith('-oServerAliveCountMax=') for o in opts) + + def test_user_has_keepalives_only(self, tmp_path, monkeypatch): + monkeypatch.setattr(sm, '_OUR_CONTROL_DIR', str(tmp_path / 'cm')) + monkeypatch.setattr(sm, '_OUR_CONTROL_PATH', str(tmp_path / 'cm' / '%C')) + probe = { + 'controlmaster': 'no', 'controlpath': 'none', + 'serveraliveinterval': '60', 'serveralivecountmax': '3', + } + opts, we_own = sm.options_to_add(probe) + assert we_own is True + # We add multiplex but NOT keepalives (user has them already). + assert any(o.startswith('-oControlMaster=') for o in opts) + assert not any(o.startswith('-oServerAliveInterval=') for o in opts) + assert not any(o.startswith('-oServerAliveCountMax=') for o in opts) + + def test_user_has_multiplex_only(self): + probe = { + 'controlmaster': 'auto', 'controlpath': '/tmp/sock', + 'serveraliveinterval': '0', + } + opts, we_own = sm.options_to_add(probe) + assert we_own is False + # No control* options; only keepalives. + assert not any(o.startswith('-oControlMaster=') for o in opts) + assert not any(o.startswith('-oControlPath=') for o in opts) + assert any(o.startswith('-oServerAliveInterval=') for o in opts) + + +class TestParseHostFromSshArgs: + def test_simple(self): + # Mimic git's invocation: [opts] host command + args = ['-o', 'SendEnv=GIT_PROTOCOL', 'git@github.com', + "git-upload-pack 'foo/bar.git'"] + assert sm.parse_host_from_ssh_args(args) == ('git', 'github.com', None) + + def test_user_at_form(self): + args = ['alice@example.com', 'echo hi'] + assert sm.parse_host_from_ssh_args(args) == ('alice', 'example.com', None) + + def test_separate_l_flag(self): + args = ['-l', 'alice', 'example.com', 'echo hi'] + assert sm.parse_host_from_ssh_args(args) == ('alice', 'example.com', None) + + def test_glued_p_flag(self): + args = ['-p2222', 'git@host', 'cmd'] + assert sm.parse_host_from_ssh_args(args) == ('git', 'host', '2222') + + def test_separate_p_flag(self): + args = ['-p', '2222', 'git@host', 'cmd'] + assert sm.parse_host_from_ssh_args(args) == ('git', 'host', '2222') + + def test_glued_o_flag(self): + args = ['-oStrictHostKeyChecking=no', 'git@host', 'cmd'] + assert sm.parse_host_from_ssh_args(args) == ('git', 'host', None) + + def test_no_args(self): + assert sm.parse_host_from_ssh_args([]) is None + + def test_only_options(self): + assert sm.parse_host_from_ssh_args(['-v', '-q']) is None + + +class TestProbeSshConfig: + def test_parses_keys(self): + fake_out = ( + "user git\n" + "hostname github.com\n" + "ControlMaster auto\n" + "ControlPath ~/.ssh/sockets/%C\n" + "# comment line\n" + "\n" + "ServerAliveInterval 30\n" + ) + fake_cp = mock.Mock(returncode=0, stdout=fake_out) + with mock.patch('subprocess.run', return_value=fake_cp) as run: + cfg = sm.probe_ssh_config('git', 'github.com', None) + run.assert_called_once() + assert cfg['user'] == 'git' + assert cfg['hostname'] == 'github.com' + assert cfg['controlmaster'] == 'auto' + assert cfg['controlpath'] == '~/.ssh/sockets/%C' + assert cfg['serveraliveinterval'] == '30' + + def test_returns_empty_on_failure(self): + fake_cp = mock.Mock(returncode=255, stdout='', stderr='boom') + with mock.patch('subprocess.run', return_value=fake_cp): + assert sm.probe_ssh_config('git', 'host', None) == {} + + def test_returns_empty_on_timeout(self): + import subprocess as sp + with mock.patch('subprocess.run', side_effect=sp.TimeoutExpired('ssh', 5)): + assert sm.probe_ssh_config('git', 'host', None) == {} + + +class TestEnsureMasterIdempotent: + def test_runs_probe_once_per_host(self, monkeypatch): + # Reset module state. + monkeypatch.setattr(sm, '_warmed', {}) + monkeypatch.setattr(sm, '_per_host_locks', {}) + + probe_calls = [] + def fake_probe(user, host, port, timeout=5.0): + probe_calls.append((user, host, port)) + return {'controlmaster': 'auto', 'controlpath': '/tmp/x'} + monkeypatch.setattr(sm, 'probe_ssh_config', fake_probe) + + # User already has multiplex => we DON'T start a master, just remember. + url = 'git@github.com:foo/bar.git' + sm.ensure_master_for_url(url) + sm.ensure_master_for_url(url) + sm.ensure_master_for_url(url) + assert len(probe_calls) == 1 + assert sm._warmed['git@github.com:']['we_own_master'] is False + + def test_starts_master_when_user_lacks_config(self, monkeypatch, tmp_path): + monkeypatch.setattr(sm, '_warmed', {}) + monkeypatch.setattr(sm, '_per_host_locks', {}) + monkeypatch.setattr(sm, '_OUR_CONTROL_DIR', str(tmp_path / 'cm')) + monkeypatch.setattr(sm, '_OUR_CONTROL_PATH', str(tmp_path / 'cm' / '%C')) + + monkeypatch.setattr(sm, 'probe_ssh_config', + lambda u, h, p, timeout=5.0: {}) # nothing configured + + master_calls = [] + def fake_start(user, host, port, opts): + master_calls.append((user, host, port, list(opts))) + return True + monkeypatch.setattr(sm, '_start_master', fake_start) + + sm.ensure_master_for_url('git@example.com:foo.git') + sm.ensure_master_for_url('git@example.com:bar.git') # same host + assert len(master_calls) == 1 + assert sm._warmed['git@example.com:']['we_own_master'] is True + + def test_prewarm_failure_strips_multiplex_opts(self, monkeypatch, tmp_path): + # When _start_master fails, we MUST clear ControlMaster/Path/Persist + # from opts. Otherwise N parallel fetches would race to be the master + # and trigger N concurrent auths — the exact thing this is meant to + # prevent. + monkeypatch.setattr(sm, '_warmed', {}) + monkeypatch.setattr(sm, '_per_host_locks', {}) + monkeypatch.setattr(sm, '_OUR_CONTROL_DIR', str(tmp_path / 'cm')) + monkeypatch.setattr(sm, '_OUR_CONTROL_PATH', str(tmp_path / 'cm' / '%C')) + monkeypatch.setattr(sm, 'probe_ssh_config', + lambda u, h, p, timeout=5.0: {}) + monkeypatch.setattr(sm, '_start_master', + lambda u, h, p, o: False) # always fail + + sm.ensure_master_for_url('git@example.com:foo.git') + info = sm._warmed['git@example.com:'] + assert info['we_own_master'] is False + for o in info['opts']: + assert not o.startswith('-oControlMaster=') + assert not o.startswith('-oControlPath=') + assert not o.startswith('-oControlPersist=') + # Keepalives are still useful and stay. + assert any(o.startswith('-oServerAliveInterval=') for o in info['opts']) + + def test_concurrent_ensure_probes_once(self, monkeypatch, tmp_path): + """50 threads racing on the same host must result in exactly one probe + and at most one master start.""" + import threading + monkeypatch.setattr(sm, '_warmed', {}) + monkeypatch.setattr(sm, '_per_host_locks', {}) + monkeypatch.setattr(sm, '_OUR_CONTROL_DIR', str(tmp_path / 'cm')) + monkeypatch.setattr(sm, '_OUR_CONTROL_PATH', str(tmp_path / 'cm' / '%C')) + + probe_count = [0] + probe_lock = threading.Lock() + def slow_probe(user, host, port, timeout=5.0): + with probe_lock: + probe_count[0] += 1 + # simulate the syscall being slow so threads pile up on the lock + import time as _t; _t.sleep(0.05) + return {'controlmaster': 'auto', 'controlpath': '/tmp/sock'} + monkeypatch.setattr(sm, 'probe_ssh_config', slow_probe) + + start_event = threading.Event() + def worker(): + start_event.wait() + sm.ensure_master_for_url('git@example.com:proj.git') + threads = [threading.Thread(target=worker) for _ in range(50)] + for t in threads: t.start() + start_event.set() + for t in threads: t.join() + assert probe_count[0] == 1 + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + monkeypatch.delenv('GIT_SSH_COMMAND', raising=False) + monkeypatch.delenv(sm._OWNED_ENV, raising=False) From 7e10ee7883f9b097a36c9f82778c859a79d03f9b Mon Sep 17 00:00:00 2001 From: Mark Kuckian Date: Thu, 30 Apr 2026 13:03:48 +0300 Subject: [PATCH 3/3] feat: wire ssh_multiplex into git ops and auto-enable parallel update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Git.run_git`, `git ls-remote`, and `git clone` now call `ssh_multiplex.ensure_master_for_url(url)` before each network op and acquire a slot in the global `fetch_slot()` semaphore — so up to N (default 20) git operations can run concurrently against the same SSH master. `load_dependency_chain` initialises the fetch semaphore from the new `parallel_max` config and auto-enables `parallel_load=True` whenever the user runs `mama update`, unless `serial` is passed on the command line. This is the change that makes `mama update` actually do its fetches in parallel by default. Co-Authored-By: Claude Opus 4.7 (1M context) --- mama/dependency_chain.py | 18 ++++++++++++++++++ mama/types/git.py | 13 ++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/mama/dependency_chain.py b/mama/dependency_chain.py index 79e34ae..b641aa1 100644 --- a/mama/dependency_chain.py +++ b/mama/dependency_chain.py @@ -4,6 +4,7 @@ from mama.build_config import BuildConfig from .build_dependency import BuildDependency from .util import read_text_from, write_text_to, save_file_if_contents_changed +from .utils import ssh_multiplex from .utils.system import Color, console, error @@ -417,7 +418,24 @@ def load_dependency_chain(root: BuildDependency): ThreadPoolExecutor() is bounded (~min(32, cpu_count+4)) so a moderately deep dep tree can starve waiting for slots. We pick a max_workers high enough that this doesn't happen for any realistic project. + + For `update` runs we auto-enable parallel_load so concurrent git fetches + share an SSH multiplexed master. The actual fetch concurrency is capped + at `parallel_max` (default 20) by a semaphore inside Git.run_git. + + NOTE on parallel_load: existing helpers like BuildDependency.add_child + and BuildDependency.load are not strictly thread-safe (the existing + `currently_loading` busy-wait has a TOCTOU window). For most projects + this is benign because concurrent loads of the SAME dep are rare. Pass + `serial` on the command line to disable parallel loading if you hit + issues. """ + if root.config.update and not root.config.serial_load: + root.config.parallel_load = True + + ssh_multiplex.set_verbose(root.config.verbose) + ssh_multiplex.init_fetch_semaphore(root.config.parallel_max) + with concurrent.futures.ThreadPoolExecutor(max_workers=256) as e: def load_dependency(dep: BuildDependency): if dep.already_loaded: diff --git a/mama/types/git.py b/mama/types/git.py index 34e2361..078bb8b 100644 --- a/mama/types/git.py +++ b/mama/types/git.py @@ -5,6 +5,7 @@ from .dep_source import DepSource from ..utils.system import Color, System, console, error from ..utils.sub_process import SubProcess, execute, execute_piped, execute_piped_echo +from ..utils import ssh_multiplex from ..util import is_dir_empty, save_file_if_contents_changed, read_lines_from, path_join @@ -65,7 +66,9 @@ def run_git(self, dep: BuildDependency, git_command, throw=True): cmd = f"cd {dep.src_dir} && git {git_command}" if dep.config.verbose: console(f' {dep.name: <16} git {git_command}', color=Color.YELLOW) - return execute(cmd, throw=throw) + ssh_multiplex.ensure_master_for_url(self.url) + with ssh_multiplex.fetch_slot(): + return execute(cmd, throw=throw) def _has_local_modifications(self, dep: BuildDependency) -> bool: @@ -124,7 +127,9 @@ def init_commit_hash(self, dep: BuildDependency, use_cache: bool, fetch_remote: try: if self.branch: arguments = self.branch elif self.tag: arguments = self.tag - result = execute_piped(f'git ls-remote {self.url} {arguments}', timeout=5) + ssh_multiplex.ensure_master_for_url(self.url) + with ssh_multiplex.fetch_slot(): + result = execute_piped(f'git ls-remote {self.url} {arguments}', timeout=5) if result: result = result.split(' ')[0][0:7] if dep.config.verbose: console(f' {self.name} git ls-remote {self.url} {arguments}: {result}', color=Color.YELLOW) @@ -299,7 +304,9 @@ def print_output(p:SubProcess, line:str): cmd = f'git clone {clone_args} {clone_to_dir}' if dep.config.verbose: console(f' {dep.name: <16} {cmd}') - result = SubProcess.run(cmd, io_func=print_output) + ssh_multiplex.ensure_master_for_url(self.url) + with ssh_multiplex.fetch_slot(): + result = SubProcess.run(cmd, io_func=print_output) # handle the result: if dep.config.print: