Compare commits


No commits in common. "dev/locks" and "main" have entirely different histories.

8 changed files with 429 additions and 1113 deletions

.gitignore vendored
View File

@ -8,4 +8,3 @@ missing
 __pycache__
 venv
 build
-.coverage

View File

@ -19,12 +19,6 @@ RPM_VERSION := $(VERSION)-$(RPM_RELEASE)
 DEB_VERSION := $(VERSION)~rc$(RELEASE)
 endif
-# Python stuff
-PYTHON ?= python3
-PYLINT ?= pylint
-BLACK ?= black
-HAVE_COVERAGE := $(shell $(PYTHON) -c 'import coverage' 2>/dev/null && echo yes)
 # Where packages are built
 BUILD_DIR := build
@ -44,7 +38,7 @@ SUPPORTED := ubuntu-20.04 \
 # These targets are the main ones to use for most things.
 ##
-.PHONY: all clean tgz lint format test query-tests install-common install-openrc install-systemd
+.PHONY: all clean tgz test query-tests install-common install-openrc install-systemd
 all: package-all
@ -86,35 +80,14 @@ tgz:
 clean:
 rm -rf $(BUILD_DIR)
-# Check for lint
-lint:
-$(PYLINT) src/pgmon.py
-$(PYLINT) src/test_pgmon.py
-$(BLACK) --check --diff src/pgmon.py
-$(BLACK) --check --diff src/test_pgmon.py
-# Format the code using black
-format:
-$(BLACK) src/pgmon.py
-$(BLACK) src/test_pgmon.py
 # Run unit tests for the script
 test:
-ifeq ($(HAVE_COVERAGE),yes)
+cd src ; python3 -m unittest
-cd src ; $(PYTHON) -m coverage run -m unittest && python3 -m coverage report -m
-else
-cd src ; $(PYTHON) -m unittest
-endif
 # Run query tests
 query-tests:
 cd tests ; ./run-tests.sh
-# Compare the sample metrics with the Zabbix template
-template-coverage:
-$(PYTHON) zabbix_templates/coverage.py sample-config/pgmon-metrics.yml zabbix_templates/pgmon_templates.yaml
 # Install the script at the specified base directory (common components)
 install-common:
 # Set up directories

View File

@ -1,4 +0,0 @@
[MASTER]
py-version=3.5
disable=fixme

View File

@ -18,7 +18,6 @@ metrics:
 query:
 0: >
 SELECT host(client_addr) || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid,
-application_name,
 client_addr,
 state
 FROM pg_stat_replication
@ -230,19 +229,7 @@ metrics:
 SELECT COUNT(*) FILTER (WHERE has_sequence_privilege(c.oid, 'SELECT,USAGE')) AS visible_sequences,
 COUNT(*) AS total_sequences
 FROM pg_class AS c
-WHERE relkind = 'S'
+WHERE relkind = 'S';
-locks:
-type: row
-query:
-0:
-SELECT COUNT(*) AS total,
-SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted
-FROM pg_locks
-90400: >
-SELECT COUNT(*) AS total,
-COUNT(*) FILTER (WHERE granted) AS granted
-FROM pg_locks
 ##

View File

@ -1,147 +1,105 @@
 #!/usr/bin/env python3
-"""
-pgmon is a monitoring intermediary that sits between a PostgreSQL cluster and a monitoring systen
-that is capable of parsing JSON responses over an HTTP connection.
-"""
-# pylint: disable=too-few-public-methods
-import yaml
 import json
 import time
 import os
 import sys
-import signal
 import argparse
 import logging
-import re
-from decimal import Decimal
-from urllib.parse import urlparse, parse_qs
-from contextlib import contextmanager
 from datetime import datetime, timedelta
-from http.server import BaseHTTPRequestHandler
-from http.server import ThreadingHTTPServer
-from threading import Lock
+import yaml
 import psycopg2
 from psycopg2.extras import RealDictCursor
 from psycopg2.pool import ThreadedConnectionPool
-import requests
+from contextlib import contextmanager
+import signal
+from threading import Thread, Lock, Semaphore
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from http.server import ThreadingHTTPServer
+from urllib.parse import urlparse, parse_qs
+import requests
+import re
+from decimal import Decimal
 VERSION = "1.0.4"
