# Source code for pymatgen.db.query_engine

"""
This module provides a QueryEngine that simplifies queries for Mongo databases
generated using hive.
"""


# Module metadata: authorship, copyright, version, maintainer contact,
# release status and date.
__author__ = "Shyue Ping Ong, Michael Kocher, Dan Gunter"
__copyright__ = "Copyright 2011, The Materials Project"
__version__ = "2.0"
__maintainer__ = "Shyue Ping Ong"
__email__ = "shyuep@gmail.com"
__status__ = "Production"
__date__ = "Mar 2 2013"

import itertools
import json
import logging
import os
import zlib
from collections import OrderedDict
from collections.abc import Iterable

import gridfs
import pymongo
from pymatgen.core import Composition, Structure
from pymatgen.electronic_structure.core import Orbital, Spin
from pymatgen.electronic_structure.dos import CompleteDos, Dos
from pymatgen.entries.computed_entries import ComputedEntry, ComputedStructureEntry

# Module-level logger; its name is this module's name prefixed with "mg.".
_log = logging.getLogger(f"mg.{__name__}")


class QueryEngine:
    """Query interface to a Mongo collection based on a set of aliases.

    This query engine also provides convenient translation between various
    pymatgen objects and database objects.

    The major difference between the QueryEngine's query() method and
    pymongo's find() method is the treatment of nested fields. QueryEngine's
    query will map the final result to a root level string, while pymongo
    will return the doc as is. For example, let's say you have a document
    that is of the following form::

        {"a": {"b" : 1}}

    Using pymongo.find({}, fields=["a.b"]), you will get a doc where you need
    to do doc["a"]["b"] to access the final result (1). Using
    QueryEngine.query(properties=["a.b"]), you will obtain a result that can
    be accessed simply as doc["a.b"].
    """

    # avoid hard-coding these in other places
    ALIASES_CONFIG_KEY = "aliases_config"
    COLLECTION_KEY = "collection"
    HOST_KEY = "host"
    PORT_KEY = "port"
    DB_KEY = "database"
    USER_KEY = "user"
    PASSWORD_KEY = "password"

    # Aliases and defaults
    aliases = None  #: See `aliases` arg to constructor
    default_criteria = None  #: See `default_criteria` arg to constructor
    default_properties = None  #: See `default_properties` arg to constructor

    # Post-processing operations
    query_post = None  #: See `query_post` arg to constructor
    result_post = None  #: See `result_post` arg to constructor

    def __init__(
        self,
        host="127.0.0.1",
        port=27017,
        database="vasp",
        user=None,
        password=None,
        collection="tasks",
        aliases_config=None,
        default_properties=None,
        query_post=None,
        result_post=None,
        connection=None,
        replicaset=None,
        **ignore,
    ):
        """Constructor.

        Args:
            host (str): Hostname of database machine.
            port (int): Port for db access.
            database (str): Name of database to access.
            user (str): User for db access. `None` means no authentication.
            password (str): Password for db access. `None` means no auth.
            collection (str): Collection to query. Defaults to "tasks".
            connection (pymongo.Connection): If given, ignore 'host' and
                'port' and use existing connection.
            aliases_config (dict): An alias dict to use. Defaults to None,
                which means the default aliases defined in "aliases.json" is
                used. The aliases config should be of the following format::

                    {
                        "aliases": {
                            "e_above_hull": "analysis.e_above_hull",
                            "energy": "output.final_energy",
                            ....
                        },
                        "defaults": {
                            "state": "successful"
                        }
                    }

                aliases (dict): Keys are the incoming property, values are
                    the property it will be translated to. This makes it
                    easier to organize the doc format in a way that is
                    different from the query format.
                defaults (dict): Criteria that should be applied by default
                    to all queries. For example, a collection may contain
                    data from both successful and unsuccessful runs but for
                    most querying purposes, you may want just successful runs
                    only. Note that defaults do not affect explicitly
                    specified criteria, i.e., if you supply a query for
                    {"state": "killed"}, this will override the default for
                    {"state": "successful"}.
            default_properties (list): Property names (strings) to use by
                default, if no `properties` are given to query().
            query_post (list): Functions to post-process the `criteria`
                passed to `query()`, after aliases are resolved. Function
                takes two args, the criteria dict and list of result
                properties. Both may be modified in-place.
            result_post (list): Functions to post-process the cursor records.
                Function takes one arg, the document for the current record,
                that is modified in-place.
            replicaset: Forwarded to ``pymongo.MongoClient`` as ``replicaset``
                when truthy and no `connection` is given.
            **ignore: Any other keyword arguments are accepted and ignored.
        """
        self.host = host
        self.port = port
        self.replicaset = replicaset
        self.database_name = database
        if connection is None:
            # can't pass replicaset=None to MongoClient (fails validation)
            if self.replicaset:
                self.connection = pymongo.MongoClient(self.host, self.port, replicaset=self.replicaset)
            else:
                self.connection = pymongo.MongoClient(self.host, self.port)
        else:
            self.connection = connection
        self.db = self.connection[database]
        if user:
            self.db.authenticate(user, password)
        # Goes through the property setter, which also binds self.collection.
        self.collection_name = collection

        self.set_aliases_and_defaults(aliases_config=aliases_config, default_properties=default_properties)
        # Post-processing functions
        self.query_post = query_post or []
        self.result_post = result_post or []

    @property
    def collection_name(self):
        """Returns collection name."""
        return self._collection_name

    @collection_name.setter
    def collection_name(self, value):
        """Switch to another collection.

        Note that you may have to set the aliases and default properties if
        the schema of the new collection differs from the current collection.
        """
        self._collection_name = value
        self.collection = self.db[value]

    def set_aliases_and_defaults(self, aliases_config=None, default_properties=None):
        """Set the alias config and defaults to use.

        Typically used when switching to a collection with a different schema.

        Args:
            aliases_config: An alias dict to use. Defaults to None, which
                means the default aliases defined in "aliases.json" is used.
                See constructor for format.
            default_properties: List of property names (strings) to use by
                default, if no properties are given to the 'properties'
                argument of query().
        """
        if aliases_config is None:
            # Fall back to the aliases.json file located next to this module.
            with open(os.path.join(os.path.dirname(__file__), "aliases.json")) as f:
                aliases_config = json.load(f)
        self.aliases = aliases_config.get("aliases", {})
        self.default_criteria = aliases_config.get("defaults", {})

        # set default properties
        if default_properties is None:
            self._default_props, self._default_prop_dict = None, None
        else:
            self._default_props, self._default_prop_dict = self._parse_properties(default_properties)

    def __enter__(self):
        """Allows for use with the 'with' context manager"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Allows for use with the 'with' context manager"""
        self.close()

    def close(self):
        """Closes the underlying connection.

        Uses ``close()`` rather than ``disconnect()``: pymongo removed
        ``disconnect()`` in 3.0, and :meth:`query` already relies on the
        pymongo >= 3 ``find(filter=..., projection=...)`` signature.
        """
        self.connection.close()

    def get_entries_in_system(
        self,
        elements,
        inc_structure=False,
        optional_data=None,
        additional_criteria=None,
    ):
        """Gets all entries in a chemical system.

        E.g. Li-Fe-O will return all Li-O, Fe-O, Li-Fe, Li-Fe-O compounds.

        .. note::

            The get_entries_in_system and get_entries methods should be used
            with care. In essence, all entries, GGA, GGA+U or otherwise, are
            returned. The dataset is very heterogeneous and not directly
            comparable. It is highly recommended that you perform
            post-processing using pymatgen.entries.compatibility.

        Args:
            elements: Sequence of element symbols, e.g. ['Li','Fe','O']
            inc_structure: Optional parameter as to whether to include a
                structure with the ComputedEntry. Defaults to False. Use with
                care - including structures with a large number of entries
                can potentially slow down your code to a crawl.
            optional_data: Optional data to include with the entry. This
                allows the data to be access via entry.data[key].
            additional_criteria: Added ability to provide additional criteria
                other than just the chemical system.

        Returns:
            List of ComputedEntries in the chemical system.
        """
        # Every non-empty subset of the elements, each written as a sorted,
        # dash-joined chemical system string.
        chemsys_list = [
            "-".join(sorted(combi))
            for size in range(1, len(elements) + 1)
            for combi in itertools.combinations(elements, size)
        ]
        crit = {"chemsys": {"$in": chemsys_list}}
        if additional_criteria is not None:
            crit.update(additional_criteria)
        return self.get_entries(crit, inc_structure, optional_data=optional_data)

    def get_entries(self, criteria, inc_structure=False, optional_data=None):
        """Get ComputedEntries satisfying a particular criteria.

        .. note::

            The get_entries_in_system and get_entries methods should be used
            with care. In essence, all entries, GGA, GGA+U or otherwise, are
            returned. The dataset is very heterogeneous and not directly
            comparable. It is highly recommended that you perform
            post-processing using pymatgen.entries.compatibility.

        Args:
            criteria: Criteria obeying the same syntax as query.
            inc_structure: Optional parameter as to whether to include a
                structure with the ComputedEntry. Defaults to False. Use with
                care - including structures with a large number of entries
                can potentially slow down your code to a crawl.
            optional_data: Optional data to include with the entry. This
                allows the data to be access via entry.data[key].
                "oxide_type" is always added to the requested keys.

        Returns:
            List of pymatgen.entries.ComputedEntries satisfying criteria.
        """
        # Copy so the caller's sequence is never mutated.
        data_keys = list(optional_data) if optional_data else []
        data_keys.append("oxide_type")
        fields = list(data_keys)
        fields.extend(
            [
                "task_id",
                "unit_cell_formula",
                "energy",
                "is_hubbard",
                "hubbards",
                "pseudo_potential.labels",
                "pseudo_potential.functional",
                "run_type",
                "input.is_lasph",
                "input.xc_override",
                "input.potcar_spec",
            ]
        )
        if inc_structure:
            fields.append("output.crystal")

        all_entries = []
        for c in self.query(fields, criteria):
            func = c["pseudo_potential.functional"]
            labels = c["pseudo_potential.labels"]
            symbols = [f"{func} {label}" for label in labels]
            parameters = {
                "run_type": c["run_type"],
                "is_hubbard": c["is_hubbard"],
                "hubbards": c["hubbards"],
                "potcar_symbols": symbols,
                "is_lasph": c.get("input.is_lasph") or False,
                "potcar_spec": c.get("input.potcar_spec"),
                "xc_override": c.get("input.xc_override"),
            }
            # Per-record dict; kept separate from the list of keys so the
            # key list is not rebound while iterating over the results.
            entry_data = {k: c[k] for k in data_keys}
            if inc_structure:
                struct = Structure.from_dict(c["output.crystal"])
                entry = ComputedStructureEntry(
                    struct,
                    c["energy"],
                    0.0,
                    parameters=parameters,
                    data=entry_data,
                    entry_id=c["task_id"],
                )
            else:
                entry = ComputedEntry(
                    Composition(c["unit_cell_formula"]),
                    c["energy"],
                    0.0,
                    parameters=parameters,
                    data=entry_data,
                    entry_id=c["task_id"],
                )
            all_entries.append(entry)

        return all_entries

    def _parse_criteria(self, criteria):
        """Map criteria to proper mongo queries using aliases.

        Also performs some useful sanitization. For example, string formulas
        such as "Fe2O3" are auto-converted to proper mongo queries of
        {"Fe":2, "O":3}.

        If 'criteria' is None, returns an empty dict (default criteria are
        not applied in that case). Putting this logic here simplifies callers
        and allows subclasses to insert something even when there are no
        criteria.

        Args:
            criteria: Dict of criteria keyed by (possibly aliased) names, or
                None.

        Returns:
            Dict of criteria keyed by de-aliased names, including any default
            criteria whose key was not explicitly given.
        """
        if criteria is None:
            return {}
        parsed_crit = {}
        # Defaults apply only where the caller did not specify the same key.
        for k, v in self.default_criteria.items():
            if k not in criteria:
                parsed_crit[self.aliases.get(k, k)] = v

        for key, crit in criteria.items():
            if key in ["normalized_formula", "reduced_cell_formula"]:
                comp = Composition(crit)
                parsed_crit["pretty_formula"] = comp.reduced_formula
            elif key == "unit_cell_formula":
                comp = Composition(crit)
                crit = comp.as_dict()
                # Fall back to the key itself when no alias is configured,
                # as is done for every other key, instead of a KeyError.
                prefix = self.aliases.get(key, key)
                for el, amt in crit.items():
                    parsed_crit[f"{prefix}.{el}"] = amt
                parsed_crit["nelements"] = len(crit)
                parsed_crit["pretty_formula"] = comp.reduced_formula
            elif key in ["$or", "$and"]:
                # Logical operators hold a list of sub-criteria; parse each.
                parsed_crit[key] = [self._parse_criteria(m) for m in crit]
            else:
                parsed_crit[self.aliases.get(key, key)] = crit
        return parsed_crit

    def ensure_index(self, key, unique=False):
        """Wrapper for pymongo.Collection.ensure_index"""
        # NOTE(review): ensure_index is deprecated in pymongo 3.x in favour of
        # create_index - confirm which pymongo versions must be supported.
        return self.collection.ensure_index(key, unique=unique)

    def query(self, properties=None, criteria=None, distinct_key=None, **kwargs):
        r"""
        Convenience method for database access. All properties and criteria
        can be specified using simplified names defined in Aliases. You can
        use the supported_properties property to get the list of supported
        properties.

        Results are returned as an iterator of dicts to ensure memory and cpu
        efficiency.

        Note that the dict returned have keys also in the simplified names
        form, not in the mongo format. For example, if you query for
        "analysis.e_above_hull", the returned result must be accessed as
        r['analysis.e_above_hull'] instead of mongo's
        r['analysis']['e_above_hull']. This is a *feature* of the query engine
        to allow simple access to deeply nested docs without having to resort
        to some recursion to go deep into the result.

        However, if you query for 'analysis', the entire 'analysis' key is
        returned as r['analysis'] and then the subkeys can be accessed in the
        usual form, i.e., r['analysis']['e_above_hull']

        :param properties: Properties to query for. Defaults to None which
            means all supported properties.
        :param criteria: Criteria to query for as a dict.
        :param distinct_key: If not None, the key for which to get distinct
            results
        :param \*\*kwargs: Other kwargs supported by pymongo.collection.find.
            Useful examples are limit, skip, sort, etc.
        :return: A QueryResults Iterable, which is somewhat like pymongo's
            cursor except that it performs mapping. In general, the dev does
            not need to concern himself with the form. It is sufficient to
            know that the results are in the form of an iterable of dicts.
        """
        if properties is not None:
            props, prop_dict = self._parse_properties(properties)
        else:
            props, prop_dict = None, None

        crit = self._parse_criteria(criteria)
        # Hooks may modify the criteria and projection in place.
        for func in self.query_post or ():
            func(crit, props)
        cur = self.collection.find(filter=crit, projection=props, **kwargs)

        if distinct_key is not None:
            cur = cur.distinct(distinct_key)
            return QueryListResults(prop_dict, cur, postprocess=self.result_post)
        return QueryResults(prop_dict, cur, postprocess=self.result_post)

    def _parse_properties(self, properties):
        """Make list of properties into 2 things.

        (1) dictionary of { 'aliased-field': 1, ... } for a mongodb query
        (2) dictionary, keyed by the requested name, whose values are the
            de-aliased field path split on "." (used to map results back)

        Args:
            properties: Iterable of property names. If a dict is given, its
                values are used as the projection values instead of 1.

        Returns:
            Tuple (props, prop_dict) as described above.
        """
        # A dict input provides a richer syntax: its values are forwarded
        # as the projection values.
        is_mapping = isinstance(properties, dict)
        props = {}
        # TODO: clean up prop_dict?
        prop_dict = OrderedDict()
        for p in properties:
            field = self.aliases[p] if p in self.aliases else p
            props[field] = properties[p] if is_mapping else 1
            prop_dict[p] = field.split(".")
        # including a lower-level key after a higher level key e.g.:
        # {'output': 1, 'output.crystal': 1} instead of
        # {'output.crystal': 1, 'output': 1}
        # causes mongo to skip the other higher level keys.
        # this is a (sketchy) workaround for that. Note this problem
        # doesn't appear often in python2 because the dictionary ordering
        # is more stable.
        props = OrderedDict(sorted(props.items(), reverse=True))
        return props, prop_dict

    def query_one(self, *args, **kwargs):
        """Return first document from :meth:`query`, with same parameters.

        Returns None if the query yields no documents.
        """
        for r in self.query(*args, **kwargs):
            return r
        return None

    def get_structure_from_id(self, task_id, final_structure=True):
        """Returns a structure from the database given the task id.

        Args:
            task_id: The task_id to query for.
            final_structure: Whether to obtain the final or initial
                structure. Defaults to True.

        Returns:
            Structure built from the "output.crystal" field (final) or the
            "input.crystal" field (initial) of the matching document.

        Raises:
            QueryError: If zero or more than one document matches task_id.
        """
        args = {"task_id": task_id}
        field = "output.crystal" if final_structure else "input.crystal"
        results = tuple(self.query([field], args))

        if len(results) > 1:
            raise QueryError(f"More than one result found for task_id {task_id}!")
        if len(results) == 0:
            raise QueryError(f"No structure found for task_id {task_id}!")
        c = results[0]
        return Structure.from_dict(c[field])

    def __repr__(self):
        return f"QueryEngine: {self.host}:{self.port}/{self.database_name}"

    @staticmethod
    def from_config(config_file, use_admin=False):
        """Initialize a QueryEngine from a JSON config file generated using
        mgdb init.

        Args:
            config_file: Filename of config file.
            use_admin: If True, the admin user and password in the config
                file is used. Otherwise, the readonly_user and password is
                used. Defaults to False.

        Returns:
            QueryEngine
        """
        with open(config_file) as f:
            d = json.load(f)

        user = d["admin_user"] if use_admin else d["readonly_user"]
        password = d["admin_password"] if use_admin else d["readonly_password"]
        return QueryEngine(
            host=d["host"],
            port=d["port"],
            database=d["database"],
            user=user,
            password=password,
            collection=d["collection"],
            aliases_config=d.get("aliases_config", None),
        )

    def __getitem__(self, item):
        """Support pymongo.Database syntax db['collection'] to access
        collections. Simply delegate this to the pymongo.Database instance,
        so behavior is the same.
        """
        return self.db[item]

    def get_dos_from_id(self, task_id):
        """Overrides the get_dos_from_id for the MIT gridfs format.

        Args:
            task_id: The task_id to query for.

        Returns:
            CompleteDos for the task, or None if no "dos_fs_id" was found
            for the task.
        """
        args = {"task_id": task_id}
        fields = ["calculations"]
        structure = self.get_structure_from_id(task_id)
        dosid = None
        for r in self.query(fields, args):
            # The DOS id is read from the last calculation of the document.
            dosid = r["calculations"][-1]["dos_fs_id"]
        if dosid is None:
            return None

        self._fs = gridfs.GridFS(self.db, "dos_fs")
        with self._fs.get(dosid) as dosfile:
            s = dosfile.read()
        try:
            d = json.loads(s)
        except ValueError:
            # Not directly parseable as JSON (JSONDecodeError and
            # UnicodeDecodeError are both ValueErrors): treat the payload
            # as zlib-compressed JSON.
            s = zlib.decompress(s)
            d = json.loads(s.decode("utf-8"))
        tdos = Dos.from_dict(d)
        pdoss = {}
        for i, ados in enumerate(d["pdos"]):
            all_ados = {}
            # Entries are looked up by the string form of Orbital(0..n-1).
            for j in range(len(ados)):
                orb = Orbital(j)
                odos = ados[str(orb)]
                all_ados[orb] = {Spin(int(k)): v for k, v in odos["densities"].items()}
            pdoss[structure[i]] = all_ados
        return CompleteDos(structure, tdos, pdoss)
