Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mama/build_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def __init__(self, args):
self.convenient_install = []
## Workspace and parsing
self.parallel_load = False ## Whether to load dependencies in parallel?
self.serial_load = False ## If True, override the auto-parallel-on-update behaviour
self.parallel_max = 20 ## Cap concurrent git fetches (avoids hammering the SSH master)
self.global_workspace = False
if System.windows:
self.workspaces_root = util.normalized_path(os.getenv('HOMEPATH'))
Expand Down Expand Up @@ -160,6 +162,10 @@ def parse_args(self, args: List[str]):
elif arg == 'silent': self.print = False
elif arg == 'verbose': self.verbose = True
elif arg == 'parallel': self.parallel_load = True
elif arg == 'serial': self.serial_load = True
elif arg.startswith('parallel_max='):
try: self.parallel_max = max(1, int(arg.split('=', 1)[1]))
except (ValueError, IndexError): pass
elif arg == 'all': self.target = 'all'
elif arg == 'test': self.test = ' ' # no test arguments
elif arg == 'start': self.start = ' ' # no start arguments
Expand Down
28 changes: 26 additions & 2 deletions mama/dependency_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from mama.build_config import BuildConfig
from .build_dependency import BuildDependency
from .util import read_text_from, write_text_to, save_file_if_contents_changed
from .utils import ssh_multiplex
from .utils.system import Color, console, error


Expand Down Expand Up @@ -410,9 +411,32 @@ def get_build_dir_defines(build_dir):
def load_dependency_chain(root: BuildDependency):
"""
This is main entrypoint for building the dependency chain.
All dependencies must be resolved at this stage
All dependencies must be resolved at this stage.

With parallel_load=True, parents submit child loads to this executor and
then block on their futures while still holding a worker slot. The default
ThreadPoolExecutor() is bounded (~min(32, cpu_count+4)) so a moderately
deep dep tree can starve waiting for slots. We pick a max_workers high
enough that this doesn't happen for any realistic project.

For `update` runs we auto-enable parallel_load so concurrent git fetches
share an SSH multiplexed master. The actual fetch concurrency is capped
at `parallel_max` (default 20) by a semaphore inside Git.run_git.

NOTE on parallel_load: existing helpers like BuildDependency.add_child
and BuildDependency.load are not strictly thread-safe (the existing
`currently_loading` busy-wait has a TOCTOU window). For most projects
this is benign because concurrent loads of the SAME dep are rare. Pass
`serial` on the command line to disable parallel loading if you hit
issues.
"""
with concurrent.futures.ThreadPoolExecutor() as e:
if root.config.update and not root.config.serial_load:
root.config.parallel_load = True

ssh_multiplex.set_verbose(root.config.verbose)
ssh_multiplex.init_fetch_semaphore(root.config.parallel_max)

with concurrent.futures.ThreadPoolExecutor(max_workers=256) as e:
def load_dependency(dep: BuildDependency):
if dep.already_loaded:
return dep.should_rebuild
Expand Down
13 changes: 10 additions & 3 deletions mama/types/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .dep_source import DepSource
from ..utils.system import Color, System, console, error
from ..utils.sub_process import SubProcess, execute, execute_piped, execute_piped_echo
from ..utils import ssh_multiplex
from ..util import is_dir_empty, save_file_if_contents_changed, read_lines_from, path_join


Expand Down Expand Up @@ -65,7 +66,9 @@ def run_git(self, dep: BuildDependency, git_command, throw=True):
cmd = f"cd {dep.src_dir} && git {git_command}"
if dep.config.verbose:
console(f' {dep.name: <16} git {git_command}', color=Color.YELLOW)
return execute(cmd, throw=throw)
ssh_multiplex.ensure_master_for_url(self.url)
with ssh_multiplex.fetch_slot():
return execute(cmd, throw=throw)


