.. include:: ../../global.inc .. _code_template.code: ######################################################################################## Code for flowchart_colours ######################################################################################## * :ref:`Code template ` * :download:`Download <../../static_data/example_scripts/ruffus_template.py>` code This is the standard template we use for developing ruffus pipelines ************************************ Code ************************************ :: #!/usr/bin/env python """ ruffus_template.py [--log_file PATH] [--verbose] [--target_tasks] [--jobs] [--just_print] [--flowchart] [--key_legend_in_graph] [--forced_tasks] """ import sys, os #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 # options #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 if __name__ == '__main__': from optparse import OptionParser import StringIO parser = OptionParser(version="%prog 1.0", usage = "\n\n %progs [options]") # # general options: verbosity / logging # parser.add_option("-v", "--verbose", dest = "verbose", action="count", default=0, help="Print more verbose messages for each additional verbose level.") parser.add_option("-L", "--log_file", dest="log_file", metavar="FILE", type="string", help="Name and path of log file") # # pipeline # parser.add_option("-t", "--target_tasks", dest="target_tasks", action="append", default = list(), metavar="JOBNAME", type="string", help="Target task(s) of pipeline.") parser.add_option("-j", "--jobs", dest="jobs", default=1, metavar="N", type="int", help="Allow N jobs (commands) to run simultaneously.") parser.add_option("-n", "--just_print", dest="just_print", action="store_true", default=False, help="Don't actually run any commands; just print the pipeline.") parser.add_option("--flowchart", dest="flowchart", metavar="FILE", type="string", help="Don't actually run any commands; just print the pipeline " 
"as a flowchart.") # # Less common pipeline options # parser.add_option("--key_legend_in_graph", dest="key_legend_in_graph", action="store_true", default=False, help="Print out legend and key for dependency graph.") parser.add_option("--forced_tasks", dest="forced_tasks", action="append", default = list(), metavar="JOBNAME", type="string", help="Pipeline task(s) which will be included even if they are up to date.") # get help string f =StringIO.StringIO() parser.print_help(f) helpstr = f.getvalue() (options, remaining_args) = parser.parse_args() #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv # # # Change this if necessary # # # #^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # # Add names of mandatory options, # strings corresponding to the "dest" parameter # in the options defined above # mandatory_options = [ ] #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv # # # Change this if necessary # # # #^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ def check_mandatory_options (options, mandatory_options, helpstr): """ Check if specified mandatory options have b een defined """ missing_options = [] for o in mandatory_options: if not getattr(options, o): missing_options.append("--" + o) if not len(missing_options): return raise Exception("Missing mandatory parameter%s: %s.\n\n%s\n\n" % ("s" if len(missing_options) > 1 else "", ", ".join(missing_options), helpstr)) check_mandatory_options (options, mandatory_options, helpstr) #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 # imports #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 from ruffus import * #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 # Functions #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 #88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 # Logger 
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

if __name__ == '__main__':
    import logging
    import logging.handlers

    # custom level between DEBUG (10) and INFO (20)
    MESSAGE = 15
    logging.addLevelName(MESSAGE, "MESSAGE")

    def setup_std_logging (logger, log_file, verbose):
        """
        Set up logging using programme options.

        logger   -- logging.Logger to configure
        log_file -- path of log file; nothing is logged to file if empty / None
        verbose  -- if true, messages are also written to stderr

        If neither a log file nor verbose output is requested, a do-nothing
        handler is attached to the logger instead.
        """
        class debug_filter(logging.Filter):
            """
            Ignore INFO messages
            """
            def filter(self, record):
                return logging.INFO != record.levelno

        class NullHandler(logging.Handler):
            """
            for when there is no logging
            """
            def emit(self, record):
                pass

        # We are interested in all messages: the handlers decide what to keep
        logger.setLevel(logging.DEBUG)
        has_handler = False

        # log to file if that is specified
        if log_file:
            handler = logging.FileHandler(log_file, delay=False)
            handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)6s - %(message)s"))
            handler.setLevel(MESSAGE)
            logger.addHandler(handler)
            has_handler = True

        # log to stderr if verbose
        if verbose:
            stderrhandler = logging.StreamHandler(sys.stderr)
            stderrhandler.setFormatter(logging.Formatter(" %(message)s"))
            stderrhandler.setLevel(logging.DEBUG)
            # INFO records are dropped from stderr only when a log file
            # has also been requested
            if log_file:
                stderrhandler.addFilter(debug_filter())
            logger.addHandler(stderrhandler)
            has_handler = True

        # no logging
        if not has_handler:
            logger.addHandler(NullHandler())

    #
    #   set up log
    #
    # The logger is named after this script (file name without extension).
    module_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    logger = logging.getLogger(module_name)
    setup_std_logging(logger, options.log_file, options.verbose)

    #
    #   Allow logging across Ruffus pipeline
    #
    def get_logger (logger_name, args):
        """
        Logger factory handed to make_shared_logger_and_proxy:
        always returns the logger configured above; both arguments are ignored.
        """
        return logger

    from ruffus.proxy_logger import *
    (logger_proxy,
     logging_mutex) = make_shared_logger_and_proxy (get_logger,
                                                    module_name,
                                                    {})


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#   Pipeline
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Put pipeline code here


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#   Main logic
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

if __name__ == '__main__':
    if options.just_print:
        # describe what would be run, without running anything
        pipeline_printout(sys.stdout, options.target_tasks, options.forced_tasks,
                          verbose=options.verbose)

    elif options.flowchart:
        # use flowchart file name extension to decide flowchart format
        #   e.g. svg, jpg etc.
        flowchart_format = os.path.splitext(options.flowchart)[1][1:]

        # "with" guarantees the flowchart file is flushed and closed,
        # even if drawing the graph raises
        with open(options.flowchart, "w") as flowchart_file:
            pipeline_printout_graph (flowchart_file,
                                     flowchart_format,
                                     options.target_tasks,
                                     options.forced_tasks,
                                     no_key_legend = not options.key_legend_in_graph)

    else:
        # actually run the pipeline
        pipeline_run(options.target_tasks, options.forced_tasks,
                     multiprocess = options.jobs,
                     logger = stderr_logger,
                     verbose = options.verbose)