This page is available as a Jupyter notebook: tutorials/2-introduction.ipynb.

Introduction to jobflow

In this tutorial, you will:

  • Create several jobs.

  • Combine the Jobs into a Flow.

  • Set up a JobStore - the primary database interface for jobflow.

  • Run a Flow using the custom store.

  • Examine the Flow outputs.

The purpose of this tutorial is to introduce the main concepts of jobflow. It isn’t intended to demonstrate all the features of jobflow or explain things in great detail.

Creating job objects

The building blocks of jobflow are Job objects. Jobs are delayed calls to Python functions whose outputs are stored in a database. The easiest way to create a job is to use the @job decorator, which can be applied to any function, even those with optional parameters.

Note

jobflow encodes job outputs as JSON using the MontyEncoder feature of the monty package. This means that additional data types such as datetime objects, enums, and MSONable classes can also be serialized.
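For example, a job can return a datetime object directly and the output will be serialized transparently (a minimal sketch):

from datetime import datetime

from jobflow import job


@job
def current_time():
    # the datetime return value is encoded to JSON via MontyEncoder
    return datetime.now()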

Let's create a decorated function that times how long it takes to load a website.

[2]:
from jobflow import job


@job
def time_website(website: str):
    import urllib.request
    from time import perf_counter

    with urllib.request.urlopen(website) as f:
        start_time = perf_counter()
        f.read()
        end_time = perf_counter()

    return end_time - start_time

Any call to the time_website function will return a Job object rather than executing the function.

[3]:
time_github = time_website("https://www.github.com")
time_google = time_website("https://www.google.com")
time_nyt = time_website("https://www.nytimes.com")

Jobs have outputs that can be accessed using the output attribute. When a job runs, its outputs will be stored in the JobStore. As the job has not yet been executed, the output is currently a reference to the future output.

[4]:
time_github.output
[4]:
OutputReference(aa2a6b1a-4846-4154-94b9-4296b2a64e5d)

The output of a job can be used as an input to another job. To demonstrate this, let’s create a new job function that will sum all its inputs.

[5]:
@job
def sum_numbers(numbers):
    return sum(numbers)


sum_times = sum_numbers([time_github.output, time_google.output, time_nyt.output])

Note that none of the jobs we have created have been executed. The idea of jobflow is to build up a series of (interconnected) jobs and run them at a later date.

Creating Flows

A Flow is a collection of Jobs or other Flow objects. Flows are the primary tool for defining workflows in jobflow. Let’s create a Flow from the jobs we just made. We will repeat the lines we used to create them here for clarity.

[15]:
from jobflow import Flow

time_github = time_website("https://www.github.com")
time_google = time_website("https://www.google.com")
time_nyt = time_website("https://www.nytimes.com")

sum_times = sum_numbers([time_github.output, time_google.output, time_nyt.output])

flow = Flow([time_github, time_google, time_nyt, sum_times])

The order of the jobs in the input list does not matter. Their execution order is determined by their connectivity. Because sum_times takes the outputs of time_github, time_google, and time_nyt as inputs, the timing jobs will always run first.
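For example, constructing the flow with the jobs listed in a different order gives an identical workflow (same_flow is just an illustrative name):

# the list order is irrelevant; connectivity alone fixes the execution order
same_flow = Flow([sum_times, time_nyt, time_google, time_github])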

The connectivity of the jobs in a flow can be visualized:

[7]:
flow.draw_graph().show()
[flow graph: the three time_website jobs each feed into the sum_numbers job]

Similar to Jobs, Flows can also have outputs; however, these are not explicitly stored in the database. Instead, the outputs of a Flow act to structure the outputs of the jobs contained within it.

The Flow we just created does not have any outputs. We could instead “register” the output of the sum job as the output of the flow:

flow = Flow(
    jobs=[time_github, time_google, time_nyt, sum_times],
    output=sum_times.output
)

This would allow the Flow to be used as an input to a job. For example:

new_job = sum_numbers([flow.output, 10])

Note that the Flow output can be structured however you like. For example, you may wish to expose all the job outputs in the form of a dictionary.

flow = Flow(
    jobs=[time_github, time_google, time_nyt, sum_times],
    output={
        "times": [time_github.output, time_google.output, time_nyt.output],
        "sum": sum_times.output
    }
)

The flow output can then be used by subsequent jobs:

new_job = sum_numbers([flow.output["sum"], 10])

Setting up the JobStore

Anything returned by a Job will be stored in the JobStore, the primary database interface for jobflow.

The JobStore is an implementation of the Store interface provided by the Maggma package. Maggma provides implementations for a number of common databases, including:

  • MongoDB (MongoStore)

  • GridFS (GridFSStore)

  • Amazon S3 (S3Store)

Jobflow allows you to store outputs in multiple stores — for example, small documents can be stored in a MongoDB database whereas large files can be stored in an S3 store.
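As a hypothetical sketch (the database, collection, and bucket names here are illustrative, and such a setup requires a running MongoDB server and S3 credentials), a two-store configuration might look like:

from maggma.stores import MemoryStore, MongoStore, S3Store

from jobflow import JobStore

# document store for small outputs
docs_store = MongoStore(database="jobflow", collection_name="outputs")

# object store for large outputs; the index store tracks which keys live
# in the bucket (a real setup would typically use a MongoStore index)
data_store = S3Store(index=MemoryStore(), bucket="my-bucket")

store = JobStore(docs_store, additional_stores={"data": data_store})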

In this example we will use a single store type for all documents. In particular, we will use a special type of store called a MemoryStore. This means that any outputs are only stored in memory and not in an external database. For production workflows, we recommend using one of the permanent stores outlined above.

First, we need to create the memory store that will function as the store for all outputs.

[8]:
from maggma.stores import MemoryStore

docs_store = MemoryStore()

Next we can initialize the JobStore with the memory store.

[9]:
from jobflow import JobStore

store = JobStore(docs_store)

This type of JobStore (using a MemoryStore for all outputs) is the default store used by jobflow if a custom store is not set.
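In other words, if no store is passed to the job manager (run_locally, introduced in the next section), jobflow falls back to an equivalent in-memory JobStore automatically:

from jobflow.managers.local import run_locally

# store defaults to an in-memory JobStore when not given explicitly
responses = run_locally(flow)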

Running the Flow

Jobflow supports running Flows locally or on remote clusters. Below, we run the Flow locally using the run_locally function with our custom store.

Note

Running Flows on remote clusters can be achieved using the FireWorks package and is covered in the Running Jobflow with FireWorks tutorial.
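For reference, submitting the same flow through FireWorks looks roughly like this (a sketch, assuming a configured LaunchPad; see that tutorial for details):

from fireworks import LaunchPad

from jobflow.managers.fireworks import flow_to_workflow

# convert the jobflow Flow into a FireWorks workflow and add it to the launchpad
wf = flow_to_workflow(flow)
LaunchPad.auto_load().add_wf(wf)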

[10]:
from jobflow.managers.local import run_locally

responses = run_locally(flow, store=store)
2023-12-08 11:15:18,266 INFO Started executing jobs locally
2023-12-08 11:15:18,270 INFO Starting job - time_website (f7831f60-617e-490d-8854-ef3e25a78504)
2023-12-08 11:15:18,873 INFO Finished job - time_website (f7831f60-617e-490d-8854-ef3e25a78504)
2023-12-08 11:15:18,875 INFO Starting job - time_website (993990cd-872e-4c8f-823b-ac4eea8756c6)
2023-12-08 11:15:19,300 INFO Finished job - time_website (993990cd-872e-4c8f-823b-ac4eea8756c6)
2023-12-08 11:15:19,304 INFO Starting job - time_website (3c2d3abd-be33-4857-9943-6526bfb05804)
2023-12-08 11:15:19,774 INFO Finished job - time_website (3c2d3abd-be33-4857-9943-6526bfb05804)
2023-12-08 11:15:19,777 INFO Starting job - sum_numbers (2be44e87-0918-481f-ae77-7a5931074e1e)
2023-12-08 11:15:19,783 INFO Finished job - sum_numbers (2be44e87-0918-481f-ae77-7a5931074e1e)
2023-12-08 11:15:19,785 INFO Finished executing jobs locally

Below is an annotated example of the log message:

2021-06-04 17:36:50,531 INFO Starting job - time_website (d5460743-bb88-4922-8306-0400eb419a91)
                ↑                                 ↑                         ↑
              time                             job name                  job uuid

The job uuid is a unique identifier that is assigned to the job when it is created. It is also used to track the job outputs.
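For example, the uuid of the first timing job can be read from its uuid attribute:

time_github.uuid
# 'f7831f60-617e-490d-8854-ef3e25a78504'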

Examine Flow outputs

The run_locally function returns the output of all jobs. The format of the output is:

{
    job_uuid: {
        job_index: Response()
    }
}

The job_index and Response() objects are introduced in later tutorials. The main thing to know is that the Response contains the output of the Job and any other commands for controlling the Flow execution.

[11]:
responses
[11]:
{'f7831f60-617e-490d-8854-ef3e25a78504': {1: Response(output=0.20485366717912257, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 '993990cd-872e-4c8f-823b-ac4eea8756c6': {1: Response(output=0.06472958298400044, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 '3c2d3abd-be33-4857-9943-6526bfb05804': {1: Response(output=0.18873387505300343, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)},
 '2be44e87-0918-481f-ae77-7a5931074e1e': {1: Response(output=0.45831712521612644, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False)}}

We can check the output of specific jobs using:

[12]:
responses[time_github.uuid][1].output
[12]:
0.20485366717912257

In general, it is easier to examine the outputs of a job using the JobStore.get_output function. This will query the database (in this case, the MemoryStore) and return the output of the job. If using multiple stores, the relevant outputs will automatically be queried from the appropriate stores.

Let’s query the database for the output of the time_github job.

[13]:
store.get_output(time_github.uuid)
[13]:
0.20485366717912257

Next steps

Now that you are more familiar with jobflow, we encourage you to learn about all the different options jobflow provides for designing and running workflows. A good next step is the Defining Jobs using jobflow tutorial, which covers the Job object and job decorator in more detail.