jobflow.utils

jobflow.utils.dict_mods

Implementation of the DictMod language for manipulating dictionaries.

This module enables the modification of a dict using another dict. The main method of interest is apply_mod.

Note

This code is based heavily on the Ansible class of custodian, but simplifies it considerably for the limited use cases required by jobflow.

The original version of this file was written by Shyue Ping Ong and Anubhav Jain.

class jobflow.utils.dict_mods.DictMods[source]

Bases: object

Class to define mongo-like modifications on a dict.

Supported keywords include the following Mongo-based keywords, with the usual meanings (refer to Mongo documentation for information):

  • _inc

  • _set

  • _unset

  • _push

  • _push_all

  • _add_to_set (but _each is not supported)

  • _pop

  • _pull

  • _pull_all

  • _rename

Note

Note that _set does not support modification of nested dicts using the mongo {"a.b":1} notation. This is because mongo does not allow keys with “.” to be inserted. Instead, nested dict modification is supported using a special “->” keyword, e.g. {"a->b": 1}

static set(input_dict, settings)[source]

Set a value.

static unset(input_dict, settings)[source]

Unset a value.

static push(input_dict, settings)[source]

Append a value to a list or create a list it if it doesn’t exist.

static push_all(input_dict, settings)[source]

Extend a list or create a list it if it doesn’t exist.

static inc(input_dict, settings)[source]

Increment a number.

static rename(input_dict, settings)[source]

Rename a key.

static add_to_set(input_dict, settings)[source]

Add an item to a set or create the set if it doesn’t exist.

static pull(input_dict, settings)[source]

Extract a field from a nested dictionary.

static pull_all(input_dict, settings)[source]

Extract many fields from a nested dictionary.

static pop(input_dict, settings)[source]

Pop a value from a list.

jobflow.utils.dict_mods.apply_mod(modification, obj)[source]

Apply a dict mod to an object.

Note that modify makes actual in-place modifications. It does not return a copy.

Parameters:
  • modification (dict[str, Any]) – Modification must be {action_keyword : settings}, where action_keyword is a supported DictMod.

  • obj (dict[str, Any]) – A dict to be modified.

jobflow.utils.enum

Utilities for enumerations.

class jobflow.utils.enum.ValueEnum(value)[source]

Bases: Enum

Enum that serializes to string as the value and can be compared against a str.

as_dict()[source]

Create a serializable representation of the enum.

jobflow.utils.find

Tools for finding and replacing in dictionaries and other objects.

jobflow.utils.find.find_key(d, key, include_end=False, nested=False)[source]

Find the route to key: value pairs in a dictionary.

This function works on nested dictionaries, lists, and tuples.

Parameters:
  • d (dict[Hashable, Any] | list[Any]) – A dict or list of dicts.

  • key (Hashable | type[MSONable]) – A dictionary key or MSONable class to locate.

  • include_end (bool) – Whether to include the key in the route. This has no effect if the key is an MSONable class.

  • nested (bool) – Whether to return nested keys or stop at the first match.

Return type:

A list of routes to where the matches were found.

Examples

>>> data = {
...    "a": [0, {"b": 1, "x": 3}],
...    "c": {"d": {"x": 3}}
... }
>>> find_key(data, "x")
[['a', 1], ['c', 'd']]
>>> find_key(data, "x", include_end=True)
[['a', 1, 'x'], ['c', 'd', 'x']]

The nested argument can be used to control the behaviour of nested keys.

>>> data = {"a": {"x": {"x": 1}}, "b": {"x": 0}}
>>> find_key(data, "x", nested=False)
[['a'], ['b']]
>>> find_key(data, "x", nested=True)
[['a'], ['a', 'x'], ['b']]
jobflow.utils.find.find_key_value(d, key, value)[source]

Find the route to key: value pairs in a dictionary.

This function works on nested dictionaries, lists, and tuples.

Parameters:
  • d (dict[Hashable, Any] | list[Any]) – A dict or list of dicts.

  • key (Hashable) – A dictionary key.

  • value (Hashable) – A value.

Return type:

A tuple of routes to where the matches were found.

Examples

>>> data = {
...    "a": [0, {"b": 1, "x": 3}],
...    "c": {"d": {"x": 3}}
... }
... find_key_value(data, "x", 3)
(['a', 1], ['c', 'd'])
jobflow.utils.find.update_in_dictionary(obj, updates)[source]

