Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/storm/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type DelugeOptions struct {

MaxConnections int `long:"max-connections" env:"POOL_MAX_CONNECTIONS" required:"true" default:"5" description:"Maximum concurrent Deluge RPC connections"`
IdleTime *Duration `long:"idle-time" env:"POOL_IDLE_TIME" required:"true" default:"30s" description:"Close idle Deluge RPC connections after this duration"`
ConnectTimeout *Duration `long:"connect-timeout" env:"POOL_CONNECT_TIMEOUT" required:"true" default:"10s" description:"Timeout for establishing new Deluge RPC connections"`
}

func (options *DelugeOptions) Client() storm.DelugeProvider {
Expand All @@ -143,7 +144,7 @@ func (options *DelugeOptions) Client() storm.DelugeProvider {
}

func (options *DelugeOptions) Pool(log *zap.Logger) *storm.ConnectionPool {
return storm.NewConnectionPool(log, options.MaxConnections, options.IdleTime.Duration, options.Client())
return storm.NewConnectionPool(log, options.MaxConnections, options.IdleTime.Duration, options.ConnectTimeout.Duration, options.Client())
}

type Options struct {
Expand Down
41 changes: 34 additions & 7 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func (nullTimer) Stop() bool {
return true
}

func NewConnectionPool(log *zap.Logger, maxConnections int, idleConnectionTime time.Duration, provider DelugeProvider) *ConnectionPool {
func NewConnectionPool(log *zap.Logger, maxConnections int, idleConnectionTime time.Duration, connectTimeout time.Duration, provider DelugeProvider) *ConnectionPool {
pool := &ConnectionPool{
Log: log,
MaxConnections: maxConnections,
IdleConnectionTime: idleConnectionTime,
ConnectTimeout: connectTimeout,
Provider: provider,

get: make(chan *poolReq),
Expand All @@ -79,6 +80,7 @@ type ConnectionPool struct {
Log *zap.Logger
MaxConnections int
IdleConnectionTime time.Duration
ConnectTimeout time.Duration
Provider DelugeProvider

get chan *poolReq
Expand Down Expand Up @@ -209,13 +211,38 @@ func (pool *ConnectionPool) getConn(req *poolReq) {
return
}

// A new connection can be established
conn := pool.Provider()
// A new connection can be established.
// Connect() is run in a goroutine so the pool worker is not blocked indefinitely
// if the TCP dial times out (e.g. IPv6 unreachable with no RST on Go 1.21+ DNS changes).
newConn := pool.Provider()

err := conn.Connect()
if err != nil {
pool.Log.Error("Failed to establish Deluge RPC connection", zap.Error(err))
conn = nil
type connectResult struct {
err error
}
connectCh := make(chan connectResult, 1)
go func() { connectCh <- connectResult{newConn.Connect()} }()

var timeout <-chan time.Time
if pool.ConnectTimeout > 0 {
t := time.NewTimer(pool.ConnectTimeout)
defer t.Stop()
timeout = t.C
}

var conn deluge.DelugeClient
select {
case r := <-connectCh:
if r.err != nil {
pool.Log.Error("Failed to establish Deluge RPC connection", zap.Error(r.err))
} else {
conn = newConn
}
case <-timeout:
pool.Log.Error("Timed out establishing Deluge RPC connection", zap.Duration("timeout", pool.ConnectTimeout))
go func() { <-connectCh; newConn.Close() }()
case <-req.ctx.Done():
go func() { <-connectCh; newConn.Close() }()
return
}

ok := req.Send(conn)
Expand Down
Loading