diff --git a/docs/architecture.md b/docs/architecture.md index 852657d..94819b1 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -77,21 +77,63 @@ Project-local and user configs merge per file: `./.qracer/providers.toml` define ## Provider Plugin System -> **구현 예정** — 현재는 `provider_catalog.py` 기반 하드코딩 방식으로 동작합니다. - -Built-in adapters and external plugins share the same `ProviderPlugin` protocol. Lifecycle methods (`initialize`, `health_check`, `shutdown`) require DataRegistry updates — tracked separately from current implementation. +Built-in adapters and external plugins share the same capability protocols +(`PriceProvider`, `NewsProvider`, …) and are registered from the same +`providers.toml` config. Adapters may additionally implement the optional +`LifecycleProvider` protocol to opt into per-provider `initialize`, +`health_check`, and `shutdown` hooks — invoked by `_build_registries()` at +startup and by `qracer serve` on shutdown. ```python -class ProviderPlugin(Protocol): - name: str - capabilities: list[Capability] - tier: Tier # hot | warm | cold - - async def initialize(self, config: ProviderConfig) -> None: ... +# qracer/provider_lifecycle.py +@runtime_checkable +class LifecycleProvider(Protocol): + async def initialize(self) -> None: ... async def health_check(self) -> bool: ... async def shutdown(self) -> None: ... ``` +All three methods are optional — adapters without them are treated as +always-healthy and require no teardown. A provider that raises in +`initialize()` or returns `False` from `health_check()` is excluded from the +registry with a warning instead of crashing the process. + +### Example third-party adapter + +```python +# qracer_polygon/adapter.py +class PolygonAdapter: + """External data provider with graceful lifecycle management.""" + + def __init__(self, api_key: str | None = None) -> None: + self._api_key = api_key + self._session: httpx.AsyncClient | None = None + + async def initialize(self) -> None: + self._session = httpx.AsyncClient(timeout=10.0) + + async def health_check(self) -> bool: + if self._session is None: + return False + resp = await self._session.get("https://api.polygon.io/v1/status") + return resp.status_code == 200 + + async def shutdown(self) -> None: + if self._session is not None: + await self._session.aclose() + + async def get_price(self, ticker: str) -> float: + ... +``` + +Register it via the entry-point group: + +```toml +# External package pyproject.toml +[project.entry-points."qracer.data_providers"] +polygon = "qracer_polygon.adapter:PolygonAdapter" +``` + ### Built-in vs Plugin | Type | Location | Install | @@ -109,15 +151,22 @@ External plugins register via Python entry points: bloomberg = "qracer_bloomberg.adapter:BloombergAdapter" ``` -On startup the registry scans entry points, loads `providers.toml` config, checks credentials, and registers enabled providers. This replaces the current hardcoded `_build_registries()` approach — significant implementation work tracked separately. +On startup `_build_registries()` scans entry points, loads `providers.toml`, +checks credentials, runs each adapter's optional `initialize()` + +`health_check()` hooks, and registers the enabled/healthy providers. ```text App start - → entry_points("qracer.providers") scan - → providers.toml config load + → entry_points("qracer.data_providers") + entry_points("qracer.llm_providers") scan + → providers.toml config load (enabled, priority, api_key_env) → credentials.env check per provider → Missing API key → skip with warning log - → Register enabled providers by priority + → Optional initialize() + health_check() → unhealthy ⇒ skip with warning + → Register surviving providers by priority + +qracer serve exit + → shutdown_all_providers(data_registry, llm_registry) + → Exceptions from shutdown() are logged, never propagated ``` ### `providers.toml` Example diff --git a/qracer/cli.py b/qracer/cli.py index 7b394c7..4f26341 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -283,6 +283,7 @@ def _build_registries() -> tuple[LLMRegistry, DataRegistry, list[str]]: from qracer.llm.providers import Role from qracer.llm.registry import LLMRegistry from qracer.provider_catalog import discover_data_providers, discover_llm_providers + from qracer.provider_lifecycle import initialize_provider_sync config = load_config() llm_registry = LLMRegistry() @@ -320,6 +321,9 @@ def _build_registries() -> tuple[LLMRegistry, DataRegistry, list[str]]: mod_path, cls_name = adapter_path.rsplit(".", 1) adapter_cls = getattr(importlib.import_module(mod_path), cls_name) adapter = adapter_cls(api_key=api_key) if api_key else adapter_cls() + if not initialize_provider_sync(name, adapter): + warnings.append(f"{name}: failed initialize/health_check — excluded") + continue caps = [] for cp in cap_paths: cp_mod, cp_name = cp.rsplit(".", 1) @@ -336,6 +340,9 @@ def _build_registries() -> tuple[LLMRegistry, DataRegistry, list[str]]: mod_path, cls_name = adapter_path.rsplit(".", 1) adapter_cls = getattr(importlib.import_module(mod_path), cls_name) adapter = adapter_cls(api_key=api_key) + if not initialize_provider_sync(name, adapter): + warnings.append(f"{name}: failed initialize/health_check — excluded") + continue roles = [Role(v) for v in role_values] llm_registry.register(name, adapter, roles) except Exception as exc: @@ -1099,9 +1106,15 @@ def _handle_signal(signum: int, _frame: object) -> None: click.echo(" Telegram bot: receiving commands (try /help in chat)") click.echo(" Press Ctrl+C to stop.\n") + from qracer.provider_lifecycle import shutdown_all_providers_sync + try: asyncio.run(server.run()) finally: + try: + shutdown_all_providers_sync(data_registry, llm_registry) + except Exception: + logger.debug("Provider shutdown raised", exc_info=True) release(pid_path) click.echo("qracer serve stopped.") diff --git a/qracer/provider_lifecycle.py b/qracer/provider_lifecycle.py new file mode 100644 index 0000000..21f23c4 --- /dev/null +++ b/qracer/provider_lifecycle.py @@ -0,0 +1,166 @@ +"""Provider lifecycle — optional ``initialize`` / ``health_check`` / ``shutdown`` hooks. + +Adapters registered via :mod:`qracer.provider_catalog` (built-ins and +entry-point plugins) can optionally implement the :class:`LifecycleProvider` +protocol to get: + +* ``initialize()`` — one-shot async setup (open HTTP sessions, prime caches) +* ``health_check() -> bool`` — runtime readiness probe; ``False`` excludes the + provider from the registry instead of letting it crash on first use +* ``shutdown()`` — graceful teardown invoked on server exit + +All three hooks are optional. Adapters that don't implement them are treated +as always-healthy and require no teardown — this keeps the system fully +backwards-compatible with the existing adapters in ``qracer/data/`` and +``qracer/llm/``. + +Hook failures never propagate: a raising ``initialize``/``health_check`` is +treated as "unhealthy" (provider is skipped with a warning), and a raising +``shutdown`` is logged but otherwise ignored so one bad provider can't block +the rest of the teardown sweep. +""" + +from __future__ import annotations + +import asyncio +import inspect +import logging +from typing import Any, Iterable, Protocol, runtime_checkable + +logger = logging.getLogger(__name__) + + +@runtime_checkable +class LifecycleProvider(Protocol): + """Optional lifecycle hooks for data and LLM providers. + + Providers are expected to implement **any subset** of these methods — the + helpers in this module use ``hasattr``/``callable`` duck-typing rather + than strict protocol conformance, so partial implementations are fine. + """ + + async def initialize(self) -> None: ... + + async def health_check(self) -> bool: ... + + async def shutdown(self) -> None: ... + + +async def _maybe_await(value: Any) -> Any: + """Await *value* if it's a coroutine/awaitable, otherwise return it.""" + if inspect.isawaitable(value): + return await value + return value + + +async def initialize_provider(name: str, adapter: Any) -> bool: + """Run the optional ``initialize()`` then ``health_check()`` hooks. + + Returns ``True`` if the adapter is healthy (or has no hooks at all). + Returns ``False`` only when a hook raises or ``health_check()`` reports + the adapter is not ready; either case is logged as a warning. + """ + init = getattr(adapter, "initialize", None) + if callable(init): + try: + await _maybe_await(init()) + except Exception as exc: + logger.warning("Provider '%s' initialize() failed: %s", name, exc) + return False + + health = getattr(adapter, "health_check", None) + if callable(health): + try: + ok = await _maybe_await(health()) + except Exception as exc: + logger.warning("Provider '%s' health_check() raised: %s", name, exc) + return False + if not bool(ok): + logger.warning("Provider '%s' reported unhealthy — excluded", name) + return False + + return True + + +async def shutdown_provider(name: str, adapter: Any) -> None: + """Run the optional ``shutdown()`` hook, swallowing any exception.""" + shutdown = getattr(adapter, "shutdown", None) + if not callable(shutdown): + return + try: + await _maybe_await(shutdown()) + except Exception as exc: + logger.warning("Provider '%s' shutdown() raised: %s", name, exc) + + +def initialize_provider_sync(name: str, adapter: Any) -> bool: + """Synchronous wrapper for :func:`initialize_provider`. + + Used by :func:`qracer.cli._build_registries`, which is called from + synchronous CLI setup code (no running event loop). If a loop is + already running we fall through to ``True`` without invoking hooks, + since ``asyncio.run()`` would fail — in practice the registry build + path is always sync. + """ + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(initialize_provider(name, adapter)) + # Running loop detected — can't safely invoke. Skip lifecycle gracefully. + logger.debug( + "Skipping lifecycle init for '%s': running event loop present in sync context", + name, + ) + return True + + +def _iter_unique_adapters( + registries: Iterable[Any], +) -> list[tuple[str, Any]]: + """Collect unique ``(name, adapter)`` pairs from one or more registries. + + Accepts both :class:`~qracer.data.registry.DataRegistry` and + :class:`~qracer.llm.registry.LLMRegistry` — anything exposing the private + ``_adapters`` or ``_providers`` dicts used by ``register()`` is fair game. + An adapter registered for multiple capabilities/roles is returned once. + """ + seen: set[int] = set() + out: list[tuple[str, Any]] = [] + for reg in registries: + buckets: dict[Any, list[tuple[str, Any]]] | None = getattr(reg, "_adapters", None) + if buckets is None: + buckets = getattr(reg, "_providers", None) + if not buckets: + continue + for entries in buckets.values(): + for name, adapter in entries: + key = id(adapter) + if key in seen: + continue + seen.add(key) + out.append((name, adapter)) + return out + + +async def shutdown_all_providers(*registries: Any) -> None: + """Shut down every unique adapter across the given registries. + + Safe to call even when no adapters implement :class:`LifecycleProvider` — + adapters without a ``shutdown()`` method are silently skipped. + """ + for name, adapter in _iter_unique_adapters(registries): + await shutdown_provider(name, adapter) + + +def shutdown_all_providers_sync(*registries: Any) -> None: + """Synchronous wrapper for :func:`shutdown_all_providers`. + + No-op when a loop is already running — callers in async contexts should + await :func:`shutdown_all_providers` directly. + """ + try: + asyncio.get_running_loop() + except RuntimeError: + asyncio.run(shutdown_all_providers(*registries)) + return + logger.debug("Skipping sync provider shutdown: running event loop present") diff --git a/tests/test_provider_lifecycle.py b/tests/test_provider_lifecycle.py new file mode 100644 index 0000000..5f4b4f9 --- /dev/null +++ b/tests/test_provider_lifecycle.py @@ -0,0 +1,289 @@ +"""Tests for qracer.provider_lifecycle.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import pytest + +from qracer.provider_lifecycle import ( + LifecycleProvider, + initialize_provider, + initialize_provider_sync, + shutdown_all_providers, + shutdown_all_providers_sync, + shutdown_provider, +) + +# -------------------------------------------------------------------------- +# Test doubles +# -------------------------------------------------------------------------- + + +class _AsyncAdapter: + """Adapter that implements every hook as an async coroutine.""" + + def __init__(self, *, healthy: bool = True, fail_init: bool = False) -> None: + self.init_calls = 0 + self.health_calls = 0 + self.shutdown_calls = 0 + self._healthy = healthy + self._fail_init = fail_init + + async def initialize(self) -> None: + self.init_calls += 1 + if self._fail_init: + raise RuntimeError("init boom") + + async def health_check(self) -> bool: + self.health_calls += 1 + return self._healthy + + async def shutdown(self) -> None: + self.shutdown_calls += 1 + + +class _SyncAdapter: + """Adapter that implements hooks as plain (non-async) methods.""" + + def __init__(self, *, healthy: bool = True) -> None: + self.init_calls = 0 + self.shutdown_calls = 0 + self._healthy = healthy + + def initialize(self) -> None: + self.init_calls += 1 + + def health_check(self) -> bool: + return self._healthy + + def shutdown(self) -> None: + self.shutdown_calls += 1 + + +class _NoHookAdapter: + """Adapter with no lifecycle methods — the common built-in case.""" + + +class _RaisingShutdownAdapter: + async def shutdown(self) -> None: + raise RuntimeError("shutdown boom") + + +class _RaisingHealthAdapter: + async def health_check(self) -> bool: + raise RuntimeError("health boom") + + +class _FakeRegistry: + """Minimal stand-in that mimics DataRegistry / LLMRegistry storage.""" + + def __init__(self, adapters_attr: str = "_adapters") -> None: + setattr(self, adapters_attr, {}) + self._key = adapters_attr + + def register(self, capability: Any, name: str, adapter: Any) -> None: + bucket = getattr(self, self._key).setdefault(capability, []) + bucket.append((name, adapter)) + + +# -------------------------------------------------------------------------- +# Protocol detection +# -------------------------------------------------------------------------- + + +class TestLifecycleProtocol: + def test_async_adapter_matches_protocol(self) -> None: + assert isinstance(_AsyncAdapter(), LifecycleProvider) + + def test_no_hook_adapter_does_not_match(self) -> None: + assert not isinstance(_NoHookAdapter(), LifecycleProvider) + + +# -------------------------------------------------------------------------- +# initialize_provider +# -------------------------------------------------------------------------- + + +class TestInitializeProvider: + @pytest.mark.asyncio + async def test_async_hooks_called_and_healthy(self) -> None: + a = _AsyncAdapter() + assert await initialize_provider("acme", a) is True + assert a.init_calls == 1 + assert a.health_calls == 1 + + @pytest.mark.asyncio + async def test_sync_hooks_supported(self) -> None: + a = _SyncAdapter() + assert await initialize_provider("acme", a) is True + assert a.init_calls == 1 + + @pytest.mark.asyncio + async def test_no_hooks_always_healthy(self) -> None: + assert await initialize_provider("acme", _NoHookAdapter()) is True + + @pytest.mark.asyncio + async def test_unhealthy_returns_false(self, caplog: pytest.LogCaptureFixture) -> None: + a = _AsyncAdapter(healthy=False) + caplog.set_level(logging.WARNING, logger="qracer.provider_lifecycle") + assert await initialize_provider("acme", a) is False + assert any("reported unhealthy" in r.message for r in caplog.records) + + @pytest.mark.asyncio + async def test_initialize_failure_returns_false(self, caplog: pytest.LogCaptureFixture) -> None: + a = _AsyncAdapter(fail_init=True) + caplog.set_level(logging.WARNING, logger="qracer.provider_lifecycle") + assert await initialize_provider("acme", a) is False + # health_check should not be called once init fails + assert a.health_calls == 0 + assert any("initialize() failed" in r.message for r in caplog.records) + + @pytest.mark.asyncio + async def test_health_check_exception_returns_false( + self, caplog: pytest.LogCaptureFixture + ) -> None: + caplog.set_level(logging.WARNING, logger="qracer.provider_lifecycle") + assert await initialize_provider("acme", _RaisingHealthAdapter()) is False + assert any("health_check() raised" in r.message for r in caplog.records) + + +# -------------------------------------------------------------------------- +# shutdown_provider +# -------------------------------------------------------------------------- + + +class TestShutdownProvider: + @pytest.mark.asyncio + async def test_shutdown_invoked(self) -> None: + a = _AsyncAdapter() + await shutdown_provider("acme", a) + assert a.shutdown_calls == 1 + + @pytest.mark.asyncio + async def test_sync_shutdown_invoked(self) -> None: + a = _SyncAdapter() + await shutdown_provider("acme", a) + assert a.shutdown_calls == 1 + + @pytest.mark.asyncio + async def test_no_hook_is_noop(self) -> None: + # Should not raise. + await shutdown_provider("acme", _NoHookAdapter()) + + @pytest.mark.asyncio + async def test_exception_is_swallowed(self, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.WARNING, logger="qracer.provider_lifecycle") + # Should not raise. + await shutdown_provider("acme", _RaisingShutdownAdapter()) + assert any("shutdown() raised" in r.message for r in caplog.records) + + +# -------------------------------------------------------------------------- +# Sync wrappers +# -------------------------------------------------------------------------- + + +class TestSyncWrappers: + def test_initialize_sync_runs_async_hooks(self) -> None: + a = _AsyncAdapter() + assert initialize_provider_sync("acme", a) is True + assert a.init_calls == 1 + assert a.health_calls == 1 + + def test_initialize_sync_unhealthy(self) -> None: + a = _AsyncAdapter(healthy=False) + assert initialize_provider_sync("acme", a) is False + + def test_initialize_sync_no_hooks(self) -> None: + assert initialize_provider_sync("acme", _NoHookAdapter()) is True + + def test_initialize_sync_with_running_loop_is_noop(self) -> None: + """When a loop is running, sync wrapper returns True without invoking hooks.""" + a = _AsyncAdapter() + + async def _runner() -> bool: + return initialize_provider_sync("acme", a) + + result = asyncio.run(_runner()) + assert result is True + # Hooks are skipped in the running-loop branch. + assert a.init_calls == 0 + + def test_shutdown_all_sync(self) -> None: + a = _AsyncAdapter() + reg = _FakeRegistry() + reg.register("cap", "acme", a) + shutdown_all_providers_sync(reg) + assert a.shutdown_calls == 1 + + def test_shutdown_all_sync_with_running_loop_is_noop(self) -> None: + a = _AsyncAdapter() + reg = _FakeRegistry() + reg.register("cap", "acme", a) + + async def _runner() -> None: + shutdown_all_providers_sync(reg) + + asyncio.run(_runner()) + assert a.shutdown_calls == 0 + + +# -------------------------------------------------------------------------- +# shutdown_all_providers +# -------------------------------------------------------------------------- + + +class TestShutdownAll: + @pytest.mark.asyncio + async def test_deduplicates_shared_adapter(self) -> None: + """One adapter registered under two capabilities is shut down once.""" + a = _AsyncAdapter() + reg = _FakeRegistry() + reg.register("cap_a", "acme", a) + reg.register("cap_b", "acme", a) + await shutdown_all_providers(reg) + assert a.shutdown_calls == 1 + + @pytest.mark.asyncio + async def test_handles_llm_style_registry(self) -> None: + """Accepts registries that store adapters under ``_providers``.""" + a = _AsyncAdapter() + reg = _FakeRegistry(adapters_attr="_providers") + reg.register("role", "acme", a) + await shutdown_all_providers(reg) + assert a.shutdown_calls == 1 + + @pytest.mark.asyncio + async def test_empty_registries_noop(self) -> None: + # No _adapters/_providers attr at all. + class _Bare: + pass + + await shutdown_all_providers(_Bare()) # should not raise + + @pytest.mark.asyncio + async def test_mixed_registries(self) -> None: + """Both data- and llm-style registries visited in a single call.""" + a = _AsyncAdapter() + b = _AsyncAdapter() + data = _FakeRegistry() + llm = _FakeRegistry(adapters_attr="_providers") + data.register("cap", "a-adapter", a) + llm.register("role", "b-adapter", b) + await shutdown_all_providers(data, llm) + assert a.shutdown_calls == 1 + assert b.shutdown_calls == 1 + + @pytest.mark.asyncio + async def test_one_failure_does_not_stop_others(self) -> None: + good = _AsyncAdapter() + bad = _RaisingShutdownAdapter() + reg = _FakeRegistry() + reg.register("cap", "good", good) + reg.register("cap", "bad", bad) + await shutdown_all_providers(reg) + # Good adapter still shut down despite sibling exception. + assert good.shutdown_calls == 1