-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
209 lines (180 loc) · 6.56 KB
/
example.py
File metadata and controls
209 lines (180 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import airsync, asyncio, logging, os, platform, pyperclip, shutil, subprocess, threading
import uuid # <--- Added missing import
from notifypy import Notify
from pathlib import Path
# Import pyvolume
try:
import pyvolume
except ImportError:
pyvolume = None
logging.warning("pyvolume not installed. Volume control will not work.")
from gi.repository import Playerctl, GLib
# -------------------------------
# Setup cache and logging
# -------------------------------
CURRENT_HANDLER_ID = None
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
CACHE_DIR = "cache"
ICON_CACHE_DIR = os.path.join(CACHE_DIR, "icons")
DOWNLOADS_DIR = Path.home() / "Downloads"
os.makedirs(ICON_CACHE_DIR, exist_ok=True)
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
# --- HELPER: PERSISTENT DEVICE ID ---
def get_persistent_device_id():
id_file = os.path.join(CACHE_DIR, "device_id.txt")
if os.path.exists(id_file):
with open(id_file, "r") as f:
return f.read().strip()
else:
new_id = str(uuid.uuid4())
with open(id_file, "w") as f:
f.write(new_id)
return new_id
# Get the ID once
DEVICE_ID = get_persistent_device_id()
# ------------------------------------
# -------------------------------
# PlayerManager (No changes)
# -------------------------------
manager = Playerctl.PlayerManager()
active_players = {}
def init_player(name):
try:
player = Playerctl.Player.new_from_name(name)
active_players[name] = player
logging.info(f"Bound to media player: {name}")
except Exception as e:
logging.warning(f"Could not bind to player {name}: {e}")
manager.connect("name-appeared", lambda m, name: init_player(name))
manager.connect("player-vanished", lambda m, player: active_players.pop(player.props.player_name, None))
for name in manager.props.player_names:
init_player(name)
def get_active_player():
for player in active_players.values():
if player.props.playback_status == Playerctl.PlaybackStatus.PLAYING:
return player
return next(iter(active_players.values()), None)
def start_glib_loop():
loop = GLib.MainLoop()
loop.run()
threading.Thread(target=start_glib_loop, daemon=True).start()
# -------------------------------
# AirSync server
# -------------------------------
server = airsync.Server(
key_path=os.path.join(CACHE_DIR, "airsync.key"),
icon_cache_path=ICON_CACHE_DIR,
discovery=True
)
# --- UPDATED CONFIG WITH ID ---
SERVER_CONFIG = {
"name": platform.node(),
"id": DEVICE_ID, # Matches UDP Broadcast
"deviceId": DEVICE_ID, # Matches macInfo
"categoryType": "PC",
"exactDeviceName": "My Linux PC",
"model": "AirSync-Py",
"type": "PC",
"isPlus": True,
"isPlusSubscription": True,
"serverVersion": "2.5.0",
"version": "2.5.0",
}
@server.on_event("mac_info_request")
async def provide_mac_info(handler_id, device_info):
logging.info(f"Device '{device_info.get('name')}' connecting...")
saved_packages = []
if os.path.exists(ICON_CACHE_DIR):
try:
saved_packages = [f[:-4] for f in os.listdir(ICON_CACHE_DIR) if f.endswith(".png")]
except Exception: pass
# Return config with ID
response = SERVER_CONFIG.copy()
response["savedAppPackages"] = saved_packages
return response
@server.on_event("device_connected")
async def on_connect(handler_id):
global CURRENT_HANDLER_ID
CURRENT_HANDLER_ID = handler_id
logging.info(f"✅ Device {handler_id} connected!")
@server.on_event("device_disconnected")
async def on_disconnect(handler_id):
global CURRENT_HANDLER_ID
if CURRENT_HANDLER_ID == handler_id:
CURRENT_HANDLER_ID = None
logging.info(f"❌ Device {handler_id} disconnected.")
@server.on_event("notification")
async def on_notification(data, handler_id):
logging.info(f"🔔 Notification: {data.get('app')} - {data.get('title')}")
try:
notification = Notify()
notification.application_name = data.get('app')
notification.title = data.get('title')
notification.message = data.get('body')
package_name = data.get('package')
if package_name:
icon_path = os.path.join(ICON_CACHE_DIR, f"{package_name}.png")
if os.path.exists(icon_path):
notification.icon = icon_path
notification.send()
except Exception as e:
logging.error(f"Failed to send desktop notification: {e}")
@server.on_event("appIcons")
async def on_app_icons(data, handler_id):
logging.info(f"Received metadata for {len(data)} NEW app icons.")
@server.on_event("remoteControl")
async def on_remote_control(data, handler_id):
action = data.get("action")
if action == "vol_set" and pyvolume:
try: pyvolume.custom(percent=data['value'])
except: pass
return
if action == "vol_mute" and pyvolume:
try: pyvolume.mute()
except: pass
return
player = get_active_player()
if not player: return
try:
if action == "media_play_pause": player.play_pause()
elif action == "media_next": player.next()
elif action == "media_prev": player.previous()
except: pass
@server.on_event("macMediaControl")
async def on_mac_media_control(data, handler_id):
action = data.get("action")
player = get_active_player()
if not player: return
try:
if action == "play": player.play()
elif action == "pause": player.pause()
elif action == "playPause": player.play_pause()
elif action == "next": player.next()
elif action == "previous": player.previous()
elif action == "stop": player.stop()
except: pass
@server.on_event("call_event")
async def on_call_event(data, handler_id):
state = data.get("state")
number = data.get("name", data.get("number", "Unknown"))
if state == "ringing":
try:
subprocess.Popen(["kdialog", "--title", "Incoming Call", "--passivepopup", f"Incoming call from:\n{number}", "10"])
except FileNotFoundError: pass
@server.on_event("status")
async def on_status(data, handler_id):
pass
@server.on_event("clipboardUpdate")
async def on_clipboard(data, handler_id):
text = data.get('text')
if text: pyperclip.copy(text)
async def main():
server.print_qr_code()
logging.info(f"Starting AirSync server (ID: {DEVICE_ID})...")
# PASS CONFIG WITH ID TO SERVER START
await server.start(port=5297, mac_info=SERVER_CONFIG)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Server shutting down...")