Skip to content

Commit

Permalink
support for event watching
Browse files Browse the repository at this point in the history
  • Loading branch information
jcodybaker committed Dec 16, 2024
1 parent ceacfec commit 4462eb2
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
17 changes: 17 additions & 0 deletions cmd/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var notificationsCmd = &cobra.Command{
disc := discovery.NewDiscoverer(dOpts...)
fsnChan := disc.GetFullStatusNotifications(50)
snChan := disc.GetStatusNotifications(50)
enChan := disc.GetEventNotifications(50)
if err := disc.MQTTConnect(ctx); err != nil {
l.Fatal().Err(err).Msg("connecting to MQTT broker")
}
Expand Down Expand Up @@ -85,6 +86,22 @@ var notificationsCmd = &cobra.Command{
sn.Status,
sn.Frame.Params,
)
case en := <-enChan:
log.Debug().
Str("src", en.Frame.Src).
Str("dst", en.Frame.Dst).
Str("method", en.Frame.Method).
Any("msg", en.Event).
Float64("timestamp", en.Event.TS).
Str("raw", string(en.Frame.Params)).
Msg("got NotifyStatus")
Output(
ctx,
fmt.Sprintf("Received NotifyStatus frame from %s", en.Frame.Src),
"notification",
en.Event,
en.Frame.Params,
)
}
}
},
Expand Down
1 change: 1 addition & 0 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Discoverer struct {

statusChan chan StatusNotification
fullStatusChan chan StatusNotification
eventChan chan EventNotification
}

// AddDeviceByAddress attempts to parse a user-provided URI and add the device.
Expand Down
5 changes: 1 addition & 4 deletions pkg/discovery/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jcodybaker/go-shelly"
"github.com/mongoose-os/mos/common/mgrpc"
"github.com/mongoose-os/mos/common/mgrpc/frame"
)

func (d *Discoverer) MQTTConnect(ctx context.Context) error {
Expand All @@ -38,9 +37,7 @@ func (d *Discoverer) MQTTConnect(ctx context.Context) error {
s := mgrpc.Serve(ctx, c)
s.AddHandler("NotifyStatus", d.statusNotificationHandler)
s.AddHandler("NotifyFullStatus", d.fullStatusNotificationHandler)
s.AddHandler("NotifyEvents", func(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame {
return nil
})
s.AddHandler("NotifyEvent", d.eventNotificationHandler)
}
return nil
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/discovery/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type StatusNotification struct {
Frame *frame.Frame
}

type EventNotification struct {
Event *shelly.NotifyEvent
Frame *frame.Frame
}

// GetFullStatusNotifications returns a channel which provides NotifyFullStatus messages.
// Messages received before the first invocation of GetFullStatusNotifications will be discarded.
// Consumers MUST be responsive or ther MQTT channel may drop messages.
Expand All @@ -39,6 +44,18 @@ func (d *Discoverer) GetStatusNotifications(buffer int) <-chan StatusNotificatio
return d.statusChan
}

// GetEventNotifications returns a channel which provides events.
// Messages received before the first invocation of GetEventNotifications will be discarded.
// Consumers MUST be responsive or ther MQTT channel may drop messages.
func (d *Discoverer) GetEventNotifications(buffer int) <-chan EventNotification {
d.lock.Lock()
defer d.lock.Unlock()
if d.eventChan == nil {
d.eventChan = make(chan EventNotification, buffer)
}
return d.eventChan
}

func (d *Discoverer) statusNotificationHandler(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame {
d.lock.Lock()
defer d.lock.Unlock()
Expand Down Expand Up @@ -84,3 +101,26 @@ func (d *Discoverer) fullStatusNotificationHandler(mr mgrpc.MgRPC, f *frame.Fram
}
return nil
}

func (d *Discoverer) eventNotificationHandler(mr mgrpc.MgRPC, f *frame.Frame) *frame.Frame {
d.lock.Lock()
defer d.lock.Unlock()
if d.eventChan == nil {
return nil
}
e := &shelly.NotifyEvent{}
if err := json.Unmarshal(f.Params, &e); err != nil {
log.Err(err).
Str("src", f.Src).
Str("dst", f.Dst).
Int64("id", f.ID).
Str("method", f.Method).
Str("payload", string(f.Params)).
Msg("unmarshalling NotifyFullStatus frame")
}
d.eventChan <- EventNotification{
Event: e,
Frame: f,
}
return nil
}

0 comments on commit 4462eb2

Please sign in to comment.