Skip to content

Commit

Permalink
Merge pull request #11 from radiofrance/refactor-exporter
Browse files Browse the repository at this point in the history
feat: refresh metrics when switchover occurs
  • Loading branch information
Nacymus authored Jul 12, 2022
2 parents d9582be + 19d6ce5 commit 0528f39
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ wal-g-prometheus-exporter
__pycache__/
.*.swp
exporter.spec
wal
venv
93 changes: 65 additions & 28 deletions exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ def wal_diff(a, b):
b_int = int(b[8:16], 16) * 0x100 + int(b[16:24], 16)
return a_int - b_int

def is_delta(bb):
    """Classify a base backup as a delta or a full backup.

    Despite the predicate-style name, this returns a label string rather
    than a boolean; the result is used as a metric label value.

    Args:
        bb: Mapping describing one base backup. Only the ``'backup_name'``
            key is read.

    Returns:
        ``'delta'`` when the backup name contains the ``_D_`` marker,
        ``'full'`` otherwise.

    Raises:
        KeyError: If ``bb`` has no ``'backup_name'`` entry.
    """
    # A plain substring test expresses the intent directly; the previous
    # ``^.*_D_.*$`` regex was only ever checking for the marker's presence.
    return 'delta' if '_D_' in bb['backup_name'] else 'full'


def is_delta(bb):
if re.match(r"^.*_D_.*$", bb['backup_name']):
Expand Down Expand Up @@ -132,15 +138,19 @@ def __init__(self):
'4 : remote is unreachable, '
'6 : no archives found in local & remote is unreachable.')

self.xlog_since_last_bb = Gauge('walg_xlogs_since_basebackup', 'Xlog uploaded since last base backup')

self.xlog_since_last_bb = Gauge('walg_xlogs_since_basebackup', 'Xlog uploaded since last base backup')

self.last_backup_duration = Gauge('walg_last_backup_duration', 'Duration of the last full backup', unit='seconds')

self.last_backup_size = Gauge('walg_last_backup_size', 'Size of last uploaded backup. Label compression="compressed" for compressed size and compression="uncompressed" for uncompressed ', ['compression'], unit='bytes')
self.fetch_metrics()

def fetch_metrics(self):


self.is_primary()

self.update_basebackup()
self.basebackup_count.set_function(lambda: len(self.bbs))
if self.last_xlog_upload_callback is not None:
Expand Down Expand Up @@ -242,7 +252,9 @@ def xlog_ready_callback(self):
host=os.getenv('PGHOST', '127.0.0.1'),
port=os.getenv('PGPORT', '5432'),
user=os.getenv('PGUSER', 'postgres'),
password=os.getenv('PGPASSWORD', 'pgpass'),

password=os.getenv('PGPASSWORD'),

dbname=os.getenv('PGDATABASE', 'postgres'),

) as db_connection:
Expand All @@ -263,6 +275,53 @@ def xlog_since_last_bb_callback(self):
return 0


def flush_metrics(self):
    """Reset all exported metrics and drop the per-backup label sets.

    Called while the instance is not (or cannot be confirmed to be) the
    primary, so that values collected before a switchover are no longer
    exposed. The method is safe to call repeatedly: the caller polls in a
    loop and invokes it on every iteration.

    Side effects:
        * Every scalar gauge is set to 0.
        * Each label set recorded in ``self.bbs`` is removed from
          ``self.basebackup``.
        * ``self.bbs`` is replaced by an empty list.
    """
    gauges = (
        self.basebackup_count,
        self.last_upload.labels('xlog'),
        self.last_upload.labels('basebackup'),
        self.oldest_basebackup,
        self.xlog_ready,
        self.exception,
        self.xlog_since_last_bb,
        self.last_backup_duration,
        self.last_backup_size.labels('compressed'),
        self.last_backup_size.labels('uncompressed'),
    )
    for gauge in gauges:
        gauge.set(0)

    for bb in self.bbs:
        # Build the label values outside the try so that a malformed backup
        # entry (missing key) is still reported instead of being swallowed.
        label_values = (bb['wal_file_name'], bb['start_lsn'], is_delta(bb))
        try:
            self.basebackup.remove(*label_values)
        except KeyError:
            # The label set is already gone (e.g. removed by a previous
            # flush); nothing left to do for this backup.
            continue

    # Forget the flushed backups so the next flush does not try to remove
    # them again. fetch_metrics binds basebackup_count to len(self.bbs) via
    # set_function, so emptying the list is also what makes that gauge
    # report zero.
    self.bbs = []

