-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathrun_qq.py
More file actions
133 lines (104 loc) · 3.72 KB
/
run_qq.py
File metadata and controls
133 lines (104 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
run_qq.py - Standalone entry point for the NeoFish QQ bot.
Run with:
python run_qq.py
Required environment variables (set in .env or shell):
QQ_WS_URL — WebSocket URL of your NapCat / go-cqhttp instance,
e.g. ws://127.0.0.1:3001
ANTHROPIC_API_KEY — Anthropic API key for the agent
Optional:
QQ_ACCESS_TOKEN — access token if configured in NapCat
QQ_ALLOWED_IDS — comma-separated user/group IDs to accept
MODEL_NAME, WORKDIR, … — same as the web platform
NapCat quick setup:
1. Install NapCat and sign in.
2. Enable the Forward WebSocket plugin on port 3001.
3. Set QQ_WS_URL=ws://127.0.0.1:3001 in your .env.
"""
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
# Load settings from a .env file before anything else runs; the modules
# imported lazily inside main() are expected to read their configuration
# (QQ_WS_URL, ANTHROPIC_API_KEY, ...) from the environment.
load_dotenv()

# One process-wide logging setup: INFO and above, each line prefixed with the
# timestamp, level and logger name.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger(__name__)
def _make_result_callback(adapter):
    """Create a result callback that routes scheduled task results to QQ.

    Args:
        adapter: Object exposing the awaitable methods
            ``send_message(session_id, text)`` and
            ``send_file(session_id, path, caption)``.

    Returns:
        An ``async`` callable taking ``(result, source_session_id,
        source_chat_id, source_platform, debug)``. It never raises for
        delivery problems; those are logged instead.
    """

    async def on_bot_task_complete(result, source_session_id, source_chat_id, source_platform, debug):
        """Send a summary of ``result`` and then each of its files.

        ``source_chat_id`` and ``source_platform`` are accepted for signature
        compatibility but are not used here; delivery is addressed by
        ``source_session_id`` only.
        """
        # ``result.files`` may be None or empty. Normalise it once so the
        # summary text and the send loop agree; iterating None used to raise
        # after the message was already sent and log a bogus callback failure.
        files = list(result.files or ())

        status_icon = "[OK]" if result.status == "success" else "[FAIL]"
        text = (
            f"定时任务【{result.description}】已完成\n"
            f"{status_icon} 状态:{result.status}\n"
            f"{'─' * 40}\n"
            f"{result.summary}"
        )
        if debug:
            text += f"\n\n[Debug] 任务ID: {result.task_id}"
        if files:
            # str() keeps the join safe if an entry is a path-like object.
            text += f"\n\n附件: {', '.join(str(f) for f in files)}"
        if result.error:
            text += f"\n\n错误: {result.error}"

        try:
            await adapter.send_message(source_session_id, text)
        except Exception:
            # If the summary cannot be delivered, do not attempt the files.
            logger.exception("QQ result callback failed")
            return

        for f in files:
            # Best effort per file: one failed upload must not block the rest.
            try:
                await adapter.send_file(source_session_id, f, "")
            except Exception:
                logger.exception("Failed to send file %s via QQ", f)

    return on_bot_task_complete
async def _stop_component(label, stop):
    """Await ``stop()``; log ordinary failures so later shutdown steps still run."""
    try:
        await stop()
    except Exception:
        logger.exception("Failed to stop %s cleanly", label)


async def main():
    """Start every component of the QQ bot and run until interrupted.

    Start-up order: PlaywrightManager, SchedulerService, QQAdapter, then the
    BotSession (run as the ``bot-session`` task). Shutdown order: bot session,
    scheduler, adapter, PlaywrightManager, then the task is cancelled.

    Shutdown runs even when start-up fails part-way, and only components that
    actually started are stopped. A failure in one stop step is logged and the
    remaining steps still run.
    """
    # Project imports are deferred to call time, as in the original layout
    # (presumably so load_dotenv() has already run — confirm if reordering).
    from platforms.qq import QQAdapter
    from playwright_manager import PlaywrightManager
    from session import session_store
    from _agent_runner import make_message_handler
    from bot_session import BotSession
    from scheduler_service import SchedulerService

    # Stop hooks are filled in as each component comes up, so the ``finally``
    # below tears down exactly what was started.
    stop_bot = stop_scheduler = stop_adapter = stop_pm = None
    tasks = []

    try:
        # Shared PlaywrightManager
        pm = PlaywrightManager()
        await pm.start()
        stop_pm = pm.stop
        logger.info("PlaywrightManager started")

        # Queue through which the scheduler hands tasks to the BotSession
        bot_queue = asyncio.Queue()

        # SchedulerService
        scheduler = SchedulerService(bot_task_queue=bot_queue)
        await scheduler.start()
        stop_scheduler = scheduler.stop

        # QQ adapter
        adapter = QQAdapter(session_store=session_store)
        adapter.on_message = make_message_handler(
            adapter, pm, session_store, scheduler_service=scheduler
        )

        # Scheduled-task results are reported back through the adapter
        on_complete = _make_result_callback(adapter)

        # BotSession
        bot = BotSession(
            task_queue=bot_queue,
            pm=pm,
            on_complete=on_complete,
        )

        logger.info("Starting NeoFish QQ bot…")
        await adapter.start()
        stop_adapter = adapter.stop

        tasks.append(asyncio.create_task(bot.start(), name="bot-session"))
        stop_bot = bot.stop

        try:
            # Wait forever (adapter runs its own task internally, per the
            # original author's note); only interruption ends this wait.
            await asyncio.Event().wait()
        except (KeyboardInterrupt, asyncio.CancelledError):
            pass
    finally:
        logger.info("Shutting down QQ bot…")
        shutdown_steps = (
            ("bot session", stop_bot),
            ("scheduler", stop_scheduler),
            ("QQ adapter", stop_adapter),
            ("PlaywrightManager", stop_pm),
        )
        for label, stop in shutdown_steps:
            if stop is not None:
                await _stop_component(label, stop)
        for t in tasks:
            t.cancel()
        # return_exceptions=True absorbs the CancelledError from each task.
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # main() already performs its own shutdown when interrupted. On
        # interpreters where asyncio.run() still re-raises the Ctrl-C
        # afterwards, exit quietly instead of printing a traceback.
        pass