ruffus.Task

ruffus.task – Overview

Decorator syntax:

Pipelined tasks are created by “decorating” a function with the following syntax:

def func_a():
    pass

@follows(func_a)
def func_b():
    pass

Each task is a single function which is applied one or more times to a list of parameters (typically input files, producing a list of output files).

Each of these is a separate, independent job (sharing the same code) which can be run in parallel.
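The job model above can be sketched without ruffus, using only the standard library (function and file names are hypothetical): each parameter set is an independent job running the same code, so jobs can be dispatched concurrently.

```python
from concurrent.futures import ThreadPoolExecutor

# One task function, applied once per parameter set.  Each (input, output)
# pair below is a separate, independent "job" sharing the same code.
def run_job(params):
    input_name, output_name = params
    # Stand-in for real work (e.g. reading input_name, writing output_name).
    return "%s -> %s" % (input_name, output_name)

job_params = [("a.txt", "a.out"),
              ("b.txt", "b.out"),
              ("c.txt", "c.out")]

# Because the jobs are independent, they can be dispatched concurrently.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_job, job_params))
```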

Running the pipeline

To run the pipeline:

pipeline_run(target_tasks, forcedtorun_tasks = [], multiprocess = 1,
             logger = stderr_logger,
             gnu_make_maximal_rebuild_mode = True,
             cleanup_log = "../cleanup.log")

pipeline_cleanup(cleanup_log = "../cleanup.log")
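The dependency-ordered execution that pipeline_run performs can be sketched in plain Python (the task names and "follows" table below are hypothetical, not part of the real ruffus API): each target task runs only after its upstream tasks.

```python
# A sketch of dependency-ordered execution: which tasks each task follows.
dependencies = {"func_a": [], "func_b": ["func_a"], "func_c": ["func_b"]}

def run_in_dependency_order(target, dependencies, executed=None):
    """Depth-first walk: every upstream task runs before the target."""
    if executed is None:
        executed = []
    for upstream in dependencies[target]:
        run_in_dependency_order(upstream, dependencies, executed)
    if target not in executed:
        executed.append(target)
    return executed

# Asking for func_c runs its whole upstream chain first.
order = run_in_dependency_order("func_c", dependencies)
```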

Decorators

Basic Task decorators are:

@follows()

and

@files()

Task decorators include:

@split()

@transform()

@merge()

@posttask()

More advanced users may require:

@collate()

@parallel()

@check_if_uptodate()

@files_re()
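As a sketch of what a @transform-style decorator computes (the regex pattern and filenames here are hypothetical), output file names can be derived from input file names with the standard re module:

```python
import re

# Derive output names from input names by regular-expression substitution,
# in the spirit of @transform / @files_re.
def transform_output_names(input_files, pattern, replacement):
    return [re.sub(pattern, replacement, name) for name in input_files]

outputs = transform_output_names(["x.fasta", "y.fasta"],
                                 r"(.+)\.fasta$", r"\1.blast")
```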

Pipeline functions

pipeline_run

ruffus.task.pipeline_run(target_tasks, forcedtorun_tasks=[], multiprocess=1, logger=stderr_logger, gnu_make_maximal_rebuild_mode=True)

Run pipelines.

Parameters:
  • target_tasks – target task functions which will be run if they are out-of-date
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date
  • multiprocess – The number of concurrent jobs
  • logger (logging objects) – Where progress will be logged. Defaults to stderr output.
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. If set to False, runs only the minimal set of tasks needed to build the targets. Use with caution.
  • verbose –
      level 0 : nothing
      level 1 : logs task names and warnings
      level 2 : logs task description if exists
      level 3 : logs job names for jobs to be run
      level 4 : logs list of up-to-date tasks and job names for jobs to be run
      level 5 : logs job names for all jobs whether up-to-date or not
      level 10 : logs messages useful only for debugging ruffus pipeline code
  • runtime_data – Experimental feature for passing data to tasks at run time
  • one_second_per_job – Defaults to True, forcing each job to take a minimum of 1 second to complete
  • touch_file_only – Create or update input/output files only, to simulate running the pipeline; jobs themselves are not run
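A minimal sketch of what touch_file_only implies, assuming only the standard library (paths are hypothetical): output files are created or have their timestamps updated without doing the real work.

```python
import os
import tempfile

# Create or update an output file's timestamp without doing real work,
# as a pipeline run with touch_file_only would.
def touch(path):
    with open(path, "a"):
        os.utime(path, None)  # update mtime; open() creates the file if missing

workdir = tempfile.mkdtemp()
output_file = os.path.join(workdir, "result.out")
touch(output_file)
```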

pipeline_printout

ruffus.task.pipeline_printout(output_stream, target_tasks, forcedtorun_tasks=[], verbose=1, indent=4, gnu_make_maximal_rebuild_mode=True, wrap_width=100, runtime_data=None)

Prints out the parts of the pipeline which will be run

Because the parameters of some jobs depend on the results of previous tasks, this function produces only the current snapshot of task jobs. In particular, tasks which generate a variable number of inputs into following tasks will not produce the full range of jobs.

verbose = 0 : nothing
verbose = 1 : print task name
verbose = 2 : print task description if exists
verbose = 3 : print job names for jobs to be run
verbose = 4 : print list of up-to-date tasks and job names for jobs to be run
verbose = 5 : print job names for all jobs whether up-to-date or not
Parameters:
  • output_stream (file-like object with write() function) – where to print to
  • target_tasks – target task functions which will be run if they are out-of-date
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date
  • verbose –
      level 0 : nothing
      level 1 : logs task names and warnings
      level 2 : logs task description if exists
      level 3 : logs job names for jobs to be run
      level 4 : logs list of up-to-date tasks and job names for jobs to be run
      level 5 : logs job names for all jobs whether up-to-date or not
      level 10 : logs messages useful only for debugging ruffus pipeline code
  • indent – How much indentation for pretty format.
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. If set to False, runs only the minimal set of tasks needed to build the targets. Use with caution.
  • wrap_width – The maximum length of each line
  • runtime_data – Experimental feature for passing data to tasks at run time

pipeline_printout_graph

ruffus.task.pipeline_printout_graph(stream, output_format, target_tasks, forcedtorun_tasks=[], draw_vertically=True, ignore_upstream_of_target=False, skip_uptodate_tasks=False, gnu_make_maximal_rebuild_mode=True, test_all_task_for_update=True, no_key_legend=False, minimal_key_legend=True, user_colour_scheme=None, pipeline_name='Pipeline:', size=(11, 8), dpi=120, runtime_data=None)

Prints out pipeline dependencies in various formats

Parameters:
  • stream (file-like object with write() function) – where to print to
  • output_format – [“dot”, “jpg”, “svg”, “ps”, “png”]. All but the first depend on the dot program.
  • target_tasks – target task functions which will be run if they are out-of-date.
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date.
  • draw_vertically – Top to bottom instead of left to right.
  • ignore_upstream_of_target – Don’t draw upstream tasks of targets.
  • skip_uptodate_tasks – Don’t draw up-to-date tasks if possible.
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. If set to False, runs only the minimal set of tasks needed to build the targets. Use with caution.
  • test_all_task_for_update – Ask all task functions if they are up-to-date.
  • no_key_legend – Don’t draw key/legend for graph.
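The “dot” output format is plain text; a minimal sketch of such a dependency-graph rendering (the edge set below is hypothetical) shows the shape of what gets emitted:

```python
# Render a task dependency edge list in the Graphviz "dot" language,
# similar in spirit to pipeline_printout_graph's "dot" output.
def dependencies_to_dot(edges, name="Pipeline"):
    lines = ['digraph "%s" {' % name]
    for upstream, downstream in edges:
        lines.append('    "%s" -> "%s";' % (upstream, downstream))
    lines.append("}")
    return "\n".join(lines)

dot_text = dependencies_to_dot([("func_a", "func_b"), ("func_b", "func_c")])
```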

Logging

class ruffus.task.t_black_hole_logger
Does nothing!
class ruffus.task.t_stderr_logger
Everything to stderr
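The two logger classes can be approximated in plain Python (the class names below are illustrative, not the real t_black_hole_logger / t_stderr_logger): one discards every message, the other writes each message to stderr.

```python
import sys

# Illustrative equivalents of the two documented logger classes.
class BlackHoleLogger(object):
    def info(self, message):
        pass                       # swallow all messages

class StderrLogger(object):
    def info(self, message):
        sys.stderr.write(message + "\n")

BlackHoleLogger().info("never seen")
StderrLogger().info("progress: task completed")
```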

Implementation:

Parameter factories:

ruffus.task.merge_param_factory(input_files_task_globs, output_param, *extra_params)
Factory for task_merge
ruffus.task.collate_param_factory(input_files_task_globs, flatten_input, regex, extra_input_files_task_globs, replace_inputs, *output_extra_specs)
Factory for task_collate. All [input] which lead to the same [output / extra] are combined together
ruffus.task.files_re_param_factory(input_files_task_globs, combining_all_jobs, regex, extra_input_files_task_globs, *output_and_extras)
Factory for functions which in turn yield tuples of input_file_name, output_file_name

Usage:

for i, o in param_func():
    print "input file name  = ", i
    print "output file name = ", o

.. note::
  1. param_func has to be called each time
  2. glob is called each time, so do not expect the file lists in param_func() to be the same for each invocation
  3. A "copy" of the file list is saved, so do not expect to modify your copy of the original list and see the changes reflected in the input/output files
ruffus.task.transform_param_factory(input_files_task_globs, flatten_input, regex, regex_or_suffix, extra_input_files_task_globs, replace_inputs, output_pattern, *extra_specs)
Factory for task_transform
ruffus.task.files_param_factory(input_files_task_globs, flatten_input, do_not_expand_single_job_tasks, output_extras)
Factory for functions which
yield tuples of inputs, outputs / extras

.. note::

  1. Each job requires input/output file names
  2. Input/output file names can be a string or an arbitrarily nested sequence
  3. Non-string types are ignored
  4. Either input or output file names must contain at least one string
ruffus.task.args_param_factory(orig_args)
Factory for functions which
yield tuples of inputs, outputs / extras

.. note::

  1. Each job requires input/output file names
  2. Input/output file names can be a string or an arbitrarily nested sequence
  3. Non-string types are ignored
  4. Either input or output file names must contain at least one string
ruffus.task.split_param_factory(input_files_task_globs, output_files_task_globs, *extra_params)
Factory for task_split

Wrappers around jobs:

ruffus.task.job_wrapper_generic(param, user_defined_work_func, register_cleanup, touch_files_only)
Runs func
ruffus.task.job_wrapper_io_files(param, user_defined_work_func, register_cleanup, touch_files_only)
Runs func on any i/o if not up to date
ruffus.task.job_wrapper_mkdir(param, user_defined_work_func, register_cleanup, touch_files_only)
Makes directories if they do not exist

Checking whether a job is up to date:

ruffus.task.needs_update_check_modify_time(*params)

Given input and output files, checks whether all exist and whether output files are newer than input files. Each parameter can be:

  1. a string: assumed to be a filename, e.g. "file1"
  2. any other type
  3. an arbitrarily nested sequence of (1) and (2)
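A simplified sketch of this modification-time check, assuming only the standard library (the filenames below are hypothetical): a job needs re-running if any output file is missing or older than the newest input file.

```python
import os
import tempfile

# Simplified modification-time check in the spirit of
# needs_update_check_modify_time.
def needs_update(input_files, output_files):
    if not all(os.path.exists(f) for f in output_files):
        return True                      # missing output => out of date
    newest_input = max(os.path.getmtime(f) for f in input_files)
    oldest_output = min(os.path.getmtime(f) for f in output_files)
    return oldest_output < newest_input  # stale output => out of date

workdir = tempfile.mkdtemp()
input_file = os.path.join(workdir, "input.txt")
open(input_file, "w").close()

output_file = os.path.join(workdir, "output.txt")
stale = needs_update([input_file], [output_file])   # output absent

open(output_file, "w").close()
fresh = needs_update([input_file], [output_file])   # output at least as new
```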
ruffus.task.needs_update_check_directory_missing(dirs)
Called per directory:
Does the directory exist? If the path exists but is an ordinary file rather than a directory, an exception is thrown.

Exceptions and Errors