-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpageStatus_monitor.py
More file actions
284 lines (247 loc) · 11 KB
/
pageStatus_monitor.py
File metadata and controls
284 lines (247 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""
# Install all necessary Python Module or Package
pip install requests python-whois pymongo
# Will use cronjob in Linux to repeat (Every 6 Hours execute below script - sudo crontab -e)
30 */6 * * * /usr/bin/python3 /backup/domain_monitor.py >> /backup/domain_monitor_logfile.log 2>&1
# Access MongoDB on Docker
docker exec -it <MONGODB_DOCKER_ID> bash
mongo -u username -p password
"""
import requests
import whois
import ssl
import socket
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from pymongo import MongoClient
# Configuration
DOMAINS_TO_MONITOR = ["", "", "", "", ""] # Only will be use when FILE_PATH can't be found
FILE_PATH = "/domain_monitoring/landingPage_domain_monitor.txt" # File containing domains to monitor
MAX_RETRIES = 8
# Alert Days Confguration
DOMAIN_ALERT_DAYS_THRESHOLD = 30 # Number of days before expiry to trigger an alert
SSL_ALERT_DAYS_THRESHOLD = 5 # Number of days before expiry to trigger an alert
# Email Configuration
EMAIL_RECEIVERS = ["", "", "", "", ""]
SMTP_SERVER = "
SMTP_PORT = 587
SMTP_USER = ""
SMTP_PASSWORD = ""
# MongoDB Configuration
MONGO_URI = "mongodb://username:password@localhost:27017"
MONGO_DB = "landingPage_monitoring"
MONGO_COLLECTION = "landingPage_data"
# Telegram Configuration
TELEGRAM_BOT_TOKEN = "" # Replace with your bot's API token
TELEGRAM_CHAT_ID = "" # Replace with your chat or group ID
# Slack Configuration
SLACK_WEBHOOK_URL = ""
def check_page_status(domain):
"""Check Landing Page status with retries."""
print(f"Checking https://{domain} landing page status")
domain = f"https://{domain}"
for attempt in range(MAX_RETRIES):
try:
response = requests.get(domain, timeout=10)
if response.status_code == 200:
print(f"OK for {domain} on attend {attempt + 1}")
return True
except requests.Timeout:
print(f"Timeout error while checking {domain}")
except requests.RequestException as e:
print(f"Request error while checking {domain}: {e}")
print(f"WARNING - Invalid Landing Page status for domain: {domain}, all {MAX_RETRIES} attempts failed.")
return False
def check_domain_expiry(domain):
"""Fetch domain expiry date using WHOIS."""
for attempt in range(MAX_RETRIES):
try:
domain_info = whois.whois(domain)
expiry_date = domain_info.expiration_date
if isinstance(expiry_date, list): # Handle cases where multiple dates are returned
expiry_date = expiry_date[0]
if expiry_date:
days_left = (expiry_date - datetime.now()).days
print(f"Domain expiry found on attempt {attempt + 1} for domain: {domain}")
return expiry_date, days_left
except Exception as e:
print(f"Error checking expiry for {domain}: {e}")
print(f"WARNING - Can't obtain Domain Expiry for domain: {domain}, all {MAX_RETRIES} attempts failed.")
return None, None
def check_ssl_certificate(domain):
"""Fetch SSL certificate expiration date for a subdomain."""
# Ensure the domain starts with http:// (or https://), so urlparse works consistently
if not domain.startswith(('http://', 'https://')):
domain = 'http://' + domain
# Use urlparse to extract the base domain (without any path or query)
parsed_domain = urlparse(domain)
base_domain = parsed_domain.netloc # This will give us the domain without the path
# Just in case the domain includes a port number, we strip it
if ':' in base_domain:
base_domain = base_domain.split(':')[0]
full_domain = base_domain # This is the domain to check
for attempt in range(MAX_RETRIES):
try:
ctx = ssl.create_default_context()
with socket.create_connection((full_domain, 443), timeout=10) as sock:
with ctx.wrap_socket(sock, server_hostname=full_domain) as ssock:
cert = ssock.getpeercert()
expiry_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y GMT')
days_left = (expiry_date - datetime.now()).days
print(f"Domain SSL expiry found on attempt {attempt + 1} for domain: {full_domain}")
return expiry_date, days_left
except Exception as e:
print(f"Error checking SSL certificate for {full_domain}: {e}")
print(f"WARNING - Can't obtain SSL Expiry for domain: {full_domain}, all {MAX_RETRIES} attempts failed.")
return None, None
def send_telegram_alert(message):
"""Send a message to a Telegram chat or group."""
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(url, json=payload)
if response.status_code == 200:
print(f"Telegram alert sent successfully. {message}")
else:
print(f"Failed to send Telegram alert: {response.text}")
except Exception as e:
print(f"Error sending Telegram alert: {e}")
def send_slack_alert(message):
"""Send a notification to a Slack channel."""
payload = {"text": message}
try:
response = requests.post(SLACK_WEBHOOK_URL, json=payload)
if response.status_code == 200:
print("Slack alert sent successfully.")
else:
print(f"Failed to send Slack alert: {response.text}")
except Exception as e:
print(f"Error sending Slack alert: {e}")
def send_email_alert(subject, message):
"""Send an email alert to the configured receivers."""
msg = MIMEMultipart()
msg['From'] = SMTP_USER
msg['To'] = ", ".join(EMAIL_RECEIVERS)
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.send_message(msg)
print(f"Alert email sent successfully with the Subject: {subject}")
except Exception as e:
print(f"Error sending email: {e}")
def send_alert(subject, message):
"""Send an alert via email, Telegram and Slack."""
full_message = f"{subject}\n\n{message}"
#send_email_alert(subject, message) # Email notification
#send_telegram_alert(full_message) # Telegram noitification
#send_slack_alert(full_message) # Slack notification
def save_to_mongo(data):
try:
client = MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]
collection.insert_one(data)
print(f"Data saved to MongoDB for domain: {data['domain']}")
except Exception as e:
print(f"Error saving to MongoDB: {e}")
def delete_old_mongo():
try:
client = MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]
# Check if the index exists, and create it if it doesn't
if "checked_at_1" not in collection.index_information():
collection.create_index([("checked_at", 1)])
# Get the date 1 month ago (30 Days)
one_month_ago = datetime.now() - timedelta(days=1)
# Delete documents where 'checked_at' is older than 1 month
result = collection.delete_many({"checked_at": {"$lt": one_month_ago}})
# Output how many documents were deleted
print(f"Deleted {result.deleted_count} documents.")
except Exception as e:
print(f"Error deleting old MongoDB Data: {e}")
def monitor_domains():
"""Monitor each domain for ICP license and expiration."""
try:
# Read domains from the input file
with open(FILE_PATH, 'r') as file:
DOMAINS = [line.strip() for line in file.readlines()]
except FileNotFoundError:
print(f"Error: {FILE_PATH} file not found. Will proceed on domain listed in the script.")
DOMAINS = DOMAINS_TO_MONITOR
#return
for domain in DOMAINS:
current_datetime = datetime.now()
print("\nCurrent date and time:", current_datetime)
print(f"Checking Landing Page status for domain: {domain}")
# Check domain landing page status
status = check_page_status(domain)
if status != True and status == False:
print(f"WARNING - Unable to load landing page for {domain}")
send_alert(
f"🚨 ALERT - Unable to load landing page for {domain} 🚨",
f"The landing page {domain} is unable to load."
)
status = "Error"
else:
print(f"Landing Page {domain} successful loaded.")
status = "OK"
# Check domain expiry
print(f"Checking expiry info for domain: {domain}")
expiry_date, days_left = check_domain_expiry(domain)
if expiry_date:
if days_left <= DOMAIN_ALERT_DAYS_THRESHOLD:
print(f"WARNING - Domain expiring soon for {domain}")
send_alert(
f"🚨 ALERT - Domain expiring soon: {domain} 🚨",
f"The domain {domain} will expire on {expiry_date} ({days_left} days remaining)."
)
print(f"Domain: {domain}, Expiry Date: {expiry_date}, Days Left: {days_left}")
else:
send_alert(
f"Unable to determine expiry for {domain}",
f"WHOIS lookup failed to find the expiry date for {domain}."
)
print(f"Could not determine expiry for {domain}")
# Check domain SSL
print(f"Checking SSL certificate for domain: {domain}")
ssl_expiry_date, ssl_days_left = check_ssl_certificate(domain)
if ssl_expiry_date:
if ssl_days_left <= SSL_ALERT_DAYS_THRESHOLD:
print(f"WARNING - SSL Certificate expiring soon for {domain}")
send_alert(
f"🚨 ALERT - SSL Certificate expiring soon: {domain} 🚨",
f"The SSL certificate for {domain} will expire on {ssl_expiry_date} ({ssl_days_left} days remaining)."
)
print(f"Domain: {domain}, SSL Expiry Date: {ssl_expiry_date}, SSL Days Left: {ssl_days_left}")
else:
send_alert(
f"Unable to determine SSL certificate expiry for {domain}",
f"Failed to fetch the SSL certificate expiry for {domain}."
)
# Save data to MongoDB
data = {
"domain": domain,
"checked_at": current_datetime,
"landing_page_status_info": {
"status": status,
},
"whois_info": {
"expiry_date": expiry_date,
"days_left": days_left,
},
"ssl_info": {
"ssl_expiry_date": ssl_expiry_date,
"ssl_days_left": ssl_days_left,
},
}
save_to_mongo(data)
delete_old_mongo()
# Run the monitoring script
if __name__ == "__main__":
monitor_domains()