diff --git a/go.mod b/go.mod index 3d3c873..e71a5cf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/Luzifer/rconfig v2.2.0+incompatible // indirect - github.com/eclipse/paho.mqtt.golang v1.3.3 + github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/luzifer/rconfig v2.2.0+incompatible github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect diff --git a/go.sum b/go.sum index ec0d3e5..daeccc2 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/Luzifer/rconfig v2.2.0+incompatible h1:Kle3+rshPM7LxciOheaR4EfHUzibkD github.com/Luzifer/rconfig v2.2.0+incompatible/go.mod h1:9pet6z2+mm/UAB0jF/rf0s62USfHNolzgR6Q4KpsJI0= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eclipse/paho.mqtt.golang v1.3.3 h1:Fh1zsLniMFJByLqKrSB9ZRjkbpU0k1Xne23ZqEE/O08= -github.com/eclipse/paho.mqtt.golang v1.3.3/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= +github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/client.go b/vendor/github.com/eclipse/paho.mqtt.golang/client.go index 93b5dbe..847daee 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/client.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/client.go @@ -379,8 +379,13 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { cm := newConnectMsgFromOptions(&c.options, broker) DEBUG.Println(CLI, "about to write new connect msg") CONN: + tlsCfg := c.options.TLSConfig + if c.options.OnConnectAttempt != nil { + DEBUG.Println(CLI, "using custom onConnectAttempt handler...") + tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) + } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") @@ -397,7 +402,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { // We may be have to attempt the connection with MQTT 3.1 if conn != nil { - conn.Close() + _ = conn.Close() } if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1? DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol") @@ -434,12 +439,22 @@ func (c *client) Disconnect(quiesce uint) { dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) dt := newToken(packets.Disconnect) - c.oboundP <- &PacketAndToken{p: dm, t: dt} + disconnectSent := false + select { + case c.oboundP <- &PacketAndToken{p: dm, t: dt}: + disconnectSent = true + case <-c.commsStopped: + WARN.Println("Disconnect packet could not be sent because comms stopped") + case <-time.After(time.Duration(quiesce) * time.Millisecond): + WARN.Println("Disconnect packet not sent due to timeout") + } // wait for work to finish, or quiesce time consumed - DEBUG.Println(CLI, "calling WaitTimeout") - dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond) - DEBUG.Println(CLI, "WaitTimeout done") + if disconnectSent { + DEBUG.Println(CLI, "calling WaitTimeout") + dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond) + DEBUG.Println(CLI, "WaitTimeout done") + } } else { WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)") c.setConnected(disconnected) @@ -504,8 +519,8 @@ func (c *client) internalConnLost(err error) { } } -// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and -// outgoing messages. +// startCommsWorkers is called when the connection is up. +// It starts off all of the routines needed to process incoming and outgoing messages. // Returns true if the comms workers were started (i.e. they were not already running) func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool { DEBUG.Println(CLI, "startCommsWorkers called") @@ -592,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet commsIncomingPub = nil continue } - incomingPubChan <- pub + // Care is needed here because an error elsewhere could trigger a deadlock + sendPubLoop: + for { + select { + case incomingPubChan <- pub: + break sendPubLoop + case err, ok := <-commsErrors: + if !ok { // commsErrors has been closed so we can ignore it + commsErrors = nil + continue + } + ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err) + c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress) + continue + } + } case err, ok := <-commsErrors: if !ok { commsErrors = nil diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go index 9f9f084..0cb6cd1 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go @@ -30,7 +30,8 @@ import ( // This just establishes the network connection; once established the type of connection should be irrelevant // -// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes +// openConnection opens a network connection using the protocol indicated in the URL. +// Does not carry out any MQTT specific handshakes. func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) { switch uri.Scheme { case "ws": @@ -81,7 +82,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade err = tlsConn.Handshake() if err != nil { - conn.Close() + _ = conn.Close() return nil, err } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/options.go b/vendor/github.com/eclipse/paho.mqtt.golang/options.go index 04f8ae6..4a1420c 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/options.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/options.go @@ -49,6 +49,9 @@ type OnConnectHandler func(Client) // the initial connection is lost type ReconnectHandler func(Client, *ClientOptions) +// ConnectionAttemptHandler is invoked prior to making the initial connection. +type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. type ClientOptions struct { @@ -79,6 +82,7 @@ type ClientOptions struct { OnConnect OnConnectHandler OnConnectionLost ConnectionLostHandler OnReconnecting ReconnectHandler + OnConnectAttempt ConnectionAttemptHandler WriteTimeout time.Duration MessageChannelDepth uint ResumeSubs bool @@ -120,6 +124,7 @@ func NewClientOptions() *ClientOptions { Store: nil, OnConnect: nil, OnConnectionLost: DefaultConnectionLostHandler, + OnConnectAttempt: nil, WriteTimeout: 0, // 0 represents timeout disabled ResumeSubs: false, HTTPHeaders: make(map[string][]string), @@ -321,6 +326,15 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio return o } +// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior +// to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing +// the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL). +// This allows connection specific changes to be made to the *tls.Config. +func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions { + o.OnConnectAttempt = onConnectAttempt + return o +} + // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a // timeout error. A duration of 0 never times out. Default never times out func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go index 7284682..2184703 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go @@ -29,7 +29,11 @@ type ConnectPacket struct { } func (c *ConnectPacket) String() string { - return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, c.Password) + var password string + if len(c.Password) > 0 { + password = "" + } + return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, password) } func (c *ConnectPacket) Write(w io.Writer) error { diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/router.go b/vendor/github.com/eclipse/paho.mqtt.golang/router.go index 42261ee..4737d87 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/router.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/router.go @@ -132,13 +132,46 @@ func (r *router) setDefaultHandler(handler MessageHandler) { // associated callback (or the defaultHandler, if one exists and no other route matched). If // anything is sent down the stop channel the function will end. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { - ackChan := make(chan *PacketAndToken) - go func() { + var wg sync.WaitGroup + ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed + var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel + + stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan + ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan + goRoutinesDone := make(chan struct{}) // closed on wg.Done() + if order { + ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done + } else { + // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done + ackInChan = make(chan *PacketAndToken) + go func() { // go routine to copy from ackInChan to ackOutChan until stopped + for { + select { + case a := <-ackInChan: + ackOutChan <- a + case <-stopAckCopy: + close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan + for { + select { + case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped) + DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") + case <-goRoutinesDone: + close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure) + DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.") + return + } + } + } + } + }() + } + + go func() { // Main go routine handling inbound messages for message := range messages { // DEBUG.Println(ROU, "matchAndDispatch received message") sent := false r.RLock() - m := messageFromPublish(message, ackFunc(ackChan, client.persist, message)) + m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) var handlers []MessageHandler for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.TopicName) { @@ -146,9 +179,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order handlers = append(handlers, e.Value.(*route).callback) } else { hd := e.Value.(*route).callback + wg.Add(1) go func() { hd(client, m) m.Ack() + wg.Done() }() } sent = true @@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order if order { handlers = append(handlers, r.defaultHandler) } else { + wg.Add(1) go func() { r.defaultHandler(client, m) m.Ack() + wg.Done() }() } } else { @@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order } // DEBUG.Println(ROU, "matchAndDispatch handled message") } - close(ackChan) + if order { + close(ackOutChan) + } else { // Ensure that nothing further will be written to ackOutChan before closing it + close(stopAckCopy) + <-ackCopyStopped + close(ackOutChan) + go func() { + wg.Wait() // Note: If this remains running then the user has handlers that are not returning + close(goRoutinesDone) + }() + } DEBUG.Println(ROU, "matchAndDispatch exiting") }() - return ackChan + return ackOutChan } diff --git a/vendor/modules.txt b/vendor/modules.txt index d75a18e..a85f618 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/eclipse/paho.mqtt.golang v1.3.3 +# github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/eclipse/paho.mqtt.golang github.com/eclipse/paho.mqtt.golang/packets # github.com/gorilla/websocket v1.4.2