Compare commits

9 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 3ade30c04a | |
| | ab039dc412 | |
| | 75c5d76047 | |
| | 43cd162313 | |
| | 29bfd07dad | |
| | 60589c2058 | |
| | ea3aca3455 | |
| | cc71547f5f | |
| | 107d5056d6 | |
.gitignore (vendored): 1 line changed
@@ -8,3 +8,4 @@ missing
__pycache__
venv
build
.coverage
Makefile: 31 lines changed
@@ -19,6 +19,12 @@ RPM_VERSION := $(VERSION)-$(RPM_RELEASE)
DEB_VERSION := $(VERSION)~rc$(RELEASE)
endif

# Python stuff
PYTHON ?= python3
PYLINT ?= pylint
BLACK ?= black

HAVE_COVERAGE := $(shell $(PYTHON) -c 'import coverage' 2>/dev/null && echo yes)

# Where packages are built
BUILD_DIR := build

@@ -38,7 +44,7 @@ SUPPORTED := ubuntu-20.04 \
# These targets are the main ones to use for most things.
##

.PHONY: all clean tgz test query-tests install-common install-openrc install-systemd
.PHONY: all clean tgz lint format test query-tests install-common install-openrc install-systemd

all: package-all

@@ -80,14 +86,35 @@ tgz:
clean:
	rm -rf $(BUILD_DIR)

# Check for lint
lint:
	$(PYLINT) src/pgmon.py
	$(PYLINT) src/test_pgmon.py
	$(BLACK) --check --diff src/pgmon.py
	$(BLACK) --check --diff src/test_pgmon.py

# Format the code using black
format:
	$(BLACK) src/pgmon.py
	$(BLACK) src/test_pgmon.py

# Run unit tests for the script
test:
	cd src ; python3 -m unittest
ifeq ($(HAVE_COVERAGE),yes)
	cd src ; $(PYTHON) -m coverage run -m unittest && python3 -m coverage report -m
else
	cd src ; $(PYTHON) -m unittest
endif

# Run query tests
query-tests:
	cd tests ; ./run-tests.sh

# Compare the sample metrics with the Zabbix template
template-coverage:
	$(PYTHON) zabbix_templates/coverage.py sample-config/pgmon-metrics.yml zabbix_templates/pgmon_templates.yaml


# Install the script at the specified base directory (common components)
install-common:
	# Set up directories
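The HAVE_COVERAGE probe above decides at build time whether the test target runs the unit tests under coverage: it simply asks the selected interpreter to import the coverage module and looks for the echoed "yes". A rough Python equivalent of that probe, for illustration only (the Makefile itself shells out to `$(PYTHON) -c 'import coverage'`):

```python
import importlib.util

# Mirrors the Makefile's HAVE_COVERAGE check: prints "yes" when the coverage
# module is importable with this interpreter, nothing otherwise.
print("yes" if importlib.util.find_spec("coverage") is not None else "", end="")
```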
@@ -18,6 +18,7 @@ metrics:
    query:
      0: >
        SELECT host(client_addr) || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid,
               application_name,
               client_addr,
               state
          FROM pg_stat_replication
@@ -229,7 +230,19 @@ metrics:
        SELECT COUNT(*) FILTER (WHERE has_sequence_privilege(c.oid, 'SELECT,USAGE')) AS visible_sequences,
               COUNT(*) AS total_sequences
          FROM pg_class AS c
         WHERE relkind = 'S';
         WHERE relkind = 'S'

  locks:
    type: row
    query:
      0:
        SELECT COUNT(*) AS total,
               SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted
          FROM pg_locks
      90400: >
        SELECT COUNT(*) AS total,
               COUNT(*) FILTER (WHERE granted) AS granted
          FROM pg_locks


##
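In these metric definitions, the numeric keys under query: (0 and 90400 above) are server_version_num thresholds: the agent picks a query variant based on the PostgreSQL version it is monitoring, which is why locks can switch from the portable SUM(CASE ...) form to the FILTER clause available from 9.4 on. A minimal sketch of that kind of version lookup, assuming the highest key not exceeding the running version wins (the helper and sample values are illustrative, not the agent's actual code):

```python
def pick_query(queries, server_version_num):
    """Pick the newest query variant whose version key is <= the server version."""
    eligible = [int(v) for v in queries if int(v) <= server_version_num]
    if not eligible:
        raise ValueError("no query variant for this PostgreSQL version")
    return queries[max(eligible)]


locks_queries = {
    0: "SELECT COUNT(*) AS total, SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted FROM pg_locks",
    90400: "SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE granted) AS granted FROM pg_locks",
}
print(pick_query(locks_queries, 120005))  # a PostgreSQL 12 server gets the FILTER form
```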
src/pgmon.py: 667 lines changed
@@ -1,105 +1,147 @@
#!/usr/bin/env python3
"""
pgmon is a monitoring intermediary that sits between a PostgreSQL cluster and a monitoring systen
that is capable of parsing JSON responses over an HTTP connection.
"""

# pylint: disable=too-few-public-methods

import yaml
import json
import time
import os
import sys

import signal
import argparse
import logging
import re

from decimal import Decimal

from urllib.parse import urlparse, parse_qs

from contextlib import contextmanager

from datetime import datetime, timedelta

from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer

from threading import Lock

import yaml

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool

from contextlib import contextmanager

import signal
from threading import Thread, Lock, Semaphore

from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs

import requests
import re

from decimal import Decimal

VERSION = "1.0.4"

# Configuration
config = {}

# Dictionary of current PostgreSQL connection pools
connections_lock = Lock()
connections = {}
class Context:
"""
The global context for connections, config, version, nad IPC
"""

# Dictionary of unhappy databases. Keys are database names, value is the time
# the database was determined to be unhappy plus the cooldown setting. So,
# basically it's the time when we should try to connect to the database again.
unhappy_cooldown = {}
# Configuration
config = {}

# Version information
cluster_version = None
cluster_version_next_check = None
cluster_version_lock = Lock()
# Dictionary of current PostgreSQL connection pools
connections_lock = Lock()
connections = {}

# PostgreSQL latest version information
latest_version = None
latest_version_next_check = None
latest_version_lock = Lock()
release_supported = None
# Dictionary of unhappy databases. Keys are database names, value is the time
# the database was determined to be unhappy plus the cooldown setting. So,
# basically it's the time when we should try to connect to the database again.
unhappy_cooldown = {}

# Running state (used to gracefully shut down)
running = True
# Version information
cluster_version = None
cluster_version_next_check = None
cluster_version_lock = Lock()

# The http server object
httpd = None
# PostgreSQL latest version information
latest_version = None
latest_version_next_check = None
latest_version_lock = Lock()
release_supported = None

# Where the config file lives
config_file = None
# Running state (used to gracefully shut down)
running = True

# Configure logging
log = logging.getLogger(__name__)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s"
)
console_log_handler = logging.StreamHandler()
console_log_handler.setFormatter(formatter)
log.addHandler(console_log_handler)
# The http server object
httpd = None

# Where the config file lives
config_file = None

# Configure logging
log = logging.getLogger(__name__)

@classmethod
def init_logging(cls):
"""
Actually initialize the logging framework. Since we don't ever instantiate the Context
class, this provides a way to make a few modifications to the log handler.
"""

formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(filename)s: "
"%(funcName)s() line %(lineno)d: %(message)s"
)
console_log_handler = logging.StreamHandler()
console_log_handler.setFormatter(formatter)
cls.log.addHandler(console_log_handler)


# Error types
class ConfigError(Exception):
pass
"""
Error type for all config related errors.
"""


class DisconnectedError(Exception):
pass
"""
Error indicating a previously active connection to the database has been disconnected.
"""


class UnhappyDBError(Exception):
pass
"""
Error indicating that a database the code has been asked to connect to is on the unhappy list.
"""


class UnknownMetricError(Exception):
pass
"""
Error indicating that an undefined metric was requested.
"""


class MetricVersionError(Exception):
pass
"""
Error indicating that there is no suitable query for a metric that was requested for the
version of PostgreSQL being monitored.
"""


class LatestVersionCheckError(Exception):
pass
"""
Error indicating that there was a problem retrieving or parsing the latest version information.
"""


class InvalidDataError(Exception):
"""
Error indicating query results were somehow invalid
"""


# Default config settings
default_config = {
DEFAULT_CONFIG = {
# The address the agent binds to
"address": "127.0.0.1",
# The port the agent listens on for requests
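The first hunk gathers what used to be module-level globals (the config dict, connection pools and their lock, cached version information, the unhappy-database cooldowns, the running flag, the HTTP server handle, and the logger) into a single Context class that is never instantiated; everything stays a class attribute, and init_logging() is a classmethod so the handler setup still happens exactly once. A stripped-down sketch of that pattern with made-up attribute names:

```python
import logging
from threading import Lock


class Ctx:  # pylint: disable=too-few-public-methods
    """Process-wide state kept as class attributes; the class is never instantiated."""

    config = {}
    connections = {}
    connections_lock = Lock()
    running = True
    log = logging.getLogger(__name__)

    @classmethod
    def init_logging(cls):
        """One-time log handler setup, in the spirit of Context.init_logging() above."""
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        cls.log.addHandler(handler)


Ctx.init_logging()
Ctx.config["log_level"] = "INFO"  # read and written directly on the class
Ctx.log.setLevel(Ctx.config["log_level"])
```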
@@ -177,6 +219,60 @@ def update_deep(d1, d2):
return d1


def validate_metric(path, name, metric):
"""
Validate a metric definition from a given file. If any query definitions come from external
files, the metric dict will be updated with the actual query.

Params:
path: path to the file which contains this definition
name: name of the metric
metric: the dictionary containing the metric definition
"""
# Validate return types
try:
if metric["type"] not in ["value", "row", "column", "set"]:
raise ConfigError(
"Invalid return type: {} for metric {} in {}".format(
metric["type"], name, path
)
)
except KeyError as e:
raise ConfigError(
"No type specified for metric {} in {}".format(name, path)
) from e

# Ensure queries exist
query_dict = metric.get("query", {})
if not isinstance(query_dict, dict):
raise ConfigError(
"Query definition should be a dictionary, got: {} for metric {} in {}".format(
query_dict, name, path
)
)

if len(query_dict) == 0:
raise ConfigError("Missing queries for metric {} in {}".format(name, path))

# Read external sql files and validate version keys
config_base = os.path.dirname(path)
for vers, query in metric["query"].items():
try:
int(vers)
except Exception as e:
raise ConfigError(
"Invalid version: {} for metric {} in {}".format(vers, name, path)
) from e

# Read in the external query and update the definition in the metricdictionary
if query.startswith("file:"):
query_path = query[5:]
if not query_path.startswith("/"):
query_path = os.path.join(config_base, query_path)
with open(query_path, "r", encoding="utf-8") as f:
metric["query"][vers] = f.read()

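validate_metric() above enforces a small contract: every metric needs a type of value, row, column, or set, a non-empty query dictionary keyed by integer-like version numbers, and any value starting with file: is replaced by the contents of the referenced SQL file (resolved relative to the config file). A definition that would pass those checks; the metric name, path, and SQL here are invented for illustration:

```python
example_metric = {
    "type": "value",
    "query": {
        0: "SELECT count(*) FROM pg_stat_activity",
        # A value may also be "file:backends.sql"; validate_metric() then reads
        # that file (relative to the config directory) into the definition.
    },
}

validate_metric("/etc/pgmon/pgmon-metrics.yml", "backend_count", example_metric)
```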
def read_config(path, included=False):
"""
Read a config file.
@@ -185,61 +281,21 @@ def read_config(path, included=False):
path: path to the file to read
included: is this file included by another file?
"""

# Read config file
log.info("Reading log file: {}".format(path))
with open(path, "r") as f:
Context.log.info("Reading log file: %s", path)
with open(path, "r", encoding="utf-8") as f:
try:
cfg = yaml.safe_load(f)
except yaml.parser.ParserError as e:
raise ConfigError("Inavlid config file: {}: {}".format(path, e))

# Since we use it a few places, get the base directory from the config
config_base = os.path.dirname(path)
raise ConfigError("Inavlid config file: {}: {}".format(path, e)) from e

# Read any external queries and validate metric definitions
for name, metric in cfg.get("metrics", {}).items():
# Validate return types
try:
if metric["type"] not in ["value", "row", "column", "set"]:
raise ConfigError(
"Invalid return type: {} for metric {} in {}".format(
metric["type"], name, path
)
)
except KeyError:
raise ConfigError(
"No type specified for metric {} in {}".format(name, path)
)

# Ensure queries exist
query_dict = metric.get("query", {})
if type(query_dict) is not dict:
raise ConfigError(
"Query definition should be a dictionary, got: {} for metric {} in {}".format(
query_dict, name, path
)
)

if len(query_dict) == 0:
raise ConfigError("Missing queries for metric {} in {}".format(name, path))

# Read external sql files and validate version keys
for vers, query in metric["query"].items():
try:
int(vers)
except:
raise ConfigError(
"Invalid version: {} for metric {} in {}".format(vers, name, path)
)

if query.startswith("file:"):
query_path = query[5:]
if not query_path.startswith("/"):
query_path = os.path.join(config_base, query_path)
with open(query_path, "r") as f:
metric["query"][vers] = f.read()
validate_metric(path, name, metric)

# Read any included config files
config_base = os.path.dirname(path)
for inc in cfg.get("include", []):
# Prefix relative paths with the directory from the current config
if not inc.startswith("/"):
@@ -250,34 +306,37 @@ def read_config(path, included=False):
# config
if included:
return cfg
else:
new_config = {}
update_deep(new_config, default_config)
update_deep(new_config, cfg)

# Minor sanity checks
if len(new_config["metrics"]) == 0:
log.error("No metrics are defined")
raise ConfigError("No metrics defined")
new_config = {}
update_deep(new_config, DEFAULT_CONFIG)
update_deep(new_config, cfg)

# Validate the new log level before changing the config
if new_config["log_level"].upper() not in [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL",
]:
raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))
# Minor sanity checks
if len(new_config["metrics"]) == 0:
Context.log.error("No metrics are defined")
raise ConfigError("No metrics defined")

global config
config = new_config
# Validate the new log level before changing the config
if new_config["log_level"].upper() not in [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL",
]:
raise ConfigError("Invalid log level: {}".format(new_config["log_level"]))

# Apply changes to log level
log.setLevel(logging.getLevelName(config["log_level"].upper()))
Context.config = new_config

# Apply changes to log level
Context.log.setLevel(logging.getLevelName(Context.config["log_level"].upper()))

# Return the config (mostly to make pylint happy, but also in case I opt to remove the side
# effect and make this more functional.
return Context.config


def signal_handler(sig, frame):
def signal_handler(sig, frame): # pylint: disable=unused-argument
"""
Function for handling signals

@@ -288,19 +347,22 @@ def signal_handler(sig, frame):
# Signal everything to shut down
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
log.info("Shutting down ...")
global running
running = False
if httpd is not None:
httpd.socket.close()
Context.log.info("Shutting down ...")
Context.running = False
if Context.httpd is not None:
Context.httpd.socket.close()

# Signal a reload
if sig == signal.SIGHUP:
log.warning("Received config reload signal")
read_config(config_file)
Context.log.warning("Received config reload signal")
read_config(Context.config_file)


class ConnectionPool(ThreadedConnectionPool):
"""
Threaded connection pool that has a context manager.
"""

def __init__(self, dbname, minconn, maxconn, *args, **kwargs):
# Make sure dbname isn't different in the kwargs
kwargs["dbname"] = dbname
@@ -309,7 +371,14 @@ class ConnectionPool(ThreadedConnectionPool):
self.name = dbname

@contextmanager
def connection(self, timeout=None):
def connection(self, timeout):
"""
Connection context manager for our connection pool. This will attempt to retrieve a
connection until the timeout is reached.

Params:
timeout: how long to keep trying to get a connection bedore giving up
"""
conn = None
timeout_time = datetime.now() + timedelta(timeout)
# We will continue to try to get a connection slot until we time out
@@ -333,34 +402,37 @@ class ConnectionPool(ThreadedConnectionPool):
def get_pool(dbname):
"""
Get a database connection pool.

Params:
dbname: the name of the database for which a connection pool should be returned.
"""
# Check if the db is unhappy and wants to be left alone
if dbname in unhappy_cooldown:
if unhappy_cooldown[dbname] > datetime.now():
if dbname in Context.unhappy_cooldown:
if Context.unhappy_cooldown[dbname] > datetime.now():
raise UnhappyDBError()

# Create a connection pool if it doesn't already exist
if dbname not in connections:
with connections_lock:
if dbname not in Context.connections:
with Context.connections_lock:
# Make sure nobody created the pool while we were waiting on the
# lock
if dbname not in connections:
log.info("Creating connection pool for: {}".format(dbname))
if dbname not in Context.connections:
Context.log.info("Creating connection pool for: %s", dbname)
# Actually create the connection pool
connections[dbname] = ConnectionPool(
Context.connections[dbname] = ConnectionPool(
dbname,
int(config["min_pool_size"]),
int(config["max_pool_size"]),
int(Context.config["min_pool_size"]),
int(Context.config["max_pool_size"]),
application_name="pgmon",
host=config["dbhost"],
port=config["dbport"],
user=config["dbuser"],
connect_timeout=int(config["connect_timeout"]),
sslmode=config["ssl_mode"],
host=Context.config["dbhost"],
port=Context.config["dbport"],
user=Context.config["dbuser"],
connect_timeout=int(Context.config["connect_timeout"]),
sslmode=Context.config["ssl_mode"],
)
# Clear the unhappy indicator if present
unhappy_cooldown.pop(dbname, None)
return connections[dbname]
Context.unhappy_cooldown.pop(dbname, None)
return Context.connections[dbname]


def handle_connect_failure(pool):
@@ -368,8 +440,8 @@ def handle_connect_failure(pool):
Mark the database as being unhappy so we can leave it alone for a while
"""
dbname = pool.name
unhappy_cooldown[dbname] = datetime.now() + timedelta(
seconds=int(config["reconnect_cooldown"])
Context.unhappy_cooldown[dbname] = datetime.now() + timedelta(
seconds=int(Context.config["reconnect_cooldown"])
)

@@ -400,41 +472,67 @@ def json_encode_special(obj):
"""
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(f'Cannot serialize object of {type(obj)}')
raise TypeError("Cannot serialize object of {}".format(type(obj)))


def json_encode_result(return_type, res):
"""
Return a json string encoding of the results of a query.

params:
return_type: the expected structure to return. One of:
value, row, column, set
res: the query results

returns: a json string form of the results

raises:
ConfigError: when an invalid return_type is given
InvalidDataError: when the query results don't match the return type
"""
try:
if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])

if return_type == "row":
return json.dumps(
res[0] if len(res) > 0 else {}, default=json_encode_special
)

if return_type == "column":
return json.dumps(
[list(r.values())[0] for r in res], default=json_encode_special
)

if return_type == "set":
return json.dumps(res, default=json_encode_special)
except IndexError as e:
raise InvalidDataError(e) from e

# If we got to this point, the return type is invalid
raise ConfigError("Invalid query return type: {}".format(return_type))


def run_query_no_retry(pool, return_type, query, args):
"""
Run the query with no explicit retry code
"""
with pool.connection(float(config["connect_timeout"])) as conn:
with pool.connection(float(Context.config["connect_timeout"])) as conn:
try:
with conn.cursor(cursor_factory=RealDictCursor) as curs:
curs.execute(query, args)
res = curs.fetchall()

if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])
elif return_type == "row":
if len(res) == 0:
return "[]"
return json.dumps(res[0], default=json_encode_special)
elif return_type == "column":
if len(res) == 0:
return "[]"
return json.dumps([list(r.values())[0] for r in res], default=json_encode_special)
elif return_type == "set":
return json.dumps(res, default=json_encode_special)
except:
return json_encode_result(return_type, res)
except Exception as e:
dbname = pool.name
if dbname in unhappy_cooldown:
raise UnhappyDBError()
elif conn.closed != 0:
raise DisconnectedError()
else:
raise
if dbname in Context.unhappy_cooldown:
raise UnhappyDBError() from e
if conn.closed != 0:
raise DisconnectedError() from e
raise


def run_query(pool, return_type, query, args):
@@ -455,7 +553,7 @@ def run_query(pool, return_type, query, args):
try:
return run_query_no_retry(pool, return_type, query, args)
except DisconnectedError:
log.warning("Stale PostgreSQL connection found ... trying again")
Context.log.warning("Stale PostgreSQL connection found ... trying again")
# This sleep is an annoying hack to give the pool workers time to
# actually mark the connection, otherwise it can be given back in the
# next connection() call
@@ -463,9 +561,9 @@ def run_query(pool, return_type, query, args):
time.sleep(1)
try:
return run_query_no_retry(pool, return_type, query, args)
except:
except Exception as e:
handle_connect_failure(pool)
raise UnhappyDBError()
raise UnhappyDBError() from e

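psycopg2's RealDictCursor hands back numeric columns as Decimal, which json.dumps() cannot serialize on its own; json_encode_special() above is passed as the default hook so those values degrade to floats. A small usage sketch, assuming the function is in scope:

```python
import json
from decimal import Decimal

# Decimal values (typical for numeric columns fetched via psycopg2) are turned
# into floats by the json_encode_special() fallback shown above.
row = {"total": Decimal("42"), "granted": Decimal("40")}
print(json.dumps(row, default=json_encode_special))  # {"total": 42.0, "granted": 40.0}
```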
def get_cluster_version():
@@ -473,40 +571,39 @@ def get_cluster_version():
Get the PostgreSQL version if we don't already know it, or if it's been
too long sice the last time it was checked.
"""
global cluster_version
global cluster_version_next_check

# If we don't know the version or it's past the recheck time, get the
# version from the database. Only one thread needs to do this, so they all
# try to grab the lock, and then make sure nobody else beat them to it.
if (
cluster_version is None
or cluster_version_next_check is None
or cluster_version_next_check < datetime.now()
Context.cluster_version is None
or Context.cluster_version_next_check is None
or Context.cluster_version_next_check < datetime.now()
):
with cluster_version_lock:
with Context.cluster_version_lock:
# Only check if nobody already got the version before us
if (
cluster_version is None
or cluster_version_next_check is None
or cluster_version_next_check < datetime.now()
Context.cluster_version is None
or Context.cluster_version_next_check is None
or Context.cluster_version_next_check < datetime.now()
):
log.info("Checking PostgreSQL cluster version")
pool = get_pool(config["dbname"])
cluster_version = int(
Context.log.info("Checking PostgreSQL cluster version")
pool = get_pool(Context.config["dbname"])
Context.cluster_version = int(
run_query(pool, "value", "SHOW server_version_num", None)
)
cluster_version_next_check = datetime.now() + timedelta(
seconds=int(config["version_check_period"])
Context.cluster_version_next_check = datetime.now() + timedelta(
seconds=int(Context.config["version_check_period"])
)
log.info("Got PostgreSQL cluster version: {}".format(cluster_version))
log.debug(
"Next PostgreSQL cluster version check will be after: {}".format(
cluster_version_next_check
)
Context.log.info(
"Got PostgreSQL cluster version: %s", Context.cluster_version
)
Context.log.debug(
"Next PostgreSQL cluster version check will be after: %s",
Context.cluster_version_next_check,
)

return cluster_version
return Context.cluster_version


def version_num_to_release(version_num):
@@ -519,8 +616,7 @@ def version_num_to_release(version_num):
"""
if version_num // 10000 < 10:
return version_num // 10000 + (version_num % 10000 // 100 / 10)
else:
return version_num // 10000
return version_num // 10000

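version_num_to_release() above reduces server_version_num to the release number used when scanning versions.rss: releases before 10 keep their major.minor pair, while 10 and later use just the first component. parse_version_rss() further down performs the reverse mapping when it rebuilds a numeric version from the dotted string in the feed. A couple of worked values, assuming both functions are in scope:

```python
# Pre-10 releases keep major and minor: 90624 -> 9 + 6/10 -> 9.6
print(version_num_to_release(90624))
# From 10 on only the major matters: 160004 // 10000 -> 16
print(version_num_to_release(160004))

# parse_version_rss() goes the other way using the format strings shown below:
#   "9.6.24" -> "{}{:02}{:02}" -> 90624        "16.4" -> "{}00{:02}" -> 160004
```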
def parse_version_rss(raw_rss, release):
@@ -528,7 +624,7 @@ def parse_version_rss(raw_rss, release):
Parse the raw RSS from the versions.rss feed to extract the latest version of
PostgreSQL that's availabe for the cluster being monitored.

This sets these global variables:
This sets these Context variables:
latest_version
release_supported

@@ -538,8 +634,6 @@ def parse_version_rss(raw_rss, release):
raw_rss: The raw rss text from versions.rss
release: The PostgreSQL release we care about (ex: 9.2, 14)
"""
global latest_version
global release_supported

# Regular expressions for parsing the RSS document
version_line = re.compile(
@@ -559,75 +653,75 @@ def parse_version_rss(raw_rss, release):
version = m.group(1)
parts = list(map(int, version.split(".")))
if parts[0] < 10:
latest_version = int(
Context.latest_version = int(
"{}{:02}{:02}".format(parts[0], parts[1], parts[2])
)
else:
latest_version = int("{}00{:02}".format(parts[0], parts[1]))
Context.latest_version = int("{}00{:02}".format(parts[0], parts[1]))
elif release_found:
# The next line after the version tells if the version is supported
if unsupported_line.match(line):
release_supported = False
Context.release_supported = False
else:
release_supported = True
Context.release_supported = True
break

# Make sure we actually found it
if not release_found:
raise LatestVersionCheckError("Current release ({}) not found".format(release))

log.info(
"Got latest PostgreSQL version: {} supported={}".format(
latest_version, release_supported
)
Context.log.info(
"Got latest PostgreSQL version: %s supported=%s",
Context.latest_version,
Context.release_supported,
)
log.debug(
"Next latest PostgreSQL version check will be after: {}".format(
latest_version_next_check
)
Context.log.debug(
"Next latest PostgreSQL version check will be after: %s",
Context.latest_version_next_check,
)


def get_latest_version():
"""
Get the latest supported version of the major PostgreSQL release running on the server being monitored.
Get the latest supported version of the major PostgreSQL release running on the server being
monitored.
"""

global latest_version_next_check

# If we don't know the latest version or it's past the recheck time, get the
# version from the PostgreSQL RSS feed. Only one thread needs to do this, so
# they all try to grab the lock, and then make sure nobody else beat them to it.
if (
latest_version is None
or latest_version_next_check is None
or latest_version_next_check < datetime.now()
Context.latest_version is None
or Context.latest_version_next_check is None
or Context.latest_version_next_check < datetime.now()
):
# Note: we get the cluster version here before grabbing the latest_version_lock
# lock so it's not held while trying to talk with the DB.
release = version_num_to_release(get_cluster_version())

with latest_version_lock:
with Context.latest_version_lock:
# Only check if nobody already got the version before us
if (
latest_version is None
or latest_version_next_check is None
or latest_version_next_check < datetime.now()
Context.latest_version is None
or Context.latest_version_next_check is None
or Context.latest_version_next_check < datetime.now()
):
log.info("Checking latest PostgreSQL version")
latest_version_next_check = datetime.now() + timedelta(
seconds=int(config["latest_version_check_period"])
Context.log.info("Checking latest PostgreSQL version")
Context.latest_version_next_check = datetime.now() + timedelta(
seconds=int(Context.config["latest_version_check_period"])
)

# Grab the RSS feed
raw_rss = requests.get("https://www.postgresql.org/versions.rss")
raw_rss = requests.get(
"https://www.postgresql.org/versions.rss", timeout=30
)
if raw_rss.status_code != 200:
raise LatestVersionCheckError("code={}".format(r.status_code))
raise LatestVersionCheckError("code={}".format(raw_rss.status_code))

# Parse the RSS body and set global variables
# Parse the RSS body and set Context variables
parse_version_rss(raw_rss.text, release)

return latest_version
return Context.latest_version


def sample_metric(dbname, metric_name, args, retry=True):
@@ -636,9 +730,9 @@ def sample_metric(dbname, metric_name, args, retry=True):
"""
# Get the metric definition
try:
metric = config["metrics"][metric_name]
except KeyError:
raise UnknownMetricError("Unknown metric: {}".format(metric_name))
metric = Context.config["metrics"][metric_name]
except KeyError as e:
raise UnknownMetricError("Unknown metric: {}".format(metric_name)) from e

# Get the connection pool for the database, or create one if it doesn't
# already exist.
@@ -653,8 +747,7 @@ def sample_metric(dbname, metric_name, args, retry=True):
# Execute the quert
if retry:
return run_query(pool, metric["type"], query, args)
else:
return run_query_no_retry(pool, metric["type"], query, args)
return run_query_no_retry(pool, metric["type"], query, args)


def test_queries():
@@ -662,12 +755,17 @@ def test_queries():
Run all of the metric queries against a database and check the results
"""
# We just use the default db for tests
dbname = config["dbname"]
dbname = Context.config["dbname"]
# Loop through all defined metrics.
for name, metric in config["metrics"].items():
for name, metric in Context.config["metrics"].items():
# If the metric has arguments to use while testing, grab those
args = metric.get("test_args", {})
print("Testing {} [{}]".format(name, ", ".join(["{}={}".format(key, value) for key, value in args.items()])))
print(
"Testing {} [{}]".format(
name,
", ".join(["{}={}".format(key, value) for key, value in args.items()]),
)
)
# When testing against a docker container, we may end up connecting
# before the service is truly up (it restarts during the initialization
# phase). To cope with this, we'll allow a few connection failures.
@@ -704,9 +802,8 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""
Override to suppress standard request logging
"""
pass

def do_GET(self):
def do_GET(self): # pylint: disable=invalid-name
"""
Handle a request. This is just a wrapper around the actual handler
code to keep things more readable.
@@ -714,7 +811,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
try:
self._handle_request()
except BrokenPipeError:
log.error("Client disconnected, exiting handler")
Context.log.error("Client disconnected, exiting handler")

def _handle_request(self):
"""
@@ -727,7 +824,6 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):

if metric_name == "agent_version":
self._reply(200, VERSION)
return
elif metric_name == "latest_version_info":
try:
get_latest_version()
@@ -735,46 +831,40 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
200,
json.dumps(
{
"latest": latest_version,
"supported": 1 if release_supported else 0,
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
),
)
except LatestVersionCheckError as e:
log.error("Failed to retrieve latest version information: {}".format(e))
Context.log.error(
"Failed to retrieve latest version information: %s", e
)
self._reply(503, "Failed to retrieve latest version info")
return
else:
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}

# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Get the dbname. If none was provided, use the default from the
# config.
dbname = args.get("dbname", Context.config["dbname"])

# Get the dbname. If none was provided, use the default from the
# config.
dbname = args.get("dbname", config["dbname"])

# Sample the metric
try:
self._reply(200, sample_metric(dbname, metric_name, args))
return
except UnknownMetricError as e:
log.error("Unknown metric: {}".format(metric_name))
self._reply(404, "Unknown metric")
return
except MetricVersionError as e:
log.error(
"Failed to find a version of {} for {}".format(metric_name, version)
)
self._reply(404, "Unsupported version")
return
except UnhappyDBError as e:
log.info("Database {} is unhappy, please be patient".format(dbname))
self._reply(503, "Database unavailable")
return
except Exception as e:
log.error("Error running query: {}".format(e))
self._reply(500, "Unexpected error: {}".format(e))
return
# Sample the metric
try:
self._reply(200, sample_metric(dbname, metric_name, args))
except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name)
self._reply(404, "Unknown metric")
except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name)
self._reply(404, "Unsupported version")
except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname)
self._reply(503, "Database unavailable")
except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e)
self._reply(500, "Unexpected error: {}".format(e))

def _reply(self, code, content):
"""
@@ -787,7 +877,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
self.wfile.write(bytes(content, "utf-8"))


if __name__ == "__main__":
def main():
"""
Main application routine
"""

# Initialize the logging framework
Context.init_logging()

# Handle cli args
parser = argparse.ArgumentParser(
prog="pgmon", description="A PostgreSQL monitoring agent"
@@ -808,33 +905,35 @@ if __name__ == "__main__":
args = parser.parse_args()

# Set the config file path
config_file = args.config_file
Context.config_file = args.config_file

# Read the config file
read_config(config_file)
read_config(Context.config_file)

# Run query tests and exit if test mode is enabled
if args.test:
errors = test_queries()
if errors > 0:
if test_queries() > 0:
sys.exit(1)
else:
sys.exit(0)
sys.exit(0)

# Set up the http server to receive requests
server_address = (config["address"], config["port"])
httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
server_address = (Context.config["address"], Context.config["port"])
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)

# Set up the signal handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

# Handle requests.
log.info("Listening on port {}...".format(config["port"]))
while running:
httpd.handle_request()
Context.log.info("Listening on port %s...", Context.config["port"])
while Context.running:
Context.httpd.handle_request()

# Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly
for pool in connections.values():
for pool in Context.connections.values():
pool.close()


if __name__ == "__main__":
main()
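From the outside the agent is a plain HTTP endpoint: the Zabbix template items request http://localhost:{$AGENT_PORT}/<metric>, optionally passing a dbname query parameter, and the built-in agent_version and latest_version_info paths are answered without touching the database. A hand-rolled check along the same lines; the port below is a placeholder for whatever port is set in the agent's config:

```python
import requests

AGENT = "http://localhost:5400"  # placeholder; use the "port" value from the pgmon config

# Built-in metric answered directly by the agent.
print(requests.get(f"{AGENT}/agent_version", timeout=5).text)

# A configured metric, run against a specific database via the dbname parameter.
resp = requests.get(f"{AGENT}/locks", params={"dbname": "postgres"}, timeout=5)
print(resp.status_code, resp.text)
```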
File diff suppressed because it is too large.
zabbix_templates/coverage.py (new executable file): 129 lines
@@ -0,0 +1,129 @@
#!/usr/bin/env python3

# Compare the items defined in a Zabbix template with the metrics defined in a config file.

import sys

import yaml

# Special built in metrics
SPECIAL = {
    "agent_version",
    "latest_version_info"
}


class NonMetricItemError(Exception):
    """
    A given item does not directly use a metric
    """


def read_metrics(file):
    """
    Read the metrics from a config file and return the list of names

    params:
        file: the name of the file to read

    returns:
        list of metric named defined in the file

    raises:
        yaml.parser.ParserError: invalid yaml file
    """
    names = set()
    config = None

    with open(file, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    try:
        for m in config["metrics"].keys():
            names.add(m)
    except KeyError:
        pass

    return names


def extract_metric(item):
    """
    Extract the metric from an item definition

    params:
        item: the item/discovery/prototype definition dict

    returns:
        the name of the metric used in the item

    raises:
        NonMetricItemError: the item does not directly use a metric
    """
    try:
        if item["type"] == "HTTP_AGENT":
            url = item["url"]
            if url.startswith("http://localhost:{$AGENT_PORT}"):
                return url.split("/")[-1]
    except KeyError:
        raise NonMetricItemError()

    raise NonMetricItemError()


def read_template(file):
    """
    Read the items from a Zabbix template and return the list of metric names

    params:
        file: the name of the file to read

    returns:
        list of metric named used in the file

    raises:
        yaml.parser.ParserError: invalid yaml file
    """
    names = set()
    config = None

    with open(file, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    try:
        for template in config["zabbix_export"]["templates"]:
            for item in template["items"]:
                try:
                    names.add(extract_metric(item))
                except NonMetricItemError:
                    pass

            for rule in template["discovery_rules"]:
                try:
                    names.add(extract_metric(rule))
                except NonMetricItemError:
                    pass

                for proto in rule["item_prototypes"]:
                    try:
                        names.add(extract_metric(proto))
                    except NonMetricItemError:
                        pass
    except KeyError:
        pass

    return names


if __name__ == '__main__':
    config_file = sys.argv[1]
    config_metrics = read_metrics(config_file)

    template_file = sys.argv[2]
    template_metrics = read_template(template_file) - SPECIAL

    config_only = config_metrics - template_metrics
    template_only = template_metrics - config_metrics

    print("Config only: {}".format(sorted(list(config_only))))
    print("Template only: {}".format(sorted(list(template_only))))
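The new script is what the Makefile's template-coverage target runs: it loads the sample metrics config and the Zabbix template, strips the built-in agent_version and latest_version_info items, and prints the metric names that exist only on one side. The same invocation driven from Python (equivalent to `make template-coverage`):

```python
import subprocess

# Same arguments as the Makefile's template-coverage target.
subprocess.run(
    [
        "python3",
        "zabbix_templates/coverage.py",
        "sample-config/pgmon-metrics.yml",
        "zabbix_templates/pgmon_templates.yaml",
    ],
    check=True,
)
```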
@@ -167,7 +167,8 @@ zabbix_export:
operator: NOT_MATCHES_REGEX
formulaid: A
lifetime: 30d
enabled_lifetime_type: DISABLE_NEVER
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 1d
item_prototypes:
- uuid: a30babe4a6f4440bba2a3ee46eff7ce2
name: 'Time spent executing statements on {#DBNAME}'
@@ -982,6 +983,9 @@ zabbix_export:
type: DEPENDENT
key: pgmon_discover_io_backend_types
delay: '0'
lifetime: 30d
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 1h
item_prototypes:
- uuid: b1ac2e56b30f4812bf33ce973ef16b10
name: 'I/O Evictions by {#BACKEND_TYPE}'
@@ -1572,8 +1576,15 @@ zabbix_export:
type: HTTP_AGENT
key: pgmon_discover_rep
delay: 10m
filter:
conditions:
- macro: '{#APPLICATION_NAME}'
value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
operator: NOT_MATCHES_REGEX
formulaid: A
lifetime: 30d
enabled_lifetime_type: DISABLE_NEVER
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 7d
item_prototypes:
- uuid: 3a5a60620e6a4db694e47251148d82f5
name: 'Flush lag for {#REPID}'
@@ -1775,6 +1786,8 @@ zabbix_export:
value: Raw
url: 'http://localhost:{$AGENT_PORT}/discover_rep'
lld_macro_paths:
- lld_macro: '{#APPLICATION_NAME}'
path: $.application_name
- lld_macro: '{#CLIENT_ADDR}'
path: $.client_addr
- lld_macro: '{#REPID}'
@@ -1786,6 +1799,15 @@ zabbix_export:
type: HTTP_AGENT
key: pgmon_discover_slots
delay: 10m
filter:
conditions:
- macro: '{#SLOT_NAME}'
value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$'
operator: NOT_MATCHES_REGEX
formulaid: A
lifetime: 30d
enabled_lifetime_type: DISABLE_AFTER
enabled_lifetime: 7d
item_prototypes:
- uuid: 536c5f82e3074ddfbfd842b3a2e8d46c
name: 'Slot {#SLOT_NAME} - Confirmed Flushed Bytes Lag'