+# Configuration
+config = {}
-class Context:
+# Dictionary of current PostgreSQL connection pools
-"""
+connections_lock = Lock()
-The global context for connections, config, version, nad IPC
+connections = {}
-"""
-# Configuration
+# Dictionary of unhappy databases. Keys are database names, value is the time
-config = {}
+# the database was determined to be unhappy plus the cooldown setting. So,
+# basically it's the time when we should try to connect to the database again.
+unhappy_cooldown = {}
-# Dictionary of current PostgreSQL connection pools
+# Version information
-connections_lock = Lock()
+cluster_version = None
-connections = {}
+cluster_version_next_check = None
+cluster_version_lock = Lock()
-# Dictionary of unhappy databases. Keys are database names, value is the time
+# PostgreSQL latest version information
-# the database was determined to be unhappy plus the cooldown setting. So,
+latest_version = None
-# basically it's the time when we should try to connect to the database again.
+latest_version_next_check = None
-unhappy_cooldown = {}
+latest_version_lock = Lock()
+release_supported = None
-# Version information
+# Running state (used to gracefully shut down)
-cluster_version = None
+running = True
-cluster_version_next_check = None
-cluster_version_lock = Lock()
-# PostgreSQL latest version information
+# The http server object
-latest_version = None
+httpd = None
-latest_version_next_check = None
-latest_version_lock = Lock()
-release_supported = None
-# Running state (used to gracefully shut down)
+# Where the config file lives
-running = True
+config_file = None
-# The http server object
+# Configure logging
-httpd = None
+log = logging.getLogger(__name__)
+formatter = logging.Formatter(
-# Where the config file lives
+"%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s"
-config_file = None
+)
+console_log_handler = logging.StreamHandler()
-# Configure logging
+console_log_handler.setFormatter(formatter)
-log = logging.getLogger(__name__)
+log.addHandler(console_log_handler)
-@classmethod
-def init_logging(cls):
-"""
-Actually initialize the logging framework. Since we don't ever instantiate the Context
-class, this provides a way to make a few modifications to the log handler.
-"""
-formatter = logging.Formatter(
-"%(asctime)s - %(levelname)s - %(filename)s: "
-"%(funcName)s() line %(lineno)d: %(message)s"
-)
-console_log_handler = logging.StreamHandler()
-console_log_handler.setFormatter(formatter)
-cls.log.addHandler(console_log_handler)
 # Error types
 class ConfigError(Exception):
-"""
+pass
-Error type for all config related errors.
-"""
 class DisconnectedError(Exception):
-"""
+pass
-Error indicating a previously active connection to the database has been disconnected.
-"""
 class UnhappyDBError(Exception):
-"""
+pass
-Error indicating that a database the code has been asked to connect to is on the unhappy list.
-"""
 class UnknownMetricError(Exception):
-"""
+pass
-Error indicating that an undefined metric was requested.
-"""
 class MetricVersionError(Exception):
-"""
+pass
-Error indicating that there is no suitable query for a metric that was requested for the
-version of PostgreSQL being monitored.
-"""
 class LatestVersionCheckError(Exception):
-"""
+pass
-Error indicating that there was a problem retrieving or parsing the latest version information.
-"""
-class InvalidDataError(Exception):
-"""
-Error indicating query results were somehow invalid
-"""
 # Default config settings
