-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathPerformance API CLI.py
More file actions
618 lines (474 loc) · 20.5 KB
/
Performance API CLI.py
File metadata and controls
618 lines (474 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
#! /usr/bin/env python3
import requests
import json
import os
import time
import string
import calendar
import datetime
import base64
import pathlib
import re
import smtplib
import signal
from pathlib import Path
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email import encoders
from email.mime.text import MIMEText
from Data.persistent.emailsCC import CCmails, mainEmailFrom, toMail
import csv
""" SUMMARY:
- ⚙️ Fundamental setup (Path, API URLs and others)
- 📜 Load resources
- 🔐 Security execution
- ✍️ Misc variables
- 🔖 Email function
- 🦾 CSV generator, checks and cleaning
- 🎯 API functions
- 🏞 User Interface start
"""
# ⚙️ Fundamental setup (Path, API URLs and others)
os.system("clear")  # wipe the terminal before printing the CLI banner
# API DOCUMENTATION LINK: https://developers.hubspot.com/docs/api/cms/performance
urlPerformance = "https://api.hubapi.com/cms/v3/performance/"
urlUptime = "https://api.hubapi.com/cms/v3/performance/uptime"
# Disable in production
debugMode = False  # True: echo raw API responses and the collected path list
enableLogs = True  # True: append one line per major step to Data/logs/log.txt
shiftAlpha = 4  # Caesar-cipher shift used by API_SEC()/EMAIL_SEC(); they mutate it via `%=`
# Dynamic File Path Solution
API_PATH = pathlib.Path(__file__).parent.absolute()
def relative_to_assets(path: str) -> Path:
    """Resolve *path* against the directory that contains this script."""
    return API_PATH.joinpath(path)
def getTime():
    """Return the current local time; used to timestamp log lines."""
    return datetime.datetime.now()
def signal_handler(sig, frame):
    """SIGINT (Ctrl+C) handler: log the interruption, delete the decrypted
    secret temp files, and exit cleanly.

    The temp files may already have been consumed (API_SEC/EMAIL_SEC delete
    them after use), so their absence is tolerated instead of crashing the
    handler itself.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - WARNING: User interrupted the execution manually. Removing temporary files."
        # "a" mode creates log.txt when missing, so no separate existence check
        # (the original opened a second, never-closed handle just to create it).
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    # If the program exits then remove important files.
    for tmpSecret in ("Data/security/.tmp/.API", "Data/security/.tmp/.epasswd"):
        try:
            os.remove(relative_to_assets(tmpSecret))
        except FileNotFoundError:
            pass  # already consumed by a previous decrypt cycle
    # SystemExit(0) instead of the site-provided exit() helper.
    raise SystemExit(0)
# 📜 Load resources
# Load URLs.json and store in a variable
with open(relative_to_assets("Data/persistent/URLs.json"), "r") as f:
    jsonURLs = json.load(f)
# print(jsonURLs)
# Load mainDomain.json and store in a variable
with open(relative_to_assets("Data/persistent/mainDomain.json"), "r") as f:
    jsonMainDomain = json.load(f)
# Domain every performance query targets; None when the "domain" key is absent.
targetDomain = jsonMainDomain.get("domain")
# Nick's Security checks and loads
def decryptSecurity():
    """Recreate the plaintext API-key file.

    XORs every byte of Data/security/.API.nclmE with a fixed single-byte key
    and writes the result to Data/security/.tmp/.API for API_SEC() to consume.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = (
            f"{getTime()} - SECURITY: Decrypting API key for internal usage in memory."
        )
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    # XOR key is stored base64-encoded: "MjI0" -> "224" (must stay <= 255).
    key = base64.b64decode("MjI0")
    cleanKey = re.sub(r"[^A-Za-z0-9-]", "", key.decode("utf-8"))
    finalKey = int(cleanKey)
    # with-blocks close the handles the original left dangling (the output file
    # in particular was never closed, so the write might not be flushed).
    with open(relative_to_assets("Data/security/.API.nclmE"), "rb") as encFile:
        byteReader = bytearray(encFile.read())
    for index, value in enumerate(byteReader):
        byteReader[index] = value ^ finalKey
    with open(relative_to_assets("Data/security/.tmp/.API"), "wb") as decEnc:
        decEnc.write(byteReader)
def decryptMailSecurity():
    """Recreate the plaintext email-credential file.

    XORs every byte of Data/security/.epasswd.nclmE with a fixed single-byte
    key and writes the result to Data/security/.tmp/.epasswd for EMAIL_SEC().
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - SECURITY: Decrypting Email credentials for internal usage in memory."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    # XOR key is stored base64-encoded: "MTkw" -> "190" (must stay <= 255).
    key = base64.b64decode("MTkw")
    cleanKey = re.sub(r"[^A-Za-z0-9-]", "", key.decode("utf-8"))
    finalKey = int(cleanKey)
    # with-blocks close the handles the original left dangling.
    with open(relative_to_assets("Data/security/.epasswd.nclmE"), "rb") as encFile:
        byteReader = bytearray(encFile.read())
    for index, value in enumerate(byteReader):
        byteReader[index] = value ^ finalKey
    with open(relative_to_assets("Data/security/.tmp/.epasswd"), "wb") as decEnc:
        decEnc.write(byteReader)
# 🔐 Security execution
def API_SEC():
    """Return the plaintext HubSpot API key.

    Runs both decrypt steps, base64-decodes the temp file, strips any
    character outside [A-Za-z0-9-], reverses the Caesar cipher, deletes the
    temp file and returns the key. Mutates the module-level ``shiftAlpha``
    in place (``%=``), exactly as the original did.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - SECURITY: Main security function started... Running checks and crypto jobs."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    decryptSecurity()
    decryptMailSecurity()
    global shiftAlpha
    # Caesar Cipher over [a-zA-Z0-9]
    charsetMain = string.ascii_letters + string.digits
    # sum of 0..len-1 (= 1891 for 62 chars) — preserved exactly from the
    # original accumulator loop; it is NOT len(charsetMain).
    totalNum = sum(range(len(charsetMain)))
    shiftAlpha %= totalNum
    unshiftAlpha = -shiftAlpha
    alphaUnshifted = charsetMain[unshiftAlpha:] + charsetMain[:unshiftAlpha]
    tableContentUn = str.maketrans(charsetMain, alphaUnshifted)
    # Security measures — with-block closes the handle the original leaked.
    with open(relative_to_assets("Data/security/.tmp/.API"), "r") as apiFile:
        API_CONTENT = apiFile.read()
    API_DECODED = base64.b64decode(API_CONTENT.encode("utf-8"))
    # Regular expression to remove garbage characters, do not remove "-"
    API_DECODED_CLEAN = re.sub(r"[^A-Za-z0-9-]", "", API_DECODED.decode("utf-8"))
    UNLOCKED_CONTENT = API_DECODED_CLEAN.translate(tableContentUn)
    os.remove(relative_to_assets("Data/security/.tmp/.API"))
    return UNLOCKED_CONTENT
def EMAIL_SEC():
    """Return the plaintext email password.

    Base64-decodes Data/security/.tmp/.epasswd, reverses the Caesar cipher,
    deletes the temp file and returns the password. Mutates the module-level
    ``shiftAlpha`` in place (``%=``), exactly as the original did.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - SECURITY: Email decryption was called... Running crypto jobs to provide email credentials."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    global shiftAlpha
    # Caesar Cipher over [a-zA-Z0-9]
    charsetMain = string.ascii_letters + string.digits
    # sum of 0..len-1 — preserved exactly from the original accumulator loop.
    totalNum = sum(range(len(charsetMain)))
    shiftAlpha %= totalNum
    unshiftAlpha = -shiftAlpha
    alphaUnshifted = charsetMain[unshiftAlpha:] + charsetMain[:unshiftAlpha]
    tableContentUn = str.maketrans(charsetMain, alphaUnshifted)
    # Email passwd security measures — with-block closes the leaked handle.
    with open(relative_to_assets("Data/security/.tmp/.epasswd"), "r") as pwFile:
        EMAIL_CONTENT = pwFile.read()
    EMAIL_DECODED = base64.b64decode(EMAIL_CONTENT.encode("utf-8"))
    UNLOCKED_CONTENT = EMAIL_DECODED.decode("utf-8").translate(tableContentUn)
    os.remove(relative_to_assets("Data/security/.tmp/.epasswd"))
    return UNLOCKED_CONTENT
# ✍️ Misc variables
allPaths = []  # every URL path read from Data/persistent/URLs.json below
usingPath = None  # placeholder; funcPerformance() substitutes the real path per request
executeNow = False
# constantlyCheck = True
# UNIX TIMESTAMP RESOLVER
# unixCurrentTimestamp = calendar.timegm(time.gmtime())
unixCurrentTimestamp = datetime.datetime.now().timestamp()
# Remove "." from unixCurrentTimestamp
# NOTE(review): stripping the decimal point concatenates the seconds and the
# fractional digits, so this is NOT a unix-seconds timestamp — confirm the API
# actually expects this value.
unixCurrentTimestamp = str(unixCurrentTimestamp).replace(".", "")
# Convert unixCurrentTimestamp to int
unixCurrentTimestamp = int(unixCurrentTimestamp)
unixFuture = unixCurrentTimestamp + 600000  # window end used as the "end" query param
# This querystring is temporary
querystring = {
    "domain": f"{targetDomain}",
    "path": f"{usingPath}",
    "period": "4h",
    "interval": "10m",
    "hapikey": f"{API_SEC()}",  # decrypts the API key at import time (and deletes .tmp/.API)
}
headers = {"accept": "application/json"}
clientName = "CLIENT NAME"
ver = "v1.9.4"
dev = "Nick"
emailFrom = mainEmailFrom(224)
emailBCC = CCmails
for url in jsonURLs["results"]:
    # Append url to usingPath
    allPaths.append(url)
if debugMode == True:
    print(f"\n\nDEBUG: {allPaths}\n\n")
# 🔖 Email function
def sendEmail(subject: str, message: str):
    """Email the CSV report (Data/release/resultsOutput.csv) to To + CC.

    subject: appended to the fixed report subject line.
    message: plain-text body of the email.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - Email send function started."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    # Email settings
    emailSubject = f"HubSpot Performance API Report - {subject}"
    # Envelope recipients: every CC address plus the main To address.
    rcpt = emailBCC.split(",") + [toMail]
    msg = MIMEMultipart()
    msg["From"] = emailFrom
    msg["To"] = toMail
    msg["Cc"] = emailBCC
    msg["Subject"] = emailSubject
    body_part = MIMEText(f"{message}", "plain")
    msg.attach(body_part)
    # BUGFIX: removed `for content in msg: content.encode("utf-8")` — iterating
    # a Message object indexes headers with integers (0.lower()) and raises
    # AttributeError; its result was discarded anyway.
    with open(relative_to_assets("Data/release/resultsOutput.csv"), "rb") as file:
        # Attach the file with filename to the email
        msg.attach(MIMEApplication(file.read(), Name="resultsOutput.csv"))
    try:
        emailServer = smtplib.SMTP("smtp.gmail.com", 587)
        emailServer.ehlo()
        emailServer.starttls()
        emailServer.login(emailFrom, EMAIL_SEC())
        emailServer.sendmail(msg["From"], rcpt, msg.as_string())
        emailServer.quit()
        print("Success: Email sent!")
    except (OSError, smtplib.SMTPException):
        # OSError covers FileNotFoundError and socket failures; SMTPException
        # covers auth/protocol errors that previously escaped the handler.
        if enableLogs:
            # ===== 🚨 LOGS STEP =====
            logStepEx = f"{getTime()} - EMAIL SEND ERROR. Please check 'sendEmail()' function - Probably a file is missing, or incorrect encoding or credential error with email server."
            with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
                logFile.write(f"{logStepEx}\n")
            # ===== 🚨 LOGS STEP =====
        print("Email failed to send.")
# 🦾 CSV generator, checks and cleaning
def csvEngine():
    """Flatten Data/export/resultsOutput.json into Data/release/resultsOutput.csv,
    then run csvCheck() to look for 404s.

    Expects the JSON shape produced by funcPerformance():
    {"Path": [...], "Data": [[{"404": int, ...}], ...]} with both lists aligned.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStep = f"{getTime()} - CSV ENGINE fetching data."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStep}\n")
        # ===== 🚨 LOGS STEP =====
    print("Generating CSV...")
    try:
        # Load the file resultsOutput.json
        with open(relative_to_assets("Data/export/resultsOutput.json")) as jsonFile:
            jsonResults = json.load(jsonFile)
        # newline="" per the csv module docs, so rows are not double-spaced on Windows.
        with open(
            relative_to_assets("Data/release/resultsOutput.csv"), "w", newline=""
        ) as csvFile:
            writer = csv.writer(csvFile)
            writer.writerow(
                ["Path", "404 Status", "Period", "Interval", "Domain"]
            )  # Header
            # zip pairs each path with its data entry (removes the original's
            # unused pathNum counter and index-based loop).
            for pathValue, dataValue in zip(jsonResults["Path"], jsonResults["Data"]):
                writer.writerow(
                    [
                        pathValue,
                        dataValue[0]["404"],
                        "4 Hours",
                        "10 Minutes",
                        targetDomain,
                    ]
                )
        csvCheck()
    except FileNotFoundError:
        if enableLogs:
            # ===== 🚨 LOGS STEP =====
            logStepEx = f"{getTime()} - CSV ENGINE ERROR. File not found - Probably the JSON file was not found."
            with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
                logFile.write(f"{logStepEx}\n")
            # ===== 🚨 LOGS STEP =====
        print("ERROR! JSON not found.")
def csvCheck():
    """Scan the fetched results for pages with 404 hits; email the report if any.

    Reads Data/export/resultsOutput.json and counts entries whose first data
    record has a "404" count greater than zero.
    """
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStepEx = f"{getTime()} - CSV Checks started. Searching for 404s and if detected an email will be sent."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStepEx}\n")
        # ===== 🚨 LOGS STEP =====
    print("Checking if there is pages with 404.")
    print("If there is a 404, the script will send an email.")
    # NOTE(review): purpose of this pause is not evident from the code —
    # confirm whether it can be shortened/removed.
    time.sleep(15)
    with open(relative_to_assets("Data/export/resultsOutput.json")) as jsonFile:
        jsonResults = json.load(jsonFile)
    # Count pages that recorded at least one 404 in the sampled window.
    # (Removed the unused `issueFound` flag.)
    pagesError = 0
    for dataEntry in jsonResults["Data"]:
        if dataEntry[0]["404"] > 0:
            print("404 found! Sending email.")
            pagesError += 1
    if pagesError > 0:
        if enableLogs:
            # ===== 🚨 LOGS STEP =====
            logStepEx = f"{getTime()} - WARNING: 404s found. Triggering email."
            with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
                logFile.write(f"{logStepEx}\n")
            # ===== 🚨 LOGS STEP =====
        sendEmail(
            "HubSpot Performance API",
            f"Hi,\n\nThere are some pages that need attention, 404 status has been found in a period of 4 hours and interval of 10 minutes in domain '{targetDomain}'.\n\nBest regards,\n{clientName} Performance API",
        )
    else:
        # BUGFIX: the original printed "No 404s found." once per clean page
        # inside the loop; report it once, and only when truly none were found.
        print("No 404s found.")
# 🎯 API functions
def funcPerformance(performanceAPI: str):
    """Query the HubSpot Performance API for every path in allPaths and export
    the combined results to Data/export/resultsOutput.json, then run csvEngine().

    performanceAPI: base URL of the performance endpoint.
    Side effects: rebinds the module-level `querystring`, writes the export
    JSON, and on an API error deletes the decrypted temp secrets.
    """
    global querystring
    if enableLogs:
        # ===== 🚨 LOGS STEP =====
        logStep = f"{getTime()} - Performance API is running once again..."
        # "a" mode creates log.txt when missing; the original leaked an extra handle.
        with open(relative_to_assets("Data/logs/log.txt"), "a") as logFile:
            logFile.write(f"{logStep}\n")
        # ===== 🚨 LOGS STEP =====
    # Probe request with the current (module-level) querystring.
    response = requests.request(
        "GET", performanceAPI, headers=headers, params=querystring
    )
    finalOutputPer = response.json()
    if debugMode:
        print(f"{dev}'s DEBUG: {finalOutputPer}")
    if finalOutputPer.get("status") is not None:
        # Error handling
        if finalOutputPer["status"] == "error":
            # BUGFIX: .tmp/.API is deleted by API_SEC() right after use, so the
            # original unconditional os.remove crashed here; tolerate absence.
            for tmpSecret in ("Data/security/.tmp/.API", "Data/security/.tmp/.epasswd"):
                try:
                    os.remove(relative_to_assets(tmpSecret))
                except FileNotFoundError:
                    pass
            print(f"ERROR!\n{finalOutputPer['message']}\n")
    else:
        print("Getting information of all URLs inside the domain...")
        # Hoisted: decrypt the API key once instead of re-running the whole
        # decrypt pipeline (file XOR + base64 + cipher) on every path.
        apiKey = API_SEC()
        pathResults = []
        dataResults = []
        for path in allPaths:
            querystring = {
                "domain": f"{targetDomain}",
                "path": f"{path}",
                "start": f"{unixCurrentTimestamp}",
                "end": f"{unixFuture}",
                "period": "4h",
                "interval": "10m",
                "hapikey": f"{apiKey}",
            }
            response = requests.request(
                "GET", performanceAPI, headers=headers, params=querystring
            )
            finalOutputPer = response.json()
            # Append directly in request order (the original routed these
            # through a dict keyed by path, which collapsed duplicate paths).
            pathResults.append(finalOutputPer.get("path"))
            dataResults.append(finalOutputPer.get("data"))
        # "w" mode already truncates; the original seek(0)/truncate() was redundant.
        with open(relative_to_assets("Data/export/resultsOutput.json"), "w") as f:
            json.dump({"Path": pathResults, "Data": dataResults}, f)
        print("Information stored in JSON file.")
        # time.sleep(15)
        csvEngine()
# Used but not directly
def funcUptime(uptimeAPI: str):
    """Query the uptime endpoint with the current module-level querystring and
    print any error the API reports. Successful responses produce no output.
    """
    uptimePayload = requests.request(
        "GET", uptimeAPI, headers=headers, params=querystring
    ).json()
    if debugMode:
        print(f"{dev}'s DEBUG: {uptimePayload}")
    # Only an explicit error status produces output; anything else is ignored.
    if uptimePayload.get("status") is not None and uptimePayload["status"] == "error":
        print(f"ERROR!\n{uptimePayload['message']}\n")
def mainController():
    """Prepare the working folders, run the first scan, then re-scan every
    10 minutes. This function never returns (infinite polling loop).
    """
    print(
        "The script will continue running. After 10 minutes it will start checking again."
    )
    print(
        f"Scanning {targetDomain} to get performance information...(Please wait. This can take a while)\n"
    )
    # The Data/export tree must ship with the script; bail out if absent.
    if not os.path.exists(relative_to_assets("Data/export")):
        print("ERROR! Data folder not found.")
        raise SystemExit(1)
    # Create the writable sub-folders on first run (no-op when they exist).
    # The original's second Data/export check was dead code — the guard above
    # already exited when it was missing.
    for folder in ("Data/security/.tmp", "Data/release", "Data/export/persistent"):
        os.makedirs(relative_to_assets(folder), exist_ok=True)
    funcPerformance(urlPerformance)
    # The original triple-nested while loops net out to "wait 10 minutes, then
    # poll every 10 minutes forever"; written here as the single loop it is.
    while True:
        # Wait 10 minutes (600s) between scans.
        time.sleep(600)
        print("Checking again...")
        funcPerformance(urlPerformance)
# 🏞 User Interface start
print(f"Welcome to {clientName} Performance CLI {ver}\n")
# signal handler for "CTRL + C"
signal.signal(signal.SIGINT, signal_handler)
if debugMode is True:
    # extra decrypt cycle in debug mode only
    API_SEC()
mainController()
# NOTE: mainController() loops forever, so this line is never reached in practice.
signal.pause()