Skip to content

Stores

Module containing various definitions of Stores. Stores are a default access pattern to data and provide various utilities.

JSONStore (MemoryStore)

A Store for access to a single or multiple JSON files

Source code in maggma/stores/mongolike.py
class JSONStore(MemoryStore):
    """
    A Store for access to a single or multiple JSON files.

    The files are loaded into the in-memory collection inherited from
    MemoryStore when ``connect`` is called. When ``read_only=False`` every
    ``update`` / ``remove_docs`` call also rewrites the (single) backing file.
    """

    def __init__(
        self,
        paths: Union[str, List[str]],
        read_only: bool = True,
        **kwargs,
    ):
        """
        Args:
            paths: paths for json files to turn into a Store
            read_only: whether this JSONStore is read only. When read_only=True,
                       the JSONStore can still apply MongoDB-like writable operations
                       (e.g. an update) because it behaves like a MemoryStore,
                       but it will not write those changes to the file. On the other hand,
                       if read_only=False (i.e., it is writeable), the JSON file
                       will be automatically updated every time a write-like operation is
                       performed.

                       Note that when read_only=False, JSONStore only supports a single JSON
                       file. If the file does not exist, it will be automatically created
                       when the JSONStore is initialized.

        Raises:
            RuntimeError: if the store is writable and more than one path is given.
        """
        paths = paths if isinstance(paths, (list, tuple)) else [paths]
        self.paths = paths

        # file_writable overrides read_only for compatibility reasons
        if "file_writable" in kwargs:
            file_writable = kwargs.pop("file_writable")
            warnings.warn(
                "file_writable is deprecated; use read only instead.",
                DeprecationWarning,
            )
            self.read_only = not file_writable
            if self.read_only != read_only:
                warnings.warn(
                    f"Received conflicting keyword arguments file_writable={file_writable}"
                    f" and read_only={read_only}. Setting read_only={file_writable}.",
                    UserWarning,
                )
        else:
            self.read_only = read_only
        self.kwargs = kwargs

        if not self.read_only and len(paths) > 1:
            raise RuntimeError("Cannot instantiate file-writable JSONStore with multiple JSON files.")

        # create the .json file (holding an empty list) if it does not exist
        if not self.read_only and not Path(self.paths[0]).exists():
            self._write_json_file(self.paths[0], [])

        self.default_sort = None

        super().__init__(**kwargs)

    @staticmethod
    def _write_json_file(path, docs: List[dict]):
        """
        Serialize ``docs`` with orjson and write them to ``path``.

        orjson.dumps already returns UTF-8 bytes, so the file is opened in
        binary mode and the bytes are written unchanged. This avoids a
        decode/re-encode round trip through the platform's default text
        encoding. NOTE(review): binary mode is presumably also what zopen's
        compressed-file backends expect; confirm for .gz/.bz2 paths.
        """
        with zopen(path, "wb") as f:
            f.write(orjson.dumps(docs))

    def connect(self, force_reset=False):
        """
        Loads the files into the collection in memory

        Args:
            force_reset: forwarded to MemoryStore.connect

        Raises:
            KeyError: if a document read from one of the files lacks the
                store's key field.
        """
        super().connect(force_reset=force_reset)
        for path in self.paths:
            objects = self.read_json_file(path)
            try:
                self.update(objects)
            except KeyError as exc:
                # paths are usually plain strings, so wrap in Path before using .name
                raise KeyError(
                    f"""
                    Key field '{self.key}' not found in {Path(path).name}. This
                    could mean that this JSONStore was initially created with a different key field.
                    The keys found in the .json file are {list(objects[0].keys())}. Try
                    re-initializing your JSONStore using one of these as the key arguments.
                    """
                ) from exc

    def read_json_file(self, path) -> List:
        """
        Helper method to read the contents of a JSON file and generate
        a list of docs.

        Args:
            path: Path to the JSON file to be read

        Returns:
            list of documents; a file holding a single JSON value is wrapped
            in a one-element list.
        """
        # Read raw bytes; orjson.loads accepts bytes and decodes them as UTF-8,
        # so the result does not depend on the platform's default text encoding.
        with zopen(path, "rb") as f:
            objects = orjson.loads(f.read())
        objects = [objects] if not isinstance(objects, list) else objects
        # datetime objects deserialize to str. Try to convert the last_updated
        # field back to datetime.
        # # TODO - there may still be problems caused if a JSONStore is init'ed from
        # documents that don't contain a last_updated field
        # See Store.last_updated in store.py.
        for obj in objects:
            if obj.get(self.last_updated_field):
                obj[self.last_updated_field] = to_dt(obj[self.last_updated_field])

        return objects

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store.

        For a file-writable JSONStore, the json file is updated.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """
        super().update(docs=docs, key=key)
        if not self.read_only:
            self.update_json_file()

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary.

        For a file-writable JSONStore, the json file is updated.

        Args:
            criteria: query dictionary to match
        """
        super().remove_docs(criteria=criteria)
        if not self.read_only:
            self.update_json_file()

    def update_json_file(self):
        """
        Updates the json file when a write-like operation is performed.

        The whole in-memory collection is written to ``self.paths[0]`` with
        the collection-assigned ``_id`` field stripped from every document.
        """
        # Query before opening the file: opening with "wb" truncates it, so a
        # failure while querying must not leave an emptied file behind.
        data = list(self.query())
        for d in data:
            d.pop("_id", None)
        self._write_json_file(self.paths[0], data)

    def __hash__(self):
        return hash((*self.paths, self.last_updated_field))

    def __eq__(self, other: object) -> bool:
        """
        Check equality for JSONStore

        Args:
            other: other JSONStore to compare with
        """
        if not isinstance(other, JSONStore):
            return False

        fields = ["paths", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

__eq__(self, other) special

Check equality for JSONStore

Parameters:

Name Type Description Default
other object

other JSONStore to compare with

required
Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
    """
    Compare two JSONStores.

    Two JSONStores are equal when they point at the same ``paths`` and use
    the same ``last_updated_field``; anything that is not a JSONStore is
    never equal.

    Args:
        other: other JSONStore to compare with
    """
    if isinstance(other, JSONStore):
        for attr in ("paths", "last_updated_field"):
            if not getattr(self, attr) == getattr(other, attr):
                return False
        return True
    return False

__init__(self, paths, read_only=True, **kwargs) special

Parameters:

Name Type Description Default
paths Union[str, List[str]]

paths for json files to turn into a Store

required
read_only bool

whether this JSONStore is read only. When read_only=True, the JSONStore can still apply MongoDB-like writable operations (e.g. an update) because it behaves like a MemoryStore, but it will not write those changes to the file. On the other hand, if read_only=False (i.e., it is writeable), the JSON file will be automatically updated every time a write-like operation is performed.

   Note that when read_only=False, JSONStore only supports a single JSON
   file. If the file does not exist, it will be automatically created
   when the JSONStore is initialized.
True
Source code in maggma/stores/mongolike.py
def __init__(
    self,
    paths: Union[str, List[str]],
    read_only: bool = True,
    **kwargs,
):
    """
    Args:
        paths: paths for json files to turn into a Store
        read_only: whether this JSONStore is read only. When read_only=True,
                   the JSONStore can still apply MongoDB-like writable operations
                   (e.g. an update) because it behaves like a MemoryStore,
                   but it will not write those changes to the file. On the other hand,
                   if read_only=False (i.e., it is writeable), the JSON file
                   will be automatically updated every time a write-like operation is
                   performed.

                   Note that when read_only=False, JSONStore only supports a single JSON
                   file. If the file does not exist, it will be automatically created
                   when the JSONStore is initialized.

    Raises:
        RuntimeError: if the store is writable and more than one path is given.
    """
    paths = paths if isinstance(paths, (list, tuple)) else [paths]
    self.paths = paths

    # file_writable overrides read_only for compatibility reasons
    if "file_writable" in kwargs:
        file_writable = kwargs.pop("file_writable")
        warnings.warn(
            "file_writable is deprecated; use read only instead.",
            DeprecationWarning,
        )
        self.read_only = not file_writable
        if self.read_only != read_only:
            warnings.warn(
                f"Received conflicting keyword arguments file_writable={file_writable}"
                f" and read_only={read_only}. Setting read_only={file_writable}.",
                UserWarning,
            )
    else:
        self.read_only = read_only
    self.kwargs = kwargs

    if not self.read_only and len(paths) > 1:
        raise RuntimeError("Cannot instantiate file-writable JSONStore with multiple JSON files.")

    # create the .json file (holding an empty list) if it does not exist.
    # orjson.dumps returns UTF-8 bytes; write them in binary mode rather than
    # decoding and re-encoding through the platform's default text encoding.
    if not self.read_only and not Path(self.paths[0]).exists():
        with zopen(self.paths[0], "wb") as f:
            f.write(orjson.dumps([]))

    self.default_sort = None

    super().__init__(**kwargs)

connect(self, force_reset=False)

Loads the files into the collection in memory

Source code in maggma/stores/mongolike.py
def connect(self, force_reset=False):
    """
    Loads the files into the collection in memory

    Args:
        force_reset: forwarded to the parent store's connect

    Raises:
        KeyError: if a document read from one of the files lacks the
            store's key field.
    """
    super().connect(force_reset=force_reset)
    for path in self.paths:
        objects = self.read_json_file(path)
        try:
            self.update(objects)
        except KeyError as exc:
            # paths are usually plain strings, so wrap in Path before using .name
            raise KeyError(
                f"""
                Key field '{self.key}' not found in {Path(path).name}. This
                could mean that this JSONStore was initially created with a different key field.
                The keys found in the .json file are {list(objects[0].keys())}. Try
                re-initializing your JSONStore using one of these as the key arguments.
                """
            ) from exc

read_json_file(self, path)

Helper method to read the contents of a JSON file and generate a list of docs.

Parameters:

Name Type Description Default
path

Path to the JSON file to be read

required
Source code in maggma/stores/mongolike.py
def read_json_file(self, path) -> List:
    """
    Helper method to read the contents of a JSON file and generate
    a list of docs.

    Args:
        path: Path to the JSON file to be read

    Returns:
        list of documents; a file holding a single JSON value is wrapped
        in a one-element list.
    """
    # Read raw bytes; orjson.loads accepts bytes and decodes them as UTF-8,
    # so the result does not depend on the platform's default text encoding.
    with zopen(path, "rb") as f:
        objects = orjson.loads(f.read())
    objects = [objects] if not isinstance(objects, list) else objects
    # datetime objects deserialize to str. Try to convert the last_updated
    # field back to datetime.
    # # TODO - there may still be problems caused if a JSONStore is init'ed from
    # documents that don't contain a last_updated field
    # See Store.last_updated in store.py.
    for obj in objects:
        if obj.get(self.last_updated_field):
            obj[self.last_updated_field] = to_dt(obj[self.last_updated_field])

    return objects

remove_docs(self, criteria)

Remove docs matching the query dictionary.

For a file-writable JSONStore, the json file is updated.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in maggma/stores/mongolike.py
def remove_docs(self, criteria: Dict):
    """
    Delete every document that matches ``criteria``.

    The deletion always happens in the in-memory collection; when the store
    is writable (``read_only`` is False) the backing JSON file is rewritten
    afterwards.

    Args:
        criteria: query dictionary to match
    """
    super().remove_docs(criteria=criteria)
    if self.read_only:
        return
    self.update_json_file()

update(self, docs, key=None)

Update documents into the Store.

For a file-writable JSONStore, the json file is updated.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Upsert one or more documents.

    The documents are always written to the in-memory collection; when the
    store is writable (``read_only`` is False) the backing JSON file is
    rewritten afterwards.

    Args:
        docs: a single document or a list of documents
        key: field name(s) that identify a document as unique; a list of
             fields, a single field, or None to use the Store's key field
    """
    super().update(docs=docs, key=key)
    if self.read_only:
        return
    self.update_json_file()

update_json_file(self)

Updates the json file when a write-like operation is performed.

Source code in maggma/stores/mongolike.py
def update_json_file(self):
    """
    Updates the json file when a write-like operation is performed.

    The whole in-memory collection is written to ``self.paths[0]`` with the
    collection-assigned ``_id`` field stripped from every document.
    """
    # Query before opening the file: opening with "wb" truncates it, so a
    # failure while querying must not leave an emptied file behind.
    data = list(self.query())
    for d in data:
        d.pop("_id", None)
    # orjson.dumps returns UTF-8 bytes; write them without a text re-encode.
    with zopen(self.paths[0], "wb") as f:
        f.write(orjson.dumps(data))

MemoryStore (MongoStore)

An in-memory Store that functions similarly to a MongoStore

Source code in maggma/stores/mongolike.py
class MemoryStore(MongoStore):
    """
    An in-memory Store that functions similarly to a MongoStore.

    The data lives in a mongomock collection instead of a real MongoDB server.
    """

    def __init__(self, collection_name: str = "memory_db", **kwargs):
        """
        Initializes the Memory Store

        Args:
            collection_name: name for the collection in memory; the store's
                ``name`` is derived from it as ``mem://<collection_name>``
            kwargs: forwarded to the initializer that follows MongoStore in
                the MRO
        """
        self.kwargs = kwargs
        self._coll = None
        self.default_sort = None
        self.collection_name = collection_name
        # Skip MongoStore.__init__ (which needs database/host settings) and
        # run the next initializer in the MRO instead.
        super(MongoStore, self).__init__(**kwargs)  # noqa

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data

        A mongomock collection handle is created on first use, or again when
        ``force_reset`` is True; otherwise the existing handle is kept.
        """
        if not (self._coll is None or force_reset):
            return
        client = mongomock.MongoClient()
        self._coll = client.db[self.name]

    def close(self):
        """Close the mongomock client that owns the collection."""
        client = self._coll.database.client
        client.close()

    @property
    def name(self):
        """Name for the store, of the form ``mem://<collection_name>``."""
        return "mem://{}".format(self.collection_name)

    def __hash__(self):
        """Hash for the store, based on its name and last_updated_field."""
        identity = (self.name, self.last_updated_field)
        return hash(identity)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Group documents by the values of one or more key fields.

        Only the key fields and the requested ``properties`` are projected
        from the query. Documents missing any of the key fields are dropped.
        The ``sort``, ``skip`` and ``limit`` arguments are accepted for
        interface compatibility but are not used by this implementation.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending. (unused)
            skip: number documents to skip (unused)
            limit: limit on total number of documents returned (unused)

        Returns:
            generator returning tuples of (key, list of elements)
        """
        key_list = keys if isinstance(keys, list) else [keys]

        if isinstance(properties, dict):
            prop_list = list(properties.keys())
        elif properties is None:
            prop_list = []
        else:
            prop_list = properties

        def _group_key(doc):
            return tuple(get(doc, field) for field in key_list)

        candidates = self.query(properties=key_list + prop_list, criteria=criteria)
        rows = [doc for doc in candidates if all(has(doc, field) for field in key_list)]
        # itertools.groupby only merges adjacent items, so sort by the same key first
        rows.sort(key=_group_key)

        for values, members in groupby(rows, key=_group_key):
            id_doc = {}  # type: Dict[Any,Any]
            for field, value in zip(key_list, values):
                set_(id_doc, field, value)
            yield id_doc, list(members)

    def __eq__(self, other: object) -> bool:
        """
        Check equality for MemoryStore.

        Args:
            other: other MemoryStore to compare with
        """
        if isinstance(other, MemoryStore):
            for attr in ("collection_name", "last_updated_field"):
                if not getattr(self, attr) == getattr(other, attr):
                    return False
            return True
        return False

name property readonly

Name for the store

__eq__(self, other) special

Check equality for MemoryStore. other: the other MemoryStore to compare with.

Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
    """
    Check equality for MemoryStore.

    Two MemoryStores are equal when their ``collection_name`` and
    ``last_updated_field`` match; non-MemoryStore objects are never equal.

    Args:
        other: other MemoryStore to compare with
    """
    if isinstance(other, MemoryStore):
        for attr in ("collection_name", "last_updated_field"):
            if not getattr(self, attr) == getattr(other, attr):
                return False
        return True
    return False

__hash__(self) special

Hash for the store

Source code in maggma/stores/mongolike.py
def __hash__(self):
    """Hash for the store, based on its name and last_updated_field."""
    identity = (self.name, self.last_updated_field)
    return hash(identity)

__init__(self, collection_name='memory_db', **kwargs) special

Initializes the Memory Store

Parameters:

Name Type Description Default
collection_name str

name for the collection in memory

'memory_db'
Source code in maggma/stores/mongolike.py
def __init__(self, collection_name: str = "memory_db", **kwargs):
    """
    Initializes the Memory Store

    Args:
        collection_name: name for the collection in memory
        kwargs: forwarded to the initializer that follows MongoStore in the MRO
    """
    self.kwargs = kwargs
    self._coll = None
    self.default_sort = None
    self.collection_name = collection_name
    # Skip MongoStore.__init__ (which needs database/host settings) and run
    # the next initializer in the MRO instead.
    super(MongoStore, self).__init__(**kwargs)  # noqa

close(self)

Close up all collections

Source code in maggma/stores/mongolike.py
def close(self):
    """Close the client that owns the in-memory collection."""
    client = self._coll.database.client
    client.close()

connect(self, force_reset=False)

Connect to the source data

Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data

    A mongomock collection handle is created on first use, or again when
    ``force_reset`` is True; otherwise the existing handle is kept.
    """
    if not (self._coll is None or force_reset):
        return
    client = mongomock.MongoClient()
    self._coll = client.db[self.name]

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (key, list of elements)

Source code in maggma/stores/mongolike.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Group documents by the values of one or more key fields.

    Only the key fields and the requested ``properties`` are projected from
    the query. Documents missing any of the key fields are dropped. The
    ``sort``, ``skip`` and ``limit`` arguments are accepted for interface
    compatibility but are not used by this implementation.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending. (unused)
        skip: number documents to skip (unused)
        limit: limit on total number of documents returned (unused)

    Returns:
        generator returning tuples of (key, list of elements)
    """
    key_list = keys if isinstance(keys, list) else [keys]

    if isinstance(properties, dict):
        prop_list = list(properties.keys())
    elif properties is None:
        prop_list = []
    else:
        prop_list = properties

    def _group_key(doc):
        return tuple(get(doc, field) for field in key_list)

    candidates = self.query(properties=key_list + prop_list, criteria=criteria)
    rows = [doc for doc in candidates if all(has(doc, field) for field in key_list)]
    # itertools.groupby only merges adjacent items, so sort by the same key first
    rows.sort(key=_group_key)

    for values, members in groupby(rows, key=_group_key):
        id_doc = {}  # type: Dict[Any,Any]
        for field, value in zip(key_list, values):
            set_(id_doc, field, value)
        yield id_doc, list(members)

MongoStore (Store)

A Store that connects to a Mongo collection

Source code in maggma/stores/mongolike.py
class MongoStore(Store):
    """
    A Store that connects to a Mongo collection
    """

    def __init__(
        self,
        database: str,
        collection_name: str,
        host: str = "localhost",
        port: int = 27017,
        username: str = "",
        password: str = "",
        ssh_tunnel: Optional[SSHTunnel] = None,
        safe_update: bool = False,
        auth_source: Optional[str] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
        **kwargs,
    ):
        """
        Args:
            database: The database name
            collection_name: The collection name
            host: Hostname for the database
            port: TCP port to connect to
            username: Username for the collection
            password: Password to connect with
            ssh_tunnel: optional SSHTunnel; when given, connect() starts it and
                connects through its local address
            safe_update: if a bulk write in update() fails with OperationFailure
                or DocumentTooLarge, retry the documents one at a time and log
                the ones that still fail instead of raising
            auth_source: The database to authenticate on. Defaults to the database name.
            mongoclient_kwargs: extra keyword arguments passed to MongoClient
            default_sort: Default sort field and direction to use when querying. Can be used to
                ensure determinacy in query results.
            kwargs: forwarded to Store.__init__
        """
        self.database = database
        self.collection_name = collection_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.ssh_tunnel = ssh_tunnel
        self.safe_update = safe_update
        self.default_sort = default_sort
        self._coll = None  # type: Any
        self.kwargs = kwargs

        if auth_source is None:
            auth_source = self.database
        self.auth_source = auth_source
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        super().__init__(**kwargs)

    @property
    def name(self) -> str:
        """
        Return a string representing this data source
        """
        return f"mongo://{self.host}/{self.database}/{self.collection_name}"

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data

        Args:
            force_reset: create a new client/collection even if one exists
        """
        if self._coll is None or force_reset:
            if self.ssh_tunnel is None:
                host = self.host
                port = self.port
            else:
                self.ssh_tunnel.start()
                host, port = self.ssh_tunnel.local_address

            conn: MongoClient = (
                MongoClient(
                    host=host,
                    port=port,
                    username=self.username,
                    password=self.password,
                    authSource=self.auth_source,
                    **self.mongoclient_kwargs,
                )
                if self.username != ""
                else MongoClient(host, port, **self.mongoclient_kwargs)
            )
            db = conn[self.database]
            self._coll = db[self.collection_name]

    def __hash__(self) -> int:
        """Hash for MongoStore"""
        return hash((self.database, self.collection_name, self.last_updated_field))

    @classmethod
    def from_db_file(cls, filename: str, **kwargs):
        """
        Convenience method to construct MongoStore from db_file
        from old QueryEngine format

        Args:
            filename: path of the db file, loaded with ``loadfn``
            kwargs: extra constructor arguments; they take precedence over
                values read from the file
        """
        # Keep the file contents separate so the caller's kwargs are not discarded
        db_kwargs = loadfn(filename)
        if "collection" in db_kwargs:
            db_kwargs["collection_name"] = db_kwargs.pop("collection")
        # Get rid of aliases from traditional query engine db docs
        db_kwargs.pop("aliases", None)
        return cls(**{**db_kwargs, **kwargs})

    @classmethod
    def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
        """
        Convenience method to construct MongoStore from a launchpad file

        Note: A launchpad file is a special formatted yaml file used in fireworks

        Args:
            lp_file: path to the launchpad yaml file
            collection_name: name of the collection to connect to
            kwargs: extra constructor arguments

        Returns:
            a store built from the database, host, port, username and password
            entries of the launchpad file (its ``name`` entry is the database)
        """
        with open(lp_file, "r") as f:
            lp_creds = yaml.load(f, Loader=yaml.FullLoader)

        db_creds = lp_creds.copy()
        db_creds["database"] = db_creds["name"]
        allowed = ("database", "host", "port", "username", "password")
        db_creds = {k: v for k, v in db_creds.items() if k in allowed}
        db_creds["collection_name"] = collection_name

        return cls(**db_creds, **kwargs)

    def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
        """
        Get all distinct values for a field

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: accepted for interface compatibility; not used here
        """

        criteria = criteria or {}
        try:
            distinct_vals = self._collection.distinct(field, criteria)
        except (OperationFailure, DocumentTooLarge):
            # Fall back to an aggregation when distinct() itself fails
            distinct_vals = [
                d["_id"] for d in self._collection.aggregate([{"$match": criteria}, {"$group": {"_id": f"${field}"}}])
            ]
            if all(isinstance(d, list) for d in filter(None, distinct_vals)):  # type: ignore
                distinct_vals = list(chain.from_iterable(filter(None, distinct_vals)))

        return distinct_vals if distinct_vals is not None else []

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Note: ``sort``, ``skip`` and ``limit`` are accepted for interface
        compatibility but are not applied by this implementation.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (key, list of docs)
        """
        pipeline = []
        if isinstance(keys, str):
            keys = [keys]

        if properties is None:
            properties = []
        if isinstance(properties, dict):
            properties = list(properties.keys())

        if criteria is not None:
            pipeline.append({"$match": criteria})

        if len(properties) > 0:
            pipeline.append({"$project": {p: 1 for p in properties + keys}})

        # Each key is aliased to a single letter inside the $group _id
        alpha = "abcdefghijklmnopqrstuvwxyz"
        group_id = {letter: f"${key}" for letter, key in zip(alpha, keys)}
        pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
        for d in self._collection.aggregate(pipeline, allowDiskUse=True):
            id_doc = {}  # type: Dict[str,Any]
            for letter, key in group_id.items():
                if has(d["_id"], letter):
                    set_(id_doc, key[1:], d["_id"][letter])
            yield (id_doc, d["docs"])

    @classmethod
    def from_collection(cls, collection):
        """
        Generates a MongoStore from a pymongo collection object
        This is not a fully safe operation as it gives dummy information to the MongoStore
        As a result, this will not serialize and can not reset its connection

        Args:
            collection: the PyMongo collection to create a MongoStore around
        """
        # TODO: How do we make this safer?
        coll_name = collection.name
        db_name = collection.database.name

        store = cls(db_name, coll_name)
        store._coll = collection
        return store

    @property
    def _collection(self):
        """Property referring to underlying pymongo collection"""
        if self._coll is None:
            raise StoreError("Must connect Mongo-like store before attemping to use it")
        return self._coll

    def count(
        self,
        criteria: Optional[Dict] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
    ) -> int:
        """
        Counts the number of documents matching the query criteria

        Args:
            criteria: PyMongo filter for documents to count in
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
        """

        criteria = criteria if criteria else {}

        hint_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
        )

        if hint_list is not None:  # pragma: no cover
            return self._collection.count_documents(filter=criteria, hint=hint_list)

        return self._collection.count_documents(filter=criteria)

    def query(  # type: ignore
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
        **kwargs,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending. Falls back to
                ``self.default_sort`` when not given.
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
            kwargs: extra keyword arguments passed to pymongo ``find``
        """
        if isinstance(properties, list):
            properties = {p: 1 for p in properties}

        default_sort_formatted = None

        if self.default_sort is not None:
            default_sort_formatted = [
                (k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in self.default_sort.items()
            ]

        sort_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in sort.items()]
            if sort
            else default_sort_formatted
        )

        hint_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
        )

        for d in self._collection.find(
            filter=criteria, projection=properties, skip=skip, limit=limit, sort=sort_list, hint=hint_list, **kwargs
        ):
            yield d

    def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
        """
        Tries to create an index and return true if it succeeded

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys

        Returns:
            bool indicating if the index exists/was created
        """

        if confirm_field_index(self._collection, key):
            return True
        else:
            try:
                self._collection.create_index(key, unique=unique, background=True)
                return True
            except Exception:
                # best-effort: report failure through the return value
                return False

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store

        Each document is sanitized, optionally validated, and upserted with a
        ReplaceOne keyed on ``key``.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used

        Raises:
            KeyError: if a document lacks one of the key fields
            ValueError: if a strict validator rejects a document
        """

        requests = []

        if not isinstance(docs, list):
            docs = [docs]

        # The uniqueness key does not depend on the document; resolve it once
        key = key or self.key

        for d in docs:
            d = jsanitize(d, allow_bson=True)

            # document-level validation is optional
            validates = True
            if self.validator:
                validates = self.validator.is_valid(d)
                if not validates:
                    if self.validator.strict:
                        raise ValueError(self.validator.validation_errors(d))
                    self.logger.error(self.validator.validation_errors(d))

            if validates:
                if isinstance(key, list):
                    search_doc = {k: d[k] for k in key}
                else:
                    search_doc = {key: d[key]}

                requests.append(ReplaceOne(search_doc, d, upsert=True))

        if not requests:
            return

        try:
            self._collection.bulk_write(requests, ordered=False)
        except (OperationFailure, DocumentTooLarge):
            if not self.safe_update:
                raise
            # Retry one request at a time so only the offending documents are skipped
            for req in requests:
                try:
                    self._collection.bulk_write([req], ordered=False)
                except (OperationFailure, DocumentTooLarge):
                    self.logger.error(
                        f"Could not upload document for {req._filter} as it was too large for Mongo"
                    )

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary

        Args:
            criteria: query dictionary to match
        """
        self._collection.delete_many(filter=criteria)

    def close(self):
        """Close up all collections"""
        self._collection.database.client.close()
        self._coll = None
        if self.ssh_tunnel is not None:
            self.ssh_tunnel.stop()

    def __eq__(self, other: object) -> bool:
        """
        Check equality for MongoStore

        Args:
            other: other mongostore to compare with
        """
        if not isinstance(other, MongoStore):
            return False

        fields = ["database", "collection_name", "host", "port", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name: str property readonly

Return a string representing this data source

__eq__(self, other) special

Check equality for MongoStore. other: the other MongoStore to compare with.

Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
    """
    Check equality for MongoStore.

    Args:
        other: other MongoStore to compare with

    Returns:
        True when ``other`` is a MongoStore and every identity field
        (database, collection_name, host, port, uri, last_updated_field)
        matches; False otherwise.
    """
    if not isinstance(other, MongoStore):
        return False

    # Subclasses do not all set the same connection fields (MongoURIStore has a
    # uri but no host/port), so a missing attribute compares as None instead of
    # raising AttributeError.
    fields = ["database", "collection_name", "host", "port", "uri", "last_updated_field"]
    return all(getattr(self, f, None) == getattr(other, f, None) for f in fields)

__hash__(self) special

Hash for MongoStore

Source code in maggma/stores/mongolike.py
def __hash__(self) -> int:
    """Hash a MongoStore on its database, collection name and last-updated field."""
    identity = (self.database, self.collection_name, self.last_updated_field)
    return hash(identity)

__init__(self, database, collection_name, host='localhost', port=27017, username='', password='', ssh_tunnel=None, safe_update=False, auth_source=None, mongoclient_kwargs=None, default_sort=None, **kwargs) special

Parameters:

Name Type Description Default
database str

The database name

required
collection_name str

The collection name

required
host str

Hostname for the database

'localhost'
port int

TCP port to connect to

27017
username str

Username for the collection

''
password str

Password to connect with

''
safe_update bool

fail gracefully on DocumentTooLarge errors on update

False
auth_source Optional[str]

The database to authenticate on. Defaults to the database name.

None
default_sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Default sort field and direction to use when querying. Can be used to ensure determinacy in query results.

None
Source code in maggma/stores/mongolike.py
def __init__(
    self,
    database: str,
    collection_name: str,
    host: str = "localhost",
    port: int = 27017,
    username: str = "",
    password: str = "",
    ssh_tunnel: Optional[SSHTunnel] = None,
    safe_update: bool = False,
    auth_source: Optional[str] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
    **kwargs,
):
    """
    Args:
        database: The database name
        collection_name: The collection name
        host: Hostname for the database
        port: TCP port to connect to
        username: Username for the collection
        password: Password to connect with
        ssh_tunnel: optional SSHTunnel; when set, connect() starts it and
            connects through its local address
        safe_update: fail gracefully on DocumentTooLarge errors on update
        auth_source: The database to authenticate on. Defaults to the database name.
        mongoclient_kwargs: extra keyword arguments passed to MongoClient
        default_sort: Default sort field and direction to use when querying. Can be used to
            ensure determinacy in query results.
        kwargs: remaining keyword arguments, forwarded to the base Store
    """
    # Where to connect and how to authenticate
    self.database = database
    self.collection_name = collection_name
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    self.auth_source = database if auth_source is None else auth_source
    self.ssh_tunnel = ssh_tunnel
    self.mongoclient_kwargs = mongoclient_kwargs or {}

    # Store behaviour
    self.safe_update = safe_update
    self.default_sort = default_sort
    self.kwargs = kwargs

    # The collection handle is created lazily by connect()
    self._coll = None  # type: Any

    super().__init__(**kwargs)

close(self)

Close up all collections

Source code in maggma/stores/mongolike.py
def close(self):
    """
    Close the MongoClient behind this Store and drop the cached collection.

    When an SSH tunnel is configured it is stopped after the client is closed.
    """
    client = self._collection.database.client
    client.close()
    self._coll = None
    tunnel = self.ssh_tunnel
    if tunnel is not None:
        tunnel.stop()

connect(self, force_reset=False)

Connect to the source data

Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Opens a MongoClient and caches the target collection. When an SSH tunnel
    is configured it is started first and the client connects to the tunnel's
    local address. Nothing happens if a collection is already cached, unless
    ``force_reset`` is True.
    """
    if self._coll is not None and not force_reset:
        return

    if self.ssh_tunnel is not None:
        self.ssh_tunnel.start()
        host, port = self.ssh_tunnel.local_address
    else:
        host, port = self.host, self.port

    # Credentials (and authSource) are only passed when a username is configured
    if self.username == "":
        conn: MongoClient = MongoClient(host, port, **self.mongoclient_kwargs)
    else:
        conn = MongoClient(
            host=host,
            port=port,
            username=self.username,
            password=self.password,
            authSource=self.auth_source,
            **self.mongoclient_kwargs,
        )
    self._coll = conn[self.database][self.collection_name]

count(self, criteria=None, hint=None)

Counts the number of documents matching the query criteria

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
hint Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
Source code in maggma/stores/mongolike.py
def count(
    self,
    criteria: Optional[Dict] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
    """
    Count the documents that match a query.

    Args:
        criteria: PyMongo filter for documents to count in; None or an empty
            dict counts every document
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.

    Returns:
        the number of matching documents
    """
    criteria = criteria if criteria else {}

    if not hint:
        return self._collection.count_documents(filter=criteria)

    # Convert ints and Sort members into the raw direction values passed to pymongo
    hint_list = []
    for field, direction in hint.items():
        if isinstance(direction, int):
            hint_list.append((field, Sort(direction).value))
        else:
            hint_list.append((field, direction.value))
    return self._collection.count_documents(filter=criteria, hint=hint_list)  # pragma: no cover

distinct(self, field, criteria=None, all_exist=False)

Get all distinct values for a field

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in maggma/stores/mongolike.py
def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
    """
    Get all distinct values for a field.

    ``Collection.distinct`` is tried first. If it fails with OperationFailure or
    DocumentTooLarge, the values are collected with a ``$match``/``$group``
    aggregation instead. If every non-empty grouped value is a list, the lists
    are flattened into one.

    Args:
        field: the field to get distinct values for
        criteria: PyMongo filter for documents to search in
        all_exist: accepted for interface compatibility; not used by this method

    Returns:
        the distinct values, or an empty list when none were returned
    """
    criteria = criteria or {}
    try:
        values = self._collection.distinct(field, criteria)
    except (OperationFailure, DocumentTooLarge):
        pipeline = [{"$match": criteria}, {"$group": {"_id": f"${field}"}}]
        values = [group["_id"] for group in self._collection.aggregate(pipeline)]
        non_empty = [value for value in values if value]
        if all(isinstance(value, list) for value in non_empty):
            # NOTE(review): flattening is not followed by de-duplication, so an
            # element shared by several array values can appear more than once.
            values = list(chain.from_iterable(non_empty))

    return [] if values is None else values

ensure_index(self, key, unique=False)

Tries to create an index and returns True if it succeeded.

Parameters:

Name Type Description Default
key str

single key to index

required
unique Optional[bool]

Whether or not this index contains only unique keys

False

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in maggma/stores/mongolike.py
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
    """
    Make sure an index on ``key`` exists, creating it if necessary.

    Args:
        key: single key to index
        unique: Whether or not this index contains only unique keys

    Returns:
        True if the index already existed or was created successfully; False if
        index creation raised any exception.
    """
    if not confirm_field_index(self._collection, key):
        try:
            self._collection.create_index(key, unique=unique, background=True)
        except Exception:
            # Best effort: any failure is reported via the return value
            return False
    return True

from_collection(collection) classmethod

Generates a MongoStore from a pymongo collection object. This is not a fully safe operation, as it gives dummy information to the MongoStore. As a result, this store will not serialize and cannot reset its connection.

Parameters:

Name Type Description Default
collection

the PyMongo collection to create a MongoStore around

required
Source code in maggma/stores/mongolike.py
@classmethod
def from_collection(cls, collection):
    """
    Wrap an existing pymongo collection object in a MongoStore.

    The Store is built from the collection's database and collection names only,
    with the live collection injected as the cached handle. Because the other
    connection details are dummies, the result will not serialize and cannot
    reset its connection.

    Args:
        collection: the PyMongo collection to create a MongoStore around

    Returns:
        a Store whose cached collection is ``collection``
    """
    # TODO: How do we make this safer?
    store = cls(collection.database.name, collection.name)
    store._coll = collection
    return store

from_db_file(filename, **kwargs) classmethod

Convenience method to construct MongoStore from db_file from old QueryEngine format

Source code in maggma/stores/mongolike.py
@classmethod
def from_db_file(cls, filename: str, **kwargs):
    """
    Convenience method to construct MongoStore from a db_file in the
    old QueryEngine format.

    Args:
        filename: path to the db file, read with ``loadfn``
        kwargs: extra constructor keyword arguments; these override any value
            of the same name read from the file

    Returns:
        a new Store built from the file contents
    """
    # Keep the file contents separate so the caller's kwargs are not discarded
    db_kwargs = loadfn(filename)
    if "collection" in db_kwargs:
        db_kwargs["collection_name"] = db_kwargs.pop("collection")
    # Get rid of aliases from traditional query engine db docs
    db_kwargs.pop("aliases", None)
    db_kwargs.update(kwargs)
    return cls(**db_kwargs)

from_launchpad_file(lp_file, collection_name, **kwargs) classmethod

Convenience method to construct MongoStore from a launchpad file

Note: A launchpad file is a special formatted yaml file used in fireworks

Source code in maggma/stores/mongolike.py
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
    """
    Convenience method to construct MongoStore from a launchpad file

    Note: A launchpad file is a special formatted yaml file used in fireworks

    Only the host, port, username and password entries are kept from the file.
    The launchpad's ``name`` entry becomes the database.

    Args:
        lp_file: path to the launchpad yaml file
        collection_name: the collection name
        kwargs: extra keyword arguments for the constructor

    Returns:
        a new Store for ``collection_name`` in the launchpad's database
    """
    # NOTE(review): yaml.load with FullLoader on a user-supplied file; consider
    # yaml.safe_load if launchpad files never need Python-specific tags.
    with open(lp_file, "r") as f:
        lp_creds = yaml.load(f, Loader=yaml.FullLoader)

    wanted = ("host", "port", "username", "password")
    db_creds = {k: v for k, v in lp_creds.items() if k in wanted}
    db_creds["database"] = lp_creds["name"]
    db_creds["collection_name"] = collection_name

    return cls(**db_creds, **kwargs)

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (key, list of docs)

Source code in maggma/stores/mongolike.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Note:
        ``sort``, ``skip`` and ``limit`` are accepted but are not currently
        applied to the aggregation pipeline.

    Returns:
        generator returning tuples of (key, list of docs)
    """
    pipeline = []
    if isinstance(keys, str):
        keys = [keys]

    if properties is None:
        properties = []
    if isinstance(properties, dict):
        properties = list(properties.keys())

    if criteria is not None:
        pipeline.append({"$match": criteria})

    if len(properties) > 0:
        pipeline.append({"$project": {p: 1 for p in properties + keys}})

    # Each grouping key gets a positional alias inside the $group _id
    # sub-document. Index-based aliases support any number of keys; a fixed
    # 26-letter alphabet would let zip() silently drop keys beyond the 26th.
    group_id = {f"k{i}": f"${key}" for i, key in enumerate(keys)}
    pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
    for d in self._collection.aggregate(pipeline, allowDiskUse=True):
        id_doc = {}  # type: Dict[str,Any]
        for alias, key in group_id.items():
            if has(d["_id"], alias):
                # key[1:] strips the leading "$" to recover the (dotted) field path
                set_(id_doc, key[1:], d["_id"][alias])
        yield (id_doc, d["docs"])

query(self, criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, **kwargs)

Queries the Store for a set of documents

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in the queried documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
hint Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
**kwargs

Extra keyword arguments passed through to pymongo find.

{}
Source code in maggma/stores/mongolike.py
def query(  # type: ignore
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
    **kwargs,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in the queried documents; a list of
            field names is turned into an inclusion projection
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending. When not given,
            the Store's ``default_sort`` (if set) is used.
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
        kwargs: extra keyword arguments passed through to pymongo's ``find``

    Yields:
        the matching documents
    """
    if isinstance(properties, list):
        properties = {p: 1 for p in properties}

    default_sort_formatted = None

    if self.default_sort is not None:
        default_sort_formatted = [
            (k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in self.default_sort.items()
        ]

    sort_list = (
        [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in sort.items()]
        if sort
        else default_sort_formatted
    )

    hint_list = (
        [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
    )

    # ``yield from`` also forwards close() on this generator to the result
    # iterator (if it provides a close method).
    yield from self._collection.find(
        filter=criteria, projection=properties, skip=skip, limit=limit, sort=sort_list, hint=hint_list, **kwargs
    )

remove_docs(self, criteria)

Remove docs matching the query dictionary

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in maggma/stores/mongolike.py
def remove_docs(self, criteria: Dict):
    """
    Delete every document in the collection that matches ``criteria``.

    Args:
        criteria: PyMongo filter selecting the documents to delete
    """
    collection = self._collection
    collection.delete_many(filter=criteria)

update(self, docs, key=None)

Update documents into the Store

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Update documents into the Store

    Each document is sanitized, optionally validated, and upserted with a
    ReplaceOne request keyed on ``key``. All requests are then sent in a single
    unordered bulk write.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used

    Raises:
        ValueError: if a document fails validation and the validator is strict
        OperationFailure, DocumentTooLarge: if the bulk write fails and
            ``safe_update`` is False
    """
    requests = []

    if not isinstance(docs, list):
        docs = [docs]

    # The uniqueness key is the same for every document; resolve it once
    key = key or self.key

    for d in docs:
        d = jsanitize(d, allow_bson=True)

        # document-level validation is optional
        if self.validator and not self.validator.is_valid(d):
            if self.validator.strict:
                raise ValueError(self.validator.validation_errors(d))
            self.logger.error(self.validator.validation_errors(d))
            continue

        if isinstance(key, list):
            search_doc = {k: d[k] for k in key}
        else:
            search_doc = {key: d[key]}

        requests.append(ReplaceOne(search_doc, d, upsert=True))

    if not requests:
        return

    try:
        self._collection.bulk_write(requests, ordered=False)
    except (OperationFailure, DocumentTooLarge):
        if not self.safe_update:
            raise
        # Retry one request at a time so a single failing document does not
        # prevent the rest of the batch from being written.
        for req in requests:
            try:
                self._collection.bulk_write([req], ordered=False)
            except (OperationFailure, DocumentTooLarge) as exc:
                self.logger.error(f"Could not upload document for {req._filter}: {exc}")

MongoURIStore (MongoStore)

A Store that connects to a Mongo collection via a URI. This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records.

Source code in maggma/stores/mongolike.py
class MongoURIStore(MongoStore):
    """
    A Store that connects to a Mongo collection via a URI.
    This is expected to be a special mongodb+srv:// URI that includes
    client parameters via TXT records.
    """

    def __init__(
        self,
        uri: str,
        collection_name: str,
        database: Optional[str] = None,
        ssh_tunnel: Optional[SSHTunnel] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
        safe_update: bool = False,
        **kwargs,
    ):
        """
        Args:
            uri: MongoDB+SRV URI
            collection_name: The collection name
            database: database to connect to. If None, the database named in
                the uri is used.
            ssh_tunnel: optional SSHTunnel stored on the Store. connect() below
                connects directly via the uri and does not start it.
            mongoclient_kwargs: extra keyword arguments passed to MongoClient
            default_sort: Default sort field and direction to use when querying. Can be used to
                ensure determinacy in query results.
            safe_update: fail gracefully on DocumentTooLarge errors on update

        Raises:
            ConfigurationError: if no database is given and none is set in the uri
        """
        self.uri = uri
        self.ssh_tunnel = ssh_tunnel
        self.default_sort = default_sort
        self.mongoclient_kwargs = mongoclient_kwargs or {}
        # MongoStore.__init__ is bypassed below, so set the attribute that the
        # inherited update() reads when a bulk write fails.
        self.safe_update = safe_update

        # parse the dbname from the uri
        if database is None:
            d_uri = uri_parser.parse_uri(uri)
            if d_uri["database"] is None:
                raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
            self.database = d_uri["database"]
        else:
            self.database = database

        self.collection_name = collection_name
        self.kwargs = kwargs
        self._coll = None
        super(MongoStore, self).__init__(**kwargs)  # lgtm

    @property
    def name(self) -> str:
        """
        Return a string representing this data source
        """
        # TODO: This is not very safe since it exposes the username/password info
        return self.uri

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Builds a MongoClient from ``self.uri`` and caches the collection. An
        already-cached collection is reused unless ``force_reset`` is True.
        """
        if self._coll is None or force_reset:  # pragma: no cover
            conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
            db = conn[self.database]
            self._coll = db[self.collection_name]

name: str property readonly

Return a string representing this data source

__init__(self, uri, collection_name, database=None, ssh_tunnel=None, mongoclient_kwargs=None, default_sort=None, **kwargs) special

Parameters:

Name Type Description Default
uri str

MongoDB+SRV URI

required
database str

database to connect to

None
collection_name str

The collection name

required
default_sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Default sort field and direction to use when querying. Can be used to ensure determinacy in query results.

None
Source code in maggma/stores/mongolike.py
def __init__(
    self,
    uri: str,
    collection_name: str,
    database: Optional[str] = None,
    ssh_tunnel: Optional[SSHTunnel] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
    safe_update: bool = False,
    **kwargs,
):
    """
    Args:
        uri: MongoDB+SRV URI
        collection_name: The collection name
        database: database to connect to. If None, the database named in
            the uri is used.
        ssh_tunnel: optional SSHTunnel stored on the Store
        mongoclient_kwargs: extra keyword arguments passed to MongoClient
        default_sort: Default sort field and direction to use when querying. Can be used to
            ensure determinacy in query results.
        safe_update: fail gracefully on DocumentTooLarge errors on update

    Raises:
        ConfigurationError: if no database is given and none is set in the uri
    """
    self.uri = uri
    self.ssh_tunnel = ssh_tunnel
    self.default_sort = default_sort
    self.mongoclient_kwargs = mongoclient_kwargs or {}
    # MongoStore.__init__ is bypassed below, so set the attribute that the
    # inherited update() reads when a bulk write fails.
    self.safe_update = safe_update

    # parse the dbname from the uri
    if database is None:
        d_uri = uri_parser.parse_uri(uri)
        if d_uri["database"] is None:
            raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
        self.database = d_uri["database"]
    else:
        self.database = database

    self.collection_name = collection_name
    self.kwargs = kwargs
    self._coll = None
    super(MongoStore, self).__init__(**kwargs)  # lgtm

connect(self, force_reset=False)

Connect to the source data

Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Builds a MongoClient from ``self.uri`` and caches the collection. An
    already-cached collection is reused unless ``force_reset`` is True.
    """
    if self._coll is None or force_reset:  # pragma: no cover
        client: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
        self._coll = client[self.database][self.collection_name]

MontyStore (MemoryStore)

A MongoDB compatible store that uses on disk files for storage.

This is handled under the hood using MontyDB. A number of on-disk storage options are available but MontyDB provides a mongo style interface for all options. The options include:

  • sqlite: Uses an sqlite database to store documents.
  • lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can provide fast read and write times but requires lmdb to be installed (in most cases this can be achieved using pip install lmdb).
  • flatfile: Uses a system of flat json files. This is not recommended as multiple simultaneous connections to the store will not work correctly.

See the MontyDB repository for more information: https://github.com/davidlatwe/montydb

Source code in maggma/stores/mongolike.py
class MontyStore(MemoryStore):
    """
    A MongoDB compatible store that uses on disk files for storage.

    This is handled under the hood using MontyDB. A number of on-disk storage options
    are available but MontyDB provides a mongo style interface for all options. The
    options include:

    - sqlite: Uses an sqlite database to store documents.
    - lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can
      provide fast read and write times but requires lmdb to be installed (in most cases
      this can be achieved using ``pip install lmdb``).
    - flatfile: Uses a system of flat json files. This is not recommended as multiple
      simultaneous connections to the store will not work correctly.

    See the MontyDB repository for more information: https://github.com/davidlatwe/montydb
    """

    def __init__(
        self,
        collection_name,
        database_path: Optional[str] = None,
        database_name: str = "db",
        storage: str = "sqlite",
        storage_kwargs: Optional[dict] = None,
        client_kwargs: Optional[dict] = None,
        **kwargs,
    ):
        """
        Initializes the Monty Store.

        Args:
            collection_name: Name for the collection.
            database_path: Path to on-disk database files. If None, the current working
                directory will be used.
            database_name: The database name.
            storage: The storage type. Options include "sqlite", "lightning", "flatfile".
            storage_kwargs: Keyword arguments passed to ``montydb.set_storage``.
            client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
                constructor.
            **kwargs: Additional keyword arguments passed to the Store constructor.
        """
        if database_path is None:
            database_path = str(Path.cwd())

        self.database_path = database_path
        self.database_name = database_name
        self.collection_name = collection_name
        self._coll = None
        self.default_sort = None
        self.ssh_tunnel = None  # This is to fix issues with the tunnel on close
        self.kwargs = kwargs
        self.storage = storage
        self.storage_kwargs = storage_kwargs or {
            "use_bson": True,
            "monty_version": "4.0",
        }
        self.client_kwargs = client_kwargs or {}
        super(MongoStore, self).__init__(**kwargs)  # noqa

    def connect(self, force_reset: bool = False):
        """
        Connect to the database store.

        The storage configuration is written and a MontyClient is opened only
        when no collection is cached yet or ``force_reset`` is True.

        Args:
            force_reset: Force connection reset.
        """
        # Compare with None rather than truth-testing the collection object, and
        # skip building a new client when a collection is already cached.
        if self._coll is not None and not force_reset:
            return

        from montydb import set_storage, MontyClient

        set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
        client = MontyClient(self.database_path, **self.client_kwargs)
        # Honour the configured database name instead of a hard-coded "db"
        self._coll = client[self.database_name][self.collection_name]

    @property
    def name(self) -> str:
        """Return a string representing this data source."""
        # __init__ stores the database name as ``database_name``
        return f"monty://{self.database_path}/{self.database_name}/{self.collection_name}"

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store.

        Each document is sanitized, optionally validated, and upserted with
        ``replace_one`` one at a time.

        Args:
            docs: The document or list of documents to update.
            key: Field name(s) to determine uniqueness for a document, can be a list of
                multiple fields, a single field, or None if the Store's key field is to be
                used.

        Raises:
            ValueError: if a document fails validation and the validator is strict.
        """

        if not isinstance(docs, list):
            docs = [docs]

        # The uniqueness key is the same for every document; resolve it once
        key = key or self.key

        for d in docs:
            d = jsanitize(d, allow_bson=True)

            # document-level validation is optional
            validates = True
            if self.validator:
                validates = self.validator.is_valid(d)
                if not validates:
                    if self.validator.strict:
                        raise ValueError(self.validator.validation_errors(d))
                    else:
                        self.logger.error(self.validator.validation_errors(d))

            if validates:
                if isinstance(key, list):
                    search_doc = {k: d[k] for k in key}
                else:
                    search_doc = {key: d[key]}

                self._collection.replace_one(search_doc, d, upsert=True)

name: str property readonly

Return a string representing this data source.

__init__(self, collection_name, database_path=None, database_name='db', storage='sqlite', storage_kwargs=None, client_kwargs=None, **kwargs) special

Initializes the Monty Store.

Parameters:

Name Type Description Default
collection_name

Name for the collection.

required
database_path str

Path to on-disk database files. If None, the current working directory will be used.

None
database_name str

The database name.

'db'
storage str

The storage type. Options include "sqlite", "lightning", "flatfile".

'sqlite'
storage_kwargs Optional[dict]

Keyword arguments passed to montydb.set_storage.

None
client_kwargs Optional[dict]

Keyword arguments passed to the montydb.MontyClient constructor.

None
**kwargs

Additional keyword arguments passed to the Store constructor.

{}
Source code in maggma/stores/mongolike.py
def __init__(
    self,
    collection_name,
    database_path: str = None,
    database_name: str = "db",
    storage: str = "sqlite",
    storage_kwargs: Optional[dict] = None,
    client_kwargs: Optional[dict] = None,
    **kwargs,
):
    """
    Set up a Monty Store; no connection is opened here.

    Args:
        collection_name: Name for the collection.
        database_path: Directory holding the on-disk database files. Defaults to
            the current working directory when None.
        database_name: The database name.
        storage: The storage type. Options include "sqlite", "lightning", "flatfile".
        storage_kwargs: Keyword arguments passed to ``montydb.set_storage``. Defaults
            to ``{"use_bson": True, "monty_version": "4.0"}`` when not given.
        client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
            constructor.
        **kwargs: Additional keyword arguments passed to the Store constructor.
    """
    # Where and how the data is stored on disk
    self.database_path = str(Path.cwd()) if database_path is None else database_path
    self.database_name = database_name
    self.collection_name = collection_name
    self.storage = storage
    self.storage_kwargs = storage_kwargs or {
        "use_bson": True,
        "monty_version": "4.0",
    }
    self.client_kwargs = client_kwargs or {}

    # State shared with the Mongo-based stores
    self.kwargs = kwargs
    self.default_sort = None
    self.ssh_tunnel = None  # This is to fix issues with the tunnel on close
    self._coll = None

    super(MongoStore, self).__init__(**kwargs)  # noqa

connect(self, force_reset=False)

Connect to the database store.

Parameters:

Name Type Description Default
force_reset bool

Force connection reset.

False
Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
    """
    Connect to the database store.

    The storage configuration is written and a MontyClient is opened only when
    no collection is cached yet or ``force_reset`` is True.

    Args:
        force_reset: Force connection reset.
    """
    # Compare with None rather than truth-testing the collection object, and
    # skip building a new client when a collection is already cached.
    if self._coll is not None and not force_reset:
        return

    from montydb import set_storage, MontyClient

    set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
    client = MontyClient(self.database_path, **self.client_kwargs)
    # Honour the configured database name instead of a hard-coded "db"
    self._coll = client[self.database_name][self.collection_name]

update(self, docs, key=None)

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

The document or list of documents to update.

required
key Union[List, str]

Field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used.

None
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Upsert one or more documents, one ``replace_one`` call per document.

    Args:
        docs: The document or list of documents to update.
        key: Field name(s) to determine uniqueness for a document, can be a list of
            multiple fields, a single field, or None if the Store's key field is to be
            used.

    Raises:
        ValueError: if a document fails validation and the validator is strict.
    """
    batch = docs if isinstance(docs, list) else [docs]

    for raw in batch:
        doc = jsanitize(raw, allow_bson=True)

        # Validation is optional; invalid docs are skipped (or raise when strict)
        if self.validator and not self.validator.is_valid(doc):
            if self.validator.strict:
                raise ValueError(self.validator.validation_errors(doc))
            self.logger.error(self.validator.validation_errors(doc))
            continue

        key = key or self.key
        if isinstance(key, list):
            search_doc = {field: doc[field] for field in key}
        else:
            search_doc = {key: doc[key]}

        self._collection.replace_one(search_doc, doc, upsert=True)

SSHTunnel (MSONable)

Source code in maggma/stores/mongolike.py
class SSHTunnel(MSONable):
    """
    Forward a free local port on 127.0.0.1 to ``remote_server_address`` through
    the SSH server at ``tunnel_server_address``, using ``SSHTunnelForwarder``.

    Forwarders are cached per ``remote_server_address`` in a class-level registry,
    so several SSHTunnel instances targeting the same remote server share one
    forwarder (and therefore one local port) instead of each creating a new one.
    Because the forwarder is shared, calling ``stop()`` on one instance stops it
    for every instance that targets the same remote address.
    """

    # class-level registry: remote_server_address -> forwarder created for it
    __TUNNELS: Dict[str, SSHTunnelForwarder] = {}

    def __init__(
        self,
        tunnel_server_address: str,
        remote_server_address: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        private_key: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            tunnel_server_address: string address with port for the SSH tunnel server
            remote_server_address: string address with port for the server to connect to
            username: optional username for the ssh tunnel server
            password: optional password for the ssh tunnel server; if a private_key is
                supplied, this password is assumed to be the private key password
            private_key: ssh private key to authenticate to the tunnel server
            kwargs: any extra args passed to the SSHTunnelForwarder

        Note:
            If a forwarder for ``remote_server_address`` already exists in the
            registry, it is reused as-is and the remaining arguments are only
            stored as attributes (they do not reconfigure the existing forwarder).
        """

        self.tunnel_server_address = tunnel_server_address
        self.remote_server_address = remote_server_address
        self.username = username
        self.password = password
        self.private_key = private_key
        self.kwargs = kwargs

        if remote_server_address in SSHTunnel.__TUNNELS:
            self.tunnel = SSHTunnel.__TUNNELS[remote_server_address]
        else:
            open_port = _find_free_port("127.0.0.1")
            local_bind_address = ("127.0.0.1", open_port)

            ssh_address, ssh_port = tunnel_server_address.split(":")
            ssh_port = int(ssh_port)  # type: ignore

            remote_bind_address, remote_bind_port = remote_server_address.split(":")
            remote_bind_port = int(remote_bind_port)  # type: ignore

            # with a private key, the password unlocks the key rather than the account
            if private_key is not None:
                ssh_password = None
                ssh_private_key_password = password
            else:
                ssh_password = password
                ssh_private_key_password = None

            self.tunnel = SSHTunnelForwarder(
                ssh_address_or_host=(ssh_address, ssh_port),
                local_bind_address=local_bind_address,
                remote_bind_address=(remote_bind_address, remote_bind_port),
                ssh_username=username,
                ssh_password=ssh_password,
                ssh_private_key_password=ssh_private_key_password,
                ssh_pkey=private_key,
                **kwargs,
            )
            # register the forwarder so later instances for the same remote reuse it;
            # without this the lookup above could never succeed
            SSHTunnel.__TUNNELS[remote_server_address] = self.tunnel

    def start(self):
        """Start the forwarder unless its ``is_active`` property reports it running."""
        if not self.tunnel.is_active:
            self.tunnel.start()

    def stop(self):
        """Stop the forwarder if its ``tunnel_is_up`` attribute is truthy."""
        if self.tunnel.tunnel_is_up:
            self.tunnel.stop()

    @property
    def local_address(self) -> Tuple[str, int]:
        """The (host, port) local bind address of the underlying forwarder."""
        return self.tunnel.local_bind_address

__init__(self, tunnel_server_address, remote_server_address, username=None, password=None, private_key=None, **kwargs) special

Parameters:

Name Type Description Default
tunnel_server_address str

string address with port for the SSH tunnel server

required
remote_server_address str

string address with port for the server to connect to

required
username Optional[str]

optional username for the ssh tunnel server

None
password Optional[str]

optional password for the SSH tunnel server; if a private_key is supplied, this password is assumed to be the private key's password

None
private_key Optional[str]

ssh private key to authenticate to the tunnel server

None
kwargs

any extra args passed to the SSHTunnelForwarder

{}
Source code in maggma/stores/mongolike.py
def __init__(
    self,
    tunnel_server_address: str,
    remote_server_address: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    private_key: Optional[str] = None,
    **kwargs,
):
    """
    Args:
        tunnel_server_address: string address with port for the SSH tunnel server
        remote_server_address: string address with port for the server to connect to
        username: optional username for the ssh tunnel server
        password: optional password for the ssh tunnel server; if a private_key is
            supplied, this password is assumed to be the private key password
        private_key: ssh private key to authenticate to the tunnel server
        kwargs: any extra args passed to the SSHTunnelForwarder

    Note:
        If a forwarder for ``remote_server_address`` is already registered, it is
        reused as-is; the remaining arguments are only stored as attributes.
    """

    self.tunnel_server_address = tunnel_server_address
    self.remote_server_address = remote_server_address
    self.username = username
    self.password = password
    self.private_key = private_key
    self.kwargs = kwargs

    if remote_server_address in SSHTunnel.__TUNNELS:
        self.tunnel = SSHTunnel.__TUNNELS[remote_server_address]
    else:
        open_port = _find_free_port("127.0.0.1")
        local_bind_address = ("127.0.0.1", open_port)

        ssh_address, ssh_port = tunnel_server_address.split(":")
        ssh_port = int(ssh_port)  # type: ignore

        remote_bind_address, remote_bind_port = remote_server_address.split(":")
        remote_bind_port = int(remote_bind_port)  # type: ignore

        # with a private key, the password unlocks the key rather than the account
        if private_key is not None:
            ssh_password = None
            ssh_private_key_password = password
        else:
            ssh_password = password
            ssh_private_key_password = None

        self.tunnel = SSHTunnelForwarder(
            ssh_address_or_host=(ssh_address, ssh_port),
            local_bind_address=local_bind_address,
            remote_bind_address=(remote_bind_address, remote_bind_port),
            ssh_username=username,
            ssh_password=ssh_password,
            ssh_private_key_password=ssh_private_key_password,
            ssh_pkey=private_key,
            **kwargs,
        )
        # register the forwarder so later instances for the same remote reuse it
        SSHTunnel.__TUNNELS[remote_server_address] = self.tunnel

Module defining a FileStore that enables accessing files in a local directory using typical maggma access patterns.

FileStore (MemoryStore)

A Store for files on disk. Provides a common access method consistent with other stores. Each Item in the Store represents one file. Files can be organized into any type of directory structure.

A hash of each file's path relative to the FileStore root directory is used to define a file_id that uniquely identifies each item.

Any metadata added to the items is written to a .json file in the root directory of the FileStore.

Source code in maggma/stores/file_store.py
class FileStore(MemoryStore):
    """
    A Store for files on disk. Provides a common access method consistent with
    other stores. Each Item in the Store represents one file. Files can be organized
    into any type of directory structure.

    A hash of each file's path relative to the FileStore root directory is used to
    define a file_id that uniquely identifies each item.

    Any metadata added to the items is written to a .json file in the root directory
    of the FileStore.
    """

    def __init__(
        self,
        path: Union[str, Path],
        file_filters: Optional[List] = None,
        max_depth: Optional[int] = None,
        read_only: bool = True,
        include_orphans: bool = False,
        json_name: str = "FileStore.json",
        **kwargs,
    ):
        """
        Initializes a FileStore

        Args:
            path: parent directory containing all files and subdirectories to process
            file_filters: List of fnmatch patterns defining the files to be tracked by
                the FileStore. Only files that match one of the patterns provided will
                be included in the Store. If None (default), all files are included.

                Examples: ["*.txt", "test-[abcd].txt"], etc.
                See https://docs.python.org/3/library/fnmatch.html for full syntax
            max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
                1 = include files 1 directory below the FileStore, etc.
                None (default) will scan all files below
                the FileStore root directory, regardless of depth.
            read_only: If True (default), the .update() and .remove_docs()
                methods are disabled, preventing any changes to the files on
                disk. In addition, metadata cannot be written to disk.
            include_orphans: Whether to include orphaned metadata records in query results.
                Orphaned metadata records are records found in the local JSON file that can
                no longer be associated to a file on disk. This can happen if a file is renamed
                or deleted, or if the FileStore is re-initialized with a more restrictive
                file_filters or max_depth argument. By default (False), these records
                do not appear in query results. Nevertheless, the metadata records are
                retained in the JSON file and the FileStore to prevent accidental data loss.
            json_name: Name of the .json file to which metadata is saved. If read_only
                is False, this file will be created in the root directory of the
                FileStore.
            kwargs: kwargs passed to MemoryStore.__init__(). 'collection_name' and
                'key' are set by the FileStore itself and must not be passed here.
        """
        # The 'name' property (file://<path>) must work correctly, and collection
        # names passed to MemoryStore cannot end with '.'. Normalize to a Path first
        # so that both "." and Path(".") are replaced by the absolute cwd.
        self.path = Path(path)
        if self.path == Path("."):
            self.path = Path.cwd()

        self.json_name = json_name
        self.file_filters = file_filters if file_filters else ["*"]
        self.collection_name = "file_store"
        self.key = "file_id"
        self.include_orphans = include_orphans
        self.read_only = read_only
        self.max_depth = max_depth

        self.metadata_store = JSONStore(
            paths=[str(self.path / self.json_name)],
            read_only=self.read_only,
            collection_name=self.collection_name,
            key=self.key,
        )

        self.kwargs = kwargs

        super().__init__(
            collection_name=self.collection_name,
            key=self.key,
            **self.kwargs,
        )

    @property
    def name(self) -> str:
        """
        Return a string representing this data source
        """
        return f"file://{self.path}"

    def add_metadata(
        self,
        metadata: Optional[Dict] = None,
        query: Optional[Dict] = None,
        auto_data: Optional[Callable[[Dict], Dict]] = None,
        **kwargs,
    ):
        """
        Add metadata to a record in the FileStore, either manually or by computing it automatically
        from another field, such as name or path (see auto_data).

        Args:
            metadata: dict of additional data to add to the records returned by query.
                      Note that any protected keys (such as 'name', 'path', etc.)
                      will be ignored. None (default) adds no manual metadata.
            query: Query passed to FileStore.query()
            auto_data: A function that automatically computes metadata based on a field in
                    the record itself. The function must take in the item as a dict and
                    return a dict containing the desired metadata. A typical use case is
                    to assign metadata based on the name of a file. For example, for
                    data files named like `2022-04-01_april_fool_experiment.txt`, the
                    auto_data function could be:

                    def get_metadata_from_filename(d):
                        return {"date": d["name"].split("_")[0],
                                "test_name": d["name"].split("_")[1]
                                }

                    Note that in the case of conflict between manual and automatically
                    computed metadata (for example, if metadata={"name": "another_name"} was
                    supplied alongside the auto_data function above), the manually-supplied
                    metadata is used.
            kwargs: kwargs passed to FileStore.query()

        Raises:
            StoreError: (from update) if the Store is read-only.
        """
        # sanitize the metadata; protected keys are dropped
        filtered_metadata = self._filter_data(metadata or {})
        updated_docs = []

        for doc in self.query(query, **kwargs):
            if auto_data:
                doc.update(self._filter_data(auto_data(doc)))
            # applied last so manual metadata wins over auto_data on conflicts
            doc.update(filtered_metadata)
            updated_docs.append(doc)

        self.update(updated_docs, key=self.key)

    def read(self) -> List[Dict]:
        """
        Iterate through all files in the Store folder and build a record with
        basic information about each file (see _create_record_from_file for the
        keys of each record).

        A file that matches more than one pattern in file_filters is recorded only
        once. Files named like the metadata JSON file (json_name) are skipped, as are
        files deeper than max_depth below the FileStore root.

        Returns:
            List of file records.
        """
        file_list = []
        seen = set()
        for pattern in self.file_filters:
            # list every file that matches the pattern
            for f in self.path.rglob(pattern):
                if f in seen:
                    continue
                seen.add(f)
                # ignore directories and the .json file created by the Store
                if not f.is_file() or f.name == self.json_name:
                    continue
                # filter based on depth (0 = directly in the FileStore root)
                depth = len(f.relative_to(self.path).parts) - 1
                if self.max_depth is None or depth <= self.max_depth:
                    file_list.append(self._create_record_from_file(f))

        return file_list

    def _create_record_from_file(self, f: Union[str, Path]) -> Dict:
        """
        Given the path to a file, return a Dict that constitutes a record of
        basic information about that file. The keys in the returned dict
        are:

            name: str = File name
            path: Path = Path of this file (absolute if the FileStore path is absolute)
            path_relative: Path = Path of this file relative to the FileStore root
            parent: str = Name of the parent directory (if any)
            file_id: str = Unique identifier for this file: the MD5 hex digest of its
                        path relative to the base FileStore directory. It is stored
                        under self.key, which the FileStore sets to 'file_id'.
            size: int = Size of this file in bytes
            last_updated: datetime = Time this file was last modified (UTC)
            hash: str = MD5 hex digest of the Store name followed by the full
                        file contents
            orphan: bool = Whether this record is an orphan (always False here)
        """
        f = Path(f)

        # compute the file_id from the relative path
        relative_path = f.relative_to(self.path)
        file_id = hashlib.md5(str(relative_path).encode()).hexdigest()

        # hash the whole file in fixed-size chunks so large files are fully
        # covered without being loaded into memory at once
        content_digest = hashlib.md5()
        block_size = 128 * content_digest.block_size
        content_digest.update(self.name.encode())
        with open(f, "rb") as fh:
            for chunk in iter(lambda: fh.read(block_size), b""):
                content_digest.update(chunk)

        stat = f.stat()
        return {
            "name": f.name,
            "path": f,
            "path_relative": relative_path,
            "parent": f.parent.name,
            "size": stat.st_size,
            "last_updated": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
            "orphan": False,
            "hash": content_digest.hexdigest(),
            self.key: file_id,
        }

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data

        Read all the files in the directory, create corresponding File
        items in the internal MemoryStore.

        If there is a metadata .json file in the directory, read its
        contents into the MemoryStore. Metadata records whose key does not match
        any file on disk are kept but flagged with {'orphan': True}.

        Args:
            force_reset: whether to reset the connection or not (forwarded to
                MemoryStore.connect)
        """
        # read all files and place them in the MemoryStore
        # use super().update to bypass the read_only guard in self.update,
        # because the file data must be populated in memory regardless
        super().connect(force_reset=force_reset)
        super().update(self.read())

        # now read any metadata from the .json file
        try:
            self.metadata_store.connect()
            metadata = list(self.metadata_store.query())
        except FileNotFoundError:
            metadata = []
            warnings.warn(
                f"""
                JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
                the FileStore with read_only=False.
                """
            )

        # merge metadata with file data and check for orphaned metadata
        requests = []
        found_orphans = False
        key = self.key
        file_ids = set(self.distinct(key))  # set for O(1) membership tests
        for d in metadata:
            search_doc = {key: d[key]}

            if d[key] not in file_ids:
                found_orphans = True
                d.update({"orphan": True})

            # the JSONStore's internal _id must not overwrite the MemoryStore's
            d.pop("_id", None)

            requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))

        if found_orphans:
            warnings.warn(
                f"Orphaned metadata was found in {self.json_name}. This metadata "
                "will be added to the store with {'orphan': True}"
            )
        if requests:
            self._collection.bulk_write(requests, ordered=False)

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update items in the Store. Only possible if the store is not read only. Any new
        fields that are added will be written to the JSON file in the root directory
        of the FileStore.

        Note that certain fields that come from file metadata on disk are protected and
        cannot be updated with this method. This prevents the contents of the FileStore
        from becoming out of sync with the files on which it is based. The protected fields
        are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent',
        'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are
        retained to make each document in the JSON file identifiable by manual inspection.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used

        Raises:
            StoreError: if the Store is read-only.
        """
        if self.read_only:
            raise StoreError(
                "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
            )

        super().update(docs, key)
        # exclude 'contents' so that query() does not read every file from disk;
        # it is a protected field and would be filtered out below anyway
        data = self.query(properties={"contents": 0})
        filtered_data = []
        # remove fields that are populated by .read()
        for d in data:
            filtered_d = self._filter_data(d)
            # don't write records that contain only path_relative and the key
            if set(filtered_d).difference({"path_relative", self.key}):
                filtered_data.append(filtered_d)
        self.metadata_store.update(filtered_data, self.key)

    def _filter_data(self, d):
        """
        Remove any protected keys from a dictionary

        Args:
            d: Dictionary whose keys are to be filtered

        Returns:
            A new dict without PROTECTED_KEYS or the Store's last_updated_field;
            the input dict is not modified.
        """
        excluded = PROTECTED_KEYS.union({self.last_updated_field})
        return {k: v for k, v in d.items() if k not in excluded}

    def query(  # type: ignore
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
        contents_size_limit: Optional[int] = None,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents

        Args:
            criteria: PyMongo filter for documents to search in. Unless it sets
                'orphan' explicitly, orphaned records are excluded when
                include_orphans is False. A 'contents' criterion is ignored.
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
            contents_size_limit: Maximum file size in bytes for which to return contents.
                The FileStore will attempt to read the file and populate the 'contents' key
                with its content at query time, unless the file size is larger than this value.

        The caller's criteria and properties objects are never modified.
        """
        return_contents = False
        # copy so that the caller's dict is not mutated below
        criteria = dict(criteria) if criteria else {}
        if criteria.get("orphan", None) is None:
            if not self.include_orphans:
                criteria.update({"orphan": False})

        if "contents" in criteria:
            # contents is read from disk at query time, so it cannot be filtered on
            criteria.pop("contents")
            warnings.warn("'contents' is not a queryable field! Ignoring.")

        if isinstance(properties, list):
            properties = {p: 1 for p in properties}
        elif properties is not None:
            # copy so that the caller's dict is not mutated below
            properties = dict(properties)

        orig_properties = properties.copy() if properties else None

        if properties is None or properties.get("contents"):
            return_contents = True

        if properties is not None and return_contents:
            # remove contents b/c it isn't stored in the MemoryStore
            properties.pop("contents")
            # add size and path to query so that file can be read
            properties.update({"size": 1, "path": 1})

        for d in super().query(
            criteria=criteria,
            properties=properties,
            sort=sort,
            hint=hint,
            skip=skip,
            limit=limit,
        ):
            # add file contents to the returned documents, if appropriate
            if return_contents:
                # orphaned records carry no size/path, hence .get()
                size = d.get("size")
                if (
                    contents_size_limit is not None
                    and size is not None
                    and size > contents_size_limit
                ):
                    data = "Unable to read: file too large"
                else:
                    # attempt to read the file contents and inject into the document
                    # TODO - could add more logic for detecting different file types
                    # and more nuanced exception handling
                    try:
                        with zopen(d["path"], "r") as fh:
                            data = fh.read()
                    except Exception as e:
                        data = f"Unable to read: {e}"

                d.update({"contents": data})

                # remove size and path if not explicitly requested
                if orig_properties is not None and "size" not in orig_properties:
                    d.pop("size", None)
                if orig_properties is not None and "path" not in orig_properties:
                    d.pop("path", None)

            yield d

    def query_one(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        contents_size_limit: Optional[int] = None,
    ):
        """
        Queries the Store for a single document

        Args:
            criteria: PyMongo filter for documents to search
            properties: properties to return in the document
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            contents_size_limit: Maximum file size in bytes for which to return contents.
                The FileStore will attempt to read the file and populate the 'contents' key
                with its content at query time, unless the file size is larger than this value.

        Returns:
            The first matching document, or None if nothing matches.
        """
        return next(
            self.query(
                criteria=criteria,
                properties=properties,
                sort=sort,
                contents_size_limit=contents_size_limit,
            ),
            None,
        )

    def remove_docs(self, criteria: Dict, confirm: bool = False):
        """
        Remove items matching the query dictionary, deleting the corresponding
        files from disk.

        Args:
            criteria: query dictionary to match
            confirm: Boolean flag to confirm that remove_docs should delete
                     files on disk. Default: False.

        Raises:
            StoreError: if the Store is read-only, or if matching items exist and
                confirm is False.
        """
        if self.read_only:
            raise StoreError(
                "This Store is read-only. To enable file I/O, re-initialize the "
                "store with read_only=False."
            )

        # project only the fields needed so no file contents are read from disk.
        # Going through self.query ensures its criteria adjustments (orphans,
        # contents) also apply to what gets removed from the MemoryStore.
        docs = list(self.query(criteria, properties=[self.key, "path"]))

        if docs and not confirm:
            raise StoreError(
                f"Warning! This command is about to delete {len(docs)} items from disk! "
                "If this is what you want, reissue this command with confirm=True."
            )

        removed_ids = []
        try:
            for d in docs:
                # orphaned records have no file on disk to delete
                if d.get("path") is not None:
                    Path(d["path"]).unlink()
                removed_ids.append(d[self.key])
        finally:
            # drop the in-memory records only for files actually removed, even if a
            # later unlink fails, so memory stays consistent with the disk
            if removed_ids:
                super().remove_docs(criteria={self.key: {"$in": removed_ids}})

name: str property readonly

Return a string representing this data source

__init__(self, path, file_filters=None, max_depth=None, read_only=True, include_orphans=False, json_name='FileStore.json', **kwargs) special

Initializes a FileStore

Parameters:

Name Type Description Default
path Union[str, pathlib.Path]

parent directory containing all files and subdirectories to process

required
file_filters Optional[List]

List of fnmatch patterns defining the files to be tracked by the FileStore. Only files that match one of the patterns provided will be included in the Store. If None (default), all files are included.

Examples: ["*.txt", "test-[abcd].txt"], etc. See https://docs.python.org/3/library/fnmatch.html for full syntax

None
max_depth Optional[int]

The maximum depth to look into subdirectories. 0 = no recursion, 1 = include files 1 directory below the FileStore, etc. None (default) will scan all files below the FileStore root directory, regardless of depth.

None
read_only bool

If True (default), the .update() and .remove_docs() methods are disabled, preventing any changes to the files on disk. In addition, metadata cannot be written to disk.

True
include_orphans bool

Whether to include orphaned metadata records in query results. Orphaned metadata records are records found in the local JSON file that can no longer be associated to a file on disk. This can happen if a file is renamed or deleted, or if the FileStore is re-initialized with a more restrictive file_filters or max_depth argument. By default (False), these records do not appear in query results. Nevertheless, the metadata records are retained in the JSON file and the FileStore to prevent accidental data loss.

False
json_name str

Name of the .json file to which metadata is saved. If read_only is False, this file will be created in the root directory of the FileStore.

'FileStore.json'
kwargs

kwargs passed to `MemoryStore.__init__()`

{}
Source code in maggma/stores/file_store.py
def __init__(
    self,
    path: Union[str, Path],
    file_filters: Optional[List] = None,
    max_depth: Optional[int] = None,
    read_only: bool = True,
    include_orphans: bool = False,
    json_name: str = "FileStore.json",
    **kwargs,
):
    """
    Initializes a FileStore

    Args:
        path: parent directory containing all files and subdirectories to process
        file_filters: List of fnmatch patterns defining the files to be tracked by
            the FileStore. Only files that match one of the patterns provided will
            be included in the Store. If None (default), all files are included.

            Examples: ["*.txt", "test-[abcd].txt"], etc.
            See https://docs.python.org/3/library/fnmatch.html for full syntax
        max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
            1 = include files 1 directory below the FileStore, etc.
            None (default) will scan all files below
            the FileStore root directory, regardless of depth.
        read_only: If True (default), the .update() and .remove_docs()
            methods are disabled, preventing any changes to the files on
            disk. In addition, metadata cannot be written to disk.
        include_orphans: Whether to include orphaned metadata records in query results.
            Orphaned metadata records are records found in the local JSON file that can
            no longer be associated to a file on disk. This can happen if a file is renamed
            or deleted, or if the FileStore is re-initialized with a more restrictive
            file_filters or max_depth argument. By default (False), these records
            do not appear in query results. Nevertheless, the metadata records are
            retained in the JSON file and the FileStore to prevent accidental data loss.
        json_name: Name of the .json file to which metadata is saved. If read_only
            is False, this file will be created in the root directory of the
            FileStore.
        kwargs: kwargs passed to MemoryStore.__init__(). 'collection_name' and
            'key' are set by the FileStore itself and must not be passed here.
    """
    # The 'name' property (file://<path>) must work correctly, and collection
    # names passed to MemoryStore cannot end with '.'. Normalize to a Path first
    # so that both "." and Path(".") are replaced by the absolute cwd.
    self.path = Path(path)
    if self.path == Path("."):
        self.path = Path.cwd()

    self.json_name = json_name
    self.file_filters = file_filters if file_filters else ["*"]
    self.collection_name = "file_store"
    self.key = "file_id"
    self.include_orphans = include_orphans
    self.read_only = read_only
    self.max_depth = max_depth

    self.metadata_store = JSONStore(
        paths=[str(self.path / self.json_name)],
        read_only=self.read_only,
        collection_name=self.collection_name,
        key=self.key,
    )

    self.kwargs = kwargs

    super().__init__(
        collection_name=self.collection_name,
        key=self.key,
        **self.kwargs,
    )

add_metadata(self, metadata={}, query=None, auto_data=None, **kwargs)

Add metadata to a record in the FileStore, either manually or by computing it automatically from another field, such as name or path (see auto_data).

Parameters:

Name Type Description Default
metadata Dict

dict of additional data to add to the records returned by query. Note that any protected keys (such as 'name', 'path', etc.) will be ignored.

{}
query Optional[Dict]

Query passed to FileStore.query()

None
auto_data Callable[[Dict], Dict]

A function that automatically computes metadata based on a field in the record itself. The function must take in the item as a dict and return a dict containing the desired metadata. A typical use case is to assign metadata based on the name of a file. For example, for data files named like 2022-04-01_april_fool_experiment.txt, the auto_data function could be:

def get_metadata_from_filename(d):
    return {"date": d["name"].split("_")[0],
            "test_name": d["name"].split("_")[1]
            }

Note that in the case of conflict between manual and automatically
computed metadata (for example, if metadata={"name": "another_name"} was
supplied alongside the auto_data function above), the manually-supplied
metadata is used.
None
kwargs

kwargs passed to FileStore.query()

{}
Source code in maggma/stores/file_store.py
def add_metadata(
    self,
    metadata: Optional[Dict] = None,
    query: Optional[Dict] = None,
    auto_data: Optional[Callable[[Dict], Dict]] = None,
    **kwargs,
):
    """
    Add metadata to a record in the FileStore, either manually or by computing it automatically
    from another field, such as name or path (see auto_data).

    Args:
        metadata: dict of additional data to add to the records returned by query.
                  Note that any protected keys (such as 'name', 'path', etc.)
                  will be ignored. None (the default) adds no manual metadata.
        query: Query passed to FileStore.query()
        auto_data: A function that automatically computes metadata based on a field in
                the record itself. The function must take in the item as a dict and
                return a dict containing the desired metadata. A typical use case is
                to assign metadata based on the name of a file. For example, for
                data files named like `2022-04-01_april_fool_experiment.txt`, the
                auto_data function could be:

                def get_metadata_from_filename(d):
                    return {"date": d["name"].split("_")[0],
                            "test_name": d["name"].split("_")[1]
                            }

                Note that in the case of conflict between manual and automatically
                computed metadata (for example, if metadata={"name": "another_name"} was
                supplied alongside the auto_data function above), the manually-supplied
                metadata is used.
        kwargs: kwargs passed to FileStore.query()
    """
    # None instead of a mutable {} default, so no dict object is shared between calls
    if metadata is None:
        metadata = {}

    # sanitize the manual metadata once; it is identical for every matched record
    filtered_metadata = self._filter_data(metadata)
    updated_docs = []

    for doc in self.query(query, **kwargs):
        if auto_data:
            extra_data = self._filter_data(auto_data(doc))
            doc.update(extra_data)
        # applied after auto_data so manual metadata wins on conflicting keys
        doc.update(filtered_metadata)
        updated_docs.append(doc)

    self.update(updated_docs, key=self.key)

connect(self, force_reset=False)

Connect to the source data

Read all the files in the directory, create corresponding File items in the internal MemoryStore.

If there is a metadata .json file in the directory, read its contents into the MemoryStore

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not

False
Source code in maggma/stores/file_store.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data

    Read all the files in the directory, create corresponding File
    items in the internal MemoryStore.

    If there is a metadata .json file in the directory, read its
    contents into the MemoryStore. Metadata records whose key is not
    among the keys of the file records are still merged in, but are
    flagged with {'orphan': True}.

    Args:
        force_reset: whether to reset the connection or not; forwarded to
            the underlying MemoryStore.connect()
    """
    # read all files and place them in the MemoryStore
    # use super.update to bypass the read_only guard statement
    # because we want the file data to be populated in memory
    super().connect(force_reset=force_reset)
    super().update(self.read())

    # now read any metadata from the .json file
    try:
        self.metadata_store.connect()
        metadata = list(self.metadata_store.query())
    except FileNotFoundError:
        metadata = []
        warnings.warn(
            f"""
            JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
            the FileStore with read_only=False.
            """
        )

    # merge metadata with file data and check for orphaned metadata
    requests = []
    found_orphans = False
    key = self.key
    # a set gives O(1) membership tests for the orphan check below
    file_ids = set(self.distinct(key))
    for d in metadata:
        if isinstance(key, list):
            search_doc = {k: d[k] for k in key}
        else:
            search_doc = {key: d[key]}

        if d[key] not in file_ids:
            found_orphans = True
            d.update({"orphan": True})

        # strip the metadata store's own _id so the $set below does not try to
        # overwrite the _id of the matching MemoryStore document
        d.pop("_id", None)

        requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))

    if found_orphans:
        warnings.warn(
            f"Orphaned metadata was found in {self.json_name}. This metadata "
            "will be added to the store with {'orphan': True}"
        )
    if len(requests) > 0:
        self._collection.bulk_write(requests, ordered=False)

query(self, criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, contents_size_limit=None)

Queries the Store for a set of documents

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
hint Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
contents_size_limit Optional[int]

Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value.

None
Source code in maggma/stores/file_store.py
def query(  # type: ignore
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
    contents_size_limit: Optional[int] = None,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents

    The caller's ``criteria`` and ``properties`` objects are copied before
    they are adjusted, so they are not modified by this method.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
        contents_size_limit: Maximum file size in bytes for which to return contents.
            The FileStore will attempt to read the file and populate the 'contents' key
            with its content at query time, unless the file size is larger than this value.

    Yields:
        Matching documents. When properties is None or requests 'contents', each
        document gets a 'contents' key holding the file text, or a string starting
        with "Unable to read:" if the file could not be read.
    """
    return_contents = False
    # copy so the orphan/contents adjustments below never leak into the caller's dict
    criteria = dict(criteria) if criteria else {}
    if criteria.get("orphan", None) is None:
        if not self.include_orphans:
            criteria.update({"orphan": False})

    if "contents" in criteria:
        warnings.warn("'contents' is not a queryable field! Ignoring.")
        # contents is not stored in the MemoryStore; leaving it in the filter
        # would match no documents instead of ignoring it as announced
        del criteria["contents"]

    if isinstance(properties, list):
        properties = {p: 1 for p in properties}
    elif isinstance(properties, dict):
        # copy so popping 'contents' and adding 'size'/'path' stay local
        properties = dict(properties)

    orig_properties = properties.copy() if properties else None

    if properties is None or properties.get("contents"):
        return_contents = True

    if properties is not None and return_contents:
        # remove contents b/c it isn't stored in the MemoryStore
        properties.pop("contents")
        # add size and path to query so that file can be read
        properties.update({"size": 1, "path": 1})

    for d in super().query(
        criteria=criteria,
        properties=properties,
        sort=sort,
        hint=hint,
        skip=skip,
        limit=limit,
    ):
        # add file contents to the returned documents, if appropriate
        if return_contents:
            if contents_size_limit is None or d["size"] <= contents_size_limit:
                # attempt to read the file contents and inject into the document
                # TODO - could add more logic for detecting different file types
                # and more nuanced exception handling
                try:
                    with zopen(d["path"], "r") as f:
                        data = f.read()
                except Exception as e:
                    # deliberate best-effort: report the failure in the document
                    data = f"Unable to read: {e}"

            elif d["size"] > contents_size_limit:
                data = "Unable to read: file too large"
            else:
                data = "Unable to read: Unknown error"

            d.update({"contents": data})

            # remove size and path if not explicitly requested
            if orig_properties is not None and "size" not in orig_properties:
                d.pop("size")
            if orig_properties is not None and "path" not in orig_properties:
                d.pop("path")

        yield d

query_one(self, criteria=None, properties=None, sort=None, contents_size_limit=None)

Queries the Store for a single document

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search

None
properties Union[Dict, List]

properties to return in the document

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
contents_size_limit Optional[int]

Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value.

None
Source code in maggma/stores/file_store.py
def query_one(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    contents_size_limit: Optional[int] = None,
):
    """
    Return the first document produced by ``self.query`` for these arguments,
    or None when nothing matches.

    Args:
        criteria: PyMongo filter for documents to search
        properties: properties to return in the document
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        contents_size_limit: Maximum file size in bytes for which to return contents.
            The FileStore will attempt to read the file and populate the 'contents' key
            with its content at query time, unless the file size is larger than this value.
    """
    matches = self.query(
        criteria=criteria,
        properties=properties,
        sort=sort,
        contents_size_limit=contents_size_limit,
    )
    # only the first result is consumed; the rest of the generator is never run
    for first in matches:
        return first
    return None

read(self)

Iterate through all files in the Store folder and populate the Store with dictionaries containing basic information about each file.

The keys of the documents added to the Store are

name: str = File name
path: Path = Absolute path of this file
parent: str = Name of the parent directory (if any)
file_id: str = Unique identifier for this file, computed from the hash of its path relative to the base FileStore directory and the file creation time. The key of this field is 'file_id' by default but can be changed via the 'key' kwarg to FileStore.__init__().
size: int = Size of this file in bytes
last_updated: datetime = Time this file was last modified
hash: str = Hash of the file contents
orphan: bool = Whether this record is an orphan
Source code in maggma/stores/file_store.py
def read(self) -> List[Dict]:
    """
    Iterate through all files in the Store folder and populate
    the Store with dictionaries containing basic information about each file.

    Each file is recorded at most once, even if it matches several of the
    patterns in ``file_filters``.

    The keys of the documents added to the Store are

        name: str = File name
        path: Path = Absolute path of this file
        parent: str = Name of the parent directory (if any)
        file_id: str = Unique identifier for this file, computed from the hash
                    of its path relative to the base FileStore directory and
                    the file creation time. The key of this field is 'file_id'
                    by default but can be changed via the 'key' kwarg to
                    FileStore.__init__().
        size: int = Size of this file in bytes
        last_updated: datetime = Time this file was last modified
        hash: str = Hash of the file contents
        orphan: bool = Whether this record is an orphan
    """
    file_list = []
    # overlapping patterns (e.g. ["*.txt", "*"]) yield the same path more than
    # once; remember accepted paths so each file is only processed once
    seen = set()
    # generate a list of files in subdirectories
    for pattern in self.file_filters:
        # list every file that matches the pattern
        for f in self.path.rglob(pattern):
            if not f.is_file():
                continue
            # ignore the .json file created by the Store
            if f.name == self.json_name:
                continue
            if f in seen:
                continue
            # filter based on depth (0 = directly inside self.path)
            depth = len(f.relative_to(self.path).parts) - 1
            if self.max_depth is not None and depth > self.max_depth:
                continue
            seen.add(f)
            file_list.append(self._create_record_from_file(f))

    return file_list

remove_docs(self, criteria, confirm=False)

Remove items matching the query dictionary.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
confirm bool

Boolean flag to confirm that remove_docs should delete files on disk. Default: False.

False
Source code in maggma/stores/file_store.py
def remove_docs(self, criteria: Dict, confirm: bool = False):
    """
    Remove items matching the query dictionary, deleting the matched files on disk.

    Args:
        criteria: query dictionary to match
        confirm: Boolean flag to confirm that remove_docs should delete
                 files on disk. Default: False.

    Raises:
        StoreError: if the Store is read-only, or if documents match and
            confirm is not True.
    """
    if self.read_only:
        raise StoreError(
            "This Store is read-only. To enable file I/O, re-initialize the "
            "store with read_only=False."
        )

    # Only the key and path are needed. Requesting them explicitly stops
    # self.query from reading every matched file's contents from disk.
    docs = list(self.query(criteria, properties=[self.key, "path"]))
    # this ensures that any modifications to criteria made by self.query
    # (e.g., related to orphans or contents) are propagated through to the superclass
    new_criteria = {self.key: {"$in": [d[self.key] for d in docs]}}

    if len(docs) > 0 and not confirm:
        raise StoreError(
            f"Warning! This command is about to delete {len(docs)} items from disk! "
            "If this is what you want, reissue this command with confirm=True."
        )

    for d in docs:
        # records without a path (e.g. orphaned metadata) have no file to delete
        path = d.get("path")
        if path is not None:
            Path(path).unlink()

    # a single removal covers every matched record
    if docs:
        super().remove_docs(criteria=new_criteria)

update(self, docs, key=None)

Update items in the Store. Only possible if the store is not read only. Any new fields that are added will be written to the JSON file in the root directory of the FileStore.

Note that certain fields that come from file metadata on disk are protected and cannot be updated with this method. This prevents the contents of the FileStore from becoming out of sync with the files on which it is based. The protected fields are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent', 'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are retained to make each document in the JSON file identifiable by manual inspection.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in maggma/stores/file_store.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Update items in the Store and persist the resulting metadata to the JSON file.

    Only allowed when the store is not read only. After the in-memory update,
    every record returned by ``self.query()`` is stripped of its protected fields
    via ``_filter_data``. The record is written to the metadata JSON store only if
    something other than 'path_relative' and the key field remains.

    The protected fields are those that come from the files on disk, i.e. keys in
    the dict returned by _create_record_from_file such as 'name', 'parent', 'path',
    'last_updated', 'hash', 'size', 'contents', and 'orphan'. Stripping them keeps
    the FileStore from drifting out of sync with the files it describes. The
    'path_relative' and key fields are retained so each entry in the JSON file can
    be identified by manual inspection.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used

    Raises:
        StoreError: if the Store is read-only
    """
    if self.read_only:
        raise StoreError(
            "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
        )

    super().update(docs, key)

    # identification-only fields: a record holding nothing else is not persisted
    identity_fields = {"path_relative", self.key}
    records = list(self.query())
    to_persist = []
    for record in records:
        stripped = self._filter_data(record)
        if set(stripped) - identity_fields:
            to_persist.append(stripped)
    self.metadata_store.update(to_persist, self.key)

Module containing various definitions of Stores. Stores are a default access pattern to data and provide various utilities

GridFSStore (Store)

A Store for GridFS backend. Provides a common access method consistent with other stores

Source code in maggma/stores/gridfs.py
class GridFSStore(Store):
    """
    A Store for GridFS backend. Provides a common access method consistent with other stores
    """

    def __init__(
        self,
        database: str,
        collection_name: str,
        host: str = "localhost",
        port: int = 27017,
        username: str = "",
        password: str = "",
        compression: bool = False,
        ensure_metadata: bool = False,
        searchable_fields: Optional[List[str]] = None,
        auth_source: Optional[str] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        ssh_tunnel: Optional[SSHTunnel] = None,
        **kwargs,
    ):
        """
        Initializes a GridFS Store for binary data
        Args:
            database: database name
            collection_name: The name of the collection.
                This is the string portion before the GridFS extensions
            host: hostname for the database
            port: port to connect to
            username: username to connect as
            password: password to authenticate as
            compression: compress the data as it goes into GridFS
            ensure_metadata: ensure returned documents have the metadata fields
            searchable_fields: fields to keep in the index store
            auth_source: The database to authenticate on. Defaults to the database name.
            mongoclient_kwargs: extra keyword arguments passed to MongoClient
            ssh_tunnel: An SSHTunnel object to use.
            kwargs: passed to Store.__init__(); 'key' defaults to '_id'
        """

        self.database = database
        self.collection_name = collection_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self._coll = None  # type: Any
        self.compression = compression
        self.ensure_metadata = ensure_metadata
        self.searchable_fields = [] if searchable_fields is None else searchable_fields
        self.kwargs = kwargs
        self.ssh_tunnel = ssh_tunnel

        if auth_source is None:
            auth_source = self.database
        self.auth_source = auth_source
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        if "key" not in kwargs:
            kwargs["key"] = "_id"
        super().__init__(**kwargs)

    @classmethod
    def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
        """
        Convenience method to construct a GridFSStore from a launchpad file

        Note: A launchpad file is a special formatted yaml file used in fireworks

        Only the 'name' (used as the database), 'host', 'port', 'username' and
        'password' entries of the file are used.

        Returns:
            a GridFSStore built from those credentials and ``collection_name``
        """
        with open(lp_file, "r") as f:
            lp_creds = yaml.load(f, Loader=yaml.FullLoader)

        db_creds = lp_creds.copy()
        db_creds["database"] = db_creds["name"]
        for key in list(db_creds.keys()):
            if key not in ["database", "host", "port", "username", "password"]:
                db_creds.pop(key)
        db_creds["collection_name"] = collection_name

        return cls(**db_creds, **kwargs)

    @property
    def name(self) -> str:
        """
        Return a string representing this data source
        """
        return f"gridfs://{self.host}/{self.database}/{self.collection_name}"

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data

        A MongoClient (and the SSH tunnel, if one is configured) is only created
        when the store is not connected yet or when force_reset is True, so
        repeated calls do not open additional client connections.

        Args:
            force_reset: whether to reconnect even if already connected
        """
        if self._coll is not None and not force_reset:
            return

        if self.ssh_tunnel is None:
            host = self.host
            port = self.port
        else:
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.local_address

        conn: MongoClient = (
            MongoClient(
                host=host,
                port=port,
                username=self.username,
                password=self.password,
                authSource=self.auth_source,
                **self.mongoclient_kwargs,
            )
            if self.username != ""
            else MongoClient(host, port, **self.mongoclient_kwargs)
        )
        db = conn[self.database]

        self._coll = gridfs.GridFS(db, self.collection_name)
        self._files_collection = db[f"{self.collection_name}.files"]
        self._files_store = MongoStore.from_collection(self._files_collection)
        self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
        self._files_store.key = self.key
        self._chunks_collection = db[f"{self.collection_name}.chunks"]

    @property
    def _collection(self):
        """Property referring to underlying pymongo collection"""
        if self._coll is None:
            raise StoreError("Must connect Mongo-like store before attemping to use it")
        return self._coll

    @property
    def last_updated(self) -> datetime:
        """
        Provides the most recent last_updated date time stamp from
        the documents in this Store
        """
        return self._files_store.last_updated

    @classmethod
    def transform_criteria(cls, criteria: Dict) -> Dict:
        """
        Allow client to not need to prepend 'metadata.' to query fields.
        Args:
            criteria: Query criteria
        """
        new_criteria = dict()
        for field in criteria:
            if field not in files_collection_fields and not field.startswith(
                "metadata."
            ):
                new_criteria["metadata." + field] = copy.copy(criteria[field])
            else:
                new_criteria[field] = copy.copy(criteria[field])

        return new_criteria

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria

        Args:
            criteria: PyMongo filter for documents to count in
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)

        return self._files_store.count(criteria)

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the GridFS Store for a set of documents.
        Will check to see if data can be returned from
        files store first.
        If the data from the gridfs is not a json serialized string
        a dict will be returned with the data in the "data" key
        plus the self.key and self.last_updated_field.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Raises:
            ValueError: if criteria is neither a dict nor None
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)
        elif criteria is not None:
            raise ValueError("Criteria must be a dictionary or None")

        prop_keys = set()
        if isinstance(properties, dict):
            prop_keys = set(properties.keys())
        elif isinstance(properties, list):
            prop_keys = set(properties)

        for doc in self._files_store.query(
            criteria=criteria, sort=sort, limit=limit, skip=skip
        ):
            if properties is not None and prop_keys.issubset(set(doc.keys())):
                yield {p: doc[p] for p in properties if p in doc}
            else:

                metadata = doc.get("metadata", {})

                # Fetch exactly this file by _id. skip/limit/sort were already
                # applied by the files-collection query above; re-applying them
                # to a single-document lookup would skip past the only match
                # (returning None) whenever skip > 0.
                data = self._collection.find_one(filter={"_id": doc["_id"]}).read()

                if metadata.get("compression", "") == "zlib":
                    data = zlib.decompress(data).decode("UTF-8")

                try:
                    data = json.loads(data)
                except Exception:
                    if not isinstance(data, dict):
                        data = {
                            "data": data,
                            self.key: doc.get(self.key),
                            self.last_updated_field: doc.get(self.last_updated_field),
                        }

                if self.ensure_metadata and isinstance(data, dict):
                    data.update(metadata)

                yield data

    def distinct(
        self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
    ) -> List:
        """
        Get all distinct values for a field. This function only operates
        on the metadata in the files collection

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: accepted for interface compatibility; not used here
        """
        criteria = (
            self.transform_criteria(criteria)
            if isinstance(criteria, dict)
            else criteria
        )

        field = (
            f"metadata.{field}"
            if field not in files_collection_fields
            and not field.startswith("metadata.")
            else field
        )

        return self._files_store.distinct(field=field, criteria=criteria)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys. Will only work if the keys are included in the files
        collection for GridFS

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """

        criteria = (
            self.transform_criteria(criteria)
            if isinstance(criteria, dict)
            else criteria
        )
        keys = [keys] if not isinstance(keys, list) else keys
        keys = [
            f"metadata.{k}"
            if k not in files_collection_fields and not k.startswith("metadata.")
            else k
            for k in keys
        ]
        for group, ids in self._files_store.groupby(
            keys, criteria=criteria, properties=[f"metadata.{self.key}"]
        ):
            ids = [
                get(doc, f"metadata.{self.key}")
                for doc in ids
                if has(doc, f"metadata.{self.key}")
            ]

            group = {
                k.replace("metadata.", ""): get(group, k) for k in keys if has(group, k)
            }

            yield group, list(self.query(criteria={self.key: {"$in": ids}}))

    def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
        """
        Tries to create an index and return true if it succeeded
        Currently operates on the GridFS files collection
        Args:
            key: single key to index; keys that are not files-collection fields
                are prefixed with 'metadata.' unless they already carry it
            unique: Whether or not this index contains only unique keys

        Returns:
            bool indicating if the index exists/was created
        """
        # Transform key for gridfs first, consistent with distinct/groupby/transform_criteria
        if key not in files_collection_fields and not key.startswith("metadata."):
            return self._files_store.ensure_index(f"metadata.{key}", unique=unique)
        return self._files_store.ensure_index(key, unique=unique)

    def update(
        self,
        docs: Union[List[Dict], Dict],
        key: Union[List, str, None] = None,
        additional_metadata: Union[str, List[str], None] = None,
    ):
        """
        Update documents into the Store

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
            additional_metadata: field(s) to include in the gridfs metadata
        """

        if not isinstance(docs, list):
            docs = [docs]

        if isinstance(key, str):
            key = [key]
        elif not key:
            key = [self.key]

        key = list(set(key) - set(files_collection_fields))

        if additional_metadata is None:
            additional_metadata = []
        elif isinstance(additional_metadata, str):
            additional_metadata = [additional_metadata]
        else:
            additional_metadata = list(additional_metadata)

        for d in docs:
            search_doc = {k: d[k] for k in key}

            metadata = {
                k: get(d, k)
                for k in [self.last_updated_field]
                + additional_metadata
                + self.searchable_fields
                if has(d, k)
            }
            metadata.update(search_doc)
            data = json.dumps(jsanitize(d)).encode("UTF-8")
            if self.compression:
                data = zlib.compress(data)
                metadata["compression"] = "zlib"

            self._collection.put(data, metadata=metadata)
            search_doc = self.transform_criteria(search_doc)

            # Cleans up old gridfs entries: keep only the newest upload per key
            for fdoc in (
                self._files_collection.find(search_doc, ["_id"])
                .sort("uploadDate", -1)
                .skip(1)
            ):
                self._collection.delete(fdoc["_id"])

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary

        Args:
            criteria: query dictionary to match
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)
        ids = [cursor._id for cursor in self._collection.find(criteria)]

        for _id in ids:
            self._collection.delete(_id)

    def close(self):
        """
        Close the MongoDB client opened by connect() and stop the SSH tunnel, if any.

        Calling close() on a store that is not connected does nothing.
        """
        if self._coll is None:
            return
        # gridfs.GridFS does not expose its database publicly, so reach the
        # client through the files collection created in connect()
        self._files_collection.database.client.close()
        self._coll = None
        if self.ssh_tunnel is not None:
            self.ssh_tunnel.stop()

    def __eq__(self, other: object) -> bool:
        """
        Check equality for GridFSStore
        other: other GridFSStore to compare with
        """
        if not isinstance(other, GridFSStore):
            return False

        fields = ["database", "collection_name", "host", "port"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

last_updated: datetime property readonly

Provides the most recent last_updated date time stamp from the documents in this Store

name: str property readonly

Return a string representing this data source

__eq__(self, other) special

Check equality for GridFSStore. other: the other GridFSStore to compare with.

Source code in maggma/stores/gridfs.py
def __eq__(self, other: object) -> bool:
    """
    Compare two GridFSStores by database, collection name, host and port.

    Args:
        other: object to compare against; anything that is not a
            GridFSStore compares unequal
    """
    if isinstance(other, GridFSStore):
        # Stop at the first differing attribute, like all() would
        for attr in ("database", "collection_name", "host", "port"):
            if not getattr(self, attr) == getattr(other, attr):
                return False
        return True
    return False

__init__(self, database, collection_name, host='localhost', port=27017, username='', password='', compression=False, ensure_metadata=False, searchable_fields=None, auth_source=None, mongoclient_kwargs=None, ssh_tunnel=None, **kwargs) special

Initializes a GridFS Store for binary data

Parameters:

Name Type Description Default
database str

database name

required
collection_name str

The name of the collection. This is the string portion before the GridFS extensions

required
host str

hostname for the database

'localhost'
port int

port to connect to

27017
username str

username to connect as

''
password str

password to authenticate as

''
compression bool

compress the data as it goes into GridFS

False
ensure_metadata bool

ensure returned documents have the metadata fields

False
searchable_fields List[str]

fields to keep in the index store

None
auth_source Optional[str]

The database to authenticate on. Defaults to the database name.

None
ssh_tunnel Optional[maggma.stores.mongolike.SSHTunnel]

An SSHTunnel object to use.

None
Source code in maggma/stores/gridfs.py
def __init__(
    self,
    database: str,
    collection_name: str,
    host: str = "localhost",
    port: int = 27017,
    username: str = "",
    password: str = "",
    compression: bool = False,
    ensure_metadata: bool = False,
    searchable_fields: List[str] = None,
    auth_source: Optional[str] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    ssh_tunnel: Optional[SSHTunnel] = None,
    **kwargs,
):
    """
    Initializes a GridFS Store for binary data.

    Args:
        database: database name
        collection_name: The name of the collection. This is the string
            portion before the GridFS extensions (".files" / ".chunks").
        host: hostname for the database
        port: port to connect to
        username: username to connect as; an empty string connects without
            credentials
        password: password to authenticate as
        compression: compress the data as it goes into GridFS
        ensure_metadata: ensure returned documents have the metadata fields
        searchable_fields: fields to keep in the index store
        auth_source: The database to authenticate on. Defaults to the database name.
        mongoclient_kwargs: extra keyword arguments forwarded to MongoClient
        ssh_tunnel: An SSHTunnel object to use.
        **kwargs: forwarded to the base Store; ``key`` defaults to "_id"
    """
    # Connection settings
    self.database = database
    self.collection_name = collection_name
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    self.auth_source = database if auth_source is None else auth_source
    self.mongoclient_kwargs = mongoclient_kwargs or {}
    self.ssh_tunnel = ssh_tunnel

    # Storage behaviour
    self._coll = None  # type: Any
    self.compression = compression
    self.ensure_metadata = ensure_metadata
    self.searchable_fields = (
        searchable_fields if searchable_fields is not None else []
    )

    # self.kwargs aliases kwargs, so the default key set below shows up there too
    self.kwargs = kwargs
    kwargs.setdefault("key", "_id")
    super().__init__(**kwargs)

close(self)

Closes any connections

Source code in maggma/stores/gridfs.py
def close(self):
    """
    Closes any connections: the MongoDB client and, when configured, the SSH tunnel.
    """
    self._collection.database.client.close()
    tunnel = self.ssh_tunnel
    if tunnel is None:
        return
    tunnel.stop()

connect(self, force_reset=False)

Connect to the source data

Source code in maggma/stores/gridfs.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Starts the SSH tunnel (if one is configured). When no GridFS handle
    exists yet, or ``force_reset`` is True, it also opens a MongoClient and
    (re)builds the GridFS, files-collection and chunks-collection handles.

    Args:
        force_reset: rebuild the handles even if they already exist
    """
    if self.ssh_tunnel is None:
        host = self.host
        port = self.port
    else:
        # Kept outside the reset check, as before: close() stops the tunnel,
        # so every connect() starts it again.
        self.ssh_tunnel.start()
        host, port = self.ssh_tunnel.local_address

    if not self._coll or force_reset:
        # Build the client only when it will be used. Previously a fresh
        # MongoClient (and its connection pool) was created and discarded on
        # every call that kept the existing handles.
        conn: MongoClient = (
            MongoClient(
                host=host,
                port=port,
                username=self.username,
                password=self.password,
                authSource=self.auth_source,
                **self.mongoclient_kwargs,
            )
            if self.username != ""
            else MongoClient(host, port, **self.mongoclient_kwargs)
        )
        db = conn[self.database]

        self._coll = gridfs.GridFS(db, self.collection_name)
        self._files_collection = db["{}.files".format(self.collection_name)]
        self._files_store = MongoStore.from_collection(self._files_collection)
        self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
        self._files_store.key = self.key
        self._chunks_collection = db["{}.chunks".format(self.collection_name)]