-DEFAULT_CONFIG = {
+default_config = {
 # The address the agent binds to
 "address": "127.0.0.1",
 # The port the agent listens on for requests
@ -219,60 +177,6 @@ def update_deep(d1, d2):
 return d1
-def validate_metric(path, name, metric):
-"""
-Validate a metric definition from a given file. If any query definitions come from external
-files, the metric dict will be updated with the actual query.
-Params:
-path: path to the file which contains this definition
-name: name of the metric
-metric: the dictionary containing the metric definition
-"""
-# Validate return types
-try:
-if metric["type"] not in ["value", "row", "column", "set"]:
-raise ConfigError(
-"Invalid return type: {} for metric {} in {}".format(
-metric["type"], name, path
-)
-)
-except KeyError as e:
-raise ConfigError(
-"No type specified for metric {} in {}".format(name, path)
-) from e
-# Ensure queries exist
-query_dict = metric.get("query", {})
-if not isinstance(query_dict, dict):
-raise ConfigError(
-"Query definition should be a dictionary, got: {} for metric {} in {}".format(
-query_dict, name, path
-)
-)
-if len(query_dict) == 0:
-raise ConfigError("Missing queries for metric {} in {}".format(name, path))
-# Read external sql files and validate version keys
-config_base = os.path.dirname(path)
-for vers, query in metric["query"].items():
-try:
-int(vers)
-except Exception as e:
-raise ConfigError(
-"Invalid version: {} for metric {} in {}".format(vers, name, path)
-) from e
-# Read in the external query and update the definition in the metricdictionary
-if query.startswith("file:"):
-query_path = query[5:]
-if not query_path.startswith("/"):
-query_path = os.path.join(config_base, query_path)
-with open(query_path, "r", encoding="utf-8") as f:
-metric["query"][vers] = f.read()
 def read_config(path, included=False):
 """
 Read a config file.
@ -281,21 +185,61 @@ def read_config(path, included=False):
 path: path to the file to read
 included: is this file included by another file?
 """
 # Read config file
-Context.log.info("Reading log file: %s", path)
+log.info("Reading log file: {}".format(path))
-with open(path, "r", encoding="utf-8") as f:
+with open(path, "r") as f:
 try:
 cfg = yaml.safe_load(f)
 except yaml.parser.ParserError as e:
-raise ConfigError("Inavlid config file: {}: {}".format(path, e)) from e
+raise ConfigError("Inavlid config file: {}: {}".format(path, e))
+# Since we use it a few places, get the base directory from the config
+config_base = os.path.dirname(path)
 # Read any external queries and validate metric definitions
 for name, metric in cfg.get("metrics", {}).items():
-validate_metric(path, name, metric)
+# Validate return types
+try:
+if metric["type"] not in ["value", "row", "column", "set"]:
+raise ConfigError(
+"Invalid return type: {} for metric {} in {}".format(
+metric["type"], name, path
+)
+)
+except KeyError:
+raise ConfigError(
+"No type specified for metric {} in {}".format(name, path)
+)
+# Ensure queries exist
+query_dict = metric.get("query", {})
+if type(query_dict) is not dict:
+raise ConfigError(
+"Query definition should be a dictionary, got: {} for metric {} in {}".format(
+query_dict, name, path
+)
+)
+if len(query_dict) == 0:
+raise ConfigError("Missing queries for metric {} in {}".format(name, path))
+# Read external sql files and validate version keys
+for vers, query in metric["query"].items():
+try:
+int(vers)
+except:
+raise ConfigError(
+"Invalid version: {} for metric {} in {}".format(vers, name, path)
+)
+if query.startswith("file:"):
+query_path = query[5:]
+if not query_path.startswith("/"):
+query_path = os.path.join(config_base, query_path)
+with open(query_path, "r") as f:
+metric["query"][vers] = f.read()
 # Read any included config files
-config_base = os.path.dirname(path)
 for inc in cfg.get("include", []):
 # Prefix relative paths with the directory from the current config
 if not inc.startswith("/"):
@ -306,37 +250,34 @@ def read_config(path, included=False):
 # config
 if included:
 return cfg
+else:
+new_config = {}
+update_deep(new_config, default_config)
+update_deep(new_config, cfg)
-new_config = {}
+# Minor sanity checks
-update_deep(new_config, DEFAULT_CONFIG)
+if len(new_config["metrics"]) == 0:
-update_deep(new_config, cfg)
+log.error("No metrics are defined")
+raise ConfigError("No metrics defined")
-# Minor sanity checks
+# Validate the new log level before changing the config
-if len(new_config["metrics"]) == 0:
+if new_config["log_level"].upper() not in [
-Context.log.error("No metrics are defined")
+"DEBUG",
-raise ConfigError("No metrics defined")
+"INFO",
+"WARNING",
+"ERROR",
+"CRITICAL",
+]:
+raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))
-# Validate the new log level before changing the config
+global config
-if new_config["log_level"].upper() not in [
+config = new_config
-"DEBUG",
-"INFO",
-"WARNING",
-"ERROR",
-"CRITICAL",
-]:
-raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))
-Context.config = new_config
+# Apply changes to log level
+log.setLevel(logging.getLevelName(config["log_level"].upper()))
-# Apply changes to log level
-Context.log.setLevel(logging.getLevelName(Context.config["log_level"].upper()))
-# Return the config (mostly to make pylint happy, but also in case I opt to remove the side
-# effect and make this more functional.
-return Context.config
-def signal_handler(sig, frame):  # pylint: disable=unused-argument
+def signal_handler(sig, frame):
 """
 Function for handling signals
@ -347,22 +288,19 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument
 # Signal everything to shut down
 if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
-Context.log.info("Shutting down ...")
+log.info("Shutting down ...")
-Context.running = False
+global running
-if Context.httpd is not None:
+running = False
-Context.httpd.socket.close()
+if httpd is not None:
+httpd.socket.close()
 # Signal a reload
 if sig == signal.SIGHUP:
-Context.log.warning("Received config reload signal")
+log.warning("Received config reload signal")
-read_config(Context.config_file)
+read_config(config_file)
 class ConnectionPool(ThreadedConnectionPool):
-"""
-Threaded connection pool that has a context manager.
-"""
 def __init__(self, dbname, minconn, maxconn, *args, **kwargs):
 # Make sure dbname isn't different in the kwargs
 kwargs["dbname"] = dbname
@ -371,14 +309,7 @@ class ConnectionPool(ThreadedConnectionPool):
 self.name = dbname
 @contextmanager
-def connection(self, timeout):
+def connection(self, timeout=None):
-"""
-Connection context manager for our connection pool. This will attempt to retrieve a
-connection until the timeout is reached.
-Params:
-timeout: how long to keep trying to get a connection bedore giving up
-"""
 conn = None
 timeout_time = datetime.now() + timedelta(timeout)
 # We will continue to try to get a connection slot until we time out
@ -402,37 +333,34 @@ class ConnectionPool(ThreadedConnectionPool):
 def get_pool(dbname):
 """
 Get a database connection pool.
