Skip to content

Commit

Permalink
add support for pipestream for multiple httpclient support
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali-YousefiTelori committed Oct 9, 2023
1 parent 0b736ea commit 0e3a099
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netcoreapp3.1;net6.0;net5.0;net48;net452;net7.0</TargetFrameworks>
<TargetFrameworks>net6.0;net48;net452;net7.0</TargetFrameworks>
<IsPackable>false</IsPackable>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="EasyMicroservices.Tests" Version="0.0.0.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using EasyMicroservices.Laboratory.Engine;
using EasyMicroservices.Laboratory.Engine.Net.Http;
using EasyMicroservices.Laboratory.Models;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -37,6 +39,64 @@ public async Task CheckSimpleRequestAndResponse(string request, string response)
Assert.Equal(await httpResponse.Content.ReadAsStringAsync(), response);
}

[Theory]
[InlineData($"Hello Ali \r\n Hi Mahdi", "Reza")]
[InlineData($"Hello Ali \r\n Hi Mahdi2", "Reza2")]
[InlineData($"Hello Ali \r\n Hi Mahdi3", "Reza3")]
public async Task ConcurrentCheckSimpleRequestAndResponse(string request, string response)
{
request = NormalizeOSText(request);
response = NormalizeOSText(response);
ResourceManager resourceManager = new ResourceManager();
resourceManager.Append(request, GetHttpResponseHeaders(response));
HttpHandler httpHandler = new HttpHandler(resourceManager);
var port = await httpHandler.Start();

List<Task<bool>> all = new List<Task<bool>>();
for (int i = 0; i < 100; i++)
{
all.Add(Task.Run(async () =>
{
HttpClient httpClient = new HttpClient();
var data = new StringContent(request);
var httpResponse = await httpClient.PostAsync($"http://localhost:{port}", data);
Assert.Equal(await httpResponse.Content.ReadAsStringAsync(), response);
return true;
}));
}
await Task.WhenAll(all);
Assert.True(all.All(x => x.Result));
}

[Theory]
[InlineData($"Hello Ali \r\n Hi Mahdi", "Reza")]
[InlineData($"Hello Ali \r\n Hi Mahdi2", "Reza2")]
[InlineData($"Hello Ali \r\n Hi Mahdi3", "Reza3")]
public async Task ConcurrentSingleHttpClientCheckSimpleRequestAndResponse(string request, string response)
{
request = NormalizeOSText(request);
response = NormalizeOSText(response);
ResourceManager resourceManager = new ResourceManager();
resourceManager.Append(request, GetHttpResponseHeaders(response));
HttpHandler httpHandler = new HttpHandler(resourceManager);
var port = await httpHandler.Start();
HttpClient httpClient = new HttpClient();

List<Task<bool>> all = new List<Task<bool>>();
for (int i = 0; i < 100; i++)
{
all.Add(Task.Run(async () =>
{
var data = new StringContent(request);
var httpResponse = await httpClient.PostAsync($"http://localhost:{port}", data);
Assert.Equal(await httpResponse.Content.ReadAsStringAsync(), response);
return true;
}));
}
await Task.WhenAll(all);
Assert.True(all.All(x => x.Result));
}

