Skip to content

Commit

Permalink
Initial import
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco Pracucci committed Dec 13, 2017
0 parents commit 11bf002
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dist
MANIFEST
prometheus_jenkins_exporter.egg-info
__pycache__
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Prometheus exporter for PgBouncer

33 changes: 33 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
exporter_host: 0.0.0.0
exporter_port: 9100

pgbouncers:
-
# The pgbouncer connection string
dsn: postgresql://pgbouncer-exporter:$(PGBOUNCER_EXPORTER_PASS)@localhost:6431/pgbouncer

# The pgbouncer connection timeout in seconds.
connect_timeout: 5

# Databases to report metrics for. If omitted or empty, all databases
# will be reported.
include_databases:

# Databases to exclude from metrics reporting. If omitted or empty, all
# databases matching "include_databases" will be reported.
exclude_databases:
- pgbouncer

# Extra labels to add to all metrics exported for this pgbouncer
# instance. Required if you have configured multiple pgbouncers,
# in order to export an unique set of metrics.
extra_labels:
- name: cluster
value: 1

- dsn: postgresql://pgbouncer-exporter:$(PGBOUNCER_EXPORTER_PASS)@localhost:6432/pgbouncer
exclude_databases:
- pgbouncer
extra_labels:
- name: cluster
value: 2
Empty file.
69 changes: 69 additions & 0 deletions prometheus_pgbouncer_exporter/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
import sys
import signal
import time
import argparse
from prometheus_client import start_http_server
from prometheus_client.core import REGISTRY
from pythonjsonlogger import jsonlogger
from config import Config
from collector import PgbouncersMetricsCollector


class SignalHandler():
def __init__(self):
self.shutdown = False

# Register signal handler
signal.signal(signal.SIGINT, self._on_signal_received)
signal.signal(signal.SIGTERM, self._on_signal_received)

def is_shutting_down(self):
return self.shutdown

def _on_signal_received(self, signal, frame):
logging.getLogger().info("Exporter is shutting down")
self.shutdown = True


def main():
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--config", help="Path to config file", default="config.yml")
parser.add_argument("--log-level", help="Minimum log level. Accepted values are: DEBUG, INFO, WARNING, ERROR, CRITICAL", default="INFO")
args = parser.parse_args()

# Register signal handler
signal_handler = SignalHandler()

# Init logger
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter("(asctime) (levelname) (message)", datefmt="%Y-%m-%d %H:%M:%S")
logHandler.setFormatter(formatter)
logging.getLogger().addHandler(logHandler)
logging.getLogger().setLevel(args.log_level)
logging.getLogger().info("Exporter is starting up")

# Read config file
config = Config()
try:
config.read(args.config)
logging.getLogger().info(f"Config file successfully read from {args.config}")
except Exception as error:
logging.getLogger().fatal(f"Unable to read config file from {args.config}", extra={"exception": str(error)})
sys.exit(1)

# Register our custom collector
REGISTRY.register(PgbouncersMetricsCollector(config.getPgbouncers()))

# Start server
start_http_server(config.getExporterPort(), config.getExporterHost())
logging.getLogger().info(f"Exporter listening on {config.getExporterHost()}:{config.getExporterPort()}")

while not signal_handler.is_shutting_down():
time.sleep(1)

logging.getLogger().info("Exporter has shutdown")

if __name__ == '__main__':
main()
162 changes: 162 additions & 0 deletions prometheus_pgbouncer_exporter/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import psycopg2
import logging
from psycopg2.extras import DictCursor
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily


class PgbouncersMetricsCollector():
# TODO force type
def __init__(self, configs):
self.collectors = list(map(lambda config: PgbouncerMetricsCollector(config), configs))

def collect(self):
entries = []
metrics = {}

# Collect all metrics data
for collector in self.collectors:
entries += collector.collect()

# Instance metrics
for data in entries:
name = data["name"]
metrics[name] = metrics[name] if name in metrics else self._instanceMetric(data)
metrics[name].add_metric(value=data["value"], labels=data["labels"].values())

return metrics.values()

