This page is available as a Jupyter notebook: tutorials/5-dynamic-flows.ipynb.

Creating dynamic flows

In this tutorial, you will:

  • Learn how to create dynamic workflows.

  • Understand the detour, addition, and replace options in the Response object.

The ability to create dynamic workflows (i.e. jobs or workflows that launch other jobs or workflows) is a particularly powerful usage pattern in Jobflow.

The Response(replace) option

The main mechanism for creating dynamic jobs in Jobflow is through the Response object. We will demonstrate this below for a toy example where we:

  1. Generate a list of numbers whose length is only determined at runtime.

  2. Perform a toy operation on each number in the list.

While this is a trivial example, a similar usage is common in computational materials science (e.g. you might perform a calculation on a bulk structure, carve all possible surface slabs, and then perform a calculation on each slab). What makes this dynamic is that the number of jobs is only determined at runtime.

[2]:
from random import randint
from jobflow import job, Flow, Response
from jobflow.managers.local import run_locally

@job
def make_list(a):
    return [a] * randint(2, 5)

@job
def add(a, b):
    return a + b

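@job
def add_distributed(list_a):
    # Create one add job per list entry; the count is only known at runtime.
    jobs = [add(val, 1) for val in list_a]

    # Replace this job with a flow containing the newly created jobs.
    flow = Flow(jobs)
    return Response(replace=flow)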

job1 = make_list(2)
job2 = add_distributed(job1.output)
flow = Flow([job1, job2])

responses = run_locally(flow)
2023-11-23 23:55:02,807 INFO Started executing jobs locally
2023-11-23 23:55:02,937 INFO Starting job - make_list (d509897c-c318-4b2e-889e-a77331480a58)
2023-11-23 23:55:02,955 INFO Finished job - make_list (d509897c-c318-4b2e-889e-a77331480a58)
2023-11-23 23:55:02,956 INFO Starting job - add_distributed (7e07bfa4-a0cc-4535-b6c8-e3ec78fe96fb)
2023-11-23 23:55:02,963 INFO Finished job - add_distributed (7e07bfa4-a0cc-4535-b6c8-e3ec78fe96fb)
2023-11-23 23:55:03,005 INFO Starting job - add (62d7192e-d2af-4f28-a790-ac4987bd9f41)
2023-11-23 23:55:03,008 INFO Finished job - add (62d7192e-d2af-4f28-a790-ac4987bd9f41)
2023-11-23 23:55:03,009 INFO Starting job - add (d8a7d4aa-8255-417f-9cc8-bc9e16896d33)
2023-11-23 23:55:03,012 INFO Finished job - add (d8a7d4aa-8255-417f-9cc8-bc9e16896d33)
2023-11-23 23:55:03,012 INFO Starting job - add (15ce2067-8138-4692-baf8-8ceada256139)
2023-11-23 23:55:03,014 INFO Finished job - add (15ce2067-8138-4692-baf8-8ceada256139)
2023-11-23 23:55:03,015 INFO Starting job - add (43477f07-28fd-4944-9ae7-02f41282c8d3)
2023-11-23 23:55:03,017 INFO Finished job - add (43477f07-28fd-4944-9ae7-02f41282c8d3)
2023-11-23 23:55:03,018 INFO Finished executing jobs locally
/home/jgeorge/miniconda3/envs/AddJobflowTutorial/lib/python3.10/site-packages/jobflow/utils/graph.py:49: UserWarning: Some jobs are not connected, their ordering may be random
  warnings.warn("Some jobs are not connected, their ordering may be random")
[3]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")
d509897c-c318-4b2e-889e-a77331480a58 -> {1: Response(output=[2, 2, 2, 2], detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
7e07bfa4-a0cc-4535-b6c8-e3ec78fe96fb -> {1: Response(output=None, detour=None, addition=None, replace=Flow(name='Flow', uuid='ab9475e6-781b-4560-a0cc-4c260f80762a')
1. Job(name='add', uuid='62d7192e-d2af-4f28-a790-ac4987bd9f41')
2. Job(name='add', uuid='d8a7d4aa-8255-417f-9cc8-bc9e16896d33')
3. Job(name='add', uuid='15ce2067-8138-4692-baf8-8ceada256139')
4. Job(name='add', uuid='43477f07-28fd-4944-9ae7-02f41282c8d3'), stored_data=None, stop_children=False, stop_jobflow=False)}
62d7192e-d2af-4f28-a790-ac4987bd9f41 -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
d8a7d4aa-8255-417f-9cc8-bc9e16896d33 -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
15ce2067-8138-4692-baf8-8ceada256139 -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
43477f07-28fd-4944-9ae7-02f41282c8d3 -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}

As seen above, more jobs were run than the two we started with. The first job generates a list of 2s with a random length (four entries in this run). The second job launches one add job for each entry in the list. Because it is replaced by those jobs, it has no direct output of its own (output=None). Each newly generated job is then run.
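
To work with the results programmatically rather than reading the printed dictionary, the nested structure returned by run_locally (job UUID, then job index, then Response) can be flattened. The following is a minimal sketch, not part of Jobflow: collect_outputs is a hypothetical helper name, and the demo dictionary uses stand-in objects and shortened placeholder keys in place of real Response objects and UUIDs.

```python
from types import SimpleNamespace


def collect_outputs(responses):
    """Map each job UUID to the output stored under its highest job index."""
    outputs = {}
    for uuid, by_index in responses.items():
        latest = by_index[max(by_index)]
        outputs[uuid] = latest.output
    return outputs


# Stand-in data shaped like the dictionary printed above (hypothetical keys).
demo_responses = {
    "make_list": {1: SimpleNamespace(output=[2, 2, 2, 2])},
    "add_distributed": {1: SimpleNamespace(output=None)},
    "add_a": {1: SimpleNamespace(output=3)},
    "add_b": {1: SimpleNamespace(output=3)},
}

demo_outputs = collect_outputs(demo_responses)
for uuid, output in demo_outputs.items():
    print(f"{uuid} -> {output}")
```

The helper only relies on each entry having an output attribute, so it should work unchanged when passed the responses dictionary from the cell above.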

The Response(addition) option

Beyond replacing a job with downstream jobs, you can also add jobs to the current flow on the fly.

Here we will create a simple flow that:

  1. Adds a value to a given number.

  2. Repeats the addition if the output is less than 10; otherwise, it stops.

[4]:
@job
def add(a, b):
    return a + b

@job
def add_with_logic(a, b):
    if a < 10:
        return Response(addition=add(a, b))

job1 = add(1, 2)
job2 = add_with_logic(job1.output, 2)
flow = Flow([job1, job2])

responses = run_locally(flow)
2023-11-23 23:55:06,796 INFO Started executing jobs locally
2023-11-23 23:55:06,797 INFO Starting job - add (5630b6ba-1e9a-42ff-be51-ece43fe76643)
2023-11-23 23:55:06,800 INFO Finished job - add (5630b6ba-1e9a-42ff-be51-ece43fe76643)
2023-11-23 23:55:06,801 INFO Starting job - add_with_logic (008b1973-6421-418f-b47d-8f32054fedc8)
2023-11-23 23:55:06,807 INFO Finished job - add_with_logic (008b1973-6421-418f-b47d-8f32054fedc8)
2023-11-23 23:55:06,808 INFO Starting job - add (4e43d6b6-523a-4760-b1cf-3f68916e80fd)
2023-11-23 23:55:06,812 INFO Finished job - add (4e43d6b6-523a-4760-b1cf-3f68916e80fd)
2023-11-23 23:55:06,813 INFO Finished executing jobs locally
[5]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")
5630b6ba-1e9a-42ff-be51-ece43fe76643 -> {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
008b1973-6421-418f-b47d-8f32054fedc8 -> {1: Response(output=None, detour=None, addition=Flow(name='Flow', uuid='1994a56f-6dd8-4e35-a187-76d27d808d09')
1. Job(name='add', uuid='4e43d6b6-523a-4760-b1cf-3f68916e80fd'), replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
4e43d6b6-523a-4760-b1cf-3f68916e80fd -> {1: Response(output=5, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}

As you can see above, the addition job is run twice: once as the first job in the flow, and once more because add_with_logic added it at runtime. Now let's confirm that the addition job is only run once if the output of the first job is 10 or greater.

[6]:
@job
def add(a, b):
    return a + b

@job
def add_with_logic(a, b):
    if a < 10:
        return Response(addition=add(a, b))

job1 = add(1, 20)
job2 = add_with_logic(job1.output, 20)
flow = Flow([job1, job2])

responses = run_locally(flow)
2023-11-23 23:55:09,755 INFO Started executing jobs locally
2023-11-23 23:55:09,756 INFO Starting job - add (92e3f817-1350-4492-aeee-ec3501bce66f)
2023-11-23 23:55:09,759 INFO Finished job - add (92e3f817-1350-4492-aeee-ec3501bce66f)
2023-11-23 23:55:09,759 INFO Starting job - add_with_logic (e20fe185-4e34-4422-914f-e0acb4cbd529)
2023-11-23 23:55:09,764 INFO Finished job - add_with_logic (e20fe185-4e34-4422-914f-e0acb4cbd529)
2023-11-23 23:55:09,765 INFO Finished executing jobs locally
[7]:
for uuid, response in responses.items():
    print(f"{uuid} -> {response}")
92e3f817-1350-4492-aeee-ec3501bce66f -> {1: Response(output=21, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}
e20fe185-4e34-4422-914f-e0acb4cbd529 -> {1: Response(output=None, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}

This time, add_with_logic does not return a Response(addition), so no new job is launched.

In this way, one can also compute the Fibonacci numbers:

[8]:
"""A dynamic workflow that calculates the Fibonacci sequence."""
from jobflow import Response, job, run_locally


@job
def fibonacci(smaller, larger, stop_point=1000):
    """Calculate the next number in the Fibonacci sequence.

    If the number is larger than stop_point, it is returned directly and no
    further job is added. Otherwise, a new job is added to calculate the next
    number.
    """
    total = smaller + larger

    if total > stop_point:
        return total

    new_job = fibonacci(larger, total, stop_point=stop_point)
    return Response(output=total, addition=new_job)


fibonacci_job = fibonacci(1, 1)

# run the job; responses will contain the output from all jobs
responses = run_locally(fibonacci_job)

2023-11-23 23:55:13,324 INFO Started executing jobs locally
2023-11-23 23:55:13,325 INFO Starting job - fibonacci (6f7a1e92-577f-4289-85e5-e07a82e542db)
2023-11-23 23:55:13,329 INFO Finished job - fibonacci (6f7a1e92-577f-4289-85e5-e07a82e542db)
2023-11-23 23:55:13,329 INFO Starting job - fibonacci (862ce31a-a963-419a-9862-f419c47c3c9e)
2023-11-23 23:55:13,332 INFO Finished job - fibonacci (862ce31a-a963-419a-9862-f419c47c3c9e)
2023-11-23 23:55:13,333 INFO Starting job - fibonacci (81c28a0a-f1ad-487c-b484-46729f6ca9a6)
2023-11-23 23:55:13,335 INFO Finished job - fibonacci (81c28a0a-f1ad-487c-b484-46729f6ca9a6)
2023-11-23 23:55:13,336 INFO Starting job - fibonacci (084e2022-5a00-4ca8-9edf-46977ca08388)
2023-11-23 23:55:13,338 INFO Finished job - fibonacci (084e2022-5a00-4ca8-9edf-46977ca08388)
2023-11-23 23:55:13,339 INFO Starting job - fibonacci (382f371c-9e92-4185-8dbb-829801ec1afd)
2023-11-23 23:55:13,342 INFO Finished job - fibonacci (382f371c-9e92-4185-8dbb-829801ec1afd)
2023-11-23 23:55:13,342 INFO Starting job - fibonacci (f4521225-993a-4b42-9818-8aaff0da9181)
2023-11-23 23:55:13,345 INFO Finished job - fibonacci (f4521225-993a-4b42-9818-8aaff0da9181)
2023-11-23 23:55:13,346 INFO Starting job - fibonacci (3d475e3e-3774-4015-b056-a23f5cf1da6a)
2023-11-23 23:55:13,349 INFO Finished job - fibonacci (3d475e3e-3774-4015-b056-a23f5cf1da6a)
2023-11-23 23:55:13,351 INFO Starting job - fibonacci (ddc61891-1ce1-4d96-b66e-cb60d52e642a)
2023-11-23 23:55:13,353 INFO Finished job - fibonacci (ddc61891-1ce1-4d96-b66e-cb60d52e642a)
2023-11-23 23:55:13,354 INFO Starting job - fibonacci (499ab638-d8cc-4272-b2b4-f64d246f3676)
2023-11-23 23:55:13,357 INFO Finished job - fibonacci (499ab638-d8cc-4272-b2b4-f64d246f3676)
2023-11-23 23:55:13,357 INFO Starting job - fibonacci (71b7d00f-0109-4a23-8a54-fb29ef143cd2)
2023-11-23 23:55:13,360 INFO Finished job - fibonacci (71b7d00f-0109-4a23-8a54-fb29ef143cd2)
2023-11-23 23:55:13,361 INFO Starting job - fibonacci (d60f3f5d-821a-434a-a8c1-50468b381d5f)
2023-11-23 23:55:13,363 INFO Finished job - fibonacci (d60f3f5d-821a-434a-a8c1-50468b381d5f)
2023-11-23 23:55:13,364 INFO Starting job - fibonacci (7828008e-2f89-4226-aaac-66f25d4ccf0a)
2023-11-23 23:55:13,367 INFO Finished job - fibonacci (7828008e-2f89-4226-aaac-66f25d4ccf0a)
2023-11-23 23:55:13,367 INFO Starting job - fibonacci (062b0cad-2b62-4f42-bd39-ac1b3c90d141)
2023-11-23 23:55:13,370 INFO Finished job - fibonacci (062b0cad-2b62-4f42-bd39-ac1b3c90d141)
2023-11-23 23:55:13,371 INFO Starting job - fibonacci (6c1bf8e4-1808-42b4-a99e-49dd031894bd)
2023-11-23 23:55:13,373 INFO Finished job - fibonacci (6c1bf8e4-1808-42b4-a99e-49dd031894bd)
2023-11-23 23:55:13,374 INFO Starting job - fibonacci (d82e352d-1ca3-40d2-85d1-3f08989d3551)
2023-11-23 23:55:13,376 INFO Finished job - fibonacci (d82e352d-1ca3-40d2-85d1-3f08989d3551)
2023-11-23 23:55:13,377 INFO Finished executing jobs locally
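
The length of this chain of jobs can be checked without Jobflow. The following plain-Python sketch mirrors the logic of the fibonacci job as a loop, assuming the same starting values (1, 1) and the default stop_point of 1000; fibonacci_chain is a hypothetical helper name used only for illustration.

```python
def fibonacci_chain(smaller, larger, stop_point=1000):
    """Return the totals that successive fibonacci jobs would compute."""
    totals = []
    while True:
        total = smaller + larger
        totals.append(total)
        if total > stop_point:
            # The last job returns its total without adding another job.
            return totals
        # Mirrors fibonacci(larger, total, ...) being added as a new job.
        smaller, larger = larger, total


totals = fibonacci_chain(1, 1)
print(len(totals))  # 15, the number of fibonacci jobs in the log above
print(totals[-1])   # 1597, the first total larger than stop_point
```

Each entry in totals corresponds to one job in the log, which is why 15 fibonacci jobs were started before the flow finished.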

The Response(detour) option

The Response(detour) option behaves similarly to Response(addition). The difference is that Response(addition) will add a job (or flow) to the current flow, while Response(detour) will no longer run the current flow and will switch to a parallel job or flow.

[9]:
@job
def add(a, b):
    return a + b

@job
def add_with_logic(a, b):
    if a < 10:
        return Response(detour=add(a, b))

job1 = add(1, 2)
job2 = add_with_logic(job1.output, 2)
flow = Flow([job1, job2])

responses = run_locally(flow)
2023-11-23 23:55:16,275 INFO Started executing jobs locally
2023-11-23 23:55:16,278 INFO Starting job - add (301d75f0-7042-494a-9f24-cab0428c2fd1)
2023-11-23 23:55:16,281 INFO Finished job - add (301d75f0-7042-494a-9f24-cab0428c2fd1)
2023-11-23 23:55:16,282 INFO Starting job - add_with_logic (97be61a8-eec4-4e64-bf53-ba37621575e7)
2023-11-23 23:55:16,292 INFO Finished job - add_with_logic (97be61a8-eec4-4e64-bf53-ba37621575e7)
2023-11-23 23:55:16,293 INFO Starting job - add (d4c31f68-09ad-418a-ac52-89b303fc2a00)
2023-11-23 23:55:16,296 INFO Finished job - add (d4c31f68-09ad-418a-ac52-89b303fc2a00)
2023-11-23 23:55:16,296 INFO Finished executing jobs locally
[10]:
responses
[10]:
{'301d75f0-7042-494a-9f24-cab0428c2fd1': {1: Response(output=3, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 '97be61a8-eec4-4e64-bf53-ba37621575e7': {1: Response(output=None, detour=Flow(name='Flow', uuid='0de995a5-1110-4200-b010-276cb2017474')
  1. Job(name='add', uuid='d4c31f68-09ad-418a-ac52-89b303fc2a00'), addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 'd4c31f68-09ad-418a-ac52-89b303fc2a00': {1: Response(output=5, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}}

For this toy example, both Response(addition) and Response(detour) behave identically.
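
When comparing runs like these, it can help to list which dynamic option each job used. The sketch below is an illustration rather than part of Jobflow: dynamic_actions is a hypothetical helper, and the demo dictionary uses stand-in objects with shortened placeholder keys. It relies only on the replace, addition, and detour attributes that appear in the printed Response objects above.

```python
from types import SimpleNamespace

DYNAMIC_FIELDS = ("replace", "addition", "detour")


def dynamic_actions(responses):
    """Return {uuid: [dynamic fields that are set]} for jobs that used any."""
    summary = {}
    for uuid, by_index in responses.items():
        for response in by_index.values():
            used = [
                name
                for name in DYNAMIC_FIELDS
                if getattr(response, name, None) is not None
            ]
            if used:
                summary[uuid] = used
    return summary


# Stand-in data shaped like the detour run above (hypothetical keys and values).
demo_responses = {
    "add": {1: SimpleNamespace(output=3, replace=None, addition=None, detour=None)},
    "add_with_logic": {
        1: SimpleNamespace(output=None, replace=None, addition=None, detour="Flow")
    },
}

demo_summary = dynamic_actions(demo_responses)
print(demo_summary)
```

Jobs that returned a plain output are omitted from the summary, so an empty dictionary indicates that no job changed the flow at runtime.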