Skip to content

Commit

Permalink
Fix connection behaviour regarding sessionPresent
Browse files Browse the repository at this point in the history
Use ispirata fork of paho.mqtt.golang

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Jan 20, 2022
1 parent b43c052 commit 438b56d
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 18 deletions.
7 changes: 0 additions & 7 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Device struct {
messageQueue chan astarteMessageInfo
isSendingStoredMessages bool
volatileMessages []astarteMessageInfo
sessionPresent bool
// MaxInflightMessages is the maximum number of messages that can be in publishing channel at any given time
// before adding messages becomes blocking. Defaults to 100.
MaxInflightMessages int
Expand Down Expand Up @@ -191,12 +190,6 @@ func (d *Device) Connect(result chan<- error) {
if connectToken.Wait() && connectToken.Error() != nil {
return connectToken.Error()
}
if !connectToken.SessionPresent() {
fmt.Println("No MQTT session already present, starting one")
} else {
// remember that a session is present for future reconnections
d.sessionPresent = connectToken.SessionPresent()
}
return nil
}
err = backoff.Retry(connectOperation, policy)
Expand Down
13 changes: 4 additions & 9 deletions device/protocol_mqtt_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (d *Device) initializeMQTTClient() error {
}
opts.SetTLSConfig(tlsConfig)

opts.SetOnConnectHandler(func(client mqtt.Client) {
astarteOnConnectHandler(d, client)
opts.SetOnConnectHandler(func(client mqtt.Client, sessionPresent bool) {
astarteOnConnectHandler(d, client, sessionPresent)
})

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
Expand Down Expand Up @@ -186,9 +186,9 @@ func (d *Device) handleControlMessages(message string, payload []byte) error {
return nil
}

func astarteOnConnectHandler(d *Device, client mqtt.Client) {
func astarteOnConnectHandler(d *Device, client mqtt.Client, sessionPresent bool) {
// Should we run the whole Astarte after connect thing?
if !d.sessionPresent {
if !sessionPresent {
// Yes, we should: first, setup subscription
if err := d.setupSubscriptions(); err != nil {
errorMsg := fmt.Sprintf("Cannot setup subscriptions: %v", err)
Expand Down Expand Up @@ -237,11 +237,6 @@ func astarteOnConnectHandler(d *Device, client mqtt.Client) {
}
}

// If a device connected for the first time, since we do not ask
// for a clean session and do not change its clientID, we can assume
// that after connection a session is present
d.sessionPresent = true

// If some messages must be retried, do so
d.resendFailedMessages()

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ require (
gorm.io/driver/sqlite v1.2.6
gorm.io/gorm v1.22.3
)

replace github.com/eclipse/paho.mqtt.golang v1.3.5 => github.com/ispirata/paho.mqtt.golang v1.3.90
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ github.com/cristalhq/jwt/v3 v3.1.0/go.mod h1:XOnIXst8ozq/esy5N1XOlSyQqBd+84fxJ99
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand All @@ -22,6 +20,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/iancoleman/orderedmap v0.2.0 h1:sq1N/TFpYH++aViPcaKjys3bDClUEU7s5B+z6jq8pNA=
github.com/iancoleman/orderedmap v0.2.0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
github.com/ispirata/paho.mqtt.golang v1.3.90 h1:om0vKIECC6no6q/UTmPPc6K6jwlDscgqKoSbYFujSaw=
github.com/ispirata/paho.mqtt.golang v1.3.90/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down Expand Up @@ -62,6 +62,8 @@ golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRX
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down

0 comments on commit 438b56d

Please sign in to comment.