-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpool.py
More file actions
90 lines (72 loc) · 2.91 KB
/
pool.py
File metadata and controls
90 lines (72 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
from dataclasses import dataclass, field
from pathlib import Path
from config import AUTHEN_DIR, AUTH_FILE
from loguru import logger
from playwright.async_api import Browser, BrowserContext, Playwright
from utils import create_context, launch_browser
@dataclass
class ContextPool:
"""A reusable pool of independent Playwright browser contexts."""
_contexts: list[BrowserContext] = field(default_factory=list)
_semaphore: asyncio.Semaphore | None = None
_queue: asyncio.Queue[BrowserContext] | None = None
_browser: Browser | None = None
@staticmethod
def _discover_auth_files() -> list[Path]:
auth_files = sorted(AUTHEN_DIR.glob("*.json"))
return auth_files or [AUTH_FILE]
@classmethod
async def create(cls, playwright: Playwright, pool_size: int = 3) -> "ContextPool":
if pool_size < 1:
raise ValueError("pool_size must be at least 1")
pool = cls()
pool._semaphore = asyncio.Semaphore(pool_size)
pool._queue = asyncio.Queue()
pool._browser = await launch_browser(playwright)
auth_files = pool._discover_auth_files()
if len(auth_files) < pool_size:
logger.warning(
"Requested pool_size={}, but only found {} auth state file(s) in {}. "
"Contexts will reuse available auth state files.",
pool_size,
len(auth_files),
AUTHEN_DIR,
)
try:
for index in range(pool_size):
auth_file = auth_files[index % len(auth_files)]
context = await create_context(pool._browser, storage_state=auth_file)
pool._contexts.append(context)
pool._queue.put_nowait(context)
logger.info(f"Context {index} created with auth state {auth_file.name}")
except Exception:
await pool.close()
raise
return pool
async def acquire(self) -> BrowserContext:
if self._semaphore is None or self._queue is None:
raise RuntimeError("Context pool is not initialized")
await self._semaphore.acquire()
try:
return await self._queue.get()
except Exception:
self._semaphore.release()
raise
def release(self, context: BrowserContext):
if self._semaphore is None or self._queue is None:
raise RuntimeError("Context pool is not initialized")
self._queue.put_nowait(context)
self._semaphore.release()
async def close(self):
for context in self._contexts:
try:
await context.close()
except Exception as exc:
logger.warning(f"Failed to close context: {exc}")
self._contexts.clear()
if self._browser is not None:
try:
await self._browser.close()
finally:
self._browser = None