Skip to content
This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

Commit

Permalink
Add WAL metrics based on pg_stat_wal (introduced in Postgres 14).
Browse files Browse the repository at this point in the history
  • Loading branch information
lesovsky committed Sep 1, 2021
1 parent 6c7e7e6 commit 4b472d4
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 24 deletions.
18 changes: 16 additions & 2 deletions internal/collector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type postgresServiceConfig struct {
localService bool
// blockSize defines size of data block Postgres operates.
blockSize uint64
// walBlockSize defines size of WAL block Postgres operates.
walBlockSize uint64
// walSegmentSize defines size of WAL segment Postgres operates.
walSegmentSize uint64
// serverVersionNum defines version of Postgres in XXYYZZ format.
Expand Down Expand Up @@ -90,17 +92,29 @@ func newPostgresServiceConfig(connStr string) (postgresServiceConfig, error) {

config.blockSize = bsize

// Get Postgres WAL block size.
err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'wal_block_size'").Scan(&setting)
if err != nil {
return config, err
}
walBlockSize, err := strconv.ParseUint(setting, 10, 64)
if err != nil {
return config, err
}

config.walBlockSize = walBlockSize

// Get Postgres WAL segment size.
err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'wal_segment_size'").Scan(&setting)
if err != nil {
return config, err
}
segSize, err := strconv.ParseUint(setting, 10, 64)
walSegSize, err := strconv.ParseUint(setting, 10, 64)
if err != nil {
return config, err
}

config.walSegmentSize = segSize
config.walSegmentSize = walSegSize

// Get Postgres server version
err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'server_version_num'").Scan(&setting)
Expand Down
164 changes: 145 additions & 19 deletions internal/collector/postgres_wal.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,100 @@
package collector

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaponry/pgscv/internal/log"
"github.com/weaponry/pgscv/internal/model"
"github.com/weaponry/pgscv/internal/store"
"strconv"
)

const (
postgresWalQuery96 = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_xlog_receive_location() else pg_current_xlog_location() end) - '0/00000000' AS wal_bytes"

postgresWalQuertLatest = "SELECT pg_is_in_recovery()::int AS recovery, " +
postgresWalQuery13 = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) - '0/00000000' AS wal_bytes"

postgresWalQueryLatest = "SELECT pg_is_in_recovery()::int AS recovery, wal_records, wal_fpi, " +
"(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() - '0/00000000' else wal_bytes end) AS wal_bytes, " +
"wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, " +
"extract('epoch' from stats_reset) as reset_time " +
"FROM pg_stat_wal"
)

type postgresWalCollector struct {
labelNames []string
recovery typedDesc
wal typedDesc
recovery typedDesc
records typedDesc
writtenAllBytes typedDesc
writtenBytes typedDesc
buffersFull typedDesc
writes typedDesc
syncs typedDesc
secondsAll typedDesc
seconds typedDesc
resetUnix typedDesc
}

// NewPostgresWalCollector returns a new Collector exposing postgres WAL stats.
// For details see https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-WAL-VIEW
func NewPostgresWalCollector(constLabels labels, settings model.CollectorSettings) (Collector, error) {
var labelNames = []string{"client_addr", "user", "application_name", "state", "lag"}

return &postgresWalCollector{
labelNames: labelNames,
recovery: newBuiltinTypedDesc(
descOpts{"postgres", "recovery", "info", "Current recovery state, 0 - not in recovery; 1 - in recovery.", 0},
prometheus.GaugeValue,
nil, constLabels,
settings.Filters,
),
wal: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0},
records: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_records_total", "Total amount of WAL records written (zero in case of standby).", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
writtenAllBytes: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_bytes_all_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
writtenBytes: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written by each type of WAL (zero in case of standby), in bytes.", 0},
prometheus.CounterValue,
[]string{"wal"}, constLabels,
settings.Filters,
),
buffersFull: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "buffers_full_total", "Total number of times WAL data was written to disk because WAL buffers became full (zero in case of standby).", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
writes: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "write_total", "Total number of times WAL buffers were written out to disk via XLogWrite request (zero in case of standby).", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
syncs: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "sync_total", "Total number of times WAL files were synced to disk via issue_xlog_fsync request (zero in case of standby).", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
secondsAll: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "seconds_all_total", "Total amount of time spent processing WAL buffers (zero in case of standby), in seconds.", .001},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
seconds: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "seconds_total", "Total amount of time spent processing WAL buffers by each operation (zero in case of standby), in seconds.", .001},
prometheus.CounterValue,
[]string{"op"}, constLabels,
settings.Filters,
),
resetUnix: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "stats_reset_time", "Time at which WAL statistics were last reset, in unixtime.", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
Expand All @@ -52,26 +110,94 @@ func (c *postgresWalCollector) Update(config Config, ch chan<- prometheus.Metric
}
defer conn.Close()

// Get recovery state.
var recovery int
var walBytes int64
err = conn.Conn().QueryRow(context.TODO(), selectWalQuery(config.serverVersionNum)).Scan(&recovery, &walBytes)
// Get WAL usage stats.
res, err := conn.Query(selectWalQuery(config.serverVersionNum))
if err != nil {
log.Warnf("get recovery state failed: %s; skip", err)
} else {
ch <- c.recovery.newConstMetric(float64(recovery))
ch <- c.wal.newConstMetric(float64(walBytes))
return err
}

stats := parsePostgresWalStats(res)

