A. You need to have installed dot from Graphviz to produce pretty flowcharts like this one:
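Once dot is on your path, a flowchart can be written out with pipeline_printout_graph. This is a minimal sketch; final_task stands in for the last task of your own pipeline:

from ruffus import *

pipeline_printout_graph(open("flowchart.svg", "w"),   # output file
                        "svg",                        # any output format understood by dot
                        [final_task])                 # draw all tasks needed to run final_task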
A. If ruffus is interrupted, for example, by a Ctrl-C, you will often find the following lines of code highlighted:
File "build/bdist.linux-x86_64/egg/ruffus/task.py", line 1904, in pipeline_run
File "build/bdist.linux-x86_64/egg/ruffus/task.py", line 1380, in run_all_jobs_in_task
File "/xxxx/python2.6/multiprocessing/pool.py", line 507, in next
self._cond.wait(timeout)
File "/xxxxx/python2.6/threading.py", line 237, in wait
waiter.acquire()
This is not where ruffus is hanging; it is the boundary between the main program process and the sub-processes which run ruffus jobs in parallel. This is naturally where broken execution threads get washed up.
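To find out where a job is really stuck or failing, one option (a sketch, not a ruffus feature) is to re-run the pipeline without sub-processes, so that the traceback points at your own task code. my_final_task stands in for your own target task:

from ruffus import *

# With multiprocess = 1, jobs run in the main process, so an exception or
# Ctrl-C should point at the offending task function rather than at the
# multiprocessing pool boundary shown above.
pipeline_run([my_final_task], multiprocess = 1, verbose = 3)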
A. If you are using the special regular expression forms "\1", "\2" etc. to refer to matching groups, remember to ‘escape’ the substitution pattern string. The best option is to use ‘raw’ python strings. For example:
r"\1_substitutes\2correctly\3four\4times"
Ruffus will throw an exception if it sees an unescaped "\1" or "\2" in a file name.
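For instance, a hypothetical task using regex() with a raw replacement string (the file names here are only illustrative):

from ruffus import *

@transform(["sample1.fastq", "sample2.fastq"],   # illustrative input files
           regex(r"(.+)\.fastq"),                # \1 captures the file prefix
           r"\1.sam")                            # raw string, so "\1" reaches Ruffus unmangled
def map_reads(input_file, output_file):
    pass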
A. Place your decorator after Ruffus decorators. This ensures that by the time Ruffus sees your function, it has already been decorated.
@transform(["example.abc"], suffix(".abc"), ".xyz")
@custom_decoration
def func(input, output):
    pass
You will also need to use either @wraps or update_wrapper from functools to write your decorator:
from functools import wraps

def custom_decoration(task_func):
    """
    Decorate a function to print progress
    """
    @wraps(task_func)
    def wrapper_function(*args, **kwargs):
        print "Before"
        task_func(*args, **kwargs)
        print "After"
    return wrapper_function
This ensures that the __name__ and __module__ attributes from the task function are made available to Ruffus via your decorator.
A. Yes. Most python decorators wrap themselves around a function. However, Ruffus leaves the original function untouched and unwrapped. Instead, Ruffus adds a pipeline_task attribute to the task function to signal that this is a pipelined function.
This means the original task function can be called just like any other python function.
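For example, in this sketch (file names are only illustrative), the decorated function can still be invoked by hand, outside of pipeline_run:

from ruffus import *

@transform("data.txt", suffix(".txt"), ".copy")
def copy_file(input_file, output_file):
    open(output_file, "w").write(open(input_file).read())

# The task function is unwrapped and can be called like any other python function
copy_file("data.txt", "data.copy")

# Ruffus has merely tagged it with an attribute
print hasattr(copy_file, "pipeline_task")       # True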
from ruffus import *
import sys

@transform("start.input",
           regex(".+"),
           ("first_output.txt", "second_output.txt"))
def task1(i, o):
    pass

@transform(task1, suffix(".txt"), ".result")
def task2(i, o):
    pass

pipeline_printout(sys.stdout, [task2], verbose=3)

________________________________________
Tasks which will be run:

Task = task1
    Job = [start.input -> [first_output.txt, second_output.txt]]

Task = task2
    Job = [[first_output.txt, second_output.txt] -> first_output.result]
________________________________________
A. This code produces a single output consisting of a tuple of 2 files. What you actually want is two outputs, each consisting of 1 file.
You want a single job (single input) to produce multiple outputs (multiple jobs in downstream tasks). This is a one-to-many operation, which calls for @split:
from ruffus import *
import sys

@split("start.input", ("first_output.txt", "second_output.txt"))
def task1(i, o):
    pass

@transform(task1, suffix(".txt"), ".result")
def task2(i, o):
    pass

pipeline_printout(sys.stdout, [task2], verbose=3)

________________________________________
Tasks which will be run:

Task = task1
    Job = [start.input -> [first_output.txt, second_output.txt]]

Task = task2
    Job = [first_output.txt -> first_output.result]
    Job = [second_output.txt -> second_output.result]
________________________________________
A. As above, any time a situation calls for a one-to-many operation, you should reach for @split. The advanced form takes a regular expression, making it easier to produce multiple derivatives of each input file. The following example splits 2 jobs each into 3, so that the subsequent task will run 2 x 3 = 6 jobs.
from ruffus import *
import sys

@split(["1.input_file", "2.input_file"],
       regex(r"(.+).input_file"),        # match file prefix
       [r"\1.file_type1",
        r"\1.file_type2",
        r"\1.file_type3"])
def split_task(input, output):
    pass

@transform(split_task, regex("(.+)"), r"\1.test")
def test_split_output(i, o):
    pass

pipeline_printout(sys.stdout, [test_split_output], verbose = 3)

Each of the original 2 files has been split into three, so that test_split_output will run 6 jobs simultaneously.
________________________________________
Tasks which will be run:

Task = split_task
    Job = [1.input_file -> [1.file_type1, 1.file_type2, 1.file_type3]]
    Job = [2.input_file -> [2.file_type1, 2.file_type2, 2.file_type3]]

Task = test_split_output
    Job = [1.file_type1 -> 1.file_type1.test]
    Job = [1.file_type2 -> 1.file_type2.test]
    Job = [1.file_type3 -> 1.file_type3.test]
    Job = [2.file_type1 -> 2.file_type1.test]
    Job = [2.file_type2 -> 2.file_type2.test]
    Job = [2.file_type3 -> 2.file_type3.test]
________________________________________
A. This is easily accomplished by hijacking the process for checking if jobs are up to date or not (@check_if_uptodate):
from ruffus import *

def run_this_before_each_job(*args):
    print "Calling function before each job using these args", args
    # Remember to delegate to the default Ruffus code for checking if
    # jobs need to run.
    return needs_update_check_modify_time(*args)

@check_if_uptodate(run_this_before_each_job)
@files([[None, "a.1"], [None, "b.1"]])
def task_func(input, output):
    pass

pipeline_run([task_func])

This results in:
________________________________________
>>> pipeline_run([task_func])
Calling function before each job using these args (None, 'a.1')
Calling function before each job using these args (None, 'a.1')
Calling function before each job using these args (None, 'b.1')
    Job = [None -> a.1] completed
    Job = [None -> b.1] completed
Completed Task = task_func

Note
Because run_this_before_each_job(...) is called whenever Ruffus checks to see if a job is up to date or not, the function may be called twice for some jobs (e.g. (None, 'a.1') above).
(Thanks to Bernie Pope for sorting this out.)
A. When gmake is interrupted, it will delete the target file it is updating so that the target is remade from scratch when make is next run.
There is no direct support for this in ruffus yet. In any case, the partial / incomplete file may be useful, if only to reveal, for example, what might have caused an interrupting error or exception.
A common Ruffus convention is to create an empty checkpoint or “flag” file whose sole purpose is to record a modification time and the successful completion of a job.
This is what a task with a completion flag would look like:
import os

#
#   Assuming a pipelined task function named "stage1"
#
@transform(stage1, suffix(".stage1"), [".stage2", ".stage2_finished"])
def stage2(input_files, output_files):
    task_output_file, flag_file = output_files
    cmd = ("do_something2 %(input_file)s >| %(task_output_file)s ")
    cmd = cmd % {
            "input_file"       : input_files[0],
            "task_output_file" : task_output_file
          }
    if not os.system(cmd):
        #88888888888888888888888888888888888888888888888888888888888888888888888888888
        #
        #   It worked: create the completion flag_file
        #
        open(flag_file, "w")
        #
        #88888888888888888888888888888888888888888888888888888888888888888888888888888
The flag files xxx.stage2_finished indicate that each job has finished. If a flag file is missing, the corresponding xxx.stage2 is only a partial, interrupted result.
The only thing to be aware of is that the flag file will appear in the list of inputs of the downstream task, which should accordingly look like this:
@transform(stage2, suffix(".stage2"), [".stage3", ".stage3_finished"])
def stage3(input_files, output_files):
    #888888888888888888888888888888888888888888888888888888888888888888888888888888888
    #
    #   Note that the first parameter is a LIST of input files, the last of which
    #   is the flag file from the previous task, which we can ignore
    #
    input_file, previous_flag_file = input_files
    #
    #888888888888888888888888888888888888888888888888888888888888888888888888888888888
    task_output_file, flag_file = output_files
    cmd = ("do_something3 %(input_file)s >| %(task_output_file)s ")
    cmd = cmd % {
            "input_file"       : input_file,
            "task_output_file" : task_output_file
          }
    # completion flag file for this task
    if not os.system(cmd):
        open(flag_file, "w")
The Bioinformatics example contains code for checkpointing.
A. It is necessary to protect the “entry point” of the program under Windows. Otherwise, a new process will be started each time the main module is imported by a new Python interpreter as an unintended side effect, causing a cascade of new processes.
See: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
This code works:
if __name__ == '__main__':
    try:
        pipeline_run([parallel_task], multiprocess = 5)
    except Exception, e:
        print e.args
A. Sun Grid Engine provides the qrsh command to run an interactive rsh session. qrsh can be used to run commands/scripts in a compute farm or grid cluster.
However, when run within ruffus, qrsh seems to spin idly, polling for input, consuming all the CPU resources in that process.
An interim solution is to close the STDIN for the qrsh invocation:
import os
from subprocess import Popen, PIPE

# "priority" and "queue_name" are assumed to have been set elsewhere
qrsh_cmd = ["qrsh",
            "-now", "n",
            "-cwd",
            "-p", "-%d" % priority,
            "-q", queue_name,
            "little_script.py"]
p = Popen(qrsh_cmd, stdin = PIPE)
p.stdin.close()
sts = os.waitpid(p.pid, 0)
A. This seems to depend on your setup. One workaround may be to introduce a random time delay at the beginning of your jobs:
import time, random

@parallel(param_func)
def task_in_parallel(input_file, output_file):
    """
    Work starts after a random delay so that SGE has a chance to manage the queue
    """
    time.sleep(random.random() / 2.0)
    # Wake up and do work