diff --git a/cmd/manager/start.go b/cmd/manager/start.go index fe31451..0211100 100644 --- a/cmd/manager/start.go +++ b/cmd/manager/start.go @@ -70,7 +70,7 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er } tracer := tp.Tracer(svcName) - mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, logger) + mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, "", nil, logger) if err != nil { return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error()) } diff --git a/pkg/mqtt/pubsub.go b/pkg/mqtt/pubsub.go index 8f325fa..673cb46 100644 --- a/pkg/mqtt/pubsub.go +++ b/pkg/mqtt/pubsub.go @@ -36,12 +36,12 @@ type PubSub interface { Close() error } -func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) { +func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, lwtTopic string, lwtPayload map[string]string, logger *slog.Logger) (PubSub, error) { if id == "" { return nil, errEmptyID } - client, err := newClient(url, id, username, password, timeout) + client, err := newClient(url, id, username, password, timeout, lwtTopic, lwtPayload) if err != nil { return nil, err } @@ -113,12 +113,21 @@ func (ps *pubsub) Close() error { return nil } -func newClient(address, id, username, password string, timeout time.Duration) (mqtt.Client, error) { +func newClient(address, id, username, password string, timeout time.Duration, lwtTopic string, lwtPayload map[string]string) (mqtt.Client, error) { opts := mqtt.NewClientOptions(). SetUsername(username). SetPassword(password). AddBroker(address). SetClientID(id) + + if lwtTopic != "" && lwtPayload != nil { + payload, err := json.Marshal(lwtPayload) + if err != nil { + return nil, fmt.Errorf("failed to serialize LWT payload: %w", err) + } + opts.SetWill(lwtTopic, string(payload), 0, false) + } + client := mqtt.NewClient(opts) token := client.Connect() diff --git a/proplet/mqtt.go b/proplet/mqtt.go index 7ce5abf..bdf0a52 100644 --- a/proplet/mqtt.go +++ b/proplet/mqtt.go @@ -39,6 +39,13 @@ type MQTTService struct { } func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*MQTTService, error) { + lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID) + lwtPayload := map[string]string{ + "status": "offline", + "proplet_id": config.PropletID, + "chan_id": config.ChannelID, + } + pubsub, err := mqtt.NewPubSub( config.BrokerURL, qos, @@ -46,6 +53,8 @@ func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*M config.PropletID, config.Password, mqttTimeout, + lwtTopic, + lwtPayload, logger, ) if err != nil { @@ -58,18 +67,6 @@ func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*M logger: logger, } - lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID) - lwtPayload := map[string]string{ - "status": "offline", - "proplet_id": config.PropletID, - "chan_id": config.ChannelID, - } - if err := pubsub.Publish(ctx, lwtTopic, lwtPayload); err != nil { - logger.Error("Failed to set LWT message", slog.Any("error", err)) - - return nil, err - } - if err := service.PublishDiscoveryMessage(ctx); err != nil { logger.Error("Failed to publish discovery message", slog.Any("error", err))