diff --git a/pkg/resmgr/events.go b/pkg/resmgr/events.go index 6565e61cf..2ad5265de 100644 --- a/pkg/resmgr/events.go +++ b/pkg/resmgr/events.go @@ -17,7 +17,6 @@ package resmgr import ( logger "github.com/containers/nri-plugins/pkg/log" "github.com/containers/nri-plugins/pkg/resmgr/cache" - "github.com/containers/nri-plugins/pkg/resmgr/metrics" ) // Our logger instance for events. @@ -25,34 +24,14 @@ var evtlog = logger.NewLogger("events") // setupEventProcessing sets up event and metrics processing. func (m *resmgr) setupEventProcessing() error { - var err error - m.events = make(chan interface{}, 8) m.stop = make(chan interface{}) - options := metrics.Options{ - PollInterval: opt.MetricsTimer, - } - if m.metrics, err = metrics.NewMetrics(options); err != nil { - return resmgrError("failed to create metrics (pre)processor: %v", err) - } - - return nil -} - -func (m *resmgr) startMetricsProcessing() error { - if err := m.metrics.Start(); err != nil { - return resmgrError("failed to start metrics (pre)processor: %v", err) - } return nil } // startEventProcessing starts event and metrics processing. func (m *resmgr) startEventProcessing() error { - if err := m.startMetricsProcessing(); err != nil { - return resmgrError("failed to start metrics (pre)processor: %v", err) - } - stop := m.stop go func() { for { @@ -73,7 +52,6 @@ func (m *resmgr) startEventProcessing() error { func (m *resmgr) stopEventProcessing() { if m.stop != nil { close(m.stop) - m.metrics.Stop() m.stop = nil } } diff --git a/pkg/resmgr/metrics/metrics.go b/pkg/resmgr/metrics/metrics.go deleted file mode 100644 index 119613d48..000000000 --- a/pkg/resmgr/metrics/metrics.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2020 Intel Corporation. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metrics - -import ( - "bytes" - "fmt" - "strings" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - model "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - - logger "github.com/containers/nri-plugins/pkg/log" - - "github.com/containers/nri-plugins/pkg/instrumentation" - "github.com/containers/nri-plugins/pkg/metrics" - - // pull in all metrics collectors - _ "github.com/containers/nri-plugins/pkg/metrics/register" -) - -// Options describes options for metrics collection and processing. -type Options struct { - // PollInterval is the interval for polling raw metrics. - PollInterval time.Duration -} - -// Metrics implements collecting, caching and processing of raw metrics. -type Metrics struct { - sync.RWMutex - opts Options // metrics collecting options - g prometheus.Gatherer // prometheus/raw metrics gatherer - stop chan interface{} // channel to stop polling goroutine - raw []*model.MetricFamily // latest set of raw metrics - pend []*model.MetricFamily // pending metrics for forwarding -} - -// Our logger instance. -var log = logger.NewLogger("metrics") - -// NewMetrics creates a new instance for metrics collecting and processing. -func NewMetrics(opts Options) (*Metrics, error) { - g, err := metrics.NewMetricGatherer() - if err != nil { - return nil, metricsError("failed to create raw metrics gatherer: %v", err) - } - - m := &Metrics{ - opts: opts, - raw: make([]*model.MetricFamily, 0), - g: g, - } - - m.poll() - instrumentation.RegisterGatherer(m) - - return m, nil -} - -// Start starts metrics collection and processing. -func (m *Metrics) Start() error { - if m.stop != nil { - return nil - } - - stop := make(chan interface{}) - go func() { - var pollTimer *time.Ticker - var pollChan <-chan time.Time - - if m.opts.PollInterval > 0 { - pollTimer = time.NewTicker(m.opts.PollInterval) - pollChan = pollTimer.C - } else { - log.Info("periodic collection of metrics is disabled") - } - - for { - select { - case _ = <-stop: - if pollTimer != nil { - pollTimer.Stop() - } - return - case _ = <-pollChan: - if err := m.poll(); err != nil { - log.Error("failed to poll raw metrics: %v", err) - } - } - } - }() - m.stop = stop - - return nil -} - -// Stop stops metrics collection and processing. -func (m *Metrics) Stop() { - if m.stop != nil { - close(m.stop) - m.stop = nil - } -} - -// poll does a single round of raw metrics collection. -func (m *Metrics) poll() error { - m.Lock() - defer m.Unlock() - - f, err := m.g.Gather() - if err != nil { - return metricsError("failed to poll raw metrics: %v", err) - } - m.raw = f - m.pend = f - return nil -} - -// dump debug-dumps the given MetricFamily data -func dump(prefix string, f *model.MetricFamily) { - if !log.DebugEnabled() { - return - } - buf := &bytes.Buffer{} - if _, err := expfmt.MetricFamilyToText(buf, f); err != nil { - return - } - log.DebugBlock(" <"+prefix+"> ", "%s", strings.TrimSpace(buf.String())) -} - -// metricsError returns a new formatted error specific to metrics-processing. -func metricsError(format string, args ...interface{}) error { - return fmt.Errorf("metrics: "+format, args...) -} diff --git a/pkg/resmgr/metrics/prometheus.go b/pkg/resmgr/metrics/prometheus.go deleted file mode 100644 index 2ade86961..000000000 --- a/pkg/resmgr/metrics/prometheus.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2020 Intel Corporation. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metrics - -import ( - model "github.com/prometheus/client_model/go" -) - -// Gather is our prometheus.Gatherer interface for proxying metrics. -func (m *Metrics) Gather() ([]*model.MetricFamily, error) { - m.Lock() - pend := m.pend - m.Unlock() - - if pend == nil { - log.Debug("no data to proxy to prometheus...") - } else { - log.Debug("proxying data to prometheus...") - } - - return pend, nil -} diff --git a/pkg/resmgr/nri.go b/pkg/resmgr/nri.go index f83edc881..83adba5ce 100644 --- a/pkg/resmgr/nri.go +++ b/pkg/resmgr/nri.go @@ -313,6 +313,10 @@ func (p *nriPlugin) StopPodSandbox(ctx context.Context, podSandbox *api.PodSandb m := p.resmgr + // TODO(klihub): shouldn't we m.Lock()/defer m.Unlock() here? + metrics.Block() + defer metrics.Unblock() + released := []cache.Container{} pod, _ := m.cache.LookupPod(podSandbox.GetId()) @@ -391,6 +395,8 @@ func (p *nriPlugin) CreateContainer(ctx context.Context, podSandbox *api.PodSand m := p.resmgr m.Lock() defer m.Unlock() + metrics.Block() + defer metrics.Unblock() c, err := m.cache.InsertContainer(container) if err != nil { @@ -612,6 +618,9 @@ func (p *nriPlugin) RemoveContainer(ctx context.Context, pod *api.PodSandbox, co func (p *nriPlugin) updateContainers() (retErr error) { // Notes: must be called with p.resmgr lock held. + metrics.Block() + defer metrics.Unblock() + updates := p.getPendingUpdates(nil) event := UpdateContainers diff --git a/pkg/resmgr/policy/metrics.go b/pkg/resmgr/policy/metrics.go new file mode 100644 index 000000000..19eb09169 --- /dev/null +++ b/pkg/resmgr/policy/metrics.go @@ -0,0 +1,55 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policy + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/containers/nri-plugins/pkg/metrics" +) + +type PolicyCollector struct { + policy *policy +} + +func (p *policy) newPolicyCollector() *PolicyCollector { + return &PolicyCollector{ + policy: p, + } +} + +func (c *PolicyCollector) register() error { + return metrics.Register(c.policy.ActivePolicy(), c, metrics.WithGroup("policy")) +} + +func (c *PolicyCollector) Describe(ch chan<- *prometheus.Desc) { + for _, d := range c.policy.active.DescribeMetrics() { + ch <- d + } +} + +func (c *PolicyCollector) Collect(ch chan<- prometheus.Metric) { + polled := c.policy.active.PollMetrics() + + collected, err := c.policy.active.CollectMetrics(polled) + if err != nil { + log.Error("failed to collect metrics: %v", err) + return + } + + for _, m := range collected { + ch <- m + } +} diff --git a/pkg/resmgr/policy/policy.go b/pkg/resmgr/policy/policy.go index f8ec5963d..2ecdcb29a 100644 --- a/pkg/resmgr/policy/policy.go +++ b/pkg/resmgr/policy/policy.go @@ -147,12 +147,6 @@ type Policy interface { HandleEvent(*events.Policy) (bool, error) // ExportResourceData exports/updates resource data for the container. ExportResourceData(cache.Container) - // DescribeMetrics generates policy-specific prometheus metrics data descriptors. - DescribeMetrics() []*prometheus.Desc - // PollMetrics provides policy metrics for monitoring. - PollMetrics() Metrics - // CollectMetrics generates prometheus metrics from cached/polled policy-specific metrics data. - CollectMetrics(Metrics) ([]prometheus.Metric, error) // GetTopologyZones returns the policy/pool data for 'topology zone' CRDs. GetTopologyZones() []*TopologyZone } @@ -204,11 +198,12 @@ type ZoneAttribute struct { // Policy instance/state. type policy struct { - options Options // policy options - cache cache.Cache // system state cache - active Backend // our active backend - system system.System // system/HW/topology info - sendEvent SendEventFn // function to send event up to the resource manager + options Options // policy options + cache cache.Cache // system state cache + active Backend // our active backend + system system.System // system/HW/topology info + sendEvent SendEventFn // function to send event up to the resource manager + pcollect *PolicyCollector // policy metrics collector } // backend is a registered Backend. @@ -225,11 +220,25 @@ var log logger.Logger = logger.NewLogger("policy") func NewPolicy(backend Backend, cache cache.Cache, o *Options) (Policy, error) { log.Info("creating '%s' policy...", backend.Name()) - return &policy{ + p := &policy{ cache: cache, options: *o, active: backend, - }, nil + } + + sys, err := system.DiscoverSystem() + if err != nil { + return nil, policyError("failed to discover system topology: %v", err) + } + p.system = sys + + pcollect := p.newPolicyCollector() + if err := pcollect.register(); err != nil { + return nil, policyError("failed to register policy collector: %v", err) + } + p.pcollect = pcollect + + return p, nil } func (p *policy) ActivePolicy() string { @@ -241,12 +250,6 @@ func (p *policy) ActivePolicy() string { // Start starts up policy, preparing it for serving requests. func (p *policy) Start(cfg interface{}) error { - sys, err := system.DiscoverSystem() - if err != nil { - return policyError("failed to discover system topology: %v", err) - } - p.system = sys - log.Info("activating '%s' policy...", p.active.Name()) if err := p.active.Setup(&BackendOptions{ @@ -315,21 +318,6 @@ func (p *policy) ExportResourceData(c cache.Container) { p.cache.WriteFile(c.GetID(), ExportedResources, 0644, buf.Bytes()) } -// PollMetrics provides policy metrics for monitoring. -func (p *policy) PollMetrics() Metrics { - return p.active.PollMetrics() -} - -// DescribeMetrics generates policy-specific prometheus metrics data descriptors. -func (p *policy) DescribeMetrics() []*prometheus.Desc { - return p.active.DescribeMetrics() -} - -// CollectMetrics generates prometheus metrics from cached/polled policy-specific metrics data. -func (p *policy) CollectMetrics(m Metrics) ([]prometheus.Metric, error) { - return p.active.CollectMetrics(m) -} - // GetTopologyZones returns the policy/pool data for 'topology zone' CRDs. func (p *policy) GetTopologyZones() []*TopologyZone { return p.active.GetTopologyZones() diff --git a/pkg/resmgr/policycollector/collector.go b/pkg/resmgr/policycollector/collector.go deleted file mode 100644 index 6946d2095..000000000 --- a/pkg/resmgr/policycollector/collector.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2019 Intel Corporation. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package policycollector - -import ( - "github.com/containers/nri-plugins/pkg/metrics" - "github.com/containers/nri-plugins/pkg/resmgr/policy" - "github.com/prometheus/client_golang/prometheus" -) - -type PolicyCollector struct { - policy policy.Policy -} - -func (c *PolicyCollector) SetPolicy(policy policy.Policy) { - c.policy = policy -} - -// HasPolicySpecificMetrics judges whether the policy defines the policy-specific metrics -func (c *PolicyCollector) HasPolicySpecificMetrics() bool { - if c.policy.DescribeMetrics() == nil { - return false - } - return true -} - -// Describe implements prometheus.Collector interface -func (c *PolicyCollector) Describe(ch chan<- *prometheus.Desc) { - for _, d := range c.policy.DescribeMetrics() { - ch <- d - } -} - -// Collect implements prometheus.Collector interface -func (c *PolicyCollector) Collect(ch chan<- prometheus.Metric) { - prometheusMetrics, err := c.policy.CollectMetrics(c.policy.PollMetrics()) - if err != nil { - return - } - for _, m := range prometheusMetrics { - ch <- m - } -} - -// RegisterPolicyMetricsCollector registers policy-specific collector -func (c *PolicyCollector) RegisterPolicyMetricsCollector() error { - return metrics.RegisterCollector("policyMetrics", func() (prometheus.Collector, error) { - return c, nil - }) -} diff --git a/pkg/resmgr/resource-manager.go b/pkg/resmgr/resource-manager.go index 93b11a4b2..948ced007 100644 --- a/pkg/resmgr/resource-manager.go +++ b/pkg/resmgr/resource-manager.go @@ -27,7 +27,6 @@ import ( "github.com/containers/nri-plugins/pkg/pidfile" "github.com/containers/nri-plugins/pkg/resmgr/cache" "github.com/containers/nri-plugins/pkg/resmgr/control" - "github.com/containers/nri-plugins/pkg/resmgr/metrics" "github.com/containers/nri-plugins/pkg/resmgr/policy" "github.com/containers/nri-plugins/pkg/sysfs" "github.com/containers/nri-plugins/pkg/topology" @@ -35,8 +34,6 @@ import ( "sigs.k8s.io/yaml" cfgapi "github.com/containers/nri-plugins/pkg/apis/config/v1alpha1" - - policyCollector "github.com/containers/nri-plugins/pkg/resmgr/policycollector" ) // ResourceManager is the interface we expose for controlling the CRI resource manager. @@ -59,7 +56,6 @@ type resmgr struct { cache cache.Cache // cached state policy policy.Policy // resource manager policy control control.Control // policy controllers/enforcement - metrics *metrics.Metrics // metrics collector/pre-processor events chan interface{} // channel for delivering events stop chan interface{} // channel for signalling shutdown to goroutines nri *nriPlugin // NRI plugins, if we're running as such @@ -103,10 +99,6 @@ func NewResourceManager(backend policy.Backend, agt *agent.Agent) (ResourceManag return nil, err } - if err := m.registerPolicyMetricsCollector(); err != nil { - return nil, err - } - if err := m.setupEventProcessing(); err != nil { return nil, err } @@ -174,12 +166,13 @@ func (m *resmgr) start(cfg cfgapi.ResmgrConfig) error { mCfg := cfg.CommonConfig() logger.Configure(&mCfg.Log) - instrumentation.Reconfigure(&mCfg.Instrumentation) if err := m.policy.Start(m.cfg.PolicyConfig()); err != nil { return err } + instrumentation.Reconfigure(&mCfg.Instrumentation) + if err := m.nri.start(); err != nil { return err } @@ -280,17 +273,6 @@ func (m *resmgr) updateTopologyZones() { } } -// registerPolicyMetricsCollector registers policy metrics collector· -func (m *resmgr) registerPolicyMetricsCollector() error { - pc := &policyCollector.PolicyCollector{} - pc.SetPolicy(m.policy) - if pc.HasPolicySpecificMetrics() { - return pc.RegisterPolicyMetricsCollector() - } - log.Info("%s policy has no policy-specific metrics.", m.policy.ActivePolicy()) - return nil -} - func (m *resmgr) reconfigure(cfg cfgapi.ResmgrConfig) error { apply := func(cfg cfgapi.ResmgrConfig) error { mCfg := cfg.CommonConfig()