Skip to content

Commit

Permalink
add shadowsocks2022 udp client support
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokangwang committed Nov 19, 2023
1 parent 690d1f2 commit 1c9cace
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 24 deletions.
159 changes: 139 additions & 20 deletions proxy/shadowsocks2022/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import (
"context"
"github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/envctx"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
"github.com/v2fly/v2ray-core/v5/common/retry"
"github.com/v2fly/v2ray-core/v5/common/session"
"github.com/v2fly/v2ray-core/v5/common/signal"
"github.com/v2fly/v2ray-core/v5/common/task"
"github.com/v2fly/v2ray-core/v5/transport"
"github.com/v2fly/v2ray-core/v5/transport/internet"
"github.com/v2fly/v2ray-core/v5/transport/internet/udp"
gonet "net"
"sync"
"time"
)

Expand All @@ -19,32 +25,40 @@ type Client struct {
ctx context.Context
}

func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified")
}
destination := outbound.Target
network := destination.Network
const UDPConnectionState = "UDPConnectionState"

var conn internet.Connection
type ClientUDPConnState struct {
session *ClientUDPSession
initOnce *sync.Once
}

err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
func (c *ClientUDPConnState) GetOrCreateSession(create func() (*ClientUDPSession, error)) (*ClientUDPSession, error) {
var err error
c.initOnce.Do(func() {
sessionState, err := create()
if err != nil {
return err
err = newError("failed to create UDP session").Base(err)
return
}
conn = rawConn

return nil
c.session = sessionState
})
if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err)
return nil, newError("failed to initialize UDP State").Base(err)
}
return c.session, nil
}

func NewClientUDPConnState() (*ClientUDPConnState, error) {
return &ClientUDPConnState{initOnce: &sync.Once{}}, nil
}

func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified")
}
newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close()
destination := outbound.Target
network := destination.Network

var keyDerivation = newBLAKE3KeyDerivation()
var method Method
Expand All @@ -62,7 +76,44 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute)

if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation)
if err != nil {
return newError("failed to get UDP udpSession").Base(err)
}
requestDone := func() error {
return udp.CopyPacketConn(udpSession, packetConn, udp.UpdateActivity(timer))
}
responseDone := func() error {
return udp.CopyPacketConn(packetConn, udpSession, udp.UpdateActivity(timer))
}
responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
return newError("connection ends").Base(err)
}
return nil
}

if network == net.Network_TCP {
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn

return nil
})

if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err)
}
newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close()

request := &TCPRequest{
keyDerivation: keyDerivation,
method: method,
Expand Down Expand Up @@ -107,11 +158,79 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
}
return nil
} else {
return newError("not implemented")
udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation)
if err != nil {
return newError("failed to get UDP udpSession").Base(err)
}
monoDestUDPConn := udp.NewMonoDestUDPConn(udpSession, &gonet.UDPAddr{IP: destination.Address.IP(), Port: int(destination.Port)})
requestDone := func() error {
return buf.Copy(link.Reader, monoDestUDPConn, buf.UpdateActivity(timer))
}
responseDone := func() error {
return buf.Copy(monoDestUDPConn, link.Writer, buf.UpdateActivity(timer))
}
responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
return newError("connection ends").Base(err)
}
return nil
}
}

func (c *Client) getUDPSession(ctx context.Context, network net.Network, dialer internet.Dialer, method Method, keyDerivation *BLAKE3KeyDerivation) (internet.AbstractPacketConn, error) {
storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage()
clientUDPStateIfce, err := storage.Get(ctx, UDPConnectionState)
if err != nil {
return nil, newError("failed to get UDP connection state").Base(err)
}
clientUDPState, ok := clientUDPStateIfce.(*ClientUDPConnState)
if !ok {
return nil, newError("failed to cast UDP connection state")
}

sessionState, err := clientUDPState.GetOrCreateSession(func() (*ClientUDPSession, error) {
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn

return nil
})
if err != nil {
return nil, newError("failed to find an available destination").AtWarning().Base(err)
}
newError("creating udp session to ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
packetProcessor, err := method.GetUDPClientProcessor(c.config.Ipsk, c.config.Psk, keyDerivation)
if err != nil {
return nil, newError("failed to create UDP client packet processor").Base(err)
}
return NewClientUDPSession(ctx, conn, packetProcessor), nil
})
if err != nil {
return nil, newError("failed to create UDP session").Base(err)
}
sessionConn, err := sessionState.NewSessionConn()
if err != nil {
return nil, newError("failed to create UDP session connection").Base(err)
}
return sessionConn, nil
}

