Skip to content

Commit

Permalink
added instrument publsiher helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoragrega committed Nov 19, 2021
1 parent cb4d6da commit b7cda69
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 11 deletions.
6 changes: 4 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
)

var (
defaultMonitor Monitor

commonLabels = []string{consumerKey, msgNameKey, msgVersionKey, errorKey}
)

Expand Down Expand Up @@ -130,7 +132,7 @@ func MustInstrumentRouterWithMonitor(monitor *Monitor, router *pubsub.Router) *p
//
// It will panic on metric registration error.
func MustInstrumentRouter(router *pubsub.Router) *pubsub.Router {
return MustInstrumentRouterWithMonitor(&Monitor{}, router)
return MustInstrumentRouterWithMonitor(&defaultMonitor, router)
}

// MustInstrumentRouter helper to instrument a router and returns the same instance.
Expand All @@ -150,7 +152,7 @@ func InstrumentRouterWithMonitor(monitor *Monitor, router *pubsub.Router) error

// InstrumentRouter helper to instrument a router returning any errors that may happen.
func InstrumentRouter(router *pubsub.Router) error {
return InstrumentRouterWithMonitor(&Monitor{}, router)
return InstrumentRouterWithMonitor(&defaultMonitor, router)
}

// InstrumentRouter a router returning any errors that may happen.
Expand Down
33 changes: 29 additions & 4 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@ func TestDefaultInstrument(t *testing.T) {
t.Cleanup(resetRegisterer)

require.NoError(t, InstrumentRouter(&pubsub.Router{}))
require.NotNil(t, InstrumentPublisher(pubsub.NoOpPublisher()))
require.NotPanics(t, func() {
MustInstrumentRouter(&pubsub.Router{})
})

pub, err := InstrumentPublisher(pubsub.NoOpPublisher())
require.NoError(t, err)
require.NotNil(t, pub)

require.NotPanics(t, func() {
_ = MustInstrumentPublisher(pubsub.NoOpPublisher())
})
}

func TestInstrumentMultipleRouters(t *testing.T) {
Expand All @@ -38,11 +48,26 @@ func TestInstrumentMultipleRouters(t *testing.T) {
})
}

func TestInstrumentPublisher(t *testing.T) {
t.Cleanup(resetRegisterer)

require.NotPanics(t, func() {
MustInstrumentPublisherWithMonitor(&Monitor{}, pubsub.NoOpPublisher())
})

t.Run("without new registry ot panics on successive calls on different monitors", func(t *testing.T) {
require.Panics(t, func() {
MustInstrumentPublisherWithMonitor(&Monitor{}, pubsub.NoOpPublisher())
})
})
}

func TestInstrumentMultipleMonitor(t *testing.T) {
t.Cleanup(resetRegisterer)

var r pubsub.Router
MustInstrumentRouter(&r)
var m Monitor
MustInstrumentRouterWithMonitor(&m, &r)

t.Run("panics if the registerer is not different", func(t *testing.T) {
require.Panics(t, func() {
Expand Down Expand Up @@ -83,7 +108,7 @@ func TestProcessedMessages(t *testing.T) {
Namespace: "custom_ns",
},
PublishOpts: prometheus.HistogramOpts{
Buckets: []float64{1},
Buckets: []float64{1},
ConstLabels: map[string]string{"more": "custom"},
},
ConstLabels: map[string]string{"const": "label"},
Expand Down Expand Up @@ -118,7 +143,7 @@ func TestProcessedMessages(t *testing.T) {
return publisher.Publish(ctx, topic, envelopes...)
})
},
m.InstrumentPublisher,
m.MustInstrumentPublisher,
)

require.NoError(t, r.Register(consumerA, envPub.Subscriber(consumerA), slowHandler))
Expand Down
36 changes: 31 additions & 5 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,34 @@ import (
)

// InstrumentPublisher is a helper to instrument a publisher with the default monitor.
func InstrumentPublisher(next pubsub.Publisher) pubsub.Publisher {
var m Monitor
return m.InstrumentPublisher(next)
func InstrumentPublisher(next pubsub.Publisher) (pubsub.Publisher, error) {
return InstrumentPublisherWithMonitor(&defaultMonitor, next)
}

// InstrumentPublisherWithMonitor is a helper to instrument a publisher with the default monitor.
func InstrumentPublisherWithMonitor(monitor *Monitor, next pubsub.Publisher) (pubsub.Publisher, error) {
return monitor.InstrumentPublisher(next)
}

// MustInstrumentPublisher is a helper to instrument a publisher with the default monitor.
func MustInstrumentPublisher(next pubsub.Publisher) pubsub.Publisher {
return MustInstrumentPublisherWithMonitor(&defaultMonitor, next)
}

// MustInstrumentPublisherWithMonitor is a helper to instrument a publisher with the default monitor.
func MustInstrumentPublisherWithMonitor(monitor *Monitor, next pubsub.Publisher) pubsub.Publisher {
pub, err := InstrumentPublisherWithMonitor(monitor, next)
if err != nil {
panic(err)
}
return pub
}

// InstrumentPublisher is a publisher middleware that will send metrics on publishing operations.
func (m *Monitor) InstrumentPublisher(next pubsub.Publisher) pubsub.Publisher {
func (m *Monitor) InstrumentPublisher(next pubsub.Publisher) (pubsub.Publisher, error) {
if err := m.Register(); err != nil {
return nil, err
}
return pubsub.PublisherFunc(func(ctx context.Context, topic string, envelopes ...*pubsub.Message) (err error) {
start := time.Now()
defer func() {
Expand All @@ -33,7 +54,12 @@ func (m *Monitor) InstrumentPublisher(next pubsub.Publisher) pubsub.Publisher {
}()

return next.Publish(ctx, topic, envelopes...)
})
}), nil
}

// MustInstrumentPublisher is a publisher middleware that will send metrics on publishing operations.
func (m *Monitor) MustInstrumentPublisher(next pubsub.Publisher) pubsub.Publisher {
return MustInstrumentPublisherWithMonitor(m, next)
}

func (m *Monitor) buildPublish(opts prometheus.HistogramOpts) *prometheus.HistogramVec {
Expand Down

0 comments on commit b7cda69

Please sign in to comment.