Skip to content
This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

Commit

Permalink
Add collecting user/database values for "postgres_activity_connection…
Browse files Browse the repository at this point in the history
…s_in_flight" metric. (#33)
  • Loading branch information
lesovsky authored Apr 9, 2022
1 parent 9ed2fef commit ed51bf9
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 90 deletions.
148 changes: 71 additions & 77 deletions internal/collector/postgres_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
weLock = "Lock"
)

// postgresActivityCollector ...
// postgresActivityCollector contains metrics related to Postgres activity.
type postgresActivityCollector struct {
up typedDesc
startTime typedDesc
Expand All @@ -80,10 +80,10 @@ type postgresActivityCollector struct {
re queryRegexp // regexps for queries classification
}

// NewPostgresActivityCollector returns a new Collector exposing postgres databases stats.
// For details see
// 1. https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-ACTIVITY-VIEW
// 2. https://www.postgresql.org/docs/current/view-pg-prepared-xacts.html
// NewPostgresActivityCollector returns a new Collector exposing postgres activity stats.
// For details see:
// 1. https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-ACTIVITY-VIEW
// 2. https://www.postgresql.org/docs/current/view-pg-prepared-xacts.html
func NewPostgresActivityCollector(constLabels labels, settings model.CollectorSettings) (Collector, error) {
return &postgresActivityCollector{
up: newBuiltinTypedDesc(
Expand All @@ -107,7 +107,7 @@ func NewPostgresActivityCollector(constLabels labels, settings model.CollectorSe
states: newBuiltinTypedDesc(
descOpts{"postgres", "activity", "connections_in_flight", "Number of connections in-flight in each state.", 0},
prometheus.GaugeValue,
[]string{"state"}, constLabels,
[]string{"user", "database", "state"}, constLabels,
settings.Filters,
),
statesAll: newBuiltinTypedDesc(
Expand Down Expand Up @@ -193,69 +193,52 @@ func (c *postgresActivityCollector) Update(config Config, ch chan<- prometheus.M
}

// connection states
// totals doesn't include waiting state because they already included in 'active' state.
var total = stats.active + stats.idle + stats.idlexact + stats.other
ch <- c.statesAll.newConstMetric(total)
ch <- c.states.newConstMetric(stats.active, "active")
ch <- c.states.newConstMetric(stats.idle, "idle")
ch <- c.states.newConstMetric(stats.idlexact, "idlexact")
ch <- c.states.newConstMetric(stats.other, "other")
ch <- c.states.newConstMetric(stats.waiting, "waiting")

// prepared transactions
ch <- c.prepared.newConstMetric(stats.prepared)

// max duration of user's idle_xacts per user/database.
for k, v := range stats.maxIdleUser {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "idlexact", "user")
} else {
log.Warnf("create max idlexact user activity failed: invalid input '%s'; skip", k)
}
}

// max duration of maintenance's idle_xacts per user/database.
for k, v := range stats.maxIdleMaint {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "idlexact", "maintenance")
} else {
log.Warnf("create max idlexact maintenance activity failed: invalid input '%s'; skip", k)
}
}

// max duration of users active (running) activity per user/database.
for k, v := range stats.maxActiveUser {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "active", "user")
} else {
log.Warnf("create max running user activity failed: invalid input '%s'; skip", k)
var total float64

for tag, values := range map[string]map[string]float64{
"active": stats.active,
"idle": stats.idle,
"idlexact": stats.idlexact,
"other": stats.other,
"waiting": stats.waiting,
} {
for k, v := range values {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.states.newConstMetric(v, names[0], names[1], tag)

// totals shouldn't include waiting state, because it's already included in 'active' state.
if tag != "waiting" {
total += v
}
} else {
log.Warnf("create state '%s' activity failed: insufficient number of fields in key '%s'; skip", tag, k)
}
}
}

// max duration of maintenance active (running) activity per user/database.
for k, v := range stats.maxActiveMaint {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "active", "maintenance")
} else {
log.Warnf("create max running maintenance activity failed: invalid input '%s'; skip", k)
}
}
ch <- c.statesAll.newConstMetric(total)

// max duration of users waiting activity per user/database.
for k, v := range stats.maxWaitUser {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "waiting", "user")
} else {
log.Warnf("create max waiting user activity failed: invalid input '%s'; skip", k)
}
}
// prepared transactions
ch <- c.prepared.newConstMetric(stats.prepared)

