From d4a0c093c13541bd32b9c22b68221c89baa589ff Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Wed, 8 Mar 2023 12:08:29 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=8F=90=E4=BA=A4=20conn=20clone=20?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/conn-clone/block.go | 48 ++++++++++++ utils/conn-clone/conn.go | 137 ++++++++++++++++++++++++++++++++++ utils/conn-clone/conn_test.go | 117 +++++++++++++++++++++++++++++ 3 files changed, 302 insertions(+) create mode 100644 utils/conn-clone/block.go create mode 100644 utils/conn-clone/conn.go create mode 100644 utils/conn-clone/conn_test.go diff --git a/utils/conn-clone/block.go b/utils/conn-clone/block.go new file mode 100644 index 00000000..1c167370 --- /dev/null +++ b/utils/conn-clone/block.go @@ -0,0 +1,48 @@ +package conn_clone + +import ( + "io" + "sync" +) + +const BuffSize = 4096 + +var ( + poolBuff = sync.Pool{ + New: func() any { + return &block{ + buf: make([]byte, BuffSize), + } + }, + } +) + +type block struct { + buf []byte + n int + err error +} + +func readBlock(r io.Reader) *block { + b := acquireBlock() + b.n, b.err = r.Read(b.buf) + return b +} +func (b *block) Release() { + b.n = 0 + b.err = nil + poolBuff.Put(b) +} +func (b *block) Clone() *block { + c := acquireBlock() + copy(b.buf, c.buf) + c.n, c.err = b.n, b.err + return c +} +func (b *block) Data() ([]byte, int, error) { + + return b.buf[:b.n], b.n, b.err +} +func acquireBlock() *block { + return poolBuff.Get().(*block) +} diff --git a/utils/conn-clone/conn.go b/utils/conn-clone/conn.go new file mode 100644 index 00000000..193bb0ba --- /dev/null +++ b/utils/conn-clone/conn.go @@ -0,0 +1,137 @@ +package conn_clone + +import ( + "container/list" + "io" + "net" + "sync" + "time" +) + +type ConnPip struct { + net.Conn + reader io.Reader +} + +func (c *ConnPip) Read(b []byte) (n int, err error) { + return c.reader.Read(b) +} + +type ConnReader struct { + conn net.Conn + reader io.Reader +} + +func (c *ConnReader) Read(b []byte) (n int, err error) { + return c.reader.Read(b) +} + +func (c *ConnReader) Write(b []byte) (n int, err error) { + return len(b), nil +} + +func (c *ConnReader) Close() error { + return nil +} + +func (c *ConnReader) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *ConnReader) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +func (c *ConnReader) SetDeadline(t time.Time) error { + return nil +} + +func (c *ConnReader) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *ConnReader) SetWriteDeadline(t time.Time) error { + return nil +} + +func Clone(conn net.Conn) (rw net.Conn, r net.Conn) { + pipeRw, writerRw := io.Pipe() + pipeR, writerR := io.Pipe() + + go copyTo(conn, writerRw, writerR) + return &ConnPip{ + Conn: conn, + reader: pipeRw, + }, &ConnReader{ + conn: conn, + reader: pipeR, + } +} + +type blockChan struct { + ls *list.List + lock sync.Mutex + w *io.PipeWriter + isRun bool +} + +func newBlockChan(w *io.PipeWriter) *blockChan { + bc := &blockChan{w: w, ls: list.New()} + go rc(bc) + return bc +} +func rc(bc *blockChan) { + + for { + bc.lock.Lock() + + e := bc.ls.Front() + if e == nil { + bc.isRun = false + bc.lock.Unlock() + return + } + d := bc.ls.Remove(e).(*block) + bc.lock.Unlock() + data, n, err := d.Data() + + for i := 0; i < n; { + nw, _ := bc.w.Write(data[i:]) + i += nw + } + d.Release() + if err != nil { + bc.w.CloseWithError(err) + return + } + + } + +} +func (b *blockChan) write(d *block) { + b.lock.Lock() + + b.ls.PushBack(d) + if !b.isRun { + b.isRun = true + go rc(b) + } + b.lock.Unlock() + +} + +func copyTo(conn net.Conn, ws ...*io.PipeWriter) { + + cs := make([]*blockChan, 0, len(ws)) + for _, w := range ws { + cs = append(cs, newBlockChan(w)) + } + for { + + bc := readBlock(conn) + for _, c := range cs { + c.write(bc.Clone()) + } + bc.Release() + } +} diff --git a/utils/conn-clone/conn_test.go b/utils/conn-clone/conn_test.go new file mode 100644 index 00000000..d75ab4ea --- /dev/null +++ b/utils/conn-clone/conn_test.go @@ -0,0 +1,117 @@ +package conn_clone + +import ( + "bufio" + "bytes" + "fmt" + "math/rand" + "net" + "sync" + "testing" + "time" +) + +func TestClone(t *testing.T) { + + wg := sync.WaitGroup{} + + listen, err := net.Listen("tcp", ":9988") + if err != nil { + panic(err) + } + go func() { + + index := 0 + for { + index++ + accept, err := listen.Accept() + if err != nil { + + return + } + wg.Add(1) + + if index%2 == 0 { + go func() { + read(accept, "org..", t) + wg.Done() + }() + } else { + + go func() { + rw, r := Clone(accept) + go read(r, "clone", t) + read(rw, "main.", t) + + wg.Done() + }() + + } + } + }() + write("cle 1", t) + write("cle 2", t) + wg.Wait() + listen.Close() +} +func write(name string, testing *testing.T) { + buf := bytes.Buffer{} + for i := 0; i < 4096; i++ { + buf.WriteString(fmt.Sprint(rand.Int(), ",")) + } + + conn, err := net.Dial("tcp", "127.0.0.1:9988") + if err != nil { + + return + } + + go func() { + data := buf.Bytes() + tc := time.NewTicker(time.Second) + te := time.NewTimer(time.Second * 5) + total := int64(0) + last := int64(0) + for { + select { + case t := <-tc.C: + testing.Logf("\t[%s] %s\twrite %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024) + last = total + case <-te.C: + conn.Close() + return + default: + n, err := conn.Write(data) + if err != nil { + return + } + total += int64(n) + } + + } + }() + +} +func read(conn net.Conn, name string, testing *testing.T) { + + rb := bufio.NewReader(conn) + buf := make([]byte, 1024) + total := int64(0) + last := int64(0) + tc := time.NewTicker(time.Second) + defer tc.Stop() + for { + + select { + case t := <-tc.C: + testing.Logf("\t[%s] %s\tread %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024) + last = total + default: + r, err := rb.Read(buf) + if err != nil { + return + } + total += int64(r) + } + } +} From 1adf4847fa377f866bb8c91da55402866c8e6eff Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Thu, 9 Mar 2023 17:34:03 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E5=B0=86clone=20=E6=8B=86=E4=B8=BA=20clone?= =?UTF-8?q?=20Reader=20=E5=92=8Cclone=20net.conn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/{conn-clone => clone}/block.go | 2 +- utils/clone/conn.go | 77 +++++++++++++ utils/{conn-clone => clone}/conn_test.go | 6 +- utils/clone/reader.go | 93 +++++++++++++++ utils/conn-clone/conn.go | 137 ----------------------- 5 files changed, 174 insertions(+), 141 deletions(-) rename utils/{conn-clone => clone}/block.go (97%) create mode 100644 utils/clone/conn.go rename utils/{conn-clone => clone}/conn_test.go (95%) create mode 100644 utils/clone/reader.go delete mode 100644 utils/conn-clone/conn.go diff --git a/utils/conn-clone/block.go b/utils/clone/block.go similarity index 97% rename from utils/conn-clone/block.go rename to utils/clone/block.go index 1c167370..892e4891 100644 --- a/utils/conn-clone/block.go +++ b/utils/clone/block.go @@ -1,4 +1,4 @@ -package conn_clone +package clone import ( "io" diff --git a/utils/clone/conn.go b/utils/clone/conn.go new file mode 100644 index 00000000..97dbc47a --- /dev/null +++ b/utils/clone/conn.go @@ -0,0 +1,77 @@ +package clone + +import ( + "io" + "net" + "time" +) + +type connPip struct { + net.Conn + reader io.Reader +} + +func (c *connPip) Read(b []byte) (n int, err error) { + return c.reader.Read(b) +} + +type connReader struct { + conn net.Conn + reader io.Reader +} + +func (c *connReader) Read(b []byte) (n int, err error) { + return c.reader.Read(b) +} + +func (c *connReader) Write(b []byte) (n int, err error) { + return len(b), nil +} + +func (c *connReader) Close() error { + return nil +} + +func (c *connReader) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *connReader) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +func (c *connReader) SetDeadline(t time.Time) error { + return nil +} + +func (c *connReader) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *connReader) SetWriteDeadline(t time.Time) error { + return nil +} + +func CloneConn(conn net.Conn, count int) (rw net.Conn, r []net.Conn) { + + if count < 1 { + count = 1 + } + + rs := Clone(conn, count+1) + + cw := &connPip{ + Conn: conn, + reader: rs[0], + } + rs = rs[1:] + crs := make([]net.Conn, count) + for _, r := range rs { + crs = append(crs, &connReader{ + conn: conn, + reader: r, + }) + } + return cw, crs + +} diff --git a/utils/conn-clone/conn_test.go b/utils/clone/conn_test.go similarity index 95% rename from utils/conn-clone/conn_test.go rename to utils/clone/conn_test.go index d75ab4ea..356bc3e2 100644 --- a/utils/conn-clone/conn_test.go +++ b/utils/clone/conn_test.go @@ -1,4 +1,4 @@ -package conn_clone +package clone import ( "bufio" @@ -39,8 +39,8 @@ func TestClone(t *testing.T) { } else { go func() { - rw, r := Clone(accept) - go read(r, "clone", t) + rw, r := CloneConn(accept, 1) + go read(r[0], "clone", t) read(rw, "main.", t) wg.Done() diff --git a/utils/clone/reader.go b/utils/clone/reader.go new file mode 100644 index 00000000..a2adefb0 --- /dev/null +++ b/utils/clone/reader.go @@ -0,0 +1,93 @@ +package clone + +import ( + "container/list" + "io" + "sync" +) + +func Clone(r io.Reader, size int) []io.Reader { + + if size <= 1 { + return []io.Reader{r} + } + + rs := make([]io.Reader, size) + ws := make([]*io.PipeWriter, size) + for i := 0; i < size; i++ { + reader, writer := io.Pipe() + rs = append(rs, reader) + ws = append(ws, writer) + } + go copyTo(r, ws...) + return rs +} + +type blockChan struct { + ls *list.List + lock sync.Mutex + w *io.PipeWriter + isRun bool +} + +func newBlockChan(w *io.PipeWriter) *blockChan { + bc := &blockChan{w: w, ls: list.New()} + go rc(bc) + return bc +} +func rc(bc *blockChan) { + + for { + bc.lock.Lock() + + e := bc.ls.Front() + if e == nil { + bc.isRun = false + bc.lock.Unlock() + return + } + d := bc.ls.Remove(e).(*block) + bc.lock.Unlock() + data, n, err := d.Data() + + for i := 0; i < n; { + nw, _ := bc.w.Write(data[i:]) + i += nw + } + d.Release() + if err != nil { + + bc.w.CloseWithError(err) + return + } + + } + +} +func (b *blockChan) write(d *block) { + b.lock.Lock() + + b.ls.PushBack(d) + if !b.isRun { + b.isRun = true + go rc(b) + } + b.lock.Unlock() + +} + +func copyTo(in io.Reader, ws ...*io.PipeWriter) { + + cs := make([]*blockChan, 0, len(ws)) + for _, w := range ws { + cs = append(cs, newBlockChan(w)) + } + for { + + bc := readBlock(in) + for _, c := range cs { + c.write(bc.Clone()) + } + bc.Release() + } +} diff --git a/utils/conn-clone/conn.go b/utils/conn-clone/conn.go deleted file mode 100644 index 193bb0ba..00000000 --- a/utils/conn-clone/conn.go +++ /dev/null @@ -1,137 +0,0 @@ -package conn_clone - -import ( - "container/list" - "io" - "net" - "sync" - "time" -) - -type ConnPip struct { - net.Conn - reader io.Reader -} - -func (c *ConnPip) Read(b []byte) (n int, err error) { - return c.reader.Read(b) -} - -type ConnReader struct { - conn net.Conn - reader io.Reader -} - -func (c *ConnReader) Read(b []byte) (n int, err error) { - return c.reader.Read(b) -} - -func (c *ConnReader) Write(b []byte) (n int, err error) { - return len(b), nil -} - -func (c *ConnReader) Close() error { - return nil -} - -func (c *ConnReader) LocalAddr() net.Addr { - return c.conn.LocalAddr() -} - -func (c *ConnReader) RemoteAddr() net.Addr { - return c.conn.RemoteAddr() -} - -func (c *ConnReader) SetDeadline(t time.Time) error { - return nil -} - -func (c *ConnReader) SetReadDeadline(t time.Time) error { - return nil -} - -func (c *ConnReader) SetWriteDeadline(t time.Time) error { - return nil -} - -func Clone(conn net.Conn) (rw net.Conn, r net.Conn) { - pipeRw, writerRw := io.Pipe() - pipeR, writerR := io.Pipe() - - go copyTo(conn, writerRw, writerR) - return &ConnPip{ - Conn: conn, - reader: pipeRw, - }, &ConnReader{ - conn: conn, - reader: pipeR, - } -} - -type blockChan struct { - ls *list.List - lock sync.Mutex - w *io.PipeWriter - isRun bool -} - -func newBlockChan(w *io.PipeWriter) *blockChan { - bc := &blockChan{w: w, ls: list.New()} - go rc(bc) - return bc -} -func rc(bc *blockChan) { - - for { - bc.lock.Lock() - - e := bc.ls.Front() - if e == nil { - bc.isRun = false - bc.lock.Unlock() - return - } - d := bc.ls.Remove(e).(*block) - bc.lock.Unlock() - data, n, err := d.Data() - - for i := 0; i < n; { - nw, _ := bc.w.Write(data[i:]) - i += nw - } - d.Release() - if err != nil { - bc.w.CloseWithError(err) - return - } - - } - -} -func (b *blockChan) write(d *block) { - b.lock.Lock() - - b.ls.PushBack(d) - if !b.isRun { - b.isRun = true - go rc(b) - } - b.lock.Unlock() - -} - -func copyTo(conn net.Conn, ws ...*io.PipeWriter) { - - cs := make([]*blockChan, 0, len(ws)) - for _, w := range ws { - cs = append(cs, newBlockChan(w)) - } - for { - - bc := readBlock(conn) - for _, c := range cs { - c.write(bc.Clone()) - } - bc.Release() - } -} From a4f9813bcd98107907d87a0eb3d58d851651405f Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Thu, 9 Mar 2023 17:48:28 +0800 Subject: [PATCH 3/7] bug fix --- utils/clone/block.go | 2 +- utils/clone/conn.go | 6 +++--- utils/clone/conn_test.go | 4 ++-- utils/clone/reader.go | 4 +--- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/utils/clone/block.go b/utils/clone/block.go index 892e4891..77a17a21 100644 --- a/utils/clone/block.go +++ b/utils/clone/block.go @@ -5,7 +5,7 @@ import ( "sync" ) -const BuffSize = 4096 +const BuffSize = 8192 var ( poolBuff = sync.Pool{ diff --git a/utils/clone/conn.go b/utils/clone/conn.go index 97dbc47a..b6207f43 100644 --- a/utils/clone/conn.go +++ b/utils/clone/conn.go @@ -66,11 +66,11 @@ func CloneConn(conn net.Conn, count int) (rw net.Conn, r []net.Conn) { } rs = rs[1:] crs := make([]net.Conn, count) - for _, r := range rs { - crs = append(crs, &connReader{ + for i, r := range rs { + crs[i] = &connReader{ conn: conn, reader: r, - }) + } } return cw, crs diff --git a/utils/clone/conn_test.go b/utils/clone/conn_test.go index 356bc3e2..9eef19ac 100644 --- a/utils/clone/conn_test.go +++ b/utils/clone/conn_test.go @@ -50,7 +50,7 @@ func TestClone(t *testing.T) { } }() write("cle 1", t) - write("cle 2", t) + //write("cle 2", t) wg.Wait() listen.Close() } @@ -95,7 +95,7 @@ func write(name string, testing *testing.T) { func read(conn net.Conn, name string, testing *testing.T) { rb := bufio.NewReader(conn) - buf := make([]byte, 1024) + buf := make([]byte, 4096) total := int64(0) last := int64(0) tc := time.NewTicker(time.Second) diff --git a/utils/clone/reader.go b/utils/clone/reader.go index a2adefb0..1d9bfea8 100644 --- a/utils/clone/reader.go +++ b/utils/clone/reader.go @@ -15,9 +15,7 @@ func Clone(r io.Reader, size int) []io.Reader { rs := make([]io.Reader, size) ws := make([]*io.PipeWriter, size) for i := 0; i < size; i++ { - reader, writer := io.Pipe() - rs = append(rs, reader) - ws = append(ws, writer) + rs[i], ws[i] = io.Pipe() } go copyTo(r, ws...) return rs From f5fbbae81ca82b15f055740d2c21fa23c0e31847 Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Fri, 10 Mar 2023 11:55:10 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/clone/block.go | 2 +- utils/clone/conn_test.go | 80 ++++++++++++++++++++++++++-------------- utils/clone/reader.go | 15 +++++++- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/utils/clone/block.go b/utils/clone/block.go index 77a17a21..892e4891 100644 --- a/utils/clone/block.go +++ b/utils/clone/block.go @@ -5,7 +5,7 @@ import ( "sync" ) -const BuffSize = 8192 +const BuffSize = 4096 var ( poolBuff = sync.Pool{ diff --git a/utils/clone/conn_test.go b/utils/clone/conn_test.go index 9eef19ac..5d243c61 100644 --- a/utils/clone/conn_test.go +++ b/utils/clone/conn_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "math/rand" "net" "sync" @@ -11,7 +12,7 @@ import ( "time" ) -func TestClone(t *testing.T) { +func TestCloneTcp(t *testing.T) { wg := sync.WaitGroup{} @@ -49,52 +50,53 @@ func TestClone(t *testing.T) { } } }() - write("cle 1", t) + writeTcp("cle 1", "127.0.0.1:9988", t) //write("cle 2", t) wg.Wait() listen.Close() } -func write(name string, testing *testing.T) { +func writeTcp(name string, addr string, t *testing.T) { buf := bytes.Buffer{} for i := 0; i < 4096; i++ { buf.WriteString(fmt.Sprint(rand.Int(), ",")) } - - conn, err := net.Dial("tcp", "127.0.0.1:9988") + data := buf.Bytes() + conn, err := net.Dial("tcp", addr) if err != nil { return } - go func() { - data := buf.Bytes() - tc := time.NewTicker(time.Second) - te := time.NewTimer(time.Second * 5) - total := int64(0) - last := int64(0) - for { - select { - case t := <-tc.C: - testing.Logf("\t[%s] %s\twrite %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024) - last = total - case <-te.C: - conn.Close() + go doWrite(name, conn, data, t) + +} +func doWrite(name string, w io.WriteCloser, data []byte, testing *testing.T) { + + tc := time.NewTicker(time.Second) + te := time.NewTimer(time.Second * 5) + total := int64(0) + last := int64(0) + for { + select { + case t := <-tc.C: + testing.Logf("\t[%s] %s\twrite %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024) + last = total + case <-te.C: + w.Close() + return + default: + n, err := w.Write(data) + if err != nil { return - default: - n, err := conn.Write(data) - if err != nil { - return - } - total += int64(n) } - + total += int64(n) } - }() + } } -func read(conn net.Conn, name string, testing *testing.T) { +func read(r io.Reader, name string, testing *testing.T) { - rb := bufio.NewReader(conn) + rb := bufio.NewReader(r) buf := make([]byte, 4096) total := int64(0) last := int64(0) @@ -115,3 +117,25 @@ func read(conn net.Conn, name string, testing *testing.T) { } } } + +func TestClonePipe(t *testing.T) { + buf := bytes.Buffer{} + for i := 0; i < 8192; i++ { + buf.WriteString(fmt.Sprint(rand.Int(), ",")) + } + data := buf.Bytes() + reader, writer := io.Pipe() + + readers := Clone(reader, 3) + wg := sync.WaitGroup{} + + for i, r := range readers { + wg.Add(1) + go func(i int, r io.Reader) { + read(r, fmt.Sprintf("reader-%d", i+1), t) + wg.Done() + }(i, r) + } + + doWrite("write", writer, data, t) +} diff --git a/utils/clone/reader.go b/utils/clone/reader.go index 1d9bfea8..18d57a1a 100644 --- a/utils/clone/reader.go +++ b/utils/clone/reader.go @@ -4,6 +4,7 @@ import ( "container/list" "io" "sync" + "time" ) func Clone(r io.Reader, size int) []io.Reader { @@ -30,11 +31,12 @@ type blockChan struct { func newBlockChan(w *io.PipeWriter) *blockChan { bc := &blockChan{w: w, ls: list.New()} - go rc(bc) + return bc } func rc(bc *blockChan) { + retry := 0 for { bc.lock.Lock() @@ -42,10 +44,17 @@ func rc(bc *blockChan) { if e == nil { bc.isRun = false bc.lock.Unlock() - return + + retry++ + if retry >= 3 { + break + } + time.Sleep(time.Millisecond * time.Duration(retry)) + continue } d := bc.ls.Remove(e).(*block) bc.lock.Unlock() + data, n, err := d.Data() for i := 0; i < n; { @@ -63,11 +72,13 @@ func rc(bc *blockChan) { } func (b *blockChan) write(d *block) { + //fmt.Println("write") b.lock.Lock() b.ls.PushBack(d) if !b.isRun { b.isRun = true + //fmt.Println("run rc") go rc(b) } b.lock.Unlock() From 80372dad5c8a8d86f3d31fc0cc332de3ae3b35f9 Mon Sep 17 00:00:00 2001 From: chenjiekun Date: Fri, 10 Mar 2023 18:06:13 +0800 Subject: [PATCH 5/7] fix proxy request finish --- node/http-context/context.go | 1 + node/http-context/proxy.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/node/http-context/context.go b/node/http-context/context.go index 530826af..58b09b9b 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -267,6 +267,7 @@ func (ctx *HttpContext) FastFinish() { ctx.completeHandler = nil ctx.requestReader.Finish() + fasthttp.ReleaseRequest(ctx.requestReader.req) ctx.proxyRequest.Finish() ctx.response.Finish() ctx.fastHttpRequestCtx = nil diff --git a/node/http-context/proxy.go b/node/http-context/proxy.go index fa7f1b2e..5fcc07b1 100644 --- a/node/http-context/proxy.go +++ b/node/http-context/proxy.go @@ -21,7 +21,7 @@ type ProxyRequest struct { //} func (r *ProxyRequest) Finish() error { - fasthttp.ReleaseRequest(r.req) + //fasthttp.ReleaseRequest(r.req) err := r.RequestReader.Finish() if err != nil { log.Warn(err) From 1f022691798bf1e2f2512b9c1be6612c68f16f4d Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Fri, 10 Mar 2023 20:47:36 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/clone/conn_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/utils/clone/conn_test.go b/utils/clone/conn_test.go index 5d243c61..f6e4e0ca 100644 --- a/utils/clone/conn_test.go +++ b/utils/clone/conn_test.go @@ -117,7 +117,18 @@ func read(r io.Reader, name string, testing *testing.T) { } } } +func TestClonePipe2(t *testing.T) { + wg := sync.WaitGroup{} + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + TestClonePipe(t) + wg.Done() + }() + } + wg.Wait() +} func TestClonePipe(t *testing.T) { buf := bytes.Buffer{} for i := 0; i < 8192; i++ { @@ -126,7 +137,7 @@ func TestClonePipe(t *testing.T) { data := buf.Bytes() reader, writer := io.Pipe() - readers := Clone(reader, 3) + readers := Clone(reader, 2) wg := sync.WaitGroup{} for i, r := range readers { From fb273e009ccaca3ccc5f5f01130596e86809ec56 Mon Sep 17 00:00:00 2001 From: huangmengzhu Date: Fri, 10 Mar 2023 21:14:26 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dfinshbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- node/http-context/clone.go | 5 ++++- node/http-context/context.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/node/http-context/clone.go b/node/http-context/clone.go index 4b06ba85..e0720f12 100644 --- a/node/http-context/clone.go +++ b/node/http-context/clone.go @@ -3,6 +3,7 @@ package http_context import ( "context" "fmt" + "github.com/valyala/fasthttp" "net" "time" @@ -195,7 +196,9 @@ func (ctx *cloneContext) FastFinish() { ctx.upstreamHostHandler = nil ctx.finishHandler = nil ctx.completeHandler = nil - + fasthttp.ReleaseRequest(ctx.proxyRequest.req) + fasthttp.ReleaseResponse(ctx.response.Response) + ctx.response.Finish() ctx.proxyRequest.Finish() } diff --git a/node/http-context/context.go b/node/http-context/context.go index 58b09b9b..00543386 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -265,9 +265,9 @@ func (ctx *HttpContext) FastFinish() { ctx.upstreamHostHandler = nil ctx.finishHandler = nil ctx.completeHandler = nil + fasthttp.ReleaseRequest(ctx.requestReader.req) ctx.requestReader.Finish() - fasthttp.ReleaseRequest(ctx.requestReader.req) ctx.proxyRequest.Finish() ctx.response.Finish() ctx.fastHttpRequestCtx = nil