def _instanceMetric(self, data):
if data["type"] is "counter":
return CounterMetricFamily(data["name"], data["help"], labels=data["labels"].keys())
elif data["type"] is "gauge":
return GaugeMetricFamily(data["name"], data["help"], labels=data["labels"].keys())
else:
raise Exception(f"Unsupported metric type: {data['type']}")


class PgbouncerMetricsCollector():
# TODO force type
def __init__(self, config):
self.config = config

def collect(self):
conn = False
metrics = []
success = True

try:
# Connect to pgbouncer
conn = psycopg2.connect(dsn = self.config.getDsn(), connect_timeout = self.config.getConnectTimeout())
conn.set_session(autocommit=True)

# SHOW STATS
results = self._fetchMetrics(conn, "SHOW STATS")
if results:
results = self._filterMetricsByIncludeDatabases(results, self.config.getIncludeDatabases())
results = self._filterMetricsByExcludeDatabases(results, self.config.getExcludeDatabases())
metrics += self._exportMetrics(results, "pgbouncer_stats_", [
{ "type": "counter", "column": "total_requests", "metric": "queries_total", "help": "Total number of SQL queries pooled by pgbouncer" },
{ "type": "counter", "column": "total_query_time", "metric": "queries_duration_microseconds", "help": "Total number of microseconds spent by pgbouncer when actively connected to PostgreSQL" },
{ "type": "counter", "column": "total_received", "metric": "received_bytes_total", "help": "Total volume in bytes of network traffic received by pgbouncer" },
{ "type": "counter", "column": "total_sent", "metric": "sent_bytes_total", "help": "Total volume in bytes of network traffic sent by pgbouncer" },
], [ "database" ], self.config.getExtraLabels())
else:
success = False

# SHOW POOLS
results = self._fetchMetrics(conn, "SHOW POOLS")
if results:
results = self._filterMetricsByIncludeDatabases(results, self.config.getIncludeDatabases())
results = self._filterMetricsByExcludeDatabases(results, self.config.getExcludeDatabases())
metrics += self._exportMetrics(results, "pgbouncer_pools_", [
{ "type": "gauge", "column": "cl_active", "metric": "client_active_connections", "help": "Client connections that are linked to server connection and can process queries" },
{ "type": "gauge", "column": "cl_waiting", "metric": "client_waiting_connections", "help": "Client connections have sent queries but have not yet got a server connection" },
{ "type": "gauge", "column": "sv_active", "metric": "server_active_connections", "help": "Server connections that linked to client" },
{ "type": "gauge", "column": "sv_idle", "metric": "server_idle_connections", "help": "Server connections that unused and immediately usable for client queries" },
{ "type": "gauge", "column": "sv_used", "metric": "server_used_connections", "help": "Server connections that have been idle more than server_check_delay, so they needs server_check_query to run on it before it can be used" },
{ "type": "gauge", "column": "sv_tested", "metric": "server_testing_connections", "help": "Server connections that are currently running either server_reset_query or server_check_query" },
{ "type": "gauge", "column": "sv_login", "metric": "server_login_connections", "help": "Server connections currently in logging in process" },
{ "type": "gauge", "column": "maxwait", "metric": "client_maxwait_seconds", "help": "How long the first (oldest) client in queue has waited, in seconds" },
], [ "database", "user" ], self.config.getExtraLabels())
else:
success = False

except Exception as error:
logging.getLogger().debug(f"Unable fetch metrics from {self.config.getDnsWithMaskedPassword()}", extra={ "exception": str(error) })

success = False
finally:
if conn:
conn.close()

# Add pgbouncer_up metric
metrics.append({
"type": "gauge",
"name": "pgbouncer_up",
"value": 1 if success else 0,
"labels": self.config.getExtraLabels(),
"help": "PgBouncer is UP and the scraping of all metrics succeeded"
})

return metrics

def _exportMetrics(self, results, metricPrefix, mappings, metricLabels, extraLabels):
metrics = []

for result in results:
for mapping in mappings:
# Ensure the column exists
if not mapping["column"] in result:
continue

labels = { key: result[key] for key in metricLabels }
labels.update(extraLabels)

