jobflow.core

jobflow.core.flow

Define base Flow object.

class jobflow.core.flow.JobOrder(value)[source]

Bases: ValueEnum

Options to control the order of job execution.

  • AUTO: Automatically determine the job order based on input and output references.

  • LINEAR: Run the jobs in the order they appear in the jobs array.

AUTO = 'auto'
LINEAR = 'linear'
class jobflow.core.flow.Flow(jobs, output=None, name='Flow', order=JobOrder.AUTO, uuid=None, hosts=None, metadata=None, metadata_updates=None)[source]

Bases: MSONable

A Flow contains a collection of Jobs or other Flows to execute.

The Flow object is the main tool for constructing workflows. Flows can either contain Jobs or other Flows. Like Job objects, Flow objects can also have outputs, however, these are not explicitly stored in the database. Instead, the outputs of a Flow act to structure the outputs of the jobs contained within the Flow.

Parameters:
  • jobs (list[Flow | jobflow.Job] | jobflow.Job | Flow) – The jobs to be run as a list of Job or Flow objects.

  • output (Any) – The output of the flow. These should come from the output of one or more of the jobs.

  • name (str) – The flow name.

  • order (JobOrder) – The order in which the jobs should be executed. The default is to determine the order automatically based on the connections between jobs.

  • uuid (str) – The identifier of the flow. This is generated automatically.

  • hosts (list[str]) – The list of UUIDs of the hosts containing the job. This is updated automatically when a flow is included in the jobs array of another flow. The object identified by one UUID of the list should be contained in objects identified by its subsequent elements.

  • metadata (dict[str, Any]) – A dictionary of information that will get stored in the Flow collection.

  • metadata_updates (list[dict[str, Any]]) – A list of updates for the metadata that will be applied to any dynamically generated sub Flow/Job.

Raises:
  • ValueError – If a job in the jobs array is already part of another flow.

  • ValueError – If any jobs needed to resolve the inputs of all jobs in the jobs array are missing.

  • ValueError – If any jobs needed to resolve the flow output are missing.

Warns:

UserWarning – If a .Job or Flow object is used as the Flow output rather than an OutputReference.

See also

job, Job, JobOrder

Examples

Below we define a simple job to add two numbers, and create an flow containing two connected add jobs.

>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_first = add(1, 2)
>>> add_second = add(add_first.output, 2)
>>> flow = Flow(jobs=[add_first, add_second])

This flow does not expose any of the outputs of the jobs contained within it. We could instead “register” the output of the second add as the output of the flow.

>>> flow = Flow(jobs=[add_first, add_second], output=add_second.output)

This will allow the flow to be used in another flow. In this way, Flows can be infinitely nested. For example:

>>> add_third = add(flow.output, 5)
>>> outer_flow = Flow(jobs=[flow, add_third])

Flows can be run using an flow manager. These enable running Flows locally or on compute clusters (using the FireWorks manager).

>>> from jobflow.managers.local import run_locally
>>> response = run_locally(flow)
property jobs: tuple[Flow | Job, ...]

Get the Jobs in the Flow.

Returns:

The list of Jobs/Flows of the Flow.

Return type:

list[Job]

property output: Any

Get the output of the flow.

Returns:

The output of the flow.

Return type:

Any

property job_uuids: tuple[str, ...]

Uuids of every Job contained in the Flow (including nested Flows).

Returns:

The uuids of all Jobs in the Flow (including nested Flows).

Return type:

tuple[str]

property all_uuids: tuple[str, ...]

Uuids of every Job and Flow contained in the Flow (including nested Flows).

Returns:

The uuids of all Jobs and Flows in the Flow (including nested Flows).

Return type:

tuple[str]

property graph: DiGraph

Get a graph indicating the connectivity of jobs in the flow.

Returns:

The graph showing the connectivity of the jobs.

Return type:

DiGraph

property host: str | None

UUID of the first Flow that contains this Flow.

Returns:

the UUID of the host.

Return type:

str

draw_graph(**kwargs)[source]

Draw the flow graph using matplotlib.

Requires matplotlib to be installed.

Parameters:

kwargs – keyword arguments that are passed to jobflow.utils.graph.draw_graph.

Returns:

The matplotlib pyplot state object.

Return type:

pyplot

iterflow()[source]

Iterate through the jobs of the flow.

The jobs are yielded such that the job output references can always be resolved. I.e., root nodes of the flow graph are always returned first.

Yields:

Job, list[str] – The Job and the uuids of any parent jobs (not to be confused with the host flow).

update_kwargs(update, name_filter=None, function_filter=None, dict_mod=False)[source]

Update the kwargs of all Jobs in the Flow.

Note that updates will be applied to jobs in nested Flow.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

Examples

Consider a flow containing a simple job with a number keyword argument.

>>> from jobflow import job, Flow
>>> @job
... def add(a, number=5):
...     return a + number
>>> add_job = add(1)
>>> flow = Flow([add_job])

The number argument could be updated in the following ways.

>>> flow.update_kwargs({"number": 10})

This will work if all jobs in the flow have a kwarg called number. However, when this is not the case this will result in the bad input kwargs for some jobs. To only apply the update to the correct jobs, filters can be used.

>>> flow.update_kwargs({"number": 10}, name_filter="add")
>>> flow.update_kwargs({"number": 10}, function_filter=add)
update_maker_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]

Update the keyword arguments of any Maker objects in the jobs.

