Skip to content

Commit

Permalink
Use RecyclableMemoryStream instead of MemoryStream (#133)
Browse files Browse the repository at this point in the history
* Use RecyclableMemoryStream instead of MemoryStream

* Add SendTextAsBinary method

* Remove NETSTANDARD2_0 pragma conditions
  • Loading branch information
Odonno authored Feb 14, 2024
1 parent ca291cb commit caba38b
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 78 deletions.
36 changes: 36 additions & 0 deletions src/Websocket.Client/RequestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Websocket.Client
{
internal abstract class RequestMessage { }

internal class RequestTextMessage : RequestMessage
{
public string Text { get; }

public RequestTextMessage(string text)
{
Text = text;
}
}

internal class RequestBinaryMessage : RequestMessage
{
public byte[] Data { get; }

public RequestBinaryMessage(byte[] data)
{
Data = data;
}
}

internal class RequestBinarySegmentMessage : RequestMessage
{
public ArraySegment<byte> Data { get; }

public RequestBinarySegmentMessage(ArraySegment<byte> data)
{
Data = data;
}
}
}
35 changes: 26 additions & 9 deletions src/Websocket.Client/ResponseMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.IO;
using System.Net.WebSockets;

namespace Websocket.Client
{
Expand All @@ -7,22 +8,30 @@ namespace Websocket.Client
/// </summary>
public class ResponseMessage
{
private ResponseMessage(byte[]? binary, string? text, WebSocketMessageType messageType)
private readonly byte[]? _binary;

private ResponseMessage(MemoryStream? memoryStream, byte[]? binary, string? text, WebSocketMessageType messageType)
{
Binary = binary;
Stream = memoryStream;
_binary = binary;
Text = text;
MessageType = messageType;
}

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Text)
/// Received text message (only if type = <see cref="WebSocketMessageType.Text"/>)
/// </summary>
public string? Text { get; }

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Binary)
/// Received text message (only if type = <see cref="WebSocketMessageType.Binary"/>)
/// </summary>
public byte[]? Binary { get; }
public byte[]? Binary => Stream is null ? _binary : Stream.ToArray();

/// <summary>
/// Received stream message (only if type = <see cref="WebSocketMessageType.Binary"/> and <see cref="WebsocketClient.IsStreamDisposedAutomatically"/> = false)
/// </summary>
public MemoryStream? Stream { get; }

/// <summary>
/// Current message type (Text or Binary)
Expand All @@ -47,15 +56,23 @@ public override string ToString()
/// </summary>
public static ResponseMessage TextMessage(string? data)
{
return new ResponseMessage(null, data, WebSocketMessageType.Text);
return new ResponseMessage(null, null, data, WebSocketMessageType.Text);
}

/// <summary>
/// Create binary response message
/// </summary>
public static ResponseMessage BinaryMessage(byte[]? data)
{
return new ResponseMessage(data, null, WebSocketMessageType.Binary);
return new ResponseMessage(null, data, null, WebSocketMessageType.Binary);
}

/// <summary>
/// Create stream response message
/// </summary>
public static ResponseMessage BinaryStreamMessage(MemoryStream? memoryStream)
{
return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary);
}
}
}
}
1 change: 1 addition & 0 deletions src/Websocket.Client/Websocket.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.3.2" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>
Expand Down
138 changes: 124 additions & 14 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using System;
using Microsoft.Extensions.Logging;
using System;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Websocket.Client
{
public partial class WebsocketClient
{
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
private readonly Channel<RequestMessage> _messagesTextToSendQueue = Channel.CreateUnbounded<RequestMessage>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
Expand All @@ -30,7 +31,7 @@ public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(message);
_messagesTextToSendQueue.Writer.TryWrite(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -57,6 +58,45 @@ public void Send(ArraySegment<byte> message)
_messagesBinaryToSendQueue.Writer.TryWrite(message);
}

/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Text message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(string message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestTextMessage(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment<byte>(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send text message to the websocket channel.
/// It doesn't use a sending queue,
Expand All @@ -68,7 +108,7 @@ public Task SendInstant(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

return SendInternalSynchronized(message);
return SendInternalSynchronized(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -83,6 +123,60 @@ public Task SendInstant(byte[] message)
return SendInternalSynchronized(new ArraySegment<byte>(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(byte[] message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinaryMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(ArraySegment<byte> message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinarySegmentMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinaryMessage(message), cancellationToken);
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinarySegmentMessage(message), cancellationToken);
}

/// <summary>
/// Stream/publish fake message (via 'MessageReceived' observable).
/// Use for testing purposes to simulate a server message.
Expand Down Expand Up @@ -188,15 +282,15 @@ private void StartBackgroundThreadForSendingBinary()
_ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal?.Token ?? CancellationToken.None);
}

private async Task SendInternalSynchronized(string message)
private async Task SendInternalSynchronized(RequestMessage message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}

private async Task SendInternal(string message)
private async Task SendInternal(RequestMessage message)
{
if (!IsClientConnected())
{
Expand All @@ -205,10 +299,26 @@ private async Task SendInternal(string message)
}

_logger.LogTrace(L("Sending: {message}"), Name, message);
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);

ReadOnlyMemory<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = MemoryMarshal.AsMemory<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data.AsMemory();
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}

await _client!
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}

Expand All @@ -220,18 +330,18 @@ private async Task SendInternalSynchronized(ArraySegment<byte> message)
}
}

private async Task SendInternal(ArraySegment<byte> message)
private async Task SendInternal(ArraySegment<byte> payload)
{
if (!IsClientConnected())
{
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, message.Count);
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, payload.Count);
return;
}

_logger.LogTrace(L("Sending binary, length: {length}"), Name, message.Count);
_logger.LogTrace(L("Sending binary, length: {length}"), Name, payload.Count);

await _client!
.SendAsync(message, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}
}
Expand Down
Loading

0 comments on commit caba38b

Please sign in to comment.