-Params:
-dbname: the name of the database for which a connection pool should be returned.
 """
 # Check if the db is unhappy and wants to be left alone
-if dbname in Context.unhappy_cooldown:
+if dbname in unhappy_cooldown:
-if Context.unhappy_cooldown[dbname] > datetime.now():
+if unhappy_cooldown[dbname] > datetime.now():
 raise UnhappyDBError()
 # Create a connection pool if it doesn't already exist
-if dbname not in Context.connections:
+if dbname not in connections:
-with Context.connections_lock:
+with connections_lock:
 # Make sure nobody created the pool while we were waiting on the
 # lock
-if dbname not in Context.connections:
+if dbname not in connections:
-Context.log.info("Creating connection pool for: %s", dbname)
+log.info("Creating connection pool for: {}".format(dbname))
 # Actually create the connection pool
-Context.connections[dbname] = ConnectionPool(
+connections[dbname] = ConnectionPool(
 dbname,
-int(Context.config["min_pool_size"]),
+int(config["min_pool_size"]),
-int(Context.config["max_pool_size"]),
+int(config["max_pool_size"]),
 application_name="pgmon",
-host=Context.config["dbhost"],
+host=config["dbhost"],
-port=Context.config["dbport"],
+port=config["dbport"],
-user=Context.config["dbuser"],
+user=config["dbuser"],
-connect_timeout=int(Context.config["connect_timeout"]),
+connect_timeout=int(config["connect_timeout"]),
-sslmode=Context.config["ssl_mode"],
+sslmode=config["ssl_mode"],
 )
 # Clear the unhappy indicator if present
-Context.unhappy_cooldown.pop(dbname, None)
+unhappy_cooldown.pop(dbname, None)
-return Context.connections[dbname]
+return connections[dbname]
 def handle_connect_failure(pool):
@ -440,8 +368,8 @@ def handle_connect_failure(pool):
 Mark the database as being unhappy so we can leave it alone for a while
 """
 dbname = pool.name
-Context.unhappy_cooldown[dbname] = datetime.now() + timedelta(
+unhappy_cooldown[dbname] = datetime.now() + timedelta(
-seconds=int(Context.config["reconnect_cooldown"])
+seconds=int(config["reconnect_cooldown"])
 )
@ -472,67 +400,41 @@ def json_encode_special(obj):
""" """
if isinstance(obj, Decimal): if isinstance(obj, Decimal):
return float(obj) return float(obj)
raise TypeError("Cannot serialize object of {}".format(type(obj))) raise TypeError(f'Cannot serialize object of {type(obj)}')
def json_encode_result(return_type, res):
"""
Return a json string encoding of the results of a query.
params:
return_type: the expected structure to return. One of:
value, row, column, set
res: the query results
returns: a json string form of the results
raises:
ConfigError: when an invalid return_type is given
InvalidDataError: when the query results don't match the return type
"""
try:
if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])
if return_type == "row":
return json.dumps(
res[0] if len(res) > 0 else {}, default=json_encode_special
)
if return_type == "column":
return json.dumps(
[list(r.values())[0] for r in res], default=json_encode_special
)
if return_type == "set":
return json.dumps(res, default=json_encode_special)
except IndexError as e:
raise InvalidDataError(e) from e
# If we got to this point, the return type is invalid
raise ConfigError("Invalid query return type: {}".format(return_type))
def run_query_no_retry(pool, return_type, query, args): def run_query_no_retry(pool, return_type, query, args):
""" """
Run the query with no explicit retry code Run the query with no explicit retry code
""" """
with pool.connection(float(Context.config["connect_timeout"])) as conn: with pool.connection(float(config["connect_timeout"])) as conn:
try: try:
with conn.cursor(cursor_factory=RealDictCursor) as curs: with conn.cursor(cursor_factory=RealDictCursor) as curs:
curs.execute(query, args) curs.execute(query, args)
res = curs.fetchall() res = curs.fetchall()
return json_encode_result(return_type, res) if return_type == "value":
except Exception as e: if len(res) == 0:
return ""
return str(list(res[0].values())[0])
elif return_type == "row":
if len(res) == 0:
return "[]"
return json.dumps(res[0], default=json_encode_special)
elif return_type == "column":
if len(res) == 0:
return "[]"
return json.dumps([list(r.values())[0] for r in res], default=json_encode_special)
elif return_type == "set":
return json.dumps(res, default=json_encode_special)
except:
dbname = pool.name dbname = pool.name
if dbname in Context.unhappy_cooldown: if dbname in unhappy_cooldown:
raise UnhappyDBError() from e raise UnhappyDBError()
if conn.closed != 0: elif conn.closed != 0:
raise DisconnectedError() from e raise DisconnectedError()
raise else:
raise
def run_query(pool, return_type, query, args): def run_query(pool, return_type, query, args):
@ -553,7 +455,7 @@ def run_query(pool, return_type, query, args):
 try:
 return run_query_no_retry(pool, return_type, query, args)
 except DisconnectedError:
