Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Connected event in NamedPipeClient and minor changes #9

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 80 additions & 28 deletions NamedPipeWrapper/NamedPipeClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading;
using NamedPipeWrapper.IO;
using NamedPipeWrapper.Threading;
Expand All @@ -16,11 +13,11 @@ namespace NamedPipeWrapper
public class NamedPipeClient<TReadWrite> : NamedPipeClient<TReadWrite, TReadWrite> where TReadWrite : class
{
/// <summary>
/// Constructs a new <c>NamedPipeClient</c> to connect to the <see cref="NamedPipeNamedPipeServer{TReadWrite}"/> specified by <paramref name="pipeName"/>.
/// Constructs a new <c>NamedPipeClient</c> to connect to the <see cref="NamedPipeServer{TReadWrite}"/> specified by <paramref name="pipeName"/>.
/// </summary>
/// <param name="pipeName">Name of the server's pipe</param>
/// <param name="serverName">server name default is local.</param>
public NamedPipeClient(string pipeName,string serverName=".") : base(pipeName, serverName)
public NamedPipeClient(string pipeName, string serverName = ".") : base(pipeName, serverName)
{
}
}
Expand All @@ -46,6 +43,11 @@ public class NamedPipeClient<TRead, TWrite>
/// </summary>
public event ConnectionMessageEventHandler<TRead, TWrite> ServerMessage;

/// <summary>
/// Invoked when the client connects to the server.
/// </summary>
public event ConnectionEventHandler<TRead, TWrite> Connected;

/// <summary>
/// Invoked when the client disconnects from the server (e.g., the pipe is closed or broken).
/// </summary>
Expand All @@ -69,11 +71,11 @@ public class NamedPipeClient<TRead, TWrite>
private string _serverName { get; set; }

/// <summary>
/// Constructs a new <c>NamedPipeClient</c> to connect to the <see cref="NamedPipeServer{TRead, TWrite}"/> specified by <paramref name="pipeName"/>.
/// Constructs a new <c>NamedPipeClient</c> to connect to the <see cref="NamedPipeServer{TReadWrite}"/> specified by <paramref name="pipeName"/>.
/// </summary>
/// <param name="pipeName">Name of the server's pipe</param>
/// <param name="serverName">the Name of the server, default is local machine</param>
public NamedPipeClient(string pipeName,string serverName)
public NamedPipeClient(string pipeName, string serverName)
{
_pipeName = pipeName;
_serverName = serverName;
Expand All @@ -92,14 +94,34 @@ public void Start()
worker.DoWork(ListenSync);
}

/// <summary>
/// Connects to the named pipe server synchronously.
/// </summary>
/// <returns></returns>
public void StartSynchronously()
{
_closedExplicitly = false;
try
{
ListenSync();
}
catch (Exception e)
{
OnError(e);
}
}

/// <summary>
/// Sends a message to the server over a named pipe.
/// </summary>
/// <param name="message">Message to send to the server.</param>
public void PushMessage(TWrite message)
/// <returns>false if conection is null</returns>
public bool PushMessage(TWrite message)
{
if (_connection != null)
_connection.PushMessage(message);
if (_connection == null) return false;

_connection.PushMessage(message);
return true;
}

/// <summary>
Expand All @@ -113,35 +135,62 @@ public void Stop()
}

#region Wait for connection/disconnection

public void WaitForConnection()
/// <summary>
/// Wait for connection
/// </summary>
/// <returns>true if connected</returns>
public bool WaitForConnection()
{
_connected.WaitOne();
return _connected.WaitOne();
}

public void WaitForConnection(int millisecondsTimeout)
/// <summary>
/// Wait for connection
/// </summary>
/// <param name="millisecondsTimeout"></param>
/// <returns>true if connected</returns>
public bool WaitForConnection(int millisecondsTimeout)
{
_connected.WaitOne(millisecondsTimeout);
return _connected.WaitOne(millisecondsTimeout);
}

