Using the dataflow Firetasks
This group includes custom Firetasks to manage dataflow between Fireworks. The input data and output data are stored in the Firework spec and passed to the subsequent Firetasks and Fireworks via FWAction objects. The module includes:
CommandLineTask
ForeachTask
JoinDictTask
JoinListTask
ImportDataTask
To use a Python function in a dataflow context, see e.g. PyTask.
CommandLineTask
The CommandLineTask runs commands in a shell with command line options, manages the inputs and outputs of these commands, receives file metadata from parent Fireworks, and passes file metadata to child Fireworks.
Required parameters
command_spec (dict): a dictionary specification of the command. It has the following structure:
command_spec = {
    'command': (list),    # mandatory, list of strings
    inputs[0]: (dict),    # optional
    inputs[1]: (dict),    # optional
    # ...
    outputs[0]: (dict),   # optional
    outputs[1]: (dict),   # optional
    # ...
}
Note
When a str is found instead of a dict for some input or output key, for example inputs[1]: 'string', then 'string' is automatically replaced with {spec['string']}.
The command key is a representation of the command as a list of strings, in the form expected by Python's subprocess module. The optional keys inputs[0], inputs[1], …, outputs[0], outputs[1], …, are the actual keys specified in inputs and outputs.
The dictionaries assigned to the keys inputs[0], inputs[1], …, outputs[0], outputs[1], … have the following schema:
{
    'binding': {
        'prefix': str or None,
        'separator': str or None
    },
    'source': {
        'type': 'path' or 'data' or 'identifier'
                or 'stdin' or 'stdout' or 'stderr' or None,
        'value': str or int or float
    },
    'target': {
        'type': 'path' or 'data' or 'identifier'
                or 'stdin' or 'stdout' or 'stderr' or None,
        'value': str
    }
}
Note
If the type in the source field is data, then value can be of type str, int or float.
Note
When a str is found instead of a dict for some source, for example {'source': 'string'}, then 'string' is replaced with spec['string'].
Optional parameters
inputs (list): a list of keys, one for each input argument; default is empty.
outputs (list): a list of keys, one for each output argument; default is empty.
chunk_number (int): the serial number of the Firetask when it is part of a parallel group generated by a ForeachTask; default is None.
Example
The following workflow executes the command echo -n Hello world! in the command line:
fws:
- fw_id: 1
  name: Run a command and store the result
  spec:
    _tasks:
    - _fw_name: CommandLineTask
      command_spec:
        command: [echo, -n]
        input string: {source: input string}
        output file:
          source: {type: stdout}
          target: {type: path, value: /tmp}
      inputs: [input string]
      outputs: [output file]
    input string: {type: data, value: Hello world!}
links: {}
metadata: {}
The STDOUT output is collected and stored in a new file under /tmp. The full path of the file is stored under the key output file in the spec of the current Firework and of all child Fireworks.
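What the example workflow does can be approximated with the standard library alone. This sketch is not how CommandLineTask is implemented: the helper run_and_store_stdout is hypothetical, and the naming of the new file (via tempfile.mkstemp) is an assumption. It runs the command, writes STDOUT to a new file in a target directory, and stores the path under 'output file'.

```python
import os
import subprocess
import tempfile
from typing import Any, Dict, List


def run_and_store_stdout(command: List[str], argument: str, target_dir: str) -> str:
    """Run command + [argument], save its STDOUT in a new file inside
    target_dir and return the full path of that file (hypothetical helper)."""
    completed = subprocess.run(command + [argument], stdout=subprocess.PIPE, check=True)
    # mkstemp creates a new file with a unique name in target_dir
    fd, path = tempfile.mkstemp(dir=target_dir)
    with os.fdopen(fd, 'wb') as handle:
        handle.write(completed.stdout)
    return path


spec: Dict[str, Any] = {'input string': {'type': 'data', 'value': 'Hello world!'}}
# tempfile.gettempdir() is usually /tmp on Linux
path = run_and_store_stdout(['echo', '-n'], spec['input string']['value'], tempfile.gettempdir())
# Mimic storing the path in the spec under the key 'output file'
spec['output file'] = path
```

Because the argument list is passed directly to subprocess.run without a shell, 'Hello world!' reaches echo as a single argument.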
ForeachTask
The purpose of ForeachTask is to dynamically branch the workflow between this Firework and its children by inserting a parallel section of child Fireworks. The number of spawned parallel Fireworks is determined by the length of the list specified by the split parameter or by the optional number of chunks parameter. Each child Firework contains a Firetask (of class PyTask, CommandLineTask, or any other Firetask with an inputs parameter) that processes one element (or one chunk) of this list. The outputs are passed to the spec of the Firework(s) right after the detour using a push method, i.e. the outputs of all parallel Fireworks are collected in a list specified in the outputs argument.
Note
The ordering of elements (or chunks) in the resulting outputs list can differ from that in the original list, depending on the execution order of the spawned Fireworks.
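The fan-out and push-based fan-in described above can be simulated sequentially. This is a sketch under stated assumptions: simulate_foreach is a hypothetical name, each loop iteration stands in for one child Firework, and appending to a list stands in for the push into the spec of the Firework after the detour.

```python
from typing import Any, Callable, Dict


def simulate_foreach(spec: Dict[str, Any], split: str, output: str,
                     func: Callable[[Any], Any]) -> Dict[str, Any]:
    """Sequentially simulate the fan-out/fan-in of ForeachTask.

    Each element of spec[split] stands for one parallel child Firework.
    Each result is pushed (appended) to the list spec[output] of the
    Firework that follows the detour, which is returned here.
    """
    next_spec: Dict[str, Any] = {output: []}
    for element in spec[split]:
        # In a real workflow, the push order follows the execution order
        # of the child Fireworks, not necessarily the input order.
        next_spec[output].append(func(element))
    return next_spec


spec = {'coffee beans': ['arabica', 'robusta', 'liberica']}
result = simulate_foreach(spec, 'coffee beans', 'coffee powder', lambda bean: bean + ' powder')
print(result)
# → {'coffee powder': ['arabica powder', 'robusta powder', 'liberica powder']}
```

In this sequential sketch the order is preserved; with truly parallel child Fireworks it may not be, as the note says.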
Required parameters
task (dict): a dictionary representation of the Firetask
split (str): a key in spec which contains the input data to be distributed over the parallel child Fireworks. This key must be present both in the inputs parameter of the Firetask and in the spec.
Optional parameters
number of chunks (int): if provided, the input list specified with split will be divided into this number of sub-lists (chunks), and each chunk will be processed by a separate child Firework. This parameter can be used to reduce the number of parallel Fireworks. Default is the length of the input data list specified in split, i.e. a child Firework is created for each data element.
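Dividing a list into a given number of chunks can be sketched as follows. The exact rule ForeachTask uses to distribute elements among chunks is not stated here, so this even, contiguous split (chunk sizes differ by at most one) is an assumption, and split_into_chunks is a hypothetical name.

```python
from typing import List, Sequence, TypeVar

T = TypeVar('T')


def split_into_chunks(data: Sequence[T], number_of_chunks: int) -> List[List[T]]:
    """Split data into at most number_of_chunks contiguous sub-lists whose
    lengths differ by at most one (hypothetical splitting rule)."""
    if number_of_chunks < 1:
        raise ValueError('number_of_chunks must be at least 1')
    if not data:
        return []
    # Never create more chunks than there are elements
    number_of_chunks = min(number_of_chunks, len(data))
    base, extra = divmod(len(data), number_of_chunks)
    chunks: List[List[T]] = []
    start = 0
    for index in range(number_of_chunks):
        # The first `extra` chunks receive one additional element
        size = base + (1 if index < extra else 0)
        chunks.append(list(data[start:start + size]))
        start += size
    return chunks


beans = ['arabica', 'robusta', 'liberica', 'excelsa', 'kona']
print(split_into_chunks(beans, 2))
# → [['arabica', 'robusta', 'liberica'], ['excelsa', 'kona']]
print(split_into_chunks(beans, len(beans)))  # default: one element per child
```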
Example
The following example demonstrates the use of ForeachTask in combination with PyTask:
fws:
- fw_id: 1
  name: Grind coffee
  spec:
    _tasks:
    - _fw_name: ForeachTask
      split: coffee beans
      task:
        _fw_name: PyTask
        func: auxiliary.printurn
        inputs: [coffee beans]
        outputs: [coffee powder]
    coffee beans: [arabica, robusta, liberica]
- fw_id: 2
  name: Brew coffee
  spec:
    _tasks:
    - _fw_name: ForeachTask
      split: coffee powder
      task:
        _fw_name: PyTask
        func: auxiliary.printurn
        inputs: [coffee powder, water]
        outputs: [pure coffee]
    water: workflowing water
- fw_id: 3
  name: Serve coffee
  spec:
    _tasks:
    - _fw_name: PyTask
      func: auxiliary.printurn
      inputs: [pure coffee]
links:
  '1': [2]
  '2': [3]
metadata: {}
name: Workflow for many sorts of coffee
In this example the function auxiliary.printurn prints and returns all its arguments:
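def printurn(*args):
    """Print and return all arguments; one-element lists are unwrapped."""
    result = []
    for arg in args:
        # unwrap one-element lists, keep everything else as is
        if isinstance(arg, list) and len(arg) == 1:
            result.append(arg[0])
        else:
            result.append(arg)
    # a single argument is returned as such, not wrapped in a list
    if len(result) == 1:
        result = result[0]
    print(result)
    return result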
The module auxiliary, i.e. the file auxiliary.py, must be located in a directory listed in $PYTHONPATH.
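The $PYTHONPATH requirement exists because a dotted name such as auxiliary.printurn can only be resolved if the module is importable. The sketch below shows the general Python mechanism, not PyTask's own loading code: load_function is a hypothetical helper, and a temporary directory added to sys.path stands in for a directory listed in $PYTHONPATH.

```python
import importlib
import os
import sys
import tempfile
import textwrap

AUXILIARY_SOURCE = textwrap.dedent('''
    def printurn(*args):
        result = []
        for arg in args:
            if isinstance(arg, list) and len(arg) == 1:
                result.append(arg[0])
            else:
                result.append(arg)
        if len(result) == 1:
            result = result[0]
        print(result)
        return result
''')


def load_function(dotted_name: str):
    """Resolve 'module.function' to a callable; the module must be importable,
    i.e. located in a directory on sys.path (which includes $PYTHONPATH)."""
    module_name, func_name = dotted_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


# Stand-in for a directory listed in $PYTHONPATH
module_dir = tempfile.mkdtemp()
with open(os.path.join(module_dir, 'auxiliary.py'), 'w') as handle:
    handle.write(AUXILIARY_SOURCE)
sys.path.insert(0, module_dir)
importlib.invalidate_caches()  # make sure the new file is seen by the import system

printurn = load_function('auxiliary.printurn')
printurn(['arabica'])           # prints and returns 'arabica'
printurn(['arabica'], 'water')  # prints and returns ['arabica', 'water']
```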
JoinDictTask
This Firetask combines the specified items from spec into a new or existing dictionary in spec.
Required parameters
inputs (list): a list of spec keys
output (str): a spec key under which the joined items will be stored
Optional parameters
rename (dict): a dictionary with key translations for the keys specified in inputs, e.g. {'old name 1': 'new name 1', 'old name 2': 'new name 2'}
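The behavior of JoinDictTask can be sketched as a plain function over a spec dictionary. This is a minimal illustration under assumptions: join_dict is a hypothetical name, the original keys are left in the spec, and an existing dictionary under output is updated rather than replaced.

```python
from typing import Any, Dict, List, Optional


def join_dict(spec: Dict[str, Any], inputs: List[str], output: str,
              rename: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Copy spec[key] for every key in inputs into the dictionary spec[output].

    Keys listed in rename are stored under their new name; an existing
    dictionary under output is updated rather than replaced.
    """
    rename = rename or {}
    joined = spec.setdefault(output, {})
    for key in inputs:
        joined[rename.get(key, key)] = spec[key]
    return spec


spec = {'temperature': 300, 'pressure': 1.0, 'state': {'phase': 'gas'}}
join_dict(spec, ['temperature', 'pressure'], 'state', rename={'temperature': 'T'})
print(spec['state'])
# → {'phase': 'gas', 'T': 300, 'pressure': 1.0}
```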
JoinListTask
This Firetask combines the items specified by spec keys into a new or existing list in spec.
Required parameters
inputs (list): a list of spec keys
output (str): a spec key under which the joined items will be stored
Optional parameters
None.
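A corresponding sketch for JoinListTask follows. Whether list-valued items are flattened is not specified above, so this hypothetical join_list appends each value as a single item; a list already stored under output is extended rather than replaced.

```python
from typing import Any, Dict, List


def join_list(spec: Dict[str, Any], inputs: List[str], output: str) -> Dict[str, Any]:
    """Append spec[key] for every key in inputs to the list spec[output],
    creating the list if it does not exist yet. Values are appended as
    single items, i.e. lists are not flattened (assumption)."""
    joined = spec.setdefault(output, [])
    for key in inputs:
        joined.append(spec[key])
    return spec


spec = {'first': 'arabica', 'second': ['robusta', 'liberica'], 'beans': ['kona']}
join_list(spec, ['first', 'second'], 'beans')
print(spec['beans'])
# → ['kona', 'arabica', ['robusta', 'liberica']]
```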
ImportDataTask
This Firetask reads JSON or YAML data from a file and uses it to update a dictionary in spec, at the (possibly nested) location specified by a map string (see below).
Required parameters
filename (str): a filename from which the data is imported. The name must end with either .json or .yaml.
mapstring (str): a map string in the format maplist[0]/maplist[1]/..., where at least maplist[0] has to be defined because this is the key in spec to be used for the import. Further nesting levels can be specified by extending the map string; for example, if mapstring is maplist[0]/maplist[1], then the data will be imported as spec[maplist[0]][maplist[1]].
Optional parameters
None.
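The map string lookup can be sketched as follows. This is a minimal sketch under assumptions, not ImportDataTask's code: import_data is a hypothetical name, only .json files are handled (YAML would need a parser such as PyYAML), missing intermediate levels are created, and an existing dictionary at the target location is merged with the imported data.

```python
import json
import tempfile
from typing import Any, Dict


def import_data(spec: Dict[str, Any], filename: str, mapstring: str) -> Dict[str, Any]:
    """Load JSON data from filename and store it in spec at the nested
    position given by mapstring, e.g. 'a/b' -> spec['a']['b']."""
    if not filename.endswith('.json'):
        raise ValueError('this sketch only reads .json files')
    with open(filename) as handle:
        data = json.load(handle)
    maplist = mapstring.split('/')
    node = spec
    # Walk down (and create, if missing) all but the last level
    for key in maplist[:-1]:
        node = node.setdefault(key, {})
    leaf = maplist[-1]
    if isinstance(node.get(leaf), dict) and isinstance(data, dict):
        node[leaf].update(data)  # update an existing dictionary
    else:
        node[leaf] = data        # otherwise store the data as a new entry
    return spec


# Write a small JSON file to import
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as tmp:
    json.dump({'steps': 20, 'dt': 0.1}, tmp)

spec = {'settings': {'run': {'steps': 10, 'method': 'md'}}}
import_data(spec, tmp.name, 'settings/run')
print(spec)
# → {'settings': {'run': {'steps': 20, 'method': 'md', 'dt': 0.1}}}
```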