Skip to content

Commit

Permalink
Use RecyclableMemoryStream instead of MemoryStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Odonno committed Sep 18, 2023
1 parent ca291cb commit 3036d58
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 62 deletions.
35 changes: 26 additions & 9 deletions src/Websocket.Client/ResponseMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.IO;
using System.Net.WebSockets;

namespace Websocket.Client
{
Expand All @@ -7,22 +8,30 @@ namespace Websocket.Client
/// </summary>
public class ResponseMessage
{
private ResponseMessage(byte[]? binary, string? text, WebSocketMessageType messageType)
private readonly byte[]? _binary;

private ResponseMessage(MemoryStream? memoryStream, byte[]? binary, string? text, WebSocketMessageType messageType)
{
Binary = binary;
Stream = memoryStream;
_binary = binary;
Text = text;
MessageType = messageType;
}

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Text)
/// Received text message (only if type = <see cref="WebSocketMessageType.Text"/>)
/// </summary>
public string? Text { get; }

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Binary)
/// Received text message (only if type = <see cref="WebSocketMessageType.Binary"/>)
/// </summary>
public byte[]? Binary { get; }
public byte[]? Binary => Stream is null ? _binary : Stream.ToArray();

/// <summary>
/// Received stream message (only if type = <see cref="WebSocketMessageType.Binary"/> and <see cref="WebsocketClient.IsStreamDisposedAutomatically"/> = false)
/// </summary>
public MemoryStream? Stream { get; }

/// <summary>
/// Current message type (Text or Binary)
Expand All @@ -47,15 +56,23 @@ public override string ToString()
/// </summary>
public static ResponseMessage TextMessage(string? data)
{
return new ResponseMessage(null, data, WebSocketMessageType.Text);
return new ResponseMessage(null, null, data, WebSocketMessageType.Text);
}

/// <summary>
/// Create binary response message
/// </summary>
public static ResponseMessage BinaryMessage(byte[]? data)
{
return new ResponseMessage(data, null, WebSocketMessageType.Binary);
return new ResponseMessage(null, data, null, WebSocketMessageType.Binary);
}

/// <summary>
/// Create stream response message
/// </summary>
public static ResponseMessage BinaryStreamMessage(MemoryStream? memoryStream)
{
return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary);
}
}
}
}
1 change: 1 addition & 0 deletions src/Websocket.Client/Websocket.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.3.2" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>
Expand Down
46 changes: 45 additions & 1 deletion src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Websocket.Client
{
Expand Down Expand Up @@ -57,6 +57,45 @@ public void Send(ArraySegment<byte> message)
_messagesBinaryToSendQueue.Writer.TryWrite(message);
}

/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Text message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(string message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment<byte>(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send text message to the websocket channel.
/// It doesn't use a sending queue,
Expand Down Expand Up @@ -205,8 +244,13 @@ private async Task SendInternal(string message)
}

_logger.LogTrace(L("Sending: {message}"), Name, message);
#if NETSTANDARD2_0
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);
#else
ReadOnlyMemory<byte> messageSegment = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(message));
#endif

await _client!
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
Expand Down
87 changes: 35 additions & 52 deletions src/Websocket.Client/WebsocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.IO;
using System;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Websocket.Client.Exceptions;
using Websocket.Client.Threading;

Expand All @@ -22,6 +22,7 @@ public partial class WebsocketClient : IWebsocketClient
private readonly ILogger<WebsocketClient> _logger;
private readonly WebsocketAsyncLock _locker = new WebsocketAsyncLock();
private readonly Func<Uri, CancellationToken, Task<WebSocket>> _connectionFactory;
private static readonly RecyclableMemoryStreamManager _memoryStreamManager = new RecyclableMemoryStreamManager();

private Uri _url;
private Timer? _lastChanceTimer;
Expand Down Expand Up @@ -179,6 +180,15 @@ public bool IsReconnectionEnabled
/// </summary>
public bool IsTextMessageConversionEnabled { get; set; } = true;

