From 6826fe87595f64a1c282b94662aac5c9040e731f Mon Sep 17 00:00:00 2001 From: Rod Hynes Date: Mon, 12 Sep 2022 13:21:15 -0400 Subject: [PATCH] Capture client-side meek fragmentor metrics --- psiphon/meekConn.go | 44 +++++++++++++++++++++++++++--- psiphon/server/server_test.go | 51 +++++++++++++++++++++++++++++++++++ psiphon/tunnel.go | 5 ---- 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/psiphon/meekConn.go b/psiphon/meekConn.go index 9f020f167..ea0e694db 100644 --- a/psiphon/meekConn.go +++ b/psiphon/meekConn.go @@ -217,6 +217,7 @@ type MeekConn struct { tlsPadding int limitRequestPayloadLength int redialTLSProbability float64 + underlyingDialer common.Dialer cachedTLSDialer *cachedTLSDialer transport transporter mutex sync.Mutex @@ -224,6 +225,7 @@ type MeekConn struct { runCtx context.Context stopRunning context.CancelFunc relayWaitGroup *sync.WaitGroup + firstUnderlyingConn net.Conn // For MeekModeObfuscatedRoundTrip meekCookieEncryptionPublicKey string @@ -418,10 +420,12 @@ func DialMeek( scheme = "https" + meek.initUnderlyingDialer(dialConfig) + tlsConfig := &CustomTLSConfig{ Parameters: meekConfig.Parameters, DialAddr: meekConfig.DialAddress, - Dial: NewTCPDialer(dialConfig), + Dial: meek.underlyingDial, SNIServerName: meekConfig.SNIServerName, SkipVerify: skipVerify, VerifyServerName: meekConfig.VerifyServerName, @@ -545,7 +549,8 @@ func DialMeek( *copyDialConfig = *dialConfig copyDialConfig.UpstreamProxyURL = "" - dialer = NewTCPDialer(copyDialConfig) + meek.initUnderlyingDialer(copyDialConfig) + dialer = meek.underlyingDial // In this proxy case, the destination server address is in the // request line URL. net/http will render the request line using @@ -569,7 +574,8 @@ func DialMeek( // If dialConfig.UpstreamProxyURL is set, HTTP proxying via // CONNECT will be used by the dialer. - baseDialer := NewTCPDialer(dialConfig) + meek.initUnderlyingDialer(dialConfig) + baseDialer := meek.underlyingDial // The dialer ignores any address that http.Transport will pass in // (derived from the HTTP request URL) and always dials @@ -699,6 +705,28 @@ func DialMeek( return meek, nil } +func (meek *MeekConn) initUnderlyingDialer(dialConfig *DialConfig) { + + // Not safe for concurrent calls; should be called only from DialMeek. + meek.underlyingDialer = NewTCPDialer(dialConfig) +} + +func (meek *MeekConn) underlyingDial(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := meek.underlyingDialer(ctx, network, addr) + if err == nil { + meek.mutex.Lock() + if meek.firstUnderlyingConn == nil { + // Keep a reference to the first underlying conn to be used as a + // common.MetricsSource in GetMetrics. This enables capturing + // metrics such as fragmentor configuration. + meek.firstUnderlyingConn = conn + } + meek.mutex.Unlock() + } + // Note: no trace error to preserve error type + return conn, err +} + type cachedTLSDialer struct { usedCachedConn int32 cachedConn net.Conn @@ -806,6 +834,16 @@ func (meek *MeekConn) GetMetrics() common.LogFields { logFields["meek_tls_padding"] = meek.tlsPadding logFields["meek_limit_request"] = meek.limitRequestPayloadLength } + // Include metrics, such as fragmentor metrics, from the _first_ underlying + // dial conn. Properties of subsequent underlying dial conns are not reflected + // in these metrics; we assume that the first dial conn, which most likely + // transits the various protocol handshakes, is most significant. + meek.mutex.Lock() + underlyingMetrics, ok := meek.firstUnderlyingConn.(common.MetricsSource) + if ok { + logFields.Add(underlyingMetrics.GetMetrics()) + } + meek.mutex.Unlock() return logFields } diff --git a/psiphon/server/server_test.go b/psiphon/server/server_test.go index 555d91520..3dd8f6a2a 100644 --- a/psiphon/server/server_test.go +++ b/psiphon/server/server_test.go @@ -218,6 +218,31 @@ func TestUnfrontedMeek(t *testing.T) { }) } +func TestFragmentedUnfrontedMeek(t *testing.T) { + runServer(t, + &runServerConfig{ + tunnelProtocol: "UNFRONTED-MEEK-OSSH", + enableSSHAPIRequests: true, + doHotReload: false, + doDefaultSponsorID: false, + denyTrafficRules: false, + requireAuthorization: true, + omitAuthorization: false, + doTunneledWebRequest: true, + doTunneledNTPRequest: true, + forceFragmenting: true, + forceLivenessTest: false, + doPruneServerEntries: false, + doDanglingTCPConn: true, + doPacketManipulation: false, + doBurstMonitor: false, + doSplitTunnel: false, + limitQUICVersions: false, + doDestinationBytes: false, + doChangeBytesConfig: false, + }) +} + func TestUnfrontedMeekHTTPS(t *testing.T) { runServer(t, &runServerConfig{ @@ -244,6 +269,32 @@ func TestUnfrontedMeekHTTPS(t *testing.T) { }) } +func TestFragmentedUnfrontedMeekHTTPS(t *testing.T) { + runServer(t, + &runServerConfig{ + tunnelProtocol: "UNFRONTED-MEEK-HTTPS-OSSH", + tlsProfile: protocol.TLS_PROFILE_RANDOMIZED, + enableSSHAPIRequests: true, + doHotReload: false, + doDefaultSponsorID: false, + denyTrafficRules: false, + requireAuthorization: true, + omitAuthorization: false, + doTunneledWebRequest: true, + doTunneledNTPRequest: true, + forceFragmenting: true, + forceLivenessTest: false, + doPruneServerEntries: false, + doDanglingTCPConn: true, + doPacketManipulation: false, + doBurstMonitor: false, + doSplitTunnel: false, + limitQUICVersions: false, + doDestinationBytes: false, + doChangeBytesConfig: false, + }) +} + func TestUnfrontedMeekHTTPSTLS13(t *testing.T) { runServer(t, &runServerConfig{ diff --git a/psiphon/tunnel.go b/psiphon/tunnel.go index 103b21b4f..33ff8ab09 100644 --- a/psiphon/tunnel.go +++ b/psiphon/tunnel.go @@ -136,7 +136,6 @@ func getCustomParameters( // handshake requests and starting operateTunnel from tunnels which // may be discarded, call Activate on connected tunnels sequentially // as necessary. -// func ConnectTunnel( ctx context.Context, config *Config, @@ -942,9 +941,6 @@ func dialTunnel( // Some conns report additional metrics. fragmentor.Conns report // fragmentor configs. - // - // Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s) - // should be called in order to log fragmentor metrics for meek sessions. if metricsSource, ok := dialConn.(common.MetricsSource); ok { dialParams.DialConnMetrics = metricsSource } @@ -1359,7 +1355,6 @@ func performLivenessTest( // // TODO: change "recently active" to include having received any // SSH protocol messages from the server, not just user payload? -// func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) { defer tunnel.operateWaitGroup.Done()