Update a dictionary in place at specific locations with a new values.

This function works on nested dictionaries and those containing lists or tuples.

Parameters:
  • obj (dict[Hashable, Any]) – A dictionary to update.

  • updates (dict[tuple, Any]) – The updates to perform, as a dictionary of {location: update}.

Examples

>>> data = {
...    "a": [0, {"b": 1, "x": 3}],
...    "c": {"d": {"x": 3}}
... }
>>> update_in_dictionary(data, {('a', 1, 'x'): 100, ('c', 'd', 'x'): 100})
>>> data
{'a': [0, {'b': 1, 'x': 100}], 'c': {'d': {'x': 100}}}
jobflow.utils.find.contains_flow_or_job(obj)[source]

Find whether an object contains any Flow or Job objects.

Parameters:

obj (Any) – An object.

Returns:

Whether the object contains any Flows or jobs.

Return type:

bool

jobflow.utils.find.get_root_locations(locations)[source]

Filter for the the lowest level locations.

If a parent location is in the list, the child location is removed

Parameters:

locations (list[list]) – A list of locations.

Returns:

  • list[list] – A list of locations with only the lowest level locations.

  • Example usage – >>> _get_root_locations([[“a”, “b”], [“a”], [“c”, “d”]]) [[“a”], [“c”, “d”]]

jobflow.utils.graph

Tools for constructing Job and Flow graphs.

jobflow.utils.graph.itergraph(graph)[source]

Iterate through a graph using a topological sort order.

This means the nodes are yielded such that for every directed edge (u v) node u comes before v in the ordering.

Parameters:

graph (DiGraph) – A networkx graph.

Raises:

ValueError – If the graph contains cycles.

Yields:

str – The node uuid.

jobflow.utils.graph.draw_graph(graph, layout_function=None, figsize=(12, 8))[source]

Draw a networkx graph.

Parameters:
  • graph (DiGraph) – A graph object.

  • layout_function (Callable) – A networkx layout function to use as the graph layout. For example, planar_layout.

  • figsize (tuple[float, float]) – The figure size as a tuple of (width, height).

Returns:

The matplotlib pyplot object.

Return type:

matplotlib.pyplot

jobflow.utils.graph.to_pydot(flow)[source]

Convert a flow to a pydot graph.

Pydot graphs can be visualised using graphviz and support more advanced features than networkx graphs. For example, the pydot graph also includes the flow containers.

Note

Requires pydot and graphviz to be installed.

Parameters:

flow (jobflow.Flow) – A flow.

Returns:

The pydot graph.

Return type:

pydot.Dot

Examples

The pydot graph can be generated from a flow using:

>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_first = add(1, 2)
>>> add_second = add(add_first.output, 2)
>>> my_flow = Flow(jobs=[add_first, add_second])
>>> graph = to_pydot(my_flow)

If graphviz is installed, the pydot graph can be rendered to a file using:

>>> graph.write("output.png", format="png")
jobflow.utils.graph.to_mermaid(flow, show_flow_boxes=False)[source]

Convert a flow to a mermaid graph.

Mermaid syntax allows graphs to be displayed interactively via GitHub, the Mermaid Live Editor at mermaid.live, using the mermaid-cli.

Parameters:
  • flow (Flow or a Job) – A flow or a job.

  • show_flow_boxes (bool) – Whether to show the boxes around nested flows.

Returns:

Mermaid commands to render the graph.

Return type:

str

Examples

The mermaid syntax can be generated from a flow using:

>>> from jobflow import job, Flow
>>> @job
... def add(a, b):
...     return a + b
>>> add_first = add(1, 2)
>>> add_second = add(add_first.output, 2)
>>> my_flow = Flow(jobs=[add_first, add_second])
>>> graph_source = to_mermaid(my_flow)

To render the graph, go to mermaid.live and paste the contents of graph_source.

jobflow.utils.log

Tools for logging.

jobflow.utils.log.initialize_logger(level=20)[source]

Initialize the default logger.

Parameters:

level (int) – The log level.

Returns:

A logging instance with customized formatter and handlers.

Return type:

Logger

jobflow.utils.uuid

Tools for generating UUIDs.

jobflow.utils.uuid.suuid()[source]

Generate a string UUID (universally unique identifier).

Uses the UUID4 specification.

Returns:

A UUID.

Return type:

str