-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroups.py
More file actions
205 lines (163 loc) · 7.43 KB
/
groups.py
File metadata and controls
205 lines (163 loc) · 7.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import asyncio
import contextlib
import logging
from pyrogram import enums
from pyrogram.errors import RPCError
from rich.console import Console as RichConsole
from rich.panel import Panel
from rich.table import Table
from rich.progress import Progress
from rich.prompt import Prompt, Confirm
import db as _default_db
from ratelimit import rate_limiter as _default_rate_limiter
log = logging.getLogger("telex.groups")
async def check_premium_status(app, console, db=None, rate_limiter=None):
"""Check if the account has Telegram Premium."""
console.print("[yellow]Checking premium status...[/]")
try:
me = await app.get_me()
except RPCError as e:
console.print(f"[red]Failed to fetch account info: {e}[/]")
return
if me.is_premium:
status = "[bold green]Active[/bold green] — You have Telegram Premium"
else:
status = "[bold red]Inactive[/bold red] — No Telegram Premium"
console.print(
f"Account: [cyan]{me.first_name or ''} {me.last_name or ''}[/cyan]\n"
f"Username: [cyan]@{me.username or '—'}[/cyan]\n"
f"Premium: {status}"
)
log.info("Premium check: %s (premium=%s)", me.username, me.is_premium)
async def check_spam_status(app, console, db=None, rate_limiter=None):
"""Check account spam status via @SpamBot."""
console.print("[yellow]Checking spam status...[/]")
try:
await app.send_message("SpamBot", "/start")
except RPCError as e:
console.print(f"[red]Failed to contact @SpamBot: {e}[/]")
return
# Poll for response (SpamBot replies within 1-2s, poll up to 10s)
for _ in range(5):
await asyncio.sleep(2)
async for msg in app.get_chat_history("SpamBot", limit=1):
if not msg.outgoing:
console.print(f"[bold cyan]@SpamBot:[/] {msg.text or '[no response]'}")
log.info("Spam check result: %s", (msg.text or "")[:80])
return
console.print("[red]@SpamBot didn't respond in time.[/]")
async def fetch_all_groups(app, console, db=None, rate_limiter=None):
"""Fetch and display all joined groups/channels from Telegram."""
console.print("[yellow]Fetching all groups...[/]")
groups = []
async for dialog in app.get_dialogs():
chat = dialog.chat
if chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP, enums.ChatType.CHANNEL):
groups.append(chat)
if not groups:
console.print("[red]No groups found.[/]")
return
# Print as simple list (works with both Console and OutputAdapter)
console.print(f"[bold cyan]All Joined Groups ({len(groups)})[/]")
for i, chat in enumerate(groups, 1):
uname = f"@{chat.username}" if chat.username else "—"
console.print(f" {i}. {chat.title or '—'} ({uname}) [{chat.type.name}] — {chat.members_count or 0} members")
log.info("Fetched %d groups total", len(groups))
async def find_and_leave_restricted(app, console, db=None, rate_limiter=None):
"""Find groups where user can't send messages and offer to leave."""
db = db or _default_db
rate_limiter = rate_limiter or _default_rate_limiter
console.print("[yellow]Fetching groups...[/]")
groups = []
async for dialog in app.get_dialogs():
chat = dialog.chat
if chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP, enums.ChatType.CHANNEL):
groups.append(chat)
if not groups:
console.print("[red]No groups found.[/]")
return
console.print(f"[yellow]Checking send permissions for {len(groups)} groups...[/]")
_has_progress = isinstance(console, RichConsole)
restricted = []
progress_cm = Progress(console=console) if _has_progress else contextlib.nullcontext()
with progress_cm as progress:
task = progress.add_task("[yellow]Checking...", total=len(groups)) if progress else None
for i, chat in enumerate(groups):
try:
if not await _check_can_send(app, chat, rate_limiter=rate_limiter):
restricted.append(chat)
except Exception as e:
log.debug("Error checking %s: %s", chat.title, e)
if progress:
progress.advance(task)
elif (i + 1) % 10 == 0:
console.print(f"[dim]Checked {i+1}/{len(groups)}...[/]")
if not restricted:
console.print("[green]All groups allow sending messages.[/]")
return
console.print(f"[yellow]Found {len(restricted)} restricted group(s):[/]")
for i, chat in enumerate(restricted, 1):
uname = f"@{chat.username}" if chat.username else "—"
console.print(f" {i}. {chat.title or '—'} ({uname}) [{chat.type.name}]")
selection = Prompt.ask(
"[cyan]Select groups to leave (comma-separated numbers or 'all')[/]"
)
if selection.strip().lower() == "all":
selected = restricted
else:
try:
indices = [int(x.strip()) - 1 for x in selection.split(",")]
selected = [restricted[i] for i in indices if 0 <= i < len(restricted)]
except (ValueError, IndexError):
console.print("[red]Invalid selection.[/]")
return
if not selected:
console.print("[red]No groups selected.[/]")
return
if Confirm.ask(f"[yellow]Leave {len(selected)} group(s)?[/]"):
await _leave_groups(app, selected, console, db=db, rate_limiter=rate_limiter)
async def _check_can_send(app, chat, rate_limiter=None) -> bool:
"""Check if current user can send messages in a chat."""
rate_limiter = rate_limiter or _default_rate_limiter
try:
member = await rate_limiter.call(
lambda: app.get_chat_member(chat.id, "me")
)
except RPCError:
return True # Can't determine, assume accessible
# Owner/Admin can always send
if member.status in (enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR):
return True
# Channels: only admins can post
if chat.type == enums.ChatType.CHANNEL:
return False
# Restricted: check user-specific permissions
if member.status == enums.ChatMemberStatus.RESTRICTED:
if member.permissions:
return member.permissions.can_send_messages is not False
return False
# Regular member: check default group permissions
if chat.permissions:
return chat.permissions.can_send_messages is not False
return True
async def _leave_groups(app, groups, console, db=None, rate_limiter=None):
"""Leave a list of groups with rate limiting."""
db = db or _default_db
rate_limiter = rate_limiter or _default_rate_limiter
left = 0
_has_progress = isinstance(console, RichConsole)
progress_cm = Progress(console=console) if _has_progress else contextlib.nullcontext()
with progress_cm as progress:
task = progress.add_task("[red]Leaving...", total=len(groups)) if progress else None
for i, chat in enumerate(groups):
try:
await rate_limiter.call(lambda c=chat: app.leave_chat(c.id), console)
db.remove_group(chat.id)
console.print(f" [green]✓[/] Left: {chat.title}")
left += 1
except RPCError as e:
console.print(f" [red]✗ Failed: {chat.title} — {e}[/]")
if progress:
progress.advance(task)
log.info("Left %d/%d restricted groups", left, len(groups))
console.print(f"\n[green]Left {left}/{len(groups)} groups.[/]")