count(self, criteria=None)

Counts the number of documents matching the query criteria

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
Source code in maggma/stores/gridfs.py
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Counts the number of documents matching the query criteria.

    Args:
        criteria: PyMongo filter for documents to count in; dict criteria
            are passed through ``transform_criteria`` first

    Returns:
        the count reported by the files-collection store
    """
    query = (
        self.transform_criteria(criteria) if isinstance(criteria, dict) else criteria
    )
    return self._files_store.count(query)

distinct(self, field, criteria=None, all_exist=False)

Get all distinct values for a field. This function only operates on the metadata in the files collection

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in maggma/stores/gridfs.py
def distinct(
    self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
    """
    Get all distinct values for a field. This function only operates
    on the metadata in the files collection.

    Args:
        field: the field to get distinct values for; names that are neither
            GridFS files-collection fields nor already ``metadata.``-prefixed
            are looked up as ``metadata.<field>``
        criteria: PyMongo filter for documents to search in
        all_exist: accepted for interface compatibility; not used here
    """
    if isinstance(criteria, dict):
        criteria = self.transform_criteria(criteria)

    if field not in files_collection_fields and not field.startswith("metadata."):
        field = f"metadata.{field}"

    return self._files_store.distinct(field=field, criteria=criteria)

ensure_index(self, key, unique=False)

Tries to create an index and returns True if it succeeded. Currently operates on the GridFS files collection.

Parameters:

Name Type Description Default
key str

single key to index

required
unique Optional[bool]

Whether or not this index contains only unique keys

False

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in maggma/stores/gridfs.py
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
    """
    Tries to create an index and returns True if it succeeded.
    Currently operates on the GridFS files collection.

    Args:
        key: single key to index. Names that are neither GridFS
            files-collection fields nor already ``metadata.``-prefixed are
            indexed as ``metadata.<key>``.
        unique: Whether or not this index contains only unique keys

    Returns:
        bool indicating if the index exists/was created
    """
    # Same key mapping as distinct/groupby/transform_criteria, so an explicit
    # "metadata.x" is not turned into "metadata.metadata.x".
    if key not in files_collection_fields and not key.startswith("metadata."):
        key = "metadata.{}".format(key)
    return self._files_store.ensure_index(key, unique=unique)

from_launchpad_file(lp_file, collection_name, **kwargs) classmethod

Convenience method to construct a GridFSStore from a launchpad file

Note: A launchpad file is a special formatted yaml file used in fireworks

Source code in maggma/stores/gridfs.py
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
    """
    Convenience method to construct a GridFSStore from a launchpad file.

    Note: A launchpad file is a special formatted yaml file used in fireworks.
    Only its ``host``, ``port``, ``username`` and ``password`` entries are
    kept; its ``name`` entry becomes ``database``.

    Args:
        lp_file: path to the launchpad YAML file
        collection_name: GridFS collection name for the new store
        **kwargs: extra keyword arguments forwarded to the constructor

    Returns:
        a new instance of ``cls``
    """
    with open(lp_file, "r") as handle:
        lp_creds = yaml.load(handle, Loader=yaml.FullLoader)

    wanted = ("database", "host", "port", "username", "password")
    db_creds = {name: value for name, value in lp_creds.items() if name in wanted}
    # "name" always wins over any "database" entry in the file
    db_creds["database"] = lp_creds["name"]
    db_creds["collection_name"] = collection_name

    return cls(**db_creds, **kwargs)

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys. Will only work if the keys are included in the files collection for GridFS.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in maggma/stores/gridfs.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys. Will only work if the keys are included in the files
    collection for GridFS.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents (currently
            not applied by this implementation)
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending. (currently
            not applied by this implementation)
        skip: number documents to skip (currently not applied)
        limit: limit on total number of documents returned (currently not applied)

    Returns:
        generator returning tuples of (dict, list of docs)
    """
    prefix = "metadata."

    criteria = (
        self.transform_criteria(criteria)
        if isinstance(criteria, dict)
        else criteria
    )
    keys = [keys] if not isinstance(keys, list) else keys
    keys = [
        f"{prefix}{k}"
        if k not in files_collection_fields and not k.startswith(prefix)
        else k
        for k in keys
    ]

    # update() copies only non-files-collection key fields into the metadata.
    # A files-collection key (e.g. the default "_id") therefore lives at the
    # top level of the files document, not under "metadata.".
    key_path = (
        self.key if self.key in files_collection_fields else f"{prefix}{self.key}"
    )

    for group, docs in self._files_store.groupby(
        keys, criteria=criteria, properties=[key_path]
    ):
        ids = [get(doc, key_path) for doc in docs if has(doc, key_path)]

        # Strip only the leading "metadata." so nested names stay intact
        group = {
            (k[len(prefix):] if k.startswith(prefix) else k): get(group, k)
            for k in keys
            if has(group, k)
        }

        yield group, list(self.query(criteria={self.key: {"$in": ids}}))

