diff --git a/examples/zinx_test_stopChanelPanic/client/main.go b/examples/zinx_test_stopChanelPanic/client/main.go new file mode 100644 index 00000000..cacb0f24 --- /dev/null +++ b/examples/zinx_test_stopChanelPanic/client/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "time" + + "github.com/aceld/zinx/examples/zinx_test_stopChanelPanic/router" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/znet" +) + +func onClientStart(conn ziface.IConnection) { + fmt.Println("[Client] 连接建立成功,开始发送测试消息...") + + // 发送测试消息 + go func() { + time.Sleep(500 * time.Millisecond) // 等待一下确保连接稳定 + err := conn.SendMsg(1, []byte("Test Panic")) + if err != nil { + fmt.Println("[Client] 发送失败:", err) + } else { + fmt.Println("[Client] 测试消息已发送") + } + }() +} + +func main() { + client := znet.NewClient("127.0.0.1", 8999) + client.AddRouter(1, &router.PanicTestRouter{znet.BaseRouter{}}) + client.SetOnConnStart(onClientStart) + // Start the client + client.Start() + + // close + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + sig := <-c + fmt.Println("===exit===", sig) + client.Stop() + time.Sleep(time.Second * 2) +} diff --git a/examples/zinx_test_stopChanelPanic/router/router.go b/examples/zinx_test_stopChanelPanic/router/router.go new file mode 100644 index 00000000..23efe88b --- /dev/null +++ b/examples/zinx_test_stopChanelPanic/router/router.go @@ -0,0 +1,44 @@ +package router + +import ( + "fmt" + "sync" + "time" + + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/znet" +) + +type PanicTestRouter struct { + znet.BaseRouter +} + +func (r *PanicTestRouter) Handle(req ziface.IRequest) { + conn := req.GetConnection() + + var wg sync.WaitGroup + + for i := 0; i < 10000; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + + msg := fmt.Sprintf("Concurrent message %d at %v", index, time.Now().UnixNano()) + err := conn.SendBuffMsg(1, []byte(msg)) + if err != nil { + fmt.Printf("Send error: %v\n", err) + } else { + fmt.Printf("Sent: %s\n", msg) + } + }(i) + + if i == 5000 { + time.Sleep(5 * time.Millisecond) + fmt.Println(">>> Stopping connection while sending...") + conn.Stop() + } + } + + wg.Wait() + fmt.Println("All goroutines finished") +} diff --git a/examples/zinx_test_stopChanelPanic/sever/main.go b/examples/zinx_test_stopChanelPanic/sever/main.go new file mode 100644 index 00000000..55fb921c --- /dev/null +++ b/examples/zinx_test_stopChanelPanic/sever/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + + "github.com/aceld/zinx/examples/zinx_test_stopChanelPanic/router" + "github.com/aceld/zinx/znet" +) + +func main() { + fmt.Println("[Server] server start") + s := znet.NewServer() + s.AddRouter(1, &router.PanicTestRouter{znet.BaseRouter{}}) + s.Serve() +} + +// ... existing code ... diff --git a/znet/connection.go b/znet/connection.go index 190f6661..64423ebf 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -63,6 +63,8 @@ type Connection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte + closeOnce sync.Once + // Go StartWriter Flag // (开始初始化写协程标志) startWriterFlag int32 @@ -433,7 +435,10 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro select { case <-c.ctx.Done(): // Close all channels associated with the connection - close(c.msgBuffChan) + // Close Once to avoid repeated closure + c.closeOnce.Do(func() { + close(c.msgBuffChan) + }) return errors.New("connection closed when send buff msg") case <-idleTimeout.C: return errors.New("send buff msg timeout")