From 76cbf8fb285e3aedc409c0a6a7644878aac9c4e0 Mon Sep 17 00:00:00 2001 From: Nazar Mandzyk Date: Tue, 9 May 2023 01:37:30 +0300 Subject: [PATCH] Fix unobserved SocketException resolves #770 (original issue) resolves #771 (original PR, this is a rebase) Perform socket read operations according to Task-based asynchronous pattern (TAP) instead of Asynchronous Programming Model (APM) Also cleanup/nullable-ize SocketInitiatorThread (this part by @gbirchmeier) --- QuickFIXn/SocketInitiatorThread.cs | 182 ++++++++++++------------- QuickFIXn/SocketReader.cs | 71 +++++----- QuickFIXn/Transport/SocketInitiator.cs | 6 +- RELEASE_NOTES.md | 4 + 4 files changed, 132 insertions(+), 131 deletions(-) diff --git a/QuickFIXn/SocketInitiatorThread.cs b/QuickFIXn/SocketInitiatorThread.cs index 9277cb1cf..117204574 100755 --- a/QuickFIXn/SocketInitiatorThread.cs +++ b/QuickFIXn/SocketInitiatorThread.cs @@ -1,9 +1,11 @@ -using System.Net.Sockets; -using System.Net; -using System.Threading; -using System.IO; +#nullable enable using System; using System.Diagnostics; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; namespace QuickFix { @@ -12,56 +14,57 @@ namespace QuickFix /// public class SocketInitiatorThread : IResponder { - public Session Session { get { return session_; } } - public Transport.SocketInitiator Initiator { get { return initiator_; } } + public Session Session { get; } + public Transport.SocketInitiator Initiator { get; } public const int BUF_SIZE = 512; - private Thread thread_ = null; - private byte[] readBuffer_ = new byte[BUF_SIZE]; - private Parser parser_; - protected Stream stream_; - private Transport.SocketInitiator initiator_; - private Session session_; - private IPEndPoint socketEndPoint_; - protected SocketSettings socketSettings_; - private bool isDisconnectRequested_ = false; + private Thread? _thread; + private readonly byte[] _readBuffer = new byte[BUF_SIZE]; + private readonly Parser _parser = new(); + private Stream? _stream; + private CancellationTokenSource _readCancellationTokenSource = new(); + private readonly IPEndPoint _socketEndPoint; + private readonly SocketSettings _socketSettings; + private bool _isDisconnectRequested = false; + + /// + /// Keep a task for handling async read + /// + private Task? _currentReadTask; public SocketInitiatorThread(Transport.SocketInitiator initiator, Session session, IPEndPoint socketEndPoint, SocketSettings socketSettings) { - isDisconnectRequested_ = false; - initiator_ = initiator; - session_ = session; - socketEndPoint_ = socketEndPoint; - parser_ = new Parser(); - session_ = session; - socketSettings_ = socketSettings; + Initiator = initiator; + Session = session; + _socketEndPoint = socketEndPoint; + _socketSettings = socketSettings; } public void Start() { - isDisconnectRequested_ = false; - thread_ = new Thread(new ParameterizedThreadStart(Transport.SocketInitiator.SocketInitiatorThreadStart)); - thread_.Start(this); + _isDisconnectRequested = false; + _thread = new Thread(Transport.SocketInitiator.SocketInitiatorThreadStart); + _thread.Start(this); } public void Join() { - if (null == thread_) + if (_thread is null) return; Disconnect(); // Make sure session's socket reader thread doesn't try to do a Join on itself! - if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId) - thread_.Join(2000); - thread_ = null; + if (Environment.CurrentManagedThreadId != _thread.ManagedThreadId) + _thread.Join(2000); + _thread = null; } public void Connect() { - Debug.Assert(stream_ == null); + Debug.Assert(_stream == null); - stream_ = SetupStream(); - session_.SetResponder(this); + _stream = SetupStream(); + Session.SetResponder(this); } /// @@ -71,55 +74,36 @@ public void Connect() /// Stream representing the (network)connection to the other party protected virtual Stream SetupStream() { - return QuickFix.Transport.StreamFactory.CreateClientStream(socketEndPoint_, socketSettings_, session_.Log); + return Transport.StreamFactory.CreateClientStream(_socketEndPoint, _socketSettings, Session.Log); } public bool Read() { try { - int bytesRead = ReadSome(readBuffer_, 1000); + int bytesRead = ReadSome(_readBuffer, 1000); if (bytesRead > 0) - parser_.AddToStream(readBuffer_, bytesRead); - else if (null != session_) - { - session_.Next(); - } + _parser.AddToStream(_readBuffer, bytesRead); else - { - throw new QuickFIXException("Initiator timed out while reading socket"); - } + Session.Next(); ProcessStream(); return true; } - catch (System.ObjectDisposedException e) + catch (ObjectDisposedException) { - // this exception means socket_ is already closed when poll() is called - if (isDisconnectRequested_ == false) - { - // for lack of a better idea, do what the general exception does - if (null != session_) - session_.Disconnect(e.ToString()); - else - Disconnect(); - } - return false; + // this exception means _socket is already closed when poll() is called + if (_isDisconnectRequested == false) + Disconnect(); } - catch (System.Exception e) + catch (Exception e) { - if (null != session_) - session_.Disconnect(e.ToString()); - else - Disconnect(); + Session.Log.OnEvent(e.ToString()); + Disconnect(); } return false; } - /// - /// Keep a handle to the current outstanding read request (if any) - /// - private IAsyncResult currentReadRequest_; /// /// Reads data from the network into the specified buffer. /// It will wait up to the specified number of milliseconds for data to arrive, @@ -129,58 +113,58 @@ public bool Read() /// The timeout milliseconds. /// The number of bytes read into the buffer /// On connection reset - protected int ReadSome(byte[] buffer, int timeoutMilliseconds) + protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds) { - // NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader. + if (_stream is null) { + throw new ApplicationException("Initiator is not connected (uninitialized stream)"); + } + + // NOTE: FROM HERE, THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader. // Any changes made here should also be performed there. try { // Begin read if it is not already started - if (currentReadRequest_ == null) - currentReadRequest_ = stream_.BeginRead(buffer, 0, buffer.Length, null, null); - - // Wait for it to complete (given timeout) - currentReadRequest_.AsyncWaitHandle.WaitOne(timeoutMilliseconds); + _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token); - if (currentReadRequest_.IsCompleted) - { - // Make sure to set currentReadRequest_ to before retreiving result - // so a new read can be started next time even if an exception is thrown - var request = currentReadRequest_; - currentReadRequest_ = null; + if (_currentReadTask.Wait(timeoutMilliseconds)) { + // Dispose/nullify currentReadTask *before* retreiving .Result. + // Accessting .Result can throw an exception, so we need to reset currentReadTask + // first, to set us up for the next read even if an exception is thrown. + Task? request = _currentReadTask; + _currentReadTask = null; - int bytesRead = stream_.EndRead(request); + int bytesRead = request.Result; // (As mentioned above, this can throw an exception!) if (0 == bytesRead) - throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown)); + throw new SocketException(Convert.ToInt32(SocketError.Shutdown)); return bytesRead; } - else - return 0; + + return 0; } - catch (System.IO.IOException ex) // Timeout + catch (AggregateException ex) // Timeout { - var inner = ex.InnerException as SocketException; - if (inner != null && inner.SocketErrorCode == SocketError.TimedOut) - { + _currentReadTask = null; + var ioException = ex.InnerException as IOException; + var inner = ioException?.InnerException as SocketException; + if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) { // Nothing read return 0; } - else if (inner != null) - { + + if (inner is not null) { throw inner; //rethrow SocketException part (which we have exception logic for) } - else - throw; //rethrow original exception + + throw; //rethrow original exception } } private void ProcessStream() { - string msg; - while (parser_.ReadFixMessage(out msg)) + while (_parser.ReadFixMessage(out var msg)) { - session_.Next(msg); + Session.Next(msg); } } @@ -188,16 +172,26 @@ private void ProcessStream() public bool Send(string data) { + if (_stream is null) { + throw new ApplicationException("Initiator is not connected (uninitialized stream)"); + } + byte[] rawData = CharEncoding.DefaultEncoding.GetBytes(data); - stream_.Write(rawData, 0, rawData.Length); + _stream.Write(rawData, 0, rawData.Length); return true; } public void Disconnect() { - isDisconnectRequested_ = true; - if (stream_ != null) - stream_.Close(); + _isDisconnectRequested = true; + _readCancellationTokenSource.Cancel(); + _readCancellationTokenSource.Dispose(); + + // just wait when read task will be cancelled + _currentReadTask?.ContinueWith(_ => { }).Wait(1000); + _currentReadTask?.Dispose(); + _currentReadTask = null; + _stream?.Close(); } #endregion diff --git a/QuickFIXn/SocketReader.cs b/QuickFIXn/SocketReader.cs index 4d8eb7e88..d12b87b3e 100755 --- a/QuickFIXn/SocketReader.cs +++ b/QuickFIXn/SocketReader.cs @@ -3,27 +3,27 @@ using System.IO; using System; using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace QuickFix { - /// - /// TODO merge with SocketInitiatorThread - /// public class SocketReader : IDisposable { public const int BUF_SIZE = 4096; private readonly byte[] _readBuffer = new byte[BUF_SIZE]; private readonly Parser _parser = new(); - private Session? _qfSession; //will be null when initialized + private Session? _qfSession; private readonly Stream _stream; + private readonly CancellationTokenSource _readCancellationTokenSource = new(); private readonly TcpClient _tcpClient; private readonly ClientHandlerThread _responder; private readonly AcceptorSocketDescriptor? _acceptorDescriptor; /// - /// Keep a handle to the current outstanding read request (if any) + /// Keep a task for handling async read /// - private IAsyncResult? _currentReadRequest; + private Task? _currentReadTask; internal SocketReader( TcpClient tcpClient, @@ -71,46 +71,44 @@ public void Read() /// On connection reset protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds) { - // NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread. - // Any changes made here should also be made there. - try - { + // NOTE: THIS FUNCTION IS (nearly) EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread. + // Any changes made here should also be performed there. + try { // Begin read if it is not already started - _currentReadRequest ??= _stream.BeginRead(buffer, 0, buffer.Length, null, null); - - // Wait for it to complete (given timeout) - _currentReadRequest.AsyncWaitHandle.WaitOne(timeoutMilliseconds); + _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token); - if (_currentReadRequest.IsCompleted) - { - // Make sure to set _currentReadRequest to before retreiving result - // so a new read can be started next time even if an exception is thrown - var request = _currentReadRequest; - _currentReadRequest = null; + if (_currentReadTask.Wait(timeoutMilliseconds)) { + // Dispose/nullify currentReadTask *before* retreiving .Result. + // Accessting .Result can throw an exception, so we need to reset currentReadTask + // first, to set us up for the next read even if an exception is thrown. + Task? request = _currentReadTask; + _currentReadTask = null; - int bytesRead = _stream.EndRead(request); + int bytesRead = request.Result; // (As mentioned above, this can throw an exception!) if (0 == bytesRead) - throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown)); + throw new SocketException(Convert.ToInt32(SocketError.Shutdown)); return bytesRead; } return 0; } - catch (IOException ex) // Timeout + catch (AggregateException ex) // Timeout { - var inner = ex.InnerException as SocketException; - if (inner?.SocketErrorCode == SocketError.TimedOut) - { + _currentReadTask = null; + + IOException? ioException = ex.InnerException as IOException; + SocketException? inner = ioException?.InnerException as SocketException; + if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) { // Nothing read return 0; } - else if (inner is not null) - { + + if (inner is not null) { throw inner; //rethrow SocketException part (which we have exception logic for) } - else - throw; //rethrow original exception + + throw; //rethrow original exception } } @@ -200,8 +198,7 @@ protected void ProcessStream() protected void DisconnectClient() { - _stream.Close(); - _tcpClient.Close(); + Dispose(); } private bool IsAssumedSession(SessionID sessionId) @@ -275,13 +272,23 @@ public void Dispose() GC.SuppressFinalize(this); } + private bool _disposed = false; protected virtual void Dispose(bool disposing) { + if (_disposed) return; if (disposing) { + _readCancellationTokenSource.Cancel(); + _readCancellationTokenSource.Dispose(); + + // just wait when read task will be cancelled + _currentReadTask?.ContinueWith(_ => { }).Wait(1000); + _currentReadTask?.Dispose(); + _stream.Dispose(); _tcpClient.Close(); } + _disposed = true; } ~SocketReader() => Dispose(false); } diff --git a/QuickFIXn/Transport/SocketInitiator.cs b/QuickFIXn/Transport/SocketInitiator.cs index 086949126..d8e24a430 100644 --- a/QuickFIXn/Transport/SocketInitiator.cs +++ b/QuickFIXn/Transport/SocketInitiator.cs @@ -21,8 +21,6 @@ public class SocketInitiator : AbstractInitiator public const string SOCKET_CONNECT_PORT = "SocketConnectPort"; public const string RECONNECT_INTERVAL = "ReconnectInterval"; - #region Private Members - private volatile bool _shutdownRequested = false; private DateTime _lastConnectTimeDt = DateTime.MinValue; private int _reconnectInterval = 30; @@ -31,8 +29,6 @@ public class SocketInitiator : AbstractInitiator private readonly Dictionary _sessionToHostNum = new(); private readonly object _sync = new(); - #endregion - public SocketInitiator( IApplication application, IMessageStoreFactory storeFactory, @@ -42,7 +38,7 @@ public SocketInitiator( : base(application, storeFactory, settings, logFactoryNullable, messageFactoryNullable) { } - public static void SocketInitiatorThreadStart(object socketInitiatorThread) + public static void SocketInitiatorThreadStart(object? socketInitiatorThread) { SocketInitiatorThread? t = socketInitiatorThread as SocketInitiatorThread; if (t == null) return; diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 70788fe49..e7ce46de5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -73,6 +73,10 @@ What's New * #740 - Capture inner exception messages when handling authentication exceptions (rars) * #833 - Add Try/Catch logic to SocketInitiator.OnStart() (Falcz) * #782 - proper handling of XmlData field (larsope) +* #770 - fix unobserved SocketException + * Perform socket read operations according to Task-based asynchronous pattern (TAP) instead of Asynchronous + Programming Model (APM), in order to catch unobserved SocketExceptions (nmandzyk) + * Cleanup/nullable-ize SocketInitiatorThread (gbirchmeier) ### v1.11.2: * same as v1.11.1, but I fixed the readme in the pushed nuget packages