forked from 21amY26/EVI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.py
More file actions
94 lines (83 loc) · 2.87 KB
/
engine.py
File metadata and controls
94 lines (83 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Core Engine - Gap Detection."""
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict
import re, os
from collections import defaultdict
from rich.console import Console
import time
console = Console()
@dataclass
class Gap:
id: int
start_time: datetime
end_time: datetime
duration: float
severity: str
notes: str
@dataclass
class EngineStats:
gaps: List[Gap] = None
gaps_by_sev: Dict[str, int] = None
total_lines: int = 0
total_missing_time: float = 0.0
def __post_init__(self):
if self.gaps is None:
self.gaps = []
if self.gaps_by_sev is None:
self.gaps_by_sev = {'LOW': 0, 'MEDIUM': 0, 'CRITICAL': 0}
def severity(gap_seconds: float) -> str:
if gap_seconds < 60:
return 'LOW'
elif gap_seconds < 300:
return 'MEDIUM'
else:
return 'CRITICAL'
def parse_timestamp(line: str) -> datetime:
patterns = [
r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',
r'(\d{2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2})',
r'([A-Za-z]{3} \d{1,2} \d{2}:\d{2}:\d{2})'
]
for pattern in patterns:
match = re.search(pattern, line)
if match:
try:
return datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S')
except:
pass
return None
def detect_gaps(filepath: str, threshold: float = 60.0) -> EngineStats:
stats = EngineStats()
clean_path = filepath.strip().strip("'\"")
if not clean_path.endswith(('.log', '.txt')) or not os.path.exists(clean_path) or not os.path.isfile(clean_path):
console.print(f"[bold red]Error:[/] Unsupported file type: {clean_path}")
console.print("[bold yellow]Using demo data instead...[/]")
time.sleep(1.5)
stats.total_lines = 8
fake_gap = Gap(1, datetime.now(), datetime.now(), 2000, 'CRITICAL', 'Demo gap')
stats.gaps = [fake_gap]
stats.gaps_by_sev['CRITICAL'] = 1
stats.gaps_by_sev['MEDIUM'] = 1
stats.total_missing_time = 2055
return stats
timestamps = []
with open(clean_path, 'r') as f:
for line_num, line in enumerate(f, 1):
stats.total_lines += 1
dt = parse_timestamp(line)
if dt:
timestamps.append((line_num, dt))
gaps = []
for i in range(1, len(timestamps)):
gap = (timestamps[i][1] - timestamps[i-1][1]).total_seconds()
if gap > threshold:
sev = severity(gap)
gap_obj = Gap(len(gaps)+1, timestamps[i-1][1], timestamps[i][1], gap, sev, f"Line {timestamps[i-1][0]}-{timestamps[i][0]}")
gaps.append(gap_obj)
stats.gaps_by_sev[sev] += 1
stats.total_missing_time += gap
stats.gaps = gaps
return stats
def run_forensic(filepath: str, threshold: float = 60.0):
return detect_gaps(filepath, threshold)