-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
379 lines (313 loc) · 13.2 KB
/
main.py
File metadata and controls
379 lines (313 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# -*- coding: utf-8 -*-
"""
TaskNya 监控程序主入口
提供命令行接口和向后兼容的 TrainingMonitor 类。
"""
import os
import sys
import time
import argparse
import logging
from datetime import datetime
# 确保能够导入 core 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.config import ConfigManager, DEFAULT_CONFIG
from core.monitor import MonitorManager
from core.notifier import WebhookNotifier, GenericWebhookNotifier, EmailNotifier, WeComNotifier, MessageBuilder
from core.utils import get_gpu_info, setup_logger
from core.utils.logger import get_default_log_path
# 配置日志
# Module-wide logging: every record goes both to a UTF-8 log file (path
# resolved by get_default_log_path) and to the console. This runs at import
# time and configures the root logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=(
        logging.FileHandler(get_default_log_path('monitor.log'), encoding='utf-8'),
        logging.StreamHandler(),
    ),
)
logger = logging.getLogger(__name__)
class TrainingMonitor:
    """
    Task monitor.

    Watches long-running jobs (e.g. deep-learning training) and sends a
    notification once one of the configured trigger conditions fires (file
    detection, log detection, GPU power detection, ...). The checks themselves
    are delegated to ``MonitorManager``. Notifications go out through the
    Feishu webhook, generic webhook, email and WeCom notifiers.

    This is a backward-compatible facade over the refactored modular
    components.

    Attributes:
        config (dict): Loaded configuration dictionary.
        start_time (datetime): Start of the current monitoring period. It is
            reset after each notification in continuous directory mode.
        should_stop (callable): Zero-argument callable. The monitoring loop
            exits as soon as it returns a truthy value.

    Example:
        >>> monitor = TrainingMonitor(config_path='config.yaml')
        >>> monitor.start_monitoring()
    """

    def __init__(self, config_path: str = None):
        """
        Initialise the task monitor.

        Args:
            config_path (str, optional): Path to the configuration file.
                When omitted, ``ConfigManager.load_config`` decides what to
                load (presumably the default configuration).
        """
        # Load configuration.
        self._config_manager = ConfigManager()
        self.config = self._config_manager.load_config(config_path)

        # Build components; each notifier gets its own config section.
        self._monitor_manager = MonitorManager(self.config)
        self._notifier = WebhookNotifier(self.config.get('webhook', {}))
        self._generic_notifier = GenericWebhookNotifier(self.config.get('generic_webhook', {}))
        self._email_notifier = EmailNotifier(self.config.get('email', {}))
        self._wecom_notifier = WeComNotifier(self.config.get('wecom', {}))
        self._message_builder = MessageBuilder(self.config.get('webhook', {}))

        # Runtime state.
        self.start_time = datetime.now()
        self.should_stop = lambda: False  # default: never stop externally

        # Kept for backward compatibility only; nothing in this class updates them.
        self.low_power_count = 0
        self.last_log_position = 0

    def _load_config(self, config_path: str) -> dict:
        """
        Load a configuration file (backward-compatible helper).

        Args:
            config_path: Path to the configuration file.

        Returns:
            The configuration dictionary returned by ``ConfigManager``.
        """
        return self._config_manager.load_config(config_path)

    def is_training_complete(self):
        """
        Check whether the monitored task has finished.

        Returns:
            tuple: ``(done, trigger_method, detail)`` as reported by
            ``MonitorManager.check()``.
        """
        return self._monitor_manager.check()

    def _check_gpu_power_below_threshold(self, threshold: float, gpu_ids) -> bool:
        """
        Check whether GPU power is below the threshold (backward-compatible helper).

        ``threshold`` and ``gpu_ids`` are accepted only for signature
        compatibility and are ignored. The check is delegated to the registered
        "GPU功耗监控" monitor, which presumably uses its own configured values.

        Args:
            threshold: Power threshold (ignored).
            gpu_ids: GPU id(s) (ignored).

        Returns:
            The monitor's verdict, or False when no GPU power monitor is registered.
        """
        gpu_monitor = self._monitor_manager.get_monitor("GPU功耗监控")
        if gpu_monitor:
            return gpu_monitor._check_power_below_threshold()
        return False

    def send_notification(self, training_info: dict) -> bool:
        """
        Send *training_info* through every enabled notification channel.

        Every enabled channel is attempted, even after an earlier one fails.
        The order is Feishu webhook, generic webhook, email, then WeCom.

        Args:
            training_info: Message payload built by ``MessageBuilder``.

        Returns:
            True if no enabled channel reported a failure.
        """
        success = True
        for notifier in (self._notifier, self._generic_notifier,
                         self._email_notifier, self._wecom_notifier):
            if notifier.enabled and not notifier.send(training_info):
                success = False
        return success

    def get_gpu_info(self) -> str:
        """
        Get GPU information.

        Returns:
            The GPU information string from ``core.utils.get_gpu_info``.
        """
        return get_gpu_info()

    def start_monitoring(self):
        """
        Run the main monitoring loop.

        Blocks until one of the following happens:
        - a trigger fires (and continuous directory mode is not active);
        - the configured timeout elapses;
        - ``should_stop()`` returns a truthy value.

        If ``check_api_enabled`` is set, an API trigger server runs for the
        duration of the loop. It is stopped on exit when it exposes ``stop()``.
        """
        monitor_cfg = self.config['monitor']
        project_name = monitor_cfg['project_name']
        check_interval = monitor_cfg['check_interval']
        logprint = monitor_cfg['logprint']
        timeout = monitor_cfg['timeout']

        logger.info(f"开始监控任务进程: {project_name}")

        api_server = self._start_api_server()
        try:
            self._run_monitor_loop(project_name, check_interval, logprint, timeout)
        finally:
            # Release the API port so a later start_monitoring() call can rebind it.
            self._stop_api_server(api_server)

    def _start_api_server(self):
        """Start the API trigger server if enabled; return it, or None if disabled."""
        monitor_cfg = self.config['monitor']
        if not monitor_cfg.get('check_api_enabled', False):
            return None

        # Imported lazily so the module is only needed when the feature is enabled.
        from core.monitor.api_trigger import ApiTriggerServer

        api_port = monitor_cfg.get('check_api_port', 9870)
        api_token = monitor_cfg.get('check_api_auth_token', '')

        def on_trigger(payload):
            """API trigger callback: send one notification built from *payload*."""
            info = self._message_builder.build_training_info(
                start_time=self.start_time,
                end_time=datetime.now(),
                project_name=payload.get(
                    'project_name', self.config['monitor']['project_name']
                ),
                method="API触发",
                detail=payload.get('message', 'API 被动触发通知'),
                gpu_info=None,
            )
            self.send_notification(info)

        server = ApiTriggerServer(api_port, api_token, on_trigger)
        server.start()
        return server

    @staticmethod
    def _stop_api_server(server):
        """Best-effort shutdown of the API trigger server, if one was started."""
        if server is None:
            return
        # NOTE(review): ApiTriggerServer's interface is not visible here. stop()
        # is only called if it exists, otherwise this is a no-op.
        stop = getattr(server, 'stop', None)
        if callable(stop):
            try:
                stop()
            except Exception:
                logger.exception("停止 API 触发服务失败")

    def _run_monitor_loop(self, project_name, check_interval, logprint, timeout):
        """Poll the monitors until completion, timeout or ``should_stop()``."""
        # elapsed_time counts nominal waiting time (sum of check intervals),
        # not wall-clock time spent inside the checks.
        elapsed_time = 0
        # Progress is logged each time elapsed_time crosses a multiple of
        # logprint. A falsy or non-positive logprint disables progress logging.
        has_logprint = isinstance(logprint, (int, float)) and logprint > 0
        next_log_at = logprint if has_logprint else None

        while not self.should_stop():
            flag, method, detail = self.is_training_complete()
            if flag:
                if self._handle_completion(project_name, method, detail):
                    continue  # continuous mode: keep monitoring
                break

            self._wait_interval(check_interval)
            elapsed_time += check_interval

            # Timeout check (None or non-numeric timeout means "no timeout").
            if isinstance(timeout, (int, float)) and elapsed_time >= timeout:
                logger.warning(f"监控超时,已等待 {elapsed_time} 秒")
                break

            # Periodic status output.
            if next_log_at is not None and elapsed_time >= next_log_at:
                logger.info(f"监控仍在进行中,已等待 {elapsed_time} 秒")
                self._log_monitor_status()
                # check_interval may exceed logprint: skip thresholds already passed.
                while next_log_at <= elapsed_time:
                    next_log_at += logprint

    def _handle_completion(self, project_name, method, detail) -> bool:
        """
        Build and send the completion notification.

        Returns:
            True if monitoring should continue (directory monitor in continuous
            mode, which has just been reset), otherwise False.
        """
        training_info = self._message_builder.build_training_info(
            start_time=self.start_time,
            end_time=datetime.now(),
            project_name=project_name,
            method=method,
            detail=detail,
            gpu_info=self.get_gpu_info() if self._should_include_gpu_info(method) else None,
        )

        dir_monitor = None
        if method == "目录变化检测":
            # Directory-triggered: attach the detailed report when available.
            dir_monitor = self._monitor_manager.get_monitor("目录监控")
            if dir_monitor and hasattr(dir_monitor, 'get_report_data'):
                training_info['report'] = dir_monitor.get_report_data()

        logger.info(f"任务已完成!总耗时: {training_info['duration']}")
        self.send_notification(training_info)

        if dir_monitor and getattr(dir_monitor, 'continuous_mode', False):
            logger.info("持续监控模式:重置目录监控器,继续检测")
            dir_monitor.reset()
            self.start_time = datetime.now()  # restart the duration clock
            return True
        return False

    def _wait_interval(self, check_interval):
        """Sleep for *check_interval* seconds in slices of at most 1 s.

        Returns early as soon as ``should_stop()`` becomes truthy, so a stop
        request is honoured within about one second.
        """
        for pause in self._sleep_slices(check_interval):
            if self.should_stop():
                return
            time.sleep(pause)

    @staticmethod
    def _sleep_slices(total, step=1.0):
        """Split *total* seconds into sleep slices no longer than *step*.

        A fractional remainder is kept, unlike ``range(int(total))``. An
        interval of 0.5 s therefore sleeps 0.5 s instead of not at all, and
        2.5 s sleeps 2.5 s instead of 2 s. A non-positive total yields no
        slices.
        """
        whole, frac = divmod(max(float(total), 0.0), step)
        slices = [step] * int(whole)
        if frac > 0:
            slices.append(frac)
        return slices

    def _should_include_gpu_info(self, method: str) -> bool:
        """
        Decide whether the notification should include GPU information.

        Args:
            method: Trigger method that fired.

        Returns:
            True if the trigger was GPU power detection, GPU power checking is
            enabled, ``webhook.include_gpu_info`` is set (default True), or
            the generic webhook is enabled. Missing config sections count as empty.
        """
        cfg = self.config
        return bool(
            method == "GPU功耗检测"
            or cfg.get('monitor', {}).get('check_gpu_power_enabled', False)
            or cfg.get('webhook', {}).get('include_gpu_info', True)
            or cfg.get('generic_webhook', {}).get('enabled', False)
        )

    def _log_monitor_status(self):
        """Log the current monitoring status (the log-detection mode, when enabled)."""
        if self.config['monitor'].get('check_log_enabled', False):
            log_mode = self.config['monitor'].get('check_log_mode', 'full')
            mode_str = "全量检测" if log_mode == 'full' else "增量检测"
            logger.info(f"日志检测模式: {mode_str}")
