diff --git a/ax_browser_broker/api.py b/ax_browser_broker/api.py index 2793ffd..3a12fc7 100644 --- a/ax_browser_broker/api.py +++ b/ax_browser_broker/api.py @@ -41,7 +41,7 @@ ) from .docs import docs from .feedback import FeedbackError, list_issues, report_issue, update_issue -from .identities import IdentityError, redacted_status +from .identities import IdentityError, invalidate_identity_replicas, redacted_status from .lease_control import LeaseControlError, complete_control_session, create_control_session, get_control_session from .pool import LeaseError, heartbeat, lease, release, require_lease, status from .profiles import profile_status, seed_slot, snapshot_golden @@ -294,6 +294,60 @@ def _safe_record_event(**kwargs: Any) -> None: return +def _verify_auth_cookie_landed(identity_id: str, host: str | None) -> dict[str, Any]: + """Confirm the auth login wrote a cookie for the target origin into the base profile. + + The next lease re-syncs each replica from this base profile, so if the cookie did not + land here it will not be visible to the agent. Returns a structured result for the + caller to log/inspect; never raises. 
+ """ + import sqlite3 + + from .identities import require_identity + + result: dict[str, Any] = {"ok": False, "host": host, "checked": False} + try: + identity = require_identity(identity_id) + except Exception as error: # pragma: no cover - defensive + result["error"] = str(error) + return result + base_dir = Path(identity.profile_dir) + result["base_profile_dir"] = str(base_dir) + total = 0 + host_matches = 0 + for relative in ("Default/Cookies", "Default/Network/Cookies"): + db_path = base_dir / relative + if not db_path.exists(): + continue + try: + connection = sqlite3.connect(f"file:{db_path}?mode=ro&immutable=1", uri=True, timeout=1) + try: + result["checked"] = True + total += int(connection.execute("select count(*) from cookies").fetchone()[0] or 0) + if host: + # Match on the registrable parent domain (last two labels) because + # session cookies are commonly set on the parent (e.g. a login at + # app.slack.com sets cookies on .slack.com). + labels = host.split(".") + needle = ".".join(labels[-2:]) if len(labels) >= 2 else host + row = connection.execute( + "select count(*) from cookies where host_key like ?", + (f"%{needle}%",), + ).fetchone() + host_matches += int(row[0] or 0) + finally: + connection.close() + except sqlite3.Error as error: + result["error"] = str(error) + continue + result["total_cookies"] = total + result["host_cookie_matches"] = host_matches + # ok when we found at least one cookie for the target host, or (no host given) the + # profile has cookies at all after the login. 
+ result["ok"] = bool(host_matches > 0) if host else bool(total > 0) + return result + + def _record_browser_failure(request: LeaseIdRequest, action: str, error: Exception, data: dict[str, Any] | None = None) -> None: _safe_record_event( source="broker-api", @@ -3951,7 +4005,34 @@ async def auth_start_vnc(token: str, request: Request) -> Any: async def auth_complete(token: str) -> dict[str, Any]: try: request = complete_auth_request(token) + # Stop the portal browser FIRST so Chrome flushes the freshly-authenticated + # cookies to the identity's base profile on disk before we touch replicas. request["vnc_stop"] = stop_auth_vnc(token, missing_ok=True) + identity_id = request.get("identity_id") + if identity_id: + # The human logged in against the identity's BASE profile. Parallel-session + # leases are served from per-slot replicas, so invalidate stale replicas to + # force a fresh base->replica re-sync on the next lease. Without this the + # agent lease would reuse a warm replica that predates the login and see a + # logged-out session (the core auth-handoff bug). 
def invalidate_identity_replicas(identity_id: str) -> dict[str, Any]:
    """Drop stale per-slot replicas for an identity so the next lease re-syncs from base.

    The auth portal writes login cookies into the identity's BASE profile dir. For
    identities with max_parallel_sessions > 1, agent leases are served from per-slot
    replica copies under ``.replicas/<identity_id>/<slot_name>``. A replica synced
    before the human's login is stale and would present a logged-out session to the
    agent. After an auth handoff completes we clear the slot config and remove the
    replica dir for every slot of this identity that is NOT currently leased, forcing
    a fresh base->replica sync on the next lease. Slots with an active lease are left
    untouched so we never corrupt a live session.

    Args:
        identity_id: Identity whose replicas should be invalidated.

    Returns:
        A dict with ``identity_id``, ``cleared_slot_configs`` (slot names whose
        config file was deleted), ``removed_replicas`` (replica dirs that are
        actually gone), ``skipped_leased_slots`` and ``failed_slots`` (one
        ``{"slot", "error"}`` entry per slot that could not be fully invalidated).

    Raises:
        Whatever ``require_identity`` raises for an unknown identity. Filesystem
        errors while clearing a slot are reported in ``failed_slots`` instead of
        being raised, so one bad slot does not abort the auth handoff.
    """
    # Called for validation only: an unknown identity must fail before we touch disk.
    require_identity(identity_id)
    replica_root = (BROWSER_POOL_DIR / "profiles" / ".replicas").resolve()
    cleared_configs: list[str] = []
    removed_replicas: list[str] = []
    skipped_leased: list[str] = []
    failed_slots: list[dict[str, str]] = []
    for slot in SLOTS:
        if active_identity_id(slot.name) != identity_id:
            continue
        configured = read_slot_config(slot.name).get("PROFILE_DIR")
        if not configured:
            continue
        configured_path = Path(configured)
        try:
            relative = configured_path.resolve().relative_to(replica_root)
        except (ValueError, OSError):
            # Base profile (not a replica) — auth portal already wrote it; leave it.
            continue
        if len(relative.parts) < 2:
            # The replica root itself or a whole identity directory: removing it
            # would take other slots' (possibly leased) replicas with it. Only a
            # per-slot directory below .replicas/<identity>/ is ever deleted.
            continue
        if _slot_has_active_lease(slot.name):
            skipped_leased.append(slot.name)
            continue
        config_path = POOL_CONFIG_DIR / f"{slot.name}.env"
        try:
            config_path.unlink()
        except FileNotFoundError:
            pass  # nothing to clear; still drop the replica below
        except OSError as error:
            # Keep the replica while the config still points at it.
            failed_slots.append({"slot": slot.name, "error": str(error)})
            continue
        else:
            cleared_configs.append(slot.name)
        if not configured_path.exists():
            continue
        shutil.rmtree(configured_path, ignore_errors=True)
        if configured_path.exists():
            # rmtree swallowed an error; do not report the replica as removed.
            failed_slots.append(
                {"slot": slot.name, "error": f"could not remove replica {configured_path}"}
            )
        else:
            removed_replicas.append(str(configured_path))
    return {
        "identity_id": identity_id,
        "cleared_slot_configs": cleared_configs,
        "removed_replicas": removed_replicas,
        "skipped_leased_slots": skipped_leased,
        "failed_slots": failed_slots,
    }
_cookie_db_mtime(profile_dir: Path) -> float: + """Newest mtime across a profile's cookie databases (0.0 if none exist).""" + newest = 0.0 + for relative in ("Default/Cookies", "Default/Network/Cookies"): + db_path = profile_dir / relative + try: + mtime = db_path.stat().st_mtime + except OSError: + continue + if mtime > newest: + newest = mtime + return newest + + +def _replica_is_stale_against_base(base_profile_dir: Path, replica_profile_dir: Path) -> bool: + """True when the base profile has cookies newer than the warm replica copy. + + The auth portal logs the human in against the identity's BASE profile dir, but + parallel-session leases are served from per-slot replicas under .replicas/. + A replica synced before the human's login never picks up the new cookies, so the + agent lease would see a logged-out session. When the base cookie DB is newer than + the replica's, the replica must be re-synced from base before it is leased. + """ + if base_profile_dir.resolve() == replica_profile_dir.resolve(): + return False + base_mtime = _cookie_db_mtime(base_profile_dir) + if base_mtime <= 0.0: + return False + replica_mtime = _cookie_db_mtime(replica_profile_dir) + # Newer base cookies (with a small tolerance to avoid churning on equal stamps). + return base_mtime > replica_mtime + 1.0 + + def _has_duplicate_profile_slot(identity_id: str, selected_slot: str, selected_profile_dir: str | None) -> bool: if not selected_profile_dir: return False @@ -331,10 +364,43 @@ def lease(owner: str, ttl_seconds: int = LEASE_TTL_SECONDS, identity_id: str | N and identity_lease_count == 0 and _has_duplicate_profile_slot(identity_id, slot.name, identity_profile_dir) ) + # If this lease would be served from a warm replica whose cookies predate + # the identity's base profile (e.g. a human just completed a login in the + # auth portal, which writes to the base profile), force a re-activation so + # the replica is re-synced from base and the agent sees the fresh session. 
+ replica_needs_refresh = bool( + identity_id + and identity + and active_identity == identity_id + and _is_replica_profile(identity_profile_dir) + and _replica_is_stale_against_base( + identity.profile_dir, Path(identity_profile_dir) + ) + ) + if replica_needs_refresh: + try: + from .telemetry import record_event + + record_event( + source=owner, + event_type="lease", + message="Refreshing stale identity replica from base before lease", + severity="info", + tags=["lease", "replica", "auth-sync"], + data={ + "slot": slot.name, + "identity_id": identity_id, + "replica_profile_dir": identity_profile_dir, + "base_profile_dir": str(identity.profile_dir), + }, + ) + except Exception: + pass if identity_id and ( active_identity != identity_id or not _slot_ready(slot) or reconcile_stale_identity_slots + or replica_needs_refresh ): try: activate_identity( diff --git a/docs/browser-routing.md b/docs/browser-routing.md index c6fe22e..5908ed7 100644 --- a/docs/browser-routing.md +++ b/docs/browser-routing.md @@ -48,6 +48,8 @@ openbrowser-use --identity work-main --json open https://example.com When `policy.max_parallel_sessions` is greater than one, parallel leases use per-slot replicas under `profiles/.replicas//`. +A human auth handoff (`/auth/*`) logs in against the identity's **base** profile dir, not a replica. So that the agent's next lease sees the freshly-authenticated session, completing an auth request now invalidates that identity's stale replicas, and a lease that would be served from a replica whose cookies predate the base profile re-syncs the replica from base before handing it out. The auth-complete response also includes a `cookie_verification` block confirming the target-origin cookie actually landed in the base profile. + Do not launch several independent Chrome processes against the same `profile_dir`. Chrome profile locks, SQLite databases, and local state files are single-writer resources. 
On a laptop, several Chrome windows for the same profile still belong to one Chrome process; on the broker, separate agents normally receive separate Chrome processes. Use these identity concurrency modes: @@ -55,7 +57,7 @@ Use these identity concurrency modes: | Mode | Use For | Tradeoff | | --- | --- | --- | | Single canonical lease | Login, settings changes, sensitive account actions | Strongest persistence; one lease owner at a time; that owner can open multiple tabs | -| Profile replicas | Parallel read/QA/background flows with the same seeded identity | Independent slots; sessions can diverge until replicas are refreshed | +| Profile replicas | Parallel read/QA/background flows with the same seeded identity | Independent slots; replicas re-sync from base after an auth handoff or when their cookies predate the base profile | | Shared live browser coordinator | Future mode for several agents attached to one running Chrome process | Not the default lease contract; needs focus/navigation arbitration | The default contract is a single canonical lease unless the identity explicitly opts into replicas with `policy.max_parallel_sessions`. 
def test_auth_complete_invalidates_replicas_and_verifies_cookie(tmp_path, monkeypatch) -> None:
    """Completing an auth request invalidates replicas and reports the landed cookie."""
    import sqlite3

    identity_name = "chrome-depontefede"
    recorded_invalidations = []

    def fake_invalidate(identity_id):
        recorded_invalidations.append(identity_id)
        return {"identity_id": identity_id, "removed_replicas": []}

    monkeypatch.setattr(auth, "AUTH_STATE_FILE", tmp_path / "auth.json")
    monkeypatch.setattr(api, "stop_auth_vnc", lambda token, missing_ok=False: {"stopped": []})
    monkeypatch.setattr(api, "invalidate_identity_replicas", fake_invalidate)

    # Base profile holding a slack cookie, as if the human had just logged in.
    base_profile = tmp_path / identity_name
    default_dir = base_profile / "Default"
    default_dir.mkdir(parents=True)
    cookie_db = sqlite3.connect(default_dir / "Cookies")
    cookie_db.execute("create table cookies (host_key text, name text)")
    cookie_db.execute("insert into cookies values ('api.slack.com', 'd')")
    cookie_db.commit()
    cookie_db.close()

    class Identity:
        identity_id = identity_name
        profile_dir = base_profile

    def lookup(_id):
        return Identity()

    monkeypatch.setattr("ax_browser_broker.identities.require_identity", lookup)
    monkeypatch.setattr(auth, "require_identity", lookup)

    request = auth.create_auth_request(
        "tester", "https://app.slack.com/client", identity_id=identity_name
    )

    response = TestClient(api.app).post(f"/auth/{request['token']}/complete")

    assert response.status_code == 200
    payload = response.json()
    assert recorded_invalidations == [identity_name]
    assert payload["replica_invalidation"]["identity_id"] == identity_name
    verification = payload["cookie_verification"]
    assert verification["ok"] is True
    assert verification["host_cookie_matches"] >= 1


def test_auth_complete_flags_missing_target_cookie(tmp_path, monkeypatch) -> None:
    """An empty base profile makes the cookie verification report ``ok`` False."""
    identity_name = "empty-identity"

    monkeypatch.setattr(auth, "AUTH_STATE_FILE", tmp_path / "auth.json")
    monkeypatch.setattr(api, "stop_auth_vnc", lambda token, missing_ok=False: {"stopped": []})
    monkeypatch.setattr(
        api, "invalidate_identity_replicas", lambda identity_id: {"identity_id": identity_id}
    )

    # Profile directory exists but contains no cookie database at all.
    base_profile = tmp_path / identity_name
    base_profile.mkdir()

    class Identity:
        identity_id = identity_name
        profile_dir = base_profile

    def lookup(_id):
        return Identity()

    monkeypatch.setattr("ax_browser_broker.identities.require_identity", lookup)
    monkeypatch.setattr(auth, "require_identity", lookup)

    request = auth.create_auth_request(
        "tester", "https://app.slack.com/client", identity_id=identity_name
    )
    response = TestClient(api.app).post(f"/auth/{request['token']}/complete")

    assert response.status_code == 200
    assert response.json()["cookie_verification"]["ok"] is False
def test_invalidate_identity_replicas_clears_unleased_skips_leased(tmp_path, monkeypatch):
    """Unleased replica slots are wiped; a leased replica slot is left intact."""
    identity_file = tmp_path / "identities.json"
    pool_config_dir = tmp_path / "pool-config"
    pool_config_dir.mkdir()
    browser_pool = tmp_path / "browser-pool"
    base_profile = browser_pool / "profiles" / "chrome-one"
    base_profile.mkdir(parents=True)
    replica_root = browser_pool / "profiles" / ".replicas" / "chrome-one"
    replica_a = replica_root / "pool-a"
    replica_b = replica_root / "pool-b"
    for replica, marker in ((replica_a, "a"), (replica_b, "b")):
        replica.mkdir(parents=True)
        (replica / "marker").write_text(marker, encoding="utf-8")

    identity_file.write_text(
        json.dumps(
            {
                "identities": {
                    "chrome-one": {
                        "slot": "auto",
                        "profile_dir": str(base_profile),
                        "policy": {"max_parallel_sessions": 2},
                    }
                }
            }
        ),
        encoding="utf-8",
    )

    monkeypatch.setattr(identities, "IDENTITIES_FILE", identity_file)
    monkeypatch.setattr(identities, "POOL_CONFIG_DIR", pool_config_dir)
    monkeypatch.setattr(identities, "BROWSER_POOL_DIR", browser_pool)

    # pool-a: replica config, leased -> must be skipped (left intact).
    # pool-b: replica config, NOT leased -> config cleared + replica removed.
    slot_configs = {
        "pool-a": {"IDENTITY_ID": "chrome-one", "PROFILE_DIR": str(replica_a)},
        "pool-b": {"IDENTITY_ID": "chrome-one", "PROFILE_DIR": str(replica_b)},
    }
    for slot_name in slot_configs:
        (pool_config_dir / f"{slot_name}.env").write_text("x=1\n", encoding="utf-8")

    monkeypatch.setattr(
        identities,
        "active_identity_id",
        lambda slot_name: slot_configs.get(slot_name, {}).get("IDENTITY_ID"),
    )
    monkeypatch.setattr(
        identities, "read_slot_config", lambda slot_name: slot_configs.get(slot_name, {})
    )
    monkeypatch.setattr(
        identities, "_slot_has_active_lease", lambda slot_name: slot_name == "pool-a"
    )

    result = identities.invalidate_identity_replicas("chrome-one")

    assert result["skipped_leased_slots"] == ["pool-a"]
    assert result["cleared_slot_configs"] == ["pool-b"]
    assert str(replica_b) in result["removed_replicas"]
    # Leased slot left untouched.
    assert (pool_config_dir / "pool-a.env").exists()
    assert replica_a.exists()
    # Unleased slot wiped so next lease re-syncs from base.
    assert not (pool_config_dir / "pool-b.env").exists()
    assert not replica_b.exists()


def test_invalidate_identity_replicas_leaves_base_profile_alone(tmp_path, monkeypatch):
    """A slot configured with the BASE profile is never cleared or removed."""
    identity_file = tmp_path / "identities.json"
    pool_config_dir = tmp_path / "pool-config"
    pool_config_dir.mkdir()
    browser_pool = tmp_path / "browser-pool"
    base_profile = browser_pool / "profiles" / "chrome-one"
    base_profile.mkdir(parents=True)

    identity_file.write_text(
        json.dumps(
            {"identities": {"chrome-one": {"slot": "auto", "profile_dir": str(base_profile)}}}
        ),
        encoding="utf-8",
    )
    monkeypatch.setattr(identities, "IDENTITIES_FILE", identity_file)
    monkeypatch.setattr(identities, "POOL_CONFIG_DIR", pool_config_dir)
    monkeypatch.setattr(identities, "BROWSER_POOL_DIR", browser_pool)

    # Slot points at the BASE profile (not a replica) -> must not be cleared/removed.
    slot_config_file = pool_config_dir / "pool-a.env"
    slot_config_file.write_text("x=1\n", encoding="utf-8")

    def fake_active_identity(slot_name):
        return "chrome-one" if slot_name == "pool-a" else None

    def fake_slot_config(slot_name):
        if slot_name == "pool-a":
            return {"IDENTITY_ID": "chrome-one", "PROFILE_DIR": str(base_profile)}
        return {}

    monkeypatch.setattr(identities, "active_identity_id", fake_active_identity)
    monkeypatch.setattr(identities, "read_slot_config", fake_slot_config)
    monkeypatch.setattr(identities, "_slot_has_active_lease", lambda slot_name: False)

    result = identities.invalidate_identity_replicas("chrome-one")
    assert result["cleared_slot_configs"] == []
    assert result["removed_replicas"] == []
    assert slot_config_file.exists()
    assert base_profile.exists()
def _write_cookie_db(profile_dir, host="api.slack.com", count=1):
    """Create a minimal Chrome-shaped Cookies sqlite db under Default/Cookies.

    The table is emptied first, then ``count`` rows named ``c0``, ``c1``, ... are
    inserted for ``host``. Returns the path of the database file.
    """
    import sqlite3

    default_dir = profile_dir / "Default"
    default_dir.mkdir(parents=True, exist_ok=True)
    db_path = default_dir / "Cookies"
    rows = [(host, f"c{index}") for index in range(count)]
    db = sqlite3.connect(db_path)
    try:
        db.execute("create table if not exists cookies (host_key text, name text)")
        db.execute("delete from cookies")
        db.executemany("insert into cookies (host_key, name) values (?, ?)", rows)
        db.commit()
    finally:
        db.close()
    return db_path


def test_replica_is_stale_against_base_detects_newer_base(tmp_path):
    """Staleness follows the cookie DB mtimes of base versus replica."""
    import os

    base = tmp_path / "base"
    replica = tmp_path / "replicas" / "pool-b"
    base_db = _write_cookie_db(base)
    replica_db = _write_cookie_db(replica)

    # Replica synced before the human login: older than base.
    os.utime(replica_db, (1000, 1000))
    os.utime(base_db, (2000, 2000))
    assert pool._replica_is_stale_against_base(base, replica) is True

    # When replica is at least as fresh as base, no refresh needed.
    os.utime(replica_db, (2000, 2000))
    assert pool._replica_is_stale_against_base(base, replica) is False

    # Same dir is never stale.
    assert pool._replica_is_stale_against_base(base, base) is False


def test_warm_replica_resynced_when_base_has_fresher_cookies(tmp_path, monkeypatch):
    """Regression: a warm replica that predates the auth login must be re-synced."""
    import os

    base = tmp_path / "chrome-one"
    replica = tmp_path / "profiles" / ".replicas" / "chrome-one" / "pool-a"
    base_db = _write_cookie_db(base, count=5)
    replica_db = _write_cookie_db(replica, count=0)
    os.utime(replica_db, (1000, 1000))  # stale replica
    os.utime(base_db, (5000, 5000))  # fresh base (human just logged in)

    active = {"pool-a": "chrome-one", "pool-b": None, "pool-c": None}
    activations = []

    class Identity:
        identity_id = "chrome-one"
        slot = "auto"
        profile_dir = base
        proxy_ref = None
        max_parallel_sessions = 2

    identity = Identity()

    def fake_slot_config(slot_name):
        return {"PROFILE_DIR": str(replica)} if slot_name == "pool-a" else {}

    def record_activation(*args, **kwargs):
        activations.append((args, kwargs))

    stubs = {
        "POOL_STATE_FILE": tmp_path / "leases.json",
        "healthy": lambda _port: True,
        "require_identity": lambda _identity_id: identity,
        "load_identities": lambda: {"chrome-one": identity},
        "active_identity_id": lambda slot_name: active.get(slot_name),
        "read_slot_config": fake_slot_config,
        "_is_replica_profile": lambda profile_dir: "/.replicas/" in str(profile_dir or ""),
        "identity_replica_profile_dir": lambda _identity, slot_name: replica,
        "activate_identity": record_activation,
    }
    for attribute, replacement in stubs.items():
        monkeypatch.setattr(pool, attribute, replacement)

    leased = pool.lease("agent-1", identity_id="chrome-one")
    try:
        # The stale warm replica forced a re-activation (which re-syncs base->replica),
        # instead of the no-resync warm reuse path.
        assert leased.name == "pool-a"
        assert leased.profile_dir == str(replica)
        assert len(activations) == 1, "expected re-activation to re-sync stale replica"
        assert activations[0][0][1] == "pool-a"
    finally:
        pool.release(leased.lease_id)