From fcc0b8571fe4b5de0ff398797aacf36466f04d17 Mon Sep 17 00:00:00 2001 From: Odonno Date: Tue, 8 Aug 2023 22:38:10 +0200 Subject: [PATCH] Use RecyclableMemoryStream instead of MemoryStream --- src/Websocket.Client/ResponseMessage.cs | 35 ++++++-- src/Websocket.Client/Websocket.Client.csproj | 1 + .../WebsocketClient.Sending.cs | 40 +++++++++ src/Websocket.Client/WebsocketClient.cs | 84 ++++++++----------- 4 files changed, 100 insertions(+), 60 deletions(-) diff --git a/src/Websocket.Client/ResponseMessage.cs b/src/Websocket.Client/ResponseMessage.cs index c4ef4c0..ed9cc66 100644 --- a/src/Websocket.Client/ResponseMessage.cs +++ b/src/Websocket.Client/ResponseMessage.cs @@ -1,4 +1,5 @@ -using System.Net.WebSockets; +using System.IO; +using System.Net.WebSockets; namespace Websocket.Client { @@ -7,22 +8,30 @@ namespace Websocket.Client /// public class ResponseMessage { - private ResponseMessage(byte[] binary, string text, WebSocketMessageType messageType) + private readonly byte[] _binary; + + private ResponseMessage(MemoryStream memoryStream, byte[] binary, string text, WebSocketMessageType messageType) { - Binary = binary; + Stream = memoryStream; + _binary = binary; Text = text; MessageType = messageType; } /// - /// Received text message (only if type = WebSocketMessageType.Text) + /// Received text message (only if type = ) /// public string Text { get; } /// - /// Received text message (only if type = WebSocketMessageType.Binary) + /// Received text message (only if type = ) /// - public byte[] Binary { get; } + public byte[] Binary => Stream is null ? _binary : Stream.ToArray(); + + /// + /// Received stream message (only if type = and = false) + /// + public MemoryStream Stream { get; } /// /// Current message type (Text or Binary) @@ -47,7 +56,7 @@ public override string ToString() /// public static ResponseMessage TextMessage(string data) { - return new ResponseMessage(null, data, WebSocketMessageType.Text); + return new ResponseMessage(null, null, data, WebSocketMessageType.Text); } /// @@ -55,7 +64,15 @@ public static ResponseMessage TextMessage(string data) /// public static ResponseMessage BinaryMessage(byte[] data) { - return new ResponseMessage(data, null, WebSocketMessageType.Binary); + return new ResponseMessage(null, data, null, WebSocketMessageType.Binary); + } + + /// + /// Create stream response message + /// + public static ResponseMessage BinaryStreamMessage(MemoryStream memoryStream) + { + return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary); } } -} \ No newline at end of file +} diff --git a/src/Websocket.Client/Websocket.Client.csproj b/src/Websocket.Client/Websocket.Client.csproj index 386bcbe..b8ce297 100644 --- a/src/Websocket.Client/Websocket.Client.csproj +++ b/src/Websocket.Client/Websocket.Client.csproj @@ -31,6 +31,7 @@ all runtime; build; native; contentfiles; analyzers + diff --git a/src/Websocket.Client/WebsocketClient.Sending.cs b/src/Websocket.Client/WebsocketClient.Sending.cs index 294a952..ee3a268 100644 --- a/src/Websocket.Client/WebsocketClient.Sending.cs +++ b/src/Websocket.Client/WebsocketClient.Sending.cs @@ -1,5 +1,6 @@ using System; using System.Net.WebSockets; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Websocket.Client.Logging; @@ -56,6 +57,45 @@ public void Send(ArraySegment message) _messagesBinaryToSendQueue.Writer.TryWrite(message); } + /// + /// Send text message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Text message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(string message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesTextToSendQueue.Writer.WriteAsync(message, cancellationToken); + } + + /// + /// Send binary message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Binary message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment(message), cancellationToken); + } + + /// + /// Send binary message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Binary message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(ArraySegment message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken); + } + /// /// Send text message to the websocket channel. /// It doesn't use a sending queue, diff --git a/src/Websocket.Client/WebsocketClient.cs b/src/Websocket.Client/WebsocketClient.cs index 1318777..a5b670f 100644 --- a/src/Websocket.Client/WebsocketClient.cs +++ b/src/Websocket.Client/WebsocketClient.cs @@ -1,7 +1,7 @@ +using Microsoft.IO; using System; using System.Diagnostics; using System.IO; -using System.Linq; using System.Net.WebSockets; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -24,6 +24,7 @@ public partial class WebsocketClient : IWebsocketClient private readonly WebsocketAsyncLock _locker = new WebsocketAsyncLock(); private readonly Func> _connectionFactory; + private static readonly RecyclableMemoryStreamManager _memoryStreamManager = new RecyclableMemoryStreamManager(); private Uri _url; private Timer _lastChanceTimer; @@ -159,6 +160,14 @@ public bool IsReconnectionEnabled /// public bool IsTextMessageConversionEnabled { get; set; } = true; + /// + /// Enable or disable automatic of the + /// after sending data (only available for binary response). + /// Setting value to false allows you to access the stream directly, however keep in mind you need to handle the dispose yourself. + /// Default: true + /// + public bool IsStreamDisposedAutomatically { get; set; } = true; + /// public Encoding MessageEncoding { get; set; } @@ -416,71 +425,43 @@ private async Task Listen(WebSocket client, CancellationToken token) { // define buffer here and reuse, to avoid more allocation const int chunkSize = 1024 * 4; +#if NETSTANDARD2_0 var buffer = new ArraySegment(new byte[chunkSize]); +#else + var buffer = new Memory(new byte[chunkSize]); +#endif do { +#if NETSTANDARD2_0 WebSocketReceiveResult result; - byte[] resultArrayWithTrailing = null; - var resultArraySize = 0; - var isResultArrayCloned = false; - MemoryStream ms = null; +#else + ValueWebSocketReceiveResult result; +#endif + var ms = _memoryStreamManager.GetStream() as RecyclableMemoryStream; while (true) { result = await client.ReceiveAsync(buffer, token); - var currentChunk = buffer.Array; - var currentChunkSize = result.Count; - - var isFirstChunk = resultArrayWithTrailing == null; - if (isFirstChunk) - { - // first chunk, use buffer as reference, do not allocate anything - resultArraySize += currentChunkSize; - resultArrayWithTrailing = currentChunk; - isResultArrayCloned = false; - } - else if (currentChunk == null) - { - // weird chunk, do nothing - } - else - { - // received more chunks, lets merge them via memory stream - if (ms == null) - { - // create memory stream and insert first chunk - ms = new MemoryStream(); - ms.Write(resultArrayWithTrailing, 0, resultArraySize); - } - // insert current chunk - ms.Write(currentChunk, buffer.Offset, currentChunkSize); - } +#if NETSTANDARD2_0 + ms.Write(buffer.AsSpan(0, result.Count)); +#else + ms.Write(buffer[..result.Count].Span); +#endif if (result.EndOfMessage) - { break; - } - - if (isResultArrayCloned) - continue; - - // we got more chunks incoming, need to clone first chunk - resultArrayWithTrailing = resultArrayWithTrailing?.ToArray(); - isResultArrayCloned = true; } - ms?.Seek(0, SeekOrigin.Begin); + ms.Seek(0, SeekOrigin.Begin); ResponseMessage message; + bool shouldDisposeStream = true; + if (result.MessageType == WebSocketMessageType.Text && IsTextMessageConversionEnabled) { - var data = ms != null ? - GetEncoding().GetString(ms.ToArray()) : - resultArrayWithTrailing != null ? - GetEncoding().GetString(resultArrayWithTrailing, 0, resultArraySize) : - null; + var data = GetEncoding().GetString(ms.ToArray()); message = ResponseMessage.TextMessage(data); } @@ -520,18 +501,19 @@ await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing", } else { - if (ms != null) + if (IsStreamDisposedAutomatically) { message = ResponseMessage.BinaryMessage(ms.ToArray()); } else { - Array.Resize(ref resultArrayWithTrailing, resultArraySize); - message = ResponseMessage.BinaryMessage(resultArrayWithTrailing); + message = ResponseMessage.BinaryStreamMessage(ms); + shouldDisposeStream = false; } } - ms?.Dispose(); + if (shouldDisposeStream) + ms.Dispose(); Logger.Trace(L($"Received: {message}")); _lastReceivedMsg = DateTime.UtcNow;