Skip to content

Commit

Permalink
stream guppy:// responses
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkr committed Sep 20, 2024
1 parent 91321ba commit abaa1f9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 87 deletions.
13 changes: 4 additions & 9 deletions cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ type Config struct {
GopherRequestTimeout time.Duration
LineWidth int

GuppyRequestTimeout time.Duration
MaxGuppySessions int
GuppyChunkTimeout time.Duration
GuppyResponseChunkSize int
MaxSentGuppyChunks int
GuppyRequestTimeout time.Duration
MaxGuppySessions int
GuppyChunkTimeout time.Duration
MaxSentGuppyChunks int

DeliveryBatchSize int
DeliveryRetryInterval int64
Expand Down Expand Up @@ -273,10 +272,6 @@ func (c *Config) FillDefaults() {
c.GuppyChunkTimeout = time.Second * 2
}

if c.GuppyResponseChunkSize <= 0 {
c.GuppyResponseChunkSize = 512
}

if c.MaxSentGuppyChunks <= 0 {
c.MaxSentGuppyChunks = 8
}
Expand Down
161 changes: 83 additions & 78 deletions front/guppy/guppy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ package guppy
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/dimkr/tootik/cfg"
"github.com/dimkr/tootik/front"
"github.com/dimkr/tootik/front/text/guppy"
"io"
"log/slog"
"math"
"math/rand/v2"
"net"
"net/url"
"slices"
"sync"
"time"
)
Expand All @@ -47,6 +46,10 @@ type incomingPacket struct {
From net.Addr
}

type chanWriter struct {
c chan<- []byte
}

type responseChunk struct {
Data []byte
Seq int
Expand All @@ -59,6 +62,11 @@ const (
retryInterval = time.Millisecond * 100
)

func (w chanWriter) Write(p []byte) (int, error) {
w.c <- slices.Clone(p)
return len(p), nil
}

func (gl *Listener) handle(ctx context.Context, from net.Addr, req []byte, acks <-chan []byte, done chan<- string, s net.PacketConn) {
defer func() {
done <- from.String()
Expand All @@ -82,109 +90,108 @@ func (gl *Listener) handle(ctx context.Context, from net.Addr, req []byte, acks

seq := 6 + rand.IntN(math.MaxInt16/2)

var buf bytes.Buffer
w := guppy.Wrap(&buf, seq)
c := make(chan []byte)
w := guppy.Wrap(&chanWriter{c}, seq)

if r.URL.Host != gl.Domain {
w.Status(4, "Wrong host")
} else {
slog.Info("Handling request", "path", r.URL.Path, "from", from)
r.Log = slog.With(slog.Group("request", "path", r.URL.Path))
gl.Handler.Handle(&r, w)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

if ctx.Err() != nil {
slog.Warn("Failed to handle request in time", "path", r.URL.Path, "from", from)
return
}
if r.URL.Host != gl.Domain {
w.Status(4, "Wrong host")
} else {
slog.Info("Handling request", "path", r.URL.Path, "from", from)
r.Log = slog.With(slog.Group("request", "path", r.URL.Path))

gl.Handler.Handle(&r, w)
}

chunk := make([]byte, gl.Config.GuppyResponseChunkSize)
w.Flush()
close(c)
}()

w.Flush()
n, err := buf.Read(chunk)
if err != nil {
slog.Error("Failed to read first respone chunk", "error", err)
defer wg.Wait()

chunk, ok := <-c
if !ok {
slog.Warn("Failed to read first respone chunk", "path", r.URL.Path, "from", from)
return
}

chunks := make([]responseChunk, 1, buf.Len()/gl.Config.GuppyResponseChunkSize+2)
chunks := make([]responseChunk, 1, 2)
chunks[0].Seq = seq
chunks[0].Data = chunk[:n]
chunks[0].Data = chunk

// fix the sequence number if the response is cached
// TODO: something less ugly
space := bytes.IndexByte(chunk[:n], ' ')
space := bytes.IndexByte(chunk, ' ')
if string(chunk[:space]) != "1" && string(chunk[:space]) != "3" && string(chunk[:space]) != "4" {
chunks[0].Data = append([]byte(fmt.Sprintf("%d", seq)), chunk[space:n]...)
}

for {
seq++
statusLine := fmt.Sprintf("%d\r\n", seq)
n, err := buf.Read(chunk)
if err != nil && errors.Is(err, io.EOF) {
// this is the EOF packet
chunks = append(chunks, responseChunk{Data: []byte(statusLine), Seq: seq})
break
} else if err != nil {
slog.Error("Failed to read respone chunk", "error", err)
return
}
chunks = append(chunks, responseChunk{Data: append([]byte(statusLine), chunk[:n]...), Seq: seq})
chunks[0].Data = append([]byte(fmt.Sprintf("%d", seq)), chunk[space:]...)
}

retry := time.NewTicker(retryInterval)
defer retry.Stop()

slog.Debug("Sending response", "path", r.URL.Path, "from", from, "first", chunks[0].Seq, "last", chunks[len(chunks)-1].Seq, "chunks", len(chunks))
slog.Debug("Sending response", "path", r.URL.Path, "from", from, "first", chunks[0].Seq)

firstTime := true
eofReceived := false

for {
if !firstTime {
select {
case <-ctx.Done():
select {
case <-ctx.Done():
slog.Warn("Session timed out", "path", r.URL.Path, "from", from)
return

case ack, ok := <-acks:
if !ok {
slog.Warn("Session timed out", "path", r.URL.Path, "from", from)
return
}

case ack, ok := <-acks:
if !ok {
slog.Warn("Session timed out", "path", r.URL.Path, "from", from)
return
}

var ackedSeq int
n, err := fmt.Sscanf(string(ack), "%d\r\n", &ackedSeq)
if err != nil {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack), "error", err)
continue
}
if n < 1 {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack))
continue
}
var ackedSeq int
n, err := fmt.Sscanf(string(ack), "%d\r\n", &ackedSeq)
if err != nil {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack), "error", err)
continue
}
if n < 1 {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack))
continue
}

i := ackedSeq - chunks[0].Seq
if i < 0 || i >= len(chunks) {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack))
continue
}
i := ackedSeq - chunks[0].Seq
if i < 0 || i >= len(chunks) {
slog.Debug("Received invalid ack", "path", r.URL.Path, "from", from, "ack", string(ack))
continue
}

if chunks[i].Acked {
slog.Debug("Received duplicate ack", "path", r.URL.Path, "from", from, "acked", ackedSeq)
continue
}
if chunks[i].Acked {
slog.Debug("Received duplicate ack", "path", r.URL.Path, "from", from, "acked", ackedSeq)
continue
}

slog.Debug("Marking packet as received", "path", r.URL.Path, "from", from, "acked", ackedSeq)
chunks[i].Acked = true
slog.Debug("Marking packet as received", "path", r.URL.Path, "from", from, "acked", ackedSeq)
chunks[i].Acked = true

// stop if the acked packet is the EOF packet
if i == len(chunks)-1 {
return
}
// stop if the acked packet is the EOF packet
if eofReceived && i == len(chunks)-1 {
return
}

case <-retry.C:
case chunk, ok := <-c:
if !ok && !eofReceived {
seq++
statusLine := fmt.Sprintf("%d\r\n", seq)
chunks = append(chunks, responseChunk{Data: []byte(statusLine), Seq: seq})
eofReceived = true
} else if ok {
seq++
statusLine := fmt.Sprintf("%d\r\n", seq)
chunks = append(chunks, responseChunk{Data: append([]byte(statusLine), chunk...), Seq: seq})
}

case <-retry.C:
}

now := time.Now()
Expand All @@ -205,8 +212,6 @@ func (gl *Listener) handle(ctx context.Context, from net.Addr, req []byte, acks
chunks[i].Sent = now
sent++
}

firstTime = false
}
}

Expand Down

0 comments on commit abaa1f9

Please sign in to comment.