// max duration of maintenance waiting activity per user/database.
for k, v := range stats.maxWaitMaint {
if names := strings.Split(k, "/"); len(names) >= 2 {
ch <- c.activity.newConstMetric(v, names[0], names[1], "waiting", "maintenance")
} else {
log.Warnf("create max waiting maintenance activity failed: invalid input '%s'; skip", k)
// Longest activity by states, per user/database
for tag, values := range map[string]map[string]float64{
"idlexact/user": stats.maxIdleUser, // max duration of user's idle_xacts per user/database.
"idlexact/maintenance": stats.maxIdleMaint, // max duration of maintenance's idle_xacts per user/database.
"active/user": stats.maxActiveUser, // max duration of users active (running) activity per user/database.
"active/maintenance": stats.maxActiveMaint, // max duration of maintenance active (running) activity per user/database.
"waiting/user": stats.maxWaitUser, // max duration of users waiting activity per user/database.
"waiting/maintenance": stats.maxWaitMaint, // max duration of maintenance waiting activity per user/database.
} {
for k, v := range values {
if names := strings.Split(k, "/"); len(names) >= 2 {
ff := strings.Split(tag, "/")

ch <- c.activity.newConstMetric(v, names[0], names[1], ff[0], ff[1])
} else {
log.Warnf("create '%s' max activity failed: insufficient number of fields in key '%s'; skip", tag, k)
}
}
}

Expand Down Expand Up @@ -313,11 +296,11 @@ func newQueryRegexp() queryRegexp {

// postgresActivityStat describes current activity
type postgresActivityStat struct {
idle float64 // state = 'idle'
idlexact float64 // state IN ('idle in transaction', 'idle in transaction (aborted)'))
active float64 // state = 'active'
other float64 // state IN ('fastpath function call','disabled')
waiting float64 // wait_event_type = 'Lock' (or waiting = 't')
idle map[string]float64 // state = 'idle'
idlexact map[string]float64 // state IN ('idle in transaction', 'idle in transaction (aborted)'))
active map[string]float64 // state = 'active'
other map[string]float64 // state IN ('fastpath function call','disabled')
waiting map[string]float64 // wait_event_type = 'Lock' (or waiting = 't')
waitEvents map[string]float64 // wait_event_type/wait_event counters
prepared float64 // FROM pg_prepared_xacts
maxIdleUser map[string]float64 // longest duration among idle transactions opened by user/database
Expand All @@ -342,6 +325,11 @@ type postgresActivityStat struct {
// newPostgresActivityStat creates new postgresActivityStat struct with initialized maps.
func newPostgresActivityStat(re queryRegexp) postgresActivityStat {
return postgresActivityStat{
active: make(map[string]float64),
idle: make(map[string]float64),
idlexact: make(map[string]float64),
other: make(map[string]float64),
waiting: make(map[string]float64),
waitEvents: make(map[string]float64),
maxIdleUser: make(map[string]float64),
maxIdleMaint: make(map[string]float64),
Expand Down Expand Up @@ -394,17 +382,21 @@ func parsePostgresActivityStats(r *model.PGResult, re queryRegexp) postgresActiv

// Check backend state:
// 1) is not in a waiting state. Waiting backends are accounted separately.
// 2) don't have NULL database. This is a background daemon and should not accounted.
// 2) don't have NULL database. This is a background daemon and should not be counted.

if (row[waitColIdx].String == weLock || row[waitColIdx].String == "t") || !row[databaseColIdx].Valid {
continue
}

stats.updateState(row[i].String)
userColIdx := colindexes["user"]
stats.updateState(row[userColIdx].String, row[databaseColIdx].String, row[i].String)
case waitColumnName:
// Count waiting activity only if waiting = 't' or wait_event_type = 'Lock'.
if row[i].String == weLock || row[i].String == "t" {
stats.updateState("waiting")
userColIdx := colindexes["user"]
databaseColIdx := colindexes["database"]

stats.updateState(row[userColIdx].String, row[databaseColIdx].String, "waiting")
}

// Update wait_event stats for newer Postgres versions.
Expand Down Expand Up @@ -473,19 +465,21 @@ func parsePostgresActivityStats(r *model.PGResult, re queryRegexp) postgresActiv
}

// updateState increments counter depending on passed state of the backend.
func (s *postgresActivityStat) updateState(state string) {
func (s *postgresActivityStat) updateState(usename, datname, state string) {
key := usename + "/" + datname

// increment state-specific counter
switch state {
case stActive:
s.active++
s.active[key]++
case stIdle:
s.idle++
s.idle[key]++
case stIdleXact, stIdleXactAborted:
s.idlexact++
s.idlexact[key]++
case stFastpath, stDisabled:
s.other++
s.other[key]++
case stWaiting:
s.waiting++
s.waiting[key]++
}
}

Expand Down
48 changes: 35 additions & 13 deletions internal/collector/postgres_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ func Test_parsePostgresActivityStats(t *testing.T) {
},
},
want: postgresActivityStat{
active: 4, idle: 1, idlexact: 3, other: 1, waiting: 2,
active: map[string]float64{"testuser/testdb": 4},
idle: map[string]float64{"testuser/testdb": 1},
idlexact: map[string]float64{"testuser/testdb": 3},
other: map[string]float64{"testuser/testdb": 1},
waiting: map[string]float64{"testuser/testdb": 2},
waitEvents: map[string]float64{"Client/ClientRead": 4, "Lock/transactionid": 2},
maxIdleUser: map[string]float64{"testuser/testdb": 20},
maxIdleMaint: map[string]float64{"testuser/testdb": 28},
Expand Down Expand Up @@ -168,7 +172,12 @@ func Test_parsePostgresActivityStats(t *testing.T) {
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{"testuser/testdb": 1}, maxActiveMaint: map[string]float64{"testuser/testdb": 1},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
active: 22, querySelect: 2, queryMod: 4, queryDdl: 3, queryMaint: 7, queryWith: 1, queryCopy: 1, queryOther: 4,
active: map[string]float64{"testuser/testdb": 22},
idle: map[string]float64{},
idlexact: map[string]float64{},
other: map[string]float64{},
waiting: map[string]float64{},
querySelect: 2, queryMod: 4, queryDdl: 3, queryMaint: 7, queryWith: 1, queryCopy: 1, queryOther: 4,
vacuumOps: map[string]float64{"regular": 1, "user": 1, "wraparound": 0},
re: testRE,
},
Expand Down Expand Up @@ -197,9 +206,14 @@ func Test_parsePostgresActivityStats(t *testing.T) {
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{"testuser/testdb": 10}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{"testuser/testdb": 5}, maxWaitMaint: map[string]float64{},
active: 1, waiting: 1, querySelect: 2,
vacuumOps: map[string]float64{"regular": 0, "user": 0, "wraparound": 0},
re: testRE,
active: map[string]float64{"testuser/testdb": 1},
idle: map[string]float64{},
idlexact: map[string]float64{},
other: map[string]float64{},
waiting: map[string]float64{"testuser/testdb": 1},
querySelect: 2,
vacuumOps: map[string]float64{"regular": 0, "user": 0, "wraparound": 0},
re: testRE,
},
},
}
Expand Down Expand Up @@ -253,7 +267,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) {
},
{value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "UPDATE table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{"testuser/testdb": 10}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand All @@ -263,7 +278,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) {
},
{value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "autovacuum: VACUUM table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{"testuser/testdb": 10},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand All @@ -273,7 +289,8 @@ func Test_updateMaxIdletimeDuration(t *testing.T) {
},
{value: "10", usename: "testuser", datname: "testdb", state: "idle in transaction", query: "VACUUM table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{"testuser/testdb": 10},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand Down Expand Up @@ -319,7 +336,8 @@ func Test_updateMaxRuntimeDuration(t *testing.T) {
},
{value: "5", usename: "testuser", datname: "testdb", state: "active", query: "UPDATE table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{"testuser/testdb": 5}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand All @@ -329,7 +347,8 @@ func Test_updateMaxRuntimeDuration(t *testing.T) {
},
{value: "6", usename: "testuser", datname: "testdb", state: "active", query: "autovacuum: VACUUM table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{"testuser/testdb": 6},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand Down Expand Up @@ -368,7 +387,8 @@ func Test_updateMaxWaittimeDuration(t *testing.T) {
},
{value: "5", usename: "testuser", datname: "testdb", waiting: "Lock", query: "UPDATE table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{"testuser/testdb": 5}, maxWaitMaint: map[string]float64{},
Expand All @@ -378,7 +398,8 @@ func Test_updateMaxWaittimeDuration(t *testing.T) {
},
{value: "6", usename: "testuser", datname: "testdb", waiting: "t", query: "autovacuum: VACUUM table",
want: postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{"testuser/testdb": 6},
Expand Down Expand Up @@ -419,7 +440,8 @@ func Test_updateQueryStat(t *testing.T) {
}

assert.Equal(t, postgresActivityStat{
waitEvents: map[string]float64{},
active: map[string]float64{}, idle: map[string]float64{}, idlexact: map[string]float64{},
waiting: map[string]float64{}, other: map[string]float64{}, waitEvents: map[string]float64{},
maxIdleUser: map[string]float64{}, maxIdleMaint: map[string]float64{},
maxActiveUser: map[string]float64{}, maxActiveMaint: map[string]float64{},
maxWaitUser: map[string]float64{}, maxWaitMaint: map[string]float64{},
Expand Down

0 comments on commit ed51bf9

Please sign in to comment.