Builder

Module containing the core builder definition

Builder (MSONable)

Base Builder class. At a minimum, a subclass should implement: get_items - get items from the sources; update_targets - update the targets with the results.

Multiprocessing and MPI processing can be used as long as all of the data processing is limited to process_item.
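
For orientation, here is a minimal sketch of a hypothetical subclass that satisfies this contract; the MultiplyBuilder name, the "number" field, and the one-source/one-target layout are illustrative assumptions, not part of this module.

from maggma.core import Builder, Store


class MultiplyBuilder(Builder):
    """Hypothetical builder that doubles the "number" field of every source document."""

    def __init__(self, source: Store, target: Store, **kwargs):
        self.source = source
        self.target = target
        super().__init__(sources=[source], targets=[target], **kwargs)

    def get_items(self):
        # Pull every document from the source store (a real builder would
        # usually restrict this to new or updated documents).
        return self.source.query()

    def process_item(self, item):
        # Pure computation only; no database operations here.
        item["number"] = item.get("number", 0) * 2
        return item

    def update_targets(self, items):
        # Write the processed chunk to the target store.
        self.target.update(items)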

Source code in maggma/core/builder.py
class Builder(MSONable, metaclass=ABCMeta):
    """
    Base Builder class
    At minimum this class should implement:
    get_items - Get items from the sources
    update_targets - Updates the targets with results

    Multiprocessing and MPI processing can be used if all
    the data processing is limited to process_item
    """

    def __init__(
        self,
        sources: Union[List[Store], Store],
        targets: Union[List[Store], Store],
        chunk_size: int = 1000,
    ):
        """
        Initialize the builder.

        Arguments:
            sources: source Store(s)
            targets: target Store(s)
            chunk_size: chunk size for processing
        """
        self.sources = sources if isinstance(sources, list) else [sources]
        self.targets = targets if isinstance(targets, list) else [targets]
        self.chunk_size = chunk_size
        self.total = None  # type: Optional[int]
        self.logger = logging.getLogger(type(self).__name__)
        self.logger.addHandler(logging.NullHandler())

    def connect(self):
        """
        Connect to the builder sources and targets.
        """
        for s in self.sources + self.targets:
            s.connect()

    def prechunk(self, number_splits: int) -> Iterable[Dict]:
        """
        Part of a domain-decomposition paradigm to allow the builder to operate on
        multiple nodes by dividing up the IO as well as the compute.
        This function should return an iterator of dictionaries that can be distributed
        to multiple instances of the builder to get/process/update on.

        Arguments:
            number_splits: The number of groups to split the documents to work on
        """
        self.logger.info(
            f"{self.__class__.__name__} doesn't have distributed processing capabillities."
            " Instead this builder will run on just one worker for all processing"
        )
        raise NotImplementedError(
            f"{self.__class__.__name__} doesn't have distributed processing capabillities."
            " Instead this builder will run on just one worker for all processing"
        )

    @abstractmethod
    def get_items(self) -> Iterable:
        """
        Returns all the items to process.

        Returns:
            generator or list of items to process
        """
        pass

    def process_item(self, item: Any) -> Any:
        """
        Process an item. There should be no database operations in this method.
        Default behavior is to return the item.
        Arguments:
            item:

        Returns:
           item: an item to update
        """
        return item

    @abstractmethod
    def update_targets(self, items: List):
        """
        Takes a list of items from process_item and updates the targets with them.
        Can also perform other bookkeeping in the process, such as storing GridFS ObjectIds.

        Arguments:
            items:

        Returns:

        """
        pass

    def finalize(self):
        """
        Perform any final clean up.
        """
        # Close any Mongo connections.
        for store in self.sources + self.targets:
            try:
                store.close()
            except AttributeError:
                continue

    def run(self, log_level=logging.DEBUG):
        """
        Run the builder serially
        This is only intended for diagnostic purposes
        """
        # Set up logging
        root = logging.getLogger()
        root.setLevel(log_level)
        ch = TqdmLoggingHandler()
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        ch.setFormatter(formatter)
        root.addHandler(ch)

        self.connect()

        cursor = self.get_items()

        for chunk in grouper(tqdm(cursor), self.chunk_size):
            self.logger.info("Processing batch of {} items".format(self.chunk_size))
            processed_chunk = [self.process_item(item) for item in chunk]
            processed_items = [item for item in processed_chunk if item is not None]
            self.update_targets(processed_items)

        self.finalize()

    def __getstate__(self):
        return self.as_dict()

    def __setstate__(self, d):
        d = {k: v for k, v in d.items() if not k.startswith("@")}
        d = MontyDecoder().process_decoded(d)
        self.__init__(**d)

__init__(self, sources, targets, chunk_size=1000) special

Initialize the builder.

Parameters:

    sources (Union[List[maggma.core.store.Store], maggma.core.store.Store]): source Store(s). Required.
    targets (Union[List[maggma.core.store.Store], maggma.core.store.Store]): target Store(s). Required.
    chunk_size (int): chunk size for processing. Default: 1000.
Source code in maggma/core/builder.py
def __init__(
    self,
    sources: Union[List[Store], Store],
    targets: Union[List[Store], Store],
    chunk_size: int = 1000,
):
    """
    Initialize the builder.

    Arguments:
        sources: source Store(s)
        targets: target Store(s)
        chunk_size: chunk size for processing
    """
    self.sources = sources if isinstance(sources, list) else [sources]
    self.targets = targets if isinstance(targets, list) else [targets]
    self.chunk_size = chunk_size
    self.total = None  # type: Optional[int]
    self.logger = logging.getLogger(type(self).__name__)
    self.logger.addHandler(logging.NullHandler())

connect(self)

Connect to the builder sources and targets.

Source code in maggma/core/builder.py
def connect(self):
    """
    Connect to the builder sources and targets.
    """
    for s in self.sources + self.targets:
        s.connect()

finalize(self)

Perform any final clean up.

Source code in maggma/core/builder.py
def finalize(self):
    """
    Perform any final clean up.
    """
    # Close any Mongo connections.
    for store in self.sources + self.targets:
        try:
            store.close()
        except AttributeError:
            continue

get_items(self)

Returns all the items to process.

Returns:

    Iterable: generator or list of items to process.

Source code in maggma/core/builder.py
@abstractmethod
def get_items(self) -> Iterable:
    """
    Returns all the items to process.

    Returns:
        generator or list of items to process
    """
    pass
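
A hedged sketch of a typical override: it restricts processing to documents whose keys are missing from the target and records the expected total for progress reporting; the incremental criteria and the single-source/single-target layout are assumptions about a concrete builder, not behaviour of this base class.

def get_items(self):
    source, target = self.sources[0], self.targets[0]
    # Only process documents whose key is not yet present in the target (illustrative).
    new_keys = set(source.distinct(source.key)) - set(target.distinct(target.key))
    self.total = len(new_keys)
    return source.query(criteria={source.key: {"$in": list(new_keys)}})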

prechunk(self, number_splits)

Part of a domain-decomposition paradigm that allows the builder to operate on multiple nodes by dividing up the IO as well as the compute. This function should return an iterator of dictionaries that can be distributed to multiple instances of the builder to get/process/update.

Parameters:

    number_splits (int): the number of groups to split the documents into. Required.
Source code in maggma/core/builder.py
def prechunk(self, number_splits: int) -> Iterable[Dict]:
    """
    Part of a domain-decomposition paradigm to allow the builder to operate on
    multiple nodes by dividing up the IO as well as the compute.
    This function should return an iterator of dictionaries that can be distributed
    to multiple instances of the builder to get/process/update on.

    Arguments:
        number_splits: The number of groups to split the documents to work on
    """
    self.logger.info(
        f"{self.__class__.__name__} doesn't have distributed processing capabillities."
        " Instead this builder will run on just one worker for all processing"
    )
    raise NotImplementedError(
        f"{self.__class__.__name__} doesn't have distributed processing capabillities."
        " Instead this builder will run on just one worker for all processing"
    )
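
Builders that do support distributed processing override this method. Below is a hedged sketch, assuming the source store's key field can be used to partition the work and that each yielded dict (here a "query" fragment) is merged into what get_items uses; the exact shape of the dicts depends on the concrete builder and the runner driving it.

from math import ceil
from typing import Dict, Iterable


def prechunk(self, number_splits: int) -> Iterable[Dict]:
    # Illustrative: split the source keys into number_splits roughly equal groups,
    # each of which can be handed to a separate builder instance.
    source = self.sources[0]
    keys = list(source.distinct(source.key))
    if not keys:
        return
    chunk_size = ceil(len(keys) / number_splits)
    for i in range(0, len(keys), chunk_size):
        yield {"query": {source.key: {"$in": keys[i : i + chunk_size]}}}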

process_item(self, item)

Process an item. There should be no database operations in this method. Default behavior is to return the item.

Parameters:

    item (Any): the item to process. Required.

Returns:

    item: an item to update.

Source code in maggma/core/builder.py
def process_item(self, item: Any) -> Any:
    """
    Process an item. There should be no database operations in this method.
    Default behavior is to return the item.
    Arguments:
        item:

    Returns:
       item: an item to update
    """
    return item

run(self, log_level=logging.DEBUG)

Run the builder serially. This is intended only for diagnostic purposes.

Source code in maggma/core/builder.py
def run(self, log_level=logging.DEBUG):
    """
    Run the builder serially
    This is only intended for diagnostic purposes
    """
    # Set up logging
    root = logging.getLogger()
    root.setLevel(log_level)
    ch = TqdmLoggingHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    ch.setFormatter(formatter)
    root.addHandler(ch)

    self.connect()

    cursor = self.get_items()

    for chunk in grouper(tqdm(cursor), self.chunk_size):
        self.logger.info("Processing batch of {} items".format(self.chunk_size))
        processed_chunk = [self.process_item(item) for item in chunk]
        processed_items = [item for item in processed_chunk if item is not None]
        self.update_targets(processed_items)

    self.finalize()
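
A usage sketch, assuming the hypothetical MultiplyBuilder from the class overview above together with maggma's in-memory MemoryStore; run() takes care of connect(), chunked processing, and finalize():

from maggma.stores import MemoryStore

source = MemoryStore("source_docs", key="task_id")
target = MemoryStore("target_docs", key="task_id")

# Populate the source with a few example documents (illustrative data).
source.connect()
source.update([{"task_id": i, "number": i} for i in range(10)])

builder = MultiplyBuilder(source, target, chunk_size=4)
builder.run()  # serial: get_items -> process_item (in chunks of 4) -> update_targets -> finalize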

update_targets(self, items)

Takes a list of items from process_item and updates the targets with them. Can also perform other bookkeeping in the process, such as storing GridFS ObjectIds.

Parameters:

    items (List): items to update the targets with. Required.
Source code in maggma/core/builder.py
@abstractmethod
def update_targets(self, items: List):
    """
    Takes a list of items from process_item and updates the targets with them.
    Can also perform other bookkeeping in the process, such as storing GridFS ObjectIds.

    Arguments:
        items:

    Returns:

    """
    pass
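
As a hedged illustration, a typical override stamps each document and writes the whole chunk with the target store's update(); the timestamp bookkeeping and the single-target assumption are choices of a concrete builder, not requirements of this base class.

from datetime import datetime, timezone
from typing import List


def update_targets(self, items: List):
    target = self.targets[0]
    # Illustrative bookkeeping: stamp every processed document before writing it.
    for doc in items:
        doc[target.last_updated_field] = datetime.now(timezone.utc)
    if items:
        target.update(items)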