Source code for pymatgen.db.config

"""
Database configuration functions.
Main class is DBConfig, which encapsulates a database configuration
passed in as a file or object. For example::
    cfg1 = DBConfig()  # use defaults
    cfg2 = DBConfig("/path/to/myfile.json")  # read from file
    f = open("/other/file.json")
    cfg3 = DBConfig(f)  # read from file object
    # access dict of parsed conf. settings
    settings = cfg1.settings
"""

import os

from ruamel import yaml

__author__ = "Dan Gunter <dkgunter@lbl.gov>"
__date__ = "4/25/14"

# Constants for keys
HOST_KEY = "host"
PORT_KEY = "port"
DB_KEY = "database"
COLL_KEY = "collection"
USER_KEY = "user"
PASS_KEY = "password"
ALIASES_KEY = "aliases"


[docs]class ConfigurationFileError(Exception): """ Error for Config File """ def __init__(self, filename, err): """ Init for ConfigurationFileError. """ msg = f"reading '{filename}': {err}" Exception.__init__(self, msg)
[docs]class DBConfig: """Database configuration.""" DEFAULT_PORT = 27017 DEFAULT_FILE = "db.json" ALL_SETTINGS = [ HOST_KEY, PORT_KEY, DB_KEY, COLL_KEY, ALIASES_KEY, ] DEFAULT_SETTINGS = [ (HOST_KEY, "localhost"), (PORT_KEY, DEFAULT_PORT), (DB_KEY, "vasp"), (ALIASES_KEY, {}), ] def __init__(self, config_file=None, config_dict=None): """ Constructor. Settings are created from config_dict, if given, or parsed config_file, if given, otherwise the DEFAULT_FILE is tried and if that is not present the DEFAULT_SETTINGS are used without modification. :param config_file: Read configuration from this file. :type config_file: file or str path :param config_dict: Set configuration from this dictionary. :raises: ConfigurationFileError if cannot read/parse config_file """ self._cfg = dict(self.DEFAULT_SETTINGS) settings = {} if config_dict: settings = config_dict.copy() auth_aliases(settings) else: # Try to use DEFAULT_FILE if no config_file if config_file is None: if os.path.exists(self.DEFAULT_FILE): config_file = self.DEFAULT_FILE # If there was a config_file, parse it if config_file is not None: try: settings = get_settings(config_file) except Exception as err: path = _as_file(config_file).name raise ConfigurationFileError(path, err) self._cfg.update(settings) normalize_auth(self._cfg) def __str__(self): return str(self._cfg)
[docs] def copy(self): """Return a copy of self (internal settings are copied).""" return DBConfig(config_dict=self._cfg.copy())
@property def settings(self): """ Return settings """ return self._cfg @property def host(self): """ Return host """ return self._cfg.get(HOST_KEY, None) @property def port(self): """ Return port. """ return self._cfg.get(PORT_KEY, self.DEFAULT_PORT) @property def dbname(self): """Name of the database.""" return self._cfg.get(DB_KEY, None) @dbname.setter def dbname(self, value): """ Set dbname. """ self._cfg[DB_KEY] = value @property def collection(self): """ Return collection. """ return self._cfg.get(COLL_KEY, None) @collection.setter def collection(self, value): """ Set collection. """ self._cfg[COLL_KEY] = value @property def user(self): """ Return user. """ return self._cfg.get(USER_KEY, None) @property def password(self): """ Return password. """ return self._cfg.get(PASS_KEY, None)
[docs]def get_settings(infile): """Read settings from input file. :param infile: Input file for JSON settings. :type infile: file or str path :return: Settings parsed from file :rtype: dict """ yml = yaml.YAML() settings = yml.load(_as_file(infile)) if not hasattr(settings, "keys"): raise ValueError(f"Settings not found in {infile}") # Processing of namespaced parameters in .pmgrc.yaml. processed_settings = {} for k, v in settings.items(): if k.startswith("PMG_DB_"): processed_settings[k[7:].lower()] = v else: processed_settings[k] = v auth_aliases(processed_settings) return processed_settings
[docs]def auth_aliases(d): """Interpret user/password aliases.""" for alias, real in ((USER_KEY, "readonly_user"), (PASS_KEY, "readonly_password")): if alias in d: d[real] = d[alias] del d[alias]
[docs]def normalize_auth(settings, admin=True, readonly=True, readonly_first=False): """Transform the readonly/admin user and password to simple user/password, as expected by QueryEngine. If return value is true, then admin or readonly password will be in keys "user" and "password". :param settings: Connection settings :type settings: dict :param admin: Check for admin password :param readonly: Check for readonly password :param readonly_first: Check for readonly password before admin :return: Whether user/password were found :rtype: bool """ U, P = USER_KEY, PASS_KEY # If user/password, un-prefixed, exists, do nothing. if U in settings and P in settings: return True # Set prefixes prefixes = [] if readonly_first: if readonly: prefixes.append("readonly_") if admin: prefixes.append("admin_") else: if admin: prefixes.append("admin_") if readonly: prefixes.append("readonly_") # Look for first user/password matching. found = False for pfx in prefixes: ukey, pkey = pfx + U, pfx + P if ukey in settings and pkey in settings: settings[U] = settings[ukey] settings[P] = settings[pkey] found = True break return found
def _as_file(f, mode="r"): if isinstance(f, str): return open(f, mode) return f