-Context.log.warning("Stale PostgreSQL connection found ... trying again")
+log.warning("Stale PostgreSQL connection found ... trying again")
 # This sleep is an annoying hack to give the pool workers time to
 # actually mark the connection, otherwise it can be given back in the
 # next connection() call
@ -561,9 +463,9 @@ def run_query(pool, return_type, query, args):
 time.sleep(1)
 try:
 return run_query_no_retry(pool, return_type, query, args)
-except Exception as e:
+except:
 handle_connect_failure(pool)
-raise UnhappyDBError() from e
+raise UnhappyDBError()
 def get_cluster_version():
@ -571,39 +473,40 @@ def get_cluster_version():
 Get the PostgreSQL version if we don't already know it, or if it's been
 too long sice the last time it was checked.
 """
+global cluster_version
+global cluster_version_next_check
 # If we don't know the version or it's past the recheck time, get the
 # version from the database. Only one thread needs to do this, so they all
 # try to grab the lock, and then make sure nobody else beat them to it.
 if (
-Context.cluster_version is None
+cluster_version is None
-or Context.cluster_version_next_check is None
+or cluster_version_next_check is None
-or Context.cluster_version_next_check < datetime.now()
+or cluster_version_next_check < datetime.now()
 ):
-with Context.cluster_version_lock:
+with cluster_version_lock:
 # Only check if nobody already got the version before us
 if (
-Context.cluster_version is None
+cluster_version is None
-or Context.cluster_version_next_check is None
+or cluster_version_next_check is None
-or Context.cluster_version_next_check < datetime.now()
+or cluster_version_next_check < datetime.now()
 ):
-Context.log.info("Checking PostgreSQL cluster version")
+log.info("Checking PostgreSQL cluster version")
-pool = get_pool(Context.config["dbname"])
+pool = get_pool(config["dbname"])
-Context.cluster_version = int(
+cluster_version = int(
 run_query(pool, "value", "SHOW server_version_num", None)
 )
-Context.cluster_version_next_check = datetime.now() + timedelta(
+cluster_version_next_check = datetime.now() + timedelta(
-seconds=int(Context.config["version_check_period"])
+seconds=int(config["version_check_period"])
 )
-Context.log.info(
+log.info("Got PostgreSQL cluster version: {}".format(cluster_version))
-"Got PostgreSQL cluster version: %s", Context.cluster_version
+log.debug(
-)
+"Next PostgreSQL cluster version check will be after: {}".format(
-Context.log.debug(
+cluster_version_next_check
-"Next PostgreSQL cluster version check will be after: %s",
+)
-Context.cluster_version_next_check,
 )
-return Context.cluster_version
+return cluster_version
 def version_num_to_release(version_num):
@ -616,7 +519,8 @@ def version_num_to_release(version_num):
""" """
if version_num // 10000 < 10: if version_num // 10000 < 10:
return version_num // 10000 + (version_num % 10000 // 100 / 10) return version_num // 10000 + (version_num % 10000 // 100 / 10)
return version_num // 10000 else:
return version_num // 10000
def parse_version_rss(raw_rss, release): def parse_version_rss(raw_rss, release):
@ -624,7 +528,7 @@ def parse_version_rss(raw_rss, release):
 Parse the raw RSS from the versions.rss feed to extract the latest version of
 PostgreSQL that's availabe for the cluster being monitored.
-This sets these Context variables:
+This sets these global variables:
 latest_version
 release_supported
@ -634,6 +538,8 @@ def parse_version_rss(raw_rss, release):
 raw_rss: The raw rss text from versions.rss
 release: The PostgreSQL release we care about (ex: 9.2, 14)
 """
