-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstate.py
More file actions
125 lines (99 loc) · 3.52 KB
/
state.py
File metadata and controls
125 lines (99 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# src/bot/handlers/state.py
from dataclasses import dataclass
from typing import Optional
import asyncio
import json
from pathlib import Path
import os
RUNTIME_DIR = Path(os.getenv('RUNTIME_DIR', 'runtime'))
RUNTIME_DIR.mkdir(parents=True, exist_ok=True)
@dataclass
class QtyState:
product: str
msg_id: int
class PendingQty:
def __init__(self):
self._store = {}
self._lock = asyncio.Lock()
async def set(self, user_id: int, product: str, msg_id: int):
async with self._lock:
self._store[user_id] = QtyState(product, msg_id)
async def get(self, user_id: int) -> Optional[QtyState]:
async with self._lock:
return self._store.get(user_id)
async def clear(self, user_id: int):
async with self._lock:
self._store.pop(user_id, None)
@dataclass
class PremiumState:
plan: str
class PendingPremium:
"""In-memory pending premium selection with optional disk persistence.
Persists each user's pending plan to: runtime/pending_premium_{user_id}.json
Uses run_in_executor to avoid blocking the event loop on file I/O.
"""
def __init__(self):
self._store: dict[int, PremiumState] = {}
self._lock = asyncio.Lock()
def _path_for(self, user_id: int) -> Path:
return RUNTIME_DIR / f'pending_premium_{user_id}.json'
async def _write_file(self, path: Path, data: dict):
loop = asyncio.get_event_loop()
def _write():
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f)
await loop.run_in_executor(None, _write)
async def _read_file(self, path: Path) -> Optional[dict]:
loop = asyncio.get_event_loop()
def _read():
if not path.exists():
return None
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
return await loop.run_in_executor(None, _read)
async def _remove_file(self, path: Path):
loop = asyncio.get_event_loop()
def _rm():
try:
if path.exists():
path.unlink()
except Exception:
pass
await loop.run_in_executor(None, _rm)
async def set(self, user_id: int, plan: str):
"""Set pending plan for user and persist to disk."""
async with self._lock:
self._store[user_id] = PremiumState(plan)
data = {'plan': plan}
path = self._path_for(user_id)
try:
await self._write_file(path, data)
except Exception:
# best-effort: don't raise to calling handlers
pass
async def get(self, user_id: int) -> Optional[PremiumState]:
async with self._lock:
st = self._store.get(user_id)
if st:
return st
# if not in memory, try to read from disk
path = self._path_for(user_id)
try:
data = await self._read_file(path)
if data and 'plan' in data:
ps = PremiumState(data['plan'])
# cache in memory for future
async with self._lock:
self._store[user_id] = ps
return ps
except Exception:
return None
return None
async def clear(self, user_id: int):
async with self._lock:
self._store.pop(user_id, None)
path = self._path_for(user_id)
try:
await self._remove_file(path)
except Exception:
pass