From f694eb63655ac4a48fbb389c07e9147cc800b5bf Mon Sep 17 00:00:00 2001 From: ygidtu Date: Tue, 14 Jul 2020 17:00:42 +0800 Subject: [PATCH] split bam into chunks --- const.go | 13 ++++- load.go | 175 +++++++++++++++++++++++++++++++++++++++++-------------- main.go | 121 +++++++++++++++++++++++++------------- 3 files changed, 223 insertions(+), 86 deletions(-) diff --git a/const.go b/const.go index 9b259f2..1ad178a 100644 --- a/const.go +++ b/const.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/biogo/hts/bgzf" "regexp" "strconv" "strings" @@ -19,7 +20,7 @@ var conf config var region *Region const ( - VERSION = "0.01" + VERSION = "0.0.2" DefaultBaseQuality = 30 ) @@ -123,6 +124,16 @@ func decodeRegion(region string) *Region { return res } + +type ChanChunk struct { + Ref string + Start int + End int + Chunks bgzf.Chunk +} + + + type Omopolymeric struct { Chromosome string NegEquals int diff --git a/load.go b/load.go index ec77c37..d310c72 100644 --- a/load.go +++ b/load.go @@ -3,6 +3,13 @@ package main import ( "bufio" "compress/gzip" + "io" + "math" + "os" + "regexp" + "strconv" + "strings" + "github.com/biogo/biogo/alphabet" "github.com/biogo/biogo/io/seqio" "github.com/biogo/biogo/io/seqio/fasta" @@ -10,15 +17,10 @@ import ( "github.com/biogo/hts/bam" "github.com/biogo/hts/bgzf" "github.com/biogo/hts/fai" + "github.com/biogo/hts/sam" "github.com/cheggaaa/pb/v3" "github.com/golang-collections/collections/set" "github.com/pkg/errors" - "io" - "math" - "os" - "regexp" - "strconv" - "strings" ) // loadBedFile is function that load bed files @@ -368,8 +370,76 @@ func loadSplicingPositions() (map[string]*set.Set, error) { return res, nil } -func fetchBamRefs() ([]string, error) { - res := make([]string, 0, 0) +// splitRegion is used to spit a number of size into chunks +func splitRegion(end, chunks int) []int { + regions := []int{0} + + bk := end / maxInt(chunks, 1) + + for i := 0; i < chunks; i++ { + if i == chunks-1 { + regions = append(regions, end) + } else { + regions = append(regions, (i+1)*bk) + } + } + return regions +} + +// adjustRegion is used to move the end site a little backwards +func adjustRegion(regions []int, idx *bam.Index, ref *sam.Reference, bamReader *bam.Reader) ([]bgzf.Chunk, error) { + + res := make([]bgzf.Chunk, 0, 0) + for i := 0; i < len(regions); i++ { + if i > 0 && i < len(regions)-1 { + chunks, err := idx.Chunks(ref, regions[i], regions[i+1]) + if err != nil { + sugar.Debugf("%v", regions) + // return res, errors.Wrapf(err, "failed before adjust %s: %d - %d", ref.Name(), region[i], region[i+1]) + continue + } + + if len(chunks) == 0 { + continue + } + + + iter, err := bam.NewIterator(bamReader, chunks) + if err != nil { + return res, err + } + + lastEnd := 0 + for iter.Next() { + record := iter.Record() + + if lastEnd == 0 { + lastEnd = record.End() + } else if record.Start() > lastEnd { + break + } + } + + if lastEnd > 0 { + regions[i] = lastEnd + } + } + } + + for i := 1; i < len(regions); i++ { + chunks, err := idx.Chunks(ref, regions[i-1], regions[i]) + if err != nil { + continue + // return res, errors.Wrapf(err, "failed after adjust %s: %d - %d", ref.Name(), region[i-1], region[i]) + } + res = append(res, chunks...) + } + + return res, nil +} + +func fetchBamRefs() (map[string][]*ChanChunk, error) { + res := make(map[string][]*ChanChunk) ifh, err := os.Open(conf.File) //Panic if something went wrong: if err != nil { @@ -383,9 +453,60 @@ func fetchBamRefs() ([]string, error) { return res, err } + idxF, err := os.Open(conf.File + ".bai") + if err != nil { + return nil, err + } + defer idxF.Close() + + idx, err := bam.ReadIndex(idxF) + if err != nil { + return nil, err + } + for _, ref := range bamReader.Header().Refs() { - res = append(res, ref.Name()) + length, err := strconv.Atoi(ref.Get(sam.NewTag("LN"))) + if err != nil { + return res, err + } + + regions := make([]int, 0, 0) + if region.Empty() { + regions = splitRegion(length, maxInt(conf.Process, 1)) + } else if ref.Name() == region.Chrom { + if region.End != 0 { + length = region.End + } + + for _, i := range splitRegion(length-region.Start, maxInt(conf.Process, 1)) { + regions = append(regions, region.Start+i) + } + } + + //sugar.Infof("%s - %d", ref.Name(), len(regions)) + + if len(regions) == 0 { + continue + } + sugar.Debugf("make chunks of %s", ref.Name()) + temp, err := adjustRegion(regions, idx, ref, bamReader) + if err != nil { + return res, err + } + + if len(temp) == 0 { + continue + } + + tempRes := make([]*ChanChunk, 0, 0) + for _, t := range temp { + tempRes = append(tempRes, &ChanChunk{Ref: ref.Name(), Chunks: t}) + } + + res[ref.Name()] = tempRes } + + sugar.Debug(res) return res, nil } @@ -427,25 +548,13 @@ func fetchFasta(region *Region) ([]byte, error) { return res, err } -func fetchBam(region *Region) (*bam.Iterator, error) { - sugar.Infof("read reads from %s", region.String()) +func fetchBam(region bgzf.Chunk) (*bam.Iterator, error) { ifh, err := os.Open(conf.File) //Panic if something went wrong: if err != nil { return nil, err } - idxF, err := os.Open(conf.File + ".bai") - if err != nil { - return nil, err - } - defer idxF.Close() - - idx, err := bam.ReadIndex(idxF) - if err != nil { - return nil, err - } - //Create a new BAM reader with maximum //concurrency: bamReader, err := bam.NewReader(ifh, 1) @@ -453,25 +562,5 @@ func fetchBam(region *Region) (*bam.Iterator, error) { return nil, err } - chunks := make([]bgzf.Chunk, 0, 0) - - for _, ref := range bamReader.Header().Refs() { - if region.Chrom != "" && ref.Name() == region.Chrom { - if region.Start == 0 && region.End == 0 { - if stats, ok := idx.ReferenceStats(ref.ID()); ok { - chunks = append(chunks, stats.Chunk) - } - } else if region.Chrom != "" && region.Start > 0 && region.End > 0 { - if tempChunks, err := idx.Chunks(ref, region.Start, region.End); err != nil { - chunks = append(chunks, tempChunks...) - } - } - } else if region.Chrom == "" { - if stats, ok := idx.ReferenceStats(ref.ID()); ok { - chunks = append(chunks, stats.Chunk) - } - } - } - - return bam.NewIterator(bamReader, chunks) + return bam.NewIterator(bamReader, []bgzf.Chunk{region}) } diff --git a/main.go b/main.go index b7f0a90..ca9a483 100644 --- a/main.go +++ b/main.go @@ -24,27 +24,17 @@ func writer(w chan string, wg *sync.WaitGroup) { f, err := os.OpenFile(conf.Output, mode, 0644) if err != nil { sugar.Fatalf("failed to open %s: %s", conf.Output, err.Error()) + os.Exit(1) } - defer f.Close() - var gwriter *gzip.Writer - var writer *bufio.Writer + writer := bufio.NewWriter(f) if strings.HasSuffix(conf.Output, "gz") { gwriter = gzip.NewWriter(f) - defer gwriter.Flush() - defer gwriter.Close() - } else { - writer = bufio.NewWriter(f) - defer writer.Flush() + writer = bufio.NewWriter(gwriter) } - if !conf.RemoveHeader { - if gwriter != nil { - _, err = gwriter.Write([]byte(strings.Join(getHeader(), "\t") + "\n")) - } else { - _, err = writer.WriteString(strings.Join(getHeader(), "\t") + "\n") - } + _, err = writer.WriteString(strings.Join(getHeader(), "\t") + "\n") if err != nil { sugar.Fatal(err) @@ -71,19 +61,38 @@ func writer(w chan string, wg *sync.WaitGroup) { } } - if gwriter == nil { - _, _ = writer.WriteString(line + "\n") - } else { - gwriter.Write([]byte(line + "\n")) + if _, err := writer.WriteString(line + "\n"); err != nil { + sugar.Fatal(err) + } + } + + if err := writer.Flush(); err != nil { + sugar.Fatal(err) + } + + if gwriter != nil { + if err := gwriter.Flush(); err != nil { + sugar.Fatal(err) + } + if err := gwriter.Close(); err != nil { + sugar.Fatal(err) } + } + if err := writer.Flush(); err != nil { + sugar.Fatal(err) } - _ = f.Sync() + if err := f.Close(); err != nil { + sugar.Fatal(err) + } wg.Done() } -func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPositions, splicePositions, targetPositions map[string]*set.Set) { +func worker( + wg *sync.WaitGroup, refs chan *ChanChunk, w chan string, + omopolymericPositions, splicePositions, targetPositions map[string]*set.Set, + chrRefs map[string][]byte) { defer wg.Done() for { @@ -92,20 +101,13 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo break } - chrRef, err := fetchFasta(ref) - if err != nil { - sugar.Warnf("try to modify %s", ref.Chrom) - if strings.HasPrefix(ref.Chrom, "chr") { - chrRef, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref.Chrom, "chr", "")}) - } else { - chrRef, err = fetchFasta(&Region{Chrom: "chr" + ref.Chrom}) - } - } - if err != nil { - sugar.Fatal(err) + chrRef, ok := chrRefs[ref.Ref] + if ! ok { + sugar.Errorf("failed to get %s from reference", ref.Ref) + continue } - iter, err := fetchBam(ref) + iter, err := fetchBam(ref.Chunks) if err != nil { sugar.Fatal(err) } @@ -142,7 +144,7 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo genomic := start + record.Start if _, ok := edits[genomic]; !ok { - edits[genomic] = NewEditsInfo(ref.Chrom, chrRef[genomic-1], genomic) + edits[genomic] = NewEditsInfo(ref.Ref, chrRef[genomic-1], genomic) } edits[genomic].AddReads(record, at) @@ -163,13 +165,12 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo } getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) - - sugar.Debugf("read %d reads from %s", total, ref) } w <- "done" } + func main() { conf = defaultConfig() goptions.ParseAndFail(&conf) @@ -222,26 +223,62 @@ func main() { sugar.Infof("Narrowing REDItools to region %s", region.String()) var wg sync.WaitGroup + var lock sync.Mutex w := make(chan string) + refs := make(chan *ChanChunk) + + references, err := fetchBamRefs() + if err != nil { + sugar.Fatal(err) + } + + sugar.Infof("load reference from %s", conf.Reference) + chrRefs := make(map[string][]byte) + for ref, _ := range references { + wg.Add(1) + go func(ref string, wg *sync.WaitGroup, lock *sync.Mutex) { + defer wg.Done() + temp, err := fetchFasta(&Region{Chrom: ref}) + if err != nil { + sugar.Warnf("try to modify %s", ref) + if strings.HasPrefix(ref, "chr") { + temp, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref, "chr", "")}) + } else { + temp, err = fetchFasta(&Region{Chrom: "chr" + ref}) + } + } + if err != nil { + sugar.Fatal(err) + } + lock.Lock() + chrRefs[ref] = temp + lock.Unlock() + } (ref, &wg, &lock) + } + + wg.Wait() + wg.Add(1) go writer(w, &wg) - refs := make(chan *Region) for i := 0; i < conf.Process; i++ { - go worker(&wg, refs, w, omopolymericPositions, spicePositions, targetPositions) + go worker(&wg, refs, w, omopolymericPositions, spicePositions, targetPositions, chrRefs) wg.Add(1) } - if region.Empty() { - if references, err := fetchBamRefs(); err == nil { - for _, r := range references { - refs <- &Region{Chrom: r} + if references, err := fetchBamRefs(); err == nil { + for ref, chunks := range references { + sugar.Infof("read reads from %s", ref) + for _, c := range chunks { + sugar.Debug(c) + refs <- c } } } else { - refs <- region + sugar.Error(err) } + close(refs) wg.Wait()