Skip to content

Commit

Permalink
added tests and two features.
Browse files Browse the repository at this point in the history
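1: set the fragment prep command from the command line (--frag_prep)
2: flag command arguments that are relative paths so they are translated into absolute paths (-H/--relative_path)
3: allow an output argument of /dev/null, which is passed through unchanged instead of being replaced by a per-fragment output file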
  • Loading branch information
jmeppley committed Dec 6, 2017
1 parent 51bf4d2 commit 422db89
Show file tree
Hide file tree
Showing 6 changed files with 121,554 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,5 @@ target/
*.local
*.sge
*.slurm
.tst.*
.batch_launcher
114 changes: 94 additions & 20 deletions batch_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,20 @@ def main():
help="Indicate where in command to find a prefix for output files. Output extension flags '%p.ext' indicate that the output file will be PREFIX.ext.")
option_group.add_option("-t", "--threadsFlag", metavar="FLAG",
help="Option to use to tell command how many threads to use")
option_group.add_option("-H", "--relative_path", metavar='FLAG',
action='append', dest='rel_paths',
help="Use the same syntax as -o or -i to indicate "
"elements in the command that are relative "
"paths. These will need to be made absolute when "
"the command is run in a temp folder.")
option_group.add_option("--frag_prep", metavar='COMMAND', default=None,
help="command to run on fragments. Can be used "
"to fragment databases instead of queries. "
" Include place holder '{}' for file name. "
"Can optionnally include output suffix to "
"prevent for unneeded work when re-using "
"fragments. EG: "
"'.h3i: hmmpress {}' for fragmenting an HMM db")

# ways to customize batch behavior
addFragmentingOptions(parser)
Expand Down Expand Up @@ -749,6 +763,7 @@ def main():
else:
logging.warn("Debugging level is high, so skipping cleanup and leaving temporary files in place")
sys.exit(0)
logging.debug("Done!")
sys.exit(exitcode)
else:
# otherwise, launch cleanup task to wait for processing to finish
Expand Down Expand Up @@ -822,6 +837,11 @@ def prepareCommandForBatch(cmdargs,options):
options.threadsFlag = programOptionMap[taskType].get('threads',None)
if isinstance(options.threadsFlag,int):
relativePositions['threads']=True
if not options.rel_paths:
options.rel_paths = programOptionMap[taskType].get('rel', [])
for flag in options.rel_paths:
if isinstance(flag, int):
relativePositions['rel'] = True

if len(relativePositions):
translatePositionalArgs(options,cmdargs,relativePositions)
Expand Down Expand Up @@ -866,6 +886,8 @@ def prepareCommandForBatch(cmdargs,options):
logging.debug("Found output (%s) in arg %d" % (outfile,i))
elif nextArgument=='threads':
pass
elif nextArgument=='rel':
pass
else:
# something went wrong
raise Exception("Unrecognized nextArgument: %s" % (nextArgument))
Expand Down Expand Up @@ -954,6 +976,10 @@ def translatePositionalArgs(options,cmdargs,checkMap):
for i in range(len(options.outputFlags)):
if isinstance(options.outputFlags[i],int):
options.outputFlags[i]=positionMap[options.outputFlags[i]-1]
if 'rel' in checkMap:
for i in range(len(options.rel_paths)):
if isinstance(options.rel_paths[i], int):
options.rel_paths[i] = positionMap[options.rel_paths[i]-1]
if 'prefix' in checkMap:
if isinstance(options.prefixFlag,int):
options.prefixFlag=positionMap[options.prefixFlag-1]
Expand Down Expand Up @@ -1505,6 +1531,7 @@ def cleanup(options, cmdargs, errStream=sys.stdin):

logging.info("Final cleanup")
# check contents of tasks.err and tasks.out in options.tmpDir
logging.debug("collecting stderr and stdout from fragments")
commonerr="%s%stasks.err"%(options.tmpDir,os.sep)
commonout="%s%stasks.out"%(options.tmpDir,os.sep)
# if not empty, add to errStream (make sure it's open)
Expand All @@ -1522,6 +1549,7 @@ def cleanup(options, cmdargs, errStream=sys.stdin):
os.remove(commonout)

# warn if any files left
logging.debug("Checking for leftover files")
leftoverFiles=os.listdir(options.tmpDir)
if len(leftoverFiles)>0:
if errStream is None:
Expand All @@ -1539,7 +1567,9 @@ def cleanup(options, cmdargs, errStream=sys.stdin):
if failureStream is not None:
failureStream.close()
# delete directory
logging.debug("removing tmp folder %s", options.tmpDir)
os.rmdir(options.tmpDir)
logging.debug("cleanup is complete")
return exitcode

def reFragmentMissedTasks(missedTasks, options):
Expand Down Expand Up @@ -1927,7 +1957,9 @@ def runFragment(options, cmdargs, taskNum=None):
logging.debug("ready ro RUN!")

# fragmented HMMdbs need to be compiled
prep_input_fragment(infragment, options.taskType)
prep_input_fragment(infragment,
fragment_prep_for_task.get(options.taskType,
options.frag_prep))

# setup to run command
# I/O
Expand Down Expand Up @@ -2029,17 +2061,28 @@ def runFragment(options, cmdargs, taskNum=None):
return exitcode


def prep_input_fragment(infragment, taskType):
def prep_input_fragment(infragment, frag_prep):
""" fragmented HMMdbs need to be compiled"""
if taskType == HMMER:
if not(os.path.exists(infragment + ".h3i")):
logging.info("Running hmmpress on %s", infragment)
log_file = infragment + ".hmmpress.log"
with open(log_file, 'w') as err_stream:
subprocess.check_call(['hmmpress', infragment],
stdout=err_stream,
stderr=err_stream,
)
if frag_prep:
m = re.search(r'^([\.a-zA-Z0-9_]+):\s+(\S.+)$', frag_prep)
if m:
ext, command_template = m.groups()
# don't do anything if output exists and is newer
if os.path.exists(infragment + ext) and \
(os.path.getmtime(infragment) < \
os.path.getmtime(infragment + ext)):
return
else:
command_template = frag_prep
logging.info("Running %s on %s", command_template, infragment)
command = shlex.split(command_template.format(infragment))
log_file = infragment + ".prep.log"
logging.debug("Logging command: %r to %s", command, log_file)
with open(log_file, 'w') as err_stream:
subprocess.check_call(command,
stdout=err_stream,
stderr=err_stream,
)


def getOptionHashes(options):
Expand Down Expand Up @@ -2089,7 +2132,13 @@ def getOptionHashes(options):
except TypeError:
for flag in options.prefixFlag:
flaggedArgs[flag]='prefix'

if options.rel_paths is not None:
for rel_path_flag in options.rel_paths:
try:
positionalArgs[int(rel_path_flag)]='rel'
except ValueError:
flaggedArgs[rel_path_flag]='rel'

return (positionalArgs,flaggedArgs)

def getOutputFiles(options, cmdargs):
Expand Down Expand Up @@ -2134,11 +2183,14 @@ def getOutputFiles(options, cmdargs):
prefix=cmdargs[i]
logging.debug("Found prefix (%s) in arg %d" % (prefix,i))
elif nextArgument=='out':
namedOutCount+=1
outFileByFlag[namedOutCount]=cmdargs[i]
logging.debug("Found outfile number %d in arg %d" % (namedOutCount,i))
if cmdargs[i] != '/dev/null':
namedOutCount+=1
outFileByFlag[namedOutCount]=cmdargs[i]
logging.debug("Found outfile number %d in arg %d" % (namedOutCount,i))
elif nextArgument=='threads':
pass
elif nextArgument=='rel':
pass
else:
# something went wrong
raise Exception("Unrecognized nextArgument: %s" % (nextArgument))
Expand Down Expand Up @@ -2183,6 +2235,9 @@ def getOutputFiles(options, cmdargs):
def prepareCommandForFragment(options, infragment, prefix, outLocal, cmdargs, hostname, errStream):
"""
Replace input and output file names (and thread count) with fragment versions
Translate any other (non i/o) paths to absolute paths.
return:
boolean indicating if input file found or not
list of outputFileFlags. Each is one of:
Expand All @@ -2192,10 +2247,17 @@ def prepareCommandForFragment(options, infragment, prefix, outLocal, cmdargs, ho
"""

foundI=False
logging.debug("Command:\n%s\ninfile: %s\noutfile: %s" % (cmdargs, infragment, outLocal))
logging.debug("Command:\n%s\ninfile: %s\noutfile: %s\ninflag: %r"
"\noutflags: %r\nprefixFlags: %r\nrel_paths: %r",
cmdargs,
infragment,
outLocal,
options.inputFlag,
options.outputFlags,
options.prefixFlag,
options.rel_paths)

# setup hashes to look for options
logging.debug("inflag: %r\noutflags: %r\nprefixFlags: %r" % (options.inputFlag, options.outputFlags, options.prefixFlag))
(positionalArgs,flaggedArgs)=getOptionHashes(options)
logging.debug("Looking for args: \n%r\n%r" % (positionalArgs,flaggedArgs))

Expand Down Expand Up @@ -2228,9 +2290,16 @@ def prepareCommandForFragment(options, infragment, prefix, outLocal, cmdargs, ho
cmdargs[i]=prefix
#logging.debug("Replaced prefix at arg %d" % (i))
elif nextArgument=='out':
namedOutCount+=1
cmdargs[i]="%s.%s" % (outLocal,namedOutCount)
#logging.debug("Replaced outfile at arg %d" % (i))
if cmdargs[i] != '/dev/null':
namedOutCount+=1
cmdargs[i]="%s.%s" % (outLocal,namedOutCount)
#logging.debug("Replaced outfile at arg %d" % (i))
elif nextArgument=='rel':
old_path = cmdargs[i]
new_path = os.path.abspath(old_path)
logging.debug("changing %s to %s",
old_path, new_path)
cmdargs[i] = new_path
elif nextArgument=='threads':
if options.queue != LOCAL:
threads = getThreadCountForNode(hostname,errStream)
Expand Down Expand Up @@ -2608,6 +2677,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
HMMER='hmmer'
programOptionMap={BLAST:{'in':'-i',
'out':['-o'],
'rel':['-d'],
'threads':'-a',
},
DCMB:{'in':'-query',
Expand All @@ -2616,6 +2686,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
},
BLASTPLUS:{'in':'-query',
'out':['-out'],
'rel':['-db'],
'threads':'-num_threads'
},
METARNA:{'in':'-i',
Expand All @@ -2624,6 +2695,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
'threads':'-p',
},
LAST:{'in':2,
'rel':[1,],
'out':['%stdout']},
GLIMMERMG:{'in':1,
'out':['%p.predict','%E.predict'],
Expand All @@ -2636,6 +2708,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
'out':['-o'],
'threads':'-T'},
HMMER:{'in':1,
'rel':[2,],
'out':['-o','--domtblout','--tblout'],
'threads':'--cpu'},
}
Expand All @@ -2659,6 +2732,7 @@ def applyDefaultsToCommand(command,taskType,prepend=False):
LAST:lastRE,
HMMER:hmmerRE,
}
fragment_prep_for_task={HMMER: ".h3i: hmmpress {}",}
inspectCommandForTaskType={BLAST:inspectBlastCommand,
BLASTPLUS:inspectBlastCommand,
DCMB:inspectDCMBCommand,
Expand Down
16 changes: 16 additions & 0 deletions test_data/bats/hmmer.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bats

# Runs an hmmsearch command through batch_launcher.py and checks that the
# resulting --tblout table matches the stored reference table.
@test "Testing HMM db split" {
# Everything after "--" is the wrapped hmmsearch command; its --tblout output
# goes to a hidden .tst.* scratch file under test_data/.
# NOTE(review): "-X local" and "-N 5" presumably select the local queue and
# five fragments -- confirm against the batch_launcher.py option definitions.
run ./batch_launcher.py -X local -N 5 -- hmmsearch --tblout test_data/.tst.genes.v.pfam.tbl --cpu 2 test_data/pfams.100.hmm test_data/genes.faa
# The launcher itself must exit successfully.
[ "$status" = 0 ]
# Compare produced and reference tables with lines starting with "#" removed
# from both sides (presumably because those comment lines vary between runs).
run diff <(grep -v "^#" test_data/.tst.genes.v.pfam.tbl) <(grep -v "^#" test_data/genes.v.pfam.tbl)
# diff exits 0 only when the filtered tables are identical.
[ "$status" = 0 ]
}

# Same check as the plain HMM db split test, but with the extra -K flag passed
# to batch_launcher.py.
@test "Testing HMM db split with keep" {
# NOTE(review): -K is presumably the "keep" option named in the test title
# (retain fragments/temporary files) -- confirm against batch_launcher.py.
# Writes to the same .tst.* scratch table as the test without -K.
run ./batch_launcher.py -X local -N 5 -K -- hmmsearch --tblout test_data/.tst.genes.v.pfam.tbl --cpu 2 test_data/pfams.100.hmm test_data/genes.faa
# The launcher itself must exit successfully.
[ "$status" = 0 ]
# Compare produced and reference tables with lines starting with "#" removed
# from both sides.
run diff <(grep -v "^#" test_data/.tst.genes.v.pfam.tbl) <(grep -v "^#" test_data/genes.v.pfam.tbl)
# diff exits 0 only when the filtered tables are identical.
[ "$status" = 0 ]
}

Loading

0 comments on commit 422db89

Please sign in to comment.