Skip to content

Commit

Permalink
refactor nsqd storage engine
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Jul 28, 2019
1 parent 1db3903 commit 29bd197
Show file tree
Hide file tree
Showing 8 changed files with 567 additions and 27 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ require (
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/judwhite/go-svc v1.0.0
github.com/julienschmidt/httprouter v1.2.0
github.com/kr/pretty v0.1.0 // indirect
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6
github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839
github.com/nsqio/go-nsq v1.0.7
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.2.2 // indirect
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect
github.com/vmihailenco/msgpack v4.0.4+incompatible
google.golang.org/appengine v1.6.1 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
30 changes: 28 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDf
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw=
github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE=
github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 h1:frRvTmIp7QT1RPaphBvr6zvEHfvdOX7jMO7rvicCH9Q=
github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 h1:nZ0z0haJRzCXAWH9Jl+BUnfD2n2MCSbGRSl8VBX+zR0=
Expand All @@ -26,5 +33,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2 changes: 1 addition & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
ctx.nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength,
ctx.nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout,
dqLogf,
Expand Down
2 changes: 1 addition & 1 deletion nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
}

msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
msg.AbsTs = time.Now().Add(deferred).UnixNano()
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
Expand Down
131 changes: 114 additions & 17 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,49 @@ package nsqd
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"time"

"github.com/vmihailenco/msgpack"
)

var (
// First 4 bytes picked from hex representation of a day before epoch timestamp which should never exist in normal timestamp.
// python3 -c 'import struct; import datetime; print(struct.pack(">Q", int((datetime.datetime(1990, 1, 1).timestamp() - 60*60*24) * 10**9)))'
msgMagic = []byte{0x08, 0xc1, 0xe4, 0xa0}

metaKey = []byte("meta")
bodyKey = []byte("body")
metaLengthPlaceholder = []byte("01") // 2 bytes
bodyLengthPlaceholder = []byte("01234567") // 8 bytes. Because `MaxMsgSize` is in int64 type.

// No const or reference directly are mainly used for unit test.
maxMetaLen uint16 = math.MaxUint16
)

const (
MsgIDLength = 16
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
minValidMsgLength = 4 + 4 + 2 + 4 + 8 // msgMagic + metaKey + metaLen + bodyKey + bodyLen
maxValidMsgLength = minValidMsgLength + math.MaxUint16 // minValidMsgLength + maxMetaLength
)

type MessageID [MsgIDLength]byte

type Message struct {
ID MessageID
Body []byte
Timestamp int64
Attempts uint16
ID MessageID `msgpack:"message_id"`
Body []byte `msgpack:"-"`
Timestamp int64 `msgpack:"timestamp"`
Attempts uint16 `msgpack:"attempts"`
AbsTs int64 `msgpack:"abs_ts"`

// for in-flight handling
deliveryTS time.Time
clientID int64
pri int64
index int
deferred time.Duration
}

func NewMessage(id MessageID, body []byte) *Message {
Expand All @@ -40,21 +59,70 @@ func NewMessage(id MessageID, body []byte) *Message {
func (m *Message) WriteTo(w io.Writer) (int64, error) {
var buf [10]byte
var total int64

binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))

n, err := w.Write(buf[:])
total += int64(n)
if err != nil {
return total, err
}

n, err = w.Write(m.ID[:])
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.Body)
total += int64(n)
if err != nil {
return total, err
}
return total, nil
}

func (m *Message) WriteToBackend(w io.Writer) (int64, error) {
var total int64

// magic bytes
n, err := w.Write(msgMagic)
total += int64(n)
if err != nil {
return total, err
}

// meta bytes
meta, err := msgpack.Marshal(m)
if err != nil {
return total, err
}

if len(meta) > int(maxMetaLen) {
return total, errors.New("marshaled meta data length exceeds max meta length")
}

var metaPrefix = append(metaKey, metaLengthPlaceholder...)
binary.BigEndian.PutUint16(metaPrefix[4:4+len(metaLengthPlaceholder)], uint16(len(meta)))

n, err = w.Write(metaPrefix[:])
total += int64(n)
if err != nil {
return total, err
}

n, err = w.Write(meta)
total += int64(n)
if err != nil {
return total, err
}

// msg body
bodyPrefix := append(bodyKey, bodyLengthPlaceholder...)
binary.BigEndian.PutUint64(bodyPrefix[4:4+len(bodyLengthPlaceholder)], uint64(len(m.Body)))

n, err = w.Write(bodyPrefix[:])
total += int64(n)
if err != nil {
return total, err
}

n, err = w.Write(m.Body)
total += int64(n)
Expand All @@ -67,6 +135,8 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {

// decodeMessage deserializes data (as []byte) and creates a new Message
// message format:
//
// Old message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// | (int64) || || (hex string encoded in ASCII) || (binary)
// | 8-byte || || 16-byte || N-byte
Expand All @@ -75,24 +145,51 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
// (uint16)
// 2-byte
// attempts
//
// New message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// | ([]byte)|| (metaKey+metaLen+meta) || (bodyKey+bodyLen+body)
// | 4-byte || (4+2+N)-byte || (4+8+N)-byte
// ------------------------------------------------------------------------------------------...
// message magic message meta message body
//
func decodeMessage(b []byte) (*Message, error) {
var msg Message

if len(b) < minValidMsgLength {
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
prefixBytes := b[:len(msgMagic)]
if bytes.Equal(prefixBytes, msgMagic) {
// New message format
metaStartIndex := len(msgMagic)
if !bytes.Equal(b[metaStartIndex:metaStartIndex+len(metaKey)], metaKey) {
return nil, fmt.Errorf("bad msg format. \"meta\" key should be after msg magic")
}

metaSize := binary.BigEndian.Uint16(b[metaStartIndex+len(metaKey) : metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)])
err := msgpack.Unmarshal(b[metaStartIndex+len(metaKey)+len(metaLengthPlaceholder):metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)+int(metaSize)], &msg)
if err != nil {
return nil, err
}

bodyStartIndex := metaStartIndex + len(bodyKey) + len(metaLengthPlaceholder) + int(metaSize)
if !bytes.Equal(b[bodyStartIndex:bodyStartIndex+len(bodyKey)], bodyKey) {
return nil, fmt.Errorf("bad msg format. \"body\" key should be after meta content")
}
bodySize := binary.BigEndian.Uint64(b[bodyStartIndex+len(bodyKey) : bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder)])
msg.Body = b[bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder) : uint64(bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder))+bodySize]
} else {
// Old message format
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]
}

msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]

return &msg, nil
}

func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
buf.Reset()
_, err := msg.WriteTo(buf)
_, err := msg.WriteToBackend(buf)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 29bd197

Please sign in to comment.