Note that updates will be applied to Jobs in any inner Flows.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.

  • class_filter (type[jobflow.Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.

  • nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of Maker, job, or flow objects. See examples for more details.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

Examples

Consider the following flow containing jobs from a Maker:

>>> from dataclasses import dataclass
>>> from jobflow import job, Maker, Flow
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()
>>> add_job = maker.make(1)
>>> flow = Flow([add_job])

The number argument could be updated in the following ways.

>>> flow.update_maker_kwargs({"number": 10})

This will work if all Makers in the flow have a kwarg called number. However, when this is not the case this will result in the bad input kwargs for some Makers. To only apply the update to the correct Makers, filters can be used.

>>> flow.update_maker_kwargs({"number": 10}, name_filter="add")
>>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker)

By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. Consider the following case for a Maker that produces a job that restarts.

>>> from jobflow import Response
>>> @dataclass
... class RestartMaker(Maker):
...     name: str = "replace"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a):
...         restart_job = self.add_maker.make(a)
...         return Response(replace=restart_job)
>>> maker = RestartMaker()
>>> my_job = maker.make(1)
>>> flow = Flow([my_job]

The following update will apply to the nested AddMaker in the kwargs of the RestartMaker:

>>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker)

However, if nested=False, then the update will not be applied to the nested Maker:

>>> flow.update_maker_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
append_name(append_str, prepend=False, dynamic=True)[source]

Append a string to the name of the flow and all jobs contained in it.

Parameters:
  • append_str (str) – A string to append.

  • prepend (bool) – Prepend the name rather than appending it.

  • dynamic (bool)

update_metadata(update, name_filter=None, function_filter=None, dict_mod=False, dynamic=True, callback_filter=<function Flow.<lambda>>)[source]

Update the metadata of the Flow and/or its Jobs.

Note that updates will be applied to jobs in nested Flow.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

  • dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

  • callback_filter (Callable[[Flow | Job], bool]) – A function that takes a Flow or Job instance and returns True if updates should be applied to that instance. Allows for custom filtering logic. Applies recursively to nested Flows and Jobs so best be specific.

Examples

Consider a flow containing two jobs.

>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_job1 = add(5, 6)
>>> add_job2 = add(6, 7)
>>> flow = Flow([add_job1, add_job2])

The metadata of both jobs could be updated as follows:

>>> flow.update_metadata({"tag": "addition_job"})

Or using a callback filter to only update flows containing a specific maker:

>>> flow.update_metadata(
...     {"material_id": 42},
...     callback_filter=lambda flow: SomeMaker in map(type, flow)
...     and flow.name == "flow name"
... )
update_config(config, name_filter=None, function_filter=None, attributes=None, dynamic=True)[source]

Update the job config of all Jobs in the Flow.

Note that updates will be applied to jobs in nested Flow.

Parameters:
  • config (jobflow.JobConfig | dict) – A JobConfig object or a dict with containing the attributes to update.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • attributes (list[str] | str) – Which attributes of the job config to set. Can be specified as one or more attributes specified by their name.

  • dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

Examples

Consider a flow containing two jobs.

>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_job1 = add(5, 6)
>>> add_job2 = add(6, 7)
>>> flow = Flow([add_job1, add_job2])

The config of both jobs could be updated as follows:

>>> new_config = JobConfig(
...    manager_config={"_fworker": "myfworker"}, resolve_references=False
... )
>>> flow.update_config(new_config)

To only update specific attributes, the attributes argument can be set. For example, the following will only update the “manager_config” attribute of the jobs’ config.

>>> flow.update_config(new_config, attributes="manager_config")

Alternatively, the config can be specified as a dictionary with keys that are attributes of the JobConfig object. This allows you to specify updates without having to create a completely new JobConfig object. For example:

>>> flow.update_config({"manager_config": {"_fworker": "myfworker"}})
add_hosts_uuids(hosts_uuids=None, prepend=False)[source]

Add a list of UUIDs to the internal list of hosts.

If hosts_uuids is None the uuid of this Flow will be added to the inner jobs and flow. Otherwise, the passed value will be set both in the list of hosts of the current flow and of the inner jobs and flows. The elements of the list are supposed to be ordered in such a way that the object identified by one UUID of the list is contained in objects identified by its subsequent elements.

Parameters:
  • hosts_uuids (str | list[str] | None) – A list of UUIDs to add. If None the current uuid of the flow will be added to the inner Flows and Jobs.

  • prepend (bool) – Insert the UUIDs at the beginning of the list rather than extending it.

add_jobs(jobs)[source]

Add Jobs or Flows to the Flow.

Added objects should not belong to other flows. The list of hosts will be added automatically to the incoming Jobs/Flows based on the hosts of the current Flow.

Parameters:

jobs (Job | Flow | Sequence[Flow | Job]) – A list of Jobs and Flows.

Return type:

None

remove_jobs(indices)[source]

Remove jobs from the Flow.

It is not possible to remove jobs referenced in the output.

Parameters:

indices (int | list[int]) – Indices of the jobs to be removed. Accepted values: from 0 to len(jobs) - 1.

jobflow.core.flow.get_flow(flow, allow_external_references=False)[source]

Check dependencies and return flow object.

Parameters:
  • flow (Flow | Job | list[jobflow.Job]) – A job, list of jobs, or flow.

  • allow_external_references (bool) – If False all the references to other outputs should be from other Jobs of the Flow.

Returns:

A Flow object where connections have been checked.

Return type:

Flow

jobflow.core.job

Functions and classes for representing Job objects.

class jobflow.core.job.JobConfig(resolve_references=True, on_missing_references=OnMissing.ERROR, manager_config=<factory>, expose_store=False, pass_manager_config=True, response_manager_config=<factory>)[source]

