Skip to content

Commit

Permalink
Merge remote-tracking branch 'gitlab/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Mar 12, 2023
2 parents fedd602 + d514bd6 commit 6db3ae7
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 2 deletions.
5 changes: 4 additions & 1 deletion node/http-context/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http_context
import (
"context"
"fmt"
"github.com/valyala/fasthttp"
"net"
"time"

Expand Down Expand Up @@ -195,7 +196,9 @@ func (ctx *cloneContext) FastFinish() {
ctx.upstreamHostHandler = nil
ctx.finishHandler = nil
ctx.completeHandler = nil

fasthttp.ReleaseRequest(ctx.proxyRequest.req)
fasthttp.ReleaseResponse(ctx.response.Response)
ctx.response.Finish()
ctx.proxyRequest.Finish()

}
1 change: 1 addition & 0 deletions node/http-context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (ctx *HttpContext) FastFinish() {
ctx.upstreamHostHandler = nil
ctx.finishHandler = nil
ctx.completeHandler = nil
fasthttp.ReleaseRequest(ctx.requestReader.req)

ctx.requestReader.Finish()
ctx.proxyRequest.Finish()
Expand Down
2 changes: 1 addition & 1 deletion node/http-context/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ProxyRequest struct {
//}

func (r *ProxyRequest) Finish() error {
fasthttp.ReleaseRequest(r.req)
//fasthttp.ReleaseRequest(r.req)
err := r.RequestReader.Finish()
if err != nil {
log.Warn(err)
Expand Down
48 changes: 48 additions & 0 deletions utils/clone/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package clone

import (
"io"
"sync"
)

const BuffSize = 4096

var (
poolBuff = sync.Pool{
New: func() any {
return &block{
buf: make([]byte, BuffSize),
}
},
}
)

type block struct {
buf []byte
n int
err error
}

func readBlock(r io.Reader) *block {
b := acquireBlock()
b.n, b.err = r.Read(b.buf)
return b
}
func (b *block) Release() {
b.n = 0
b.err = nil
poolBuff.Put(b)
}
func (b *block) Clone() *block {
c := acquireBlock()
copy(b.buf, c.buf)
c.n, c.err = b.n, b.err
return c
}
func (b *block) Data() ([]byte, int, error) {

return b.buf[:b.n], b.n, b.err
}
func acquireBlock() *block {
return poolBuff.Get().(*block)
}
77 changes: 77 additions & 0 deletions utils/clone/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package clone

import (
"io"
"net"
"time"
)

type connPip struct {
net.Conn
reader io.Reader
}

func (c *connPip) Read(b []byte) (n int, err error) {
return c.reader.Read(b)
}

type connReader struct {
conn net.Conn
reader io.Reader
}

func (c *connReader) Read(b []byte) (n int, err error) {
return c.reader.Read(b)
}

func (c *connReader) Write(b []byte) (n int, err error) {
return len(b), nil
}

func (c *connReader) Close() error {
return nil
}

func (c *connReader) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

func (c *connReader) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}

func (c *connReader) SetDeadline(t time.Time) error {
return nil
}

func (c *connReader) SetReadDeadline(t time.Time) error {
return nil
}

func (c *connReader) SetWriteDeadline(t time.Time) error {
return nil
}

func CloneConn(conn net.Conn, count int) (rw net.Conn, r []net.Conn) {

if count < 1 {
count = 1
}

rs := Clone(conn, count+1)

cw := &connPip{
Conn: conn,
reader: rs[0],
}
rs = rs[1:]
crs := make([]net.Conn, count)
for i, r := range rs {
crs[i] = &connReader{
conn: conn,
reader: r,
}
}
return cw, crs

}
152 changes: 152 additions & 0 deletions utils/clone/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package clone

import (
"bufio"
"bytes"
"fmt"
"io"
"math/rand"
"net"
"sync"
"testing"
"time"
)

func TestCloneTcp(t *testing.T) {

wg := sync.WaitGroup{}

listen, err := net.Listen("tcp", ":9988")
if err != nil {
panic(err)
}
go func() {

index := 0
for {
index++
accept, err := listen.Accept()
if err != nil {

return
}
wg.Add(1)

if index%2 == 0 {
go func() {
read(accept, "org..", t)
wg.Done()
}()
} else {

go func() {
rw, r := CloneConn(accept, 1)
go read(r[0], "clone", t)
read(rw, "main.", t)

wg.Done()
}()

}
}
}()
writeTcp("cle 1", "127.0.0.1:9988", t)
//write("cle 2", t)
wg.Wait()
listen.Close()
}
func writeTcp(name string, addr string, t *testing.T) {
buf := bytes.Buffer{}
for i := 0; i < 4096; i++ {
buf.WriteString(fmt.Sprint(rand.Int(), ","))
}
data := buf.Bytes()
conn, err := net.Dial("tcp", addr)
if err != nil {

return
}

go doWrite(name, conn, data, t)

}
func doWrite(name string, w io.WriteCloser, data []byte, testing *testing.T) {

tc := time.NewTicker(time.Second)
te := time.NewTimer(time.Second * 5)
total := int64(0)
last := int64(0)
for {
select {
case t := <-tc.C:
testing.Logf("\t[%s] %s\twrite %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024)
last = total
case <-te.C:
w.Close()
return
default:
n, err := w.Write(data)
if err != nil {
return
}
total += int64(n)
}

}
}
func read(r io.Reader, name string, testing *testing.T) {

rb := bufio.NewReader(r)
buf := make([]byte, 4096)
total := int64(0)
last := int64(0)
tc := time.NewTicker(time.Second)
defer tc.Stop()
for {

select {
case t := <-tc.C:
testing.Logf("\t[%s] %s\tread %dk/s\n", t.Format(time.RFC3339), name, (total-last)/1024)
last = total
default:
r, err := rb.Read(buf)
if err != nil {
return
}
total += int64(r)
}
}
}
func TestClonePipe2(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 6; i++ {
wg.Add(1)
go func() {
TestClonePipe(t)
wg.Done()
}()
}
wg.Wait()

}
func TestClonePipe(t *testing.T) {
buf := bytes.Buffer{}
for i := 0; i < 8192; i++ {
buf.WriteString(fmt.Sprint(rand.Int(), ","))
}
data := buf.Bytes()
reader, writer := io.Pipe()

readers := Clone(reader, 2)
wg := sync.WaitGroup{}

for i, r := range readers {
wg.Add(1)
go func(i int, r io.Reader) {
read(r, fmt.Sprintf("reader-%d", i+1), t)
wg.Done()
}(i, r)
}

doWrite("write", writer, data, t)
}
Loading

0 comments on commit 6db3ae7

Please sign in to comment.