-
Notifications
You must be signed in to change notification settings - Fork 1
Resolve "Ewoks install requirements should contain the python and pip version" #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0a33367
e82bc4f
cd02d86
f881d06
3878cdc
37d089e
86b5993
1c2d1de
19dcca9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,101 @@ | ||
| """Workflow requirements.""" | ||
|
|
||
| import logging | ||
| from typing import Optional | ||
| from typing import Tuple | ||
| from typing import Union | ||
|
|
||
| from ewokscore.graph import TaskGraph | ||
|
|
||
| from .utils import parse | ||
| from .utils.base_manager import BaseRequirements | ||
| from .utils.detect import get_manager | ||
| from .utils.metadata import last_resort | ||
|
|
||
| logger = logging.getLogger(__file__) | ||
|
|
||
|
|
||
| def add_requirements( | ||
| graph: TaskGraph, | ||
| manager_name: Optional[str] = None, | ||
| manager_command: Tuple[str, ...] = tuple(), | ||
| ) -> None: | ||
| """Add requirements to a workflow definition in-place.""" | ||
| manager = get_manager(manager_name=manager_name, manager_command=manager_command) | ||
| requirements = manager.gather_requirements() | ||
| graph.graph.graph["requirements"] = requirements.model_dump() | ||
|
|
||
|
|
||
| def get_requirements(graph: TaskGraph) -> BaseRequirements: | ||
| """Extract requirements from a workflow definition.""" | ||
| requirements = graph.graph.graph.get("requirements", None) | ||
| no_requirements = not requirements | ||
|
|
||
| if no_requirements: | ||
| logger.warning( | ||
| "BaseRequirements field is empty. Trying to extract requirements automatically..." | ||
| ) | ||
| requirements = last_resort.last_resort_requirements(graph) | ||
|
|
||
| requirements = parse.parse_requirements(requirements) | ||
|
|
||
| if no_requirements: | ||
| logger.info(f"Extracted the following requirements: {requirements.__info__()}") | ||
|
|
||
| return requirements | ||
|
|
||
|
|
||
| def install_requirements( | ||
| requirements: BaseRequirements, | ||
| manager_name: Optional[str] = None, | ||
| manager_command: Union[None, str, Tuple[str, ...]] = None, | ||
| ) -> None: | ||
| """Install workflow requirements.""" | ||
|
|
||
| if manager_command and not manager_name: | ||
| raise ValueError( | ||
| f"Provide 'manager_name' associated to command {manager_command}" | ||
| ) | ||
|
|
||
| try: | ||
| if manager_name: | ||
| raise ValueError("Ignore package manager used to generate the requirements") | ||
| else: | ||
| manager = get_manager(manager_name=requirements.manager.name) | ||
| except ValueError: | ||
| manager = get_manager( | ||
| manager_name=manager_name, manager_command=manager_command | ||
| ) | ||
|
|
||
| manager.install_requirements(requirements) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| logging.basicConfig(level=logging.DEBUG) | ||
| import time | ||
| from pprint import pprint | ||
|
|
||
| t0 = time.perf_counter() | ||
|
|
||
| try: | ||
| manager = get_manager(manager_name="pip") | ||
| requirements = manager.gather_requirements() | ||
|
|
||
| print() | ||
| print("Model:") | ||
| pprint(requirements.model_dump()) | ||
| finally: | ||
| print("Freeze time:", time.perf_counter() - t0) | ||
|
|
||
| pip_freeze = requirements.manager.freeze | ||
|
|
||
| dists_freeze = manager._freeze_distributions(requirements) | ||
| dists_freeze = [s for s in dists_freeze if not s.startswith("#")] | ||
|
|
||
| print() | ||
| print("pip freeze has these extra's:") | ||
| pprint(set(pip_freeze) - set(dists_freeze)) | ||
|
|
||
| print() | ||
| print("native freeze has these extra's:") | ||
| pprint(set(dists_freeze) - set(pip_freeze)) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| import logging | ||
| import os | ||
| import sys | ||
| from typing import Any | ||
| from typing import Dict | ||
| from typing import Literal | ||
| from typing import Optional | ||
| from typing import Tuple | ||
|
|
||
| import yaml | ||
|
|
||
| from .utils.base_manager import BaseManager | ||
| from .utils.base_manager import BaseManagerInfo | ||
| from .utils.base_manager import BaseRequirements | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class CondaManagerInfo(BaseManagerInfo): | ||
| name: Literal["conda"] = "conda" | ||
|
|
||
|
|
||
| class CondaRequirements(BaseRequirements): | ||
| manager: CondaManagerInfo | ||
| environment: dict | ||
|
|
||
|
|
||
| class CondaManager(BaseManager): | ||
| NAME = "conda" | ||
| PRIORITY = 4 | ||
| REQUIREMENTS_MODEL = CondaRequirements | ||
|
|
||
| def __init__(self, *command: str) -> None: | ||
| if not command: | ||
| command = self._get_conda_command() | ||
| super().__init__(*command) | ||
|
|
||
| def version(self) -> Optional[str]: | ||
| """Returns None when this manager is not available.""" | ||
| try: | ||
| output = self._check_output("--version") | ||
| except RuntimeError: | ||
| return None | ||
| return output.strip().split(" ")[-1] | ||
|
|
||
| def is_active(self) -> bool: | ||
| """Manager is explicitly active.""" | ||
| return "CONDA_PREFIX" in os.environ or os.path.exists( | ||
| os.path.join(sys.prefix, "conda-meta") | ||
| ) | ||
|
|
||
| def _gather_requirements(self) -> Dict[str, Any]: | ||
| output = self._check_output("env", "export") | ||
| environment = yaml.safe_load(output) | ||
| environment.pop("name", None) | ||
| environment.pop("prefix", None) | ||
|
|
||
| return {"environment": environment} | ||
|
|
||
| def _install_native_requirements(self, requirements: CondaRequirements) -> bool: | ||
| text = yaml.safe_dump(requirements.environment) | ||
| with self._temporary_file(text, ".yml") as tmp_path: | ||
| self._check_call("env", "update", "-f", tmp_path) | ||
| return True | ||
|
|
||
| def _install_base_requirements(self, requirements: BaseRequirements) -> bool: | ||
| raise NotImplementedError(f"{self.NAME} installation of python distributions") | ||
|
|
||
| def _get_conda_command(self) -> Tuple[str, ...]: | ||
| try: | ||
| _ = self._check_output_raw("mamba", "--version") | ||
| return ("mamba",) | ||
| except Exception: | ||
| pass | ||
| try: | ||
| _ = self._check_output_raw("micromamba", "--version") | ||
| return ("micromamba",) | ||
| except Exception: | ||
| pass | ||
| return ("conda",) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| import importlib.metadata | ||
| import logging | ||
| import sys | ||
| from typing import Any | ||
| from typing import Dict | ||
| from typing import List | ||
| from typing import Literal | ||
| from typing import Optional | ||
|
|
||
| from .utils import pip_freeze | ||
| from .utils.base_manager import BaseManager | ||
| from .utils.base_manager import BaseManagerInfo | ||
| from .utils.base_manager import BaseRequirements | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class PipManagerInfo(BaseManagerInfo): | ||
| name: Literal["pip"] = "pip" | ||
| freeze: List[str] | ||
|
|
||
|
|
||
| class PipRequirements(BaseRequirements): | ||
| manager: PipManagerInfo | ||
|
|
||
| def __info__(self) -> str: | ||
| freeze = "\n ".join(self.manager.freeze) | ||
| return f"{super().__info__()}\nRequirements:\n {freeze}" | ||
|
|
||
|
|
||
| class PipManager(BaseManager): | ||
| NAME = "pip" | ||
| PRIORITY = 0 | ||
| REQUIREMENTS_MODEL = PipRequirements | ||
|
|
||
| def __init__(self, *command: str) -> None: | ||
| if not command: | ||
| command = sys.executable, "-m", "pip" | ||
| super().__init__(*command) | ||
|
|
||
| def version(self) -> Optional[str]: | ||
| """Returns None when this manager is not available.""" | ||
| try: | ||
| return importlib.metadata.version("pip") | ||
| except importlib.metadata.PackageNotFoundError: | ||
| return None | ||
|
|
||
| def is_active(self) -> bool: | ||
| """Manager is explicitly active.""" | ||
| return False | ||
|
|
||
| def _gather_requirements(self) -> Dict[str, Any]: | ||
| freeze_output = self._check_output("freeze").strip().splitlines() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We gather a freeze list in addition to the distributions list extracted from Both take 0.5 sec in my current environment. Perhaps gathering the freeze list can be omitted at some point. There are still things I did not test yet (like non-git VCS): https://packaging.python.org/en/latest/specifications/direct-url-data-structure/ |
||
|
|
||
| return {"freeze": freeze_output} | ||
|
|
||
| def _install_native_requirements(self, requirements: PipRequirements) -> bool: | ||
| freeze = requirements.manager.freeze | ||
|
|
||
| if not freeze: | ||
| return False | ||
|
|
||
| arguments = self._arguments(freeze) | ||
| self._check_call("install", "--no-cache-dir", *arguments) | ||
| return True | ||
|
|
||
| def _install_base_requirements(self, requirements: BaseRequirements) -> bool: | ||
| freeze = self._freeze_distributions(requirements) | ||
| if not freeze: | ||
| return False | ||
|
|
||
| arguments = self._arguments(freeze) | ||
| self._check_call("install", "--no-cache-dir", *arguments) | ||
| return True | ||
|
|
||
| def _freeze_distributions(self, requirements: BaseRequirements) -> List[str]: | ||
| """ | ||
| Pip freeze argument from list of distributions. | ||
| """ | ||
| freeze = [] | ||
| for dist in requirements.distributions: | ||
| lines, warnings = pip_freeze.freeze_distribution(dist) | ||
| for warning in warnings: | ||
| logger.warning(warning) | ||
| freeze.extend(lines) | ||
| return freeze | ||
|
|
||
| def _arguments(self, freeze: List[str]) -> List[str]: | ||
| arguments, warnings = pip_freeze.sanitize_freeze(freeze) | ||
| for warning in warnings: | ||
| logger.warning(warning) | ||
| return arguments | ||
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| import importlib.metadata | ||
| import json | ||
| import os | ||
| import sys | ||
| from typing import Any | ||
| from typing import Dict | ||
| from typing import List | ||
| from typing import Literal | ||
| from typing import Optional | ||
|
|
||
| from .utils.base_manager import BaseManager | ||
| from .utils.base_manager import BaseManagerInfo | ||
| from .utils.base_manager import BaseRequirements | ||
|
|
||
|
|
||
| class PipenvManagerInfo(BaseManagerInfo): | ||
| name: Literal["pipenv"] = "pipenv" | ||
| requirements: List[str] | ||
|
|
||
|
|
||
| class PipenvRequirements(BaseRequirements): | ||
| manager: PipenvManagerInfo | ||
|
|
||
| def __info__(self) -> str: | ||
| requirements = "\n ".join(self.manager.requirements) | ||
| return f"{super().__info__()}\nRequirements:\n {requirements}" | ||
|
|
||
|
|
||
| class PipenvManager(BaseManager): | ||
| NAME = "pipenv" | ||
| PRIORITY = 3 | ||
| REQUIREMENTS_MODEL = PipenvRequirements | ||
|
|
||
| def __init__(self, *command: str) -> None: | ||
| if not command: | ||
| command = sys.executable, "-m", "pipenv" | ||
| super().__init__(*command) | ||
|
|
||
| def version(self) -> Optional[str]: | ||
| """Returns None when this manager is not available.""" | ||
| try: | ||
| return importlib.metadata.version("pipenv") | ||
| except importlib.metadata.PackageNotFoundError: | ||
| return None | ||
|
|
||
| def is_active(self) -> bool: | ||
| """Manager is explicitly active.""" | ||
| return "PIPENV_ACTIVE" in os.environ | ||
|
|
||
| def _gather_requirements(self) -> Dict[str, Any]: | ||
| output = self._check_output("lock", "--requirements") | ||
| requirements = output.strip().splitlines() | ||
|
|
||
| return {"requirements": requirements} | ||
|
|
||
| def _install_native_requirements(self, requirements: PipenvRequirements) -> bool: | ||
| lock_data = { | ||
| "_meta": {"hash": {"sha256": "dummy"}}, # minimal metadata | ||
| "default": { | ||
| pkg.split("==")[0]: {"version": pkg.split("==")[1]} | ||
| for pkg in requirements.requirements | ||
| }, | ||
| "develop": { | ||
| pkg.split("==")[0]: {"version": pkg.split("==")[1]} | ||
| for pkg in getattr(requirements, "dev_requirements", []) | ||
| }, | ||
| } | ||
| text = json.dumps(lock_data, indent=2) | ||
|
|
||
| with self._temporary_file(text, ".lock") as tmp_path: | ||
| self._check_call("sync", "--ignore-pipfile", "-f", tmp_path) | ||
|
|
||
| def _install_base_requirements(self, requirements: BaseRequirements) -> bool: | ||
| raise NotImplementedError(f"{self.NAME} installation of python distributions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems difficult to encompass all
condaexecutables.I think there is also
minicondafor example ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The command is always conda or mamba, except for micromamba iirc.