Builder

Module containing the core builder definition.

Builder

Bases: MSONable

Base Builder class. At minimum this class should implement get_items (get items from the sources) and update_targets (update the targets with results).

Multiprocessing and MPI processing can be used if all the data processing is limited to process_item.

Source code in src/maggma/core/builder.py
class Builder(MSONable, metaclass=ABCMeta):
    """
    Base Builder class
    At minimum this class should implement:
    get_items - Get items from the sources
    update_targets - Updates the targets with results.

    Multiprocessing and MPI processing can be used if all
    the data processing is limited to process_item
    """

    def __init__(
        self,
        sources: Union[list[Store], Store],
        targets: Union[list[Store], Store],
        chunk_size: int = 1000,
    ):
        """
        Initialize the builder for the framework.

        Arguments:
            sources: source Store(s)
            targets: target Store(s)
            chunk_size: chunk size for processing
        """
        self.sources = sources if isinstance(sources, list) else [sources]
        self.targets = targets if isinstance(targets, list) else [targets]
        self.chunk_size = chunk_size
        self.total = None  # type: Optional[int]
        self.logger = logging.getLogger(type(self).__name__)
        self.logger.addHandler(logging.NullHandler())

    def connect(self):
        """
        Connect to the builder sources and targets.
        """
        for s in self.sources + self.targets:
            s.connect()

    def prechunk(self, number_splits: int) -> Iterable[dict]:
        """
        Part of a domain-decomposition paradigm to allow the builder to operate on
        multiple nodes by dividing up the IO as well as the compute
        This function should return an iterator of dictionaries that can be distributed
        to multiple instances of the builder to get/process/update on.

        Arguments:
            number_splits: The number of groups to split the documents to work on
        """
        self.logger.info(
            f"{self.__class__.__name__} doesn't have distributed processing capabilities."
            " Instead this builder will run on just one worker for all processing"
        )
        raise NotImplementedError(
            f"{self.__class__.__name__} doesn't have distributed processing capabilities."
            " Instead this builder will run on just one worker for all processing"
        )

    @abstractmethod
    def get_items(self) -> Iterable:
        """
        Returns all the items to process.

        Returns:
            generator or list of items to process
        """

    def process_item(self, item: Any) -> Any:
        """
        Process an item. There should be no database operations in this method.
        Default behavior is to return the item.

        Arguments:
            item:

        Returns:
           item: an item to update
        """
        return item

    @abstractmethod
    def update_targets(self, items: list):
        """
        Takes a list of items from process_item and updates the targets with them.
        Can also perform other bookkeeping in the process, such as storing gridfs oids, etc.

        Arguments:
            items:

        Returns:

        """

    def finalize(self):
        """
        Perform any final clean up.
        """
        # Close any Mongo connections.
        for store in self.sources + self.targets:
            try:
                store.close()
            except (AttributeError, StoreError):
                continue

    def run(self, log_level=logging.DEBUG):
        """
        Run the builder serially
        This is only intended for diagnostic purposes.
        """
        # Set up logging
        root = logging.getLogger()
        root.setLevel(log_level)
        ch = TqdmLoggingHandler()
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        ch.setFormatter(formatter)
        root.addHandler(ch)

        self.connect()

        cursor = self.get_items()

        for chunk in grouper(tqdm(cursor), self.chunk_size):
            self.logger.info(f"Processing batch of {self.chunk_size} items")
            processed_chunk = [self.process_item(item) for item in chunk]
            processed_items = [item for item in processed_chunk if item is not None]
            self.update_targets(processed_items)

        self.finalize()

    def __getstate__(self):
        return self.as_dict()

    def __setstate__(self, d):
        d = {k: v for k, v in d.items() if not k.startswith("@")}
        d = MontyDecoder().process_decoded(d)
        self.__init__(**d)
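
For orientation, a minimal concrete builder built on this base class might look like the sketch below. The class name, the "energy" field, and the doubling step are illustrative assumptions, not part of maggma itself.

from maggma.core import Builder, Store


class MultiplyBuilder(Builder):
    """Illustrative builder: copies documents from a source store to a
    target store, doubling a hypothetical "energy" field along the way."""

    def __init__(self, source: Store, target: Store, chunk_size: int = 1000):
        self.source = source
        self.target = target
        super().__init__(sources=source, targets=target, chunk_size=chunk_size)

    def get_items(self):
        # Pull every document from the source store; a real builder would
        # usually restrict this with query criteria.
        return self.source.query()

    def process_item(self, item):
        # Pure computation only -- no database operations belong here.
        item["energy"] = 2 * item.get("energy", 0)
        return item

    def update_targets(self, items):
        # Write the processed chunk back to the target store.
        if items:
            self.target.update(items)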

__init__(sources, targets, chunk_size=1000)

Initialize the builder for the framework.

Parameters:

    sources (Union[list[Store], Store]): source Store(s). Required.
    targets (Union[list[Store], Store]): target Store(s). Required.
    chunk_size (int): chunk size for processing. Default: 1000.
Source code in src/maggma/core/builder.py
def __init__(
    self,
    sources: Union[list[Store], Store],
    targets: Union[list[Store], Store],
    chunk_size: int = 1000,
):
    """
    Initialize the builder for the framework.

    Arguments:
        sources: source Store(s)
        targets: target Store(s)
        chunk_size: chunk size for processing
    """
    self.sources = sources if isinstance(sources, list) else [sources]
    self.targets = targets if isinstance(targets, list) else [targets]
    self.chunk_size = chunk_size
    self.total = None  # type: Optional[int]
    self.logger = logging.getLogger(type(self).__name__)
    self.logger.addHandler(logging.NullHandler())
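
As a usage sketch (MemoryStore and the collection names are assumptions used only for illustration), a single Store passed for sources or targets is wrapped into a one-element list by the hypothetical MultiplyBuilder sketched above:

from maggma.stores import MemoryStore

source = MemoryStore(collection_name="raw")
target = MemoryStore(collection_name="derived")

builder = MultiplyBuilder(source, target, chunk_size=250)

assert builder.sources == [source]  # bare Stores are wrapped in lists
assert builder.targets == [target]
assert builder.chunk_size == 250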

connect()

Connect to the builder sources and targets.

Source code in src/maggma/core/builder.py
def connect(self):
    """
    Connect to the builder sources and targets.
    """
    for s in self.sources + self.targets:
        s.connect()

finalize()

Perform any final clean up.

Source code in src/maggma/core/builder.py
def finalize(self):
    """
    Perform any final clean up.
    """
    # Close any Mongo connections.
    for store in self.sources + self.targets:
        try:
            store.close()
        except (AttributeError, StoreError):
            continue

get_items() abstractmethod

Returns all the items to process.

Returns:

    Iterable: generator or list of items to process

Source code in src/maggma/core/builder.py
@abstractmethod
def get_items(self) -> Iterable:
    """
    Returns all the items to process.

    Returns:
        generator or list of items to process
    """

prechunk(number_splits)

Part of a domain-decomposition paradigm to allow the builder to operate on multiple nodes by dividing up the IO as well as the compute. This function should return an iterator of dictionaries that can be distributed to multiple instances of the builder to get/process/update on.

Parameters:

    number_splits (int): The number of groups to split the documents to work on. Required.
Source code in src/maggma/core/builder.py
def prechunk(self, number_splits: int) -> Iterable[dict]:
    """
    Part of a domain-decomposition paradigm to allow the builder to operate on
    multiple nodes by dividing up the IO as well as the compute
    This function should return an iterator of dictionaries that can be distributed
    to multiple instances of the builder to get/process/update on.

    Arguments:
        number_splits: The number of groups to split the documents to work on
    """
    self.logger.info(
        f"{self.__class__.__name__} doesn't have distributed processing capabilities."
        " Instead this builder will run on just one worker for all processing"
    )
    raise NotImplementedError(
        f"{self.__class__.__name__} doesn't have distributed processing capabilities."
        " Instead this builder will run on just one worker for all processing"
    )
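
A builder that does support distributed processing overrides prechunk to yield one dictionary per group of documents; how each dictionary is consumed (for example, merged into the builder's query criteria) is up to the concrete builder. A sketch that splits on the source store's key field:

from math import ceil
from maggma.utils import grouper

def prechunk(self, number_splits: int):
    source = self.sources[0]
    keys = source.distinct(source.key)  # every document key in the source
    for split in grouper(keys, ceil(len(keys) / number_splits)):
        # Each dict is handed to one worker, e.g. as extra query criteria.
        yield {"query": {source.key: {"$in": list(split)}}}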

process_item(item)

Process an item. There should be no database operations in this method. Default behavior is to return the item.

Parameters:

    item (Any): the item to process. Required.

Returns:

    item (Any): an item to update

Source code in src/maggma/core/builder.py
def process_item(self, item: Any) -> Any:
    """
    Process an item. There should be no database operations in this method.
    Default behavior is to return the item.

    Arguments:
        item:

    Returns:
       item: an item to update
    """
    return item
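
Because run() drops None results before calling update_targets, an override of process_item can return None to filter out items it cannot or should not process. A sketch with a made-up validation rule:

def process_item(self, item):
    # Skip documents that fail a (hypothetical) sanity check.
    if item.get("energy") is None:
        return None
    item["energy_per_atom"] = item["energy"] / max(item.get("nsites", 1), 1)
    return item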

run(log_level=logging.DEBUG)

Run the builder serially. This is only intended for diagnostic purposes.

Source code in src/maggma/core/builder.py
def run(self, log_level=logging.DEBUG):
    """
    Run the builder serially
    This is only intended for diagnostic purposes.
    """
    # Set up logging
    root = logging.getLogger()
    root.setLevel(log_level)
    ch = TqdmLoggingHandler()
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    root.addHandler(ch)

    self.connect()

    cursor = self.get_items()

    for chunk in grouper(tqdm(cursor), self.chunk_size):
        self.logger.info(f"Processing batch of {self.chunk_size} items")
        processed_chunk = [self.process_item(item) for item in chunk]
        processed_items = [item for item in processed_chunk if item is not None]
        self.update_targets(processed_items)

    self.finalize()
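
Running the hypothetical MultiplyBuilder with the stores from the earlier sketches then looks like this; connect() is called inside run(), so no separate connection step is needed:

import logging

builder = MultiplyBuilder(source, target, chunk_size=250)
builder.run(log_level=logging.INFO)  # connect -> get/process/update in chunks -> finalize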

update_targets(items) abstractmethod

Takes a list of items from process_item and updates the targets with them. Can also perform other bookkeeping in the process, such as storing gridfs oids, etc.

Parameters:

    items (list): list of processed items to write to the targets. Required.

Source code in src/maggma/core/builder.py
@abstractmethod
def update_targets(self, items: list):
    """
    Takes a list of items from process_item and updates the targets with them.
    Can also perform other bookkeeping in the process, such as storing gridfs oids, etc.

    Arguments:
        items:

    Returns:

    """