for k, v := range stats {
switch k {
case "recovery":
ch <- c.recovery.newConstMetric(v)
case "wal_records":
ch <- c.records.newConstMetric(v)
case "wal_fpi":
ch <- c.writtenBytes.newConstMetric(v*float64(config.walBlockSize), "fpi")
case "wal_bytes":
ch <- c.writtenAllBytes.newConstMetric(v)
case "wal_buffers_full":
ch <- c.buffersFull.newConstMetric(v)
case "wal_write":
ch <- c.writes.newConstMetric(v)
case "wal_sync":
ch <- c.syncs.newConstMetric(v)
case "wal_write_time":
ch <- c.seconds.newConstMetric(v, "write")
case "wal_sync_time":
ch <- c.seconds.newConstMetric(v, "sync")
case "wal_all_time":
ch <- c.secondsAll.newConstMetric(v)
case "reset_time":
ch <- c.resetUnix.newConstMetric(v)
default:
continue
}
}

// Collect WAL bytes of regular (non-FPI) records
if fpi, ok := stats["wal_fpi"]; ok {
ch <- c.writtenBytes.newConstMetric(stats["wal_bytes"]-(fpi*float64(config.walBlockSize)), "regular")
}

return nil
}

// parsePostgresWalStats parses PGResult and returns struct with data values
func parsePostgresWalStats(r *model.PGResult) map[string]float64 {
log.Debug("parse postgres WAL stats")

stats := map[string]float64{}

for _, row := range r.Rows {
for i, colname := range r.Colnames {
// Skip empty (NULL) values.
if !row[i].Valid {
continue
}

// Get data value and convert it to float64 used by Prometheus.
v, err := strconv.ParseFloat(row[i].String, 64)
if err != nil {
log.Errorf("invalid input, parse '%s' failed: %s; skip", row[i].String, err)
continue
}

// Column name used as a key.
stats[string(colname.Name)] = v
}
}

// Count total time spent on WAL buffers processing.
wTime, ok1 := stats["wal_write_time"]
sTime, ok2 := stats["wal_sync_time"]
if ok1 && ok2 {
stats["wal_all_time"] = wTime + sTime
}

return stats
}

// selectWalQuery returns suitable wal state query depending on passed version.
func selectWalQuery(version int) string {
switch {
case version < PostgresV10:
return postgresWalQuery96
case version < PostgresV14:
return postgresWalQuery13
default:
return postgresWalQuertLatest
return postgresWalQueryLatest
}
}
76 changes: 73 additions & 3 deletions internal/collector/postgres_wal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package collector

import (
"database/sql"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/assert"
"github.com/weaponry/pgscv/internal/model"
"testing"
Expand All @@ -10,25 +12,93 @@ func TestPostgresWalCollector_Update(t *testing.T) {
var input = pipelineInput{
required: []string{
"postgres_recovery_info",
"postgres_wal_written_bytes_all_total",
},
// TODO: wait until Postgres 14 has been released, update Postgres version on pgscv-testing docker image
// and move these metrics to 'required' slice.
optional: []string{
"postgres_wal_written_records_total",
"postgres_wal_written_bytes_total",
"postgres_wal_buffers_full_total",
"postgres_wal_write_total",
"postgres_wal_sync_total",
"postgres_wal_seconds_all_total",
"postgres_wal_seconds_total",
"postgres_wal_stats_reset_time",
},
optional: []string{},
collector: NewPostgresWalCollector,
service: model.ServiceTypePostgresql,
}

pipeline(t, input)
}

func Test_parsePostgresWalStats(t *testing.T) {
var testCases = []struct {
name string
res *model.PGResult
want map[string]float64
}{
{
name: "pg14",
res: &model.PGResult{
Nrows: 1,
Ncols: 10,
Colnames: []pgproto3.FieldDescription{
{Name: []byte("recovery")},
{Name: []byte("wal_records")}, {Name: []byte("wal_fpi")}, {Name: []byte("wal_bytes")},
{Name: []byte("wal_buffers_full")}, {Name: []byte("wal_write")}, {Name: []byte("wal_sync")},
{Name: []byte("wal_write_time")}, {Name: []byte("wal_sync_time")}, {Name: []byte("reset_time")},
},
Rows: [][]sql.NullString{
{
{String: "0", Valid: true},
{String: "58452", Valid: true}, {String: "4712", Valid: true}, {String: "587241", Valid: true},
{String: "1234", Valid: true}, {String: "48541", Valid: true}, {String: "8541", Valid: true},
{String: "874215", Valid: true}, {String: "48736", Valid: true}, {String: "123456789", Valid: true},
},
},
},
want: map[string]float64{
"recovery": 0,
"wal_records": 58452, "wal_fpi": 4712, "wal_bytes": 587241,
"wal_buffers_full": 1234, "wal_write": 48541, "wal_sync": 8541,
"wal_write_time": 874215, "wal_sync_time": 48736, "wal_all_time": 922951, "reset_time": 123456789,
},
},
{
name: "pg13",
res: &model.PGResult{
Nrows: 1,
Ncols: 2,
Colnames: []pgproto3.FieldDescription{
{Name: []byte("recovery")}, {Name: []byte("wal_bytes")},
},
Rows: [][]sql.NullString{{{String: "0", Valid: true}, {String: "123456789", Valid: true}}},
},
want: map[string]float64{"recovery": 0, "wal_bytes": 123456789},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := parsePostgresWalStats(tc.res)
assert.EqualValues(t, tc.want, got)
})
}
}

func Test_selectWalQuery(t *testing.T) {
var testcases = []struct {
version int
want string
}{
{version: 90600, want: postgresWalQuery96},
{version: 90605, want: postgresWalQuery96},
{version: 100000, want: postgresWalQuertLatest},
{version: 100005, want: postgresWalQuertLatest},
{version: 100000, want: postgresWalQuery13},
{version: 100005, want: postgresWalQuery13},
{version: 130005, want: postgresWalQuery13},
{version: 140005, want: postgresWalQueryLatest},
}

for _, tc := range testcases {
Expand Down

0 comments on commit 4b472d4

Please sign in to comment.