Database “Builders”

The Materials Project relies on MongoDB databases. The common parts of the process of creating these databases from either files or other MongoDB databases are encapsulated in the pymatgen.db.builders subpackage, and in the mgbuild command-line script. This package and script comprise a lightweight framework that is meant to automate, simplify, and ultimately streamline the process of creating databases. All the code and methods are general-purpose, and thus could be useful to any project that needs to perform extract-transform-load (ETL) operations with MongoDB.

The code presented here can be found in the directory pymatgen/db/builders/examples in the source code distribution.

Features

Parallel building

Builders can be run in parallel without explicit coding of parallelism by the author. This allows CPU-intensive transformations of the data to run much faster on multicore machines, which includes most modern hardware. An illustration of the difference between sequential and parallel builds is shown below.

[Figure parallel_build.png: a sequential build compared with a parallel build]

The builder framework automatically parallelizes the processing of each item by placing items in a shared queue and spawning processes that pull items off the queue in parallel. Note that the get_items() method is not automatically parallelized, so, for better or worse, querying the DB happens sequentially (unless you write explicit parallel code inside get_items()).
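
The pattern at work is roughly the following (a minimal sketch built on the standard multiprocessing module to illustrate the idea; this is not the framework’s actual code, and the names worker and run_parallel are illustrative):

import multiprocessing

def worker(queue, process_item):
    """Pull items off the shared queue until a sentinel appears."""
    while True:
        item = queue.get()
        if item is None:  # sentinel: no more work
            break
        process_item(item)

def run_parallel(get_items, process_item, ncores):
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker,
                                     args=(queue, process_item))
             for _ in range(ncores)]
    for p in procs:
        p.start()
    for item in get_items():  # get_items() itself runs sequentially
        queue.put(item)
    for _ in procs:           # one sentinel per worker
        queue.put(None)
    for p in procs:
        p.join()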

Incremental building

Incremental building allows successive builds of source MongoDB collection(s) to only operate on the records added since the last build. This can save huge amounts of time. An illustration of the difference between an incremental and full (non-incremental) build is shown below.

[Figure incremental_build.png: an incremental build compared with a full (non-incremental) build]

Writing a builder

To write a builder, you must create a Python class that inherits from pymatgen.db.builders.core.Builder (see the pymatgen.db.builders package) and implement a few methods on this class to perform the specific work for your builder. In this section we will give two example builders: a simple FileCounter builder that just counts the lines in a file, and a CopyBuilder that copies a MongoDB collection.

Simple “FileCounter” builder

The first builder simply reads from a file, and counts the lines in that file. In this section we’ll show the whole program, then walk through it one section at a time:

from pymatgen.db.builders.core import Builder
class FileCounter(Builder):
    """Count lines and characters in a file.
    """
    def __init__(self, **kwargs):
        self.num_lines = 0
        self.num_chars = 0
        Builder.__init__(self, **kwargs)

    def get_parameters(self):
        return {'input_file': {'type': 'str', 'desc': 'Input file path'}}

    def get_items(self, input_file=None):
        with open(input_file, "r") as f:
            for line in f:
                yield line

    def process_item(self, item):
        self.num_lines += 1
        self.num_chars += len(item)

    def finalize(self, errors):
        print("{:d} lines, {:d} characters".format(
            self.num_lines, self.num_chars))
        return True

Initialization:

from pymatgen.db.builders.core import Builder
class FileCounter(Builder):
    """Count lines and characters in a file.
    """
    def __init__(self, **kwargs):
        self.num_lines = 0
        self.num_chars = 0
        Builder.__init__(self, **kwargs)

The class inherits from the pymatgen.db.builders.core.Builder class. In this case, it includes a constructor, but this is optional and often not needed. The constructor simply initializes the line and character counters and then calls its parent.

get_parameters:

def get_parameters(self):
    return {'input_file': {'type': 'str', 'desc': 'Input file path'}}

This method returns a dictionary of metadata about the parameters for get_items(). This metadata is used when running a builder with the mgbuild program. More details on this are given in Running a builder. For now, it is enough to note that the parameters must match the keyword arguments to get_items(), both in name and (expected) type.
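
For example, if the FileCounter class above were saved in a module named my_builders (a hypothetical name for illustration), it could be invoked with its parameter given in <name>=<value> form:

mgbuild run my_builders.FileCounter input_file=data.txt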

get_items:

def get_items(self, input_file=None):
    with open(input_file, "r") as f:
        for line in f:
            yield line

The two main methods that you must override are called get_items() and process_item(). The first will return an iterator, in this case the function simply returns one line of the file at a time. Notice that you can make the function into a generator by using yield to return items, but returning any other iterable would also work fine (e.g. the function could have been a one-liner “return open(input_file).readlines()”).
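
For instance, an equivalent version that returns a plain list rather than a generator (a sketch; note that it reads the entire file into memory at once):

def get_items(self, input_file=None):
    # Return a list instead of yielding; fine for small files.
    with open(input_file, "r") as f:
        return f.readlines()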

process_item:

def process_item(self, item):
    self.num_lines += 1
    self.num_chars += len(item)

Here, the num_lines counter is incremented and num_chars is increased by the line’s length for every line passed in by the get_items() iterator.

Warning

Updating instance variables will cause improper behavior if the user runs the builder in parallel. This occurs because the parallel mode automatically starts multiple copies of the same class, and their independent actions will clash. If you really need to update some shared state, use the Python multiprocessing module functions. See the multiprocessing docs for details.
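
If you really do need a shared counter, one option is a multiprocessing.Value. Below is a minimal sketch (the class name is illustrative, and whether it works unchanged depends on how the framework spawns its worker processes):

import multiprocessing
from pymatgen.db.builders.core import Builder

class SafeFileCounter(Builder):
    """Count lines in a file; the counter is shared across processes."""
    def __init__(self, **kwargs):
        # A process-shared integer with its own lock.
        self.num_lines = multiprocessing.Value('i', 0)
        Builder.__init__(self, **kwargs)

    def get_parameters(self):
        return {'input_file': {'type': 'str', 'desc': 'Input file path'}}

    def get_items(self, input_file=None):
        with open(input_file, "r") as f:
            for line in f:
                yield line

    def process_item(self, item):
        with self.num_lines.get_lock():  # guard the read-modify-write
            self.num_lines.value += 1

    def finalize(self, errors):
        print("{:d} lines".format(self.num_lines.value))
        return True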

finalize:

def finalize(self, errors):
    print("{:d} lines, {:d} characters".format(
        self.num_lines, self.num_chars))
    return True

Optionally, you can put code that will be run once (for all builders) in the finalize() method. Here we just print a result. The return value of finalize() determines whether the build was successful, so make sure you return True on success: the default return value of None will read as False.

Note that this builder did not access MongoDB in any way. The next example will show MongoDB access and other features.

Database “CopyBuilder”

The next builder does a simple DB operation: copying one MongoDB collection from a source to a destination. As before, we begin with the full program and then step through it one snippet at a time:

from pymatgen.db.builders import core, util
from pymatgen.db.query_engine import QueryEngine

_log = util.get_builder_log("copy")

class CopyBuilder(core.Builder):
    """Copy from one MongoDB collection to another.
    """
    def __init__(self, *args, **kwargs):
        self._target_coll = None
        core.Builder.__init__(self, *args, **kwargs)

    def get_items(self, source=None, target=None, crit=None):
        """Copy records from source to target collection.

        :param source: Input collection
        :type source: QueryEngine
        :param target: Output collection
        :type target: QueryEngine
        :param crit: Filter criteria, e.g. "{ 'flag': True }".
        :type crit: dict
        """
        self._target_coll = target.collection
        if not crit:  # reduce any False-y crit value to None
            crit = None
        cur = source.query(criteria=crit)
        _log.info("copy: source={} crit={} count={:d}"
                  .format(source.collection, crit, len(cur)))
        return cur

    def process_item(self, item):
        self._target_coll.insert(item)

Logging:

_log = util.get_builder_log("copy")

In this program, we start by setting up logging. For convenience, the util.get_builder_log() function creates a new Python logging.Logger instance with a standard name and format.

Initialization:

def __init__(self, *args, **kwargs):
    self._target_coll = None
    core.Builder.__init__(self, *args, **kwargs)

When we initialize the class, we create an instance variable that we will later use to remember the target collection.

get_items:

def get_items(self, source=None, target=None, crit=None):
    """Copy records from source to target collection.

    :param source: Input collection
    :type source: QueryEngine
    :param target: Output collection
    :type target: QueryEngine
    :param crit: Filter criteria, e.g. "{ 'flag': True }".
    :type crit: dict
    """
    self._target_coll = target.collection
    if not crit:  # reduce any False-y crit value to None
        crit = None
    cur = source.query(criteria=crit)
    _log.info("source={} crit={} count={:d}"
              .format(source.collection, crit, len(cur)))
    return cur

For a copy operation, the get_items() method must query the source collection and get an iterator over the records.

There are two things that differ from the FileCounter example. First, note that there is no get_parameters() method at all. Instead, the docstring of this method serves as a machine-readable version of the metadata needed for running the builder. Not coincidentally, this docstring format is also understood by Sphinx’s autodoc feature. This way you kill two birds with one stone: your builders are documented for command-line invocation, and you can easily generate HTML, PDF, etc. documentation pages.

Second, this method connects to the database and queries it. But, you may be asking, where is the db.connect() call? This is handled by some magic that is in the docstring. Notice that the type of both the source and target is QueryEngine. This is a special datatype that instructs the driver program (mgbuild) to expect a database configuration file with host name, user, password, database name, etc. and to automatically connect to this database and return a pymatgen.db.query_engine.QueryEngine instance. These instances are passed in as arguments to the method. So, all the method has to do is to use the QueryEngine object. In this case, this means creating a cursor that iterates over the source collection and remembering the target collection in an instance variable.
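
To make this a bit more concrete, the driver is doing roughly the following on your behalf (a sketch, assuming the QueryEngine constructor accepts keyword arguments mirroring the configuration keys; conf/test1.json is the kind of configuration file shown in Running a builder):

import json
from pymatgen.db.query_engine import QueryEngine

# Read the same JSON configuration file that mgbuild would be given.
with open("conf/test1.json") as f:
    cfg = json.load(f)
# Connect and wrap the collection in a QueryEngine instance.
source = QueryEngine(host=cfg["host"], port=cfg["port"],
                     database=cfg["database"],
                     collection=cfg["collection"])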

Note

Unlike the previous example, where instance variables might cause strange behavior, here the _target_coll instance variable is perfectly fine for parallel execution: the individual builder instances do not need to share the state of this variable, since each wants and needs its own copy.

process_item:

def process_item(self, item):
    self._target_coll.insert(item)

Here, we simply insert every item into the target collection.

As we will see later, the builder framework also contains some automatic functionality for incremental building, which means only looking at records that are new since the last time. Usually this involves some extra logic inside the builder itself, but in a very simple case like this the copying would automatically work with the incremental mode.

Incremental builder “MaxValueBuilder”

The incremental building concept was introduced in Features. The central idea of the implementation is that any builder can be run in “incremental mode”. When this happens, any QueryEngine objects are transparently replaced by equivalent objects that track their last position, of class pymatgen.db.builders.incr.TrackedQueryEngine, which is documented in module pymatgen.db.builders.incr. This tracking can be controlled, if necessary, through the tracking instance variable on the TrackedQueryEngine class.

The Database “CopyBuilder” is an example of a trivial builder that can work with incremental building without modification to the source code. With incremental mode activated, successive copies will only move over “new” data items. But most builders will not be this easy. To help understand what to do in a non-trivial case, we show here a contrived example where a collection A is used to build a derived collection B. In A, each record has two values: a number and a group name. In B, there are two values for each distinct group in A: the group name, and the highest value for that group.

For the sake of this example we will use the following algorithm to rebuild B from A when A gets new elements Anew.

  1. For each record present in Anew:

  • Get its group, g

  • If g has not been seen yet, find the current maximum (if any) from all records in A

  • Update the in-memory maximum for g with the record’s value

  2. After all records in Anew are processed, set the new group maximums in B from the values stored in memory.

This algorithm is incremental in the sense that it ignores any groups that are not in the new elements Anew, yet non-trivial because in order to calculate the new maximum value one needs to examine all the elements in A:

from pymatgen.db.builders import core
from pymatgen.db.builders import util
from pymatgen.db.query_engine import QueryEngine

class MaxValueBuilder(core.Builder):
    """Example of incremental builder that requires
    some custom logic for incremental case.
    """
    def get_items(self, source=None, target=None):
        """Get all records from source collection to add to target.

        :param source: Input collection
        :type source: QueryEngine
        :param target: Output collection
        :type target: QueryEngine
        """
        self._groups = self.shared_dict()
        self._target_coll = target.collection
        self._src = source
        return source.query()

    def process_item(self, item):
        """Calculate new maximum value for each group,
        for "new" items only.
        """
        group, value = item['group'], item['value']
        if group in self._groups:
            cur_val = self._groups[group]
            self._groups[group] = max(cur_val, value)
        else:
            # New group. Could fetch old max. from target collection,
            # but for the sake of illustration recalculate it from
            # the source collection.
            self._src.tracking = False  # examine entire collection
            new_max = value
            for rec in self._src.query(criteria={'group': group},
                                       properties=['value']):
                new_max = max(new_max, rec['value'])
            self._src.tracking = True  # back to incremental mode
            # calculate new max
            self._groups[group] = new_max

    def finalize(self, errs):
        """Update target collection with calculated maximum values.
        """
        for group, value in self._groups.items():
            doc = {'group': group, 'value': value}
            self._target_coll.update({'group': group}, doc, upsert=True)
        return True

Initialization (done in get_items):

class MaxValueBuilder(core.Builder):
    """Example of incremental builder that requires
       some custom logic for incremental case.
    """
    def get_items(self, source=None, target=None):
        """Get all records from source collection to add to target.

        :param source: Input collection
        :type source: QueryEngine
        :param target: Output collection
        :type target: QueryEngine
        """
        self._groups = self.shared_dict()
        self._target_coll = target.collection
        self._src = source
        return source.query()

Just as for the CopyBuilder, we use the docstring style of parameter declaration for this builder; the parameters are simply the input and output collections. We remember both source and target in instance variables. In addition, we use the utility method shared_dict() from the Builder class to get a dictionary that can be shared between parallel processes. Finally, this method returns a query over all items in the collection.

Processing:

def process_item(self, item):
    """Calculate new maximum value for each group,
    for "new" items only.
    """
    group, value = item['group'], item['value']
    if group in self._groups:
        cur_val = self._groups[group]
        self._groups[group] = max(cur_val, value)
    else:
        # New group. Could fetch old max. from target collection,
        # but for the sake of illustration recalculate it from
        # the source collection.
        self._src.tracking = False  # examine entire collection
        new_max = value
        for rec in self._src.query(criteria={'group': group},
                                   properties=['value']):
            new_max = max(new_max, rec['value'])
        self._src.tracking = True  # back to incremental mode
        # calculate new max
        self._groups[group] = new_max

For each item, we update the shared _groups variable created in get_items(). For new groups, we re-scan the whole source collection to find the previous maximum value. There are better ways to do this, but this approach is easy to understand and illustrates how a collection can be manipulated in its “raw” form in an incremental builder.

The key lines here are self._src.tracking = False and, later, self._src.tracking = True. These turn off the “incremental mode” so that the query will start at the beginning of the collection instead of from the start of Anew.
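
A defensive refinement, not in the original example: restoring the flag in a try/finally block ensures that an exception during the full-collection query cannot leave the engine stuck in non-incremental mode:

self._src.tracking = False  # examine entire collection
try:
    new_max = value
    for rec in self._src.query(criteria={'group': group},
                               properties=['value']):
        new_max = max(new_max, rec['value'])
finally:
    self._src.tracking = True  # restored even if the query fails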

Finalization:

def finalize(self, errs):
    """Update target collection with calculated maximum values.
    """
    for group, value in self._groups.items():
        doc = {'group': group, 'value': value}
        self._target_coll.update({'group': group}, doc, upsert=True)
    return True

In this case, the finalize() method is used to set the calculated group maximums into the target collection. This is the same as the reduce stage of a map/reduce task (the process_item performs the map).

In conclusion, we see that for this case only two lines, which turn the tracking variable off and back on, needed to be added to accommodate incremental building.

Running a builder

This section describes how, once you have written a builder class, you can use mgbuild to run it, possibly in parallel and possibly “incrementally”, on some inputs.

We will break this process into two parts: displaying builder usage, and running the builder.

Both of these use the mgbuild sub-command “run” (alternatively: “build”), like this:

mgbuild run <arguments>

In the examples below, we will assume that you have pymatgen-db installed and in your Python path. We will use the example modules that are installed in pymatgen.db.builders.examples.

Displaying builder usage

You can get the list of parameters and their types for a given builder by giving its full module path, and the -u or --usage option:

% mgbuild run -u pymatgen.db.builders.examples.copy_builder.CopyBuilder

pymatgen.db.builders.examples.copy_builder.CopyBuilder
  Copy from one MongoDB collection to another.
  Parameters:
    crit (dict): Filter criteria, e.g. "{ 'flag': True }".
    source (QueryEngine): Input collection
    target (QueryEngine): Output collection

Note

You will also get the usage information if you invoke the builder with the wrong number of arguments (e.g. zero), although in this case you will also see some error messages.

Running the builder

The usage of the mgbuild run command is as follows:

usage: mgbuild run [-h] [--quiet] [--verbose] [-i OPER[:FIELD]] [-n NUM_CORES]
                   [-u]
                   builder [parameter [parameter ...]]

positional arguments:
  builder               Builder class, relative or absolute import path, e.g.
                        'my.awesome.BigBuilder' or 'BigBuilder'.
  parameter             Builder parameters, in format <name>=<value>. If the
                        parameter type is QueryEngine, the value should be a
                        JSON configuration file. Prefix filename with a '-' to
                        ignore incremental mode for this QueryEngine.

optional arguments:
  -h, --help            show this help message and exit
  --quiet, -q           Minimal verbosity.
  --verbose, -v         Print more verbose messages to standard error.
                        Repeatable. (default=ERROR)
  -i OPER[:FIELD], --incr OPER[:FIELD]
                        Incremental mode for operation and optional sort-field
                        name. OPER may be one of: copy, other, build. Default
                        FIELD is '_id'
  -n NUM_CORES, --ncores NUM_CORES
                        Number of cores or processes to run in parallel (1)
  -u, --usage           Print usage information on selected builder and exit.

To run the builder, you need at a minimum to give the full path to the builder class, and then values for each parameter. There are also optional arguments for building in parallel and building incrementally. This section will walk through from simple to more complex examples.

Basic usage

Run the copy builder:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=conf/test2.json crit='{}'

In this example, we are running the CopyBuilder with configuration files for the source and target, and empty criteria (i.e., copy everything). The copy will run in a single process.

The configuration files in question are just JSON files that look like this (you could add “user” and “password” for authenticated DBs):

{"host": "localhost", "port": 27017,
 "database": "foo", "collection": "test1"}

See Database configuration for more details.

Running in parallel

Most machines have multiple cores, and hundreds of cores will be common in the near future. If your item processing requires any real work, you will probably benefit by running in parallel:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=conf/test2.json crit='{}' -n 8

The same command as before, but with -n 8 added, causing 8 parallel worker processes to be spawned to run the copy.

Note

For parallel runs, only the process_item() method is run in parallel. The get_items() is always run sequentially.

Incremental builds

The concept of incremental building was introduced in Incremental builder “MaxValueBuilder”. From the command line, incremental building is controlled by the -i/--incr option. What this really does is add some behind-the-scenes bookkeeping for every parameter of type QueryEngine (except ones where it is explicitly turned off; see below) that records and retrieves the spot where processing last ended. Multiple spots are allowed per collection, distinguished by an “operation” name. Currently, only a small set of operations is allowed: “copy”, “build”, and “other”.

For incremental building to work properly, there must be some field in the collection that increases monotonically. This field is used to determine which records come after the spot marked on the last run. By default this field is _id, but it is highly recommended to choose a collection-specific identifier, because an _id chosen by the client is not always monotonic.

Basic incremental build:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=conf/test2.json crit='{}' \
    -i copy

Copies from source to target. Subsequent runs will only copy records that are newer (according to the chosen field, which here defaults to _id) than the last record from the previous run.

Incremental build with parallelism:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=conf/test2.json crit='{}' \
    -n 8 -i copy

Parallelism is no different with incremental builds. As before, we simply add -n 8 to the command line.

Incremental build with custom identifier:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=conf/test2.json crit='{}' \
    -i copy:num

This example runs an incremental build with the “copy” operation, using the num field instead of the default _id.

Incremental build skipped for some collections:

mgbuild run  pymatgen.db.builders.examples.copy_builder.CopyBuilder \
    source=conf/test1.json target=-conf/test2.json crit='{}' \
    -i copy:num

This is pretty subtle: notice the “-” inserted after the “=” in target=-conf/test2.json. This has the effect of not adding tracking information for the target collection. In this case, tracking the last record added to the target isn’t useful for the copy; all that matters is knowing where we stopped in the source collection.