/// <summary>
/// Enable or disable automatic <see cref="MemoryStream.Dispose(bool)"/> of the <see cref="MemoryStream"/>
/// after sending data (only available for binary response).
/// Setting value to false allows you to access the stream directly.
/// <warning>However, keep in mind that you need to handle the dispose yourself.</warning>
/// Default: true
/// </summary>
public bool IsStreamDisposedAutomatically { get; set; } = true;

/// <inheritdoc />
public Encoding? MessageEncoding { get; set; }

Expand Down Expand Up @@ -443,71 +453,43 @@ private async Task Listen(WebSocket client, CancellationToken token)
{
// define buffer here and reuse, to avoid more allocation
const int chunkSize = 1024 * 4;
#if NETSTANDARD2_0
var buffer = new ArraySegment<byte>(new byte[chunkSize]);
#else
var buffer = new Memory<byte>(new byte[chunkSize]);
#endif

do
{
#if NETSTANDARD2_0
WebSocketReceiveResult result;
byte[]? resultArrayWithTrailing = null;
var resultArraySize = 0;
var isResultArrayCloned = false;
MemoryStream? ms = null;
#else
ValueWebSocketReceiveResult result;
#endif
var ms = _memoryStreamManager.GetStream() as RecyclableMemoryStream;

while (true)
{
result = await client.ReceiveAsync(buffer, token);
var currentChunk = buffer.Array;
var currentChunkSize = result.Count;

var isFirstChunk = resultArrayWithTrailing == null;
if (isFirstChunk)
{
// first chunk, use buffer as reference, do not allocate anything
resultArraySize += currentChunkSize;
resultArrayWithTrailing = currentChunk;
isResultArrayCloned = false;
}
else if (currentChunk == null)
{
// weird chunk, do nothing
}
else
{
// received more chunks, lets merge them via memory stream
if (ms == null)
{
// create memory stream and insert first chunk
ms = new MemoryStream();
ms.Write(resultArrayWithTrailing!, 0, resultArraySize);
}

// insert current chunk
ms.Write(currentChunk, buffer.Offset, currentChunkSize);
}
#if NETSTANDARD2_0
ms.Write(buffer.AsSpan(0, result.Count));
#else
ms.Write(buffer[..result.Count].Span);
#endif

if (result.EndOfMessage)
{
break;
}

if (isResultArrayCloned)
continue;

// we got more chunks incoming, need to clone first chunk
resultArrayWithTrailing = resultArrayWithTrailing?.ToArray();
isResultArrayCloned = true;
}

ms?.Seek(0, SeekOrigin.Begin);
ms.Seek(0, SeekOrigin.Begin);

ResponseMessage message;
bool shouldDisposeStream = true;

if (result.MessageType == WebSocketMessageType.Text && IsTextMessageConversionEnabled)
{
var data = ms != null ?
GetEncoding().GetString(ms.ToArray()) :
resultArrayWithTrailing != null ?
GetEncoding().GetString(resultArrayWithTrailing, 0, resultArraySize) :
null;
var data = GetEncoding().GetString(ms.ToArray());

message = ResponseMessage.TextMessage(data);
}
Expand Down Expand Up @@ -547,18 +529,19 @@ await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing",
}
else
{
if (ms != null)
if (IsStreamDisposedAutomatically)
{
message = ResponseMessage.BinaryMessage(ms.ToArray());
}
else
{
Array.Resize(ref resultArrayWithTrailing, resultArraySize);
message = ResponseMessage.BinaryMessage(resultArrayWithTrailing);
message = ResponseMessage.BinaryStreamMessage(ms);
shouldDisposeStream = false;
}
}

ms?.Dispose();
if (shouldDisposeStream)
ms.Dispose();

_logger.LogTrace(L("Received: {message}"), Name, message);
_lastReceivedMsg = DateTime.UtcNow;
Expand Down

0 comments on commit 3036d58

Please sign in to comment.