def main():
    """
    Command-line entry point.

    Modes, checked in this order:
      * ``--test-channel``: send one test message through the selected
        channel(s), force-enabling them regardless of the config.
      * ``--trigger``: skip monitoring and send one notification through
        every enabled channel.
      * default: run the blocking monitoring loop.

    Returns:
        int: Process exit status. 0 on success, 1 if any notification in
        ``--test-channel`` / ``--trigger`` mode failed, so shell scripts can
        detect delivery failures.
    """
    parser = argparse.ArgumentParser(
        description="TaskNya - 任务监控和通知系统",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python main.py # 使用默认配置
python main.py --config my.yaml # 使用自定义配置
"""
    )
    parser.add_argument(
        "--config",
        help="配置文件路径(YAML格式)",
        default=None
    )
    parser.add_argument(
        "--trigger",
        action="store_true",
        help="跳过监控,直接向所有已启用渠道发送一次通知",
    )
    parser.add_argument(
        "--test-channel",
        type=str,
        default=None,
        choices=["webhook", "generic_webhook", "email", "wecom", "all"],
        help="测试指定通知渠道(无视是否启用),可选: webhook, generic_webhook, email, wecom, all",
    )
    parser.add_argument(
        "--message",
        type=str,
        default=None,
        help="手动触发时附带的自定义消息",
    )
    args = parser.parse_args()

    if args.test_channel:
        monitor = TrainingMonitor(config_path=args.config)
        now = datetime.now()
        training_info = monitor._message_builder.build_training_info(
            start_time=now,
            end_time=now,
            project_name=monitor.config['monitor']['project_name'],
            method="测试发送",
            detail=args.message or "CLI 测试通知渠道",
            gpu_info=None
        )
        # (config section, display name, notifier class) in send order.
        channels = (
            ('webhook', '飞书 Webhook', WebhookNotifier),
            ('generic_webhook', '通用 Webhook', GenericWebhookNotifier),
            ('email', '邮件', EmailNotifier),
            ('wecom', '企业微信', WeComNotifier),
        )
        notifiers = {}
        for key, name, notifier_cls in channels:
            if args.test_channel in (key, 'all'):
                # Copy the section so force-enabling it does not mutate monitor.config.
                cfg = dict(monitor.config.get(key, {}))
                cfg['enabled'] = True
                notifiers[name] = notifier_cls(cfg)

        all_ok = True
        for name, notifier in notifiers.items():
            logger.info(f"正在测试 [{name}] ...")
            if notifier.send(training_info):
                logger.info(f"[{name}] 发送成功")
            else:
                logger.warning(f"[{name}] 发送失败")
                all_ok = False
        return 0 if all_ok else 1

    if args.trigger:
        monitor = TrainingMonitor(config_path=args.config)
        now = datetime.now()
        training_info = monitor._message_builder.build_training_info(
            start_time=now,
            end_time=now,
            project_name=monitor.config['monitor']['project_name'],
            method="手动触发",
            detail=args.message or "CLI 手动触发通知",
            gpu_info=None
        )
        logger.info("手动触发通知...")
        if monitor.send_notification(training_info):
            logger.info("通知发送成功")
            return 0
        logger.warning("部分或全部通知发送失败")
        return 1

    monitor = TrainingMonitor(config_path=args.config)
    monitor.start_monitoring()
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers/scripts can detect failed sends.
    sys.exit(main())