public void WaitForConnection(TimeSpan timeout)
/// <summary>
/// Wait for connection
/// </summary>
/// <param name="timeout"></param>
/// <returns>true if connected</returns>
public bool WaitForConnection(TimeSpan timeout)
{
_connected.WaitOne(timeout);
return _connected.WaitOne(timeout);
}

public void WaitForDisconnection()
/// <summary>
/// Wait for disconnection
/// </summary>
/// <returns>true if disconnected</returns>
public bool WaitForDisconnection()
{
_disconnected.WaitOne();
return _disconnected.WaitOne();
}

public void WaitForDisconnection(int millisecondsTimeout)
/// <summary>
/// Wait for disconnection
/// </summary>
/// <param name="millisecondsTimeout"></param>
/// <returns>true if disconnected</returns>
public bool WaitForDisconnection(int millisecondsTimeout)
{
_disconnected.WaitOne(millisecondsTimeout);
return _disconnected.WaitOne(millisecondsTimeout);
}

public void WaitForDisconnection(TimeSpan timeout)
/// <summary>
/// Wait for disconnection
/// </summary>
/// <param name="timeout"></param>
/// <returns>true if disconnected</returns>
public bool WaitForDisconnection(TimeSpan timeout)
{
_disconnected.WaitOne(timeout);
return _disconnected.WaitOne(timeout);
}

#endregion
Expand All @@ -151,12 +200,12 @@ public void WaitForDisconnection(TimeSpan timeout)
private void ListenSync()
{
// Get the name of the data pipe that should be used from now on by this NamedPipeClient
var handshake = PipeClientFactory.Connect<string, string>(_pipeName,_serverName);
var handshake = PipeClientFactory.Connect<string, string>(_pipeName, _serverName);
var dataPipeName = handshake.ReadObject();
handshake.Close();

// Connect to the actual data pipe
var dataPipe = PipeClientFactory.CreateAndConnectPipe(dataPipeName,_serverName);
var dataPipe = PipeClientFactory.CreateAndConnectPipe(dataPipeName, _serverName);

// Create a Connection object for the data pipe
_connection = ConnectionFactory.CreateConnection<TRead, TWrite>(dataPipe);
Expand All @@ -166,6 +215,9 @@ private void ListenSync()
_connection.Open();

_connected.Set();

if (Connected != null)
Connected(_connection);
}

private void OnDisconnected(NamedPipeConnection<TRead, TWrite> connection)
Expand Down Expand Up @@ -209,11 +261,11 @@ private void OnError(Exception exception)

static class PipeClientFactory
{
public static PipeStreamWrapper<TRead, TWrite> Connect<TRead, TWrite>(string pipeName,string serverName)
public static PipeStreamWrapper<TRead, TWrite> Connect<TRead, TWrite>(string pipeName, string serverName)
where TRead : class
where TWrite : class
{
return new PipeStreamWrapper<TRead, TWrite>(CreateAndConnectPipe(pipeName,serverName));
return new PipeStreamWrapper<TRead, TWrite>(CreateAndConnectPipe(pipeName, serverName));
}

public static NamedPipeClientStream CreateAndConnectPipe(string pipeName, string serverName)
Expand All @@ -223,7 +275,7 @@ public static NamedPipeClientStream CreateAndConnectPipe(string pipeName, string
return pipe;
}

private static NamedPipeClientStream CreatePipe(string pipeName,string serverName)
private static NamedPipeClientStream CreatePipe(string pipeName, string serverName)
{
return new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough);
}
Expand Down
43 changes: 20 additions & 23 deletions NamedPipeWrapper/NamedPipeConnection.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using NamedPipeWrapper.IO;
using NamedPipeWrapper.Threading;
using System.Collections.Concurrent;

namespace NamedPipeWrapper
{
{
/// <summary>
/// Represents a connection between a named pipe client and server.
/// </summary>
Expand Down Expand Up @@ -162,35 +159,35 @@ private void ReadPipe()
{
//we must igonre exception, otherwise, the namepipe wrapper will stop work.
}
}
}
}

