Skip to content

Map Builder

maggma has a built in builder called the MapBuilder which handles a number of tedious tasks in writing a builder. This class is designed to be used similar to a map operator in any other framework in even the map function in python. MapBuilder will take each document in the source store, apply the function you give it, and then store that in the target store. It handles incremental building, keeping track of errors, getting only the data you need, managing timeouts, and deleting orphaned documents through configurational options.

Let's create the same MultiplierBuilder we wrote earlier using MapBuilder:

from maggma.builders import MapBuilder
from maggma.core import Store

class MultiplyBuilder(MapBuilder):
    """
    Simple builder that multiplies the "a" sub-document by pre-set value
    """

Just like before we define a new class, but this time it should inherit from MapBuilder.

    def __init__(self, source: Store, target: Store, multiplier: int = 2, **kwargs):
        """
        Arguments:
            source: the source store
            target: the target store
            multiplier: the multiplier to apply to "a" sub-document
        """
        self.source = source
        self.target = target
        self.multiplier = multiplier
        self.kwargs = kwargs

        kwargs = {k,v in kwargs.items() if k not in ["projection","delete_orphans","timeout","store_process_time","retry_failed"]}

        super().__init__(source=source,
                         target=target,
                         projection=["a"],
                         delete_orphans=False,
                         timeout=10,
                         store_process_time=True,
                         retry_failed=True,
                         **kwargs)

MapBuilder has a number of configurational options that you can hardcode as above or expose as properties for the user through **kwargs:

  • projection: list of the fields you want to project. This can reduce the data transfer load if you only need certain fields or sub-documents from the source documents
  • delete_orphans: this will delete documents in the target which don't have a corresponding document in the source
  • timeout: optional timeout on the process function
  • store_process_timeout: adds the process time into the target document for profiling
  • retry_failed: retries running the process function on previously failed documents

Finally let's get to the hard part which is running our function. We do this by defining unary_function

    def unary_function(self,item):
        return {"a": item["a"] * self.multiplier}

Note that we're not returning all the extra information typically kept in the originally item. Normally, we would have to write code that copies over the source key and convert it to the target key. Same goes for the last_updated_field. MapBuilder takes care of this, while also recording errors, processing time, and the Builder version.