This page is available as a Jupyter notebook: tutorials/4-creating-flows.ipynb.

Creating flows in jobflow

In this tutorial, you will:

  • Learn about the Flow object.

  • Set the configuration settings of a flow and its jobs.

The purpose of this tutorial is to delve into the basic functionality of flows and gain a feeling for what is possible. Later tutorials will describe how to create dynamic flows.

Creating job objects

The building blocks of flows are Job objects. Jobs are delayed calls to Python functions whose outputs are stored in a database. The easiest way to create a job is with the @job decorator. The decorator can be applied to any function, even those with optional parameters.

We will start by defining two simple jobs that we will stitch together into a flow.

[2]:
from jobflow import job


@job
def add(a, b, c=2):
    return a + b + c


@job
def mult(a, b):
    return a * b
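
As a quick sanity check (assuming a standard jobflow installation), calling a decorated function does not execute it immediately; it returns a Job object whose output is only a reference until the job is actually run:

demo = add(1, 2)
print(type(demo))   # <class 'jobflow.core.job.Job'>
print(demo.output)  # an OutputReference placeholder, resolved only once the job runs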

To combine these two Job objects into a single workflow, we can use the Flow constructor in jobflow.

[3]:
from jobflow import Flow

job1 = add(1, 2)
job2 = mult(job1.output, 3)

flow = Flow([job1, job2], name="my-flow")

Because job2 depends on the output of job1, it will only run once job1 has completed successfully. Jobflow automatically determines the connectivity of the jobs and runs them in the correct order. Here, we have also given the flow an optional name, which can be useful for tracking purposes.
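
If you want to verify the dependencies that jobflow has inferred, the flow exposes its graph. As a sketch (this relies on the networkx package that jobflow depends on), the nodes are job UUIDs and edges point from a job to the jobs that consume its output:

# Inspect the inferred dependency graph (a networkx DiGraph)
print(flow.graph.edges())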

Setting metadata

It is often useful to attach metadata to jobs and flows before they are run, particularly for querying purposes later on. This can be done with the update_metadata function. The name of a job can also be updated on the fly.

[4]:
job1 = add(1, 2)

job1.name = "test"
job1.update_metadata({"tags": ["test"]})
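
Metadata can be attached to a whole flow in the same way. As a sketch (assuming Flow.update_metadata behaves as in recent jobflow releases, where the update is propagated to the jobs contained in the flow; the "project" key is purely illustrative):

# Apply a metadata update across the flow
flow.update_metadata({"project": "my-project"})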

Running flows

We can run the flow locally by calling the run_locally function.

[5]:
from jobflow.managers.local import run_locally

responses = run_locally(flow)
2023-06-08 09:58:12,065 INFO Started executing jobs locally
2023-06-08 09:58:12,168 INFO Starting job - add (4e9bc8e2-0828-4376-bdc9-dda91ba26d38)
2023-06-08 09:58:12,168 INFO Finished job - add (4e9bc8e2-0828-4376-bdc9-dda91ba26d38)
2023-06-08 09:58:12,169 INFO Starting job - mult (d464616a-7253-41bb-862e-c999393ccc81)
2023-06-08 09:58:12,169 INFO Finished job - mult (d464616a-7253-41bb-862e-c999393ccc81)
2023-06-08 09:58:12,169 INFO Finished executing jobs locally

The output is a dictionary keyed by the UUID of each job in the flow, with the corresponding Response holding each job's output.

[6]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")
4e9bc8e2-0828-4376-bdc9-dda91ba26d38 -> {1: Response(output=5, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
d464616a-7253-41bb-862e-c999393ccc81 -> {1: Response(output=15, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
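
If you only need the computed values, they can be extracted from the Response objects. A small sketch (the inner key 1 is the job's run index, as seen in the output above):

# Collect just the output value of each job
outputs = {uuid: resp[1].output for uuid, resp in responses.items()}
print(outputs)  # {'4e9b...': 5, 'd464...': 15}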

The UUID can also be obtained from the job object directly, which is useful for indexing the output.

[7]:
print(responses[job2.uuid][1].output)
15
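
Finally, run_locally accepts additional options. For example, if you would like each job to run in its own directory rather than the current working directory, you can pass create_folders=True (a sketch; see the run_locally documentation for the full set of options):

# Run each job in a separate, automatically created folder
responses = run_locally(flow, create_folders=True)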