Commit 908d1dff authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

add module config_reader to process yaml config files

parent 76285a05
#!/usr/bin/python3
import collections
import logging
import types
import yaml
class ConfigError(Exception):
pass
class _Error(Exception):
@property
def key(self):
return self.args[0]
@property
def msg(self):
return self.args[1]
class _KeyError(_Error, KeyError):
pass
class _UNSET:
pass
class _Container:
def _set_parent(self, obj, key):
assert "_parent" not in self
self._parent = obj, key
def _read_value(self, value, type, cast, key=_UNSET):
# check type
if not ( isinstance(value, type) or
(isinstance(value, Sequence) and (type is list)) or
(isinstance(value, Mapping) and (type is dict))):
raise _Error(self.path(key), "type error: value %r is not a %s" % (value, type.__name__))
# convert value
try:
return cast(value)
except Exception as e:
raise _Error(self.path(key), str(e))
def path(self, key=_UNSET):
"""Return the path of entry relative to the root of the tree
if `key` is provided, then the function displays the path of the given
item, otherwise it returnt the path to `self`
"""
if key is _UNSET:
obj, key = getattr(self, "_parent", (None, None))
else:
obj = self
lst=[]
while obj is not None:
lst.append(obj._path_elem(key))
obj, key = getattr(obj, "_parent", (None, None))
return "".join(reversed(lst))
class Mapping(_Container):
"""A read-only dict-like structure that tracks unused keys
This class provides the collections.abc.Mutable API plus .path() and extra
args in .get()
"""
class _Enumerator:
def __init__(self, dct, func):
self._dict = dct
self._func = func
def __len__(self):
return len(self._dict)
def __iter__(self):
for key, val in self._dict._iter_items():
yield self._func(key, val)
def __init__(self, values=()):
self._dict = types.MappingProxyType(collections.OrderedDict(values))
self._unused = set(self._dict)
for k, v in self._dict.items():
if isinstance(v, _Container):
v._set_parent(self, k)
def __len__(self):
return len(self._dict)
def __contains__(self, key):
return key in self._dict
def __getitem__(self, key):
return self.get(key, required=True)
def get(self, key, default=None, type=object, *, cast=lambda x:x, required=False):
"""Get an elemment of the Mapping
This function gets the value of entry `key` in the Mapping
- if found :
- ensure that the value is an instance of `type` or raise _Error
- return the result of `cast`(value) or raise _Error in case `cast`
return an exception
- if not found :
- if `required` is true, then raise _KeyError
- otherwise return the value of `default`
NOTE: if `default` is a dict then it is implicitely converted to a Mapping
"""
self._unused.discard(key)
result = self._dict.get(key, _UNSET)
if result is not _UNSET:
return self._read_value(result, type, cast, key)
elif required:
raise _KeyError(self.path(key), "config key is missing")
elif isinstance(default, dict):
# implicitely create a mapping
dct = Mapping(default)
dct._set_parent(self, key)
return dct
else:
return default
def __iter__(self):
return iter(self._dict)
def keys(self):
return self._dict.keys()
def _iter_items(self):
for key, val in self._dict.items():
self._unused.discard(key)
yield key, val
def values(self):
return self._Enumerator(self, lambda k,v: v)
def items(self):
return self._Enumerator(self, lambda k,v: (k,v))
def _path_elem(self, key):
return ".%s" % key
def _log_warnings(self, logger):
for key, val in self._dict.items():
if key in self._unused:
logger.warning("unused config key %s", self.path(key))
elif isinstance(val, _Container):
val._log_warnings(logger)
class Sequence(tuple, _Container):
"""A read-only list-like structure
This class provides the collections.abc.Sequence API plus .path()
"""
def __init__(self, values=()):
super().__init__()
for k, v in enumerate(self):
if isinstance(v, _Container):
v._set_parent(self, k)
def _path_elem(self, key):
return "[%d]" % key
def _log_warnings(self, logger):
for val in self:
if isinstance(val, _Container):
val._log_warnings(logger)
class _Loader(yaml.SafeLoader):
pass
yaml.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
lambda loader, node: Mapping(loader.construct_pairs(node)),
_Loader)
yaml.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG,
lambda loader, node: Sequence(loader.construct_sequence(node)),
_Loader)
class ConfigReader:
"""A class for reading a YAML config file
Usage:
with ConfigReader(open(filename), logger) as cfg:
val1 = cfg["key1"]
...
When entering the context, this class parses the provided yaml source and
returns a read-only object to access its content.
Within the context the class tracks which config keys are accessed.
When leaving the context and in the absence of any error, the class warns
about unused config keys. In case of error (any exception) it logs the
error and raise ConfigError.
The read-only object (returned when entering the context) is a tree where
all instances of dict and list are replaced with Mapping and Sequence. Also
the order of dicts is preserved.
All warning and error messages are reported to the logger, with the path to
the relevant entry within the tree.
"""
def __init__(self, source, logger=logging.root):
self._log = logger
self._source = source
def _report_error(self, exc_value):
if isinstance(exc_value, _Error):
msg = "%s: %s" % (exc_value.key, exc_value.msg)
self._log.error("%s", msg)
elif isinstance(exc_value, yaml.YAMLError):
msg = "yaml error: %s" % str(exc_value)
self._log.error("%s", msg)
else:
msg = "exception in config reader"
self._log.exception("%s", msg)
raise ConfigError(msg)
def __enter__(self):
assert not hasattr(self, "value")
name = getattr(self._source, "name", None)
if name:
self._log.info("reading config file %s", name)
try:
self.value = yaml.load(self._source, _Loader)
except yaml.YAMLError as e:
self._report_error(e)
finally:
if hasattr(self._source, "close"):
self._source.close()
return self.value
def __exit__(self, exc, val, tb):
assert hasattr(self, "value")
v = self.value
del self.value
if exc is not None:
self._report_error(val)
elif isinstance(v, _Container):
v._log_warnings(self._log)
import contextlib
import io
import logging
import unittest
from unittest import mock
import config_reader
from config_reader import ConfigReader, ConfigError
log = logging.getLogger("test")
class ConfigReaderTestCase(unittest.TestCase):
def test_config_ok(self):
with mock.patch.object(log, "warning") as m, \
ConfigReader("""---
scalar_read: 42
scalar_unused: 24
dict_read:
dscalar_read: 14
dscalar_unused: 15
404: missing
list_read:
- lrscalar_read: 16
lrscalar_unused: 17
- unused_val4: 40
unused_val5: 50
list_unused:
- unused_val1: 10
unused_val2: 20
- unused_val3: 30
""", log) as cfg:
self.assertEqual(cfg["scalar_read"], 42)
self.assertEqual(cfg["scalar_read"], 42)
self.assertEqual(cfg["dict_read"]["dscalar_read"], 14)
self.assertEqual(cfg["list_read"][0]["lrscalar_read"], 16)
# read a default value
self.assertIs(cfg["dict_read"].get("unknown"), None)
self.assertEqual(cfg["dict_read"].get("unknown", "bar"), "bar")
# check type
self.assertEqual(cfg.get("scalar_read", int), 42)
# cast value
self.assertEqual (cfg.get("scalar_read", cast=str), "42")
self.assertEqual (cfg.get("scalar_read", cast=lambda x: x//2), 21)
# check paths
self.assertEqual(cfg.path("scalar_read"), ".scalar_read")
self.assertEqual(cfg["dict_read"].path(), ".dict_read")
self.assertEqual(cfg["dict_read"].path("dscalar_read"), ".dict_read.dscalar_read")
self.assertEqual(cfg["list_read"].path(), ".list_read")
self.assertEqual(cfg["list_read"].path(0), ".list_read[0]")
self.assertEqual(cfg["list_read"][0].path(), ".list_read[0]")
self.assertEqual(cfg["list_read"][0].path("lrscalar_read"), ".list_read[0].lrscalar_read")
m.assert_has_calls([
mock.call('unused config key %s', '.scalar_unused'),
mock.call('unused config key %s', '.dict_read.dscalar_unused'),
mock.call('unused config key %s', '.dict_read.404'),
mock.call('unused config key %s', '.list_read[0].lrscalar_unused'),
mock.call('unused config key %s', '.list_read[1].unused_val4'),
mock.call('unused config key %s', '.list_read[1].unused_val5'),
mock.call('unused config key %s', '.list_unused'),
])
def test_enumerators(self):
with ConfigReader("""---
list: [4,4,3,1,5,7,9,8]
dict:
foo: bar
26: 14
hello: world
""", log) as cfg:
self.assertSequenceEqual(tuple(cfg["list"]), (4,4,3,1,5,7,9,8))
self.assertSequenceEqual(tuple(cfg["dict"]), ("foo", 26, "hello"))
self.assertSequenceEqual(tuple(cfg["dict"].keys()), ("foo", 26, "hello"))
self.assertSequenceEqual(tuple(cfg["dict"].values()), ("bar", 14, "world"))
self.assertSequenceEqual(tuple(cfg["dict"].items()), (("foo", "bar"), (26, 14), ("hello", "world")))
def test_implicit_mapping(self):
with self.check_error("\.mapping\.submapping\.missing: config key is missing"):
with ConfigReader("""---
scalar: 42
""", log) as cfg:
mapping = cfg.get("mapping", {})
submapping = mapping.get("submapping", {"foo": "bar"})
self.assertIsInstance(mapping, config_reader.Mapping)
self.assertIsInstance(submapping, config_reader.Mapping)
self.assertEqual(submapping["foo"], "bar")
submapping["missing"]
def test_config_file(self):
fp = io.StringIO("---\n[4, 2]")
fp.name="dummmy_file.yml"
with mock.patch.object(log, "info") as m:
with ConfigReader(fp, log) as cfg:
self.assertTrue(fp.closed)
self.assertSequenceEqual(cfg, (4, 2))
m.assert_called_with('reading config file %s', 'dummmy_file.yml')
@contextlib.contextmanager
def check_error(self, regex, method="error"):
assert method in ("error", "exception")
with mock.patch.object(log, "error" ) as m_err, \
mock.patch.object(log, "exception") as m_exc:
m = m_err if (method == "error") else m_exc
with self.assertRaisesRegex(ConfigError, regex):
yield
self.assertEqual(m.call_count, 1)
call = m.call_args[0]
msg = call[0] % call[1:]
self.assertRegex(msg, regex)
def test_parse_error(self):
with self.check_error("yaml error: while scanning "):
with ConfigReader(""""foo""", log) as cfg:
raise ValueError()
def test_key_error(self):
with self.check_error(r"\.a\.b\.missing: config key is missing"):
with ConfigReader("""---
a: { b: {c: 42}}
""", log) as cfg:
cfg["a"]["b"]["missing"]
with self.check_error(r"exception in config reader", method="exception"):
with ConfigReader("""---
a: { b: [ 42 ] }
""", log) as cfg:
cfg["a"]["b"][2]
def test_type_error(self):
with self.check_error(r"\.bad\.value: type error: value 42 is not a str"):
with ConfigReader("""---
bad: { value: 42 }
""", log) as cfg:
cfg["bad"].get("value", type=str)
# type check happens before cast
with self.check_error(r"\.bad\.value: type error: value 42 is not a str"):
with ConfigReader("""---
bad: { value: 42 }
""", log) as cfg:
cfg["bad"].get("value", type=str, cast=lambda x:x/0)
def test_cast_error(self):
# ValueError
with self.check_error(r"\.bad\.value: invalid literal for int\(\) with base 10: 'foo'"):
with ConfigReader("""---
bad: { value: "foo" }
""", log) as cfg:
cfg["bad"].get("value", cast=int)
# TypeError
with self.check_error(r"\.bad\.value: unsupported operand type\(s\) for /: 'str' and 'int'"):
with ConfigReader("""---
bad: { value: "foo" }
""", log) as cfg:
cfg["bad"].get("value", cast=lambda x: x/2)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment