Skip to content

Commit

Permalink
Merge pull request #19 from OpenMPDK/bw_iops_reporting
Browse files Browse the repository at this point in the history
Instantaneous BW and IOPS reporting to Prometheus
  • Loading branch information
somnathr authored Nov 17, 2023
2 parents df81c52 + e67b210 commit cdbbf3f
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
16 changes: 16 additions & 0 deletions cmd/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ var (
globalIsStopping bool = false
gIsRDDQHandleSet bool = false

report_minio_metrics bool = os.Getenv("MINIO_REPORT_METRICS") != ""
// Separate counters for live collecting and actual reported
// This is to avoid low numbers being reported if Collect is called while metrics thread is recording,
// report counters will store the last completed second of metrics
globalCurrPutIOPS uint64
globalCurrGetIOPS uint64
globalCurrPutBW uint64
globalCurrGetBW uint64
globalCurrDel uint64
globalReportPutIOPS uint64
globalReportGetIOPS uint64
globalReportPutBW uint64
globalReportGetBW uint64
globalReportDel uint64
globalCollectMetrics uint32

// Global server's network statistics
globalConnStats = newConnStats()

Expand Down
56 changes: 54 additions & 2 deletions cmd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"context"
"net/http"
"sync/atomic"

"github.com/minio/minio/cmd/logger"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -36,9 +37,10 @@ var (
)
)

// Commented out this collector registration since Prometheus is running collect twice
// Couldn't see any difference in functionality
func init() {
prometheus.MustRegister(httpRequestsDuration)
prometheus.MustRegister(newMinioCollector())
}

// newMinioCollector describes the collector
Expand All @@ -63,7 +65,57 @@ func (c *minioCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect is called by the Prometheus registry when collecting metrics.
func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {

// Only report in Prometheus if MINIO_REPORT_METRICS is set
if report_minio_metrics {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "metrics", "put_iops"),
"Current PUT IOPS of this Minio instance",
nil, nil),
prometheus.CounterValue,
float64(atomic.LoadUint64(&globalReportPutIOPS)),
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "metrics", "get_iops"),
"Current GET IOPS of this Minio instance",
nil, nil),
prometheus.CounterValue,
float64(atomic.LoadUint64(&globalReportGetIOPS)),
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "metrics", "del_iops"),
"Current DEL IOPS of this Minio instance",
nil, nil),
prometheus.CounterValue,
float64(atomic.LoadUint64(&globalReportDel)),
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "metrics", "put_bw"),
"Current PUT BW of this Minio instance",
nil, nil),
prometheus.CounterValue,
float64(atomic.LoadUint64(&globalReportPutBW)),
)

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "metrics", "get_bw"),
"Current GET BW of this Minio instance",
nil, nil),
prometheus.CounterValue,
float64(atomic.LoadUint64(&globalReportGetBW)),
)
}




// Always expose network stats

// Network Sent/Received Bytes
Expand Down
29 changes: 29 additions & 0 deletions cmd/object-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
s3Select.Evaluate(w)
s3Select.Close()

if (atomic.LoadUint32(&globalCollectMetrics) == 1) {
atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size))
atomic.AddUint64(&globalCurrGetIOPS, 1)
}

// Get host and port from Request.RemoteAddr.
host, port, err := net.SplitHostPort(handlers.GetSourceIP(r))
if err != nil {
Expand Down Expand Up @@ -401,6 +406,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
defer gr.Close()

objInfo = gr.ObjInfo
if (atomic.LoadUint32(&globalCollectMetrics) == 1) {
atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size))
atomic.AddUint64(&globalCurrGetIOPS, 1)
}
}

