.. include:: global.inc
******
FAQ
******
^^^^^^^^^^^^^^^^^
General
^^^^^^^^^^^^^^^^^
=========================================================
Q. *Ruffus* won't create dependency graphs
=========================================================
A. You need to have installed ``dot`` from `Graphviz <http://www.graphviz.org/>`_ to produce
pretty flowcharts like this:
.. image:: images/pretty_flowchart.png
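Once ``dot`` is installed and on your path, a flowchart can be written out along
these lines (a minimal sketch: ``final_task`` and the output file name are
illustrative placeholders for your own pipeline)::

    from ruffus import *

    # Draw the dependency graph of everything needed to run "final_task"
    # and write it to a png file (dot is invoked behind the scenes)
    pipeline_printout_graph(open("flowchart.png", "w"), "png", [final_task])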
=========================================================
Q. *Ruffus* seems to be hanging in the same place
=========================================================
A. If *ruffus* is interrupted, for example, by a Ctrl-C,
you will often find the following lines of code highlighted::
File "build/bdist.linux-x86_64/egg/ruffus/task.py", line 1904, in pipeline_run
File "build/bdist.linux-x86_64/egg/ruffus/task.py", line 1380, in run_all_jobs_in_task
File "/xxxx/python2.6/multiprocessing/pool.py", line 507, in next
self._cond.wait(timeout)
File "/xxxxx/python2.6/threading.py", line 237, in wait
waiter.acquire()
This is *not* where *ruffus* is hanging; it is merely the boundary between the main
programme process and the sub-processes which run *ruffus* jobs in parallel.
This is naturally where broken execution threads wash up.
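If you need a traceback that points into your own task code rather than at this
boundary, one option is to re-run the pipeline without sub-processes, for example
(``my_task`` is an illustrative placeholder for your own task function)::

    from ruffus import *

    # With a single process, jobs run in the main process, so exceptions and
    # Ctrl-C interruptions surface at the offending job itself
    pipeline_run([my_task], multiprocess = 1)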
=========================================================
Q. Regular expression substitutions don't work
=========================================================
A. If you are using the special regular expression forms ``"\1"``, ``"\2"`` etc.
to refer to matching groups, remember to 'escape' the substitution pattern string.
The best option is to use 'raw' python strings.
For example:
::

    r"\1_substitutes\2correctly\3four\4times"
Ruffus will throw an exception if it sees an unescaped ``"\1"`` or ``"\2"`` in a file name.
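For example, a minimal sketch (the file names are illustrative) of a raw-string
substitution used with ``@transform`` and ``regex``::

    from ruffus import *

    # "sample.old" -> "sample.new": r"\1" refers to the first matching group,
    # so the substitution pattern is written as a raw string
    @transform(["sample.old"], regex(r"(.+)\.old"), r"\1.new")
    def convert(input_file, output_file):
        pass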
======================================================================================
Q. How to use decorated functions in Ruffus
======================================================================================
A. Place your decorator after Ruffus decorators. This ensures that by the time Ruffus sees
your function, it has already been decorated.
::

    @transform(["example.abc"], suffix(".abc"), ".xyz")
    @custom_decoration
    def func(input, output):
        pass
You will also need to use either ``@wraps`` or ``update_wrapper`` from ``functools``
to write your decorator:
::

    from functools import wraps

    def custom_decoration(task_func):
        """ Decorate a function to print progress
        """
        @wraps(task_func)
        def wrapper_function(*args, **kwargs):
            print "Before"
            task_func(*args, **kwargs)
            print "After"
        return wrapper_function
This ensures that the ``__name__`` and ``__module__`` attributes from the task function
are made available to Ruffus via your decorator.
======================================================================================
Q. Can a task function in a Ruffus pipeline be called normally outside of Ruffus?
======================================================================================
A. Yes. Most python decorators wrap themselves around a function. However, Ruffus leaves the
original function untouched and unwrapped. Instead, Ruffus adds a ``pipeline_task`` attribute
to the task function to signal that this is a pipelined function.
This means the original task function can be called just like any other python function.
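For example (a minimal sketch), the task function below can still be called like
an ordinary python function::

    from ruffus import *

    @transform(["example.abc"], suffix(".abc"), ".xyz")
    def func(input_file, output_file):
        open(output_file, "w").close()

    # outside of pipeline_run(), the task function is just a plain function
    func("example.abc", "example.xyz")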
======================================================================================
Q. My task creates two files, so why does only one survive in the ruffus pipeline?
======================================================================================
::

    from ruffus import *
    import sys

    @transform("start.input", regex(".+"), ("first_output.txt", "second_output.txt"))
    def task1(i, o):
        pass

    @transform(task1, suffix(".txt"), ".result")
    def task2(i, o):
        pass

    pipeline_printout(sys.stdout, [task2], verbose=3)
::
________________________________________
Tasks which will be run:
Task = task1
Job = [start.input
->[first_output.txt, second_output.txt]]
Task = task2
Job = [[first_output.txt, second_output.txt]
->first_output.result]
________________________________________
A. This code produces a single output consisting of a tuple of 2 files. What you
actually want is two outputs, each consisting of 1 file.
In other words, a single job (single input) should produce multiple outputs (and hence
multiple jobs in downstream tasks). This is a one-to-many operation, which calls
for ``@split``:
::

    from ruffus import *
    import sys

    @split("start.input", ("first_output.txt", "second_output.txt"))
    def task1(i, o):
        pass

    @transform(task1, suffix(".txt"), ".result")
    def task2(i, o):
        pass

    pipeline_printout(sys.stdout, [task2], verbose=3)
::
________________________________________
Tasks which will be run:
Task = task1
Job = [start.input
->[first_output.txt, second_output.txt]]
Task = task2
Job = [first_output.txt
->first_output.result]
Job = [second_output.txt
->second_output.result]
________________________________________
======================================================================================
Q. How can a Ruffus task produce output which goes off in different directions?
======================================================================================
A. As above, any time there is a situation which requires a one-to-many operation, you should reach
for ``@split``. The advanced form takes a regular expression, making
it easier to produce multiple derivatives of each input file. The following example splits
*2* jobs each into *3*, so that the subsequent task will run *2* x *3* = *6* jobs.
::

    from ruffus import *
    import sys

    @split(["1.input_file",
            "2.input_file"],
           regex(r"(.+).input_file"),       # match file prefix
           [r"\1.file_type1",
            r"\1.file_type2",
            r"\1.file_type3"])
    def split_task(input, output):
        pass

    @transform(split_task, regex("(.+)"), r"\1.test")
    def test_split_output(i, o):
        pass

    pipeline_printout(sys.stdout, [test_split_output], verbose = 3)
Each of the original 2 files has been split in three so that ``test_split_output`` will run
6 jobs simultaneously.
::
________________________________________
Tasks which will be run:
Task = split_task
Job = [1.input_file ->[1.file_type1, 1.file_type2, 1.file_type3]]
Job = [2.input_file ->[2.file_type1, 2.file_type2, 2.file_type3]]
Task = test_split_output
Job = [1.file_type1 ->1.file_type1.test]
Job = [1.file_type2 ->1.file_type2.test]
Job = [1.file_type3 ->1.file_type3.test]
Job = [2.file_type1 ->2.file_type1.test]
Job = [2.file_type2 ->2.file_type2.test]
Job = [2.file_type3 ->2.file_type3.test]
________________________________________
======================================================================================
Q. Can I call extra code before each job?
======================================================================================
A. This is easily accomplished by hijacking the process
for checking whether jobs are up to date or not (``@check_if_uptodate``):
::

    from ruffus import *

    def run_this_before_each_job (*args):
        print "Calling function before each job using these args", args
        # Remember to delegate to the default Ruffus code for checking if
        # jobs need to run.
        return needs_update_check_modify_time(*args)

    @check_if_uptodate(run_this_before_each_job)
    @files([[None, "a.1"], [None, "b.1"]])
    def task_func(input, output):
        pass

    pipeline_run([task_func])
This results in:
::
________________________________________
>>> pipeline_run([task_func])
Calling function before each job using these args (None, 'a.1')
Calling function before each job using these args (None, 'a.1')
Calling function before each job using these args (None, 'b.1')
Job = [None -> a.1] completed
Job = [None -> b.1] completed
Completed Task = task_func
.. note::

    Because ``run_this_before_each_job(...)`` is called whenever Ruffus checks to see if
    a job is up to date or not, the function may be called twice for some jobs
    (e.g. ``(None, 'a.1')`` above).
=========================================================================================================
Q. Does *Ruffus* allow checkpointing: to distinguish interrupted and completed results?
=========================================================================================================
(Thanks to Bernie Pope for sorting this out.)
A. When gmake is interrupted, it will delete the target file it is updating so that the target is
remade from scratch when make is next run.
There is no direct support for this in *ruffus* **yet**. In any case, the partial / incomplete
file may be useful, if only to reveal, for example, what might have caused an interrupting error
or exception.
A common *Ruffus* convention is to create an empty checkpoint or "flag" file whose sole purpose
is to record a modification time and the successful completion of a job.
This would be a task with a completion flag:
::

    import os

    #
    #   Assuming a pipelined task function named "stage1"
    #
    @transform(stage1, suffix(".stage1"), [".stage2", ".stage2_finished"] )
    def stage2 (input_files, output_files):
        task_output_file, flag_file = output_files
        cmd = ("do_something2 %(input_file)s >| %(task_output_file)s ")
        cmd = cmd % {
                "input_file":       input_files[0],
                "task_output_file": task_output_file
              }
        if not os.system( cmd ):
            #88888888888888888888888888888888888888888888888888888888888888888888888888888
            #
            #   It worked: create the completion flag_file
            #
            open(flag_file, "w")
            #
            #88888888888888888888888888888888888888888888888888888888888888888888888888888
The flag files ``xxx.stage2_finished`` indicate that each job is finished. If the flag file is missing,
``xxx.stage2`` is only a partial, interrupted result.
The only thing to be aware of is that the flag file will appear in the list of inputs of the
downstream task, which should accordingly look like this:
::

    @transform(stage2, suffix(".stage2"), [".stage3", ".stage3_finished"] )
    def stage3 (input_files, output_files):
        #888888888888888888888888888888888888888888888888888888888888888888888888888888888
        #
        #   Note that the first parameter is a LIST of input files, the last of which
        #   is the flag file from the previous task, which we can ignore
        #
        input_file, previous_flag_file = input_files
        #
        #888888888888888888888888888888888888888888888888888888888888888888888888888888888
        task_output_file, flag_file = output_files
        cmd = ("do_something3 %(input_file)s >| %(task_output_file)s ")
        cmd = cmd % {
                "input_file":       input_file,
                "task_output_file": task_output_file
              }
        # create the completion flag file for this task if the command succeeded
        if not os.system( cmd ):
            open(flag_file, "w")
The :ref:`Bioinformatics example` contains code for checkpointing.
^^^^^^^^^^^^^^^
Windows
^^^^^^^^^^^^^^^
=========================================================
Q. Windows seems to spawn *ruffus* processes recursively
=========================================================
A. It is necessary to protect the "entry point" of the program under Windows.
Otherwise, a new process will be started each time the main module is imported
by a new Python interpreter, as an unintended side effect, causing a cascade
of new processes.
See: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
This code works::
    if __name__ == '__main__':
        try:
            pipeline_run([parallel_task], multiprocess = 5)
        except Exception, e:
            print e.args
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sun Grid Engine
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
=========================================================
Q. *qrsh* eats up all my processor time under *ruffus*
=========================================================
A. Sun Grid Engine provides the ``qrsh``
command to run an interactive rsh session. ``qrsh`` can
be used to run commands / scripts on a compute farm or grid cluster.
However, when run within *ruffus*, ``qrsh`` seems to spin idly, polling for input, consuming
all the CPU resources in that process.
An interim solution is to close the ``STDIN`` for the ``qrsh`` invocation::
    import os
    from subprocess import Popen, PIPE

    # priority, queue_name and little_script.py are assumed to have been
    # defined / written elsewhere
    qrsh_cmd = ["qrsh",
                "-now", "n",
                "-cwd",
                "-p", "-%d" % priority,
                "-q", queue_name,
                "little_script.py"]
    p = Popen(qrsh_cmd, stdin = PIPE)
    p.stdin.close()
    sts = os.waitpid(p.pid, 0)
=====================================================================
Q. When I submit lots of jobs at the same time, SGE freezes and dies
=====================================================================
A. This seems to be dependent on your setup. One workaround may be to
introduce a random time delay at the beginning of your jobs::
    import time, random

    # param_func is assumed to generate the parameters for each job
    @parallel(param_func)
    def task_in_parallel(input_file, output_file):
        """
        Work starts after a random delay so that SGE has a chance to manage the queue
        """
        time.sleep(random.random() / 2.0)
        # Wake up and do work