Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/zinx_test_stopChanelPanic/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"github.com/aceld/zinx/examples/zinx_test_stopChanelPanic/router"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
)

func onClientStart(conn ziface.IConnection) {
fmt.Println("[Client] 连接建立成功,开始发送测试消息...")

// 发送测试消息
go func() {
time.Sleep(500 * time.Millisecond) // 等待一下确保连接稳定
err := conn.SendMsg(1, []byte("Test Panic"))
if err != nil {
fmt.Println("[Client] 发送失败:", err)
} else {
fmt.Println("[Client] 测试消息已发送")
}
}()
}

func main() {
client := znet.NewClient("127.0.0.1", 8999)
client.AddRouter(1, &router.PanicTestRouter{znet.BaseRouter{}})
client.SetOnConnStart(onClientStart)
// Start the client
client.Start()

// close
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
sig := <-c
fmt.Println("===exit===", sig)
client.Stop()
time.Sleep(time.Second * 2)
}
44 changes: 44 additions & 0 deletions examples/zinx_test_stopChanelPanic/router/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package router

import (
"fmt"
"sync"
"time"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
)

type PanicTestRouter struct {
znet.BaseRouter
}

func (r *PanicTestRouter) Handle(req ziface.IRequest) {
conn := req.GetConnection()

var wg sync.WaitGroup

for i := 0; i < 10000; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()

msg := fmt.Sprintf("Concurrent message %d at %v", index, time.Now().UnixNano())
err := conn.SendBuffMsg(1, []byte(msg))
if err != nil {
fmt.Printf("Send error: %v\n", err)
} else {
fmt.Printf("Sent: %s\n", msg)
}
}(i)

if i == 5000 {
time.Sleep(5 * time.Millisecond)
fmt.Println(">>> Stopping connection while sending...")
conn.Stop()
}
}

wg.Wait()
fmt.Println("All goroutines finished")
}
17 changes: 17 additions & 0 deletions examples/zinx_test_stopChanelPanic/sever/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"fmt"

"github.com/aceld/zinx/examples/zinx_test_stopChanelPanic/router"
"github.com/aceld/zinx/znet"
)

func main() {
fmt.Println("[Server] server start")
s := znet.NewServer()
s.AddRouter(1, &router.PanicTestRouter{znet.BaseRouter{}})
s.Serve()
}

// ... existing code ...
7 changes: 6 additions & 1 deletion znet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Connection struct {
// (有缓冲管道,用于读、写两个goroutine之间的消息通信)
msgBuffChan chan []byte

closeOnce sync.Once

// Go StartWriter Flag
// (开始初始化写协程标志)
startWriterFlag int32
Expand Down Expand Up @@ -433,7 +435,10 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro
select {
case <-c.ctx.Done():
// Close all channels associated with the connection
close(c.msgBuffChan)
// Close Once to avoid repeated closure
c.closeOnce.Do(func() {
close(c.msgBuffChan)
})
return errors.New("connection closed when send buff msg")
case <-idleTimeout.C:
return errors.New("send buff msg timeout")
Expand Down
Loading