diff --git a/proxy/shadowsocks2022/client.go b/proxy/shadowsocks2022/client.go index 207c6044ed2..979d4b8f264 100644 --- a/proxy/shadowsocks2022/client.go +++ b/proxy/shadowsocks2022/client.go @@ -4,13 +4,19 @@ import ( "context" "github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common/buf" + "github.com/v2fly/v2ray-core/v5/common/environment" + "github.com/v2fly/v2ray-core/v5/common/environment/envctx" "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/net/packetaddr" "github.com/v2fly/v2ray-core/v5/common/retry" "github.com/v2fly/v2ray-core/v5/common/session" "github.com/v2fly/v2ray-core/v5/common/signal" "github.com/v2fly/v2ray-core/v5/common/task" "github.com/v2fly/v2ray-core/v5/transport" "github.com/v2fly/v2ray-core/v5/transport/internet" + "github.com/v2fly/v2ray-core/v5/transport/internet/udp" + gonet "net" + "sync" "time" ) @@ -19,32 +25,40 @@ type Client struct { ctx context.Context } -func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { - outbound := session.OutboundFromContext(ctx) - if outbound == nil || !outbound.Target.IsValid() { - return newError("target not specified") - } - destination := outbound.Target - network := destination.Network +const UDPConnectionState = "UDPConnectionState" - var conn internet.Connection +type ClientUDPConnState struct { + session *ClientUDPSession + initOnce *sync.Once +} - err := retry.ExponentialBackoff(5, 100).On(func() error { - dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port)) - dest.Network = network - rawConn, err := dialer.Dial(ctx, dest) +func (c *ClientUDPConnState) GetOrCreateSession(create func() (*ClientUDPSession, error)) (*ClientUDPSession, error) { + var err error + c.initOnce.Do(func() { + sessionState, err := create() if err != nil { - return err + err = newError("failed to create UDP session").Base(err) + return } - conn = rawConn - - return nil + c.session = sessionState }) if err != nil { - return newError("failed to find an available destination").AtWarning().Base(err) + return nil, newError("failed to initialize UDP State").Base(err) + } + return c.session, nil +} + +func NewClientUDPConnState() (*ClientUDPConnState, error) { + return &ClientUDPConnState{initOnce: &sync.Once{}}, nil +} + +func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { + outbound := session.OutboundFromContext(ctx) + if outbound == nil || !outbound.Target.IsValid() { + return newError("target not specified") } - newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx)) - defer conn.Close() + destination := outbound.Target + network := destination.Network var keyDerivation = newBLAKE3KeyDerivation() var method Method @@ -62,7 +76,44 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute) + if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil { + udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation) + if err != nil { + return newError("failed to get UDP udpSession").Base(err) + } + requestDone := func() error { + return udp.CopyPacketConn(udpSession, packetConn, udp.UpdateActivity(timer)) + } + responseDone := func() error { + return udp.CopyPacketConn(packetConn, udpSession, udp.UpdateActivity(timer)) + } + responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer)) + if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil { + return newError("connection ends").Base(err) + } + return nil + } + if network == net.Network_TCP { + var conn internet.Connection + err := retry.ExponentialBackoff(5, 100).On(func() error { + dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port)) + dest.Network = network + rawConn, err := dialer.Dial(ctx, dest) + if err != nil { + return err + } + conn = rawConn + + return nil + }) + + if err != nil { + return newError("failed to find an available destination").AtWarning().Base(err) + } + newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx)) + defer conn.Close() + request := &TCPRequest{ keyDerivation: keyDerivation, method: method, @@ -107,11 +158,79 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter } return nil } else { - return newError("not implemented") + udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation) + if err != nil { + return newError("failed to get UDP udpSession").Base(err) + } + monoDestUDPConn := udp.NewMonoDestUDPConn(udpSession, &gonet.UDPAddr{IP: destination.Address.IP(), Port: int(destination.Port)}) + requestDone := func() error { + return buf.Copy(link.Reader, monoDestUDPConn, buf.UpdateActivity(timer)) + } + responseDone := func() error { + return buf.Copy(monoDestUDPConn, link.Writer, buf.UpdateActivity(timer)) + } + responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer)) + if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil { + return newError("connection ends").Base(err) + } + return nil } } +func (c *Client) getUDPSession(ctx context.Context, network net.Network, dialer internet.Dialer, method Method, keyDerivation *BLAKE3KeyDerivation) (internet.AbstractPacketConn, error) { + storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage() + clientUDPStateIfce, err := storage.Get(ctx, UDPConnectionState) + if err != nil { + return nil, newError("failed to get UDP connection state").Base(err) + } + clientUDPState, ok := clientUDPStateIfce.(*ClientUDPConnState) + if !ok { + return nil, newError("failed to cast UDP connection state") + } + + sessionState, err := clientUDPState.GetOrCreateSession(func() (*ClientUDPSession, error) { + var conn internet.Connection + err := retry.ExponentialBackoff(5, 100).On(func() error { + dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port)) + dest.Network = network + rawConn, err := dialer.Dial(ctx, dest) + if err != nil { + return err + } + conn = rawConn + + return nil + }) + if err != nil { + return nil, newError("failed to find an available destination").AtWarning().Base(err) + } + newError("creating udp session to ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx)) + packetProcessor, err := method.GetUDPClientProcessor(c.config.Ipsk, c.config.Psk, keyDerivation) + if err != nil { + return nil, newError("failed to create UDP client packet processor").Base(err) + } + return NewClientUDPSession(ctx, conn, packetProcessor), nil + }) + if err != nil { + return nil, newError("failed to create UDP session").Base(err) + } + sessionConn, err := sessionState.NewSessionConn() + if err != nil { + return nil, newError("failed to create UDP session connection").Base(err) + } + return sessionConn, nil +} + func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { + + storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage() + + udpState, err := NewClientUDPConnState() + if err != nil { + return nil, newError("failed to create UDP connection state").Base(err) + } + storage.Put(ctx, UDPConnectionState, udpState) + return &Client{ config: config, ctx: ctx, diff --git a/proxy/shadowsocks2022/client_session.go b/proxy/shadowsocks2022/client_session.go new file mode 100644 index 00000000000..4acd8363796 --- /dev/null +++ b/proxy/shadowsocks2022/client_session.go @@ -0,0 +1,162 @@ +package shadowsocks2022 + +import ( + "context" + "crypto/rand" + "github.com/v2fly/v2ray-core/v5/common/buf" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/transport/internet" + "io" + gonet "net" + "sync" + "time" +) + +func NewClientUDPSession(ctx context.Context, conn io.ReadWriteCloser, packetProcessor UDPClientPacketProcessor) *ClientUDPSession { + session := &ClientUDPSession{ + locker: &sync.Mutex{}, + conn: conn, + packetProcessor: packetProcessor, + sessionMap: make(map[string]*ClientUDPSessionConn), + } + session.ctx, session.finish = context.WithCancel(ctx) + + go session.KeepReading() + return session +} + +type ClientUDPSession struct { + locker *sync.Mutex + + conn io.ReadWriteCloser + packetProcessor UDPClientPacketProcessor + sessionMap map[string]*ClientUDPSessionConn + + ctx context.Context + finish func() +} + +func (c *ClientUDPSession) Close() error { + c.finish() + return c.conn.Close() +} + +func (c *ClientUDPSession) WriteUDPRequest(request *UDPRequest) error { + buffer := buf.New() + defer buffer.Release() + err := c.packetProcessor.EncodeUDPRequest(request, buffer) + if request.Payload != nil { + request.Payload.Release() + } + if err != nil { + return newError("unable to encode udp request").Base(err) + } + _, err = c.conn.Write(buffer.Bytes()) + if err != nil { + return newError("unable to write to conn").Base(err) + } + return nil +} + +func (c *ClientUDPSession) KeepReading() { + for c.ctx.Err() == nil { + udpResp := &UDPResponse{} + buffer := make([]byte, 1600) + n, err := c.conn.Read(buffer) + if err != nil { + newError("unable to read from conn").Base(err).WriteToLog() + return + } + if n != 0 { + err := c.packetProcessor.DecodeUDPResp(buffer[:n], udpResp) + if err != nil { + newError("unable to decode udp response").Base(err).WriteToLog() + continue + } + c.locker.Lock() + session, ok := c.sessionMap[string(udpResp.ClientSessionID[:])] + if ok { + select { + case session.readChan <- udpResp: + default: + } + } else { + newError("misbehaving server: unknown client session ID").Base(err).WriteToLog() + } + c.locker.Unlock() + } + } +} + +func (c *ClientUDPSession) NewSessionConn() (internet.AbstractPacketConn, error) { + sessionID := make([]byte, 8) + _, err := rand.Read(sessionID) + if err != nil { + return nil, newError("unable to generate session id").Base(err) + } + + connctx, connfinish := context.WithCancel(c.ctx) + + sessionConn := &ClientUDPSessionConn{ + sessionID: string(sessionID), + readChan: make(chan *UDPResponse, 16), + parent: c, + ctx: connctx, + finish: connfinish, + nextWritePacketID: 0, + } + c.locker.Lock() + c.sessionMap[sessionConn.sessionID] = sessionConn + c.locker.Unlock() + return sessionConn, nil +} + +type ClientUDPSessionConn struct { + sessionID string + readChan chan *UDPResponse + parent *ClientUDPSession + + nextWritePacketID uint64 + + ctx context.Context + finish func() +} + +func (c *ClientUDPSessionConn) Close() error { + delete(c.parent.sessionMap, c.sessionID) + c.finish() + return nil +} + +func (c *ClientUDPSessionConn) WriteTo(p []byte, addr gonet.Addr) (n int, err error) { + thisPacketID := c.nextWritePacketID + c.nextWritePacketID += 1 + req := &UDPRequest{ + SessionID: [8]byte{}, + PacketID: thisPacketID, + TimeStamp: uint64(time.Now().Unix()), + Address: net.IPAddress(addr.(*gonet.UDPAddr).IP), + Port: addr.(*net.UDPAddr).Port, + Payload: nil, + } + copy(req.SessionID[:], c.sessionID) + req.Payload = buf.New() + req.Payload.Write(p) + err = c.parent.WriteUDPRequest(req) + if err != nil { + return 0, newError("unable to write to parent session").Base(err) + } + return len(p), nil +} + +func (c *ClientUDPSessionConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + select { + case <-c.ctx.Done(): + return 0, nil, io.EOF + case resp := <-c.readChan: + n = copy(p, resp.Payload.Bytes()) + resp.Payload.Release() + addr = &net.UDPAddr{IP: resp.Address.IP(), Port: int(resp.Port)} + } + return +} diff --git a/proxy/shadowsocks2022/eih_aes.go b/proxy/shadowsocks2022/eih_aes.go index eca9b053e48..08f5dbe68ee 100644 --- a/proxy/shadowsocks2022/eih_aes.go +++ b/proxy/shadowsocks2022/eih_aes.go @@ -1,6 +1,7 @@ package shadowsocks2022 import ( + "crypto/subtle" "github.com/lunixbochs/struc" "github.com/v2fly/v2ray-core/v5/common/buf" "io" @@ -78,18 +79,33 @@ func newAESEIHGeneratorContainer(size int, effectivePsk []byte, ipsk [][]byte) * } func (a *aesEIHGenerator) GenerateEIH(derivation KeyDerivation, method Method, salt []byte) (ExtensibleIdentityHeaders, error) { + return a.generateEIHWithMask(derivation, method, salt, nil) +} + +func (a *aesEIHGenerator) GenerateEIHUDP(derivation KeyDerivation, method Method, mask []byte) (ExtensibleIdentityHeaders, error) { + return a.generateEIHWithMask(derivation, method, nil, mask) +} + +func (a *aesEIHGenerator) generateEIHWithMask(derivation KeyDerivation, method Method, salt, mask []byte) (ExtensibleIdentityHeaders, error) { eih := make([][16]byte, a.length) current := a.length - 1 currentPskHash := a.pskHash for { identityKeyBuf := buf.New() identityKey := identityKeyBuf.Extend(int32(method.GetSessionSubKeyAndSaltLength())) - err := derivation.GetIdentitySubKey(a.ipsk[current], salt, identityKey) - if err != nil { - return nil, newError("failed to get identity sub key").Base(err) + if mask == nil { + err := derivation.GetIdentitySubKey(a.ipsk[current], salt, identityKey) + if err != nil { + return nil, newError("failed to get identity sub key").Base(err) + } + } else { + copy(identityKey, a.ipsk[current]) } eih[current] = [16]byte{} - err = method.GenerateEIH(identityKey, currentPskHash[:], eih[current][:]) + if mask != nil { + subtle.XORBytes(currentPskHash[:], mask, currentPskHash[:]) + } + err := method.GenerateEIH(identityKey, currentPskHash[:], eih[current][:]) if err != nil { return nil, newError("failed to generate EIH").Base(err) } diff --git a/proxy/shadowsocks2022/method_aes128gcm.go b/proxy/shadowsocks2022/method_aes128gcm.go index 50d61abb1b5..e7ef940a861 100644 --- a/proxy/shadowsocks2022/method_aes128gcm.go +++ b/proxy/shadowsocks2022/method_aes128gcm.go @@ -36,3 +36,37 @@ func (A AES128GCMMethod) GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash [ aesCipher.Encrypt(out, nextPskHash) return nil } + +func (A AES128GCMMethod) GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error) { + reqSeparateHeaderPsk := psk + if ipsk != nil { + reqSeparateHeaderPsk = ipsk[0] + } + reqSeparateHeaderCipher, err := aes.NewCipher(reqSeparateHeaderPsk) + if err != nil { + return nil, newError("failed to create AES cipher").Base(err) + } + respSeparateHeaderCipher, err := aes.NewCipher(psk) + if err != nil { + return nil, newError("failed to create AES cipher").Base(err) + } + getPacketAEAD := func(sessionID []byte) cipher.AEAD { + sessionKey := make([]byte, A.GetSessionSubKeyAndSaltLength()) + derivation.GetSessionSubKey(psk, sessionID, sessionKey) + block, err := aes.NewCipher(sessionKey) + aead, err := cipher.NewGCM(block) + if err != nil { + panic(err) + } + return aead + } + eihGenerator := newAESEIHGeneratorContainer(len(ipsk), psk, ipsk) + getEIH := func(mask []byte) ExtensibleIdentityHeaders { + eih, err := eihGenerator.GenerateEIHUDP(derivation, A, mask) + if err != nil { + newError("failed to generate EIH").Base(err).WriteToLog() + } + return eih + } + return NewAESUDPClientPacketProcessor(reqSeparateHeaderCipher, respSeparateHeaderCipher, getPacketAEAD, getEIH), nil +} diff --git a/proxy/shadowsocks2022/method_aes256gcm.go b/proxy/shadowsocks2022/method_aes256gcm.go index 52f7c30b152..20cc601f3e2 100644 --- a/proxy/shadowsocks2022/method_aes256gcm.go +++ b/proxy/shadowsocks2022/method_aes256gcm.go @@ -36,3 +36,37 @@ func (A AES256GCMMethod) GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash [ aesCipher.Encrypt(out, nextPskHash) return nil } + +func (A AES256GCMMethod) GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error) { + reqSeparateHeaderPsk := psk + if ipsk != nil { + reqSeparateHeaderPsk = ipsk[0] + } + reqSeparateHeaderCipher, err := aes.NewCipher(reqSeparateHeaderPsk) + if err != nil { + return nil, newError("failed to create AES cipher").Base(err) + } + respSeparateHeaderCipher, err := aes.NewCipher(psk) + if err != nil { + return nil, newError("failed to create AES cipher").Base(err) + } + getPacketAEAD := func(sessionID []byte) cipher.AEAD { + sessionKey := make([]byte, A.GetSessionSubKeyAndSaltLength()) + derivation.GetSessionSubKey(psk, sessionID, sessionKey) + block, err := aes.NewCipher(sessionKey) + aead, err := cipher.NewGCM(block) + if err != nil { + panic(err) + } + return aead + } + eihGenerator := newAESEIHGeneratorContainer(len(ipsk), psk, ipsk) + getEIH := func(mask []byte) ExtensibleIdentityHeaders { + eih, err := eihGenerator.GenerateEIHUDP(derivation, A, mask) + if err != nil { + newError("failed to generate EIH").Base(err).WriteToLog() + } + return eih + } + return NewAESUDPClientPacketProcessor(reqSeparateHeaderCipher, respSeparateHeaderCipher, getPacketAEAD, getEIH), nil +} diff --git a/proxy/shadowsocks2022/ss2022.go b/proxy/shadowsocks2022/ss2022.go index 6d5bfac54c6..67b425ac4b7 100644 --- a/proxy/shadowsocks2022/ss2022.go +++ b/proxy/shadowsocks2022/ss2022.go @@ -3,6 +3,7 @@ package shadowsocks2022 import ( "crypto/cipher" "github.com/lunixbochs/struc" + "github.com/v2fly/v2ray-core/v5/common/buf" "github.com/v2fly/v2ray-core/v5/common/net" "github.com/v2fly/v2ray-core/v5/common/protocol" "io" @@ -19,6 +20,7 @@ type Method interface { GetSessionSubKeyAndSaltLength() int GetStreamAEAD(SessionSubKey []byte) (cipher.AEAD, error) GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash []byte, out []byte) error + GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error) } type ExtensibleIdentityHeaders interface { @@ -86,3 +88,27 @@ var addrParser = protocol.NewAddressParser( protocol.AddressFamilyByte(0x04, net.AddressFamilyIPv6), protocol.AddressFamilyByte(0x03, net.AddressFamilyDomain), ) + +type UDPRequest struct { + SessionID [8]byte + PacketID uint64 + TimeStamp uint64 + Address DestinationAddress + Port int + Payload *buf.Buffer +} + +type UDPResponse struct { + UDPRequest + ClientSessionID [8]byte +} + +const UDPHeaderTypeClientToServerStream = byte(0x00) +const UDPHeaderTypeServerToClientStream = byte(0x01) + +// UDPClientPacketProcessor +// Caller retain and receive all ownership of the buffer +type UDPClientPacketProcessor interface { + EncodeUDPRequest(request *UDPRequest, out *buf.Buffer) error + DecodeUDPResp(input []byte, resp *UDPResponse) error +} diff --git a/proxy/shadowsocks2022/udp_aes.go b/proxy/shadowsocks2022/udp_aes.go new file mode 100644 index 00000000000..f7a6d3c3df9 --- /dev/null +++ b/proxy/shadowsocks2022/udp_aes.go @@ -0,0 +1,157 @@ +package shadowsocks2022 + +import ( + "bytes" + "crypto/cipher" + "github.com/lunixbochs/struc" + "github.com/v2fly/v2ray-core/v5/common/buf" + "github.com/v2fly/v2ray-core/v5/common/net" + "io" +) + +type AESUDPClientPacketProcessor struct { + requestSeparateHeaderBlockCipher cipher.Block + responseSeparateHeaderBlockCipher cipher.Block + mainPacketAEAD func([]byte) cipher.AEAD + EIHGenerator func([]byte) ExtensibleIdentityHeaders +} + +func NewAESUDPClientPacketProcessor(requestSeparateHeaderBlockCipher, responseSeparateHeaderBlockCipher cipher.Block, mainPacketAEAD func([]byte) cipher.AEAD, eih func([]byte) ExtensibleIdentityHeaders) *AESUDPClientPacketProcessor { + return &AESUDPClientPacketProcessor{ + requestSeparateHeaderBlockCipher: requestSeparateHeaderBlockCipher, + responseSeparateHeaderBlockCipher: responseSeparateHeaderBlockCipher, + mainPacketAEAD: mainPacketAEAD, + EIHGenerator: eih, + } +} + +type separateHeader struct { + SessionID [8]byte + PacketID uint64 +} + +type header struct { + Type byte + TimeStamp uint64 + PaddingLength uint16 `struc:"sizeof=Padding"` + Padding []byte +} + +type respHeader struct { + Type byte + TimeStamp uint64 + ClientSessionID [8]byte + PaddingLength uint16 `struc:"sizeof=Padding"` + Padding []byte +} + +func (p *AESUDPClientPacketProcessor) EncodeUDPRequest(request *UDPRequest, out *buf.Buffer) error { + separateHeaderStruct := separateHeader{PacketID: request.PacketID, SessionID: request.SessionID} + separateHeaderBuffer := buf.New() + defer separateHeaderBuffer.Release() + { + err := struc.Pack(separateHeaderBuffer, &separateHeaderStruct) + if err != nil { + return newError("failed to pack separateHeader").Base(err) + } + } + separateHeaderBufferBytes := separateHeaderBuffer.Bytes() + { + encryptedDest := out.Extend(16) + p.requestSeparateHeaderBlockCipher.Encrypt(encryptedDest, separateHeaderBufferBytes) + } + + if p.EIHGenerator != nil { + eih := p.EIHGenerator(separateHeaderBufferBytes[0:16]) + eihHeader := struct { + EIH ExtensibleIdentityHeaders + }{ + EIH: eih, + } + err := struc.Pack(out, &eihHeader) + if err != nil { + return newError("failed to pack eih").Base(err) + } + } + + headerStruct := header{ + Type: UDPHeaderTypeClientToServerStream, + TimeStamp: request.TimeStamp, + PaddingLength: 0, + Padding: nil, + } + requestBodyBuffer := buf.New() + { + err := struc.Pack(requestBodyBuffer, &headerStruct) + if err != nil { + return newError("failed to header").Base(err) + } + } + { + err := addrParser.WriteAddressPort(requestBodyBuffer, request.Address, net.Port(request.Port)) + if err != nil { + return newError("failed to write address port").Base(err) + } + } + { + _, err := io.Copy(requestBodyBuffer, bytes.NewReader(request.Payload.Bytes())) + if err != nil { + return newError("failed to copy payload").Base(err) + } + } + { + mainPacketAEADMaterialized := p.mainPacketAEAD(separateHeaderBufferBytes[0:8]) + encryptedDest := out.Extend(int32(mainPacketAEADMaterialized.Overhead()) + requestBodyBuffer.Len()) + mainPacketAEADMaterialized.Seal(encryptedDest[:0], separateHeaderBuffer.Bytes()[4:16], requestBodyBuffer.Bytes(), nil) + } + return nil +} + +func (p *AESUDPClientPacketProcessor) DecodeUDPResp(input []byte, resp *UDPResponse) error { + separateHeaderBuffer := buf.New() + defer separateHeaderBuffer.Release() + { + encryptedDest := separateHeaderBuffer.Extend(16) + p.responseSeparateHeaderBlockCipher.Decrypt(encryptedDest, input) + } + separateHeaderStruct := separateHeader{} + { + err := struc.Unpack(separateHeaderBuffer, &separateHeaderStruct) + if err != nil { + return newError("failed to unpack separateHeader").Base(err) + } + } + resp.PacketID = separateHeaderStruct.PacketID + resp.SessionID = separateHeaderStruct.SessionID + { + mainPacketAEADMaterialized := p.mainPacketAEAD(separateHeaderBuffer.Bytes()[0:8]) + decryptedDestBuffer := buf.New() + decryptedDest := decryptedDestBuffer.Extend(int32(len(input)) - 16 - int32(mainPacketAEADMaterialized.Overhead())) + _, err := mainPacketAEADMaterialized.Open(decryptedDest[:0], separateHeaderBuffer.Bytes()[4:16], input[16:], nil) + if err != nil { + return newError("failed to open main packet").Base(err) + } + decryptedDestReader := bytes.NewReader(decryptedDest) + headerStruct := respHeader{} + { + err := struc.Unpack(decryptedDestReader, &headerStruct) + if err != nil { + return newError("failed to unpack header").Base(err) + } + } + resp.TimeStamp = headerStruct.TimeStamp + var addressReaderBuf = buf.New() + defer addressReaderBuf.Release() + var port net.Port + resp.Address, port, err = addrParser.ReadAddressPort(addressReaderBuf, decryptedDestReader) + if err != nil { + return newError("failed to read address port").Base(err) + } + resp.Port = int(port) + readedLength := decryptedDestReader.Size() - int64(decryptedDestReader.Len()) + decryptedDestBuffer.Advance(int32(readedLength)) + resp.Payload = decryptedDestBuffer + resp.ClientSessionID = headerStruct.ClientSessionID + return nil + } +} diff --git a/transport/internet/udp/monodest.go b/transport/internet/udp/monodest.go new file mode 100644 index 00000000000..cdca33b0966 --- /dev/null +++ b/transport/internet/udp/monodest.go @@ -0,0 +1,51 @@ +package udp + +import ( + "github.com/v2fly/v2ray-core/v5/common/buf" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/transport/internet" +) + +func NewMonoDestUDPConn(conn internet.AbstractPacketConn, addr net.Addr) *MonoDestUDPConn { + return &MonoDestUDPConn{ + AbstractPacketConn: conn, + dest: addr, + } +} + +type MonoDestUDPConn struct { + internet.AbstractPacketConn + dest net.Addr +} + +func (m *MonoDestUDPConn) ReadMultiBuffer() (buf.MultiBuffer, error) { + buffer := buf.New() + buffer.Extend(2048) + nBytes, _, err := m.AbstractPacketConn.ReadFrom(buffer.Bytes()) + if err != nil { + buffer.Release() + return nil, err + } + buffer.Resize(0, int32(nBytes)) + return buf.MultiBuffer{buffer}, nil +} + +func (m *MonoDestUDPConn) WriteMultiBuffer(buffer buf.MultiBuffer) error { + for _, b := range buffer { + _, err := m.AbstractPacketConn.WriteTo(b.Bytes(), m.dest) + if err != nil { + return err + } + } + buf.ReleaseMulti(buffer) + return nil +} + +func (m *MonoDestUDPConn) Read(p []byte) (n int, err error) { + n, _, err = m.AbstractPacketConn.ReadFrom(p) + return +} + +func (m *MonoDestUDPConn) Write(p []byte) (n int, err error) { + return m.AbstractPacketConn.WriteTo(p, m.dest) +}