diff --git a/cmd/notifications.go b/cmd/notifications.go index 34fbb12..d228f86 100644 --- a/cmd/notifications.go +++ b/cmd/notifications.go @@ -42,6 +42,7 @@ var notificationsCmd = &cobra.Command{ disc := discovery.NewDiscoverer(dOpts...) fsnChan := disc.GetFullStatusNotifications(50) snChan := disc.GetStatusNotifications(50) + enChan := disc.GetEventNotifications(50) if err := disc.MQTTConnect(ctx); err != nil { l.Fatal().Err(err).Msg("connecting to MQTT broker") } @@ -85,6 +86,22 @@ var notificationsCmd = &cobra.Command{ sn.Status, sn.Frame.Params, ) + case en := <-enChan: + log.Debug(). + Str("src", en.Frame.Src). + Str("dst", en.Frame.Dst). + Str("method", en.Frame.Method). + Any("msg", en.Event). + Float64("timestamp", en.Event.TS). + Str("raw", string(en.Frame.Params)). + Msg("got NotifyStatus") + Output( + ctx, + fmt.Sprintf("Received NotifyStatus frame from %s", en.Frame.Src), + "notification", + en.Event, + en.Frame.Params, + ) } } }, diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index ed51990..8eb9ded 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -63,6 +63,7 @@ type Discoverer struct { statusChan chan StatusNotification fullStatusChan chan StatusNotification + eventChan chan EventNotification } // AddDeviceByAddress attempts to parse a user-provided URI and add the device. diff --git a/pkg/discovery/mqtt.go b/pkg/discovery/mqtt.go index abae250..d2a73cd 100644 --- a/pkg/discovery/mqtt.go +++ b/pkg/discovery/mqtt.go @@ -11,7 +11,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/jcodybaker/go-shelly" "github.com/mongoose-os/mos/common/mgrpc" - "github.com/mongoose-os/mos/common/mgrpc/frame" ) func (d *Discoverer) MQTTConnect(ctx context.Context) error { @@ -38,9 +37,7 @@ func (d *Discoverer) MQTTConnect(ctx context.Context) error { s := mgrpc.Serve(ctx, c) s.AddHandler("NotifyStatus", d.statusNotificationHandler) s.AddHandler("NotifyFullStatus", d.fullStatusNotificationHandler) - s.AddHandler("NotifyEvents", func(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame { - return nil - }) + s.AddHandler("NotifyEvent", d.eventNotificationHandler) } return nil } diff --git a/pkg/discovery/notifications.go b/pkg/discovery/notifications.go index 1937543..5ea04a5 100644 --- a/pkg/discovery/notifications.go +++ b/pkg/discovery/notifications.go @@ -15,6 +15,11 @@ type StatusNotification struct { Frame *frame.Frame } +type EventNotification struct { + Event *shelly.NotifyEvent + Frame *frame.Frame +} + // GetFullStatusNotifications returns a channel which provides NotifyFullStatus messages. // Messages received before the first invocation of GetFullStatusNotifications will be discarded. // Consumers MUST be responsive or ther MQTT channel may drop messages. @@ -39,6 +44,18 @@ func (d *Discoverer) GetStatusNotifications(buffer int) <-chan StatusNotificatio return d.statusChan } +// GetEventNotifications returns a channel which provides events. +// Messages received before the first invocation of GetEventNotifications will be discarded. +// Consumers MUST be responsive or ther MQTT channel may drop messages. +func (d *Discoverer) GetEventNotifications(buffer int) <-chan EventNotification { + d.lock.Lock() + defer d.lock.Unlock() + if d.eventChan == nil { + d.eventChan = make(chan EventNotification, buffer) + } + return d.eventChan +} + func (d *Discoverer) statusNotificationHandler(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame { d.lock.Lock() defer d.lock.Unlock() @@ -84,3 +101,26 @@ func (d *Discoverer) fullStatusNotificationHandler(mr mgrpc.MgRPC, f *frame.Fram } return nil } + +func (d *Discoverer) eventNotificationHandler(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame { + d.lock.Lock() + defer d.lock.Unlock() + if d.eventChan == nil { + return nil + } + e := &shelly.NotifyEvent{} + if err := json.Unmarshal(f.Params, &e); err != nil { + log.Err(err). + Str("src", f.Src). + Str("dst", f.Dst). + Int64("id", f.ID). + Str("method", f.Method). + Str("payload", string(f.Params)). + Msg("unmarshalling NotifyFullStatus frame") + } + d.eventChan <- EventNotification{ + Event: e, + Frame: f, + } + return nil +}