+global latest_version
+global release_supported
 # Regular expressions for parsing the RSS document
 version_line = re.compile(
@ -653,75 +559,75 @@ def parse_version_rss(raw_rss, release):
 version = m.group(1)
 parts = list(map(int, version.split(".")))
 if parts[0] < 10:
-Context.latest_version = int(
+latest_version = int(
 "{}{:02}{:02}".format(parts[0], parts[1], parts[2])
 )
 else:
-Context.latest_version = int("{}00{:02}".format(parts[0], parts[1]))
+latest_version = int("{}00{:02}".format(parts[0], parts[1]))
 elif release_found:
 # The next line after the version tells if the version is supported
 if unsupported_line.match(line):
-Context.release_supported = False
+release_supported = False
 else:
-Context.release_supported = True
+release_supported = True
 break
 # Make sure we actually found it
 if not release_found:
 raise LatestVersionCheckError("Current release ({}) not found".format(release))
-Context.log.info(
+log.info(
-"Got latest PostgreSQL version: %s supported=%s",
+"Got latest PostgreSQL version: {} supported={}".format(
-Context.latest_version,
+latest_version, release_supported
-Context.release_supported,
+)
 )
-Context.log.debug(
+log.debug(
-"Next latest PostgreSQL version check will be after: %s",
+"Next latest PostgreSQL version check will be after: {}".format(
-Context.latest_version_next_check,
+latest_version_next_check
+)
 )
 def get_latest_version():
 """
-Get the latest supported version of the major PostgreSQL release running on the server being
+Get the latest supported version of the major PostgreSQL release running on the server being monitored.
-monitored.
 """
+global latest_version_next_check
 # If we don't know the latest version or it's past the recheck time, get the
 # version from the PostgreSQL RSS feed. Only one thread needs to do this, so
 # they all try to grab the lock, and then make sure nobody else beat them to it.
 if (
-Context.latest_version is None
+latest_version is None
-or Context.latest_version_next_check is None
+or latest_version_next_check is None
-or Context.latest_version_next_check < datetime.now()
+or latest_version_next_check < datetime.now()
 ):
 # Note: we get the cluster version here before grabbing the latest_version_lock
 # lock so it's not held while trying to talk with the DB.
 release = version_num_to_release(get_cluster_version())
-with Context.latest_version_lock:
+with latest_version_lock:
 # Only check if nobody already got the version before us
 if (
-Context.latest_version is None
+latest_version is None
-or Context.latest_version_next_check is None
+or latest_version_next_check is None
-or Context.latest_version_next_check < datetime.now()
+or latest_version_next_check < datetime.now()
 ):
-Context.log.info("Checking latest PostgreSQL version")
+log.info("Checking latest PostgreSQL version")
-Context.latest_version_next_check = datetime.now() + timedelta(
+latest_version_next_check = datetime.now() + timedelta(
-seconds=int(Context.config["latest_version_check_period"])
+seconds=int(config["latest_version_check_period"])
 )
 # Grab the RSS feed
-raw_rss = requests.get(
+raw_rss = requests.get("https://www.postgresql.org/versions.rss")
-"https://www.postgresql.org/versions.rss", timeout=30
-)
 if raw_rss.status_code != 200:
-raise LatestVersionCheckError("code={}".format(raw_rss.status_code))
+raise LatestVersionCheckError("code={}".format(r.status_code))
-# Parse the RSS body and set Context variables
+# Parse the RSS body and set global variables
 parse_version_rss(raw_rss.text, release)
-return Context.latest_version
+return latest_version
 def sample_metric(dbname, metric_name, args, retry=True):
@ -730,9 +636,9 @@ def sample_metric(dbname, metric_name, args, retry=True):
""" """
# Get the metric definition # Get the metric definition
try: try:
metric = Context.config["metrics"][metric_name] metric = config["metrics"][metric_name]
except KeyError as e: except KeyError:
raise UnknownMetricError("Unknown metric: {}".format(metric_name)) from e raise UnknownMetricError("Unknown metric: {}".format(metric_name))
# Get the connection pool for the database, or create one if it doesn't # Get the connection pool for the database, or create one if it doesn't
# already exist. # already exist.
@ -747,7 +653,8 @@ def sample_metric(dbname, metric_name, args, retry=True):
 # Execute the quert
 if retry:
 return run_query(pool, metric["type"], query, args)
-return run_query_no_retry(pool, metric["type"], query, args)
+else:
+return run_query_no_retry(pool, metric["type"], query, args)
 def test_queries():
@ -755,17 +662,12 @@ def test_queries():
 Run all of the metric queries against a database and check the results
 """
 # We just use the default db for tests
-dbname = Context.config["dbname"]
+dbname = config["dbname"]
 # Loop through all defined metrics.
-for name, metric in Context.config["metrics"].items():
+for name, metric in config["metrics"].items():
 # If the metric has arguments to use while testing, grab those
 args = metric.get("test_args", {})
-print(
+print("Testing {} [{}]".format(name, ", ".join(["{}={}".format(key, value) for key, value in args.items()])))
-"Testing {} [{}]".format(
-name,
-", ".join(["{}={}".format(key, value) for key, value in args.items()]),
-)
-)
 # When testing against a docker container, we may end up connecting
 # before the service is truly up (it restarts during the initialization
 # phase). To cope with this, we'll allow a few connection failures.
@ -802,8 +704,9 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
""" """
Override to suppress standard request logging Override to suppress standard request logging
""" """
pass
def do_GET(self): # pylint: disable=invalid-name def do_GET(self):
""" """
Handle a request. This is just a wrapper around the actual handler Handle a request. This is just a wrapper around the actual handler
code to keep things more readable. code to keep things more readable.
@ -811,7 +714,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
 try:
 self._handle_request()
 except BrokenPipeError:
-Context.log.error("Client disconnected, exiting handler")
+log.error("Client disconnected, exiting handler")
 def _handle_request(self):
 """
@ -824,6 +727,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
 if metric_name == "agent_version":
 self._reply(200, VERSION)
+return
 elif metric_name == "latest_version_info":
 try:
 get_latest_version()
@ -831,40 +735,46 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
 200,
 json.dumps(
 {
-"latest": Context.latest_version,
+"latest": latest_version,
-"supported": 1 if Context.release_supported else 0,
+"supported": 1 if release_supported else 0,
 }
 ),
 )
 except LatestVersionCheckError as e:
-Context.log.error(
+log.error("Failed to retrieve latest version information: {}".format(e))
-"Failed to retrieve latest version information: %s", e
-)
 self._reply(503, "Failed to retrieve latest version info")
-else:
+return
-# Note: parse_qs returns the values as a list. Since we always expect
-# single values, just grab the first from each.
-args = {key: values[0] for key, values in parsed_query.items()}
-# Get the dbname. If none was provided, use the default from the
+# Note: parse_qs returns the values as a list. Since we always expect
-# config.
+# single values, just grab the first from each.
-dbname = args.get("dbname", Context.config["dbname"])
+args = {key: values[0] for key, values in parsed_query.items()}
-# Sample the metric
+# Get the dbname. If none was provided, use the default from the
-try:
+# config.
-self._reply(200, sample_metric(dbname, metric_name, args))
+dbname = args.get("dbname", config["dbname"])
-except UnknownMetricError:
-Context.log.error("Unknown metric: %s", metric_name)
+# Sample the metric
-self._reply(404, "Unknown metric")
+try:
-except MetricVersionError:
+self._reply(200, sample_metric(dbname, metric_name, args))
-Context.log.error("Failed to find an query version for %s", metric_name)
+return
-self._reply(404, "Unsupported version")
+except UnknownMetricError as e:
-except UnhappyDBError:
+log.error("Unknown metric: {}".format(metric_name))
-Context.log.info("Database %s is unhappy, please be patient", dbname)
+self._reply(404, "Unknown metric")
-self._reply(503, "Database unavailable")
+return
-except Exception as e: # pylint: disable=broad-exception-caught
+except MetricVersionError as e:
-Context.log.error("Error running query: %s", e)
+log.error(
-self._reply(500, "Unexpected error: {}".format(e))
+"Failed to find a version of {} for {}".format(metric_name, version)
+)
+self._reply(404, "Unsupported version")
+return
+except UnhappyDBError as e:
+log.info("Database {} is unhappy, please be patient".format(dbname))
+self._reply(503, "Database unavailable")
+return
+except Exception as e:
+log.error("Error running query: {}".format(e))
+self._reply(500, "Unexpected error: {}".format(e))
+return
 def _reply(self, code, content):
 """
@ -877,14 +787,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
 self.wfile.write(bytes(content, "utf-8"))
-def main():
+if __name__ == "__main__":
-"""
-Main application routine
-"""
-# Initialize the logging framework
-Context.init_logging()
 # Handle cli args
 parser = argparse.ArgumentParser(
 prog="pgmon", description="A PostgreSQL monitoring agent"
@ -905,35 +808,33 @@ def main():
 args = parser.parse_args()
 # Set the config file path
-Context.config_file = args.config_file
+config_file = args.config_file
 # Read the config file
-read_config(Context.config_file)
+read_config(config_file)
 # Run query tests and exit if test mode is enabled
 if args.test:
-if test_queries() > 0:
+errors = test_queries()
+if errors > 0:
 sys.exit(1)
-sys.exit(0)
+else:
+sys.exit(0)
 # Set up the http server to receive requests
-server_address = (Context.config["address"], Context.config["port"])
+server_address = (config["address"], config["port"])
-Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
+httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
 # Set up the signal handler
 signal.signal(signal.SIGINT, signal_handler)
 signal.signal(signal.SIGHUP, signal_handler)
 # Handle requests.
-Context.log.info("Listening on port %s...", Context.config["port"])
+log.info("Listening on port {}...".format(config["port"]))
-while Context.running:
+while running:
-Context.httpd.handle_request()
+httpd.handle_request()
 # Clean up PostgreSQL connections
 # TODO: Improve this ... not sure it actually closes all the connections cleanly
-for pool in Context.connections.values():
+for pool in connections.values():
 pool.close()
-if __name__ == "__main__":
-main()

File diff suppressed because it is too large

View File

@ -1,129 +0,0 @@
#!/usr/bin/env python3
# Compare the items defined in a Zabbix template with the metrics defined in a config file.
import sys
import yaml
# Special built in metrics
SPECIAL = {
"agent_version",
"latest_version_info"
}
class NonMetricItemError(Exception):
"""
A given item does not directly use a metric
"""
def read_metrics(file):
"""
Read the metrics from a config file and return the list of names
params:
file: the name of the file to read
returns:
list of metric named defined in the file
raises:
yaml.parser.ParserError: invalid yaml file
"""
names = set()
config = None
with open(file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
try:
for m in config["metrics"].keys():
names.add(m)
except KeyError:
pass
return names
def extract_metric(item):
"""
Extract the metric from an item definition
params:
item: the item/discovery/prototype definition dict
returns:
the name of the metric used in the item
raises:
NonMetricItemError: the item does not directly use a metric
"""
try:
if item["type"] == "HTTP_AGENT":
url = item["url"]
if url.startswith("http://localhost:{$AGENT_PORT}"):
return url.split("/")[-1]
except KeyError:
raise NonMetricItemError()
raise NonMetricItemError()
def read_template(file):
"""
Read the items from a Zabbix template and return the list of metric names
params:
file: the name of the file to read
returns:
list of metric named used in the file
raises:
yaml.parser.ParserError: invalid yaml file
"""
names = set()
config = None
with open(file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
try:
for template in config["zabbix_export"]["templates"]:
for item in template["items"]:
try:
names.add(extract_metric(item))
except NonMetricItemError:
pass
for rule in template["discovery_rules"]:
try:
names.add(extract_metric(rule))
except NonMetricItemError:
pass
for proto in rule["item_prototypes"]:
try:
names.add(extract_metric(proto))
except NonMetricItemError:
pass
except KeyError:
pass
return names
if __name__ == '__main__':
config_file = sys.argv[1]
config_metrics = read_metrics(config_file)
template_file = sys.argv[2]
template_metrics = read_template(template_file) - SPECIAL
config_only = config_metrics - template_metrics
template_only = template_metrics - config_metrics
print("Config only: {}".format(sorted(list(config_only))))
print("Template only: {}".format(sorted(list(template_only))))

View File

@ -167,8 +167,7 @@ zabbix_export:
 operator: NOT_MATCHES_REGEX
 formulaid: A
 lifetime: 30d
-enabled_lifetime_type: DISABLE_AFTER
+enabled_lifetime_type: DISABLE_NEVER
-enabled_lifetime: 1d
 item_prototypes:
 - uuid: a30babe4a6f4440bba2a3ee46eff7ce2
 name: 'Time spent executing statements on {#DBNAME}'
@ -983,9 +982,6 @@ zabbix_export:
 type: DEPENDENT
 key: pgmon_discover_io_backend_types
 delay: '0'
-lifetime: 30d
-enabled_lifetime_type: DISABLE_AFTER
-enabled_lifetime: 1h
 item_prototypes:
 - uuid: b1ac2e56b30f4812bf33ce973ef16b10
 name: 'I/O Evictions by {#BACKEND_TYPE}'
@ -1576,15 +1572,8 @@ zabbix_export:
 type: HTTP_AGENT
 key: pgmon_discover_rep
 delay: 10m
-filter:
-conditions:
-- macro: '{#APPLICATION_NAME}'
-value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
-operator: NOT_MATCHES_REGEX
-formulaid: A
 lifetime: 30d
-enabled_lifetime_type: DISABLE_AFTER
+enabled_lifetime_type: DISABLE_NEVER
-enabled_lifetime: 7d
 item_prototypes:
 - uuid: 3a5a60620e6a4db694e47251148d82f5
 name: 'Flush lag for {#REPID}'
@ -1786,8 +1775,6 @@ zabbix_export:
 value: Raw
 url: 'http://localhost:{$AGENT_PORT}/discover_rep'
 lld_macro_paths:
-- lld_macro: '{#APPLICATION_NAME}'
-path: $.application_name
 - lld_macro: '{#CLIENT_ADDR}'
 path: $.client_addr
 - lld_macro: '{#REPID}'
@ -1799,15 +1786,6 @@ zabbix_export:
 type: HTTP_AGENT
 key: pgmon_discover_slots
 delay: 10m
-filter:
-conditions:
-- macro: '{#SLOT_NAME}'
-value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
-operator: NOT_MATCHES_REGEX
-formulaid: A
-lifetime: 30d
-enabled_lifetime_type: DISABLE_AFTER
-enabled_lifetime: 7d
 item_prototypes:
 - uuid: 536c5f82e3074ddfbfd842b3a2e8d46c
 name: 'Slot {#SLOT_NAME} - Confirmed Flushed Bytes Lag'