query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the GridFS Store for a set of documents. Will check to see if data can be returned from the files store first. If the data from GridFS is not a JSON-serialized string, a dict will be returned with the data in the "data" key plus the self.key and self.last_updated_field.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in the queried documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in maggma/stores/gridfs.py
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the GridFS Store for a set of documents.
    Will check to see if data can be returned from
    files store first.
    If the data from the gridfs is not a json serialized string
    a dict will be returned with the data in the "data" key
    plus the self.key and self.last_updated_field.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in the queried documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Raises:
        ValueError: if criteria is neither a dict nor None
    """
    if isinstance(criteria, dict):
        criteria = self.transform_criteria(criteria)
    elif criteria is not None:
        raise ValueError("Criteria must be a dictionary or None")

    prop_keys = set()
    if isinstance(properties, dict):
        prop_keys = set(properties.keys())
    elif isinstance(properties, list):
        prop_keys = set(properties)

    for doc in self._files_store.query(
        criteria=criteria, sort=sort, limit=limit, skip=skip
    ):
        if properties is not None and prop_keys.issubset(set(doc.keys())):
            yield {p: doc[p] for p in properties if p in doc}
            continue

        metadata = doc.get("metadata", {})

        # sort/skip/limit were already applied by the files-store query above.
        # This lookup is by a single _id, so forwarding skip would skip the
        # only match and find_one would return None.
        data = self._collection.find_one(filter={"_id": doc["_id"]}).read()

        if metadata.get("compression", "") == "zlib":
            data = zlib.decompress(data).decode("UTF-8")

        try:
            data = json.loads(data)
        except ValueError:
            # Not JSON (JSONDecodeError and UnicodeDecodeError are both
            # ValueErrors): wrap the raw payload. update() stores the key and
            # last_updated_field inside "metadata", so fall back to it there.
            data = {
                "data": data,
                self.key: doc.get(self.key, metadata.get(self.key)),
                self.last_updated_field: doc.get(
                    self.last_updated_field,
                    metadata.get(self.last_updated_field),
                ),
            }

        if self.ensure_metadata and isinstance(data, dict):
            data.update(metadata)

        yield data

remove_docs(self, criteria)

Remove docs matching the query dictionary

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in maggma/stores/gridfs.py
def remove_docs(self, criteria: Dict):
    """
    Delete every stored file matching ``criteria``.

    Args:
        criteria: query dictionary to match; dict criteria are passed
            through ``transform_criteria`` before querying
    """
    query = (
        self.transform_criteria(criteria)
        if isinstance(criteria, dict)
        else criteria
    )
    # Collect the ids first so nothing is deleted while the cursor is open
    doomed = [grid_out._id for grid_out in self._collection.find(query)]
    for file_id in doomed:
        self._collection.delete(file_id)

transform_criteria(criteria) classmethod

Allow client to not need to prepend 'metadata.' to query fields.

Parameters:

Name Type Description Default
criteria Dict

Query criteria

required
Source code in maggma/stores/gridfs.py
@classmethod
def transform_criteria(cls, criteria: Dict) -> Dict:
    """
    Allow client to not need to prepend 'metadata.' to query fields.

    Top-level keys that are neither GridFS files-collection fields nor
    already prefixed with "metadata." are rewritten to "metadata.<key>".
    Every value is shallow-copied.

    Args:
        criteria: Query criteria

    Returns:
        a new criteria dict; the input is left untouched
    """

    def _target(field):
        # Files-collection fields and explicit metadata paths pass through as-is
        if field in files_collection_fields or field.startswith("metadata."):
            return field
        return "metadata." + field

    return {_target(field): copy.copy(value) for field, value in criteria.items()}

update(self, docs, key=None, additional_metadata=None)

Update documents into the Store

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
additional_metadata Union[str, List[str]]

field(s) to include in the gridfs metadata

None
Source code in maggma/stores/gridfs.py
def update(
    self,
    docs: Union[List[Dict], Dict],
    key: Union[List, str, None] = None,
    additional_metadata: Union[str, List[str], None] = None,
):
    """
    Update documents into the Store.

    Each document is JSON-serialised (optionally zlib-compressed) and written
    as a new GridFS file. Older files with the same key values are then
    deleted, so only the newest upload survives.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
        additional_metadata: field(s) to include in the gridfs metadata
    """

    if not isinstance(docs, list):
        docs = [docs]

    if isinstance(key, str):
        key = [key]
    elif not key:
        key = [self.key]

    # GridFS files-collection fields are never copied into the metadata,
    # so they cannot be used to locate older versions of a document.
    key = list(set(key) - set(files_collection_fields))

    if additional_metadata is None:
        additional_metadata = []
    elif isinstance(additional_metadata, str):
        additional_metadata = [additional_metadata]
    else:
        additional_metadata = list(additional_metadata)

    for d in docs:
        search_doc = {k: d[k] for k in key}

        metadata = {
            k: get(d, k)
            for k in [self.last_updated_field]
            + additional_metadata
            + self.searchable_fields
            if has(d, k)
        }
        metadata.update(search_doc)
        data = json.dumps(jsanitize(d)).encode("UTF-8")
        if self.compression:
            data = zlib.compress(data)
            metadata["compression"] = "zlib"

        self._collection.put(data, metadata=metadata)

        if not search_doc:
            # No usable uniqueness key (every key field was a files-collection
            # field). An empty filter would match every file, and the cleanup
            # below would delete everything except the file just written.
            continue

        # Cleans up old gridfs entries, keeping only the newest upload
        for fdoc in (
            self._files_collection.find(self.transform_criteria(search_doc), ["_id"])
            .sort("uploadDate", -1)
            .skip(1)
        ):
            self._collection.delete(fdoc["_id"])

GridFSURIStore (GridFSStore)

A Store for GridFS backend, with connection via a mongo URI string.

This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records.

Source code in maggma/stores/gridfs.py
class GridFSURIStore(GridFSStore):
    """
    A Store for GridFS backend, with connection via a mongo URI string.

    This is expected to be a special mongodb+srv:// URI that includes client
    parameters via TXT records.
    """

    def __init__(
        self,
        uri: str,
        collection_name: str,
        database: str = None,
        compression: bool = False,
        ensure_metadata: bool = False,
        searchable_fields: List[str] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Initializes a GridFS Store for binary data.

        Args:
            uri: MongoDB+SRV URI
            collection_name: The collection name
            database: database to connect to; if None, the database named in
                the URI is used
            compression: compress the data as it goes into GridFS
            ensure_metadata: ensure returned documents have the metadata fields
            searchable_fields: fields to keep in the index store
            mongoclient_kwargs: extra keyword arguments forwarded to MongoClient

        Raises:
            ConfigurationError: if no database is given and the URI names none
        """

        self.uri = uri

        # parse the dbname from the uri
        if database is None:
            d_uri = uri_parser.parse_uri(uri)
            if d_uri["database"] is None:
                raise ConfigurationError(
                    "If database name is not supplied, a database must be set in the uri"
                )
            self.database = d_uri["database"]
        else:
            self.database = database

        self.collection_name = collection_name
        self._coll = None  # type: Any
        self.compression = compression
        self.ensure_metadata = ensure_metadata
        self.searchable_fields = [] if searchable_fields is None else searchable_fields
        self.kwargs = kwargs
        self.mongoclient_kwargs = mongoclient_kwargs or {}
        # GridFSStore.__init__ is bypassed below, but the inherited close()
        # reads self.ssh_tunnel. This class's connect() never uses a tunnel.
        self.ssh_tunnel = None

        if "key" not in kwargs:
            kwargs["key"] = "_id"
        # Deliberately skip GridFSStore.__init__ (host/port/credentials come
        # from the URI) and go straight to its parent.
        super(GridFSStore, self).__init__(**kwargs)  # lgtm

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data
        """
        if not self._coll or force_reset:  # pragma: no cover
            conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
            db = conn[self.database]
            self._coll = gridfs.GridFS(db, self.collection_name)
            self._files_collection = db["{}.files".format(self.collection_name)]
            self._files_store = MongoStore.from_collection(self._files_collection)
            self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
            self._files_store.key = self.key
            self._chunks_collection = db["{}.chunks".format(self.collection_name)]

    def __eq__(self, other: object) -> bool:
        """
        Check equality for GridFSURIStore.

        The inherited GridFSStore.__eq__ compares host and port, which this
        class never sets. Compare the URI, database and collection instead.

        Args:
            other: other GridFSURIStore to compare with
        """
        if not isinstance(other, GridFSURIStore):
            return False

        fields = ["uri", "database", "collection_name"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

__init__(self, uri, collection_name, database=None, compression=False, ensure_metadata=False, searchable_fields=None, mongoclient_kwargs=None, **kwargs) special

Initializes a GridFS Store for binary data

Parameters:

Name Type Description Default
uri str

MongoDB+SRV URI

required
database str

database to connect to

None
collection_name str

The collection name

required
compression bool

compress the data as it goes into GridFS

False
ensure_metadata bool

ensure returned documents have the metadata fields

False
searchable_fields List[str]

fields to keep in the index store

None
Source code in maggma/stores/gridfs.py
def __init__(
    self,
    uri: str,
    collection_name: str,
    database: str = None,
    compression: bool = False,
    ensure_metadata: bool = False,
    searchable_fields: List[str] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    **kwargs,
):
    """
    Initializes a GridFS Store for binary data.

    Args:
        uri: MongoDB+SRV URI
        collection_name: The collection name
        database: database to connect to; if None, the database named in
            the URI is used
        compression: compress the data as it goes into GridFS
        ensure_metadata: ensure returned documents have the metadata fields
        searchable_fields: fields to keep in the index store
        mongoclient_kwargs: extra keyword arguments forwarded to MongoClient

    Raises:
        ConfigurationError: if no database is given and the URI names none
    """

    self.uri = uri

    # parse the dbname from the uri
    if database is None:
        d_uri = uri_parser.parse_uri(uri)
        if d_uri["database"] is None:
            raise ConfigurationError(
                "If database name is not supplied, a database must be set in the uri"
            )
        self.database = d_uri["database"]
    else:
        self.database = database

    self.collection_name = collection_name
    self._coll = None  # type: Any
    self.compression = compression
    self.ensure_metadata = ensure_metadata
    self.searchable_fields = [] if searchable_fields is None else searchable_fields
    self.kwargs = kwargs
    self.mongoclient_kwargs = mongoclient_kwargs or {}
    # GridFSStore.__init__ is bypassed below, but the inherited close() reads
    # self.ssh_tunnel. The URI connect() never uses a tunnel.
    self.ssh_tunnel = None

    if "key" not in kwargs:
        kwargs["key"] = "_id"
    # Deliberately skip GridFSStore.__init__ (host/port/credentials come from
    # the URI) and go straight to its parent.
    super(GridFSStore, self).__init__(**kwargs)  # lgtm

connect(self, force_reset=False)

Connect to the source data

Source code in maggma/stores/gridfs.py
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Builds a MongoClient from ``self.uri`` and derives the GridFS, files and
    chunks handles. Nothing happens if the handles already exist and
    ``force_reset`` is False.
    """
    already_connected = bool(self._coll) and not force_reset
    if already_connected:  # pragma: no cover
        return
    client: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
    db = client[self.database]
    name = self.collection_name
    self._coll = gridfs.GridFS(db, name)
    self._files_collection = db[f"{name}.files"]
    files_store = MongoStore.from_collection(self._files_collection)
    files_store.last_updated_field = f"metadata.{self.last_updated_field}"
    files_store.key = self.key
    self._files_store = files_store
    self._chunks_collection = db[f"{name}.chunks"]

