Builders

One-to-One Map Builder and a simple CopyBuilder implementation.

CopyBuilder

Bases: MapBuilder

Sync a source store with a target store.

Source code in src/maggma/builders/map_builder.py
class CopyBuilder(MapBuilder):
    """Sync a source store with a target store."""

    def unary_function(self, item):
        """
        Identity function for copy builder map operation.
        """
        if "_id" in item:
            del item["_id"]
        return item

unary_function(item)

Identity function for copy builder map operation.

Source code in src/maggma/builders/map_builder.py
def unary_function(self, item):
    """
    Identity function for copy builder map operation.
    """
    if "_id" in item:
        del item["_id"]
    return item
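
CopyBuilder can be used directly. A minimal usage sketch, assuming two MemoryStore instances keyed on a hypothetical "task_id" field (Builder.run drives the get_items / process_item / update_targets cycle):

from datetime import datetime

from maggma.builders.map_builder import CopyBuilder
from maggma.stores import MemoryStore

source = MemoryStore("source", key="task_id", last_updated_field="last_updated")
target = MemoryStore("target", key="task_id", last_updated_field="last_updated")

source.connect()
source.update([{"task_id": "mp-1", "energy": -1.5, "last_updated": datetime.utcnow()}])

builder = CopyBuilder(source=source, target=target)
builder.run()  # copies each source document to the target, dropping "_id"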

MapBuilder

Bases: Builder

Apply a unary function to yield a target document for each source document.

Supports incremental building, where a source document gets built only if it has newer (by last_updated_field) data than the corresponding (by key) target document.

Source code in src/maggma/builders/map_builder.py
class MapBuilder(Builder, metaclass=ABCMeta):
    """
    Apply a unary function to yield a target document for each source document.

    Supports incremental building, where a source document gets built only if it
    has newer (by last_updated_field) data than the corresponding (by key) target
    document.

    """

    def __init__(
        self,
        source: Store,
        target: Store,
        query: Optional[dict] = None,
        projection: Optional[list] = None,
        delete_orphans: bool = False,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Apply a unary function to each source document.

        Args:
            source: source store
            target: target store
            query: optional query to filter source store
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            delete_orphans: Whether to delete documents on target store
                with key values not present in source store. Deletion happens
                after all updates, during Builder.finalize.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed
        """
        self.source = source
        self.target = target
        self.query = query
        self.projection = projection
        self.delete_orphans = delete_orphans
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed
        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for MapBuilder.
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
        ]

        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[dict]:
        """
        Generic prechunk for map builder to perform domain-decomposition
        by the key field.
        """
        self.ensure_indexes()
        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)

        N = ceil(len(keys) / number_splits)
        for split in grouper(keys, N):
            yield {"query": {self.source.key: {"$in": list(split)}}}

    def get_items(self):
        """
        Generic get items for Map Builder designed to perform
        incremental building.
        """
        self.logger.info(f"Starting {self.__class__.__name__} Builder")

        self.ensure_indexes()

        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
        if self.retry_failed:
            if isinstance(self.query, (dict)):
                failed_query = {"$and": [self.query, {"state": "failed"}]}
            else:
                failed_query = {"state": "failed"}
            failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
            keys = list(set(keys + failed_keys))

        self.logger.info(f"Processing {len(keys)} items")

        if self.projection:
            projection = list({*self.projection, self.source.key, self.source.last_updated_field})
        else:
            projection = None

        self.total = len(keys)
        for chunked_keys in grouper(keys, self.chunk_size):
            chunked_keys = list(chunked_keys)
            yield from list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=projection,
                )
            )

    def process_item(self, item: dict):
        """
        Generic process items to process a dictionary using
        a map function.
        """
        self.logger.debug(f"Processing: {item[self.source.key]}")

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = dict(self.unary_function(item))
                processed.update({"state": "successful"})

            for k in [self.source.key, self.source.last_updated_field]:
                if k in processed:
                    del processed[k]

        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        key, last_updated_field = self.source.key, self.source.last_updated_field

        out = {
            self.target.key: item[key],
            self.target.last_updated_field: self.source._lu_func[0](item.get(last_updated_field, datetime.utcnow())),
        }

        if self.store_process_time:
            out["_process_time"] = time_end - time_start

        out.update(processed)
        return out

    def update_targets(self, items: list[dict]):
        """
        Generic update targets for Map Builder.
        """
        target = self.target
        for item in items:
            item["_bt"] = datetime.utcnow()
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    def finalize(self):
        """
        Finalize MapBuilder operations including removing orphaned documents.
        """
        if self.delete_orphans:
            source_keyvals = set(self.source.distinct(self.source.key))
            target_keyvals = set(self.target.distinct(self.target.key))
            to_delete = list(target_keyvals - source_keyvals)
            if len(to_delete):
                self.logger.info(f"Finalize: Deleting {len(to_delete)} orphans.")
            self.target.remove_docs({self.target.key: {"$in": to_delete}})
        super().finalize()

    @abstractmethod
    def unary_function(self, item):
        """
        ufn: Unary function to process item
                You do not need to provide values for
                source.key and source.last_updated_field in the output.
                Any uncaught exceptions will be caught by
                process_item and logged to the "error" field
                in the target document.
        """

__init__(source, target, query=None, projection=None, delete_orphans=False, timeout=0, store_process_time=True, retry_failed=False, **kwargs)

Apply a unary function to each source document.

Parameters:

    source (Store, required): source store
    target (Store, required): target store
    query (Optional[dict], default None): optional query to filter the source store
    projection (Optional[list], default None): list of keys to project from the source for processing. Limits data transfer to improve efficiency.
    delete_orphans (bool, default False): whether to delete documents in the target store whose key values are not present in the source store. Deletion happens after all updates, during Builder.finalize.
    timeout (int, default 0): maximum running time per item in seconds
    store_process_time (bool, default True): if True, add a "_process_time" key to each document for profiling purposes
    retry_failed (bool, default False): if True, retry building documents that previously failed
Source code in src/maggma/builders/map_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    query: Optional[dict] = None,
    projection: Optional[list] = None,
    delete_orphans: bool = False,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Apply a unary function to each source document.

    Args:
        source: source store
        target: target store
        query: optional query to filter source store
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        delete_orphans: Whether to delete documents on target store
            with key values not present in source store. Deletion happens
            after all updates, during Builder.finalize.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed
    """
    self.source = source
    self.target = target
    self.query = query
    self.projection = projection
    self.delete_orphans = delete_orphans
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed
    super().__init__(sources=[source], targets=[target], **kwargs)

ensure_indexes()

Ensures indices on critical fields for MapBuilder.

Source code in src/maggma/builders/map_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for MapBuilder.
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
    ]

    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )

finalize()

Finalize MapBuilder operations including removing orphaned documents.

Source code in src/maggma/builders/map_builder.py
def finalize(self):
    """
    Finalize MapBuilder operations including removing orphaned documents.
    """
    if self.delete_orphans:
        source_keyvals = set(self.source.distinct(self.source.key))
        target_keyvals = set(self.target.distinct(self.target.key))
        to_delete = list(target_keyvals - source_keyvals)
        if len(to_delete):
            self.logger.info(f"Finalize: Deleting {len(to_delete)} orphans.")
        self.target.remove_docs({self.target.key: {"$in": to_delete}})
    super().finalize()

get_items()

Generic get items for Map Builder designed to perform incremental building.

Source code in src/maggma/builders/map_builder.py
def get_items(self):
    """
    Generic get items for Map Builder designed to perform
    incremental building.
    """
    self.logger.info(f"Starting {self.__class__.__name__} Builder")

    self.ensure_indexes()

    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
    if self.retry_failed:
        if isinstance(self.query, (dict)):
            failed_query = {"$and": [self.query, {"state": "failed"}]}
        else:
            failed_query = {"state": "failed"}
        failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
        keys = list(set(keys + failed_keys))

    self.logger.info(f"Processing {len(keys)} items")

    if self.projection:
        projection = list({*self.projection, self.source.key, self.source.last_updated_field})
    else:
        projection = None

    self.total = len(keys)
    for chunked_keys in grouper(keys, self.chunk_size):
        chunked_keys = list(chunked_keys)
        yield from list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=projection,
            )
        )

prechunk(number_splits)

Generic prechunk for map builder to perform domain-decomposition by the key field.

Source code in src/maggma/builders/map_builder.py
def prechunk(self, number_splits: int) -> Iterator[dict]:
    """
    Generic prechunk for map builder to perform domain-decomposition
    by the key field.
    """
    self.ensure_indexes()
    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)

    N = ceil(len(keys) / number_splits)
    for split in grouper(keys, N):
        yield {"query": {self.source.key: {"$in": list(split)}}}
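
For illustration, with a source keyed on a hypothetical "task_id" field, six outdated keys, and number_splits=2, prechunk would yield query fragments along the lines of:

{"query": {"task_id": {"$in": ["mp-1", "mp-2", "mp-3"]}}}
{"query": {"task_id": {"$in": ["mp-4", "mp-5", "mp-6"]}}}

A distributed runner can merge each fragment into a copy of the builder so that the key space is partitioned across workers.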

process_item(item)

Generic process items to process a dictionary using a map function.

Source code in src/maggma/builders/map_builder.py
def process_item(self, item: dict):
    """
    Generic process items to process a dictionary using
    a map function.
    """
    self.logger.debug(f"Processing: {item[self.source.key]}")

    time_start = time()

    try:
        with Timeout(seconds=self.timeout):
            processed = dict(self.unary_function(item))
            processed.update({"state": "successful"})

        for k in [self.source.key, self.source.last_updated_field]:
            if k in processed:
                del processed[k]

    except Exception as e:
        self.logger.error(traceback.format_exc())
        processed = {"error": str(e), "state": "failed"}

    time_end = time()

    key, last_updated_field = self.source.key, self.source.last_updated_field

    out = {
        self.target.key: item[key],
        self.target.last_updated_field: self.source._lu_func[0](item.get(last_updated_field, datetime.utcnow())),
    }

    if self.store_process_time:
        out["_process_time"] = time_end - time_start

    out.update(processed)
    return out
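
For a successful item, the returned document roughly takes the following shape (hypothetical key and field names; on failure the unary_function output is replaced by "error" and "state": "failed"):

{
    "task_id": "mp-1",                      # target.key, copied from the source document
    "last_updated": datetime(2024, 1, 1),   # parsed from the source value via source._lu_func
    "_process_time": 0.02,                  # only present if store_process_time is True
    "state": "successful",
    # ...plus whatever unary_function returned
}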

unary_function(item) abstractmethod

Unary function to process item

You do not need to provide values for source.key and source.last_updated_field in the output. Any uncaught exceptions will be caught by process_item and logged to the "error" field in the target document.

Source code in src/maggma/builders/map_builder.py
@abstractmethod
def unary_function(self, item):
    """
    ufn: Unary function to process item
            You do not need to provide values for
            source.key and source.last_updated_field in the output.
            Any uncaught exceptions will be caught by
            process_item and logged to the "error" field
            in the target document.
    """

update_targets(items)

Generic update targets for Map Builder.

Source code in src/maggma/builders/map_builder.py
def update_targets(self, items: list[dict]):
    """
    Generic update targets for Map Builder.
    """
    target = self.target
    for item in items:
        item["_bt"] = datetime.utcnow()
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)
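
To implement a concrete builder, subclass MapBuilder and supply unary_function. A minimal sketch, assuming source documents carry hypothetical "energy" and "nsites" fields:

from maggma.builders.map_builder import MapBuilder
from maggma.stores import MemoryStore

class EnergyPerAtomBuilder(MapBuilder):
    """Compute energy per atom for each source document."""

    def unary_function(self, item):
        # source.key and last_updated_field are re-added by process_item,
        # so only the derived fields need to be returned here
        return {"energy_per_atom": item["energy"] / item["nsites"]}

source = MemoryStore("tasks", key="task_id")
target = MemoryStore("energies", key="task_id")

builder = EnergyPerAtomBuilder(source, target, projection=["energy", "nsites"])
builder.run()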

Many-to-Many GroupBuilder.

GroupBuilder

Bases: Builder

Groups source docs and produces merged documents for each group. Supports incremental building, where a source group gets (re)built only if it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.

This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't determine when a source document is orphaned.

Source code in src/maggma/builders/group_builder.py
class GroupBuilder(Builder, metaclass=ABCMeta):
    """
    Group source docs and produces merged documents for each group
    Supports incremental building, where a source group gets (re)built only if
    it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.

    This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't determine when a source document
    is orphaned.
    """

    def __init__(
        self,
        source: Store,
        target: Store,
        grouping_keys: list[str],
        query: Optional[dict] = None,
        projection: Optional[list] = None,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Args:
            source: source store
            target: target store
            query: optional query to filter items from the source store.
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            delete_orphans: Whether to delete documents on target store
                with key values not present in source store. Deletion happens
                after all updates, during Builder.finalize.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed.
        """
        self.source = source
        self.target = target
        self.grouping_keys = grouping_keys
        self.query = query if query else {}
        self.projection = projection
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed

        self._target_keys_field = f"{self.source.key}s"

        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for GroupBuilder
        which include the plural version of the target's key field.
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
            self.target.ensure_index(self._target_keys_field),
        ]

        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[dict]:
        """
        Generic prechunk for group builder to perform domain-decomposition
        by the grouping keys.
        """
        self.ensure_indexes()

        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)

        N = ceil(len(groups) / number_splits)
        for split in grouper(keys, N):
            yield {"query": dict(zip(self.grouping_keys, split))}

    def get_items(self):
        self.logger.info(f"Starting {self.__class__.__name__} Builder")

        self.ensure_indexes()
        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)

        if self.projection:
            projection = list({*self.projection, self.source.key, self.source.last_updated_field})
        else:
            projection = None

        self.total = len(groups)
        for group in groups:
            group_criteria = dict(zip(self.grouping_keys, group))
            group_criteria.update(self.query)
            yield list(self.source.query(criteria=group_criteria, properties=projection))

    def process_item(self, item: list[dict]) -> dict[tuple, dict]:  # type: ignore
        keys = [d[self.source.key] for d in item]

        self.logger.debug(f"Processing: {keys}")

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = self.unary_function(item)
                processed.update({"state": "successful"})
        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        last_updated = [self.source._lu_func[0](d[self.source.last_updated_field]) for d in item]

        update_doc = {
            self.target.key: keys[0],
            f"{self.source.key}s": keys,
            self.target.last_updated_field: max(last_updated),
            "_bt": datetime.utcnow(),
        }
        processed.update({k: v for k, v in update_doc.items() if k not in processed})

        if self.store_process_time:
            processed["_process_time"] = time_end - time_start

        return processed

    def update_targets(self, items: list[dict]):
        """
        Generic update targets for Group Builder.
        """
        target = self.target
        for item in items:
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    @abstractmethod
    def unary_function(self, items: list[dict]) -> dict:
        """
        Processing function for GroupBuilder.

        Arguments:
            items: list of documents with matching grouping keys

        Returns:
            Dictionary mapping:
                tuple of source document keys that are in the grouped document
                to the grouped and processed document
        """

    def get_ids_to_process(self) -> Iterable:
        """
        Gets the IDs that need to be processed.
        """
        distinct_from_target = list(self.target.distinct(self._target_keys_field, criteria=self.query))
        processed_ids = []
        # Not always guaranteed that MongoDB will unpack the list so we
        # have to make sure we do that
        for d in distinct_from_target:
            if isinstance(d, list):
                processed_ids.extend(d)
            else:
                processed_ids.append(d)

        all_ids = set(self.source.distinct(self.source.key, criteria=self.query))
        self.logger.debug(f"Found {len(all_ids)} total docs in source")

        if self.retry_failed:
            failed_keys = self.target.distinct(self._target_keys_field, criteria={"state": "failed", **self.query})
            unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
            self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
        else:
            unprocessed_ids = all_ids - set(processed_ids)

        self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

        new_ids = set(self.source.newer_in(self.target, criteria=self.query, exhaustive=False))

        self.logger.info(f"Found {len(new_ids)} updated IDs to process")
        return list(new_ids | unprocessed_ids)

    def get_groups_from_keys(self, keys) -> set[tuple]:
        """
        Get the groups by grouping_keys for these documents.
        """
        grouping_keys = self.grouping_keys

        groups: set[tuple] = set()

        for chunked_keys in grouper(keys, self.chunk_size):
            docs = list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=grouping_keys,
                )
            )

            sub_groups = {tuple(get(d, prop, None) for prop in grouping_keys) for d in docs}
            self.logger.debug(f"Found {len(sub_groups)} subgroups to process")

            groups |= sub_groups

        self.logger.info(f"Found {len(groups)} groups to process")
        return groups

__init__(source, target, grouping_keys, query=None, projection=None, timeout=0, store_process_time=True, retry_failed=False, **kwargs)

Parameters:

    source (Store, required): source store
    target (Store, required): target store
    grouping_keys (list[str], required): list of keys used to group source documents
    query (Optional[dict], default None): optional query to filter items from the source store
    projection (Optional[list], default None): list of keys to project from the source for processing. Limits data transfer to improve efficiency.
    timeout (int, default 0): maximum running time per item in seconds
    store_process_time (bool, default True): if True, add a "_process_time" key to each document for profiling purposes
    retry_failed (bool, default False): if True, retry building documents that previously failed
Source code in src/maggma/builders/group_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    grouping_keys: list[str],
    query: Optional[dict] = None,
    projection: Optional[list] = None,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Args:
        source: source store
        target: target store
        query: optional query to filter items from the source store.
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        delete_orphans: Whether to delete documents on target store
            with key values not present in source store. Deletion happens
            after all updates, during Builder.finalize.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed.
    """
    self.source = source
    self.target = target
    self.grouping_keys = grouping_keys
    self.query = query if query else {}
    self.projection = projection
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed

    self._target_keys_field = f"{self.source.key}s"

    super().__init__(sources=[source], targets=[target], **kwargs)

ensure_indexes()

Ensures indices on critical fields for GroupBuilder which include the plural version of the target's key field.

Source code in src/maggma/builders/group_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for GroupBuilder
    which include the plural version of the target's key field.
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
        self.target.ensure_index(self._target_keys_field),
    ]

    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )

get_groups_from_keys(keys)

Get the groups by grouping_keys for these documents.

Source code in src/maggma/builders/group_builder.py
def get_groups_from_keys(self, keys) -> set[tuple]:
    """
    Get the groups by grouping_keys for these documents.
    """
    grouping_keys = self.grouping_keys

    groups: set[tuple] = set()

    for chunked_keys in grouper(keys, self.chunk_size):
        docs = list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=grouping_keys,
            )
        )

        sub_groups = {tuple(get(d, prop, None) for prop in grouping_keys) for d in docs}
        self.logger.debug(f"Found {len(sub_groups)} subgroups to process")

        groups |= sub_groups

    self.logger.info(f"Found {len(groups)} groups to process")
    return groups

get_ids_to_process()

Gets the IDs that need to be processed.

Source code in src/maggma/builders/group_builder.py
def get_ids_to_process(self) -> Iterable:
    """
    Gets the IDs that need to be processed.
    """
    distinct_from_target = list(self.target.distinct(self._target_keys_field, criteria=self.query))
    processed_ids = []
    # Not always guaranteed that MongoDB will unpack the list so we
    # have to make sure we do that
    for d in distinct_from_target:
        if isinstance(d, list):
            processed_ids.extend(d)
        else:
            processed_ids.append(d)

    all_ids = set(self.source.distinct(self.source.key, criteria=self.query))
    self.logger.debug(f"Found {len(all_ids)} total docs in source")

    if self.retry_failed:
        failed_keys = self.target.distinct(self._target_keys_field, criteria={"state": "failed", **self.query})
        unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
        self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
    else:
        unprocessed_ids = all_ids - set(processed_ids)

    self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

    new_ids = set(self.source.newer_in(self.target, criteria=self.query, exhaustive=False))

    self.logger.info(f"Found {len(new_ids)} updated IDs to process")
    return list(new_ids | unprocessed_ids)

prechunk(number_splits)

Generic prechunk for group builder to perform domain-decomposition by the grouping keys.

Source code in src/maggma/builders/group_builder.py
def prechunk(self, number_splits: int) -> Iterator[dict]:
    """
    Generic prechunk for group builder to perform domain-decomposition
    by the grouping keys.
    """
    self.ensure_indexes()

    keys = self.get_ids_to_process()
    groups = self.get_groups_from_keys(keys)

    N = ceil(len(groups) / number_splits)
    for split in grouper(keys, N):
        yield {"query": dict(zip(self.grouping_keys, split))}

unary_function(items) abstractmethod

Processing function for GroupBuilder.

Parameters:

    items (list[dict], required): list of documents with matching grouping keys

Returns:

    dict: dictionary mapping the tuple of source document keys in the grouped document to the grouped and processed document

Source code in src/maggma/builders/group_builder.py
@abstractmethod
def unary_function(self, items: list[dict]) -> dict:
    """
    Processing function for GroupBuilder.

    Arguments:
        items: list of documents with matching grouping keys

    Returns:
        Dictionary mapping:
            tuple of source document keys that are in the grouped document
            to the grouped and processed document
    """

update_targets(items)

Generic update targets for Group Builder.

Source code in src/maggma/builders/group_builder.py
def update_targets(self, items: list[dict]):
    """
    Generic update targets for Group Builder.
    """
    target = self.target
    for item in items:
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)
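
As with MapBuilder, a concrete GroupBuilder supplies unary_function. A minimal sketch, assuming source documents share a hypothetical "formula" field used as the grouping key and carry an "energy" field:

from maggma.builders.group_builder import GroupBuilder
from maggma.stores import MemoryStore

class FormulaStatsBuilder(GroupBuilder):
    """Average energy over all source documents that share a formula."""

    def unary_function(self, items):
        energies = [doc["energy"] for doc in items]
        return {
            "formula": items[0]["formula"],
            "average_energy": sum(energies) / len(energies),
        }

source = MemoryStore("tasks", key="task_id")
target = MemoryStore("formula_stats", key="task_id")

builder = FormulaStatsBuilder(source, target, grouping_keys=["formula"])
builder.run()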