def _has_local_modifications(self, dep: BuildDependency) -> bool:
Expand Down Expand Up @@ -124,7 +127,9 @@ def init_commit_hash(self, dep: BuildDependency, use_cache: bool, fetch_remote:
try:
if self.branch: arguments = self.branch
elif self.tag: arguments = self.tag
result = execute_piped(f'git ls-remote {self.url} {arguments}', timeout=5)
ssh_multiplex.ensure_master_for_url(self.url)
with ssh_multiplex.fetch_slot():
result = execute_piped(f'git ls-remote {self.url} {arguments}', timeout=5)
if result: result = result.split(' ')[0][0:7]
if dep.config.verbose:
console(f' {self.name} git ls-remote {self.url} {arguments}: {result}', color=Color.YELLOW)
Expand Down Expand Up @@ -299,7 +304,9 @@ def print_output(p:SubProcess, line:str):
cmd = f'git clone {clone_args} {clone_to_dir}'
if dep.config.verbose:
console(f' {dep.name: <16} {cmd}')
result = SubProcess.run(cmd, io_func=print_output)
ssh_multiplex.ensure_master_for_url(self.url)
with ssh_multiplex.fetch_slot():
result = SubProcess.run(cmd, io_func=print_output)

# handle the result:
if dep.config.print:
Expand Down
94 changes: 94 additions & 0 deletions mama/utils/mama_ssh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
GIT_SSH_COMMAND wrapper for mama.

Git invokes us as if we were ssh:
mama_ssh.py [ssh-args] [-p port] [-l user] host command-on-remote...

We figure out the destination, run `ssh -G` once to read the user's effective
config for that host, decide which `-o` options need adding (multiplexing,
keepalives) WITHOUT overriding anything the user already has, then exec ssh
with the augmented args.

This wrapper is invoked many times (once per git op); it must be cheap. We
cache per-host options under $XDG_CACHE_HOME so repeated invocations during
the same mama run don't re-probe.

If anything goes wrong we still exec ssh with the original args — never
break the build because of multiplexing setup.
"""

from __future__ import annotations

import json
import os
import sys
import time

# Allow running as a standalone script without a parent package context.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
if __package__ in (None, ''):
sys.path.insert(0, os.path.dirname(_THIS_DIR)) # add mama/ parent to path
from utils import ssh_multiplex # type: ignore
else:
from . import ssh_multiplex


_CACHE_TTL_SECONDS = 60 * 30 # 30 minutes per-host options cache


def _cache_path() -> str:
base = os.environ.get('XDG_CACHE_HOME') or os.path.expanduser('~/.cache')
d = os.path.join(base, 'mama')
os.makedirs(d, exist_ok=True)
return os.path.join(d, 'ssh_host_options.json')


def _load_cache() -> dict:
try:
with open(_cache_path(), 'r') as f:
return json.load(f)
except (OSError, ValueError):
return {}


def _save_cache(cache: dict) -> None:
path = _cache_path()
tmp = path + '.tmp'
try:
with open(tmp, 'w') as f:
json.dump(cache, f)
os.replace(tmp, path)
except OSError:
pass


def _options_for(user: str, host: str, port: str | None) -> list[str]:
key = ssh_multiplex.host_key(user, host, port)
cache = _load_cache()
entry = cache.get(key)
if entry and time.time() - entry.get('ts', 0) < _CACHE_TTL_SECONDS:
return list(entry.get('opts', []))
probe = ssh_multiplex.probe_ssh_config(user, host, port)
opts, _ = ssh_multiplex.options_to_add(probe)
cache[key] = {'ts': time.time(), 'opts': opts}
_save_cache(cache)
return opts


def main(argv: list[str]) -> int:
args = argv[1:]
extra: list[str] = []
parsed = ssh_multiplex.parse_host_from_ssh_args(args)
if parsed is not None:
user, host, port = parsed
try:
extra = _options_for(user, host, port)
except Exception:
extra = []
final = ['ssh', *extra, *args]
os.execvp('ssh', final)


if __name__ == '__main__':
raise SystemExit(main(sys.argv))
Loading