Builders
One-to-One Map Builder and a simple CopyBuilder implementation
CopyBuilder (MapBuilder)
Sync a source store with a target store.
Source code in maggma/builders/map_builder.py
class CopyBuilder(MapBuilder):
    """Sync a source store with a target store."""

    def unary_function(self, item):
        """
        Identity function for copy builder map operation
        """
        if "_id" in item:
            del item["_id"]
        return item
unary_function(self, item)
Identity function for copy builder map operation
Source code in maggma/builders/map_builder.py
def unary_function(self, item):
    """
    Identity function for copy builder map operation
    """
    if "_id" in item:
        del item["_id"]
    return item
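For orientation, here is a minimal usage sketch (not taken from the maggma documentation) that syncs two in-memory stores with CopyBuilder. The store names, keys, and fields are illustrative; it assumes `MemoryStore` and `CopyBuilder` are importable as shown and that `Builder.run()` connects the stores and drives the get_items → process_item → update_targets cycle.

```python
from datetime import datetime

from maggma.builders import CopyBuilder
from maggma.stores import MemoryStore

# Illustrative stores; "task_id" as key and "last_updated" as the
# last-updated field are assumptions made for this sketch.
source = MemoryStore(collection_name="source_tasks", key="task_id")
target = MemoryStore(collection_name="copied_tasks", key="task_id")

source.connect()
source.update(
    [
        {"task_id": 1, "energy": -1.5, "last_updated": datetime.utcnow()},
        {"task_id": 2, "energy": -2.0, "last_updated": datetime.utcnow()},
    ]
)

builder = CopyBuilder(source=source, target=target)
builder.run()  # only documents newer than their target copies get (re)built
```

Because the build is incremental, re-running the builder after adding or updating source documents copies only the documents that are newer than their target counterparts.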
MapBuilder (Builder)
Apply a unary function to yield a target document for each source document.
Supports incremental building, where a source document gets built only if it has newer (by last_updated_field) data than the corresponding (by key) target document.
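MapBuilder is designed to be subclassed: only unary_function needs to be implemented, while process_item handles keys, timestamps, error handling, and timing. A minimal, hypothetical sketch (field and store names are illustrative, not part of maggma):

```python
from maggma.builders import MapBuilder
from maggma.stores import MemoryStore


class EnergyPerAtomBuilder(MapBuilder):
    """Hypothetical builder: adds an energy_per_atom field to each document."""

    def unary_function(self, item):
        # Return only the new/derived fields; MapBuilder.process_item fills in
        # the target key, last_updated field, state, and build-time metadata.
        return {"energy_per_atom": item["energy"] / item["nsites"]}


# Illustrative in-memory stores; any maggma Store works here.
tasks = MemoryStore(collection_name="tasks", key="task_id")
summaries = MemoryStore(collection_name="summaries", key="task_id")

builder = EnergyPerAtomBuilder(
    source=tasks,
    target=summaries,
    projection=["energy", "nsites"],
)
```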
Source code in maggma/builders/map_builder.py
class MapBuilder(Builder, metaclass=ABCMeta):
    """
    Apply a unary function to yield a target document for each source document.

    Supports incremental building, where a source document gets built only if it
    has newer (by last_updated_field) data than the corresponding (by key) target
    document.
    """

    def __init__(
        self,
        source: Store,
        target: Store,
        query: Optional[Dict] = None,
        projection: Optional[List] = None,
        delete_orphans: bool = False,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Apply a unary function to each source document.

        Args:
            source: source store
            target: target store
            query: optional query to filter source store
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            delete_orphans: Whether to delete documents on target store
                with key values not present in source store. Deletion happens
                after all updates, during Builder.finalize.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed
        """
        self.source = source
        self.target = target
        self.query = query
        self.projection = projection
        self.delete_orphans = delete_orphans
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed
        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for MapBuilder
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
        ]
        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[Dict]:
        """
        Generic prechunk for map builder to perform domain decomposition
        by the key field
        """
        self.ensure_indexes()
        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
        N = ceil(len(keys) / number_splits)
        for split in grouper(keys, N):
            yield {"query": {self.source.key: {"$in": list(split)}}}

    def get_items(self):
        """
        Generic get items for Map Builder designed to perform
        incremental building
        """
        self.logger.info("Starting {} Builder".format(self.__class__.__name__))
        self.ensure_indexes()

        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
        if self.retry_failed:
            if isinstance(self.query, (dict)):
                failed_query = {"$and": [self.query, {"state": "failed"}]}
            else:
                failed_query = {"state": "failed"}
            failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
            keys = list(set(keys + failed_keys))

        self.logger.info("Processing {} items".format(len(keys)))

        if self.projection:
            projection = list(
                set(self.projection + [self.source.key, self.source.last_updated_field])
            )
        else:
            projection = None

        self.total = len(keys)
        for chunked_keys in grouper(keys, self.chunk_size):
            chunked_keys = list(chunked_keys)
            for doc in list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=projection,
                )
            ):
                yield doc

    def process_item(self, item: Dict):
        """
        Generic process items to process a dictionary using
        a map function
        """
        self.logger.debug("Processing: {}".format(item[self.source.key]))

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = dict(self.unary_function(item))
                processed.update({"state": "successful"})

            for k in [self.source.key, self.source.last_updated_field]:
                if k in processed:
                    del processed[k]
        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        key, last_updated_field = self.source.key, self.source.last_updated_field

        out = {
            self.target.key: item[key],
            self.target.last_updated_field: self.source._lu_func[0](
                item.get(last_updated_field, datetime.utcnow())
            ),
        }

        if self.store_process_time:
            out["_process_time"] = time_end - time_start

        out.update(processed)
        return out

    def update_targets(self, items: List[Dict]):
        """
        Generic update targets for Map Builder
        """
        target = self.target
        for item in items:
            item["_bt"] = datetime.utcnow()
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    def finalize(self):
        """
        Finalize MapBuilder operations including removing orphaned documents
        """
        if self.delete_orphans:
            source_keyvals = set(self.source.distinct(self.source.key))
            target_keyvals = set(self.target.distinct(self.target.key))
            to_delete = list(target_keyvals - source_keyvals)
            if len(to_delete):
                self.logger.info(
                    "Finalize: Deleting {} orphans.".format(len(to_delete))
                )
            self.target.remove_docs({self.target.key: {"$in": to_delete}})
        super().finalize()

    @abstractmethod
    def unary_function(self, item):
        """
        Unary function to process item

        You do not need to provide values for
        source.key and source.last_updated_field in the output.
        Any uncaught exceptions will be caught by
        process_item and logged to the "error" field
        in the target document.
        """
        pass
__init__(self, source, target, query=None, projection=None, delete_orphans=False, timeout=0, store_process_time=True, retry_failed=False, **kwargs)
Apply a unary function to each source document.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | Store | source store | required |
| target | Store | target store | required |
| query | Optional[Dict] | optional query to filter source store | None |
| projection | Optional[List] | list of keys to project from the source for processing. Limits data transfer to improve efficiency. | None |
| delete_orphans | bool | Whether to delete documents on target store with key values not present in source store. Deletion happens after all updates, during Builder.finalize. | False |
| timeout | int | maximum running time per item in seconds | 0 |
| store_process_time | bool | If True, add "_process_time" key to document for profiling purposes | True |
| retry_failed | bool | If True, will retry building documents that previously failed | False |
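Continuing the hypothetical EnergyPerAtomBuilder sketch from above, a hedged example of how these options combine (the query filter is made up):

```python
# Hedged example only: query filters the *source* documents, projection limits
# the fields pulled from the source, and delete_orphans / retry_failed control
# clean-up and reprocessing behavior on the target.
builder = EnergyPerAtomBuilder(
    source=tasks,
    target=summaries,
    query={"deprecated": False},      # hypothetical source filter
    projection=["energy", "nsites"],  # only transfer what unary_function needs
    delete_orphans=True,              # remove target docs whose key left the source
    timeout=30,                       # give up on an item after 30 seconds
    store_process_time=True,          # record "_process_time" per document
    retry_failed=True,                # re-attempt docs previously marked state="failed"
)
```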
Source code in maggma/builders/map_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    query: Optional[Dict] = None,
    projection: Optional[List] = None,
    delete_orphans: bool = False,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Apply a unary function to each source document.

    Args:
        source: source store
        target: target store
        query: optional query to filter source store
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        delete_orphans: Whether to delete documents on target store
            with key values not present in source store. Deletion happens
            after all updates, during Builder.finalize.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed
    """
    self.source = source
    self.target = target
    self.query = query
    self.projection = projection
    self.delete_orphans = delete_orphans
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed
    super().__init__(sources=[source], targets=[target], **kwargs)
ensure_indexes(self)
Ensures indices on critical fields for MapBuilder
Source code in maggma/builders/map_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for MapBuilder
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
    ]
    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )
finalize(self)
Finalize MapBuilder operations including removing orphaned documents
Source code in maggma/builders/map_builder.py
def finalize(self):
    """
    Finalize MapBuilder operations including removing orphaned documents
    """
    if self.delete_orphans:
        source_keyvals = set(self.source.distinct(self.source.key))
        target_keyvals = set(self.target.distinct(self.target.key))
        to_delete = list(target_keyvals - source_keyvals)
        if len(to_delete):
            self.logger.info(
                "Finalize: Deleting {} orphans.".format(len(to_delete))
            )
        self.target.remove_docs({self.target.key: {"$in": to_delete}})
    super().finalize()
get_items(self)
Generic get items for Map Builder designed to perform incremental building
Source code in maggma/builders/map_builder.py
def get_items(self):
    """
    Generic get items for Map Builder designed to perform
    incremental building
    """
    self.logger.info("Starting {} Builder".format(self.__class__.__name__))
    self.ensure_indexes()

    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
    if self.retry_failed:
        if isinstance(self.query, (dict)):
            failed_query = {"$and": [self.query, {"state": "failed"}]}
        else:
            failed_query = {"state": "failed"}
        failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
        keys = list(set(keys + failed_keys))

    self.logger.info("Processing {} items".format(len(keys)))

    if self.projection:
        projection = list(
            set(self.projection + [self.source.key, self.source.last_updated_field])
        )
    else:
        projection = None

    self.total = len(keys)
    for chunked_keys in grouper(keys, self.chunk_size):
        chunked_keys = list(chunked_keys)
        for doc in list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=projection,
            )
        ):
            yield doc
prechunk(self, number_splits)
Generic prechunk for map builder to perform domain decomposition by the key field
Source code in maggma/builders/map_builder.py
def prechunk(self, number_splits: int) -> Iterator[Dict]:
    """
    Generic prechunk for map builder to perform domain decomposition
    by the key field
    """
    self.ensure_indexes()
    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
    N = ceil(len(keys) / number_splits)
    for split in grouper(keys, N):
        yield {"query": {self.source.key: {"$in": list(split)}}}
process_item(self, item)
Generic process items to process a dictionary using a map function
Source code in maggma/builders/map_builder.py
def process_item(self, item: Dict):
    """
    Generic process items to process a dictionary using
    a map function
    """
    self.logger.debug("Processing: {}".format(item[self.source.key]))

    time_start = time()

    try:
        with Timeout(seconds=self.timeout):
            processed = dict(self.unary_function(item))
            processed.update({"state": "successful"})

        for k in [self.source.key, self.source.last_updated_field]:
            if k in processed:
                del processed[k]
    except Exception as e:
        self.logger.error(traceback.format_exc())
        processed = {"error": str(e), "state": "failed"}

    time_end = time()

    key, last_updated_field = self.source.key, self.source.last_updated_field

    out = {
        self.target.key: item[key],
        self.target.last_updated_field: self.source._lu_func[0](
            item.get(last_updated_field, datetime.utcnow())
        ),
    }

    if self.store_process_time:
        out["_process_time"] = time_end - time_start

    out.update(processed)
    return out
unary_function(self, item)
Unary function to process item
You do not need to provide values for source.key and source.last_updated_field in the output. Any uncaught exceptions will be caught by process_item and logged to the "error" field in the target document.
Source code in maggma/builders/map_builder.py
@abstractmethod
def unary_function(self, item):
    """
    Unary function to process item

    You do not need to provide values for
    source.key and source.last_updated_field in the output.
    Any uncaught exceptions will be caught by
    process_item and logged to the "error" field
    in the target document.
    """
    pass
update_targets(self, items)
Generic update targets for Map Builder
Source code in maggma/builders/map_builder.py
def update_targets(self, items: List[Dict]):
    """
    Generic update targets for Map Builder
    """
    target = self.target
    for item in items:
        item["_bt"] = datetime.utcnow()
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)
Many-to-Many GroupBuilder
GroupBuilder (Builder)
Group source docs and produce merged documents for each group. Supports incremental building, where a source group gets (re)built only if it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.
This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't determine when a source document is orphaned.
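A minimal, hypothetical subclass to make the contract concrete (store and field names are illustrative, and it assumes GroupBuilder and MemoryStore are importable as shown): documents sharing the same "formula" value are merged into one summary document. Note that process_item, shown below, treats the value returned by unary_function as a single flat output document for the whole group.

```python
from maggma.builders import GroupBuilder
from maggma.stores import MemoryStore


class FormulaSummaryBuilder(GroupBuilder):
    """Hypothetical builder: one summary document per unique formula."""

    def unary_function(self, items):
        # `items` is the list of source documents sharing the grouping keys.
        return {
            "formula": items[0]["formula"],
            "n_calculations": len(items),
            "lowest_energy": min(d["energy"] for d in items),
        }


# Illustrative in-memory stores; any maggma Store works here.
tasks = MemoryStore(collection_name="tasks", key="task_id")
summaries = MemoryStore(collection_name="formula_summaries", key="task_id")

builder = FormulaSummaryBuilder(
    source=tasks,
    target=summaries,
    grouping_keys=["formula"],
    projection=["formula", "energy"],
)
```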
Source code in maggma/builders/group_builder.py
class GroupBuilder(Builder, metaclass=ABCMeta):
    """
    Group source docs and produce merged documents for each group.

    Supports incremental building, where a source group gets (re)built only if
    it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.

    This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't
    determine when a source document is orphaned.
    """

    def __init__(
        self,
        source: Store,
        target: Store,
        grouping_keys: List[str],
        query: Optional[Dict] = None,
        projection: Optional[List] = None,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Args:
            source: source store
            target: target store
            grouping_keys: list of keys to group source documents by
            query: optional query to filter source store
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed
        """
        self.source = source
        self.target = target
        self.grouping_keys = grouping_keys
        self.query = query
        self.projection = projection
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed

        self._target_keys_field = f"{self.source.key}s"

        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for GroupBuilder,
        which include the plural version of the target's key field
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
            self.target.ensure_index(self._target_keys_field),
        ]
        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[Dict]:
        """
        Generic prechunk for group builder to perform domain decomposition
        by the grouping keys
        """
        self.ensure_indexes()
        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)
        N = ceil(len(groups) / number_splits)
        for split in grouper(keys, N):
            yield {"query": dict(zip(self.grouping_keys, split))}

    def get_items(self):
        self.logger.info("Starting {} Builder".format(self.__class__.__name__))
        self.ensure_indexes()

        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)

        if self.projection:
            projection = list(
                set(self.projection + [self.source.key, self.source.last_updated_field])
            )
        else:
            projection = None

        self.total = len(groups)
        for group in groups:
            docs = list(
                self.source.query(
                    criteria=dict(zip(self.grouping_keys, group)), properties=projection
                )
            )
            yield docs

    def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:  # type: ignore
        keys = list(d[self.source.key] for d in item)

        self.logger.debug("Processing: {}".format(keys))

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = self.unary_function(item)
                processed.update({"state": "successful"})
        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        last_updated = [
            self.source._lu_func[0](d[self.source.last_updated_field]) for d in item
        ]

        update_doc = {
            self.target.key: keys[0],
            f"{self.source.key}s": keys,
            self.target.last_updated_field: max(last_updated),
            "_bt": datetime.utcnow(),
        }
        processed.update({k: v for k, v in update_doc.items() if k not in processed})

        if self.store_process_time:
            processed["_process_time"] = time_end - time_start

        return processed

    def update_targets(self, items: List[Dict]):
        """
        Generic update targets for Group Builder
        """
        target = self.target
        for item in items:
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    @abstractmethod
    def unary_function(self, items: List[Dict]) -> Dict:
        """
        Processing function for GroupBuilder

        Arguments:
            items: list of documents with matching grouping keys

        Returns:
            Dictionary mapping:
                tuple of source document keys that are in the grouped document
                to the grouped and processed document
        """

    def get_ids_to_process(self) -> Iterable:
        """
        Gets the IDs that need to be processed
        """
        query = self.query or {}

        distinct_from_target = list(
            self.target.distinct(self._target_keys_field, criteria=query)
        )
        processed_ids = []
        # Not always guaranteed that MongoDB will unpack the list so we
        # have to make sure we do that
        for d in distinct_from_target:
            if isinstance(d, list):
                processed_ids.extend(d)
            else:
                processed_ids.append(d)

        all_ids = set(self.source.distinct(self.source.key, criteria=query))
        self.logger.debug(f"Found {len(all_ids)} total docs in source")

        if self.retry_failed:
            failed_keys = self.target.distinct(
                self._target_keys_field, criteria={"state": "failed", **query}
            )
            unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
            self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
        else:
            unprocessed_ids = all_ids - set(processed_ids)

        self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

        new_ids = set(
            self.source.newer_in(self.target, criteria=query, exhaustive=False)
        )
        self.logger.info(f"Found {len(new_ids)} updated IDs to process")

        return list(new_ids | unprocessed_ids)

    def get_groups_from_keys(self, keys) -> Set[Tuple]:
        """
        Get the groups by grouping_keys for these documents
        """
        grouping_keys = self.grouping_keys

        groups: Set[Tuple] = set()

        for chunked_keys in grouper(keys, self.chunk_size):
            docs = list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=grouping_keys,
                )
            )

            sub_groups = set(
                tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
            )
            self.logger.debug(f"Found {len(sub_groups)} subgroups to process")
            groups |= sub_groups

        self.logger.info(f"Found {len(groups)} groups to process")

        return groups
__init__(self, source, target, grouping_keys, query=None, projection=None, timeout=0, store_process_time=True, retry_failed=False, **kwargs)
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | Store | source store | required |
| target | Store | target store | required |
| grouping_keys | List[str] | list of keys to group source documents by | required |
| query | Optional[Dict] | optional query to filter source store | None |
| projection | Optional[List] | list of keys to project from the source for processing. Limits data transfer to improve efficiency. | None |
| timeout | int | maximum running time per item in seconds | 0 |
| store_process_time | bool | If True, add "_process_time" key to document for profiling purposes | True |
| retry_failed | bool | If True, will retry building documents that previously failed | False |
Source code in maggma/builders/group_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    grouping_keys: List[str],
    query: Optional[Dict] = None,
    projection: Optional[List] = None,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Args:
        source: source store
        target: target store
        grouping_keys: list of keys to group source documents by
        query: optional query to filter source store
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed
    """
    self.source = source
    self.target = target
    self.grouping_keys = grouping_keys
    self.query = query
    self.projection = projection
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed

    self._target_keys_field = f"{self.source.key}s"

    super().__init__(sources=[source], targets=[target], **kwargs)
ensure_indexes(self)
Ensures indices on critical fields for GroupBuilder, which include the plural version of the target's key field
Source code in maggma/builders/group_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for GroupBuilder,
    which include the plural version of the target's key field
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
        self.target.ensure_index(self._target_keys_field),
    ]
    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )
get_groups_from_keys(self, keys)
Get the groups by grouping_keys for these documents
Source code in maggma/builders/group_builder.py
def get_groups_from_keys(self, keys) -> Set[Tuple]:
    """
    Get the groups by grouping_keys for these documents
    """
    grouping_keys = self.grouping_keys

    groups: Set[Tuple] = set()

    for chunked_keys in grouper(keys, self.chunk_size):
        docs = list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=grouping_keys,
            )
        )

        sub_groups = set(
            tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
        )
        self.logger.debug(f"Found {len(sub_groups)} subgroups to process")
        groups |= sub_groups

    self.logger.info(f"Found {len(groups)} groups to process")

    return groups
get_ids_to_process(self)
Gets the IDs that need to be processed
Source code in maggma/builders/group_builder.py
def get_ids_to_process(self) -> Iterable:
    """
    Gets the IDs that need to be processed
    """
    query = self.query or {}

    distinct_from_target = list(
        self.target.distinct(self._target_keys_field, criteria=query)
    )
    processed_ids = []
    # Not always guaranteed that MongoDB will unpack the list so we
    # have to make sure we do that
    for d in distinct_from_target:
        if isinstance(d, list):
            processed_ids.extend(d)
        else:
            processed_ids.append(d)

    all_ids = set(self.source.distinct(self.source.key, criteria=query))
    self.logger.debug(f"Found {len(all_ids)} total docs in source")

    if self.retry_failed:
        failed_keys = self.target.distinct(
            self._target_keys_field, criteria={"state": "failed", **query}
        )
        unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
        self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
    else:
        unprocessed_ids = all_ids - set(processed_ids)

    self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

    new_ids = set(
        self.source.newer_in(self.target, criteria=query, exhaustive=False)
    )
    self.logger.info(f"Found {len(new_ids)} updated IDs to process")

    return list(new_ids | unprocessed_ids)
get_items(self)
Returns all the items to process.
Returns:

| Type | Description |
| --- | --- |
|  | generator or list of items to process |
Source code in maggma/builders/group_builder.py
def get_items(self):
    self.logger.info("Starting {} Builder".format(self.__class__.__name__))
    self.ensure_indexes()

    keys = self.get_ids_to_process()
    groups = self.get_groups_from_keys(keys)

    if self.projection:
        projection = list(
            set(self.projection + [self.source.key, self.source.last_updated_field])
        )
    else:
        projection = None

    self.total = len(groups)
    for group in groups:
        docs = list(
            self.source.query(
                criteria=dict(zip(self.grouping_keys, group)), properties=projection
            )
        )
        yield docs
prechunk(self, number_splits)
Generic prechunk for group builder to perform domain decomposition by the grouping keys
Source code in maggma/builders/group_builder.py
def prechunk(self, number_splits: int) -> Iterator[Dict]:
    """
    Generic prechunk for group builder to perform domain decomposition
    by the grouping keys
    """
    self.ensure_indexes()
    keys = self.get_ids_to_process()
    groups = self.get_groups_from_keys(keys)
    N = ceil(len(groups) / number_splits)
    for split in grouper(keys, N):
        yield {"query": dict(zip(self.grouping_keys, split))}
process_item(self, item)
Process an item. There should be no database operations in this method. Default behavior is to return the item.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item | List[Dict] |  | required |

Returns:

| Type | Description |
| --- | --- |
| item | an item to update |
Source code in maggma/builders/group_builder.py
def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:  # type: ignore
    keys = list(d[self.source.key] for d in item)

    self.logger.debug("Processing: {}".format(keys))

    time_start = time()

    try:
        with Timeout(seconds=self.timeout):
            processed = self.unary_function(item)
            processed.update({"state": "successful"})
    except Exception as e:
        self.logger.error(traceback.format_exc())
        processed = {"error": str(e), "state": "failed"}

    time_end = time()

    last_updated = [
        self.source._lu_func[0](d[self.source.last_updated_field]) for d in item
    ]

    update_doc = {
        self.target.key: keys[0],
        f"{self.source.key}s": keys,
        self.target.last_updated_field: max(last_updated),
        "_bt": datetime.utcnow(),
    }
    processed.update({k: v for k, v in update_doc.items() if k not in processed})

    if self.store_process_time:
        processed["_process_time"] = time_end - time_start

    return processed
unary_function(self, items)
Processing function for GroupBuilder
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | List[Dict] | list of documents with matching grouping keys | required |

Returns:

| Type | Description |
| --- | --- |
| Dictionary mapping | tuple of source document keys that are in the grouped document to the grouped and processed document |
Source code in maggma/builders/group_builder.py
@abstractmethod
def unary_function(self, items: List[Dict]) -> Dict:
    """
    Processing function for GroupBuilder

    Arguments:
        items: list of documents with matching grouping keys

    Returns:
        Dictionary mapping:
            tuple of source document keys that are in the grouped document
            to the grouped and processed document
    """
update_targets(self, items)
Generic update targets for Group Builder
Source code in maggma/builders/group_builder.py
def update_targets(self, items: List[Dict]):
    """
    Generic update targets for Group Builder
    """
    target = self.target
    for item in items:
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)