From 438b56d184bb0a954d1544b35514db3477ea9228 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Thu, 20 Jan 2022 16:53:38 +0100 Subject: [PATCH] Fix connection behaviour regarding sessionPresent Use ispirata fork of paho.mqtt.golang Signed-off-by: Arnaldo Cesco --- device/device.go | 7 ------- device/protocol_mqtt_v1.go | 13 ++++--------- go.mod | 2 ++ go.sum | 6 ++++-- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/device/device.go b/device/device.go index 64fba8c..a5abb26 100644 --- a/device/device.go +++ b/device/device.go @@ -47,7 +47,6 @@ type Device struct { messageQueue chan astarteMessageInfo isSendingStoredMessages bool volatileMessages []astarteMessageInfo - sessionPresent bool // MaxInflightMessages is the maximum number of messages that can be in publishing channel at any given time // before adding messages becomes blocking. Defaults to 100. MaxInflightMessages int @@ -191,12 +190,6 @@ func (d *Device) Connect(result chan<- error) { if connectToken.Wait() && connectToken.Error() != nil { return connectToken.Error() } - if !connectToken.SessionPresent() { - fmt.Println("No MQTT session already present, starting one") - } else { - // remember that a session is present for future reconnections - d.sessionPresent = connectToken.SessionPresent() - } return nil } err = backoff.Retry(connectOperation, policy) diff --git a/device/protocol_mqtt_v1.go b/device/protocol_mqtt_v1.go index e6eb61e..ca48ed8 100644 --- a/device/protocol_mqtt_v1.go +++ b/device/protocol_mqtt_v1.go @@ -48,8 +48,8 @@ func (d *Device) initializeMQTTClient() error { } opts.SetTLSConfig(tlsConfig) - opts.SetOnConnectHandler(func(client mqtt.Client) { - astarteOnConnectHandler(d, client) + opts.SetOnConnectHandler(func(client mqtt.Client, sessionPresent bool) { + astarteOnConnectHandler(d, client, sessionPresent) }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { @@ -186,9 +186,9 @@ func (d *Device) handleControlMessages(message string, payload []byte) error { return nil } -func astarteOnConnectHandler(d *Device, client mqtt.Client) { +func astarteOnConnectHandler(d *Device, client mqtt.Client, sessionPresent bool) { // Should we run the whole Astarte after connect thing? - if !d.sessionPresent { + if !sessionPresent { // Yes, we should: first, setup subscription if err := d.setupSubscriptions(); err != nil { errorMsg := fmt.Sprintf("Cannot setup subscriptions: %v", err) @@ -237,11 +237,6 @@ func astarteOnConnectHandler(d *Device, client mqtt.Client) { } } - // If a device connected for the first time, since we do not ask - // for a clean session and do not change its clientID, we can assume - // that after connection a session is present - d.sessionPresent = true - // If some messages must be retried, do so d.resendFailedMessages() diff --git a/go.mod b/go.mod index bcc8157..48e03bd 100644 --- a/go.mod +++ b/go.mod @@ -18,3 +18,5 @@ require ( gorm.io/driver/sqlite v1.2.6 gorm.io/gorm v1.22.3 ) + +replace github.com/eclipse/paho.mqtt.golang v1.3.5 => github.com/ispirata/paho.mqtt.golang v1.3.90 diff --git a/go.sum b/go.sum index 08b4c3d..afd2078 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/cristalhq/jwt/v3 v3.1.0/go.mod h1:XOnIXst8ozq/esy5N1XOlSyQqBd+84fxJ99 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= -github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -22,6 +20,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/iancoleman/orderedmap v0.2.0 h1:sq1N/TFpYH++aViPcaKjys3bDClUEU7s5B+z6jq8pNA= github.com/iancoleman/orderedmap v0.2.0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= +github.com/ispirata/paho.mqtt.golang v1.3.90 h1:om0vKIECC6no6q/UTmPPc6K6jwlDscgqKoSbYFujSaw= +github.com/ispirata/paho.mqtt.golang v1.3.90/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -62,6 +62,8 @@ golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRX golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=