split bam into chunks
ygidtu committed Jul 14, 2020
1 parent ed3f80d commit f694eb6
Showing 3 changed files with 223 additions and 86 deletions.
13 changes: 12 additions & 1 deletion const.go
@@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/biogo/hts/bgzf"
"regexp"
"strconv"
"strings"
@@ -19,7 +20,7 @@ var conf config
var region *Region

const (
VERSION = "0.01"
VERSION = "0.0.2"
DefaultBaseQuality = 30
)

@@ -123,6 +124,16 @@ func decodeRegion(region string) *Region {
return res
}


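// ChanChunk is one unit of work handed to the workers: a BGZF chunk of the BAM file together with the reference name and coordinates it covers.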
type ChanChunk struct {
Ref string
Start int
End int
Chunks bgzf.Chunk
}

type Omopolymeric struct {
Chromosome string
NegEquals int
175 changes: 132 additions & 43 deletions load.go
@@ -3,22 +3,24 @@ package main
import (
"bufio"
"compress/gzip"
"io"
"math"
"os"
"regexp"
"strconv"
"strings"

"github.com/biogo/biogo/alphabet"
"github.com/biogo/biogo/io/seqio"
"github.com/biogo/biogo/io/seqio/fasta"
"github.com/biogo/biogo/seq/linear"
"github.com/biogo/hts/bam"
"github.com/biogo/hts/bgzf"
"github.com/biogo/hts/fai"
"github.com/biogo/hts/sam"
"github.com/cheggaaa/pb/v3"
"github.com/golang-collections/collections/set"
"github.com/pkg/errors"
"io"
"math"
"os"
"regexp"
"strconv"
"strings"
)

// loadBedFile is a function that loads BED files
@@ -368,8 +370,76 @@ func loadSplicingPositions() (map[string]*set.Set, error) {
return res, nil
}

func fetchBamRefs() ([]string, error) {
res := make([]string, 0, 0)
// splitRegion is used to split a region of the given length into evenly sized chunks, returning the breakpoints
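// e.g. splitRegion(100, 4) returns [0, 25, 50, 75, 100]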
func splitRegion(end, chunks int) []int {
regions := []int{0}

bk := end / maxInt(chunks, 1)

for i := 0; i < chunks; i++ {
if i == chunks-1 {
regions = append(regions, end)
} else {
regions = append(regions, (i+1)*bk)
}
}
return regions
}

// adjustRegion is used to move each internal breakpoint to the end of the first read it overlaps, so reads are less likely to be split between chunks; the adjusted regions are then converted into BGZF chunks
func adjustRegion(regions []int, idx *bam.Index, ref *sam.Reference, bamReader *bam.Reader) ([]bgzf.Chunk, error) {

res := make([]bgzf.Chunk, 0, 0)
for i := 0; i < len(regions); i++ {
if i > 0 && i < len(regions)-1 {
chunks, err := idx.Chunks(ref, regions[i], regions[i+1])
if err != nil {
sugar.Debugf("%v", regions)
// return res, errors.Wrapf(err, "failed before adjust %s: %d - %d", ref.Name(), region[i], region[i+1])
continue
}

if len(chunks) == 0 {
continue
}

iter, err := bam.NewIterator(bamReader, chunks)
if err != nil {
return res, err
}

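// scan the reads at this breakpoint: remember where the first read ends and stop at the first read that starts beyond it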
lastEnd := 0
for iter.Next() {
record := iter.Record()

if lastEnd == 0 {
lastEnd = record.End()
} else if record.Start() > lastEnd {
break
}
}

if lastEnd > 0 {
regions[i] = lastEnd
}
}
}

for i := 1; i < len(regions); i++ {
chunks, err := idx.Chunks(ref, regions[i-1], regions[i])
if err != nil {
continue
// return res, errors.Wrapf(err, "failed after adjust %s: %d - %d", ref.Name(), region[i-1], region[i])
}
res = append(res, chunks...)
}

return res, nil
}

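// fetchBamRefs builds, for every reference in the BAM header, the list of ChanChunk work units to process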
func fetchBamRefs() (map[string][]*ChanChunk, error) {
res := make(map[string][]*ChanChunk)
ifh, err := os.Open(conf.File)
// bail out if the BAM file could not be opened:
if err != nil {
@@ -383,9 +453,60 @@ func fetchBamRefs() ([]string, error) {
return res, err
}

idxF, err := os.Open(conf.File + ".bai")
if err != nil {
return nil, err
}
defer idxF.Close()

idx, err := bam.ReadIndex(idxF)
if err != nil {
return nil, err
}

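// for each reference, read its length from the LN tag, split it into at most conf.Process regions (restricted to the user-supplied region, if any), and convert the regions into BGZF chunks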
for _, ref := range bamReader.Header().Refs() {
res = append(res, ref.Name())
length, err := strconv.Atoi(ref.Get(sam.NewTag("LN")))
if err != nil {
return res, err
}

regions := make([]int, 0, 0)
if region.Empty() {
regions = splitRegion(length, maxInt(conf.Process, 1))
} else if ref.Name() == region.Chrom {
if region.End != 0 {
length = region.End
}

for _, i := range splitRegion(length-region.Start, maxInt(conf.Process, 1)) {
regions = append(regions, region.Start+i)
}
}

//sugar.Infof("%s - %d", ref.Name(), len(regions))

if len(regions) == 0 {
continue
}
sugar.Debugf("make chunks of %s", ref.Name())
temp, err := adjustRegion(regions, idx, ref, bamReader)
if err != nil {
return res, err
}

if len(temp) == 0 {
continue
}

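// wrap each BGZF chunk with its reference name so downstream workers know which reference it belongs to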
tempRes := make([]*ChanChunk, 0, 0)
for _, t := range temp {
tempRes = append(tempRes, &ChanChunk{Ref: ref.Name(), Chunks: t})
}

res[ref.Name()] = tempRes
}

sugar.Debug(res)
return res, nil
}

@@ -427,51 +548,19 @@ func fetchFasta(region *Region) ([]byte, error) {
return res, err
}

func fetchBam(region *Region) (*bam.Iterator, error) {
sugar.Infof("read reads from %s", region.String())
func fetchBam(region bgzf.Chunk) (*bam.Iterator, error) {
ifh, err := os.Open(conf.File)
// bail out if the BAM file could not be opened:
if err != nil {
return nil, err
}

idxF, err := os.Open(conf.File + ".bai")
if err != nil {
return nil, err
}
defer idxF.Close()

idx, err := bam.ReadIndex(idxF)
if err != nil {
return nil, err
}

// Create a new BAM reader with a single decompression worker:
bamReader, err := bam.NewReader(ifh, 1)
if err != nil {
return nil, err
}

chunks := make([]bgzf.Chunk, 0, 0)

for _, ref := range bamReader.Header().Refs() {
if region.Chrom != "" && ref.Name() == region.Chrom {
if region.Start == 0 && region.End == 0 {
if stats, ok := idx.ReferenceStats(ref.ID()); ok {
chunks = append(chunks, stats.Chunk)
}
} else if region.Chrom != "" && region.Start > 0 && region.End > 0 {
if tempChunks, err := idx.Chunks(ref, region.Start, region.End); err != nil {
chunks = append(chunks, tempChunks...)
}
}
} else if region.Chrom == "" {
if stats, ok := idx.ReferenceStats(ref.ID()); ok {
chunks = append(chunks, stats.Chunk)
}
}
}

return bam.NewIterator(bamReader, chunks)
return bam.NewIterator(bamReader, []bgzf.Chunk{region})
}
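
Taken together, fetchBamRefs now returns per-reference ChanChunk work units and fetchBam reads a single BGZF chunk, which is what allows the BAM file to be processed in parallel. Below is a minimal sketch of how the two might be wired to a worker pool; the function processChunks, the worker count, and the per-read work are illustrative assumptions rather than code from this commit, and the sketch assumes the package also imports "sync".

// processChunks is a hypothetical consumer of the new chunk-based API,
// shown only to illustrate the intent of this commit; it is not part of
// the committed code.
func processChunks(workers int) error {
	refs, err := fetchBamRefs()
	if err != nil {
		return err
	}

	tasks := make(chan *ChanChunk)
	var wg sync.WaitGroup

	// start a fixed pool of workers, each reading one BGZF chunk at a time
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks {
				iter, err := fetchBam(task.Chunks)
				if err != nil {
					sugar.Debugf("failed to open chunk on %s: %v", task.Ref, err)
					continue
				}
				for iter.Next() {
					_ = iter.Record() // per-read work would go here
				}
				_ = iter.Close()
			}
		}()
	}

	// feed every chunk of every reference to the pool
	for _, chunks := range refs {
		for _, c := range chunks {
			tasks <- c
		}
	}
	close(tasks)
	wg.Wait()

	return nil
}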