if objectAPI.IsEncryptionSupported() {
Expand Down Expand Up @@ -706,6 +715,11 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
if (track_minio_stats) {
atomic.AddUint64(&globalTotalHeadObjQD, ^uint64(0))
}
//
if (atomic.LoadUint32(&globalCollectMetrics) == 1) {
atomic.AddUint64(&globalCurrGetBW, uint64(objInfo.Size))
atomic.AddUint64(&globalCurrGetIOPS, 1)
}
// Notify object accessed via a HEAD request.
sendEvent(eventArgs{
EventName: event.ObjectAccessedHead,
Expand Down Expand Up @@ -1181,6 +1195,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.Size = actualSize
}

if atomic.LoadUint32(&globalCollectMetrics) == 1 {
atomic.AddUint64(&globalCurrPutBW, uint64(objInfo.Size))
atomic.AddUint64(&globalCurrPutIOPS, 1)
}

// Notify object created event.
sendEvent(eventArgs{
EventName: event.ObjectCreatedCopy,
Expand Down Expand Up @@ -1419,6 +1438,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}

if atomic.LoadUint32(&globalCollectMetrics) == 1 {
atomic.AddUint64(&globalCurrPutBW, uint64(objInfo.Size))
atomic.AddUint64(&globalCurrPutIOPS, 1)
}

etag := objInfo.ETag
if objInfo.IsCompressed() {
// Ignore compressed ETag.
Expand Down Expand Up @@ -2562,5 +2586,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
// Ignore delete object errors while replying to client, since we are suppposed to reply only 204.
}

if atomic.LoadUint32(&globalCollectMetrics) == 1 {
atomic.AddUint64(&globalCurrDel, 1)
}

writeSuccessNoContent(w)
}
35 changes: 35 additions & 0 deletions cmd/xl-sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"os"
"strconv"
Expand Down Expand Up @@ -347,6 +348,35 @@ func (s *xlSets) UpdateCountersToDisk() {
time.Sleep(time.Duration(init_stati) * time.Second)
}
}
// BW/IOPS reporting thread
func metricsReporting() {
fmt.Println("### Metrics reporting thread started")
for {
// Need to use atomic operation for setting and checking whether to collect
// It's probably fine to not use atomic within this thread, but IO threads will need it accurately determine whether to record
// Go 1.12 doesn't have atomic structs (Go 1.19 introduced atomic structs)

// Signal to start recording IO, then sleep for 1s and report the counters
atomic.StoreUint32(&globalCollectMetrics, 1)

time.Sleep(1 * time.Second)
atomic.StoreUint32(&globalCollectMetrics, 0)
// Signal to stop after collecting for 1s

// Record counters to a variable for Prometheus to report
atomic.StoreUint64(&globalReportPutIOPS, atomic.LoadUint64(&globalCurrPutIOPS))
atomic.StoreUint64(&globalReportGetIOPS, atomic.LoadUint64(&globalCurrGetIOPS))
atomic.StoreUint64(&globalReportPutBW, atomic.LoadUint64(&globalCurrPutBW))
atomic.StoreUint64(&globalReportGetBW, atomic.LoadUint64(&globalCurrGetBW))
atomic.StoreUint64(&globalReportDel, atomic.LoadUint64(&globalCurrDel))
// Reset counters after recording the last second of metrics
atomic.StoreUint64(&globalCurrPutIOPS, 0)
atomic.StoreUint64(&globalCurrGetIOPS, 0)
atomic.StoreUint64(&globalCurrPutBW, 0)
atomic.StoreUint64(&globalCurrGetBW, 0)
atomic.StoreUint64(&globalCurrDel, 0)
}
}


func (s *xlSets) syncSharedVols() {
Expand Down Expand Up @@ -623,6 +653,11 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
go s.UpdateCountersToDisk()
}

// Only record metrics if MINIO_REPORT_METRICS env variable is set
if (report_minio_metrics) {
go metricsReporting()
}

globalSC_read = false
if os.Getenv("MINIO_ENABLE_SC_READ") != "" {
fmt.Println("### Setting up for SC read.. ###")
Expand Down

0 comments on commit cdbbf3f

Please sign in to comment.