Skip to content

Commit

Permalink
Use pkg/mqtt package in proplet implementation
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya committed Dec 12, 2024
1 parent 1dee293 commit 00181a6
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 174 deletions.
1 change: 1 addition & 0 deletions pkg/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type PubSub interface {
Publish(ctx context.Context, topic string, msg any) error
Subscribe(ctx context.Context, topic string, handler Handler) error
Unsubscribe(ctx context.Context, topic string) error
Close() error
}

func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) {
Expand Down
189 changes: 108 additions & 81 deletions proplet/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,131 +1,158 @@
package proplet

import (
"encoding/json"
"context"
"fmt"
"log/slog"
"time"

pkgerrors "github.com/absmach/propeller/pkg/errors"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/absmach/propeller/pkg/mqtt"
)

const livelinessInterval = 10 * time.Second
const (
livelinessInterval = 10 * time.Second
mqttTimeout = 30 * time.Second
qos = 0
)

var (
RegistryFailurePayload = `{"status":"failure","error":"%v"}`
RegistrySuccessPayload = `{"status":"success"}`
RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry"
lwtPayloadTemplate = `{"status":"online","proplet_id":"%s","chan_id":"%s"}`
discoveryPayloadTemplate = `{"proplet_id":"%s","chan_id":"%s"}`
alivePayloadTemplate = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}`
aliveTopicTemplate = "channels/%s/messages/control/proplet/alive"
discoveryTopicTemplate = "channels/%s/messages/control/proplet/create"
startTopicTemplate = "channels/%s/messages/control/manager/start"
stopTopicTemplate = "channels/%s/messages/control/manager/stop"
registryUpdateTopicTemplate = "channels/%s/messages/control/manager/updateRegistry"
registryResponseTopic = "channels/%s/messages/registry/server"
fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet"
RegistryUpdateRequestTopic = "channels/%s/messages/control/manager/updateRegistry"
RegistryUpdateResponseTopic = "channels/%s/messages/control/proplet/updateRegistry"
AliveTopic = "channels/%s/messages/control/proplet/alive"
AlivePayload = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}`
DiscoveryTopic = "channels/%s/messages/control/proplet/create"
DiscoveryPayload = `{"proplet_id":"%s","chan_id":"%s"}`
LWTTopic = "channels/%s/messages/control/proplet/create"
LWTPayload = `{"status":"online","proplet_id":"%s","chan_id":"%s"}`
StartTopic = "channels/%s/messages/control/manager/start"
StopTopic = "channels/%s/messages/control/manager/stop"
RegistryResponseTopic = "channels/%s/messages/registry/server"
RegistryRequestTopic = "channels/%s/messages/registry/proplet"
)

func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) {
lwtPayload := fmt.Sprintf(lwtPayloadTemplate, config.PropletID, config.ChannelID)
if lwtPayload == "" {
return nil, fmt.Errorf("failed to prepare MQTT last will payload: %w", pkgerrors.ErrMQTTWillPayloadFailed)
}

opts := mqtt.NewClientOptions().
AddBroker(config.BrokerURL).
SetClientID("Proplet-"+config.PropletID).
SetUsername(config.PropletID).
SetPassword(config.Password).
SetCleanSession(true).
SetWill(aliveTopicTemplate+config.ChannelID, lwtPayloadTemplate+config.PropletID+config.ChannelID, 0, false)

logger.Info("Configured Last Will and Testament")

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
logger.Error("MQTT connection lost", slog.Any("error", err))
})

opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
logger.Info("MQTT reconnecting")
})
type MQTTService struct {
pubsub mqtt.PubSub
config Config
logger *slog.Logger
}

client := mqtt.NewClient(opts)
password := client.Connect()
if password.Wait() && password.Error() != nil {
return nil, fmt.Errorf("failed to connect to MQTT broker '%s': %w", config.BrokerURL, pkgerrors.ErrMQTTConnectionFailed)
func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*MQTTService, error) {
pubsub, err := mqtt.NewPubSub(
config.BrokerURL,
qos,
"Proplet-"+config.PropletID,
config.PropletID,
config.Password,
mqttTimeout,
logger,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize MQTT PubSub: %w", err)
}

PublishDiscovery(client, config, logger)
service := &MQTTService{
pubsub: pubsub,
config: config,
logger: logger,
}

go startLivelinessUpdates(client, config, logger)
lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID)
lwtPayload := map[string]string{
"status": "offline",
"proplet_id": config.PropletID,
"chan_id": config.ChannelID,
}
if err := pubsub.Publish(ctx, lwtTopic, lwtPayload); err != nil {
logger.Error("Failed to set LWT message", slog.Any("error", err))

return client, nil
}
return nil, err
}

