Source code for malibu.config.configuration

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import json
import io

from contextlib import closing
try:
    from urllib2 import urlopen
except ImportError:
    from urllib.request import urlopen

from malibu.text import (
    string_type,
    unicode_type,
    unicode2str
)

__doc__ = """
malibu.config.configuration
---------------------------

INI-style configuration implementation with some special features
to make configuration a little simpler.

"""


[docs]class ConfigurationSection(dict): """ The ConfigurationSection class is a modified dictionary that provides "helpers" to grab a configuration entry in it's correct "type" form. """ def __init__(self): dict.__init__(self) self.mutable = True def __getitem__(self, key): try: return dict.__getitem__(self, key) except (IndexError, KeyError): raise KeyError("Unknown configuration key '%s'." % (key)) def __setitem__(self, key, value): if self.mutable: self.update({key: value}) else: raise AttributeError("This section is not mutable.")
[docs] def set_mutable(self, mutable): """ Enforces immutability on a configuration section. """ self.mutable = mutable
[docs] def set(self, key, value): """ Allows programmatic setting of configuration entries. """ return self.__setitem__(key, value)
[docs] def get(self, key, default=None): """ The bare "get" on the underlying dictionary that returns the configuration entry in whatever form it was parsed as, typically a string. """ try: value = self.__getitem__(key) if isinstance(value, unicode_type()) and value.lower() == u'!none': return None elif isinstance(value, string_type()) and value.lower() == '!none': return None else: return value except IndexError: return default
[docs] def get_list(self, key, delimiter=",", strip=True, default=[]): """ Attempts to take a something-delimited string and "listify" it. If an error occurs while attempting to listify, :param default: will be returned. """ try: val = self.get(key) if isinstance(val, list): return val l = val.split(delimiter) if len(val) > 0 else default if strip: return [item.strip() for item in l] else: return l except: return default
[docs] def get_string(self, key, default=""): """ Attempts to take the value stored and retrieve it safely as a string. If the value mapped to by :param key: is "!None", the object returned is NoneType. If an error occurs while trying to safely retrieve the string, :param default: is returned. """ try: return str(self.get(key)) or default except: return default
[docs] def get_int(self, key, default=None): """ Attempts to fetch and intify the value mapped to by :param key:. If an error occurs while trying to intify the value, :param default: will be returned. """ try: return int(self.get(key)) or default except: return default
[docs] def get_bool(self, key, default=False): """ Attempts to safely fetch the value mapped to by :param key:. After successful retrieval, a conditional coercion to boolean is attempt. If the coercion to boolean fails, :param default: is returned. """ try: val = self.get(key) or default if isinstance(val, bool): return val elif isinstance(val, unicode_type()): if val.lower() == 'true': self.set(key, True) return True elif val.lower() == 'false': self.set(key, False) return False else: return default elif isinstance(val, int): if val == 1: self.set(key, True) return True elif val == 0: self.set(key, False) return False else: return default else: print("get_bool: unknown value type") print("type(val) => ", type(val)) print("unicode_type() => ", unicode_type()) return default except: return default
[docs]class SectionPromise(object): """ this is a configuration section promise to make resolution of linked sections post-load easier. """ promises = [] def __init__(self, config, section, key, link): self.config = config self.section = section self.key = key self.link = link self.__fulfilled = False SectionPromise.promises.append(self) def __str__(self): """ Convert directly to a string for recreating the link during config write. Better for serialization. """ return '@' + self.link
[docs] def resolve(self): """ Resolves a SectionPromise into the proper dictionary value. """ if self.__fulfilled: return section = self.config.get_section(self.section) link = self.config.get_section(self.link) target = section.get(self.key) if isinstance(target, list): target.remove(self) target.append(link) section.set(self.key, target) else: section.set(self.key, link) # Preserve the promise for writing back out. section.set("_%s_promise" % (self.key), self) self.__fulfilled = True
[docs]class Configuration(object): """ Configuration class performs the loading, saving, and parsing of an INI-style configuration file with a few advanced features such as value typing, file inclusion, section references, and JSON-style list definition. """ def __init__(self): """ initialise the container store in key:value format withing the certain category """ self.__container = ConfigurationSection() self._filename = None self.loaded = False def __resolve_links(self): """ resolves all linked references (SectionPromise instances). """ for promise in SectionPromise.promises: promise.resolve() SectionPromise.promises = []
[docs] def add_section(self, section_name): """ Adds a new configuration section to the main dictionary. """ section = ConfigurationSection() self.__container.set(section_name, section) return section
[docs] def remove_section(self, section_name): """ Removes a section from the main dictionary. """ del self.__container[section_name]
@property def sections(self): """ Returns a list of all sections in the configuration. """ return self.__container.keys()
[docs] def has_section(self, section_name): """ Return if this configuration has a section named :param section_name:. """ return section_name in self.__container
[docs] def get_section(self, section_name): """ Return the internal ConfigurationSection representation of a set of configuration entries. :param str section_name: Section name to retrieve. :rtype: malibu.config.configuration.ConfigurationSection :return: ConfigurationSection or None """ if self.__container.__contains__(section_name): return self.__container[section_name] else: return None
[docs] def get_namespace(self, namespace): """ Returns a set of ConfigurationSection objects that are prefixed with the namespace specified above. If no configuration sections have the requested namespace, None is returned. :param str namespace: Namespace to find in section name. :rtype: set :return: dict or None """ if not namespace: raise ValueError("Namespace can not be none") sections = {} for section_name in self.sections: if section_name.startswith(namespace + ":"): short_name = section_name.split(":")[1] sections.update({ short_name: self.get_section(section_name), }) else: continue if len(sections) == 0: return {} else: return sections
[docs] def unload(self): """ Unload an entire configuration """ self.__container.clear() self.loaded = False
[docs] def reload(self): """ Reload the configuration from the initially specified file """ self.unload() self.load(self._filename)
[docs] def save(self, filename=None): """ Write the loaded configuration into the file specified by :param filename: or to the initially specified filename. All linked sections are flattened into SectionPromise instances and written to the configuration properly. :raises ValueError: if no save filename available. """ if filename is None: filename = self._filename if filename is None: raise ValueError('No filename specified and no stored filename.') with closing(io.open(filename, 'w')) as config: for section, smap in self.__container.items(): config.write("[%s]\n" % (section)) for key, value in smap.items(): if isinstance(value, list): value = "+list:" + json.dumps(value) elif isinstance(value, ConfigurationSection): if "_%s_promise" % (key) in smap: value = str(smap["_%s_promise" % (key)]) else: value = str(value) elif isinstance(value, SectionPromise): continue elif isinstance(value, io.TextIOBase): value = "+file:" + value.name elif value is None: value = "!None" else: value = str(value) config.write("%s = %s\n" % (key, value)) config.write("\n")
[docs] def load(self, filename): """ Loads a INI-style configuration from the given filename. If the file can not be opened from :param filename:, a ValueError is raised. Upon any other error, the exception is simply raised to the top. :raises ValueError: if no filename provided. :raises Exception: upon other error """ try: fobj = io.open(filename, 'r') self._filename = filename self.load_file(fobj) fobj.close() except IOError: raise ValueError("Invalid filename '%s'." % (filename)) except: raise
[docs] def load_file(self, fobj): """ Performs the full load of the configuration file from the underlying file object. If a file object is not passed in :param fobj:, TypeError is raised. :raises TypeError: if :param fobj: is not a file type """ if not fobj or not isinstance(fobj, io.IOBase): raise TypeError("Invalid file object.") if self.loaded: self.__container.clear() section_name = None option_key = None option_value = None for line in fobj.readlines(): line = line.strip('\n').lstrip() if line.startswith('#') or line.startswith(';'): continue elif line.startswith('[') and line.endswith(']'): # This is the beginning of a section tag. section_name = line[1:-1] if not self.get_section(section_name): self.add_section(section_name) continue elif '=' in line: s = line.split('=', 1) # strip whitespace option_key = s[0].strip() option_value = s[1].strip() if s[1] is not '' else None if option_value and option_value[-1] == ';': option_value = option_value[0:-1] section = self.get_section(section_name) if not option_value: section.set(option_key, option_value) continue if option_value.startswith('+'): # typed reference / variable dobj_type = option_value.split(':')[0][1:] if len(option_value.split(':')) > 2: dobj_value = ':'.join(option_value.split(':')[1:]) else: dobj_value = option_value.split(':')[1] if dobj_type.lower() == 'file': try: section.set(option_key, io.open(dobj_value, 'r')) except: try: section.set( option_key, io.open(dobj_value, 'w+')) except: section.set(option_key, None) elif dobj_type.lower() in ["url", "uri"]: try: section.set(option_key, urlopen(dobj_value).read()) except: raise section.set(option_key, None) elif dobj_type.lower() == 'list': try: dobj_list = json.loads('%s' % (dobj_value)) dobj_list = unicode2str(dobj_list) except: dobj_list = [] dobj_repl = [] for item in dobj_list: if item.startswith(b'@'): link_name = item[1:] if not self.get_section(link_name): dobj_repl.append( SectionPromise( self, section_name, option_key, link_name)) else: link = self.get_section(link_name) dobj_repl.append(link) else: dobj_repl.append(item) section.set(option_key, dobj_repl) else: section.set(option_key, option_value) elif option_value.startswith('@'): # section reference link_name = option_value[1:] if not self.get_section(link_name): section.set( option_key, SectionPromise( self, section_name, option_key, link_name)) else: link = self.get_section(link_name) section.set(option_key, link) else: section.set(option_key, option_value) continue else: continue self.__resolve_links() self.loaded = True