diff --git a/internal/collector/config.go b/internal/collector/config.go index 2c9eee5..cc0e7f7 100644 --- a/internal/collector/config.go +++ b/internal/collector/config.go @@ -37,6 +37,8 @@ type postgresServiceConfig struct { localService bool // blockSize defines size of data block Postgres operates. blockSize uint64 + // walBlockSize defines size of WAL block Postgres operates. + walBlockSize uint64 // walSegmentSize defines size of WAL segment Postgres operates. walSegmentSize uint64 // serverVersionNum defines version of Postgres in XXYYZZ format. @@ -90,17 +92,29 @@ func newPostgresServiceConfig(connStr string) (postgresServiceConfig, error) { config.blockSize = bsize + // Get Postgres WAL block size. + err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'wal_block_size'").Scan(&setting) + if err != nil { + return config, err + } + walBlockSize, err := strconv.ParseUint(setting, 10, 64) + if err != nil { + return config, err + } + + config.walBlockSize = walBlockSize + // Get Postgres WAL segment size. err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'wal_segment_size'").Scan(&setting) if err != nil { return config, err } - segSize, err := strconv.ParseUint(setting, 10, 64) + walSegSize, err := strconv.ParseUint(setting, 10, 64) if err != nil { return config, err } - config.walSegmentSize = segSize + config.walSegmentSize = walSegSize // Get Postgres server version err = conn.Conn().QueryRow(context.Background(), "SELECT setting FROM pg_settings WHERE name = 'server_version_num'").Scan(&setting) diff --git a/internal/collector/postgres_wal.go b/internal/collector/postgres_wal.go index d9d388a..a21f034 100644 --- a/internal/collector/postgres_wal.go +++ b/internal/collector/postgres_wal.go @@ -1,42 +1,100 @@ package collector import ( - "context" "github.com/prometheus/client_golang/prometheus" "github.com/weaponry/pgscv/internal/log" "github.com/weaponry/pgscv/internal/model" "github.com/weaponry/pgscv/internal/store" + "strconv" ) const ( postgresWalQuery96 = "SELECT pg_is_in_recovery()::int AS recovery, " + "(case pg_is_in_recovery() when 't' then pg_last_xlog_receive_location() else pg_current_xlog_location() end) - '0/00000000' AS wal_bytes" - postgresWalQuertLatest = "SELECT pg_is_in_recovery()::int AS recovery, " + + postgresWalQuery13 = "SELECT pg_is_in_recovery()::int AS recovery, " + "(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) - '0/00000000' AS wal_bytes" + + postgresWalQueryLatest = "SELECT pg_is_in_recovery()::int AS recovery, wal_records, wal_fpi, " + + "(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() - '0/00000000' else wal_bytes end) AS wal_bytes, " + + "wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, " + + "extract('epoch' from stats_reset) as reset_time " + + "FROM pg_stat_wal" ) type postgresWalCollector struct { - labelNames []string - recovery typedDesc - wal typedDesc + recovery typedDesc + records typedDesc + writtenAllBytes typedDesc + writtenBytes typedDesc + buffersFull typedDesc + writes typedDesc + syncs typedDesc + secondsAll typedDesc + seconds typedDesc + resetUnix typedDesc } // NewPostgresWalCollector returns a new Collector exposing postgres WAL stats. // For details see https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-WAL-VIEW func NewPostgresWalCollector(constLabels labels, settings model.CollectorSettings) (Collector, error) { - var labelNames = []string{"client_addr", "user", "application_name", "state", "lag"} - return &postgresWalCollector{ - labelNames: labelNames, recovery: newBuiltinTypedDesc( descOpts{"postgres", "recovery", "info", "Current recovery state, 0 - not in recovery; 1 - in recovery.", 0}, prometheus.GaugeValue, nil, constLabels, settings.Filters, ), - wal: newBuiltinTypedDesc( - descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0}, + records: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "written_records_total", "Total amount of WAL records written (zero in case of standby).", 0}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + writtenAllBytes: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "written_bytes_all_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + writtenBytes: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written by each type of WAL (zero in case of standby), in bytes.", 0}, + prometheus.CounterValue, + []string{"wal"}, constLabels, + settings.Filters, + ), + buffersFull: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "buffers_full_total", "Total number of times WAL data was written to disk because WAL buffers became full (zero in case of standby).", 0}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + writes: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "write_total", "Total number of times WAL buffers were written out to disk via XLogWrite request (zero in case of standby).", 0}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + syncs: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "sync_total", "Total number of times WAL files were synced to disk via issue_xlog_fsync request (zero in case of standby).", 0}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + secondsAll: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "seconds_all_total", "Total amount of time spent processing WAL buffers (zero in case of standby), in seconds.", .001}, + prometheus.CounterValue, + nil, constLabels, + settings.Filters, + ), + seconds: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "seconds_total", "Total amount of time spent processing WAL buffers by each operation (zero in case of standby), in seconds.", .001}, + prometheus.CounterValue, + []string{"op"}, constLabels, + settings.Filters, + ), + resetUnix: newBuiltinTypedDesc( + descOpts{"postgres", "wal", "stats_reset_time", "Time at which WAL statistics were last reset, in unixtime.", 0}, prometheus.CounterValue, nil, constLabels, settings.Filters, @@ -52,26 +110,94 @@ func (c *postgresWalCollector) Update(config Config, ch chan<- prometheus.Metric } defer conn.Close() - // Get recovery state. - var recovery int - var walBytes int64 - err = conn.Conn().QueryRow(context.TODO(), selectWalQuery(config.serverVersionNum)).Scan(&recovery, &walBytes) + // Get WAL usage stats. + res, err := conn.Query(selectWalQuery(config.serverVersionNum)) if err != nil { - log.Warnf("get recovery state failed: %s; skip", err) - } else { - ch <- c.recovery.newConstMetric(float64(recovery)) - ch <- c.wal.newConstMetric(float64(walBytes)) + return err + } + + stats := parsePostgresWalStats(res) + + for k, v := range stats { + switch k { + case "recovery": + ch <- c.recovery.newConstMetric(v) + case "wal_records": + ch <- c.records.newConstMetric(v) + case "wal_fpi": + ch <- c.writtenBytes.newConstMetric(v*float64(config.walBlockSize), "fpi") + case "wal_bytes": + ch <- c.writtenAllBytes.newConstMetric(v) + case "wal_buffers_full": + ch <- c.buffersFull.newConstMetric(v) + case "wal_write": + ch <- c.writes.newConstMetric(v) + case "wal_sync": + ch <- c.syncs.newConstMetric(v) + case "wal_write_time": + ch <- c.seconds.newConstMetric(v, "write") + case "wal_sync_time": + ch <- c.seconds.newConstMetric(v, "sync") + case "wal_all_time": + ch <- c.secondsAll.newConstMetric(v) + case "reset_time": + ch <- c.resetUnix.newConstMetric(v) + default: + continue + } + } + + // Collect WAL bytes of regular (non-FPI) records + if fpi, ok := stats["wal_fpi"]; ok { + ch <- c.writtenBytes.newConstMetric(stats["wal_bytes"]-(fpi*float64(config.walBlockSize)), "regular") } return nil } +// parsePostgresWalStats parses PGResult and returns struct with data values +func parsePostgresWalStats(r *model.PGResult) map[string]float64 { + log.Debug("parse postgres WAL stats") + + stats := map[string]float64{} + + for _, row := range r.Rows { + for i, colname := range r.Colnames { + // Skip empty (NULL) values. + if !row[i].Valid { + continue + } + + // Get data value and convert it to float64 used by Prometheus. + v, err := strconv.ParseFloat(row[i].String, 64) + if err != nil { + log.Errorf("invalid input, parse '%s' failed: %s; skip", row[i].String, err) + continue + } + + // Column name used as a key. + stats[string(colname.Name)] = v + } + } + + // Count total time spent on WAL buffers processing. + wTime, ok1 := stats["wal_write_time"] + sTime, ok2 := stats["wal_sync_time"] + if ok1 && ok2 { + stats["wal_all_time"] = wTime + sTime + } + + return stats +} + // selectWalQuery returns suitable wal state query depending on passed version. func selectWalQuery(version int) string { switch { case version < PostgresV10: return postgresWalQuery96 + case version < PostgresV14: + return postgresWalQuery13 default: - return postgresWalQuertLatest + return postgresWalQueryLatest } } diff --git a/internal/collector/postgres_wal_test.go b/internal/collector/postgres_wal_test.go index 65cc055..f5bd76f 100644 --- a/internal/collector/postgres_wal_test.go +++ b/internal/collector/postgres_wal_test.go @@ -1,6 +1,8 @@ package collector import ( + "database/sql" + "github.com/jackc/pgproto3/v2" "github.com/stretchr/testify/assert" "github.com/weaponry/pgscv/internal/model" "testing" @@ -10,9 +12,20 @@ func TestPostgresWalCollector_Update(t *testing.T) { var input = pipelineInput{ required: []string{ "postgres_recovery_info", + "postgres_wal_written_bytes_all_total", + }, + // TODO: wait until Postgres 14 has been released, update Postgres version on pgscv-testing docker image + // and move these metrics to 'required' slice. + optional: []string{ + "postgres_wal_written_records_total", "postgres_wal_written_bytes_total", + "postgres_wal_buffers_full_total", + "postgres_wal_write_total", + "postgres_wal_sync_total", + "postgres_wal_seconds_all_total", + "postgres_wal_seconds_total", + "postgres_wal_stats_reset_time", }, - optional: []string{}, collector: NewPostgresWalCollector, service: model.ServiceTypePostgresql, } @@ -20,6 +33,61 @@ func TestPostgresWalCollector_Update(t *testing.T) { pipeline(t, input) } +func Test_parsePostgresWalStats(t *testing.T) { + var testCases = []struct { + name string + res *model.PGResult + want map[string]float64 + }{ + { + name: "pg14", + res: &model.PGResult{ + Nrows: 1, + Ncols: 10, + Colnames: []pgproto3.FieldDescription{ + {Name: []byte("recovery")}, + {Name: []byte("wal_records")}, {Name: []byte("wal_fpi")}, {Name: []byte("wal_bytes")}, + {Name: []byte("wal_buffers_full")}, {Name: []byte("wal_write")}, {Name: []byte("wal_sync")}, + {Name: []byte("wal_write_time")}, {Name: []byte("wal_sync_time")}, {Name: []byte("reset_time")}, + }, + Rows: [][]sql.NullString{ + { + {String: "0", Valid: true}, + {String: "58452", Valid: true}, {String: "4712", Valid: true}, {String: "587241", Valid: true}, + {String: "1234", Valid: true}, {String: "48541", Valid: true}, {String: "8541", Valid: true}, + {String: "874215", Valid: true}, {String: "48736", Valid: true}, {String: "123456789", Valid: true}, + }, + }, + }, + want: map[string]float64{ + "recovery": 0, + "wal_records": 58452, "wal_fpi": 4712, "wal_bytes": 587241, + "wal_buffers_full": 1234, "wal_write": 48541, "wal_sync": 8541, + "wal_write_time": 874215, "wal_sync_time": 48736, "wal_all_time": 922951, "reset_time": 123456789, + }, + }, + { + name: "pg13", + res: &model.PGResult{ + Nrows: 1, + Ncols: 2, + Colnames: []pgproto3.FieldDescription{ + {Name: []byte("recovery")}, {Name: []byte("wal_bytes")}, + }, + Rows: [][]sql.NullString{{{String: "0", Valid: true}, {String: "123456789", Valid: true}}}, + }, + want: map[string]float64{"recovery": 0, "wal_bytes": 123456789}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := parsePostgresWalStats(tc.res) + assert.EqualValues(t, tc.want, got) + }) + } +} + func Test_selectWalQuery(t *testing.T) { var testcases = []struct { version int @@ -27,8 +95,10 @@ func Test_selectWalQuery(t *testing.T) { }{ {version: 90600, want: postgresWalQuery96}, {version: 90605, want: postgresWalQuery96}, - {version: 100000, want: postgresWalQuertLatest}, - {version: 100005, want: postgresWalQuertLatest}, + {version: 100000, want: postgresWalQuery13}, + {version: 100005, want: postgresWalQuery13}, + {version: 130005, want: postgresWalQuery13}, + {version: 140005, want: postgresWalQueryLatest}, } for _, tc := range testcases {