Builders

One-to-One Map Builder and a simple CopyBuilder implementation

CopyBuilder (MapBuilder)

Sync a source store with a target store.

Source code in maggma/builders/map_builder.py
class CopyBuilder(MapBuilder):
    """Sync a source store with a target store."""

    def unary_function(self, item):
        """
        Identity function for copy builder map operation
        """
        if "_id" in item:
            del item["_id"]
        return item

unary_function(self, item)

Identity function for copy builder map operation

Source code in maggma/builders/map_builder.py
def unary_function(self, item):
    """
    Identity function for copy builder map operation
    """
    if "_id" in item:
        del item["_id"]
    return item
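
A minimal usage sketch, assuming two in-memory stores (the store names, key field, and documents below are illustrative only):

from datetime import datetime

from maggma.builders import CopyBuilder
from maggma.stores import MemoryStore

# Two hypothetical stores sharing a key and a last-updated field.
source = MemoryStore("source_tasks", key="task_id", last_updated_field="last_updated")
target = MemoryStore("target_tasks", key="task_id", last_updated_field="last_updated")
source.connect()
target.connect()

# Seed the source with documents to copy.
source.update(
    [
        {"task_id": 1, "energy": -1.5, "last_updated": datetime.utcnow()},
        {"task_id": 2, "energy": -2.0, "last_updated": datetime.utcnow()},
    ]
)

# CopyBuilder's unary_function is the identity, so the target receives the
# same documents (minus any "_id"), plus the builder's bookkeeping fields.
builder = CopyBuilder(source=source, target=target)
builder.run()

print(list(target.query()))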

MapBuilder (Builder)

Apply a unary function to yield a target document for each source document.

Supports incremental building, where a source document gets built only if it has newer (by last_updated_field) data than the corresponding (by key) target document.

Source code in maggma/builders/map_builder.py
class MapBuilder(Builder, metaclass=ABCMeta):
    """
    Apply a unary function to yield a target document for each source document.

    Supports incremental building, where a source document gets built only if it
    has newer (by last_updated_field) data than the corresponding (by key) target
    document.

    """

    def __init__(
        self,
        source: Store,
        target: Store,
        query: Optional[Dict] = None,
        projection: Optional[List] = None,
        delete_orphans: bool = False,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Apply a unary function to each source document.

        Args:
            source: source store
            target: target store
            query: optional query to filter source store
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            delete_orphans: Whether to delete documents on target store
                with key values not present in source store. Deletion happens
                after all updates, during Builder.finalize.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed
        """
        self.source = source
        self.target = target
        self.query = query
        self.projection = projection
        self.delete_orphans = delete_orphans
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed
        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for MapBuilder
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
        ]

        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[Dict]:
        """
        Generic prechunk for map builder to perform domain-decomposition
        by the key field
        """
        self.ensure_indexes()
        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)

        N = ceil(len(keys) / number_splits)
        for split in grouper(keys, N):
            yield {"query": {self.source.key: {"$in": list(split)}}}

    def get_items(self):
        """
        Generic get items for Map Builder designed to perform
        incremental building
        """

        self.logger.info("Starting {} Builder".format(self.__class__.__name__))

        self.ensure_indexes()

        keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
        if self.retry_failed:
            if isinstance(self.query, (dict)):
                failed_query = {"$and": [self.query, {"state": "failed"}]}
            else:
                failed_query = {"state": "failed"}
            failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
            keys = list(set(keys + failed_keys))

        self.logger.info("Processing {} items".format(len(keys)))

        if self.projection:
            projection = list(
                set(self.projection + [self.source.key, self.source.last_updated_field])
            )
        else:
            projection = None

        self.total = len(keys)
        for chunked_keys in grouper(keys, self.chunk_size):
            chunked_keys = list(chunked_keys)
            for doc in list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=projection,
                )
            ):
                yield doc

    def process_item(self, item: Dict):
        """
        Generic process items to process a dictionary using
        a map function
        """

        self.logger.debug("Processing: {}".format(item[self.source.key]))

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = dict(self.unary_function(item))
                processed.update({"state": "successful"})

            for k in [self.source.key, self.source.last_updated_field]:
                if k in processed:
                    del processed[k]

        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        key, last_updated_field = self.source.key, self.source.last_updated_field

        out = {
            self.target.key: item[key],
            self.target.last_updated_field: self.source._lu_func[0](
                item.get(last_updated_field, datetime.utcnow())
            ),
        }

        if self.store_process_time:
            out["_process_time"] = time_end - time_start

        out.update(processed)
        return out

    def update_targets(self, items: List[Dict]):
        """
        Generic update targets for Map Builder
        """
        target = self.target
        for item in items:
            item["_bt"] = datetime.utcnow()
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    def finalize(self):
        """
        Finalize MapBuilder operations including removing orphaned documents
        """
        if self.delete_orphans:
            source_keyvals = set(self.source.distinct(self.source.key))
            target_keyvals = set(self.target.distinct(self.target.key))
            to_delete = list(target_keyvals - source_keyvals)
            if len(to_delete):
                self.logger.info(
                    "Finalize: Deleting {} orphans.".format(len(to_delete))
                )
            self.target.remove_docs({self.target.key: {"$in": to_delete}})
        super().finalize()

    @abstractmethod
    def unary_function(self, item):
        """
        ufn: Unary function to process item
                You do not need to provide values for
                source.key and source.last_updated_field in the output.
                Any uncaught exceptions will be caught by
                process_item and logged to the "error" field
                in the target document.
        """
        pass
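
A sketch of a concrete subclass: only unary_function needs to be implemented. The field names ("energy", "nsites", "energy_per_atom") and store names here are hypothetical, not part of maggma:

from maggma.builders import MapBuilder
from maggma.stores import MemoryStore


class EnergyPerAtomBuilder(MapBuilder):
    """Hypothetical builder deriving one new field per source document."""

    def unary_function(self, item):
        # Return only the new fields; MapBuilder fills in the key,
        # last_updated, "state", and (optionally) "_process_time" fields.
        return {"energy_per_atom": item["energy"] / item["nsites"]}


source = MemoryStore("tasks", key="task_id")
target = MemoryStore("energy_per_atom", key="task_id")
builder = EnergyPerAtomBuilder(source, target, projection=["energy", "nsites"])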

__init__(self, source, target, query=None, projection=None, delete_orphans=False, timeout=0, store_process_time=True, retry_failed=False, **kwargs) special

Apply a unary function to each source document.

Parameters:

    source (Store, required): source store
    target (Store, required): target store
    query (Optional[Dict], default None): optional query to filter source store
    projection (Optional[List], default None): list of keys to project from the source for processing. Limits data transfer to improve efficiency.
    delete_orphans (bool, default False): whether to delete documents on target store with key values not present in source store. Deletion happens after all updates, during Builder.finalize.
    timeout (int, default 0): maximum running time per item in seconds
    store_process_time (bool, default True): if True, add "_process_time" key to document for profiling purposes
    retry_failed (bool, default False): if True, will retry building documents that previously failed
Source code in maggma/builders/map_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    query: Optional[Dict] = None,
    projection: Optional[List] = None,
    delete_orphans: bool = False,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Apply a unary function to each source document.

    Args:
        source: source store
        target: target store
        query: optional query to filter source store
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        delete_orphans: Whether to delete documents on target store
            with key values not present in source store. Deletion happens
            after all updates, during Builder.finalize.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed
    """
    self.source = source
    self.target = target
    self.query = query
    self.projection = projection
    self.delete_orphans = delete_orphans
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed
    super().__init__(sources=[source], targets=[target], **kwargs)
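
For illustration, the optional arguments can be combined as follows; the "deprecated" filter field is a hypothetical source field, everything else is the API documented above:

from maggma.builders import CopyBuilder
from maggma.stores import MemoryStore

source = MemoryStore("tasks", key="task_id")
target = MemoryStore("tasks_copy", key="task_id")

builder = CopyBuilder(
    source,
    target,
    query={"deprecated": False},  # hypothetical filter on the source store
    delete_orphans=True,          # drop target docs whose keys left the source
    timeout=60,                   # abort any single item after 60 seconds
    retry_failed=True,            # rebuild docs previously marked state="failed"
)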

ensure_indexes(self)

Ensures indices on critical fields for MapBuilder

Source code in maggma/builders/map_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for MapBuilder
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
    ]

    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )

finalize(self)

Finalize MapBuilder operations including removing orphaned documents

Source code in maggma/builders/map_builder.py
def finalize(self):
    """
    Finalize MapBuilder operations including removing orphaned documents
    """
    if self.delete_orphans:
        source_keyvals = set(self.source.distinct(self.source.key))
        target_keyvals = set(self.target.distinct(self.target.key))
        to_delete = list(target_keyvals - source_keyvals)
        if len(to_delete):
            self.logger.info(
                "Finalize: Deleting {} orphans.".format(len(to_delete))
            )
        self.target.remove_docs({self.target.key: {"$in": to_delete}})
    super().finalize()

get_items(self)

Generic get items for Map Builder designed to perform incremental building

Source code in maggma/builders/map_builder.py
def get_items(self):
    """
    Generic get items for Map Builder designed to perform
    incremental building
    """

    self.logger.info("Starting {} Builder".format(self.__class__.__name__))

    self.ensure_indexes()

    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)
    if self.retry_failed:
        if isinstance(self.query, (dict)):
            failed_query = {"$and": [self.query, {"state": "failed"}]}
        else:
            failed_query = {"state": "failed"}
        failed_keys = self.target.distinct(self.target.key, criteria=failed_query)
        keys = list(set(keys + failed_keys))

    self.logger.info("Processing {} items".format(len(keys)))

    if self.projection:
        projection = list(
            set(self.projection + [self.source.key, self.source.last_updated_field])
        )
    else:
        projection = None

    self.total = len(keys)
    for chunked_keys in grouper(keys, self.chunk_size):
        chunked_keys = list(chunked_keys)
        for doc in list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=projection,
            )
        ):
            yield doc

prechunk(self, number_splits)

Generic prechunk for map builder to perform domain-decomposition by the key field

Source code in maggma/builders/map_builder.py
def prechunk(self, number_splits: int) -> Iterator[Dict]:
    """
    Generic prechunk for map builder to perform domain-decomposition
    by the key field
    """
    self.ensure_indexes()
    keys = self.target.newer_in(self.source, criteria=self.query, exhaustive=True)

    N = ceil(len(keys) / number_splits)
    for split in grouper(keys, N):
        yield {"query": {self.source.key: {"$in": list(split)}}}

process_item(self, item)

Generic process_item for MapBuilder: processes a dictionary using the map (unary) function

Source code in maggma/builders/map_builder.py
def process_item(self, item: Dict):
    """
    Generic process items to process a dictionary using
    a map function
    """

    self.logger.debug("Processing: {}".format(item[self.source.key]))

    time_start = time()

    try:
        with Timeout(seconds=self.timeout):
            processed = dict(self.unary_function(item))
            processed.update({"state": "successful"})

        for k in [self.source.key, self.source.last_updated_field]:
            if k in processed:
                del processed[k]

    except Exception as e:
        self.logger.error(traceback.format_exc())
        processed = {"error": str(e), "state": "failed"}

    time_end = time()

    key, last_updated_field = self.source.key, self.source.last_updated_field

    out = {
        self.target.key: item[key],
        self.target.last_updated_field: self.source._lu_func[0](
            item.get(last_updated_field, datetime.utcnow())
        ),
    }

    if self.store_process_time:
        out["_process_time"] = time_end - time_start

    out.update(processed)
    return out
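
For orientation, a successful item yields a target document shaped roughly like this; the non-bookkeeping field names are illustrative:

from datetime import datetime

processed_doc = {
    "task_id": 42,                          # target.key, copied from the source doc
    "last_updated": datetime(2019, 11, 1),  # source value parsed via _lu_func
    "state": "successful",                  # or "failed", together with an "error" message
    "_process_time": 0.002,                 # only present when store_process_time=True
    "energy_per_atom": -1.23,               # whatever unary_function returned
}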

unary_function(self, item)

Unary function to process item

You do not need to provide values for source.key and source.last_updated_field in the output. Any uncaught exceptions will be caught by process_item and logged to the "error" field in the target document.

Source code in maggma/builders/map_builder.py
@abstractmethod
def unary_function(self, item):
    """
    ufn: Unary function to process item
            You do not need to provide values for
            source.key and source.last_updated_field in the output.
            Any uncaught exceptions will be caught by
            process_item and logged to the "error" field
            in the target document.
    """
    pass

update_targets(self, items)

Generic update targets for Map Builder

Source code in maggma/builders/map_builder.py
def update_targets(self, items: List[Dict]):
    """
    Generic update targets for Map Builder
    """
    target = self.target
    for item in items:
        item["_bt"] = datetime.utcnow()
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)

Many-to-Many GroupBuilder

GroupBuilder (Builder)

Groups source docs and produces merged documents for each group. Supports incremental building, where a source group gets (re)built only if it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.

This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't determine when a source document is orphaned.

Source code in maggma/builders/group_builder.py
class GroupBuilder(Builder, metaclass=ABCMeta):
    """
    Groups source docs and produces merged documents for each group.
    Supports incremental building, where a source group gets (re)built only if
    it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.

    This is a Many-to-One or Many-to-Many Builder. As a result, this builder can't determine when a source document
    is orphaned.
    """

    def __init__(
        self,
        source: Store,
        target: Store,
        grouping_keys: List[str],
        query: Optional[Dict] = None,
        projection: Optional[List] = None,
        timeout: int = 0,
        store_process_time: bool = True,
        retry_failed: bool = False,
        **kwargs,
    ):
        """
        Args:
            source: source store
            target: target store
            grouping_keys: list of keys used to group source documents
            query: optional query to filter source store
            projection: list of keys to project from the source for
                processing. Limits data transfer to improve efficiency.
            timeout: maximum running time per item in seconds
            store_process_time: If True, add "_process_time" key to
                document for profiling purposes
            retry_failed: If True, will retry building documents that
                previously failed
        """
        self.source = source
        self.target = target
        self.grouping_keys = grouping_keys
        self.query = query
        self.projection = projection
        self.kwargs = kwargs
        self.timeout = timeout
        self.store_process_time = store_process_time
        self.retry_failed = retry_failed

        self._target_keys_field = f"{self.source.key}s"

        super().__init__(sources=[source], targets=[target], **kwargs)

    def ensure_indexes(self):
        """
        Ensures indices on critical fields for GroupBuilder
        which include the plural version of the target's key field
        """
        index_checks = [
            self.source.ensure_index(self.source.key),
            self.source.ensure_index(self.source.last_updated_field),
            self.target.ensure_index(self.target.key),
            self.target.ensure_index(self.target.last_updated_field),
            self.target.ensure_index("state"),
            self.target.ensure_index(self._target_keys_field),
        ]

        if not all(index_checks):
            self.logger.warning(
                "Missing one or more important indices on stores. "
                "Performance for large stores may be severely degraded. "
                "Ensure indices on target.key and "
                "[(store.last_updated_field, -1), (store.key, 1)] "
                "for each of source and target."
            )

    def prechunk(self, number_splits: int) -> Iterator[Dict]:
        """
        Generic prechunk for group builder to perform domain-decomposition
        by the grouping keys
        """
        self.ensure_indexes()

        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)

        N = ceil(len(groups) / number_splits)
        for split in grouper(keys, N):
            yield {"query": dict(zip(self.grouping_keys, split))}

    def get_items(self):

        self.logger.info("Starting {} Builder".format(self.__class__.__name__))

        self.ensure_indexes()
        keys = self.get_ids_to_process()
        groups = self.get_groups_from_keys(keys)

        if self.projection:
            projection = list(
                set(self.projection + [self.source.key, self.source.last_updated_field])
            )
        else:
            projection = None

        self.total = len(groups)
        for group in groups:
            docs = list(
                self.source.query(
                    criteria=dict(zip(self.grouping_keys, group)), properties=projection
                )
            )
            yield docs

    def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:  # type: ignore

        keys = list(d[self.source.key] for d in item)

        self.logger.debug("Processing: {}".format(keys))

        time_start = time()

        try:
            with Timeout(seconds=self.timeout):
                processed = self.unary_function(item)
                processed.update({"state": "successful"})
        except Exception as e:
            self.logger.error(traceback.format_exc())
            processed = {"error": str(e), "state": "failed"}

        time_end = time()

        last_updated = [
            self.source._lu_func[0](d[self.source.last_updated_field]) for d in item
        ]

        update_doc = {
            self.target.key: keys[0],
            f"{self.source.key}s": keys,
            self.target.last_updated_field: max(last_updated),
            "_bt": datetime.utcnow(),
        }
        processed.update({k: v for k, v in update_doc.items() if k not in processed})

        if self.store_process_time:
            processed["_process_time"] = time_end - time_start

        return processed

    def update_targets(self, items: List[Dict]):
        """
        Generic update targets for Group Builder
        """
        target = self.target
        for item in items:
            if "_id" in item:
                del item["_id"]

        if len(items) > 0:
            target.update(items)

    @abstractmethod
    def unary_function(self, items: List[Dict]) -> Dict:
        """
        Processing function for GroupBuilder

        Arguments:
            items: list of documents with matching grouping keys

        Returns:
            the processed document for the group; process_item adds the
            target key, the list of source keys, the last_updated field,
            and the "state" field after this function returns
        """

    def get_ids_to_process(self) -> Iterable:
        """
        Gets the IDs that need to be processed
        """

        query = self.query or {}

        distinct_from_target = list(
            self.target.distinct(self._target_keys_field, criteria=query)
        )
        processed_ids = []
        # Not always guaranteed that MongoDB will unpack the list, so we
        # have to make sure we do that
        for d in distinct_from_target:
            if isinstance(d, list):
                processed_ids.extend(d)
            else:
                processed_ids.append(d)

        all_ids = set(self.source.distinct(self.source.key, criteria=query))
        self.logger.debug(f"Found {len(all_ids)} total docs in source")

        if self.retry_failed:
            failed_keys = self.target.distinct(
                self._target_keys_field, criteria={"state": "failed", **query}
            )
            unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
            self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
        else:
            unprocessed_ids = all_ids - set(processed_ids)

        self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

        new_ids = set(
            self.source.newer_in(self.target, criteria=query, exhaustive=False)
        )

        self.logger.info(f"Found {len(new_ids)} updated IDs to process")
        return list(new_ids | unprocessed_ids)

    def get_groups_from_keys(self, keys) -> Set[Tuple]:
        """
        Get the groups by grouping_keys for these documents
        """

        grouping_keys = self.grouping_keys

        groups: Set[Tuple] = set()

        for chunked_keys in grouper(keys, self.chunk_size):
            docs = list(
                self.source.query(
                    criteria={self.source.key: {"$in": chunked_keys}},
                    properties=grouping_keys,
                )
            )

            sub_groups = set(
                tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
            )
            self.logger.debug(f"Found {len(sub_groups)} subgroups to process")

            groups |= sub_groups

        self.logger.info(f"Found {len(groups)} groups to process")
        return groups
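
A sketch of a concrete subclass: unary_function merges one group of source documents. The "formula"/"energy" fields and store names are hypothetical:

from maggma.builders.group_builder import GroupBuilder
from maggma.stores import MemoryStore


class LowestEnergyBuilder(GroupBuilder):
    """Hypothetical builder keeping the lowest-energy doc per formula."""

    def unary_function(self, items):
        # items is the list of source documents sharing one grouping-key tuple
        best = min(items, key=lambda doc: doc["energy"])
        return {"formula": best["formula"], "lowest_energy": best["energy"]}


source = MemoryStore("tasks", key="task_id")
target = MemoryStore("lowest_energy", key="task_id")
builder = LowestEnergyBuilder(source, target, grouping_keys=["formula"])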

__init__(self, source, target, grouping_keys, query=None, projection=None, timeout=0, store_process_time=True, retry_failed=False, **kwargs) special

Parameters:

    source (Store, required): source store
    target (Store, required): target store
    grouping_keys (List[str], required): list of keys used to group source documents
    query (Optional[Dict], default None): optional query to filter source store
    projection (Optional[List], default None): list of keys to project from the source for processing. Limits data transfer to improve efficiency.
    timeout (int, default 0): maximum running time per item in seconds
    store_process_time (bool, default True): if True, add "_process_time" key to document for profiling purposes
    retry_failed (bool, default False): if True, will retry building documents that previously failed
Source code in maggma/builders/group_builder.py
def __init__(
    self,
    source: Store,
    target: Store,
    grouping_keys: List[str],
    query: Optional[Dict] = None,
    projection: Optional[List] = None,
    timeout: int = 0,
    store_process_time: bool = True,
    retry_failed: bool = False,
    **kwargs,
):
    """
    Args:
        source: source store
        target: target store
        grouping_keys: list of keys used to group source documents
        query: optional query to filter source store
        projection: list of keys to project from the source for
            processing. Limits data transfer to improve efficiency.
        timeout: maximum running time per item in seconds
        store_process_time: If True, add "_process_time" key to
            document for profiling purposes
        retry_failed: If True, will retry building documents that
            previously failed
    """
    self.source = source
    self.target = target
    self.grouping_keys = grouping_keys
    self.query = query
    self.projection = projection
    self.kwargs = kwargs
    self.timeout = timeout
    self.store_process_time = store_process_time
    self.retry_failed = retry_failed

    self._target_keys_field = f"{self.source.key}s"

    super().__init__(sources=[source], targets=[target], **kwargs)

ensure_indexes(self)

Ensures indices on critical fields for GroupBuilder, which include the plural version of the target's key field

Source code in maggma/builders/group_builder.py
def ensure_indexes(self):
    """
    Ensures indices on critical fields for GroupBuilder
    which include the plural version of the target's key field
    """
    index_checks = [
        self.source.ensure_index(self.source.key),
        self.source.ensure_index(self.source.last_updated_field),
        self.target.ensure_index(self.target.key),
        self.target.ensure_index(self.target.last_updated_field),
        self.target.ensure_index("state"),
        self.target.ensure_index(self._target_keys_field),
    ]

    if not all(index_checks):
        self.logger.warning(
            "Missing one or more important indices on stores. "
            "Performance for large stores may be severely degraded. "
            "Ensure indices on target.key and "
            "[(store.last_updated_field, -1), (store.key, 1)] "
            "for each of source and target."
        )

get_groups_from_keys(self, keys)

Get the groups by grouping_keys for these documents

Source code in maggma/builders/group_builder.py
def get_groups_from_keys(self, keys) -> Set[Tuple]:
    """
    Get the groups by grouping_keys for these documents
    """

    grouping_keys = self.grouping_keys

    groups: Set[Tuple] = set()

    for chunked_keys in grouper(keys, self.chunk_size):
        docs = list(
            self.source.query(
                criteria={self.source.key: {"$in": chunked_keys}},
                properties=grouping_keys,
            )
        )

        sub_groups = set(
            tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
        )
        self.logger.debug(f"Found {len(sub_groups)} subgroups to process")

        groups |= sub_groups

    self.logger.info(f"Found {len(groups)} groups to process")
    return groups

get_ids_to_process(self)

Gets the IDs that need to be processed

Source code in maggma/builders/group_builder.py
def get_ids_to_process(self) -> Iterable:
    """
    Gets the IDs that need to be processed
    """

    query = self.query or {}

    distinct_from_target = list(
        self.target.distinct(self._target_keys_field, criteria=query)
    )
    processed_ids = []
    # Not always guaranteed that MongoDB will unpack the list, so we
    # have to make sure we do that
    for d in distinct_from_target:
        if isinstance(d, list):
            processed_ids.extend(d)
        else:
            processed_ids.append(d)

    all_ids = set(self.source.distinct(self.source.key, criteria=query))
    self.logger.debug(f"Found {len(all_ids)} total docs in source")

    if self.retry_failed:
        failed_keys = self.target.distinct(
            self._target_keys_field, criteria={"state": "failed", **query}
        )
        unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))
        self.logger.debug(f"Found {len(failed_keys)} failed IDs in target")
    else:
        unprocessed_ids = all_ids - set(processed_ids)

    self.logger.info(f"Found {len(unprocessed_ids)} IDs to process")

    new_ids = set(
        self.source.newer_in(self.target, criteria=query, exhaustive=False)
    )

    self.logger.info(f"Found {len(new_ids)} updated IDs to process")
    return list(new_ids | unprocessed_ids)
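
A tiny worked example of the ID bookkeeping above, with illustrative values:

all_ids = {1, 2, 3, 4}     # every source key matching the query
processed_ids = [1, 2, 3]  # keys already referenced by target documents
failed_keys = [2]          # keys of target docs with state == "failed"

# With retry_failed=True, failed keys count as unprocessed again:
unprocessed_ids = all_ids - (set(processed_ids) - set(failed_keys))  # {2, 4}

new_ids = {3}              # keys whose source doc is newer than its target doc
to_process = new_ids | unprocessed_ids                                # {2, 3, 4}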

get_items(self)

Gets the items to process for the GroupBuilder: one list of source documents is yielded per group of grouping-key values that needs building.

Returns:

    generator of lists of source documents, one list per group

Source code in maggma/builders/group_builder.py
def get_items(self):

    self.logger.info("Starting {} Builder".format(self.__class__.__name__))

    self.ensure_indexes()
    keys = self.get_ids_to_process()
    groups = self.get_groups_from_keys(keys)

    if self.projection:
        projection = list(
            set(self.projection + [self.source.key, self.source.last_updated_field])
        )
    else:
        projection = None

    self.total = len(groups)
    for group in groups:
        docs = list(
            self.source.query(
                criteria=dict(zip(self.grouping_keys, group)), properties=projection
            )
        )
        yield docs

prechunk(self, number_splits)

Generic prechunk for group builder to perform domain-decomposition by the grouping keys

Source code in maggma/builders/group_builder.py
def prechunk(self, number_splits: int) -> Iterator[Dict]:
    """
    Generic prechunk for group builder to perform domain-decomposition
    by the grouping keys
    """
    self.ensure_indexes()

    keys = self.get_ids_to_process()
    groups = self.get_groups_from_keys(keys)

    N = ceil(len(groups) / number_splits)
    for split in grouper(keys, N):
        yield {"query": dict(zip(self.grouping_keys, split))}

process_item(self, item)

Processes one group of source documents with unary_function, adding run state, timing, and provenance fields. There should be no database operations in this method.

Parameters:

    item (List[Dict], required): the list of source documents for one group

Returns:

    the processed group document to update the target with

Source code in maggma/builders/group_builder.py
def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:  # type: ignore

    keys = list(d[self.source.key] for d in item)

    self.logger.debug("Processing: {}".format(keys))

    time_start = time()

    try:
        with Timeout(seconds=self.timeout):
            processed = self.unary_function(item)
            processed.update({"state": "successful"})
    except Exception as e:
        self.logger.error(traceback.format_exc())
        processed = {"error": str(e), "state": "failed"}

    time_end = time()

    last_updated = [
        self.source._lu_func[0](d[self.source.last_updated_field]) for d in item
    ]

    update_doc = {
        self.target.key: keys[0],
        f"{self.source.key}s": keys,
        self.target.last_updated_field: max(last_updated),
        "_bt": datetime.utcnow(),
    }
    processed.update({k: v for k, v in update_doc.items() if k not in processed})

    if self.store_process_time:
        processed["_process_time"] = time_end - time_start

    return processed
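
The resulting group document looks roughly like this; the merged fields come from unary_function, and the non-bookkeeping names are illustrative:

from datetime import datetime

group_doc = {
    "task_id": 17,                          # target.key: the first key in the group
    "task_ids": [17, 18, 21],               # f"{source.key}s": all keys in the group
    "last_updated": datetime(2019, 11, 1),  # max last_updated over the group
    "_bt": datetime.utcnow(),               # build timestamp
    "state": "successful",                  # or "failed", together with an "error" message
    "_process_time": 0.004,                 # only present when store_process_time=True
    "lowest_energy": -2.0,                  # whatever unary_function returned
}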

unary_function(self, items)

Processing function for GroupBuilder

Parameters:

Name Type Description Default
items List[Dict]

list of documents with matching grouping keys

required

Returns:

    the processed document for the group; process_item adds the target key, the list of source keys, the last_updated field, and the "state" field after this function returns

Source code in maggma/builders/group_builder.py
@abstractmethod
def unary_function(self, items: List[Dict]) -> Dict:
    """
    Processing function for GroupBuilder

    Arguments:
        items: list of documents with matching grouping keys

    Returns:
        the processed document for the group; process_item adds the
        target key, the list of source keys, the last_updated field,
        and the "state" field after this function returns
    """

update_targets(self, items)

Generic update targets for Group Builder

Source code in maggma/builders/group_builder.py
def update_targets(self, items: List[Dict]):
    """
    Generic update targets for Group Builder
    """
    target = self.target
    for item in items:
        if "_id" in item:
            del item["_id"]

    if len(items) > 0:
        target.update(items)