Source code for pymatgen.db.util

"""
Utility functions used across scripts.
"""

import datetime
import json
import logging

import bson
from pymongo.mongo_client import MongoClient

from pymatgen.db.config import DBConfig

# Module-level aliases of the defaults defined on DBConfig.
DEFAULT_PORT = DBConfig.DEFAULT_PORT
DEFAULT_CONFIG_FILE = DBConfig.DEFAULT_FILE
DEFAULT_SETTINGS = DBConfig.DEFAULT_SETTINGS

# Logger used by the helpers in this module (logger name "mg.util").
_log = logging.getLogger("mg.util")


class MongoJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that also serializes the ObjectId and datetime values
    found in Mongo documents.
    """

    def default(self, o):
        """
        Convert values the stock encoder cannot handle.

        ObjectIds are rendered with ``str()`` and datetimes with
        ``isoformat()``; anything else is delegated to the base class.
        """
        if isinstance(o, bson.objectid.ObjectId):
            encoded = str(o)
        elif isinstance(o, datetime.datetime):
            encoded = o.isoformat()
        else:
            encoded = super().default(o)
        return encoded
def get_settings(config_file):
    """
    Load the settings described by a configuration file.

    :param config_file: Value handed to ``DBConfig`` to locate the configuration.
    :return: The ``settings`` attribute of the resulting ``DBConfig``.
    """
    return DBConfig(config_file).settings
def get_database(config_file=None, settings=None, admin=False, **kwargs):
    """
    Get a database object from a config file.

    If the selected credentials are missing from the settings, a warning is
    logged and the connection is made without authentication.

    :param config_file: Configuration file to read; only used when
        ``settings`` is None.
    :param settings: Settings mapping to use instead of reading ``config_file``.
    :param admin: Use the ``admin_user``/``admin_password`` entries instead of
        ``readonly_user``/``readonly_password``. Default to False.
    :param kwargs: Extra keyword arguments forwarded to ``MongoClient``.
    :return: The database named by the ``database`` setting.
    """
    d = get_settings(config_file) if settings is None else settings

    # Only the credential lookup is guarded: a missing host/port/database is a
    # real configuration error and should surface as such.
    try:
        user = d["admin_user"] if admin else d["readonly_user"]
        passwd = d["admin_password"] if admin else d["readonly_password"]
    except (KeyError, TypeError, ValueError) as ex:
        _log.warning(
            "No {admin,readonly}_user/password found in config. file, "
            "accessing DB without authentication (%r)",
            ex,
        )
        client_kwargs = {}
    else:
        client_kwargs = {"username": user, "password": passwd, "authSource": d["database"]}

    # Caller-supplied keyword arguments take precedence over the derived ones.
    client_kwargs.update(kwargs)
    conn = MongoClient(host=d["host"], port=d["port"], **client_kwargs)
    return conn[d["database"]]
def get_collection(config_file, admin=False, settings=None):
    """
    Get a collection from a config file.

    :param config_file: Path to the configuration file; only read when
        ``settings`` is None.
    :param admin: Whether to use admin credentials. Default to False.
    :param settings: Settings to use instead of reading ``config_file``
        (None means obtain them from the file).
    :return: The collection named by the ``collection`` setting.
    """
    resolved = get_settings(config_file) if settings is None else settings
    database = get_database(admin=admin, settings=resolved)
    return database[resolved["collection"]]
def collection_keys(coll, sep="."):
    """Get a list of all (including nested) keys in a collection.

    Examines the first document in the collection, as returned by
    ``coll.find_one()``.

    :param coll: Collection object providing ``find_one()``.
    :param sep: Separator for nested keys
    :return: List of str; empty when ``find_one()`` returns None.
    """

    def _keys(x, pre=""):
        # Emit each key, then descend into dict values using the
        # separator-joined path as the prefix.
        for k, v in x.items():
            yield pre + k
            if isinstance(v, dict):
                yield from _keys(v, pre + k + sep)

    first = coll.find_one()
    if first is None:
        # No document to inspect, hence no keys.
        return []
    return list(_keys(first))