Advanced Stores for connecting to AWS data

S3Store (Store)

GridFS-like storage using Amazon S3 and a regular store for indexing. Assumes the Amazon AWS key and secret key are set in the environment or the default config file.

Source code in maggma/stores/aws.py
class S3Store(Store):
    """
    GridFS like storage using Amazon S3 and a regular store for indexing
    Assumes Amazon AWS key and secret key are set in environment or default config file
    """

    def __init__(
        self,
        index: Store,
        bucket: str,
        s3_profile: Optional[Union[str, dict]] = None,
        compress: bool = False,
        endpoint_url: Optional[str] = None,
        sub_dir: Optional[str] = None,
        s3_workers: int = 1,
        s3_resource_kwargs: Optional[dict] = None,
        key: str = "fs_id",
        store_hash: bool = True,
        unpack_data: bool = True,
        searchable_fields: Optional[List[str]] = None,
        **kwargs,
    ):
        """
        Initializes an S3 Store

        Args:
            index: a store to use to index the S3 Bucket
            bucket: name of the bucket
            s3_profile: name of aws profile containing credentials for role.
                Alternatively you can pass in a dictionary with the full credentials:
                    aws_access_key_id (string) -- AWS access key ID
                    aws_secret_access_key (string) -- AWS secret access key
                    aws_session_token (string) -- AWS temporary session token
                    region_name (string) -- Default region when creating new connections
            compress: compress files inserted into the store
            endpoint_url: endpoint_url to allow interface to minio service
            sub_dir: (optional)  subdirectory of the s3 bucket to store the data
            s3_workers: number of concurrent S3 puts to run
            s3_resource_kwargs: extra keyword arguments passed when creating the
                boto3 S3 resource in connect()
            key: desired document key. The index's key is always used instead;
                a warning is issued if the two differ.
            store_hash: store the sha1 hash right before insertion to the database.
            unpack_data: whether to decompress and unpack byte data when querying from the bucket.
            searchable_fields: fields to keep in the index store

        Raises:
            RuntimeError: if boto3 is not available
        """
        if boto3 is None:
            raise RuntimeError("boto3 and botocore are required for S3Store")
        self.index = index

        self.bucket = bucket
        self.s3_profile = s3_profile
        self.compress = compress
        self.endpoint_url = endpoint_url
        # Normalise to "" or "<dir>/" so it can be prefixed directly to object keys
        self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
        self.s3 = None  # type: Any
        self.s3_bucket = None  # type: Any
        self.s3_workers = s3_workers
        self.s3_resource_kwargs = (
            s3_resource_kwargs if s3_resource_kwargs is not None else {}
        )
        self.unpack_data = unpack_data
        self.searchable_fields = (
            searchable_fields if searchable_fields is not None else []
        )
        self.store_hash = store_hash

        # Force the key to be the same as the index
        assert isinstance(
            index.key, str
        ), "Since we are using the key as a file name in S3, the key must be a string"
        if key != index.key:
            warnings.warn(
                f'The desired S3Store key "{key}" does not match the index key "{index.key}", '
                "the index key will be used",
                UserWarning,
            )
        kwargs["key"] = str(index.key)

        self._thread_local = threading.local()
        super(S3Store, self).__init__(**kwargs)

    @property
    def name(self) -> str:
        """
        Returns:
            an ``s3://`` URL for the bucket, used as the name of this data source
        """
        return "s3://{}".format(self.bucket)

    def connect(self, *args, **kwargs):  # lgtm[py/conflicting-attributes]
        """
        Connect to the source data.

        While ``self.s3`` is unset, this builds the boto3 S3 resource,
        checks the bucket with ``head_bucket`` and caches the resource and
        bucket handles. The index store is connected on every call, with the
        given arguments.

        Raises:
            RuntimeError: if the ``head_bucket`` check raises a ClientError
        """
        if not self.s3:
            session = self._get_session()
            resource = session.resource(
                "s3", endpoint_url=self.endpoint_url, **self.s3_resource_kwargs
            )
            try:
                resource.meta.client.head_bucket(Bucket=self.bucket)
            except ClientError as exc:
                raise RuntimeError(
                    "Bucket not present on AWS: {}".format(self.bucket)
                ) from exc

            # Cache the resource only after the bucket check passed. Otherwise
            # a failed connect() would leave self.s3 set with s3_bucket None,
            # and later calls would skip both the check and the bucket setup.
            self.s3 = resource
            self.s3_bucket = resource.Bucket(self.bucket)
        self.index.connect(*args, **kwargs)

    def close(self):
        """
        Closes any connections: closes the index store and drops the cached
        S3 resource and bucket handles.
        """
        self.index.close()
        self.s3 = self.s3_bucket = None

    @property
    def _collection(self):
        """
        Returns:
            the index store's ``_collection``, since searches run against the index

        Important:
            Not guaranteed to exist in the future
        """
        index = self.index
        return index._collection

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria.
        The count is answered entirely by the index store.

        Args:
            criteria: PyMongo filter for documents to count in
        """
        index = self.index
        return index.count(criteria)

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents.

        Index documents that already contain every requested property are
        returned directly. Otherwise the object body is fetched from S3. A
        missing S3 object is logged and skipped.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in the queried documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Raises:
            botocore.exceptions.ClientError: for S3 errors other than a
                missing object
        """
        prop_keys = set()
        if isinstance(properties, dict):
            prop_keys = set(properties.keys())
        elif isinstance(properties, list):
            prop_keys = set(properties)

        for doc in self.index.query(
            criteria=criteria, sort=sort, limit=limit, skip=skip
        ):
            if properties is not None and prop_keys.issubset(set(doc.keys())):
                yield {p: doc[p] for p in properties if p in doc}
                continue

            try:
                # TODO: This is ugly and unsafe, do some real checking before pulling data
                data = (
                    self.s3_bucket.Object(self.sub_dir + str(doc[self.key]))
                    .get()["Body"]
                    .read()
                )
            except botocore.exceptions.ClientError as e:
                # GetObject reports a missing key as "NoSuchKey", and HEAD-style
                # requests as "404". Codes are not always numeric, so compare
                # them as strings instead of calling int() on them.
                error_code = str(e.response.get("Error", {}).get("Code", ""))
                if error_code in ("404", "NoSuchKey"):
                    self.logger.error(
                        "Could not find S3 object {}".format(doc[self.key])
                    )
                    # Skip only this document and keep yielding the rest
                    continue
                raise

            if self.unpack_data:
                data = self._unpack(
                    data=data, compressed=doc.get("compression", "") == "zlib"
                )

                if self.last_updated_field in doc:
                    data[self.last_updated_field] = doc[self.last_updated_field]

            yield data

    @staticmethod
    def _unpack(data: bytes, compressed: bool):
        """
        Decode a stored payload: zlib-decompress it when ``compressed`` is
        True, then msgpack-unpack it with ``raw=False``.

        Args:
            data: raw object body
            compressed: whether ``data`` is zlib-compressed

        Returns:
            the unpacked object (not passed through any MSON decoding)
        """
        payload = zlib.decompress(data) if compressed else data
        # requires msgpack-python to be installed to fix string encoding problem
        # https://github.com/msgpack/msgpack/issues/121
        # During recursion, msgpack.unpackb goes as deep as possible during
        # reconstruction, whereas MontyDecoder().process_decode only goes until
        # it finds a from_dict. So we cannot just use
        # msgpack.unpackb(data, object_hook=monty_object_hook, raw=False).
        # Just return the unpacked object and let the user run process_decoded.
        return msgpack.unpackb(payload, raw=False)

    def distinct(
        self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
    ) -> List:
        """
        Get all distinct values for a field, delegated to the index store.

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: accepted for interface compatibility; not forwarded
        """
        index = self.index
        return index.distinct(field, criteria=criteria)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Group documents by one or more keys.

        The grouping is delegated wholesale to the index store, so the grouped
        documents are index documents (the metadata written by ``update``); no S3
        objects are downloaded.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number of documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """
        grouping_options = dict(
            criteria=criteria,
            properties=properties,
            sort=sort,
            skip=skip,
            limit=limit,
        )
        return self.index.groupby(keys=keys, **grouping_options)

    def ensure_index(self, key: str, unique: bool = False) -> bool:
        """
        Tries to create an index and return true if it suceeded

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys

        Returns:
            bool indicating if the index exists/was created
        """
        return self.index.ensure_index(key, unique=unique)

    def update(
        self,
        docs: Union[List[Dict], Dict],
        key: Union[List, str, None] = None,
        additional_metadata: Union[str, List[str], None] = None,
    ):
        """
        Update documents into the Store

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
            additional_metadata: field(s) to include in the s3 store's metadata
        """
        if not isinstance(docs, list):
            docs = [docs]

        if isinstance(key, str):
            key = [key]
        elif not key:
            key = [self.key]

        if additional_metadata is None:
            additional_metadata = []
        elif isinstance(additional_metadata, str):
            additional_metadata = [additional_metadata]
        else:
            additional_metadata = list(additional_metadata)

        with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
            fs = {
                pool.submit(
                    self.write_doc_to_s3,
                    doc=itr_doc,
                    search_keys=key + additional_metadata + self.searchable_fields,
                )
                for itr_doc in docs
            }
            fs, _ = wait(fs)

            search_docs = [sdoc.result() for sdoc in fs]

        # Use store's update to remove key clashes
        self.index.update(search_docs, key=self.key)

    def _get_session(self):
        """
        Build a new boto3 ``Session`` from ``self.s3_profile``.

        ``s3_profile`` is either a dict of keyword arguments for ``Session`` (for
        example explicit credentials) or a profile name / ``None``.

        Returns:
            a new ``Session``. It used to return ``None`` whenever the calling
            thread already had a cached ``s3_bucket`` in ``self._thread_local``,
            which broke callers that call ``.resource(...)`` on the result.
        """
        if isinstance(self.s3_profile, dict):
            return Session(**self.s3_profile)
        return Session(profile_name=self.s3_profile)

    def _get_bucket(self):
        """
        If on the main thread return the bucket created above, else create a new bucket on each thread
        """
        if threading.current_thread().name == "MainThread":
            return self.s3_bucket
        if not hasattr(self._thread_local, "s3_bucket"):
            session = self._get_session()
            resource = session.resource("s3", endpoint_url=self.endpoint_url)
            self._thread_local.s3_bucket = resource.Bucket(self.bucket)
        return self._thread_local.s3_bucket

    def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
        """
        Write the data to s3 and return the metadata to be inserted into the index db.

        The upload works as follows:
        - The document, minus ``self.last_updated_field``, is msgpack-encoded.
        - If ``self.compress`` is set, it is zlib-compressed.
        - It is uploaded under ``self.sub_dir + str(doc[self.key])``.
        - The selected metadata is attached to the S3 object with sanitized keys
          and string values.

        Args:
            doc: the document; a shallow copy is used, so the caller's dict is not
                modified
            search_keys: list of keys to pull from the doc and insert into the
                index db

        Returns:
            The metadata document for the index store. It contains the original
            (not stringified) last_updated value. If ``self.store_hash`` is set, it
            also contains the sha1 hex digest of the uploaded bytes under
            ``"obj_hash"``.
        """
        s3_bucket = self._get_bucket()

        # Shallow copy so that popping last_updated below does not mutate the caller's doc
        doc = dict(doc)

        search_doc = {k: doc[k] for k in search_keys}
        search_doc[self.key] = doc[self.key]  # Ensure key is in metadata
        if self.sub_dir != "":
            search_doc["sub_dir"] = self.sub_dir

        # Remove MongoDB _id from search
        search_doc.pop("_id", None)

        # to make hashing more meaningful, make sure last updated field is removed
        lu_info = doc.pop(self.last_updated_field, None)
        data = msgpack.packb(doc, default=monty_default)

        if self.compress:
            # Compress with zlib if chosen
            search_doc["compression"] = "zlib"
            data = zlib.compress(data)

        if lu_info is not None:
            # S3 metadata values must be strings. The field was popped from ``doc``
            # above, so test the saved value: the old ``in doc`` check could never
            # succeed, and the timestamp never reached the S3 metadata.
            search_doc[self.last_updated_field] = str(to_isoformat_ceil_ms(lu_info))

        # keep a record of original keys, in case these are important for the individual researcher
        # it is not expected that this information will be used except in disaster recovery
        s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc}
        s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys"  # inception
        # encode dictionary since values have to be strings
        search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
        s3_bucket.put_object(
            Key=self.sub_dir + str(doc[self.key]),
            Body=data,
            Metadata={s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()},
        )

        # The index keeps the original (non-string) last_updated value
        if lu_info is not None:
            search_doc[self.last_updated_field] = lu_info

        if self.store_hash:
            search_doc["obj_hash"] = sha1(data).hexdigest()
        return search_doc

    @staticmethod
    def _sanitize_key(key):
        """
        Sanitize keys to store in S3/MinIO metadata.
        """

        # Any underscores are encoded as double dashes in metadata, since keys with
        # underscores may be result in the corresponding HTTP header being stripped
        # by certain server configurations (e.g. default nginx), leading to:
        # `botocore.exceptions.ClientError: An error occurred (AccessDenied) when
        # calling the PutObject operation: There were headers present in the request
        # which were not signed`
        # Metadata stored in the MongoDB index (self.index) is stored unchanged.

        # Additionally, MinIO requires lowercase keys
        return str(key).replace("_", "-").lower()

    def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
        """
        Remove docs matching the query dictionary

        Args:
            criteria: query dictionary to match
            remove_s3_object: whether to remove the actual S3 Object or not
        """
        if not remove_s3_object:
            self.index.remove_docs(criteria=criteria)
        else:
            to_remove = self.index.distinct(self.key, criteria=criteria)
            self.index.remove_docs(criteria=criteria)

            # Can remove up to 1000 items at a time via boto
            to_remove_chunks = list(grouper(to_remove, n=1000))
            for chunk_to_remove in to_remove_chunks:
                objlist = [{"Key": f"{self.sub_dir}{obj}"} for obj in chunk_to_remove]
                self.s3_bucket.delete_objects(Delete={"Objects": objlist})

    @property
    def last_updated(self):
        return self.index.last_updated

    def newer_in(
        self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False
    ) -> List[str]:
        """
        Return the keys of documents that are newer in ``target`` than in this Store.

        The comparison is delegated to the index store. If ``target`` exposes an
        ``index`` attribute (as another S3Store does), that index is compared
        instead of ``target`` itself.

        Args:
            target: target Store
            criteria: PyMongo filter for documents to search in
            exhaustive: triggers an item-by-item check vs. checking
                        the last_updated of the target Store and using
                        that to filter out new items
        """
        compare_against = target.index if hasattr(target, "index") else target
        return self.index.newer_in(
            target=compare_against, criteria=criteria, exhaustive=exhaustive
        )

    def __hash__(self):
        return hash((self.index.__hash__, self.bucket))

    def rebuild_index_from_s3_data(self, **kwargs):
        """
        Rebuilds the index Store from the data in S3
        Relies on the index document being stores as the metadata for the file
        This can help recover lost databases
        """
        bucket = self.s3_bucket
        objects = bucket.objects.filter(Prefix=self.sub_dir)
        for obj in objects:
            key_ = self.sub_dir + obj.key
            data = self.s3_bucket.Object(key_).get()["Body"].read()

            if self.compress:
                data = zlib.decompress(data)
            unpacked_data = msgpack.unpackb(data, raw=False)
            self.update(unpacked_data, **kwargs)

    def rebuild_metadata_from_index(self, index_query: dict = None):
        """
        Read data from the index store and populate the metadata of the S3 bucket
        Force all of the keys to be lower case to be Minio compatible
        Args:
            index_query: query on the index store
        """

        qq = {} if index_query is None else index_query
        for index_doc in self.index.query(qq):
            key_ = self.sub_dir + index_doc[self.key]
            s3_object = self.s3_bucket.Object(key_)
            new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
            for k, v in index_doc.items():
                new_meta[str(k).lower()] = v
            new_meta.pop("_id")
            if self.last_updated_field in new_meta:
                new_meta[self.last_updated_field] = str(
                    to_isoformat_ceil_ms(new_meta[self.last_updated_field])
                )
            # s3_object.metadata.update(new_meta)
            s3_object.copy_from(
                CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
                Metadata=new_meta,
                MetadataDirective="REPLACE",
            )

    def __eq__(self, other: object) -> bool:
        """
        Check equality for S3Store
        other: other S3Store to compare with
        """
        if not isinstance(other, S3Store):
            return False

        fields = ["index", "bucket", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

last_updated property readonly

Provides the most recent last_updated date time stamp from the documents in this Store

name: str property readonly

Returns:

Type Description
str

a string representing this data source

__eq__(self, other) special

Check equality for S3Store. other: the other S3Store to compare with.

Source code in maggma/stores/aws.py
def __eq__(self, other: object) -> bool:
    """
    Compare two S3Stores by index store, bucket name and last_updated_field.

    Args:
        other: the object to compare with; anything that is not an S3Store is
            never equal
    """
    if not isinstance(other, S3Store):
        return False

    for attr in ("index", "bucket", "last_updated_field"):
        if not getattr(self, attr) == getattr(other, attr):
            return False
    return True

__init__(self, index, bucket, s3_profile=None, compress=False, endpoint_url=None, sub_dir=None, s3_workers=1, s3_resource_kwargs=None, key='fs_id', store_hash=True, unpack_data=True, searchable_fields=None, **kwargs) special

Initializes an S3 Store

Parameters:

Name Type Description Default
index Store

a store to use to index the S3 Bucket

required
bucket str

name of the bucket

required
s3_profile Union[str, dict]

name of the AWS profile containing credentials for the role. Alternatively, you can pass in a dictionary with the full credentials: aws_access_key_id (string) -- AWS access key ID; aws_secret_access_key (string) -- AWS secret access key; aws_session_token (string) -- AWS temporary session token; region_name (string) -- default region when creating new connections.

None
compress bool

compress files inserted into the store

False
endpoint_url str

endpoint_url to allow interface to minio service

None
sub_dir str

(optional) subdirectory of the s3 bucket to store the data

None
s3_workers int

number of concurrent S3 puts to run

1
store_hash bool

store the sha1 hash right before insertion to the database.

True
unpack_data bool

whether to decompress and unpack byte data when querying from the bucket.

True
searchable_fields Optional[List[str]]

fields to keep in the index store

None
Source code in maggma/stores/aws.py
def __init__(
    self,
    index: Store,
    bucket: str,
    s3_profile: Optional[Union[str, dict]] = None,
    compress: bool = False,
    endpoint_url: Optional[str] = None,
    sub_dir: Optional[str] = None,
    s3_workers: int = 1,
    s3_resource_kwargs: Optional[dict] = None,
    key: str = "fs_id",
    store_hash: bool = True,
    unpack_data: bool = True,
    searchable_fields: Optional[List[str]] = None,
    **kwargs,
):
    """
    Initializes an S3 Store

    Args:
        index: a store to use to index the S3 Bucket
        bucket: name of the bucket
        s3_profile: name of aws profile containing credentials for role.
            Alternatively you can pass in a dictionary with the full credentials:
                aws_access_key_id (string) -- AWS access key ID
                aws_secret_access_key (string) -- AWS secret access key
                aws_session_token (string) -- AWS temporary session token
                region_name (string) -- Default region when creating new connections
        compress: compress files inserted into the store
        endpoint_url: endpoint_url to allow interface to minio service
        sub_dir: (optional) subdirectory of the s3 bucket to store the data;
            leading/trailing slashes are stripped and a single "/" is appended
        s3_workers: number of concurrent S3 puts to run
        s3_resource_kwargs: extra keyword arguments for ``session.resource("s3", ...)``
        key: desired key; the index store's key is always used instead (a
            UserWarning is issued when they differ)
        store_hash: store the sha1 hash right before insertion to the database.
        unpack_data: whether to decompress and unpack byte data when querying from the bucket.
        searchable_fields: fields to keep in the index store

    Raises:
        RuntimeError: if boto3 is not available
    """
    if boto3 is None:
        raise RuntimeError("boto3 and botocore are required for S3Store")
    self.index = index

    self.bucket = bucket
    self.s3_profile = s3_profile
    self.compress = compress
    self.endpoint_url = endpoint_url
    # Normalise to "<dir>/" or "" (bucket root). Testing emptiness after stripping
    # maps sub_dir="/" to the root instead of producing a "/" key prefix.
    stripped_sub_dir = sub_dir.strip("/") if sub_dir else ""
    self.sub_dir = stripped_sub_dir + "/" if stripped_sub_dir else ""
    self.s3 = None  # type: Any
    self.s3_bucket = None  # type: Any
    self.s3_workers = s3_workers
    self.s3_resource_kwargs = (
        s3_resource_kwargs if s3_resource_kwargs is not None else {}
    )
    self.unpack_data = unpack_data
    self.searchable_fields = (
        searchable_fields if searchable_fields is not None else []
    )
    self.store_hash = store_hash

    # Force the key to be the same as the index
    assert isinstance(
        index.key, str
    ), "Since we are using the key as a file name in S3, the key must be a string"
    if key != index.key:
        warnings.warn(
            f'The desired S3Store key "{key}" does not match the index key "{index.key}", '
            "the index key will be used",
            UserWarning,
        )
    kwargs["key"] = str(index.key)

    # Per-thread storage for the bucket handles built by worker threads
    self._thread_local = threading.local()
    super(S3Store, self).__init__(**kwargs)

close(self)

Closes any connections

Source code in maggma/stores/aws.py
def close(self):
    """
    Close the index store connection and drop the cached S3 resource and bucket handles.
    """
    self.index.close()
    self.s3, self.s3_bucket = None, None

connect(self, *args, **kwargs)

Connect to the source data

Source code in maggma/stores/aws.py
def connect(self, *args, **kwargs):  # lgtm[py/conflicting-attributes]
    """
    Connect to the source data.

    On first use this:
    - builds an S3 resource from ``self._get_session()``;
    - checks that ``self.bucket`` is reachable;
    - caches the resource and bucket handle.

    Later calls reuse the cached handles. The index store is connected on every
    call, with the given arguments.

    Raises:
        RuntimeError: if the ``head_bucket`` check fails with a ClientError
    """
    if not self.s3:
        session = self._get_session()
        resource = session.resource(
            "s3", endpoint_url=self.endpoint_url, **self.s3_resource_kwargs
        )
        try:
            resource.meta.client.head_bucket(Bucket=self.bucket)
        except ClientError as exc:
            raise RuntimeError("Bucket not present on AWS: {}".format(self.bucket)) from exc

        # Cache the resource only after the bucket check succeeds. Previously a
        # failed check still set self.s3, so the next connect() skipped the check
        # and left self.s3_bucket as None.
        self.s3 = resource
        self.s3_bucket = resource.Bucket(self.bucket)
    self.index.connect(*args, **kwargs)

count(self, criteria=None)

Counts the number of documents matching the query criteria

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
Source code in maggma/stores/aws.py
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Count the documents matching ``criteria``; the index store answers alone.

    Args:
        criteria: PyMongo filter for documents to count in
    """
    index_store = self.index
    return index_store.count(criteria)

