From 8e8058c4e3ed7ffe684977b44a0937a1c4108f1d Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Thu, 7 Jul 2022 23:07:44 +0200
Subject: [PATCH 1/9] wip null

---
 .gitignore | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.gitignore b/.gitignore
index c2ab138..be9047b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ wal-g-prometheus-exporter
 __pycache__/
 .*.swp
 exporter.spec
+wal
+venv

From b58c4b439d51386529387bfd8c755371eb27d660 Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Thu, 12 May 2022 22:02:39 +0200
Subject: [PATCH 2/9] add support for setting the port as an env var

---
 exporter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exporter.py b/exporter.py
index eadfa9c..f49b973 100644
--- a/exporter.py
+++ b/exporter.py
@@ -40,7 +40,7 @@ if key != 'root':
         logging.getLogger(key).setLevel(logging.WARNING)
 
-http_port = os.getenv("WALG_EXPORTER_PORT", "9351")
+http_port = int(os.getenv("WALG_EXPORTER_PORT", "9351"))
 
 # Base backup update
 # ------------------

From a06de43c5935c8a3b000931f57b1cbb9c6e2d302 Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Fri, 13 May 2022 17:29:44 +0200
Subject: [PATCH 3/9] fix: read the archive status with a function that is
 granted to the pg_monitor role

---
 exporter.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/exporter.py b/exporter.py
index f49b973..dbd5997 100644
--- a/exporter.py
+++ b/exporter.py
@@ -278,10 +278,10 @@ def xlog_ready_callback(self):
         ) as db_connection:
             db_connection.autocommit = True
 
-            with db_connection.cursor(cursor_factory=DictCursor) as c:
-                c.execute("SELECT COUNT(*) FROM pg_ls_dir('pg_wal') WHERE pg_ls_dir ~ '^[0-9A-F]{24}.ready';")
+            with db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as c:
+                c.execute("SELECT COUNT(*) FROM pg_ls_archive_statusdir() WHERE pg_ls_archive_statusdir.name ~ '^[0-9A-F]{24}.ready';")
                 res = c.fetchone()
-                return res
+                return res[0]
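
For context on patch 3: pg_ls_archive_statusdir() (PostgreSQL 12+) lists the
contents of pg_wal/archive_status, and its EXECUTE privilege is granted to
members of the built-in pg_monitor role by default, whereas pg_ls_dir() is
restricted to superusers by default. A minimal sketch of the same count query
run as a plain monitoring user; the 'monitor' role name and connection
settings are illustrative, not part of the patch:

    import psycopg2

    # Assumes a role created roughly as:
    #   CREATE ROLE monitor LOGIN;  GRANT pg_monitor TO monitor;
    conn = psycopg2.connect(host='localhost', user='monitor', dbname='postgres')
    conn.autocommit = True
    with conn.cursor() as c:
        # .ready status files mark WAL segments still waiting to be archived
        c.execute("SELECT COUNT(*) FROM pg_ls_archive_statusdir() "
                  "WHERE name ~ '^[0-9A-F]{24}\\.ready';")
        print(c.fetchone()[0])
    conn.close()
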
From 3b5e7d805df3fe22e94e885a2e760b74164b990c Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Mon, 13 Jun 2022 16:50:23 +0200
Subject: [PATCH 4/9] refactor: correct the compression labels and remove the
 fuse metric

---
 exporter.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/exporter.py b/exporter.py
index dbd5997..281cd18 100644
--- a/exporter.py
+++ b/exporter.py
@@ -173,15 +173,13 @@ def __init__(self):
                                       ['compression'],
                                       unit='bytes')
         self.last_backup_size.labels('compressed').set_function(
-            lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size']
+            lambda: (self.bbs[len(self.bbs) - 1]['compressed_size']
                      if self.bbs else 0)
         )
         self.last_backup_size.labels('uncompressed').set_function(
-            lambda: (self.bbs[len(self.bbs) - 1]['compressed_size']
+            lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size']
                      if self.bbs else 0)
         )
-        self.walg_backup_fuse = Gauge('walg_backup_fuse',"0 backup fuse is OK, 1 backup fuse is burnt")
-        self.walg_backup_fuse.set_function(self.backup_fuse_callback)
 
         # Fetch remote base backups
         self.update_basebackup()
@@ -293,9 +291,6 @@ def xlog_since_last_bb_callback(self):
         else:
             return 0
 
-    def backup_fuse_callback(self):
-        return int(os.path.exists('/tmp/failed_pg_archive'))
-
 if __name__ == '__main__':
     info("Startup...")
     info('My PID is: %s', os.getpid())

From b06df73c2e0cf436ceb9bc2777f5926e56336251 Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Wed, 15 Jun 2022 17:37:38 +0200
Subject: [PATCH 5/9] add a 'backup' label, with values 'delta' or 'full', to
 some metrics

---
 exporter.py | 32 ++++++++++++++++++++------------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/exporter.py b/exporter.py
index 281cd18..aea753a 100644
--- a/exporter.py
+++ b/exporter.py
@@ -99,7 +99,12 @@ def wal_diff(a, b):
     b_int = int(b[8:16], 16) * 0x100 + int(b[16:24], 16)
     return a_int - b_int
 
+def is_delta(bb):
+    if re.match(r"^.*_D_.*$", bb['backup_name']):
+        return 'delta'
+
+    else:
+        return 'full'
 
 class Exporter():
 
     def __init__(self):
@@ -113,7 +118,7 @@ def __init__(self):
         # Declare metrics
         self.basebackup = Gauge('walg_basebackup',
                                 'Remote Basebackups',
-                                ['start_wal_segment', 'start_lsn'],
+                                ['start_wal_segment', 'start_lsn', 'backup'],
                                 unit='seconds')
         self.basebackup_count = Gauge('walg_basebackup_count',
                                       'Remote Basebackups count')
@@ -121,22 +126,23 @@ def __init__(self):
 
         self.last_upload = Gauge('walg_last_upload',
                                  'Last upload of incremental or full backup',
-                                 ['type'],
+                                 ['type', 'backup'],
                                  unit='seconds')
         # Set the time of last upload to 0 if none is retrieved from the pg_stat_archiver table
         if self.last_xlog_upload_callback is not None:
-            self.last_upload.labels('xlog').set('0.0')
+            self.last_upload.labels('xlog', is_delta(self.bbs[len(self.bbs) - 1])).set('0.0')
         else:
-            self.last_upload.labels('xlog').set_function(
+            self.last_upload.labels('xlog', is_delta(self.bbs[len(self.bbs) - 1])).set_function(
                 self.last_xlog_upload_callback)
-        self.last_upload.labels('basebackup').set_function(
+        self.last_upload.labels('basebackup', is_delta(self.bbs[len(self.bbs) - 1])).set_function(
             lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp()
             if self.bbs else 0
         )
         self.oldest_basebackup = Gauge('walg_oldest_basebackup',
                                        'oldest full backup',
+                                       ['backup'],
                                        unit='seconds')
-        self.oldest_basebackup.set_function(
+        self.oldest_basebackup.labels(is_delta(self.bbs[0])).set_function(
             lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0
         )
@@ -162,21 +168,23 @@ def __init__(self):
         self.xlog_since_last_bb.set_function(self.xlog_since_last_bb_callback)
 
         self.last_backup_duration = Gauge('walg_last_backup_duration',
-                                          'Duration of the last full backup')
-        self.last_backup_duration.set_function(
+                                          'Duration of the last full backup',
+                                          ['backup'],
+                                          unit='seconds')
+        self.last_backup_duration.labels(is_delta(self.bbs[len(self.bbs) - 1])).set_function(
             lambda: ((self.bbs[len(self.bbs) - 1]['finish_time'] -
                       self.bbs[len(self.bbs) - 1]['start_time']).total_seconds()
                      if self.bbs else 0)
         )
         self.last_backup_size = Gauge('walg_last_backup_size',
                                       'Size of the last uploaded backup. Label compression="compressed" for the compressed size and compression="uncompressed" for the uncompressed size',
-                                      ['compression'],
+                                      ['compression', 'backup'],
                                       unit='bytes')
-        self.last_backup_size.labels('compressed').set_function(
+        self.last_backup_size.labels('compressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function(
             lambda: (self.bbs[len(self.bbs) - 1]['compressed_size']
                      if self.bbs else 0)
         )
-        self.last_backup_size.labels('uncompressed').set_function(
+        self.last_backup_size.labels('uncompressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function(
             lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size']
                      if self.bbs else 0)
         )
@@ -211,7 +219,7 @@ def update_basebackup(self, *unused):
         for bb in new_bbs:
             if bb['backup_name'] not in old_bbs_name:
                 (self.basebackup.labels(bb['wal_file_name'],
-                                        bb['start_lsn'])
+                                        bb['start_lsn'], is_delta(bb))
                  .set(bb['start_time'].timestamp()))
         # Update backup list
         self.bbs = new_bbs
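
A note on the naming convention behind is_delta(): wal-g gives delta base
backups a _D_ infix in the backup name (for example
base_000000010000000000000010_D_000000010000000000000008), while full backups
carry no infix. A quick illustration of the check introduced above, with
made-up backup names:

    import re

    def is_delta(bb):
        # wal-g delta backups are named base_<segment>_D_<base segment>
        if re.match(r"^.*_D_.*$", bb['backup_name']):
            return 'delta'
        else:
            return 'full'

    print(is_delta({'backup_name': 'base_000000010000000000000010_D_000000010000000000000008'}))  # delta
    print(is_delta({'backup_name': 'base_000000010000000000000010'}))                             # full
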
Label compression="compressed" for compressed size and compression="uncompressed" for uncompressed ', - ['compression'], + ['compression', 'backup'], unit='bytes') - self.last_backup_size.labels('compressed').set_function( + self.last_backup_size.labels('compressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function( lambda: (self.bbs[len(self.bbs) - 1]['compressed_size'] if self.bbs else 0) ) - self.last_backup_size.labels('uncompressed').set_function( + self.last_backup_size.labels('uncompressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function( lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size'] if self.bbs else 0) ) @@ -211,7 +219,7 @@ def update_basebackup(self, *unused): for bb in new_bbs: if bb['backup_name'] not in old_bbs_name: (self.basebackup.labels(bb['wal_file_name'], - bb['start_lsn']) + bb['start_lsn'], is_delta(bb)) .set(bb['start_time'].timestamp())) # Update backup list self.bbs = new_bbs From 4f4d0ceaf0834d3000c9ba736584ca60cdd02418 Mon Sep 17 00:00:00 2001 From: Nassim ZERKA Date: Thu, 16 Jun 2022 09:45:15 +0200 Subject: [PATCH 6/9] refacto: reevaluate the value tyo label 'backup' --- exporter.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/exporter.py b/exporter.py index aea753a..3639bfd 100644 --- a/exporter.py +++ b/exporter.py @@ -102,9 +102,9 @@ def wal_diff(a, b): def is_delta(bb): if re.match(r"^.*_D_.*$", bb['backup_name']): return 'delta' - else: return 'full' + class Exporter(): def __init__(self): @@ -126,23 +126,22 @@ def __init__(self): self.last_upload = Gauge('walg_last_upload', 'Last upload of incremental or full backup', - ['type', 'backup'], + ['type'], unit='seconds') #Set the time of last uplaod to 0 if none is retieved from pg_stat_archiver table if self.last_xlog_upload_callback is not None: - self.last_upload.labels('xlog', is_delta(self.bbs[len(self.bbs) - 1])).set('0.0') + self.last_upload.labels('xlog').set('0.0') else: - self.last_upload.labels('xlog', is_delta(self.bbs[len(self.bbs) - 1])).set_function( + self.last_upload.labels('xlog').set_function( self.last_xlog_upload_callback) - self.last_upload.labels('basebackup', is_delta(self.bbs[len(self.bbs) - 1])).set_function( + self.last_upload.labels('basebackup').set_function( lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp() if self.bbs else 0 ) self.oldest_basebackup = Gauge('walg_oldest_basebackup', 'oldest full backup', - ['backup'], unit='seconds') - self.oldest_basebackup.labels(is_delta(self.bbs[0])).set_function( + self.oldest_basebackup.set_function( lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0 ) @@ -169,22 +168,21 @@ def __init__(self): self.last_backup_duration = Gauge('walg_last_backup_duration', 'Duration of the last full backup', - ['backup'], unit='seconds') - self.last_backup_duration.labels(is_delta(self.bbs[len(self.bbs) - 1])).set_function( + self.last_backup_duration.set_function( lambda: ((self.bbs[len(self.bbs) - 1]['finish_time'] - self.bbs[len(self.bbs) - 1]['start_time']).total_seconds() if self.bbs else 0) ) self.last_backup_size = Gauge('walg_last_backup_size', 'Size of last uploaded backup. 
Label compression="compressed" for compressed size and compression="uncompressed" for uncompressed ', - ['compression', 'backup'], + ['compression'], unit='bytes') - self.last_backup_size.labels('compressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function( + self.last_backup_size.labels('compressed').set_function( lambda: (self.bbs[len(self.bbs) - 1]['compressed_size'] if self.bbs else 0) ) - self.last_backup_size.labels('uncompressed', is_delta(self.bbs[len(self.bbs) - 1])).set_function( + self.last_backup_size.labels('uncompressed').set_function( lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size'] if self.bbs else 0) ) From 1a2a00a62037479e920d4543e12bc701a55bc7e2 Mon Sep 17 00:00:00 2001 From: Nassim ZERKA Date: Thu, 7 Jul 2022 23:12:52 +0200 Subject: [PATCH 7/9] feat: make sure to refresh metrics at each tick of thread timer --- exporter.py | 173 +++++++++++++++++++++------------------------------- 1 file changed, 69 insertions(+), 104 deletions(-) diff --git a/exporter.py b/exporter.py index 3639bfd..bece14a 100644 --- a/exporter.py +++ b/exporter.py @@ -1,6 +1,5 @@ import os import os.path -import signal import subprocess import json import datetime @@ -8,7 +7,6 @@ import argparse import logging import time -import sys import threading from logging import warning, info, debug, error # noqa: F401 @@ -17,7 +15,6 @@ import psycopg2 from psycopg2.extras import DictCursor - # Configuration # ------------- @@ -32,7 +29,7 @@ else: logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s', - level=logging.DEBUG, + level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') # Disable logging of libs @@ -42,6 +39,7 @@ http_port = int(os.getenv("WALG_EXPORTER_PORT", "9351")) + # Base backup update # ------------------ @@ -99,12 +97,14 @@ def wal_diff(a, b): b_int = int(b[8:16], 16) * 0x100 + int(b[16:24], 16) return a_int - b_int + def is_delta(bb): if re.match(r"^.*_D_.*$", bb['backup_name']): return 'delta' else: return 'full' + class Exporter(): def __init__(self): @@ -116,39 +116,11 @@ def __init__(self): self.archive_status = None # Declare metrics - self.basebackup = Gauge('walg_basebackup', - 'Remote Basebackups', - ['start_wal_segment', 'start_lsn', 'backup'], - unit='seconds') - self.basebackup_count = Gauge('walg_basebackup_count', - 'Remote Basebackups count') - self.basebackup_count.set_function(lambda: len(self.bbs)) - - self.last_upload = Gauge('walg_last_upload', - 'Last upload of incremental or full backup', - ['type'], - unit='seconds') - #Set the time of last uplaod to 0 if none is retieved from pg_stat_archiver table - if self.last_xlog_upload_callback is not None: - self.last_upload.labels('xlog').set('0.0') - else: - self.last_upload.labels('xlog').set_function( - self.last_xlog_upload_callback) - self.last_upload.labels('basebackup').set_function( - lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp() - if self.bbs else 0 - ) - self.oldest_basebackup = Gauge('walg_oldest_basebackup', - 'oldest full backup', - unit='seconds') - self.oldest_basebackup.set_function( - lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0 - ) - - self.xlog_ready = Gauge('walg_missing_remote_wal_segment_at_end', - 'Xlog ready for upload') - self.xlog_ready.set_function(self.xlog_ready_callback) - + self.basebackup = Gauge('walg_basebackup', 'Remote Basebackups', ['start_wal_segment', 'start_lsn', 'backup'], unit='seconds') + self.basebackup_count = Gauge('walg_basebackup_count', 'Remote Basebackups count') + self.last_upload = Gauge('walg_last_upload', 'Last 
@@ -116,39 +116,11 @@ def __init__(self):
         self.archive_status = None
 
         # Declare metrics
-        self.basebackup = Gauge('walg_basebackup',
-                                'Remote Basebackups',
-                                ['start_wal_segment', 'start_lsn', 'backup'],
-                                unit='seconds')
-        self.basebackup_count = Gauge('walg_basebackup_count',
-                                      'Remote Basebackups count')
-        self.basebackup_count.set_function(lambda: len(self.bbs))
-
-        self.last_upload = Gauge('walg_last_upload',
-                                 'Last upload of incremental or full backup',
-                                 ['type'],
-                                 unit='seconds')
-        # Set the time of last upload to 0 if none is retrieved from the pg_stat_archiver table
-        if self.last_xlog_upload_callback is not None:
-            self.last_upload.labels('xlog').set('0.0')
-        else:
-            self.last_upload.labels('xlog').set_function(
-                self.last_xlog_upload_callback)
-        self.last_upload.labels('basebackup').set_function(
-            lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp()
-            if self.bbs else 0
-        )
-        self.oldest_basebackup = Gauge('walg_oldest_basebackup',
-                                       'oldest full backup',
-                                       unit='seconds')
-        self.oldest_basebackup.set_function(
-            lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0
-        )
-
-        self.xlog_ready = Gauge('walg_missing_remote_wal_segment_at_end',
-                                'Xlog ready for upload')
-        self.xlog_ready.set_function(self.xlog_ready_callback)
-
+        self.basebackup = Gauge('walg_basebackup', 'Remote Basebackups', ['start_wal_segment', 'start_lsn', 'backup'], unit='seconds')
+        self.basebackup_count = Gauge('walg_basebackup_count', 'Remote Basebackups count')
+        self.last_upload = Gauge('walg_last_upload', 'Last upload of incremental or full backup', ['type'], unit='seconds')
+        self.oldest_basebackup = Gauge('walg_oldest_basebackup', 'oldest full backup', unit='seconds')
+        self.xlog_ready = Gauge('walg_missing_remote_wal_segment_at_end', 'Xlog ready for upload')
         self.exception = Gauge('walg_exception',
                                'Wal-g exception: '
                                '0 : no exception everything is OK, '
                                '1 : wal-g ListBackups error (remote storage), '
@@ -156,50 +128,37 @@ def __init__(self):
                                '2 : no archives found in local, '
                                '3 : basebackup and xlog errors '
                                '4 : remote is unreachable, '
-                               '6 : no archives found in local & remote is unreachable , ')
-        self.exception.set_function(
-            lambda: ((1 if self.basebackup_exception else 0) +
-                     (2 if self.xlog_exception else 0) +
-                     (4 if self.remote_exception else 0) ))
-
-        self.xlog_since_last_bb = Gauge('walg_xlogs_since_basebackup',
-                                        'Xlog uploaded since last base backup')
-        self.xlog_since_last_bb.set_function(self.xlog_since_last_bb_callback)
+                               '6 : no archives found in local & remote is unreachable.')
+
+        self.xlog_since_last_bb = Gauge('walg_xlogs_since_basebackup', 'Xlog uploaded since last base backup')
+
+
+        self.last_backup_duration = Gauge('walg_last_backup_duration', 'Duration of the last full backup', unit='seconds')
+
+        self.last_backup_size = Gauge('walg_last_backup_size', 'Size of the last uploaded backup. Label compression="compressed" for the compressed size and compression="uncompressed" for the uncompressed size', ['compression'], unit='bytes')
+        self.fetch_metrics()
+
+    def fetch_metrics(self):
-        self.last_backup_duration = Gauge('walg_last_backup_duration',
-                                          'Duration of the last full backup',
-                                          unit='seconds')
-        self.last_backup_duration.set_function(
-            lambda: ((self.bbs[len(self.bbs) - 1]['finish_time'] -
-                      self.bbs[len(self.bbs) - 1]['start_time']).total_seconds()
-                     if self.bbs else 0)
-        )
-        self.last_backup_size = Gauge('walg_last_backup_size',
-                                      'Size of the last uploaded backup. Label compression="compressed" for the compressed size and compression="uncompressed" for the uncompressed size',
-                                      ['compression'],
-                                      unit='bytes')
-        self.last_backup_size.labels('compressed').set_function(
-            lambda: (self.bbs[len(self.bbs) - 1]['compressed_size']
-                     if self.bbs else 0)
-        )
-        self.last_backup_size.labels('uncompressed').set_function(
-            lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size']
-                     if self.bbs else 0)
-        )
-        # Fetch remote base backups
         self.update_basebackup()
+        self.basebackup_count.set_function(lambda: len(self.bbs))
+        if self.last_xlog_upload_callback is not None:
+            self.last_upload.labels('xlog').set('0.0')
+        else:
+            self.last_upload.labels('xlog').set_function(self.last_xlog_upload_callback)
+        self.last_upload.labels('basebackup').set_function(lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp() if self.bbs else 0 )
+        self.oldest_basebackup.set_function(lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0)
+        self.xlog_ready.set_function(self.xlog_ready_callback)
+        self.exception.set_function(lambda: ((1 if self.basebackup_exception else 0) + (2 if self.xlog_exception else 0) + (4 if self.remote_exception else 0)))
+        self.xlog_since_last_bb.set_function(self.xlog_since_last_bb_callback)
+        self.last_backup_duration.set_function(lambda: ((self.bbs[len(self.bbs) - 1]['finish_time'] - self.bbs[len(self.bbs) - 1]['start_time']).total_seconds() if self.bbs else 0))
+        self.last_backup_size.labels('compressed').set_function(lambda: (self.bbs[len(self.bbs) - 1]['compressed_size'] if self.bbs else 0))
+        self.last_backup_size.labels('uncompressed').set_function(lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size'] if self.bbs else 0))
Label compression="compressed" for compressed size and compression="uncompressed" for uncompressed ', - ['compression'], - unit='bytes') - self.last_backup_size.labels('compressed').set_function( - lambda: (self.bbs[len(self.bbs) - 1]['compressed_size'] - if self.bbs else 0) - ) - self.last_backup_size.labels('uncompressed').set_function( - lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size'] - if self.bbs else 0) - ) - # Fetch remote base backups self.update_basebackup() + self.basebackup_count.set_function(lambda: len(self.bbs)) + if self.last_xlog_upload_callback is not None: + self.last_upload.labels('xlog').set('0.0') + else: + self.last_upload.labels('xlog').set_function(self.last_xlog_upload_callback) + self.last_upload.labels('basebackup').set_function(lambda: self.bbs[len(self.bbs) - 1]['start_time'].timestamp() if self.bbs else 0 ) + self.oldest_basebackup.set_function(lambda: self.bbs[0]['start_time'].timestamp() if self.bbs else 0) + self.xlog_ready.set_function(self.xlog_ready_callback) + self.exception.set_function(lambda: ((1 if self.basebackup_exception else 0) + (2 if self.xlog_exception else 0) + (4 if self.remote_exception else 0))) + self.xlog_since_last_bb.set_function(self.xlog_since_last_bb_callback) + self.last_backup_duration.set_function(lambda: ((self.bbs[len(self.bbs) - 1]['finish_time'] - self.bbs[len(self.bbs) - 1]['start_time']).total_seconds() if self.bbs else 0)) + self.last_backup_size.labels('compressed').set_function(lambda: (self.bbs[len(self.bbs) - 1]['compressed_size'] if self.bbs else 0)) + self.last_backup_size.labels('uncompressed').set_function(lambda: (self.bbs[len(self.bbs) - 1]['uncompressed_size'] if self.bbs else 0)) def update_basebackup(self, *unused): - """ - Update metrics about basebackup by calling backup-list - """ - - info('Updating basebackups metrics...') try: # Fetch remote backup list - res = subprocess.run(["wal-g", "backup-list", - "--detail", "--json"], - capture_output=True, check=True) + res = subprocess.run(["wal-g", "backup-list", "--detail", "--json"], capture_output=True, check=True) new_bbs = list(map(format_date, json.loads(res.stdout))) new_bbs.sort(key=lambda bb: bb['start_time']) new_bbs_name = [bb['backup_name'] for bb in new_bbs] @@ -211,13 +170,12 @@ def update_basebackup(self, *unused): if bb['backup_name'] not in new_bbs_name: # Backup deleted self.basebackup.remove(bb['wal_file_name'], - bb['start_lsn']) + bb['start_lsn'], is_delta(bb)) bb_deleted = bb_deleted + 1 # Add metrics for new backups for bb in new_bbs: if bb['backup_name'] not in old_bbs_name: - (self.basebackup.labels(bb['wal_file_name'], - bb['start_lsn'], is_delta(bb)) + (self.basebackup.labels(bb['wal_file_name'], bb['start_lsn'], is_delta(bb)) .set(bb['start_time'].timestamp())) # Update backup list self.bbs = new_bbs @@ -226,14 +184,20 @@ def update_basebackup(self, *unused): self.bbs[0]['start_time'], self.bbs[len(self.bbs) - 1]['start_time'], bb_deleted) - + self.remote_exception = False self.basebackup_exception = False except subprocess.CalledProcessError as e: error(e.stderr) self.remote_exception = True + for bb in self.bbs: + self.basebackup.remove(bb['wal_file_name'], bb['start_lsn'], is_delta(bb)) + self.bbs = [] except json.decoder.JSONDecodeError: info(res.stderr) self.basebackup_exception = True + for bb in self.bbs: + self.basebackup.remove(bb['wal_file_name'], bb['start_lsn'], is_delta(bb)) + self.bbs = [] def last_archive_status(self): if (self.last_archive_check is None or @@ -245,26 +209,26 @@ def last_archive_status(self): 
     def _last_archive_status(self):
         with psycopg2.connect(
-            host=os.getenv('PGHOST', 'localhost'),
-            port=os.getenv('PGPORT', '5432'),
-            user=os.getenv('PGUSER', 'postgres'),
-            password=os.getenv('PGPASSWORD'),
-            dbname=os.getenv('PGDATABASE', 'postgres'),
+                host=os.getenv('PGHOST', 'localhost'),
+                port=os.getenv('PGPORT', '5432'),
+                user=os.getenv('PGUSER', 'postgres'),
+                password=os.getenv('PGPASSWORD'),
+                dbname=os.getenv('PGDATABASE', 'postgres'),
 
         ) as db_connection:
             db_connection.autocommit = True
             with db_connection.cursor(cursor_factory=DictCursor) as c:
                 c.execute('SELECT archived_count, failed_count, '
-                          'last_archived_wal, '
-                          'last_archived_time, '
-                          'last_failed_wal, '
-                          'last_failed_time '
-                          'FROM pg_stat_archiver')
+                              'last_archived_wal, '
+                              'last_archived_time, '
+                              'last_failed_wal, '
+                              'last_failed_time '
+                              'FROM pg_stat_archiver')
                 res = c.fetchone()
-                # When last_archived_wal & last_archived_time have no values in pg_stat_archiver table (i.e.: archive_mode='off' )
                 if not (bool(res[2]) and bool(res[3])):
-                    info("Cannot fetch archive status. Postgresql archive_mode should be enabled")
                     self.xlog_exception = True
+                else:
+                    self.xlog_exception = False
                 return res
 
     def last_xlog_upload_callback(self):

     def xlog_ready_callback(self):
         with psycopg2.connect(
-            host=os.getenv('PGHOST', 'localhost'),
-            port=os.getenv('PGPORT', '5432'),
-            user=os.getenv('PGUSER', 'postgres'),
-            password=os.getenv('PGPASSWORD'),
-            dbname=os.getenv('PGDATABASE', 'postgres'),
+                host=os.getenv('PGHOST', '127.0.0.1'),
+                port=os.getenv('PGPORT', '5432'),
+                user=os.getenv('PGUSER', 'postgres'),
+                password=os.getenv('PGPASSWORD', 'pgpass'),
+                dbname=os.getenv('PGDATABASE', 'postgres'),
 
         ) as db_connection:
             db_connection.autocommit = True
             with db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as c:
-                c.execute("SELECT COUNT(*) FROM pg_ls_archive_statusdir() WHERE pg_ls_archive_statusdir.name ~ '^[0-9A-F]{24}.ready';")
+                c.execute(
+                    "SELECT COUNT(*) FROM pg_ls_archive_statusdir() WHERE pg_ls_archive_statusdir.name ~ '^[0-9A-F]{24}.ready';")
                 res = c.fetchone()
                 return res[0]
 
-
     def xlog_since_last_bb_callback(self):
         # Compute xlog_since_last_basebackup
         archive_status = self.last_archive_status()
         if self.bbs and archive_status:
             return wal_diff(archive_status['last_archived_wal'],
                             self.bbs[len(self.bbs) - 1]['wal_file_name'])
         else:
             return 0
 
+
 if __name__ == '__main__':
     info("Startup...")
     info('My PID is: %s', os.getpid())

     while True:
         try:
             with psycopg2.connect(
-                host=os.getenv('PGHOST', 'localhost'),
-                port=os.getenv('PGPORT', '5432'),
-                user=os.getenv('PGUSER', 'postgres'),
-                password=os.getenv('PGPASSWORD'),
-                dbname=os.getenv('PGDATABASE', 'postgres'),
+                    host=os.getenv('PGHOST', 'localhost'),
+                    port=os.getenv('PGPORT', '5432'),
+                    user=os.getenv('PGUSER', 'postgres'),
+                    password=os.getenv('PGPASSWORD'),
+                    dbname=os.getenv('PGDATABASE', 'postgres'),
 
             ) as db_connection:
                 db_connection.autocommit = True
                 with db_connection.cursor() as c:

     ticker = threading.Event()
     while not ticker.wait(update_basebackup_interval):
-        exporter.update_basebackup()
+        exporter.fetch_metrics()
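
A brief note on the refresh pattern in patch 7: threading.Event.wait(timeout)
returns False when the timeout elapses and True once the event is set, so the
loop re-runs fetch_metrics() at a fixed cadence until something calls set() on
the event. A standalone sketch of the pattern; the tick body is a stand-in for
exporter.fetch_metrics():

    import threading

    def tick():
        # stand-in for exporter.fetch_metrics(): re-reads the backup list
        # and re-registers the gauge callbacks against fresh data
        print("refreshing metrics...")

    interval = 900.0  # seconds; the exporter reads this from UPDATE_BASEBACKUP_INTERVAL
    ticker = threading.Event()
    # wait() returns False on timeout, True once set(), so this runs every
    # `interval` seconds until another thread calls ticker.set()
    while not ticker.wait(interval):
        tick()
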
From 1b46900e2a2e9a147ff1bb14887150b7ddaca6a4 Mon Sep 17 00:00:00 2001
From: Nassim ZERKA
Date: Mon, 11 Jul 2022 14:50:59 +0200
Subject: [PATCH 8/9] refactor: separate metrics declaration and fetching

---
 exporter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exporter.py b/exporter.py
index bece14a..fc7b33c 100644
--- a/exporter.py
+++ b/exporter.py
@@ -298,7 +298,7 @@ def xlog_since_last_bb_callback(self):
     exporter = Exporter()
 
     # The periodic interval to update basebackup metrics, defaults to 15 minutes
-    update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "900"))
+    update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "120"))
 
     ticker = threading.Event()
     while not ticker.wait(update_basebackup_interval):
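
This drops the default refresh interval from 900 s to 120 s; the
UPDATE_BASEBACKUP_INTERVAL environment variable still overrides it (the
untouched context comment above still says 15 minutes, which matched the
pre-patch default of 900). A minimal sketch of the parsing, with a guard added
purely for illustration:

    import os

    # Same expression as exporter.py: seconds, as a float, default 120.
    interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "120"))
    if interval <= 0:
        raise ValueError("UPDATE_BASEBACKUP_INTERVAL must be a positive number of seconds")
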
c.execute("SELECT NOT pg_is_in_recovery()") - result = c.fetchone() - if bool(result) and result[0]: - break - else: - info("Running on slave, waiting for promotion...") - time.sleep(60) - except Exception as e: - error(f"Unable to connect to postgres server: {e}, retrying in 60sec...") - time.sleep(60) - # Launch exporter exporter = Exporter() - # The periodic interval to update basebackup metrics, defaults to 15 minutes - update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "120")) + # The periodic interval to update basebackup metrics, defaults to 5 minutes + update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "300")) ticker = threading.Event() + while not ticker.wait(update_basebackup_interval): exporter.fetch_metrics()