import os
import re
import glob
from pathlib import Path

from gwf import Workflow, AnonymousTarget
from gwf.workflow import collect
GWF workflow
r"""
Example workflow using the mapping between the inputs and outputs of each target.
It is made to show the different ways information can be passed through a workflow.

               input_file1.txt                          input_file2.txt
file label:      'raw_path'                               'raw_path'
                     |                                        |
                     |                                        |
template:      uppercase_names                          uppercase_names
                     |                                        |
                     |                                        |
file label:   'uppercased_path'                        'uppercased_path'
                     |                                        |
                     |                                        |
template:       divide_names                            divide_names
                   /     \                                 /     \
                  /       \                               /       \
file label: 'filt_me_path' 'filt_other_path'     'filt_me_path' 'filt_other_path'
                  \       /                               \       /
                   \     /                                 \     /
template:       unique_names                            unique_names
                  |       |                                |       |
                  |       |                                |       |
file label: 'unique_me_path' 'unique_other_path'  'unique_me_path' 'unique_other_path'
                 \         \                             /         /
                  \         - - - - - - - - - - - - - -/- - - - - /
                   \ - - - - - - - - - - - - - - - - -/- - - \   /
                    |                                          |
file label:  (collected) 'unique_me_paths'        (collected) 'unique_other_paths'
                    |                                          |
                    |                                          |
template:       merge_names                               merge_names
                    |                                          |
                    |                                          |
file label:    'output_path'                              'output_path'
"""
Imports and utility functions
Instantiate the workflow with the name of the project folder:
# instantiate the workflow
gwf = Workflow(defaults={'account': 'your-project-folder-name'})
Utility functions:
# utility function
def modify_path(path, **kwargs):
    """
    Utility function for modifying file paths by substituting
    the directory (dir), base name (base), or file suffix (suffix).
    """
    for key in ['dir', 'base', 'suffix']:
        kwargs.setdefault(key, None)
    assert len(kwargs) == 3
    par, name = os.path.split(path)
    name_no_suffix, suf = os.path.splitext(name)
    if type(kwargs['suffix']) is str:
        suf = kwargs['suffix']
    if kwargs['dir'] is not None:
        par = kwargs['dir']
    if kwargs['base'] is not None:
        name_no_suffix = kwargs['base']

    new_path = os.path.join(par, name_no_suffix + suf)
    if type(kwargs['suffix']) is tuple:
        assert len(kwargs['suffix']) == 2
        new_path, nsubs = re.subn(r'{}$'.format(kwargs['suffix'][0]), kwargs['suffix'][1], new_path)
        assert nsubs == 1, nsubs
    return new_path
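
# For illustration, modify_path behaves roughly like this for the kinds of
# paths used in this workflow (example values only):
#
#   modify_path('data/input_file1.txt', dir='steps/upper_cased', suffix='_uppercased.txt')
#       -> 'steps/upper_cased/input_file1_uppercased.txt'
#   modify_path('data/input_file1.txt', dir='/tmp')
#       -> '/tmp/input_file1.txt'
#   modify_path('results/merged_me_names.txt', base='', suffix='')
#       -> 'results'   (just the directory, as used by merge_names below)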
Template functions:
# task template function
def uppercase_names(raw_path):
    """
    Formats names to uppercase.
    """
    # dir for files produced by task
    output_dir = 'steps/upper_cased'
    # path of output file
    uppercased_path = modify_path(raw_path, dir=output_dir, suffix='_uppercased.txt')

    # input specification
    inputs = [raw_path]
    # output specification mapping a label to each file
    outputs = {'uppercased_path': uppercased_path}
    # resource specification
    options = {'memory': '8g', 'walltime': '00:10:00'}

    # temporary output file path
    tmp_uppercased_path = modify_path(raw_path, dir='/tmp')

    # commands to run in task (bash script)
    # we write to a tmp file and move it to the output directory
    # only if the command succeeds (the && takes care of that)
    spec = f"""
    mkdir -p {output_dir}
    cat {raw_path} | tr [:lower:] [:upper:] > {tmp_uppercased_path} &&
    mv {tmp_uppercased_path} {uppercased_path}
    """
    # return target
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
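
# A template like this can also be turned into a single target directly
# (the target name below is just an illustration); the workflow section
# instead uses gwf.map to create one target per input file:
#
#   gwf.target_from_template('uppercase_file1', uppercase_names('data/input_file1.txt'))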
# task template function
def divide_names(uppercased_path, me=None):
    """
    Splits names into two files: one with my name and one with other names.
    """
    # uppercased version of the me argument
    uppercased_me = me.upper()

    # dir for files produced by task
    output_dir = 'steps/filtered_names'
    # path of output file with names matching me
    filt_me_path = modify_path(uppercased_path, dir=output_dir, suffix=f'_{me}.txt')
    # path of output file with other names
    filt_other_path = modify_path(uppercased_path, dir=output_dir, suffix=f'_not_{me}.txt')

    # input specification
    inputs = [uppercased_path]
    # output specification mapping a label to each file
    outputs = {'filt_me_path': filt_me_path, 'filt_other_path': filt_other_path}
    # resource specification
    options = {'memory': '8g', 'walltime': '00:10:00'}

    # temporary output file paths
    tmp_filt_me_path = modify_path(filt_me_path, dir='/tmp')
    tmp_filt_other_path = modify_path(filt_other_path, dir='/tmp')

    # commands to run in task (bash script)
    # we write to tmp files and move them to the output directory
    # only if the commands succeed (the && takes care of that)
    spec = f"""
    mkdir -p {output_dir}
    grep {uppercased_me} {uppercased_path} > {tmp_filt_me_path} &&
    grep -v {uppercased_me} {uppercased_path} > {tmp_filt_other_path} &&
    mv {tmp_filt_me_path} {filt_me_path} &&
    mv {tmp_filt_other_path} {filt_other_path}
    """
    # return target
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# task template function
def unique_names(filt_me_path, filt_other_path):
    """
    Extracts unique names from each file.
    """
    # dir for files produced by task
    output_dir = 'steps/unique_names'
    # path of output file with unique names matching me
    uniq_me_path = modify_path(filt_me_path, dir=output_dir, suffix='_unique.txt')
    # path of output file with unique other names
    uniq_other_path = modify_path(filt_other_path, dir=output_dir, suffix='_unique.txt')

    # input specification
    inputs = [filt_me_path, filt_other_path]
    # output specification mapping a label to each file
    outputs = {'unique_me_path': uniq_me_path, 'unique_other_path': uniq_other_path}
    # resource specification
    options = {'memory': '8g', 'walltime': '00:10:00'}

    # temporary output file paths
    tmp_uniq_me_path = modify_path(uniq_me_path, dir='/tmp')
    tmp_uniq_other_path = modify_path(uniq_other_path, dir='/tmp')

    # commands to run in task (bash script)
    # we write to tmp files and move them to the output directory
    # only if the commands succeed (the && takes care of that)
    spec = f"""
    mkdir -p {output_dir}
    sort {filt_me_path} | uniq > {tmp_uniq_me_path} &&
    sort {filt_other_path} | uniq > {tmp_uniq_other_path} &&
    mv {tmp_uniq_me_path} {uniq_me_path} &&
    mv {tmp_uniq_other_path} {uniq_other_path}
    """
    # return target
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# task template function
def merge_names(paths, output_path):
    """
    Merges names from many files.
    """
    # dir for files produced by task
    output_dir = modify_path(output_path, base='', suffix='')

    # input specification
    inputs = [paths]
    # output specification mapping a label to the file
    outputs = {'path': output_path}

    # temporary output file path
    tmp_output_path = modify_path(output_path, dir='/tmp')

    # resource specification
    options = {'memory': '8g', 'walltime': '00:10:00'}

    # commands to run in task (bash script)
    # we write to a tmp file and move it to the output directory
    # only if the command succeeds (the && takes care of that)
    spec = f"""
    mkdir -p {output_dir}
    cat {' '.join(paths)} > {tmp_output_path} &&
    mv {tmp_output_path} {output_path}
    """
    # return target
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# task template function
def run_notebook(path, dependencies, memory='8g', walltime='00:10:00', cores=1):
    """
    Executes a notebook in place and saves the output.
    """
    # path of a hidden sentinel file touched when the notebook has run;
    # since the notebook is executed in place (no new output file is created),
    # the sentinel serves as the output file that gwf tracks
    sentinel = modify_path(path, base=f'.{str(Path(path).name)}', suffix='.sentinel')
    # sentinel = path.parent / f'.{path.name}'
    # input specification
    inputs = [path] + dependencies
    # output specification mapping a label to each file
    outputs = {'sentinel': sentinel}
    # resource specification
    options = {'memory': memory, 'walltime': walltime, 'cores': cores}

    # commands to run in task (bash script)
    spec = f"""
    jupyter nbconvert --to notebook --execute --inplace {path} && touch {sentinel}
    """
    # return target
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
Workflow:
# instantiate the workflow
gwf = Workflow(defaults={'account': 'your-project-folder-name'})
# input files for workflow
input_file_names = ['data/input_file1.txt', 'data/input_file2.txt']

# workflow parameter
myname = 'Kasper'

# run an uppercase_names task for each input file
uppercase_names_targets = gwf.map(uppercase_names, input_file_names)

# run a divide_names task for each output file from uppercase_names
filter_names_targets = gwf.map(divide_names, uppercase_names_targets.outputs, extra=dict(me=myname))
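
# gwf.map passes each output dict from the previous step as keyword arguments,
# so the file label 'uppercased_path' must match the parameter name of
# divide_names. The mapped outputs look roughly like this (illustrative paths):
#
#   uppercase_names_targets.outputs ==
#   [{'uppercased_path': 'steps/upper_cased/input_file1_uppercased.txt'},
#    {'uppercased_path': 'steps/upper_cased/input_file2_uppercased.txt'}]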
# run a unique_names task for each output file from divide_names
unique_names_targets = gwf.map(unique_names, filter_names_targets.outputs)

# collect the outputs labelled 'unique_me_path' from all the outputs of unique_names
collected_outputs = collect(unique_names_targets.outputs, ['unique_me_path'])
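
# collect gathers the chosen label across all the output dicts and pluralizes
# the key, so collected_outputs is roughly (illustrative paths):
#
#   {'unique_me_paths': ['steps/unique_names/input_file1_uppercased_Kasper_unique.txt',
#                        'steps/unique_names/input_file2_uppercased_Kasper_unique.txt']}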
# create a single task to merge all those files into one
= gwf.target_from_template(
merge_me_target 'merge_not_me_name_files',
'unique_me_paths'], "results/merged_me_names.txt")
merge_names(collected_outputs[
)
# collect the outputs labelled 'unique_other_path' from all the outputs of unique_names
collected_outputs = collect(unique_names_targets.outputs, ['unique_other_path'])
# create a single task to merge all those files into one
merge_other_target = gwf.target_from_template(
    'merge_not_me_name_files',
    merge_names(collected_outputs['unique_other_paths'], "results/merged_not_me_names.txt")
)
# make notebooks depend on all output files from the workflow
notebook_dependencies = []
for x in gwf.targets.values():
    outputs = x.outputs
    if type(outputs) is dict:
        for o in outputs.values():
            notebook_dependencies.append(o)
    elif type(outputs) is list:
        notebook_dependencies.extend(outputs)

# run notebooks in sorted order nb01_, nb02_, ...
for path in sorted(glob.glob('notebooks/*.ipynb')):
    target = gwf.target_from_template(
        os.path.basename(path),
        run_notebook(path, notebook_dependencies))
    # make each notebook depend on all previous notebooks
    notebook_dependencies.append(target.outputs['sentinel'])