func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {

storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage()

udpState, err := NewClientUDPConnState()
if err != nil {
return nil, newError("failed to create UDP connection state").Base(err)
}
storage.Put(ctx, UDPConnectionState, udpState)

return &Client{
config: config,
ctx: ctx,
Expand Down
162 changes: 162 additions & 0 deletions proxy/shadowsocks2022/client_session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package shadowsocks2022

import (
"context"
"crypto/rand"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/transport/internet"
"io"
gonet "net"
"sync"
"time"
)

func NewClientUDPSession(ctx context.Context, conn io.ReadWriteCloser, packetProcessor UDPClientPacketProcessor) *ClientUDPSession {
session := &ClientUDPSession{
locker: &sync.Mutex{},
conn: conn,
packetProcessor: packetProcessor,
sessionMap: make(map[string]*ClientUDPSessionConn),
}
session.ctx, session.finish = context.WithCancel(ctx)

go session.KeepReading()
return session
}

type ClientUDPSession struct {
locker *sync.Mutex

conn io.ReadWriteCloser
packetProcessor UDPClientPacketProcessor
sessionMap map[string]*ClientUDPSessionConn

ctx context.Context
finish func()
}

func (c *ClientUDPSession) Close() error {
c.finish()
return c.conn.Close()
}

func (c *ClientUDPSession) WriteUDPRequest(request *UDPRequest) error {
buffer := buf.New()
defer buffer.Release()
err := c.packetProcessor.EncodeUDPRequest(request, buffer)
if request.Payload != nil {
request.Payload.Release()
}
if err != nil {
return newError("unable to encode udp request").Base(err)
}
_, err = c.conn.Write(buffer.Bytes())
if err != nil {
return newError("unable to write to conn").Base(err)
}
return nil
}

func (c *ClientUDPSession) KeepReading() {
for c.ctx.Err() == nil {
udpResp := &UDPResponse{}
buffer := make([]byte, 1600)
n, err := c.conn.Read(buffer)
if err != nil {
newError("unable to read from conn").Base(err).WriteToLog()
return
}
if n != 0 {
err := c.packetProcessor.DecodeUDPResp(buffer[:n], udpResp)
if err != nil {
newError("unable to decode udp response").Base(err).WriteToLog()
continue
}
c.locker.Lock()
session, ok := c.sessionMap[string(udpResp.ClientSessionID[:])]
if ok {
select {
case session.readChan <- udpResp:
default:
}
} else {
newError("misbehaving server: unknown client session ID").Base(err).WriteToLog()
}
c.locker.Unlock()
}
}
}

func (c *ClientUDPSession) NewSessionConn() (internet.AbstractPacketConn, error) {
sessionID := make([]byte, 8)
_, err := rand.Read(sessionID)
if err != nil {
return nil, newError("unable to generate session id").Base(err)
}

connctx, connfinish := context.WithCancel(c.ctx)

sessionConn := &ClientUDPSessionConn{
sessionID: string(sessionID),
readChan: make(chan *UDPResponse, 16),
parent: c,
ctx: connctx,
finish: connfinish,
nextWritePacketID: 0,
}
c.locker.Lock()
c.sessionMap[sessionConn.sessionID] = sessionConn
c.locker.Unlock()
return sessionConn, nil
}

type ClientUDPSessionConn struct {
sessionID string
readChan chan *UDPResponse
parent *ClientUDPSession

nextWritePacketID uint64

ctx context.Context
finish func()
}

func (c *ClientUDPSessionConn) Close() error {
delete(c.parent.sessionMap, c.sessionID)
c.finish()
return nil
}

func (c *ClientUDPSessionConn) WriteTo(p []byte, addr gonet.Addr) (n int, err error) {
thisPacketID := c.nextWritePacketID
c.nextWritePacketID += 1
req := &UDPRequest{
SessionID: [8]byte{},
PacketID: thisPacketID,
TimeStamp: uint64(time.Now().Unix()),
Address: net.IPAddress(addr.(*gonet.UDPAddr).IP),
Port: addr.(*net.UDPAddr).Port,
Payload: nil,
}
copy(req.SessionID[:], c.sessionID)
req.Payload = buf.New()
req.Payload.Write(p)
err = c.parent.WriteUDPRequest(req)
if err != nil {
return 0, newError("unable to write to parent session").Base(err)
}
return len(p), nil
}

func (c *ClientUDPSessionConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case <-c.ctx.Done():
return 0, nil, io.EOF
case resp := <-c.readChan:
n = copy(p, resp.Payload.Bytes())
resp.Payload.Release()
addr = &net.UDPAddr{IP: resp.Address.IP(), Port: int(resp.Port)}
}
return
}
Loading

0 comments on commit 1c9cace

Please sign in to comment.