From 9f0423f9abdea586df570495d79e6f841408c0ff Mon Sep 17 00:00:00 2001 From: Aaron Lee Date: Thu, 9 Nov 2023 11:44:27 -0800 Subject: [PATCH 1/5] Added IOPS/BW reporting to Prometheus Integrated instantaneous IOPS/BW collection by recording 1 second of IO and outputting it to Prometheus collector. Removed one prometheus minio collector from registering in metrics.go because it was causing Collect to run twice, didn't see any issues with doing it --- cmd/globals.go | 4 ++++ cmd/metrics.go | 29 ++++++++++++++++++++++++++++- cmd/object-handlers.go | 9 +++++++++ cmd/xl-sets.go | 21 +++++++++++++++++++++ 4 files changed, 62 insertions(+), 1 deletion(-) diff --git a/cmd/globals.go b/cmd/globals.go index ed678007..e0a2a406 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -190,6 +190,10 @@ var ( globalTotalGetIOCount uint64 globalIsStopping bool = false gIsRDDQHandleSet bool = false + globalCurrIOCount uint64 + globalCurrBW uint64 + globalMetricsChan = make(chan uint32) + globalCollectMetrics uint32 // Global server's network statistics globalConnStats = newConnStats() diff --git a/cmd/metrics.go b/cmd/metrics.go index 11d5c2c7..77787bb1 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -19,6 +19,7 @@ package cmd import ( "context" "net/http" + "sync/atomic" "github.com/minio/minio/cmd/logger" "github.com/prometheus/client_golang/prometheus" @@ -36,9 +37,11 @@ var ( ) ) +// Commented out this collector registration since Prometheus is running collect twice +// Couldn't see any difference in functionality func init() { prometheus.MustRegister(httpRequestsDuration) - prometheus.MustRegister(newMinioCollector()) + //prometheus.MustRegister(newMinioCollector()) } // newMinioCollector describes the collector @@ -64,6 +67,30 @@ func (c *minioCollector) Describe(ch chan<- *prometheus.Desc) { // Collect is called by the Prometheus registry when collecting metrics. func (c *minioCollector) Collect(ch chan<- prometheus.Metric) { + globalMetricsChan<- 1 + // Waiting to make sure metrics thread finishes before + <-globalMetricsChan + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "iops"), + "Current IOPS of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalCurrIOCount)), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "bw"), + "Current BW of this Minio instance (KiB/s)", + nil, nil), + prometheus.CounterValue, + float64(float64(atomic.LoadUint64(&globalCurrBW)) / float64(1024)), + ) + // Reset counters once reported + atomic.StoreUint64(&globalCurrIOCount, 0) + atomic.StoreUint64(&globalCurrBW, 0) + // Always expose network stats // Network Sent/Received Bytes diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index b5ab40f3..b867bebc 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -401,6 +401,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req defer gr.Close() objInfo = gr.ObjInfo + if (atomic.LoadUint32(&globalCollectMetrics) == 1) { + atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrIOCount, 1) + } } if objectAPI.IsEncryptionSupported() { @@ -1419,6 +1423,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } + if atomic.LoadUint32(&globalCollectMetrics) == 1 { + atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrIOCount, 1) + } + etag := objInfo.ETag if objInfo.IsCompressed() { // Ignore compressed ETag. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 673f63b8..a20dea57 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "os" "strconv" @@ -347,6 +348,24 @@ func (s *xlSets) UpdateCountersToDisk() { time.Sleep(time.Duration(init_stati) * time.Second) } } +// BW/IOPS reporting thread +func metricsReporting() { + fmt.Println("### Metrics reporting thread started") + for { + // Need to use atomic operation for setting and checking whether to collect + // It's probably fine to not use atomic within this thread, but IO threads will need it accurately determine whether to record + // Go 1.12 doesn't have atomic structs (Go 1.19 introduced atomic structs) + + // Signal to start recording IO, then sleep for 1s and report the counters + start := <-globalMetricsChan + atomic.StoreUint32(&globalCollectMetrics, start) + + time.Sleep(1 * time.Second) + atomic.StoreUint32(&globalCollectMetrics, 0) + // Signal to stop after collecting for 1s + globalMetricsChan <- 0 + } +} func (s *xlSets) syncSharedVols() { @@ -623,6 +642,8 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP go s.UpdateCountersToDisk() } + go metricsReporting() + globalSC_read = false if os.Getenv("MINIO_ENABLE_SC_READ") != "" { fmt.Println("### Setting up for SC read.. ###") From 75c7786b8ae82af28ce415c3cd694bcb4a70685b Mon Sep 17 00:00:00 2001 From: Aaron Lee Date: Thu, 9 Nov 2023 13:08:14 -0800 Subject: [PATCH 2/5] Change BW units to bytes/s for management plane Management plane asked for BW to reported in bytes/s, removing the conversion to KB --- cmd/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/metrics.go b/cmd/metrics.go index 77787bb1..6321a70d 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -85,7 +85,7 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) { "Current BW of this Minio instance (KiB/s)", nil, nil), prometheus.CounterValue, - float64(float64(atomic.LoadUint64(&globalCurrBW)) / float64(1024)), + float64(atomic.LoadUint64(&globalCurrBW)), ) // Reset counters once reported atomic.StoreUint64(&globalCurrIOCount, 0) From 83addb94d1da1a8822bc8d9901e2b794d160f14b Mon Sep 17 00:00:00 2001 From: Aaron Lee Date: Fri, 10 Nov 2023 07:44:14 -0800 Subject: [PATCH 3/5] Removed KiB/s label from Prometheus output, using bytes instead now --- cmd/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/metrics.go b/cmd/metrics.go index 6321a70d..dc61cff3 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -82,7 +82,7 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName("minio", "metrics", "bw"), - "Current BW of this Minio instance (KiB/s)", + "Current BW of this Minio instance", nil, nil), prometheus.CounterValue, float64(atomic.LoadUint64(&globalCurrBW)), From 056d46eb409f4ce507087d162ddf8aef3b63044a Mon Sep 17 00:00:00 2001 From: Aaron Lee Date: Tue, 14 Nov 2023 18:26:02 -0800 Subject: [PATCH 4/5] Fixed indentation/comments --- cmd/globals.go | 8 ++++---- cmd/metrics.go | 1 - cmd/object-handlers.go | 16 ++++++++-------- cmd/xl-sets.go | 30 +++++++++++++++--------------- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/cmd/globals.go b/cmd/globals.go index b82359cf..a81a0df3 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -191,10 +191,10 @@ var ( globalTotalGetIOCount uint64 globalIsStopping bool = false gIsRDDQHandleSet bool = false - globalCurrIOCount uint64 - globalCurrBW uint64 - globalMetricsChan = make(chan uint32) - globalCollectMetrics uint32 + globalCurrIOCount uint64 + globalCurrBW uint64 + globalMetricsChan = make(chan uint32) + globalCollectMetrics uint32 // Global server's network statistics globalConnStats = newConnStats() diff --git a/cmd/metrics.go b/cmd/metrics.go index dc61cff3..7e11d1d5 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -41,7 +41,6 @@ var ( // Couldn't see any difference in functionality func init() { prometheus.MustRegister(httpRequestsDuration) - //prometheus.MustRegister(newMinioCollector()) } // newMinioCollector describes the collector diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index b867bebc..2fc53faa 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -401,10 +401,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req defer gr.Close() objInfo = gr.ObjInfo - if (atomic.LoadUint32(&globalCollectMetrics) == 1) { - atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) - atomic.AddUint64(&globalCurrIOCount, 1) - } + if (atomic.LoadUint32(&globalCollectMetrics) == 1) { + atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrIOCount, 1) + } } if objectAPI.IsEncryptionSupported() { @@ -1423,10 +1423,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } - if atomic.LoadUint32(&globalCollectMetrics) == 1 { - atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) - atomic.AddUint64(&globalCurrIOCount, 1) - } + if atomic.LoadUint32(&globalCollectMetrics) == 1 { + atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrIOCount, 1) + } etag := objInfo.ETag if objInfo.IsCompressed() { diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index dbaee84c..c9fe2ab2 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -350,21 +350,21 @@ func (s *xlSets) UpdateCountersToDisk() { } // BW/IOPS reporting thread func metricsReporting() { - fmt.Println("### Metrics reporting thread started") - for { - // Need to use atomic operation for setting and checking whether to collect - // It's probably fine to not use atomic within this thread, but IO threads will need it accurately determine whether to record - // Go 1.12 doesn't have atomic structs (Go 1.19 introduced atomic structs) + fmt.Println("### Metrics reporting thread started") + for { + // Need to use atomic operation for setting and checking whether to collect + // It's probably fine to not use atomic within this thread, but IO threads will need it accurately determine whether to record + // Go 1.12 doesn't have atomic structs (Go 1.19 introduced atomic structs) - // Signal to start recording IO, then sleep for 1s and report the counters - start := <-globalMetricsChan - atomic.StoreUint32(&globalCollectMetrics, start) - - time.Sleep(1 * time.Second) - atomic.StoreUint32(&globalCollectMetrics, 0) - // Signal to stop after collecting for 1s - globalMetricsChan <- 0 - } + // Signal to start recording IO, then sleep for 1s and report the counters + start := <-globalMetricsChan + atomic.StoreUint32(&globalCollectMetrics, start) + + time.Sleep(1 * time.Second) + atomic.StoreUint32(&globalCollectMetrics, 0) + // Signal to stop after collecting for 1s + globalMetricsChan <- 0 + } } @@ -642,7 +642,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP go s.UpdateCountersToDisk() } - go metricsReporting() + go metricsReporting() globalSC_read = false if os.Getenv("MINIO_ENABLE_SC_READ") != "" { From e7af4f25577cb579e127529e7b7d1c4b44841a23 Mon Sep 17 00:00:00 2001 From: Aaron Lee Date: Thu, 16 Nov 2023 16:38:35 -0800 Subject: [PATCH 5/5] Fixed metrics collection design to constantly run and reset from thread; added new counters Previously the design of metrics collection made it so that metrics could only be reported through Prometheus, since the code to reset counters was inside Prometheus Now, counters are reset inside metrics collection thread while it is constantly running. To address the issue of metrics being collected when counters are reset, new variables will keep track of last second of metrics which Prometheus will take. Also added new counters to separate PUT, GET, and DEL BW/IOPS and now incrementing these counters in other object API handlers that do IO NOTE: Not supporting multipart for now --- cmd/globals.go | 18 +++++++++-- cmd/metrics.go | 70 +++++++++++++++++++++++++++++------------- cmd/object-handlers.go | 28 ++++++++++++++--- cmd/xl-sets.go | 22 ++++++++++--- 4 files changed, 105 insertions(+), 33 deletions(-) diff --git a/cmd/globals.go b/cmd/globals.go index a81a0df3..97c0a8fb 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -191,9 +191,21 @@ var ( globalTotalGetIOCount uint64 globalIsStopping bool = false gIsRDDQHandleSet bool = false - globalCurrIOCount uint64 - globalCurrBW uint64 - globalMetricsChan = make(chan uint32) + + report_minio_metrics bool = os.Getenv("MINIO_REPORT_METRICS") != "" + // Separate counters for live collecting and actual reported + // This is to avoid low numbers being reported if Collect is called while metrics thread is recording, + // report counters will store the last completed second of metrics + globalCurrPutIOPS uint64 + globalCurrGetIOPS uint64 + globalCurrPutBW uint64 + globalCurrGetBW uint64 + globalCurrDel uint64 + globalReportPutIOPS uint64 + globalReportGetIOPS uint64 + globalReportPutBW uint64 + globalReportGetBW uint64 + globalReportDel uint64 globalCollectMetrics uint32 // Global server's network statistics diff --git a/cmd/metrics.go b/cmd/metrics.go index 7e11d1d5..7c079a8b 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -65,30 +65,56 @@ func (c *minioCollector) Describe(ch chan<- *prometheus.Desc) { // Collect is called by the Prometheus registry when collecting metrics. func (c *minioCollector) Collect(ch chan<- prometheus.Metric) { + // Only report in Prometheus if MINIO_REPORT_METRICS is set + if report_minio_metrics { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "put_iops"), + "Current PUT IOPS of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalReportPutIOPS)), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "get_iops"), + "Current GET IOPS of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalReportGetIOPS)), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "del_iops"), + "Current DEL IOPS of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalReportDel)), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "put_bw"), + "Current PUT BW of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalReportPutBW)), + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("minio", "metrics", "get_bw"), + "Current GET BW of this Minio instance", + nil, nil), + prometheus.CounterValue, + float64(atomic.LoadUint64(&globalReportGetBW)), + ) + } + - globalMetricsChan<- 1 - // Waiting to make sure metrics thread finishes before - <-globalMetricsChan - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName("minio", "metrics", "iops"), - "Current IOPS of this Minio instance", - nil, nil), - prometheus.CounterValue, - float64(atomic.LoadUint64(&globalCurrIOCount)), - ) - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName("minio", "metrics", "bw"), - "Current BW of this Minio instance", - nil, nil), - prometheus.CounterValue, - float64(atomic.LoadUint64(&globalCurrBW)), - ) - // Reset counters once reported - atomic.StoreUint64(&globalCurrIOCount, 0) - atomic.StoreUint64(&globalCurrBW, 0) // Always expose network stats diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2fc53faa..b60a9ba8 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -223,6 +223,11 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r s3Select.Evaluate(w) s3Select.Close() + if (atomic.LoadUint32(&globalCollectMetrics) == 1) { + atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrGetIOPS, 1) + } + // Get host and port from Request.RemoteAddr. host, port, err := net.SplitHostPort(handlers.GetSourceIP(r)) if err != nil { @@ -402,8 +407,8 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req objInfo = gr.ObjInfo if (atomic.LoadUint32(&globalCollectMetrics) == 1) { - atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) - atomic.AddUint64(&globalCurrIOCount, 1) + atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrGetIOPS, 1) } } @@ -710,6 +715,11 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re if (track_minio_stats) { atomic.AddUint64(&globalTotalHeadObjQD, ^uint64(0)) } + // + if (atomic.LoadUint32(&globalCollectMetrics) == 1) { + atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrGetIOPS, 1) + } // Notify object accessed via a HEAD request. sendEvent(eventArgs{ EventName: event.ObjectAccessedHead, @@ -1185,6 +1195,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.Size = actualSize } + if atomic.LoadUint32(&globalCollectMetrics) == 1 { + atomic.AddUint64(&globalCurrPutBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrPutIOPS, 1) + } + // Notify object created event. sendEvent(eventArgs{ EventName: event.ObjectCreatedCopy, @@ -1424,8 +1439,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } if atomic.LoadUint32(&globalCollectMetrics) == 1 { - atomic.AddUint64(&globalCurrBW, uint64(objInfo.Size)) - atomic.AddUint64(&globalCurrIOCount, 1) + atomic.AddUint64(&globalCurrPutBW, uint64(objInfo.Size)) + atomic.AddUint64(&globalCurrPutIOPS, 1) } etag := objInfo.ETag @@ -2571,5 +2586,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } // Ignore delete object errors while replying to client, since we are suppposed to reply only 204. } + + if atomic.LoadUint32(&globalCollectMetrics) == 1 { + atomic.AddUint64(&globalCurrDel, 1) + } + writeSuccessNoContent(w) } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index c9fe2ab2..325f4711 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -357,13 +357,24 @@ func metricsReporting() { // Go 1.12 doesn't have atomic structs (Go 1.19 introduced atomic structs) // Signal to start recording IO, then sleep for 1s and report the counters - start := <-globalMetricsChan - atomic.StoreUint32(&globalCollectMetrics, start) + atomic.StoreUint32(&globalCollectMetrics, 1) time.Sleep(1 * time.Second) atomic.StoreUint32(&globalCollectMetrics, 0) // Signal to stop after collecting for 1s - globalMetricsChan <- 0 + + // Record counters to a variable for Prometheus to report + atomic.StoreUint64(&globalReportPutIOPS, atomic.LoadUint64(&globalCurrPutIOPS)) + atomic.StoreUint64(&globalReportGetIOPS, atomic.LoadUint64(&globalCurrGetIOPS)) + atomic.StoreUint64(&globalReportPutBW, atomic.LoadUint64(&globalCurrPutBW)) + atomic.StoreUint64(&globalReportGetBW, atomic.LoadUint64(&globalCurrGetBW)) + atomic.StoreUint64(&globalReportDel, atomic.LoadUint64(&globalCurrDel)) + // Reset counters after recording the last second of metrics + atomic.StoreUint64(&globalCurrPutIOPS, 0) + atomic.StoreUint64(&globalCurrGetIOPS, 0) + atomic.StoreUint64(&globalCurrPutBW, 0) + atomic.StoreUint64(&globalCurrGetBW, 0) + atomic.StoreUint64(&globalCurrDel, 0) } } @@ -642,7 +653,10 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP go s.UpdateCountersToDisk() } - go metricsReporting() + // Only record metrics if MINIO_REPORT_METRICS env variable is set + if (report_minio_metrics) { + go metricsReporting() + } globalSC_read = false if os.Getenv("MINIO_ENABLE_SC_READ") != "" {