metrics.append({
"type": mapping["type"],
"name": f"{metricPrefix}{mapping['metric']}",
"value": result[mapping["column"]],
"labels": labels,
"help": mapping["help"]
})

return metrics

def _filterMetricsByIncludeDatabases(self, results, databases):
# No filtering if empty
# TODO test me
if not databases:
return results

# TODO test me
return list(filter(lambda item: item["database"] in databases, results))

def _filterMetricsByExcludeDatabases(self, results, databases):
# No filtering if empty
# TODO test me
if not databases:
return results

# TODO test me
return list(filter(lambda item: item["database"] not in databases, results))

def _fetchMetrics(self, conn, query):
cursor = False

try:
# Open a cursor
cursor = conn.cursor(cursor_factory=DictCursor)

# Fetch statistics
# TODO query timeout
cursor.execute(query)

return cursor.fetchall()
except Exception as error:
# TODO log
print(f"{str(error)}")

return False
finally:
if cursor:
cursor.close()
94 changes: 94 additions & 0 deletions prometheus_pgbouncer_exporter/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import yaml
import os
import re

# Define the regex used to replace $(ENV_VAR) with ENV_VAR value
ENV_VAR_REPLACER_PATTERN = re.compile(r'^(.*)\$\(([^\)]+)\)(.*)$')

# Define the regex used to mask the password in the DSN
DSN_PASSWORD_MASK_PATTERN = re.compile(r'^(.*:)([^@]+)(@.*)$')


class Config():
def __init__(self):
self.config = {}
self.pgbouncers = False

def getExporterHost(self):
return self.config["exporter_host"] if "exporter_host" in self.config else "127.0.0.1"

def getExporterPort(self):
return self.config["exporter_port"] if "exporter_port" in self.config else "9100"

def getExporterPath(self):
return self.config["exporter_path"] if "exporter_path" in self.config else "/metrics"

def getPgbouncers(self):
# Lazy instance pgbouncer config
if self.pgbouncers is False:
if "pgbouncers" in self.config:
self.pgbouncers = list(map(lambda item: PgbouncerConfig(item), self.config["pgbouncers"]))
else:
self.pgbouncers = []

return self.pgbouncers

def read(self, filepath):
stream = False

# Setup environment variables replacement
def env_var_replacer(loader, node):
value = loader.construct_scalar(node)
beforePart, envVar, afterPart = ENV_VAR_REPLACER_PATTERN.match(value).groups()

if envVar in os.environ:
return beforePart + os.environ[envVar] + afterPart
else:
return beforePart + envVar + afterPart

yaml.add_implicit_resolver ("!envvarreplacer", ENV_VAR_REPLACER_PATTERN)
yaml.add_constructor('!envvarreplacer', env_var_replacer)

# Read file
try:
stream = open(filepath, "r")
self.config = yaml.load(stream)
finally:
if stream:
stream.close()


class PgbouncerConfig():
def __init__(self, config):
self.config = config
self.labels = False

def getDsn(self):
return self.config["dsn"] if "dsn" in self.config else "postgresql://pgbouncer:@localhost:6431/pgbouncer"

# TODO test me
def getDnsWithMaskedPassword(self):
match = DSN_PASSWORD_MASK_PATTERN.match(self.getDsn())
if match:
return match.group(1) + "***" + match.group(3)
else:
return self.getDsn()

def getConnectTimeout(self):
return self.config["connect_timeout"] if "connect_timeout" in self.config else 5

def getIncludeDatabases(self):
return self.config["include_databases"] if "include_databases" in self.config else []

def getExcludeDatabases(self):
return self.config["exclude_databases"] if "exclude_databases" in self.config else []

def getExtraLabels(self):
# Lazy instance extra labels
if self.labels is False:
if "extra_labels" in self.config:
self.labels = { item["name"]: str(item["value"]) for item in self.config["extra_labels"] }
else:
self.labels = {}

return self.labels
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
psycopg2==2.7.3.2
prometheus_client==0.0.21
python-json-logger==0.1.5
pycodestyle
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
description-file = README.md
Loading

0 comments on commit 11bf002

Please sign in to comment.