forked from masonasons/FastSM
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapplication.py
More file actions
2031 lines (1778 loc) · 73.5 KB
/
application.py
File metadata and controls
2031 lines (1778 loc) · 73.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sys
import platform
import os
import pickle
import threading
import zipfile
import html
import json
import datetime
import time
import re
import requests
import webbrowser
import config
import wx
from version import APP_NAME, APP_SHORTNAME, APP_VERSION, APP_AUTHOR
shortname = APP_SHORTNAME
name = APP_NAME
version = APP_VERSION
author = APP_AUTHOR
# Regex patterns
url_re = re.compile(r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?]))")
url_re2 = re.compile(r"(?:\w+://|www\.)[^ ,.?!#%=+][^ ]*")
bad_chars = "'\\.,[](){}:;\""
html_tag_re = re.compile(r'<[^>]+>')
class StatusWrapper:
"""Wrapper class to add text attribute to immutable Mastodon status objects"""
def __init__(self, status, text=""):
self._status = status
self.text = text
def __getattr__(self, name):
if name in ('_status', 'text'):
return object.__getattribute__(self, name)
return getattr(self._status, name)
def __hasattr__(self, name):
if name in ('_status', 'text'):
return True
return hasattr(self._status, name)
class NotificationWrapper:
"""Wrapper class to add type label and text to notification objects for templating"""
def __init__(self, notification, type_label="", text=""):
self._notification = notification
self.type = type_label # Human-readable type label like "followed you"
self.text = text # Status text if notification has associated status
def __getattr__(self, name):
if name in ('_notification', 'type', 'text'):
return object.__getattribute__(self, name)
return getattr(self._notification, name)
def __hasattr__(self, name):
if name in ('_notification', 'type', 'text'):
return True
return hasattr(self._notification, name)
class dict_obj:
def __init__(self, dict1):
self.__dict__.update(dict1)
class Application:
"""Main application class that holds all global state and utility methods."""
_instance = None
def __init__(self):
self.accounts = []
self.prefs = None
self.users = []
self.unknown_users = []
self.confpath = ""
self.errors = []
self.currentAccount = None
self.timeline_settings = []
self._initialized = False
@classmethod
def get_instance(cls):
"""Get the singleton application instance."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load(self):
"""Initialize the application - load preferences and accounts."""
if self._initialized:
return
import sound
from GUI import main
import mastodon_api as t
import timeline
self.prefs = config.Config(name="FastSM", autosave=True)
# In portable mode, userdata folder is already app-specific, don't add /FastSM
if config.is_portable_mode():
self.confpath = self.prefs._user_config_home
else:
self.confpath = self.prefs._user_config_home + "/FastSM"
# Redirect stderr to errors.log in config directory (not app directory)
# This is especially important for installed versions where the app directory
# (Program Files) is write-protected
if getattr(sys, 'frozen', False):
try:
# Ensure config directory exists before writing errors.log
if not os.path.exists(self.confpath):
os.makedirs(self.confpath)
f = open(self.confpath + "/errors.log", "a")
sys.stderr = f
except Exception as e:
# Log to console if we can't set up error logging
print(f"Warning: Could not set up error logging: {e}", file=sys.__stderr__)
# Load preferences with defaults
self.prefs.timelinecache_version = self.prefs.get("timelinecache_version", 1)
if self.prefs.timelinecache_version == 1:
if os.path.exists(self.confpath + "/timelinecache"):
os.remove(self.confpath + "/timelinecache")
self.prefs.timelinecache_version = 2
self.prefs.user_reversed = self.prefs.get("user_reversed", False)
self.prefs.user_limit = self.prefs.get("user_limit", 4)
self.prefs.postTemplate = self.prefs.get("postTemplate", "$account.display_name$ (@$account.acct$): $text$ $created_at$")
self.prefs.conversationTemplate = self.prefs.get("conversationTemplate", "$account.display_name$: $text$ $created_at$")
self.prefs.copyTemplate = self.prefs.get("copyTemplate", "$account.display_name$ (@$account.acct$): $text$")
self.prefs.boostTemplate = self.prefs.get("boostTemplate", "$account.display_name$ boosted $reblog.account.display_name$: $text$ $created_at$")
self.prefs.quoteTemplate = self.prefs.get("quoteTemplate", "Quoting $account.display_name$ (@$account.acct$): $text$")
self.prefs.notificationTemplate = self.prefs.get("notificationTemplate", "$account.display_name$ (@$account.acct$) $type$: $text$ $created_at$")
self.prefs.messageTemplate = self.prefs.get("messageTemplate", "$account.display_name$: $text$ $created_at$")
self.prefs.userTemplate = self.prefs.get("userTemplate", "$display_name$ (@$acct$): $followers_count$ followers, $following_count$ following, $statuses_count$ posts. Bio: $note$")
self.prefs.accounts = self.prefs.get("accounts", 1)
self.prefs.errors = self.prefs.get("errors", True)
self.prefs.streaming = self.prefs.get("streaming", False)
self.prefs.invisible = self.prefs.get("invisible", False)
self.prefs.invisible_sync = self.prefs.get("invisible_sync", True)
self.prefs.update_time = self.prefs.get("update_time", 2)
self.prefs.media_volume = self.prefs.get("media_volume", self.prefs.get("volume", 1.0)) # Media player volume (migrates from old volume setting)
self.prefs.auto_open_audio_player = self.prefs.get("auto_open_audio_player", False) # Auto-open audio player when media starts
self.prefs.stop_audio_on_close = self.prefs.get("stop_audio_on_close", False) # Stop audio when audio player closes
self.prefs.ctrl_enter_to_send = self.prefs.get("ctrl_enter_to_send", False) # Use Ctrl+Enter to send posts instead of Enter
self.prefs.count = self.prefs.get("count", 40)
self.prefs.repeat = self.prefs.get("repeat", False)
self.prefs.demojify = self.prefs.get("demojify", False)
self.prefs.demojify_post = self.prefs.get("demojify_post", False)
self.prefs.include_media_descriptions = self.prefs.get("include_media_descriptions", True)
self.prefs.include_link_preview = self.prefs.get("include_link_preview", True)
self.prefs.max_usernames_display = self.prefs.get("max_usernames_display", 0) # 0 = show all
self.prefs.position = self.prefs.get("position", True)
self.prefs.chars_sent = self.prefs.get("chars_sent", 0)
self.prefs.posts_sent = self.prefs.get("posts_sent", 0)
self.prefs.replies_sent = self.prefs.get("replies_sent", 0)
self.prefs.quotes_sent = self.prefs.get("quotes_sent", 0)
self.prefs.boosts_sent = self.prefs.get("boosts_sent", 0)
self.prefs.favourites_sent = self.prefs.get("favourites_sent", 0)
self.prefs.statuses_received = self.prefs.get("statuses_received", 0)
self.prefs.ask_dismiss = self.prefs.get("ask_dismiss", True)
self.prefs.reversed = self.prefs.get("reversed", False)
self.prefs.window_shown = self.prefs.get("window_shown", True)
self.prefs.autoOpenSingleURL = self.prefs.get("autoOpenSingleURL", False)
self.prefs.use24HourTime = self.prefs.get("use24HourTime", False)
self.prefs.fetch_pages = self.prefs.get("fetch_pages", 1) # Number of API calls to make when loading timelines
self.prefs.single_api_on_startup = self.prefs.get("single_api_on_startup", False) # Use only one API call on initial timeline loads
self.prefs.check_for_updates = self.prefs.get("check_for_updates", True) # Check for updates on startup
self.prefs.load_all_previous = self.prefs.get("load_all_previous", False) # Keep loading previous until timeline is fully loaded
self.prefs.earcon_audio = self.prefs.get("earcon_audio", True)
self.prefs.earcon_top = self.prefs.get("earcon_top", False)
self.prefs.earcon_mention = self.prefs.get("earcon_mention", True)
self.prefs.wrap = self.prefs.get("wrap", False)
# Content warning handling: 'hide' = show CW only, 'show' = show CW + text, 'ignore' = show text only
self.prefs.cw_mode = self.prefs.get("cw_mode", "hide")
# Keymap for invisible interface (default inherits from default.keymap)
self.prefs.keymap = self.prefs.get("keymap", "default")
# Sync home timeline position with Mastodon marker API
self.prefs.sync_timeline_position = self.prefs.get("sync_timeline_position", False)
# Dark mode: 'off', 'on', or 'auto' (follow system)
self.prefs.dark_mode = self.prefs.get("dark_mode", "off")
# Confirmation settings for menu/hotkey actions
self.prefs.confirm_boost = self.prefs.get("confirm_boost", False)
self.prefs.confirm_unboost = self.prefs.get("confirm_unboost", False)
self.prefs.confirm_favorite = self.prefs.get("confirm_favorite", False)
self.prefs.confirm_unfavorite = self.prefs.get("confirm_unfavorite", False)
self.prefs.confirm_follow = self.prefs.get("confirm_follow", False)
self.prefs.confirm_unfollow = self.prefs.get("confirm_unfollow", False)
self.prefs.confirm_block = self.prefs.get("confirm_block", True)
self.prefs.confirm_unblock = self.prefs.get("confirm_unblock", True)
self.prefs.confirm_mute = self.prefs.get("confirm_mute", False)
self.prefs.confirm_unmute = self.prefs.get("confirm_unmute", False)
self.prefs.confirm_delete = self.prefs.get("confirm_delete", False)
self.prefs.confirm_bookmark = self.prefs.get("confirm_bookmark", False)
self.prefs.confirm_unbookmark = self.prefs.get("confirm_unbookmark", False)
# AI image description settings
self.prefs.ai_service = self.prefs.get("ai_service", "none") # 'none', 'openai', or 'gemini'
self.prefs.openai_api_key = self.prefs.get("openai_api_key", "")
self.prefs.openai_model = self.prefs.get("openai_model", "gpt-4o-mini")
self.prefs.gemini_api_key = self.prefs.get("gemini_api_key", "")
self.prefs.gemini_model = self.prefs.get("gemini_model", "gemini-2.0-flash")
self.prefs.ai_image_prompt = self.prefs.get("ai_image_prompt", "Describe this image in detail for someone who cannot see it. Include information about the subjects, setting, colors, and any text visible in the image.")
# yt-dlp path for YouTube/etc URL extraction (empty = use bundled or system)
self.prefs.ytdlp_path = self.prefs.get("ytdlp_path", "")
# yt-dlp cookies file for age-restricted/private videos
self.prefs.ytdlp_cookies = self.prefs.get("ytdlp_cookies", "")
# Deno path for yt-dlp extractors that need it
self.prefs.deno_path = self.prefs.get("deno_path", "")
# Whether we've already asked about Windows 11 keymap
self.prefs.win11_keymap_asked = self.prefs.get("win11_keymap_asked", False)
# Audio output device index (1 = default device in BASS)
self.prefs.audio_output_device = self.prefs.get("audio_output_device", 1)
# Timeline caching settings
self.prefs.timeline_cache_enabled = self.prefs.get("timeline_cache_enabled", True) # Enable timeline caching for fast startup
self.prefs.timeline_cache_limit = self.prefs.get("timeline_cache_limit", 1000) # Max items to cache per timeline
# Initialize audio output with selected device
import sound
try:
sound.init_audio_output(self.prefs.audio_output_device)
except Exception as e:
# Audio init failure shouldn't crash the app
print(f"Warning: Audio initialization failed: {e}", file=sys.stderr)
if self.prefs.invisible:
main.window.register_keys()
# User cache is now in-memory only per-account, no global cache needed
self.users = []
self.load_timeline_settings()
# Check for and handle any partially configured accounts
self._handle_unfinished_accounts()
# If all accounts were removed, ensure at least one will be created
if self.prefs.accounts <= 0:
self.prefs.accounts = 1
# Load accounts - first one on main thread, rest in parallel if already configured
if self.prefs.accounts > 0:
# First account must be on main thread (handles auth dialogs, sets currentAccount)
self.add_session()
# Load remaining accounts in parallel if more than one
if self.prefs.accounts > 1:
import concurrent.futures
# Check which accounts are already configured (have credentials)
parallelizable = []
sequential = []
for i in range(1, self.prefs.accounts):
if self._is_account_configured(i):
parallelizable.append(i)
else:
sequential.append(i)
# Load configured accounts in parallel
if parallelizable:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(parallelizable)) as executor:
futures = [executor.submit(self._add_session_threaded, i) for i in parallelizable]
concurrent.futures.wait(futures)
# Load unconfigured accounts sequentially on main thread (need dialogs)
for i in sequential:
self.add_session(i)
self._initialized = True
# Check for updates on startup if enabled
# Delay slightly to ensure main window is fully initialized on macOS
if self.prefs.check_for_updates:
def delayed_cfu():
import time
time.sleep(2) # Wait for window to be fully ready
self.cfu()
threading.Thread(target=delayed_cfu, daemon=True).start()
def add_session(self, index=None):
"""Add a new account session."""
import mastodon_api as t
import wx
if index is None:
index = len(self.accounts)
try:
self.accounts.append(t.mastodon(self, index))
except t.AccountSetupCancelled:
# User cancelled account setup - exit gracefully if no accounts
if len(self.accounts) == 0:
wx.CallAfter(wx.Exit)
return
# Otherwise just skip this account
def _is_account_configured(self, index):
"""Check if an account has credentials saved (no dialogs needed)."""
import config
try:
# In portable mode, don't add FastSM prefix (userdata is already app-specific)
# Use save_on_exit=False to avoid overwriting real account prefs on exit
if config.is_portable_mode():
prefs = config.Config(name="account"+str(index), autosave=False, save_on_exit=False)
else:
prefs = config.Config(name="FastSM/account"+str(index), autosave=False, save_on_exit=False)
platform_type = prefs.get("platform_type", "")
if platform_type == "bluesky":
# Bluesky needs handle and password
return bool(prefs.get("bluesky_handle", "")) and bool(prefs.get("bluesky_password", ""))
else:
# Mastodon needs instance URL and access token
return bool(prefs.get("instance_url", "")) and bool(prefs.get("access_token", ""))
except:
return False
def _is_account_partially_configured(self, index):
"""Check if an account has been started but not completed.
Returns (is_partial, platform_type, details) tuple."""
import config
try:
if config.is_portable_mode():
prefs = config.Config(name="account"+str(index), autosave=False, save_on_exit=False)
else:
prefs = config.Config(name="FastSM/account"+str(index), autosave=False, save_on_exit=False)
platform_type = prefs.get("platform_type", "")
if not platform_type:
return (False, None, None)
if platform_type == "bluesky":
handle = prefs.get("bluesky_handle", "")
password = prefs.get("bluesky_password", "")
if handle and password:
return (False, None, None) # Fully configured
if handle or password:
return (True, "bluesky", handle or "incomplete")
# Just platform_type set, nothing else
return (True, "bluesky", "setup not started")
else:
# Mastodon
instance_url = prefs.get("instance_url", "")
access_token = prefs.get("access_token", "")
if instance_url and access_token:
return (False, None, None) # Fully configured
if instance_url:
return (True, "mastodon", instance_url)
# Just platform_type set, nothing else
return (True, "mastodon", "setup not started")
except:
return (False, None, None)
def _handle_unfinished_accounts(self):
"""Check for and handle any partially configured accounts on startup."""
import wx
import config
import shutil
import os
unfinished = []
for i in range(self.prefs.accounts):
is_partial, platform_type, details = self._is_account_partially_configured(i)
if is_partial:
unfinished.append((i, platform_type, details))
if not unfinished:
return
# Build message for user
accounts_to_remove = []
for index, platform_type, details in unfinished:
msg = f"Account {index + 1} ({platform_type}) has incomplete setup"
if details and details != "setup not started":
msg += f": {details}"
result = wx.MessageBox(
f"{msg}\n\nWould you like to continue setup for this account?\n\n"
"Yes = Continue setup\n"
"No = Remove this account",
"Incomplete Account Found",
wx.YES_NO | wx.ICON_QUESTION
)
if result == wx.NO:
accounts_to_remove.append(index)
# Remove accounts (in reverse order to maintain indices)
if accounts_to_remove:
for index in sorted(accounts_to_remove, reverse=True):
try:
# Delete config folder
if config.is_portable_mode():
config_path = os.path.join(self.confpath, f"account{index}")
else:
config_path = os.path.join(self.confpath, f"account{index}")
if os.path.exists(config_path):
shutil.rmtree(config_path)
# Shift remaining account folders down
for j in range(index + 1, self.prefs.accounts):
old_path = os.path.join(self.confpath, f"account{j}")
new_path = os.path.join(self.confpath, f"account{j-1}")
if os.path.exists(old_path):
shutil.move(old_path, new_path)
self.prefs.accounts -= 1
except Exception as e:
print(f"Error removing account {index}: {e}")
def _add_session_threaded(self, index):
"""Add account session from a background thread."""
import mastodon_api as t
try:
account = t.mastodon(self, index)
# Store directly - account is fully initialized
self.accounts.append(account)
except Exception as e:
print(f"Error loading account {index}: {e}")
def save_users(self):
"""No-op - user cache is in-memory only now."""
pass
def save_timeline_settings(self):
"""Save timeline settings to disk."""
f = open(self.confpath + "/timelinecache", "wb")
f.write(pickle.dumps(self.timeline_settings))
f.close()
def load_timeline_settings(self):
"""Load timeline settings from disk."""
try:
f = open(self.confpath + "/timelinecache", "rb")
self.timeline_settings = pickle.loads(f.read())
f.close()
except:
return False
def get_timeline_settings(self, account_id, name):
"""Get or create timeline settings for an account/timeline."""
import timeline
for i in self.timeline_settings:
if i.tl == name and i.account_id == account_id:
return i
self.timeline_settings.append(timeline.TimelineSettings(account_id, name))
return self.timeline_settings[len(self.timeline_settings) - 1]
def clean_users(self):
"""Clear the user cache."""
self.users = []
# ============ Utility Methods (moved from utils.py) ============
def strip_html(self, text):
"""Strip HTML tags and decode entities"""
# Add spaces for block elements and line breaks to prevent text concatenation
# Note: Don't add spaces for inline elements like <span> - Mastodon uses spans
# within URLs (e.g., <span class="invisible">https://</span>) and adding spaces
# would break the URL (causing "https:// example.com" instead of "https://example.com")
text = re.sub(r'</(p|div)>', ' ', text, flags=re.IGNORECASE)
text = re.sub(r'<br\s*/?>', ' ', text, flags=re.IGNORECASE)
text = html_tag_re.sub('', text)
text = html.unescape(text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def html_to_text_for_edit(self, content, mentions=None):
"""Convert HTML content to plain text for editing, preserving newlines and full handles.
Args:
content: The HTML content from the status
mentions: List of mention objects from the status (for resolving full handles)
"""
if not content:
return ""
text = content
# Replace mentions with full handles if we have mention data
# Mastodon HTML: <span class="h-card"><a href="https://instance/@user" ...>@<span>user</span></a></span>
if mentions:
for mention in mentions:
acct = getattr(mention, 'acct', '')
url = getattr(mention, 'url', '')
if acct and url:
# Match the mention link and replace with @acct
# Pattern matches <a href="URL">@anything</a> or similar structures
pattern = rf'<a[^>]*href=["\']?{re.escape(url)}["\']?[^>]*>.*?</a>'
text = re.sub(pattern, f'@{acct}', text, flags=re.IGNORECASE | re.DOTALL)
# Convert line breaks and paragraphs to newlines
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
text = re.sub(r'</p>\s*<p[^>]*>', '\n\n', text, flags=re.IGNORECASE)
text = re.sub(r'</?p[^>]*>', '\n', text, flags=re.IGNORECASE)
# Remove all remaining HTML tags
text = html_tag_re.sub('', text)
# Decode HTML entities
text = html.unescape(text)
# Clean up excessive newlines but preserve intentional ones
text = re.sub(r'\n{3,}', '\n\n', text)
text = text.strip()
return text
def process_status(self, s, return_only_text=False, template="", ignore_cw=False, account=None):
"""Process a Mastodon status for display"""
# Handle scheduled statuses - check for _scheduled flag (set by platform backend)
# or raw ScheduledStatus (has params and scheduled_at)
is_scheduled = getattr(s, '_scheduled', False)
if not is_scheduled and hasattr(s, 'params') and hasattr(s, 'scheduled_at'):
is_scheduled = True
if is_scheduled:
return self._process_scheduled_status(s)
if hasattr(s, 'content'):
text = self.strip_html(s.content)
else:
text = ""
if hasattr(s, 'reblog') and s.reblog:
# For reblogs, get text from reblogged status (which includes its media descriptions)
text = self.process_status(s.reblog, True, ignore_cw=ignore_cw, account=account)
if template == "":
template = self.prefs.boostTemplate
else:
# Handle content warning based on preference
spoiler = getattr(s, 'spoiler_text', None)
if spoiler and not ignore_cw:
cw_mode = getattr(self.prefs, 'cw_mode', 'hide')
if cw_mode == 'hide':
# Show only the content warning
text = f"CW: {spoiler}"
elif cw_mode == 'show':
# Show CW followed by text
text = f"CW: {spoiler}. {text}"
# 'ignore' mode: just use the text as-is
# Handle server-side filter warnings (action="warn")
# Uses the same cw_mode setting as content warnings
filtered = getattr(s, 'filtered', None)
if filtered and not ignore_cw:
# Get filter titles from the matched filters
filter_titles = []
for result in filtered:
filter_obj = getattr(result, 'filter', None)
if filter_obj:
title = getattr(filter_obj, 'title', None)
if title:
filter_titles.append(title)
if filter_titles:
filter_warning = "Filtered: " + ", ".join(filter_titles)
cw_mode = getattr(self.prefs, 'cw_mode', 'hide')
if cw_mode == 'hide':
text = filter_warning
elif cw_mode == 'show':
text = f"{filter_warning}. {text}"
# 'ignore' mode: just use the text as-is
# Add media descriptions to text (only for non-reblogs to avoid duplication)
if self.prefs.include_media_descriptions and hasattr(s, 'media_attachments') and s.media_attachments:
for media in s.media_attachments:
# Handle both objects (from API) and dicts (from cache)
if isinstance(media, dict):
media_type = media.get('type', 'media') or 'media'
description = media.get('description') or media.get('alt')
else:
media_type = getattr(media, 'type', 'media') or 'media'
description = getattr(media, 'description', None) or getattr(media, 'alt', None)
type_display = media_type.upper() if media_type == 'gifv' else media_type.capitalize()
if description:
text += f" ({type_display}) description: {description}"
else:
text += f" ({type_display}) with no description"
# Add card (external link embed) information - especially useful for Bluesky
# posts that only contain a link with no text
if self.prefs.include_link_preview:
card = getattr(s, 'card', None)
if card:
card_title = getattr(card, 'title', None) if not isinstance(card, dict) else card.get('title')
card_description = getattr(card, 'description', None) if not isinstance(card, dict) else card.get('description')
card_url = getattr(card, 'url', None) if not isinstance(card, dict) else card.get('url')
# Show card if we have title, description, or at least the URL
if card_title or card_description or card_url:
card_parts = []
if card_title:
card_parts.append(card_title)
if card_description:
card_parts.append(card_description)
# If no title/description but we have URL, show the URL
if not card_parts and card_url:
card_parts.append(card_url)
card_text = " - ".join(card_parts)
# If text is empty, use card as main text; otherwise append
if not text.strip():
text = f"(Link) {card_text}"
else:
text += f" (Link) {card_text}"
# Add poll information
if hasattr(s, 'poll') and s.poll:
poll = s.poll
# Handle both object and dict (from cache)
def get_poll_attr(obj, name, default=None):
if isinstance(obj, dict):
return obj.get(name, default)
return getattr(obj, name, default)
is_expired = get_poll_attr(poll, 'expired', False)
has_voted = get_poll_attr(poll, 'voted', False)
options = get_poll_attr(poll, 'options', [])
own_votes = get_poll_attr(poll, 'own_votes', []) or []
votes_count = get_poll_attr(poll, 'votes_count', 0)
# Build poll status
if is_expired:
poll_status = "Poll ended"
elif has_voted:
poll_status = "Poll (voted)"
else:
poll_status = "Poll"
# Build options list with vote info
option_texts = []
for i, opt in enumerate(options):
opt_title = get_poll_attr(opt, 'title', str(opt))
opt_votes = get_poll_attr(opt, 'votes_count', 0)
if is_expired or has_voted:
# Show results
if votes_count > 0:
pct = (opt_votes / votes_count) * 100
opt_text = f"{opt_title}: {pct:.0f}%"
else:
opt_text = f"{opt_title}: 0%"
if i in own_votes:
opt_text += " (your vote)"
else:
opt_text = opt_title
option_texts.append(opt_text)
text += f" ({poll_status}: {', '.join(option_texts)})"
# Strip quote-related URLs from text when there's a quote
if hasattr(s, 'quote') and s.quote:
import re
# Remove RE:/QT: followed by a URL at the start
text = re.sub(r'^(RE|QT|re|qt):\s*https?://\S+\s*', '', text, flags=re.IGNORECASE).strip()
# Get the quoted post's URL to strip it from the text
quote_url = getattr(s.quote, 'url', None)
if quote_url:
# Strip the exact URL if it appears at the end
text = text.rstrip()
if text.endswith(quote_url):
text = text[:-len(quote_url)].rstrip()
# Also strip any trailing Mastodon-style status URLs (https://instance/@user/id)
text = re.sub(r'\s*https?://[^\s]+/@[^\s]+/\d+\s*$', '', text).strip()
# Collapse consecutive usernames at start of text if max_usernames_display is set
# Setting controls threshold - when exceeded, show only first username + "and X more"
max_usernames = getattr(self.prefs, 'max_usernames_display', 0)
if max_usernames > 0:
import re
# Match consecutive @username patterns at the start (with optional whitespace between)
username_pattern = r'^((?:@[\w.-]+(?:@[\w.-]+)?(?:\s+|$))+)'
match = re.match(username_pattern, text)
if match:
# Extract all usernames from the matched portion
username_portion = match.group(1)
usernames = re.findall(r'@[\w.-]+(?:@[\w.-]+)?', username_portion)
if len(usernames) > max_usernames:
# Show only the first username and "X more"
first_username = usernames[0]
remaining_count = len(usernames) - 1
rest_of_text = text[len(username_portion):].lstrip()
text = f"{first_username} and {remaining_count} more"
if rest_of_text:
text += f" {rest_of_text}"
if return_only_text:
return text
# Handle quotes: format as "Person: their comment. Quoting person2: quoted text. time"
quote_formatted = ""
if hasattr(s, 'quote') and s.quote:
quote_text = self.process_status(s.quote, True, account=account)
quote_wrapped = StatusWrapper(s.quote, quote_text)
quote_formatted = self.template_to_string(quote_wrapped, self.prefs.quoteTemplate, account=account)
wrapped = StatusWrapper(s, text)
if quote_formatted:
# For quotes, remove timestamp from main template and add it at the very end
main_template = template if template else self.prefs.postTemplate
# Remove $created_at$ from main template for quote posts
temp_template = main_template.replace(" $created_at$", "").replace("$created_at$ ", "").replace("$created_at$", "")
result = self.template_to_string(wrapped, temp_template, account=account)
result += " " + quote_formatted
# Add timestamp at the very end
created_at = getattr(s, 'created_at', None)
if created_at:
result += " " + self.parse_date(created_at)
else:
result = self.template_to_string(wrapped, template, account=account)
return result
def _process_scheduled_status(self, s):
"""Process a scheduled status for display."""
# Check both _scheduled_at (UniversalStatus) and scheduled_at (raw ScheduledStatus)
scheduled_at = getattr(s, '_scheduled_at', None) or getattr(s, 'scheduled_at', None)
# Handle both raw ScheduledStatus (has params dict) and UniversalStatus (content from params)
params = getattr(s, 'params', None)
if params:
# Raw ScheduledStatus - get from params
if isinstance(params, dict):
text = params.get('text', '')
visibility = params.get('visibility', 'public')
spoiler = params.get('spoiler_text', '')
else:
text = getattr(params, 'text', '')
visibility = getattr(params, 'visibility', 'public')
spoiler = getattr(params, 'spoiler_text', '')
else:
# UniversalStatus - content may be empty since params.text doesn't map to content
# Check content first, then fall back to text attribute
content = getattr(s, 'content', '')
if content:
text = self.strip_html(content)
else:
text = getattr(s, 'text', '')
visibility = getattr(s, 'visibility', 'public')
spoiler = getattr(s, 'spoiler_text', '')
# Format scheduled time
if scheduled_at:
time_str = self.parse_date(scheduled_at)
else:
time_str = "unknown time"
# Build the display string
result = f"Scheduled for {time_str}"
if visibility and visibility != 'public':
result += f" ({visibility})"
if spoiler:
result += f" CW: {spoiler}."
result += f": {text}"
# Add media attachment count if any
media = getattr(s, 'media_attachments', None)
if media:
result += f" ({len(media)} attachment{'s' if len(media) > 1 else ''})"
return result
def process_notification(self, n, account=None):
"""Process a Mastodon notification for display using notification template"""
type_labels = {
'follow': 'followed you',
'favourite': 'favourited your post',
'reblog': 'boosted your post',
'mention': 'mentioned you',
'poll': 'poll ended',
'update': 'edited a post',
'status': 'posted',
'follow_request': 'requested to follow you',
'admin.sign_up': 'signed up',
'admin.report': 'new report',
'quote': 'quoted your post',
}
notif_type = getattr(n, 'type', 'unknown')
status = getattr(n, 'status', None)
# Get human-readable type label
label = type_labels.get(notif_type, notif_type)
# Build status text if notification has an associated status
status_text = ""
if status:
# Use text field if available, otherwise strip HTML from content
status_text = getattr(status, 'text', '') or self.strip_html(getattr(status, 'content', ''))
# Collapse consecutive usernames at start of text if max_usernames_display is set
max_usernames = getattr(self.prefs, 'max_usernames_display', 0)
if max_usernames > 0:
import re
username_pattern = r'^((?:@[\w.-]+(?:@[\w.-]+)?(?:\s+|$))+)'
match = re.match(username_pattern, status_text)
if match:
username_portion = match.group(1)
usernames = re.findall(r'@[\w.-]+(?:@[\w.-]+)?', username_portion)
if len(usernames) > max_usernames:
first_username = usernames[0]
remaining_count = len(usernames) - 1
rest_of_text = status_text[len(username_portion):].lstrip()
status_text = f"{first_username} and {remaining_count} more"
if rest_of_text:
status_text += f" {rest_of_text}"
# Handle quote notifications - format similar to how quotes are shown in timelines
if hasattr(status, 'quote') and status.quote:
quote = status.quote
quote_text = getattr(quote, 'text', '') or self.strip_html(getattr(quote, 'content', ''))
quote_account = getattr(quote, 'account', None)
if quote_account:
# Check for alias
quote_user_id = str(getattr(quote_account, 'id', ''))
if account and quote_user_id and quote_user_id in account.prefs.aliases:
quote_name = account.prefs.aliases[quote_user_id]
else:
quote_name = getattr(quote_account, 'display_name', '') or getattr(quote_account, 'acct', '')
quote_acct = getattr(quote_account, 'acct', '')
status_text += f" Quoting {quote_name} (@{quote_acct}): {quote_text}"
else:
status_text += f" Quoting: {quote_text}"
# Add poll info for notifications with polls
if hasattr(status, 'poll') and status.poll:
poll = status.poll
# Handle both object and dict (from cache)
def get_poll_attr(obj, name, default=None):
if isinstance(obj, dict):
return obj.get(name, default)
return getattr(obj, name, default)
is_expired = get_poll_attr(poll, 'expired', False)
options = get_poll_attr(poll, 'options', [])
votes_count = get_poll_attr(poll, 'votes_count', 0)
option_texts = []
for opt in options:
opt_title = get_poll_attr(opt, 'title', str(opt))
opt_votes = get_poll_attr(opt, 'votes_count', 0)
if is_expired and votes_count > 0:
pct = (opt_votes / votes_count) * 100
option_texts.append(f"{opt_title}: {pct:.0f}%")
else:
option_texts.append(opt_title)
poll_label = "Poll ended" if is_expired else "Poll"
status_text += f" ({poll_label}: {', '.join(option_texts)})"
# Wrap notification with type label and text for templating
wrapped = NotificationWrapper(n, type_label=label, text=status_text)
# Use notification template
result = self.template_to_string(wrapped, self.prefs.notificationTemplate, account=account)
return result
def process_conversation(self, c, account=None):
"""Process a Mastodon conversation for display"""
conv_accounts = getattr(c, 'accounts', [])
last_status = getattr(c, 'last_status', None)
if conv_accounts:
# Get display names with alias and demojify support
names = []
for a in conv_accounts[:3]:
user_id = str(getattr(a, 'id', ''))
if account and user_id and user_id in account.prefs.aliases:
names.append(account.prefs.aliases[user_id])
else:
display_name = a.display_name or a.acct
# Apply demojify setting
if self.prefs.demojify:
demojied = self.demojify(display_name)
if demojied == "":
display_name = a.acct
else:
display_name = demojied
names.append(display_name)
participants = ", ".join(names)
if len(conv_accounts) > 3:
participants += f" and {len(conv_accounts) - 3} others"
else:
participants = "Unknown"
if last_status:
text = self.strip_html(getattr(last_status, 'content', ''))
created_at = self.parse_date(getattr(last_status, 'created_at', None))
return f"{participants}: {text} {created_at}"
else:
return f"Conversation with {participants}"
def process_message(self, s, return_text=False):
"""Process a direct message/conversation"""
if hasattr(s, 'last_status'):
return self.process_conversation(s)
elif hasattr(s, 'content'):
text = self.strip_html(s.content)
if return_text:
return text
return self.template_to_string(s, self.prefs.conversationTemplate)
return ""
def find_urls_in_text(self, text):
return [s.strip(bad_chars) for s in url_re2.findall(text)]
def find_urls_in_status(self, s):
"""Find URLs in a status (Mastodon or Bluesky)
Returns URLs in order: text/link URLs first, then media URLs
"""
urls = []
media_urls = []
# For reblogged/boosted posts, also check the inner post
post_to_check = s
if hasattr(s, 'reblog') and s.reblog:
post_to_check = s.reblog
# Get card URL (external link embed)
if hasattr(post_to_check, 'card') and post_to_check.card:
if hasattr(post_to_check.card, 'url') and post_to_check.card.url:
urls.append(post_to_check.card.url)
# Get Bluesky facet links (URLs embedded in post text)
if hasattr(post_to_check, '_facet_links') and post_to_check._facet_links:
for link in post_to_check._facet_links:
if link not in urls:
urls.append(link)
# Get URLs from HTML content (Mastodon)
if hasattr(post_to_check, 'content') and post_to_check.content:
text_urls = self.find_urls_in_text(self.strip_html(post_to_check.content))
for url in text_urls:
if url not in urls:
urls.append(url)
# Get URLs from plain text (Bluesky) - only if no facet links found
if hasattr(post_to_check, 'text') and post_to_check.text and not hasattr(post_to_check, '_facet_links'):
text_urls = self.find_urls_in_text(post_to_check.text)
for url in text_urls:
if url not in urls:
urls.append(url)
# Get media attachment URLs (added last so they appear after text URLs)
if hasattr(post_to_check, 'media_attachments'):
for media in post_to_check.media_attachments:
if hasattr(media, 'url') and media.url:
if media.url not in urls:
media_urls.append(media.url)
# Combine: text URLs first, then media URLs
urls.extend(media_urls)
# Also check quoted posts for media
if hasattr(post_to_check, 'quote') and post_to_check.quote:
quote = post_to_check.quote
if hasattr(quote, 'media_attachments'):
for media in quote.media_attachments:
if hasattr(media, 'url') and media.url and media.url not in urls:
urls.append(media.url)
return urls
def template_to_string(self, s, template="", account=None):
"""Format a status using a template"""
if template == "":
template = self.prefs.postTemplate
# Prepare text content now, but replace AFTER other substitutions
# to prevent $..$ patterns in post text from being interpreted as template vars
text_content = None
if "$text$" in template:
# First check if we have a pre-processed text attribute (from StatusWrapper)
# This includes media descriptions and other processed content from process_status()
text_content = getattr(s, 'text', '')
needs_media_descriptions = False
if not text_content:
# Fall back to stripping HTML from content
# Media descriptions need to be added since we're not using pre-processed text
if hasattr(s, 'reblog') and s.reblog:
text_content = self.strip_html(getattr(s.reblog, 'content', ''))
else:
text_content = self.strip_html(getattr(s, 'content', ''))
needs_media_descriptions = True
if self.prefs.demojify_post:
text_content = self.demojify(text_content)
# Add media descriptions only if we used the fallback path (not pre-processed text)
if needs_media_descriptions and self.prefs.include_media_descriptions:
# Get media from reblog if this is a boost, otherwise from status
status_for_media = s.reblog if hasattr(s, 'reblog') and s.reblog else s
media_attachments = getattr(status_for_media, 'media_attachments', []) or []
for media in media_attachments:
# Handle both objects (from API) and dicts (from cache)
if isinstance(media, dict):
media_type = media.get('type', 'media') or 'media'
description = media.get('description') or media.get('alt')
else: