Skip to content

Commit

Permalink
Respect host's ShutdownTimeout (e.g. cancellation token) when stoppin…
Browse files Browse the repository at this point in the history
…g jobs
  • Loading branch information
Tolyandre committed Oct 26, 2020
1 parent 8424c6b commit 1406ccb
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 27 deletions.
3 changes: 2 additions & 1 deletion src/Horarium.AspNetCore/HorariumServerHostedService.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Horarium.Interfaces;
Expand All @@ -23,7 +24,7 @@ public Task StartAsync(CancellationToken cancellationToken)

public Task StopAsync(CancellationToken cancellationToken)
{
return _horariumServer.Stop();
return _horariumServer.Stop(cancellationToken);
}
}
}
10 changes: 6 additions & 4 deletions src/Horarium.Test/RunnerJobTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Newtonsoft.Json;
Expand Down Expand Up @@ -31,7 +32,7 @@ public async Task Start_Stop()

await Task.Delay(TimeSpan.FromSeconds(1));

await runnerJobs.Stop();
await runnerJobs.Stop(CancellationToken.None);

jobRepositoryMock.Invocations.Clear();

Expand Down Expand Up @@ -129,7 +130,7 @@ public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks()
// Act
runnerJobs.Start();
await Task.Delay(TimeSpan.FromSeconds(5));
await runnerJobs.Stop();
await runnerJobs.Stop(CancellationToken.None);

// Assert
uncompletedTaskList.Verify(x=>x.Add(It.IsAny<Task>()), Times.Once);
Expand All @@ -141,6 +142,7 @@ public async Task StopAsync_AwaitsWhenAllCompleted()
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();
var uncompletedTaskList = new Mock<IUncompletedTaskList>();
var cancellationToken = new CancellationTokenSource().Token;

var settings = new HorariumSettings
{
Expand All @@ -159,10 +161,10 @@ public async Task StopAsync_AwaitsWhenAllCompleted()
// Act
runnerJobs.Start();
await Task.Delay(TimeSpan.FromSeconds(1));
await runnerJobs.Stop();
await runnerJobs.Stop(cancellationToken);

// Assert
uncompletedTaskList.Verify(x => x.WhenAllCompleted(), Times.Once);
uncompletedTaskList.Verify(x => x.WhenAllCompleted(cancellationToken), Times.Once);
}
}
}
26 changes: 23 additions & 3 deletions src/Horarium.Test/UncompletedTaskListTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Horarium.Handlers;
using Xunit;
Expand Down Expand Up @@ -39,7 +40,7 @@ public async Task Add_TaskWithAnyResult_KeepsTaskUntilCompleted()
public async Task WhenAllCompleted_NoTasks_ReturnsCompletedTask()
{
// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();
var whenAll = _uncompletedTaskList.WhenAllCompleted(CancellationToken.None);

// Assert
Assert.True(whenAll.IsCompletedSuccessfully);
Expand All @@ -54,7 +55,7 @@ public async Task WhenAllCompleted_TaskNotCompleted_AwaitsUntilTaskCompleted()
_uncompletedTaskList.Add(tcs.Task);

// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();
var whenAll = _uncompletedTaskList.WhenAllCompleted(CancellationToken.None);

// Assert
await Task.Delay(TimeSpan.FromSeconds(1)); // give a chance to finish any running tasks
Expand All @@ -74,9 +75,28 @@ public async Task WhenAllCompleted_TaskFaulted_DoesNotThrow()
_uncompletedTaskList.Add(Task.FromException(new ApplicationException()));

// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();
var whenAll = _uncompletedTaskList.WhenAllCompleted(CancellationToken.None);

await whenAll;
}

[Fact]
public async Task WhenAllCompleted_CancellationRequested_DoesNotAwait_ThrowsOperationCancelledException()
{
// Arrange
var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();
_uncompletedTaskList.Add(tcs.Task);

// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted(cts.Token);

// Assert
cts.Cancel();
await Task.Delay(TimeSpan.FromSeconds(1), CancellationToken.None); // give a chance to finish any running tasks

var exception = await Assert.ThrowsAsync<OperationCanceledException>(() => whenAll);
Assert.Equal(cts.Token, exception.CancellationToken);
}
}
}
4 changes: 2 additions & 2 deletions src/Horarium/Handlers/RunnerJobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void Start()
_horariumLogger.Debug("Started RunnerJob...");
}

public async Task Stop()
public async Task Stop(CancellationToken stopCancellationToken)
{
_cancelTokenSource.Cancel(false);

Expand All @@ -58,7 +58,7 @@ public async Task Stop()
//watcher был остановлен
}

await _uncompletedTaskList.WhenAllCompleted();
await _uncompletedTaskList.WhenAllCompleted(stopCancellationToken);

_horariumLogger.Debug("Stopped DeleterJob");
}
Expand Down
27 changes: 16 additions & 11 deletions src/Horarium/Handlers/UncompletedTaskList.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -37,23 +38,27 @@ public void Add(Task task)
}, linkedListNode, CancellationToken.None);
}

public async Task WhenAllCompleted()
public async Task WhenAllCompleted(CancellationToken cancellationToken)
{
Task[] tasksToAwait;
lock (_lockObject)
{
tasksToAwait = _uncompletedTasks.ToArray();
tasksToAwait = _uncompletedTasks
// get rid of fault state, Task.WhenAll shall not throw
.Select(x => x.ContinueWith((t) => { }, CancellationToken.None))
.ToArray();
}

try
{
await Task.WhenAll(tasksToAwait);
}
catch
{
// We just want to have all task completed by now.
// Any possible exceptions must be handled in jobs.
}
var whenAbandon = Task.Delay(Timeout.Infinite, cancellationToken);
var whenAllCompleted = Task.WhenAll(tasksToAwait);

await Task.WhenAny(whenAbandon, whenAllCompleted);

if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(
"Horarium stop timeout is expired. One or many jobs are still running. These jobs may not save their state.",
cancellationToken);

}
}
}
7 changes: 4 additions & 3 deletions src/Horarium/HorariumServer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Horarium.Handlers;
using Horarium.Interfaces;
Expand Down Expand Up @@ -39,14 +40,14 @@ public void Start()
_runnerJobs.Start();
}

public Task Stop()
public Task Stop(CancellationToken stopCancellationToken)
{
return _runnerJobs.Stop();
return _runnerJobs.Stop(stopCancellationToken);
}

public new void Dispose()
{
Stop();
Stop(CancellationToken.None);
}
}
}
10 changes: 8 additions & 2 deletions src/Horarium/Interfaces/IRunnerJobs.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace Horarium.Interfaces
{
public interface IRunnerJobs
{
void Start();
Task Stop();

/// <summary>
/// Stops scheduling next jobs and awaits currently running jobs.
/// If <see cref="stopCancellationToken"></see> is cancelled, than abandons running jobs.
/// </summary>
Task Stop(CancellationToken stopCancellationToken);

}
}
5 changes: 4 additions & 1 deletion src/Horarium/Interfaces/IUncompletedTaskList.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Horarium.Interfaces
Expand All @@ -15,6 +17,7 @@ public interface IUncompletedTaskList
/// <summary>
/// Returns task that will complete (with success) when all currently running tasks complete or fail.
/// </summary>
Task WhenAllCompleted();
/// <param name="cancellationToken">If cancelled, throws <see cref="OperationCanceledException"/> immediately.</param>
Task WhenAllCompleted(CancellationToken cancellationToken);
}
}

0 comments on commit 1406ccb

Please sign in to comment.