Bases: MSONable

The configuration parameters for a job.

Parameters:
  • resolve_references (bool) – Whether to resolve any references before the job function is executed. If False the unresolved reference objects will be passed into the function call.

  • on_missing_references (jobflow.core.reference.OnMissing) – What to do if the references cannot be resolved. The default is to throw an error.

  • manager_config (dict) – The configuration settings to control the manager execution.

  • expose_store (bool) – Whether to expose the store in CURRENT_JOB` when the job is running.

  • pass_manager_config (bool) – Whether to pass the manager configuration on to detour, addition, and replacement jobs.

  • response_manager_config (dict) – The custom configuration to pass to a detour, addition, or replacement job. Using this kwarg will automatically take precedence over the behavior of pass_manager_config such that a different configuration than manger_config can be passed to downstream jobs.

Returns:

A JobConfig object.

Return type:

JobConfig

jobflow.core.job.job(method: Callable | None = None) Callable[..., Job][source]
jobflow.core.job.job(method: Callable, **job_kwargs) Callable[..., Job]
jobflow.core.job.job(method: None = None, **job_kwargs) Callable[..., Callable[..., Job]]

Wrap a function to produce a Job.

Job objects are delayed function calls that can be used in an Flow. A job is composed of the function name and source and any arguments for the function. This decorator makes it simple to create job objects directly from a function definition. See the examples for more details.

Parameters:
  • method – A function to wrap. This should not be specified directly and is implied by the decorator.

  • **job_kwargs – Other keyword arguments that will get passed to the Job init method.

Examples

>>> @job
... def print_message():
...     print("I am a Job")
>>> print_job = print_message()
>>> type(print_job)
<class 'jobflow.core.job.Job'>
>>> print_job.function
<function print_message at 0x7ff72bdf6af0>

Jobs can have required and optional parameters.

>>> @job
... def print_sum(a, b=0):
...     return print(a + b)
>>> print_sum_job = print_sum(1, 2)
>>> print_sum_job.function_args
(1, )
>>> print_sum_job.function_kwargs
{"b": 2}

If the function returns a value it can be referenced using the output attribute of the job.

>>> @job
... def add(a, b):
...     return a + b
>>> add_task = add(1, 2)
>>> add_task.output
OutputReference('abeb6f48-9b34-4698-ab69-e4dc2127ebe9')

Note

Because the task has not yet been run, the output value is an OutputReference object. References are automatically converted to their computed values (resolved) when the task runs.

If a dictionary of values is returned, the values can be indexed in the usual way.

>>> @job
... def compute(a, b):
...     return {"sum": a + b, "product": a * b}
>>> compute_task = compute(1, 2)
>>> compute_task.output["sum"]
OutputReference('abeb6f48-9b34-4698-ab69-e4dc2127ebe9', 'sum')

Warning

If an output is indexed incorrectly, for example by trying to access a key that doesn’t exist, this error will only be raised when the Job is executed.

Jobs can return Response objects that control the flow execution flow. For example, to replace the current job with another job, replace can be used.

>>> from jobflow import Response
>>> @job
... def replace(a, b):
...     new_job = compute(a, b)
...     return Response(replace=new_job)

By default, job outputs are stored in the :obj`.JobStore` docs_store. However, the JobStore additional_stores` can also be used for job outputs. The stores are specified as keyword arguments, where the argument name gives the store name and the argument value is the type of data/key to store in that store. More details on the accepted key types are given in the Job docstring. In the example below, the “graph” key is stored in an additional store named “graphs” and the “data” key is stored in an additional store called “large_data”.

>>> @job(large_data="data", graphs="graph")
... def compute(a, b):
...     return {"data": b, "graph": a }

Note

Using additional stores requires the JobStore to be configured with the required store names present. See the JobStore docstring for more details.

See also

Job, Flow, Response

class jobflow.core.job.Job(function, function_args=None, function_kwargs=None, output_schema=None, uuid=None, index=1, name=None, metadata=None, config=None, hosts=None, metadata_updates=None, config_updates=None, name_updates=None, **kwargs)[source]

Bases: MSONable

A Job is a delayed function call that can be used in an Flow.

In general, one should not create Job objects directly but instead use the job decorator on a function. Any calls to a decorated function will return an Job object.

Parameters:
  • function (Callable) – A function. Can be a builtin function such as sum or any other function provided it can be imported. Class and static methods can also be used, provided the class is importable. Lastly, methods (functions bound to an instance of class) can be used, provided the class is MSONable.

  • function_args (tuple[Any, ...]) – The positional arguments to the function call.

  • function_kwargs (dict[str, Any]) – The keyword arguments to the function call.

  • output_schema (type[BaseModel]) – A pydantic model that defines the schema of the output.

  • uuid (str) – A unique identifier for the job. Generated automatically.

  • index (int) – The index of the job (number of times the job has been replaced).

  • name (str) – The name of the job. If not set it will be determined from function.

  • metadata (dict[str, Any]) – A dictionary of information that will get stored alongside the job output.

  • config (JobConfig) – The config setting for the job.

  • hosts (list[str]) – The list of UUIDs of the hosts containing the job. The object identified by one UUID of the list should be contained in objects identified by its subsequent elements.

  • metadata_updates (list[dict[str, Any]]) – A list of updates for the metadata that will be applied to any Flow/Job generated by the job.

  • config_updates (list[dict[str, Any]]) – A list of updates for the config that will be applied to any Flow/Job generated by the job.

  • **kwargs – Additional keyword arguments that can be used to specify which outputs to save in additional stores. The argument name gives the additional store name and the argument value gives the type of data to store in that additional store. The value can be True in which case all outputs are stored in the additional store, a dictionary key (string or enum), an MSONable class, or a list of keys/classes.

  • name_updates (list[dict[str, Any]])

