Skip to content

Commit 3317fd0

Browse files
committed
添加agent并发控制
1 parent a1963b1 commit 3317fd0

4 files changed

Lines changed: 110 additions & 12 deletions

File tree

config/config.example.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ agent:
5454
think_interval: 15 # 决策间隔(秒)
5555
message_buffer_size: 15 # 消息缓冲区大小
5656
max_step: 12 # ReAct 最大步数
57+
max_coroutine: 0 # 最大并发思考进程数(0表示不限制)
5758
enable_active_retrieval: true # 是否启用主动记忆检索
5859

5960
# 聊天行为配置

internal/agent/concurrency.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package agent
2+
3+
import (
4+
"sync"
5+
6+
"go.uber.org/zap"
7+
)
8+
9+
// ConcurrencyManager 并发管理器
10+
type ConcurrencyManager struct {
11+
maxConcurrency int
12+
currentRunning int
13+
queue []*ThinkTask
14+
inQueue map[int64]bool // 快速去重(群组ID -> 是否在队列中)
15+
mu sync.Mutex
16+
17+
handler func(groupID int64, isMention bool) // 执行函数
18+
}
19+
20+
// ThinkTask 思考任务
21+
type ThinkTask struct {
22+
GroupID int64
23+
IsMention bool
24+
}
25+
26+
// NewConcurrencyManager 创建并发管理器
27+
func NewConcurrencyManager(max int, h func(groupID int64, isMention bool)) *ConcurrencyManager {
28+
return &ConcurrencyManager{
29+
maxConcurrency: max,
30+
currentRunning: 0,
31+
inQueue: make(map[int64]bool),
32+
handler: h,
33+
}
34+
}
35+
36+
// Submit 提交任务
37+
func (m *ConcurrencyManager) Submit(groupID int64, isMention bool) {
38+
m.mu.Lock()
39+
defer m.mu.Unlock()
40+
41+
if m.inQueue[groupID] {
42+
zap.L().Debug("任务已在队列中,跳过", zap.Int64("group_id", groupID))
43+
return
44+
}
45+
46+
// 如果设置了最大并发数,且当前运行数已满,则入队
47+
if m.maxConcurrency > 0 && m.currentRunning >= m.maxConcurrency {
48+
m.queue = append(m.queue, &ThinkTask{
49+
GroupID: groupID,
50+
IsMention: isMention,
51+
})
52+
m.inQueue[groupID] = true
53+
zap.L().Debug("并发已满,任务进入队列",
54+
zap.Int64("group_id", groupID),
55+
zap.Int("current", m.currentRunning),
56+
zap.Int("queue_len", len(m.queue)))
57+
return
58+
}
59+
60+
m.currentRunning++
61+
go m.execute(groupID, isMention)
62+
}
63+
64+
// execute 执行任务
65+
func (m *ConcurrencyManager) execute(groupID int64, isMention bool) {
66+
defer m.Finish()
67+
m.handler(groupID, isMention)
68+
}
69+
70+
// Finish 任务完成回调
71+
func (m *ConcurrencyManager) Finish() {
72+
m.mu.Lock()
73+
defer m.mu.Unlock()
74+
75+
m.currentRunning--
76+
if m.currentRunning < 0 {
77+
m.currentRunning = 0
78+
}
79+
80+
// 调度下一个任务
81+
if len(m.queue) > 0 {
82+
// 取出队首任务
83+
task := m.queue[0]
84+
m.queue = m.queue[1:]
85+
delete(m.inQueue, task.GroupID)
86+
87+
// 立即启动
88+
m.currentRunning++
89+
go m.execute(task.GroupID, task.IsMention)
90+
zap.L().Debug("从队列调度任务执行", zap.Int64("group_id", task.GroupID))
91+
}
92+
}

internal/agent/react_agent.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,16 @@ import (
3131

3232
// Agent 沐沐智能体
3333
type Agent struct {
34-
cfg *config.Config
35-
persona *persona.Persona
36-
memory *memory.Manager
37-
model model.ToolCallingChatModel
38-
vision *llm.VisionClient // 多模态视觉模型
39-
bot *onebot.Client
40-
react *react.Agent
41-
tools []tool.BaseTool
42-
mcpMgr *mcp.Manager // MCP 管理器
34+
cfg *config.Config
35+
persona *persona.Persona
36+
memory *memory.Manager
37+
model model.ToolCallingChatModel
38+
vision *llm.VisionClient // 多模态视觉模型
39+
bot *onebot.Client
40+
react *react.Agent
41+
tools []tool.BaseTool
42+
mcpMgr *mcp.Manager // MCP 管理器
43+
concurrencyMgr *ConcurrencyManager // 并发管理器
4344

4445
jargonMgr *jargon.Manager // 黑话管理器
4546
learner *learning.Learner // 后台学习系统
@@ -79,6 +80,9 @@ func New(
7980
stopCh: make(chan struct{}),
8081
}
8182

83+
// 初始化并发管理器
84+
a.concurrencyMgr = NewConcurrencyManager(cfg.Agent.MaxCoroutine, a.think)
85+
8286
// 初始化黑话管理器
8387
a.jargonMgr = jargon.New(mem)
8488

@@ -248,7 +252,7 @@ func (a *Agent) onMessage(msg *onebot.GroupMessage) {
248252

249253
// 如果被 @ 了,立即触发一次思考(跳过等待)
250254
if isMentioned {
251-
go a.think(msg.GroupID, true)
255+
go a.concurrencyMgr.Submit(msg.GroupID, true)
252256
}
253257
}
254258

@@ -437,7 +441,7 @@ func (a *Agent) thinkCycle() {
437441
if rand.Float64() > speakProb {
438442
continue
439443
}
440-
a.think(gc.GroupID, false)
444+
a.concurrencyMgr.Submit(gc.GroupID, false)
441445
}
442446
}
443447

@@ -483,7 +487,7 @@ func (a *Agent) getSpeakProbability(groupID int64) float64 {
483487
return baseProb
484488
}
485489

486-
// think 进行思考和决策
490+
// think 提交思考任务
487491
func (a *Agent) think(groupID int64, isMention bool) {
488492
if a.bot.IsSelfMuted(groupID) {
489493
return

internal/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type AgentConfig struct {
6767
ThinkInterval int `yaml:"think_interval"` // 决策间隔(秒)
6868
MessageBufferSize int `yaml:"message_buffer_size"` // 消息缓冲区大小
6969
MaxStep int `yaml:"max_step"` // ReAct 最大步数
70+
MaxCoroutine int `yaml:"max_coroutine"` // 最大并发思考进程数(0表示不限制)
7071
EnableActiveRetrieval bool `yaml:"enable_active_retrieval"` // 是否启用主动记忆检索(阈值固定0.7)
7172
}
7273

0 commit comments

Comments
 (0)