diff --git a/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift b/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift index db9c43f..cdc8a60 100644 --- a/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift @@ -29,6 +29,7 @@ final class GRPCClientStreamHandler: ChannelDuplexHandler { private var isReading = false private var flushPending = false + private var requestFinished = false init( methodDescriptor: MethodDescriptor, @@ -263,6 +264,12 @@ extension GRPCClientStreamHandler { ) case .noMoreMessages: + // If we're done writing (i.e. we have no more messages returned from state + // machine) then return. + if self.requestFinished { return } + + self.requestFinished = true + // Write an empty data frame with the EOS flag set, to signal the RPC // request is now finished. context.write( @@ -276,7 +283,6 @@ extension GRPCClientStreamHandler { ), promise: nil ) - context.flush() break loop diff --git a/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift b/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift index b2c09c7..7f62f2e 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift @@ -529,14 +529,16 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Half-close the outbound end: this would be triggered by finishing the client's writer. XCTAssertNoThrow(channel.close(mode: .output, promise: nil)) - // Flush to make sure the EOS is written. - channel.flush() - // Make sure the EOS frame was sent let emptyEOSFrame = try channel.assertReadDataOutbound() XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init())) XCTAssertTrue(emptyEOSFrame.endStream) + // Make sure that, if we flush again, we're not writing anything else down + // the stream. We should have closed at this point. + channel.flush() + XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) + // Make sure we cannot write anymore because client's closed. XCTAssertThrowsError( ofType: RPCError.self,