-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase.py
More file actions
462 lines (401 loc) · 13.4 KB
/
base.py
File metadata and controls
462 lines (401 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
import os
import logging as log
import sys
import time
import shutil
from config import Settings
from logging.handlers import RotatingFileHandler
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
MoveTargetOutOfBoundsException,
InvalidSessionIdException,
NoSuchWindowException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from random import randrange
class PromptNotFoundException(Exception):
    """
    Error type for an expected on-page prompt that could not be located.

    Nothing in the visible part of this module raises it; it is presumably
    used by callers elsewhere -- TODO confirm.
    """
def _remove_residual_profiles(base_dir):
    """Delete every entry under base_dir whose name starts with "chrome_profile"."""
    try:
        entries = os.listdir(base_dir)
    except OSError as ex:
        # A missing or unreadable base directory must not turn shutdown into a crash.
        log.error(f"Error listing Chrome profile directories: {ex}", exc_info=True)
        return
    for dir_name in entries:
        if not dir_name.startswith("chrome_profile"):
            continue
        profile_path = os.path.join(base_dir, dir_name)
        try:
            shutil.rmtree(profile_path)
            log.info(f"Deleted residual Chrome profile directory: {profile_path}")
        except Exception as ex:
            # Best effort: keep going so one locked directory does not block the rest.
            log.error(
                f"Error deleting residual Chrome profile directory: {ex}",
                exc_info=True,
            )


def close_shop(driver):
    """
    Close the WebDriver session and clean up any residual Chrome profile directories.

    Nothing happens when driver is falsy. Otherwise the current window is
    closed, the session is quit, and every entry under Settings().base_dir
    whose name starts with "chrome_profile" is removed. Errors are logged,
    never raised.

    Args:
        driver: The Selenium WebDriver instance (may be None).
    Returns:
        None
    """
    if not driver:
        return
    log.info("closing shop...")
    try:
        try:
            log.info(f"Chrome PID: {driver.service.process.pid}")
            driver.close()
        finally:
            # quit() has to run even when reading the PID or close() fails
            # (e.g. the window is already gone); otherwise the session and
            # its driver process are left behind.
            driver.quit()
    except Exception as ex:
        log.error(f"Error closing driver: {ex}", exc_info=True)
    finally:
        _remove_residual_profiles(Settings().base_dir)
def get_length_of_page(driver):
    """
    Ask the browser for the scroll height of the current document body.

    Args:
        driver: The Selenium WebDriver instance.
    Returns:
        The value the browser reports for document.body.scrollHeight.
    """
    scroll_height = driver.execute_script("return document.body.scrollHeight;")
    return scroll_height
def get_file_name(_file):
    """
    Return the final path component of a file path.

    Args:
        _file: Path to a file.
    Returns:
        str: Everything after the last path separator (empty when the path
        ends with a separator).
    """
    _, tail = os.path.split(_file)
    return tail
def get_working_directory(_file):
    """
    Return the directory portion of a file path.

    Args:
        _file: Path to a file (this module passes __file__).
    Returns:
        str: Everything before the final path component (empty for a bare
        file name).
    """
    head, _ = os.path.split(_file)
    return head
def start_end_log(file, end_log=False):
    """
    Write a "Start of <name> session" or "End of <name> session" log line.

    Args:
        file: Path of the script; only its final component is logged.
        end_log: True to log the end marker, False (default) for the start marker.
    Returns:
        None
    """
    if end_log:
        prefix = "End"
    else:
        prefix = "Start"
    script_name = os.path.basename(file)
    log.info(f"{prefix} of {script_name} session")
def random_time(min_seconds=2, max_seconds=6):
    """
    Sleep for a random whole number of seconds to avoid detection.

    The duration is drawn with randrange(min_seconds, max_seconds), so the
    upper bound is exclusive: with the defaults the sleep lasts 2, 3, 4 or
    5 seconds.

    Args:
        min_seconds: Smallest possible sleep, in whole seconds (inclusive).
        max_seconds: Upper bound in whole seconds (exclusive).
    Returns:
        None
    Raises:
        ValueError: If the range is empty (max_seconds <= min_seconds).
    """
    sleep_time = randrange(min_seconds, max_seconds)
    log.info(f"Sleeping for {sleep_time} seconds...")
    print(f"Sleeping for {sleep_time} seconds to avoid detection...")
    time.sleep(sleep_time)
def click_element(browser, elem, elem_name=None):
    """
    Move the pointer to a WebElement and click it using ActionChains.

    A MoveTargetOutOfBoundsException is caught and logged rather than
    propagated, so a failed click is not visible to the caller; any other
    exception propagates.

    Args:
        browser: The Selenium WebDriver instance.
        elem: The WebElement to click on.
        elem_name: Optional name for the element (for logging); the element
            itself is logged when omitted.
    Returns:
        None
    """
    _name = elem if elem_name is None else elem_name
    log.info(f"clicking element: {_name}")
    try:
        ActionChains(browser).move_to_element(elem).click().perform()
    except MoveTargetOutOfBoundsException:
        # Logged at ERROR: this module configures logging at ERROR level, so
        # an INFO record here would be dropped and the failure go unnoticed.
        log.error("Error click_element", exc_info=True)
        return
    log.info(f"{_name} clicked successfully!")
def notification_popup(driver):
    """
    Handles the 'Turn On Notifications' prompt.

    Waits up to 5 seconds for any element whose text contains "turn on"
    (case-insensitive), then up to 5 seconds for a clickable "not now"
    button, and clicks it.

    Args:
        driver: The Selenium WebDriver instance.
    Returns:
        bool: True if the popup check is disabled in Settings or the
        "Not Now" click was issued; False if the prompt or button was not
        found or any other error occurred.
    """
    settings = Settings()
    if settings.bypass_popup_check:
        return True
    try:
        log.info("Checking for notification prompt")
        wait = WebDriverWait(driver, 5)
        # until() returns only a truthy result and raises on timeout, so a
        # missing prompt or button lands in the except branch below.
        wait.until(
            EC.presence_of_element_located(
                (
                    By.XPATH,
                    "//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'turn on')]",
                )
            )
        )
        log.info("Turn on modal detected, attempting to bypass")
        not_now_btn = wait.until(
            EC.element_to_be_clickable(
                (
                    By.XPATH,
                    "//div[@role='button' and @tabindex='0' and contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'not now')]",
                )
            )
        )
        click_element(driver, not_now_btn, "Not Now")
        # NOTE(review): the flag is set on this call's Settings() instance;
        # whether later Settings() instances observe it depends on how
        # Settings is implemented -- confirm.
        settings.bypass_popup_check = True
    except Exception as ex:
        log.debug(f"Notification prompt not found or not interactable: {ex}")
        return False
    # Returned here rather than from a finally block, which would also
    # swallow KeyboardInterrupt and SystemExit.
    return True
def save_popup(driver):
    """
    Handles the 'Save Info' prompt.

    Waits up to 5 seconds for a clickable element whose text contains
    "not now" (case-insensitive) and clicks it.

    Args:
        driver: The Selenium WebDriver instance.
    Returns:
        bool: True if the popup check is disabled in Settings or the
        "Not Now" click was issued; False if the button was not found or
        any other error occurred.
    """
    if Settings().bypass_popup_check:
        return True
    try:
        log.info("Checking for save info prompt..")
        # until() returns only a truthy element and raises on timeout, so no
        # separate "not found" branch is needed after it.
        not_now_btn = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable(
                (
                    By.XPATH,
                    "//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'not now')]",
                )
            )
        )
        log.info("Save info prompt detected, attempting to bypass")
        click_element(driver, not_now_btn, "Not Now")
        log.info("Save info prompt bypassed...")
        return True
    except Exception as ex:
        log.debug(f"Save info prompt not found or not interactable: {ex}")
        return False
