From ed51bf92ea48208d6542a1cc54167b872cf073bf Mon Sep 17 00:00:00 2001 From: Lesovsky Alexey Date: Sat, 9 Apr 2022 12:21:07 +0500 Subject: [PATCH] Add collecting user/database values for "postgres_activity_connections_in_flight" metric. (#33) --- internal/collector/postgres_activity.go | 148 +++++++++---------- internal/collector/postgres_activity_test.go | 48 ++++-- 2 files changed, 106 insertions(+), 90 deletions(-) diff --git a/internal/collector/postgres_activity.go b/internal/collector/postgres_activity.go index 01681aa..6a37695 100644 --- a/internal/collector/postgres_activity.go +++ b/internal/collector/postgres_activity.go @@ -66,7 +66,7 @@ const ( weLock = "Lock" ) -// postgresActivityCollector ... +// postgresActivityCollector contains metrics related to Postgres activity. type postgresActivityCollector struct { up typedDesc startTime typedDesc @@ -80,10 +80,10 @@ type postgresActivityCollector struct { re queryRegexp // regexps for queries classification } -// NewPostgresActivityCollector returns a new Collector exposing postgres databases stats. -// For details see -// 1. https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-ACTIVITY-VIEW -// 2. https://www.postgresql.org/docs/current/view-pg-prepared-xacts.html +// NewPostgresActivityCollector returns a new Collector exposing postgres activity stats. +// For details see: +// 1. https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-ACTIVITY-VIEW +// 2. https://www.postgresql.org/docs/current/view-pg-prepared-xacts.html func NewPostgresActivityCollector(constLabels labels, settings model.CollectorSettings) (Collector, error) { return &postgresActivityCollector{ up: newBuiltinTypedDesc( @@ -107,7 +107,7 @@ func NewPostgresActivityCollector(constLabels labels, settings model.CollectorSe states: newBuiltinTypedDesc( descOpts{"postgres", "activity", "connections_in_flight", "Number of connections in-flight in each state.", 0}, prometheus.GaugeValue, - []string{"state"}, constLabels, + []string{"user", "database", "state"}, constLabels, settings.Filters, ), statesAll: newBuiltinTypedDesc( @@ -193,69 +193,52 @@ func (c *postgresActivityCollector) Update(config Config, ch chan<- prometheus.M } // connection states - // totals doesn't include waiting state because they already included in 'active' state. - var total = stats.active + stats.idle + stats.idlexact + stats.other - ch <- c.statesAll.newConstMetric(total) - ch <- c.states.newConstMetric(stats.active, "active") - ch <- c.states.newConstMetric(stats.idle, "idle") - ch <- c.states.newConstMetric(stats.idlexact, "idlexact") - ch <- c.states.newConstMetric(stats.other, "other") - ch <- c.states.newConstMetric(stats.waiting, "waiting") - - // prepared transactions - ch <- c.prepared.newConstMetric(stats.prepared) - - // max duration of user's idle_xacts per user/database. - for k, v := range stats.maxIdleUser { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "idlexact", "user") - } else { - log.Warnf("create max idlexact user activity failed: invalid input '%s'; skip", k) - } - } - // max duration of maintenance's idle_xacts per user/database. - for k, v := range stats.maxIdleMaint { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "idlexact", "maintenance") - } else { - log.Warnf("create max idlexact maintenance activity failed: invalid input '%s'; skip", k) - } - } - - // max duration of users active (running) activity per user/database. - for k, v := range stats.maxActiveUser { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "active", "user") - } else { - log.Warnf("create max running user activity failed: invalid input '%s'; skip", k) + var total float64 + + for tag, values := range map[string]map[string]float64{ + "active": stats.active, + "idle": stats.idle, + "idlexact": stats.idlexact, + "other": stats.other, + "waiting": stats.waiting, + } { + for k, v := range values { + if names := strings.Split(k, "/"); len(names) >= 2 { + ch <- c.states.newConstMetric(v, names[0], names[1], tag) + + // totals shouldn't include waiting state, because it's already included in 'active' state. + if tag != "waiting" { + total += v + } + } else { + log.Warnf("create state '%s' activity failed: insufficient number of fields in key '%s'; skip", tag, k) + } } } - // max duration of maintenance active (running) activity per user/database. - for k, v := range stats.maxActiveMaint { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "active", "maintenance") - } else { - log.Warnf("create max running maintenance activity failed: invalid input '%s'; skip", k) - } - } + ch <- c.statesAll.newConstMetric(total) - // max duration of users waiting activity per user/database. - for k, v := range stats.maxWaitUser { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "waiting", "user") - } else { - log.Warnf("create max waiting user activity failed: invalid input '%s'; skip", k) - } - } + // prepared transactions + ch <- c.prepared.newConstMetric(stats.prepared) - // max duration of maintenance waiting activity per user/database. - for k, v := range stats.maxWaitMaint { - if names := strings.Split(k, "/"); len(names) >= 2 { - ch <- c.activity.newConstMetric(v, names[0], names[1], "waiting", "maintenance") - } else { - log.Warnf("create max waiting maintenance activity failed: invalid input '%s'; skip", k) + // Longest activity by states, per user/database + for tag, values := range map[string]map[string]float64{ + "idlexact/user": stats.maxIdleUser, // max duration of user's idle_xacts per user/database. + "idlexact/maintenance": stats.maxIdleMaint, // max duration of maintenance's idle_xacts per user/database. + "active/user": stats.maxActiveUser, // max duration of users active (running) activity per user/database. + "active/maintenance": stats.maxActiveMaint, // max duration of maintenance active (running) activity per user/database. + "waiting/user": stats.maxWaitUser, // max duration of users waiting activity per user/database. + "waiting/maintenance": stats.maxWaitMaint, // max duration of maintenance waiting activity per user/database. + } { + for k, v := range values { + if names := strings.Split(k, "/"); len(names) >= 2 { + ff := strings.Split(tag, "/") + + ch <- c.activity.newConstMetric(v, names[0], names[1], ff[0], ff[1]) + } else { + log.Warnf("create '%s' max activity failed: insufficient number of fields in key '%s'; skip", tag, k) + } } } @@ -313,11 +296,11 @@ func newQueryRegexp() queryRegexp { // postgresActivityStat describes current activity type postgresActivityStat struct { - idle float64 // state = 'idle' - idlexact float64 // state IN ('idle in transaction', 'idle in transaction (aborted)')) - active float64 // state = 'active' - other float64 // state IN ('fastpath function call','disabled') - waiting float64 // wait_event_type = 'Lock' (or waiting = 't') + idle map[string]float64 // state = 'idle' + idlexact map[string]float64 // state IN ('idle in transaction', 'idle in transaction (aborted)')) + active map[string]float64 // state = 'active' + other map[string]float64 // state IN ('fastpath function call','disabled') + waiting map[string]float64 // wait_event_type = 'Lock' (or waiting = 't') waitEvents map[string]float64 // wait_event_type/wait_event counters prepared float64 // FROM pg_prepared_xacts maxIdleUser map[string]float64 // longest duration among idle transactions opened by user/database @@ -342,6 +325,11 @@ type postgresActivityStat struct { // newPostgresActivityStat creates new postgresActivityStat struct with initialized maps. func newPostgresActivityStat(re queryRegexp) postgresActivityStat { return postgresActivityStat{ + active: make(map[string]float64), + idle: make(map[string]float64), + idlexact: make(map[string]float64), + other: make(map[string]float64), + waiting: make(map[string]float64), waitEvents: make(map[string]float64), maxIdleUser: make(map[string]float64), maxIdleMaint: make(map[string]float64), @@ -394,17 +382,21 @@ func parsePostgresActivityStats(r *model.PGResult, re queryRegexp) postgresActiv // Check backend state: // 1) is not in a waiting state. Waiting backends are accounted separately. - // 2) don't have NULL database. This is a background daemon and should not accounted. + // 2) don't have NULL database. This is a background daemon and should not be counted. if (row[waitColIdx].String == weLock || row[waitColIdx].String == "t") || !row[databaseColIdx].Valid { continue } - stats.updateState(row[i].String) + userColIdx := colindexes["user"] + stats.updateState(row[userColIdx].String, row[databaseColIdx].String, row[i].String) case waitColumnName: // Count waiting activity only if waiting = 't' or wait_event_type = 'Lock'. if row[i].String == weLock || row[i].String == "t" { - stats.updateState("waiting") + userColIdx := colindexes["user"] + databaseColIdx := colindexes["database"] + + stats.updateState(row[userColIdx].String, row[databaseColIdx].String, "waiting") } // Update wait_event stats for newer Postgres versions. @@ -473,19 +465,21 @@ func parsePostgresActivityStats(r *model.PGResult, re queryRegexp) postgresActiv } // updateState increments counter depending on passed state of the backend. -func (s *postgresActivityStat) updateState(state string) { +func (s *postgresActivityStat) updateState(usename, datname, state string) { + key := usename + "/" + datname + // increment state-specific counter switch state { case stActive: - s.active++ + s.active[key]++ case stIdle: - s.idle++ + s.idle[key]++ case stIdleXact, stIdleXactAborted: - s.idlexact++ + s.idlexact[key]++ case stFastpath, stDisabled: - s.other++ + s.other[key]++ case stWaiting: - s.waiting++ + s.waiting[key]++ } } diff --git a/internal/collector/postgres_activity_test.go b/internal/collector/postgres_activity_test.go index ffb53bb..ba487a6 100644 --- a/internal/collector/postgres_activity_test.go +++ b/internal/collector/postgres_activity_test.go @@ -110,7 +110,11 @@ func Test_parsePostgresActivityStats(t *testing.T) { }, }, want: postgresActivityStat{ - active: 4, idle: 1, idlexact: 3, other: 1, waiting: 2, + active: map[string]float64{"testuser/testdb": 4}, + idle: map[string]float64{"testuser/testdb": 1}, + idlexact: map[string]float64{"testuser/testdb": 3}, + other: map[string]float64{"testuser/testdb": 1}, + waiting: map[string]float64{"testuser/testdb": 2}, waitEvents: map[string]float64{"Client/ClientRead": 4, "Lock/transactionid": 2}, maxIdleUser: map[string]float64{"testuser/testdb": 20}, maxIdleMaint: map[string]float64{"testuser/testdb": 28}, @@ -168,7 +172,12 @@ func Test_parsePostgresActivityStats(t *testing.T) { maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{"testuser/testdb": 1}, maxActiveMaint: map[string]float64{"testuser/testdb": 1}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, - active: 22, querySelect: 2, queryMod: 4, queryDdl: 3, queryMaint: 7, queryWith: 1, queryCopy: 1, queryOther: 4, + active: map[string]float64{"testuser/testdb": 22}, + idle: map[string]float64{}, + idlexact: map[string]float64{}, + other: map[string]float64{}, + waiting: map[string]float64{}, + querySelect: 2, queryMod: 4, queryDdl: 3, queryMaint: 7, queryWith: 1, queryCopy: 1, queryOther: 4, vacuumOps: map[string]float64{"regular": 1, "user": 1, "wraparound": 0}, re: testRE, }, @@ -197,9 +206,14 @@ func Test_parsePostgresActivityStats(t *testing.T) { maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{"testuser/testdb": 10}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{"testuser/testdb": 5}, maxWaitMaint: map[string]float64{}, - active: 1, waiting: 1, querySelect: 2, - vacuumOps: map[string]float64{"regular": 0, "user": 0, "wraparound": 0}, - re: testRE, + active: map[string]float64{"testuser/testdb": 1}, + idle: map[string]float64{}, + idlexact: map[string]float64{}, + other: map[string]float64{}, + waiting: map[string]float64{"testuser/testdb": 1}, + querySelect: 2, + vacuumOps: map[string]float64{"regular": 0, "user": 0, "wraparound": 0}, + re: testRE, }, }, } @@ -253,7 +267,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) { }, {value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "UPDATE table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{"testuser/testdb": 10}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, @@ -263,7 +278,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) { }, {value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "autovacuum: VACUUM table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{"testuser/testdb": 10}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, @@ -273,7 +289,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) { }, {value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "VACUUM table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{"testuser/testdb": 10}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, @@ -319,7 +336,8 @@ func Test_updateMaxRuntimeDuration(t *testing.T) { }, {value: "5", usename: "testuser", datname: "testdb", state: "active", query: "UPDATE table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{"testuser/testdb": 5}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, @@ -329,7 +347,8 @@ func Test_updateMaxRuntimeDuration(t *testing.T) { }, {value: "6", usename: "testuser", datname: "testdb", state: "active", query: "autovacuum: VACUUM table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{"testuser/testdb": 6}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{}, @@ -368,7 +387,8 @@ func Test_updateMaxWaittimeDuration(t *testing.T) { }, {value: "5", usename: "testuser", datname: "testdb", waiting: "Lock", query: "UPDATE table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{"testuser/testdb": 5}, maxWaitMaint: map[string]float64{}, @@ -378,7 +398,8 @@ func Test_updateMaxWaittimeDuration(t *testing.T) { }, {value: "6", usename: "testuser", datname: "testdb", waiting: "t", query: "autovacuum: VACUUM table", want: postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{"testuser/testdb": 6}, @@ -419,7 +440,8 @@ func Test_updateQueryStat(t *testing.T) { } assert.Equal(t, postgresActivityStat{ - waitEvents: map[string]float64{}, + active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{}, + waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{}, maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{}, maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{}, maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},