Group Builder
Another advanced template in maggma is the GroupBuilder, which groups documents together before applying your function to each group of items. Just like MapBuilder, GroupBuilder also handles incremental building, keeping track of errors, getting only the data you need, and managing timeouts. Unlike MapBuilder, GroupBuilder won't delete orphaned documents: many source documents feed one target document, so there is no reliable way to tell when a target document has been orphaned.
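The grouping step can be pictured with plain Python. The following is only a minimal sketch of the idea, not maggma's implementation: the helper group_by and the sample documents are hypothetical, and a real GroupBuilder reads its documents from a Store rather than from a list.

```python
from collections import defaultdict
from typing import Dict, List


def group_by(docs: List[Dict], key: str) -> Dict[str, List[Dict]]:
    """Collect documents that share the same value for `key` (hypothetical helper)."""
    groups: Dict[str, List[Dict]] = defaultdict(list)
    for doc in docs:
        groups[doc[key]].append(doc)
    return dict(groups)


# Made-up documents, for illustration only
docs = [
    {"name": "Bananas", "type": "fruit"},
    {"name": "Carrots", "type": "vegetable"},
    {"name": "Apples", "type": "fruit"},
]

groups = group_by(docs, "type")

# The user-supplied function then runs once per group, not once per document
counts = {group: len(items) for group, items in groups.items()}
```

The point of the sketch is the shape of the data: the function you write receives a list of documents per group instead of a single document.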
Let's create a simple ResupplyBuilder, which will look at the inventory of items and determine which items need resupply. The source document will look something like this:
{
    "name": "Bananas",
    "type": "fruit",
    "quantity": 20,
    "minimum": 10,
    "last_updated": "2019-11-3T19:09:45"
}
Our builder should give us documents that look like this:
{
    "names": ["Grapes", "Apples", "Bananas"],
    "type": "fruit",
    "resupply": {
        "Apples": 10,
        "Bananas": 0,
        "Grapes": 5
    },
    "last_updated": "2019-11-3T19:09:45"
}
To begin, we define our GroupBuilder:
from typing import Dict, List

from maggma.builders import GroupBuilder
from maggma.core import Store


class ResupplyBuilder(GroupBuilder):
    """
    Simple builder that determines which items to resupply
    """

    def __init__(self, inventory: Store, resupply: Store, resupply_percent: int = 100, **kwargs):
        """
        Arguments:
            inventory: current inventory information
            resupply: target resupply information
            resupply_percent: the percent of the minimum to include in the resupply
        """
        self.inventory = inventory
        self.resupply = resupply
        self.resupply_percent = resupply_percent
        self.kwargs = kwargs

        # Group the inventory documents by their "type" field
        super().__init__(source=inventory, target=resupply, grouping_properties=["type"], **kwargs)
Note that, unlike the previous MapBuilder example, we didn't name the stores source and target. Giving the stores more descriptive names is a good idea when writing builders, because it makes it clearer what the underlying data should look like.
GroupBuilder inherits from MapBuilder, so it has the same configuration parameters:
- query: A query to apply to items in the source Store.
- projection: A list of the fields you want to project. This can reduce the data transfer load if you only need certain fields or sub-documents from the source documents.
- timeout: An optional timeout on the process function.
- store_process_timeout: Adds the process time into the target document for profiling.
- retry_failed: Retries running the process function on previously failed documents.
One parameter that doesn't work in GroupBuilder is delete_orphans, since the many-to-one relationship makes determining orphaned documents very difficult.
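To make the query and projection parameters more concrete, here is a rough plain-Python sketch of what they do to the source documents. It assumes a simplified query that only supports exact matches, whereas real Stores accept MongoDB-style queries; the helper name apply_query_and_projection and the sample inventory are hypothetical.

```python
from typing import Dict, List


def apply_query_and_projection(docs: List[Dict], query: Dict, projection: List[str]) -> List[Dict]:
    """Keep docs whose fields equal every value in `query`, then keep only `projection` fields."""
    matched = [doc for doc in docs if all(doc.get(k) == v for k, v in query.items())]
    return [{field: doc[field] for field in projection if field in doc} for doc in matched]


# Made-up inventory, for illustration only
inventory = [
    {"name": "Bananas", "type": "fruit", "quantity": 20, "minimum": 10},
    {"name": "Carrots", "type": "vegetable", "quantity": 3, "minimum": 8},
]

# Only fruit documents survive the query, and only two fields are transferred
fruit = apply_query_and_projection(inventory, query={"type": "fruit"}, projection=["name", "quantity"])
```

Narrowing the documents and fields this way is what keeps the data transfer small when the source collection is large.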
Finally, let's get to the hard part, which is running our function. We do this by defining unary_function:
    def unary_function(self, items: List[Dict]) -> Dict:
        resupply = {}

        for item in items:
            if item["quantity"] < item["minimum"]:
                # Stock is below the minimum: order resupply_percent percent of the minimum
                resupply[item["name"]] = int(item["minimum"] * self.resupply_percent / 100)
            else:
                # Stock is sufficient: nothing to order
                resupply[item["name"]] = 0
        return {"resupply": resupply}
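To check the resupply logic without setting up any Stores, the calculation can be run as a standalone function. This is a sketch under stated assumptions: compute_resupply is a hypothetical free-function variant of the method, the quantities for Grapes and Apples are made up, and it orders stock only for items whose quantity is below the minimum, which is the behavior the example target document implies.

```python
from typing import Dict, List


def compute_resupply(items: List[Dict], resupply_percent: int = 100) -> Dict:
    """Hypothetical standalone variant of unary_function for one group of items."""
    resupply = {}
    for item in items:
        if item["quantity"] < item["minimum"]:
            # Assumption: order resupply_percent percent of the minimum when stock is low
            resupply[item["name"]] = int(item["minimum"] * resupply_percent / 100)
        else:
            # Enough stock on hand: nothing to order
            resupply[item["name"]] = 0
    return {"resupply": resupply}


# Made-up inventory for a single group ("fruit")
fruit = [
    {"name": "Grapes", "type": "fruit", "quantity": 2, "minimum": 5},
    {"name": "Apples", "type": "fruit", "quantity": 4, "minimum": 10},
    {"name": "Bananas", "type": "fruit", "quantity": 20, "minimum": 10},
]

result = compute_resupply(fruit)
half = compute_resupply(fruit, resupply_percent=50)
```

With the default of 100 percent, each low-stock item is resupplied by its full minimum; lowering resupply_percent scales that amount down, truncated to a whole number.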
Just as in MapBuilder, we're not returning all the extra information typically kept in the original item. Normally, we would have to write code that copies over the source key and converts it to the target key. The same goes for the last_updated_field. GroupBuilder takes care of this, while also recording errors, processing time, and the Builder version. GroupBuilder also keeps a plural version of the source.key field, so in this example, all the name values will be put together and kept in names.
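As a rough picture of that bookkeeping, the sketch below assembles a target document by hand. It is not maggma's actual code: the helper assemble_target_doc, the copying of the grouping field, and the choice of the newest last_updated value are all assumptions made for illustration.

```python
from typing import Dict, List


def assemble_target_doc(items: List[Dict], processed: Dict, group_key: str = "type") -> Dict:
    """Merge the function output with the bookkeeping fields (hypothetical helper)."""
    doc = {
        # Plural version of the source key: every "name" in the group
        "names": [item["name"] for item in items],
        # Assumption: the shared grouping value is copied onto the target document
        group_key: items[0][group_key],
        # Assumption: the group is as fresh as its most recently updated item.
        # Zero-padded ISO 8601 strings sort chronologically, so max() works on them.
        "last_updated": max(item["last_updated"] for item in items),
    }
    doc.update(processed)
    return doc


# Made-up group of source documents
items = [
    {"name": "Grapes", "type": "fruit", "last_updated": "2019-11-01T08:00:00"},
    {"name": "Bananas", "type": "fruit", "last_updated": "2019-11-03T19:09:45"},
]

target = assemble_target_doc(items, {"resupply": {"Grapes": 5, "Bananas": 0}})
```

The function you write only has to produce the resupply part; everything else in the target document comes from the group itself.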