distinct(self, field, criteria=None, all_exist=False)

Get all distinct values for a field

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in maggma/stores/aws.py
def distinct(
    self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
    """
    Get all distinct values for a field.

    The lookup is answered entirely by the index store; no S3 objects are read.

    Args:
        field: the field(s) to get distinct values for
        criteria: PyMongo filter for documents to search in
        all_exist: forwarded unchanged to the index store's ``distinct``
            (it used to be accepted here but silently dropped)

    Returns:
        the list of distinct values returned by the index store
    """
    # Index is a store so it should have its own distinct function
    return self.index.distinct(field, criteria=criteria, all_exist=all_exist)

ensure_index(self, key, unique=False)

Tries to create an index and returns True if it succeeded

Parameters:

Name Type Description Default
key str

single key to index

required
unique bool

Whether or not this index contains only unique keys

False

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in maggma/stores/aws.py
def ensure_index(self, key: str, unique: bool = False) -> bool:
    """
    Ask the index store to create an index on ``key``.

    Args:
        key: single key to index
        unique: whether the index should only allow unique values

    Returns:
        whatever the index store reports: True if the index exists/was created
    """
    index_store = self.index
    created = index_store.ensure_index(key, unique=unique)
    return created

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in maggma/stores/aws.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Group documents by one or more keys.

    The grouping is delegated wholesale to the index store, so the grouped
    documents are index documents (the metadata written by ``update``); no S3
    objects are downloaded.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number of documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (dict, list of docs)
    """
    grouping_options = dict(
        criteria=criteria,
        properties=properties,
        sort=sort,
        skip=skip,
        limit=limit,
    )
    return self.index.groupby(keys=keys, **grouping_options)

newer_in(self, target, criteria=None, exhaustive=False)

Returns the keys of documents that are newer in the target Store than this Store.

Parameters:

Name Type Description Default
target Store

target Store

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
exhaustive bool

triggers an item-by-item check instead of checking the last_updated of the target Store and using that to filter out new items

False
Source code in maggma/stores/aws.py
def newer_in(
    self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False
) -> List[str]:
    """
    Return the keys of documents that are newer in ``target`` than in this Store.

    The comparison is delegated to the index store. If ``target`` exposes an
    ``index`` attribute (as another S3Store does), that index is compared instead
    of ``target`` itself.

    Args:
        target: target Store
        criteria: PyMongo filter for documents to search in
        exhaustive: triggers an item-by-item check vs. checking
                    the last_updated of the target Store and using
                    that to filter out new items
    """
    compare_against = target.index if hasattr(target, "index") else target
    return self.index.newer_in(
        target=compare_against, criteria=criteria, exhaustive=exhaustive
    )

query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the Store for a set of documents

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in the queried documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in maggma/stores/aws.py
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents.

    The index store is queried first.
    - If every requested property is present in an index document, the
      projection is served from the index alone.
    - Otherwise the full object is downloaded from S3. When ``unpack_data`` is
      set, it is decoded and the index's last_updated value is copied onto it.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in the queried documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number of documents to skip
        limit: limit on total number of documents returned

    Yields:
        Projected index documents, decoded S3 documents, or raw bytes when
        ``unpack_data`` is False. Documents whose S3 object is missing are logged
        and skipped; any other S3 client error is re-raised.
    """
    prop_keys = set()
    if isinstance(properties, dict):
        prop_keys = set(properties.keys())
    elif isinstance(properties, list):
        prop_keys = set(properties)

    for doc in self.index.query(criteria=criteria, sort=sort, limit=limit, skip=skip):
        if properties is not None and prop_keys.issubset(doc.keys()):
            yield {p: doc[p] for p in properties if p in doc}
            continue

        try:
            data = (
                self.s3_bucket.Object(self.sub_dir + str(doc[self.key]))
                .get()["Body"]
                .read()
            )
        except botocore.exceptions.ClientError as e:
            # GetObject reports a missing object with the string code "NoSuchKey"
            # ("404" for HEAD-style requests). The old int() conversion raised
            # ValueError on "NoSuchKey". A missing object now skips only this
            # document instead of ending the whole query.
            error_code = str(e.response.get("Error", {}).get("Code", ""))
            if error_code in ("404", "NoSuchKey"):
                self.logger.error(
                    "Could not find S3 object {}".format(doc[self.key])
                )
                continue
            raise

        if self.unpack_data:
            data = self._unpack(
                data=data, compressed=doc.get("compression", "") == "zlib"
            )

            if self.last_updated_field in doc:
                data[self.last_updated_field] = doc[self.last_updated_field]

        yield data

rebuild_index_from_s3_data(self, **kwargs)

Rebuilds the index Store from the data in S3. Relies on the index document being stored as the metadata for the file. This can help recover lost databases.

Source code in maggma/stores/aws.py
def rebuild_index_from_s3_data(self, **kwargs):
    """
    Rebuild the index Store from the data stored in S3.

    Every object under ``sub_dir`` is downloaded, decoded (zlib-decompressed first
    when ``self.compress`` is set) and passed back through ``update``. ``update``
    re-uploads the object and regenerates its index document. This can help
    recover lost databases.

    Args:
        **kwargs: forwarded to ``update`` (e.g. ``key`` or ``additional_metadata``)
    """
    for obj in self.s3_bucket.objects.filter(Prefix=self.sub_dir):
        # Listed object keys already include the sub_dir prefix; prepending it
        # again pointed at "<sub_dir><sub_dir><key>", which does not exist.
        data = self.s3_bucket.Object(obj.key).get()["Body"].read()
        unpacked_data = self._unpack(data=data, compressed=self.compress)
        self.update(unpacked_data, **kwargs)

rebuild_metadata_from_index(self, index_query=None)

Read data from the index store and populate the metadata of the S3 bucket. Force all of the keys to be lower case to be MinIO compatible.

Parameters:

Name Type Description Default
index_query dict

query on the index store

None
Source code in maggma/stores/aws.py
def rebuild_metadata_from_index(self, index_query: Optional[dict] = None):
    """
    Read data from the index store and populate the metadata of the S3 bucket.

    For each index document matching ``index_query``:
    - the existing metadata of the corresponding S3 object is merged with the
      index document;
    - the result is written back through a server-side copy of the object onto
      itself.

    Index keys are forced to lower case to be MinIO compatible. All values are
    converted to strings, because S3 metadata values must be strings.

    Args:
        index_query: query on the index store (default: all documents)
    """
    qq = {} if index_query is None else index_query
    for index_doc in self.index.query(qq):
        # Objects are written under sub_dir + str(key) (see write_doc_to_s3);
        # mirror that so non-string keys such as ints work.
        key_ = self.sub_dir + str(index_doc[self.key])
        s3_object = self.s3_bucket.Object(key_)
        new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
        for k, v in index_doc.items():
            new_meta[str(k).lower()] = v
        # Default avoids a KeyError for index documents without a Mongo _id
        new_meta.pop("_id", None)
        if self.last_updated_field in new_meta:
            new_meta[self.last_updated_field] = str(
                to_isoformat_ceil_ms(new_meta[self.last_updated_field])
            )
        s3_object.copy_from(
            CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
            Metadata={k: str(v) for k, v in new_meta.items()},
            MetadataDirective="REPLACE",
        )

remove_docs(self, criteria, remove_s3_object=False)

Remove docs matching the query dictionary

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
remove_s3_object bool

whether to remove the actual S3 Object or not

False
Source code in maggma/stores/aws.py
def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
    """
    Remove the index entries matching ``criteria`` and, optionally, their S3 objects.

    Args:
        criteria: query dictionary to match
        remove_s3_object: when True, also delete the S3 objects whose keys
            (``sub_dir`` + key value) belong to the matched documents
    """
    if not remove_s3_object:
        self.index.remove_docs(criteria=criteria)
        return

    # Collect the object names before the index entries disappear
    doomed_keys = self.index.distinct(self.key, criteria=criteria)
    self.index.remove_docs(criteria=criteria)

    # S3 DeleteObjects accepts up to 1000 keys per request
    for batch in list(grouper(doomed_keys, n=1000)):
        targets = [{"Key": f"{self.sub_dir}{name}"} for name in batch]
        self.s3_bucket.delete_objects(Delete={"Objects": targets})

update(self, docs, key=None, additional_metadata=None)

Update documents into the Store

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
additional_metadata Union[str, List[str]]

field(s) to include in the s3 store's metadata

None
Source code in maggma/stores/aws.py
def update(
    self,
    docs: Union[List[Dict], Dict],
    key: Union[List, str, None] = None,
    additional_metadata: Union[str, List[str], None] = None,
):
    """
    Update documents into the Store.

    Each document is written to S3 via ``write_doc_to_s3`` (up to
    ``self.s3_workers`` at a time). The metadata documents it returns are then
    passed to the index store's ``update``, keyed on ``self.key``.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to copy into the S3/index metadata, in addition to
            ``additional_metadata`` and ``self.searchable_fields``. Defaults to
            the Store's key. The index update itself is always keyed on
            ``self.key``, because that key names the S3 object.
        additional_metadata: field(s) to include in the s3 store's metadata
    """
    if not isinstance(docs, list):
        docs = [docs]

    if isinstance(key, str):
        key = [key]
    elif not key:
        key = [self.key]

    if additional_metadata is None:
        additional_metadata = []
    elif isinstance(additional_metadata, str):
        additional_metadata = [additional_metadata]
    else:
        additional_metadata = list(additional_metadata)

    # The same metadata fields are pulled from every document, so build the list once.
    search_keys = list(key) + additional_metadata + list(self.searchable_fields)

    # Executor.map yields results in input order. The previous set-of-futures
    # approach produced them in arbitrary order, so the index saw documents in a
    # different order than they were written. Any exception raised while writing
    # a document propagates from here and the index is left untouched.
    with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
        search_docs = list(
            pool.map(
                lambda doc: self.write_doc_to_s3(doc=doc, search_keys=search_keys),
                docs,
            )
        )

    # Use store's update to remove key clashes
    self.index.update(search_docs, key=self.key)

write_doc_to_s3(self, doc, search_keys)

Write the data to s3 and return the metadata to be inserted into the index db

Parameters:

Name Type Description Default
doc Dict

the document

required
search_keys List[str]

list of keys to pull from the docs and insert into the index db

required
Source code in maggma/stores/aws.py
def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
    """
    Write the data to s3 and return the metadata to be inserted into the index db.

    The upload works as follows:
    - The document, minus ``self.last_updated_field``, is msgpack-encoded.
    - If ``self.compress`` is set, it is zlib-compressed.
    - It is uploaded under ``self.sub_dir + str(doc[self.key])``.
    - The selected metadata is attached to the S3 object with sanitized keys and
      string values.

    Args:
        doc: the document; a shallow copy is used, so the caller's dict is not
            modified
        search_keys: list of keys to pull from the doc and insert into the
            index db

    Returns:
        The metadata document for the index store. It contains the original
        (not stringified) last_updated value. If ``self.store_hash`` is set, it
        also contains the sha1 hex digest of the uploaded bytes under
        ``"obj_hash"``.
    """
    s3_bucket = self._get_bucket()

    # Shallow copy so that popping last_updated below does not mutate the caller's doc
    doc = dict(doc)

    search_doc = {k: doc[k] for k in search_keys}
    search_doc[self.key] = doc[self.key]  # Ensure key is in metadata
    if self.sub_dir != "":
        search_doc["sub_dir"] = self.sub_dir

    # Remove MongoDB _id from search
    search_doc.pop("_id", None)

    # to make hashing more meaningful, make sure last updated field is removed
    lu_info = doc.pop(self.last_updated_field, None)
    data = msgpack.packb(doc, default=monty_default)

    if self.compress:
        # Compress with zlib if chosen
        search_doc["compression"] = "zlib"
        data = zlib.compress(data)

    if lu_info is not None:
        # S3 metadata values must be strings. The field was popped from ``doc``
        # above, so test the saved value: the old ``in doc`` check could never
        # succeed, and the timestamp never reached the S3 metadata.
        search_doc[self.last_updated_field] = str(to_isoformat_ceil_ms(lu_info))

    # keep a record of original keys, in case these are important for the individual researcher
    # it is not expected that this information will be used except in disaster recovery
    s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc}
    s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys"  # inception
    # encode dictionary since values have to be strings
    search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
    s3_bucket.put_object(
        Key=self.sub_dir + str(doc[self.key]),
        Body=data,
        Metadata={s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()},
    )

    # The index keeps the original (non-string) last_updated value
    if lu_info is not None:
        search_doc[self.last_updated_field] = lu_info

    if self.store_hash:
        search_doc["obj_hash"] = sha1(data).hexdigest()
    return search_doc

