Improve test coverage

* Refactor some functions to improve unit test coverage.

* Use the "coverage" module if available along with unit tests.
James Campbell 2025-10-08 23:53:45 -04:00
parent 75c5d76047
commit ab039dc412
Signed by: james
GPG Key ID: 2287C33A40DC906A
4 changed files with 180 additions and 31 deletions

.gitignore (vendored)

@@ -8,3 +8,4 @@ missing
__pycache__
venv
build
.coverage

Makefile

@@ -19,6 +19,12 @@ RPM_VERSION := $(VERSION)-$(RPM_RELEASE)
DEB_VERSION := $(VERSION)~rc$(RELEASE)
endif
# Python stuff
PYTHON ?= python3
PYLINT ?= pylint
BLACK ?= black
HAVE_COVERAGE := $(shell $(PYTHON) -c 'import coverage' 2>/dev/null && echo yes)
# Where packages are built
BUILD_DIR := build
@@ -82,19 +88,23 @@ clean:
# Check for lint
lint:
pylint src/pgmon.py
pylint src/test_pgmon.py
black --check --diff src/pgmon.py
black --check --diff src/test_pgmon.py
$(PYLINT) src/pgmon.py
$(PYLINT) src/test_pgmon.py
$(BLACK) --check --diff src/pgmon.py
$(BLACK) --check --diff src/test_pgmon.py
# Format the code using black
format:
black src/pgmon.py
black src/test_pylint.py
$(BLACK) src/pgmon.py
$(BLACK) src/test_pgmon.py
# Run unit tests for the script
test:
cd src ; python3 -m unittest
ifeq ($(HAVE_COVERAGE),yes)
cd src ; $(PYTHON) -m coverage run -m unittest && $(PYTHON) -m coverage report -m
else
cd src ; $(PYTHON) -m unittest
endif
# Run query tests
query-tests:

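As an aside, the coverage auto-detection used by the test target above can be mimicked outside of make. A minimal Python sketch (a hypothetical helper, not part of this commit; the run_tests name and src_dir default are assumptions) that checks whether the coverage module is importable and otherwise falls back to plain unittest:

import importlib.util
import subprocess
import sys

def run_tests(src_dir="src"):
    """Run the unit tests from src_dir, under coverage when the module is importable."""
    if importlib.util.find_spec("coverage") is not None:
        # Same effect as: coverage run -m unittest && coverage report -m
        subprocess.run([sys.executable, "-m", "coverage", "run", "-m", "unittest"], cwd=src_dir, check=True)
        subprocess.run([sys.executable, "-m", "coverage", "report", "-m"], cwd=src_dir, check=True)
    else:
        subprocess.run([sys.executable, "-m", "unittest"], cwd=src_dir, check=True)

if __name__ == "__main__":
    run_tests()
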
src/pgmon.py

@@ -134,6 +134,12 @@ class LatestVersionCheckError(Exception):
"""
class InvalidDataError(Exception):
"""
Error indicating query results were somehow invalid
"""
# Default config settings
DEFAULT_CONFIG = {
# The address the agent binds to
@@ -469,6 +475,46 @@ def json_encode_special(obj):
raise TypeError("Cannot serialize object of {}".format(type(obj)))
def json_encode_result(return_type, res):
"""
Return a json string encoding of the results of a query.
params:
return_type: the expected structure to return. One of:
value, row, column, set
res: the query results
returns: a json string form of the results
raises:
ConfigError: when an invalid return_type is given
InvalidDataError: when the query results don't match the return type
"""
try:
if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0])
if return_type == "row":
return json.dumps(
res[0] if len(res) > 0 else {}, default=json_encode_special
)
if return_type == "column":
return json.dumps(
[list(r.values())[0] for r in res], default=json_encode_special
)
if return_type == "set":
return json.dumps(res, default=json_encode_special)
except IndexError as e:
raise InvalidDataError(e) from e
# If we got to this point, the return type is invalid
raise ConfigError("Invalid query return type: {}".format(return_type))
def run_query_no_retry(pool, return_type, query, args):
"""
Run the query with no explicit retry code
@@ -476,31 +522,10 @@ def run_query_no_retry(pool, return_type, query, args):
with pool.connection(float(Context.config["connect_timeout"])) as conn:
try:
with conn.cursor(cursor_factory=RealDictCursor) as curs:
output = None
curs.execute(query, args)
res = curs.fetchall()
if return_type == "value":
if len(res) == 0:
output = ""
output = str(list(res[0].values())[0])
elif return_type == "row":
# if len(res) == 0:
# return "[]"
output = json.dumps(res[0], default=json_encode_special)
elif return_type == "column":
# if len(res) == 0:
# return "[]"
output = json.dumps(
[list(r.values())[0] for r in res], default=json_encode_special
)
elif return_type == "set":
output = json.dumps(res, default=json_encode_special)
else:
raise ConfigError(
"Invalid query return type: {}".format(return_type)
)
return output
return json_encode_result(return_type, res)
except Exception as e:
dbname = pool.name
if dbname in Context.unhappy_cooldown:

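For reference, a quick sketch of how the new json_encode_result helper behaves for each return type, mirroring the unit tests added below (assumes pgmon is importable, e.g. when run from src/; the sample rows are illustrative only):

from pgmon import json_encode_result

rows = [{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]

print(json_encode_result("value", rows))   # 5  (first column of the first row, as a string)
print(json_encode_result("row", rows))     # {"id": 5, "foo": "bar"}
print(json_encode_result("column", rows))  # [5, 7]
print(json_encode_result("set", rows))     # [{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]
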
src/test_pgmon.py

@@ -234,6 +234,11 @@ class TestPgmonMethods(unittest.TestCase): # pylint: disable=too-many-public-me
d2 = {"foo": {"a": 7}}
self.assertRaises(TypeError, pgmon.update_deep, d1, d2)
# Nested mismatched types
d1 = {"foo": {"a": 7}}
d2 = {"foo": [1, 2]}
self.assertRaises(TypeError, pgmon.update_deep, d1, d2)
##
# get_pool
##
@@ -1047,7 +1052,7 @@ metrics:
# get_latest_version
##
def test_get_latest_version(self):
def test_get_latest_version__basic(self):
"""
Test getting the latest version from the actual RSS feed
"""
@@ -1066,7 +1071,7 @@ metrics:
# json_encode_special
##
def test_json_encode_special(self):
def test_json_encode_special__basic(self):
"""
Test encoding Decimal types as JSON
"""
@@ -1085,3 +1090,111 @@ metrics:
self.assertEqual(
json.dumps(Decimal("2.5"), default=pgmon.json_encode_special), "2.5"
)
##
# json_encode_result
##
def test_json_encode_result__value(self):
"""
Test encoding a single value return type, valid inputs
"""
# Empty result
self.assertEqual(pgmon.json_encode_result("value", []), "")
# Single value
self.assertEqual(pgmon.json_encode_result("value", [{"id": 5}]), "5")
self.assertEqual(pgmon.json_encode_result("value", [{"id": "5"}]), "5")
self.assertEqual(pgmon.json_encode_result("value", [{"key": "word"}]), "word")
self.assertEqual(pgmon.json_encode_result("value", [{"id": Decimal(5)}]), "5")
def test_json_encode_result__row(self):
"""
Test encoding a row, valid inputs
"""
# Empty result
self.assertEqual(pgmon.json_encode_result("row", {}), "{}")
# Simple row
self.assertEqual(
pgmon.json_encode_result("row", [{"id": 5, "foo": "bar"}]),
'{"id": 5, "foo": "bar"}',
)
# Empty row (not ever likely to be what you want ... but technically not invalid)
self.assertEqual(pgmon.json_encode_result("row", [{}]), "{}")
def test_json_encode_result__column(self):
"""
Test encoding a column, valid inputs
"""
# Empty result
self.assertEqual(pgmon.json_encode_result("column", []), "[]")
# Simple column
self.assertEqual(pgmon.json_encode_result("column", [{"id": 5}]), "[5]")
self.assertEqual(
pgmon.json_encode_result("column", [{"id": 5}, {"id": 7}, {"id": 2}]),
"[5, 7, 2]",
)
def test_json_encode_result__set(self):
"""
Test encoding a set, valid inputs
"""
# Empty result
self.assertEqual(pgmon.json_encode_result("set", []), "[]")
# Simple column
self.assertEqual(pgmon.json_encode_result("set", [{"id": 5}]), '[{"id": 5}]')
self.assertEqual(
pgmon.json_encode_result(
"set", [{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]
),
'[{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]',
)
def test_json_encode_result__invalid_type(self):
"""
Test requesting an invalid return type
"""
# Make sure an empty list raises the error
self.assertRaises(pgmon.ConfigError, pgmon.json_encode_result, "foo", [])
# Make sure including data still raises the error
self.assertRaises(
pgmon.ConfigError,
pgmon.json_encode_result,
"foo",
[{"id": 5, "foo": "bar"}],
)
def test_json_encode_result__invalid_value(self):
"""
Test invalid data/type combinations for a value
"""
# Note: We should always get a list of dicts from psycopg using the RealDictCursor
# Empty row returned
self.assertRaises(
pgmon.InvalidDataError, pgmon.json_encode_result, "value", [{}]
)
def test_json_encode_result__invalid_row(self):
"""
Test invalid data/type combinations for a row
"""
# Note: We should always get a list of dicts from psycopg using the RealDictCursor
# Note: I can't at this point think of any sort of invalid result psycopg2 should
# produce for this return type.
# This is basically a place holder for now.
True # pylint: disable=pointless-statement
def test_json_encode_result__invalid_set(self):
"""
Test invalid data/type combinations for a set
"""
# Note: We should always get a list of dicts from psycopg using the RealDictCursor
# Note: I can't at this point think of any sort of invalid result psycopg2 should
# produce for this return type.
# This is basically a place holder for now.
True # pylint: disable=pointless-statement
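The new tests run as part of make test; they can also be targeted individually from Python, for example with the standard unittest loader (test names taken from the diff above, module name following from src/test_pgmon.py; run from the src/ directory so the module is importable):

import unittest

suite = unittest.defaultTestLoader.loadTestsFromNames(
    [
        "test_pgmon.TestPgmonMethods.test_json_encode_result__value",
        "test_pgmon.TestPgmonMethods.test_json_encode_result__row",
        "test_pgmon.TestPgmonMethods.test_json_encode_result__column",
        "test_pgmon.TestPgmonMethods.test_json_encode_result__set",
        "test_pgmon.TestPgmonMethods.test_json_encode_result__invalid_type",
    ]
)
unittest.TextTestRunner(verbosity=2).run(suite)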