# Source code for jobflow.core.flow

"""Define base Flow object."""

from __future__ import annotations

import logging
import typing
import warnings

from monty.json import MSONable

from jobflow.core.reference import find_and_get_references
from jobflow.utils import ValueEnum, contains_flow_or_job, suuid

if typing.TYPE_CHECKING:
    from typing import Any, Callable

    from networkx import DiGraph

    import jobflow

__all__ = ["JobOrder", "Flow", "get_flow"]

logger = logging.getLogger(__name__)


class JobOrder(ValueEnum):
    """
    Options to control the order of job execution.

    - ``AUTO``: Automatically determine the job order based on input and output
      references.
    - ``LINEAR``: Run the jobs in the order they appear in the jobs array.
    """

    AUTO = "auto"
    LINEAR = "linear"
class Flow(MSONable):
    """
    A Flow contains a collection of Jobs or other Flows to execute.

    The :obj:`Flow` object is the main tool for constructing workflows. Flows
    can either contain Jobs or other Flows. Like :obj:`.Job` objects, Flow
    objects can also have outputs, however, these are not explicitly stored in
    the database. Instead, the outputs of a Flow act to structure the outputs
    of the jobs contained within the Flow.

    Parameters
    ----------
    jobs
        The jobs to be run as a list of :obj:`.Job` or :obj:`Flow` objects.
    output
        The output of the flow. These should come from the output of one or
        more of the jobs.
    name
        The flow name.
    order
        The order in which the jobs should be executed. The default is to
        determine the order automatically based on the connections between
        jobs.
    uuid
        The identifier of the flow. This is generated automatically.
    hosts
        The list of UUIDs of the hosts containing the job. This is updated
        automatically when a flow is included in the jobs array of another
        flow. The object identified by one UUID of the list should be
        contained in objects identified by its subsequent elements.

    Raises
    ------
    ValueError
        If a job in the ``jobs`` array is already part of another flow.
    ValueError
        If any jobs needed to resolve the inputs of all jobs in the ``jobs``
        array are missing.
    ValueError
        If any jobs needed to resolve the flow ``output`` are missing.

    Warns
    -----
    UserWarning
        If a ``.Job`` or ``Flow`` object is used as the Flow ``output`` rather
        than an ``OutputReference``.

    See Also
    --------
    .job, .Job, JobOrder

    Examples
    --------
    Below we define a simple job to add two numbers, and create a flow
    containing two connected add jobs.

    >>> from jobflow import job, Flow
    >>> @job
    ... def add(a, b):
    ...     return a + b
    >>> add_first = add(1, 2)
    >>> add_second = add(add_first.output, 2)
    >>> flow = Flow(jobs=[add_first, add_second])

    This flow does not expose any of the outputs of the jobs contained within
    it. We could instead "register" the output of the second add as the output
    of the flow.

    >>> flow = Flow(jobs=[add_first, add_second], output=add_second.output)

    This will allow the flow to be used in another flow. In this way, Flows
    can be infinitely nested. For example:

    >>> add_third = add(flow.output, 5)
    >>> outer_flow = Flow(jobs=[flow, add_third])

    Flows can be run using a flow manager. These enable running Flows locally
    or on compute clusters (using the FireWorks manager).

    >>> from jobflow.managers.local import run_locally
    >>> response = run_locally(flow)
    """

    def __init__(
        self,
        jobs: list[Flow | jobflow.Job] | jobflow.Job | Flow,
        output: Any | None = None,
        name: str = "Flow",
        order: JobOrder = JobOrder.AUTO,
        uuid: str | None = None,
        hosts: list[str] | None = None,
    ):
        # local import to avoid a circular dependency between flow and job modules
        from jobflow.core.job import Job

        if isinstance(jobs, (Job, Flow)):
            jobs = [jobs]

        if uuid is None:
            uuid = suuid()

        self.name = name
        self.order = order
        self.uuid = uuid
        self.hosts = hosts or []

        # jobs are added via add_jobs so that uuid/host bookkeeping and
        # duplicate/circular checks are applied; output is set last because its
        # setter validates references against the jobs just added
        self._jobs: tuple[Flow | Job, ...] = ()
        self.add_jobs(jobs)
        self.output = output

    @property
    def jobs(self) -> tuple[Flow | jobflow.Job, ...]:
        """
        Get the Jobs in the Flow.

        Returns
        -------
        list[Job]
            The list of Jobs/Flows of the Flow.
        """
        return self._jobs

    @property
    def output(self) -> Any:
        """
        Get the output of the flow.

        Returns
        -------
        Any
            The output of the flow.
        """
        return self._output

    @output.setter
    def output(self, output: Any):
        """
        Set the output of the Flow.

        The output should be compatible with the list of Jobs/Flows contained
        in the Flow.

        Parameters
        ----------
        output
            The output of the flow. These should come from the output of one
            or more of the jobs.
        """
        if output is not None:
            if contains_flow_or_job(output):
                warnings.warn(
                    f"Flow '{self.name}' contains a Flow or Job as an output. "
                    f"Usually the Flow output should be the output of a Job or "
                    f"another Flow (e.g. job.output). If this message is "
                    f"unexpected then double check the outputs of your Flow."
                )

            # check if the jobs array contains all jobs needed for the references
            references = find_and_get_references(output)
            reference_uuids = {ref.uuid for ref in references}

            if not reference_uuids.issubset(set(self.job_uuids)):
                raise ValueError(
                    "jobs array does not contain all jobs needed for flow output"
                )
        self._output = output

    @property
    def job_uuids(self) -> tuple[str, ...]:
        """
        Uuids of every Job contained in the Flow (including nested Flows).

        Returns
        -------
        tuple[str]
            The uuids of all Jobs in the Flow (including nested Flows).
        """
        uuids: list[str] = []
        for job in self.jobs:
            if isinstance(job, Flow):
                uuids.extend(job.job_uuids)
            else:
                uuids.append(job.uuid)
        return tuple(uuids)

    @property
    def all_uuids(self) -> tuple[str, ...]:
        """
        Uuids of every Job and Flow contained in the Flow (including nested Flows).

        Returns
        -------
        tuple[str]
            The uuids of all Jobs and Flows in the Flow (including nested
            Flows).
        """
        uuids: list[str] = []
        for job in self.jobs:
            if isinstance(job, Flow):
                uuids.extend(job.all_uuids)
            # unlike job_uuids, the uuid of nested Flows is included too
            uuids.append(job.uuid)
        return tuple(uuids)

    @property
    def graph(self) -> DiGraph:
        """
        Get a graph indicating the connectivity of jobs in the flow.

        Returns
        -------
        DiGraph
            The graph showing the connectivity of the jobs.
        """
        from itertools import product

        import networkx as nx

        graph = nx.compose_all([job.graph for job in self.jobs])
        if self.order == JobOrder.LINEAR:
            # add fake edges between jobs to force linear order
            edges = []
            for job_a, job_b in nx.utils.pairwise(self.jobs):
                if isinstance(job_a, Flow):
                    leaves = [v for v, d in job_a.graph.out_degree() if d == 0]
                else:
                    leaves = [job_a.uuid]

                if isinstance(job_b, Flow):
                    roots = [v for v, d in job_b.graph.in_degree() if d == 0]
                else:
                    roots = [job_b.uuid]

                for leaf, root in product(leaves, roots):
                    edges.append((leaf, root, {"properties": ""}))
            graph.add_edges_from(edges)
        return graph

    @property
    def host(self) -> str | None:
        """
        UUID of the first Flow that contains this Flow.

        Returns
        -------
        str
            the UUID of the host.
        """
        return self.hosts[0] if self.hosts else None