def check_login_status(driver, username):
    """
    Report whether the Instagram home page shows a link to the user's profile.

    Loads https://www.instagram.com/, pauses 3 seconds, then looks for any
    anchor whose href contains "/<username>/".

    Args:
        driver: The Selenium WebDriver instance.
        username: The Instagram username.
    Returns:
        bool: True if such a link is present; False if it is absent or if
        any exception occurred (the exception is logged, not raised).
    """
    try:
        driver.get("https://www.instagram.com/")
        time.sleep(3)
        matches = driver.find_elements(
            By.XPATH, f"//a[contains(@href, '/{username}/')]"
        )
    except Exception as ex:
        log.error(f"Error checking login status: {ex}", exc_info=True)
        return False
    else:
        logged_in = bool(matches)
        if logged_in:
            log.info("User is already logged in")
        else:
            log.info("User is not logged in")
        return logged_in
def check_session(driver):
    """
    Probe the WebDriver session by running a trivial script in the browser.

    Args:
        driver: The Selenium WebDriver instance.
    Returns:
        bool: True if the script ran; False if it failed with
        NoSuchWindowException or InvalidSessionIdException (logged, not
        raised). Any other exception propagates.
    """
    alive = True
    try:
        driver.execute_script("return true;")
    except (NoSuchWindowException, InvalidSessionIdException) as ex:
        log.error(f"Session check failed: {ex}", exc_info=True)
        alive = False
    return alive