[Theory]
[InlineData("Hello Ali \r\n Hi Mahdi", $"POST / HTTP/1.1\r\nHost: localhost:*MyPort*\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 21\r\n\r\nHello Ali \r\n Hi Mahdi")]
public async Task CheckSimpleRequestToGiveMeFullRequestHeaderValue(string request, string response)
Expand Down Expand Up @@ -75,7 +135,7 @@ public async Task CheckSimpleRequestToGiveMeLastFullRequestHeaderValue(string re
var httpResponse = await httpClient.PostAsync($"http://localhost:{port}", data);
var textResponse = await httpResponse.Content.ReadAsStringAsync();

httpClient.DefaultRequestHeaders.Clear();
httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Add(RequestTypeHeaderConstants.RequestTypeHeader, RequestTypeHeaderConstants.GiveMeLastFullRequestHeaderValue);
httpResponse = await httpClient.GetAsync($"http://localhost:{port}");
textResponse = await httpResponse.Content.ReadAsStringAsync();
Expand Down Expand Up @@ -178,23 +238,31 @@ public async Task CheckScope()
resourceManager.Append(scope);
HttpHandler httpHandler = new HttpHandler(resourceManager);
var port = await httpHandler.Start();
var addHeaders = (HttpClient client) =>
{
client.DefaultRequestHeaders.Add("x-amz-meta-title", "someTitle");
client.DefaultRequestHeaders.Add("User-Agent", "aws-sdk-dotnet-coreclr/3.7.101.44 aws-sdk-dotnet-core/3.7.103.6 .NET_Core/6.0.11 OS/Microsoft_Windows_10.0.22000 ClientAsync");
client.DefaultRequestHeaders.Add("amz-sdk-invocation-id", "guid");
client.DefaultRequestHeaders.Add("amz-sdk-request", "attempt=1; max=5");
client.DefaultRequestHeaders.Add("X-Amz-Date", "20230107T162454Z");
client.DefaultRequestHeaders.Add("X-Amz-Content-SHA256", "sha256");
client.DefaultRequestHeaders.Add("Authorization", "empty");
client.DefaultRequestHeaders.Add("Host", "s3.eu-west-1.amazonaws.com");
};
HttpClient httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Add("x-amz-meta-title", "someTitle");
httpClient.DefaultRequestHeaders.Add("User-Agent", "aws-sdk-dotnet-coreclr/3.7.101.44 aws-sdk-dotnet-core/3.7.103.6 .NET_Core/6.0.11 OS/Microsoft_Windows_10.0.22000 ClientAsync");
httpClient.DefaultRequestHeaders.Add("amz-sdk-invocation-id", "guid");
httpClient.DefaultRequestHeaders.Add("amz-sdk-request", "attempt=1; max=5");
httpClient.DefaultRequestHeaders.Add("X-Amz-Date", "20230107T162454Z");
httpClient.DefaultRequestHeaders.Add("X-Amz-Content-SHA256", "sha256");
httpClient.DefaultRequestHeaders.Add("Authorization", "empty");
httpClient.DefaultRequestHeaders.Add("Host", "s3.eu-west-1.amazonaws.com");
addHeaders(httpClient);
var httpResponse = await httpClient.PutAsync($"http://localhost:{port}", null);
var textResponse = await httpResponse.Content.ReadAsStringAsync();
Assert.Equal("Ali", textResponse);

httpClient = new HttpClient();
addHeaders(httpClient);
httpResponse = await httpClient.PutAsync($"http://localhost:{port}", null);
textResponse = await httpResponse.Content.ReadAsStringAsync();
Assert.Equal("Reza", textResponse);

httpClient = new HttpClient();
addHeaders(httpClient);
httpResponse = await httpClient.PutAsync($"http://localhost:{port}", null);
textResponse = await httpResponse.Content.ReadAsStringAsync();
Assert.Equal("Ali", textResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Platforms>AnyCPU;x64;x86</Platforms>
<Authors>EasyMicroservices</Authors>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>0.0.0.11</Version>
<Version>0.0.0.12</Version>
<Description>Laboratory of http client.</Description>
<Copyright>[email protected]</Copyright>
<PackageTags>test,tests,http,https,httpclient,laboratory</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using EasyMicroservices.Utilities.Constants;
using EasyMicroservices.Laboratory.IO;
using EasyMicroservices.Utilities.Constants;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -21,14 +23,15 @@ public HttpHandler(ResourceManager resourceManager) : base(resourceManager)

}


/// <summary>
/// Handle Tcp client of a http client
/// </summary>
/// <param name="tcpClient"></param>
/// <returns></returns>
protected override async Task HandleTcpClient(TcpClient tcpClient)
{
using var stream = tcpClient.GetStream();
using var stream = new PipelineStream(tcpClient.GetStream());
string firstLine = await ReadLineAsync(stream);
StringBuilder fullBody = new StringBuilder();
Dictionary<string, string> headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
Expand Down Expand Up @@ -58,6 +61,7 @@ protected override async Task HandleTcpClient(TcpClient tcpClient)
fullBody.Append(requestBody);
}
await WriteResponseAsync(firstLine, headers, requestBody, fullBody, stream);
tcpClient.Client.Shutdown(SocketShutdown.Send);
}
}
}
36 changes: 24 additions & 12 deletions src/CSharp/EasyMicroservices.Laboratory/Engine/Net/TcpHandler.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using EasyMicroservices.Laboratory.Constants;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EasyMicroservices.Laboratory.Engine.Net
Expand Down Expand Up @@ -69,7 +71,12 @@ public void Stop()
}

