Skip to content

Stores

Module containing various definitions of Stores. Stores are a default access pattern to data and provide various utilities.

JSONStore

Bases: MemoryStore

A Store for access to a single or multiple JSON files.

Source code in src/maggma/stores/mongolike.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
class JSONStore(MemoryStore):
    """
    A Store for access to a single or multiple JSON files.
    """

    def __init__(
        self,
        paths: Union[str, List[str]],
        read_only: bool = True,
        serialization_option: Optional[int] = None,
        serialization_default: Optional[Callable[[Any], Any]] = None,
        **kwargs,
    ):
        """
        Args:
            paths: paths for json files to turn into a Store
            read_only: whether this JSONStore is read only. When read_only=True,
                       the JSONStore can still apply MongoDB-like writable operations
                       (e.g. an update) because it behaves like a MemoryStore,
                       but it will not write those changes to the file. On the other hand,
                       if read_only=False (i.e., it is writeable), the JSON file
                       will be automatically updated every time a write-like operation is
                       performed.

                       Note that when read_only=False, JSONStore only supports a single JSON
                       file. If the file does not exist, it will be automatically created
                       when the JSONStore is initialized.
            serialization_option:
                option that will be passed to the orjson.dump when saving to the json the file.
            serialization_default:
                default that will be passed to the orjson.dump when saving to the json the file.
        """
        paths = paths if isinstance(paths, (list, tuple)) else [paths]
        self.paths = paths

        # file_writable overrides read_only for compatibility reasons
        if "file_writable" in kwargs:
            file_writable = kwargs.pop("file_writable")
            # BUGFIX: message previously said "use read only" -- the actual
            # keyword argument is read_only.
            warnings.warn(
                "file_writable is deprecated; use read_only instead.",
                DeprecationWarning,
            )
            self.read_only = not file_writable
            if self.read_only != read_only:
                # BUGFIX: message previously claimed "Setting read_only={file_writable}",
                # contradicting the value actually set (read_only = not file_writable).
                warnings.warn(
                    f"Received conflicting keyword arguments file_writable={file_writable}"
                    f" and read_only={read_only}. Setting read_only={self.read_only}.",
                    UserWarning,
                )
        else:
            self.read_only = read_only
        self.kwargs = kwargs

        # A writable JSONStore mirrors the in-memory collection to exactly
        # one file; multiple files would be ambiguous to write back.
        if not self.read_only and len(paths) > 1:
            raise RuntimeError("Cannot instantiate file-writable JSONStore with multiple JSON files.")

        self.default_sort = None
        self.serialization_option = serialization_option
        self.serialization_default = serialization_default

        super().__init__(**kwargs)

    def connect(self, force_reset: bool = False):
        """
        Loads the files into the collection in memory.

        Args:
            force_reset: whether to reset the connection or not. If False (default) and .connect()
            has been called previously, the .json file will not be read in again. This can improve performance
            on systems with slow storage when multiple connect / disconnects are performed.
        """
        if self._coll is None or force_reset:
            self._coll = mongomock.MongoClient().db[self.name]  # type: ignore

            # create the .json file if it does not exist
            if not self.read_only and not Path(self.paths[0]).exists():
                with zopen(self.paths[0], "w") as f:
                    data: List[dict] = []
                    bytesdata = orjson.dumps(data)
                    f.write(bytesdata.decode("utf-8"))

            for path in self.paths:
                objects = self.read_json_file(path)
                try:
                    self.update(objects)
                except KeyError:
                    # BUGFIX: `path` may be a plain str (see __init__ typing),
                    # which has no `.name` attribute; wrap in Path so building
                    # the error message cannot itself raise AttributeError.
                    raise KeyError(
                        f"""
                        Key field '{self.key}' not found in {Path(path).name}. This
                        could mean that this JSONStore was initially created with a different key field.
                        The keys found in the .json file are {list(objects[0].keys())}. Try
                        re-initializing your JSONStore using one of these as the key arguments.
                        """
                    )

    def read_json_file(self, path) -> List:
        """
        Helper method to read the contents of a JSON file and generate
        a list of docs.

        Args:
            path: Path to the JSON file to be read
        """
        with zopen(path) as f:
            data = f.read()
            data = data.decode() if isinstance(data, bytes) else data
            # BSON extended JSON (contains "$oid") needs bson's parser;
            # plain JSON goes through orjson.
            objects = bson.json_util.loads(data) if "$oid" in data else orjson.loads(data)
            objects = [objects] if not isinstance(objects, list) else objects
            # datetime objects deserialize to str. Try to convert the last_updated
            # field back to datetime.
            # # TODO - there may still be problems caused if a JSONStore is init'ed from
            # documents that don't contain a last_updated field
            # See Store.last_updated in store.py.
            for obj in objects:
                if obj.get(self.last_updated_field):
                    obj[self.last_updated_field] = to_dt(obj[self.last_updated_field])

        return objects

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store.

        For a file-writable JSONStore, the json file is updated.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """
        super().update(docs=docs, key=key)
        if not self.read_only:
            self.update_json_file()

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary.

        For a file-writable JSONStore, the json file is updated.

        Args:
            criteria: query dictionary to match
        """
        super().remove_docs(criteria=criteria)
        if not self.read_only:
            self.update_json_file()

    def update_json_file(self):
        """
        Updates the json file when a write-like operation is performed.
        """
        with zopen(self.paths[0], "w") as f:
            data = list(self.query())
            for d in data:
                # Drop the mongo-internal id; use a default so a doc
                # without "_id" does not raise KeyError.
                d.pop("_id", None)
            bytesdata = orjson.dumps(
                data,
                option=self.serialization_option,
                default=self.serialization_default,
            )
            f.write(bytesdata.decode("utf-8"))

    def __hash__(self):
        return hash((*self.paths, self.last_updated_field))

    def __eq__(self, other: object) -> bool:
        """
        Check equality for JSONStore.

        Args:
            other: other JSONStore to compare with
        """
        if not isinstance(other, JSONStore):
            return False

        fields = ["paths", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

__eq__(other)

Check equality for JSONStore.

Parameters:

Name Type Description Default
other object

other JSONStore to compare with

required
Source code in src/maggma/stores/mongolike.py
779
780
781
782
783
784
785
786
787
788
789
790
def __eq__(self, other: object) -> bool:
    """
    Equality check for JSONStore.

    Two JSONStores compare equal when both their ``paths`` and
    ``last_updated_field`` attributes match.

    Args:
        other: the object to compare against
    """
    if not isinstance(other, JSONStore):
        return False

    return self.paths == other.paths and self.last_updated_field == other.last_updated_field

__init__(paths, read_only=True, serialization_option=None, serialization_default=None, **kwargs)

Parameters:

Name Type Description Default
paths Union[str, List[str]]

paths for json files to turn into a Store

required
read_only bool

whether this JSONStore is read only. When read_only=True, the JSONStore can still apply MongoDB-like writable operations (e.g. an update) because it behaves like a MemoryStore, but it will not write those changes to the file. On the other hand, if read_only=False (i.e., it is writeable), the JSON file will be automatically updated every time a write-like operation is performed.

   Note that when read_only=False, JSONStore only supports a single JSON
   file. If the file does not exist, it will be automatically created
   when the JSONStore is initialized.
True
serialization_option Optional[int]

option that will be passed to the orjson.dump when saving to the json the file.

None
serialization_default Optional[Callable[[Any], Any]]

default that will be passed to the orjson.dump when saving to the json the file.

None
Source code in src/maggma/stores/mongolike.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
def __init__(
    self,
    paths: Union[str, List[str]],
    read_only: bool = True,
    serialization_option: Optional[int] = None,
    serialization_default: Optional[Callable[[Any], Any]] = None,
    **kwargs,
):
    """
    Args:
        paths: paths for json files to turn into a Store
        read_only: whether this JSONStore is read only. When read_only=True,
                   the JSONStore can still apply MongoDB-like writable operations
                   (e.g. an update) because it behaves like a MemoryStore,
                   but it will not write those changes to the file. On the other hand,
                   if read_only=False (i.e., it is writeable), the JSON file
                   will be automatically updated every time a write-like operation is
                   performed.

                   Note that when read_only=False, JSONStore only supports a single JSON
                   file. If the file does not exist, it will be automatically created
                   when the JSONStore is initialized.
        serialization_option:
            option that will be passed to the orjson.dump when saving to the json the file.
        serialization_default:
            default that will be passed to the orjson.dump when saving to the json the file.
    """
    paths = paths if isinstance(paths, (list, tuple)) else [paths]
    self.paths = paths

    # file_writable overrides read_only for compatibility reasons
    if "file_writable" in kwargs:
        file_writable = kwargs.pop("file_writable")
        # BUGFIX: message previously said "use read only" -- the actual
        # keyword argument is read_only.
        warnings.warn(
            "file_writable is deprecated; use read_only instead.",
            DeprecationWarning,
        )
        self.read_only = not file_writable
        if self.read_only != read_only:
            # BUGFIX: message previously claimed "Setting read_only={file_writable}",
            # contradicting the value actually set (read_only = not file_writable).
            warnings.warn(
                f"Received conflicting keyword arguments file_writable={file_writable}"
                f" and read_only={read_only}. Setting read_only={self.read_only}.",
                UserWarning,
            )
    else:
        self.read_only = read_only
    self.kwargs = kwargs

    # A writable JSONStore mirrors the in-memory collection to exactly one file.
    if not self.read_only and len(paths) > 1:
        raise RuntimeError("Cannot instantiate file-writable JSONStore with multiple JSON files.")

    self.default_sort = None
    self.serialization_option = serialization_option
    self.serialization_default = serialization_default

    super().__init__(**kwargs)

connect(force_reset=False)

Loads the files into the collection in memory.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not. If False (default) and .connect() has been called previously, the .json file will not be read in again.

False
Source code in src/maggma/stores/mongolike.py
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
def connect(self, force_reset: bool = False):
    """
    Loads the files into the collection in memory.

    Args:
        force_reset: whether to reset the connection or not. If False (default) and .connect()
        has been called previously, the .json file will not be read in again. This can improve performance
        on systems with slow storage when multiple connect / disconnects are performed.
    """
    if self._coll is None or force_reset:
        self._coll = mongomock.MongoClient().db[self.name]  # type: ignore

        # create the .json file if it does not exist
        if not self.read_only and not Path(self.paths[0]).exists():
            with zopen(self.paths[0], "w") as f:
                data: List[dict] = []
                bytesdata = orjson.dumps(data)
                f.write(bytesdata.decode("utf-8"))

        for path in self.paths:
            objects = self.read_json_file(path)
            try:
                self.update(objects)
            except KeyError:
                # BUGFIX: `path` may be a plain str (see __init__ typing), which
                # has no `.name` attribute; wrap in Path so building the error
                # message cannot itself raise AttributeError.
                raise KeyError(
                    f"""
                    Key field '{self.key}' not found in {Path(path).name}. This
                    could mean that this JSONStore was initially created with a different key field.
                    The keys found in the .json file are {list(objects[0].keys())}. Try
                    re-initializing your JSONStore using one of these as the key arguments.
                    """
                )

read_json_file(path)

Helper method to read the contents of a JSON file and generate a list of docs.

Parameters:

Name Type Description Default
path

Path to the JSON file to be read

required
Source code in src/maggma/stores/mongolike.py
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
def read_json_file(self, path) -> List:
    """
    Read a JSON file and return its contents as a list of docs.

    Args:
        path: Path to the JSON file to be read
    """
    with zopen(path) as f:
        raw = f.read()
    if isinstance(raw, bytes):
        raw = raw.decode()
    # BSON extended JSON (contains "$oid") needs bson's parser;
    # plain JSON goes through orjson.
    if "$oid" in raw:
        docs = bson.json_util.loads(raw)
    else:
        docs = orjson.loads(raw)
    if not isinstance(docs, list):
        docs = [docs]
    # datetime values deserialize as str; coerce the last_updated field
    # back to datetime. Docs without that field are left untouched
    # (see Store.last_updated in store.py for the consequences).
    for doc in docs:
        if doc.get(self.last_updated_field):
            doc[self.last_updated_field] = to_dt(doc[self.last_updated_field])

    return docs

remove_docs(criteria)

Remove docs matching the query dictionary.

For a file-writable JSONStore, the json file is updated.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in src/maggma/stores/mongolike.py
748
749
750
751
752
753
754
755
756
757
758
759
def remove_docs(self, criteria: Dict):
    """
    Remove all documents matching the given query.

    When this JSONStore is writable, the backing json file is
    rewritten afterwards.

    Args:
        criteria: query dictionary to match
    """
    super().remove_docs(criteria=criteria)
    if self.read_only:
        return
    self.update_json_file()

update(docs, key=None)

Update documents into the Store.

For a file-writable JSONStore, the json file is updated.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in src/maggma/stores/mongolike.py
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Insert or update documents in the Store.

    When this JSONStore is writable, the backing json file is
    rewritten afterwards.

    Args:
        docs: the document or list of documents to update
        key: field name(s) used to determine document uniqueness;
             may be a list of fields, a single field, or None to
             fall back to the Store's key field
    """
    super().update(docs=docs, key=key)
    if self.read_only:
        return
    self.update_json_file()

update_json_file()

Updates the json file when a write-like operation is performed.

Source code in src/maggma/stores/mongolike.py
761
762
763
764
765
766
767
768
769
770
771
772
773
774
def update_json_file(self):
    """
    Rewrite the backing json file after a write-like operation.

    Serializes the entire in-memory collection with orjson and writes
    it to the single backing file (self.paths[0]).
    """
    with zopen(self.paths[0], "w") as f:
        data = list(self.query())
        for d in data:
            # Drop the mongo-internal id; use a default so a doc
            # without "_id" does not raise KeyError.
            d.pop("_id", None)
        bytesdata = orjson.dumps(
            data,
            option=self.serialization_option,
            default=self.serialization_default,
        )
        f.write(bytesdata.decode("utf-8"))

MemoryStore

Bases: MongoStore

An in-memory Store that functions similarly to a MongoStore.

Source code in src/maggma/stores/mongolike.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
class MemoryStore(MongoStore):
    """
    An in-memory Store that functions similarly
    to a MongoStore.
    """

    def __init__(self, collection_name: str = "memory_db", **kwargs):
        """
        Initializes the Memory Store.

        Args:
            collection_name: name for the collection in memory.
        """
        self.collection_name = collection_name
        self.default_sort = None
        self._coll = None
        self.kwargs = kwargs
        # Skip MongoStore.__init__ (no server connection parameters are
        # needed for an in-memory store) and initialize the base Store.
        super(MongoStore, self).__init__(**kwargs)

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        if self._coll is None or force_reset:
            self._coll = mongomock.MongoClient().db[self.name]  # type: ignore

    def close(self):
        """Close up all collections."""
        # BUGFIX: guard against AttributeError when close() is called on a
        # store that was never connected (self._coll is still None).
        if self._coll is not None:
            self._coll.database.client.close()

    @property
    def name(self):
        """Name for the store."""
        return f"mem://{self.collection_name}"

    def __hash__(self):
        """Hash for the store."""
        return hash((self.name, self.last_updated_field))

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        NOTE(review): the ``sort``, ``skip`` and ``limit`` arguments are
        accepted for interface compatibility but are not used by this
        implementation -- confirm whether that is intentional.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (key, list of elements)
        """
        keys = keys if isinstance(keys, list) else [keys]

        if properties is None:
            properties = []
        if isinstance(properties, dict):
            properties = list(properties.keys())

        # Documents missing any grouping key are silently dropped.
        data = [
            doc for doc in self.query(properties=keys + properties, criteria=criteria) if all(has(doc, k) for k in keys)
        ]

        def grouping_keys(doc):
            return tuple(get(doc, k) for k in keys)

        # itertools.groupby only merges adjacent items, so sort by the
        # same key function first to get one group per unique key tuple.
        for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
            doc = {}  # type: ignore
            for k, v in zip(keys, vals):
                set_(doc, k, v)
            yield doc, list(group)

    def __eq__(self, other: object) -> bool:
        """
        Check equality for MemoryStore.

        Args:
            other: other MemoryStore to compare with.
        """
        if not isinstance(other, MemoryStore):
            return False

        fields = ["collection_name", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name property

Name for the store.

__eq__(other)

Check equality for MemoryStore. other: other MemoryStore to compare with.

Source code in src/maggma/stores/mongolike.py
600
601
602
603
604
605
606
607
608
609
def __eq__(self, other: object) -> bool:
    """
    Equality check for MemoryStore.

    Two MemoryStores compare equal when both their ``collection_name``
    and ``last_updated_field`` attributes match.

    Args:
        other: the object to compare against.
    """
    if not isinstance(other, MemoryStore):
        return False

    return (self.collection_name, self.last_updated_field) == (
        other.collection_name,
        other.last_updated_field,
    )

__hash__()

Hash for the store.

Source code in src/maggma/stores/mongolike.py
551
552
553
def __hash__(self):
    """Hash for the store, derived from its name and last_updated_field."""
    identity = (self.name, self.last_updated_field)
    return hash(identity)

__init__(collection_name='memory_db', **kwargs)

Initializes the Memory Store.

Parameters:

Name Type Description Default
collection_name str

name for the collection in memory.

'memory_db'
Source code in src/maggma/stores/mongolike.py
518
519
520
521
522
523
524
525
526
527
528
529
def __init__(self, collection_name: str = "memory_db", **kwargs):
    """
    Initializes the Memory Store.

    Args:
        collection_name: name for the collection in memory.
    """
    self._coll = None
    self.collection_name = collection_name
    self.kwargs = kwargs
    self.default_sort = None
    # Bypass MongoStore.__init__ (no server connection parameters are
    # needed) and call the base Store initializer directly.
    super(MongoStore, self).__init__(**kwargs)

close()

Close up all collections.

Source code in src/maggma/stores/mongolike.py
542
543
544
def close(self):
    """Close up all collections."""
    client = self._coll.database.client
    client.close()

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/mongolike.py
531
532
533
534
535
536
537
538
539
540
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    if force_reset or self._coll is None:
        self._coll = mongomock.MongoClient().db[self.name]  # type: ignore

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (key, list of elements)

Source code in src/maggma/stores/mongolike.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys.

    NOTE(review): the ``sort``, ``skip`` and ``limit`` arguments are
    accepted for interface compatibility but are not used anywhere in
    this implementation -- confirm whether that is intentional.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (key, list of elements)
    """
    # Normalize a single key into a one-element list.
    keys = keys if isinstance(keys, list) else [keys]

    if properties is None:
        properties = []
    if isinstance(properties, dict):
        properties = list(properties.keys())

    # Documents missing any grouping key are silently dropped.
    data = [
        doc for doc in self.query(properties=keys + properties, criteria=criteria) if all(has(doc, k) for k in keys)
    ]

    def grouping_keys(doc):
        # Tuple of the (possibly dotted-path) key values for one doc.
        return tuple(get(doc, k) for k in keys)

    # itertools.groupby only merges adjacent items, so sort by the same
    # key function first to get exactly one group per unique key tuple.
    for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
        doc = {}  # type: ignore
        for k, v in zip(keys, vals):
            set_(doc, k, v)
        yield doc, list(group)

MongoStore

Bases: Store

A Store that connects to a Mongo collection.

Source code in src/maggma/stores/mongolike.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
class MongoStore(Store):
    """
    A Store that connects to a Mongo collection.
    """

    def __init__(
        self,
        database: str,
        collection_name: str,
        host: str = "localhost",
        port: int = 27017,
        username: str = "",
        password: str = "",
        ssh_tunnel: Optional[SSHTunnel] = None,
        safe_update: bool = False,
        auth_source: Optional[str] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
        **kwargs,
    ):
        """
        Args:
            database: The database name
            collection_name: The collection name
            host: Hostname for the database
            port: TCP port to connect to
            username: Username for the collection
            password: Password to connect with
            ssh_tunnel: optional SSHTunnel through which the connection is routed
            safe_update: fail gracefully on DocumentTooLarge errors on update
            auth_source: The database to authenticate on. Defaults to the database name.
            mongoclient_kwargs: extra keyword arguments passed through to the
                MongoClient constructor
            default_sort: Default sort field and direction to use when querying. Can be used to
                ensure determinacy in query results.
        """
        self.database = database
        self.collection_name = collection_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.ssh_tunnel = ssh_tunnel
        self.safe_update = safe_update
        self.default_sort = default_sort
        # pymongo collection handle; populated lazily by connect()
        self._coll = None  # type: ignore
        self.kwargs = kwargs

        if auth_source is None:
            auth_source = self.database
        self.auth_source = auth_source
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        super().__init__(**kwargs)

    @property
    def name(self) -> str:
        """
        Return a string representing this data source.
        """
        return f"mongo://{self.host}/{self.database}/{self.collection_name}"

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        # No-op when already connected, unless a reset is forced.
        if self._coll is None or force_reset:
            if self.ssh_tunnel is None:
                host = self.host
                port = self.port
            else:
                # Route the connection through the tunnel's local endpoint.
                self.ssh_tunnel.start()
                host, port = self.ssh_tunnel.local_address

            # Authenticate only when a username is configured.
            conn: MongoClient = (
                MongoClient(
                    host=host,
                    port=port,
                    username=self.username,
                    password=self.password,
                    authSource=self.auth_source,
                    **self.mongoclient_kwargs,
                )
                if self.username != ""
                else MongoClient(host, port, **self.mongoclient_kwargs)
            )
            db = conn[self.database]
            self._coll = db[self.collection_name]  # type: ignore

    def __hash__(self) -> int:
        """Hash for MongoStore: based on database, collection and last-updated field."""
        return hash((self.database, self.collection_name, self.last_updated_field))

    @classmethod
    def from_db_file(cls, filename: str, **kwargs):
        """
        Convenience method to construct MongoStore from db_file
        from old QueryEngine format.
        """
        # NOTE(review): this rebinds the ``kwargs`` parameter, silently discarding
        # any keyword arguments the caller passed in — confirm this is intended.
        kwargs = loadfn(filename)
        # Old QueryEngine docs name the collection "collection" rather than "collection_name".
        if "collection" in kwargs:
            kwargs["collection_name"] = kwargs.pop("collection")
        # Get rid of aliases from traditional query engine db docs
        kwargs.pop("aliases", None)
        return cls(**kwargs)

    @classmethod
    def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
        """
        Convenience method to construct MongoStore from a launchpad file.

        Note: A launchpad file is a specially formatted yaml file used in fireworks

        Returns:
        """
        with open(lp_file) as f:
            lp_creds = yaml.safe_load(f.read())

        db_creds = lp_creds.copy()
        # Launchpad files store the database name under "name".
        db_creds["database"] = db_creds["name"]
        # Keep only the credential fields the constructor understands.
        for key in list(db_creds.keys()):
            if key not in ["database", "host", "port", "username", "password"]:
                db_creds.pop(key)
        db_creds["collection_name"] = collection_name

        return cls(**db_creds, **kwargs)

    def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
        """
        Get all distinct values for a field.

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: retained for interface compatibility; not used in this implementation
        """

        criteria = criteria or {}
        try:
            distinct_vals = self._collection.distinct(field, criteria)
        except (OperationFailure, DocumentTooLarge):
            # distinct() can fail (e.g. the result is too large); fall back to
            # an aggregation that groups on the field.
            distinct_vals = [
                d["_id"] for d in self._collection.aggregate([{"$match": criteria}, {"$group": {"_id": f"${field}"}}])
            ]
            # When every non-empty value is itself a list, flatten into one list.
            if all(isinstance(d, list) for d in filter(None, distinct_vals)):  # type: ignore
                distinct_vals = list(chain.from_iterable(filter(None, distinct_vals)))

        return distinct_vals if distinct_vals is not None else []

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (key, list of docs)

        Note:
            ``sort``, ``skip`` and ``limit`` are accepted for interface
            compatibility but are not applied by this implementation.
        """
        pipeline = []
        if isinstance(keys, str):
            keys = [keys]

        if properties is None:
            properties = []
        if isinstance(properties, dict):
            properties = list(properties.keys())

        if criteria is not None:
            pipeline.append({"$match": criteria})

        if len(properties) > 0:
            # Project both the requested properties and the grouping keys.
            pipeline.append({"$project": {p: 1 for p in properties + keys}})

        # Alias each grouping key to a single letter in the $group _id document
        # (supports up to 26 keys); presumably because dotted field paths cannot
        # be used as _id keys — confirm against MongoDB docs.
        alpha = "abcdefghijklmnopqrstuvwxyz"
        group_id = {letter: f"${key}" for letter, key in zip(alpha, keys)}
        pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
        for d in self._collection.aggregate(pipeline, allowDiskUse=True):
            # Rebuild {key: value} from the letter-aliased _id; key[1:] strips the "$".
            id_doc = {}  # type: ignore
            for letter, key in group_id.items():
                if has(d["_id"], letter):
                    set_(id_doc, key[1:], d["_id"][letter])
            yield (id_doc, d["docs"])

    @classmethod
    def from_collection(cls, collection):
        """
        Generates a MongoStore from a pymongo collection object
        This is not a fully safe operation as it gives dummy information to the MongoStore
        As a result, this will not serialize and can not reset its connection.

        Args:
            collection: the PyMongo collection to create a MongoStore around
        """
        # TODO: How do we make this safer?
        coll_name = collection.name
        db_name = collection.database.name

        store = cls(db_name, coll_name)
        # Attach the live collection directly, bypassing connect().
        store._coll = collection
        return store

    @property
    def _collection(self):
        """Property referring to underlying pymongo collection."""
        if self._coll is None:
            raise StoreError("Must connect Mongo-like store before attempting to use it")
        return self._coll

    def count(
        self,
        criteria: Optional[Dict] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
    ) -> int:
        """
        Counts the number of documents matching the query criteria.

        Args:
            criteria: PyMongo filter for documents to count in
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
        """

        criteria = criteria if criteria else {}

        # Normalize {field: Sort|int} into the (field, direction) pairs pymongo expects.
        hint_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
        )

        if hint_list is not None:  # pragma: no cover
            return self._collection.count_documents(filter=criteria, hint=hint_list)

        # With an empty filter, use the fast metadata-based estimate
        # instead of an exact full count.
        return (
            self._collection.count_documents(filter=criteria)
            if criteria
            else self._collection.estimated_document_count()
        )

    def query(  # type: ignore
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
        **kwargs,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
            mongoclient_kwargs: Dict of extra kwargs to pass to pymongo find
                (forwarded via ``**kwargs``).
        """
        # A list of property names becomes a pymongo inclusion projection.
        if isinstance(properties, list):
            properties = {p: 1 for p in properties}

        default_sort_formatted = None

        if self.default_sort is not None:
            default_sort_formatted = [
                (k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in self.default_sort.items()
            ]

        # Explicit sort wins; otherwise fall back to the store's default sort.
        sort_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in sort.items()]
            if sort
            else default_sort_formatted
        )

        hint_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
        )

        yield from self._collection.find(
            filter=criteria,
            projection=properties,
            skip=skip,
            limit=limit,
            sort=sort_list,
            hint=hint_list,
            **kwargs,
        )

    def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
        """
        Tries to create an index and return true if it succeeded.

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys.

        Returns:
            bool indicating if the index exists/was created
        """

        # Already indexed: nothing to create.
        if confirm_field_index(self._collection, key):
            return True

        try:
            self._collection.create_index(key, unique=unique, background=True)
            return True
        except Exception:
            return False

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
        """

        requests = []

        if not isinstance(docs, list):
            docs = [docs]

        for d in (jsanitize(x, allow_bson=True) for x in docs):
            # document-level validation is optional
            validates = True
            if self.validator:
                validates = self.validator.is_valid(d)
                if not validates:
                    if self.validator.strict:
                        raise ValueError(self.validator.validation_errors(d))
                    self.logger.error(self.validator.validation_errors(d))

            if validates:
                # NOTE(review): rebinds the ``key`` parameter on the first valid doc;
                # harmless since the fallback is identical each iteration.
                key = key or self.key
                search_doc = {k: d[k] for k in key} if isinstance(key, list) else {key: d[key]}

                requests.append(ReplaceOne(search_doc, d, upsert=True))

        if len(requests) > 0:
            try:
                self._collection.bulk_write(requests, ordered=False)
            except (OperationFailure, DocumentTooLarge) as e:
                if self.safe_update:
                    # Retry one-by-one so a single oversized document
                    # does not abort the whole batch.
                    for req in requests:
                        try:
                            self._collection.bulk_write([req], ordered=False)
                        except (OperationFailure, DocumentTooLarge):
                            # NOTE(review): ``_filter`` is a private pymongo attribute.
                            self.logger.error(
                                f"Could not upload document for {req._filter} as it was too large for Mongo"
                            )
                else:
                    raise e

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary.

        Args:
            criteria: query dictionary to match
        """
        self._collection.delete_many(filter=criteria)

    def close(self):
        """Close up all collections."""
        # Close the client first, drop the cached handle so a later connect()
        # re-establishes the connection, then tear down the tunnel it used.
        self._collection.database.client.close()
        self._coll = None
        if self.ssh_tunnel is not None:
            self.ssh_tunnel.stop()

    def __eq__(self, other: object) -> bool:
        """
        Check equality for MongoStore
        other: other mongostore to compare with.
        """
        if not isinstance(other, MongoStore):
            return False

        fields = ["database", "collection_name", "host", "port", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name: str property

Return a string representing this data source.

__eq__(other)

Check equality for MongoStore. `other`: the other MongoStore to compare with.

Source code in src/maggma/stores/mongolike.py
434
435
436
437
438
439
440
441
442
443
def __eq__(self, other: object) -> bool:
    """
    Equality for MongoStore: same database, collection, host, port,
    and last-updated field as the other store.
    """
    is_store = isinstance(other, MongoStore)
    return is_store and all(
        getattr(self, attr) == getattr(other, attr)
        for attr in ("database", "collection_name", "host", "port", "last_updated_field")
    )

__hash__()

Hash for MongoStore.

Source code in src/maggma/stores/mongolike.py
132
133
134
def __hash__(self) -> int:
    """Hash for MongoStore: based on database, collection name and last-updated field."""
    return hash((self.database, self.collection_name, self.last_updated_field))

__init__(database, collection_name, host='localhost', port=27017, username='', password='', ssh_tunnel=None, safe_update=False, auth_source=None, mongoclient_kwargs=None, default_sort=None, **kwargs)

Parameters:

Name Type Description Default
database str

The database name

required
collection_name str

The collection name

required
host str

Hostname for the database

'localhost'
port int

TCP port to connect to

27017
username str

Username for the collection

''
password str

Password to connect with

''
safe_update bool

fail gracefully on DocumentTooLarge errors on update

False
auth_source Optional[str]

The database to authenticate on. Defaults to the database name.

None
default_sort Optional[Dict[str, Union[Sort, int]]]

Default sort field and direction to use when querying. Can be used to ensure determinacy in query results.

None
Source code in src/maggma/stores/mongolike.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __init__(
    self,
    database: str,
    collection_name: str,
    host: str = "localhost",
    port: int = 27017,
    username: str = "",
    password: str = "",
    ssh_tunnel: Optional[SSHTunnel] = None,
    safe_update: bool = False,
    auth_source: Optional[str] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
    **kwargs,
):
    """
    Args:
        database: The database name
        collection_name: The collection name
        host: Hostname for the database
        port: TCP port to connect to
        username: Username for the collection
        password: Password to connect with
        ssh_tunnel: optional SSHTunnel through which the connection is routed
        safe_update: fail gracefully on DocumentTooLarge errors on update
        auth_source: The database to authenticate on. Defaults to the database name.
        mongoclient_kwargs: extra keyword arguments passed to the MongoClient constructor
        default_sort: Default sort field and direction to use when querying. Can be used to
            ensure determinacy in query results.
    """
    self.database = database
    self.collection_name = collection_name
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    self.ssh_tunnel = ssh_tunnel
    self.safe_update = safe_update
    self.default_sort = default_sort
    # pymongo collection handle; populated lazily by connect()
    self._coll = None  # type: ignore
    self.kwargs = kwargs

    if auth_source is None:
        auth_source = self.database
    self.auth_source = auth_source
    self.mongoclient_kwargs = mongoclient_kwargs or {}

    super().__init__(**kwargs)

close()

Close up all collections.

Source code in src/maggma/stores/mongolike.py
427
428
429
430
431
432
def close(self):
    """Close up all collections."""
    # Close the client first, drop the cached handle so a later connect()
    # re-establishes the connection, then tear down the tunnel it used.
    self._collection.database.client.close()
    self._coll = None
    if self.ssh_tunnel is not None:
        self.ssh_tunnel.stop()

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/mongolike.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    # No-op when already connected, unless a reset is forced.
    if self._coll is None or force_reset:
        if self.ssh_tunnel is None:
            host = self.host
            port = self.port
        else:
            # Route the connection through the tunnel's local endpoint.
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.local_address

        # Authenticate only when a username is configured.
        conn: MongoClient = (
            MongoClient(
                host=host,
                port=port,
                username=self.username,
                password=self.password,
                authSource=self.auth_source,
                **self.mongoclient_kwargs,
            )
            if self.username != ""
            else MongoClient(host, port, **self.mongoclient_kwargs)
        )
        db = conn[self.database]
        self._coll = db[self.collection_name]  # type: ignore

count(criteria=None, hint=None)

Counts the number of documents matching the query criteria.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
hint Optional[Dict[str, Union[Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
Source code in src/maggma/stores/mongolike.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def count(
    self,
    criteria: Optional[Dict] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
    """
    Count the documents matching the query criteria.

    Args:
        criteria: PyMongo filter for documents to count in
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
    """

    query_filter = criteria or {}

    if hint:  # pragma: no cover
        # Normalize {field: Sort|int} into the (field, direction) pairs pymongo expects.
        hint_pairs = []
        for field, order in hint.items():
            direction = Sort(order).value if isinstance(order, int) else order.value
            hint_pairs.append((field, direction))
        return self._collection.count_documents(filter=query_filter, hint=hint_pairs)

    if query_filter:
        return self._collection.count_documents(filter=query_filter)
    # Empty filter: use the fast metadata-based estimate instead of an exact count.
    return self._collection.estimated_document_count()

distinct(field, criteria=None, all_exist=False)

Get all distinct values for a field.

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in src/maggma/stores/mongolike.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
    """
    Get all distinct values for a field.

    Args:
        field: the field(s) to get distinct values for
        criteria: PyMongo filter for documents to search in
        all_exist: retained for interface compatibility; not used here
    """

    query_filter = criteria or {}
    try:
        values = self._collection.distinct(field, query_filter)
    except (OperationFailure, DocumentTooLarge):
        # distinct() can fail (e.g. the result is too large); fall back to
        # an aggregation that groups on the field.
        pipeline = [{"$match": query_filter}, {"$group": {"_id": f"${field}"}}]
        values = [group["_id"] for group in self._collection.aggregate(pipeline)]
        # When every non-empty value is itself a list, flatten into one list.
        non_empty = [v for v in values if v]
        if all(isinstance(v, list) for v in non_empty):
            values = list(chain.from_iterable(non_empty))

    return [] if values is None else values

ensure_index(key, unique=False)

Tries to create an index and return true if it succeeded.

Parameters:

Name Type Description Default
key str

single key to index

required
unique Optional[bool]

Whether or not this index contains only unique keys.

False

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in src/maggma/stores/mongolike.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
    """
    Try to create an index, returning True on success.

    Args:
        key: single key to index
        unique: Whether or not this index contains only unique keys.

    Returns:
        bool indicating if the index exists/was created
    """

    # Already indexed: nothing to create.
    if confirm_field_index(self._collection, key):
        return True

    try:
        self._collection.create_index(key, unique=unique, background=True)
    except Exception:
        # Creation failed; report rather than raise.
        return False
    return True

from_collection(collection) classmethod

Generates a MongoStore from a pymongo collection object. This is not a fully safe operation, as it gives dummy information to the MongoStore. As a result, this will not serialize and cannot reset its connection.

Parameters:

Name Type Description Default
collection

the PyMongo collection to create a MongoStore around

required
Source code in src/maggma/stores/mongolike.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@classmethod
def from_collection(cls, collection):
    """
    Build a MongoStore around an existing pymongo collection object.
    This is not a fully safe operation, as it gives dummy information to the
    MongoStore; the result will not serialize and cannot reset its connection.

    Args:
        collection: the PyMongo collection to create a MongoStore around
    """
    # TODO: How do we make this safer?
    store = cls(collection.database.name, collection.name)
    # Attach the live collection directly, bypassing connect().
    store._coll = collection
    return store

from_db_file(filename, **kwargs) classmethod

Convenience method to construct MongoStore from db_file from old QueryEngine format.

Source code in src/maggma/stores/mongolike.py
136
137
138
139
140
141
142
143
144
145
146
147
@classmethod
def from_db_file(cls, filename: str, **kwargs):
    """
    Convenience method to construct MongoStore from db_file
    from old QueryEngine format.

    Args:
        filename: path to the db file (loaded with loadfn)
        **kwargs: extra keyword arguments forwarded to the constructor;
            these override values read from the file.
    """
    config = loadfn(filename)
    # Old QueryEngine docs name the collection "collection" rather than "collection_name".
    if "collection" in config:
        config["collection_name"] = config.pop("collection")
    # Get rid of aliases from traditional query engine db docs
    config.pop("aliases", None)
    # Bug fix: the caller's **kwargs were previously discarded because the
    # parameter was rebound to the file contents; merge them in with precedence.
    config.update(kwargs)
    return cls(**config)

from_launchpad_file(lp_file, collection_name, **kwargs) classmethod

Convenience method to construct MongoStore from a launchpad file.

Note: A launchpad file is a specially formatted YAML file used in FireWorks.

Returns:

Source code in src/maggma/stores/mongolike.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
    """
    Convenience method to construct MongoStore from a launchpad file.

    Note: A launchpad file is a specially formatted yaml file used in fireworks

    Returns:
    """
    with open(lp_file) as f:
        lp_creds = yaml.safe_load(f.read())

    # Keep only the credential fields the constructor understands.
    allowed = ("database", "host", "port", "username", "password")
    db_creds = {k: v for k, v in lp_creds.items() if k in allowed}
    # Launchpad files store the database name under "name".
    db_creds["database"] = lp_creds["name"]
    db_creds["collection_name"] = collection_name

    return cls(**db_creds, **kwargs)

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (key, list of docs)

Source code in src/maggma/stores/mongolike.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned

    Returns:
        generator returning tuples of (key, list of docs)

    Note:
        ``sort``, ``skip`` and ``limit`` are accepted for interface
        compatibility but are not applied by this implementation.
    """
    pipeline = []
    if isinstance(keys, str):
        keys = [keys]

    if properties is None:
        properties = []
    if isinstance(properties, dict):
        properties = list(properties.keys())

    if criteria is not None:
        pipeline.append({"$match": criteria})

    if len(properties) > 0:
        # Project both the requested properties and the grouping keys.
        pipeline.append({"$project": {p: 1 for p in properties + keys}})

    # Alias each grouping key to a single letter in the $group _id document
    # (supports up to 26 keys); presumably because dotted field paths cannot
    # be used as _id keys — confirm against MongoDB docs.
    alpha = "abcdefghijklmnopqrstuvwxyz"
    group_id = {letter: f"${key}" for letter, key in zip(alpha, keys)}
    pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
    for d in self._collection.aggregate(pipeline, allowDiskUse=True):
        # Rebuild {key: value} from the letter-aliased _id; key[1:] strips the "$".
        id_doc = {}  # type: ignore
        for letter, key in group_id.items():
            if has(d["_id"], letter):
                set_(id_doc, key[1:], d["_id"][letter])
        yield (id_doc, d["docs"])

query(criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, **kwargs)

Queries the Store for a set of documents.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
hint Optional[Dict[str, Union[Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
mongoclient_kwargs

Dict of extra kwargs to pass to pymongo find.

required
Source code in src/maggma/stores/mongolike.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def query(  # type: ignore
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
    **kwargs,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
        mongoclient_kwargs: Dict of extra kwargs to pass to pymongo find
            (forwarded via ``**kwargs``).
    """
    # A list of property names becomes a pymongo inclusion projection.
    if isinstance(properties, list):
        properties = {p: 1 for p in properties}

    default_sort_formatted = None

    if self.default_sort is not None:
        default_sort_formatted = [
            (k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in self.default_sort.items()
        ]

    # Explicit sort wins; otherwise fall back to the store's default sort.
    sort_list = (
        [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in sort.items()]
        if sort
        else default_sort_formatted
    )

    hint_list = (
        [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
    )

    yield from self._collection.find(
        filter=criteria,
        projection=properties,
        skip=skip,
        limit=limit,
        sort=sort_list,
        hint=hint_list,
        **kwargs,
    )

remove_docs(criteria)

Remove docs matching the query dictionary.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in src/maggma/stores/mongolike.py
418
419
420
421
422
423
424
425
def remove_docs(self, criteria: Dict):
    """
    Delete every document that matches the given query.

    Args:
        criteria: MongoDB-style query dictionary; all matching documents
            are removed from the underlying collection.
    """
    # Delegate directly to pymongo's bulk delete.
    self._collection.delete_many(filter=criteria)

update(docs, key=None)

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in src/maggma/stores/mongolike.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Insert or replace documents in the Store.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
    """
    if not isinstance(docs, list):
        docs = [docs]

    operations = []
    for sanitized in (jsanitize(doc, allow_bson=True) for doc in docs):
        # Document-level validation is optional; a strict validator raises,
        # otherwise the invalid document is logged and skipped.
        if self.validator and not self.validator.is_valid(sanitized):
            if self.validator.strict:
                raise ValueError(self.validator.validation_errors(sanitized))
            self.logger.error(self.validator.validation_errors(sanitized))
            continue

        key = key or self.key
        if isinstance(key, list):
            search_doc = {field: sanitized[field] for field in key}
        else:
            search_doc = {key: sanitized[key]}
        operations.append(ReplaceOne(search_doc, sanitized, upsert=True))

    if not operations:
        return

    try:
        self._collection.bulk_write(operations, ordered=False)
    except (OperationFailure, DocumentTooLarge) as e:
        if not self.safe_update:
            raise e
        # Best-effort fallback: retry one request at a time so a single
        # oversized document does not abort the whole batch.
        for op in operations:
            try:
                self._collection.bulk_write([op], ordered=False)
            except (OperationFailure, DocumentTooLarge):
                self.logger.error(
                    f"Could not upload document for {op._filter} as it was too large for Mongo"
                )

MongoURIStore

Bases: MongoStore

A Store that connects to a Mongo collection via a URI. This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records.

Source code in src/maggma/stores/mongolike.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class MongoURIStore(MongoStore):
    """
    A Store that connects to a Mongo collection via a URI.
    This is expected to be a special mongodb+srv:// URI that includes
    client parameters via TXT records.
    """

    def __init__(
        self,
        uri: str,
        collection_name: str,
        database: Optional[str] = None,
        ssh_tunnel: Optional[SSHTunnel] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
        **kwargs,
    ):
        """
        Args:
            uri: MongoDB+SRV URI
            database: database to connect to; if None, the database name must be
                encoded in the URI itself
            collection_name: The collection name
            ssh_tunnel: optional SSHTunnel object. NOTE(review): this class's
                ``connect`` does not route through it — confirm upstream handling.
            mongoclient_kwargs: dict of extra kwargs passed to the ``MongoClient``
                constructor on connect
            default_sort: Default sort field and direction to use when querying. Can be used to
                ensure determinacy in query results.
        """
        self.uri = uri
        self.ssh_tunnel = ssh_tunnel
        self.default_sort = default_sort
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        # parse the dbname from the uri
        if database is None:
            d_uri = uri_parser.parse_uri(uri)
            if d_uri["database"] is None:
                raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
            self.database = d_uri["database"]
        else:
            self.database = database

        self.collection_name = collection_name
        self.kwargs = kwargs
        self._coll = None
        # super(MongoStore, ...) resolves past MongoStore in the MRO, so this
        # calls Store.__init__ directly and skips MongoStore's initializer.
        super(MongoStore, self).__init__(**kwargs)  # lgtm

    @property
    def name(self) -> str:
        """
        Return a string representing this data source.
        """
        # TODO: This is not very safe since it exposes the username/password info
        return self.uri

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        # Only (re)connect when no collection handle exists or a reset is forced.
        if self._coll is None or force_reset:  # pragma: no cover
            conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
            db = conn[self.database]
            self._coll = db[self.collection_name]  # type: ignore

name: str property

Return a string representing this data source.

__init__(uri, collection_name, database=None, ssh_tunnel=None, mongoclient_kwargs=None, default_sort=None, **kwargs)

Parameters:

Name Type Description Default
uri str

MongoDB+SRV URI

required
database Optional[str]

database to connect to

None
collection_name str

The collection name

required
default_sort Optional[Dict[str, Union[Sort, int]]]

Default sort field and direction to use when querying. Can be used to ensure determinacy in query results.

None
Source code in src/maggma/stores/mongolike.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def __init__(
    self,
    uri: str,
    collection_name: str,
    database: Optional[str] = None,
    ssh_tunnel: Optional[SSHTunnel] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
    **kwargs,
):
    """
    Initialize a Store backed by a mongodb+srv:// style URI.

    Args:
        uri: MongoDB+SRV URI
        database: database to connect to
        collection_name: The collection name
        default_sort: Default sort field and direction to use when querying. Can be used to
            ensure determinacy in query results.
    """
    self.uri = uri
    self.ssh_tunnel = ssh_tunnel
    self.default_sort = default_sort
    self.mongoclient_kwargs = mongoclient_kwargs or {}

    if database is not None:
        self.database = database
    else:
        # No explicit database given: it must be encoded in the URI itself.
        parsed = uri_parser.parse_uri(uri)
        if parsed["database"] is None:
            raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
        self.database = parsed["database"]

    self.collection_name = collection_name
    self.kwargs = kwargs
    self._coll = None
    # Initialize the base Store directly, bypassing MongoStore.__init__.
    super(MongoStore, self).__init__(**kwargs)  # lgtm

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/mongolike.py
498
499
500
501
502
503
504
505
506
507
508
509
def connect(self, force_reset: bool = False):
    """
    Open the connection to the underlying Mongo collection.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    # Already connected and no reset requested: nothing to do.
    if not (self._coll is None or force_reset):  # pragma: no cover
        return
    client: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
    self._coll = client[self.database][self.collection_name]  # type: ignore

MontyStore

Bases: MemoryStore

A MongoDB compatible store that uses on disk files for storage.

This is handled under the hood using MontyDB. A number of on-disk storage options are available but MontyDB provides a mongo style interface for all options. The options include:

  • sqlite: Uses an sqlite database to store documents.
  • lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can provide fast read and write times but requires lmdb to be installed (in most cases this can be achieved using pip install lmdb).
  • flatfile: Uses a system of flat json files. This is not recommended as multiple simultaneous connections to the store will not work correctly.

Note that MontyDB (and, therefore, MontyStore) will write out a new database to the disk but cannot be used to read an existing (e.g. SQLite) database that wasn't formatted by MontyDB.

See the MontyDB repository for more information: https://github.com/davidlatwe/montydb

Source code in src/maggma/stores/mongolike.py
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
@requires(
    MontyClient is not None,
    "MontyStore requires MontyDB to be installed. See the MontyDB repository for more "
    "information: https://github.com/davidlatwe/montydb",
)
class MontyStore(MemoryStore):
    """
    A MongoDB compatible store that uses on disk files for storage.

    This is handled under the hood using MontyDB. A number of on-disk storage options
    are available but MontyDB provides a mongo style interface for all options. The
    options include:

    - sqlite: Uses an sqlite database to store documents.
    - lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can
      provide fast read and write times but requires lmdb to be installed (in most cases
      this can be achieved using ``pip install lmdb``).
    - flatfile: Uses a system of flat json files. This is not recommended as multiple
      simultaneous connections to the store will not work correctly.

    Note that MontyDB (and, therefore, MontyStore) will write out a new database to
    the disk but cannot be used to read an existing (e.g. SQLite) database that wasn't
    formatted by MontyDB.

    See the MontyDB repository for more information: https://github.com/davidlatwe/montydb
    """

    def __init__(
        self,
        collection_name,
        database_path: Optional[str] = None,
        database_name: str = "db",
        storage: Literal["sqlite", "flatfile", "lightning"] = "sqlite",
        storage_kwargs: Optional[dict] = None,
        client_kwargs: Optional[dict] = None,
        **kwargs,
    ):
        """
        Initializes the Monty Store.

        Args:
            collection_name: Name for the collection.
            database_path: Path to on-disk database files. If None, the current working
                directory will be used.
            database_name: The database name.
            storage: The storage type. Options include "sqlite", "lightning", "flatfile".
                Note that although MontyDB supports in-memory storage, this capability is
                disabled in maggma to avoid unintended behavior, since multiple in-memory
                MontyStore would actually point to the same data.
            storage_kwargs: Keyword arguments passed to ``montydb.set_storage``.
            client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
                constructor.
            **kwargs: Additional keyword arguments passed to the Store constructor.
        """
        if database_path is None:
            database_path = str(Path.cwd())

        self.database_path = database_path
        self.database_name = database_name
        self.collection_name = collection_name
        self._coll = None  # type: ignore
        self.default_sort = None
        self.ssh_tunnel = None  # This is to fix issues with the tunnel on close
        self.kwargs = kwargs
        self.storage = storage
        self.storage_kwargs = storage_kwargs or {
            "use_bson": True,  # import pymongo's BSON; do not use montydb's
            "mongo_version": "4.0",
        }
        self.client_kwargs = client_kwargs or {}
        # super(MongoStore, ...) resolves past MongoStore in the MRO, so this
        # calls Store.__init__ directly and skips the Mongo/Memory initializers.
        super(MongoStore, self).__init__(**kwargs)

    def connect(self, force_reset: bool = False):
        """
        Connect to the database store.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        if not self._coll or force_reset:
            # TODO - workaround, may be obviated by a future montydb update
            if self.database_path != ":memory:":
                set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
            client = MontyClient(self.database_path, **self.client_kwargs)
            self._coll = client[self.database_name][self.collection_name]

    @property
    def name(self) -> str:
        """Return a string representing this data source."""
        return f"monty://{self.database_path}/{self.database_name}/{self.collection_name}"

    def count(
        self,
        criteria: Optional[Dict] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
    ) -> int:
        """
        Counts the number of documents matching the query criteria.

        Args:
            criteria: PyMongo filter for documents to count in
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
        """
        criteria = criteria if criteria else {}

        # Normalize int/Sort hint values to raw pymongo direction integers.
        hint_list = (
            [(k, Sort(v).value) if isinstance(v, int) else (k, v.value) for k, v in hint.items()] if hint else None
        )

        # Only forward the hint when one was explicitly provided.
        if hint_list is not None:  # pragma: no cover
            return self._collection.count_documents(filter=criteria, hint=hint_list)

        return self._collection.count_documents(filter=criteria)

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store.

        Args:
            docs: The document or list of documents to update.
            key: Field name(s) to determine uniqueness for a document, can be a list of
                multiple fields, a single field, or None if the Store's key field is to be
                used.
        """

        if not isinstance(docs, list):
            docs = [docs]

        for d in docs:
            d = jsanitize(d, allow_bson=True)

            # document-level validation is optional
            validates = True
            if self.validator:
                validates = self.validator.is_valid(d)
                if not validates:
                    if self.validator.strict:
                        raise ValueError(self.validator.validation_errors(d))
                    self.logger.error(self.validator.validation_errors(d))

            if validates:
                key = key or self.key
                search_doc = {k: d[k] for k in key} if isinstance(key, list) else {key: d[key]}

                # Upsert one document at a time keyed on the uniqueness fields.
                self._collection.replace_one(search_doc, d, upsert=True)

name: str property

Return a string representing this data source.

__init__(collection_name, database_path=None, database_name='db', storage='sqlite', storage_kwargs=None, client_kwargs=None, **kwargs)

Initializes the Monty Store.

Parameters:

Name Type Description Default
collection_name

Name for the collection.

required
database_path Optional[str]

Path to on-disk database files. If None, the current working directory will be used.

None
database_name str

The database name.

'db'
storage Literal['sqlite', 'flatfile', 'lightning']

The storage type. Options include "sqlite", "lightning", "flatfile". Note that although MontyDB supports in-memory storage, this capability is disabled in maggma to avoid unintended behavior, since multiple in-memory MontyStore would actually point to the same data.

'sqlite'
storage_kwargs Optional[dict]

Keyword arguments passed to montydb.set_storage.

None
client_kwargs Optional[dict]

Keyword arguments passed to the montydb.MontyClient constructor.

None
**kwargs

Additional keyword arguments passed to the Store constructor.

{}
Source code in src/maggma/stores/mongolike.py
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
def __init__(
    self,
    collection_name,
    database_path: Optional[str] = None,
    database_name: str = "db",
    storage: Literal["sqlite", "flatfile", "lightning"] = "sqlite",
    storage_kwargs: Optional[dict] = None,
    client_kwargs: Optional[dict] = None,
    **kwargs,
):
    """
    Initializes the Monty Store.

    Args:
        collection_name: Name for the collection.
        database_path: Path to on-disk database files. If None, the current working
            directory will be used.
        database_name: The database name.
        storage: The storage type. Options include "sqlite", "lightning", "flatfile".
            Note that although MontyDB supports in-memory storage, this capability is
            disabled in maggma to avoid unintended behavior, since multiple in-memory
            MontyStore would actually point to the same data.
        storage_kwargs: Keyword arguments passed to ``montydb.set_storage``.
        client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
            constructor.
        **kwargs: Additional keyword arguments passed to the Store constructor.
    """
    if database_path is None:
        database_path = str(Path.cwd())

    self.database_path = database_path
    self.database_name = database_name
    self.collection_name = collection_name
    self._coll = None  # type: ignore
    self.default_sort = None
    self.ssh_tunnel = None  # This is to fix issues with the tunnel on close
    self.kwargs = kwargs
    self.storage = storage
    self.storage_kwargs = storage_kwargs or {
        "use_bson": True,  # import pymongo's BSON; do not use montydb's
        "mongo_version": "4.0",
    }
    self.client_kwargs = client_kwargs or {}
    # super(MongoStore, ...) resolves past MongoStore in the MRO, so this
    # calls Store.__init__ directly and skips the Mongo/Memory initializers.
    super(MongoStore, self).__init__(**kwargs)

connect(force_reset=False)

Connect to the database store.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/mongolike.py
864
865
866
867
868
869
870
871
872
873
874
875
876
877
def connect(self, force_reset: bool = False):
    """
    Open (or re-open) the MontyDB-backed collection.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    # Already connected and no reset requested: keep the current handle.
    if self._coll and not force_reset:
        return
    # TODO - workaround, may be obviated by a future montydb update
    if self.database_path != ":memory:":
        set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
    client = MontyClient(self.database_path, **self.client_kwargs)
    self._coll = client[self.database_name][self.collection_name]

count(criteria=None, hint=None)

Counts the number of documents matching the query criteria.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
hint Optional[Dict[str, Union[Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
Source code in src/maggma/stores/mongolike.py
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
def count(
    self,
    criteria: Optional[Dict] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
    """
    Count the documents matching the given query criteria.

    Args:
        criteria: PyMongo filter for documents to count in
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
    """
    query_filter = criteria if criteria else {}

    if not hint:
        return self._collection.count_documents(filter=query_filter)

    # Normalize int/Sort hint values to raw pymongo direction integers.
    formatted_hint = [
        (field, Sort(direction).value) if isinstance(direction, int) else (field, direction.value)
        for field, direction in hint.items()
    ]
    return self._collection.count_documents(filter=query_filter, hint=formatted_hint)  # pragma: no cover

update(docs, key=None)

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

The document or list of documents to update.

required
key Union[List, str, None]

Field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used.

None
Source code in src/maggma/stores/mongolike.py
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Insert or replace documents in the Store, one at a time.

    Args:
        docs: The document or list of documents to update.
        key: Field name(s) to determine uniqueness for a document, can be a list of
            multiple fields, a single field, or None if the Store's key field is to be
            used.
    """
    doc_list = docs if isinstance(docs, list) else [docs]

    for raw_doc in doc_list:
        clean = jsanitize(raw_doc, allow_bson=True)

        # Document-level validation is optional; a strict validator raises,
        # otherwise the invalid document is logged and skipped.
        if self.validator and not self.validator.is_valid(clean):
            if self.validator.strict:
                raise ValueError(self.validator.validation_errors(clean))
            self.logger.error(self.validator.validation_errors(clean))
            continue

        key = key or self.key
        if isinstance(key, list):
            match_filter = {field: clean[field] for field in key}
        else:
            match_filter = {key: clean[key]}

        self._collection.replace_one(match_filter, clean, upsert=True)

Module defining a FileStore that enables accessing files in a local directory using typical maggma access patterns.

FileStore

Bases: MemoryStore

A Store for files on disk. Provides a common access method consistent with other stores. Each Item in the Store represents one file. Files can be organized into any type of directory structure.

A hash of the full path to each file is used to define a file_id that uniquely identifies each item.

Any metadata added to the items is written to a .json file in the root directory of the FileStore.

Source code in src/maggma/stores/file_store.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
class FileStore(MemoryStore):
    """
    A Store for files on disk. Provides a common access method consistent with
    other stores. Each Item in the Store represents one file. Files can be organized
    into any type of directory structure.

    A hash of the full path to each file is used to define a file_id that uniquely
    identifies each item.

    Any metadata added to the items is written to a .json file in the root directory
    of the FileStore.
    """

    def __init__(
        self,
        path: Union[str, Path],
        file_filters: Optional[List] = None,
        max_depth: Optional[int] = None,
        read_only: bool = True,
        include_orphans: bool = False,
        json_name: str = "FileStore.json",
        **kwargs,
    ):
        """
        Initializes a FileStore.

        Args:
            path: parent directory containing all files and subdirectories to process
            file_filters: List of fnmatch patterns defining the files to be tracked by
                the FileStore. Only files that match one of the patterns provided will
                be included in the Store. If None (default), all files are included.

                Examples: ["*.txt", "test-[abcd].txt"], etc.
                See https://docs.python.org/3/library/fnmatch.html for full syntax
            max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
                1 = include files 1 directory below the FileStore, etc.
                None (default) will scan all files below
                the FileStore root directory, regardless of depth.
            read_only: If True (default), the .update() and .remove_docs()
                methods are disabled, preventing any changes to the files on
                disk. In addition, metadata cannot be written to disk.
            include_orphans: Whether to include orphaned metadata records in query results.
                Orphaned metadata records are records found in the local JSON file that can
                no longer be associated to a file on disk. This can happen if a file is renamed
                or deleted, or if the FileStore is re-initialized with a more restrictive
                file_filters or max_depth argument. By default (False), these records
                do not appear in query results. Nevertheless, the metadata records are
                retained in the JSON file and the FileStore to prevent accidental data loss.
            json_name: Name of the .json file to which metadata is saved. If read_only
                is False, this file will be created in the root directory of the
                FileStore.
            kwargs: kwargs passed to MemoryStore.__init__()
        """
        # this conditional block is needed in order to guarantee that the 'name'
        # property, which is passed to `MemoryStore`, works correctly
        # collection names passed to MemoryStore cannot end with '.'
        if path == ".":
            path = Path.cwd()
        self.path = Path(path) if isinstance(path, str) else path

        self.json_name = json_name
        file_filters = file_filters if file_filters else ["*"]
        # translate each fnmatch pattern into a regex and OR them together so a
        # single compiled pattern can test every filter in one pass
        self.file_filters = re.compile("|".join(fnmatch.translate(p) for p in file_filters))
        self.collection_name = "file_store"
        self.key = "file_id"
        self.include_orphans = include_orphans
        self.read_only = read_only
        self.max_depth = max_depth

        # metadata is persisted via a JSONStore in the FileStore root directory;
        # it shares this Store's key and collection name
        self.metadata_store = JSONStore(
            paths=[str(self.path / self.json_name)],
            read_only=self.read_only,
            collection_name=self.collection_name,
            key=self.key,
        )

        self.kwargs = kwargs

        super().__init__(
            collection_name=self.collection_name,
            key=self.key,
            **self.kwargs,
        )

    @property
    def name(self) -> str:
        """
        Return a string representing this data source.
        """
        return f"file://{self.path}"

    def add_metadata(
        self,
        metadata: Optional[Dict] = None,
        query: Optional[Dict] = None,
        auto_data: Optional[Callable[[Dict], Dict]] = None,
        **kwargs,
    ):
        """
        Add metadata to a record in the FileStore, either manually or by computing it automatically
        from another field, such as name or path (see auto_data).

        Args:
            metadata: dict of additional data to add to the records returned by query.
                      Note that any protected keys (such as 'name', 'path', etc.)
                      will be ignored.
            query: Query passed to FileStore.query()
            auto_data: A function that automatically computes metadata based on a field in
                    the record itself. The function must take in the item as a dict and
                    return a dict containing the desired metadata. A typical use case is
                    to assign metadata based on the name of a file. For example, for
                    data files named like `2022-04-01_april_fool_experiment.txt`, the
                    auto_data function could be:

                    def get_metadata_from_filename(d):
                        return {"date": d["name"].split("_")[0],
                                "test_name": d["name"].split("_")[1]
                                }

                    Note that in the case of conflict between manual and automatically
                    computed metadata (for example, if metadata={"name": "another_name"} was
                    supplied alongside the auto_data function above), the manually-supplied
                    metadata is used.
            kwargs: kwargs passed to FileStore.query()
        """
        if metadata is None:
            metadata = {}
        # sanitize the metadata: strip any protected, file-derived keys
        filtered_metadata = self._filter_data(metadata)
        updated_docs = []

        for doc in self.query(query, **kwargs):
            if auto_data:
                extra_data = self._filter_data(auto_data(doc))
                doc.update(extra_data)
            # manual metadata is applied last, so it wins over auto_data on conflict
            doc.update(filtered_metadata)
            updated_docs.append(doc)

        self.update(updated_docs, key=self.key)

    def read(self) -> List[Dict]:
        """
        Iterate through all files in the Store folder and populate
        the Store with dictionaries containing basic information about each file.

        The keys of the documents added to the Store are:

        - name: str = File name
        - path: Path = Absolute path of this file
        - parent: str = Name of the parent directory (if any)
        - file_id: str = Unique identifier for this file, computed from the hash
                    of its path relative to the base FileStore directory and
                    the file creation time. The key of this field is 'file_id'
                    by default but can be changed via the 'key' kwarg to
                    `FileStore.__init__()`.
        - size: int = Size of this file in bytes
        - last_updated: datetime = Time this file was last modified
        - hash: str = Hash of the file contents
        - orphan: bool = Whether this record is an orphan
        """
        file_list = []
        # generate a list of files in subdirectories
        for root, _dirs, files in os.walk(self.path):
            for match in filter(self.file_filters.match, files):
                path = Path(os.path.join(root, match))
                # ignore the .json file created by the Store
                if path.is_file() and path.name != self.json_name:
                    # filter based on depth (0 = directly under the root)
                    depth = len(path.relative_to(self.path).parts) - 1
                    if self.max_depth is None or depth <= self.max_depth:
                        file_list.append(self._create_record_from_file(path))

        return file_list

    def _create_record_from_file(self, f: Path) -> Dict:
        """
        Given the path to a file, return a Dict that constitutes a record of
        basic information about that file. The keys in the returned dict
        are:

        - name: str = File name
        - path: Path = Absolute path of this file
        - parent: str = Name of the parent directory (if any)
        - file_id: str = Unique identifier for this file, computed from the hash
                    of its path relative to the base FileStore directory and
                    the file creation time. The key of this field is 'file_id'
                    by default but can be changed via the 'key' kwarg to
                    FileStore.__init__().
        - size: int = Size of this file in bytes
        - last_updated: datetime = Time this file was last modified
        - hash: str = Hash of the file contents
        - orphan: bool = Whether this record is an orphan
        """
        # compute the file_id from the relative path
        relative_path = f.relative_to(self.path)
        digest = hashlib.md5()
        digest.update(str(relative_path).encode())
        file_id = str(digest.hexdigest())

        # hash the file contents; note the hash is seeded with the store name,
        # so it is not a pure content hash and is not portable across stores
        digest2 = hashlib.md5()
        b = bytearray(128 * 2056)  # reusable read buffer
        mv = memoryview(b)
        digest2.update(self.name.encode())
        with open(f.as_posix(), "rb", buffering=0) as file:
            # this block copied from the file_digest method in python 3.11+
            # see https://github.com/python/cpython/blob/0ba07b2108d4763273f3fb85544dde34c5acd40a/Lib/hashlib.py#L213
            if hasattr(file, "getbuffer"):
                # io.BytesIO object, use zero-copy buffer
                digest2.update(file.getbuffer())
            else:
                for n in iter(lambda: file.readinto(mv), 0):
                    digest2.update(mv[:n])

        content_hash = str(digest2.hexdigest())
        stats = f.stat()

        return {
            "name": f.name,
            "path": f,
            "path_relative": relative_path,
            "parent": f.parent.name,
            "size": stats.st_size,
            "last_updated": datetime.fromtimestamp(stats.st_mtime, tz=timezone.utc),
            "orphan": False,
            "hash": content_hash,
            self.key: file_id,
        }

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Read all the files in the directory, create corresponding File
        items in the internal MemoryStore.

        If there is a metadata .json file in the directory, read its
        contents into the MemoryStore

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        # read all files and place them in the MemoryStore
        # use super.update to bypass the read_only guard statement
        # because we want the file data to be populated in memory
        super().connect(force_reset=force_reset)
        super().update(self.read())

        # now read any metadata from the .json file
        try:
            self.metadata_store.connect(force_reset=force_reset)
            metadata = list(self.metadata_store.query())
        except FileNotFoundError:
            metadata = []
            warnings.warn(
                f"""
                JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
                the FileStore with read_only=False.
                """
            )

        # merge metadata with file data and check for orphaned metadata
        requests = []
        found_orphans = False
        key = self.key
        file_ids = self.distinct(self.key)
        for d in metadata:
            search_doc = {k: d[k] for k in key} if isinstance(key, list) else {key: d[key]}

            if d[key] not in file_ids:
                found_orphans = True
                d.update({"orphan": True})

            # drop any mongo-style id so the $set below cannot clash with the
            # _id of the in-memory record (pop() tolerates its absence)
            d.pop("_id", None)

            requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))

        if found_orphans:
            warnings.warn(
                f"Orphaned metadata was found in {self.json_name}. This metadata "
                "will be added to the store with {'orphan': True}"
            )
        if len(requests) > 0:
            self._collection.bulk_write(requests, ordered=False)

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update items in the Store. Only possible if the store is not read only. Any new
        fields that are added will be written to the JSON file in the root directory
        of the FileStore.

        Note that certain fields that come from file metadata on disk are protected and
        cannot be updated with this method. This prevents the contents of the FileStore
        from becoming out of sync with the files on which it is based. The protected fields
        are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent',
        'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are
        retained to make each document in the JSON file identifiable by manual inspection.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used

        Raises:
            StoreError: if the Store was initialized with read_only=True
        """
        if self.read_only:
            raise StoreError(
                "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
            )

        super().update(docs, key)
        data = list(self.query())
        filtered_data = []
        # remove fields that are populated by .read()
        for d in data:
            filtered_d = self._filter_data(d)
            # don't write records that contain only file_id and path_relative
            if len(set(filtered_d.keys()).difference({"path_relative", self.key})) != 0:
                filtered_data.append(filtered_d)
        self.metadata_store.update(filtered_data, self.key)

    def _filter_data(self, d):
        """
        Remove any protected keys from a dictionary.

        Args:
            d: Dictionary whose keys are to be filtered
        """
        return {k: v for k, v in d.items() if k not in PROTECTED_KEYS.union({self.last_updated_field})}

    def query(  # type: ignore
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        hint: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
        contents_size_limit: Optional[int] = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            hint: Dictionary of indexes to use as hints for query optimizer.
                Keys are field names and values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
            contents_size_limit: Maximum file size in bytes for which to return contents.
                The FileStore will attempt to read the file and populate the 'contents' key
                with its content at query time, unless the file size is larger than this value.
                By default, reading content is disabled. Note that enabling content reading
                can substantially slow down the query operation, especially when there
                are large numbers of files.
        """
        return_contents = False
        # work on a copy so the caller's criteria dict is never mutated
        criteria = dict(criteria) if criteria else {}
        if criteria.get("orphan", None) is None and not self.include_orphans:
            criteria.update({"orphan": False})

        if criteria.get("contents"):
            warnings.warn("'contents' is not a queryable field! Ignoring.")
            # actually drop it; 'contents' is never stored in the MemoryStore,
            # so leaving it in the criteria would match no documents at all
            criteria.pop("contents")

        if isinstance(properties, list):
            properties = {p: 1 for p in properties}

        orig_properties = properties.copy() if properties else None

        if properties is None:
            # None means return all fields, including contents
            return_contents = True
        elif properties.get("contents"):
            return_contents = True
            # remove contents b/c it isn't stored in the MemoryStore
            properties.pop("contents")
            # add size and path to query so that file can be read
            properties.update({"size": 1})
            properties.update({"path": 1})

        for d in super().query(
            criteria=criteria,
            properties=properties,
            sort=sort,
            hint=hint,
            skip=skip,
            limit=limit,
        ):
            # add file contents to the returned documents, if appropriate
            if return_contents and not d.get("orphan"):
                if contents_size_limit is None or d["size"] <= contents_size_limit:
                    # attempt to read the file contents and inject into the document
                    # TODO - could add more logic for detecting different file types
                    # and more nuanced exception handling
                    try:
                        with zopen(d["path"], "r") as f:
                            data = f.read()
                    except Exception as e:
                        data = f"Unable to read: {e}"

                elif d["size"] > contents_size_limit:
                    data = f"File exceeds size limit of {contents_size_limit} bytes"
                else:
                    data = "Unable to read: Unknown error"

                d.update({"contents": data})

                # remove size and path if not explicitly requested
                if orig_properties is not None and "size" not in orig_properties:
                    d.pop("size")
                if orig_properties is not None and "path" not in orig_properties:
                    d.pop("path")

            yield d

    def query_one(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        contents_size_limit: Optional[int] = None,
    ):
        """
        Queries the Store for a single document.

        Note: unlike query(), contents_size_limit defaults to None here, i.e.
        contents are read regardless of file size; kept for backward compatibility.

        Args:
            criteria: PyMongo filter for documents to search
            properties: properties to return in the document
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            contents_size_limit: Maximum file size in bytes for which to return contents.
                The FileStore will attempt to read the file and populate the 'contents' key
                with its content at query time, unless the file size is larger than this value.
        """
        return next(
            self.query(
                criteria=criteria,
                properties=properties,
                sort=sort,
                contents_size_limit=contents_size_limit,
            ),
            None,
        )

    def remove_docs(self, criteria: Dict, confirm: bool = False):
        """
        Remove items matching the query dictionary. The corresponding files
        on disk are deleted.

        Args:
            criteria: query dictionary to match
            confirm: Boolean flag to confirm that remove_docs should delete
                     files on disk. Default: False.

        Raises:
            StoreError: if the Store is read-only, or if matching documents
                were found but confirm was not set to True.
        """
        if self.read_only:
            raise StoreError(
                "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
            )

        docs = list(self.query(criteria))
        # this ensures that any modifications to criteria made by self.query
        # (e.g., related to orphans or contents) are propagated through to the superclass
        new_criteria = {self.key: {"$in": [d[self.key] for d in docs]}}

        if len(docs) > 0 and not confirm:
            raise StoreError(
                f"Warning! This command is about to delete {len(docs)} items from disk! "
                "If this is what you want, reissue this command with confirm=True."
            )

        # delete the files on disk, then remove the matching records from the
        # in-memory store in a single call (previously super().remove_docs()
        # was invoked once per file inside the loop, redundantly re-deleting
        # the same full set of records on every iteration)
        for d in docs:
            Path(d["path"]).unlink()
        super().remove_docs(criteria=new_criteria)

name: str property

Return a string representing this data source.

__init__(path, file_filters=None, max_depth=None, read_only=True, include_orphans=False, json_name='FileStore.json', **kwargs)

Initializes a FileStore.

Parameters:

Name Type Description Default
path Union[str, Path]

parent directory containing all files and subdirectories to process

required
file_filters Optional[List]

List of fnmatch patterns defining the files to be tracked by the FileStore. Only files that match one of the patterns provided will be included in the Store. If None (default), all files are included.

Examples: ["*.txt", "test-[abcd].txt"], etc. See https://docs.python.org/3/library/fnmatch.html for full syntax

None
max_depth Optional[int]

The maximum depth to look into subdirectories. 0 = no recursion, 1 = include files 1 directory below the FileStore, etc. None (default) will scan all files below the FileStore root directory, regardless of depth.

None
read_only bool

If True (default), the .update() and .remove_docs() methods are disabled, preventing any changes to the files on disk. In addition, metadata cannot be written to disk.

True
include_orphans bool

Whether to include orphaned metadata records in query results. Orphaned metadata records are records found in the local JSON file that can no longer be associated to a file on disk. This can happen if a file is renamed or deleted, or if the FileStore is re-initialized with a more restrictive file_filters or max_depth argument. By default (False), these records do not appear in query results. Nevertheless, the metadata records are retained in the JSON file and the FileStore to prevent accidental data loss.

False
json_name str

Name of the .json file to which metadata is saved. If read_only is False, this file will be created in the root directory of the FileStore.

'FileStore.json'
kwargs

kwargs passed to MemoryStore.__init__()

{}
Source code in src/maggma/stores/file_store.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def __init__(
    self,
    path: Union[str, Path],
    file_filters: Optional[List] = None,
    max_depth: Optional[int] = None,
    read_only: bool = True,
    include_orphans: bool = False,
    json_name: str = "FileStore.json",
    **kwargs,
):
    """
    Initializes a FileStore.

    Args:
        path: parent directory containing all files and subdirectories to process
        file_filters: List of fnmatch patterns defining the files to be tracked by
            the FileStore. Only files that match one of the patterns  provided will
            be included in the Store If None (default), all files are included.

            Examples: ["*.txt", "test-[abcd].txt"], etc.
            See https://docs.python.org/3/library/fnmatch.html for full syntax
        max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
            1 = include files 1 directory below the FileStore, etc.
            None (default) will scan all files below
            the FileStore root directory, regardless of depth.
        read_only: If True (default), the .update() and .remove_docs()
            methods are disabled, preventing any changes to the files on
            disk. In addition, metadata cannot be written to disk.
        include_orphans: Whether to include orphaned metadata records in query results.
            Orphaned metadata records are records found in the local JSON file that can
            no longer be associated to a file on disk. This can happen if a file is renamed
            or deleted, or if the FileStore is re-initialized with a more restrictive
            file_filters or max_depth argument. By default (False), these records
            do not appear in query results. Nevertheless, the metadata records are
            retained in the JSON file and the FileStore to prevent accidental data loss.
        json_name: Name of the .json file to which metadata is saved. If read_only
            is False, this file will be created in the root directory of the
            FileStore.
        kwargs: kwargs passed to MemoryStore.__init__()
    """
    # this conditional block is needed in order to guarantee that the 'name'
    # property, which is passed to `MemoryStore`, works correctly
    # collection names passed to MemoryStore cannot end with '.'
    if path == ".":
        path = Path.cwd()
    self.path = Path(path) if isinstance(path, str) else path

    self.json_name = json_name
    file_filters = file_filters if file_filters else ["*"]
    # each fnmatch pattern is translated to a regex and OR-ed into a single
    # compiled pattern, so one match() call tests all filters at once
    self.file_filters = re.compile("|".join(fnmatch.translate(p) for p in file_filters))
    self.collection_name = "file_store"
    self.key = "file_id"
    self.include_orphans = include_orphans
    self.read_only = read_only
    self.max_depth = max_depth

    # metadata is persisted via a JSONStore in the FileStore root directory,
    # sharing this Store's key and collection name; it inherits read_only,
    # so a read-only FileStore never writes the JSON file
    self.metadata_store = JSONStore(
        paths=[str(self.path / self.json_name)],
        read_only=self.read_only,
        collection_name=self.collection_name,
        key=self.key,
    )

    self.kwargs = kwargs

    super().__init__(
        collection_name=self.collection_name,
        key=self.key,
        **self.kwargs,
    )

add_metadata(metadata=None, query=None, auto_data=None, **kwargs)

Add metadata to a record in the FileStore, either manually or by computing it automatically from another field, such as name or path (see auto_data).

Parameters:

Name Type Description Default
metadata Optional[Dict]

dict of additional data to add to the records returned by query. Note that any protected keys (such as 'name', 'path', etc.) will be ignored.

None
query Optional[Dict]

Query passed to FileStore.query()

None
auto_data Optional[Callable[[Dict], Dict]]

A function that automatically computes metadata based on a field in the record itself. The function must take in the item as a dict and return a dict containing the desired metadata. A typical use case is to assign metadata based on the name of a file. For example, for data files named like 2022-04-01_april_fool_experiment.txt, the auto_data function could be:

def get_metadata_from_filename(d):
    return {"date": d["name"].split("_")[0],
            "test_name": d["name"].split("_")[1]
            }

Note that in the case of conflict between manual and automatically
computed metadata (for example, if metadata={"name": "another_name"} was
supplied alongside the auto_data function above), the manually-supplied
metadata is used.
None
kwargs

kwargs passed to FileStore.query()

{}
Source code in src/maggma/stores/file_store.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def add_metadata(
    self,
    metadata: Optional[Dict] = None,
    query: Optional[Dict] = None,
    auto_data: Optional[Callable[[Dict], Dict]] = None,
    **kwargs,
):
    """
    Attach extra metadata to records in the FileStore.

    Metadata may be supplied directly via ``metadata`` and/or derived from each
    record itself via ``auto_data``. Protected, file-derived keys (e.g. 'name',
    'path') are stripped from both sources before being applied. When the two
    sources disagree on a key, the manually supplied ``metadata`` wins, because
    it is applied after the ``auto_data`` result.

    Args:
        metadata: dict of additional data merged into every record matched by
            the query. Protected keys are silently dropped.
        query: Query passed to FileStore.query() to select the records.
        auto_data: callable that receives a record dict and returns a dict of
            metadata computed from it, e.g. parsing a date out of the file
            name:

            def get_metadata_from_filename(d):
                return {"date": d["name"].split("_")[0],
                        "test_name": d["name"].split("_")[1]
                        }

        kwargs: additional kwargs forwarded to FileStore.query()
    """
    # sanitize the manual metadata once, up front
    manual_fields = self._filter_data(metadata if metadata is not None else {})

    records_to_write = []
    for record in self.query(query, **kwargs):
        if auto_data:
            # computed fields go in first ...
            record.update(self._filter_data(auto_data(record)))
        # ... so the manual fields take precedence on conflicts
        record.update(manual_fields)
        records_to_write.append(record)

    self.update(records_to_write, key=self.key)

connect(force_reset=False)

Connect to the source data.

Read all the files in the directory, create corresponding File items in the internal MemoryStore.

If there is a metadata .json file in the directory, read its contents into the MemoryStore

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/file_store.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Read all the files in the directory, create corresponding File
    items in the internal MemoryStore.

    If there is a metadata .json file in the directory, read its
    contents into the MemoryStore and merge it with the file records,
    flagging any metadata that no longer matches a file on disk as an
    orphan.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    # read all files and place them in the MemoryStore
    # use super.update to bypass the read_only guard statement
    # because we want the file data to be populated in memory
    super().connect(force_reset=force_reset)
    super().update(self.read())

    # now read any metadata from the .json file
    try:
        self.metadata_store.connect(force_reset=force_reset)
        metadata = list(self.metadata_store.query())
    except FileNotFoundError:
        metadata = []
        warnings.warn(
            f"""
            JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
            the FileStore with read_only=False.
            """
        )

    # merge metadata with file data and check for orphaned metadata
    requests = []
    found_orphans = False
    key = self.key
    file_ids = self.distinct(self.key)
    for d in metadata:
        search_doc = {k: d[k] for k in key} if isinstance(key, list) else {key: d[key]}

        # NOTE(review): d[key] assumes a single string key; the line above
        # handles list keys but this lookup would raise TypeError for one --
        # confirm list keys are unsupported for FileStore
        if d[key] not in file_ids:
            found_orphans = True
            d.update({"orphan": True})

        # _id is internal to the metadata store and must not be upserted back
        del d["_id"]

        requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))

    if found_orphans:
        # fixed missing space: previously rendered as "metadatawill"
        warnings.warn(
            f"Orphaned metadata was found in {self.json_name}. This metadata "
            "will be added to the store with {'orphan': True}"
        )
    if len(requests) > 0:
        self._collection.bulk_write(requests, ordered=False)

query(criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, contents_size_limit=0)

Queries the Store for a set of documents.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
hint Optional[Dict[str, Union[Sort, int]]]

Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
contents_size_limit Optional[int]

Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value. By default, reading content is disabled. Note that enabling content reading can substantially slow down the query operation, especially when there are large numbers of files.

0
Source code in src/maggma/stores/file_store.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def query(  # type: ignore
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    hint: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
    contents_size_limit: Optional[int] = 0,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        hint: Dictionary of indexes to use as hints for query optimizer.
            Keys are field names and values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
        contents_size_limit: Maximum file size in bytes for which to return contents.
            The FileStore will attempt to read the file and populate the 'contents' key
            with its content at query time, unless the file size is larger than this value.
            By default, reading content is disabled. Note that enabling content reading
            can substantially slow down the query operation, especially when there
            are large numbers of files.
    """
    return_contents = False
    criteria = criteria if criteria else {}
    # hide orphaned records unless the caller explicitly filtered on
    # 'orphan' or the store is configured to include them
    if criteria.get("orphan", None) is None and not self.include_orphans:
        criteria.update({"orphan": False})

    # 'contents' is synthesized from the file on disk at query time,
    # so it cannot be used as a filter field
    if criteria.get("contents"):
        warnings.warn("'contents' is not a queryable field! Ignoring.")

    if isinstance(properties, list):
        properties = {p: 1 for p in properties}

    # remember what the caller actually requested so that the helper
    # fields ('size', 'path') added below can be stripped afterwards
    orig_properties = properties.copy() if properties else None

    if properties is None:
        # None means return all fields, including contents
        return_contents = True
    elif properties.get("contents"):
        return_contents = True
        # remove contents b/c it isn't stored in the MemoryStore
        properties.pop("contents")
        # add size and path to query so that file can be read
        properties.update({"size": 1})
        properties.update({"path": 1})

    for d in super().query(
        criteria=criteria,
        properties=properties,
        sort=sort,
        hint=hint,
        skip=skip,
        limit=limit,
    ):
        # add file contents to the returned documents, if appropriate
        if return_contents and not d.get("orphan"):
            if contents_size_limit is None or d["size"] <= contents_size_limit:
                # attempt to read the file contents and inject into the document
                # TODO - could add more logic for detecting different file types
                # and more nuanced exception handling
                try:
                    with zopen(d["path"], "r") as f:
                        data = f.read()
                except Exception as e:
                    data = f"Unable to read: {e}"

            elif d["size"] > contents_size_limit:
                data = f"File exceeds size limit of {contents_size_limit} bytes"
            else:
                # NOTE(review): unreachable -- the elif above is the exact
                # logical complement of the if branch; kept as a safety net
                data = "Unable to read: Unknown error"

            d.update({"contents": data})

            # remove size and path if not explicitly requested
            if orig_properties is not None and "size" not in orig_properties:
                d.pop("size")
            if orig_properties is not None and "path" not in orig_properties:
                d.pop("path")

        yield d

query_one(criteria=None, properties=None, sort=None, contents_size_limit=None)

Queries the Store for a single document.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search

None
properties Union[Dict, List, None]

properties to return in the document

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
contents_size_limit Optional[int]

Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value.

None
Source code in src/maggma/stores/file_store.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def query_one(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    contents_size_limit: Optional[int] = None,
):
    """
    Query the Store and return the first matching document, or None
    if nothing matches.

    Args:
        criteria: PyMongo filter for documents to search
        properties: properties to return in the document
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        contents_size_limit: Maximum file size in bytes for which to return contents.
            The FileStore will attempt to read the file and populate the 'contents' key
            with its content at query time, unless the file size is larger than this value.
    """
    # delegate to self.query and return the first yielded document, if any
    results = self.query(
        criteria=criteria,
        properties=properties,
        sort=sort,
        contents_size_limit=contents_size_limit,
    )
    for doc in results:
        return doc
    return None

read()

Iterate through all files in the Store folder and populate the Store with dictionaries containing basic information about each file.

The keys of the documents added to the Store are:

  • name: str = File name
  • path: Path = Absolute path of this file
  • parent: str = Name of the parent directory (if any)
  • file_id: str = Unique identifier for this file, computed from the hash of its path relative to the base FileStore directory and the file creation time. The key of this field is 'file_id' by default but can be changed via the 'key' kwarg to FileStore.__init__().
  • size: int = Size of this file in bytes
  • last_updated: datetime = Time this file was last modified
  • hash: str = Hash of the file contents
  • orphan: bool = Whether this record is an orphan
Source code in src/maggma/stores/file_store.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def read(self) -> List[Dict]:
    """
    Walk every file in the Store folder and build a dictionary of basic
    information about each one, suitable for populating the Store.

    The keys of the documents added to the Store are:

    - name: str = File name
    - path: Path = Absolute path of this file
    - parent: str = Name of the parent directory (if any)
    - file_id: str = Unique identifier for this file, computed from the hash
                of its path relative to the base FileStore directory and
                the file creation time. The key of this field is 'file_id'
                by default but can be changed via the 'key' kwarg to
                `FileStore.__init__()`.
    - size: int = Size of this file in bytes
    - last_updated: datetime = Time this file was last modified
    - hash: str = Hash of the file contents
    - orphan: bool = Whether this record is an orphan
    """
    records = []
    # walk the directory tree, keeping only names that pass the file filter
    for root, _dirs, files in os.walk(self.path):
        for fname in filter(self.file_filters.match, files):
            fpath = Path(os.path.join(root, fname))
            # skip anything that is not a regular file, and skip the
            # .json metadata file maintained by the Store itself
            if not fpath.is_file() or fpath.name == self.json_name:
                continue
            # enforce the maximum directory depth, if one is configured
            depth = len(fpath.relative_to(self.path).parts) - 1
            if self.max_depth is None or depth <= self.max_depth:
                records.append(self._create_record_from_file(fpath))

    return records

remove_docs(criteria, confirm=False)

Remove items matching the query dictionary.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
confirm bool

Boolean flag to confirm that remove_docs should delete files on disk. Default: False.

False
Source code in src/maggma/stores/file_store.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
def remove_docs(self, criteria: Dict, confirm: bool = False):
    """
    Remove items matching the query dictionary.

    Deletes the matching files from disk and removes the corresponding
    records from the in-memory store.

    Args:
        criteria: query dictionary to match
        confirm: Boolean flag to confirm that remove_docs should delete
                 files on disk. Default: False.

    Raises:
        StoreError: if the Store is read-only, or if matching documents
            were found and confirm is not True.
    """
    if self.read_only:
        raise StoreError(
            "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
        )

    docs = list(self.query(criteria))
    # this ensures that any modifications to criteria made by self.query
    # (e.g., related to orphans or contents) are propagated through to the superclass
    new_criteria = {"file_id": {"$in": [d["file_id"] for d in docs]}}

    if len(docs) > 0 and not confirm:
        raise StoreError(
            f"Warning! This command is about to delete {len(docs)} items from disk! "
            "If this is what you want, reissue this command with confirm=True."
        )

    for d in docs:
        Path(d["path"]).unlink()

    # remove the matching records from the in-memory store once, after all
    # files have been deleted. (Previously this call was issued inside the
    # loop above, re-running the identical bulk removal once per document.)
    if docs:
        super().remove_docs(criteria=new_criteria)

update(docs, key=None)

Update items in the Store. Only possible if the store is not read only. Any new fields that are added will be written to the JSON file in the root directory of the FileStore.

Note that certain fields that come from file metadata on disk are protected and cannot be updated with this method. This prevents the contents of the FileStore from becoming out of sync with the files on which it is based. The protected fields are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent', 'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are retained to make each document in the JSON file identifiable by manual inspection.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
Source code in src/maggma/stores/file_store.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Update items in the Store. Only possible if the store is not read only. Any new
    fields that are added will be written to the JSON file in the root directory
    of the FileStore.

    Note that certain fields that come from file metadata on disk are protected and
    cannot be updated with this method. This prevents the contents of the FileStore
    from becoming out of sync with the files on which it is based. The protected fields
    are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent',
    'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are
    retained to make each document in the JSON file identifiable by manual inspection.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
    """
    if self.read_only:
        raise StoreError(
            "This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
        )

    # apply the update in memory first
    super().update(docs, key)

    # sanitize every record and keep only those carrying user metadata
    records_to_write = []
    for record in self.query():
        sanitized = self._filter_data(record)
        # skip records that contain nothing beyond the identifying fields
        extra_fields = set(sanitized.keys()) - {"path_relative", self.key}
        if extra_fields:
            records_to_write.append(sanitized)

    # persist the sanitized metadata to the JSON file
    self.metadata_store.update(records_to_write, self.key)

Module containing various definitions of Stores. Stores are a default access pattern to data and provide various utilities.

GridFSStore

Bases: Store

A Store for GridFS backend. Provides a common access method consistent with other stores.

Source code in src/maggma/stores/gridfs.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
class GridFSStore(Store):
    """
    A Store for GridFS backend. Provides a common access method consistent with other stores.
    """

    def __init__(
        self,
        database: str,
        collection_name: str,
        host: str = "localhost",
        port: int = 27017,
        username: str = "",
        password: str = "",
        compression: bool = False,
        ensure_metadata: bool = False,
        searchable_fields: Optional[List[str]] = None,
        auth_source: Optional[str] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        ssh_tunnel: Optional[SSHTunnel] = None,
        **kwargs,
    ):
        """
        Initializes a GridFS Store for binary data
        Args:
            database: database name
            collection_name: The name of the collection.
                This is the string portion before the GridFS extensions
            host: hostname for the database
            port: port to connect to
            username: username to connect as
            password: password to authenticate as
            compression: compress the data as it goes into GridFS
            ensure_metadata: ensure returned documents have the metadata fields
            searchable_fields: fields to keep in the index store
            auth_source: The database to authenticate on. Defaults to the database name.
            mongoclient_kwargs: extra keyword arguments passed through to MongoClient()
            ssh_tunnel: An SSHTunnel object to use.
        """

        self.database = database
        self.collection_name = collection_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # lazily initialized by connect()
        self._coll: Any = None
        self.compression = compression
        self.ensure_metadata = ensure_metadata
        self.searchable_fields = [] if searchable_fields is None else searchable_fields
        self.kwargs = kwargs
        self.ssh_tunnel = ssh_tunnel

        if auth_source is None:
            auth_source = self.database
        self.auth_source = auth_source
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        # default the Store key to GridFS's native '_id' field
        if "key" not in kwargs:
            kwargs["key"] = "_id"
        super().__init__(**kwargs)

    @classmethod
    def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
        """
        Convenience method to construct a GridFSStore from a launchpad file.

        Note: A launchpad file is a special formatted yaml file used in fireworks

        Returns:
            a GridFSStore built from the credentials in the launchpad file
        """
        with open(lp_file) as f:
            lp_creds = yaml.safe_load(f.read())

        # keep only the connection-related keys from the launchpad credentials
        db_creds = lp_creds.copy()
        db_creds["database"] = db_creds["name"]
        for key in list(db_creds.keys()):
            if key not in ["database", "host", "port", "username", "password"]:
                db_creds.pop(key)
        db_creds["collection_name"] = collection_name

        return cls(**db_creds, **kwargs)

    @property
    def name(self) -> str:
        """
        Return a string representing this data source.
        """
        return f"gridfs://{self.host}/{self.database}/{self.collection_name}"

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        if not self._coll or force_reset:
            # route through the SSH tunnel's local address when one is provided
            if self.ssh_tunnel is None:
                host = self.host
                port = self.port
            else:
                self.ssh_tunnel.start()
                host, port = self.ssh_tunnel.local_address

            conn: MongoClient = (
                MongoClient(
                    host=host,
                    port=port,
                    username=self.username,
                    password=self.password,
                    authSource=self.auth_source,
                    **self.mongoclient_kwargs,
                )
                if self.username != ""
                else MongoClient(host, port, **self.mongoclient_kwargs)
            )
            db = conn[self.database]
            self._coll = gridfs.GridFS(db, self.collection_name)
            # wrap the GridFS '<name>.files' collection in a MongoStore so
            # metadata queries can be delegated to it
            self._files_collection = db[f"{self.collection_name}.files"]
            self._files_store = MongoStore.from_collection(self._files_collection)
            self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
            self._files_store.key = self.key
            self._chunks_collection = db[f"{self.collection_name}.chunks"]

    @property
    def _collection(self):
        """Property referring to underlying pymongo collection."""
        if self._coll is None:
            raise StoreError("Must connect Mongo-like store before attempting to use it")
        return self._coll

    @property
    def last_updated(self) -> datetime:
        """
        Provides the most recent last_updated date time stamp from
        the documents in this Store.
        """
        return self._files_store.last_updated

    @classmethod
    def transform_criteria(cls, criteria: Dict) -> Dict:
        """
        Allow client to not need to prepend 'metadata.' to query fields.

        Args:
            criteria: Query criteria
        """
        # fields not native to the files collection live under 'metadata.'
        new_criteria = dict()
        for field in criteria:
            if field not in files_collection_fields and not field.startswith("metadata."):
                new_criteria["metadata." + field] = copy.copy(criteria[field])
            else:
                new_criteria[field] = copy.copy(criteria[field])

        return new_criteria

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria.

        Args:
            criteria: PyMongo filter for documents to count in
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)

        return self._files_store.count(criteria)

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the GridFS Store for a set of documents.
        Will check to see if data can be returned from
        files store first.
        If the data from the gridfs is not a json serialized string
        a dict will be returned with the data in the "data" key
        plus the self.key and self.last_updated_field.

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)
        elif criteria is not None:
            raise ValueError("Criteria must be a dictionary or None")

        prop_keys = set()
        if isinstance(properties, dict):
            prop_keys = set(properties.keys())
        elif isinstance(properties, list):
            prop_keys = set(properties)

        for doc in self._files_store.query(criteria=criteria, sort=sort, limit=limit, skip=skip):
            # if all requested properties are present in the files-collection
            # document, answer from it without touching the GridFS blob
            if properties is not None and prop_keys.issubset(set(doc.keys())):
                yield {p: doc[p] for p in properties if p in doc}
            else:
                metadata = doc.get("metadata", {})

                # fetch and read the raw GridFS blob for this document
                data = self._collection.find_one(
                    filter={"_id": doc["_id"]},
                    skip=skip,
                    limit=limit,
                    sort=sort,
                ).read()

                if metadata.get("compression", "") == "zlib":
                    data = zlib.decompress(data).decode("UTF-8")

                # fall back to wrapping non-JSON payloads in a dict keyed by "data"
                try:
                    data = json.loads(data)
                except Exception:
                    if not isinstance(data, dict):
                        data = {
                            "data": data,
                            self.key: doc.get(self.key),
                            self.last_updated_field: doc.get(self.last_updated_field),
                        }

                if self.ensure_metadata and isinstance(data, dict):
                    data.update(metadata)

                yield data

    def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
        """
        Get all distinct values for a field. This function only operates
        on the metadata in the files collection.

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
        """
        criteria = self.transform_criteria(criteria) if isinstance(criteria, dict) else criteria

        field = (
            f"metadata.{field}" if field not in files_collection_fields and not field.startswith("metadata.") else field
        )

        return self._files_store.distinct(field=field, criteria=criteria)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys. Will only work if the keys are included in the files
        collection for GridFS.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """

        criteria = self.transform_criteria(criteria) if isinstance(criteria, dict) else criteria
        keys = [keys] if not isinstance(keys, list) else keys
        # prefix non-native fields with 'metadata.' like transform_criteria does
        keys = [
            f"metadata.{k}" if k not in files_collection_fields and not k.startswith("metadata.") else k for k in keys
        ]
        for group, ids in self._files_store.groupby(keys, criteria=criteria, properties=[f"metadata.{self.key}"]):
            ids = [get(doc, f"metadata.{self.key}") for doc in ids if has(doc, f"metadata.{self.key}")]

            # strip the 'metadata.' prefix before handing the group back to callers
            group = {k.replace("metadata.", ""): get(group, k) for k in keys if has(group, k)}

            yield group, list(self.query(criteria={self.key: {"$in": ids}}))

    def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
        """
        Tries to create an index and return true if it succeeded
        Currently operates on the GridFS files collection
        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys.

        Returns:
            bool indicating if the index exists/was created
        """
        # Transform key for gridfs first
        if key not in files_collection_fields:
            files_col_key = f"metadata.{key}"
            return self._files_store.ensure_index(files_col_key, unique=unique)
        return self._files_store.ensure_index(key, unique=unique)

    def update(
        self,
        docs: Union[List[Dict], Dict],
        key: Union[List, str, None] = None,
        additional_metadata: Union[str, List[str], None] = None,
    ):
        """
        Update documents into the Store.

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                 document, can be a list of multiple fields,
                 a single field, or None if the Store's key
                 field is to be used
            additional_metadata: field(s) to include in the gridfs metadata
        """

        if not isinstance(docs, list):
            docs = [docs]

        if isinstance(key, str):
            key = [key]
        elif not key:
            key = [self.key]

        # fields native to the files collection cannot be used as search keys
        key = list(set(key) - set(files_collection_fields))

        if additional_metadata is None:
            additional_metadata = []
        elif isinstance(additional_metadata, str):
            additional_metadata = [additional_metadata]
        else:
            additional_metadata = list(additional_metadata)

        for d in docs:
            search_doc = {k: d[k] for k in key}

            # collect the fields to store alongside the blob as GridFS metadata
            metadata = {
                k: get(d, k)
                for k in [self.last_updated_field, *additional_metadata, *self.searchable_fields]
                if has(d, k)
            }
            metadata.update(search_doc)
            data = json.dumps(jsanitize(d)).encode("UTF-8")
            if self.compression:
                data = zlib.compress(data)
                metadata["compression"] = "zlib"

            self._collection.put(data, metadata=metadata)
            search_doc = self.transform_criteria(search_doc)

            # Cleans up old gridfs entries
            # (keep only the most recent upload for this search key)
            for fdoc in self._files_collection.find(search_doc, ["_id"]).sort("uploadDate", -1).skip(1):
                self._collection.delete(fdoc["_id"])

    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary.

        Args:
            criteria: query dictionary to match
        """
        if isinstance(criteria, dict):
            criteria = self.transform_criteria(criteria)
        # NOTE(review): relies on the GridOut's non-public _id attribute
        ids = [cursor._id for cursor in self._collection.find(criteria)]

        for _id in ids:
            self._collection.delete(_id)

    def close(self):
        """Close the files store connection and stop the SSH tunnel, if any."""
        self._files_store.close()
        self._coll = None
        if self.ssh_tunnel is not None:
            self.ssh_tunnel.stop()

    def __eq__(self, other: object) -> bool:
        """
        Check equality for GridFSStore
        other: other GridFSStore to compare with.
        """
        if not isinstance(other, GridFSStore):
            return False

        fields = ["database", "collection_name", "host", "port"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

last_updated: datetime property

Provides the most recent last_updated date time stamp from the documents in this Store.

name: str property

Return a string representing this data source.

__eq__(other)

Check equality for GridFSStore. `other`: the other GridFSStore to compare with.

Source code in src/maggma/stores/gridfs.py
423
424
425
426
427
428
429
430
431
432
def __eq__(self, other: object) -> bool:
    """
    Check equality for GridFSStore
    other: other GridFSStore to compare with.
    """
    if not isinstance(other, GridFSStore):
        return False

    # Equality means the two stores target the same server-side collection.
    comparable = ("database", "collection_name", "host", "port")
    return all(getattr(self, name) == getattr(other, name) for name in comparable)

__init__(database, collection_name, host='localhost', port=27017, username='', password='', compression=False, ensure_metadata=False, searchable_fields=None, auth_source=None, mongoclient_kwargs=None, ssh_tunnel=None, **kwargs)

Initializes a GridFS Store for binary data Args: database: database name collection_name: The name of the collection. This is the string portion before the GridFS extensions host: hostname for the database port: port to connect to username: username to connect as password: password to authenticate as compression: compress the data as it goes into GridFS ensure_metadata: ensure returned documents have the metadata fields searchable_fields: fields to keep in the index store auth_source: The database to authenticate on. Defaults to the database name. ssh_tunnel: An SSHTunnel object to use.

Source code in src/maggma/stores/gridfs.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
    self,
    database: str,
    collection_name: str,
    host: str = "localhost",
    port: int = 27017,
    username: str = "",
    password: str = "",
    compression: bool = False,
    ensure_metadata: bool = False,
    searchable_fields: Optional[List[str]] = None,
    auth_source: Optional[str] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    ssh_tunnel: Optional[SSHTunnel] = None,
    **kwargs,
):
    """
    Initializes a GridFS Store for binary data
    Args:
        database: database name
        collection_name: The name of the collection.
            This is the string portion before the GridFS extensions
        host: hostname for the database
        port: port to connect to
        username: username to connect as
        password: password to authenticate as
        compression: compress the data as it goes into GridFS
        ensure_metadata: ensure returned documents have the metadata fields
        searchable_fields: fields to keep in the index store
        auth_source: The database to authenticate on. Defaults to the database name.
        mongoclient_kwargs: extra keyword arguments forwarded to MongoClient.
        ssh_tunnel: An SSHTunnel object to use.
    """

    self.database = database
    self.collection_name = collection_name
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    # Populated lazily by connect(); None means "not connected yet".
    self._coll: Any = None
    self.compression = compression
    self.ensure_metadata = ensure_metadata
    self.searchable_fields = [] if searchable_fields is None else searchable_fields
    # NOTE: self.kwargs aliases the same dict that is mutated below, so the
    # default "key" entry added there is visible through self.kwargs as well.
    self.kwargs = kwargs
    self.ssh_tunnel = ssh_tunnel

    if auth_source is None:
        auth_source = self.database
    self.auth_source = auth_source
    self.mongoclient_kwargs = mongoclient_kwargs or {}

    # GridFS documents are keyed by "_id" unless the caller overrides it.
    if "key" not in kwargs:
        kwargs["key"] = "_id"
    super().__init__(**kwargs)

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/gridfs.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    if not self._coll or force_reset:
        if self.ssh_tunnel is None:
            host = self.host
            port = self.port
        else:
            # Route the connection through the tunnel's local endpoint.
            self.ssh_tunnel.start()
            host, port = self.ssh_tunnel.local_address

        # Only authenticate when a username was supplied; an empty username
        # means an unauthenticated connection.
        conn: MongoClient = (
            MongoClient(
                host=host,
                port=port,
                username=self.username,
                password=self.password,
                authSource=self.auth_source,
                **self.mongoclient_kwargs,
            )
            if self.username != ""
            else MongoClient(host, port, **self.mongoclient_kwargs)
        )
        db = conn[self.database]
        self._coll = gridfs.GridFS(db, self.collection_name)
        # Wrap the GridFS ".files" collection in a MongoStore so metadata
        # queries can reuse the regular Store machinery.
        self._files_collection = db[f"{self.collection_name}.files"]
        self._files_store = MongoStore.from_collection(self._files_collection)
        self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
        self._files_store.key = self.key
        self._chunks_collection = db[f"{self.collection_name}.chunks"]

count(criteria=None)

Counts the number of documents matching the query criteria.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in

None
Source code in src/maggma/stores/gridfs.py
196
197
198
199
200
201
202
203
204
205
206
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Count the documents in the files collection matching the query criteria.

    Args:
        criteria: PyMongo filter for documents to count in
    """
    # Rewrite bare field names to "metadata.<field>" before delegating.
    query = self.transform_criteria(criteria) if isinstance(criteria, dict) else criteria
    return self._files_store.count(query)

distinct(field, criteria=None, all_exist=False)

Get all distinct values for a field. This function only operates on the metadata in the files collection.

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
Source code in src/maggma/stores/gridfs.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
    """
    Get all distinct values for a field. This function only operates
    on the metadata in the files collection.

    Args:
        field: the field(s) to get distinct values for
        criteria: PyMongo filter for documents to search in
    """
    if isinstance(criteria, dict):
        criteria = self.transform_criteria(criteria)

    # Reserved files-collection fields are queried as-is; everything else
    # lives under the "metadata." prefix.
    if field in files_collection_fields or field.startswith("metadata."):
        target = field
    else:
        target = f"metadata.{field}"

    return self._files_store.distinct(field=target, criteria=criteria)

ensure_index(key, unique=False)

Tries to create an index and return true if it succeeded. Currently operates on the GridFS files collection. Args: key: single key to index unique: Whether or not this index contains only unique keys.

Returns:

Type Description
bool

bool indicating if the index exists/was created

Source code in src/maggma/stores/gridfs.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
    """
    Tries to create an index and return true if it succeeded.
    Currently operates on the GridFS files collection.

    Args:
        key: single key to index
        unique: Whether or not this index contains only unique keys.

    Returns:
        bool indicating if the index exists/was created
    """
    # Non-reserved fields are stored under the "metadata." prefix in GridFS.
    files_col_key = key if key in files_collection_fields else f"metadata.{key}"
    return self._files_store.ensure_index(files_col_key, unique=unique)

from_launchpad_file(lp_file, collection_name, **kwargs) classmethod

Convenience method to construct a GridFSStore from a launchpad file.

Note: A launchpad file is a special formatted yaml file used in fireworks

Returns:

Source code in src/maggma/stores/gridfs.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
    """
    Convenience method to construct a GridFSStore from a launchpad file.

    Note: A launchpad file is a special formatted yaml file used in fireworks

    Returns:
    """
    with open(lp_file) as f:
        lp_creds = yaml.safe_load(f.read())

    # Keep only the connection settings the store constructor accepts;
    # the launchpad "name" entry is the database name.
    creds = {k: v for k, v in lp_creds.items() if k in ("host", "port", "username", "password")}
    creds["database"] = lp_creds["name"]
    creds["collection_name"] = collection_name

    return cls(**creds, **kwargs)

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys. Will only work if the keys are included in the files collection for GridFS.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents

required
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in src/maggma/stores/gridfs.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Simple grouping function that will group documents
    by keys. Will only work if the keys are included in the files
    collection for GridFS.

    Args:
        keys: fields to group documents
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
    """

    # Map bare field names onto the "metadata." namespace used by GridFS.
    criteria = self.transform_criteria(criteria) if isinstance(criteria, dict) else criteria
    keys = [keys] if not isinstance(keys, list) else keys
    keys = [
        f"metadata.{k}" if k not in files_collection_fields and not k.startswith("metadata.") else k for k in keys
    ]
    # Group on the files collection, then re-query this store for the full
    # documents in each group (decompression/deserialization happens there).
    # NOTE(review): sort/skip/limit/properties arguments are not forwarded to
    # the underlying groupby — confirm whether that is intentional.
    for group, ids in self._files_store.groupby(keys, criteria=criteria, properties=[f"metadata.{self.key}"]):
        ids = [get(doc, f"metadata.{self.key}") for doc in ids if has(doc, f"metadata.{self.key}")]

        # Strip the "metadata." prefix so callers see the field names they asked for.
        group = {k.replace("metadata.", ""): get(group, k) for k in keys if has(group, k)}

        yield group, list(self.query(criteria={self.key: {"$in": ids}}))

query(criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the GridFS Store for a set of documents. Will check to see if data can be returned from files store first. If the data from the gridfs is not a json serialized string a dict will be returned with the data in the "data" key plus the self.key and self.last_updated_field.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in

None
properties Union[Dict, List, None]

properties to return in grouped documents

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number documents to skip

0
limit int

limit on total number of documents returned

0
Source code in src/maggma/stores/gridfs.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the GridFS Store for a set of documents.
    Will check to see if data can be returned from
    files store first.
    If the data from the gridfs is not a json serialized string
    a dict will be returned with the data in the "data" key
    plus the self.key and self.last_updated_field.

    Args:
        criteria: PyMongo filter for documents to search in
        properties: properties to return in grouped documents
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        skip: number documents to skip
        limit: limit on total number of documents returned
    """
    if isinstance(criteria, dict):
        criteria = self.transform_criteria(criteria)
    elif criteria is not None:
        raise ValueError("Criteria must be a dictionary or None")

    prop_keys = set()
    if isinstance(properties, dict):
        prop_keys = set(properties.keys())
    elif isinstance(properties, list):
        prop_keys = set(properties)

    for doc in self._files_store.query(criteria=criteria, sort=sort, limit=limit, skip=skip):
        # Fast path: all requested properties are present in the files
        # collection, so the GridFS payload never needs to be fetched.
        if properties is not None and prop_keys.issubset(set(doc.keys())):
            yield {p: doc[p] for p in properties if p in doc}
        else:
            metadata = doc.get("metadata", {})

            # NOTE(review): skip/limit/sort here look redundant — paging was
            # already applied by the files-store query above; confirm.
            data = self._collection.find_one(
                filter={"_id": doc["_id"]},
                skip=skip,
                limit=limit,
                sort=sort,
            ).read()

            if metadata.get("compression", "") == "zlib":
                data = zlib.decompress(data).decode("UTF-8")

            try:
                data = json.loads(data)
            except Exception:
                # Payload is not JSON: wrap the raw bytes/str together with
                # the key and last-updated fields from the files doc.
                if not isinstance(data, dict):
                    data = {
                        "data": data,
                        self.key: doc.get(self.key),
                        self.last_updated_field: doc.get(self.last_updated_field),
                    }

            if self.ensure_metadata and isinstance(data, dict):
                data.update(metadata)

            yield data

remove_docs(criteria)

Remove docs matching the query dictionary.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match

required
Source code in src/maggma/stores/gridfs.py
403
404
405
406
407
408
409
410
411
412
413
414
415
def remove_docs(self, criteria: Dict):
    """
    Remove all documents matching the query dictionary.

    Args:
        criteria: query dictionary to match
    """
    if isinstance(criteria, dict):
        criteria = self.transform_criteria(criteria)

    # Materialize the ids before deleting so the cursor is not mutated
    # while being iterated.
    matched_ids = [entry._id for entry in self._collection.find(criteria)]
    for matched_id in matched_ids:
        self._collection.delete(matched_id)

transform_criteria(criteria) classmethod

Allow client to not need to prepend 'metadata.' to query fields.

Parameters:

Name Type Description Default
criteria Dict

Query criteria

required
Source code in src/maggma/stores/gridfs.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@classmethod
def transform_criteria(cls, criteria: Dict) -> Dict:
    """
    Allow client to not need to prepend 'metadata.' to query fields.

    Args:
        criteria: Query criteria
    """
    transformed = {}
    for field, value in criteria.items():
        # Reserved files-collection fields and already-prefixed fields pass
        # through unchanged; everything else is namespaced under "metadata.".
        if field in files_collection_fields or field.startswith("metadata."):
            transformed[field] = copy.copy(value)
        else:
            transformed["metadata." + field] = copy.copy(value)

    return transformed

update(docs, key=None, additional_metadata=None)

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update

required
key Union[List, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used

None
additional_metadata Union[str, List[str], None]

field(s) to include in the gridfs metadata

None
Source code in src/maggma/stores/gridfs.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def update(
    self,
    docs: Union[List[Dict], Dict],
    key: Union[List, str, None] = None,
    additional_metadata: Union[str, List[str], None] = None,
):
    """
    Update documents into the Store.

    Args:
        docs: the document or list of documents to update
        key: field name(s) to determine uniqueness for a
             document, can be a list of multiple fields,
             a single field, or None if the Store's key
             field is to be used
        additional_metadata: field(s) to include in the gridfs metadata
    """

    if not isinstance(docs, list):
        docs = [docs]

    if isinstance(key, str):
        key = [key]
    elif not key:
        key = [self.key]

    # Reserved GridFS files-collection fields cannot be used as dedup keys.
    key = list(set(key) - set(files_collection_fields))

    if additional_metadata is None:
        additional_metadata = []
    elif isinstance(additional_metadata, str):
        additional_metadata = [additional_metadata]
    else:
        additional_metadata = list(additional_metadata)

    for d in docs:
        # search_doc identifies this document for later deduplication.
        search_doc = {k: d[k] for k in key}

        # Metadata stored alongside the blob: last-updated stamp plus any
        # requested extra/searchable fields that exist on the document.
        metadata = {
            k: get(d, k)
            for k in [self.last_updated_field, *additional_metadata, *self.searchable_fields]
            if has(d, k)
        }
        metadata.update(search_doc)
        data = json.dumps(jsanitize(d)).encode("UTF-8")
        if self.compression:
            data = zlib.compress(data)
            metadata["compression"] = "zlib"

        # GridFS has no in-place update: put the new version first, then
        # delete every older entry with the same key fields.
        self._collection.put(data, metadata=metadata)
        search_doc = self.transform_criteria(search_doc)

        # Cleans up old gridfs entries, keeping only the newest upload.
        for fdoc in self._files_collection.find(search_doc, ["_id"]).sort("uploadDate", -1).skip(1):
            self._collection.delete(fdoc["_id"])

GridFSURIStore

Bases: GridFSStore

A Store for GridFS backend, with connection via a mongo URI string.

This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records

Source code in src/maggma/stores/gridfs.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
class GridFSURIStore(GridFSStore):
    """
    A Store for GridFS backend, with connection via a mongo URI string.

    This is expected to be a special mongodb+srv:// URI that includes client
    parameters via TXT records.
    """

    def __init__(
        self,
        uri: str,
        collection_name: str,
        database: Optional[str] = None,
        compression: bool = False,
        ensure_metadata: bool = False,
        searchable_fields: Optional[List[str]] = None,
        mongoclient_kwargs: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Initializes a GridFS Store for binary data.

        Args:
            uri: MongoDB+SRV URI
            database: database to connect to
            collection_name: The collection name
            compression: compress the data as it goes into GridFS
            ensure_metadata: ensure returned documents have the metadata fields
            searchable_fields: fields to keep in the index store.
            mongoclient_kwargs: extra keyword arguments forwarded to MongoClient.
        """

        self.uri = uri

        # parse the dbname from the uri
        if database is None:
            d_uri = uri_parser.parse_uri(uri)
            if d_uri["database"] is None:
                raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
            self.database = d_uri["database"]
        else:
            self.database = database

        self.collection_name = collection_name
        # Populated lazily by connect(); None means "not connected yet".
        self._coll: Any = None
        self.compression = compression
        self.ensure_metadata = ensure_metadata
        self.searchable_fields = [] if searchable_fields is None else searchable_fields
        self.kwargs = kwargs
        self.mongoclient_kwargs = mongoclient_kwargs or {}

        if "key" not in kwargs:
            kwargs["key"] = "_id"
        # Skip GridFSStore.__init__ (it requires host/port credentials) and
        # initialize the Store base class directly.
        super(GridFSStore, self).__init__(**kwargs)  # lgtm

    def connect(self, force_reset: bool = False):
        """
        Connect to the source data.

        Args:
            force_reset: whether to reset the connection or not when the Store is
                already connected.
        """
        if not self._coll or force_reset:  # pragma: no cover
            conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
            db = conn[self.database]
            self._coll = gridfs.GridFS(db, self.collection_name)
            # Wrap the ".files" collection in a MongoStore so metadata queries
            # reuse the standard Store machinery.
            self._files_collection = db[f"{self.collection_name}.files"]
            self._files_store = MongoStore.from_collection(self._files_collection)
            self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
            self._files_store.key = self.key
            self._chunks_collection = db[f"{self.collection_name}.chunks"]

__init__(uri, collection_name, database=None, compression=False, ensure_metadata=False, searchable_fields=None, mongoclient_kwargs=None, **kwargs)

Initializes a GridFS Store for binary data.

Parameters:

Name Type Description Default
uri str

MongoDB+SRV URI

required
database Optional[str]

database to connect to

None
collection_name str

The collection name

required
compression bool

compress the data as it goes into GridFS

False
ensure_metadata bool

ensure returned documents have the metadata fields

False
searchable_fields Optional[List[str]]

fields to keep in the index store.

None
Source code in src/maggma/stores/gridfs.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __init__(
    self,
    uri: str,
    collection_name: str,
    database: Optional[str] = None,
    compression: bool = False,
    ensure_metadata: bool = False,
    searchable_fields: Optional[List[str]] = None,
    mongoclient_kwargs: Optional[Dict] = None,
    **kwargs,
):
    """
    Initializes a GridFS Store for binary data.

    Args:
        uri: MongoDB+SRV URI
        database: database to connect to
        collection_name: The collection name
        compression: compress the data as it goes into GridFS
        ensure_metadata: ensure returned documents have the metadata fields
        searchable_fields: fields to keep in the index store.
        mongoclient_kwargs: extra keyword arguments forwarded to MongoClient.
    """

    self.uri = uri

    # parse the dbname from the uri
    if database is None:
        d_uri = uri_parser.parse_uri(uri)
        if d_uri["database"] is None:
            raise ConfigurationError("If database name is not supplied, a database must be set in the uri")
        self.database = d_uri["database"]
    else:
        self.database = database

    self.collection_name = collection_name
    # Populated lazily by connect(); None means "not connected yet".
    self._coll: Any = None
    self.compression = compression
    self.ensure_metadata = ensure_metadata
    self.searchable_fields = [] if searchable_fields is None else searchable_fields
    self.kwargs = kwargs
    self.mongoclient_kwargs = mongoclient_kwargs or {}

    if "key" not in kwargs:
        kwargs["key"] = "_id"
    # Skip GridFSStore.__init__ (it requires host/port credentials) and
    # initialize the Store base class directly.
    super(GridFSStore, self).__init__(**kwargs)  # lgtm

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to reset the connection or not when the Store is already connected.

False
Source code in src/maggma/stores/gridfs.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def connect(self, force_reset: bool = False):
    """
    Connect to the source data.

    Args:
        force_reset: whether to reset the connection or not when the Store is
            already connected.
    """
    if not self._coll or force_reset:  # pragma: no cover
        # The URI carries all credentials/options; no host/port plumbing needed.
        conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
        db = conn[self.database]
        self._coll = gridfs.GridFS(db, self.collection_name)
        # Wrap the ".files" collection in a MongoStore so metadata queries
        # reuse the standard Store machinery.
        self._files_collection = db[f"{self.collection_name}.files"]
        self._files_store = MongoStore.from_collection(self._files_collection)
        self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
        self._files_store.key = self.key
        self._chunks_collection = db[f"{self.collection_name}.chunks"]

Stores for connecting to AWS data.

S3Store

Bases: Store

GridFS like storage using Amazon S3 and a regular store for indexing.

Assumes Amazon AWS key and secret key are set in environment or default config file.

Source code in src/maggma/stores/aws.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
class S3Store(Store):
    """
    GridFS like storage using Amazon S3 and a regular store for indexing.

    Assumes Amazon AWS key and secret key are set in environment or default config file.
    """

    def __init__(
        self,
        index: Store,
        bucket: str,
        s3_profile: Optional[Union[str, dict]] = None,
        compress: bool = False,
        endpoint_url: Optional[str] = None,
        sub_dir: Optional[str] = None,
        s3_workers: int = 1,
        s3_resource_kwargs: Optional[dict] = None,
        ssh_tunnel: Optional[SSHTunnel] = None,
        key: str = "fs_id",
        store_hash: bool = True,
        unpack_data: bool = True,
        searchable_fields: Optional[List[str]] = None,
        index_store_kwargs: Optional[dict] = None,
        **kwargs,
    ):
        """
        Initializes an S3 Store.

        Args:
            index: a store to use to index the S3 bucket.
            bucket: name of the bucket.
            s3_profile: name of AWS profile containing the credentials. Alternatively
                you can pass in a dictionary with the full credentials:
                    aws_access_key_id (string) -- AWS access key ID
                    aws_secret_access_key (string) -- AWS secret access key
                    aws_session_token (string) -- AWS temporary session token
                    region_name (string) -- Default region when creating new connections
            compress: compress files inserted into the store.
            endpoint_url: this allows the interface with minio service; ignored if
                `ssh_tunnel` is provided, in which case it is inferred.
            sub_dir: subdirectory of the S3 bucket to store the data.
            s3_workers: number of concurrent S3 puts to run.
            s3_resource_kwargs: additional kwargs to pass to the boto3 session resource.
            ssh_tunnel: optional SSH tunnel to use for the S3 connection.
            key: main key to index on.
            store_hash: store the SHA1 hash right before insertion to the database.
            unpack_data: whether to decompress and unpack byte data when querying from
                the bucket.
            searchable_fields: fields to keep in the index store.
            index_store_kwargs: kwargs to pass to the index store. Allows the user to
                use kwargs here to update the index store.

        Raises:
            RuntimeError: if boto3/botocore are not installed.
        """
        if boto3 is None:
            raise RuntimeError("boto3 and botocore are required for S3Store")
        self.index_store_kwargs = index_store_kwargs or {}
        if index_store_kwargs:
            # Rebuild the index store with the user-supplied overrides applied
            d_ = index.as_dict()
            d_.update(index_store_kwargs)
            self.index = index.__class__.from_dict(d_)
        else:
            self.index = index
        self.bucket = bucket
        self.s3_profile = s3_profile
        self.compress = compress
        self.endpoint_url = endpoint_url
        # Normalize sub_dir to either "" or "prefix/" so key paths concatenate cleanly
        self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
        self.s3: Any = None
        self.s3_bucket: Any = None
        self.s3_workers = s3_workers
        self.s3_resource_kwargs = s3_resource_kwargs if s3_resource_kwargs is not None else {}
        self.ssh_tunnel = ssh_tunnel
        self.unpack_data = unpack_data
        self.searchable_fields = searchable_fields if searchable_fields is not None else []
        self.store_hash = store_hash

        # Force the key to be the same as the index
        assert isinstance(index.key, str), "Since we are using the key as a file name in S3, the key must be a string"
        if key != index.key:
            warnings.warn(
                f'The desired S3Store key "{key}" does not match the index key "{index.key}"; '
                "the index key will be used",
                UserWarning,
            )
        kwargs["key"] = str(index.key)

        # Worker threads each get their own bucket handle via this thread-local
        self._thread_local = threading.local()
        super().__init__(**kwargs)

    @property
    def name(self) -> str:
        """String representing this data source."""
        return f"s3://{self.bucket}"

    def connect(self, force_reset: bool = False):  # lgtm[py/conflicting-attributes]
        """Connect to the source data.

        Args:
            force_reset: whether to force a reset of the connection
        """
        if self.s3 is None or force_reset:
            self.s3, self.s3_bucket = self._get_resource_and_bucket()
        self.index.connect(force_reset=force_reset)

    def close(self):
        """Closes any connections."""
        self.index.close()

        # Guard: close() may be called before connect(), in which case there is
        # no boto3 resource to close
        if self.s3 is not None:
            self.s3.meta.client.close()
        self.s3 = None
        self.s3_bucket = None

        if self.ssh_tunnel is not None:
            self.ssh_tunnel.stop()

    @property
    def _collection(self):
        """
        A handle to the pymongo collection object.

        Important:
            Not guaranteed to exist in the future.
        """
        # For now returns the index collection since that is what we would "search" on
        return self.index._collection

    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria.

        Args:
            criteria: PyMongo filter for documents to count in.
        """
        return self.index.count(criteria)

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents.

        If the requested properties are all present in the index document, the
        object is served from the index alone; otherwise the full object is
        fetched from S3 (and optionally decompressed/unpacked).

        Args:
            criteria: PyMongo filter for documents to search in.
            properties: properties to return in grouped documents.
            sort: Dictionary of sort order for fields. Keys are field names and values
                are 1 for ascending or -1 for descending.
            skip: number documents to skip.
            limit: limit on total number of documents returned.

        """
        prop_keys = set()
        if isinstance(properties, dict):
            prop_keys = set(properties.keys())
        elif isinstance(properties, list):
            prop_keys = set(properties)

        for doc in self.index.query(criteria=criteria, sort=sort, limit=limit, skip=skip):
            if properties is not None and prop_keys.issubset(set(doc.keys())):
                yield {p: doc[p] for p in properties if p in doc}
            else:
                try:
                    # TODO: This is ugly and unsafe, do some real checking before pulling data
                    data = self.s3_bucket.Object(self._get_full_key_path(doc[self.key])).get()["Body"].read()
                except botocore.exceptions.ClientError as e:
                    # If a client error is thrown, then check that it was a NoSuchKey or NoSuchBucket error.
                    # If it was a NoSuchKey error, then the object does not exist.
                    error_code = e.response["Error"]["Code"]
                    if error_code in ["NoSuchKey", "NoSuchBucket"]:
                        error_message = e.response["Error"]["Message"]
                        self.logger.error(
                            f"S3 returned '{error_message}' while querying '{self.bucket}' for '{doc[self.key]}'"
                        )
                        continue
                    else:
                        raise e

                if self.unpack_data:
                    data = self._read_data(data=data, compress_header=doc.get("compression", ""))

                    # Propagate the index's last-updated timestamp onto the
                    # unpacked object (it was stripped before writing to S3)
                    if self.last_updated_field in doc:
                        data[self.last_updated_field] = doc[self.last_updated_field]

                yield data

    def _read_data(self, data: bytes, compress_header: str) -> Dict:
        """Reads the data and transforms it into a dictionary.
        Allows for subclasses to apply custom schemes for transforming
        the data retrieved from S3.

        Args:
            data (bytes): The raw byte representation of the data.
            compress_header (str): String representing the type of compression used on the data.

        Returns:
            Dict: Dictionary representation of the data.
        """
        return self._unpack(data=data, compressed=compress_header == "zlib")

    @staticmethod
    def _unpack(data: bytes, compressed: bool):
        """Decompress (if needed) and msgpack-decode raw bytes from S3."""
        if compressed:
            data = zlib.decompress(data)
        # requires msgpack-python to be installed to fix string encoding problem
        # https://github.com/msgpack/msgpack/issues/121
        # During recursion
        # msgpack.unpackb goes as deep as possible during reconstruction
        # MontyDecoder().process_decode only goes until it finds a from_dict
        # as such, we cannot just use msgpack.unpackb(data, object_hook=monty_object_hook, raw=False)
        # Should just return the unpacked object then let the user run process_decoded
        return msgpack.unpackb(data, raw=False)

    def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
        """
        Get all distinct values for a field.

        Args:
            field: the field(s) to get distinct values for.
            criteria: PyMongo filter for documents to search in.
            all_exist: accepted for API compatibility with Store; not used here.
        """
        # Index is a store so it should have its own distinct function
        return self.index.distinct(field, criteria=criteria)

    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents by keys.

        Note: groups over the *index* documents, not the full S3 objects.

        Args:
            keys: fields to group documents.
            criteria: PyMongo filter for documents to search in.
            properties: properties to return in grouped documents.
            sort: Dictionary of sort order for fields. Keys are field names and values
            are 1 for ascending or -1 for descending.
            skip: number documents to skip.
            limit: limit on total number of documents returned.

        Returns:
            generator returning tuples of (dict, list of docs)
        """
        return self.index.groupby(
            keys=keys,
            criteria=criteria,
            properties=properties,
            sort=sort,
            skip=skip,
            limit=limit,
        )

    def ensure_index(self, key: str, unique: bool = False) -> bool:
        """
        Tries to create an index and return true if it succeeded.

        Args:
            key: single key to index.
            unique: whether this index contains only unique keys.

        Returns:
            bool indicating if the index exists/was created.
        """
        return self.index.ensure_index(key, unique=unique)

    def update(
        self,
        docs: Union[List[Dict], Dict],
        key: Union[List, str, None] = None,
        additional_metadata: Union[str, List[str], None] = None,
    ):
        """
        Update documents into the Store.

        Args:
            docs: the document or list of documents to update.
            key: field name(s) to determine uniqueness for a document, can be a list of
                multiple fields, a single field, or None if the Store's key field is to
                be used.
            additional_metadata: field(s) to include in the S3 store's metadata.
        """
        if not isinstance(docs, list):
            docs = [docs]

        if isinstance(key, str):
            key = [key]
        elif not key:
            key = [self.key]

        if additional_metadata is None:
            additional_metadata = []
        elif isinstance(additional_metadata, str):
            additional_metadata = [additional_metadata]
        else:
            additional_metadata = list(additional_metadata)

        self._write_to_s3_and_index(docs, key + additional_metadata + self.searchable_fields)

    def _write_to_s3_and_index(self, docs: List[Dict], search_keys: List[str]):
        """Implements updating of the provided documents in S3 and the index.
        Allows for subclasses to apply custom approaches to parallelizing the writing.

        Args:
            docs (List[Dict]): The documents to update
            search_keys (List[str]): The keys of the information to be updated in the index
        """
        with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
            fs = {
                pool.submit(
                    self.write_doc_to_s3,
                    doc=itr_doc,
                    search_keys=search_keys,
                )
                for itr_doc in docs
            }
            fs, _ = wait(fs)

            search_docs = [sdoc.result() for sdoc in fs]

        # Use store's update to remove key clashes
        self.index.update(search_docs, key=self.key)

    def _get_session(self):
        """Return a boto3 Session for this thread, or None if one was already built.

        NOTE(review): the hasattr check mirrors _get_bucket's thread-local caching —
        once a worker thread has cached its bucket, no new Session is needed.
        """
        if self.ssh_tunnel is not None:
            self.ssh_tunnel.start()

        if not hasattr(self._thread_local, "s3_bucket"):
            if isinstance(self.s3_profile, dict):
                return Session(**self.s3_profile)
            return Session(profile_name=self.s3_profile)

        return None

    def _get_endpoint_url(self):
        """Endpoint URL to use: the SSH tunnel's local address if tunneling, else the configured one."""
        if self.ssh_tunnel is None:
            return self.endpoint_url
        host, port = self.ssh_tunnel.local_address
        return f"http://{host}:{port}"

    def _get_bucket(self):
        """If on the main thread return the bucket created above, else create a new
        bucket on each thread."""
        if threading.current_thread().name == "MainThread":
            return self.s3_bucket

        if not hasattr(self._thread_local, "s3_bucket"):
            _, bucket = self._get_resource_and_bucket()
            self._thread_local.s3_bucket = bucket

        return self._thread_local.s3_bucket

    def _get_resource_and_bucket(self):
        """Helper function to create the resource and bucket objects.

        Raises:
            RuntimeError: if the bucket does not exist (head_bucket fails).
        """
        session = self._get_session()
        endpoint_url = self._get_endpoint_url()
        resource = session.resource("s3", endpoint_url=endpoint_url, **self.s3_resource_kwargs)
        try:
            resource.meta.client.head_bucket(Bucket=self.bucket)
        except ClientError:
            raise RuntimeError("Bucket not present on AWS")
        bucket = resource.Bucket(self.bucket)

        return resource, bucket

    def _get_full_key_path(self, id: str) -> str:
        """Produces the full key path for S3 items.

        Args:
            id (str): The value of the key identifier.

        Returns:
            str: The full key path
        """
        return self.sub_dir + str(id)

    def _get_compression_function(self) -> Callable:
        """Returns the function to use for compressing data."""
        return zlib.compress

    def _get_decompression_function(self) -> Callable:
        """Returns the function to use for decompressing data."""
        return zlib.decompress

    def write_doc_to_s3(self, doc: Dict, search_keys: List[str]) -> Dict:
        """
        Write the data to s3 and return the metadata to be inserted into the index db.

        Args:
            doc: the document.
            search_keys: list of keys to pull from the docs and be inserted into the
                index db.

        Returns:
            Dict: The metadata to be inserted into the index db
        """
        s3_bucket = self._get_bucket()

        search_doc = {k: doc[k] for k in search_keys}
        search_doc[self.key] = doc[self.key]  # Ensure key is in metadata
        if self.sub_dir != "":
            search_doc["sub_dir"] = self.sub_dir

        # Remove MongoDB _id from search
        if "_id" in search_doc:
            del search_doc["_id"]

        # to make hashing more meaningful, make sure last updated field is removed
        lu_info = doc.pop(self.last_updated_field, None)
        data = msgpack.packb(doc, default=monty_default)

        if self.compress:
            # Compress with zlib if chosen
            search_doc["compression"] = "zlib"
            data = self._get_compression_function()(data)

        # keep a record of original keys, in case these are important for the individual researcher
        # it is not expected that this information will be used except in disaster recovery
        s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc}
        s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys"  # inception
        # encode dictionary since values have to be strings
        search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
        s3_bucket.upload_fileobj(
            Fileobj=BytesIO(data),
            Key=self._get_full_key_path(str(doc[self.key])),
            ExtraArgs={"Metadata": {s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()}},
        )

        if lu_info is not None:
            search_doc[self.last_updated_field] = lu_info

        if self.store_hash:
            # Hash is computed over the (possibly compressed) packed bytes
            hasher = sha1()
            hasher.update(data)
            obj_hash = hasher.hexdigest()
            search_doc["obj_hash"] = obj_hash
        return search_doc

    @staticmethod
    def _sanitize_key(key):
        """Sanitize keys to store in S3/MinIO metadata."""
        # Underscores are encoded as dashes in metadata, since keys with
        # underscores may result in the corresponding HTTP header being stripped
        # by certain server configurations (e.g. default nginx), leading to:
        # `botocore.exceptions.ClientError: An error occurred (AccessDenied) when
        # calling the PutObject operation: There were headers present in the request
        # which were not signed`
        # Metadata stored in the MongoDB index (self.index) is stored unchanged.

        # Additionally, MinIO requires lowercase keys
        return str(key).replace("_", "-").lower()

    def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
        """
        Remove docs matching the query dictionary.

        Args:
            criteria: query dictionary to match.
            remove_s3_object: whether to remove the actual S3 object or not.
        """
        if not remove_s3_object:
            self.index.remove_docs(criteria=criteria)
        else:
            to_remove = self.index.distinct(self.key, criteria=criteria)
            self.index.remove_docs(criteria=criteria)

            # Can remove up to 1000 items at a time via boto
            to_remove_chunks = list(grouper(to_remove, n=1000))
            for chunk_to_remove in to_remove_chunks:
                objlist = [{"Key": self._get_full_key_path(obj)} for obj in chunk_to_remove]
                self.s3_bucket.delete_objects(Delete={"Objects": objlist})

    @property
    def last_updated(self):
        """Delegates to the index store's last_updated."""
        return self.index.last_updated

    def newer_in(self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False) -> List[str]:
        """
        Returns the keys of documents that are newer in the target Store than this Store.

        Args:
            target: target Store.
            criteria: PyMongo filter for documents to search in.
            exhaustive: triggers an item-by-item check vs. checking the last_updated of
                the target Store and using that to filter out new items in.
        """
        if hasattr(target, "index"):
            return self.index.newer_in(target=target.index, criteria=criteria, exhaustive=exhaustive)
        return self.index.newer_in(target=target, criteria=criteria, exhaustive=exhaustive)

    def __hash__(self):
        # Hash the index store itself (not its bound __hash__ method, which
        # would hash the method object rather than the store's hash value)
        return hash((self.index, self.bucket))

    def rebuild_index_from_s3_data(self, **kwargs):
        """
        Rebuilds the index Store from the data in S3.

        Relies on the index document being stores as the metadata for the file. This can
        help recover lost databases.
        """
        bucket = self.s3_bucket
        objects = bucket.objects.filter(Prefix=self.sub_dir)
        for obj in objects:
            # obj.key is already the full key (it includes sub_dir, since the
            # listing was filtered by that prefix) — do not re-prefix it with
            # _get_full_key_path, which would produce "sub_dir/sub_dir/<id>"
            data = self.s3_bucket.Object(obj.key).get()["Body"].read()

            # NOTE(review): decompression is decided by the store-level
            # `compress` flag, not the per-object "compression" metadata —
            # objects written with a differently-configured store may fail here
            if self.compress:
                data = self._get_decompression_function()(data)
            unpacked_data = msgpack.unpackb(data, raw=False)
            self.update(unpacked_data, **kwargs)

    def rebuild_metadata_from_index(self, index_query: Optional[dict] = None):
        """
        Read data from the index store and populate the metadata of the S3 bucket.
        Force all the keys to be lower case to be Minio compatible.

        Args:
            index_query: query on the index store.
        """
        qq = {} if index_query is None else index_query
        for index_doc in self.index.query(qq):
            key_ = self._get_full_key_path(index_doc[self.key])
            s3_object = self.s3_bucket.Object(key_)
            new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
            for k, v in index_doc.items():
                new_meta[str(k).lower()] = v
            # "_id" is only present for Mongo-backed indexes; default avoids KeyError
            new_meta.pop("_id", None)
            if self.last_updated_field in new_meta:
                new_meta[self.last_updated_field] = str(to_isoformat_ceil_ms(new_meta[self.last_updated_field]))
            # S3 metadata is immutable in place: copy the object onto itself
            # with MetadataDirective="REPLACE" to rewrite it
            s3_object.copy_from(
                CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
                Metadata=new_meta,
                MetadataDirective="REPLACE",
            )

    def __eq__(self, other: object) -> bool:
        """
        Check equality for S3Store.

        other: other S3Store to compare with.
        """
        if not isinstance(other, S3Store):
            return False

        fields = ["index", "bucket", "last_updated_field"]
        return all(getattr(self, f) == getattr(other, f) for f in fields)

name: str property

String representing this data source.

__eq__(other)

Check equality for S3Store.

other: other S3Store to compare with.

Source code in src/maggma/stores/aws.py
571
572
573
574
575
576
577
578
579
580
581
def __eq__(self, other: object) -> bool:
    """
    Check equality for S3Store.

    other: other S3Store to compare with.
    """
    if not isinstance(other, S3Store):
        return False

    fields = ["index", "bucket", "last_updated_field"]
    return all(getattr(self, f) == getattr(other, f) for f in fields)

__init__(index, bucket, s3_profile=None, compress=False, endpoint_url=None, sub_dir=None, s3_workers=1, s3_resource_kwargs=None, ssh_tunnel=None, key='fs_id', store_hash=True, unpack_data=True, searchable_fields=None, index_store_kwargs=None, **kwargs)

Initializes an S3 Store.

Parameters:

Name Type Description Default
index Store

a store to use to index the S3 bucket.

required
bucket str

name of the bucket.

required
s3_profile Optional[Union[str, dict]]

name of AWS profile containing the credentials. Alternatively you can pass in a dictionary with the full credentials: aws_access_key_id (string) -- AWS access key ID aws_secret_access_key (string) -- AWS secret access key aws_session_token (string) -- AWS temporary session token region_name (string) -- Default region when creating new connections

None
compress bool

compress files inserted into the store.

False
endpoint_url Optional[str]

this allows the interface with minio service; ignored if ssh_tunnel is provided, in which case it is inferred.

None
sub_dir Optional[str]

subdirectory of the S3 bucket to store the data.

None
s3_workers int

number of concurrent S3 puts to run.

1
s3_resource_kwargs Optional[dict]

additional kwargs to pass to the boto3 session resource.

None
ssh_tunnel Optional[SSHTunnel]

optional SSH tunnel to use for the S3 connection.

None
key str

main key to index on.

'fs_id'
store_hash bool

store the SHA1 hash right before insertion to the database.

True
unpack_data bool

whether to decompress and unpack byte data when querying from the bucket.

True
searchable_fields Optional[List[str]]

fields to keep in the index store.

None
index_store_kwargs Optional[dict]

kwargs to pass to the index store. Allows the user to use kwargs here to update the index store.

None
Source code in src/maggma/stores/aws.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def __init__(
    self,
    index: Store,
    bucket: str,
    s3_profile: Optional[Union[str, dict]] = None,
    compress: bool = False,
    endpoint_url: Optional[str] = None,
    sub_dir: Optional[str] = None,
    s3_workers: int = 1,
    s3_resource_kwargs: Optional[dict] = None,
    ssh_tunnel: Optional[SSHTunnel] = None,
    key: str = "fs_id",
    store_hash: bool = True,
    unpack_data: bool = True,
    searchable_fields: Optional[List[str]] = None,
    index_store_kwargs: Optional[dict] = None,
    **kwargs,
):
    """
    Initializes an S3 Store.

    Args:
        index: a store to use to index the S3 bucket.
        bucket: name of the bucket.
        s3_profile: name of AWS profile containing the credentials. Alternatively
            you can pass in a dictionary with the full credentials:
                aws_access_key_id (string) -- AWS access key ID
                aws_secret_access_key (string) -- AWS secret access key
                aws_session_token (string) -- AWS temporary session token
                region_name (string) -- Default region when creating new connections
        compress: compress files inserted into the store.
        endpoint_url: this allows the interface with minio service; ignored if
            `ssh_tunnel` is provided, in which case it is inferred.
        sub_dir: subdirectory of the S3 bucket to store the data.
        s3_workers: number of concurrent S3 puts to run.
        s3_resource_kwargs: additional kwargs to pass to the boto3 session resource.
        ssh_tunnel: optional SSH tunnel to use for the S3 connection.
        key: main key to index on.
        store_hash: store the SHA1 hash right before insertion to the database.
        unpack_data: whether to decompress and unpack byte data when querying from
            the bucket.
        searchable_fields: fields to keep in the index store.
        index_store_kwargs: kwargs to pass to the index store. Allows the user to
            use kwargs here to update the index store.
    """
    if boto3 is None:
        raise RuntimeError("boto3 and botocore are required for S3Store")
    self.index_store_kwargs = index_store_kwargs or {}
    if index_store_kwargs:
        d_ = index.as_dict()
        d_.update(index_store_kwargs)
        self.index = index.__class__.from_dict(d_)
    else:
        self.index = index
    self.bucket = bucket
    self.s3_profile = s3_profile
    self.compress = compress
    self.endpoint_url = endpoint_url
    self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
    self.s3: Any = None
    self.s3_bucket: Any = None
    self.s3_workers = s3_workers
    self.s3_resource_kwargs = s3_resource_kwargs if s3_resource_kwargs is not None else {}
    self.ssh_tunnel = ssh_tunnel
    self.unpack_data = unpack_data
    self.searchable_fields = searchable_fields if searchable_fields is not None else []
    self.store_hash = store_hash

    # Force the key to be the same as the index
    assert isinstance(index.key, str), "Since we are using the key as a file name in S3, the key must be a string"
    if key != index.key:
        warnings.warn(
            f'The desired S3Store key "{key}" does not match the index key "{index.key},"'
            "the index key will be used",
            UserWarning,
        )
    kwargs["key"] = str(index.key)

    self._thread_local = threading.local()
    super().__init__(**kwargs)

close()

Closes any connections.

Source code in src/maggma/stores/aws.py
132
133
134
135
136
137
138
139
140
141
def close(self):
    """Closes any connections."""
    self.index.close()

    self.s3.meta.client.close()
    self.s3 = None
    self.s3_bucket = None

    if self.ssh_tunnel is not None:
        self.ssh_tunnel.stop()

connect(force_reset=False)

Connect to the source data.

Parameters:

Name Type Description Default
force_reset bool

whether to force a reset of the connection

False
Source code in src/maggma/stores/aws.py
122
123
124
125
126
127
128
129
130
def connect(self, force_reset: bool = False):  # lgtm[py/conflicting-attributes]
    """Connect to the source data.

    Args:
        force_reset: whether to force a reset of the connection
    """
    if self.s3 is None or force_reset:
        self.s3, self.s3_bucket = self._get_resource_and_bucket()
    self.index.connect(force_reset=force_reset)

count(criteria=None)

Counts the number of documents matching the query criteria.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to count in.

None
Source code in src/maggma/stores/aws.py
154
155
156
157
158
159
160
161
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Counts the number of documents matching the query criteria.

    Args:
        criteria: PyMongo filter for documents to count in.
    """
    return self.index.count(criteria)

distinct(field, criteria=None, all_exist=False)

Get all distinct values for a field.

Parameters:

Name Type Description Default
field str

the field(s) to get distinct values for.

required
criteria Optional[Dict]

PyMongo filter for documents to search in.

None
Source code in src/maggma/stores/aws.py
244
245
246
247
248
249
250
251
252
253
def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
    """
    Get all distinct values for a field.

    Args:
        field: the field(s) to get distinct values for.
        criteria: PyMongo filter for documents to search in.
    """
    # Index is a store so it should have its own distinct function
    return self.index.distinct(field, criteria=criteria)

ensure_index(key, unique=False)

Tries to create an index and return true if it succeeded.

Parameters:

Name Type Description Default
key str

single key to index.

required
unique bool

whether this index contains only unique keys.

False

Returns:

Type Description
bool

bool indicating if the index exists/was created.

Source code in src/maggma/stores/aws.py
288
289
290
291
292
293
294
295
296
297
298
299
def ensure_index(self, key: str, unique: bool = False) -> bool:
    """
    Tries to create an index and return true if it succeeded.

    Args:
        key: single key to index.
        unique: whether this index contains only unique keys.

    Returns:
        bool indicating if the index exists/was created.
    """
    return self.index.ensure_index(key, unique=unique)

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0)

Simple grouping function that will group documents by keys.

Parameters:

Name Type Description Default
keys Union[List[str], str]

fields to group documents.

required
criteria Optional[Dict]

PyMongo filter for documents to search in.

None
properties Union[Dict, List, None]

properties to return in grouped documents.

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values

None
skip int

number documents to skip.

0
limit int

limit on total number of documents returned.

0

Returns:

Type Description
Iterator[Tuple[Dict, List[Dict]]]

generator returning tuples of (dict, list of docs)

Source code in src/maggma/stores/aws.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Group documents by one or more keys, delegating to the index store.

    Args:
        keys: field(s) to group documents by.
        criteria: PyMongo filter for documents to search in.
        sort: Dictionary of sort order for fields. Keys are field names and
            values are 1 for ascending or -1 for descending.
        properties: properties to return in grouped documents.
        skip: number of documents to skip.
        limit: limit on total number of documents returned.

    Returns:
        generator yielding tuples of (group-key dict, list of docs).
    """
    # Grouping happens entirely in the index Store; bundle the arguments
    # and forward them unchanged.
    group_args = dict(
        keys=keys,
        criteria=criteria,
        properties=properties,
        sort=sort,
        skip=skip,
        limit=limit,
    )
    return self.index.groupby(**group_args)

newer_in(target, criteria=None, exhaustive=False)

Returns the keys of documents that are newer in the target Store than this Store.

Parameters:

Name Type Description Default
target Store

target Store.

required
criteria Optional[Dict]

PyMongo filter for documents to search in.

None
exhaustive bool

triggers an item-by-item check vs. checking the last_updated of the target Store and using that to filter out new items in.

False
Source code in src/maggma/stores/aws.py
511
512
513
514
515
516
517
518
519
520
521
522
523
def newer_in(self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False) -> List[str]:
    """
    Find keys of documents that are newer in the target Store than in this Store.

    Args:
        target: Store to compare against.
        criteria: PyMongo filter restricting which documents are compared.
        exhaustive: if True, perform an item-by-item check instead of using the
            target Store's last_updated to filter out new items.
    """
    # When the target is also index-backed, compare index-to-index so both
    # sides use their lightweight metadata stores.
    comparison_target = target.index if hasattr(target, "index") else target
    return self.index.newer_in(target=comparison_target, criteria=criteria, exhaustive=exhaustive)

query(criteria=None, properties=None, sort=None, skip=0, limit=0)

Queries the Store for a set of documents.

Parameters:

Name Type Description Default
criteria Optional[Dict]

PyMongo filter for documents to search in.

None
properties Union[Dict, List, None]

properties to return in grouped documents.

None
sort Optional[Dict[str, Union[Sort, int]]]

Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

None
skip int

number of documents to skip.

0
limit int

limit on total number of documents returned.

0
Source code in src/maggma/stores/aws.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Queries the Store for a set of documents.

    Index documents that already contain every requested property are
    returned directly; otherwise the full object is fetched from S3 (and,
    when ``unpack_data`` is set, deserialized) before being yielded.

    Args:
        criteria: PyMongo filter for documents to search in.
        properties: properties to return in grouped documents.
        sort: Dictionary of sort order for fields. Keys are field names and values
            are 1 for ascending or -1 for descending.
        skip: number of documents to skip.
        limit: limit on total number of documents returned.

    Returns:
        generator of matching documents.
    """
    # Normalize the requested properties into a set for the subset check below.
    prop_keys = set()
    if isinstance(properties, dict):
        prop_keys = set(properties.keys())
    elif isinstance(properties, list):
        prop_keys = set(properties)

    for doc in self.index.query(criteria=criteria, sort=sort, limit=limit, skip=skip):
        if properties is not None and prop_keys.issubset(set(doc.keys())):
            # The index entry alone satisfies the projection; skip the S3 fetch.
            yield {p: doc[p] for p in properties if p in doc}
        else:
            try:
                # TODO: This is ugly and unsafe, do some real checking before pulling data
                data = self.s3_bucket.Object(self._get_full_key_path(doc[self.key])).get()["Body"].read()
            except botocore.exceptions.ClientError as e:
                # If a client error is thrown, then check that it was a NoSuchKey or NoSuchBucket error.
                # If it was a NoSuchKey error, then the object does not exist.
                error_code = e.response["Error"]["Code"]
                if error_code in ["NoSuchKey", "NoSuchBucket"]:
                    error_message = e.response["Error"]["Message"]
                    # Log and skip missing objects instead of aborting the whole query.
                    self.logger.error(
                        f"S3 returned '{error_message}' while querying '{self.bucket}' for '{doc[self.key]}'"
                    )
                    continue
                else:
                    raise e

            if self.unpack_data:
                # Decompress/deserialize using the compression recorded in the index doc.
                data = self._read_data(data=data, compress_header=doc.get("compression", ""))

                if self.last_updated_field in doc:
                    # Restore last_updated, which write_doc_to_s3 pops before serializing.
                    data[self.last_updated_field] = doc[self.last_updated_field]

            yield data

rebuild_index_from_s3_data(**kwargs)

Rebuilds the index Store from the data in S3.

Relies on the index document being stored as the metadata for the file. This can help recover lost databases.

Source code in src/maggma/stores/aws.py
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
def rebuild_index_from_s3_data(self, **kwargs):
    """
    Rebuilds the index Store from the data in S3.

    Relies on the index document being stored as the metadata for the file. This can
    help recover lost databases.

    Args:
        **kwargs: forwarded to ``self.update`` for each recovered document.
    """
    bucket = self.s3_bucket
    # Only consider objects under this store's sub-directory prefix.
    objects = bucket.objects.filter(Prefix=self.sub_dir)
    for obj in objects:
        # NOTE(review): obj.key is already the full S3 key; wrapping it in
        # _get_full_key_path looks like it could re-apply the sub_dir prefix —
        # confirm against _get_full_key_path's implementation.
        key_ = self._get_full_key_path(obj.key)
        data = self.s3_bucket.Object(key_).get()["Body"].read()

        if self.compress:
            data = self._get_decompression_function()(data)
        # Mirror of the msgpack serialization performed in write_doc_to_s3.
        unpacked_data = msgpack.unpackb(data, raw=False)
        self.update(unpacked_data, **kwargs)

rebuild_metadata_from_index(index_query=None)

Read data from the index store and populate the metadata of the S3 bucket. Force all the keys to be lower case to be Minio compatible.

Parameters:

Name Type Description Default
index_query Optional[dict]

query on the index store.

None
Source code in src/maggma/stores/aws.py
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
def rebuild_metadata_from_index(self, index_query: Optional[dict] = None):
    """
    Read data from the index store and populate the metadata of the S3 bucket.
    Force all the keys to be lower case to be Minio compatible.

    Args:
        index_query: query on the index store; None rebuilds metadata for
            every indexed document.
    """
    qq = {} if index_query is None else index_query
    for index_doc in self.index.query(qq):
        key_ = self._get_full_key_path(index_doc[self.key])
        s3_object = self.s3_bucket.Object(key_)
        # Start from the object's current (sanitized) metadata, then overlay
        # the index document with lower-cased keys.
        new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
        for k, v in index_doc.items():
            new_meta[str(k).lower()] = v
        # Not every index store injects a MongoDB "_id"; drop it defensively
        # instead of raising KeyError when it is absent.
        new_meta.pop("_id", None)
        if self.last_updated_field in new_meta:
            new_meta[self.last_updated_field] = str(to_isoformat_ceil_ms(new_meta[self.last_updated_field]))
        # S3 object metadata cannot be edited in place: copy the object onto
        # itself with MetadataDirective=REPLACE to rewrite it.
        s3_object.copy_from(
            CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
            Metadata=new_meta,
            MetadataDirective="REPLACE",
        )

remove_docs(criteria, remove_s3_object=False)

Remove docs matching the query dictionary.

Parameters:

Name Type Description Default
criteria Dict

query dictionary to match.

required
remove_s3_object bool

whether to remove the actual S3 object or not.

False
Source code in src/maggma/stores/aws.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
    """
    Remove documents matching the given query.

    Args:
        criteria: query dictionary to match.
        remove_s3_object: if True, also delete the backing S3 objects.
    """
    if remove_s3_object:
        # Capture the affected keys before their index entries disappear.
        doomed_keys = self.index.distinct(self.key, criteria=criteria)
        self.index.remove_docs(criteria=criteria)

        # boto's delete_objects accepts at most 1000 keys per request.
        for batch in grouper(doomed_keys, n=1000):
            delete_spec = [{"Key": self._get_full_key_path(k)} for k in batch]
            self.s3_bucket.delete_objects(Delete={"Objects": delete_spec})
    else:
        self.index.remove_docs(criteria=criteria)

update(docs, key=None, additional_metadata=None)

Update documents into the Store.

Parameters:

Name Type Description Default
docs Union[List[Dict], Dict]

the document or list of documents to update.

required
key Union[List, str, None]

field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used.

None
additional_metadata Union[str, List[str], None]

field(s) to include in the S3 store's metadata.

None
Source code in src/maggma/stores/aws.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def update(
    self,
    docs: Union[List[Dict], Dict],
    key: Union[List, str, None] = None,
    additional_metadata: Union[str, List[str], None] = None,
):
    """
    Update documents into the Store.

    Args:
        docs: the document or list of documents to write.
        key: field name(s) establishing document uniqueness; a list of
            fields, a single field, or None to fall back to the Store's
            key field.
        additional_metadata: extra field(s) to record in the S3 store's
            metadata.
    """
    # Normalize every argument to list form before writing.
    doc_list = docs if isinstance(docs, list) else [docs]

    if isinstance(key, str):
        key_fields = [key]
    elif key:
        key_fields = key
    else:
        key_fields = [self.key]

    if isinstance(additional_metadata, str):
        extra_fields = [additional_metadata]
    elif additional_metadata is None:
        extra_fields = []
    else:
        extra_fields = list(additional_metadata)

    self._write_to_s3_and_index(doc_list, key_fields + extra_fields + self.searchable_fields)

write_doc_to_s3(doc, search_keys)

Write the data to s3 and return the metadata to be inserted into the index db.

Parameters:

Name Type Description Default
doc Dict

the document.

required
search_keys List[str]

list of keys to pull from the docs and be inserted into the index db.

required

Returns:

Name Type Description
Dict Dict

The metadata to be inserted into the index db

Source code in src/maggma/stores/aws.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def write_doc_to_s3(self, doc: Dict, search_keys: List[str]) -> Dict:
    """
    Write the data to s3 and return the metadata to be inserted into the index db.

    Note: ``doc`` is mutated — the last_updated field is popped before
    serialization so the payload hash does not depend on it.

    Args:
        doc: the document.
        search_keys: list of keys to pull from the docs and be inserted into the
            index db.

    Returns:
        Dict: The metadata to be inserted into the index db
    """
    s3_bucket = self._get_bucket()

    # Build the index entry from the searchable fields; raises KeyError if a
    # search key is missing from the document.
    search_doc = {k: doc[k] for k in search_keys}
    search_doc[self.key] = doc[self.key]  # Ensure key is in metadata
    if self.sub_dir != "":
        search_doc["sub_dir"] = self.sub_dir

    # Remove MongoDB _id from search
    if "_id" in search_doc:
        del search_doc["_id"]

    # to make hashing more meaningful, make sure last updated field is removed
    lu_info = doc.pop(self.last_updated_field, None)
    data = msgpack.packb(doc, default=monty_default)

    if self.compress:
        # Compress with zlib if chosen
        search_doc["compression"] = "zlib"
        data = self._get_compression_function()(data)

    # keep a record of original keys, in case these are important for the individual researcher
    # it is not expected that this information will be used except in disaster recovery
    s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc}
    s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys"  # inception
    # encode dictionary since values have to be strings
    search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
    s3_bucket.upload_fileobj(
        Fileobj=BytesIO(data),
        Key=self._get_full_key_path(str(doc[self.key])),
        ExtraArgs={"Metadata": {s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()}},
    )

    # Re-attach last_updated to the index entry (it was popped from doc above).
    if lu_info is not None:
        search_doc[self.last_updated_field] = lu_info

    if self.store_hash:
        # Hash the (possibly compressed) serialized payload for change detection.
        hasher = sha1()
        hasher.update(data)
        obj_hash = hasher.hexdigest()
        search_doc["obj_hash"] = obj_hash
    return search_doc

Advanced Stores for behavior outside normal access patterns.

AliasingStore

Bases: Store

Special Store that aliases for the primary accessors.

Source code in src/maggma/stores/advanced_stores.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215