def _xpath_literal(value):
    """Quote value as an XPath 1.0 string literal, whichever quotes it contains."""
    if "'" not in value:
        return f"'{value}'"
    if '"' not in value:
        return f'"{value}"'
    # XPath 1.0 has no escape character; splice the single-quoted pieces back
    # together with concat(), inserting each apostrophe as a double-quoted literal.
    pieces = (f"'{piece}'" for piece in value.split("'"))
    return "concat(" + ", \"'\", ".join(pieces) + ")"


def find_text(search_value: str, browser: "webdriver.Remote"):
    """
    Find an interactable element containing specific text on the page.

    Matches the first div with role="button" and tabindex="0" whose text
    contains search_value, compared case-insensitively for ASCII letters.
    The text is quoted safely, so values containing apostrophes or double
    quotes (e.g. "Don't Allow") produce a valid selector.

    Args:
        search_value: The text to search for.
        browser: The Selenium WebDriver instance.
    Returns:
        WebElement: The found element, or None if not found or if any
        exception occurred during the lookup (logged at debug level).
    """
    try:
        log.info(f"Searching for text: {search_value}")
        needle = _xpath_literal(search_value.lower())
        xpath = (
            "//div[@role='button' and @tabindex='0' and contains(translate(text(), "
            "'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), "
            f"{needle})]"
        )
        target = browser.find_element(by=By.XPATH, value=xpath)
        log.info(f'Text "{search_value}" found with role=button selector')
        # Returned here rather than from a finally block, which would also
        # swallow KeyboardInterrupt and SystemExit.
        return target
    except Exception as ex:
        log.debug(f"Text not found: {search_value}, {ex}")
        return None
def get_driver():
    """
    Build and configure a Chrome WebDriver.

    The browser is started with a fixed set of command-line flags, resized
    to 1200x800 and given a 15-second implicit wait.

    Args:
        None
    Returns:
        WebDriver: The initialized WebDriver instance.
    Raises:
        Exception: Any error raised during initialization is logged and
        re-raised unchanged.
    """
    chrome_flags = (
        "--no-first-run",
        "--no-default-browser-check",
        "--disable-extensions",
        "--disable-sync",
        "--disable-notifications",
    )
    try:
        log.info("Retrieving WebDriver...")
        options = Options()
        # Keep headless disabled for debugging; revert to osname-based logic if desired
        options.headless = False
        for flag in chrome_flags:
            options.add_argument(flag)
        browser = webdriver.Chrome(options=options)
        browser.set_window_size(1200, 800)
        browser.implicitly_wait(15)
    except Exception as ex:
        log.error(f"Error initializing WebDriver: {ex}", exc_info=True)
        raise
    return browser
