Skip to content

Store

Module containing the core Store definition.

DateTimeFormat

Bases: Enum

Datetime format in store document.

Source code in src/maggma/core/store.py
27
28
29
30
31
class DateTimeFormat(Enum):
    """How the date/time stamp of a store document is represented."""

    # Selected with the value "datetime".
    DateTime = "datetime"
    # Selected with the value "isoformat".
    IsoFormat = "isoformat"

Sort

Bases: Enum

Enumeration for sorting order.

Source code in src/maggma/core/store.py
20
21
22
23
24
class Sort(Enum):
    """Direction in which results are ordered."""

    # Values follow the 1 / -1 convention used by the ``sort`` dictionaries.
    Ascending = 1
    Descending = -1

Store

Bases: MSONable

Abstract class for a data Store. Defines the interface for all data going in and out of a Builder.

Source code in src/maggma/core/store.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
class Store(MSONable, metaclass=ABCMeta):
    """
    Abstract class for a data Store.

    Defines the interface for all data going in and out of a Builder.
    Subclasses implement the abstract connection, query and update
    primitives; this base class adds helpers built on top of them
    (``query_one``, ``distinct``, ``last_updated``, ``newer_in``), pickling
    through ``as_dict`` and context-manager support.
    """

    def __init__(
        self,
        key: str = "task_id",
        last_updated_field: str = "last_updated",
        last_updated_type: DateTimeFormat = DateTimeFormat("datetime"),  # noqa: B008
        validator: Optional[Validator] = None,
    ):
        """
        Args:
            key: main key to index on
            last_updated_field: field for date/time stamping the data
            last_updated_type: the date/time format for the last_updated_field.
                                Can be "datetime" or "isoformat"
            validator: Validator to validate documents going into the store.
        """
        self.key = key
        self.last_updated_field = last_updated_field
        self.last_updated_type = last_updated_type
        # Converter pair: index 0 is applied to values read from documents,
        # index 1 to datetimes that are placed into query filters.
        self._lu_func: tuple[Callable, Callable] = (
            LU_KEY_ISOFORMAT if DateTimeFormat(last_updated_type) == DateTimeFormat.IsoFormat else (identity, identity)
        )
        self.validator = validator
        self.logger = logging.getLogger(type(self).__name__)
        # getLogger() hands back the same logger for a given class name, so
        # attach a NullHandler only once; otherwise every new instance would
        # append one more handler to that shared logger.
        if not any(isinstance(h, logging.NullHandler) for h in self.logger.handlers):
            self.logger.addHandler(logging.NullHandler())

    # ``abstractproperty`` is deprecated; stacking ``property`` on
    # ``abstractmethod`` is the documented replacement.
    @property
    @abstractmethod
    def _collection(self):
        """
        Returns a handle to the pymongo collection object.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        Return a string representing this data source.
        """

    @abstractmethod
    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not
        """

    @abstractmethod
    def close(self):
        """
        Closes any connections.
        """

    @abstractmethod
    def count(self, criteria: Optional[dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria.

        Args:
            criteria: PyMongo filter for documents to count in
        """

    @abstractmethod
    def query(
        self,
        criteria: Optional[dict] = None,
        properties: Union[dict, list, None] = None,
        sort: Optional[dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[dict]:
        """
        Queries the Store for a set of documents.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in the documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
        """

    @abstractmethod
    def update(self, docs: Union[list[dict], dict], key: Union[list, str, None] = None):
        """
        Update documents into the Store.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """

    @abstractmethod
    def ensure_index(self, key: str, unique: bool = False) -> bool:
        """
        Tries to create an index and return true if it succeeded.

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys

        Returns:
            bool indicating if the index exists/was created
        """

    @abstractmethod
    def groupby(
        self,
        keys: Union[list[str], str],
        criteria: Optional[dict] = None,
        properties: Union[dict, list, None] = None,
        sort: Optional[dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[tuple[dict, list[dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """

    @abstractmethod
    def remove_docs(self, criteria: dict):
        """
        Remove docs matching the query dictionary.

        Args:
            criteria: query dictionary to match
        """

    def query_one(
        self,
        criteria: Optional[dict] = None,
        properties: Union[dict, list, None] = None,
        sort: Optional[dict[str, Union[Sort, int]]] = None,
    ):
        """
        Queries the Store for a single document.

        Args:
            criteria: PyMongo filter for documents to search
            properties: properties to return in the document
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.

        Returns:
            the first document yielded by ``query``, or None if there is none
        """
        return next(self.query(criteria=criteria, properties=properties, sort=sort), None)

    def distinct(self, field: str, criteria: Optional[dict] = None, all_exist: bool = False) -> list:
        """
        Get all distinct values for a field.

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: not used by this base implementation

        Returns:
            list with the value of ``field`` for every group returned by ``groupby``
        """
        criteria = criteria or {}

        # Only the group keys matter here; the grouped documents are discarded.
        return [get(group_key, field) for group_key, _ in self.groupby(field, properties=[field], criteria=criteria)]

    @property
    def last_updated(self) -> datetime:
        """
        Provides the most recent last_updated date time stamp from
        the documents in this Store.

        Returns:
            the converted stamp of the newest document, or ``datetime.min``
            when the Store has no documents or the stamp is None

        Raises:
            StoreError: if the newest document has no last_updated_field
        """
        # Sort descending on the stamp and take only the first document.
        doc = next(
            self.query(
                properties=[self.last_updated_field],
                sort={self.last_updated_field: -1},
                limit=1,
            ),
            None,
        )
        if doc and not has(doc, self.last_updated_field):
            raise StoreError(
                f"No field '{self.last_updated_field}' in store document. Please ensure Store.last_updated_field "
                "is a datetime field in your store that represents the time of "
                "last update to each document."
            )
        if not doc or get(doc, self.last_updated_field) is None:
            # Handle when collection has docs but `NoneType` last_updated_field.
            return datetime.min

        return self._lu_func[0](get(doc, self.last_updated_field))

    def newer_in(self, target: "Store", criteria: Optional[dict] = None, exhaustive: bool = False) -> list[str]:
        """
        Returns the keys of documents that are newer in the target
        Store than this Store.

        Args:
            target: target Store to compare against
            criteria: PyMongo filter restricting which target documents
                      are considered
            exhaustive: if True, compare the stamps key by key; otherwise
                        only return target documents stamped later than
                        this Store's ``last_updated``

        Returns:
            list of key values found in the target
        """
        self.ensure_index(self.key)
        self.ensure_index(self.last_updated_field)

        if exhaustive:
            # Get our current last_updated dates for each key value.
            # A missing stamp defaults to datetime.max so that such a
            # document is never reported as outdated.
            props = {self.key: 1, self.last_updated_field: 1, "_id": 0}
            dates = {
                d[self.key]: self._lu_func[0](d.get(self.last_updated_field, datetime.max))
                for d in self.query(properties=props)
            }

            # Get the last_updated for the store we're comparing with
            props = {target.key: 1, target.last_updated_field: 1, "_id": 0}
            target_dates = {
                d[target.key]: target._lu_func[0](d.get(target.last_updated_field, datetime.min))
                for d in target.query(criteria=criteria, properties=props)
            }

            new_keys = set(target_dates.keys()) - set(dates.keys())
            updated_keys = {key for key, date in dates.items() if target_dates.get(key, datetime.min) > date}

            return list(new_keys | updated_keys)

        # The filter runs against the target, so it is expressed with the
        # target's stamp field, stamp serializer and key, mirroring the
        # exhaustive branch above.
        newer_than_self = {target.last_updated_field: {"$gt": target._lu_func[1](self.last_updated)}}
        # Combine with the caller's criteria instead of silently dropping it.
        combined = {"$and": [criteria, newer_than_self]} if criteria else newer_than_self
        return target.distinct(field=target.key, criteria=combined)

    @deprecated(message="Please use Store.newer_in")
    def lu_filter(self, targets):
        """Creates a MongoDB filter for new documents.

        By "new", we mean documents in this Store that were last updated later
        than any document in targets.

        Args:
            targets (list): A list of Stores; a single Store is also accepted

        Returns:
            dict filter on last_updated_field using ``$gt``
        """
        if isinstance(targets, Store):
            targets = [targets]

        lu_list = [t.last_updated for t in targets]
        return {self.last_updated_field: {"$gt": self._lu_func[1](max(lu_list))}}

    @deprecated(message="Use Store.newer_in")
    def updated_keys(self, target, criteria=None):
        """
        Returns keys for docs that are newer in the target store in comparison
        with this store when comparing the last updated field (last_updated_field).

        Args:
            target (Store): store to look for updated documents
            criteria (dict): mongo query to limit scope

        Returns:
            list of keys that have been updated in target store
        """
        # newer_in() ensures the key and last_updated_field indexes itself,
        # so they are not requested a second time here.
        return self.newer_in(target, criteria=criteria)

    def __ne__(self, other):
        """Negation of ``==``."""
        return not self == other

    def __getstate__(self):
        """Pickle the Store as its ``as_dict`` representation."""
        return self.as_dict()

    def __setstate__(self, d):
        """Rebuild the Store from a pickled dictionary by re-running ``__init__``."""
        # Keys starting with "@" are dropped because they are not __init__ arguments.
        d = {k: v for k, v in d.items() if not k.startswith("@")}
        d = MontyDecoder().process_decoded(d)
        self.__init__(**d)

    def __enter__(self):
        """Connect on entering the ``with`` block and return the Store."""
        self.connect()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Close the Store on leaving the ``with`` block."""
        self.close()

last_updated: datetime property

Provides the most recent last_updated date time stamp from the documents in this Store.

__init__(key='task_id', last_updated_field='last_updated', last_updated_type=DateTimeFormat('datetime'), validator=None)

Parameters:

Name Type Description Default
key str

main key to index on

'task_id'
last_updated_field str

field for date/time stamping the data

'last_updated'
last_updated_type DateTimeFormat

the date/time format for the last_updated_field. Can be "datetime" or "isoformat"

DateTimeFormat('datetime')
validator Optional[Validator]

Validator to validate documents going into the store.

None
Source code in src/maggma/core/store.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(
    self,
    key: str = "task_id",
    last_updated_field: str = "last_updated",
    last_updated_type: DateTimeFormat = DateTimeFormat("datetime"),  # noqa: B008
    validator: Optional[Validator] = None,
):
    """
    Args:
        key: main key to index on
        last_updated_field: field for date/time stamping the data
        last_updated_type: the date/time format for the last_updated_field.
                            Can be "datetime" or "isoformat"
        validator: Validator to validate documents going into the store.
    """
    self.key = key
    self.last_updated_field = last_updated_field
    self.last_updated_type = last_updated_type
    # Converter pair selected by the stamp format; identity functions are
    # used unless the format is IsoFormat.
    self._lu_func: tuple[Callable, Callable] = (
        LU_KEY_ISOFORMAT if DateTimeFormat(last_updated_type) == DateTimeFormat.IsoFormat else (identity, identity)
    )
    self.validator = validator
    self.logger = logging.getLogger(type(self).__name__)
    # getLogger() hands back the same logger for a given class name, so
    # attach a NullHandler only once; otherwise every new instance would
    # append one more handler to that shared logger.
    if not any(isinstance(h, logging.NullHandler) for h in self.logger.handlers):
        self.logger.addHandler(logging.NullHandler())

close() abstractmethod

Closes any connections.

Source code in src/maggma/core/store.py
86
87
88
89
90
@abstractmethod
def close(self):
    """
    Close any open connections.

    Abstract: the body is empty and concrete stores provide the behaviour.
    """

connect(force_reset=False) abstractmethod

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not

False
Source code in src/maggma/core/store.py
77
78
79
80
81
82
83
84
@abstractmethod
def connect(self, force_reset: bool = False):
    """
    Establish the connection to the source data.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        force_reset: whether the connection should be reset or not
    """

count(criteria=None) abstractmethod

Counts the number of documents matching the query criteria.

Parameters:

Name Type Description Default
criteria Optional[dict]

PyMongo filter for documents to count in

None
Source code in src/maggma/core/store.py
92
93
94
95
96
97
98
99
@abstractmethod
def count(self, criteria: Optional[dict] = None) -> int:
    """
    Count the documents that match the query criteria.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        criteria: PyMongo filter selecting the documents to count
    """

distinct(field, criteria=None, all_exist=False)

Get all distinct values for a field.

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[dict]

PyMongo filter for documents to search in

None
Source code in src/maggma/core/store.py
201
202
203
204
205
206
207
208
209
210
211
212
def distinct(self, field: str, criteria: Optional[dict] = None, all_exist: bool = False) -> list:
    """
    Collect the distinct values of a field.

    The values are read from the group keys produced by ``groupby``.

    Args:
        field: the field(s) to get distinct values for
        criteria: PyMongo filter for documents to search in
        all_exist: not used by this implementation
    """
    criteria = criteria or {}

    # Gather every group key first (the grouped documents themselves are not
    # needed), then pull the requested field out of each key.
    group_keys = []
    for group_key, _docs in self.groupby(field, properties=[field], criteria=criteria):
        group_keys.append(group_key)
    return [get(group_key, field) for group_key in group_keys]

ensure_index(key, unique=False) abstractmethod

Tries to create an index and return true if it succeeded.

Parameters:

Name Type Description Default
key str

single key to index

required
unique bool

Whether or not this index contains only unique keys

False

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in src/maggma/core/store.py
135
136
137
138
139
140
141
142
143
144
145
146
@abstractmethod
def ensure_index(self, key: str, unique: bool = False) -> bool:
    """
    Try to create an index and report whether that succeeded.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        key: single key to index
        unique: whether this index contains only unique keys

    Returns:
        bool indicating if the index exists/was created
    """

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0) abstractmethod

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[list[str], str]

fields to group documents

required
criteria Optional[dict]

PyMongo filter for documents to search in

None
properties Union[dict, list, None]

properties to return in grouped documents

None
sort Optional[dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[tuple[dict, list[dict]]]

generator returning tuples of (dict, list of docs)

Source code in src/maggma/core/store.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@abstractmethod
def groupby(
    self,
    keys: Union[list[str], str],
    criteria: Optional[dict] = None,
    properties: Union[dict, list, None] = None,
    sort: Optional[dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[tuple[dict, list[dict]]]:
    """
    Group documents by one or more keys.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        keys: fields to group documents by
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number of documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (dict, list of docs)
    """

lu_filter(targets)

Creates a MongoDB filter for new documents.

By "new", we mean documents in this Store that were last updated later than any document in targets.

Parameters:

Name Type Description Default
targets list

A list of Stores

required
Source code in src/maggma/core/store.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
@deprecated(message="Please use Store.newer_in")
def lu_filter(self, targets):
    """Build a MongoDB filter that selects new documents.

    "New" means documents in this Store that were last updated later than
    any document in targets.

    Args:
        targets (list): A list of Stores; a single Store is wrapped in a list

    """
    stores = [targets] if isinstance(targets, Store) else targets

    # Newest stamp across all targets, converted with this Store's
    # datetime-to-filter converter.
    newest = max([store.last_updated for store in stores])
    threshold = self._lu_func[1](newest)
    return {self.last_updated_field: {"$gt": threshold}}

name()

Return a string representing this data source.

Source code in src/maggma/core/store.py
71
72
73
74
75
# ``abstractproperty`` is deprecated; stacking ``property`` on
# ``abstractmethod`` is the documented replacement.
@property
@abstractmethod
def name(self) -> str:
    """
    Return a string representing this data source.

    Abstract: the body is empty and concrete stores provide the value.
    """

newer_in(target, criteria=None, exhaustive=False)

Returns the keys of documents that are newer in the target Store than this Store.

Parameters:

Name Type Description Default
target Store

target Store to compare against

required
criteria Optional[dict]

PyMongo filter for documents to search in

None
exhaustive bool

triggers an item-by-item check instead of checking the last_updated of this Store and using that to filter for newer items in the target Store

False
Source code in src/maggma/core/store.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def newer_in(self, target: "Store", criteria: Optional[dict] = None, exhaustive: bool = False) -> list[str]:
    """
    Returns the keys of documents that are newer in the target
    Store than this Store.

    Args:
        target: target Store to compare against
        criteria: PyMongo filter restricting which target documents
                  are considered
        exhaustive: if True, compare the stamps key by key; otherwise
                    only return target documents stamped later than
                    this Store's ``last_updated``

    Returns:
        list of key values found in the target
    """
    self.ensure_index(self.key)
    self.ensure_index(self.last_updated_field)

    if exhaustive:
        # Get our current last_updated dates for each key value.
        # A missing stamp defaults to datetime.max so that such a
        # document is never reported as outdated.
        props = {self.key: 1, self.last_updated_field: 1, "_id": 0}
        dates = {
            d[self.key]: self._lu_func[0](d.get(self.last_updated_field, datetime.max))
            for d in self.query(properties=props)
        }

        # Get the last_updated for the store we're comparing with
        props = {target.key: 1, target.last_updated_field: 1, "_id": 0}
        target_dates = {
            d[target.key]: target._lu_func[0](d.get(target.last_updated_field, datetime.min))
            for d in target.query(criteria=criteria, properties=props)
        }

        new_keys = set(target_dates.keys()) - set(dates.keys())
        updated_keys = {key for key, date in dates.items() if target_dates.get(key, datetime.min) > date}

        return list(new_keys | updated_keys)

    # The filter runs against the target, so it is expressed with the
    # target's stamp field, stamp serializer and key, mirroring the
    # exhaustive branch above.
    newer_than_self = {target.last_updated_field: {"$gt": target._lu_func[1](self.last_updated)}}
    # Combine with the caller's criteria instead of silently dropping it.
    combined = {"$and": [criteria, newer_than_self]} if criteria else newer_than_self
    return target.distinct(field=target.key, criteria=combined)

query(criteria=None, properties=None, sort=None, skip=0, limit=0) abstractmethod

Queries the Store for a set of documents.

Parameters:

Name Type Description Default
criteria Optional[dict]

PyMongo filter for documents to search in

None
properties Union[dict, list, None]

properties to return in the documents

None
sort Optional[dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in src/maggma/core/store.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@abstractmethod
def query(
    self,
    criteria: Optional[dict] = None,
    properties: Union[dict, list, None] = None,
    sort: Optional[dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[dict]:
    """
    Query the Store for a set of documents.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in the documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number of documents to skip
        limit: limit on total number of documents returned
    """

query_one(criteria=None, properties=None, sort=None)

Queries the Store for a single document.

Parameters:

Name Type Description Default
criteria Optional[dict]

PyMongo filter for documents to search

None
properties Union[dict, list, None]

properties to return in the document

None
sort Optional[dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
Source code in src/maggma/core/store.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def query_one(
    self,
    criteria: Optional[dict] = None,
    properties: Union[dict, list, None] = None,
    sort: Optional[dict[str, Union[Sort, int]]] = None,
):
    """
    Fetch a single document from the Store.

    Args:
        criteria: PyMongo filter for documents to search
        properties: properties to return in the document
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.

    Returns:
        the first document yielded by ``query``, or None if there is none
    """
    matches = self.query(criteria=criteria, properties=properties, sort=sort)
    # Take only the first result; None is the fallback for an exhausted iterator.
    first = next(matches, None)
    return first

remove_docs(criteria) abstractmethod

Remove docs matching the query dictionary.

Parameters:

Name Type Description Default
criteria dict

query dictionary to match

required
Source code in src/maggma/core/store.py
175
176
177
178
179
180
181
182
@abstractmethod
def remove_docs(self, criteria: dict):
    """
    Remove the documents that match the query dictionary.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        criteria: query dictionary to match
    """

update(docs, key=None) abstractmethod

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[list[dict], dict]

the document or list of documents to update

required
key Union[list, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in src/maggma/core/store.py
122
123
124
125
126
127
128
129
130
131
132
133
@abstractmethod
def update(self, docs: Union[list[dict], dict], key: Union[list, str, None] = None):
    """
    Update documents in the Store.

    Abstract: the body is empty and concrete stores provide the behaviour.

    Args:
        docs: the document or list of documents to update
        key: field name(s) that determine uniqueness for a document.
             May be a list of multiple fields, a single field, or
             None if the Store's key field is to be used
    """

updated_keys(target, criteria=None)

Returns keys for docs that are newer in the target store in comparison with this store when comparing the last updated field (last_updated_field).

Parameters:

Name Type Description Default
target Store

store to look for updated documents

required
criteria dict

mongo query to limit scope

None

Returns:

Type Description

list of keys that have been updated in target store

Source code in src/maggma/core/store.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
@deprecated(message="Use Store.newer_in")
def updated_keys(self, target, criteria=None):
    """
    Returns keys for docs that are newer in the target store in comparison
    with this store when comparing the last updated field (last_updated_field).

    Args:
        target (Store): store to look for updated documents
        criteria (dict): mongo query to limit scope

    Returns:
        list of keys that have been updated in target store
    """
    # newer_in() ensures the key and last_updated_field indexes itself,
    # so they are not requested a second time here.
    return self.newer_in(target, criteria=criteria)

StoreError

Bases: Exception

General Store-related error.

Source code in src/maggma/core/store.py
332
333
334
335
336
class StoreError(Exception):
    """General Store-related error."""

    def __init__(self, *args, **kwargs):
        """
        Args:
            *args: forwarded to ``Exception`` (normally the error message)
            **kwargs: forwarded unchanged to ``Exception``
        """
        # super() already binds ``self``. Passing it again would make it the
        # first element of ``args`` and garble ``str(error)``.
        super().__init__(*args, **kwargs)