diff --git a/pkg/kubelet/server/portforward/httpstream.go b/pkg/kubelet/server/portforward/httpstream.go index b9d58a7e403c9..4b5e66d6607bc 100644 --- a/pkg/kubelet/server/portforward/httpstream.go +++ b/pkg/kubelet/server/portforward/httpstream.go @@ -163,10 +163,6 @@ func (h *httpStreamHandler) removeStreamPair(requestID string) { h.streamPairsLock.Lock() defer h.streamPairsLock.Unlock() - if h.conn != nil { - pair := h.streamPairs[requestID] - h.conn.RemoveStreams(pair.dataStream, pair.errorStream) - } delete(h.streamPairs, requestID) } diff --git a/pkg/kubelet/server/portforward/httpstream_test.go b/pkg/kubelet/server/portforward/httpstream_test.go index 37e0ce8f90702..26e6905bbbc0e 100644 --- a/pkg/kubelet/server/portforward/httpstream_test.go +++ b/pkg/kubelet/server/portforward/httpstream_test.go @@ -92,23 +92,11 @@ func TestHTTPStreamReceived(t *testing.T) { } } -type fakeConn struct { - removeStreamsCalled bool -} - -func (*fakeConn) CreateStream(headers http.Header) (httpstream.Stream, error) { return nil, nil } -func (*fakeConn) Close() error { return nil } -func (*fakeConn) CloseChan() <-chan bool { return nil } -func (*fakeConn) SetIdleTimeout(timeout time.Duration) {} -func (f *fakeConn) RemoveStreams(streams ...httpstream.Stream) { f.removeStreamsCalled = true } - func TestGetStreamPair(t *testing.T) { timeout := make(chan time.Time) - conn := &fakeConn{} h := &httpStreamHandler{ streamPairs: make(map[string]*httpStreamPair), - conn: conn, } // test adding a new entry @@ -170,11 +158,6 @@ func TestGetStreamPair(t *testing.T) { // make sure monitorStreamPair completed <-monitorDone - if !conn.removeStreamsCalled { - t.Fatalf("connection remove stream not called") - } - conn.removeStreamsCalled = false - // make sure the pair was removed if h.hasStreamPair("1") { t.Fatal("expected removal of pair after both data and error streams received") @@ -188,7 +171,6 @@ func TestGetStreamPair(t *testing.T) { if p == nil { t.Fatal("expected p not to be nil") } - monitorDone = make(chan struct{}) go func() { h.monitorStreamPair(p, timeout) @@ -201,9 +183,6 @@ func TestGetStreamPair(t *testing.T) { if h.hasStreamPair("2") { t.Fatal("expected stream pair to be removed") } - if !conn.removeStreamsCalled { - t.Fatalf("connection remove stream not called") - } } func TestRequestID(t *testing.T) { diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go index 32f075782a9ab..00ce5f785c8b1 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go @@ -78,8 +78,6 @@ type Connection interface { // SetIdleTimeout sets the amount of time the connection may remain idle before // it is automatically closed. SetIdleTimeout(timeout time.Duration) - // RemoveStreams can be used to remove a set of streams from the Connection. - RemoveStreams(streams ...Stream) } // Stream represents a bidirectional communications channel that is part of an diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go index b6903c527641b..9d222faa898fa 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -31,7 +31,7 @@ import ( // streams. type connection struct { conn *spdystream.Connection - streams map[uint32]httpstream.Stream + streams []httpstream.Stream streamLock sync.Mutex newStreamHandler httpstream.NewStreamHandler } @@ -64,11 +64,7 @@ func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHan // will be invoked when the server receives a newly created stream from the // client. func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { - c := &connection{ - conn: conn, - newStreamHandler: newStreamHandler, - streams: make(map[uint32]httpstream.Stream), - } + c := &connection{conn: conn, newStreamHandler: newStreamHandler} go conn.Serve(c.newSpdyStream) return c } @@ -85,7 +81,7 @@ func (c *connection) Close() error { // calling Reset instead of Close ensures that all streams are fully torn down s.Reset() } - c.streams = make(map[uint32]httpstream.Stream, 0) + c.streams = make([]httpstream.Stream, 0) c.streamLock.Unlock() // now that all streams are fully torn down, it's safe to call close on the underlying connection, @@ -94,15 +90,6 @@ func (c *connection) Close() error { return c.conn.Close() } -// RemoveStreams can be used to removes a set of streams from the Connection. -func (c *connection) RemoveStreams(streams ...httpstream.Stream) { - c.streamLock.Lock() - for _, stream := range streams { - delete(c.streams, stream.Identifier()) - } - c.streamLock.Unlock() -} - // CreateStream creates a new stream with the specified headers and registers // it with the connection. func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) { @@ -122,7 +109,7 @@ func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error // it owns. func (c *connection) registerStream(s httpstream.Stream) { c.streamLock.Lock() - c.streams[s.Identifier()] = s + c.streams = append(c.streams, s) c.streamLock.Unlock() } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go index cfeef2c9075bd..e00b29c461e1e 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go @@ -178,41 +178,3 @@ func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { } } } - -type fakeStream struct{ id uint32 } - -func (*fakeStream) Read(p []byte) (int, error) { return 0, nil } -func (*fakeStream) Write(p []byte) (int, error) { return 0, nil } -func (*fakeStream) Close() error { return nil } -func (*fakeStream) Reset() error { return nil } -func (*fakeStream) Headers() http.Header { return nil } -func (f *fakeStream) Identifier() uint32 { return f.id } - -func TestConnectionRemoveStreams(t *testing.T) { - c := &connection{streams: make(map[uint32]httpstream.Stream)} - stream0 := &fakeStream{id: 0} - stream1 := &fakeStream{id: 1} - stream2 := &fakeStream{id: 2} - - c.registerStream(stream0) - c.registerStream(stream1) - - if len(c.streams) != 2 { - t.Fatalf("should have two streams, has %d", len(c.streams)) - } - - // not exists - c.RemoveStreams(stream2) - - if len(c.streams) != 2 { - t.Fatalf("should have two streams, has %d", len(c.streams)) - } - - // remove all existing - c.RemoveStreams(stream0, stream1) - - if len(c.streams) != 0 { - t.Fatalf("should not have any streams, has %d", len(c.streams)) - } - -} diff --git a/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go b/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go index 034be748fe2a1..5b9afabeaab1f 100644 --- a/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go +++ b/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go @@ -68,9 +68,6 @@ func (c *fakeConnection) CloseChan() <-chan bool { return c.closeChan } -func (c *fakeConnection) RemoveStreams(_ ...httpstream.Stream) { -} - func (c *fakeConnection) SetIdleTimeout(timeout time.Duration) { // no-op }