From 1b1eca2ae6646d4d25b207f956abe456fa5bed1d Mon Sep 17 00:00:00 2001 From: Daniel Wertheim Date: Sat, 15 May 2021 22:11:57 +0200 Subject: [PATCH] Fixes reconnect issues and exception logging when socket read times out (#90) --- src/main/MyNatsClient/INatsConnection.cs | 5 +- .../Internals/Extensions/SocketExtensions.cs | 2 +- .../MyNatsClient/Internals/NatsConnection.cs | 57 +++++--- .../Internals/NatsConnectionManager.cs | 46 +++++-- .../MyNatsClient/Internals/NatsEncoder.cs | 2 +- src/main/MyNatsClient/NatsClient.cs | 39 ++++-- src/main/MyNatsClient/NatsException.cs | 5 +- src/main/MyNatsClient/NatsExceptionCodes.cs | 1 + src/main/MyNatsClient/NatsOpStreamReader.cs | 127 ++++++++++-------- src/main/MyNatsClient/Ops/NullOp.cs | 16 +++ src/samples/RequestResponseSample/Program.cs | 10 +- .../RequestResponseSample.csproj | 4 + .../UnitTests/NatsOpStreamReaderTests.cs | 76 +++++------ 13 files changed, 243 insertions(+), 147 deletions(-) create mode 100644 src/main/MyNatsClient/Ops/NullOp.cs diff --git a/src/main/MyNatsClient/INatsConnection.cs b/src/main/MyNatsClient/INatsConnection.cs index 254e8bc..6d343a7 100644 --- a/src/main/MyNatsClient/INatsConnection.cs +++ b/src/main/MyNatsClient/INatsConnection.cs @@ -8,12 +8,11 @@ public interface INatsConnection : IDisposable { INatsServerInfo ServerInfo { get; } bool IsConnected { get; } - bool CanRead { get; } - IEnumerable ReadOp(); + IEnumerable ReadOps(); void WithWriteLock(Action a); void WithWriteLock(Action a, TArg arg); Task WithWriteLockAsync(Func a); Task WithWriteLockAsync(Func a, TArg arg); } -} \ No newline at end of file +} diff --git a/src/main/MyNatsClient/Internals/Extensions/SocketExtensions.cs b/src/main/MyNatsClient/Internals/Extensions/SocketExtensions.cs index 5ce57bb..83920d0 100644 --- a/src/main/MyNatsClient/Internals/Extensions/SocketExtensions.cs +++ b/src/main/MyNatsClient/Internals/Extensions/SocketExtensions.cs @@ -62,4 +62,4 @@ internal static NetworkStream CreateReadWriteStream(this Socket socket) return ns; } } -} \ No newline at end of file +} diff --git a/src/main/MyNatsClient/Internals/NatsConnection.cs b/src/main/MyNatsClient/Internals/NatsConnection.cs index aae6538..7ef0082 100644 --- a/src/main/MyNatsClient/Internals/NatsConnection.cs +++ b/src/main/MyNatsClient/Internals/NatsConnection.cs @@ -5,13 +5,13 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace MyNatsClient.Internals { internal sealed class NatsConnection : INatsConnection { - private readonly Func _socketIsConnected; - private readonly Func _canRead; + private readonly ILogger _logger = LoggerManager.CreateLogger(); private readonly CancellationToken _cancellationToken; private Socket _socket; @@ -24,8 +24,12 @@ internal sealed class NatsConnection : INatsConnection private bool _isDisposed; public INatsServerInfo ServerInfo { get; } - public bool IsConnected => _socketIsConnected(); - public bool CanRead => _canRead(); + public bool IsConnected => _socket.Connected; + + private bool CanRead => + _socket.Connected && + _stream.CanRead && + !_cancellationToken.IsCancellationRequested; internal NatsConnection( NatsServerInfo serverInfo, @@ -45,10 +49,7 @@ internal NatsConnection( _cancellationToken = cancellationToken; _writeStreamSync = new SemaphoreSlim(1, 1); _writer = new NatsStreamWriter(_writeStream, _cancellationToken); - _reader = new NatsOpStreamReader(_readStream); - - _socketIsConnected = () => _socket?.Connected == true; - _canRead = () => _socket?.Connected == true && _stream != null && _stream.CanRead && !_cancellationToken.IsCancellationRequested; + _reader = NatsOpStreamReader.Use(_readStream); } public void Dispose() @@ -71,9 +72,11 @@ void TryDispose(IDisposable disposable) } } + TryDispose(_reader); TryDispose(_writeStream); TryDispose(_readStream); TryDispose(_stream); + try { _socket.Shutdown(SocketShutdown.Both); @@ -82,9 +85,11 @@ void TryDispose(IDisposable disposable) { exs.Add(ex); } + TryDispose(_socket); TryDispose(_writeStreamSync); + _reader = null; _writeStream = null; _readStream = null; _stream = null; @@ -97,13 +102,33 @@ void TryDispose(IDisposable disposable) throw new AggregateException("Failed while disposing connection. See inner exception(s) for more details.", exs); } - public IEnumerable ReadOp() + private void ThrowIfDisposed() + { + if (_isDisposed) + throw new ObjectDisposedException(GetType().Name); + } + + private void ThrowIfNotConnected() + { + if (!IsConnected) + throw NatsException.NotConnected(); + } + + public IEnumerable ReadOps() { ThrowIfDisposed(); ThrowIfNotConnected(); - return _reader.ReadOps(); + _logger.LogDebug("Starting OPs read loop"); + + while (CanRead) + { +#if Debug + _logger.LogDebug("Reading OP"); +#endif + yield return _reader.ReadOp(); + } } public void WithWriteLock(Action a) @@ -177,17 +202,5 @@ public async Task WithWriteLockAsync(Func a _writeStreamSync.Release(); } } - - private void ThrowIfDisposed() - { - if (_isDisposed) - throw new ObjectDisposedException(GetType().Name); - } - - private void ThrowIfNotConnected() - { - if (!IsConnected) - throw NatsException.NotConnected(); - } } } diff --git a/src/main/MyNatsClient/Internals/NatsConnectionManager.cs b/src/main/MyNatsClient/Internals/NatsConnectionManager.cs index 8cf8e25..c7f6bd8 100644 --- a/src/main/MyNatsClient/Internals/NatsConnectionManager.cs +++ b/src/main/MyNatsClient/Internals/NatsConnectionManager.cs @@ -17,7 +17,7 @@ namespace MyNatsClient.Internals { internal class NatsConnectionManager : INatsConnectionManager { - private static readonly ILogger Logger = LoggerManager.CreateLogger(); + private readonly ILogger _logger = LoggerManager.CreateLogger(); private readonly ISocketFactory _socketFactory; @@ -50,7 +50,7 @@ internal NatsConnectionManager(ISocketFactory socketFactory) } catch (Exception ex) { - Logger.LogError(ex, "Error while connecting to {Host}. Trying with next host (if any).", host); + _logger.LogError(ex, "Error while connecting to {Host}. Trying with next host (if any).", host); if (!ShouldTryAndConnect()) throw; @@ -68,6 +68,7 @@ private static bool DefaultServerCertificateValidation(X509Certificate certifica ConnectionInfo connectionInfo, CancellationToken cancellationToken) { + _logger.LogInformation("Establishing connection to {Host}", host); var serverCertificateValidation = connectionInfo.ServerCertificateValidation ?? DefaultServerCertificateValidation; bool RemoteCertificateValidationCallback(object _, X509Certificate certificate, X509Chain chain, SslPolicyErrors errors) @@ -76,19 +77,23 @@ bool RemoteCertificateValidationCallback(object _, X509Certificate certificate, var consumedOps = new List(); Socket socket = null; Stream stream = null; + NatsOpStreamReader reader = null; try { + _logger.LogDebug("Creating socket."); socket = _socketFactory.Create(connectionInfo.SocketOptions); await socket.ConnectAsync( host, connectionInfo.SocketOptions.ConnectTimeoutMs, cancellationToken).ConfigureAwait(false); + _logger.LogDebug("Creating read write stream."); stream = socket.CreateReadWriteStream(); - var reader = new NatsOpStreamReader(stream); + reader = NatsOpStreamReader.Use(stream); - var op = reader.ReadOneOp(); + _logger.LogDebug("Trying to read InfoOp."); + var op = reader.ReadOp(); if (op == null) throw NatsException.FailedToConnectToHost(host, "Expected to get INFO after establishing connection. Got nothing."); @@ -97,6 +102,7 @@ await socket.ConnectAsync( throw NatsException.FailedToConnectToHost(host, $"Expected to get INFO after establishing connection. Got {op.GetType().Name}."); + _logger.LogDebug("Parsing server info."); var serverInfo = NatsServerInfo.Parse(infoOp.Message); var credentials = host.HasNonEmptyCredentials() ? host.Credentials : connectionInfo.Credentials; if (serverInfo.AuthRequired && (credentials == null || credentials == Credentials.Empty)) @@ -109,6 +115,7 @@ await socket.ConnectAsync( if (serverInfo.TlsRequired) { + _logger.LogDebug("Creating SSL Stream."); stream = new SslStream(stream, false, RemoteCertificateValidationCallback, null, EncryptionPolicy.RequireEncryption); var ssl = (SslStream) stream; @@ -123,29 +130,37 @@ await socket.ConnectAsync( TargetHost = host.Address }; + _logger.LogDebug("Performing SSL client authentication."); await ssl.AuthenticateAsClientAsync(clientAuthOptions, cancellationToken).ConfigureAwait(false); - reader = new NatsOpStreamReader(ssl); + reader.SetNewSource(ssl); } + _logger.LogDebug("Sending Connect."); stream.Write(ConnectCmd.Generate(connectionInfo.Verbose, credentials, connectionInfo.Name)); + _logger.LogDebug("Sending Ping."); stream.Write(PingCmd.Bytes.Span); await stream.FlushAsync(cancellationToken).ConfigureAwait(false); - op = reader.ReadOneOp(); - if (op == null) - throw NatsException.FailedToConnectToHost(host, - "Expected to read something after CONNECT and PING. Got nothing."); - - if (op is ErrOp) - throw NatsException.FailedToConnectToHost(host, - $"Expected to get PONG after sending CONNECT and PING. Got {op.Marker}."); + _logger.LogDebug("Trying to read OP to see if connection was established."); + op = reader.ReadOp(); + switch (op) + { + case NullOp: + throw NatsException.FailedToConnectToHost(host, + "Expected to read something after CONNECT and PING. Got nothing."); + case ErrOp: + throw NatsException.FailedToConnectToHost(host, + $"Expected to get PONG after sending CONNECT and PING. Got {op.Marker}."); + } if (!socket.Connected) throw NatsException.FailedToConnectToHost(host, "No connection could be established."); consumedOps.Add(op); + _logger.LogInformation("Connection successfully established to {Host}", host); + return ( new NatsConnection( serverInfo, @@ -157,6 +172,11 @@ await socket.ConnectAsync( catch { Swallow.Everything( + () => + { + reader?.Dispose(); + reader = null; + }, () => { stream?.Dispose(); diff --git a/src/main/MyNatsClient/Internals/NatsEncoder.cs b/src/main/MyNatsClient/Internals/NatsEncoder.cs index 246d3ad..5d136af 100644 --- a/src/main/MyNatsClient/Internals/NatsEncoder.cs +++ b/src/main/MyNatsClient/Internals/NatsEncoder.cs @@ -41,4 +41,4 @@ internal static int WriteSingleByteChars(Span trg, int trgOffset, ReadOnly return trgOffset; } } -} \ No newline at end of file +} diff --git a/src/main/MyNatsClient/NatsClient.cs b/src/main/MyNatsClient/NatsClient.cs index 361810d..a6565ba 100644 --- a/src/main/MyNatsClient/NatsClient.cs +++ b/src/main/MyNatsClient/NatsClient.cs @@ -40,13 +40,15 @@ public sealed class NatsClient : INatsClient, IDisposable private readonly string _inboxAddress; private ISubscription _inboxSubscription; - private readonly ConcurrentDictionary> _outstandingRequests = new ConcurrentDictionary>(); + + private readonly ConcurrentDictionary> _outstandingRequests = + new ConcurrentDictionary>(); public string Id { get; } public INatsObservable Events => _eventMediator; public INatsObservable OpStream => _opMediator.AllOpsStream; public INatsObservable MsgOpStream => _opMediator.MsgOpsStream; - public bool IsConnected => _connection != null && _connection.IsConnected && _connection.CanRead; + public bool IsConnected => _connection?.IsConnected == true; public NatsClient( ConnectionInfo connectionInfo, @@ -192,6 +194,8 @@ public async Task ConnectAsync() DoSafeRelease(); + _logger.LogDebug("Emitting ClientDisconnected due to failure"); + _eventMediator.Emit(new ClientDisconnected(this, DisconnectReason.DueToFailure)); var ex = t.Exception?.GetBaseException() ?? t.Exception; @@ -229,11 +233,15 @@ private void ConsumerWork() { bool ShouldDoWork() => !_isDisposed && IsConnected && _cancellation?.IsCancellationRequested == false; + _logger.LogDebug("Starting consumer worker {IsConnected}", IsConnected); + var lastOpReceivedAt = DateTime.UtcNow; var ping = false; while (ShouldDoWork()) { + _logger.LogDebug("Consumer tick."); + try { if (ping) @@ -243,8 +251,11 @@ private void ConsumerWork() Ping(); } - foreach (var op in _connection.ReadOp()) + foreach (var op in _connection.ReadOps()) { + if (op == NullOp.Instance) + throw NatsException.ClientCouldNotConsumeStream(); + lastOpReceivedAt = DateTime.UtcNow; _opMediator.Emit(op); @@ -259,7 +270,7 @@ private void ConsumerWork() } } } - catch (NatsException nex) when (nex.ExceptionCode == NatsExceptionCodes.OpParserError) + catch (NatsException nex) when (nex.ExceptionCode is NatsExceptionCodes.OpParserError or NatsExceptionCodes.ClientCouldNotConsumeStream) { throw; } @@ -268,11 +279,11 @@ private void ConsumerWork() if (!ShouldDoWork()) break; - _logger.LogError(ex, "Worker got Exception."); - if (ex.InnerException is SocketException socketEx) { - _logger.LogError("Worker task got SocketException with SocketErrorCode={SocketErrorCode}", socketEx.SocketErrorCode); + _logger.LogWarning( + "Consumer task got SocketException with error code {SocketErrorCode} Frequency of Timeouts is controlled via ReceiveTimeout.", + socketEx.SocketErrorCode); if (socketEx.SocketErrorCode == SocketError.Interrupted) break; @@ -280,14 +291,16 @@ private void ConsumerWork() if (socketEx.SocketErrorCode != SocketError.TimedOut) throw; } + else + _logger.LogError(ex, "Consumer task failed"); + } - var silenceDeltaMs = DateTime.UtcNow.Subtract(lastOpReceivedAt).TotalMilliseconds; - if (silenceDeltaMs >= ConsumerMaxMsSilenceFromServer) - throw NatsException.ConnectionFoundIdling(_connection.ServerInfo.Host, _connection.ServerInfo.Port); + var silenceDeltaMs = DateTime.UtcNow.Subtract(lastOpReceivedAt).TotalMilliseconds; + if (silenceDeltaMs >= ConsumerMaxMsSilenceFromServer) + throw NatsException.ConnectionFoundIdling(_connection.ServerInfo.Host, _connection.ServerInfo.Port); - if (silenceDeltaMs >= ConsumerPingAfterMsSilenceFromServer) - ping = true; - } + if (silenceDeltaMs >= ConsumerPingAfterMsSilenceFromServer) + ping = true; } } diff --git a/src/main/MyNatsClient/NatsException.cs b/src/main/MyNatsClient/NatsException.cs index 2c0f84e..61bc2e3 100644 --- a/src/main/MyNatsClient/NatsException.cs +++ b/src/main/MyNatsClient/NatsException.cs @@ -7,7 +7,7 @@ public class NatsException : Exception { public string ExceptionCode { get; private set; } - protected NatsException(string exceptionCode, string message) + private NatsException(string exceptionCode, string message) : base(message) { ExceptionCode = exceptionCode ?? NatsExceptionCodes.Unknown; @@ -40,6 +40,9 @@ internal static NatsException ConnectionFoundIdling(string host, int port) internal static NatsException ClientReceivedErrOp(ErrOp errOp) => new NatsException(NatsExceptionCodes.ClientReceivedErrOp, $"Client received ErrOp with message='{errOp.Message}'."); + internal static NatsException ClientCouldNotConsumeStream() + => new(NatsExceptionCodes.ClientCouldNotConsumeStream, "Client could not consume stream."); + internal static NatsException OpParserError(string message) => new NatsException(NatsExceptionCodes.OpParserError, message); diff --git a/src/main/MyNatsClient/NatsExceptionCodes.cs b/src/main/MyNatsClient/NatsExceptionCodes.cs index dbbff5a..6dbe661 100644 --- a/src/main/MyNatsClient/NatsExceptionCodes.cs +++ b/src/main/MyNatsClient/NatsExceptionCodes.cs @@ -11,6 +11,7 @@ public static class NatsExceptionCodes public const string ExceededMaxPayload = "ExceededMaxPayload"; public const string ConnectionFoundIdling = "ConnectionFoundIdling"; public const string ClientReceivedErrOp = "ClientReceivedErrOp"; + public const string ClientCouldNotConsumeStream = "ClientCouldNotConsumeStream"; public const string OpParserError = "OpParser.Error"; public const string OpParserOpParsingError = "OpParser.Error"; public const string OpParserUnsupportedOp = "OpParser.UnsupportedOp"; diff --git a/src/main/MyNatsClient/NatsOpStreamReader.cs b/src/main/MyNatsClient/NatsOpStreamReader.cs index ec71f86..2b8949a 100644 --- a/src/main/MyNatsClient/NatsOpStreamReader.cs +++ b/src/main/MyNatsClient/NatsOpStreamReader.cs @@ -2,12 +2,17 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.IO; -using System.Linq; using MyNatsClient.Ops; namespace MyNatsClient { - public class NatsOpStreamReader + /// + /// Reads from a Stream. + /// + /// + /// Intentionally not locking so currently not safe for parallel use. + /// + public class NatsOpStreamReader : IDisposable { private const byte Empty = (byte) '\0'; private const byte SpaceDelimiter = (byte) ' '; @@ -22,13 +27,43 @@ public class NatsOpStreamReader private const byte Plus = (byte) '+'; private const byte Minus = (byte) '-'; - private readonly Stream _stream; + private Stream _source; + private readonly MemoryStream _workspace; + private readonly byte[] _opMarkerChars = new byte[4]; + private readonly byte[] _readSingleByteBuff = new byte[1]; - public NatsOpStreamReader(Stream stream) - => _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + private NatsOpStreamReader(Stream source) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _workspace = new MemoryStream(); + } + + public static NatsOpStreamReader Use(Stream stream) + => new(stream); + + public void Dispose() + { + _workspace.Dispose(); + } + + private void Reset() + { + _opMarkerChars[0] = Empty; + _opMarkerChars[1] = Empty; + _opMarkerChars[2] = Empty; + _opMarkerChars[3] = Empty; + _workspace.Position = 0; + _readSingleByteBuff[0] = Empty; + } + + public void SetNewSource(Stream source) + { + _source = source; + Reset(); + } private static bool IsDelimiter(byte c) - => c == SpaceDelimiter || c == TabDelimiter; + => c is SpaceDelimiter or TabDelimiter; private static ReadOnlySpan ToChars(ReadOnlySpan source) { @@ -346,75 +381,59 @@ private static MsgOp ParseHMsgOp(Stream source, Stream workspace) payload); } - public IEnumerable ReadOps() + public IOp ReadOp() { - IOp op = null; - using var workspace = new MemoryStream(); - var opMarkerChars = new byte[4]; + Reset(); + var i = -1; while (true) { - var curr = _stream.ReadByte(); - if (curr == -1) - yield break; + var curr = _source.CanRead ? _source.Read(_readSingleByteBuff, 0, 1) : 0; + if (curr == 0) + return NullOp.Instance; - var c = (byte) curr; + var c = _readSingleByteBuff[0]; if (!IsDelimiter(c) && c != Cr && c != Lf) { - opMarkerChars[++i] = c; + _opMarkerChars[++i] = c; continue; } if (i == -1) continue; - if (opMarkerChars[0] == M) - op = ParseMsgOp(_stream, workspace); - else if (opMarkerChars[0] == H) - op = ParseHMsgOp(_stream, workspace); - else if (opMarkerChars[0] == P) + if (_opMarkerChars[0] == M) + return ParseMsgOp(_source, _workspace); + if (_opMarkerChars[0] == H) + return ParseHMsgOp(_source, _workspace); + if (_opMarkerChars[0] == P) { - if (opMarkerChars[1] == I) - op = ParsePingOp(_stream); - else if (opMarkerChars[1] == O) - op = ParsePongOp(_stream); + if (_opMarkerChars[1] == I) + return ParsePingOp(_source); + if (_opMarkerChars[1] == O) + return ParsePongOp(_source); } - else if (opMarkerChars[0] == I) - op = ParseInfoOp(_stream, workspace); - else if (opMarkerChars[0] == Plus) - op = ParseOkOp(_stream); - else if (opMarkerChars[0] == Minus) - op = ParseErrorOp(_stream, workspace); - - if (op == null) - { - var opMarker = string.Create(i + 1, opMarkerChars, (t, v) => - { - if (t.Length == 4) - t[3] = (char) v[3]; - t[2] = (char) v[2]; - t[1] = (char) v[1]; - t[0] = (char) v[0]; - }); + if (_opMarkerChars[0] == I) + return ParseInfoOp(_source, _workspace); + if (_opMarkerChars[0] == Plus) + return ParseOkOp(_source); + if (_opMarkerChars[0] == Minus) + return ParseErrorOp(_source, _workspace); - throw NatsException.OpParserUnsupportedOp(opMarker); - } - - i = -1; - opMarkerChars[0] = Empty; - opMarkerChars[1] = Empty; - opMarkerChars[2] = Empty; - opMarkerChars[3] = Empty; - workspace.Position = 0; + var opMarker = string.Create(i + 1, _opMarkerChars, (t, v) => + { + if (t.Length == 4) + t[3] = (char) v[3]; - yield return op; + t[2] = (char) v[2]; + t[1] = (char) v[1]; + t[0] = (char) v[0]; + }); - op = null; + throw NatsException.OpParserUnsupportedOp(opMarker); } } - - public IOp ReadOneOp() => ReadOps().First(); } } diff --git a/src/main/MyNatsClient/Ops/NullOp.cs b/src/main/MyNatsClient/Ops/NullOp.cs new file mode 100644 index 0000000..8764711 --- /dev/null +++ b/src/main/MyNatsClient/Ops/NullOp.cs @@ -0,0 +1,16 @@ +namespace MyNatsClient.Ops +{ + public sealed class NullOp : IOp + { + private const string OpMarker = "NULL"; + + public string Marker => OpMarker; + + public static readonly NullOp Instance = new(); + + private NullOp() { } + + public override string ToString() + => OpMarker; + } +} diff --git a/src/samples/RequestResponseSample/Program.cs b/src/samples/RequestResponseSample/Program.cs index 57b9552..7ec7c56 100644 --- a/src/samples/RequestResponseSample/Program.cs +++ b/src/samples/RequestResponseSample/Program.cs @@ -1,5 +1,7 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Console; using MyNatsClient; using MyNatsClient.Rx; @@ -11,10 +13,16 @@ public class Program public static async Task Main(string[] args) { + LoggerManager.UseFactory(LoggerFactory.Create(b => b + .AddFilter("System", LogLevel.Information) + .AddFilter("Microsoft", LogLevel.Information) + .SetMinimumLevel(LogLevel.Debug) + .AddConsole())); + var cnInfo = new ConnectionInfo("localhost"); _client = new NatsClient(cnInfo); - + await _client.ConnectAsync(); _client.Sub("getTemp", stream => stream.Subscribe(msg => diff --git a/src/samples/RequestResponseSample/RequestResponseSample.csproj b/src/samples/RequestResponseSample/RequestResponseSample.csproj index 2bfc198..8082945 100644 --- a/src/samples/RequestResponseSample/RequestResponseSample.csproj +++ b/src/samples/RequestResponseSample/RequestResponseSample.csproj @@ -11,4 +11,8 @@ + + + + diff --git a/src/testing/UnitTests/NatsOpStreamReaderTests.cs b/src/testing/UnitTests/NatsOpStreamReaderTests.cs index d0122ce..83e0b8e 100644 --- a/src/testing/UnitTests/NatsOpStreamReaderTests.cs +++ b/src/testing/UnitTests/NatsOpStreamReaderTests.cs @@ -22,30 +22,30 @@ public void Should_be_able_to_parse_different_messages_as_a_stream() "PONG\r\n", "-ERR 'Unknown Protocol Operation'\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var infoOp = UnitUnderTest.ReadOps().First(); + var infoOp = UnitUnderTest.ReadOp(); infoOp.Should().BeOfType(); - var okOp = UnitUnderTest.ReadOps().First(); + var okOp = UnitUnderTest.ReadOp(); okOp.Should().BeOfType(); - var msgOp = UnitUnderTest.ReadOps().First(); + var msgOp = UnitUnderTest.ReadOp(); msgOp.Should().BeOfType(); - var msgWithGroupOp = UnitUnderTest.ReadOps().First(); + var msgWithGroupOp = UnitUnderTest.ReadOp(); msgWithGroupOp.Should().BeOfType(); - var pingOp = UnitUnderTest.ReadOps().First(); + var pingOp = UnitUnderTest.ReadOp(); pingOp.Should().BeOfType(); - var pongOp = UnitUnderTest.ReadOps().First(); + var pongOp = UnitUnderTest.ReadOp(); pongOp.Should().BeOfType(); - var errOp = UnitUnderTest.ReadOps().First(); + var errOp = UnitUnderTest.ReadOp(); errOp.Should().BeOfType(); - UnitUnderTest.ReadOps().FirstOrDefault().Should().BeNull(); + UnitUnderTest.ReadOp().Should().Be(NullOp.Instance); } } @@ -55,9 +55,9 @@ public void Should_be_able_to_parse_InfoOp() using (var stream = CreateStream( "INFO {\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("INFO"); op.Message.ToString().Should().Be("{\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}"); @@ -70,9 +70,9 @@ public void Should_be_able_to_parse_OkOp() using (var stream = CreateStream( "+OK\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("+OK"); } @@ -84,9 +84,9 @@ public void Should_be_able_to_parse_MsgOp() using (var stream = CreateStream( "MSG Foo Siddw1 ReplyTo1 4\r\nTEST\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -103,9 +103,9 @@ public void Should_be_able_to_parse_MsgOp_When_optionals_are_missing() using (var stream = CreateStream( "MSG Foo Siddw1 0\r\n\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -122,9 +122,9 @@ public void Should_be_able_to_parse_MsgOp_When_message_has_new_line() using (var stream = CreateStream( "MSG Foo Siddw1 ReplyTo1 6\r\nTE\r\nST\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -141,9 +141,9 @@ public void Should_be_able_to_parse_MsgOp_When_message_has_tab() using (var stream = CreateStream( "MSG Foo Siddw1 ReplyTo1 5\r\nTE\tST\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -160,9 +160,9 @@ public void Should_be_able_to_parse_MsgOp_When_message_has_new_line_and_tab() using (var stream = CreateStream( "MSG Foo Siddw1 ReplyTo1 7\r\nTE\tS\r\nT\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -179,9 +179,9 @@ public void Should_be_able_to_parse_MsgOp_When_it_is_tab_delimited_instead_of_sp using (var stream = CreateStream( "MSG\tFoo\tSiddw1\tReplyTo1\t4\r\nTEST\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("MSG"); op.Subject.Should().Be("Foo"); @@ -198,9 +198,9 @@ public void Should_be_able_to_parse_MsgOp_When_headers_are_defined() using (var stream = CreateStream( "HMSG Foo Siddw1 ReplyTo1 66 72 NATS/1.0\r\nHeader1:Value1.1\r\nHeader2:Value2.1\r\nHeader2:Value2.2\r\n\r\nTE\r\nST\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("HMSG"); op.Subject.Should().Be("Foo"); @@ -221,9 +221,9 @@ public void Should_be_able_to_parse_MsgOp_When_headers_are_defined_but_optionals using (var stream = CreateStream( "HMSG Foo Siddw1 66 66 NATS/1.0\r\nHeader1:Value1.1\r\nHeader2:Value2.1\r\nHeader2:Value2.2\r\n\r\n\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("HMSG"); op.Subject.Should().Be("Foo"); @@ -243,9 +243,9 @@ public void Should_be_able_to_parse_PingOp() using (var stream = CreateStream( "PING\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("PING"); } @@ -257,9 +257,9 @@ public void Should_be_able_to_parse_PongOp() using (var stream = CreateStream( "PONG\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("PONG"); } @@ -271,9 +271,9 @@ public void Should_be_able_to_parse_ErrOp() using (var stream = CreateStream( "-ERR 'Unknown Protocol Operation'\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var op = UnitUnderTest.ReadOps().OfType().First(); + var op = UnitUnderTest.ReadOp().Should().BeOfType().Subject; op.Marker.Should().Be("-ERR"); op.Message.Should().Be("'Unknown Protocol Operation'"); @@ -281,15 +281,15 @@ public void Should_be_able_to_parse_ErrOp() } [Fact] - public void Should_be_able_to_handle_blank_ops() + public void Should_be_able_to_handle_blank_lines() { using (var stream = CreateStream( "+OK\r\n\r\n", "PING\r\n")) { - UnitUnderTest = new NatsOpStreamReader(stream); + UnitUnderTest = NatsOpStreamReader.Use(stream); - var ops = UnitUnderTest.ReadOps().ToArray(); + var ops = new[] {UnitUnderTest.ReadOp(), UnitUnderTest.ReadOp()}; ops.Should().HaveCount(2); ops[0].Should().BeOfType(); ops[1].Should().BeOfType();