import unittest
from datetime import datetime, timedelta

import pgmon


class TestPgmonMethods(unittest.TestCase):
    """Unit tests for the helper functions exposed by the pgmon module.

    Covers update_deep, get_pool, handle_connect_failure and get_query.
    """

    def setUp(self):
        """Reset pgmon's module-level state before every test.

        Several tests write to pgmon.unhappy_cooldown; without a reset the
        outcome of one test would depend on which tests ran before it.
        """
        pgmon.config.update(pgmon.default_config)
        pgmon.unhappy_cooldown = {}

    ##
    # update_deep
    ##

    def test_update_deep__empty_cases(self):
        """Merging with empty dicts leaves the other side's content intact."""
        # Both empty
        d1 = {}
        d2 = {}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {})
        self.assertEqual(d2, {})

        # Empty source: destination is unchanged
        d1 = {'a': 1}
        d2 = {}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'a': 1})
        self.assertEqual(d2, {})

        # Empty destination: it receives the source's content
        d1 = {}
        d2 = {'a': 1}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'a': 1})
        self.assertEqual(d2, d1)

    def test_update_deep__scalars(self):
        """Scalar values are added or overwritten; the source is untouched."""
        d1 = {'foo': 1, 'bar': "text", 'hello': "world"}
        d2 = {'foo': 2, 'baz': "blah"}
        pgmon.update_deep(d1, d2)
        self.assertEqual(
            d1,
            {'foo': 2, 'bar': "text", 'baz': "blah", 'hello': "world"},
        )
        self.assertEqual(d2, {'foo': 2, 'baz': "blah"})

    def test_update_deep__lists(self):
        """List values are extended (duplicates kept), not replaced."""
        # Adding to an empty list
        d1 = {'lst1': []}
        d2 = {'lst1': [1, 2]}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'lst1': [1, 2]})
        self.assertEqual(d2, d1)

        # Adding an empty list
        d1 = {'lst1': [1, 2]}
        d2 = {'lst1': []}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'lst1': [1, 2]})
        self.assertEqual(d2, {'lst1': []})

        # Overlapping values are appended, not de-duplicated
        d1 = {'lst1': [1, 2, 3]}
        d2 = {'lst1': [3, 4]}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'lst1': [1, 2, 3, 3, 4]})
        self.assertEqual(d2, {'lst1': [3, 4]})

        # Lists of objects
        d1 = {'lst1': [{'id': 1}, {'id': 2}, {'id': 3}]}
        d2 = {'lst1': [{'id': 3}, {'id': 4}]}
        pgmon.update_deep(d1, d2)
        self.assertEqual(
            d1,
            {'lst1': [{'id': 1}, {'id': 2}, {'id': 3}, {'id': 3}, {'id': 4}]},
        )
        self.assertEqual(d2, {'lst1': [{'id': 3}, {'id': 4}]})

        # Nested lists
        d1 = {'obj1': {'l1': [1, 2]}}
        d2 = {'obj1': {'l1': [3, 4]}}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'obj1': {'l1': [1, 2, 3, 4]}})
        self.assertEqual(d2, {'obj1': {'l1': [3, 4]}})

    def test_update_deep__dicts(self):
        """Dict values are merged key by key, recursively."""
        # Adding to an empty dict
        d1 = {'obj1': {}}
        d2 = {'obj1': {'a': 1, 'b': 2}}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'obj1': {'a': 1, 'b': 2}})
        self.assertEqual(d2, d1)

        # Adding an empty dict
        d1 = {'obj1': {'a': 1, 'b': 2}}
        d2 = {'obj1': {}}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'obj1': {'a': 1, 'b': 2}})
        self.assertEqual(d2, {'obj1': {}})

        # Overlapping keys are overwritten, new keys are added
        d1 = {'obj1': {'a': 1, 'b': 2}}
        d2 = {'obj1': {'a': 5, 'c': 12}}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'obj1': {'a': 5, 'b': 2, 'c': 12}})
        self.assertEqual(d2, {'obj1': {'a': 5, 'c': 12}})

        # Nested dicts
        d1 = {'obj1': {'d1': {'a': 1, 'b': 2}}}
        d2 = {'obj1': {'d1': {'a': 5, 'c': 12}}}
        pgmon.update_deep(d1, d2)
        self.assertEqual(d1, {'obj1': {'d1': {'a': 5, 'b': 2, 'c': 12}}})
        self.assertEqual(d2, {'obj1': {'d1': {'a': 5, 'c': 12}}})

    def test_update_deep__types(self):
        """Mismatched argument or value types raise TypeError."""
        # Source is not a dict
        with self.assertRaises(TypeError):
            pgmon.update_deep({'foo': 5}, None)

        # Destination is not a dict
        with self.assertRaises(TypeError):
            pgmon.update_deep(None, {'foo': 5})

        # Nested mismatched types (list vs. dict under the same key)
        with self.assertRaises(TypeError):
            pgmon.update_deep({'foo': [1, 2]}, {'foo': {'a': 7}})

    ##
    # get_pool
    ##

    def test_get_pool__simple(self):
        """A pool can be obtained for a database in the normal case."""
        pool = pgmon.get_pool('postgres')
        self.assertIsNotNone(pool)

    def test_get_pool__unhappy(self):
        """A database in cooldown is refused; other databases still work."""
        # Put 'postgres' into cooldown until well into the future
        pgmon.unhappy_cooldown['postgres'] = (
            datetime.now() + timedelta(days=60)
        )
        with self.assertRaises(pgmon.UnhappyDBError):
            pgmon.get_pool('postgres')

        # Getting a different database when there's an unhappy one
        pool = pgmon.get_pool('template0')
        self.assertIsNotNone(pool)

    ##
    # handle_connect_failure
    ##

    def test_handle_connect_failure__simple(self):
        """A connect failure records a future cooldown for that database."""
        # Adding to an empty unhappy list (emptied by setUp)
        pool = pgmon.get_pool('postgres')
        pgmon.handle_connect_failure(pool)
        self.assertGreater(
            pgmon.unhappy_cooldown['postgres'], datetime.now()
        )

        # Adding another database keeps the first entry in place
        pool = pgmon.get_pool('template0')
        pgmon.handle_connect_failure(pool)
        self.assertGreater(
            pgmon.unhappy_cooldown['postgres'], datetime.now()
        )
        self.assertGreater(
            pgmon.unhappy_cooldown['template0'], datetime.now()
        )
        self.assertEqual(len(pgmon.unhappy_cooldown), 2)

    ##
    # get_query
    ##

    def test_get_query__basic(self):
        """A metric with a single query version returns that query."""
        metric = {
            'type': 'value',
            'query': {
                0: 'DEFAULT',
            },
        }
        self.assertEqual(pgmon.get_query(metric, 100000), 'DEFAULT')

    def test_get_query__versions(self):
        """The query for the highest version bound <= the server version wins."""
        metric = {
            'type': 'value',
            'query': {
                0: 'DEFAULT',
                110000: 'NEW',
            },
        }

        # Server older than the newer bound: falls back to the default
        self.assertEqual(pgmon.get_query(metric, 100000), 'DEFAULT')

        # Server exactly at the newer bound: gets the newer query
        self.assertEqual(pgmon.get_query(metric, 110000), 'NEW')

        # Server beyond the newer bound: still gets the newer query
        self.assertEqual(pgmon.get_query(metric, 160000), 'NEW')

        # Server version in between two other versions
        metric = {
            'type': 'value',
            'query': {
                0: 'DEFAULT',
                96000: 'OLD',
                110000: 'NEW',
            },
        }
        self.assertEqual(pgmon.get_query(metric, 100000), 'OLD')

    def test_get_query__missing_metric(self):
        """Placeholder: getting a metric that doesn't exist."""
        # Reported as skipped so the missing coverage stays visible instead
        # of counting as a passing test.
        self.skipTest("TODO: test getting a metric that doesn't exist")

    def test_get_query__missing_version(self):
        """Placeholder: metrics that exist only for some server versions."""
        # Two cases still need to be written:
        #   - a metric that only exists for newer versions
        #   - a metric that only exists for older versions
        self.skipTest(
            "TODO: test metrics that only exist for newer/older versions"
        )