Advanced Stores for behavior outside normal access patterns

AliasingStore (Store)

Special Store that aliases for the primary accessors

Source code in maggma/stores/advanced_stores.py
class AliasingStore(Store):
    """
    Special Store that aliases for the primary accessors

    Callers use "external" field names; they are translated to the wrapped
    store's "internal" field names on the way in, and documents returned by
    ``query`` are re-keyed with ``self.aliases`` on the way out. Criteria,
    properties and sort dicts passed by the caller are copied before being
    rewritten, so the caller's objects are left untouched.
    """

    def __init__(self, store: Store, aliases: Dict, **kwargs):
        """
        Args:
            store: the store to wrap around
            aliases: dict of aliases of the form external key: internal key
        """
        self.store = store
        # Given an external key tells what the internal key is
        self.aliases = aliases
        # Given the internal key tells us what the external key is
        self.reverse_aliases = {v: k for k, v in aliases.items()}
        # NOTE: this is the same dict object updated below, so self.kwargs also
        # ends up holding the wrapped store's last_updated_field/type.
        self.kwargs = kwargs

        kwargs.update(
            {
                "last_updated_field": store.last_updated_field,
                "last_updated_type": store.last_updated_type,
            }
        )
        super(AliasingStore, self).__init__(**kwargs)

    @property
    def name(self) -> str:
        """
        Return a string representing this data source
        """
        return self.store.name

    def _internal_key(self, key):
        """Map one external field name to its internal name (unchanged if not aliased)."""
        return self.aliases.get(key, key)

    def _internal_criteria(self, criteria: Optional[Dict]) -> Dict:
        """Return a renamed copy of ``criteria`` (``{}`` when falsy)."""
        # Copy so lazy_substitute's in-place renaming does not leak into the
        # caller's dict. Shallow copy assumes lazy_substitute only rewrites
        # top-level keys -- confirm if that helper changes.
        criteria = dict(criteria) if criteria else {}
        lazy_substitute(criteria, self.reverse_aliases)
        return criteria

    def _internal_properties(self, properties: Union[Dict, List, None]):
        """Return a renamed dict copy of ``properties``, or None if not given."""
        if properties is None:
            return None
        if isinstance(properties, list):
            properties = {p: 1 for p in properties}
        else:
            properties = dict(properties)
        substitute(properties, self.reverse_aliases)
        return properties

    def _internal_sort(self, sort):
        """Rename sort keys while preserving their priority order."""
        if not sort:
            return sort
        return {self._internal_key(k): v for k, v in sort.items()}

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria

        Args:
            criteria: PyMongo filter for documents to count in
        """
        return self.store.count(self._internal_criteria(criteria))

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
        """
        criteria = self._internal_criteria(criteria)
        properties = self._internal_properties(properties)
        sort = self._internal_sort(sort)

        for d in self.store.query(
            properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
        ):
            substitute(d, self.aliases)
            yield d

    def distinct(
        self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
    ) -> List:
        """
        Get all distinct values for a field

        Args:
            field: the field(s) to get distinct values for; unaliased names
                are passed through unchanged
            criteria: PyMongo filter for documents to search in
            all_exist: accepted for interface compatibility; not forwarded
        """
        # substitute forward; .get avoids a KeyError for unaliased fields
        return self.store.distinct(
            self._internal_key(field), criteria=self._internal_criteria(criteria)
        )

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)

        NOTE: the wrapped store's results are returned as-is; unlike ``query``
        the documents are not re-keyed with the external aliases.
        """
        # Convert to a list and apply the aliasing transformation to each key
        keys = keys if isinstance(keys, list) else [keys]
        keys = [self._internal_key(k) for k in keys]

        return self.store.groupby(
            keys=keys,
            properties=self._internal_properties(properties),
            criteria=self._internal_criteria(criteria),
            sort=self._internal_sort(sort),
            skip=skip,
            limit=limit,
        )

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store

        Documents are renamed to internal field names in place before being
        handed to the wrapped store.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """
        key = key if key else self.key

        # A single document must be wrapped: iterating a dict yields its keys.
        docs = [docs] if isinstance(docs, dict) else list(docs)
        for d in docs:
            substitute(d, self.reverse_aliases)

        # A list key cannot be used as a dict lookup; map each field instead.
        if isinstance(key, list):
            key = [self._internal_key(k) for k in key]
        else:
            key = self._internal_key(key)

        self.store.update(docs, key=key)

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary

        Args:
            criteria: query dictionary to match
        """
        # Copy (without defaulting None to {}, which would match everything)
        # so the caller's dict is not renamed in place.
        criteria = dict(criteria)
        lazy_substitute(criteria, self.reverse_aliases)
        self.store.remove_docs(criteria)

    def ensure_index(self, key, unique=False, **kwargs):
        """
        Tries to create an index on the wrapped store

        Args:
            key: single key to index, as an external (aliased) name
            unique: whether or not this index contains only unique keys
            **kwargs: forwarded unchanged to the wrapped store

        Returns:
            whatever the wrapped store's ensure_index returns
        """
        return self.store.ensure_index(self._internal_key(key), unique, **kwargs)

    def close(self):
        """Close the wrapped store."""
        self.store.close()

    @property
    def _collection(self):
        """The wrapped store's underlying collection."""
        return self.store._collection

    def connect(self, force_reset=False):
        """
        Connect the wrapped store.

        Args:
            force_reset: whether to reset the connection or not
        """
        self.store.connect(force_reset=force_reset)

    def __eq__(self, other: object) -> bool:
        """
        Check equality for AliasingStore

        Args:
            other: other AliasingStore to compare with
        """
        if not isinstance(other, AliasingStore):
            return False

        fields = ["store", "aliases", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name: str property readonly

Return a string representing this data source

__eq__(self, other) special

Check equality for AliasingStore

Parameters:

Name Type Description Default
other object

other AliasingStore to compare with

required
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
    """
    Two AliasingStores are equal when they wrap equal stores with identical
    aliases and the same last_updated_field.

    Args:
        other: other AliasingStore to compare with
    """
    if isinstance(other, AliasingStore):
        return bool(
            self.store == other.store
            and self.aliases == other.aliases
            and self.last_updated_field == other.last_updated_field
        )
    return False

__init__(self, store, aliases, **kwargs) special

Parameters:

Name Type Description Default
store Store

the store to wrap around

required
aliases Dict

dict of aliases of the form external key: internal key

required
Source code in maggma/stores/advanced_stores.py
def __init__(self, store: Store, aliases: Dict, **kwargs):
    """
    Wrap ``store`` so callers can address its fields by alternative names.

    Args:
        store: the store to wrap around
        aliases: dict of aliases of the form external key: internal key
    """
    self.store = store
    # external name -> internal name
    self.aliases = aliases
    # internal name -> external name
    self.reverse_aliases = dict(zip(aliases.values(), aliases.keys()))
    # NOTE: the same dict object is mutated just below, so self.kwargs also
    # carries the wrapped store's last_updated_field/type afterwards.
    self.kwargs = kwargs

    kwargs["last_updated_field"] = store.last_updated_field
    kwargs["last_updated_type"] = store.last_updated_type
    super(AliasingStore, self).__init__(**kwargs)

close(self)

Closes any connections

Source code in maggma/stores/advanced_stores.py
def close(self):
    """Close the connection held by the wrapped store."""
    wrapped = self.store
    wrapped.close()

connect(self, force_reset=False)

Connect to the source data

Parameters:

Name Type Description Default
force_reset

whether to reset the connection or not

False
Source code in maggma/stores/advanced_stores.py
def connect(self, force_reset=False):
    """
    Connect to the source data by connecting the wrapped store.

    Args:
        force_reset: whether to reset the connection or not
    """
    wrapped = self.store
    wrapped.connect(force_reset=force_reset)

count(self, criteria=None)

Counts the number of documents matching the query criteria

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
Source code in maggma/stores/advanced_stores.py
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Counts the number of documents matching the query criteria

    External (aliased) field names in ``criteria`` are renamed to the wrapped
    store's names on a copy, so the caller's dict is not modified.

    Args:
        criteria: PyMongo filter for documents to count in
    """
    # Copy so lazy_substitute's in-place renaming does not leak to the caller
    # (shallow copy; assumes lazy_substitute only rewrites top-level keys).
    criteria = dict(criteria) if criteria else {}
    lazy_substitute(criteria, self.reverse_aliases)
    return self.store.count(criteria)

distinct(self, field, criteria=None, all_exist=False)

Get all distinct values for a field

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in maggma/stores/advanced_stores.py
def distinct(
    self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
    """
    Get all distinct values for a field

    Args:
        field: the field(s) to get distinct values for; names without an
            alias are passed through unchanged
        criteria: PyMongo filter for documents to search in (not modified)
        all_exist: accepted for interface compatibility; not forwarded
    """
    # Copy so lazy_substitute's in-place renaming does not leak to the caller.
    criteria = dict(criteria) if criteria else {}
    lazy_substitute(criteria, self.reverse_aliases)

    # substitute forward; .get avoids a KeyError for unaliased fields
    return self.store.distinct(self.aliases.get(field, field), criteria=criteria)

ensure_index(self, key, unique=False, **kwargs)

Tries to create an index and returns True if it succeeded

Parameters:

Name Type Description Default
key

single key to index

required
unique

Whether or not this index contains only unique keys

False

Returns:

Type Description

bool indicating if the index exists/was created

Source code in maggma/stores/advanced_stores.py
def ensure_index(self, key, unique=False, **kwargs):
    """
    Tries to create an index on the wrapped store

    Args:
        key: single key to index, given as an external (aliased) name
        unique: whether or not this index contains only unique keys
        **kwargs: forwarded unchanged to the wrapped store

    Returns:
        whatever the wrapped store's ensure_index returns
    """
    # Map to the internal field name; previously the whole aliases dict was
    # passed as the key whenever an alias existed.
    key = self.aliases.get(key, key)
    return self.store.ensure_index(key, unique, **kwargs)

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in maggma/stores/advanced_stores.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (dict, list of docs)

    NOTE: results from the wrapped store are returned as-is (not re-keyed
    to external names).
    """
    # Convert to a list and make the aliasing transformations on keys
    keys = keys if isinstance(keys, list) else [keys]
    keys = [self.aliases.get(k, k) for k in keys]

    # Work on copies so the caller's criteria/properties are not renamed in place
    criteria = dict(criteria) if criteria else {}
    lazy_substitute(criteria, self.reverse_aliases)

    if properties is not None:
        if isinstance(properties, list):
            properties = {p: 1 for p in properties}
        else:
            properties = dict(properties)
        substitute(properties, self.reverse_aliases)

    if sort:
        # Rebuild (not substitute) so the sort-key priority order is kept.
        sort = {self.aliases.get(k, k): v for k, v in sort.items()}

    return self.store.groupby(
        keys=keys,
        properties=properties,
        criteria=criteria,
        sort=sort,
        skip=skip,
        limit=limit,
    )

query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the Store for a set of documents

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in maggma/stores/advanced_stores.py
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents

    Field names in ``criteria``, ``properties`` and ``sort`` are external
    (aliased) names and are translated on copies before querying; each
    returned document is re-keyed with ``self.aliases``.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
    """
    # Copy so lazy_substitute's in-place renaming does not leak to the caller
    criteria = dict(criteria) if criteria else {}
    lazy_substitute(criteria, self.reverse_aliases)

    if properties is not None:
        if isinstance(properties, list):
            properties = {p: 1 for p in properties}
        else:
            properties = dict(properties)
        substitute(properties, self.reverse_aliases)

    if sort:
        # Rebuild (not substitute) so the sort-key priority order is kept.
        sort = {self.aliases.get(k, k): v for k, v in sort.items()}

    for d in self.store.query(
        properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
    ):
        substitute(d, self.aliases)
        yield d

remove_docs(self, criteria)

Remove docs matching the query dictionary

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in maggma/stores/advanced_stores.py
def remove_docs(self, criteria: Dict):
    """
    Remove docs matching the query dictionary

    Args:
        criteria: query dictionary to match (external names; not modified)
    """
    # Rename on a copy so the caller's dict is untouched. None is deliberately
    # not defaulted to {} here, since an empty filter would match everything.
    criteria = dict(criteria)
    lazy_substitute(criteria, self.reverse_aliases)
    self.store.remove_docs(criteria)

update(self, docs, key=None)

Update documents into the Store

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in maggma/stores/advanced_stores.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Update documents into the Store

    Each document is renamed to internal field names in place before being
    passed to the wrapped store.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
    """
    key = key if key else self.key

    # A single document must be wrapped: iterating a dict yields its keys.
    docs = [docs] if isinstance(docs, dict) else list(docs)
    for d in docs:
        substitute(d, self.reverse_aliases)

    # A list key cannot be looked up in a dict; map each field separately.
    if isinstance(key, list):
        key = [self.aliases.get(k, k) for k in key]
    else:
        key = self.aliases.get(key, key)

    self.store.update(docs, key=key)

MongograntStore (MongoStore)

Initialize a Store with a mongogrant "<role>:<host>/<db>." spec.

Some class methods of MongoStore, e.g. from_db_file and from_collection, are not supported.

mongogrant documentation: https://github.com/materialsproject/mongogrant

Source code in maggma/stores/advanced_stores.py
class MongograntStore(MongoStore):
    """Initialize a Store with a mongogrant "`<role>`:`<host>`/`<db>`." spec.

    Some class methods of MongoStore, e.g. from_db_file and from_collection,
    are not supported.

    mongogrant documentation: https://github.com/materialsproject/mongogrant
    """

    def __init__(
        self,
        mongogrant_spec: str,
        collection_name: str,
        mgclient_config_path: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            mongogrant_spec: of the form `<role>`:`<host>`/`<db>`, where
                role is one of {"read", "readWrite"} or aliases {"ro", "rw"};
                host is a db host (w/ optional port) or alias; and db is a db
                on that host, or alias. See mongogrant documentation.
            collection_name: name of mongo collection
            mgclient_config_path: Path to mongogrant client config file,
               or None if default path (`mongogrant.client.path`).

        Raises:
            StoreError: if username, password, database or host is passed in
                kwargs; those come from the mongogrant spec instead.
        """
        # Reject conflicting connection arguments before building the
        # mongogrant Config/Client, so a bad call fails fast.
        if {"username", "password", "database", "host"} & set(kwargs):
            raise StoreError(
                "MongograntStore does not accept "
                "username, password, database, or host "
                "arguments. Use `mongogrant_spec`."
            )

        self.mongogrant_spec = mongogrant_spec
        self.collection_name = collection_name
        self.mgclient_config_path = mgclient_config_path
        self._coll = None

        if self.mgclient_config_path:
            config = Config(check=check, path=self.mgclient_config_path)
            client = Client(config)
        else:
            client = Client()

        self.kwargs = kwargs
        _auth_info = client.get_db_auth_from_spec(self.mongogrant_spec)
        super(MongograntStore, self).__init__(
            host=_auth_info["host"],
            database=_auth_info["authSource"],
            username=_auth_info["username"],
            password=_auth_info["password"],
            collection_name=self.collection_name,
            **kwargs,
        )

    @property
    def name(self):
        """Return a string representing this data source"""
        return f"mgrant://{self.mongogrant_spec}/{self.collection_name}"

    def __hash__(self):
        """
        Hash on a subset of the fields compared in __eq__, so equal stores
        hash equally.
        """
        return hash(
            (self.mongogrant_spec, self.collection_name, self.last_updated_field)
        )

    @classmethod
    def from_db_file(cls, file):
        """
        Raises ValueError since MongograntStores can't be initialized from a file
        """
        raise ValueError("MongograntStore doesn't implement from_db_file")

    @classmethod
    def from_collection(cls, collection):
        """
        Raises ValueError since MongograntStores can't be initialized from a PyMongo collection
        """
        raise ValueError("MongograntStore doesn't implement from_collection")

    def __eq__(self, other: object) -> bool:
        """
        Check equality for MongograntStore

        Args:
            other: other MongograntStore to compare with
        """
        if not isinstance(other, MongograntStore):
            return False

        fields = [
            "mongogrant_spec",
            "collection_name",
            "mgclient_config_path",
            "last_updated_field",
        ]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name property readonly

Return a string representing this data source

__eq__(self, other) special

Check equality for MongograntStore

Parameters:

Name Type Description Default
other object

other MongograntStore to compare with

required
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
    """
    MongograntStores are equal when spec, collection name, client config
    path and last_updated_field all match.

    Args:
        other: other MongograntStore to compare with
    """
    if isinstance(other, MongograntStore):
        return bool(
            self.mongogrant_spec == other.mongogrant_spec
            and self.collection_name == other.collection_name
            and self.mgclient_config_path == other.mgclient_config_path
            and self.last_updated_field == other.last_updated_field
        )
    return False

__init__(self, mongogrant_spec, collection_name, mgclient_config_path=None, **kwargs) special

Parameters:

Name Type Description Default
mongogrant_spec str

of the form <role>:<host>/<db>, where role is one of {"read", "readWrite"} or aliases {"ro", "rw"}; host is a db host (w/ optional port) or alias; and db is a db on that host, or alias. See mongogrant documentation.

required
collection_name str

name of mongo collection

required
mgclient_config_path Optional[str]

Path to mongogrant client config file, or None if default path (mongogrant.client.path).

None
Source code in maggma/stores/advanced_stores.py
def __init__(
    self,
    mongogrant_spec: str,
    collection_name: str,
    mgclient_config_path: Optional[str] = None,
    **kwargs,
):
    """
    Args:
        mongogrant_spec: of the form `<role>`:`<host>`/`<db>`, where
            role is one of {"read", "readWrite"} or aliases {"ro", "rw"};
            host is a db host (w/ optional port) or alias; and db is a db
            on that host, or alias. See mongogrant documentation.
        collection_name: name of mongo collection
        mgclient_config_path: Path to mongogrant client config file,
           or None if default path (`mongogrant.client.path`).

    Raises:
        StoreError: if username, password, database or host is passed in
            kwargs; those come from the mongogrant spec instead.
    """
    # Reject conflicting connection arguments before building the
    # mongogrant Config/Client, so a bad call fails fast.
    if {"username", "password", "database", "host"} & set(kwargs):
        raise StoreError(
            "MongograntStore does not accept "
            "username, password, database, or host "
            "arguments. Use `mongogrant_spec`."
        )

    self.mongogrant_spec = mongogrant_spec
    self.collection_name = collection_name
    self.mgclient_config_path = mgclient_config_path
    self._coll = None

    if self.mgclient_config_path:
        config = Config(check=check, path=self.mgclient_config_path)
        client = Client(config)
    else:
        client = Client()

    self.kwargs = kwargs
    _auth_info = client.get_db_auth_from_spec(self.mongogrant_spec)
    super(MongograntStore, self).__init__(
        host=_auth_info["host"],
        database=_auth_info["authSource"],
        username=_auth_info["username"],
        password=_auth_info["password"],
        collection_name=self.collection_name,
        **kwargs,
    )

from_collection(collection) classmethod

Raises ValueError since MongograntStores can't be initialized from a PyMongo collection

Source code in maggma/stores/advanced_stores.py
@classmethod
def from_collection(cls, collection):
    """
    Not supported: a MongograntStore cannot be built from a PyMongo collection.

    Raises:
        ValueError: always
    """
    message = "MongograntStore doesn't implement from_collection"
    raise ValueError(message)

from_db_file(file) classmethod

Raises ValueError since MongograntStores can't be initialized from a file

Source code in maggma/stores/advanced_stores.py
@classmethod
def from_db_file(cls, file):
    """
    Not supported: a MongograntStore cannot be built from a db file.

    Raises:
        ValueError: always
    """
    message = "MongograntStore doesn't implement from_db_file"
    raise ValueError(message)

SandboxStore (Store)

Provides a sandboxed view to another store

Source code in maggma/stores/advanced_stores.py
class SandboxStore(Store):
    """
    Provides a sandboxed view to another store

    Reads are filtered by ``sbx_criteria``; writes tag each document's
    ``sbxn`` list with this sandbox.
    """

    def __init__(self, store: Store, sandbox: str, exclusive: bool = False):
        """
        Args:
            store: store to wrap sandboxing around
            sandbox: the corresponding sandbox
            exclusive: whether to be exclusively in this sandbox or include global items
        """
        self.store = store
        self.sandbox = sandbox
        self.exclusive = exclusive
        super().__init__(
            key=self.store.key,
            last_updated_field=self.store.last_updated_field,
            last_updated_type=self.store.last_updated_type,
            validator=self.store.validator,
        )

    @property
    def name(self) -> str:
        """
        Returns:
            a string representing this data source
        """
        return f"Sandbox[{self.store.name}][{self.sandbox}]"

    @property
    def sbx_criteria(self) -> Dict:
        """
        Returns:
            the sandbox criteria dict used to filter the source store. When
            ``exclusive`` is False it also matches documents with no ``sbxn``.
        """
        if self.exclusive:
            return {"sbxn": self.sandbox}
        else:
            return {
                "$or": [{"sbxn": {"$in": [self.sandbox]}}, {"sbxn": {"$exists": False}}]
            }

    def _sandboxed(self, criteria: Optional[Dict]) -> Dict:
        """
        Combine caller criteria with the sandbox filter.

        ``$and`` is used instead of ``dict(**criteria, **self.sbx_criteria)``,
        which raised TypeError whenever both shared a top-level key (e.g. a
        caller-supplied ``$or`` or ``sbxn``).
        """
        if not criteria:
            return self.sbx_criteria
        return {"$and": [criteria, self.sbx_criteria]}

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria

        Args:
            criteria: PyMongo filter for documents to count in
        """
        return self.store.count(criteria=self._sandboxed(criteria))

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
        """
        return self.store.query(
            properties=properties,
            criteria=self._sandboxed(criteria),
            sort=sort,
            limit=limit,
            skip=skip,
        )

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """
        return self.store.groupby(
            keys=keys,
            properties=properties,
            criteria=self._sandboxed(criteria),
            sort=sort,
            skip=skip,
            limit=limit,
        )

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store

        Each document is tagged in place with this sandbox in its ``sbxn`` list.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """
        # A single document must be wrapped: iterating a dict yields its keys.
        docs = [docs] if isinstance(docs, dict) else list(docs)
        for d in docs:
            if "sbxn" in d:
                # dict.fromkeys de-duplicates with a deterministic order
                d["sbxn"] = list(dict.fromkeys(d["sbxn"] + [self.sandbox]))
            else:
                d["sbxn"] = [self.sandbox]

        self.store.update(docs, key=key)

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary

        NOTE: with empty criteria every document visible through the sandbox
        is removed; when ``exclusive`` is False that includes documents with
        no ``sbxn`` field.

        Args:
            criteria: query dictionary to match
        """
        # Restrict the deletion to documents visible in this sandbox
        self.store.remove_docs(self._sandboxed(criteria))

    def ensure_index(self, key, unique=False, **kwargs):
        """
        Tries to create an index on the wrapped store

        Args:
            key: single key to index
            unique: whether or not this index contains only unique keys

        Returns:
            whatever the wrapped store's ensure_index returns
        """
        return self.store.ensure_index(key, unique, **kwargs)

    def close(self):
        """Close the wrapped store."""
        self.store.close()

    @property
    def _collection(self):
        """The wrapped store's underlying collection."""
        return self.store._collection

    def connect(self, force_reset=False):
        """
        Connect the wrapped store.

        Args:
            force_reset: whether to reset the connection or not
        """
        self.store.connect(force_reset=force_reset)

    def __eq__(self, other: object) -> bool:
        """
        Check equality for SandboxStore

        Args:
            other: other SandboxStore to compare with
        """
        if not isinstance(other, SandboxStore):
            return False

        fields = ["store", "sandbox", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name: str property readonly

Returns:

Type Description
str

a string representing this data source

sbx_criteria: Dict property readonly

Returns:

Type Description
Dict

the sandbox criteria dict used to filter the source store

__eq__(self, other) special

Check equality for SandboxStore

Parameters:

Name Type Description Default
other object

other SandboxStore to compare with

required
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
    """
    SandboxStores are equal when they wrap equal stores, use the same
    sandbox and share a last_updated_field.

    Args:
        other: other SandboxStore to compare with
    """
    if isinstance(other, SandboxStore):
        return bool(
            self.store == other.store
            and self.sandbox == other.sandbox
            and self.last_updated_field == other.last_updated_field
        )
    return False

__init__(self, store, sandbox, exclusive=False) special

Parameters:

Name Type Description Default
store Store

store to wrap sandboxing around

required
sandbox str

the corresponding sandbox

required
exclusive bool

whether to be exclusively in this sandbox or include global items

False
Source code in maggma/stores/advanced_stores.py
def __init__(self, store: Store, sandbox: str, exclusive: bool = False):
    """
    Wrap ``store`` so that access is restricted to one sandbox.

    Args:
        store: store to wrap sandboxing around
        sandbox: the corresponding sandbox
        exclusive: whether to be exclusively in this sandbox or include global items
    """
    self.store, self.sandbox, self.exclusive = store, sandbox, exclusive
    # Mirror the wrapped store's key, timestamp and validator settings
    super().__init__(
        key=store.key,
        last_updated_field=store.last_updated_field,
        last_updated_type=store.last_updated_type,
        validator=store.validator,
    )

close(self)

Closes any connections

Source code in maggma/stores/advanced_stores.py
def close(self):
    """Close the connection held by the wrapped store."""
    wrapped = self.store
    wrapped.close()

connect(self, force_reset=False)

Connect to the source data

Parameters:

Name Type Description Default
force_reset

whether to reset the connection or not

False
Source code in maggma/stores/advanced_stores.py
def connect(self, force_reset=False):
    """
    Connect to the source data by connecting the wrapped store.

    Args:
        force_reset: whether to reset the connection or not
    """
    wrapped = self.store
    wrapped.connect(force_reset=force_reset)

count(self, criteria=None)

Counts the number of documents matching the query criteria

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
Source code in maggma/stores/advanced_stores.py
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Counts the number of documents in this sandbox matching the query criteria

    Args:
        criteria: PyMongo filter for documents to count in; it is combined
            with this store's sandbox filter (self.sbx_criteria)

    Returns:
        the count reported by the wrapped store for the combined filter
    """
    sbx_criteria = self.sbx_criteria
    if not criteria:
        criteria = sbx_criteria
    elif criteria.keys() & sbx_criteria.keys():
        # dict(**a, **b) raises TypeError on shared keys (e.g. both filters
        # using "$or"); AND the two filters together instead.
        criteria = {"$and": [criteria, sbx_criteria]}
    else:
        criteria = {**criteria, **sbx_criteria}
    return self.store.count(criteria=criteria)

ensure_index(self, key, unique=False, **kwargs)

Tries to create an index and returns True if it succeeded

Parameters:

Name Type Description Default
key

single key to index

required
unique

Whether or not this index contains only unique keys

False

Returns:

Type Description

bool indicating if the index exists/was created

Source code in maggma/stores/advanced_stores.py
def ensure_index(self, key, unique=False, **kwargs):
    """
    Try to create an index on the wrapped store.

    Args:
        key: single key to index
        unique: Whether or not this index contains only unique keys
        **kwargs: extra options passed through unchanged to the wrapped store

    Returns:
        the wrapped store's result (a bool indicating if the index
        exists/was created)
    """
    wrapped = self.store
    return wrapped.ensure_index(key, unique, **kwargs)

groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in maggma/stores/advanced_stores.py
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents in this sandbox
    by keys.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in; it is combined
            with this store's sandbox filter (self.sbx_criteria)
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (dict, list of docs)
    """
    sbx_criteria = self.sbx_criteria
    if not criteria:
        criteria = sbx_criteria
    elif criteria.keys() & sbx_criteria.keys():
        # dict(**a, **b) raises TypeError on shared keys (e.g. both filters
        # using "$or"); AND the two filters together instead.
        criteria = {"$and": [criteria, sbx_criteria]}
    else:
        criteria = {**criteria, **sbx_criteria}

    # sort used to be accepted but silently dropped; forward it as well.
    return self.store.groupby(
        keys=keys,
        criteria=criteria,
        properties=properties,
        sort=sort,
        skip=skip,
        limit=limit,
    )

query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the Store for a set of documents

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List]

properties to return in the matching documents

None
sort Optional[Dict[str, Union[maggma.core.store.Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in maggma/stores/advanced_stores.py
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents in this sandbox

    Args:
        criteria: PyMongo filter for documents to search in; it is combined
            with this store's sandbox filter (self.sbx_criteria)
        properties: properties to return in the matching documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
    """
    sbx_criteria = self.sbx_criteria
    if not criteria:
        criteria = sbx_criteria
    elif criteria.keys() & sbx_criteria.keys():
        # dict(**a, **b) raises TypeError on shared keys (e.g. both filters
        # using "$or"); AND the two filters together instead.
        criteria = {"$and": [criteria, sbx_criteria]}
    else:
        criteria = {**criteria, **sbx_criteria}
    return self.store.query(
        properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
    )

remove_docs(self, criteria)

Remove docs matching the query dictionary

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in maggma/stores/advanced_stores.py
def remove_docs(self, criteria: Dict):
    """
    Remove docs in this sandbox matching the query dictionary

    An empty criteria removes every document matched by the sandbox filter.

    Args:
        criteria: query dictionary to match; it is combined with this
            store's sandbox filter (self.sbx_criteria)
    """
    # Restrict the deletion to documents visible in this sandbox.
    sbx_criteria = self.sbx_criteria
    if not criteria:
        criteria = sbx_criteria
    elif criteria.keys() & sbx_criteria.keys():
        # dict(**a, **b) raises TypeError on shared keys (e.g. both filters
        # using "$or"); AND the two filters together instead.
        criteria = {"$and": [criteria, sbx_criteria]}
    else:
        criteria = {**criteria, **sbx_criteria}
    self.store.remove_docs(criteria)

update(self, docs, key=None)

Update documents into the Store

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in maggma/stores/advanced_stores.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Update documents into the Store, tagging each with this sandbox

    Each document gets this store's sandbox name added to its "sbxn" list
    (created if missing, de-duplicated otherwise) before the documents are
    forwarded to the wrapped store. Documents are modified in place.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
    """
    # Accept a single document as the signature advertises; iterating a bare
    # dict would walk its keys and fail when assigning into a str.
    if isinstance(docs, dict):
        docs = [docs]

    for d in docs:
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # resulting tag list is deterministic (unlike list(set(...))).
        d["sbxn"] = list(dict.fromkeys([*d.get("sbxn", []), self.sandbox]))

    self.store.update(docs, key=key)

VaultStore (MongoStore)

Extends MongoStore to read credentials out of a Vault server and use these values to initialize a MongoStore instance

Source code in maggma/stores/advanced_stores.py
class VaultStore(MongoStore):
    """
    Extends MongoStore to read credentials out of Vault server
    and uses these values to initialize MongoStore instance
    """

    @requires(hvac is not None, "hvac is required to use VaultStore")
    def __init__(self, collection_name: str, vault_secret_path: str):
        """
        Args:
            collection_name: name of mongo collection
            vault_secret_path: path on vault server with mongo creds object

        Important:
            Environment variables that must be set prior to invocation
            VAULT_ADDR - URL of vault server (eg. https://matgen8.lbl.gov:8200)
            VAULT_TOKEN or GITHUB_TOKEN - token used to authenticate to vault

        Raises:
            RuntimeError: if VAULT_ADDR is unset, neither token is set, the
                VAULT_TOKEN is not authenticated, or no secret exists at
                vault_secret_path
        """
        self.collection_name = collection_name
        self.vault_secret_path = vault_secret_path

        # TODO: Switch this over to Pydantic ConfigSettings
        vault_addr = os.getenv("VAULT_ADDR")
        if not vault_addr:
            raise RuntimeError("VAULT_ADDR not set")

        client = hvac.Client(vault_addr)

        # Prefer an explicit vault token; otherwise fall back to GitHub auth
        token = os.getenv("VAULT_TOKEN")
        if token:
            client.token = token
            if not client.is_authenticated():
                raise RuntimeError("Bad token")
        else:
            github_token = os.getenv("GITHUB_TOKEN")
            if not github_token:
                raise RuntimeError("VAULT_TOKEN or GITHUB_TOKEN not set")
            client.auth_github(github_token)

        # Read the vault secret. hvac's Client.read returns None when the path
        # does not exist; fail with a clear message instead of a TypeError.
        json_db_creds = client.read(vault_secret_path)
        if json_db_creds is None:
            raise RuntimeError(f"No vault secret found at {vault_secret_path}")
        db_creds = json.loads(json_db_creds["data"]["value"])

        database = db_creds.get("db")
        host = db_creds.get("host", "localhost")
        port = db_creds.get("port", 27017)
        username = db_creds.get("username", "")
        password = db_creds.get("password", "")

        super(VaultStore, self).__init__(
            database, collection_name, host, port, username, password
        )

    def __eq__(self, other: object) -> bool:
        """
        Check equality for VaultStore

        Args:
            other: other VaultStore to compare with
        """
        if not isinstance(other, VaultStore):
            return False

        fields = ["vault_secret_path", "collection_name", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

__eq__(self, other) special

Check equality for VaultStore

Parameters:

Name Type Description Default
other object

other VaultStore to compare with

required
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
    """
    Compare two VaultStores for equality.

    VaultStores are equal when they point at the same vault secret path,
    use the same collection name and share the same last_updated_field.

    Args:
        other: object to compare against; non-VaultStore objects are unequal
    """
    if isinstance(other, VaultStore):
        compared = ("vault_secret_path", "collection_name", "last_updated_field")
        return all(getattr(self, name) == getattr(other, name) for name in compared)
    return False

__init__(self, collection_name, vault_secret_path) special

Parameters:

Name Type Description Default
collection_name str

name of mongo collection

required
vault_secret_path str

path on vault server with mongo creds object

required

Important

Environment variables that must be set prior to invocation: VAULT_ADDR – URL of the vault server (e.g. https://matgen8.lbl.gov:8200); VAULT_TOKEN or GITHUB_TOKEN – token used to authenticate to vault.

Source code in maggma/stores/advanced_stores.py
@requires(hvac is not None, "hvac is required to use VaultStore")
def __init__(self, collection_name: str, vault_secret_path: str):
    """
    Args:
        collection_name: name of mongo collection
        vault_secret_path: path on vault server with mongo creds object

    Important:
        Environment variables that must be set prior to invocation
        VAULT_ADDR - URL of vault server (eg. https://matgen8.lbl.gov:8200)
        VAULT_TOKEN or GITHUB_TOKEN - token used to authenticate to vault

    Raises:
        RuntimeError: if VAULT_ADDR is unset, neither token is set, the
            VAULT_TOKEN is not authenticated, or no secret exists at
            vault_secret_path
    """
    self.collection_name = collection_name
    self.vault_secret_path = vault_secret_path

    # TODO: Switch this over to Pydantic ConfigSettings
    vault_addr = os.getenv("VAULT_ADDR")
    if not vault_addr:
        raise RuntimeError("VAULT_ADDR not set")

    client = hvac.Client(vault_addr)

    # Prefer an explicit vault token; otherwise fall back to GitHub auth
    token = os.getenv("VAULT_TOKEN")
    if token:
        client.token = token
        if not client.is_authenticated():
            raise RuntimeError("Bad token")
    else:
        github_token = os.getenv("GITHUB_TOKEN")
        if not github_token:
            raise RuntimeError("VAULT_TOKEN or GITHUB_TOKEN not set")
        client.auth_github(github_token)

    # Read the vault secret. hvac's Client.read returns None when the path
    # does not exist; fail with a clear message instead of a TypeError.
    json_db_creds = client.read(vault_secret_path)
    if json_db_creds is None:
        raise RuntimeError(f"No vault secret found at {vault_secret_path}")
    db_creds = json.loads(json_db_creds["data"]["value"])

    database = db_creds.get("db")
    host = db_creds.get("host", "localhost")
    port = db_creds.get("port", 27017)
    username = db_creds.get("username", "")
    password = db_creds.get("password", "")

    super(VaultStore, self).__init__(
        database, collection_name, host, port, username, password
    )

Special stores that combine underlying Stores together

ConcatStore (Store)

Store concatenating multiple stores

Source code in maggma/stores/compound_stores.py
class ConcatStore(Store):
    """Store concatting multiple stores"""

    def __init__(self, stores: List[Store], **kwargs):
        """
        Initialize a ConcatStore that presents several stores as a single one

        Args:
            stores: list of stores to concatenate together
            **kwargs: kept on self.kwargs and forwarded to the base Store initializer
        """
        self.stores, self.kwargs = stores, kwargs
        super().__init__(**kwargs)

    @property
    def name(self) -> str:
        """
        A string representing this data source, built from the member stores' names
        """
        member_names = (store.name for store in self.stores)
        return "Concat[" + ",".join(member_names) + "]"

    def connect(self, force_reset: bool = False):
        """
        Connect each member store in order

        Args:
            force_reset: passed to every member store's connect to force a
                connection reset
        """
        for member in self.stores:
            member.connect(force_reset)

    def close(self):
        """
        Close each member store's connection in order
        """
        for member in self.stores:
            member.close()

    @property
    def _collection(self):
        """
        A ConcatStore has no single underlying collection.

        Raises:
            NotImplementedError: on every access
        """
        raise NotImplementedError("No collection property for ConcatStore")

    @property
    def last_updated(self) -> datetime:
        """
        The most recent last_updated value across all member stores.

        This is not necessarily the most useful definition for this kind of
        Store: it can easily over-estimate last_updated depending on which
        stores are combined.
        """
        return max(member.last_updated for member in self.stores)

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Writing is not supported for a ConcatStore.

        Args:
            docs: the document or list of documents to update (unused)
            key: field name(s) to determine uniqueness for a document (unused)

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("No update method for ConcatStore")

    def distinct(
        self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
    ) -> List:
        """
        Get all distinct values for a field across every member store

        Values are de-duplicated across stores and returned in first-seen
        order. Unhashable values (e.g. dicts) are de-duplicated by equality
        instead of crashing.

        Args:
            field: the field to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: accepted for interface compatibility; not used here

        Returns:
            list of distinct values
        """
        distincts: List = []
        seen_hashable = set()
        for store in self.stores:
            for value in store.distinct(field=field, criteria=criteria):
                try:
                    if value in seen_hashable:
                        continue
                    seen_hashable.add(value)
                except TypeError:
                    # Unhashable values can't go in a set; fall back to an
                    # equality scan over the values kept so far.
                    if value in distincts:
                        continue
                distincts.append(value)
        return distincts

    def ensure_index(self, key: str, unique: bool = False) -> bool:
        """
        Ensure an index on every member store.

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys

        Returns:
            True only if every member store reports the index exists/was created
        """
        # Collect every result before reducing so each store receives the
        # request even after one of them reports failure.
        outcomes = [member.ensure_index(key, unique) for member in self.stores]
        return all(outcomes)

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Total number of matching documents summed over all member stores

        Args:
            criteria: PyMongo filter for documents to count in; passed to
                each member store unchanged
        """
        return sum(member.count(criteria) for member in self.stores)

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries across all Stores for a set of documents

        Results of each member store are yielded in turn, in the order of
        self.stores. skip and limit apply to that concatenated stream.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in the matching documents
            sort: Dictionary of sort order for fields. Currently ignored:
                no global order is imposed across member stores.
            skip: number documents to skip
            limit: limit on total number of documents returned (0 = no limit)
        """
        from itertools import islice

        # TODO: sort is still ignored; a global order across stores would
        # require merging the per-store result streams.
        docs = (
            doc
            for store in self.stores
            for doc in store.query(criteria=criteria, properties=properties)
        )
        start = max(skip, 0)
        stop = start + limit if limit > 0 else None
        yield from islice(docs, start, stop)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip<