Skip to content

Commit

Permalink
add sendmmsg/recvmmsg support
Browse files Browse the repository at this point in the history
  • Loading branch information
wwqgtxx committed Apr 22, 2023
1 parent ae38401 commit f787f0c
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 123 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type UdpConfig struct {
type ListenerConfig struct {
BindAddress string `yaml:"bind-address"`
FallbackConfig `yaml:",inline"`
MMsg bool `yaml:"mmsg"`
}

type FallbackConfig struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/sagernet/tfo-go v0.0.0-20230303015439-ffcfd8c41cf9
golang.org/x/crypto v0.8.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.9.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
134 changes: 11 additions & 123 deletions udp/udp.go
Original file line number Diff line number Diff line change
@@ -1,143 +1,31 @@
package udp

import (
"golang.org/x/exp/slices"
"github.com/wwqgtxx/wstunnel/config"
"log"
"net"
"net/netip"
"sync"

"github.com/wwqgtxx/wstunnel/config"
cache "github.com/wwqgtxx/wstunnel/utils/lrucache"
)

const BufferSize = 16 * 1024

var BufPool = sync.Pool{New: func() any { return make([]byte, BufferSize) }}

func ListenUdp(address string) (*net.UDPConn, error) {
pc, err := net.ListenPacket("udp", address)
if err != nil {
return nil, err
}
return pc.(*net.UDPConn), nil
}

type NatItem struct {
net.Conn
sync.Mutex
}

type Tunnel struct {
nat *cache.LruCache[netip.AddrPort, *NatItem]
address string
target string
reserved []byte
}

func NewTunnel(udpConfig config.UdpConfig) *Tunnel {
nat := cache.New[netip.AddrPort, *NatItem](
cache.WithAge[netip.AddrPort, *NatItem](5*60),
cache.WithUpdateAgeOnGet[netip.AddrPort, *NatItem](),
cache.WithEvict[netip.AddrPort, *NatItem](func(key netip.AddrPort, value *NatItem) {
if conn := value.Conn; conn != nil {
log.Println("Delete", conn.LocalAddr(), "for", key, "to", conn.RemoteAddr())
_ = conn.Close()
}
}),
cache.WithCreate[netip.AddrPort, *NatItem](func(key netip.AddrPort) *NatItem {
return &NatItem{}
}),
)
t := &Tunnel{
nat: nat,
address: udpConfig.BindAddress,
target: udpConfig.TargetAddress,
reserved: slices.Clone(udpConfig.Reserved),
}
return t
}

func (t *Tunnel) Handle() {
udpConn, err := ListenUdp(t.address)
if err != nil {
log.Println(err)
return
}
for {
buf := BufPool.Get().([]byte)
n, addr, err := udpConn.ReadFromUDPAddrPort(buf)
if err != nil {
BufPool.Put(buf)
// TODO: handle close
log.Println(err)
continue
}
go func() {
defer BufPool.Put(buf)
var err error
natItem, _ := t.nat.Get(addr)
natItem.Mutex.Lock()
conn := natItem.Conn
if conn == nil {
log.Println("Dial to", t.target, "for", addr)
conn, err = net.Dial("udp", t.target)
if err != nil {
natItem.Mutex.Unlock()
log.Println(err)
return
}
log.Println("Associate from", addr, "to", conn.RemoteAddr(), "by", conn.LocalAddr())
natItem.Conn = conn
go func() {
for {
buf := BufPool.Get().([]byte)
n, err := conn.Read(buf)
if err != nil {
BufPool.Put(buf)
t.nat.Delete(addr) // it will call conn.Close() inside
log.Println(err)
return
}
if len(t.reserved) > 0 && n > len(t.reserved) { // wireguard reserved
for i := range t.reserved {
buf[i+1] = 0
}
}
_, err = udpConn.WriteToUDPAddrPort(buf[:n], addr)
BufPool.Put(buf)
if err != nil {
t.nat.Delete(addr) // it will call conn.Close() inside
log.Println(err)
return
}
}
}()
}
natItem.Mutex.Unlock()
if len(t.reserved) > 0 && n > len(t.reserved) { // wireguard reserved
copy(buf[1:], t.reserved)
}
_, err = conn.Write(buf[:n])
if err != nil {
log.Println(err)
return
}
}()

}
const MaxUdpAge = 5 * 60

type Tunnel interface {
Handle()
}

var tunnels = make(map[string]*Tunnel)
var tunnels = make(map[string]Tunnel)

func BuildUdp(udpConfig config.UdpConfig) {
_, port, err := net.SplitHostPort(udpConfig.BindAddress)
if err != nil {
log.Println(err)
return
}
tunnels[port] = NewTunnel(udpConfig)
if udpConfig.MMsg {
tunnels[port] = NewMmsgTunnel(udpConfig)
} else {
tunnels[port] = NewStdTunnel(udpConfig)
}

}

func StartUdps() {
Expand Down
Loading

0 comments on commit f787f0c

Please sign in to comment.