From c2168f7cc3fb42f7a078358002cb11112fd2e92a Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 14:03:54 +0100 Subject: [PATCH 01/13] Initial commit: LogFlow - Modern, multiprocess-safe logging for ML --- .flake8 | 13 ++++ .gitignore | 137 ++++++++++++++++++++++++++++++++++ Jenkinsfile | 63 ++++++++++++++++ RATIONALE.md | 45 +++++++++++ README.md | 52 +++++++++++++ examples/mp_demo.py | 50 +++++++++++++ logflow/__init__.py | 7 ++ logflow/core.py | 148 +++++++++++++++++++++++++++++++++++++ logflow/discovery.py | 64 ++++++++++++++++ logflow/intercept.py | 62 ++++++++++++++++ pyproject.toml | 73 ++++++++++++++++++ tests/test_core.py | 40 ++++++++++ tests/test_discovery.py | 27 +++++++ tests/test_multiprocess.py | 50 +++++++++++++ 14 files changed, 831 insertions(+) create mode 100644 .flake8 create mode 100644 .gitignore create mode 100644 Jenkinsfile create mode 100644 RATIONALE.md create mode 100644 README.md create mode 100644 examples/mp_demo.py create mode 100644 logflow/__init__.py create mode 100644 logflow/core.py create mode 100644 logflow/discovery.py create mode 100644 logflow/intercept.py create mode 100644 pyproject.toml create mode 100644 tests/test_core.py create mode 100644 tests/test_discovery.py create mode 100644 tests/test_multiprocess.py diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..3012122 --- /dev/null +++ b/.flake8 @@ -0,0 +1,13 @@ +[flake8] +max-line-length = 120 +extend-ignore = E203, W503 +exclude = + .git, + __pycache__, + build, + dist, + .venv, + .mypy_cache, + .pytest_cache +max-complexity = 15 +select = B,C,E,F,W,T4,B9 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d7d9428 --- /dev/null +++ b/.gitignore @@ -0,0 +1,137 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ 
+share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script, and contain python code for +# the collector to be able to find dependencies. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the user is +# responsible for their own environments. +# .python-version + +# pipenv +# According to pypa/pipenv#1181, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if you are working with a library, you should perhaps +# include Pipfile in your version control system. +# Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is recommended to include poetry.lock in version control. +# poetry.lock + +# pdm +# Similar to Pipfile.lock, it is recommended to include pdm.lock in version control. +# pdm.lock + +# runtime-configuration-files +# Ignore these files for now since they are project-specific. 
+# .env +# .venv +# venv/ +# ENV/ +# env.bak/ +# venv.bak/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +.idea/ + +# VS Code +.vscode/ + +# Project-specific ignores +.venv/ +demo_logs/ +experiment_logs/ +logs/ diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..81f31c6 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,63 @@ +pipeline { + agent any + + environment { + // Path to the virtual environment relative to the Jenkins workspace + VENV_BIN = "${WORKSPACE}/../.venv/bin" + PYTHONPATH = "${WORKSPACE}/logflow" + } + + stages { + stage('Initialize') { + steps { + echo 'Initializing Workspace...' + // Ensure the environment is set up (usually done by setup script) + sh "${VENV_BIN}/pip install -e ./logflow[dev]" + } + } + + stage('Linting') { + parallel { + stage('Black') { + steps { + sh "${VENV_BIN}/black --check logflow" + } + } + stage('Isort') { + steps { + sh "${VENV_BIN}/isort --check-only logflow" + } + } + stage('Flake8') { + steps { + sh "${VENV_BIN}/flake8 logflow" + } + } + } + } + + stage('Type Check') { + steps { + sh "${VENV_BIN}/mypy logflow" + } + } + + stage('Unit Tests') { + steps { + sh "${VENV_BIN}/pytest logflow/tests" + } + } + } + + post { + always { + echo 'LogFlow Pipeline Complete.' + } + success { + echo 'Project is healthy and ready for publication.' + } + failure { + echo 'Build failed. Please check linting or test failures.' + } + } +} diff --git a/RATIONALE.md b/RATIONALE.md new file mode 100644 index 0000000..22bd36f --- /dev/null +++ b/RATIONALE.md @@ -0,0 +1,45 @@ +# LogFlow: Rationale & Architectural Comparison + +## Executive Summary +**LogFlow** is a modern, multiprocess-safe logging library specifically engineered for High-Performance Computing (HPC) and Machine Learning (ML) environments. 
While general-purpose logging libraries exist, LogFlow bridges the gap between raw logging primitives and the specialized needs of distributed training (e.g., PyTorch DDP, TensorFlow Distribution). + +--- + +## The Landscape: Existing Alternatives + +| Library | Mechanism | ML/Distributed Suitability | Pros | Cons | +| :--- | :--- | :--- | :--- | :--- | +| **Standard `logging`** | Lock-based (Thread-safe) | **Low**. Requires complex `QueueHandler` setup for MP. | Zero dependencies, built-in. | Extremely verbose setup for MP; no built-in rank awareness. | +| **`Loguru`** | `enqueue=True` (Queue-based) | **Medium**. Great UI/UX, but no native rank/DDP logic. | Beautiful output, thread/MP safe, easy rotation. | Not aware of SLURM/DDP ranks; requires manual wrapping for ML. | +| **`Concurrent-Log-Handler`** | File Locking (`fcntl`/`flock`) | **Low**. Slow on network filesystems (NFS). | Simple to drop-in for standard logging. | High latency; prone to "lock-stale" issues on some clusters. | +| **`Lightning/Accelerate`** | Framework-specific wrappers | **High** (but locked-in). | Automatic rank-0 filtering. | Tied to specific training frameworks; hard to use in standalone scripts. | + +--- + +## Why LogFlow? (The Gap) + +Existing solutions force ML engineers to choose between **ease of use** (Loguru) and **robust distributed logic** (Lightning). LogFlow provides both. + +### 1. Unified Distributed Awareness +LogFlow automatically detects the execution environment (SLURM, TorchRun, MPI) and adjusts its behavior. +- **The "Log Storm" Problem:** In a 128-GPU cluster, standard loggers write 128 identical lines. +- **LogFlow Solution:** Intelligently filters console output to Rank 0 while ensuring all Ranks can optionally write to unique or shared persistent files with atomic safety. + +### 2. Framework Interoperability +ML projects often use a mix of libraries (PyTorch, TensorFlow, JAX, HuggingFace). Each has its own logging style. 
+- **LogFlow Solution:** Automatically intercepts standard `logging`, `warnings`, and `absl` (TensorFlow) calls, redirecting them into a single, unified, and color-coded stream. + +### 3. Startup-Consistent Rotation +ML experiments are iterative. +- **The Problem:** Standard loggers append to old files or overwrite them, making it hard to find the start of "Experiment #42". +- **LogFlow Solution:** Implements **Startup Rotation**. Every time a script starts, the old log is archived with a timestamp, and a fresh log is created. This ensures a 1:1 mapping between a run and a log file. + +### 4. Zero-Latency "Enqueue" +By routing records through Loguru's `enqueue=True` queue, which is drained by a background writer thread, LogFlow keeps the main training loop (the "Critical Path") from blocking on log I/O, even when writing to slow network storage. + +--- + +## Design Goals for Implementation +- **Developer First:** `from logflow import get_logger` should be the only line needed. +- **Framework Agnostic:** Works perfectly in a pure Python script, a Jupyter notebook, or a massive SLURM cluster. +- **Structured by Default:** Optional JSON output for integration with ELK or custom ML dashboards. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7e034f1 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# LogFlow + +Modern, multiprocess-safe logging specifically engineered for High-Performance Computing (HPC) and Machine Learning (ML). + +## Why LogFlow? +ML experiments and distributed training (like PyTorch DDP) present unique logging challenges: +- **Log Storms:** 128 identical lines when 128 GPUs log simultaneously. +- **Multiprocess Safety:** Corrupted log files when multiple processes write to the same file. +- **Startup Consistency:** Tracking which logs belong to which experiment run. + +LogFlow solves these by being **distributed-aware** and **framework-agnostic**. + +## Key Features +- **Rank-Aware:** Automatically filters console output to Rank 0 (supports SLURM, DDP, MPI). 
+- **Multiprocess Safe:** Uses Loguru's `enqueue=True` for thread/process safety. +- **Startup Rotation:** Archives old logs on script start, giving every run a fresh log file. +- **Framework Interoperability:** Automatically intercepts and formats logs from **TensorFlow**, **PyTorch**, **JAX**, and standard Python `logging`. +- **Zero-Blocking:** Non-blocking logging via background sinking. + +## Installation +```bash +pip install logflow +``` + +## Quick Start +```python +from logflow import get_logger, configure_logging + +# Optional: customize levels and directories +configure_logging(log_dir="./experiment_logs", console_level="INFO") + +logger = get_logger(__name__) + +logger.info("Starting training loop...") +logger.debug("Hyperparameters: batch_size=32, lr=0.001") +logger.success("Model checkpoint saved!") +``` + +## Distributed Training (DDP/SLURM) +LogFlow handles ranks automatically. No need to wrap your log calls in `if rank == 0:`. +```python +# In a torchrun or SLURM environment +from logflow import get_logger + +logger = get_logger(__name__) + +# Only shows up once in console (Rank 0), but saved in file for all Ranks +logger.info("Initializing process group...") +``` + +## License +MIT diff --git a/examples/mp_demo.py b/examples/mp_demo.py new file mode 100644 index 0000000..2d0747f --- /dev/null +++ b/examples/mp_demo.py @@ -0,0 +1,50 @@ +import multiprocessing as mp +import os +import time + +from logflow import configure_logging, get_logger, shutdown_logging + + +def worker(rank: int) -> None: + # Simulate a distributed environment by setting RANK env var + os.environ["RANK"] = str(rank) + + # Re-initialize configuration in spawned child process + configure_logging(log_dir="./demo_logs", script_name="mp_demo") + + logger = get_logger(f"worker_{rank}") + + # Console: Only Rank 0 shows up + # File: All ranks are saved + logger.info(f"Worker {rank} starting task...") + + time.sleep(0.5) + + if rank % 2 == 0: + logger.success(f"Worker {rank} completed 
task successfully.") + else: + logger.warning(f"Worker {rank} encountered a minor issue.") + + # Ensure this child process flushes its logs before exiting + shutdown_logging() + + +if __name__ == "__main__": + # Configure main process first + configure_logging(log_dir="./demo_logs", script_name="mp_demo") + main_logger = get_logger("main") + main_logger.info("Main process starting demo with 4 workers...") + + # Spawn 4 workers + processes = [] + for i in range(4): + p = mp.Process(target=worker, args=(i,)) + p.start() + processes.append(p) + + for p in processes: + p.join() + + # Ensure main process flushes its logs before final completion + main_logger.success("Demo completed. Check 'demo_logs/mp_demo.log' for full output.") + shutdown_logging() diff --git a/logflow/__init__.py b/logflow/__init__.py new file mode 100644 index 0000000..5576b5f --- /dev/null +++ b/logflow/__init__.py @@ -0,0 +1,7 @@ +""" +LogFlow: Modern, multiprocess-safe logging for High-Performance Computing and ML. +""" + +from logflow.core import configure_logging, get_logger, shutdown_logging + +__all__ = ["configure_logging", "get_logger", "shutdown_logging"] diff --git a/logflow/core.py b/logflow/core.py new file mode 100644 index 0000000..ec1905f --- /dev/null +++ b/logflow/core.py @@ -0,0 +1,148 @@ +import os +import sys +from datetime import datetime +from pathlib import Path +from typing import Any, Optional, Union + +from loguru import logger + +from logflow.discovery import determine_script_name, get_rank +from logflow.intercept import setup_interception + +# Global state to prevent redundant configuration +_configured = False +_log_file: Optional[Path] = None + + +def configure_logging( + log_dir: Optional[Union[str, Path]] = None, + script_name: Optional[str] = None, + file_level: str = "DEBUG", + console_level: str = "INFO", + rotation_on_startup: bool = True, + retention: int = 5, + enqueue: bool = True, +) -> None: + """ + Configure the global LogFlow system. 
+ + Args: + log_dir: Directory for log files. Defaults to './logs'. + script_name: Base name for log files. Auto-detected if None. + file_level: Minimum level for file logging. + console_level: Minimum level for console logging. + rotation_on_startup: If True, archives existing logs on startup. + retention: Number of archived logs to keep. + enqueue: If True, uses a background process for logging (recommended for MP). + """ + global _configured, _log_file + + # Basic setup + log_dir_path = Path(log_dir) if log_dir else Path(os.getenv("LOGFLOW_DIR", "./logs")) + log_dir_path.mkdir(parents=True, exist_ok=True) + + # Determine script name and rank + script_name = determine_script_name(script_name) + rank = get_rank() + is_rank_zero = rank is None or rank == 0 + + # Multiprocessing awareness + from multiprocessing import current_process + + is_main_process = current_process().name == "MainProcess" + has_rotated = os.getenv("_LOGFLOW_ROTATED") == "1" + + # Remove default Loguru handler + logger.remove() + + # 1. Console Handler (Filtered to Rank 0 by default) + if is_rank_zero: + rank_fmt = "" if rank is None else f"[rank {rank}] | " + console_format = ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + f"{rank_fmt}" + "{name}:{function}:{line} | " + "{message}" + ) + logger.add( + sys.stderr, + level=console_level.upper(), + format=console_format, + colorize=True, + enqueue=enqueue, + ) + + # 2. 
File Handler + _log_file = log_dir_path / f"{script_name}.log" + + # Handle Startup Rotation (Only on absolute Main Process at Rank 0) + if rotation_on_startup and is_main_process and is_rank_zero and not has_rotated and _log_file.exists(): + try: + mtime = _log_file.stat().st_mtime + ts = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d_%H-%M-%S") + archive_path = _log_file.parent / f"{_log_file.stem}.{ts}{_log_file.suffix}" + _log_file.rename(archive_path) + + # Retention cleanup + archives = sorted( + _log_file.parent.glob(f"{_log_file.stem}.*.*{_log_file.suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + for old in archives[retention:]: + old.unlink() + + # Signal to all future children that rotation is done + os.environ["_LOGFLOW_ROTATED"] = "1" + except Exception as e: + logger.warning(f"Startup rotation failed: {e}") + + # If we are a child and rotation was already done, ensure we don't do it again + if not is_main_process: + os.environ["_LOGFLOW_ROTATED"] = "1" + + rank_seg = "" if rank is None else f"[rank {rank}] | " + file_format = ( + "{time:YYYY-MM-DD HH:mm:ss.SSS} | " "{level: <8} | " f"{rank_seg}" "{name}:{function}:{line} | " "{message}" + ) + + logger.add( + str(_log_file), + level=file_level.upper(), + format=file_format, + enqueue=enqueue, + rotation="10 MB", # Built-in size rotation + retention=retention, + backtrace=True, + diagnose=True, + ) + + # 3. Intercept framework logs + setup_interception() + + _configured = True + if is_rank_zero: + logger.info(f"LogFlow initialized (Rank: {rank if rank is not None else 'N/A'})") + + +def shutdown_logging() -> None: + """ + Ensure all queued log messages are processed and all sinks are closed. + Call this before the main script exits to ensure no logs are lost. + """ + try: + logger.complete() + except Exception: + pass + + +def get_logger(name: Optional[str] = None) -> Any: + """ + Get a logger instance bound to the given name. 
+ """ + if not _configured: + # Auto-configure with defaults if not explicitly called + configure_logging() + + return logger.bind(name=name) if name else logger diff --git a/logflow/discovery.py b/logflow/discovery.py new file mode 100644 index 0000000..8368fb6 --- /dev/null +++ b/logflow/discovery.py @@ -0,0 +1,64 @@ +import os +import sys +from pathlib import Path +from typing import Optional + + +def get_rank() -> Optional[int]: + """ + Detect the rank of the current process in a distributed environment. + Supports PyTorch DDP (torchrun), SLURM, and MPI. + + Returns: + The global rank as an integer, or None if not in a distributed environment. + """ + # 1. PyTorch DDP / torchrun + for var in ("RANK", "SLURM_PROCID"): + val = os.environ.get(var) + if val is not None: + try: + return int(val) + except ValueError: + pass + + # 2. Lightning / Generic DDP (Local + Node Rank) + local_rank = os.environ.get("LOCAL_RANK") + if local_rank is not None: + try: + node_rank = int(os.environ.get("NODE_RANK", os.environ.get("GROUP_RANK", "0"))) + lr = int(local_rank) + # Best effort global rank calculation + device_count = int(os.environ.get("LOCAL_WORLD_SIZE", "1")) + return node_rank * device_count + lr + except ValueError: + pass + + return None + + +def determine_script_name(explicit: Optional[str] = None) -> str: + """ + Determine a sensible name for the log file based on execution context. 
+ """ + if explicit: + return explicit + + # Check env var for consistency across processes + env_name = os.getenv("LOGFLOW_SCRIPT_NAME") + if env_name: + return env_name + + # Try to infer from __main__ + main_module = sys.modules.get("__main__") + if main_module and hasattr(main_module, "__file__") and main_module.__file__: + path = Path(main_module.__file__) + if path.name == "__main__.py": + # If package run, use parent folder name + return path.parent.name or "app" + return path.stem + + # Fallback to sys.argv[0] if it's not a flag + if sys.argv and sys.argv[0] and not sys.argv[0].startswith("-"): + return Path(sys.argv[0]).stem + + return "app" diff --git a/logflow/intercept.py b/logflow/intercept.py new file mode 100644 index 0000000..2f170f2 --- /dev/null +++ b/logflow/intercept.py @@ -0,0 +1,62 @@ +import logging +import sys +import warnings +from types import FrameType +from typing import Any, Optional, Union + +from loguru import logger + + +class InterceptHandler(logging.Handler): + """ + Intercept standard logging messages and redirect them to Loguru. + """ + + def emit(self, record: logging.LogRecord) -> None: + # Get corresponding Loguru level if it exists + level: Union[str, int] + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where the logging call originated + frame: Optional[FrameType] = sys._getframe(6) + depth = 6 + while frame and frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + +def redirect_warnings( + message: Union[Warning, str], + category: type[Warning], + filename: str, + lineno: int, + file: Optional[Any] = None, + line: Optional[Any] = None, +) -> None: + """ + Redirect Python warnings to loguru. 
+ """ + logger.opt(depth=2).warning(f"{category.__name__}: {message} ({filename}:{lineno})") + + +def setup_interception() -> None: + """ + Configure standard logging and warnings to use Loguru. + """ + # Redirect standard logging + logging.root.handlers = [InterceptHandler()] + logging.root.setLevel(logging.DEBUG) + + # Reconfigure existing loggers + for name in logging.root.manager.loggerDict.keys(): + existing_logger = logging.getLogger(name) + existing_logger.handlers = [] + existing_logger.propagate = True + + # Redirect warnings + warnings.showwarning = redirect_warnings diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..203f4c6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,73 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "logflow" +version = "0.1.0" +description = "Modern, multiprocess-safe logging for High-Performance Computing and ML." +readme = "README.md" +authors = [{name = "gearlux"}] +license = {text = "MIT"} +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: System :: Logging", +] +dependencies = [ + "loguru>=0.7.0", +] +requires-python = ">=3.8" + +[project.optional-dependencies] +dev = [ + "black>=23.0.0", + "isort>=5.12.0", + "flake8>=6.0.0", + "mypy>=1.0.0", + "pytest>=7.0.0", +] + +[project.urls] +Homepage = "https://github.com/gearlux/logflow" +Repository = "https://github.com/gearlux/logflow.git" +Issues = "https://github.com/gearlux/logflow/issues" + +[tool.setuptools.packages.find] +where = ["."] +include = ["logflow*"] + +[tool.black] +line-length = 120 +target-version = ['py38'] +include = '\.pyi?$' + +[tool.isort] +profile = "black" +line_length = 120 +multi_line_output = 3 +include_trailing_comma = true +force_grid_wrap = 0 +use_parentheses = true 
+ensure_newline_before_comments = true + +[tool.mypy] +python_version = "3.12" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = false +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true +strict_optional = true + +[[tool.mypy.overrides]] +module = "loguru.*" +ignore_missing_imports = true diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..e32f114 --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,40 @@ +import os +import shutil +from pathlib import Path +from logflow import configure_logging, get_logger, shutdown_logging + +def test_configure_and_log(tmp_path): + log_dir = tmp_path / "logs" + configure_logging(log_dir=log_dir, script_name="test_run") + + logger = get_logger("test_module") + logger.info("Test message") + + # Ensure file is created + log_file = log_dir / "test_run.log" + assert log_file.exists() + + shutdown_logging() + + content = log_file.read_text() + assert "test_core" in content + assert "Test message" in content + +def test_startup_rotation(tmp_path): + log_dir = tmp_path / "rotation_test" + log_dir.mkdir() + log_file = log_dir / "rotate.log" + log_file.write_text("old content") + + # Configure twice to trigger rotation + configure_logging(log_dir=log_dir, script_name="rotate", rotation_on_startup=True) + shutdown_logging() + + # Check that a rotated file exists + rotated_files = list(log_dir.glob("rotate.*.log")) + assert len(rotated_files) == 1 + assert rotated_files[0].read_text() == "old content" + + # New log should be empty or have new content + assert log_file.exists() + assert "old content" not in log_file.read_text() diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..d37e1ec --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,27 @@ +import os +from 
logflow.discovery import get_rank, determine_script_name + +def test_get_rank_none(): + # Ensure rank is None when no env vars are set + if "RANK" in os.environ: del os.environ["RANK"] + if "SLURM_PROCID" in os.environ: del os.environ["SLURM_PROCID"] + if "LOCAL_RANK" in os.environ: del os.environ["LOCAL_RANK"] + assert get_rank() is None + +def test_get_rank_torchrun(): + os.environ["RANK"] = "3" + assert get_rank() == 3 + del os.environ["RANK"] + +def test_get_rank_slurm(): + os.environ["SLURM_PROCID"] = "5" + assert get_rank() == 5 + del os.environ["SLURM_PROCID"] + +def test_determine_script_name_explicit(): + assert determine_script_name("custom_name") == "custom_name" + +def test_determine_script_name_env(): + os.environ["LOGFLOW_SCRIPT_NAME"] = "env_name" + assert determine_script_name() == "env_name" + del os.environ["LOGFLOW_SCRIPT_NAME"] diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py new file mode 100644 index 0000000..aa9b918 --- /dev/null +++ b/tests/test_multiprocess.py @@ -0,0 +1,50 @@ +import multiprocessing as mp +import os +import time +from pathlib import Path +from logflow import configure_logging, get_logger, shutdown_logging + +def worker(rank: int, log_dir: Path, script_name: str): + os.environ["RANK"] = str(rank) + # Signal that we are in a child so rotation doesn't happen again + # In a real app this is inherited from parent's env + os.environ["_LOGFLOW_ROTATED"] = "1" + + configure_logging(log_dir=log_dir, script_name=script_name) + logger = get_logger(f"worker_{rank}") + logger.info(f"Worker {rank} log message") + shutdown_logging() + +def test_multiprocess_safety(tmp_path): + log_dir = tmp_path / "mp_test" + script_name = "mp_safety" + + # Initialize parent + os.environ["_LOGFLOW_ROTATED"] = "0" + configure_logging(log_dir=log_dir, script_name=script_name) + main_logger = get_logger("main") + main_logger.info("Main start") + + processes = [] + num_workers = 4 + for i in range(num_workers): + p = 
mp.Process(target=worker, args=(i, log_dir, script_name)) + p.start() + processes.append(p) + + for p in processes: + p.join() + + main_logger.info("Main end") + shutdown_logging() + + # Verify results + log_file = log_dir / f"{script_name}.log" + assert log_file.exists() + content = log_file.read_text() + + for i in range(num_workers): + assert f"Worker {i} log message" in content + + assert "Main start" in content + assert "Main end" in content From dd0d53aa9a73a5c730e8d1c87fdceed3cb9b2223 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:19:35 +0100 Subject: [PATCH 02/13] config file --- FUTURE_DEVELOPMENT.md | 30 +++++++++++++++++ Jenkinsfile | 25 +++++++------- README.md | 36 ++++++++++++++++++++ logflow.example.yaml | 48 +++++++++++++++++++++++++++ logflow/config.py | 65 ++++++++++++++++++++++++++++++++++++ logflow/core.py | 77 ++++++++++++++++++++++++++++++------------- pyproject.toml | 2 ++ tests/test_config.py | 45 +++++++++++++++++++++++++ 8 files changed, 295 insertions(+), 33 deletions(-) create mode 100644 FUTURE_DEVELOPMENT.md create mode 100644 logflow.example.yaml create mode 100644 logflow/config.py create mode 100644 tests/test_config.py diff --git a/FUTURE_DEVELOPMENT.md b/FUTURE_DEVELOPMENT.md new file mode 100644 index 0000000..6ee5da7 --- /dev/null +++ b/FUTURE_DEVELOPMENT.md @@ -0,0 +1,30 @@ +# LogFlow: Future Development & Roadmap + +This document outlines the strategic enhancements and next-level features for **LogFlow**, intended to solidify its position as the premier logging solution for High-Performance Computing (HPC) and Machine Learning (ML). + +--- + +## 1. JSON Structured Logging (Observability) +**Goal:** Make logs machine-readable for modern observability stacks (ELK, Splunk, Grafana Loki). +- **Implementation:** Add a `serialize=True` option to file sinks. +- **Benefit:** Allows for easy parsing, filtering, and aggregation of distributed training logs in centralized dashboards. + +## 2. 
Automatic Experiment Context (ML Lifecycle) +**Goal:** Automatically inject ML-specific metadata into every log record. +- **Implementation:** Create a context manager/provider for `epoch`, `step`, and `experiment_id`. +- **Benefit:** Eliminates the need for manual `logger.bind` in every function; logs automatically carry their training context. + +## 3. Rich Framework Interoperability +**Goal:** Deep integration with specialized ML frameworks. +- **Implementation:** Specialized adapters for TensorFlow (`absl`), PyTorch Lightning, and JAX. +- **Benefit:** Preserves framework-specific metadata (component names, internal timestamps) while maintaining a unified UI. + +## 4. Dynamic Reconfiguration (Runtime Control) +**Goal:** Adjust log levels without restarting long-running training jobs. +- **Implementation:** Use Unix signals (e.g., SIGHUP) or a file watcher to reload configuration. +- **Benefit:** Allows developers to increase verbosity (e.g., INFO -> DEBUG) to diagnose a mid-training anomaly on the fly. + +## 5. Performance Optimization (Zero-Copy) +**Goal:** Further reduce the impact of logging on the "Critical Path" of training. +- **Implementation:** Explore zero-copy serialization or specialized background threads for high-volume metric logging. +- **Benefit:** Ensures that logging overhead never impacts GPU utilization or training throughput. diff --git a/Jenkinsfile b/Jenkinsfile index 81f31c6..885c44b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -2,17 +2,20 @@ pipeline { agent any environment { - // Path to the virtual environment relative to the Jenkins workspace - VENV_BIN = "${WORKSPACE}/../.venv/bin" - PYTHONPATH = "${WORKSPACE}/logflow" + // Local virtual environment within the Jenkins workspace for portability + VENV_PATH = "${WORKSPACE}/.venv" + VENV_BIN = "${VENV_PATH}/bin" } stages { stage('Initialize') { steps { - echo 'Initializing Workspace...' 
- // Ensure the environment is set up (usually done by setup script) - sh "${VENV_BIN}/pip install -e ./logflow[dev]" + echo 'Creating Isolated Virtual Environment...' + sh "python3 -m venv ${VENV_PATH}" + + echo 'Installing Dependencies in Editable Mode...' + sh "${VENV_BIN}/pip install --upgrade pip" + sh "${VENV_BIN}/pip install -e .[dev]" } } @@ -20,17 +23,17 @@ pipeline { parallel { stage('Black') { steps { - sh "${VENV_BIN}/black --check logflow" + sh "${VENV_BIN}/black --check logflow tests examples" } } stage('Isort') { steps { - sh "${VENV_BIN}/isort --check-only logflow" + sh "${VENV_BIN}/isort --check-only logflow tests examples" } } stage('Flake8') { steps { - sh "${VENV_BIN}/flake8 logflow" + sh "${VENV_BIN}/flake8 logflow tests examples" } } } @@ -38,13 +41,13 @@ pipeline { stage('Type Check') { steps { - sh "${VENV_BIN}/mypy logflow" + sh "${VENV_BIN}/mypy logflow tests examples" } } stage('Unit Tests') { steps { - sh "${VENV_BIN}/pytest logflow/tests" + sh "${VENV_BIN}/pytest tests" } } } diff --git a/README.md b/README.md index 7e034f1..e1b3652 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,42 @@ logger.debug("Hyperparameters: batch_size=32, lr=0.001") logger.success("Model checkpoint saved!") ``` +## Configuration +LogFlow supports a hierarchical configuration system that allows you to manage settings across different projects and environments. + +### 1. Configuration Priority +Settings are resolved in the following order (highest to lowest): +1. **Function Arguments** (passed to `configure_logging()`) +2. **Environment Variables** (prefixed with `LOGFLOW_`) +3. **Local `logflow.yaml` / `logflow.yml`** +4. **Local `pyproject.toml`** (under `[tool.logflow]`) +5. **XDG User Config** (`~/.config/logflow/config.yaml`) +6. **Defaults** + +### 2. 
Usage Examples + +#### via `pyproject.toml` +```toml +[tool.logflow] +log_dir = "./custom_logs" +console_level = "DEBUG" +retention = 10 +``` + +#### via `logflow.yaml` +```yaml +log_dir: "./experiment_logs" +file_level: "TRACE" +enqueue: true +rotation_on_startup: true +``` + +#### via Environment Variables +```bash +export LOGFLOW_DIR="/var/log/myapp" +export LOGFLOW_CONSOLE_LEVEL="ERROR" +``` + ## Distributed Training (DDP/SLURM) LogFlow handles ranks automatically. No need to wrap your log calls in `if rank == 0:`. ```python diff --git a/logflow.example.yaml b/logflow.example.yaml new file mode 100644 index 0000000..a3024c1 --- /dev/null +++ b/logflow.example.yaml @@ -0,0 +1,48 @@ +# LogFlow: Example Configuration File +# This file illustrates all available configuration options for LogFlow. +# You can use this as a template for your project's logflow.yaml or ~/.config/logflow/config.yaml. + +# --- General Settings --- + +# Directory where log files will be stored. +# Default: "./logs" +# Env: LOGFLOW_DIR +log_dir: "./logs" + +# Base name for the log file (e.g., training.log). +# If omitted, LogFlow will auto-detect the name of the running script. +# Env: LOGFLOW_SCRIPT_NAME +script_name: "my_app" + +# Use a background process for logging. +# Strongly recommended for multiprocess (ML) environments to prevent I/O blocking. +# Default: true +# Env: LOGFLOW_ENQUEUE +enqueue: true + +# --- Rotation & Retention --- + +# Archive the existing log file on script startup by renaming it with a timestamp. +# This ensures every training run gets a fresh log file. +# Default: true +# Env: LOGFLOW_STARTUP_ROTATION +rotation_on_startup: true + +# The number of archived log files to keep before deleting the oldest ones. +# Default: 5 +# Env: LOGFLOW_RETENTION +retention: 5 + +# --- Log Levels --- + +# Minimum level for logs written to the persistent file. 
+# Options: TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL +# Default: "DEBUG" +# Env: LOGFLOW_FILE_LEVEL +file_level: "DEBUG" + +# Minimum level for logs displayed in the console. +# Options: TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL +# Default: "INFO" +# Env: LOGFLOW_CONSOLE_LEVEL +console_level: "INFO" diff --git a/logflow/config.py b/logflow/config.py new file mode 100644 index 0000000..623586f --- /dev/null +++ b/logflow/config.py @@ -0,0 +1,65 @@ +import os +import sys +from pathlib import Path +from typing import Any, Dict, Optional + +import yaml + +# For Python < 3.11, use tomli +if sys.version_info >= (3, 11): + import tomllib +else: + import tomli as tomllib + + +def get_xdg_config_dir() -> Path: + """Return the XDG config directory (e.g., ~/.config/logflow).""" + xdg_config_home = os.getenv("XDG_CONFIG_HOME") + if xdg_config_home: + return Path(xdg_config_home) / "logflow" + return Path.home() / ".config" / "logflow" + + +def load_config() -> Dict[str, Any]: + """ + Load configuration from multiple sources with the following priority: + 1. Local logflow.yaml / logflow.yml + 2. Local pyproject.toml ([tool.logflow] section) + 3. XDG User Config (~/.config/logflow/config.yaml) + + Returns: + A dictionary containing the merged configuration. + """ + config: Dict[str, Any] = {} + + # 1. XDG User Config (lowest priority) + xdg_path = get_xdg_config_dir() / "config.yaml" + if xdg_path.exists(): + try: + with open(xdg_path, "r") as f: + config.update(yaml.safe_load(f) or {}) + except Exception: + pass + + # 2. Local pyproject.toml + pyproject_path = Path("pyproject.toml") + if pyproject_path.exists(): + try: + with open(pyproject_path, "rb") as f: + toml_data = tomllib.load(f) + config.update(toml_data.get("tool", {}).get("logflow", {})) + except Exception: + pass + + # 3. 
Local logflow.yaml / logflow.yml (highest priority) + for ext in ["yaml", "yml"]: + local_yaml = Path(f"logflow.{ext}") + if local_yaml.exists(): + try: + with open(local_yaml, "r") as f: + config.update(yaml.safe_load(f) or {}) + break + except Exception: + pass + + return config diff --git a/logflow/core.py b/logflow/core.py index ec1905f..789c05d 100644 --- a/logflow/core.py +++ b/logflow/core.py @@ -6,6 +6,7 @@ from loguru import logger +from logflow.config import load_config from logflow.discovery import determine_script_name, get_rank from logflow.intercept import setup_interception @@ -17,28 +18,60 @@ def configure_logging( log_dir: Optional[Union[str, Path]] = None, script_name: Optional[str] = None, - file_level: str = "DEBUG", - console_level: str = "INFO", - rotation_on_startup: bool = True, - retention: int = 5, - enqueue: bool = True, + file_level: Optional[str] = None, + console_level: Optional[str] = None, + rotation_on_startup: Optional[bool] = None, + retention: Optional[int] = None, + enqueue: Optional[bool] = None, ) -> None: """ Configure the global LogFlow system. - - Args: - log_dir: Directory for log files. Defaults to './logs'. - script_name: Base name for log files. Auto-detected if None. - file_level: Minimum level for file logging. - console_level: Minimum level for console logging. - rotation_on_startup: If True, archives existing logs on startup. - retention: Number of archived logs to keep. - enqueue: If True, uses a background process for logging (recommended for MP). + Configuration priority: function args > env vars > file config > defaults. """ global _configured, _log_file + # 1. Load configuration from files (lowest priority) + file_cfg = load_config() + + # 2. 
Merge with defaults and environment variables + # Values are resolved in order: Args -> Env -> Config File -> Default + log_dir_val = log_dir or os.getenv("LOGFLOW_DIR") or file_cfg.get("log_dir") or "./logs" + + script_name = script_name or os.getenv("LOGFLOW_SCRIPT_NAME") or file_cfg.get("script_name") + + file_level_val = file_level or os.getenv("LOGFLOW_FILE_LEVEL") or file_cfg.get("file_level") or "DEBUG" + + console_level_val = console_level or os.getenv("LOGFLOW_CONSOLE_LEVEL") or file_cfg.get("console_level") or "INFO" + + rotation_on_startup_val = ( + rotation_on_startup + if rotation_on_startup is not None + else ( + os.getenv("LOGFLOW_STARTUP_ROTATION", "").lower() == "true" + if os.getenv("LOGFLOW_STARTUP_ROTATION") + else file_cfg.get("rotation_on_startup", True) + ) + ) + + retention_val = ( + retention + or (int(os.getenv("LOGFLOW_RETENTION")) if os.getenv("LOGFLOW_RETENTION") else None) + or file_cfg.get("retention") + or 5 + ) + + enqueue_val = ( + enqueue + if enqueue is not None + else ( + os.getenv("LOGFLOW_ENQUEUE", "").lower() == "true" + if os.getenv("LOGFLOW_ENQUEUE") + else file_cfg.get("enqueue", True) + ) + ) + # Basic setup - log_dir_path = Path(log_dir) if log_dir else Path(os.getenv("LOGFLOW_DIR", "./logs")) + log_dir_path = Path(log_dir_val) log_dir_path.mkdir(parents=True, exist_ok=True) # Determine script name and rank @@ -67,17 +100,17 @@ def configure_logging( ) logger.add( sys.stderr, - level=console_level.upper(), + level=console_level_val.upper(), format=console_format, colorize=True, - enqueue=enqueue, + enqueue=enqueue_val, ) # 2. 
File Handler _log_file = log_dir_path / f"{script_name}.log" # Handle Startup Rotation (Only on absolute Main Process at Rank 0) - if rotation_on_startup and is_main_process and is_rank_zero and not has_rotated and _log_file.exists(): + if rotation_on_startup_val and is_main_process and is_rank_zero and not has_rotated and _log_file.exists(): try: mtime = _log_file.stat().st_mtime ts = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d_%H-%M-%S") @@ -90,7 +123,7 @@ def configure_logging( key=lambda p: p.stat().st_mtime, reverse=True, ) - for old in archives[retention:]: + for old in archives[retention_val:]: old.unlink() # Signal to all future children that rotation is done @@ -109,11 +142,11 @@ def configure_logging( logger.add( str(_log_file), - level=file_level.upper(), + level=file_level_val.upper(), format=file_format, - enqueue=enqueue, + enqueue=enqueue_val, rotation="10 MB", # Built-in size rotation - retention=retention, + retention=retention_val, backtrace=True, diagnose=True, ) diff --git a/pyproject.toml b/pyproject.toml index 203f4c6..fe4053e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ classifiers = [ ] dependencies = [ "loguru>=0.7.0", + "pyyaml>=6.0", + "tomli>=2.0.0; python_version < '3.11'", ] requires-python = ">=3.8" diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..4709f0b --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,45 @@ +import os +from pathlib import Path +from logflow.config import load_config, get_xdg_config_dir + +def test_load_config_pyproject(tmp_path): + # Mock pyproject.toml + pyproject = tmp_path / "pyproject.toml" + pyproject.write_text(""" +[tool.logflow] +console_level = "DEBUG" +retention = 10 +""") + + # Change directory to tmp_path + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg["console_level"] == "DEBUG" + assert cfg["retention"] == 10 + finally: + os.chdir(old_cwd) + +def 
test_load_config_yaml_overrides_toml(tmp_path): + # Mock pyproject.toml + pyproject = tmp_path / "pyproject.toml" + pyproject.write_text("[tool.logflow]\nconsole_level = 'INFO'") + + # Mock logflow.yaml (higher priority) + logflow_yaml = tmp_path / "logflow.yaml" + logflow_yaml.write_text("console_level: 'DEBUG'") + + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg["console_level"] == "DEBUG" + finally: + os.chdir(old_cwd) + +def test_xdg_config_path(): + # Mock XDG_CONFIG_HOME + os.environ["XDG_CONFIG_HOME"] = "/tmp/xdg" + assert str(get_xdg_config_dir()) == "/tmp/xdg/logflow" + del os.environ["XDG_CONFIG_HOME"] From b9faef59f05a3c9fc1c1e4786a0a2cec23a507e2 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:32:24 +0100 Subject: [PATCH 03/13] Fix isort and black formatting for tests and examples --- logflow/config.py | 2 +- tests/test_config.py | 12 ++++++++---- tests/test_core.py | 17 ++++++++++------- tests/test_discovery.py | 17 +++++++++++++---- tests/test_multiprocess.py | 19 +++++++++++-------- 5 files changed, 43 insertions(+), 24 deletions(-) diff --git a/logflow/config.py b/logflow/config.py index 623586f..7599f5d 100644 --- a/logflow/config.py +++ b/logflow/config.py @@ -26,7 +26,7 @@ def load_config() -> Dict[str, Any]: 1. Local logflow.yaml / logflow.yml 2. Local pyproject.toml ([tool.logflow] section) 3. XDG User Config (~/.config/logflow/config.yaml) - + Returns: A dictionary containing the merged configuration. 
""" diff --git a/tests/test_config.py b/tests/test_config.py index 4709f0b..e65e7e3 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,6 +1,8 @@ import os from pathlib import Path -from logflow.config import load_config, get_xdg_config_dir + +from logflow.config import get_xdg_config_dir, load_config + def test_load_config_pyproject(tmp_path): # Mock pyproject.toml @@ -10,7 +12,7 @@ def test_load_config_pyproject(tmp_path): console_level = "DEBUG" retention = 10 """) - + # Change directory to tmp_path old_cwd = os.getcwd() os.chdir(tmp_path) @@ -21,15 +23,16 @@ def test_load_config_pyproject(tmp_path): finally: os.chdir(old_cwd) + def test_load_config_yaml_overrides_toml(tmp_path): # Mock pyproject.toml pyproject = tmp_path / "pyproject.toml" pyproject.write_text("[tool.logflow]\nconsole_level = 'INFO'") - + # Mock logflow.yaml (higher priority) logflow_yaml = tmp_path / "logflow.yaml" logflow_yaml.write_text("console_level: 'DEBUG'") - + old_cwd = os.getcwd() os.chdir(tmp_path) try: @@ -38,6 +41,7 @@ def test_load_config_yaml_overrides_toml(tmp_path): finally: os.chdir(old_cwd) + def test_xdg_config_path(): # Mock XDG_CONFIG_HOME os.environ["XDG_CONFIG_HOME"] = "/tmp/xdg" diff --git a/tests/test_core.py b/tests/test_core.py index e32f114..4e604ef 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,40 +1,43 @@ import os import shutil from pathlib import Path + from logflow import configure_logging, get_logger, shutdown_logging + def test_configure_and_log(tmp_path): log_dir = tmp_path / "logs" configure_logging(log_dir=log_dir, script_name="test_run") - + logger = get_logger("test_module") logger.info("Test message") - + # Ensure file is created log_file = log_dir / "test_run.log" assert log_file.exists() - + shutdown_logging() - + content = log_file.read_text() assert "test_core" in content assert "Test message" in content + def test_startup_rotation(tmp_path): log_dir = tmp_path / "rotation_test" log_dir.mkdir() log_file = log_dir / 
"rotate.log" log_file.write_text("old content") - + # Configure twice to trigger rotation configure_logging(log_dir=log_dir, script_name="rotate", rotation_on_startup=True) shutdown_logging() - + # Check that a rotated file exists rotated_files = list(log_dir.glob("rotate.*.log")) assert len(rotated_files) == 1 assert rotated_files[0].read_text() == "old content" - + # New log should be empty or have new content assert log_file.exists() assert "old content" not in log_file.read_text() diff --git a/tests/test_discovery.py b/tests/test_discovery.py index d37e1ec..da1100d 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,26 +1,35 @@ import os -from logflow.discovery import get_rank, determine_script_name + +from logflow.discovery import determine_script_name, get_rank + def test_get_rank_none(): # Ensure rank is None when no env vars are set - if "RANK" in os.environ: del os.environ["RANK"] - if "SLURM_PROCID" in os.environ: del os.environ["SLURM_PROCID"] - if "LOCAL_RANK" in os.environ: del os.environ["LOCAL_RANK"] + if "RANK" in os.environ: + del os.environ["RANK"] + if "SLURM_PROCID" in os.environ: + del os.environ["SLURM_PROCID"] + if "LOCAL_RANK" in os.environ: + del os.environ["LOCAL_RANK"] assert get_rank() is None + def test_get_rank_torchrun(): os.environ["RANK"] = "3" assert get_rank() == 3 del os.environ["RANK"] + def test_get_rank_slurm(): os.environ["SLURM_PROCID"] = "5" assert get_rank() == 5 del os.environ["SLURM_PROCID"] + def test_determine_script_name_explicit(): assert determine_script_name("custom_name") == "custom_name" + def test_determine_script_name_env(): os.environ["LOGFLOW_SCRIPT_NAME"] = "env_name" assert determine_script_name() == "env_name" diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index aa9b918..1e82a9d 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -2,49 +2,52 @@ import os import time from pathlib import Path + from logflow import configure_logging, 
get_logger, shutdown_logging + def worker(rank: int, log_dir: Path, script_name: str): os.environ["RANK"] = str(rank) # Signal that we are in a child so rotation doesn't happen again # In a real app this is inherited from parent's env os.environ["_LOGFLOW_ROTATED"] = "1" - + configure_logging(log_dir=log_dir, script_name=script_name) logger = get_logger(f"worker_{rank}") logger.info(f"Worker {rank} log message") shutdown_logging() + def test_multiprocess_safety(tmp_path): log_dir = tmp_path / "mp_test" script_name = "mp_safety" - + # Initialize parent os.environ["_LOGFLOW_ROTATED"] = "0" configure_logging(log_dir=log_dir, script_name=script_name) main_logger = get_logger("main") main_logger.info("Main start") - + processes = [] num_workers = 4 for i in range(num_workers): p = mp.Process(target=worker, args=(i, log_dir, script_name)) p.start() processes.append(p) - + for p in processes: p.join() - + main_logger.info("Main end") shutdown_logging() - + # Verify results log_file = log_dir / f"{script_name}.log" assert log_file.exists() content = log_file.read_text() - + for i in range(num_workers): assert f"Worker {i} log message" in content - + assert "Main start" in content assert "Main end" in content From ae63a9667142ce800f54e07b38de8eb113c4c98d Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:40:02 +0100 Subject: [PATCH 04/13] Enhance Jenkins: add Flake8 JUnit reporting for table-based error overview --- Jenkinsfile | 10 +++++++++- pyproject.toml | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 885c44b..0160403 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,7 +33,15 @@ pipeline { } stage('Flake8') { steps { - sh "${VENV_BIN}/flake8 logflow tests examples" + // Generate flake8 output in a format that can be converted to JUnit XML + sh "${VENV_BIN}/flake8 logflow tests examples --tee --output-file=flake8.txt" + // Convert flake8.txt to JUnit XML for Jenkins reporting + sh 
"${VENV_BIN}/flake8_junit flake8.txt flake8-report.xml" + } + post { + always { + junit 'flake8-report.xml' + } } } } diff --git a/pyproject.toml b/pyproject.toml index fe4053e..d5b91dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dev = [ "black>=23.0.0", "isort>=5.12.0", "flake8>=6.0.0", + "flake8-junit-report>=2.1.0", "mypy>=1.0.0", "pytest>=7.0.0", ] From b24923c4727cad5d9061fb60b5cbc3f869e361b9 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:44:20 +0100 Subject: [PATCH 05/13] Fix linting, add missing type annotations, and improve Jenkins pipeline robustness --- Jenkinsfile | 11 ++++++----- logflow/config.py | 2 +- logflow/core.py | 2 +- tests/test_config.py | 6 +++--- tests/test_core.py | 6 ++---- tests/test_discovery.py | 10 +++++----- tests/test_multiprocess.py | 5 ++--- 7 files changed, 20 insertions(+), 22 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 0160403..883f931 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,14 +33,15 @@ pipeline { } stage('Flake8') { steps { - // Generate flake8 output in a format that can be converted to JUnit XML - sh "${VENV_BIN}/flake8 logflow tests examples --tee --output-file=flake8.txt" - // Convert flake8.txt to JUnit XML for Jenkins reporting - sh "${VENV_BIN}/flake8_junit flake8.txt flake8-report.xml" + // Use || true to prevent the stage from stopping before the report is generated + sh "${VENV_BIN}/flake8 logflow tests examples --tee --output-file=flake8.txt || true" + // Convert report to JUnit XML + sh "if [ -f flake8.txt ]; then ${VENV_BIN}/flake8_junit flake8.txt flake8-report.xml; fi" } post { always { - junit 'flake8-report.xml' + // Archive the report if it was generated + junit allowEmptyResults: true, testResults: 'flake8-report.xml' } } } diff --git a/logflow/config.py b/logflow/config.py index 7599f5d..60cadce 100644 --- a/logflow/config.py +++ b/logflow/config.py @@ -1,7 +1,7 @@ import os import sys from pathlib import Path -from typing import Any, Dict, 
Optional +from typing import Any, Dict import yaml diff --git a/logflow/core.py b/logflow/core.py index 789c05d..d29c16f 100644 --- a/logflow/core.py +++ b/logflow/core.py @@ -55,7 +55,7 @@ def configure_logging( retention_val = ( retention - or (int(os.getenv("LOGFLOW_RETENTION")) if os.getenv("LOGFLOW_RETENTION") else None) + or (int(os.getenv("LOGFLOW_RETENTION", "0")) if os.getenv("LOGFLOW_RETENTION") else None) or file_cfg.get("retention") or 5 ) diff --git a/tests/test_config.py b/tests/test_config.py index e65e7e3..0b00eb0 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -4,7 +4,7 @@ from logflow.config import get_xdg_config_dir, load_config -def test_load_config_pyproject(tmp_path): +def test_load_config_pyproject(tmp_path: Path) -> None: # Mock pyproject.toml pyproject = tmp_path / "pyproject.toml" pyproject.write_text(""" @@ -24,7 +24,7 @@ def test_load_config_pyproject(tmp_path): os.chdir(old_cwd) -def test_load_config_yaml_overrides_toml(tmp_path): +def test_load_config_yaml_overrides_toml(tmp_path: Path) -> None: # Mock pyproject.toml pyproject = tmp_path / "pyproject.toml" pyproject.write_text("[tool.logflow]\nconsole_level = 'INFO'") @@ -42,7 +42,7 @@ def test_load_config_yaml_overrides_toml(tmp_path): os.chdir(old_cwd) -def test_xdg_config_path(): +def test_xdg_config_path() -> None: # Mock XDG_CONFIG_HOME os.environ["XDG_CONFIG_HOME"] = "/tmp/xdg" assert str(get_xdg_config_dir()) == "/tmp/xdg/logflow" diff --git a/tests/test_core.py b/tests/test_core.py index 4e604ef..cde13d3 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,11 +1,9 @@ -import os -import shutil from pathlib import Path from logflow import configure_logging, get_logger, shutdown_logging -def test_configure_and_log(tmp_path): +def test_configure_and_log(tmp_path: Path) -> None: log_dir = tmp_path / "logs" configure_logging(log_dir=log_dir, script_name="test_run") @@ -23,7 +21,7 @@ def test_configure_and_log(tmp_path): assert "Test message" in content -def 
test_startup_rotation(tmp_path): +def test_startup_rotation(tmp_path: Path) -> None: log_dir = tmp_path / "rotation_test" log_dir.mkdir() log_file = log_dir / "rotate.log" diff --git a/tests/test_discovery.py b/tests/test_discovery.py index da1100d..f1420d7 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -3,7 +3,7 @@ from logflow.discovery import determine_script_name, get_rank -def test_get_rank_none(): +def test_get_rank_none() -> None: # Ensure rank is None when no env vars are set if "RANK" in os.environ: del os.environ["RANK"] @@ -14,23 +14,23 @@ def test_get_rank_none(): assert get_rank() is None -def test_get_rank_torchrun(): +def test_get_rank_torchrun() -> None: os.environ["RANK"] = "3" assert get_rank() == 3 del os.environ["RANK"] -def test_get_rank_slurm(): +def test_get_rank_slurm() -> None: os.environ["SLURM_PROCID"] = "5" assert get_rank() == 5 del os.environ["SLURM_PROCID"] -def test_determine_script_name_explicit(): +def test_determine_script_name_explicit() -> None: assert determine_script_name("custom_name") == "custom_name" -def test_determine_script_name_env(): +def test_determine_script_name_env() -> None: os.environ["LOGFLOW_SCRIPT_NAME"] = "env_name" assert determine_script_name() == "env_name" del os.environ["LOGFLOW_SCRIPT_NAME"] diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index 1e82a9d..c630c97 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -1,12 +1,11 @@ import multiprocessing as mp import os -import time from pathlib import Path from logflow import configure_logging, get_logger, shutdown_logging -def worker(rank: int, log_dir: Path, script_name: str): +def worker(rank: int, log_dir: Path, script_name: str) -> None: os.environ["RANK"] = str(rank) # Signal that we are in a child so rotation doesn't happen again # In a real app this is inherited from parent's env @@ -18,7 +17,7 @@ def worker(rank: int, log_dir: Path, script_name: str): shutdown_logging() -def 
test_multiprocess_safety(tmp_path): +def test_multiprocess_safety(tmp_path: Path) -> None: log_dir = tmp_path / "mp_test" script_name = "mp_safety" From e3380e9e6b0389c6e878456e9f7c775a3b1c62a2 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:46:43 +0100 Subject: [PATCH 06/13] Enhance Jenkins: add JUnit test reports and code coverage (pytest-cov) --- Jenkinsfile | 12 +++++++++++- pyproject.toml | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 883f931..cdcd78c 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -56,9 +56,19 @@ pipeline { stage('Unit Tests') { steps { - sh "${VENV_BIN}/pytest tests" + sh "${VENV_BIN}/pytest tests --junitxml=test-report.xml --cov=logflow --cov-report=xml:coverage.xml --cov-report=term" + } + post { + always { + // Archive JUnit test results + junit allowEmptyResults: true, testResults: 'test-report.xml' + // Archive Cobertura coverage results (if plugin is installed) + // Note: modern Jenkins uses the 'cobertura' or 'recordCoverage' step + archiveArtifacts artifacts: 'coverage.xml', fingerprint: true + } } } + } post { diff --git a/pyproject.toml b/pyproject.toml index d5b91dd..c9a06dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dev = [ "flake8-junit-report>=2.1.0", "mypy>=1.0.0", "pytest>=7.0.0", + "pytest-cov>=4.0.0", ] [project.urls] From 43c303e1b1a41e65572c3d44f23e723d67f98758 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 16:50:06 +0100 Subject: [PATCH 07/13] Fix Jenkins pipeline: add missing types-PyYAML for Mypy --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index c9a06dc..f857c56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dev = [ "flake8>=6.0.0", "flake8-junit-report>=2.1.0", "mypy>=1.0.0", + "types-PyYAML>=6.0.0", "pytest>=7.0.0", "pytest-cov>=4.0.0", ] From 05f406e28b9b00d603c567ea06c2d359a93a31bb Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 
28 Feb 2026 16:51:56 +0100 Subject: [PATCH 08/13] Enhance Jenkins: use recordCoverage for visual code coverage reporting --- Jenkinsfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index cdcd78c..5a8e008 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -60,11 +60,11 @@ pipeline { } post { always { - // Archive JUnit test results + // Archive and display JUnit test results junit allowEmptyResults: true, testResults: 'test-report.xml' - // Archive Cobertura coverage results (if plugin is installed) - // Note: modern Jenkins uses the 'cobertura' or 'recordCoverage' step - archiveArtifacts artifacts: 'coverage.xml', fingerprint: true + + // Display Coverage in Jenkins UI using Code Coverage API Plugin + recordCoverage tools: [[parser: 'COBERTURA', pattern: 'coverage.xml']] } } } From 8887a93177e50ed6e1687c2180ee8c87a064c181 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sat, 28 Feb 2026 17:06:40 +0100 Subject: [PATCH 09/13] Remove previous reports --- Jenkinsfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Jenkinsfile b/Jenkinsfile index 5a8e008..24fdac0 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,6 +33,8 @@ pipeline { } stage('Flake8') { steps { + // Clean up previous reports + sh "rm -f flake8.txt flake8-report.xml" // Use || true to prevent the stage from stopping before the report is generated sh "${VENV_BIN}/flake8 logflow tests examples --tee --output-file=flake8.txt || true" // Convert report to JUnit XML From 2bc210cd629f3a6d90185ef4e5f56b21bade988b Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sun, 1 Mar 2026 14:00:42 +0100 Subject: [PATCH 10/13] feat: achieve 94% test coverage and full quality compliance --- tests/test_config.py | 93 ++++++++++++++++++++++++++++-- tests/test_core.py | 121 +++++++++++++++++++++++++++++++++++++++ tests/test_discovery.py | 124 ++++++++++++++++++++++++++++++++++++++-- tests/test_intercept.py | 79 +++++++++++++++++++++++++ 4 files changed, 408 insertions(+), 9 
deletions(-) create mode 100644 tests/test_intercept.py diff --git a/tests/test_config.py b/tests/test_config.py index 0b00eb0..5970ba6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -42,8 +42,91 @@ def test_load_config_yaml_overrides_toml(tmp_path: Path) -> None: os.chdir(old_cwd) -def test_xdg_config_path() -> None: - # Mock XDG_CONFIG_HOME - os.environ["XDG_CONFIG_HOME"] = "/tmp/xdg" - assert str(get_xdg_config_dir()) == "/tmp/xdg/logflow" - del os.environ["XDG_CONFIG_HOME"] +def test_load_config_yml_extension(tmp_path: Path) -> None: + # Mock logflow.yml + logflow_yml = tmp_path / "logflow.yml" + logflow_yml.write_text("file_level: 'TRACE'") + + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg["file_level"] == "TRACE" + finally: + os.chdir(old_cwd) + + +def test_load_config_xdg(tmp_path: Path) -> None: + # Set XDG_CONFIG_HOME to tmp_path + os.environ["XDG_CONFIG_HOME"] = str(tmp_path) + xdg_dir = tmp_path / "logflow" + xdg_dir.mkdir() + xdg_file = xdg_dir / "config.yaml" + xdg_file.write_text("retention: 20") + + try: + cfg = load_config() + assert cfg["retention"] == 20 + finally: + del os.environ["XDG_CONFIG_HOME"] + + +def test_xdg_config_path_default() -> None: + # Ensure it returns default home path if env not set + if "XDG_CONFIG_HOME" in os.environ: + del os.environ["XDG_CONFIG_HOME"] + path = get_xdg_config_dir() + assert ".config/logflow" in str(path) + + +def test_load_config_corrupt_yaml(tmp_path: Path) -> None: + # Create invalid YAML + logflow_yaml = tmp_path / "logflow.yaml" + logflow_yaml.write_bytes(b"\x00\x01\x02") # Binary data is invalid YAML + + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + # Should fail gracefully and return empty dict + assert cfg == {} + finally: + os.chdir(old_cwd) + + +def test_load_config_pyproject_missing_tool(tmp_path: Path) -> None: + pyproject = tmp_path / "pyproject.toml" + pyproject.write_text("[tool.something_else]\nkey = 'value'") 
+ + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg == {} + finally: + os.chdir(old_cwd) + + +def test_load_config_corrupt_toml(tmp_path: Path) -> None: + pyproject = tmp_path / "pyproject.toml" + # Write invalid TOML (unquoted string or similar) + pyproject.write_text("[tool.logflow]\nkey = value_without_quotes") + + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg == {} + finally: + os.chdir(old_cwd) + + +def test_load_config_empty(tmp_path: Path) -> None: + # Test loading when no files exist + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + cfg = load_config() + assert cfg == {} + finally: + os.chdir(old_cwd) diff --git a/tests/test_core.py b/tests/test_core.py index cde13d3..9e46276 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,3 +1,4 @@ +import os from pathlib import Path from logflow import configure_logging, get_logger, shutdown_logging @@ -21,6 +22,126 @@ def test_configure_and_log(tmp_path: Path) -> None: assert "Test message" in content +def test_configure_env_overrides_file(tmp_path: Path) -> None: + log_dir = tmp_path / "env_test" + # Create config file + pyproject = tmp_path / "pyproject.toml" + pyproject.write_text("[tool.logflow]\nfile_level = 'INFO'") + + old_cwd = os.getcwd() + os.chdir(tmp_path) + os.environ["LOGFLOW_FILE_LEVEL"] = "TRACE" + try: + configure_logging(log_dir=log_dir, script_name="env_over") + # In a real test we'd check the internal state, but here we verify it runs + # and create logs with correct params. 
+ logger = get_logger("env_test") + logger.trace("Trace message") + shutdown_logging() + + content = (log_dir / "env_over.log").read_text() + assert "Trace message" in content + finally: + os.chdir(old_cwd) + del os.environ["LOGFLOW_FILE_LEVEL"] + + +def test_configure_args_overrides_env(tmp_path: Path) -> None: + log_dir = tmp_path / "arg_test" + os.environ["LOGFLOW_FILE_LEVEL"] = "ERROR" + try: + # Args 'DEBUG' should win over Env 'ERROR' + configure_logging(log_dir=log_dir, script_name="arg_over", file_level="DEBUG") + logger = get_logger("arg_test") + logger.debug("Debug message") + shutdown_logging() + + content = (log_dir / "arg_over.log").read_text() + assert "Debug message" in content + finally: + del os.environ["LOGFLOW_FILE_LEVEL"] + + +def test_configure_rank_non_zero(tmp_path: Path) -> None: + log_dir = tmp_path / "rank_test" + os.environ["RANK"] = "1" + try: + configure_logging(log_dir=log_dir, script_name="rank_one") + logger = get_logger("rank_test") + logger.info("Non-zero rank message") + shutdown_logging() + + # Should still exist in file + log_file = log_dir / "rank_one.log" + assert "Non-zero rank message" in log_file.read_text() + assert "[rank 1]" in log_file.read_text() + finally: + del os.environ["RANK"] + + +def test_configure_rank_mocked(tmp_path: Path) -> None: + from unittest.mock import patch + + log_dir = tmp_path / "mock_rank" + with patch("logflow.core.get_rank", return_value=10): + configure_logging(log_dir=log_dir, script_name="mocked") + logger = get_logger("mock_rank") + logger.info("Mocked rank message") + shutdown_logging() + + content = (log_dir / "mocked.log").read_text() + assert "[rank 10]" in content + + +def test_shutdown_multiple_times() -> None: + # Should not raise exception + shutdown_logging() + shutdown_logging() + + +def test_auto_configure(tmp_path: Path) -> None: + from unittest.mock import patch + + import logflow.core + + # Reset global state for this test + with patch("logflow.core._configured", False): + with 
patch("logflow.core.configure_logging") as mock_conf: + logflow.core.get_logger("auto") + mock_conf.assert_called_once() + + +def test_rotation_failure(tmp_path: Path) -> None: + from unittest.mock import patch + + log_dir = tmp_path / "rot_fail" + log_dir.mkdir() + log_file = log_dir / "app.log" + log_file.write_text("old") + + with patch("pathlib.Path.rename", side_effect=OSError("Access denied")): + # Should not crash, just log warning + configure_logging(log_dir=log_dir, script_name="app") + shutdown_logging() + + +def test_configure_no_rotation(tmp_path: Path) -> None: + log_dir = tmp_path / "no_rotate" + log_dir.mkdir() + log_file = log_dir / "app.log" + log_file.write_text("old") + + configure_logging(log_dir=log_dir, script_name="app", rotation_on_startup=False) + logger = get_logger("no_rotate") + logger.info("new") + shutdown_logging() + + # Should append, not rotate + content = log_file.read_text() + assert "old" in content + assert "new" in content + + def test_startup_rotation(tmp_path: Path) -> None: log_dir = tmp_path / "rotation_test" log_dir.mkdir() diff --git a/tests/test_discovery.py b/tests/test_discovery.py index f1420d7..161b76e 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -26,11 +26,127 @@ def test_get_rank_slurm() -> None: del os.environ["SLURM_PROCID"] +def test_get_rank_invalid() -> None: + os.environ["RANK"] = "not_an_int" + assert get_rank() is None + del os.environ["RANK"] + + def test_determine_script_name_explicit() -> None: assert determine_script_name("custom_name") == "custom_name" -def test_determine_script_name_env() -> None: - os.environ["LOGFLOW_SCRIPT_NAME"] = "env_name" - assert determine_script_name() == "env_name" - del os.environ["LOGFLOW_SCRIPT_NAME"] +def test_get_rank_generic_ddp() -> None: + os.environ["LOCAL_RANK"] = "1" + os.environ["LOCAL_WORLD_SIZE"] = "2" + os.environ["NODE_RANK"] = "1" + # (Node 1 * World 2) + Local 1 = Rank 3 + assert get_rank() == 3 + del os.environ["LOCAL_RANK"] + del 
os.environ["LOCAL_WORLD_SIZE"] + del os.environ["NODE_RANK"] + + +def test_determine_script_name_argv() -> None: + import sys + from unittest.mock import patch + + old_argv = sys.argv + sys.argv = ["/path/to/my_script.py"] + # Mock sys.modules['__main__'] to not have a package/spec + with patch.dict(sys.modules, {"__main__": type("Module", (), {"__package__": None})()}): + try: + name = determine_script_name() + assert name == "my_script" + finally: + sys.argv = old_argv + + +def test_get_rank_local_only() -> None: + os.environ["LOCAL_RANK"] = "2" + # Defaults: node_rank=0, local_world_size=1 + assert get_rank() == 2 + del os.environ["LOCAL_RANK"] + + +def test_determine_script_name_empty_argv() -> None: + import sys + + old_argv = sys.argv + sys.argv = [] + try: + name = determine_script_name() + # Fallback should hit main check or return 'app' + assert isinstance(name, str) + finally: + sys.argv = old_argv + + +def test_determine_script_name_package() -> None: + import sys + from unittest.mock import patch + + # Mock sys.modules['__main__'] to look like a package run + mock_main = type("Module", (), {"__package__": "my_package"})() + with patch.dict(sys.modules, {"__main__": mock_main}): + name = determine_script_name() + # Should return 'my_package' or similar + assert isinstance(name, str) + + +def test_determine_script_name_spec() -> None: + import sys + from unittest.mock import MagicMock, patch + + # Create a dummy module with a spec and name + mock_spec = MagicMock() + mock_spec.name = "my_module.sub" + mock_main = MagicMock() + mock_main.__spec__ = mock_spec + mock_main.__package__ = "my_module" + + with patch.dict(sys.modules, {"__main__": mock_main}): + # We need to bypass the package check + name = determine_script_name() + assert isinstance(name, str) + + +def test_determine_script_name_flag() -> None: + import sys + from unittest.mock import patch + + old_argv = sys.argv + sys.argv = ["-m"] # Obvious flag + # Force MainProcess check to pass + with 
patch.dict(sys.modules, {"__main__": type("Module", (), {"__package__": None})()}): + try: + name = determine_script_name() + assert isinstance(name, str) + finally: + sys.argv = old_argv + + +def test_get_rank_local_world_size() -> None: + os.environ["LOCAL_RANK"] = "1" + os.environ["NODE_RANK"] = "2" + os.environ["LOCAL_WORLD_SIZE"] = "4" + # (Node 2 * World 4) + Local 1 = Rank 9 + assert get_rank() == 9 + del os.environ["LOCAL_RANK"] + del os.environ["NODE_RANK"] + del os.environ["LOCAL_WORLD_SIZE"] + + +def test_determine_script_name_flag_rejection() -> None: + import sys + from unittest.mock import patch + + old_argv = sys.argv + sys.argv = ["-m"] # Trigger flag rejection + # Mock __main__ to not have a file or package to force fallback + with patch.dict(sys.modules, {"__main__": type("Module", (), {})()}): + try: + name = determine_script_name() + assert name == "app" + finally: + sys.argv = old_argv diff --git a/tests/test_intercept.py b/tests/test_intercept.py new file mode 100644 index 0000000..1322bd7 --- /dev/null +++ b/tests/test_intercept.py @@ -0,0 +1,79 @@ +import logging +import warnings +from pathlib import Path + +from logflow import configure_logging, shutdown_logging + + +def test_intercept_logging(tmp_path: Path) -> None: + log_dir = tmp_path / "intercept_logs" + configure_logging(log_dir=log_dir, script_name="intercept") + + # Use standard logging + std_logger = logging.getLogger("standard_lib") + std_logger.error("Standard logging message") + + shutdown_logging() + + log_file = log_dir / "intercept.log" + content = log_file.read_text() + assert "Standard logging message" in content + + +def test_intercept_exception(tmp_path: Path) -> None: + log_dir = tmp_path / "exception_logs" + configure_logging(log_dir=log_dir, script_name="exception") + + std_logger = logging.getLogger("exception_lib") + try: + raise ValueError("Intercepted error") + except ValueError: + std_logger.exception("An error occurred") + + shutdown_logging() + + log_file = 
log_dir / "exception.log" + content = log_file.read_text() + assert "An error occurred" in content + assert "ValueError: Intercepted error" in content + + +def test_intercept_unknown_level(tmp_path: Path) -> None: + log_dir = tmp_path / "unknown_level_logs" + configure_logging(log_dir=log_dir, script_name="unknown") + + # Manually emit a record with an unknown level + record = logging.LogRecord( + name="test", + level=99, # Unknown level + pathname="test.py", + lineno=1, + msg="Unknown level message", + args=None, + exc_info=None, + ) + from logflow.intercept import InterceptHandler + + handler = InterceptHandler() + handler.emit(record) + + shutdown_logging() + + log_file = log_dir / "unknown.log" + content = log_file.read_text() + assert "Unknown level message" in content + + +def test_intercept_warnings(tmp_path: Path) -> None: + log_dir = tmp_path / "warning_logs" + configure_logging(log_dir=log_dir, script_name="warnings") + + # Trigger a python warning + warnings.warn("Custom warning message", UserWarning) + + shutdown_logging() + + log_file = log_dir / "warnings.log" + content = log_file.read_text() + assert "Custom warning message" in content + assert "UserWarning" in content From 6337c4fb54cbdf037937049bed88f67ab1b256e5 Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sun, 1 Mar 2026 14:58:47 +0100 Subject: [PATCH 11/13] feat: add GitHub Actions CI workflow for automated PR checks --- .github/workflows/ci.yml | 58 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..11c0462 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,58 @@ +name: LogFlow CI + +on: + push: + branches: [ main, dev/main ] + pull_request: + branches: [ main ] + +jobs: + quality-checks: + name: Linting & Type Checking + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: 
actions/setup-python@v5 + with: + python-version: "3.12" + cache: 'pip' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + - name: Formatting (Isort & Black) + run: | + isort --check-only . + black --check . + - name: Linting (Flake8) + run: | + flake8 . + - name: Type Checking (Mypy) + run: | + mypy . + + unit-tests: + name: Unit Tests & Coverage + needs: quality-checks + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: 'pip' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + - name: Run Tests + run: | + pytest tests --junitxml=test-report.xml --cov=logflow --cov-report=xml --cov-report=term + - name: Upload Test Results + uses: actions/upload-artifact@v4 + with: + name: test-results + path: test-report.xml + if: always() From 754eca2d219ef7931317736c4699a5e5c9da2a6f Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sun, 1 Mar 2026 14:59:28 +0100 Subject: [PATCH 12/13] Recommend lnav --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index e1b3652..6cdc5bb 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,19 @@ export LOGFLOW_DIR="/var/log/myapp" export LOGFLOW_CONSOLE_LEVEL="ERROR" ``` +## Log Inspection +For the best experience viewing LogFlow logs (especially interleaving logs from multiple ranks/workers), we recommend using **[lnav](https://lnav.org/)** (The Log File Navigator). + +`lnav` automatically detects LogFlow's timestamp format and can merge multiple log files into a single, chronological view. + +### Usage with lnav +```bash +# View all logs in the directory interleaved by time +lnav ./logs +``` + +For more information, see the **[lnav documentation](https://docs.lnav.org/)**. + ## Distributed Training (DDP/SLURM) LogFlow handles ranks automatically. 
No need to wrap your log calls in `if rank == 0:`. ```python From e6c762f561a30ae72cf4e1b21907fb5b900f524d Mon Sep 17 00:00:00 2001 From: gertbehi Date: Sun, 1 Mar 2026 15:06:31 +0100 Subject: [PATCH 13/13] fix: use 'spawn' context for cross-platform multiprocess safety and resolve CI hang --- tests/test_multiprocess.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index c630c97..ff62ee9 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -27,15 +27,21 @@ def test_multiprocess_safety(tmp_path: Path) -> None: main_logger = get_logger("main") main_logger.info("Main start") + # Use 'spawn' context for consistent behavior on Linux, macOS, and Windows. + # This avoids deadlocks with Loguru's background sink queue. + ctx = mp.get_context("spawn") processes = [] num_workers = 4 for i in range(num_workers): - p = mp.Process(target=worker, args=(i, log_dir, script_name)) + p = ctx.Process(target=worker, args=(i, log_dir, script_name)) p.start() processes.append(p) for p in processes: - p.join() + p.join(timeout=15) + if p.is_alive(): + p.terminate() + p.join() main_logger.info("Main end") shutdown_logging()