Skip to content

Commit

Permalink
Add SendTextAsBinary method
Browse files Browse the repository at this point in the history
  • Loading branch information
Odonno committed Sep 18, 2023
1 parent 3036d58 commit 84ebd4a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 15 deletions.
36 changes: 36 additions & 0 deletions src/Websocket.Client/RequestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Websocket.Client
{
internal abstract class RequestMessage { }

internal class RequestTextMessage : RequestMessage
{
public string Text { get; }

public RequestTextMessage(string text)
{
Text = text;
}
}

internal class RequestBinaryMessage : RequestMessage
{
public byte[] Data { get; }

public RequestBinaryMessage(byte[] data)
{
Data = data;
}
}

internal class RequestBinarySegmentMessage : RequestMessage
{
public ArraySegment<byte> Data { get; }

public RequestBinarySegmentMessage(ArraySegment<byte> data)
{
Data = data;
}
}
}
114 changes: 99 additions & 15 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Microsoft.Extensions.Logging;
using System;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
Expand All @@ -9,7 +10,7 @@ namespace Websocket.Client
{
public partial class WebsocketClient
{
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
private readonly Channel<RequestMessage> _messagesTextToSendQueue = Channel.CreateUnbounded<RequestMessage>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
Expand All @@ -30,7 +31,7 @@ public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(message);
_messagesTextToSendQueue.Writer.TryWrite(new RequestTextMessage(message));
}

/// <summary>
Expand Down Expand Up @@ -67,7 +68,7 @@ public ValueTask SendAsync(string message, CancellationToken cancellationToken =
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(message, cancellationToken);
return _messagesTextToSendQueue.Writer.WriteAsync(new RequestTextMessage(message), cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -107,7 +108,7 @@ public Task SendInstant(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

return SendInternalSynchronized(message);
return SendInternalSynchronized(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -122,6 +123,60 @@ public Task SendInstant(byte[] message)
return SendInternalSynchronized(new ArraySegment<byte>(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(byte[] message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinaryMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(ArraySegment<byte> message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinarySegmentMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinaryMessage(message), cancellationToken);
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinarySegmentMessage(message), cancellationToken);
}

/// <summary>
/// Stream/publish fake message (via 'MessageReceived' observable).
/// Use for testing purposes to simulate a server message.
Expand Down Expand Up @@ -227,15 +282,15 @@ private void StartBackgroundThreadForSendingBinary()
_ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal?.Token ?? CancellationToken.None);
}

private async Task SendInternalSynchronized(string message)
private async Task SendInternalSynchronized(RequestMessage message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}

private async Task SendInternal(string message)
private async Task SendInternal(RequestMessage message)
{
if (!IsClientConnected())
{
Expand All @@ -245,14 +300,43 @@ private async Task SendInternal(string message)

_logger.LogTrace(L("Sending: {message}"), Name, message);
#if NETSTANDARD2_0
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);
ArraySegment<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = new ArraySegment<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = new ArraySegment<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data;
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}
#else
ReadOnlyMemory<byte> messageSegment = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(message));
ReadOnlyMemory<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = MemoryMarshal.AsMemory<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data.AsMemory();
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}
#endif

await _client!
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}

Expand All @@ -264,18 +348,18 @@ private async Task SendInternalSynchronized(ArraySegment<byte> message)
}
}

private async Task SendInternal(ArraySegment<byte> message)
private async Task SendInternal(ArraySegment<byte> payload)
{
if (!IsClientConnected())
{
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, message.Count);
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, payload.Count);
return;
}

_logger.LogTrace(L("Sending binary, length: {length}"), Name, message.Count);
_logger.LogTrace(L("Sending binary, length: {length}"), Name, payload.Count);

await _client!
.SendAsync(message, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}
}
Expand Down

0 comments on commit 84ebd4a

Please sign in to comment.