Compare commits

..

16 Commits

Author SHA1 Message Date
8182ad75b9
Fix activity query
* Modify the activity query to ensure each state is always reported.

* Fix the activity preprocessor to unwrap the value from the array
  containing it.
2025-10-18 12:49:09 -04:00
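The fix itself lands in the metrics file below as a RIGHT JOIN against unnest() over the six backend states. As a quick illustration of why that padding matters, here is a small, hypothetical Python sketch (not part of the change) that gives the same guarantee over a partial result set; the state names are the ones listed in the new query.

# Hypothetical illustration only: pad a partial per-state result so every
# state is reported, mirroring what the reworked SQL does with its RIGHT JOIN.
STATES = [
    "active",
    "idle",
    "idle in transaction",
    "idle in transaction (aborted)",
    "fastpath function call",
    "disabled",
]

def pad_states(rows):
    seen = {r["state"]: r for r in rows}
    return [
        seen.get(s, {"state": s, "backend_count": 0, "max_state_time": 0})
        for s in STATES
    ]

# Only 'active' backends exist at sample time; the other five still appear with zeros.
print(pad_states([{"state": "active", "backend_count": 3, "max_state_time": 1.2}]))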
175fd85f9f
Remove Debian 10
* The Debian 10 Docker image may have been removed.  Remove it from the
  list of supported distros until a source can be found.
2025-10-18 02:47:50 -04:00
e9c97d65cb
Fix OEL7 and AL2 targets 2025-10-18 02:42:02 -04:00
af5d9cf186
Add Amazon Linux 2 support, prepare for PG18
* Add Amazon Linux 2 as a supported OS.

* Dynamically select the package manager for Amazon and Oracle Linux.

* Prepare for supporting PostgreSQL 18.  Specifically, the Docker image
  seems to have moved the data directory, so get that dynamically.
2025-10-18 02:11:08 -04:00
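For reference, the package-manager choice in this change is made with shell tests in the Docker images and intcmp in the Makefile; the Python sketch below (illustrative only, not code from this repository) expresses the same rule: Amazon Linux releases at or below 2 and Oracle Linux releases at or below 7 predate dnf, so they fall back to yum.

# Illustrative only: mirror the "-le 2" / "-le 7" shell checks used in the
# packaging Dockerfiles to pick yum or dnf.
def package_manager(distro: str, release: int) -> str:
    yum_max = {"amazonlinux": 2, "oraclelinux": 7}
    return "yum" if release <= yum_max.get(distro, 0) else "dnf"

print(package_manager("amazonlinux", 2))     # yum
print(package_manager("amazonlinux", 2023))  # dnf
print(package_manager("oraclelinux", 7))     # yum
print(package_manager("rockylinux", 9))      # dnf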
17c15e7e48
Add support for Amazon Linux 2023, bump version to 1.1.0-rc1
* Add support for packaging for Amazon Linux 2023

* Bump version to 1.1.0-rc1

* Tidy up the Makefile a bit
2025-10-18 01:00:30 -04:00
f55bcdcfae
Merge branch 'dev/locks' into develop 2025-10-17 01:35:43 -04:00
e7b97a9e88
Add items to template
* Add items to the Zabbix template for:
    - Background writer / checkpoint stats
    - Lock stats
    - Connection states
2025-10-17 01:24:33 -04:00
3ade30c04a
Add script to show template coverage
* Add a script to compare the defined metrics with those used in the
  Zabbix template.

* Add a Makefile target to run the above script.
2025-10-09 02:21:17 -04:00
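The script (added as zabbix_templates/coverage.py further down) boils down to two set differences over metric names. A minimal, self-contained sketch of that comparison, using made-up metric names purely for illustration:

# Made-up example data; the real script reads the names from the two YAML files.
config_metrics = {"bgwriter", "locks", "connection_states", "sequences"}
template_metrics = {"bgwriter", "locks", "connection_states", "io_per_backend"}

print("Config only: {}".format(sorted(config_metrics - template_metrics)))
print("Template only: {}".format(sorted(template_metrics - config_metrics)))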
ab039dc412
Improve test coverage
* Refactor some functions to improve unit test coverage.

* Use the "coverage" module if available along with unit tests.
2025-10-08 23:53:45 -04:00
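The availability check lives in the Makefile: HAVE_COVERAGE probes whether the module imports cleanly and the test target picks its command accordingly. A rough Python rendering of that logic, shown only to illustrate it (the command strings are taken from the Makefile's test target):

# Illustration of the Makefile's HAVE_COVERAGE probe and test command selection.
import subprocess
import sys

have_coverage = (
    subprocess.run([sys.executable, "-c", "import coverage"], capture_output=True).returncode == 0
)
if have_coverage:
    cmd = [sys.executable, "-m", "coverage", "run", "-m", "unittest"]
else:
    cmd = [sys.executable, "-m", "unittest"]
print("would run (from src/):", " ".join(cmd))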
75c5d76047
Add a metric for tracking the number of granted locks 2025-09-23 01:14:50 -04:00
43cd162313
Refactor to make pylint happy 2025-09-23 01:12:49 -04:00
29bfd07dad
Run code through black 2025-07-14 01:58:08 -04:00
60589c2058
Update undiscovered item behavior 2025-07-14 01:39:57 -04:00
ea3aca3455
Filter out initial logical replication sync workers
* Slots are created for each table during the initial sync, which only
  live for the duration of the copy for that table.

* The initial sync backend workers' names are also based on the table
  being copied.

* Filter out the above in Zabbix discovery based on application_name and
  slot_name.
2025-07-07 13:15:03 -04:00
cc71547f5f
Include application_name in replication discovery data 2025-07-07 13:07:11 -04:00
107d5056d6
Merge branch 'main' into develop 2025-07-06 03:37:37 -04:00
14 changed files with 1858 additions and 483 deletions

1
.gitignore vendored
View File

@ -8,3 +8,4 @@ missing
__pycache__
venv
build
.coverage

View File

@ -0,0 +1,74 @@
# Copyright 2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
EAPI=8
PYTHON_COMPAT=( python3_{6..13} )
inherit python-r1 systemd
DESCRIPTION="PostgreSQL monitoring bridge"
HOMEPAGE="None"
LICENSE="BSD"
SLOT="0"
KEYWORDS="amd64"
SRC_URI="https://code2.shh-dot-com.org/james/${PN}/releases/download/v${PV}/${P}.tar.bz2"
IUSE="-systemd"
DEPEND="
${PYTHON_DEPS}
dev-python/psycopg:2
dev-python/pyyaml
dev-python/requests
app-admin/logrotate
"
RDEPEND="${DEPEND}"
BDEPEND=""
#RESTRICT="fetch"
#S="${WORKDIR}/${PN}"
#pkg_nofetch() {
# einfo "Please download"
# einfo " - ${P}.tar.bz2"
# einfo "from ${HOMEPAGE} and place it in your DISTDIR directory."
# einfo "The file should be owned by portage:portage."
#}
src_compile() {
true
}
src_install() {
# Install init script
if ! use systemd ; then
newinitd "openrc/pgmon.initd" pgmon
newconfd "openrc/pgmon.confd" pgmon
fi
# Install systemd unit
if use systemd ; then
systemd_dounit "systemd/pgmon.service"
fi
# Install script
exeinto /usr/bin
newexe "src/pgmon.py" pgmon
# Install default config
diropts -o root -g root -m 0755
insinto /etc/pgmon
doins "sample-config/pgmon.yml"
doins "sample-config/pgmon-metrics.yml"
# Install logrotate config
insinto /etc/logrotate.d
newins "logrotate/pgmon.logrotate" pgmon
# Install man page
doman manpages/pgmon.1
}

185
Makefile
View File

@ -9,6 +9,7 @@ FULL_VERSION := $(shell grep -m 1 '^VERSION = ' "$(SCRIPT)" | sed -ne 's/.*"\(.*
VERSION := $(shell echo $(FULL_VERSION) | sed -n 's/\(.*\)\(-rc.*\|$$\)/\1/p')
RELEASE := $(shell echo $(FULL_VERSION) | sed -n 's/.*-rc\([0-9]\+\)$$/\1/p')
# Package version formatting
ifeq ($(RELEASE),)
RPM_RELEASE := 1
RPM_VERSION := $(VERSION)-$(RPM_RELEASE)
@ -19,6 +20,12 @@ RPM_VERSION := $(VERSION)-$(RPM_RELEASE)
DEB_VERSION := $(VERSION)~rc$(RELEASE)
endif
# Python stuff
PYTHON ?= python3
PYLINT ?= pylint
BLACK ?= black
HAVE_COVERAGE := $(shell $(PYTHON) -c 'import coverage' 2>/dev/null && echo yes)
# Where packages are built
BUILD_DIR := build
@ -27,68 +34,77 @@ BUILD_DIR := build
SUPPORTED := ubuntu-20.04 \
ubuntu-22.04 \
ubuntu-24.04 \
debian-10 \
debian-11 \
debian-12 \
debian-13 \
rockylinux-8 \
rockylinux-9 \
rockylinux-10 \
oraclelinux-7 \
amazonlinux-2 \
amazonlinux-2023 \
gentoo
##
# These targets are the main ones to use for most things.
##
.PHONY: all clean tgz test query-tests install-common install-openrc install-systemd
.PHONY: all clean tgz lint format test query-tests template-coverage
all: package-all
# Dump information related to the current version
version:
@echo "full version=$(FULL_VERSION) version=$(VERSION) rel=$(RELEASE) rpm=$(RPM_VERSION) deb=$(DEB_VERSION)"
# Build all packages
.PHONY: package-all
package-all: $(foreach distro_release, $(SUPPORTED), package-$(distro_release))
# Clean up the build directory
clean:
rm -rf $(BUILD_DIR)
# Gentoo package (tar.gz) creation
.PHONY: package-gentoo
package-gentoo:
mkdir -p $(BUILD_DIR)/gentoo
tar --transform "s,^,$(PACKAGE_NAME)-$(FULL_VERSION)/," -acjf $(BUILD_DIR)/gentoo/$(PACKAGE_NAME)-$(FULL_VERSION).tar.bz2 --exclude .gitignore $(shell git ls-tree --full-tree --name-only -r HEAD)
# Check for lint
lint:
$(PYLINT) src/pgmon.py
$(PYLINT) src/test_pgmon.py
$(BLACK) --check --diff src/pgmon.py
$(BLACK) --check --diff src/test_pgmon.py
# Format the code using black
format:
$(BLACK) src/pgmon.py
$(BLACK) src/test_pgmon.py
# Create a deb package
.PHONY: package-%
package-%: PARTS=$(subst -, ,$*)
package-%: DISTRO=$(word 1, $(PARTS))
package-%: RELEASE=$(word 2, $(PARTS))
package-%:
mkdir -p $(BUILD_DIR)
docker run --rm \
-v .:/src:ro \
-v ./$(BUILD_DIR):/output \
--user $(shell id -u):$(shell id -g) \
"$(DISTRO)-packager:$(RELEASE)"
# Run unit tests for the script (with coverage if it's available)
test:
ifeq ($(HAVE_COVERAGE),yes)
cd src ; $(PYTHON) -m coverage run -m unittest && python3 -m coverage report -m
else
cd src ; $(PYTHON) -m unittest
endif
# Create a tarball
# Run query tests
query-tests:
cd tests ; ./run-tests.sh
# Compare the sample metrics with the Zabbix template
template-coverage:
$(PYTHON) zabbix_templates/coverage.py sample-config/pgmon-metrics.yml zabbix_templates/pgmon_templates.yaml
# Create a tarball (openrc version)
tgz:
rm -rf $(BUILD_DIR)/tgz/root
mkdir -p $(BUILD_DIR)/tgz/root
$(MAKE) install-openrc DESTDIR=$(BUILD_DIR)/tgz/root
tar -cz -f $(BUILD_DIR)/tgz/$(PACKAGE_NAME)-$(FULL_VERSION).tgz -C $(BUILD_DIR)/tgz/root .
# Clean up the build directory
clean:
rm -rf $(BUILD_DIR)
# Run unit tests for the script
test:
cd src ; python3 -m unittest
# Run query tests
query-tests:
cd tests ; ./run-tests.sh
##
# Install targets
#
# These are mostly used by the various package building images to actually install the script
##
# Install the script at the specified base directory (common components)
.PHONY: install-common
install-common:
# Set up directories
mkdir -p $(DESTDIR)/etc/$(PACKAGE_NAME)
@ -111,6 +127,7 @@ install-common:
cp logrotate/${PACKAGE_NAME}.logrotate ${DESTDIR}/etc/logrotate.d/${PACKAGE_NAME}
# Install for systemd
.PHONY: install-systemd
install-systemd:
# Install the common stuff
$(MAKE) install-common
@ -122,6 +139,7 @@ install-systemd:
cp systemd/* $(DESTDIR)/lib/systemd/system/
# Install for open-rc
.PHONY: install-openrc
install-openrc:
# Install the common stuff
$(MAKE) install-common
@ -138,10 +156,46 @@ install-openrc:
cp openrc/pgmon.confd $(DESTDIR)/etc/conf.d/pgmon
# Run all of the install tests
.PHONY: install-tests debian-%-install-test rockylinux-%-install-test ubuntu-%-install-test gentoo-install-test
install-tests: $(foreach distro_release, $(SUPPORTED), $(distro_release)-install-test)
##
# Packaging targets
#
# These targets create packages for the supported distro versions
##
# Build all packages
.PHONY: package-all
package-all: all-package-images $(foreach distro_release, $(SUPPORTED), package-$(distro_release))
# Gentoo package (tar.bz2) creation
.PHONY: package-gentoo
package-gentoo:
mkdir -p $(BUILD_DIR)/gentoo
tar --transform "s,^,$(PACKAGE_NAME)-$(FULL_VERSION)/," -acjf $(BUILD_DIR)/gentoo/$(PACKAGE_NAME)-$(FULL_VERSION).tar.bz2 --exclude .gitignore $(shell git ls-tree --full-tree --name-only -r HEAD)
cp $(BUILD_DIR)/gentoo/$(PACKAGE_NAME)-$(FULL_VERSION).tar.bz2 $(BUILD_DIR)/
# Create a deb/rpm package
.PHONY: package-%
package-%: PARTS=$(subst -, ,$*)
package-%: DISTRO=$(word 1, $(PARTS))
package-%: RELEASE=$(word 2, $(PARTS))
package-%:
mkdir -p $(BUILD_DIR)
docker run --rm \
-v .:/src:ro \
-v ./$(BUILD_DIR):/output \
--user $(shell id -u):$(shell id -g) \
"$(DISTRO)-packager:$(RELEASE)"
##
# Install test targets
#
# These targets test installing the package on the supported distro versions
##
# Run all of the install tests
.PHONY: install-tests debian-%-install-test rockylinux-%-install-test ubuntu-%-install-test amazonlinux-%-install-test gentoo-install-test
install-tests: $(foreach distro_release, $(SUPPORTED), $(distro_release)-install-test)
# Run a Debian install test
debian-%-install-test:
@ -154,7 +208,7 @@ debian-%-install-test:
rockylinux-%-install-test:
docker run --rm \
-v ./$(BUILD_DIR):/output \
rockylinux:$* \
rockylinux/rockylinux:$* \
bash -c 'dnf makecache && dnf install -y /output/$(PACKAGE_NAME)-$(RPM_VERSION).el$*.noarch.rpm'
# Run an Ubuntu install test
@ -165,19 +219,30 @@ ubuntu-%-install-test:
bash -c 'apt-get update && apt-get install -y /output/$(PACKAGE_NAME)-$(DEB_VERSION)-ubuntu-$*.deb'
# Run an OracleLinux install test (this is for EL7 since CentOS7 images no longer exist)
oraclelinux-%-install-test: MGR=$(intcmp $*, 7, yum, yum, dnf)
oraclelinux-%-install-test:
docker run --rm \
-v ./$(BUILD_DIR):/output \
oraclelinux:7 \
bash -c 'yum makecache && yum install -y /output/$(PACKAGE_NAME)-$(RPM_VERSION).el7.noarch.rpm'
oraclelinux:$* \
bash -c '$(MGR) makecache && $(MGR) install -y /output/$(PACKAGE_NAME)-$(RPM_VERSION).el7.noarch.rpm'
# Run an Amazon Linux install test
# Note: AL2 is too old for dnf
amazonlinux-%-install-test: MGR=$(intcmp $*, 2, yum, yum, dnf)
amazonlinux-%-install-test:
docker run --rm \
-v ./$(BUILD_DIR):/output \
amazonlinux:$* \
bash -c '$(MGR) makecache && $(MGR) install -y /output/$(PACKAGE_NAME)-$(RPM_VERSION).amzn$*.noarch.rpm'
# Run a Gentoo install test
gentoo-install-test:
# May implement this in the future, but would require additional headaches to set up a repo
true
echo "This would take a while ... skipping for now"
##
# Container targets
# Packaging image targets
#
# These targets build the docker images used to create packages
##
@ -186,6 +251,11 @@ gentoo-install-test:
.PHONY: all-package-images
all-package-images: $(foreach distro_release, $(SUPPORTED), package-image-$(distro_release))
# Nothing needs to be created to build the tarballs for Gentoo
.PHONY: package-image-gentoo
package-image-gentoo:
echo "Noting to do"
# Generic target for creating images that actually build the packages
# The % is expected to be: distro-release
# The build/.package-image-% target actually does the work and touches a state file to avoid excessive building
@ -201,11 +271,10 @@ package-image-%:
# Inside-container targets
#
# These targets are used inside containers. They expect the repo to be mounted
# at /src and the package manager specific build directory to be mounted at
# /output.
# at /src and the build directory to be mounted at /output.
##
.PHONY: actually-package-debian-% actually-package-rockylinux-% actually-package-ubuntu-% actually-package-oraclelinux-%
.PHONY: actually-package-debian-% actually-package-rockylinux-% actually-package-ubuntu-% actually-package-oraclelinux-% actually-package-amazonlinux-%
# Debian package creation
actually-package-debian-%:
@ -229,10 +298,36 @@ actually-package-ubuntu-%:
dpkg-deb -Zgzip --build /output/ubuntu-$* "/output/$(PACKAGE_NAME)-$(DEB_VERSION)-ubuntu-$*.deb"
# OracleLinux package creation
# Note: This needs to work inside OEL7, so we can't use intcmp
actually-package-oraclelinux-7:
mkdir -p /output/oraclelinux-7/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
sed -e "s/@@VERSION@@/$(VERSION)/g" -e "s/@@RELEASE@@/$(RPM_RELEASE)/g" RPM/$(PACKAGE_NAME)-el7.spec > /output/oraclelinux-7/SPECS/$(PACKAGE_NAME).spec
rpmbuild --define '_topdir /output/oraclelinux-7' \
--define 'version $(RPM_VERSION)' \
-bb /output/oraclelinux-7/SPECS/$(PACKAGE_NAME).spec
cp /output/oraclelinux-7/RPMS/noarch/$(PACKAGE_NAME)-$(RPM_VERSION).el7.noarch.rpm /output/
actually-package-oraclelinux-%:
mkdir -p /output/oraclelinux-$*/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
sed -e "s/@@VERSION@@/$(VERSION)/g" -e "s/@@RELEASE@@/$(RPM_RELEASE)/g" RPM/$(PACKAGE_NAME)-el7.spec > /output/oraclelinux-$*/SPECS/$(PACKAGE_NAME).spec
sed -e "s/@@VERSION@@/$(VERSION)/g" -e "s/@@RELEASE@@/$(RPM_RELEASE)/g" RPM/$(PACKAGE_NAME).spec > /output/oraclelinux-$*/SPECS/$(PACKAGE_NAME).spec
rpmbuild --define '_topdir /output/oraclelinux-$*' \
--define 'version $(RPM_VERSION)' \
-bb /output/oraclelinux-$*/SPECS/$(PACKAGE_NAME).spec
cp /output/oraclelinux-$*/RPMS/noarch/$(PACKAGE_NAME)-$(RPM_VERSION).el$*.noarch.rpm /output/
# Amazon Linux package creation
# Note: This needs to work inside AL2, so we can't use intcmp
actually-package-amazonlinux-2:
mkdir -p /output/amazonlinux-2/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
sed -e "s/@@VERSION@@/$(VERSION)/g" -e "s/@@RELEASE@@/$(RPM_RELEASE)/g" RPM/$(PACKAGE_NAME)-el7.spec > /output/amazonlinux-2/SPECS/$(PACKAGE_NAME).spec
rpmbuild --define '_topdir /output/amazonlinux-2' \
--define 'version $(RPM_VERSION)' \
-bb /output/amazonlinux-2/SPECS/$(PACKAGE_NAME).spec
cp /output/amazonlinux-2/RPMS/noarch/$(PACKAGE_NAME)-$(RPM_VERSION).amzn2.noarch.rpm /output/
actually-package-amazonlinux-%:
mkdir -p /output/amazonlinux-$*/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
sed -e "s/@@VERSION@@/$(VERSION)/g" -e "s/@@RELEASE@@/$(RPM_RELEASE)/g" RPM/$(PACKAGE_NAME).spec > /output/amazonlinux-$*/SPECS/$(PACKAGE_NAME).spec
rpmbuild --define '_topdir /output/amazonlinux-$*' \
--define 'version $(RPM_VERSION)' \
-bb /output/amazonlinux-$*/SPECS/$(PACKAGE_NAME).spec
cp /output/amazonlinux-$*/RPMS/noarch/$(PACKAGE_NAME)-$(RPM_VERSION).amzn$*.noarch.rpm /output/

View File

@ -0,0 +1,22 @@
# Dockerfile.rpm
ARG DISTRO=amazonlinux
ARG RELEASE=2023
FROM ${DISTRO}:${RELEASE}
ARG DISTRO
ARG RELEASE
RUN if [ ${RELEASE} -le 2 ] ; then MGR=yum ; else MGR=dnf ; fi && \
${MGR} install -y \
rpm-build \
make \
&& ${MGR} clean all
RUN echo -e "#!/bin/bash\nmake actually-package-${DISTRO}-${RELEASE}" > /init.sh \
&& chmod 755 /init.sh
WORKDIR /src
CMD ["/bin/bash", "/init.sh"]

View File

@ -8,11 +8,12 @@ FROM ${DISTRO}:${RELEASE}
ARG DISTRO
ARG RELEASE
RUN yum install -y \
RUN if [ ${RELEASE} -le 7 ] ; then MGR=yum ; else MGR=dnf ; fi && \
${MGR} install -y \
rpm-build \
make \
oracle-epel-release-el7 \
&& yum clean all
oracle-epel-release-el${RELEASE} \
&& ${MGR} clean all
RUN echo -e "#!/bin/bash\nmake actually-package-${DISTRO}-${RELEASE}" > /init.sh \
&& chmod 755 /init.sh

View File

@ -3,7 +3,7 @@
ARG DISTRO=rockylinux
ARG RELEASE=9
FROM ${DISTRO}:${RELEASE}
FROM rockylinux/${DISTRO}:${RELEASE}
ARG DISTRO
ARG RELEASE

4
pylintrc Normal file
View File

@ -0,0 +1,4 @@
[MASTER]
py-version=3.5
disable=fixme

View File

@ -18,6 +18,7 @@ metrics:
query:
0: >
SELECT host(client_addr) || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid,
application_name,
client_addr,
state
FROM pg_stat_replication
@ -195,12 +196,28 @@ metrics:
type: set
query:
0: >
SELECT state,
count(*) AS backend_count,
COALESCE(EXTRACT(EPOCH FROM max(now() - state_change)), 0) AS max_state_time
FROM pg_stat_activity
WHERE datname = %(dbname)s
GROUP BY state
SELECT
states.state,
COALESCE(a.backend_count, 0) AS backend_count,
COALESCE(a.max_state_time, 0) AS max_state_time
FROM (
SELECT state,
count(*) AS backend_count,
COALESCE(EXTRACT(EPOCH FROM now() - min(state_change)), 0) AS max_state_time
FROM pg_stat_activity
WHERE datname = %(dbname)s
GROUP BY state
) AS a
RIGHT JOIN
unnest(ARRAY[
'active',
'idle',
'idle in transaction',
'idle in transaction (aborted)',
'fastpath function call',
'disabled'
]) AS states(state)
USING (state);
test_args:
dbname: postgres
@ -229,7 +246,19 @@ metrics:
SELECT COUNT(*) FILTER (WHERE has_sequence_privilege(c.oid, 'SELECT,USAGE')) AS visible_sequences,
COUNT(*) AS total_sequences
FROM pg_class AS c
WHERE relkind = 'S';
WHERE relkind = 'S'
locks:
type: row
query:
0: >
SELECT COUNT(*) AS total,
SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted
FROM pg_locks
90400: >
SELECT COUNT(*) AS total,
COUNT(*) FILTER (WHERE granted) AS granted
FROM pg_locks
##
@ -305,3 +334,5 @@ metrics:
type: value
query:
0: SELECT count(*) AS ntables FROM pg_stat_user_tables
#sequence_usage:
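The new locks metric defines two queries keyed by version number: 0 uses the SUM(CASE ...) form and 90400 (PostgreSQL 9.4, where the FILTER clause was introduced) uses COUNT(*) FILTER. The agent's selection code is not shown in this diff; the sketch below is only a plausible illustration of how such keys could be resolved, on the assumption that they are minimum server_version_num values.

# Hypothetical resolution of version-keyed queries: take the largest key that
# does not exceed the server's version number (an assumption; the agent's
# actual lookup is not part of this diff).
def pick_query(queries, server_version_num):
    eligible = [v for v in queries if int(v) <= server_version_num]
    if not eligible:
        raise ValueError("no query for this PostgreSQL version")
    return queries[max(eligible, key=int)]

locks_queries = {
    0: "SELECT COUNT(*) AS total, SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted FROM pg_locks",
    90400: "SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE granted) AS granted FROM pg_locks",
}
print(pick_query(locks_queries, 170000))  # a v17 server gets the FILTER form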

View File

@ -1,105 +1,147 @@
#!/usr/bin/env python3
"""
pgmon is a monitoring intermediary that sits between a PostgreSQL cluster and a monitoring system
that is capable of parsing JSON responses over an HTTP connection.
"""
# pylint: disable=too-few-public-methods
import yaml
import json
import time
import os
import sys
import signal
import argparse
import logging
import re
from decimal import Decimal
from urllib.parse import urlparse, parse_qs
from contextlib import contextmanager
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Lock
import yaml
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from contextlib import contextmanager
import signal
from threading import Thread, Lock, Semaphore
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import requests
import re
from decimal import Decimal
VERSION = "1.0.4"
VERSION = "1.1.0-rc1"
# Configuration
config = {}
# Dictionary of current PostgreSQL connection pools
connections_lock = Lock()
connections = {}
class Context:
"""
The global context for connections, config, version, and IPC
"""
# Dictionary of unhappy databases. Keys are database names, value is the time
# the database was determined to be unhappy plus the cooldown setting. So,
# basically it's the time when we should try to connect to the database again.
unhappy_cooldown = {}
# Configuration
config = {}
# Version information
cluster_version = None
cluster_version_next_check = None
cluster_version_lock = Lock()
# Dictionary of current PostgreSQL connection pools
connections_lock = Lock()
connections = {}
# PostgreSQL latest version information
latest_version = None
latest_version_next_check = None
latest_version_lock = Lock()
release_supported = None
# Dictionary of unhappy databases. Keys are database names, value is the time
# the database was determined to be unhappy plus the cooldown setting. So,
# basically it's the time when we should try to connect to the database again.
unhappy_cooldown = {}
# Running state (used to gracefully shut down)
running = True
# Version information
cluster_version = None
cluster_version_next_check = None
cluster_version_lock = Lock()
# The http server object
httpd = None
# PostgreSQL latest version information
latest_version = None
latest_version_next_check = None
latest_version_lock = Lock()
release_supported = None
# Where the config file lives
config_file = None
# Running state (used to gracefully shut down)
running = True
# Configure logging
log = logging.getLogger(__name__)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s"
)
console_log_handler = logging.StreamHandler()
console_log_handler.setFormatter(formatter)
log.addHandler(console_log_handler)
# The http server object
httpd = None
# Where the config file lives
config_file = None
# Configure logging
log = logging.getLogger(__name__)
@classmethod
def init_logging(cls):
"""
Actually initialize the logging framework. Since we don't ever instantiate the Context
class, this provides a way to make a few modifications to the log handler.
"""
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(filename)s: "
"%(funcName)s() line %(lineno)d: %(message)s"
)
console_log_handler = logging.StreamHandler()
console_log_handler.setFormatter(formatter)
cls.log.addHandler(console_log_handler)
# Error types
class ConfigError(Exception):
pass
"""
Error type for all config related errors.
"""
class DisconnectedError(Exception):
pass
"""
Error indicating a previously active connection to the database has been disconnected.
"""
class UnhappyDBError(Exception):
pass
"""
Error indicating that a database the code has been asked to connect to is on the unhappy list.
"""
class UnknownMetricError(Exception):
pass
"""
Error indicating that an undefined metric was requested.
"""
class MetricVersionError(Exception):
pass
"""
Error indicating that there is no suitable query for a metric that was requested for the
version of PostgreSQL being monitored.
"""
class LatestVersionCheckError(Exception):
pass
"""
Error indicating that there was a problem retrieving or parsing the latest version information.
"""
class InvalidDataError(Exception):
"""
Error indicating query results were somehow invalid
"""
# Default config settings
default_config = {
DEFAULT_CONFIG = {
# The address the agent binds to
"address": "127.0.0.1",
# The port the agent listens on for requests
@ -177,6 +219,60 @@ def update_deep(d1, d2):
return d1
def validate_metric(path, name, metric):
"""
Validate a metric definition from a given file. If any query definitions come from external
files, the metric dict will be updated with the actual query.
Params:
path: path to the file which contains this definition
name: name of the metric
metric: the dictionary containing the metric definition
"""
# Validate return types
try:
if metric["type"] not in ["value", "row", "column", "set"]:
raise ConfigError(
"Invalid return type: {} for metric {} in {}".format(
metric["type"], name, path
)
)
except KeyError as e:
raise ConfigError(
"No type specified for metric {} in {}".format(name, path)
) from e
# Ensure queries exist
query_dict = metric.get("query", {})
if not isinstance(query_dict, dict):
raise ConfigError(
"Query definition should be a dictionary, got: {} for metric {} in {}".format(
query_dict, name, path
)
)
if len(query_dict) == 0:
raise ConfigError("Missing queries for metric {} in {}".format(name, path))
# Read external sql files and validate version keys
config_base = os.path.dirname(path)
for vers, query in metric["query"].items():
try:
int(vers)
except Exception as e:
raise ConfigError(
"Invalid version: {} for metric {} in {}".format(vers, name, path)
) from e
# Read in the external query and update the definition in the metric dictionary
if query.startswith("file:"):
query_path = query[5:]
if not query_path.startswith("/"):
query_path = os.path.join(config_base, query_path)
with open(query_path, "r", encoding="utf-8") as f:
metric["query"][vers] = f.read()
def read_config(path, included=False):
"""
Read a config file.
@ -185,61 +281,21 @@ def read_config(path, included=False):
path: path to the file to read
included: is this file included by another file?
"""
# Read config file
log.info("Reading log file: {}".format(path))
with open(path, "r") as f:
Context.log.info("Reading log file: %s", path)
with open(path, "r", encoding="utf-8") as f:
try:
cfg = yaml.safe_load(f)
except yaml.parser.ParserError as e:
raise ConfigError("Inavlid config file: {}: {}".format(path, e))
# Since we use it a few places, get the base directory from the config
config_base = os.path.dirname(path)
raise ConfigError("Inavlid config file: {}: {}".format(path, e)) from e
# Read any external queries and validate metric definitions
for name, metric in cfg.get("metrics", {}).items():
# Validate return types
try:
if metric["type"] not in ["value", "row", "column", "set"]:
raise ConfigError(
"Invalid return type: {} for metric {} in {}".format(
metric["type"], name, path
)
)
except KeyError:
raise ConfigError(
"No type specified for metric {} in {}".format(name, path)
)
# Ensure queries exist
query_dict = metric.get("query", {})
if type(query_dict) is not dict:
raise ConfigError(
"Query definition should be a dictionary, got: {} for metric {} in {}".format(
query_dict, name, path
)
)
if len(query_dict) == 0:
raise ConfigError("Missing queries for metric {} in {}".format(name, path))
# Read external sql files and validate version keys
for vers, query in metric["query"].items():
try:
int(vers)
except:
raise ConfigError(
"Invalid version: {} for metric {} in {}".format(vers, name, path)
)
if query.startswith("file:"):
query_path = query[5:]
if not query_path.startswith("/"):
query_path = os.path.join(config_base, query_path)
with open(query_path, "r") as f:
metric["query"][vers] = f.read()
validate_metric(path, name, metric)
# Read any included config files
config_base = os.path.dirname(path)
for inc in cfg.get("include", []):
# Prefix relative paths with the directory from the current config
if not inc.startswith("/"):
@ -250,34 +306,37 @@ def read_config(path, included=False):
# config
if included:
return cfg
else:
new_config = {}
update_deep(new_config, default_config)
update_deep(new_config, cfg)
# Minor sanity checks
if len(new_config["metrics"]) == 0:
log.error("No metrics are defined")
raise ConfigError("No metrics defined")
new_config = {}
update_deep(new_config, DEFAULT_CONFIG)
update_deep(new_config, cfg)
# Validate the new log level before changing the config
if new_config["log_level"].upper() not in [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL",
]:
raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))
# Minor sanity checks
if len(new_config["metrics"]) == 0:
Context.log.error("No metrics are defined")
raise ConfigError("No metrics defined")
global config
config = new_config
# Validate the new log level before changing the config
if new_config["log_level"].upper() not in [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL",
]:
raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))
# Apply changes to log level
log.setLevel(logging.getLevelName(config["log_level"].upper()))
Context.config = new_config
# Apply changes to log level
Context.log.setLevel(logging.getLevelName(Context.config["log_level"].upper()))
# Return the config (mostly to make pylint happy, but also in case I opt to remove the side
# effect and make this more functional.)
return Context.config
def signal_handler(sig, frame):
def signal_handler(sig, frame): # pylint: disable=unused-argument
"""
Function for handling signals
@ -288,19 +347,22 @@ def signal_handler(sig, frame):
# Signal everything to shut down
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
log.info("Shutting down ...")
global running
running = False
if httpd is not None:
httpd.socket.close()
Context.log.info("Shutting down ...")
Context.running = False
if Context.httpd is not None:
Context.httpd.socket.close()
# Signal a reload
if sig == signal.SIGHUP:
log.warning("Received config reload signal")
read_config(config_file)
Context.log.warning("Received config reload signal")
read_config(Context.config_file)
class ConnectionPool(ThreadedConnectionPool):
"""
Threaded connection pool that has a context manager.
"""
def __init__(self, dbname, minconn, maxconn, *args, **kwargs):
# Make sure dbname isn't different in the kwargs
kwargs["dbname"] = dbname
@ -309,7 +371,14 @@ class ConnectionPool(ThreadedConnectionPool):
self.name = dbname
@contextmanager
def connection(self, timeout=None):
def connection(self, timeout):
"""
Connection context manager for our connection pool. This will attempt to retrieve a
connection until the timeout is reached.
Params:
timeout: how long to keep trying to get a connection before giving up
"""
conn = None
timeout_time = datetime.now() + timedelta(timeout)
# We will continue to try to get a connection slot until we time out
@ -333,34 +402,37 @@ class ConnectionPool(ThreadedConnectionPool):
def get_pool(dbname):
"""
Get a database connection pool.
Params:
dbname: the name of the database for which a connection pool should be returned.
"""
# Check if the db is unhappy and wants to be left alone
if dbname in unhappy_cooldown:
if unhappy_cooldown[dbname] > datetime.now():
if dbname in Context.unhappy_cooldown:
if Context.unhappy_cooldown[dbname] > datetime.now():
raise UnhappyDBError()
# Create a connection pool if it doesn't already exist
if dbname not in connections:
with connections_lock:
if dbname not in Context.connections:
with Context.connections_lock:
# Make sure nobody created the pool while we were waiting on the
# lock
if dbname not in connections:
log.info("Creating connection pool for: {}".format(dbname))
if dbname not in Context.connections:
Context.log.info("Creating connection pool for: %s", dbname)
# Actually create the connection pool
connections[dbname] = ConnectionPool(
Context.connections[dbname] = ConnectionPool(
dbname,
int(config["min_pool_size"]),
int(config["max_pool_size"]),
int(Context.config["min_pool_size"]),
int(Context.config["max_pool_size"]),
application_name="pgmon",
host=config["dbhost"],
port=config["dbport"],
user=config["dbuser"],
connect_timeout=int(config["connect_timeout"]),
sslmode=config["ssl_mode"],
host=Context.config["dbhost"],
port=Context.config["dbport"],
user=Context.config["dbuser"],
connect_timeout=int(Context.config["connect_timeout"]),
sslmode=Context.config["ssl_mode"],
)
# Clear the unhappy indicator if present
unhappy_cooldown.pop(dbname, None)
return connections[dbname]
Context.unhappy_cooldown.pop(dbname, None)
return Context.connections[dbname]
def handle_connect_failure(pool):
@ -368,8 +440,8 @@ def handle_connect_failure(pool):
Mark the database as being unhappy so we can leave it alone for a while
"""
dbname = pool.name
unhappy_cooldown[dbname] = datetime.now() + timedelta(
seconds=int(config["reconnect_cooldown"])
Context.unhappy_cooldown[dbname] = datetime.now() + timedelta(
seconds=int(Context.config["reconnect_cooldown"])
)
@ -400,41 +472,67 @@ def json_encode_special(obj):
"""
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(f'Cannot serialize object of {type(obj)}')
raise TypeError("Cannot serialize object of {}".format(type(obj)))
def json_encode_result(return_type, res):
"""
Return a json string encoding of the results of a query.
params:
return_type: the expected structure to return. One of:
value, row, column, set
res: the query results
returns: a json string form of the results
raises:
ConfigError: when an invalid return_type is given
InvalidDataError: when the query results don't match the return type
"""
try:
if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])
if return_type == "row":
return json.dumps(
res[0] if len(res) > 0 else {}, default=json_encode_special
)
if return_type == "column":
return json.dumps(
[list(r.values())[0] for r in res], default=json_encode_special
)
if return_type == "set":
return json.dumps(res, default=json_encode_special)
except IndexError as e:
raise InvalidDataError(e) from e
# If we got to this point, the return type is invalid
raise ConfigError("Invalid query return type: {}".format(return_type))
def run_query_no_retry(pool, return_type, query, args):
"""
Run the query with no explicit retry code
"""
with pool.connection(float(config["connect_timeout"])) as conn:
with pool.connection(float(Context.config["connect_timeout"])) as conn:
try:
with conn.cursor(cursor_factory=RealDictCursor) as curs:
curs.execute(query, args)
res = curs.fetchall()
if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])
elif return_type == "row":
if len(res) == 0:
return "[]"
return json.dumps(res[0], default=json_encode_special)
elif return_type == "column":
if len(res) == 0:
return "[]"
return json.dumps([list(r.values())[0] for r in res], default=json_encode_special)
elif return_type == "set":
return json.dumps(res, default=json_encode_special)
except:
return json_encode_result(return_type, res)
except Exception as e:
dbname = pool.name
if dbname in unhappy_cooldown:
raise UnhappyDBError()
elif conn.closed != 0:
raise DisconnectedError()
else:
raise
if dbname in Context.unhappy_cooldown:
raise UnhappyDBError() from e
if conn.closed != 0:
raise DisconnectedError() from e
raise
def run_query(pool, return_type, query, args):
@ -455,7 +553,7 @@ def run_query(pool, return_type, query, args):
try:
return run_query_no_retry(pool, return_type, query, args)
except DisconnectedError:
log.warning("Stale PostgreSQL connection found ... trying again")
Context.log.warning("Stale PostgreSQL connection found ... trying again")
# This sleep is an annoying hack to give the pool workers time to
# actually mark the connection, otherwise it can be given back in the
# next connection() call
@ -463,9 +561,9 @@ def run_query(pool, return_type, query, args):
time.sleep(1)
try:
return run_query_no_retry(pool, return_type, query, args)
except:
except Exception as e:
handle_connect_failure(pool)
raise UnhappyDBError()
raise UnhappyDBError() from e
def get_cluster_version():
@ -473,40 +571,39 @@ def get_cluster_version():
Get the PostgreSQL version if we don't already know it, or if it's been
too long since the last time it was checked.
"""
global cluster_version
global cluster_version_next_check
# If we don't know the version or it's past the recheck time, get the
# version from the database. Only one thread needs to do this, so they all
# try to grab the lock, and then make sure nobody else beat them to it.
if (
cluster_version is None
or cluster_version_next_check is None
or cluster_version_next_check < datetime.now()
Context.cluster_version is None
or Context.cluster_version_next_check is None
or Context.cluster_version_next_check < datetime.now()
):
with cluster_version_lock:
with Context.cluster_version_lock:
# Only check if nobody already got the version before us
if (
cluster_version is None
or cluster_version_next_check is None
or cluster_version_next_check < datetime.now()
Context.cluster_version is None
or Context.cluster_version_next_check is None
or Context.cluster_version_next_check < datetime.now()
):
log.info("Checking PostgreSQL cluster version")
pool = get_pool(config["dbname"])
cluster_version = int(
Context.log.info("Checking PostgreSQL cluster version")
pool = get_pool(Context.config["dbname"])
Context.cluster_version = int(
run_query(pool, "value", "SHOW server_version_num", None)
)
cluster_version_next_check = datetime.now() + timedelta(
seconds=int(config["version_check_period"])
Context.cluster_version_next_check = datetime.now() + timedelta(
seconds=int(Context.config["version_check_period"])
)
log.info("Got PostgreSQL cluster version: {}".format(cluster_version))
log.debug(
"Next PostgreSQL cluster version check will be after: {}".format(
cluster_version_next_check
)
Context.log.info(
"Got PostgreSQL cluster version: %s", Context.cluster_version
)
Context.log.debug(
"Next PostgreSQL cluster version check will be after: %s",
Context.cluster_version_next_check,
)
return cluster_version
return Context.cluster_version
def version_num_to_release(version_num):
@ -519,8 +616,7 @@ def version_num_to_release(version_num):
"""
if version_num // 10000 < 10:
return version_num // 10000 + (version_num % 10000 // 100 / 10)
else:
return version_num // 10000
return version_num // 10000
def parse_version_rss(raw_rss, release):
@ -528,7 +624,7 @@ def parse_version_rss(raw_rss, release):
Parse the raw RSS from the versions.rss feed to extract the latest version of
PostgreSQL that's available for the cluster being monitored.
This sets these global variables:
This sets these Context variables:
latest_version
release_supported
@ -538,8 +634,6 @@ def parse_version_rss(raw_rss, release):
raw_rss: The raw rss text from versions.rss
release: The PostgreSQL release we care about (ex: 9.2, 14)
"""
global latest_version
global release_supported
# Regular expressions for parsing the RSS document
version_line = re.compile(
@ -559,75 +653,75 @@ def parse_version_rss(raw_rss, release):
version = m.group(1)
parts = list(map(int, version.split(".")))
if parts[0] < 10:
latest_version = int(
Context.latest_version = int(
"{}{:02}{:02}".format(parts[0], parts[1], parts[2])
)
else:
latest_version = int("{}00{:02}".format(parts[0], parts[1]))
Context.latest_version = int("{}00{:02}".format(parts[0], parts[1]))
elif release_found:
# The next line after the version tells if the version is supported
if unsupported_line.match(line):
release_supported = False
Context.release_supported = False
else:
release_supported = True
Context.release_supported = True
break
# Make sure we actually found it
if not release_found:
raise LatestVersionCheckError("Current release ({}) not found".format(release))
log.info(
"Got latest PostgreSQL version: {} supported={}".format(
latest_version, release_supported
)
Context.log.info(
"Got latest PostgreSQL version: %s supported=%s",
Context.latest_version,
Context.release_supported,
)
log.debug(
"Next latest PostgreSQL version check will be after: {}".format(
latest_version_next_check
)
Context.log.debug(
"Next latest PostgreSQL version check will be after: %s",
Context.latest_version_next_check,
)
def get_latest_version():
"""
Get the latest supported version of the major PostgreSQL release running on the server being monitored.
Get the latest supported version of the major PostgreSQL release running on the server being
monitored.
"""
global latest_version_next_check
# If we don't know the latest version or it's past the recheck time, get the
# version from the PostgreSQL RSS feed. Only one thread needs to do this, so
# they all try to grab the lock, and then make sure nobody else beat them to it.
if (
latest_version is None
or latest_version_next_check is None
or latest_version_next_check < datetime.now()
Context.latest_version is None
or Context.latest_version_next_check is None
or Context.latest_version_next_check < datetime.now()
):
# Note: we get the cluster version here before grabbing the latest_version_lock
# lock so it's not held while trying to talk with the DB.
release = version_num_to_release(get_cluster_version())
with latest_version_lock:
with Context.latest_version_lock:
# Only check if nobody already got the version before us
if (
latest_version is None
or latest_version_next_check is None
or latest_version_next_check < datetime.now()
Context.latest_version is None
or Context.latest_version_next_check is None
or Context.latest_version_next_check < datetime.now()
):
log.info("Checking latest PostgreSQL version")
latest_version_next_check = datetime.now() + timedelta(
seconds=int(config["latest_version_check_period"])
Context.log.info("Checking latest PostgreSQL version")
Context.latest_version_next_check = datetime.now() + timedelta(
seconds=int(Context.config["latest_version_check_period"])
)
# Grab the RSS feed
raw_rss = requests.get("https://www.postgresql.org/versions.rss")
raw_rss = requests.get(
"https://www.postgresql.org/versions.rss", timeout=30
)
if raw_rss.status_code != 200:
raise LatestVersionCheckError("code={}".format(r.status_code))
raise LatestVersionCheckError("code={}".format(raw_rss.status_code))
# Parse the RSS body and set global variables
# Parse the RSS body and set Context variables
parse_version_rss(raw_rss.text, release)
return latest_version
return Context.latest_version
def sample_metric(dbname, metric_name, args, retry=True):
@ -636,9 +730,9 @@ def sample_metric(dbname, metric_name, args, retry=True):
"""
# Get the metric definition
try:
metric = config["metrics"][metric_name]
except KeyError:
raise UnknownMetricError("Unknown metric: {}".format(metric_name))
metric = Context.config["metrics"][metric_name]
except KeyError as e:
raise UnknownMetricError("Unknown metric: {}".format(metric_name)) from e
# Get the connection pool for the database, or create one if it doesn't
# already exist.
@ -653,8 +747,7 @@ def sample_metric(dbname, metric_name, args, retry=True):
# Execute the query
if retry:
return run_query(pool, metric["type"], query, args)
else:
return run_query_no_retry(pool, metric["type"], query, args)
return run_query_no_retry(pool, metric["type"], query, args)
def test_queries():
@ -662,12 +755,17 @@ def test_queries():
Run all of the metric queries against a database and check the results
"""
# We just use the default db for tests
dbname = config["dbname"]
dbname = Context.config["dbname"]
# Loop through all defined metrics.
for name, metric in config["metrics"].items():
for name, metric in Context.config["metrics"].items():
# If the metric has arguments to use while testing, grab those
args = metric.get("test_args", {})
print("Testing {} [{}]".format(name, ", ".join(["{}={}".format(key, value) for key, value in args.items()])))
print(
"Testing {} [{}]".format(
name,
", ".join(["{}={}".format(key, value) for key, value in args.items()]),
)
)
# When testing against a docker container, we may end up connecting
# before the service is truly up (it restarts during the initialization
# phase). To cope with this, we'll allow a few connection failures.
@ -704,9 +802,8 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""
Override to suppress standard request logging
"""
pass
def do_GET(self):
def do_GET(self): # pylint: disable=invalid-name
"""
Handle a request. This is just a wrapper around the actual handler
code to keep things more readable.
@ -714,7 +811,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
try:
self._handle_request()
except BrokenPipeError:
log.error("Client disconnected, exiting handler")
Context.log.error("Client disconnected, exiting handler")
def _handle_request(self):
"""
@ -727,7 +824,6 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
if metric_name == "agent_version":
self._reply(200, VERSION)
return
elif metric_name == "latest_version_info":
try:
get_latest_version()
@ -735,46 +831,40 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
200,
json.dumps(
{
"latest": latest_version,
"supported": 1 if release_supported else 0,
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
),
)
except LatestVersionCheckError as e:
log.error("Failed to retrieve latest version information: {}".format(e))
Context.log.error(
"Failed to retrieve latest version information: %s", e
)
self._reply(503, "Failed to retrieve latest version info")
return
else:
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Get the dbname. If none was provided, use the default from the
# config.
dbname = args.get("dbname", Context.config["dbname"])
# Get the dbname. If none was provided, use the default from the
# config.
dbname = args.get("dbname", config["dbname"])
# Sample the metric
try:
self._reply(200, sample_metric(dbname, metric_name, args))
return
except UnknownMetricError as e:
log.error("Unknown metric: {}".format(metric_name))
self._reply(404, "Unknown metric")
return
except MetricVersionError as e:
log.error(
"Failed to find a version of {} for {}".format(metric_name, version)
)
self._reply(404, "Unsupported version")
return
except UnhappyDBError as e:
log.info("Database {} is unhappy, please be patient".format(dbname))
self._reply(503, "Database unavailable")
return
except Exception as e:
log.error("Error running query: {}".format(e))
self._reply(500, "Unexpected error: {}".format(e))
return
# Sample the metric
try:
self._reply(200, sample_metric(dbname, metric_name, args))
except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name)
self._reply(404, "Unknown metric")
except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name)
self._reply(404, "Unsupported version")
except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname)
self._reply(503, "Database unavailable")
except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e)
self._reply(500, "Unexpected error: {}".format(e))
def _reply(self, code, content):
"""
@ -787,7 +877,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
self.wfile.write(bytes(content, "utf-8"))
if __name__ == "__main__":
def main():
"""
Main application routine
"""
# Initialize the logging framework
Context.init_logging()
# Handle cli args
parser = argparse.ArgumentParser(
prog="pgmon", description="A PostgreSQL monitoring agent"
@ -808,33 +905,35 @@ if __name__ == "__main__":
args = parser.parse_args()
# Set the config file path
config_file = args.config_file
Context.config_file = args.config_file
# Read the config file
read_config(config_file)
read_config(Context.config_file)
# Run query tests and exit if test mode is enabled
if args.test:
errors = test_queries()
if errors > 0:
if test_queries() > 0:
sys.exit(1)
else:
sys.exit(0)
sys.exit(0)
# Set up the http server to receive requests
server_address = (config["address"], config["port"])
httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
server_address = (Context.config["address"], Context.config["port"])
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
# Handle requests.
log.info("Listening on port {}...".format(config["port"]))
while running:
httpd.handle_request()
Context.log.info("Listening on port %s...", Context.config["port"])
while Context.running:
Context.httpd.handle_request()
# Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly
for pool in connections.values():
for pool in Context.connections.values():
pool.close()
if __name__ == "__main__":
main()
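With the refactored handler above, the request surface is unchanged: the URL path names a metric, query parameters such as dbname are passed through as query arguments, and agent_version / latest_version_info remain built-in endpoints. A short usage sketch from the monitoring side (the port is a placeholder; use the one configured in pgmon.yml):

# Usage sketch only; assumes the agent is running locally on a placeholder port.
import requests

AGENT = "http://localhost:5400"  # placeholder port

print(requests.get(f"{AGENT}/agent_version", timeout=5).text)
print(requests.get(f"{AGENT}/latest_version_info", timeout=5).text)
# Metric endpoints accept an optional dbname; otherwise the configured default is used.
print(requests.get(f"{AGENT}/locks", timeout=5).text)
print(requests.get(f"{AGENT}/bgwriter", params={"dbname": "postgres"}, timeout=5).text)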

File diff suppressed because it is too large.

View File

@ -20,7 +20,7 @@ services:
POSTGRES_PASSWORD: secret
healthcheck:
#test: [ "CMD", "pg_isready", "-U", "postgres" ]
test: [ "CMD-SHELL", "pg_controldata /var/lib/postgresql/data/ | grep -q 'in production'" ]
test: [ "CMD-SHELL", "pg_controldata $(psql -At -U postgres -c 'show data_directory') | grep -q 'in production'" ]
interval: 5s
timeout: 2s
retries: 40

View File

@ -24,6 +24,7 @@ images["14"]='14-bookworm'
images["15"]='15-bookworm'
images["16"]='16-bookworm'
images["17"]='17-bookworm'
images["18"]='18-trixie'
declare -A results=()

129
zabbix_templates/coverage.py Executable file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
# Compare the items defined in a Zabbix template with the metrics defined in a config file.
import sys
import yaml
# Special built in metrics
SPECIAL = {
"agent_version",
"latest_version_info"
}
class NonMetricItemError(Exception):
"""
A given item does not directly use a metric
"""
def read_metrics(file):
"""
Read the metrics from a config file and return the list of names
params:
file: the name of the file to read
returns:
list of metric names defined in the file
raises:
yaml.parser.ParserError: invalid yaml file
"""
names = set()
config = None
with open(file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
try:
for m in config["metrics"].keys():
names.add(m)
except KeyError:
pass
return names
def extract_metric(item):
"""
Extract the metric from an item definition
params:
item: the item/discovery/prototype definition dict
returns:
the name of the metric used in the item
raises:
NonMetricItemError: the item does not directly use a metric
"""
try:
if item["type"] == "HTTP_AGENT":
url = item["url"]
if url.startswith("http://localhost:{$AGENT_PORT}"):
return url.split("/")[-1]
except KeyError:
raise NonMetricItemError()
raise NonMetricItemError()
def read_template(file):
"""
Read the items from a Zabbix template and return the list of metric names
params:
file: the name of the file to read
returns:
list of metric names used in the file
raises:
yaml.parser.ParserError: invalid yaml file
"""
names = set()
config = None
with open(file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
try:
for template in config["zabbix_export"]["templates"]:
for item in template["items"]:
try:
names.add(extract_metric(item))
except NonMetricItemError:
pass
for rule in template["discovery_rules"]:
try:
names.add(extract_metric(rule))
except NonMetricItemError:
pass
for proto in rule["item_prototypes"]:
try:
names.add(extract_metric(proto))
except NonMetricItemError:
pass
except KeyError:
pass
return names
if __name__ == '__main__':
config_file = sys.argv[1]
config_metrics = read_metrics(config_file)
template_file = sys.argv[2]
template_metrics = read_template(template_file) - SPECIAL
config_only = config_metrics - template_metrics
template_only = template_metrics - config_metrics
print("Config only: {}".format(sorted(list(config_only))))
print("Template only: {}".format(sorted(list(template_only))))

View File

@ -49,6 +49,206 @@ zabbix_export:
tags:
- tag: Application
value: PostgreSQL
- uuid: 1dd74025ca0e463bb9eee5cb473f30db
name: 'Total buffers allocated'
type: DEPENDENT
key: 'pgmon.bgwriter[buffers_alloc,total]'
delay: '0'
history: 90d
description: 'Total number of shared buffers that have been allocated'
preprocessing:
- type: JSONPATH
parameters:
- $.buffers_alloc
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: 4ab405996e71444a8113b95c65d73be2
name: 'Total buffers written by backends'
type: DEPENDENT
key: 'pgmon.bgwriter[buffers_backend,total]'
delay: '0'
history: 90d
description: 'Total number of shared buffers written by backends'
preprocessing:
- type: JSONPATH
parameters:
- $.buffers_backend
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: e8b440a1d6ff4ca6b3cb27e2e76f9d06
name: 'Total number of fsyncs from backends'
type: DEPENDENT
key: 'pgmon.bgwriter[buffers_backend_fsync,total]'
delay: '0'
history: 90d
description: 'Total number of times backends have issued their own fsync calls'
preprocessing:
- type: JSONPATH
parameters:
- $.buffers_backend_fsync
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: f8a14885edf34b5cbf2e92432e8fa4c2
name: 'Total buffers written during checkpoints'
type: DEPENDENT
key: 'pgmon.bgwriter[buffers_checkpoint,total]'
delay: '0'
history: 90d
description: 'Total number of shared buffers written during checkpoints'
preprocessing:
- type: JSONPATH
parameters:
- $.buffers_checkpoint
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: f09956f7aaad4b99b0d67339edf26dcf
name: 'Total buffers written by the background writer'
type: DEPENDENT
key: 'pgmon.bgwriter[buffers_clean,total]'
delay: '0'
history: 90d
description: 'Total number of shared buffers written by the background writer'
preprocessing:
- type: JSONPATH
parameters:
- $.buffers_clean
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: f1c6bd9346b14964bd492d7ccf8baf50
name: 'Total checkpoints due to changes'
type: DEPENDENT
key: 'pgmon.bgwriter[checkpoints_changes,total]'
delay: '0'
history: 90d
description: 'Total number of checkpoints that have occurred due to the number of row changes'
preprocessing:
- type: JSONPATH
parameters:
- $.checkpoints_req
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: 04c4587d2f2f4f5fbad866d70938cbb3
name: 'Total checkpoints due to time limit'
type: DEPENDENT
key: 'pgmon.bgwriter[checkpoints_timed,total]'
delay: '0'
history: 90d
description: 'Total number of checkpoints that have occurred due to the configured time limit'
preprocessing:
- type: JSONPATH
parameters:
- $.checkpoints_timed
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: 580a7e632b644aafa1166c534185ba92
name: 'Total time spent syncing files for checkpoints'
type: DEPENDENT
key: 'pgmon.bgwriter[checkpoint_sync_time,total]'
delay: '0'
history: 90d
units: s
description: 'Total time spent syncing files for checkpoints'
preprocessing:
- type: JSONPATH
parameters:
- $.checkpoint_sync_time
- type: MULTIPLIER
parameters:
- '0.001'
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: 4faa33b55fae4fc68561020636573a46
name: 'Total time spent writing checkpoints'
type: DEPENDENT
key: 'pgmon.bgwriter[checkpoint_write_time,total]'
delay: '0'
history: 90d
units: s
description: 'Total time spent writing checkpoints'
preprocessing:
- type: JSONPATH
parameters:
- $.checkpoint_write_time
- type: MULTIPLIER
parameters:
- '0.001'
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: c0c829c6cdc046a4b5d216e10362e4bb
name: 'Total number of times the background writer stopped'
type: DEPENDENT
key: 'pgmon.bgwriter[maxwritten_clean,total]'
delay: '0'
history: 90d
description: 'Total number of times the background writer stopped due to reaching the maximum number of buffers it''s allowed to write in a single scan'
preprocessing:
- type: JSONPATH
parameters:
- $.maxwritten_clean
master_item:
key: 'pgmon[bgwriter]'
tags:
- tag: Application
value: PostgreSQL
- uuid: 645fef7b55cd48c69b11de3201e88d78
name: 'Number of locks'
type: DEPENDENT
key: pgmon.locks.count
delay: '0'
history: 90d
description: 'Total number of locks in any database'
preprocessing:
- type: JSONPATH
parameters:
- $.total
master_item:
key: 'pgmon[locks]'
tags:
- tag: Application
value: PostgreSQL
- uuid: a56ad753b1f341928867a47b14ed8b77
name: 'Number of granted locks'
type: DEPENDENT
key: pgmon.locks.granted
delay: '0'
history: 90d
description: 'Total number of granted locks in any database'
preprocessing:
- type: JSONPATH
parameters:
- $.granted
master_item:
key: 'pgmon[locks]'
tags:
- tag: Application
value: PostgreSQL
- uuid: de1fa757395440118026f4c7a7c4ebbe
name: 'PostgreSQL latest supported version'
type: DEPENDENT
@ -95,6 +295,21 @@ zabbix_export:
tags:
- tag: Application
value: PostgreSQL
- uuid: 91baea76ebb240b19c5a5d3913d0b989
name: 'PostgreSQL BGWriter Info'
type: HTTP_AGENT
key: 'pgmon[bgwriter]'
delay: 5m
history: '0'
value_type: TEXT
trends: '0'
description: 'Maximum age of any frozen XID and MXID in any database'
url: 'http://localhost:{$AGENT_PORT}/bgwriter'
tags:
- tag: Application
value: PostgreSQL
- tag: Type
value: Raw
- uuid: 06b1d082ed1e4796bc31cc25f7db6326
name: 'PostgreSQL Backend IO Info'
type: HTTP_AGENT
@ -124,6 +339,21 @@ zabbix_export:
value: PostgreSQL
- tag: Type
value: Raw
- uuid: 4627c156923f4d53bc04789b9b88c133
name: 'PostgreSQL Lock Info'
type: HTTP_AGENT
key: 'pgmon[locks]'
delay: 5m
history: '0'
value_type: TEXT
trends: '0'
description: 'Maximum age of any frozen XID and MXID in any database'
url: 'http://localhost:{$AGENT_PORT}/locks'
tags:
- tag: Application
value: PostgreSQL
- tag: Type
value: Raw
- uuid: 8706eccb7edc4fa394f552fc31f401a9
name: 'PostgreSQL ID Age Info'
type: HTTP_AGENT
@ -167,8 +397,286 @@ zabbix_export:
operator: NOT_MATCHES_REGEX
formulaid: A
lifetime: 30d
enabled_lifetime_type: DISABLE_NEVER
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 1d
item_prototypes:
- uuid: 28d5fe3a4f6848149afed33aa645f677
name: 'Max connection age: active on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[active,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Number of disk blocks read in this database'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "active")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: ff1b817c3d1f43dc8b49bfd0dcb0d10a
name: 'Connection count: active on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[active,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of disk blocks read in this database'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "active")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 4c0b9ca43adb45b895e3ba2e200e501e
name: 'Max connection age: disabled on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[disabled,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the disabled state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "disabled")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 280dfc9a84e8425399164e6f3e91cf92
name: 'Connection count: disabled on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[disabled,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the disabled state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "disabled")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 63e62108657f47aaa29f7ec6499e45fc
name: 'Max connection age: fastpath function call on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[fastpath_function,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the fastpath function call state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "fastpath function call")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 33193e62f1ad4da5b2e30677581e5305
name: 'Connection count: fastpath function call on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[fastpath_function,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the fastpath function call state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "fastpath function call")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 92e2366a56424bc18a88606417eae6e4
name: 'Max connection age: idle on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the idle state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 505bbbb4c7b745d8bab8f3b33705b76b
name: 'Connection count: idle on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the idle state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 282494d5900c4c2e8abd298160c7cbb6
name: 'Max connection age: idle in transaction (aborted) on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle_aborted,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the idle in transaction (aborted) state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle in transaction (aborted)")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 0da5075d79234602836fc7967a31f1cc
name: 'Connection count: idle in transaction (aborted) on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle_aborted,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the idle in transaction (aborted) state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle in transaction (aborted)")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 47aa3f7f4ff1473aae425b4c89472ab4
name: 'Max connection age: idle in transaction on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle_transaction,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the idle in transaction state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle in transaction")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 95519bf0aefd49799601e1bbb488ec90
name: 'Connection count: idle in transaction on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[idle_transaction,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the idle in transaction state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "idle in transaction")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: b2ae38a5733d49ceb1a31e774c11785b
name: 'Max connection age: starting on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[starting,age,{#DBNAME}]'
delay: '0'
history: 90d
value_type: FLOAT
description: 'Longest time any backend on this database has been in the starting state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "starting")].max_state_time.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: c1890f2d7ce84a7ebaaadf266d3ffc51
name: 'Connection count: starting on {#DBNAME}'
type: DEPENDENT
key: 'pgmon_connection_states[starting,count,{#DBNAME}]'
delay: '0'
history: 90d
description: 'Number of backends on this database currently in the starting state'
preprocessing:
- type: JSONPATH
parameters:
- '$[?(@.state == "starting")].backend_count.first()'
master_item:
key: 'pgmon_connection_states[{#DBNAME}]'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- uuid: 131653f883f448a7b861b16bc4366dfd
name: 'Database Connection States for {#DBNAME}'
type: HTTP_AGENT
key: 'pgmon_connection_states[{#DBNAME}]'
history: '0'
value_type: TEXT
trends: '0'
url: 'http://localhost:{$AGENT_PORT}/activity'
query_fields:
- name: dbname
value: '{#DBNAME}'
tags:
- tag: Application
value: PostgreSQL
- tag: Database
value: '{#DBNAME}'
- tag: Type
value: Raw
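# This HTTP_AGENT prototype is the master item for the state items above: it
# queries the agent's /activity endpoint with dbname={#DBNAME} and is expected
# to return the JSON array those JSONPaths filter. history is '0' because the
# raw text only needs to live long enough for the dependent items to parse it.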
- uuid: a30babe4a6f4440bba2a3ee46eff7ce2
name: 'Time spent executing statements on {#DBNAME}'
type: DEPENDENT
@@ -982,6 +1490,9 @@ zabbix_export:
type: DEPENDENT
key: pgmon_discover_io_backend_types
delay: '0'
lifetime: 30d
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 1h
item_prototypes:
- uuid: b1ac2e56b30f4812bf33ce973ef16b10
name: 'I/O Evictions by {#BACKEND_TYPE}'
@@ -1572,8 +2083,15 @@ zabbix_export:
type: HTTP_AGENT
key: pgmon_discover_rep
delay: 10m
filter:
conditions:
- macro: '{#APPLICATION_NAME}'
value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
operator: NOT_MATCHES_REGEX
formulaid: A
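# Assumption based on the naming convention: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
# matches the auto-generated names PostgreSQL uses for transient table-sync
# workers and slots during the initial logical-replication copy, so
# NOT_MATCHES_REGEX keeps those short-lived entries out of discovery. The slot
# discovery rule further down applies the same filter to {#SLOT_NAME}.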
lifetime: 30d
enabled_lifetime_type: DISABLE_NEVER
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 7d
item_prototypes:
- uuid: 3a5a60620e6a4db694e47251148d82f5
name: 'Flush lag for {#REPID}'
@@ -1775,6 +2293,8 @@ zabbix_export:
value: Raw
url: 'http://localhost:{$AGENT_PORT}/discover_rep'
lld_macro_paths:
- lld_macro: '{#APPLICATION_NAME}'
path: $.application_name
- lld_macro: '{#CLIENT_ADDR}'
path: $.client_addr
- lld_macro: '{#REPID}'
@@ -1786,6 +2306,15 @@ zabbix_export:
type: HTTP_AGENT
key: pgmon_discover_slots
delay: 10m
filter:
conditions:
- macro: '{#SLOT_NAME}'
value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
operator: NOT_MATCHES_REGEX
formulaid: A
lifetime: 30d
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 7d
item_prototypes:
- uuid: 536c5f82e3074ddfbfd842b3a2e8d46c
name: 'Slot {#SLOT_NAME} - Confirmed Flushed Bytes Lag'