Skip to content

Commit

Permalink
record indexer: made parser more robust and improved error handling /…
Browse files Browse the repository at this point in the history
… messages
  • Loading branch information
teharrison committed Apr 30, 2018
1 parent b5b08d2 commit 900efe8
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 107 deletions.
91 changes: 65 additions & 26 deletions shock-server/node/file/format/fasta/fasta.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,50 @@ func (self *Reader) Read() (sequence *seq.Seq, err error) {
if self.r == nil {
self.r = bufio.NewReader(self.f)
}
var label, body []byte
var prev, read, label, body []byte
var eof bool
for {
read, err := self.r.ReadBytes('>')
if len(read) > 1 {
lines := bytes.Split(read, []byte{'\n'})
if len(lines) > 1 {
label = lines[0]
body = bytes.Join(lines[1:len(lines)-1], []byte{})
read, err = self.r.ReadBytes('>')
// non eof error
if err != nil {
if err == io.EOF {
eof = true
} else {
return
}
break
} else if err != nil {
return nil, io.EOF
}
if len(prev) > 0 {
read = append(prev, read...)
}
// only have '>'
if len(read) == 1 {
if eof {
break
} else {
continue
}
}
// found an embedded '>'
if !bytes.Contains(read, []byte{'\n'}) {
prev = read
continue
}
// process lines
read = bytes.TrimSpace(bytes.TrimRight(read, ">"))
lines := bytes.Split(read, []byte{'\n'})
if len(lines) > 1 {
label = lines[0]
body = bytes.Join(lines[1:], []byte{})
}
break
}
if len(label) > 0 && len(body) > 0 {
sequence = seq.New(label, body, nil)
} else {
return nil, errors.New("Invalid fasta entry")
err = errors.New("Invalid fasta entry")
}
if eof {
err = io.EOF
}
return
}
Expand All @@ -67,27 +93,40 @@ func (self *Reader) GetReadOffset() (n int, err error) {
self.r = bufio.NewReader(self.f)
}
n = 0
var read []byte
var eof bool
for {
read, er := self.r.ReadBytes('>')
if len(read) > 1 {
if er == io.EOF {
n += len(read)
} else if read[len(read)-2] != '\n' {
read, err = self.r.ReadBytes('>')
// non eof error
if err != nil {
if err == io.EOF {
eof = true
} else {
return
}
}
// handle embedded '>'
if (len(read) > 1) && bytes.Contains(read, []byte{'\n'}) {
// check for sequence
lines := bytes.Split(bytes.TrimSpace(bytes.TrimRight(read, ">")), []byte{'\n'})
seq := bytes.Join(lines[1:], []byte{})
if len(seq) == 0 {
err = errors.New("Invalid fasta entry")
return
}
if eof {
n += len(read)
continue
err = io.EOF
} else {
n += len(read) - 1
}
if read[len(read)-1] == '>' {
if unread_err := self.r.UnreadByte(); unread_err != nil {
err = unread_err
}
err = self.r.UnreadByte()
}
break
} else if len(read) == 1 {
n += 1
} else if er != nil {
err = er
} else {
n += len(read)
}
if eof {
err = io.EOF
break
}
}
Expand Down
217 changes: 140 additions & 77 deletions shock-server/node/file/format/fastq/fastq.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,56 +44,85 @@ func NewReaderName(name string) (r seq.ReadRewinder, err error) {
}

// Read a single sequence and return it or an error.
// TODO: Does not read interleaved fastq.
func (self *Reader) Read() (sequence *seq.Seq, err error) {
if self.r == nil {
self.r = bufio.NewReader(self.f)
}
var line, label, seqBody, qualBody []byte
sequence = &seq.Seq{}
var seqId, seqBody, qualId, qualBody []byte

inQual := false
READ:
// skip empty lines only at eof
empty := false
for {
if line, err = self.r.ReadBytes('\n'); err == nil {
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
line = bytes.TrimSpace(line)
if len(line) == 0 {
continue
}
switch {
case !inQual && line[0] == '@':
label = line[1:]
case !inQual && line[0] == '+':
if len(label) == 0 {
return nil, errors.New("No ID line parsed at +line in fastq format")
}
if len(line) > 1 && bytes.Compare(label, line[1:]) != 0 {
return nil, errors.New("Quality ID does not match sequence ID")
}
inQual = true
case !inQual:
line = bytes.Join(bytes.Fields(line), nil)
seqBody = append(seqBody, line...)
case inQual:
line = bytes.Join(bytes.Fields(line), nil)
qualBody = append(qualBody, line...)
if len(qualBody) >= len(seqBody) {
break READ
}
}
} else {
return
seqId, err = self.r.ReadBytes('\n')
if err != nil {
break
}
if len(seqId) > 1 {
break
}
empty = true
}

if empty {
err = errors.New("Invalid format: empty line(s) between records")
return
} else if (err == io.EOF) && (len(seqId) > 1) {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
} else if !bytes.HasPrefix(seqId, []byte{'@'}) {
err = errors.New("Invalid format: id line does not start with @")
return
}
seqId = bytes.TrimSpace(seqId[1:])
if len(seqId) == 0 {
err = errors.New("Invalid format: missing sequence ID")
return
}

if len(seqBody) != len(qualBody) {
return nil, errors.New("Quality length does not match sequence length")
seqBody, err = self.r.ReadBytes('\n')
if err == io.EOF {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
}
seqBody = bytes.TrimSpace(seqBody)
if len(seqBody) == 0 {
err = errors.New("Invalid format: empty sequence")
return
}
sequence = seq.New(label, seqBody, qualBody)

qualId, err = self.r.ReadBytes('\n')
if err == io.EOF {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
} else if !bytes.HasPrefix(qualId, []byte{'+'}) {
err = errors.New("Invalid format: plus line does not start with +")
return
}
qualId = bytes.TrimSpace(qualId)
if (len(qualId) > 1) && (bytes.Compare(seqId, qualId[1:]) != 0) {
err = errors.New("Invalid format: quality ID does not match sequence ID")
return
}

qualBody, err = self.r.ReadBytes('\n')
if (err == io.EOF) && (len(seqBody) != len(qualBody)) {
err = errors.New("Invalid format: length of sequence and quality lines do not match")
return
} else if err != nil {
return
} else if len(seqBody) != len(qualBody)-1 {
err = errors.New("Invalid format: length of sequence and quality lines do not match")
return
}
qualBody = bytes.TrimSpace(qualBody)

sequence = seq.New(seqId, seqBody, qualBody)
return
}

Expand All @@ -102,47 +131,81 @@ func (self *Reader) GetReadOffset() (n int, err error) {
if self.r == nil {
self.r = bufio.NewReader(self.f)
}
var seqId, seqBody, qualId, qualBody []byte
curr := 0
id, err := self.r.ReadBytes('\n')
if err != nil {
return 0, err
} else if !bytes.HasPrefix(id, []byte{'@'}) {
return 0, errors.New("Invalid format: id line does not start with @")
}
curr += len(id)

seq, err := self.r.ReadBytes('\n')
if err != nil {
return 0, err
}
curr += len(seq)

plus, err := self.r.ReadBytes('\n')
if err != nil {
return 0, err
} else if !bytes.HasPrefix(plus, []byte{'+'}) {
return 0, errors.New("Invalid format: plus line does not start with +")
}
curr += len(plus)

qual, err := self.r.ReadBytes('\n')
if len(qual) > 1 {
if err == io.EOF {
if len(seq)-1 != len(qual) {
return 0, errors.New("Invalid format: length of sequence and quality lines do not match")
}
n = curr + len(qual)
return n, nil
} else if err != nil {
return 0, err
} else if len(seq) != len(qual) {
return 0, errors.New("Invalid format: length of sequence and quality lines do not match")
} else {
n = curr + len(qual)
return

// skip empty lines only at eof
empty := false
for {
seqId, err = self.r.ReadBytes('\n')
if err != nil {
break
}
if len(seqId) > 1 {
break
}
empty = true
}

if empty {
err = errors.New("Invalid format: empty line(s) between records")
return
} else if (err == io.EOF) && (len(seqId) > 1) {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
} else if !bytes.HasPrefix(seqId, []byte{'@'}) {
err = errors.New("Invalid format: id line does not start with @")
return
} else if len(seqId) == 2 {
err = errors.New("Invalid format: missing sequence ID")
return
}
return 0, err
curr += len(seqId)

seqBody, err = self.r.ReadBytes('\n')
if err == io.EOF {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
} else if len(seqBody) == 1 {
err = errors.New("Invalid format: empty sequence")
return
}
curr += len(seqBody)

qualId, err = self.r.ReadBytes('\n')
if err == io.EOF {
err = errors.New("Invalid format: truncated fastq record")
return
} else if err != nil {
return
} else if !bytes.HasPrefix(qualId, []byte{'+'}) {
err = errors.New("Invalid format: plus line does not start with +")
return
}
qualIdTrim := bytes.TrimSpace(qualId)
if (len(qualIdTrim) > 1) && (bytes.Compare(bytes.TrimSpace(seqId[1:]), qualIdTrim[1:]) != 0) {
err = errors.New("Invalid format: quality ID does not match sequence ID")
return
}
curr += len(qualId)

qualBody, err = self.r.ReadBytes('\n')
if (err == io.EOF) && (len(seqBody)-1 != len(qualBody)) {
err = errors.New("Invalid format: length of sequence and quality lines do not match")
return
} else if err != nil {
return
} else if len(seqBody) != len(qualBody) {
err = errors.New("Invalid format: length of sequence and quality lines do not match")
return
}

n = curr + len(qualBody)
return
}

// seek sequences which add up to a size close to the configured chunk size (conf.CHUNK_SIZE, e.g. 1M)
Expand Down
Loading

0 comments on commit 900efe8

Please sign in to comment.