/// <summary>
/// Invoked on the background thread.
/// </summary>
/// <exception cref="SerializationException">An object in the graph of type parameter <typeparamref name="TWrite"/> is not marked as serializable.</exception>
private void WritePipe()
{

while (IsConnected && _streamWrapper.CanWrite)
{

while (IsConnected && _streamWrapper.CanWrite)
{
try
{
//using blockcollection, we needn't use singal to wait for result.
//_writeSignal.WaitOne();
//while (_writeQueue.Count > 0)
{
_streamWrapper.WriteObject(_writeQueue.Take());
_streamWrapper.WaitForPipeDrain();
}
}
catch
{
try
{
//using blockcollection, we needn't use singal to wait for result.
//_writeSignal.WaitOne();
//while (_writeQueue.Count > 0)
{
_streamWrapper.WriteObject(_writeQueue.Take());
_streamWrapper.WaitForPipeDrain();
}
}
catch
{
//we must igonre exception, otherwise, the namepipe wrapper will stop work.
}
}
}
}
}

Expand Down
11 changes: 5 additions & 6 deletions NamedPipeWrapper/NamedPipeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
using NamedPipeWrapper.Threading;
using System;
using System.Collections.Generic;
using System.IO.Pipes;

using System.IO.Pipes;
namespace NamedPipeWrapper
{
{
/// <summary>
/// Wraps a <see cref="NamedPipeServerStream"/> and provides multiple simultaneous client connection handling.
/// </summary>
Expand All @@ -25,6 +25,7 @@ public NamedPipeServer(string pipeName)
/// Constructs a new <c>NamedPipeServer</c> object that listens for client connections on the given <paramref name="pipeName"/>.
/// </summary>
/// <param name="pipeName">Name of the pipe to listen on</param>
/// <param name="pipeSecurity"></param>
public NamedPipeServer(string pipeName, PipeSecurity pipeSecurity)
: base(pipeName, pipeSecurity)
{
Expand Down Expand Up @@ -67,12 +68,12 @@ public class Server<TRead, TWrite>
private int _nextPipeId;

private volatile bool _shouldKeepRunning;
private volatile bool _isRunning;

/// <summary>
/// Constructs a new <c>NamedPipeServer</c> object that listens for client connections on the given <paramref name="pipeName"/>.
/// </summary>
/// <param name="pipeName">Name of the pipe to listen on</param>
/// <param name="pipeSecurity"></param>
public Server(string pipeName, PipeSecurity pipeSecurity)
{
_pipeName = pipeName;
Expand Down Expand Up @@ -153,12 +154,10 @@ public void Stop()

private void ListenSync()
{
_isRunning = true;
while (_shouldKeepRunning)
{
WaitForConnection(_pipeName, _pipeSecurity);
}
_isRunning = false;
}

private void WaitForConnection(string pipeName, PipeSecurity pipeSecurity)
Expand Down
8 changes: 3 additions & 5 deletions UnitTests/SerializableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ static SerializableTests()
private int _expectedHash;
private TestCollection _actualData;
private int _actualHash;
private bool _clientDisconnected;

private DateTime _startTime;

Expand All @@ -60,7 +59,6 @@ public void SetUp()
_expectedHash = 0;
_actualData = null;
_actualHash = 0;
_clientDisconnected = false;

_server.ClientMessage += ServerOnClientMessage;

Expand Down Expand Up @@ -137,7 +135,7 @@ public void TestCircularReferences()
Assert.NotNull(_actualHash, string.Format("Server should have received client's {0} item message", _expectedData.Count));
Assert.AreEqual(_expectedHash, _actualHash, string.Format("Hash codes for {0} item message should match", _expectedData.Count));
Assert.AreEqual(_expectedData.Count, _actualData.Count, string.Format("Collection lengths should be equal"));

for (var i = 0; i < _actualData.Count; i++)
{
var expectedItem = _expectedData[i];
Expand Down Expand Up @@ -218,14 +216,14 @@ public override bool Equals(object obj)
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((TestItem) obj);
return Equals((TestItem)obj);
}

public override int GetHashCode()
{
unchecked
{
return (Id*397) ^ (int) Enum;
return (Id * 397) ^ (int)Enum;
}
}
}
Expand Down