Variables:

output – The output of the job. This is a reference to the future job output and can be used as the input to other Jobs or Flows.

Returns:

A job.

Return type:

Job

Examples

Builtin functions such as print can be specified.

>>> print_task = Job(function=print, function_args=("I am a job", ))

Or other functions of the Python standard library.

>>> import os
>>> Job(function=os.path.join, function_args=("folder", "filename.txt"))

To use custom functions, the functions should be importable (i.e. not defined in another function). For example, if the following function is defined in the my_package module.

>>> def add(a, b):
...     return a + b
>>> add_job = Job(function=add, function_args=(1, 2))

More details are given in the job decorator docstring.

See also

job, Response, Flow

property input_references: tuple[jobflow.OutputReference, ...]

Find OutputReference objects in the job inputs.

Returns:

The references in the inputs to the job.

Return type:

tuple(OutputReference, …)

property input_uuids: tuple[str, ...]

Uuids of any OutputReference objects in the job inputs.

Returns:

The UUIDs of the references in the job inputs.

Return type:

tuple(str, …)

property input_references_grouped: dict[str, tuple[OutputReference, ...]]

Group any OutputReference objects in the job inputs by their UUIDs.

Returns:

The references grouped by their UUIDs.

Return type:

dict[str, tuple(OutputReference, …)]

property maker: jobflow.Maker | None

Get the host Maker object if a job is to run a maker function.

Returns:

A maker object.

Return type:

Maker or None

property graph: DiGraph

Get a graph of the job indicating the inputs to the job.

Returns:

The graph showing the connectivity of the jobs.

Return type:

DiGraph

property host

UUID of the first Flow that contains the Job.

Returns:

the UUID of the host.

Return type:

str

set_uuid(uuid)[source]

Set the UUID of the job.

Parameters:

uuid (str) – A UUID.

Return type:

None

run(store, job_dir=None)[source]

Run the job.

If the job has inputs that are OutputReference objects, then they will need to be resolved before the job can run. See the docstring for OutputReference.resolve() for more details.

Parameters:
  • store (jobflow.JobStore) – A JobStore to use for resolving references and storing job outputs.

  • job_dir (Path)

Returns:

The response of the job, containing the outputs, and other settings that determine the flow execution.

Return type:

Response

Raises:

ImportError – If the job function cannot be imported.

resolve_args(store, inplace=True)[source]

Resolve any OutputReference objects in the input arguments.

See the docstring for OutputReference.resolve() for more details.

Parameters:
  • store (jobflow.JobStore) – A maggma store to use for resolving references.

  • inplace (bool) – Update the arguments of the current job or return a new job object.

Returns:

A job with the references resolved.

Return type:

Job

update_kwargs(update, name_filter=None, function_filter=None, dict_mod=False)[source]

Update the kwargs of the jobs.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

Examples

Consider a simple job with a number keyword argument.

>>> from jobflow import job
>>> @job
... def add(a, number=5):
...     return a + number
>>> add_job = add(1)

The number argument can be updated using.

>>> add_job.update_kwargs({"number": 10})
update_maker_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]

Update the keyword arguments of any Maker objects in the job source.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.

  • class_filter (type[jobflow.Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.

  • nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of Maker, job, or flow objects. See examples for more details.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

Examples

Consider the following job from a Maker:

>>> from dataclasses import dataclass
>>> from jobflow import job, Maker, Flow
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()
>>> add_job = maker.make(1)

The number argument could be updated in the following ways.

>>> add_job.update_maker_kwargs({"number": 10})

By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. Consider the following case for a Maker that produces a job that replaces itself with another job.

>>> from jobflow import Response
>>> @dataclass
... class ReplacementMaker(Maker):
...     name: str = "replace"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a):
...         add_job = self.add_maker.make(a)
...         return Response(replace=add_job)
>>> maker = ReplacementMaker()
>>> my_job = maker.make(1)

The following update will apply to the nested AddMaker in the kwargs of the RestartMaker:

>>> my_job.update_maker_kwargs({"number": 10}, class_filter=AddMaker)

However, if nested=False, then the update will not be applied to the nested Maker:

>>> my_job.update_maker_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
append_name(append_str, prepend=False, dynamic=True)[source]

Append a string to the name of the job.

Parameters:
  • append_str (str) – A string to append.

  • prepend (bool) – Prepend the name rather than appending it.

  • dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

update_metadata(update, name_filter=None, function_filter=None, dict_mod=False, dynamic=True, callback_filter=<function Job.<lambda>>)[source]

Update the metadata of the job.

Can optionally apply the same updates at runtime to any Job or Flow generated by this job.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

  • dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

  • callback_filter (Callable[[jobflow.Flow | Job], bool]) – A function that takes a Flow or Job instance and returns True if updates should be applied to that instance. Allows for custom filtering logic.

Examples

Consider a simple job that makes use of a Maker to generate additional jobs at runtime (see Response options for more details):

>>> @job
... def use_maker(maker):
...     return Response(replace=maker.make())

Calling update_metadata with dynamic set to True (the default)

>>> test_job = use_maker(ExampleMaker())
... test_job.update_metadata({"example": 1}, dynamic=True)

will not only set the example metadata to the test_job, but also to all the new Jobs that will be generated at runtime by the ExampleMaker.

update_metadata can be called multiple times with different filters to control which Jobs will be updated. For example, using a callback filter:

>>> test_job.update_metadata(
...     {"material_id": 42},
...     callback_filter=lambda job: isinstance(job.maker, SomeMaker)
... )

At variance, if dynamic is set to False the metadata will only be added to the filtered Jobs and not to any generated Jobs.

update_config(config, name_filter=None, function_filter=None, attributes=None, dynamic=True)[source]

Update the job config.

Can optionally apply the same updates at runtime to any Job or Flow generated by this job.

Parameters:
  • config (JobConfig | dict) – A JobConfig object or a dict with containing the attributes to update.

  • name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.

  • attributes (list[str] | str) – Which attributes of the job config to set. Can be specified as one or more attributes specified by their name.

  • dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

Examples

Consider a simple job.

>>> from jobflow import job, JobConfig
>>> @job
... def add(a, b):
...     return a + b
>>> add_job = add(1, 2)

The config can be updated using.

>>> new_config = JobConfig(
...    manager_config={"_fworker": "myfworker"}, resolve_references=False
... )
>>> add_job.update_config(new_config)

To only update specific attributes, the attributes argument can be specified. For example, the following will only update the “manager_config” attribute of the job config.

>>> add_job.update_config(new_config, attributes="manager_config")

Alternatively, the config can be specified as a dictionary with keys that are attributes of the JobConfig object. This allows you to specify updates without having to create a completely new JobConfig object. For example:

>>> add_job.update_config({"manager_config": {"_fworker": "myfworker"}})

Consider instead a simple job that makes use of a Maker to generate additional jobs at runtime (see Response options for more details):

>>> @job
... def use_maker(maker):
...     return Response(replace=maker.make())

Calling update_config with dynamic set to True (the default)

>>> test_job = use_maker(ExampleMaker())
... test_job.update_config({"manager_config": {"_fworker": "myfworker"}})

will not only set the manager_config to the test_job, but also to all the new Jobs that will be generated at runtime by the ExampleMaker.

update_config can be called multiple times with different name_filter or function_filter to control which Jobs will be updated.

At variance, if dynamic is set to False the manager_config option will only be set for the test_job and not for the generated Jobs.

as_dict()[source]

Serialize the job as a dictionary.

Return type:

dict

add_hosts_uuids(hosts_uuids, prepend=False)[source]

Add a list of UUIDs to the internal list of hosts.

The elements of the list are supposed to be ordered in such a way that the object identified by one UUID of the list is contained in objects identified by its subsequent elements.

Parameters:
  • hosts_uuids (str | Sequence[str]) – A list of UUIDs to add.

  • prepend (bool) – Insert the UUIDs at the beginning of the list rather than extending it.

class jobflow.core.job.Response(output=None, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=None)[source]

Bases: Generic[T]

The Response contains the output, detours, and stop commands of a job.

Parameters:
  • output (T) – The job output.

  • detour (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to detour to.

  • addition (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to add to the current flow.

  • replace (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to replace the current job.

  • stored_data (dict[Hashable, Any]) – Data to be stored by the flow manager.

  • stop_children (bool) – Stop any children of the current flow.

  • stop_jobflow (bool) – Stop executing all remaining jobs.

  • job_dir (str | Path) – The directory where the job was run.

classmethod from_job_returns(job_returns, output_schema=None, job_dir=None)[source]

Generate a Response from the outputs of a Job.

Parameters:
  • job_returns (Any | None) – The outputs of a job. If this is a Response object, the output schema will be applied to the response outputs and the response returned. Otherwise, the job_returns will be put into the outputs of a new Response object.

  • output_schema (type[BaseModel]) – A pydantic model associated with the job. Used to enforce a schema for the outputs.

  • job_dir (str | Path) – The directory where the job was run.

Raises:

ValueError – If the job outputs do not match the output schema.

Returns:

The job response controlling the data to store and flow execution options.

Return type:

Response

jobflow.core.job.apply_schema(output, schema)[source]

Apply schema to job outputs.

Parameters:
  • output (Any) – The job outputs.

  • schema (type[BaseModel] | None) – A pydantic model that defines the schema to apply.

Raises:
  • ValueError – If a schema is set but there are no outputs.

  • ValueError – If the outputs do not match the schema.

Returns:

Returns an instance of the schema if the schema is set or the original output.

Return type:

BaseModel or Any

jobflow.core.job.store_inputs(inputs)[source]

Job to store inputs.

Note that any Reference objects will not be resolved, however, missing references will be replaced with None.

Parameters:

inputs (Any) – The inputs to store.

Return type:

Any

jobflow.core.job.prepare_replace(replace, current_job)[source]

Prepare a replacement Flow or Job.

If the replacement is a Flow, then an additional Job will be inserted that maps the output id of the original job to outputs of the Flow.

If the replacement is a Flow or a Job, then this function pass on the manager config, schema, and metadata and set the according UUIDs and job index.

Parameters:
  • replace (jobflow.Flow | Job | list[Job]) – A Flow or Job to use as the replacement.

  • current_job (Job) – The current job.

Returns:

The updated flow.

Return type:

Flow

jobflow.core.job.pass_manager_config(jobs, manager_config)[source]

Pass the manager config on to any jobs in the jobs array.

Parameters:
  • jobs (Job | jobflow.Flow | list[Job | jobflow.Flow]) – A job, flow, or list of jobs/flows.

  • manager_config (dict[str, Any]) – A manager config to pass on.

  • metadata – Metadata to pass on.

jobflow.core.maker

Define base Maker class for creating jobs and flows.

class jobflow.core.maker.Maker[source]

Bases: MSONable

Base maker (factory) class for constructing Job and Flow objects.

Note, this class is only an abstract implementation. To create a functioning Maker, one should subclass Maker, define the make method and add a name field with a default value. See examples below.

See also

job, Job, Flow

Examples

Below we define a simple maker that generates a job to add two numbers. Next, we make a job using the maker. When creating a new maker it is essential to: 1) Add a name field with a default value (this controls the name of jobs produced by the maker) and 2) override the make method with a concrete implementation.

>>> from dataclasses import dataclass
>>> from jobflow import Maker, job
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...
...     @job
...     def make(self, a, b):
...         return a + b
>>> maker = AddMaker()
>>> add_job = maker.make(1, 2)
>>> add_job.name
"add"

There are two key features of Maker objects that make them extremely powerful. The first is the ability to have class instance variables that are used inside the make function. For example, running the job below will produce the output 6.5.

>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     c: float = 1.5
...
...     @job
...     def make(self, a, b):
...         return a + b + self.c
>>> maker = AddMaker(c=3.5)
>>> add_job = maker.make(1, 2)

Note

Jobs created by a Maker will inherit a copy of the Maker object. This means that if 1) a job is created, 2) the maker variables are changed, and 3) a new job is created, only the second job will reflect the updated maker variables.

The second useful feature is the ability for delayed creation of jobs. For example, consider a job that creates another job during its execution. If the new job has lots of optional arguments, this will require passing lots of arguments to the first job. However, using a Maker, the kwargs of the job can be specified as maker kwargs and specified when the first job is created, making it much cleaner to customise the newly created job. For example, consider this job that first squares two numbers, then creates a new job to add them.

>>> from jobflow import Response
>>> @dataclass
... class SquareThenAddMaker(Maker):
...     name: str = "square then add"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a, b):
...         a_squared = a ** 2
...         b_squared = b ** 2
...         add_job = self.add_maker.make(a_squared, b_squared)
...         return Response(detour=add_job)
>>> maker = SquareThenAddMaker()
>>> square_add_job = maker.make(3, 4)

The add job can be customised by specifying a custom AddMaker instance when the SquareThenAddMaker is initialized.

>>> custom_add_maker = AddMaker(c=1000)
>>> maker = SquareThenAddMaker(add_maker=custom_add_maker)

This provides a very natural way to contain and control the kwargs for dynamically created jobs.

Note that makers can also be used to create Flow objects. In this case, the only difference is that the make() method is not decorated using the job decorator and the function should return a flow. For example:

>>> from jobflow import Flow
>>> @dataclass
... class DoubleAddMaker(Maker):
...     name: str = "double add"
...     add_maker: Maker = AddMaker()
...
...     def make(self, a, b):
...         add_first = self.add_maker.make(a, b)
...         add_second = self.add_maker.make(add_first.output, b)
...         return Flow([add_first, add_second], name=self.name)
>>> maker = DoubleAddMaker()
>>> double_add_job = maker.make(1, 2)
make(*args, **kwargs)[source]

Make a job or a flow - must be overridden with a concrete implementation.

Return type:

jobflow.Flow | jobflow.Job

property name

Name of the Maker - must be overridden with a dataclass field.

update_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]

Update the keyword arguments of the Maker.

Note

Note, the maker is not updated in place. Instead a new maker object is returned.

