Writing a Builder

Builder Architecture

A Builder is a class that inherits from maggma.core.Builder and implements three methods:

  • get_items: This method should return some iterable of items to run through process_item
  • process_item: This method should take a single item, process it, and return the processed item
  • update_targets: This method should take a list of processed items and update the target stores.
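
How the three methods above fit together can be illustrated with a minimal serial driver loop. This sketch does not use maggma at all: `ToyBuilder` and `run_serially` are hypothetical names invented for illustration, and a real runner would additionally handle chunking, parallelism, and store connections.

```python
from typing import Any, Dict, Iterable, List


class ToyBuilder:
    """Hypothetical stand-in exposing the three builder methods (not a maggma class)."""

    def __init__(self, docs: List[Dict[str, Any]]):
        self.docs = docs
        self.written: List[Dict[str, Any]] = []

    def get_items(self) -> Iterable[Dict[str, Any]]:
        # Yield one item at a time to be processed
        yield from self.docs

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        # Build a new document so the source document is left untouched
        return {**item, "a": item["a"] * 2}

    def update_targets(self, items: List[Dict[str, Any]]) -> None:
        # A real builder would write to its target store here
        self.written.extend(items)


def run_serially(builder: ToyBuilder) -> None:
    """Hypothetical driver: get items, process each one, then update the targets."""
    processed = [builder.process_item(item) for item in builder.get_items()]
    builder.update_targets(processed)


builder = ToyBuilder([{"id": 1, "a": 3}, {"id": 2, "a": 5}])
run_serially(builder)
print(builder.written)
```

Because process_item only ever sees one item, it is the step that can be parallelized, while get_items and update_targets handle all communication with the stores.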

To make this less abstract, we will write a builder that multiplies the "a" sub-document by a pre-configured multiplier. Let's assume we have some source collection in MongoDB with documents that look like this:

    {
        "id": 1,
        "a": 3,
        "last_updated": "2019-11-3"
    }

Class definition and __init__

A simple class definition for a Maggma-based builder looks like this:

from maggma.core import Builder
from maggma.core import Store

class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by pre-set value
    """

The __init__ for a builder can have any set of parameters. Generally, you want a source Store and a target Store along with any parameters that configure the builder. Due to the MSONable pattern, any parameters to __init__ have to be stored as attributes. A simple __init__ would look like this:

    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)


Python type annotations provide a convenient way to document the types we expect and to type check them later using mypy. We declared the type of source and target as Store since we only care that they implement that interface. How exactly these Stores operate doesn't concern us here.

Note that the __init__ arguments: source, target, multiplier, and kwargs get saved as attributes:

        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs
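
Why the arguments must be saved under their own names can be sketched without the real MSONable machinery. The snippet below is a simplified stand-in, not the actual implementation: `Configurable`, `to_dict`, and `from_dict` are hypothetical names, and the sketch assumes serialization works by reading one attribute per `__init__` parameter.

```python
import inspect
from typing import Any, Dict


class Configurable:
    """Hypothetical stand-in for an MSONable-style class (not the real implementation)."""

    def __init__(self, multiplier: int = 2, label: str = "default"):
        # Every __init__ argument is stored under the same name
        self.multiplier = multiplier
        self.label = label


def to_dict(obj: Any) -> Dict[str, Any]:
    """Serialize by looking up one attribute per __init__ parameter."""
    params = inspect.signature(type(obj).__init__).parameters
    return {name: getattr(obj, name) for name in params if name != "self"}


def from_dict(cls: type, data: Dict[str, Any]) -> Any:
    """Rebuild the object by passing the saved values back to __init__."""
    return cls(**data)


original = Configurable(multiplier=5, label="demo")
saved = to_dict(original)
restored = from_dict(Configurable, saved)
print(saved)
```

If an argument were stored under a different attribute name, the lookup in `to_dict` would fail and the object could not be rebuilt from its saved form.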

Finally, we want to call the base Builder's __init__ to tell it our sources and targets for this builder. In addition, we pass along any extra parameters that might configure the base builder class.

        super().__init__(sources=source, targets=target, **kwargs)

Calling the parent class __init__ is a good practice as sub-classing builders is a good way to encapsulate complex logic.


get_items

get_items is conceptually a simple method to implement, but in practice can easily be more code than the rest of the builder. All of the logic for getting data from the sources has to happen here, which requires some planning. get_items should also sort all of the data into individual items to process. This simple builder has a very easy get_items:

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """

        return self.source.query()

Here, get_items just returns the results of query() from the store. It could also have been written as a generator:

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        for doc in self.source.query():
            yield doc

We could have also returned a list of items:

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        docs = list(self.source.query())
        return docs

One advantage of the generator approach is that it uses less memory than returning a list of items. For large datasets, returning a list of all items for processing may be prohibitive due to memory constraints.
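
The difference between the two approaches can be seen in a small standalone sketch. `fake_query` is a hypothetical stand-in for a store query that reads documents lazily; it is not part of maggma.

```python
from typing import Dict, Iterator, List

fetched: List[int] = []  # records which documents have been read so far


def fake_query(n: int) -> Iterator[Dict[str, int]]:
    """Hypothetical stand-in for a store query that reads documents lazily."""
    for i in range(n):
        fetched.append(i)
        yield {"id": i, "a": i}


# Generator approach: nothing is read until an item is requested
items = fake_query(1000)
first = next(items)
read_lazily = len(fetched)  # only one document has been read

# List approach: every document is read and held in memory up front
fetched.clear()
all_items = list(fake_query(1000))
read_eagerly = len(fetched)

print(read_lazily, read_eagerly)
```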


process_item

process_item just has to do the parallelizable work on each item. Since the item is whatever comes out of get_items, you know exactly what it should be. It may be a single document, a list of documents, a mapping, a set, etc.

Our simple process_item just has to multiply one field by self.multiplier:

    def process_item(self, item: Dict) -> Dict:
        """
        Multiplies the "a" sub-document by self.multiplier
        """
        new_item = dict(**item)
        new_item["a"] *= self.multiplier
        return new_item
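
Copying the item before modifying it leaves the source document unchanged. The standalone sketch below shows this with `multiply_a`, a hypothetical free-function version of process_item. Note that `dict(**item)` is a shallow copy, so nested values would still be shared with the original.

```python
from typing import Dict


def multiply_a(item: Dict, multiplier: int = 2) -> Dict:
    """Hypothetical free-function version of process_item."""
    new_item = dict(**item)  # shallow copy: top-level keys only
    new_item["a"] *= multiplier
    return new_item


source_doc = {"id": 1, "a": 3, "last_updated": "2019-11-3"}
result = multiply_a(source_doc, multiplier=4)
print(result["a"], source_doc["a"])
```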


update_targets

Finally, we have to put the processed items into the target store:

    def update_targets(self, items: List[Dict]):
        """
        Adds the processed items into the target store
        """
        self.target.update(items)


Note that update_targets takes a List of whatever process_item returns. For instance, if process_item returns str, then update_targets would look like:

    def update_targets(self, items: List[str]):
        ...

Putting it all together we get:

from typing import Dict, Iterable, List
from maggma.core import Builder
from maggma.core import Store

class MultiplyBuilder(Builder):
    """
    Simple builder that multiplies the "a" sub-document by pre-set value
    """

    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        docs = list(self.source.query())
        return docs

    def process_item(self, item: Dict) -> Dict:
        """
        Multiplies the "a" sub-document by self.multiplier
        """
        new_item = dict(**item)
        new_item["a"] *= self.multiplier
        return new_item

    def update_targets(self, items: List[Dict]):
        """
        Adds the processed items into the target store
        """
        self.target.update(items)

Distributed Processing

maggma can distribute a builder across multiple computers.

The Builder must have a prechunk method defined. prechunk should do a subset of get_items to figure out what needs to be processed and then return dictionaries that modify the Builder in-place to only work on each subset.

For example, to distribute the builder above, we would first have to update it so that it can work on a subset of keys. One pattern is to define a generic query argument for the builder and use that in get_items:

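    # Requires: from typing import Dict, Iterable, Optional
    def __init__(self, source: Store, target: Store, multiplier: int = 2, query: Optional[Dict] = None, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
            query: optional criteria restricting which documents are processed
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.query = query
        self.kwargs = kwargs

        super().__init__(sources=source, targets=target, **kwargs)

    def get_items(self) -> Iterable:
        """
        Gets individual documents to multiply
        """
        query = self.query or {}
        docs = list(self.source.query(criteria=query))
        return docs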

Then we can define a prechunk method that modifies the Builder dict in place to operate on just a subset of the keys:

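    from math import ceil
    from maggma.utils import grouper

    def prechunk(self, number_splits: int) -> Iterable[Dict]:
        # Split the distinct keys into number_splits roughly equal chunks
        keys = self.source.distinct(self.source.key)
        N = ceil(len(keys) / number_splits)
        for split in grouper(keys, N):
            yield {
                "query": {self.source.key: {"$in": list(split)}}
            }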

When distributed processing runs, it will update the Builder dictionary in place with each prechunk dictionary. In this case, each builder distributed to a worker will get a modified query parameter that only covers a subset of all possible keys.
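
How the chunk dictionaries are applied can be sketched in plain Python. This is a simplified illustration under stated assumptions, not maggma's distributed runner: `chunk_queries` mirrors the prechunk logic using a local `grouper` stand-in, and `builder_dict` is a hypothetical serialized form of the builder.

```python
from itertools import islice
from math import ceil
from typing import Any, Dict, Iterable, Iterator, List


def grouper(iterable: Iterable, n: int) -> Iterator[List]:
    """Local stand-in: split an iterable into lists of at most n items."""
    iterator = iter(iterable)
    return iter(lambda: list(islice(iterator, n)), [])


def chunk_queries(keys: List[int], key_field: str, number_splits: int) -> Iterator[Dict[str, Any]]:
    """Yield one update dictionary per chunk of keys, mirroring prechunk."""
    chunk_size = ceil(len(keys) / number_splits)
    for split in grouper(keys, chunk_size):
        yield {"query": {key_field: {"$in": list(split)}}}


# Hypothetical serialized builder; each worker receives a copy updated by one chunk
builder_dict = {"multiplier": 2, "query": None}
worker_dicts = []
for chunk in chunk_queries([1, 2, 3, 4, 5], "id", number_splits=2):
    worker_dict = dict(builder_dict)
    worker_dict.update(chunk)
    worker_dicts.append(worker_dict)

for worker_dict in worker_dicts:
    print(worker_dict)
```

Each resulting dictionary keeps the shared settings, such as the multiplier, and differs only in the query that selects its subset of keys.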