TcpListener _tcpListener;
Task InternalStart(int port)
/// <summary>
///
/// </summary>
/// <param name="port"></param>
/// <returns></returns>
protected virtual Task InternalStart(int port)
{
_tcpListener = new TcpListener(IPAddress.Any, port);
_tcpListener.Start();
Expand All @@ -80,17 +87,7 @@ Task InternalStart(int port)
try
{
var tcpClient = await _tcpListener.AcceptTcpClientAsync();
_ = Task.Run(async () =>
{
try
{
await HandleTcpClient(tcpClient);
}
catch
{
tcpClient.Close();
}
});
ThreadPool.QueueUserWorkItem(InternalHandleTcpClient, tcpClient);
}
catch
{
Expand All @@ -101,6 +98,20 @@ Task InternalStart(int port)
return TaskHelper.GetCompletedTask();
}

async void InternalHandleTcpClient(object tcpClient)
{
var client = (TcpClient)tcpClient;
try
{
await HandleTcpClient(client);
}
catch
{
client.Client.Shutdown(SocketShutdown.Send);
client.Close();
}
}

/// <summary>
/// Handle a Tcp client
/// </summary>
Expand Down Expand Up @@ -201,6 +212,7 @@ public async Task WriteResponseAsync(string firstLine, Dictionary<string, string
_lastResponseBody = responseBody;
var responseBodyBytes = Encoding.UTF8.GetBytes(responseBody);
await stream.WriteAsync(responseBodyBytes, 0, responseBodyBytes.Length);
await stream.FlushAsync();
}

string GetGiveMeFullRequestHeaderValueResponse(string firstLine, Dictionary<string, string> requestHeaders, string requestBody)
Expand Down
73 changes: 73 additions & 0 deletions src/CSharp/EasyMicroservices.Laboratory/IO/PipelineStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace EasyMicroservices.Laboratory.IO
{
/// <summary>
///
/// </summary>
internal class PipelineStream : Stream
{
PipelineStreamReader pipelineStreamReader;
NetworkStream _stream;
public PipelineStream(NetworkStream stream)
{
_stream = stream;
pipelineStreamReader = new PipelineStreamReader(_stream);
}

public override bool CanRead => _stream.CanRead;

public override bool CanSeek => _stream.CanSeek;

public override bool CanWrite => _stream.CanWrite;

public override long Length => _stream.Length;

public override long Position { get => _stream.Position; set => _stream.Position = value; }

public override void Flush()
{
_stream.Flush();
}

public override int Read(byte[] buffer, int offset, int count)
{
throw new System.NotImplementedException();
}

public override long Seek(long offset, SeekOrigin origin)
{
return _stream.Seek(offset, origin);
}

public override void SetLength(long value)
{
_stream.SetLength(Length);
}

public override void Write(byte[] buffer, int offset, int count)
{
throw new System.NotImplementedException();
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return pipelineStreamReader.ReadAsync(buffer, count);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _stream.WriteAsync(buffer, offset, count, cancellationToken);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_stream.Dispose();
pipelineStreamReader.Dispose();
}
}
}
Loading

0 comments on commit 0e3a099

Please sign in to comment.