From c468763a1feb3229455f38353de5cc9b3cea16ef Mon Sep 17 00:00:00 2001 From: Maxim Kim Date: Wed, 11 Apr 2018 17:58:55 -0700 Subject: [PATCH] post rebase fixes --- examples/Telnet.Server/TelnetServerHandler.cs | 8 +- src/DotNetty.Codecs.Http/Cors/CorsHandler.cs | 13 +- .../DotNetty.Codecs.Http.csproj | 2 +- .../HttpClientUpgradeHandler.cs | 6 +- .../HttpServerExpectContinueHandler.cs | 16 +- .../HttpServerKeepAliveHandler.cs | 11 +- .../Compression/JZlibEncoder.cs | 3 +- src/DotNetty.Codecs/MessageAggregator.cs | 29 ++-- src/DotNetty.Codecs/MessageToMessageCodec.cs | 2 +- src/DotNetty.Codecs/TaskExtensions.cs | 59 +++++++ .../Concurrency/AbstractPromise.cs | 2 +- .../Streams/ChunkedWriteHandler.cs | 157 +++++++++--------- .../Channels/CombinedChannelDuplexHandler.cs | 6 +- .../HttpContentCompressorTest.cs | 7 +- .../ValueTaskExtensions.cs | 37 ----- test/DotNetty.Tests.End2End/End2EndTests.cs | 2 +- .../AutoReadTests.cs | 6 +- .../BufReleaseTests.cs | 2 +- .../DetectPeerCloseWithoutReadTests.cs | 1 + .../ExceptionHandlingTests.cs | 4 +- .../ReadPendingTests.cs | 6 +- .../WriteBeforeRegisteredTests.cs | 2 +- .../SocketDatagramChannelMulticastTest.cs | 4 +- .../SocketDatagramChannelUnicastTest.cs | 4 +- 24 files changed, 194 insertions(+), 195 deletions(-) create mode 100644 src/DotNetty.Codecs/TaskExtensions.cs delete mode 100644 test/DotNetty.Tests.Common/ValueTaskExtensions.cs diff --git a/examples/Telnet.Server/TelnetServerHandler.cs b/examples/Telnet.Server/TelnetServerHandler.cs index 8e7020b4c..2e85ae8a4 100644 --- a/examples/Telnet.Server/TelnetServerHandler.cs +++ b/examples/Telnet.Server/TelnetServerHandler.cs @@ -6,6 +6,7 @@ namespace Telnet.Server using System; using System.Net; using System.Threading.Tasks; + using DotNetty.Codecs; using DotNetty.Common.Concurrency; using DotNetty.Transport.Channels; @@ -17,7 +18,7 @@ public override void ChannelActive(IChannelHandlerContext contex) contex.WriteAndFlushAsync(string.Format("It is {0} now !\r\n", DateTime.Now)); } - protected override async void ChannelRead0(IChannelHandlerContext contex, string msg) + protected override void ChannelRead0(IChannelHandlerContext context, string msg) { // Generate and write a response. string response; @@ -36,11 +37,10 @@ protected override async void ChannelRead0(IChannelHandlerContext contex, string response = "Did you say '" + msg + "'?\r\n"; } - ValueTask waitClose = contex.WriteAndFlushAsync(response); + Task waitClose = context.WriteAndFlushAsync(response); if (close) { - await waitClose; - contex.CloseAsync(); + waitClose.CloseOnComplete(context); } } diff --git a/src/DotNetty.Codecs.Http/Cors/CorsHandler.cs b/src/DotNetty.Codecs.Http/Cors/CorsHandler.cs index c6e38d2e0..feb5453f6 100644 --- a/src/DotNetty.Codecs.Http/Cors/CorsHandler.cs +++ b/src/DotNetty.Codecs.Http/Cors/CorsHandler.cs @@ -167,7 +167,7 @@ void SetExposeHeaders(IHttpResponse response) void SetMaxAge(IHttpResponse response) => response.Headers.Set(HttpHeaderNames.AccessControlMaxAge, this.config.MaxAge); - public override Task WriteAsync(IChannelHandlerContext context, object message) + public override ValueTask WriteAsync(IChannelHandlerContext context, object message) { if (this.config.IsCorsSupportEnabled && message is IHttpResponse response) { @@ -177,7 +177,7 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) this.SetExposeHeaders(response); } } - return context.WriteAndFlushAsync(message); + return context.WriteAndFlushAsync(message, true); } static void Forbidden(IChannelHandlerContext ctx, IHttpRequest request) @@ -197,15 +197,8 @@ static void Respond(IChannelHandlerContext ctx, IHttpRequest request, IHttpRespo Task task = ctx.WriteAndFlushAsync(response); if (!keepAlive) { - task.ContinueWith(CloseOnComplete, ctx, - TaskContinuationOptions.ExecuteSynchronously); + task.CloseOnComplete(ctx); } } - - static void CloseOnComplete(Task task, object state) - { - var ctx = (IChannelHandlerContext)state; - ctx.CloseAsync(); - } } } diff --git a/src/DotNetty.Codecs.Http/DotNetty.Codecs.Http.csproj b/src/DotNetty.Codecs.Http/DotNetty.Codecs.Http.csproj index b6e29b13d..92fa74bd8 100644 --- a/src/DotNetty.Codecs.Http/DotNetty.Codecs.Http.csproj +++ b/src/DotNetty.Codecs.Http/DotNetty.Codecs.Http.csproj @@ -29,7 +29,7 @@ - + diff --git a/src/DotNetty.Codecs.Http/HttpClientUpgradeHandler.cs b/src/DotNetty.Codecs.Http/HttpClientUpgradeHandler.cs index 9f10eeea3..122fd78e4 100644 --- a/src/DotNetty.Codecs.Http/HttpClientUpgradeHandler.cs +++ b/src/DotNetty.Codecs.Http/HttpClientUpgradeHandler.cs @@ -72,7 +72,7 @@ public HttpClientUpgradeHandler(ISourceCodec sourceCodec, IUpgradeCodec upgradeC this.upgradeCodec = upgradeCodec; } - public override Task WriteAsync(IChannelHandlerContext context, object message) + public override ValueTask WriteAsync(IChannelHandlerContext context, object message) { if (!(message is IHttpRequest)) { @@ -81,14 +81,14 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) if (this.upgradeRequested) { - return TaskEx.FromException(new InvalidOperationException("Attempting to write HTTP request with upgrade in progress")); + return new ValueTask(TaskEx.FromException(new InvalidOperationException("Attempting to write HTTP request with upgrade in progress"))); } this.upgradeRequested = true; this.SetUpgradeRequestHeaders(context, (IHttpRequest)message); // Continue writing the request. - Task task = context.WriteAsync(message); + ValueTask task = context.WriteAsync(message); // Notify that the upgrade request was issued. context.FireUserEventTriggered(UpgradeEvent.UpgradeIssued); diff --git a/src/DotNetty.Codecs.Http/HttpServerExpectContinueHandler.cs b/src/DotNetty.Codecs.Http/HttpServerExpectContinueHandler.cs index de4e41746..6ac07f518 100644 --- a/src/DotNetty.Codecs.Http/HttpServerExpectContinueHandler.cs +++ b/src/DotNetty.Codecs.Http/HttpServerExpectContinueHandler.cs @@ -39,27 +39,15 @@ public override void ChannelRead(IChannelHandlerContext context, object message) // the expectation failed so we refuse the request. IHttpResponse rejection = this.RejectResponse(req); ReferenceCountUtil.Release(message); - context.WriteAndFlushAsync(rejection) - .ContinueWith(CloseOnFailure, context, TaskContinuationOptions.ExecuteSynchronously); + context.WriteAndFlushAsync(rejection).CloseOnFailure(context); return; } - context.WriteAndFlushAsync(accept) - .ContinueWith(CloseOnFailure, context, TaskContinuationOptions.ExecuteSynchronously); + context.WriteAndFlushAsync(accept).CloseOnFailure(context); req.Headers.Remove(HttpHeaderNames.Expect); } base.ChannelRead(context, message); } } - - static Task CloseOnFailure(Task task, object state) - { - if (task.IsFaulted) - { - var context = (IChannelHandlerContext)state; - return context.CloseAsync(); - } - return TaskEx.Completed; - } } } diff --git a/src/DotNetty.Codecs.Http/HttpServerKeepAliveHandler.cs b/src/DotNetty.Codecs.Http/HttpServerKeepAliveHandler.cs index 981ff209b..cbec7c65c 100644 --- a/src/DotNetty.Codecs.Http/HttpServerKeepAliveHandler.cs +++ b/src/DotNetty.Codecs.Http/HttpServerKeepAliveHandler.cs @@ -31,7 +31,7 @@ public override void ChannelRead(IChannelHandlerContext context, object message) base.ChannelRead(context, message); } - public override Task WriteAsync(IChannelHandlerContext context, object message) + public override ValueTask WriteAsync(IChannelHandlerContext context, object message) { // modify message on way out to add headers if needed if (message is IHttpResponse response) @@ -52,18 +52,11 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) } if (message is ILastHttpContent && !this.ShouldKeepAlive()) { - return base.WriteAsync(context, message) - .ContinueWith(CloseOnComplete, context, TaskContinuationOptions.ExecuteSynchronously); + return new ValueTask(base.WriteAsync(context, message).CloseOnComplete(context)); } return base.WriteAsync(context, message); } - static Task CloseOnComplete(Task task, object state) - { - var context = (IChannelHandlerContext)state; - return context.CloseAsync(); - } - void TrackResponse(IHttpResponse response) { if (!IsInformational(response)) diff --git a/src/DotNetty.Codecs/Compression/JZlibEncoder.cs b/src/DotNetty.Codecs/Compression/JZlibEncoder.cs index c4600175f..c5609e8dd 100644 --- a/src/DotNetty.Codecs/Compression/JZlibEncoder.cs +++ b/src/DotNetty.Codecs/Compression/JZlibEncoder.cs @@ -240,8 +240,7 @@ Task FinishEncode(IChannelHandlerContext context) this.z.next_out = null; } - return context.WriteAndFlushAsync(footer) - .ContinueWith(_ => context.CloseAsync()); + return context.WriteAndFlushAsync(footer).CloseOnComplete(context); } public override void HandlerAdded(IChannelHandlerContext context) => this.ctx = context; diff --git a/src/DotNetty.Codecs/MessageAggregator.cs b/src/DotNetty.Codecs/MessageAggregator.cs index 79f5e9c62..76106dd03 100644 --- a/src/DotNetty.Codecs/MessageAggregator.cs +++ b/src/DotNetty.Codecs/MessageAggregator.cs @@ -130,13 +130,10 @@ protected internal override void Decode(IChannelHandlerContext context, TMessage bool closeAfterWrite = this.CloseAfterContinueResponse(continueResponse); this.handlingOversizedMessage = this.IgnoreContentAfterContinueResponse(continueResponse); - Task task = context - .WriteAndFlushAsync(continueResponse) - .ContinueWith(ContinueResponseWriteAction, context, TaskContinuationOptions.ExecuteSynchronously); + WriteContinueResponse(context, continueResponse, closeAfterWrite); if (closeAfterWrite) { - task.ContinueWith(CloseAfterWriteAction, context, TaskContinuationOptions.ExecuteSynchronously); return; } @@ -245,19 +242,21 @@ protected internal override void Decode(IChannelHandlerContext context, TMessage throw new MessageAggregationException("Unknown aggregation state."); } } - - static void CloseAfterWriteAction(Task task, object state) + + static async void WriteContinueResponse(IChannelHandlerContext ctx, object message, bool closeAfterWrite) { - var ctx = (IChannelHandlerContext)state; - ctx.Channel.CloseAsync(); - } - - static void ContinueResponseWriteAction(Task task, object state) - { - if (task.IsFaulted) + try + { + await ctx.WriteAndFlushAsync(message); + } + catch (Exception ex) + { + ctx.FireExceptionCaught(ex); + } + + if (closeAfterWrite) { - var ctx = (IChannelHandlerContext)state; - ctx.FireExceptionCaught(task.Exception); + ctx.Channel.CloseAsync(); } } diff --git a/src/DotNetty.Codecs/MessageToMessageCodec.cs b/src/DotNetty.Codecs/MessageToMessageCodec.cs index a990193a2..5b4368323 100644 --- a/src/DotNetty.Codecs/MessageToMessageCodec.cs +++ b/src/DotNetty.Codecs/MessageToMessageCodec.cs @@ -50,7 +50,7 @@ protected MessageToMessageCodec() public sealed override void ChannelRead(IChannelHandlerContext context, object message) => this.decoder.ChannelRead(context, message); - public sealed override Task WriteAsync(IChannelHandlerContext context, object message) => + public sealed override ValueTask WriteAsync(IChannelHandlerContext context, object message) => this.encoder.WriteAsync(context, message); public virtual bool AcceptInboundMessage(object msg) => msg is TInbound; diff --git a/src/DotNetty.Codecs/TaskExtensions.cs b/src/DotNetty.Codecs/TaskExtensions.cs new file mode 100644 index 000000000..b5af165d0 --- /dev/null +++ b/src/DotNetty.Codecs/TaskExtensions.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Codecs +{ + using System; + using System.Threading.Tasks; + using DotNetty.Common.Utilities; + using DotNetty.Transport.Channels; + + public static class TaskExtensions + { + public static async Task CloseOnComplete(this ValueTask task, IChannelHandlerContext ctx) + { + try + { + await task; + } + finally + { + await ctx.CloseAsync(); + } + } + + static readonly Func CloseOnCompleteContinuation = Close; + static readonly Func CloseOnFailureContinuation = CloseOnFailure; + + public static Task CloseOnComplete(this Task task, IChannelHandlerContext ctx) + => task.ContinueWith(CloseOnCompleteContinuation, ctx, TaskContinuationOptions.ExecuteSynchronously); + + public static Task CloseOnComplete(this Task task, IChannel channel) + => task.ContinueWith(CloseOnCompleteContinuation, channel, TaskContinuationOptions.ExecuteSynchronously); + + public static Task CloseOnFailure(this Task task, IChannelHandlerContext ctx) + => task.ContinueWith(CloseOnFailureContinuation, ctx, TaskContinuationOptions.ExecuteSynchronously); + + static Task Close(Task task, object state) + { + switch (state) + { + case IChannelHandlerContext ctx: + return ctx.CloseAsync(); + case IChannel ch: + return ch.CloseAsync(); + default: + throw new InvalidOperationException("must never get here"); + } + } + + static Task CloseOnFailure(Task task, object state) + { + if (task.IsFaulted) + { + return Close(task, state); + } + return TaskEx.Completed; + } + } +} \ No newline at end of file diff --git a/src/DotNetty.Common/Concurrency/AbstractPromise.cs b/src/DotNetty.Common/Concurrency/AbstractPromise.cs index 1563bb8f4..e02c9bff7 100644 --- a/src/DotNetty.Common/Concurrency/AbstractPromise.cs +++ b/src/DotNetty.Common/Concurrency/AbstractPromise.cs @@ -22,7 +22,7 @@ public abstract class AbstractPromise : IPromise, IValueTaskSource static readonly Action TaskSchedulerCallback = Execute; static readonly Action TaskScheduleCallbackWithExecutionContext = ExecuteWithExecutionContext; - static readonly Exception CompletedSentinel = new Exception(); + protected static readonly Exception CompletedSentinel = new Exception(); short currentId; protected Exception exception; diff --git a/src/DotNetty.Handlers/Streams/ChunkedWriteHandler.cs b/src/DotNetty.Handlers/Streams/ChunkedWriteHandler.cs index 438856ae5..bf7120af8 100644 --- a/src/DotNetty.Handlers/Streams/ChunkedWriteHandler.cs +++ b/src/DotNetty.Handlers/Streams/ChunkedWriteHandler.cs @@ -7,6 +7,7 @@ namespace DotNetty.Handlers.Streams using System.Collections.Generic; using System.Threading.Tasks; using DotNetty.Buffers; + using DotNetty.Common; using DotNetty.Common.Concurrency; using DotNetty.Common.Internal.Logging; using DotNetty.Common.Utilities; @@ -39,11 +40,11 @@ public void ResumeTransfer() } } - public override Task WriteAsync(IChannelHandlerContext context, object message) + public override ValueTask WriteAsync(IChannelHandlerContext context, object message) { - var pendingWrite = new PendingWrite(message); + var pendingWrite = PendingWrite.NewInstance(context.Executor, message); this.queue.Enqueue(pendingWrite); - return pendingWrite.PendingTask; + return pendingWrite; } public override void Flush(IChannelHandlerContext context) => this.DoFlush(context); @@ -97,16 +98,16 @@ void Discard(Exception cause = null) cause = new ClosedChannelException(); } - current.Fail(cause); + current.TrySetException(cause); } else { - current.Success(); + current.TryComplete(); } } catch (Exception exception) { - current.Fail(exception); + current.TrySetException(exception); Logger.Warn($"{StringUtil.SimpleClassName(typeof(ChunkedWriteHandler))}.IsEndOfInput failed", exception); } finally @@ -121,7 +122,7 @@ void Discard(Exception cause = null) cause = new ClosedChannelException(); } - current.Fail(cause); + current.TrySetException(cause); } } } @@ -197,7 +198,7 @@ void DoFlush(IChannelHandlerContext context) ReferenceCountUtil.Release(message); } - current.Fail(exception); + current.TrySetException(exception); CloseInput(chunks); break; @@ -218,7 +219,7 @@ void DoFlush(IChannelHandlerContext context) message = Unpooled.Empty; } - Task future = context.WriteAsync(message); + ValueTask writeFuture = context.WriteAsync(message); if (endOfInput) { this.currentWrite = null; @@ -228,54 +229,62 @@ void DoFlush(IChannelHandlerContext context) // be closed before its not written. // // See https://github.com/netty/netty/issues/303 - future.ContinueWith((_, state) => + CloseOnComplete(writeFuture, current, chunks); + + async void CloseOnComplete(ValueTask future, PendingWrite promise, IChunkedInput input) + { + try { - var pendingTask = (PendingWrite)state; - CloseInput((IChunkedInput)pendingTask.Message); - pendingTask.Success(); - }, - current, - TaskContinuationOptions.ExecuteSynchronously); + await future; + } + finally + { + promise.Progress(input.Progress, input.Length); + promise.TryComplete(); + CloseInput(input); + } + } } else if (channel.IsWritable) { - future.ContinueWith((task, state) => + ProgressOnComplete(writeFuture, current, chunks); + + async void ProgressOnComplete(ValueTask future, PendingWrite promise, IChunkedInput input) + { + try { - var pendingTask = (PendingWrite)state; - if (task.IsFaulted) - { - CloseInput((IChunkedInput)pendingTask.Message); - pendingTask.Fail(task.Exception); - } - else - { - pendingTask.Progress(chunks.Progress, chunks.Length); - } - }, - current, - TaskContinuationOptions.ExecuteSynchronously); + await future; + promise.Progress(input.Progress, input.Length); + } + catch(Exception ex) + { + CloseInput((IChunkedInput)promise.Message); + promise.TrySetException(ex); + } + } } else { - future.ContinueWith((task, state) => + ProgressAndResumeOnComplete(writeFuture, this, channel, chunks); + + async void ProgressAndResumeOnComplete(ValueTask future, ChunkedWriteHandler handler, IChannel ch, IChunkedInput input) { - var handler = (ChunkedWriteHandler) state; - if (task.IsFaulted) - { - CloseInput((IChunkedInput)handler.currentWrite.Message); - handler.currentWrite.Fail(task.Exception); - } - else + PendingWrite promise = handler.currentWrite; + try { - handler.currentWrite.Progress(chunks.Progress, chunks.Length); - if (channel.IsWritable) + await future; + promise.Progress(input.Progress, input.Length); + if (ch.IsWritable) { handler.ResumeTransfer(); } } - }, - this, - TaskContinuationOptions.ExecuteSynchronously); + catch(Exception ex) + { + CloseInput((IChunkedInput)promise.Message); + promise.TrySetException(ex); + } + } } // Flush each chunk to conserve memory @@ -284,22 +293,7 @@ void DoFlush(IChannelHandlerContext context) } else { - context.WriteAsync(pendingMessage) - .ContinueWith((task, state) => - { - var pendingTask = (PendingWrite)state; - if (task.IsFaulted) - { - pendingTask.Fail(task.Exception); - } - else - { - pendingTask.Success(); - } - }, - current, - TaskContinuationOptions.ExecuteSynchronously); - + context.WriteAsync(pendingMessage).LinkOutcome(current); this.currentWrite = null; requiresFlush = true; } @@ -332,37 +326,48 @@ static void CloseInput(IChunkedInput chunks) } } - sealed class PendingWrite + sealed class PendingWrite : AbstractRecyclablePromise { - readonly TaskCompletionSource promise; - - public PendingWrite(object msg) + static readonly ThreadLocalPool Pool = new ThreadLocalPool(h => new PendingWrite(h)); + + PendingWrite(ThreadLocalPool.Handle handle) + : base(handle) { - this.Message = msg; - this.promise = new TaskCompletionSource(); + } + + public static PendingWrite NewInstance(IEventExecutor executor, object msg) + { + PendingWrite entry = Pool.Take(); + entry.Init(executor); + entry.Message = msg; + return entry; } - public object Message { get; } - - public void Success() => this.promise.TryComplete(); + public object Message { get; private set; } - public void Fail(Exception error) + protected override bool TryComplete0(Exception exception, out bool continuationInvoked) { - ReferenceCountUtil.Release(this.Message); - this.promise.TrySetException(error); + if (exception != CompletedSentinel) + { + ReferenceCountUtil.Release(this.Message); + } + + return base.TryComplete0(exception, out continuationInvoked); } public void Progress(long progress, long total) { - if (progress < total) + /*if (progress < total) { return; - } - - this.Success(); + }*/ } - public Task PendingTask => this.promise.Task; + protected override void Recycle() + { + this.Message = null; + base.Recycle(); + } } } } diff --git a/src/DotNetty.Transport/Channels/CombinedChannelDuplexHandler.cs b/src/DotNetty.Transport/Channels/CombinedChannelDuplexHandler.cs index 5b854e73b..085622235 100644 --- a/src/DotNetty.Transport/Channels/CombinedChannelDuplexHandler.cs +++ b/src/DotNetty.Transport/Channels/CombinedChannelDuplexHandler.cs @@ -355,7 +355,7 @@ public override void Read(IChannelHandlerContext context) } } - public override Task WriteAsync(IChannelHandlerContext context, object message) + public override ValueTask WriteAsync(IChannelHandlerContext context, object message) { Contract.Assert(context == this.outboundCtx.InnerContext); @@ -492,7 +492,7 @@ public IChannelHandlerContext Read() return this; } - public Task WriteAsync(object message) => this.ctx.WriteAsync(message); + public ValueTask WriteAsync(object message) => this.ctx.WriteAsync(message); public IChannelHandlerContext Flush() { @@ -501,6 +501,8 @@ public IChannelHandlerContext Flush() } public Task WriteAndFlushAsync(object message) => this.ctx.WriteAndFlushAsync(message); + + public ValueTask WriteAndFlushAsync(object message, bool notifyComplete) => this.ctx.WriteAndFlushAsync(message, notifyComplete); public IAttribute GetAttribute(AttributeKey key) where T : class => this.ctx.GetAttribute(key); diff --git a/test/DotNetty.Codecs.Http.Tests/HttpContentCompressorTest.cs b/test/DotNetty.Codecs.Http.Tests/HttpContentCompressorTest.cs index ed0bcd240..e5695cc10 100644 --- a/test/DotNetty.Codecs.Http.Tests/HttpContentCompressorTest.cs +++ b/test/DotNetty.Codecs.Http.Tests/HttpContentCompressorTest.cs @@ -366,12 +366,9 @@ public void TooManyResponses() ch.WriteOutbound(new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK, Unpooled.Empty)); Assert.True(false, "Should not get here, expecting exception thrown"); } - catch (AggregateException e) + catch (EncoderException e) { - Assert.Single(e.InnerExceptions); - Assert.IsType(e.InnerExceptions[0]); - Exception exception = e.InnerExceptions[0]; - Assert.IsType(exception.InnerException); + Assert.IsType(e.InnerException); } Assert.True(ch.Finish()); diff --git a/test/DotNetty.Tests.Common/ValueTaskExtensions.cs b/test/DotNetty.Tests.Common/ValueTaskExtensions.cs deleted file mode 100644 index 24ae3a963..000000000 --- a/test/DotNetty.Tests.Common/ValueTaskExtensions.cs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace DotNetty.Tests.Common -{ - using System.Threading.Tasks; - using DotNetty.Transport.Channels; - - public static class ValueTaskExtensions - { - public static async void CloseOnComplete(this ValueTask task, IChannel channel) - { - try - { - await task; - } - finally - { - channel.CloseAsync(); - } - - } - - - public static async void CloseOnComplete(this ValueTask task, IChannelHandlerContext ctx) - { - try - { - await task; - } - finally - { - ctx.CloseAsync(); - } - } - } -} \ No newline at end of file diff --git a/test/DotNetty.Tests.End2End/End2EndTests.cs b/test/DotNetty.Tests.End2End/End2EndTests.cs index 18bbc450a..39473d46b 100644 --- a/test/DotNetty.Tests.End2End/End2EndTests.cs +++ b/test/DotNetty.Tests.End2End/End2EndTests.cs @@ -90,7 +90,7 @@ public async Task EchoServerAndClient() string[] messages = { "message 1", string.Join(",", Enumerable.Range(1, 300)) }; foreach (string message in messages) { - await clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes(message))).AsTask().WithTimeout(DefaultTimeout); + await clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes(message))).WithTimeout(DefaultTimeout); var responseMessage = Assert.IsAssignableFrom(await readListener.ReceiveAsync()); Assert.Equal(message, responseMessage.ToString(Encoding.UTF8)); diff --git a/test/DotNetty.Transport.Libuv.Tests/AutoReadTests.cs b/test/DotNetty.Transport.Libuv.Tests/AutoReadTests.cs index de5d4c4ab..c8f3bc6b2 100644 --- a/test/DotNetty.Transport.Libuv.Tests/AutoReadTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/AutoReadTests.cs @@ -75,13 +75,13 @@ void AutoReadOffDuringReadOnlyReadsOneTime0(bool readOutsideEventLoopThread, Ser Assert.NotNull(this.clientChannel.LocalAddress); // 3 bytes means 3 independent reads for TestRecvByteBufAllocator - ValueTask writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[3])); - Assert.True(writeTask.AsTask().Wait(TimeSpan.FromSeconds(5)), "Client write task timed out"); + Task writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[3])); + Assert.True(writeTask.Wait(TimeSpan.FromSeconds(5)), "Client write task timed out"); serverInitializer.AutoReadHandler.AssertSingleRead(); // 3 bytes means 3 independent reads for TestRecvByteBufAllocator writeTask = serverInitializer.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[3])); - Assert.True(writeTask.AsTask().Wait(TimeSpan.FromSeconds(5)), "Server write task timed out"); + Assert.True(writeTask.Wait(TimeSpan.FromSeconds(5)), "Server write task timed out"); clientInitializer.AutoReadHandler.AssertSingleRead(); if (readOutsideEventLoopThread) diff --git a/test/DotNetty.Transport.Libuv.Tests/BufReleaseTests.cs b/test/DotNetty.Transport.Libuv.Tests/BufReleaseTests.cs index d5650f0ae..91220118c 100644 --- a/test/DotNetty.Transport.Libuv.Tests/BufReleaseTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/BufReleaseTests.cs @@ -107,7 +107,7 @@ public override void ChannelActive(IChannelHandlerContext ctx) // call retain on it so it can't be put back on the pool this.buf.WriteBytes(data).Retain(); - this.writeTask = ctx.Channel.WriteAndFlushAsync(this.buf).AsTask(); + this.writeTask = ctx.Channel.WriteAndFlushAsync(this.buf); } protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) diff --git a/test/DotNetty.Transport.Libuv.Tests/DetectPeerCloseWithoutReadTests.cs b/test/DotNetty.Transport.Libuv.Tests/DetectPeerCloseWithoutReadTests.cs index 53a473198..e89d8140e 100644 --- a/test/DotNetty.Transport.Libuv.Tests/DetectPeerCloseWithoutReadTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/DetectPeerCloseWithoutReadTests.cs @@ -8,6 +8,7 @@ namespace DotNetty.Transport.Libuv.Tests using System.Threading; using System.Threading.Tasks; using DotNetty.Buffers; + using DotNetty.Codecs; using DotNetty.Common.Concurrency; using DotNetty.Tests.Common; using DotNetty.Transport.Bootstrapping; diff --git a/test/DotNetty.Transport.Libuv.Tests/ExceptionHandlingTests.cs b/test/DotNetty.Transport.Libuv.Tests/ExceptionHandlingTests.cs index e503bb456..2d5e865aa 100644 --- a/test/DotNetty.Transport.Libuv.Tests/ExceptionHandlingTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/ExceptionHandlingTests.cs @@ -61,8 +61,8 @@ void ReadPendingIsResetAfterEachRead0(ServerBootstrap sb, Bootstrap cb) this.clientChannel = task.Result; Assert.NotNull(this.clientChannel.LocalAddress); - ValueTask writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[1024])); - Assert.True(writeTask.AsTask().Wait(DefaultTimeout), "Write task timed out"); + Task writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[1024])); + Assert.True(writeTask.Wait(DefaultTimeout), "Write task timed out"); ExceptionHandler exceptionHandler = serverInitializer.ErrorHandler; Assert.True(exceptionHandler.Inactive.Wait(DefaultTimeout), "Handler inactive timed out"); diff --git a/test/DotNetty.Transport.Libuv.Tests/ReadPendingTests.cs b/test/DotNetty.Transport.Libuv.Tests/ReadPendingTests.cs index 772e3de2b..a3644787c 100644 --- a/test/DotNetty.Transport.Libuv.Tests/ReadPendingTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/ReadPendingTests.cs @@ -71,13 +71,13 @@ void ReadPendingIsResetAfterEachRead0(ServerBootstrap sb, Bootstrap cb) Assert.NotNull(this.clientChannel.LocalAddress); // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator - ValueTask writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[4])); - Assert.True(writeTask.AsTask().Wait(TimeSpan.FromSeconds(5)), "Client write task timed out"); + Task writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[4])); + Assert.True(writeTask.Wait(TimeSpan.FromSeconds(5)), "Client write task timed out"); // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator Assert.True(serverInitializer.Initialize.Wait(DefaultTimeout), "Server initializer timed out"); writeTask = serverInitializer.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[4])); - Assert.True(writeTask.AsTask().Wait(TimeSpan.FromSeconds(5)), "Server write task timed out"); + Assert.True(writeTask.Wait(TimeSpan.FromSeconds(5)), "Server write task timed out"); serverInitializer.Channel.Read(); serverInitializer.ReadPendingHandler.AssertAllRead(); diff --git a/test/DotNetty.Transport.Libuv.Tests/WriteBeforeRegisteredTests.cs b/test/DotNetty.Transport.Libuv.Tests/WriteBeforeRegisteredTests.cs index 539034b3b..8c91a0913 100644 --- a/test/DotNetty.Transport.Libuv.Tests/WriteBeforeRegisteredTests.cs +++ b/test/DotNetty.Transport.Libuv.Tests/WriteBeforeRegisteredTests.cs @@ -42,7 +42,7 @@ void WriteBeforeConnect0(Bootstrap cb) this.clientChannel = task.Result; Task connectTask = this.clientChannel.ConnectAsync(LoopbackAnyPort); - Task writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[] { 1 })).AsTask(); + Task writeTask = this.clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(new byte[] { 1 })); var error = Assert.Throws(() => writeTask.Wait(DefaultTimeout)); Assert.Single(error.InnerExceptions); diff --git a/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelMulticastTest.cs b/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelMulticastTest.cs index 4f72cc801..840c8ab33 100644 --- a/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelMulticastTest.cs +++ b/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelMulticastTest.cs @@ -155,7 +155,7 @@ public void Multicast(AddressFamily addressFamily, IByteBufferAllocator allocato Assert.True(joinTask.Wait(TimeSpan.FromMilliseconds(DefaultTimeOutInMilliseconds * 5)), $"Multicast server join group {groupAddress} timed out!"); - clientChannel.WriteAndFlushAsync(new DatagramPacket(Unpooled.Buffer().WriteInt(1), groupAddress)).AsTask().Wait(); + clientChannel.WriteAndFlushAsync(new DatagramPacket(Unpooled.Buffer().WriteInt(1), groupAddress)).Wait(); Assert.True(multicastHandler.WaitForResult(), "Multicast server should have receivied the message."); Task leaveTask = serverChannel.LeaveGroup(groupAddress, loopback); @@ -166,7 +166,7 @@ public void Multicast(AddressFamily addressFamily, IByteBufferAllocator allocato Task.Delay(DefaultTimeOutInMilliseconds).Wait(); // we should not receive a message anymore as we left the group before - clientChannel.WriteAndFlushAsync(new DatagramPacket(Unpooled.Buffer().WriteInt(1), groupAddress)).AsTask().Wait(); + clientChannel.WriteAndFlushAsync(new DatagramPacket(Unpooled.Buffer().WriteInt(1), groupAddress)).Wait(); Assert.False(multicastHandler.WaitForResult(), "Multicast server should not receive the message."); } finally diff --git a/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelUnicastTest.cs b/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelUnicastTest.cs index 4f6ea9710..908a03c15 100644 --- a/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelUnicastTest.cs +++ b/test/DotNetty.Transport.Tests/Channel/Sockets/SocketDatagramChannelUnicastTest.cs @@ -217,12 +217,12 @@ public void SimpleSend(IByteBuffer source, bool bindClient, IByteBufferAllocator for (int i = 0; i < count; i++) { var packet = new DatagramPacket((IByteBuffer)source.Retain(), new IPEndPoint(address, endPoint.Port)); - clientChannel.WriteAndFlushAsync(packet).AsTask().Wait(); + clientChannel.WriteAndFlushAsync(packet).Wait(); Assert.True(handler.WaitForResult()); var duplicatedPacket = (DatagramPacket)packet.Duplicate(); duplicatedPacket.Retain(); - clientChannel.WriteAndFlushAsync(duplicatedPacket).AsTask().Wait(); + clientChannel.WriteAndFlushAsync(duplicatedPacket).Wait(); Assert.True(handler.WaitForResult()); } }