-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmemory.py
More file actions
40 lines (34 loc) · 1.55 KB
/
memory.py
File metadata and controls
40 lines (34 loc) · 1.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from collections import defaultdict, deque
from datetime import datetime
from typing import Any
from .abc import DataBase, ClusterCounterInfo
class MemoryDataBase(DataBase):
def __init__(
self,
database_name: str
):
super().__init__(database_name)
self._clusters_logs = deque(maxlen=10000)
self._clusters_counters: defaultdict[int, defaultdict[str, defaultdict[str, int]]] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
async def insert_cluster_info(self, cluster_id: str, type: str, event: str, data: Any | None = None):
self._clusters_logs.append({
'cluster_id': cluster_id,
'type': type,
'event': event,
'data': data,
})
async def upsert_cluster_counter(self, cluster_id: str, hits: int, bytes: int):
hour = self.get_hour()
self._clusters_counters[hour][cluster_id]['hits'] += hits
self._clusters_counters[hour][cluster_id]['bytes'] += bytes
async def get_cluster_counter(self, cluster_id: str, before_hour: int = 0) -> list[ClusterCounterInfo]:
res = []
for h in range(before_hour, self.get_hour() + 1):
if h not in self._clusters_counters or cluster_id not in self._clusters_counters[h]:
continue
res.append(ClusterCounterInfo(
time=datetime.fromtimestamp(h * 3600),
hits=self._clusters_counters[h][cluster_id]['hits'],
bytes=self._clusters_counters[h][cluster_id]['bytes'],
))
return res