class QueryResults(Iterable):
    """Iterable wrapper for results from QueryEngine.

    Like pymongo's cursor, this object should generally not be instantiated,
    but should be obtained from a queryengine. It delegates many attributes
    to the underlying pymongo cursor, and should support nearly all cursor
    like attributes such as count(), explain(), hint(), etc. Please see
    pymongo cursor documentation for details.
    """

    def __init__(self, prop_dict, result_cursor, postprocess=None):
        """Constructor.

        :param prop_dict: Properties; maps each result key to the list of
            path components used to look the value up in a record. If falsy,
            records are returned unmapped.
        :param result_cursor: Iterable returning records
        :param postprocess: List of functions, each taking a record and
            modifying it in-place, or None, or an empty list
        """
        self._results = result_cursor
        self._prop_dict = prop_dict
        self._pproc = postprocess or []  # make empty values iterable

    def _wrapper(self, func):
        """Wrap a callable obtained through self.__getattr__.

        If the result of calling it is a cursor, wrap it into a QueryResults
        object so that the postprocess functions in self._pproc are still
        applied.
        """

        def wrapped(*args, **kwargs):
            ret_val = func(*args, **kwargs)
            if isinstance(ret_val, pymongo.cursor.Cursor):
                ret_val = self.from_cursor(ret_val)
            return ret_val

        return wrapped

    def __getattr__(self, attr):
        """Delegate unknown attributes to the underlying results object.

        Callable attributes are wrapped with _wrapper to intercept returned
        cursors and wrap them as a QueryResults object.

        :raises AttributeError: If the underlying results object has no such
            attribute, or if it has not been set on this instance yet.
        """
        # Read _results from the instance dict directly: going through normal
        # attribute access would re-enter __getattr__ and recurse forever when
        # _results has not been set (e.g. on an instance made via __new__).
        try:
            results = self.__dict__["_results"]
        except KeyError:
            raise AttributeError(attr) from None
        if hasattr(results, attr):
            ret_val = getattr(results, attr)
            # wrap callable objects to convert returned cursors into QueryResults
            if callable(ret_val):
                return self._wrapper(ret_val)
            return ret_val
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")

    def clone(self):
        """Provide a clone of the QueryResults.

        The clone wraps a clone of the underlying results and keeps the same
        property mapping and postprocess functions.
        """
        return QueryResults(self._prop_dict, self._results.clone(), self._pproc)

    def from_cursor(self, cursor):
        """Create a QueryResults object from a cursor object"""
        return QueryResults(self._prop_dict, cursor, self._pproc)

    def __len__(self):
        """Return the number of records, found by iterating over a clone of
        the underlying results (so this object's own iteration state is not
        consumed).
        """
        return len(list(self._results.clone()))

    def __getitem__(self, i):
        return self._mapped_result(self._results[i])

    def __iter__(self):
        return self._result_generator()

    def _mapped_result(self, r):
        """Transform/map a result."""
        # Apply result_post funcs for pulling out sandbox properties
        for func in self._pproc:
            func(r)
        # If we haven't asked for specific properties, just return object
        if not self._prop_dict:
            return r
        result = {}
        # Map aliased keys back to original key
        for k, v in self._prop_dict.items():
            try:
                result[k] = self._mapped_result_path(v[1:], data=r[v[0]])
            except (IndexError, KeyError, ValueError, TypeError):
                # Unresolvable path (including a non-subscriptable record)
                # maps to None.
                result[k] = None
        return result

    @staticmethod
    def _mapped_result_path(path, data=None):
        """Follow `path` (a list of keys) into `data`.

        Lists are mapped element-wise with the same remaining path. Returns
        None when the path cannot be followed, including when an intermediate
        value is a scalar or None.
        """
        if not path:
            return data
        if isinstance(data, list):
            return [QueryResults._mapped_result_path(path, d) for d in data]
        try:
            return QueryResults._mapped_result_path(path[1:], data[path[0]])
        except (IndexError, KeyError, ValueError, TypeError):
            return None

    def _result_generator(self):
        for r in self._results:
            yield self._mapped_result(r)
class QueryListResults(QueryResults):
    """Set of QueryResults on a list instead of a MongoDB cursor."""

    def clone(self):
        """Return a clone of the QueryListResults.

        The clone is itself a QueryListResults over a copy (slice) of the
        underlying results, with the same property mapping and postprocess
        functions. Returning the list-aware class keeps ``len()`` working on
        the clone, since the base class ``__len__`` needs a ``clone()`` method
        on the underlying results, which a list does not have.
        """
        return QueryListResults(self._prop_dict, self._results[:], self._pproc)

    def __len__(self):
        """Return length of iterable, as a list if possible; otherwise,
        fall back to the superclass' implementation.
        """
        if hasattr(self._results, "__len__"):
            return len(self._results)
        return QueryResults.__len__(self)
class QueryError(Exception):
    """Raised when an error occurs during a query."""