Parameters:
  • update (dict[str, Any]) – The updates to apply.

  • name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.

  • class_filter (type[Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.

  • nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of a Maker object. See examples for more details.

  • dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.

Examples

Consider the following Maker:

>>> from dataclasses import dataclass
>>> from jobflow import job, Maker
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()

The number argument could be updated using:

>>> maker.update_kwargs({"number": 10})

By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. For example, consider this maker that first squares a number, then creates a new job to add it.

>>> from jobflow import Response
>>> @dataclass
... class SquareThenAddMaker(Maker):
...     name: str = "square then add"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a, b):
...         a_squared = a ** 2
...         add_job = self.add_maker.make(a_squared)
...         return Response(detour=add_job)
>>> maker = SquareThenAddMaker()

The following update will apply to the nested AddMaker in the kwargs of the SquareThenAddMaker:

>>> maker = maker.update_kwargs({"number": 10}, class_filter=AddMaker)

However, if nested=False, then the update will not be applied to the nested Maker:

>>> maker = maker.update_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
jobflow.core.maker.recursive_call(obj, func, name_filter=None, class_filter=None, nested=True)[source]

Recursively call a function on all Maker objects in the object.

Parameters:
  • obj (Maker) – The Maker object to call the function on.

  • func (Callable[[Maker], Maker]) – The function to call a Maker object, if it matches the filters. This function must return a new Maker object.

  • name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.

  • class_filter (type[Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.

  • nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of a Maker object. See examples for more details.

jobflow.core.reference

Define classes for handling job outputs.

class jobflow.core.reference.OnMissing(value)[source]

Bases: ValueEnum

What to do when a reference cannot be resolved.

  • ERROR: Throw an error.

  • NONE: Replace the missing reference with None.

  • PASS: Pass the unresolved OutputReference object on.

ERROR = 'error'
NONE = 'none'
PASS = 'pass'
class jobflow.core.reference.OutputReference(uuid, attributes=(), output_schema=None)[source]

Bases: MSONable

A reference to the output of a Job.

The output may or may not exist yet. Accordingly, OutputReference objects are essentially pointers to future job outputs. Output references can be used as input to a Job or Flow and will be “resolved”. This means, the value of the reference will be looked up in the job store and the value passed on to the job.

Output references are also how jobflow is able to determine the dependencies between jobs, allowing for construction of the job graph and to determine the job execution order.

Upon attribute access or indexing, an output reference will return another output reference with the attribute accesses stored. When resolving the reference, the attributes/indexing will be also be performed before the value is passed on to the job. See the examples for more details.

Parameters:
  • uuid (str) – The job uuid to which the output belongs.

  • attributes (tuple[tuple[str, Any], ...]) – A tuple of attributes or indexes that have been performed on the output. Attributes are specified by a tuple of ("a", attr), whereas indexes are specified as a tuple of ("i", index).

  • output_schema (type[BaseModel]) – A pydantic model that defines the schema of the output and that will be used to validate any attribute accesses or indexes. Note, schemas can only be used to validate the first attribute/index access.

Examples

A reference can be constructed using just the job uuid.

>>> from jobflow import OutputReference
>>> ref = OutputReference("1234")

Attribute accesses return new references.

>>> ref.my_attribute
OutputReference(1234, .my_attribute)

Attribute accesses and indexing can be chained.

>>> ref["key"][0].value
OutputReference(1234, ['key'], [0], .value)
resolve(store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]

Resolve the reference.

This function will query the job store for the reference value and perform any attribute accesses/indexing.

Note

When resolving multiple references simultaneously it is more efficient to use resolve_references as it will minimize the number of database requests.

Parameters:
  • store (jobflow.JobStore | None) – A job store.

  • cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.

  • on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.

  • deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.

Raises:

ValueError – If the reference cannot be found and on_missing is set to OnMissing.ERROR (default).

Returns:

The resolved reference if it can be found. If the reference cannot be found, the returned value will depend on the value of on_missing.

Return type:

Any

set_uuid(uuid, inplace=True)[source]

Set the UUID of the reference.

Parameters:
  • uuid (str) – A new UUID.

  • inplace – Whether to update the current reference object or return a completely new object.

Returns:

An output reference with the specified uuid.

Return type:

OutputReference

property attributes_formatted

Get a formatted description of the attributes.

as_dict()[source]

Serialize the reference as a dict.

jobflow.core.reference.resolve_references(references, store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]

Resolve multiple output references.

Uses caching to minimize number of database queries.

Parameters:
  • references (Sequence[OutputReference]) – A list or tuple of output references.

  • store (jobflow.JobStore) – A job store.

  • cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.

  • on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.

  • deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.

Returns:

The output values as a dictionary mapping of {reference: output}.

Return type:

dict[OutputReference, Any]

jobflow.core.reference.find_and_get_references(arg)[source]

Find and extract output references.

This function works on nested inputs. For example, lists or dictionaries (or combinations of list and dictionaries) that contain output references.

Parameters:

arg (Any) – The argument to search for references.

Returns:

The output references as a tuple.

Return type:

tuple[OutputReference]

jobflow.core.reference.find_and_resolve_references(arg, store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]

Return the input but with all output references replaced with their resolved values.

This function works on nested inputs. For example, lists or dictionaries (or combinations of list and dictionaries) that contain output references.

Parameters:
  • arg (Any) – The input argument containing output references.

  • store (jobflow.JobStore) – A job store.

  • cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.

  • on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.

  • deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.

Returns:

The input argument but with all output references replaced with their resolved values. If a reference cannot be found, its replacement value will depend on the value of on_missing.

Return type:

Any

jobflow.core.reference.validate_schema_access(schema, item)[source]

Validate that an attribute or index access is supported by a model.

If the item is associated to a nested model the class is returned.

Parameters:
  • schema (type[BaseModel]) – A pydantic model to use as the schema.

  • item (str) – An attribute or key to access.

Raises:

AttributeError – If the item is not a valid attribute of the schema.

Returns:

the bool is True if the schema access was valid. The BaseModel class associated with the item, if any.

Return type:

tuple[bool, BaseModel]

jobflow.core.state

Stateful interface for accessing the current job (and store).

This module defines the CURRENT_JOB object which has two attributes:

  • job: Containing the current job.

  • store: Containing the current store. Only available if expose_store is set in the job config.

jobflow.core.store

Define the primary jobflow database interface.

class jobflow.core.store.JobStore(docs_store, additional_stores=None, save=None, load=False)[source]

Bases: Store

Store intended to allow pushing and pulling documents into multiple stores.

Parameters:
  • docs_store (Store) – Store for basic documents.

  • additional_stores (dict[str, Store]) – Additional stores to use for storing large data/specific objects. Given as a mapping of {store name: store} where store is a maggma Store object.

  • save (save_type) – Which items to save in additional stores when uploading documents. Given as a mapping of {store name: store_type} where store_type can be a dictionary key (string or enum), an MSONable class, or a list of keys/classes.

  • load (load_type) – Which items to load from additional stores when querying documents. Given as a mapping of {store name: store_type} where store_type can be True`, in which case all saved items are loaded, a dictionary key (string or enum), an MSONable class, or a list of keys/classes. Alternatively, load=True will automatically load all items from every additional store.

  • key – main key to index on

  • last_updated_field – field for date/time stamping the data

  • last_updated_type – the date/time format for the last_updated_field. Can be “datetime” or “isoformat”

  • validator – Validator to validate documents going into the store.

property name: str

Get the name of the data source.

Returns:

A string representing this data source.

Return type:

str

connect(force_reset=False)[source]

Connect to the source data.

Parameters:

force_reset (bool) – Whether to reset the connection or not.

close()[source]

Close any connections.

count(criteria=None)[source]

Count the number of documents matching the query criteria.

Parameters:

criteria (dict | None) – PyMongo filter for documents to count in.

Returns:

The number of documents matching the query.

Return type:

int

query(criteria=None, properties=None, sort=None, skip=0, limit=0, load=None)[source]

Query the JobStore for documents.

Parameters:
  • criteria (dict) – PyMongo filter for documents to search in.

  • properties (dict | list) – Properties to return in grouped documents.

  • sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

  • skip (int) – Number of documents to skip.

  • limit (int) – Limit on the total number of documents returned.

  • load (load_type) – Which items to load from additional stores. See JobStore constructor for more details.

Yields:

Dict – The documents.

Return type:

Iterator[dict]

query_one(criteria=None, properties=None, sort=None, load=None)[source]

Query the Store for a single document.

Parameters:
  • criteria (dict) – PyMongo filter for documents to search.

  • properties (dict | list) – Properties to return in the document.

  • sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

  • load (load_type) – Which items to load from additional stores. See JobStore constructor for more details.

Returns:

The document.

Return type:

dict or None

update(docs, key=None, save=None)[source]

Update or insert documents into the Store.

Parameters:
  • docs (list[dict] | dict | JobStoreDocument | list[JobStoreDocument]) – The Pydantic document or list of Pydantic documents to update.

  • key (list | str) – Field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store’s key field is to be used.

  • save (bool | save_type) – Which items to save in additional stores. See JobStore constructor for more details.

ensure_index(key, unique=False)[source]

Try to create an index on document store and return True success.

Parameters:
  • key (str) – Single key to index.

  • unique (bool) – Whether or not this index contains only unique keys.

Returns:

Whether the index exists/was created correctly.

Return type:

bool

groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0, load=None)[source]

Group documents by keys.

Parameters:
  • keys (list[str] | str) – Fields to group documents.

  • criteria (dict) – PyMongo filter for documents to search in.

  • properties (dict | list) – Properties to return in grouped documents

  • sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

  • skip (int) – Number of documents to skip.

  • limit (int) – Limit on the total number of documents returned.

  • load (load_type) – Which items to load from additional stores. See JobStore constructor for more details.

Yields:

dict, list[dict] – The documents as (key, documents) grouped by their keys.

Return type:

Iterator[tuple[dict, list[dict]]]

remove_docs(criteria)[source]

Remove docs matching the criteria.

Parameters:

criteria (dict) – Criteria for documents to remove.

get_output(uuid, which='last', load=False, cache=None, on_missing=OnMissing.ERROR)[source]

Get the output of a job UUID.

Note that, unlike JobStore.query, this function will automatically try to resolve any output references in the job outputs.

Parameters:
  • uuid (str) – A job UUID.

  • which (str | int) –

    If there are multiple job runs, which index to use. Options are:

    • "last" (default): Use the last job that ran.

    • "first": Use the first job that ran.

    • "all": Return all outputs, sorted with the lowest index first.

    • Alternatively, if an integer is specified, the output for this specific index will be queried.

  • load (load_type) – Which items to load from additional stores. Setting to True will load all items stored in additional stores. See the JobStore constructor for more details.

  • cache (dict[str, Any]) – A dictionary cache to use for resolving of reference values.

  • on_missing (OnMissing) – What to do if the output contains a reference and the reference cannot be resolved.

Returns:

The output(s) for the job UUID.

Return type:

Any

classmethod from_file(db_file, **kwargs)[source]

Create a JobStore from a database file.

Two options are supported for the database file. The file should be in json or yaml format.

The simplest format is a monty dumped version of the store, generated using:

>>> from monty.serialization import dumpfn
>>> dumpfn(job_store, "job_store.yaml")

Alternatively, the file can contain the keys docs_store, additional_stores and any other keyword arguments supported by the JobStore constructor. The docs_store and additional stores are specified by the type key which must match a Maggma Store subclass, and the remaining keys are passed to the store constructor. For example, the following file would create a JobStore with a MongoStore for docs and a GridFSStore as an additional store for data.

docs_store:
  type: MongoStore
  database: jobflow_unittest
  collection_name: outputs
  host: localhost
  port: 27017
additional_stores:
  data:
    type: GridFSStore
    database: jobflow_unittest
    collection_name: outputs_blobs
    host: localhost
    port: 27017
Parameters:
  • db_file (str | Path) – Path to the file containing the credentials.

  • **kwargs – Additional keyword arguments that get passed to the JobStore constructor. These arguments are ignored if the file contains a monty serialised JobStore.

Returns:

A JobStore.

Return type:

JobStore

classmethod from_dict_spec(spec, **kwargs)[source]

Create an JobStore from a dict specification.

Note

This function is different to JobStore.from_dict which is used to load the monty serialised representation of the class.

The dictionary should contain the keys “docs_store”, “additional_stores” and any other keyword arguments supported by the JobStore constructor. The docs_store and additional stores are specified by the type key which must match a Maggma Store subclass, and the remaining keys are passed to the store constructor. For example, the following file would create a JobStore with a MongoStore for docs and a GridFSStore as an additional store for data.

docs_store:
  type: MongoStore
  database: jobflow_unittest
  collection_name: outputs
  host: localhost
  port: 27017
additional_stores:
  data:
    type: GridFSStore
    database: jobflow_unittest
    collection_name: outputs_blobs
    host: localhost
    port: 27017
Parameters:
  • spec (dict) – The dictionary specification.

  • **kwargs – Additional keyword arguments that get passed to the JobStore constructor.

Returns:

A JobStore.

Return type:

JobStore