# Check if this is a master instance
def is_primary(self):
    """Block until the PostgreSQL server reports that it is not in recovery.

    Polls the server with ``SELECT NOT pg_is_in_recovery()``. While the
    server is a replica, or cannot be queried, the exported metrics are
    flushed (when ``self.bbs`` is not ``None``) and the check is retried
    after 60 seconds.

    Connection settings are read from the ``PGHOST``, ``PGPORT``,
    ``PGUSER``, ``PGPASSWORD`` and ``PGDATABASE`` environment variables.

    Returns:
        None. The method only returns once the server answered that it is
        a primary.
    """
    while True:
        failure = None
        result = None
        try:
            db_connection = psycopg2.connect(
                host=os.getenv('PGHOST', 'localhost'),
                port=os.getenv('PGPORT', '5432'),
                user=os.getenv('PGUSER', 'postgres'),
                password=os.getenv('PGPASSWORD'),
                dbname=os.getenv('PGDATABASE', 'postgres'),
            )
            try:
                db_connection.autocommit = True
                with db_connection.cursor() as c:
                    c.execute("SELECT NOT pg_is_in_recovery()")
                    result = c.fetchone()
            finally:
                # Using a psycopg2 connection as a context manager only
                # ends the transaction; it does not close the connection.
                # Close it explicitly so each poll does not leak a session,
                # and so no session is held open during the sleep below.
                db_connection.close()
        except Exception as e:
            failure = e

        if failure is None and result and result[0]:
            return

        # Not a confirmed primary: stop exposing previously collected values.
        if self.bbs is not None:
            try:
                self.flush_metrics()
            except Exception as e:
                # A flush problem must not terminate the wait loop.
                error(f"Unable to flush metrics: {e}")

        if failure is None:
            info("Running on slave, waiting for promotion...")
        else:
            error(f"Unable to connect to postgres server: {failure}, retrying in 60sec...")
        time.sleep(60)




if __name__ == '__main__':
info("Startup...")
info('My PID is: %s', os.getpid())
Expand All @@ -271,36 +330,14 @@ def xlog_since_last_bb_callback(self):
start_http_server(http_port)
info("Webserver started on port %s", http_port)

# Check if this is a master instance
while True:
try:
with psycopg2.connect(
host=os.getenv('PGHOST', 'localhost'),
port=os.getenv('PGPORT', '5432'),
user=os.getenv('PGUSER', 'postgres'),
password=os.getenv('PGPASSWORD'),
dbname=os.getenv('PGDATABASE', 'postgres'),

) as db_connection:
db_connection.autocommit = True
with db_connection.cursor() as c:
c.execute("SELECT NOT pg_is_in_recovery()")
result = c.fetchone()
if bool(result) and result[0]:
break
else:
info("Running on slave, waiting for promotion...")
time.sleep(60)
except Exception as e:
error(f"Unable to connect to postgres server: {e}, retrying in 60sec...")
time.sleep(60)

# Launch exporter
exporter = Exporter()

# The periodic interval to update basebackup metrics, defaults to 2 minutes
update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "120"))
# The periodic interval to update basebackup metrics, defaults to 5 minutes
update_basebackup_interval = float(os.getenv("UPDATE_BASEBACKUP_INTERVAL", "300"))


ticker = threading.Event()

while not ticker.wait(update_basebackup_interval):
exporter.fetch_metrics()

0 comments on commit 0528f39

Please sign in to comment.