jobflow.core¶
jobflow.core.flow¶
Define base Flow object.
- class jobflow.core.flow.JobOrder(value)[source]¶
Bases: ValueEnum
Options to control the order of job execution.
AUTO: Automatically determine the job order based on input and output references.
LINEAR: Run the jobs in the order they appear in the jobs array.
- AUTO = 'auto'¶
- LINEAR = 'linear'¶
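As a minimal usage sketch (assuming job1 and job2 are existing Job objects), the execution order can be forced to be linear when constructing a Flow:
>>> from jobflow.core.flow import Flow, JobOrder
>>> flow = Flow([job1, job2], order=JobOrder.LINEAR)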
- class jobflow.core.flow.Flow(jobs, output=None, name='Flow', order=JobOrder.AUTO, uuid=None, hosts=None)[source]¶
Bases: MSONable
A Flow contains a collection of Jobs or other Flows to execute.
The Flow object is the main tool for constructing workflows. Flows can contain either Jobs or other Flows. Like Job objects, Flow objects can also have outputs; however, these are not explicitly stored in the database. Instead, the outputs of a Flow act to structure the outputs of the jobs contained within the Flow.
- Parameters:
jobs (list[Flow | jobflow.Job] | jobflow.Job | Flow) – The jobs to be run as a list of Job or Flow objects.
output (Any) – The output of the flow. These should come from the output of one or more of the jobs.
name (str) – The flow name.
order (JobOrder) – The order in which the jobs should be executed. The default is to determine the order automatically based on the connections between jobs.
uuid (str) – The identifier of the flow. This is generated automatically.
hosts (list[str]) – The list of UUIDs of the hosts containing the job. This is updated automatically when a flow is included in the jobs array of another flow. The object identified by one UUID of the list should be contained in objects identified by its subsequent elements.
- Raises:
ValueError – If a job in the jobs array is already part of another flow.
ValueError – If any jobs needed to resolve the inputs of all jobs in the jobs array are missing.
ValueError – If any jobs needed to resolve the flow output are missing.
- Warns:
UserWarning – If a Job or Flow object is used as the Flow output rather than an OutputReference.
Examples
Below we define a simple job to add two numbers, and create a flow containing two connected add jobs.
>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_first = add(1, 2)
>>> add_second = add(add_first.output, 2)
>>> flow = Flow(jobs=[add_first, add_second])
This flow does not expose any of the outputs of the jobs contained within it. We could instead “register” the output of the second add as the output of the flow.
>>> flow = Flow(jobs=[add_first, add_second], output=add_second.output)
This will allow the flow to be used in another flow. In this way, Flows can be infinitely nested. For example:
>>> add_third = add(flow.output, 5)
>>> outer_flow = Flow(jobs=[flow, add_third])
Flows can be run using a flow manager. These enable running Flows locally or on compute clusters (using the FireWorks manager).
>>> from jobflow.managers.local import run_locally
>>> response = run_locally(flow)
- property output: Any¶
Get the output of the flow.
- Returns:
The output of the flow.
- Return type:
Any
- property job_uuids: tuple[str, ...]¶
Uuids of every Job contained in the Flow (including nested Flows).
- property all_uuids: tuple[str, ...]¶
Uuids of every Job and Flow contained in the Flow (including nested Flows).
- property graph: DiGraph¶
Get a graph indicating the connectivity of jobs in the flow.
- Returns:
The graph showing the connectivity of the jobs.
- Return type:
DiGraph
- property host: str | None¶
UUID of the first Flow that contains this Flow.
- Returns:
the UUID of the host.
- Return type:
str | None
- draw_graph(**kwargs)[source]¶
Draw the flow graph using matplotlib.
Requires matplotlib to be installed.
- Parameters:
kwargs – keyword arguments that are passed to jobflow.utils.graph.draw_graph.
- Returns:
The matplotlib pyplot state object.
- Return type:
pyplot
- iterflow()[source]¶
Iterate through the jobs of the flow.
The jobs are yielded such that the job output references can always be resolved. I.e., root nodes of the flow graph are always returned first.
- Yields:
Job, list[str] – The Job and the uuids of any parent jobs (not to be confused with the host flow).
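A rough usage sketch (reusing the two-job flow constructed in the class-level example above), showing the jobs in dependency-resolved order together with their parent UUIDs:
>>> for flow_job, parents in flow.iterflow():
...     print(flow_job.name, parents)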
- update_kwargs(update, name_filter=None, function_filter=None, dict_mod=False)[source]¶
Update the kwargs of all Jobs in the Flow.
Note that updates will be applied to jobs in nested Flows.
- Parameters:
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
Examples
Consider a flow containing a simple job with a number keyword argument.
>>> from jobflow import job, Flow
>>> @job
... def add(a, number=5):
...     return a + number
>>> add_job = add(1)
>>> flow = Flow([add_job])
The number argument could be updated in the following ways.
>>> flow.update_kwargs({"number": 10})
This will work if all jobs in the flow have a kwarg called number. However, if this is not the case, the update will result in invalid input kwargs for some jobs. To only apply the update to the correct jobs, filters can be used.
>>> flow.update_kwargs({"number": 10}, name_filter="add")
>>> flow.update_kwargs({"number": 10}, function_filter=add)
- update_maker_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]¶
Update the keyword arguments of any Maker objects in the jobs.
Note that updates will be applied to Jobs in any inner Flows.
- Parameters:
name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.
class_filter (type[jobflow.Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.
nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of Maker, job, or flow objects. See examples for more details.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
Examples
Consider the following flow containing jobs from a Maker:
>>> from dataclasses import dataclass
>>> from jobflow import job, Maker, Flow
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()
>>> add_job = maker.make(1)
>>> flow = Flow([add_job])
The number argument could be updated in the following ways.
>>> flow.update_maker_kwargs({"number": 10})
This will work if all Makers in the flow have a kwarg called number. However, if this is not the case, the update will result in invalid input kwargs for some Makers. To only apply the update to the correct Makers, filters can be used.
>>> flow.update_maker_kwargs({"number": 10}, name_filter="add")
>>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker)
By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. Consider the following case for a Maker that produces a job that restarts.
>>> from jobflow import Response
>>> @dataclass
... class RestartMaker(Maker):
...     name: str = "replace"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a):
...         restart_job = self.add_maker.make(a)
...         return Response(replace=restart_job)
>>> maker = RestartMaker()
>>> my_job = maker.make(1)
>>> flow = Flow([my_job])
The following update will apply to the nested AddMaker in the kwargs of the RestartMaker:
>>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker)
However, if nested=False, then the update will not be applied to the nested Maker:
>>> flow.update_maker_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
- append_name(append_str, prepend=False, dynamic=True)[source]¶
Append a string to the name of the flow and all jobs contained in it.
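A minimal usage sketch (reusing the flow from the examples above): a suffix is appended by default, while prepend=True adds a prefix instead.
>>> flow.append_name(" - benchmark")
>>> flow.append_name("test: ", prepend=True)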
- update_metadata(update, name_filter=None, function_filter=None, dict_mod=False, dynamic=True)[source]¶
Update the metadata of all Jobs in the Flow.
Note that updates will be applied to jobs in nested Flows.
- Parameters:
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.
Examples
Consider a flow containing two jobs.
>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_job1 = add(5, 6)
>>> add_job2 = add(6, 7)
>>> flow = Flow([add_job1, add_job2])
The metadata of both jobs could be updated as follows:
>>> flow.update_metadata({"tag": "addition_job"})
- update_config(config, name_filter=None, function_filter=None, attributes=None, dynamic=True)[source]¶
Update the job config of all Jobs in the Flow.
Note that updates will be applied to jobs in nested Flows.
- Parameters:
config (jobflow.JobConfig | dict) – A JobConfig object or a dict containing the attributes to update.
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
attributes (list[str] | str) – Which attributes of the job config to set. Can be specified as one or more attributes specified by their name.
dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.
Examples
Consider a flow containing two jobs.
>>> from jobflow import job, Flow, JobConfig
>>> @job
... def add(a, b):
...     return a + b
>>> add_job1 = add(5, 6)
>>> add_job2 = add(6, 7)
>>> flow = Flow([add_job1, add_job2])
The config of both jobs could be updated as follows:
>>> new_config = JobConfig(
...     manager_config={"_fworker": "myfworker"}, resolve_references=False
... )
>>> flow.update_config(new_config)
To only update specific attributes, the attributes argument can be set. For example, the following will only update the "manager_config" attribute of the jobs' config.
>>> flow.update_config(new_config, attributes="manager_config")
Alternatively, the config can be specified as a dictionary with keys that are attributes of the JobConfig object. This allows you to specify updates without having to create a completely new JobConfig object. For example:
>>> flow.update_config({"manager_config": {"_fworker": "myfworker"}})
- add_hosts_uuids(hosts_uuids=None, prepend=False)[source]¶
Add a list of UUIDs to the internal list of hosts.
If hosts_uuids is None, the UUID of this Flow will be added to the inner jobs and flows. Otherwise, the passed value will be set both in the list of hosts of the current flow and of the inner jobs and flows. The elements of the list should be ordered such that the object identified by one UUID of the list is contained in objects identified by its subsequent elements.
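A hedged sketch of the two call styles described above (the parent-flow UUID string is purely illustrative):
>>> flow.add_hosts_uuids()                        # add this flow's own uuid to its jobs and subflows
>>> flow.add_hosts_uuids(["<parent-flow-uuid>"])  # set an explicit host list on the flow and its children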
jobflow.core.job¶
Functions and classes for representing Job objects.
- class jobflow.core.job.JobConfig(resolve_references=True, on_missing_references=OnMissing.ERROR, manager_config=<factory>, expose_store=False, pass_manager_config=True, response_manager_config=<factory>)[source]¶
Bases: MSONable
The configuration parameters for a job.
- Parameters:
resolve_references (bool) – Whether to resolve any references before the job function is executed. If False, the unresolved reference objects will be passed into the function call.
on_missing_references (jobflow.core.reference.OnMissing) – What to do if the references cannot be resolved. The default is to throw an error.
manager_config (dict) – The configuration settings to control the manager execution.
expose_store (bool) – Whether to expose the store in CURRENT_JOB when the job is running.
pass_manager_config (bool) – Whether to pass the manager configuration on to detour, addition, and replacement jobs.
response_manager_config (dict) – The custom configuration to pass to a detour, addition, or replacement job. Using this kwarg will automatically take precedence over the behavior of pass_manager_config such that a different configuration than manager_config can be passed to downstream jobs.
- Returns:
A JobConfig object.
- Return type:
JobConfig
- jobflow.core.job.job(method: Callable | None = None) Callable[..., Job] [source]¶
- jobflow.core.job.job(method: Callable, **job_kwargs) Callable[..., Job]
- jobflow.core.job.job(method: None = None, **job_kwargs) Callable[..., Callable[..., Job]]
Wrap a function to produce a Job.
Job objects are delayed function calls that can be used in a Flow. A job is composed of the function name and source and any arguments for the function. This decorator makes it simple to create job objects directly from a function definition. See the examples for more details.
- Parameters:
method – A function to wrap. This should not be specified directly and is implied by the decorator.
**job_kwargs – Other keyword arguments that will get passed to the Job init method.
Examples
>>> @job
... def print_message():
...     print("I am a Job")
>>> print_job = print_message()
>>> type(print_job)
<class 'jobflow.core.job.Job'>
>>> print_job.function
<function print_message at 0x7ff72bdf6af0>
Jobs can have required and optional parameters.
>>> @job
... def print_sum(a, b=0):
...     return print(a + b)
>>> print_sum_job = print_sum(1, b=2)
>>> print_sum_job.function_args
(1, )
>>> print_sum_job.function_kwargs
{"b": 2}
If the function returns a value it can be referenced using the output attribute of the job.
>>> @job
... def add(a, b):
...     return a + b
>>> add_task = add(1, 2)
>>> add_task.output
OutputReference('abeb6f48-9b34-4698-ab69-e4dc2127ebe9')
Note
Because the task has not yet been run, the output value is an OutputReference object. References are automatically converted to their computed values (resolved) when the task runs.
If a dictionary of values is returned, the values can be indexed in the usual way.
>>> @job
... def compute(a, b):
...     return {"sum": a + b, "product": a * b}
>>> compute_task = compute(1, 2)
>>> compute_task.output["sum"]
OutputReference('abeb6f48-9b34-4698-ab69-e4dc2127ebe9', 'sum')
Warning
If an output is indexed incorrectly, for example by trying to access a key that doesn’t exist, this error will only be raised when the Job is executed.
Jobs can return Response objects that control the flow execution. For example, to replace the current job with another job, replace can be used.
>>> from jobflow import Response
>>> @job
... def replace(a, b):
...     new_job = compute(a, b)
...     return Response(replace=new_job)
By default, job outputs are stored in the JobStore docs_store. However, the JobStore additional_stores can also be used for job outputs. The stores are specified as keyword arguments, where the argument name gives the store name and the argument value is the type of data/key to store in that store. More details on the accepted key types are given in the Job docstring. In the example below, the "graph" key is stored in an additional store named "graphs" and the "data" key is stored in an additional store called "large_data".
>>> @job(large_data="data", graphs="graph")
... def compute(a, b):
...     return {"data": b, "graph": a}
- class jobflow.core.job.Job(function, function_args=None, function_kwargs=None, output_schema=None, uuid=None, index=1, name=None, metadata=None, config=None, hosts=None, metadata_updates=None, config_updates=None, name_updates=None, **kwargs)[source]¶
Bases: MSONable
A Job is a delayed function call that can be used in a Flow.
In general, one should not create Job objects directly but instead use the job decorator on a function. Any call to a decorated function will return a Job object.
- Parameters:
function (Callable) – A function. Can be a builtin function such as sum or any other function provided it can be imported. Class and static methods can also be used, provided the class is importable. Lastly, methods (functions bound to an instance of a class) can be used, provided the class is MSONable.
function_args (tuple[Any, ...]) – The positional arguments to the function call.
function_kwargs (dict[str, Any]) – The keyword arguments to the function call.
output_schema (type[BaseModel]) – A pydantic model that defines the schema of the output.
uuid (str) – A unique identifier for the job. Generated automatically.
index (int) – The index of the job (number of times the job has been replaced).
name (str) – The name of the job. If not set it will be determined from function.
metadata (dict[str, Any]) – A dictionary of information that will get stored alongside the job output.
config (JobConfig) – The config setting for the job.
hosts (list[str]) – The list of UUIDs of the hosts containing the job. The object identified by one UUID of the list should be contained in objects identified by its subsequent elements.
metadata_updates (list[dict[str, Any]]) – A list of updates for the metadata that will be applied to any Flow/Job generated by the job.
config_updates (list[dict[str, Any]]) – A list of updates for the config that will be applied to any Flow/Job generated by the job.
**kwargs – Additional keyword arguments that can be used to specify which outputs to save in additional stores. The argument name gives the additional store name and the argument value gives the type of data to store in that additional store. The value can be True, in which case all outputs are stored in the additional store, a dictionary key (string or enum), an MSONable class, or a list of keys/classes.
- Variables:
output – The output of the job. This is a reference to the future job output and can be used as the input to other Jobs or Flows.
- Returns:
A job.
- Return type:
Job
Examples
Builtin functions such as print can be specified.
>>> print_task = Job(function=print, function_args=("I am a job", ))
Or other functions of the Python standard library.
>>> import os
>>> Job(function=os.path.join, function_args=("folder", "filename.txt"))
To use custom functions, the functions should be importable (i.e. not defined in another function). For example, if the following function is defined in the my_package module.
>>> def add(a, b):
...     return a + b
>>> add_job = Job(function=add, function_args=(1, 2))
More details are given in the job decorator docstring.
- property input_references: tuple[jobflow.OutputReference, ...]¶
Find OutputReference objects in the job inputs.
- Returns:
The references in the inputs to the job.
- Return type:
tuple(OutputReference, …)
- property input_uuids: tuple[str, ...]¶
Uuids of any OutputReference objects in the job inputs.
- property input_references_grouped: dict[str, tuple[OutputReference, ...]]¶
Group any OutputReference objects in the job inputs by their UUIDs.
- Returns:
The references grouped by their UUIDs.
- Return type:
dict[str, tuple(OutputReference, …)]
- property maker: jobflow.Maker | None¶
Get the host Maker object if a job is to run a maker function.
- Returns:
A maker object.
- Return type:
Maker or None
- property graph: DiGraph¶
Get a graph of the job indicating the inputs to the job.
- Returns:
The graph showing the connectivity of the jobs.
- Return type:
DiGraph
- property host¶
UUID of the first Flow that contains the Job.
- Returns:
the UUID of the host.
- Return type:
- run(store, job_dir=None)[source]¶
Run the job.
If the job has inputs that are OutputReference objects, then they will need to be resolved before the job can run. See the docstring for OutputReference.resolve() for more details.
- Parameters:
store (jobflow.JobStore) – A JobStore to use for resolving references and storing job outputs.
job_dir (Path)
- Returns:
The response of the job, containing the outputs, and other settings that determine the flow execution.
- Return type:
Response
- Raises:
ImportError – If the job function cannot be imported.
See also
- resolve_args(store, inplace=True)[source]¶
Resolve any OutputReference objects in the input arguments.
See the docstring for OutputReference.resolve() for more details.
- update_kwargs(update, name_filter=None, function_filter=None, dict_mod=False)[source]¶
Update the kwargs of the jobs.
- Parameters:
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
Examples
Consider a simple job with a number keyword argument.
>>> from jobflow import job
>>> @job
... def add(a, number=5):
...     return a + number
>>> add_job = add(1)
The number argument can be updated using:
>>> add_job.update_kwargs({"number": 10})
- update_maker_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]¶
Update the keyword arguments of any Maker objects in the job source.
- Parameters:
name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.
class_filter (type[jobflow.Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.
nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of Maker, job, or flow objects. See examples for more details.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
Examples
Consider the following job from a Maker:
>>> from dataclasses import dataclass
>>> from jobflow import job, Maker, Flow
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()
>>> add_job = maker.make(1)
The number argument could be updated in the following ways.
>>> add_job.update_maker_kwargs({"number": 10})
By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. Consider the following case for a Maker that produces a job that replaces itself with another job.
>>> from jobflow import Response
>>> @dataclass
... class ReplacementMaker(Maker):
...     name: str = "replace"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a):
...         add_job = self.add_maker.make(a)
...         return Response(replace=add_job)
>>> maker = ReplacementMaker()
>>> my_job = maker.make(1)
The following update will apply to the nested AddMaker in the kwargs of the ReplacementMaker:
>>> my_job.update_maker_kwargs({"number": 10}, class_filter=AddMaker)
However, if nested=False, then the update will not be applied to the nested Maker:
>>> my_job.update_maker_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
- append_name(append_str, prepend=False, dynamic=True)[source]¶
Append a string to the name of the job.
- update_metadata(update, name_filter=None, function_filter=None, dict_mod=False, dynamic=True)[source]¶
Update the metadata of the job.
Can optionally apply the same updates at runtime to any Job or Flow generated by this job.
- Parameters:
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.
Examples
Consider a simple job that makes use of a Maker to generate additional jobs at runtime (see Response options for more details):
>>> @job
... def use_maker(maker):
...     return Response(replace=maker.make())
Calling update_metadata with dynamic set to True (the default)
>>> test_job = use_maker(ExampleMaker())
>>> test_job.update_metadata({"example": 1}, dynamic=True)
will not only set the example metadata on test_job, but also on all the new Jobs that will be generated at runtime by the ExampleMaker.
update_metadata can be called multiple times with different name_filter or function_filter values to control which Jobs will be updated.
In contrast, if dynamic is set to False, the example metadata will only be added to test_job and not to the generated Jobs.
- update_config(config, name_filter=None, function_filter=None, attributes=None, dynamic=True)[source]¶
Update the job config.
Can optionally apply the same updates at runtime to any Job or Flow generated by this job.
- Parameters:
config (JobConfig | dict) – A JobConfig object or a dict containing the attributes to update.
name_filter (str) – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.
function_filter (Callable) – A filter for the job function. Only jobs with a matching function will be updated.
attributes (list[str] | str) – Which attributes of the job config to set. Can be specified as one or more attributes specified by their name.
dynamic (bool) – The updates will be propagated to Jobs/Flows dynamically generated at runtime.
Examples
Consider a simple job.
>>> from jobflow import job, JobConfig
>>> @job
... def add(a, b):
...     return a + b
>>> add_job = add(1, 2)
The config can be updated using:
>>> new_config = JobConfig(
...     manager_config={"_fworker": "myfworker"}, resolve_references=False
... )
>>> add_job.update_config(new_config)
To only update specific attributes, the attributes argument can be specified. For example, the following will only update the "manager_config" attribute of the job config.
>>> add_job.update_config(new_config, attributes="manager_config")
Alternatively, the config can be specified as a dictionary with keys that are attributes of the JobConfig object. This allows you to specify updates without having to create a completely new JobConfig object. For example:
>>> add_job.update_config({"manager_config": {"_fworker": "myfworker"}})
Consider instead a simple job that makes use of a Maker to generate additional jobs at runtime (see Response options for more details):
>>> @job
... def use_maker(maker):
...     return Response(replace=maker.make())
Calling update_config with dynamic set to True (the default)
>>> test_job = use_maker(ExampleMaker())
>>> test_job.update_config({"manager_config": {"_fworker": "myfworker"}})
will not only set the manager_config on test_job, but also on all the new Jobs that will be generated at runtime by the ExampleMaker.
update_config can be called multiple times with different name_filter or function_filter values to control which Jobs will be updated.
In contrast, if dynamic is set to False, the manager_config option will only be set for test_job and not for the generated Jobs.
- class jobflow.core.job.Response(output=None, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=None)[source]¶
Bases: Generic[T]
The Response contains the output, detours, and stop commands of a job.
- Parameters:
output (T) – The job output.
detour (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to detour to.
addition (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to add to the current flow.
replace (jobflow.Flow | Job | list[Job] | list[jobflow.Flow]) – A flow or job to replace the current job.
stored_data (dict[Hashable, Any]) – Data to be stored by the flow manager.
stop_children (bool) – Stop any children of the current flow.
stop_jobflow (bool) – Stop executing all remaining jobs.
job_dir (str | Path) – The directory where the job was run.
- classmethod from_job_returns(job_returns, output_schema=None, job_dir=None)[source]¶
Generate a Response from the outputs of a Job.
- Parameters:
job_returns (Any | None) – The outputs of a job. If this is a Response object, the output schema will be applied to the response outputs and the response returned. Otherwise, the job_returns will be put into the outputs of a new Response object.
output_schema (type[BaseModel]) – A pydantic model associated with the job. Used to enforce a schema for the outputs.
job_dir (str | Path) – The directory where the job was run.
- Raises:
ValueError – If the job outputs do not match the output schema.
- Returns:
The job response controlling the data to store and flow execution options.
- Return type:
Response
- jobflow.core.job.apply_schema(output, schema)[source]¶
Apply schema to job outputs.
- Parameters:
output (Any) – The job outputs.
schema (type[BaseModel] | None) – A pydantic model that defines the schema to apply.
- Raises:
ValueError – If a schema is set but there are no outputs.
ValueError – If the outputs do not match the schema.
- Returns:
Returns an instance of the schema if the schema is set or the original output.
- Return type:
BaseModel or Any
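A rough sketch of applying a schema (the AddSchema model is hypothetical, and it is assumed, as described above, that a dict output is validated against the model and an instance is returned):
>>> from pydantic import BaseModel
>>> from jobflow.core.job import apply_schema
>>> class AddSchema(BaseModel):
...     result: int
>>> validated = apply_schema({"result": 3}, AddSchema)
>>> validated.result
3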
- jobflow.core.job.store_inputs(inputs)[source]¶
Job to store inputs.
Note that any Reference objects will not be resolved; however, missing references will be replaced with None.
- Parameters:
inputs (Any) – The inputs to store.
- Return type:
Any
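A hedged sketch of how this job function might be used (assuming the add_first job from the Flow examples earlier; since store_inputs is itself decorated as a job, calling it returns a Job whose output simply echoes the stored inputs):
>>> from jobflow.core.job import store_inputs
>>> input_job = store_inputs(add_first.output)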
- jobflow.core.job.prepare_replace(replace, current_job)[source]¶
Prepare a replacement Flow or Job.
If the replacement is a Flow, then an additional Job will be inserted that maps the output id of the original job to the outputs of the Flow.
If the replacement is a Flow or a Job, then this function passes on the manager config, schema, and metadata, and sets the corresponding UUIDs and job index.
jobflow.core.maker¶
Define base Maker class for creating jobs and flows.
- class jobflow.core.maker.Maker[source]¶
Bases: MSONable
Base maker (factory) class for constructing Job and Flow objects.
Note, this class is only an abstract implementation. To create a functioning Maker, one should subclass Maker, define the make method, and add a name field with a default value. See the examples below.
Examples
Below we define a simple maker that generates a job to add two numbers. Next, we make a job using the maker. When creating a new maker it is essential to: 1) Add a name field with a default value (this controls the name of jobs produced by the maker) and 2) override the make method with a concrete implementation.
>>> from dataclasses import dataclass
>>> from jobflow import Maker, job
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...
...     @job
...     def make(self, a, b):
...         return a + b
>>> maker = AddMaker()
>>> add_job = maker.make(1, 2)
>>> add_job.name
"add"
There are two key features of Maker objects that make them extremely powerful. The first is the ability to have class instance variables that are used inside the make function. For example, running the job below will produce the output 6.5.
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     c: float = 1.5
...
...     @job
...     def make(self, a, b):
...         return a + b + self.c
>>> maker = AddMaker(c=3.5)
>>> add_job = maker.make(1, 2)
Note
Jobs created by a Maker will inherit a copy of the Maker object. This means that if 1) a job is created, 2) the maker variables are changed, and 3) a new job is created, only the second job will reflect the updated maker variables.
The second useful feature is the ability for delayed creation of jobs. For example, consider a job that creates another job during its execution. If the new job has lots of optional arguments, this will require passing lots of arguments to the first job. However, using a Maker, the kwargs of the job can be specified as maker kwargs and specified when the first job is created, making it much cleaner to customise the newly created job. For example, consider this job that first squares two numbers, then creates a new job to add them.
>>> from jobflow import Response
>>> @dataclass
... class SquareThenAddMaker(Maker):
...     name: str = "square then add"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a, b):
...         a_squared = a ** 2
...         b_squared = b ** 2
...         add_job = self.add_maker.make(a_squared, b_squared)
...         return Response(detour=add_job)
>>> maker = SquareThenAddMaker()
>>> square_add_job = maker.make(3, 4)
The add job can be customised by specifying a custom AddMaker instance when the SquareThenAddMaker is initialized.
>>> custom_add_maker = AddMaker(c=1000)
>>> maker = SquareThenAddMaker(add_maker=custom_add_maker)
This provides a very natural way to contain and control the kwargs for dynamically created jobs.
Note that makers can also be used to create Flow objects. In this case, the only difference is that the make() method is not decorated using the job decorator and the function should return a flow. For example:
>>> from jobflow import Flow
>>> @dataclass
... class DoubleAddMaker(Maker):
...     name: str = "double add"
...     add_maker: Maker = AddMaker()
...
...     def make(self, a, b):
...         add_first = self.add_maker.make(a, b)
...         add_second = self.add_maker.make(add_first.output, b)
...         return Flow([add_first, add_second], name=self.name)
>>> maker = DoubleAddMaker()
>>> double_add_job = maker.make(1, 2)
- make(*args, **kwargs)[source]¶
Make a job or a flow - must be overridden with a concrete implementation.
- Return type:
jobflow.Flow | jobflow.Job
- property name¶
Name of the Maker - must be overridden with a dataclass field.
- update_kwargs(update, name_filter=None, class_filter=None, nested=True, dict_mod=False)[source]¶
Update the keyword arguments of the Maker.
Note
The maker is not updated in place. Instead, a new maker object is returned.
- Parameters:
name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.
class_filter (type[Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.
nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of a Maker object. See examples for more details.
dict_mod (bool) – Use the dict mod language to apply updates. See DictMods for more details.
Examples
Consider the following Maker:
>>> from dataclasses import dataclass
>>> from jobflow import job, Maker
>>> @dataclass
... class AddMaker(Maker):
...     name: str = "add"
...     number: float = 10
...
...     @job
...     def make(self, a):
...         return a + self.number
>>> maker = AddMaker()
The number argument could be updated using:
>>> maker.update_kwargs({"number": 10})
By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. For example, consider this maker that first squares a number, then creates a new job to add it.
>>> from jobflow import Response
>>> @dataclass
... class SquareThenAddMaker(Maker):
...     name: str = "square then add"
...     add_maker: Maker = AddMaker()
...
...     @job
...     def make(self, a, b):
...         a_squared = a ** 2
...         add_job = self.add_maker.make(a_squared)
...         return Response(detour=add_job)
>>> maker = SquareThenAddMaker()
The following update will apply to the nested AddMaker in the kwargs of the SquareThenAddMaker:
>>> maker = maker.update_kwargs({"number": 10}, class_filter=AddMaker)
However, if nested=False, then the update will not be applied to the nested Maker:
>>> maker = maker.update_kwargs(
...     {"number": 10}, class_filter=AddMaker, nested=False
... )
- jobflow.core.maker.recursive_call(obj, func, name_filter=None, class_filter=None, nested=True)[source]¶
Recursively call a function on all Maker objects in the object.
- Parameters:
obj (Maker) – The Maker object to call the function on.
func (Callable[[Maker], Maker]) – The function to call a Maker object, if it matches the filters. This function must return a new Maker object.
name_filter (str) – A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. “ad” will match a Maker with the name “adder”.
class_filter (type[Maker]) – A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses.
nested (bool) – Whether to apply the updates to Maker objects that are themselves kwargs of a Maker object. See examples for more details.
jobflow.core.reference¶
Define classes for handling job outputs.
- class jobflow.core.reference.OnMissing(value)[source]¶
Bases: ValueEnum
What to do when a reference cannot be resolved.
ERROR: Throw an error.
NONE: Replace the missing reference with None.
PASS: Pass the unresolved OutputReference object on.
- ERROR = 'error'¶
- NONE = 'none'¶
- PASS = 'pass'¶
- class jobflow.core.reference.OutputReference(uuid, attributes=(), output_schema=None)[source]¶
Bases: MSONable
A reference to the output of a Job.
The output may or may not exist yet. Accordingly, OutputReference objects are essentially pointers to future job outputs. Output references can be used as input to a Job or Flow and will be "resolved". This means the value of the reference will be looked up in the job store and passed on to the job.
Output references are also how jobflow is able to determine the dependencies between jobs, allowing for construction of the job graph and determination of the job execution order.
Upon attribute access or indexing, an output reference will return another output reference with the attribute accesses stored. When resolving the reference, the attributes/indexing will also be performed before the value is passed on to the job. See the examples for more details.
- Parameters:
uuid (str) – The job uuid to which the output belongs.
attributes (tuple[tuple[str, Any], ...]) – A tuple of attributes or indexes that have been performed on the output. Attributes are specified by a tuple of ("a", attr), whereas indexes are specified as a tuple of ("i", index).
output_schema (type[BaseModel]) – A pydantic model that defines the schema of the output and that will be used to validate any attribute accesses or indexes. Note, schemas can only be used to validate the first attribute/index access.
Examples
A reference can be constructed using just the job uuid.
>>> from jobflow import OutputReference >>> ref = OutputReference("1234")
Attribute accesses return new references.
>>> ref.my_attribute
OutputReference(1234, .my_attribute)
Attribute accesses and indexing can be chained.
>>> ref["key"][0].value
OutputReference(1234, ['key'], [0], .value)
- resolve(store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]¶
Resolve the reference.
This function will query the job store for the reference value and perform any attribute accesses/indexing.
Note
When resolving multiple references simultaneously it is more efficient to use resolve_references as it will minimize the number of database requests.
- Parameters:
store (jobflow.JobStore | None) – A job store.
cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.
on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.
deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.
- Raises:
ValueError – If the reference cannot be found and on_missing is set to OnMissing.ERROR (default).
- Returns:
The resolved reference if it can be found. If the reference cannot be found, the returned value will depend on the value of on_missing.
- Return type:
Any
- set_uuid(uuid, inplace=True)[source]¶
Set the UUID of the reference.
- Parameters:
uuid (str) – A new UUID.
inplace – Whether to update the current reference object or return a completely new object.
- Returns:
An output reference with the specified uuid.
- Return type:
OutputReference
- property attributes_formatted¶
Get a formatted description of the attributes.
- jobflow.core.reference.resolve_references(references, store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]¶
Resolve multiple output references.
Uses caching to minimize number of database queries.
- Parameters:
references (Sequence[OutputReference]) – A list or tuple of output references.
store (jobflow.JobStore) – A job store.
cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.
on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.
deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.
- Returns:
The output values as a dictionary mapping of {reference: output}.
- Return type:
dict[OutputReference, Any]
- jobflow.core.reference.find_and_get_references(arg)[source]¶
Find and extract output references.
This function works on nested inputs. For example, lists or dictionaries (or combinations of list and dictionaries) that contain output references.
- Parameters:
arg (Any) – The argument to search for references.
- Returns:
The output references as a tuple.
- Return type:
tuple[OutputReference, ...]
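As a brief illustration (reusing the ref reference constructed in the OutputReference examples above), references are found even when nested inside containers:
>>> from jobflow.core.reference import find_and_get_references
>>> find_and_get_references({"total": ref, "extra": [1, 2]})
(OutputReference(1234),)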
- jobflow.core.reference.find_and_resolve_references(arg, store, cache=None, on_missing=OnMissing.ERROR, deserialize=True)[source]¶
Return the input but with all output references replaced with their resolved values.
This function works on nested inputs. For example, lists or dictionaries (or combinations of list and dictionaries) that contain output references.
- Parameters:
arg (Any) – The input argument containing output references.
store (jobflow.JobStore) – A job store.
cache (dict[str, Any]) – A dictionary cache to use for local caching of reference values.
on_missing (OnMissing) – What to do if the output reference is missing in the database and cache. See OnMissing for the available options.
deserialize (bool) – If False, the data extracted from the store will not be deserialized. Note that in this case, if a reference contains a derived property, it cannot be resolved.
- Returns:
The input argument but with all output references replaced with their resolved values. If a reference cannot be found, its replacement value will depend on the value of on_missing.
- Return type:
Any
- jobflow.core.reference.validate_schema_access(schema, item)[source]¶
Validate that an attribute or index access is supported by a model.
If the item is associated with a nested model, the class is returned.
- Parameters:
- Raises:
AttributeError – If the item is not a valid attribute of the schema.
- Returns:
The bool is True if the schema access was valid. The BaseModel class associated with the item, if any.
- Return type:
jobflow.core.state¶
Stateful interface for accessing the current job (and store).
This module defines the CURRENT_JOB object, which has two attributes:
job: Containing the current job.
store: Containing the current store. Only available if expose_store is set in the job config.
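A minimal sketch of how this interface could be used from within a running job (the report_uuid job is purely illustrative):
>>> from jobflow import job
>>> from jobflow.core.state import CURRENT_JOB
>>> @job
... def report_uuid():
...     # While the job executes, CURRENT_JOB.job is the running Job instance.
...     return CURRENT_JOB.job.uuid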
jobflow.core.store¶
Define the primary jobflow database interface.
- class jobflow.core.store.JobStore(docs_store, additional_stores=None, save=None, load=False)[source]¶
Bases: Store
Store intended to allow pushing and pulling documents into multiple stores.
- Parameters:
docs_store (Store) – Store for basic documents.
additional_stores (dict[str, Store]) – Additional stores to use for storing large data/specific objects. Given as a mapping of {store name: store} where store is a maggma Store object.
save (save_type) – Which items to save in additional stores when uploading documents. Given as a mapping of {store name: store_type} where store_type can be a dictionary key (string or enum), an MSONable class, or a list of keys/classes.
load (load_type) – Which items to load from additional stores when querying documents. Given as a mapping of {store name: store_type} where store_type can be True, in which case all saved items are loaded, a dictionary key (string or enum), an MSONable class, or a list of keys/classes. Alternatively, load=True will automatically load all items from every additional store.
key – main key to index on
last_updated_field – field for date/time stamping the data
last_updated_type – the date/time format for the last_updated_field. Can be “datetime” or “isoformat”
validator – Validator to validate documents going into the store.
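A hedged construction sketch using maggma's in-memory MemoryStore (any maggma Store can be substituted):
>>> from maggma.stores import MemoryStore
>>> from jobflow import JobStore
>>> store = JobStore(MemoryStore(), additional_stores={"data": MemoryStore()})
>>> store.connect()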
- property name: str¶
Get the name of the data source.
- Returns:
A string representing this data source.
- Return type:
str
- connect(force_reset=False)[source]¶
Connect to the source data.
- Parameters:
force_reset (bool) – Whether to reset the connection or not.
- query(criteria=None, properties=None, sort=None, skip=0, limit=0, load=None)[source]¶
Query the JobStore for documents.
- Parameters:
criteria (dict) – PyMongo filter for documents to search in.
properties (dict | list) – Properties to return in grouped documents.
sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.
skip (int) – Number of documents to skip.
limit (int) – Limit on the total number of documents returned.
load (load_type) – Which items to load from additional stores. See the JobStore constructor for more details.
- Yields:
Dict – The documents.
- Return type:
Iterator[dict]
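A rough usage sketch (assuming the connected store from the constructor example above, and that completed job documents carry name, uuid and output fields):
>>> docs = list(store.query({"name": "add"}, properties=["uuid", "output"]))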
- query_one(criteria=None, properties=None, sort=None, load=None)[source]¶
Query the Store for a single document.
- Parameters:
criteria (dict) – PyMongo filter for documents to search.
properties (dict | list) – Properties to return in the document.
sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.
load (load_type) – Which items to load from additional stores. See the JobStore constructor for more details.
- Returns:
The document.
- Return type:
dict or None
- update(docs, key=None, save=None)[source]¶
Update or insert documents into the Store.
- Parameters:
docs (list[dict] | dict | JobStoreDocument | list[JobStoreDocument]) – The Pydantic document or list of Pydantic documents to update.
key (list | str) – Field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store’s key field is to be used.
save (bool | save_type) – Which items to save in additional stores. See the JobStore constructor for more details.
- ensure_index(key, unique=False)[source]¶
Try to create an index on the document store and return True if successful.
- groupby(keys, criteria=None, properties=None, sort=None, skip=0, limit=0, load=None)[source]¶
Group documents by keys.
- Parameters:
criteria (dict) – PyMongo filter for documents to search in.
properties (dict | list) – Properties to return in grouped documents
sort (dict[str, Sort | int]) – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.
skip (int) – Number of documents to skip.
limit (int) – Limit on the total number of documents returned.
load (load_type) – Which items to load from additional stores. See the JobStore constructor for more details.
- Yields:
dict, list[dict] – The documents as (key, documents) grouped by their keys.
- Return type:
- remove_docs(criteria)[source]¶
Remove docs matching the criteria.
- Parameters:
criteria (dict) – Criteria for documents to remove.
- get_output(uuid, which='last', load=False, cache=None, on_missing=OnMissing.ERROR)[source]¶
Get the output of a job UUID.
Note that, unlike JobStore.query, this function will automatically try to resolve any output references in the job outputs.
- Parameters:
uuid (str) – A job UUID.
which – If there are multiple job runs, which index to use. Options are:
"last" (default): Use the last job that ran.
"first": Use the first job that ran.
"all": Return all outputs, sorted with the lowest index first.
Alternatively, if an integer is specified, the output for this specific index will be queried.
load (load_type) – Which items to load from additional stores. Setting to True will load all items stored in additional stores. See the JobStore constructor for more details.
cache (dict[str, Any]) – A dictionary cache to use for resolving reference values.
on_missing (OnMissing) – What to do if the output contains a reference and the reference cannot be resolved.
- Returns:
The output(s) for the job UUID.
- Return type:
Any
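A brief sketch (assuming the add_first job from the Flow examples has already been run and its output stored):
>>> output = store.get_output(add_first.uuid)               # output of the last run
>>> all_outputs = store.get_output(add_first.uuid, which="all")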
- classmethod from_file(db_file, **kwargs)[source]¶
Create a JobStore from a database file.
Two options are supported for the database file. The file should be in json or yaml format.
The simplest format is a monty dumped version of the store, generated using:
>>> from monty.serialization import dumpfn
>>> dumpfn(job_store, "job_store.yaml")
Alternatively, the file can contain the keys docs_store, additional_stores and any other keyword arguments supported by the JobStore constructor. The docs_store and additional stores are specified by the type key, which must match a maggma Store subclass, and the remaining keys are passed to the store constructor. For example, the following file would create a JobStore with a MongoStore for docs and a GridFSStore as an additional store for data.
docs_store:
  type: MongoStore
  database: jobflow_unittest
  collection_name: outputs
  host: localhost
  port: 27017
additional_stores:
  data:
    type: GridFSStore
    database: jobflow_unittest
    collection_name: outputs_blobs
    host: localhost
    port: 27017
- classmethod from_dict_spec(spec, **kwargs)[source]¶
Create a JobStore from a dict specification.
Note
This function is different from JobStore.from_dict, which is used to load the monty serialised representation of the class.
The dictionary should contain the keys "docs_store", "additional_stores" and any other keyword arguments supported by the JobStore constructor. The docs_store and additional stores are specified by the type key, which must match a maggma Store subclass, and the remaining keys are passed to the store constructor. For example, the following specification would create a JobStore with a MongoStore for docs and a GridFSStore as an additional store for data.
docs_store:
  type: MongoStore
  database: jobflow_unittest
  collection_name: outputs
  host: localhost
  port: 27017
additional_stores:
  data:
    type: GridFSStore
    database: jobflow_unittest
    collection_name: outputs_blobs
    host: localhost
    port: 27017