func PublishDiscovery(client mqtt.Client, config Config, logger *slog.Logger) {
topic := fmt.Sprintf(discoveryTopicTemplate, config.ChannelID)
payload := fmt.Sprintf(discoveryPayloadTemplate, config.PropletID, config.ChannelID)
password := client.Publish(topic, 0, false, payload)
password.Wait()
if password.Error() != nil {
logger.Info("failed to publish discovery message: %w", slog.Any("error", password.Error()))
if err := service.PublishDiscoveryMessage(ctx); err != nil {
logger.Error("Failed to publish discovery message", slog.Any("error", err))

return
return nil, err
}

logger.Info("Discovery message published successfully")
go service.StartLivelinessUpdates(ctx)

return service, nil
}

func startLivelinessUpdates(client mqtt.Client, config Config, logger *slog.Logger) {
func (m *MQTTService) StartLivelinessUpdates(ctx context.Context) {
ticker := time.NewTicker(livelinessInterval)
defer ticker.Stop()

for range ticker.C {
password := client.Publish(fmt.Sprintf(aliveTopicTemplate, config.ChannelID), 0, false, fmt.Sprintf(alivePayloadTemplate, config.PropletID, config.ChannelID))
password.Wait()
if password.Error() != nil {
logger.Error("Failed to publish liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID)), slog.Any("error", password.Error()))
} else {
logger.Info("Published liveliness message")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := m.pubsub.Publish(ctx, fmt.Sprintf(AliveTopic, m.config.ChannelID), map[string]string{
"status": "alive",
"proplet_id": m.config.PropletID,
"chan_id": m.config.ChannelID,
})
if err != nil {
m.logger.Error("Failed to publish liveliness message", slog.Any("error", err))
} else {
m.logger.Info("Published liveliness message")
}
}
}
}

func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, stopHandler, registryHandler mqtt.MessageHandler) error {
if password := client.Subscribe(fmt.Sprintf(startTopicTemplate, config.ChannelID), 0, startHandler); password.Wait() && password.Error() != nil {
return fmt.Errorf("failed to subscribe to start topic: %w", password.Error())
func (m *MQTTService) PublishDiscoveryMessage(ctx context.Context) error {
topic := fmt.Sprintf(DiscoveryTopic, m.config.ChannelID)
payload := map[string]string{
"proplet_id": m.config.PropletID,
"chan_id": m.config.ChannelID,
}

if password := client.Subscribe(fmt.Sprintf(stopTopicTemplate, config.ChannelID), 0, stopHandler); password.Wait() && password.Error() != nil {
return fmt.Errorf("failed to subscribe to stop topic: %w", password.Error())
if err := m.pubsub.Publish(ctx, topic, payload); err != nil {
return fmt.Errorf("failed to publish discovery message: %w", err)
}
m.logger.Info("Discovery message published successfully")

return nil
}

if password := client.Subscribe(fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID), 0, registryHandler); password.Wait() && password.Error() != nil {
return fmt.Errorf("failed to subscribe to registry update topic: %w", password.Error())
func (m *MQTTService) SubscribeToManagerTopics(ctx context.Context, startHandler, stopHandler, registryHandler mqtt.Handler) error {
handlers := map[string]mqtt.Handler{
fmt.Sprintf(StartTopic, m.config.ChannelID): startHandler,
fmt.Sprintf(StopTopic, m.config.ChannelID): stopHandler,
fmt.Sprintf(RegistryUpdateRequestTopic, m.config.ChannelID): registryHandler,
}
for topic, handler := range handlers {
if err := m.pubsub.Subscribe(ctx, topic, handler); err != nil {
return fmt.Errorf("failed to subscribe to topic %s: %w", topic, err)
}
}

return nil
}

func SubscribeToRegistryTopic(client mqtt.Client, channelID string, handler mqtt.MessageHandler, logger *slog.Logger) error {
if password := client.Subscribe(fmt.Sprintf(registryResponseTopic, channelID), 0, handler); password.Wait() && password.Error() != nil {
return fmt.Errorf("failed to subscribe to registry topic '%s': %w", fmt.Sprintf(registryResponseTopic, channelID), password.Error())
func (m *MQTTService) SubscribeToRegistryTopic(ctx context.Context, handler mqtt.Handler) error {
topic := fmt.Sprintf(RegistryResponseTopic, m.config.ChannelID)
if err := m.pubsub.Subscribe(ctx, topic, handler); err != nil {
return fmt.Errorf("failed to subscribe to registry topic: %w", err)
}

return nil
}

func PublishFetchRequest(client mqtt.Client, channelID, appName string, logger *slog.Logger) error {
payload, err := json.Marshal(map[string]string{"app_name": appName})
if err != nil {
return fmt.Errorf("failed to marshal fetch request payload: %w", err)
}
if password := client.Publish(fmt.Sprintf(fetchRequestTopicTemplate, channelID), 0, false, payload); password.Wait() && password.Error() != nil {
return fmt.Errorf("failed to publish fetch request: %w", password.Error())
func (m *MQTTService) PublishFetchRequest(ctx context.Context, appName string) error {
topic := fmt.Sprintf(RegistryRequestTopic, m.config.ChannelID)
payload := map[string]string{"app_name": appName}
if err := m.pubsub.Publish(ctx, topic, payload); err != nil {
return fmt.Errorf("failed to publish fetch request: %w", err)
}
m.logger.Info("Fetch request published successfully")

return nil
}

func (m *MQTTService) Close() error {
return m.pubsub.Close()
}
Loading

0 comments on commit 00181a6

Please sign in to comment.