def get_password(driver, password):
    """
    Locate the input named "password", click it and type the password.

    Finishes with a randomized pause via random_time().

    Args:
        driver: The Selenium WebDriver instance.
        password: The Instagram password.
    Returns:
        None
    """
    field = driver.find_element(by=By.XPATH, value="//input[@name='password']")
    log.info(f"found password element: {field}")
    # Queue the steps one by one, then run them as a single action sequence.
    actions = ActionChains(driver)
    actions.move_to_element(field)
    actions.click()
    actions.send_keys(password)
    actions.perform()
    random_time()
def get_username(driver, username):
    """
    Locate the input named "username", click it and type the username.

    Finishes with a randomized pause via random_time().

    Args:
        driver: The Selenium WebDriver instance.
        username: The Instagram username.
    Returns:
        None
    """
    field = driver.find_element(by=By.XPATH, value="//input[@name='username']")
    log.info(f"found username element: {field}")
    # Queue the steps one by one, then run them as a single action sequence.
    actions = ActionChains(driver)
    actions.move_to_element(field)
    actions.click()
    actions.send_keys(username)
    actions.perform()
    random_time()
def click_login(browser):
    """
    Find the login button and click it.

    The button is the first element whose text contains "Log in". The
    function only issues the click; it does not verify that the login was
    accepted. Finishes with a randomized pause via random_time().

    Args:
        browser: The Selenium WebDriver instance.
    Returns:
        None
    """
    login_button = browser.find_element(
        by=By.XPATH, value="//*[contains(text(), 'Log in')]"
    )
    log.info(f"Login button found: {login_button}")
    ActionChains(browser).move_to_element(login_button).click().perform()
    # Only the click is known to have happened at this point, so the log must
    # not claim that the login itself succeeded.
    log.info("Login button clicked")
    random_time()
def site_login(browser, username, password):
    """
    Log in handler for Instagram.

    Skips the login form when check_login_status() reports an existing
    session. Otherwise opens the login page, enters the credentials, clicks
    the login button and tries to dismiss the "save info" and notification
    prompts.

    Args:
        browser: The Selenium WebDriver instance.
        username: The Instagram username.
        password: The Instagram password.
    Returns:
        browser: The same WebDriver instance that was passed in.
    Raises:
        SystemExit: With exit code 1 if any exception occurs during login
        (the exception is logged first).
    """
    try:
        if check_login_status(browser, username):
            log.info("Skipping login as user is already logged in")
            return browser
        log.info("initiating fresh login")
        browser.get("https://www.instagram.com/accounts/login/")
        get_username(browser, username)
        get_password(browser, password)
        click_login(browser)
        random_time()
        save_popup(browser)
        if not notification_popup(browser):
            # A False result means the prompt was not found or not dismissed.
            log.info("Notification prompt not found or could not be dismissed")
        return browser
    except Exception as err:
        # Logged at ERROR: this module configures logging at ERROR level, so
        # an INFO record would be dropped and the process would exit silently.
        log.error(f"Error logging in to site: {err}", exc_info=True)
        sys.exit(1)
def clear_cmd():
    """
    Wipe the terminal by running the platform's clear-screen command.

    Runs "cls" when os.name is "nt" and "clear" otherwise.

    Args:
        None
    Returns:
        None
    """
    command = "clear"
    if os.name == "nt":
        command = "cls"
    os.system(command)
# Logging setup (runs at import time)
settings = Settings()
BASE_DIR = get_working_directory(__file__)
# Create the log file's directory if it doesn't exist. A bare file name has
# no directory component, and os.makedirs("") raises, so skip it in that case.
_log_dir = os.path.dirname(settings.logging)
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)
handlers = [
    log.StreamHandler(),
    # Rotate at 5 MiB and keep five backups alongside the active log file.
    RotatingFileHandler(
        settings.logging,
        mode="a",
        maxBytes=5 * 1024 * 1024,
        backupCount=5,
        encoding=None,
        delay=False,
    ),
]
# With this configuration only ERROR and above are emitted; the log.info and
# log.debug calls in this module are filtered out.
log.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=handlers,
    level=log.ERROR,
)