From 51bf4d2fbbe7f8499566d7fb018c54938e20e261 Mon Sep 17 00:00:00 2001
From: John Eppley
Date: Tue, 5 Dec 2017 13:44:38 -1000
Subject: [PATCH] Added HMMer support with fragmented DBs instead of fasta

---
 batch_launcher.py | 363 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 278 insertions(+), 85 deletions(-)

diff --git a/batch_launcher.py b/batch_launcher.py
index c100882..26c5e37 100755
--- a/batch_launcher.py
+++ b/batch_launcher.py
@@ -259,12 +259,12 @@ def gbRecordSizer(recordLines):
     record = SeqIO.read(recordLines,'genbank')
     return len(record)
 
-def fragmentInput(infile, options, fragmentBase, suffix='.in'):
+def fragmentInput(infile, options, frag_base):
     """
     Wraps the following methods into one:
         getFileType(options, infile)
         getSizePerChunk(infile, options.splits, fileType, splitOnSize=options.splitOnSize)
-        fragmentInputBySize(infile, tmpdir, options.chunk, fileType, fragmentBase, splitOnSize=options.splitOnSize, suffix=)
+        fragmentInputBySize(infile, tmpdir, options.chunk, fileType, frag_base, splitOnSize=options.splitOnSize, suffix=)
     """
     fileType=getFileType(options, infile)
     # if we had to get from file name, save:
@@ -280,44 +280,112 @@ def fragmentInput(infile, options, fragmentBase, suffix='.in'):
         dont_fragment=True
         options.chunk=getSizePerChunk(infile, options.splits, fileType, splitOnSize=options.splitOnSize)
 
+    # Are we going to save fragments for later reuse?
+    if options.keepFragments:
+        options.frag_dir = get_reusable_fragment_dir(infile, options.chunk,
+                                                     options.splitOnSize)
+        if options.fragBase is None:
+            options.fragBase = 'chunk'
+        suffix = os.path.splitext(infile)[1]
+        if options.fragSuff is None:
+            options.fragSuff = suffix
+        num = count_existing_fragments(options.frag_dir, options.fragBase,
+                                       options.fragSuff, infile)
+        if num > 0:
+            # fragments already exist, let's continue
+            return num
+    else:
+        options.frag_dir = options.tmpDir
+        if options.fragBase is None:
+            options.fragBase = "file.fragment"
+        suffix = '.in'
+        if options.fragSuff is None:
+            options.fragSuff = suffix
+
     # fragment input file
     if dont_fragment:
         # Just symlink to tmp_dir
         logging.debug("Since only one fragment requested, entire source file will be linked to tmp_dir")
-        link_source_into_tmp(infile, options.tmpDir, options.fragBase)
+        link_source_into_tmp(infile, options.frag_dir, options.fragBase,
+                             options.fragSuff)
+        options.keepFragments = False
         return 1
     else:
        return fragmentInputBySize(infile,
-                                   options.tmpDir,
+                                   options.frag_dir,
                                    options.chunk,
                                    fileType,
                                    options.fragBase,
-                                   splitOnSize=options.splitOnSize)
+                                   splitOnSize=options.splitOnSize,
+                                   suffix=options.fragSuff)
+
+def get_reusable_fragment_dir(infile, chunk, splitOnSize):
+    """
+    Get a consistent place to put re-usable file fragments. Something like:
+    # {dirname(infile)}/.batch_launcher/{basename(infile)}/chunk_size
+    """
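+    # Illustrative example (hypothetical paths): for infile='/data/seqs.hmm',
+    # chunk=500, and splitOnSize=False, fragments would be cached in
+    # '/data/.batch_launcher/seqs/chunk_500'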
" + "Rename your input file or omit the -K " + "option").format(fdir) + return fdir + -def link_source_into_tmp(infile, tmpdir, fragmentBase, +def link_source_into_tmp(infile, tmpdir, frag_base, suffix='.in'): """ create symlink in tmp dir that points to input file """ logging.debug("Linking input input: %r" % ({'infile':infile, 'tmpDir':tmpdir, - 'base':fragmentBase})) - tmpFileName=getFragmentPath(tmpdir,fragmentBase,1,suffix) - os.link(infile, tmpFileName) + 'base':frag_base})) + tmp_file_name=getFragmentPath(tmpdir, frag_base, 1, suffix) + os.link(infile, tmp_file_name) return 1 -def fragmentInputBySize(infile, tmpdir, chunk, fileType, fragmentBase, splitOnSize=True, suffix='.in'): +def count_existing_fragments(folder, frag_base, suffix, input_file): + """ + Return count of fragments newer than input, return -1 + if input is newer than any + """ + input_file_time = os.path.getmtime(input_file) + num = 0 + count = 0 + logging.info("Looking for existing fragments in %s", folder) + while True: + num += 1 + tmp_file_name=getFragmentPath(folder, frag_base, num, suffix) + logging.debug("checkking %s", tmp_file_name) + if os.path.exists(tmp_file_name): + count += 1 + if os.path.getmtime(tmp_file_name) < input_file_time: + # if any file is older, return -1 + return -1 + else: + # at first missing file, return count + return count + +def fragmentInputBySize(infile, tmpdir, chunk, fileType, frag_base, splitOnSize=True, suffix='.in'): """ Break up input into files of size chunk in tmpdir. Return number of fragments. """ - logging.debug("Fragmenting input: %r" % ({'infile':infile,'tmpDir':tmpdir,'chunk':chunk,'base':fragmentBase})) + logging.debug("Fragmenting input: %r" % + ({'infile':infile,'tmpDir':tmpdir,'chunk':chunk,'base':frag_base})) inhandle = openInputFile(infile) - num = fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, splitOnSize=splitOnSize, suffix=suffix) + num = fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, + frag_base, splitOnSize=splitOnSize, suffix=suffix) if infile is not None: inhandle.close() return num -def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, splitOnSize=True, suffix='.in'): +def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, frag_base, splitOnSize=True, suffix='.in'): if splitOnSize: # get a custom function that returns the size of this type of record recordSizer=fileType.sizer @@ -327,9 +395,9 @@ def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, s count=0 num=1 - tmpFileName=getFragmentPath(tmpdir,fragmentBase,num,suffix) + tmp_file_name=getFragmentPath(tmpdir, frag_base, num, suffix) #logging.debug('Writing fragment (%d,%d,%d): %s' % (chunk,count,num,tmpFileName)) - tmpFile = open(tmpFileName, 'w') + tmpFile = open(tmp_file_name, 'w') for record in fileType.recordStreamer(inhandle): recordSize=recordSizer(record) count+=recordSize @@ -338,9 +406,9 @@ def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, s # close previous chunk and open new one tmpFile.close num+=1 - tmpFileName=getFragmentPath(tmpdir,fragmentBase,num,suffix) + tmp_file_name=getFragmentPath(tmpdir, frag_base, num, suffix) #logging.debug('Writing fragment (%d,%d,%d): %s' % (chunk,count,num,tmpFileName)) - tmpFile = open(tmpFileName, 'w') + tmpFile = open(tmp_file_name, 'w') count=recordSize # write record @@ -402,21 +470,27 @@ def linedRecordGenerator(fileType, stream): if len(lastRecord)>0: yield lastRecord -def 
+    input_file_time = os.path.getmtime(input_file)
+    num = 0
+    count = 0
+    logging.info("Looking for existing fragments in %s", folder)
+    while True:
+        num += 1
+        tmp_file_name=getFragmentPath(folder, frag_base, num, suffix)
+        logging.debug("checking %s", tmp_file_name)
+        if os.path.exists(tmp_file_name):
+            count += 1
+            if os.path.getmtime(tmp_file_name) < input_file_time:
+                # if any fragment is older than the input, return -1
+                return -1
+        else:
+            # at the first missing file, return the count
+            return count
+
+def fragmentInputBySize(infile, tmpdir, chunk, fileType, frag_base, splitOnSize=True, suffix='.in'):
     """
     Break up input into files of size chunk in tmpdir. Return number of fragments.
     """
-    logging.debug("Fragmenting input: %r" % ({'infile':infile,'tmpDir':tmpdir,'chunk':chunk,'base':fragmentBase}))
+    logging.debug("Fragmenting input: %r" %
+                  ({'infile':infile,'tmpDir':tmpdir,'chunk':chunk,'base':frag_base}))
     inhandle = openInputFile(infile)
-    num = fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, splitOnSize=splitOnSize, suffix=suffix)
+    num = fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType,
+                                    frag_base, splitOnSize=splitOnSize, suffix=suffix)
     if infile is not None:
         inhandle.close()
     return num
 
-def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, splitOnSize=True, suffix='.in'):
+def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, frag_base, splitOnSize=True, suffix='.in'):
     if splitOnSize:
         # get a custom function that returns the size of this type of record
         recordSizer=fileType.sizer
@@ -327,9 +395,9 @@ def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, s
     count=0
     num=1
-    tmpFileName=getFragmentPath(tmpdir,fragmentBase,num,suffix)
+    tmp_file_name=getFragmentPath(tmpdir, frag_base, num, suffix)
     #logging.debug('Writing fragment (%d,%d,%d): %s' % (chunk,count,num,tmpFileName))
-    tmpFile = open(tmpFileName, 'w')
+    tmpFile = open(tmp_file_name, 'w')
     for record in fileType.recordStreamer(inhandle):
         recordSize=recordSizer(record)
         count+=recordSize
@@ -338,9 +406,9 @@ def fragmentInputStreamBySize(inhandle, tmpdir, chunk, fileType, fragmentBase, s
             # close previous chunk and open new one
             tmpFile.close
             num+=1
-            tmpFileName=getFragmentPath(tmpdir,fragmentBase,num,suffix)
+            tmp_file_name=getFragmentPath(tmpdir, frag_base, num, suffix)
             #logging.debug('Writing fragment (%d,%d,%d): %s' % (chunk,count,num,tmpFileName))
-            tmpFile = open(tmpFileName, 'w')
+            tmpFile = open(tmp_file_name, 'w')
             count=recordSize
 
         # write record
@@ -402,21 +470,27 @@ def linedRecordGenerator(fileType, stream):
     if len(lastRecord)>0:
         yield lastRecord
 
-def addFragmentingOptions(parser,defaults={"splits":400}):
-    parser.add_option("-L", "--recordLines", metavar="NUMLINES", dest='numLines', default=None, type="int",
+def addFragmentingOptions(parser, defaults={"splits":400}):
+    option_group = parser.add_option_group('Fragmenting Options')
+
+    option_group.add_option("-L", "--recordLines", metavar="NUMLINES", dest='numLines', default=None, type="int",
                       help="Number of lines per record")
-    parser.add_option("-P", "--pattern", metavar="PATTERN", dest='pattern', default=None,
+    option_group.add_option("-P", "--pattern", metavar="PATTERN", dest='pattern', default=None,
                       help="Regular expression to split records")
-    parser.add_option("-T","--infileType", dest='infileType', default=None,
+    option_group.add_option("-T","--infileType", dest='infileType', default=None,
                       choices=list(fileTypeMap.keys()),
                       help='Type of input file. Otherwise, choose by extension. Known types are: %choices')
-    parser.add_option("-C", "--chunkSize", type="int", dest='chunk', metavar="FRAG_SIZE",
+    option_group.add_option("-C", "--chunkSize", type="int", dest='chunk', metavar="FRAG_SIZE",
                       help="The number of records per fragment. Overrides NUM_FRAGS")
     default=defaults.get("splits",None)
-    parser.add_option("-N", "--numChunks", dest='splits', type='int', metavar="NUM_FRAGS", default=default,
+    option_group.add_option("-N", "--numChunks", dest='splits', type='int', metavar="NUM_FRAGS", default=default,
                       help="The number of fragments to create (defaults to %default)")
-    parser.add_option("-s", "--splitOnSize", default=False, action='store_true',
+    option_group.add_option("-s", "--splitOnSize", default=False, action='store_true',
                       help="create chunks based on record size, not number of records. For known sequence types (fasta, fastq, gb), sequence length is used, otherwise the full size of the record text is counted")
+    option_group.add_option("-K", "--keepFragments", default=False,
+                            action='store_true',
+                            help=("keep fragments for later re-use (useful "
+                                  "for HMMs)"))
 
 #################
 # Classes
@@ -445,10 +519,12 @@ def recordStreamer(self, stream):
 FASTQ=FragmentableFileType('fastq',sizer=fastqRecordSizer,numLines=4)
 GENBANK=FragmentableFileType('gb',sizer=gbRecordSizer,sepRE=re.compile(r'^LOCUS'))
 TABLE=FragmentableFileType('table',sepRE=re.compile(r'^'))
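+# Why '^HMMER' works as a record separator: each model in an HMMER3
+# text-format database starts with a header line like "HMMER3/f [...]" and
+# ends with "//", so a new record begins at every header line (this assumes
+# a plain-text .hmm file, not a binary, hmmpress-ed database)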
Use twice for debugging") + option_group.add_option("-q", '--quiet', dest='verbose', + action="store_const", const=0, + help="Suppress warnings. Only print fatal messages") + option_group.add_option("-V","--loglevel", type='int', dest="verbose", + help="Shortcut to set verbosity directly") + # options for processing command - parser.add_option("-G", "--ignore", default=False, action='store_true', + option_group = parser.add_option_group('Command Parsing Options') + option_group.add_option("-G", "--ignore", default=False, action='store_true', help="Do not perform checks based on the program name (in the command). Batch_launcher will skip need you to indicate input/output/threads with -i,-o, and -t, and it will skip any program specific processing (e.g. db checks for blast)") - parser.add_option("-i", "--inputFlag", metavar="FLAG", + option_group.add_option("-i", "--inputFlag", metavar="FLAG", help="Indicate where in command to find the input file to fragment. The value can be an option flag (eg '-i' or '-fasta') or a number indicating a positional argument. Only needed if program in command is not recognized.") - parser.add_option("-o", "--outputFlags", action='append',metavar='FLAG', + option_group.add_option("-o", "--outputFlags", action='append',metavar='FLAG', help="""Indicate where in command (or not) to find the output file. The value can be an option flag or positional argument (as with -i, above), a file name (if the command always produces the same output file), '%e.EXT' if the output file @@ -487,61 +583,54 @@ def main(): input fie with an additional extension, or '%s/foo/bar/' if the output file is the input file with a regexp substitution. Multiple values are permitted to account for multiple outputs.""") - parser.add_option("-p", "--prefixFlag", + option_group.add_option("-p", "--prefixFlag", help="Indicate where in command to find a prefix for output files. Output extension flags '%p.ext' indicate that the output file will be PREFIX.ext.") - parser.add_option("-t", "--threadsFlag", metavar="FLAG", + option_group.add_option("-t", "--threadsFlag", metavar="FLAG", help="Option to use to tell command how many threads to use") # ways to customize batch behavior addFragmentingOptions(parser) - parser.add_option("-X", "--queue", default=AUTO, + + # way to customize execution + option_group = parser.add_option_group('Fragment Execution Options') + option_group.add_option("-X", "--queue", default=AUTO, choices=[SGE,SLURM,LOCAL,AUTO], help="""What type of scheduler to use: 'sge', 'slurm', or 'local' (just use threads and command-line). By default, it will check for the qsub (from sge) and sbatch (from slurm) binaries in the PATH and go with the first one found.""") - parser.add_option("-R", "--throttle", default=0, type='int', + option_group.add_option("-R", "--throttle", default=0, type='int', help="Limit number of simultaneously executing fragemnts to given number. 
Default: 0 => unlimited") - parser.add_option("-w", "--wait", default=False, action='store_true', + option_group.add_option("-w", "--wait", default=False, action='store_true', help="Wait for jobs to finish before exiting script (only when using SGE)") - parser.add_option("-c","--cwd",dest='cwd', + option_group.add_option("-c","--cwd",dest='cwd', default=False,action='store_true', help='Run fragment in current directory, otherwise it will be executed in the tmp location (as configured in Python, usu a local disk like /tmp) of the node (which is good for programs like softnerry that create lots of temporary files)') - parser.add_option("-r","--retries",default=-1,type='int', + option_group.add_option("-r","--retries",default=-1,type='int', help='number of times to resubmit failed tasks. Less than zero (the default) means continue as long as some new results come through each time') - parser.add_option("-j", "--jobName", metavar="STRING", + option_group = parser.add_option_group('Advanced Fragment Execution Options') + option_group.add_option("-j", "--jobName", metavar="STRING", help="String for naming queued tasks") - parser.add_option("-d", "--tmp_dir", dest='tmpDir', + option_group.add_option("-d", "--tmp_dir", dest='tmpDir', help="Temporary directory for files") - parser.add_option("-S", "--submitOptions", default=None, dest='sgeOptions', + option_group.add_option("-S", "--submitOptions", default=None, dest='sgeOptions', help="Option string to add to SGE or SLURM command") - parser.add_option("-n", "--priority", default=0, type='int', + option_group.add_option("-n", "--priority", default=0, type='int', help="Adjust priority of sub-tasks, only applies to SGE") # primarily used when calling self - parser.add_option("-f", "--frag_base", dest="fragBase", default="file.fragment", - help="naming base for input file fragments") - parser.add_option("-m", "--mode", default='launch', metavar='MODE', + option_group = parser.add_option_group('Internal Use Only') + option_group.add_option("-f", "--frag_base", dest="fragBase", default=None, + help=("naming base for input file fragments" + "('file.fragment')")) + option_group.add_option("--frag_dir", dest="frag_dir", default=None, + help=("folder with input fragments")) + option_group.add_option("--frag_suffix", dest="fragSuff", default=None, + help=("naming suffix for input file fragments")) + option_group.add_option("-m", "--mode", default='launch', metavar='MODE', choices=['launch','run','cleanup'], help="Only used internally to lauch tasks in SGE or SLURM") - parser.add_option("-Z","--taskType",default=None, choices=list(taskTypePatterns.keys()), + option_group.add_option("-Z","--taskType",default=None, choices=list(taskTypePatterns.keys()), help="only for task running: what type of task is this? Will be ignored in inital call") - # other - parser.add_option("-l", "--logToFile", default=False, action="store_true", - help="print logging messages to file") - parser.add_option("-v", "--verbose", - action="count", dest="verbose", default=1, - help="Print log messages. Use twice for debugging") - parser.add_option("-q", '--quiet', dest='verbose', - action="store_const", const=0, - help="Suppress warnings. 
Only print fatal messages") - parser.add_option("-V","--loglevel", type='int', dest="verbose", - help="Shortcut to set verbosity directly") - parser.add_option("--version", dest="version", default=False, - action="store_true", help="print version") - parser.add_option("-A", "--about", - action="store_true", dest="about", default=False, - help="Print description") - (options, cmdargs) = parser.parse_args() @@ -709,6 +798,9 @@ def prepareCommandForBatch(cmdargs,options): infile=None prefix=None outfile=None + # remember user specified flags + user_flags = get_all_flags(options) + # look up options and flags taskType=options.taskType if taskType is not None: @@ -781,9 +873,13 @@ def prepareCommandForBatch(cmdargs,options): # reset nextArgument=None - # Quit if the user specified arguments are not found + # Quit if the user specified flags or any pos arguments are not found + logging.debug("Used flags: \n%s\npos" + "args:\n%s\nuser_flags:\n%s\nflaggedArgs:\n%s", + usedFlags, positionalArgs, user_flags, flaggedArgs) unused_flags = set(positionalArgs).difference(usedFlags)\ - .union(set(flaggedArgs).difference(usedFlags)) + .union(set(user_flags).difference(usedFlags)) + logging.debug("unused flags: \n %s", unused_flags) if len(unused_flags)>0: raise Exception("Unable to find the following arguments in the " + \ "command: '%s'" % "', '" \ @@ -801,6 +897,24 @@ def prepareCommandForBatch(cmdargs,options): return (infile,outfile) + +def get_all_flags(options): + """ return list of flags explicitly chose by user """ + flags = [] + if options.inputFlag: + flags.append(try_to_int(options.inputFlag)) + if options.outputFlags: + for flag in options.outputFlags: + flags.append(try_to_int(flag)) + return flags + +def try_to_int(flag): + try: + return int(flag) + except ValueError: + return flag + + def translatePositionalArgs(options,cmdargs,checkMap): """ using list of flags that take arguments for given taskType: @@ -941,7 +1055,8 @@ def launchJobs(options, cmdargs, errStream=sys.stdin): # batch_runner command command.append(BATCHLAUNCHER) - command+=["--mode","run","--tmp_dir",options.tmpDir,"--frag_base", options.fragBase, "--loglevel", str(options.verbose), "--queue", options.queue] + command+=["--mode","run","--tmp_dir",options.tmpDir,"--frag_base", + options.fragBase, "--frag_dir", options.frag_dir, "--frag_suffix", options.fragSuff, "--loglevel", str(options.verbose), "--queue", options.queue] if options.inputFlag is not None: command+=['-i',str(options.inputFlag)] if options.prefixFlag is not None: @@ -1108,7 +1223,7 @@ def launchCleanup(options, cmdargs, errStream=sys.stderr): logging.debug("Launching cleanup: %s" % ({'tmpDir':options.tmpDir,'splits':options.splits,'fragBase':options.fragBase,'out':options.outputFlags,'job':options.jobName})) # name outputfiles - outFileByFlag = getOutputFiles(options,cmdargs) + outFileByFlag = getOutputFiles(options, cmdargs) logging.debug("Outfiles: %s" % (outFileByFlag)) fileNameBase = getFileNameBase(options.outputFlags,outFileByFlag,options.jobName) logging.debug("File Name Base: %s" % (fileNameBase)) @@ -1118,7 +1233,18 @@ def launchCleanup(options, cmdargs, errStream=sys.stderr): command = getSubmissionCommandPrefix(options, cleanupFile=fileNameBase) # cleanup - command+=[BATCHLAUNCHER,'--tmp_dir',options.tmpDir,'--frag_base',options.fragBase,'--mode','cleanup','--numChunks',str(options.splits), '--loglevel', str(options.verbose), '--retries', str(options.retries),'--jobName',options.jobName, '--chunk', str(options.chunk), '--queue', options.queue] + 
+    command+=[BATCHLAUNCHER,
+              '--tmp_dir', options.tmpDir,
+              '--frag_base', options.fragBase,
+              '--frag_dir', options.frag_dir,
+              '--frag_suffix', options.fragSuff,
+              '--mode', 'cleanup',
+              '--numChunks', str(options.splits),
+              '--loglevel', str(options.verbose),
+              '--retries', str(options.retries),
+              '--jobName', options.jobName,
+              '--chunk', str(options.chunk),
+              '--queue', options.queue]
     if options.splitOnSize:
         command.append('--splitOnSize')
     if options.inputFlag is not None:
@@ -1168,10 +1294,13 @@ def cleanup(options, cmdargs, errStream=sys.stdin):
     exitcode=0
 
     # get list of output flags
-    outFileByFlag = getOutputFiles(options,cmdargs)
+    outFileByFlag = getOutputFiles(options, cmdargs)
+    logging.debug("Outfiles: %s" % (outFileByFlag))
 
     # name outputfiles
-    fileNameBase = getFileNameBase(options.outputFlags,outFileByFlag,options.jobName)
+    fileNameBase = getFileNameBase(options.outputFlags,
+                                   outFileByFlag,
+                                   options.jobName)
 
     # remove old output files
     errStreamFile="%s.stderr" % fileNameBase
@@ -1217,7 +1346,7 @@
     # look for files
     for i in taskIds:
         # look for output
-        fragName = getFragmentName(options.fragBase,i)
+        fragName = getFragmentName(options.fragBase, i, options.fragSuff)
         prefix = getFragmentPrefix(options.fragBase,i)
         frag = "%s%s%s" % (options.tmpDir, os.sep, fragName)
         fragerr = "%s.exitcode" % (frag)
@@ -1338,7 +1467,7 @@
     # first check tasks that failed completely
     # rename tasks to make them consecutive
     if len(failedTasks)>0:
-        nextTaskNum+=reFragmentMissedTasks(failedTasks,options)
+        nextTaskNum+=reFragmentMissedTasks(failedTasks, options)
 
     # then, if we were able to identify missing records
     if len(missingRecords)>0:
@@ -1430,7 +1559,11 @@ def reFragmentMissedTasks(missedTasks, options):
     failedRecordStream = fileinput.input(inputsToReFragment)
 
     # create new fragments in temporaryLocation
-    newFragNum=fragmentInputStreamBySize(failedRecordStream, temporaryLocation, options.chunk, fileType, options.fragBase, splitOnSize=options.splitOnSize)
+    newFragNum=fragmentInputStreamBySize(failedRecordStream, temporaryLocation,
+                                         options.chunk, fileType,
+                                         options.fragBase,
+                                         splitOnSize=options.splitOnSize,
+                                         suffix=options.fragSuff)
 
     # remove old fragments
     for i in missedTasks:
@@ -1450,13 +1583,29 @@ def moveNewFragmentsToTmpDir(options,nextTaskNum):
         os.rmdir("%s%stmp" % (options.tmpDir, os.sep))
 
 def getFileNameBase(outFlags,outFileByFlag,default):
-    # what do we name all the log files?
+    " what do we name all the log files? "
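+    # The name picked here becomes the base for the cleanup job's .stdout,
+    # .stderr, and .log files; values that look like option strings (with a
+    # leading '-') are skipped below when choosing it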
" if outFlags: - # base output file on first output file selected - firstFlag=outFlags[0] - if firstFlag in outFileByFlag: - if isinstance(outFileByFlag[firstFlag],str): - return outFileByFlag[firstFlag] + logging.debug("Output Flags: \n%r", outFlags) + logging.debug("Output Files \n%r", outFileByFlag) + # base output file on first output file selected that looks good + # This is a hack to fix fallout from the secondary file code + best_file = None + for flag in outFlags: + out_file = outFileByFlag.get(flag, None) + if out_file is None: + continue + if (not out_file.startswith('-')) and (not + out_file.startswith('./-')): + # this is a good file, use it and stop looking + best_file = out_file + logging.debug("bestest file: %s", best_file) + break + if best_file is None: + # if we haven't seen anything else yet, save this + best_file = out_file + logging.debug("best file: %s", best_file) + if best_file is not None: + return best_file else: # has been translated to integer, take #1 (first in command, not in flags) return outFileByFlag[1] @@ -1737,11 +1886,13 @@ def runFragment(options, cmdargs, taskNum=None): logging.basicConfig(stream=errStream, level=loglevel) # set up file names - infragmentName = getFragmentName(options.fragBase, taskNum) + infragmentName = getFragmentName(options.fragBase, taskNum, + options.fragSuff) + fragment_dir = options.frag_dir prefix = getFragmentPrefix(options.fragBase, taskNum) - infragment = "%s%s%s" % (options.tmpDir, os.sep, infragmentName) - stdoutFragment="%s.stdout"%infragment - stderrFragment="%s.stderr"%infragment + infragment = "%s%s%s" % (fragment_dir, os.sep, infragmentName) + stdoutFragment=os.path.join(options.tmpDir, infragmentName) + ".stdout" + stderrFragment=os.path.join(options.tmpDir, infragmentName) + ".stderr" if options.queue == LOCAL: stdoutLocal=stdoutFragment stderrLocal=stderrFragment @@ -1775,6 +1926,9 @@ def runFragment(options, cmdargs, taskNum=None): (foundI, outputFlags) = prepareCommandForFragment(options, infragment, prefix, outLocal, cmdargs, hostname, errStream) logging.debug("ready ro RUN!") + # fragmented HMMdbs need to be compiled + prep_input_fragment(infragment, options.taskType) + # setup to run command # I/O if foundI: @@ -1848,7 +2002,8 @@ def runFragment(options, cmdargs, taskNum=None): # Do some final cleanup: if exitcode!=0: - errCodeFile="%s.exitcode" % (infragment) + errCodeFile=os.path.join(options.tmpDir, + "%s.exitcode" % (infragmentName)) ecStream=open(errCodeFile,'w') ecStream.write(str(exitcode)) ecStream.close() @@ -1860,9 +2015,11 @@ def runFragment(options, cmdargs, taskNum=None): logging.shutdown() errStream.close() if options.queue == LOCAL: - shutil.move(logFile, "%s.log" % (infragment)) + shutil.move(logFile, os.path.join(options.tmpDir, + "%s.log" % (infragmentName))) else: - shutil.copy(logFile, "%s.log" % (infragment)) + shutil.copy(logFile, os.path.join(options.tmpDir, + "%s.log" % (infragmentName))) # remove local temorary dir for f in os.listdir(localDir): @@ -1871,6 +2028,20 @@ def runFragment(options, cmdargs, taskNum=None): return exitcode + +def prep_input_fragment(infragment, taskType): + """ fragmented HMMdbs need to be compiled""" + if taskType == HMMER: + if not(os.path.exists(infragment + ".h3i")): + logging.info("Running hmmpress on %s", infragment) + log_file = infragment + ".hmmpress.log" + with open(log_file, 'w') as err_stream: + subprocess.check_call(['hmmpress', infragment], + stdout=err_stream, + stderr=err_stream, + ) + + def getOptionHashes(options): """ based on flags set by 
+    if taskType == HMMER:
+        if not os.path.exists(infragment + ".h3i"):
+            logging.info("Running hmmpress on %s", infragment)
+            log_file = infragment + ".hmmpress.log"
+            with open(log_file, 'w') as err_stream:
+                subprocess.check_call(['hmmpress', infragment],
+                                      stdout=err_stream,
+                                      stderr=err_stream,
+                                     )
+
+
 def getOptionHashes(options):
     """
     based on flags set by user and the taskType,
@@ -1921,7 +2092,7 @@
     return (positionalArgs,flaggedArgs)
 
-def getOutputFiles(options,cmdargs):
+def getOutputFiles(options, cmdargs):
     """
     return map of (final) output files from this command
     ...keyed on the flag that indicated
@@ -1993,6 +2164,11 @@
             # No, it is not. Move on
             pass
 
+        # is it an option flag (starts with a dash)?
+        if flag.startswith('-'):
+            # this could conceivably be a file, but ignore it anyway
+            continue
+
         if flag not in usedFlags:
             if flag=='%stdout':
                 outFileByFlag[flag] = None
@@ -2429,6 +2605,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
 GLIMMERMG='glimmermg'
 FRHIT='frhit'
 LAST='lastal'
+HMMER='hmmer'
 programOptionMap={BLAST:{'in':'-i',
                          'out':['-o'],
                          'threads':'-a',
@@ -2457,7 +2634,10 @@
                         },
                   FRHIT:{'in':'-d',
                          'out':['-o'],
-                         'threads':'-T'}
+                         'threads':'-T'},
+                  HMMER:{'in':1,
+                         'out':['-o','--domtblout','--tblout'],
+                         'threads':'--cpu'},
                  }
 blastPlusProgRE=re.compile(
     r'(blastn|blastp|blastx|tblastx|tblastn|rpsblast|rpstblastn)$')
@@ -2468,6 +2648,7 @@
 glimmermgRE=re.compile(r'(glimmer-mg\.py)$')
 frHitRE=re.compile(r'(fr-hit)$')
 lastRE=re.compile(r'(lastal)$')
+hmmerRE=re.compile(r'(hmmsearch|hmmscan)$')
 taskTypePatterns={BLAST:blastProgRE,
                   BLASTPLUS:blastPlusProgRE,
                   DCMB:dcmbProgRE,
@@ -2476,6 +2657,7 @@
                   GLIMMERMG:glimmermgRE,
                   FRHIT:frHitRE,
                   LAST:lastRE,
+                  HMMER:hmmerRE,
                  }
 inspectCommandForTaskType={BLAST:inspectBlastCommand,
                            BLASTPLUS:inspectBlastCommand,
@@ -2498,6 +2680,11 @@
 unsupportedOptions={}
 flagsWithArguments={GLIMMERMG:['--iter','-p','-t','-i','-q','-r','-s','-u','--fudge','--taxlevel','--minbp_pct'],
                     LAST:['-P','-a','-b','-c','-d','-e','-F','-p','-q','-r','-x','-y','-z','-f','-k','-l','-m','-n','-s','-i','-u','-t','-j','-Q','-g','-G','-o'],
+                    HMMER:["-o", "-A", "--tblout", "--domtblout",
+                           "--pfamtblout", "--textw", "-E", "-T", "--domE",
+                           "--domT", "--incE", "--incT", "--incdomE",
+                           "--incdomT", "--F1", "--F2", "--F3", "-Z", "--domZ",
+                           "--seed", "--tformat", "--cpu"],
                    }
 
 taskSpecificCopy={}
@@ -2526,6 +2713,12 @@
 # Classes
 ##############
 class FragmentThread( threading.Thread ):
+    """
+    For local multi-threading
+
+    This might be better done with multiprocessing, but since we're just
+    spawning subprocesses, the global interpreter lock isn't too much of
+    an issue
+    """
     def __init__(self, queue):
         self.queue = queue
         threading.Thread.__init__ ( self )