[docs] def draw_graph(self, **kwargs): """ Draw the flow graph using matplotlib. Requires matplotlib to be installed. Parameters ---------- kwargs keyword arguments that are passed to :obj:`jobflow.utils.graph.draw_graph`. Returns ------- pyplot The matplotlib pyplot state object. """ from jobflow.utils.graph import draw_graph return draw_graph(self.graph, **kwargs)
[docs] def iterflow(self): """ Iterate through the jobs of the flow. The jobs are yielded such that the job output references can always be resolved. I.e., root nodes of the flow graph are always returned first. Yields ------ Job, list[str] The Job and the uuids of any parent jobs (not to be confused with the host flow). """ from networkx import is_directed_acyclic_graph from jobflow.utils.graph import itergraph graph = self.graph if not is_directed_acyclic_graph(graph): raise ValueError( "Job connectivity contains cycles therefore job execution order " "cannot be determined." ) for node in itergraph(graph): parents = [u for u, v in graph.in_edges(node)] job = graph.nodes[node]["job"] yield job, parents
[docs] def update_kwargs( self, update: dict[str, Any], name_filter: str | None = None, function_filter: Callable | None = None, dict_mod: bool = False, ): """ Update the kwargs of all Jobs in the Flow. Note that updates will be applied to jobs in nested Flow. Parameters ---------- update The updates to apply. name_filter A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. "ad" will match a job with the name "adder". function_filter A filter for the job function. Only jobs with a matching function will be updated. dict_mod Use the dict mod language to apply updates. See :obj:`.DictMods` for more details. Examples -------- Consider a flow containing a simple job with a ``number`` keyword argument. >>> from jobflow import job, Flow >>> @job ... def add(a, number=5): ... return a + number >>> add_job = add(1) >>> flow = Flow([add_job]) The ``number`` argument could be updated in the following ways. >>> flow.update_kwargs({"number": 10}) This will work if all jobs in the flow have a kwarg called number. However, when this is not the case this will result in the bad input kwargs for some jobs. To only apply the update to the correct jobs, filters can be used. >>> flow.update_kwargs({"number": 10}, name_filter="add") >>> flow.update_kwargs({"number": 10}, function_filter=add) """ for job in self.jobs: job.update_kwargs( update, name_filter=name_filter, function_filter=function_filter, dict_mod=dict_mod, )
[docs] def update_maker_kwargs( self, update: dict[str, Any], name_filter: str | None = None, class_filter: type[jobflow.Maker] | None = None, nested: bool = True, dict_mod: bool = False, ): """ Update the keyword arguments of any :obj:`.Maker` objects in the jobs. Note that updates will be applied to Jobs in any inner Flows. Parameters ---------- update The updates to apply. name_filter A filter for the Maker name. Only Makers with a matching name will be updated. Includes partial matches, e.g. "ad" will match a Maker with the name "adder". class_filter A filter for the maker class. Only Makers with a matching class will be updated. Note the class filter will match any subclasses. nested Whether to apply the updates to Maker objects that are themselves kwargs of Maker, job, or flow objects. See examples for more details. dict_mod Use the dict mod language to apply updates. See :obj:`.DictMods` for more details. Examples -------- Consider the following flow containing jobs from a Maker: >>> from dataclasses import dataclass >>> from jobflow import job, Maker, Flow >>> @dataclass ... class AddMaker(Maker): ... name: str = "add" ... number: float = 10 ... ... @job ... def make(self, a): ... return a + self.number >>> maker = AddMaker() >>> add_job = maker.make(1) >>> flow = Flow([add_job]) The ``number`` argument could be updated in the following ways. >>> flow.update_maker_kwargs({"number": 10}) This will work if all Makers in the flow have a kwarg called number. However, when this is not the case this will result in the bad input kwargs for some Makers. To only apply the update to the correct Makers, filters can be used. >>> flow.update_maker_kwargs({"number": 10}, name_filter="add") >>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker) By default, the updates are applied to nested Makers. These are Makers which are present in the kwargs of another Maker. Consider the following case for a Maker that produces a job that restarts. 
>>> from jobflow import Response >>> @dataclass ... class RestartMaker(Maker): ... name: str = "replace" ... add_maker: Maker = AddMaker() ... ... @job ... def make(self, a): ... restart_job = self.add_maker.make(a) ... return Response(replace=restart_job) >>> maker = RestartMaker() >>> my_job = maker.make(1) >>> flow = Flow([my_job] The following update will apply to the nested ``AddMaker`` in the kwargs of the ``RestartMaker``: >>> flow.update_maker_kwargs({"number": 10}, class_filter=AddMaker) However, if ``nested=False``, then the update will not be applied to the nested Maker: >>> flow.update_maker_kwargs( ... {"number": 10}, class_filter=AddMaker, nested=False ... ) """ for job in self.jobs: job.update_maker_kwargs( update, name_filter=name_filter, class_filter=class_filter, nested=nested, dict_mod=dict_mod, )
[docs] def append_name(self, append_str: str, prepend: bool = False): """ Append a string to the name of the flow and all jobs contained in it. Parameters ---------- append_str A string to append. prepend Prepend the name rather than appending it. """ if prepend: self.name = append_str + self.name else: self.name += append_str for job in self.jobs: job.append_name(append_str, prepend=prepend)
[docs] def update_metadata( self, update: dict[str, Any], name_filter: str | None = None, function_filter: Callable | None = None, dict_mod: bool = False, dynamic: bool = True, ): """ Update the metadata of all Jobs in the Flow. Note that updates will be applied to jobs in nested Flow. Parameters ---------- update The updates to apply. name_filter A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. "ad" will match a job with the name "adder". function_filter A filter for the job function. Only jobs with a matching function will be updated. dict_mod Use the dict mod language to apply updates. See :obj:`.DictMods` for more details. dynamic The updates will be propagated to Jobs/Flows dynamically generated at runtime. Examples -------- Consider a flow containing two jobs. >>> from jobflow import job, Flow >>> @job ... def add(a, b): ... return a + b >>> add_job1 = add(5, 6) >>> add_job2 = add(6, 7) >>> flow = Flow([add_job1, add_job2]) The ``metadata`` of both jobs could be updated as follows: >>> flow.update_metadata({"tag": "addition_job"}) """ for job in self.jobs: job.update_metadata( update, name_filter=name_filter, function_filter=function_filter, dict_mod=dict_mod, dynamic=dynamic, )
[docs] def update_config( self, config: jobflow.JobConfig | dict, name_filter: str = None, function_filter: Callable = None, attributes: list[str] | str = None, dynamic: bool = True, ): """ Update the job config of all Jobs in the Flow. Note that updates will be applied to jobs in nested Flow. Parameters ---------- config A JobConfig object or a dict with containing the attributes to update. name_filter A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. "ad" will match a job with the name "adder". function_filter A filter for the job function. Only jobs with a matching function will be updated. attributes : Which attributes of the job config to set. Can be specified as one or more attributes specified by their name. dynamic The updates will be propagated to Jobs/Flows dynamically generated at runtime. Examples -------- Consider a flow containing two jobs. >>> from jobflow import job, Flow >>> @job ... def add(a, b): ... return a + b >>> add_job1 = add(5, 6) >>> add_job2 = add(6, 7) >>> flow = Flow([add_job1, add_job2]) The ``config`` of both jobs could be updated as follows: >>> new_config = JobConfig( ... manager_config={"_fworker": "myfworker"}, resolve_references=False ... ) >>> flow.update_config(new_config) To only update specific attributes, the ``attributes`` argument can be set. For example, the following will only update the "manager_config" attribute of the jobs' config. >>> flow.update_config(new_config, attributes="manager_config") Alternatively, the config can be specified as a dictionary with keys that are attributes of the JobConfig object. This allows you to specify updates without having to create a completely new JobConfig object. For example: >>> flow.update_config({"manager_config": {"_fworker": "myfworker"}}) """ for job in self.jobs: job.update_config( config, name_filter=name_filter, function_filter=function_filter, attributes=attributes, dynamic=dynamic, )
[docs] def add_hosts_uuids( self, hosts_uuids: str | list[str] | None = None, prepend: bool = False ): """ Add a list of UUIDs to the internal list of hosts. If hosts_uuids is None the uuid of this Flow will be added to the inner jobs and flow. Otherwise, the passed value will be set both in the list of hosts of the current flow and of the inner jobs and flows. The elements of the list are supposed to be ordered in such a way that the object identified by one UUID of the list is contained in objects identified by its subsequent elements. Parameters ---------- hosts_uuids A list of UUIDs to add. If None the current uuid of the flow will be added to the inner Flows and Jobs. prepend Insert the UUIDs at the beginning of the list rather than extending it. """ if hosts_uuids is not None: if not isinstance(hosts_uuids, (list, tuple)): hosts_uuids = [hosts_uuids] if prepend: self.hosts[0:0] = hosts_uuids else: self.hosts.extend(hosts_uuids) else: hosts_uuids = [self.uuid] for j in self.jobs: j.add_hosts_uuids(hosts_uuids, prepend=prepend)
[docs] def add_jobs(self, jobs: list[Flow | jobflow.Job] | jobflow.Job | Flow): """ Add Jobs or Flows to the Flow. Added objects should not belong to other flows. The list of hosts will be added automatically to the incoming Jobs/Flows based on the hosts of the current Flow. Parameters ---------- jobs A list of Jobs and Flows. """ if not isinstance(jobs, (tuple, list)): jobs = [jobs] job_ids = set(self.all_uuids) hosts = [self.uuid, *self.hosts] for job in jobs: if job.host is not None and job.host != self.uuid: raise ValueError( f"{job.__class__.__name__} {job.name} ({job.uuid}) already belongs " f"to another flow." ) if job.uuid in job_ids: raise ValueError( "jobs array contains multiple jobs/flows with the same uuid " f"({job.uuid})" ) # check for circular dependency of Flows. if isinstance(job, Flow) and self.uuid in job.all_uuids: raise ValueError( f"circular dependency: Flow ({job.uuid}) contains the " f"current Flow ({self.uuid})" ) job_ids.add(job.uuid) job.add_hosts_uuids(hosts) self._jobs += tuple(jobs)
[docs] def remove_jobs(self, indices: int | list[int]): """ Remove jobs from the Flow. It is not possible to remove jobs referenced in the output. Parameters ---------- indices Indices of the jobs to be removed. Accepted values: from 0 to len(jobs) - 1. """ if not isinstance(indices, (list, tuple)): indices = [indices] if any(i < 0 or i >= len(self.jobs) for i in indices): raise ValueError( "Only indices between 0 and the number of the jobs are accepted" ) new_jobs = tuple(j for i, j in enumerate(self.jobs) if i not in indices) uuids: set = set() for job in new_jobs: if isinstance(job, Flow): uuids.update(job.job_uuids) else: uuids.add(job.uuid) # check if the output contains some references to the removed Jobs. references = find_and_get_references(self.output) reference_uuids = {ref.uuid for ref in references} if not reference_uuids.issubset(uuids): raise ValueError( "Removed Jobs/Flows are referenced in the output of the Flow." ) self._jobs = new_jobs
def get_flow(
    flow: Flow | jobflow.Job | list[jobflow.Job],
) -> Flow:
    """
    Check dependencies and return flow object.

    Parameters
    ----------
    flow
        A job, list of jobs, or flow.

    Returns
    -------
    Flow
        A :obj:`Flow` object where connections have been checked.
    """
    if not isinstance(flow, Flow):
        flow = Flow(jobs=flow)

    # ensure that we have all the jobs needed to resolve the reference connections
    referenced = {ref.uuid for ref in find_and_get_references(flow.jobs)}
    missing_jobs = referenced - set(flow.job_uuids)
    if missing_jobs:
        raise ValueError(
            "The following jobs were not found in the jobs array and are needed to "
            f"resolve output references:\n{list(missing_jobs)}"
        )
    return flow