From 113ee811e7771f8009042fe53c035d7111889168 Mon Sep 17 00:00:00 2001 From: Sebakov Dmitriy Sergeevich Date: Wed, 22 Apr 2020 13:33:05 +0300 Subject: [PATCH 1/5] Add job throttle --- src/Horarium.Test/RunnerJobTest.cs | 118 ++++++++++++++++++++++++++++ src/Horarium/Handlers/RunnerJobs.cs | 49 +++++++++++- src/Horarium/HorariumSettings.cs | 2 + src/Horarium/JobThrottleSettings.cs | 28 +++++++ 4 files changed, 194 insertions(+), 3 deletions(-) create mode 100644 src/Horarium/JobThrottleSettings.cs diff --git a/src/Horarium.Test/RunnerJobTest.cs b/src/Horarium.Test/RunnerJobTest.cs index 0cb4083..7d503b0 100644 --- a/src/Horarium.Test/RunnerJobTest.cs +++ b/src/Horarium.Test/RunnerJobTest.cs @@ -102,6 +102,124 @@ public async Task Start_WontRecoverBeforeIntervalTimeout_AfterFailedDB() jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Once); } + [Fact] + public async Task Start_ExecutionWithDelay_WithThrottle() + { + // Arrange + var jobRepositoryMock = new Mock(); + + var settings = new HorariumSettings + { + IntervalStartJob = TimeSpan.FromSeconds(1), + JobThrottleSettings = new JobThrottleSettings + { + UseJobThrottle = true, + IntervalMultiplier = 1, + JobRetrievalAttempts = 1 + } + }; + + var runnerJobs = new RunnerJobs(jobRepositoryMock.Object, + settings, + new JsonSerializerSettings(), + Mock.Of(), + Mock.Of(), + Mock.Of()); + + // Act + runnerJobs.Start(); + await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500)); + jobRepositoryMock.Invocations.Clear(); + + await Task.Delay(settings.IntervalStartJob + settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier)); + + // Assert + jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Once); + } + + [Fact] + public async Task Start_ExecutionWithDelay_IncreaseInterval() + { + // Arrange + var jobRepositoryMock = new Mock(); + + jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny(), It.IsAny())) + .ReturnsAsync(() => null); + + var settings = new HorariumSettings + { + IntervalStartJob = TimeSpan.FromSeconds(1), + JobThrottleSettings = new JobThrottleSettings + { + UseJobThrottle = true, + IntervalMultiplier = 1, + JobRetrievalAttempts = 1, + } + }; + + var runnerJobs = new RunnerJobs(jobRepositoryMock.Object, + settings, + new JsonSerializerSettings(), + Mock.Of(), + Mock.Of(), + Mock.Of()); + + // Act + runnerJobs.Start(); + await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500)); + jobRepositoryMock.Invocations.Clear(); + + var interval = settings.IntervalStartJob + + settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier); + await Task.Delay(interval); + interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier); + await Task.Delay(interval); + interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier); + await Task.Delay(interval); + + // Assert + jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(3)); + } + + [Fact] + public async Task Start_ExecutionWithDelay_MaxInterval() + { + // Arrange + var jobRepositoryMock = new Mock(); + + jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny(), It.IsAny())) + .ReturnsAsync(() => null); + + var settings = new HorariumSettings + { + IntervalStartJob = TimeSpan.FromSeconds(1), + JobThrottleSettings = new JobThrottleSettings + { + UseJobThrottle = true, + IntervalMultiplier = 1, + JobRetrievalAttempts = 1, + MaxJobThrottleInterval = TimeSpan.FromSeconds(1) + } + }; + + var runnerJobs = new RunnerJobs(jobRepositoryMock.Object, + settings, + new JsonSerializerSettings(), + Mock.Of(), + Mock.Of(), + Mock.Of()); + + // Act + runnerJobs.Start(); + await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500)); + jobRepositoryMock.Invocations.Clear(); + + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Assert + jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(5)); + } + [Fact] public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks() { diff --git a/src/Horarium/Handlers/RunnerJobs.cs b/src/Horarium/Handlers/RunnerJobs.cs index dfab2f6..731b8da 100644 --- a/src/Horarium/Handlers/RunnerJobs.cs +++ b/src/Horarium/Handlers/RunnerJobs.cs @@ -17,6 +17,8 @@ public class RunnerJobs : IRunnerJobs private readonly IExecutorJob _executorJob; private Task _runnerTask; private readonly IUncompletedTaskList _uncompletedTaskList; + + private readonly TimeSpan _defaultJobThrottleInterval = TimeSpan.FromMilliseconds(100); private CancellationToken _cancellationToken; private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource(); @@ -100,11 +102,29 @@ private async Task GetReadyJob() private async Task StartRunnerInternal(CancellationToken cancellationToken) { + var jobWaitTime = _settings.IntervalStartJob; + while (true) + { + var isJobRan = await TryRunJob(cancellationToken, jobWaitTime); + if (!_settings.JobThrottleSettings.UseJobThrottle) + { + jobWaitTime = _settings.IntervalStartJob; + continue; + } + + jobWaitTime = !isJobRan ? GetNextIntervalStartJob(jobWaitTime) : _settings.IntervalStartJob; + } + } + + private async Task TryRunJob(CancellationToken cancellationToken, TimeSpan waitTime) + { + for (var i = 0; i < _settings.JobThrottleSettings.JobRetrievalAttempts; i++) { var job = await GetReadyJob(); + var isJobReady = job != null; - if (job != null) + if (isJobReady) { _horariumLogger.Debug("Try to Run jobMetadata..."); @@ -117,11 +137,34 @@ private async Task StartRunnerInternal(CancellationToken cancellationToken) throw new TaskCanceledException(); } - if (!_settings.IntervalStartJob.Equals(TimeSpan.Zero)) + if (isJobReady) { - await Task.Delay(_settings.IntervalStartJob, cancellationToken); + return true; } + + if (!waitTime.Equals(TimeSpan.Zero)) + { + await Task.Delay(waitTime, cancellationToken); + } + } + + return false; + } + + private TimeSpan GetNextIntervalStartJob(TimeSpan currentInterval) + { + if (currentInterval.Equals(TimeSpan.Zero)) + { + return _defaultJobThrottleInterval; } + + var nextInterval = + currentInterval + + TimeSpan.FromTicks((long) (currentInterval.Ticks * _settings.JobThrottleSettings.IntervalMultiplier)); + + var maxInterval = _settings.JobThrottleSettings.MaxJobThrottleInterval; + + return nextInterval > maxInterval ? maxInterval : nextInterval; } } } \ No newline at end of file diff --git a/src/Horarium/HorariumSettings.cs b/src/Horarium/HorariumSettings.cs index ceca08d..6150ac6 100644 --- a/src/Horarium/HorariumSettings.cs +++ b/src/Horarium/HorariumSettings.cs @@ -10,6 +10,8 @@ public class HorariumSettings public TimeSpan ObsoleteExecutingJob { get; set; } = TimeSpan.FromMinutes(5); + public JobThrottleSettings JobThrottleSettings { get; set; } = new JobThrottleSettings(); + public IJobScopeFactory JobScopeFactory { get; set; } = new DefaultJobScopeFactory(); public IHorariumLogger Logger { get; set; } = new EmptyLogger(); diff --git a/src/Horarium/JobThrottleSettings.cs b/src/Horarium/JobThrottleSettings.cs new file mode 100644 index 0000000..7b4a1a4 --- /dev/null +++ b/src/Horarium/JobThrottleSettings.cs @@ -0,0 +1,28 @@ +using System; + +namespace Horarium +{ + public class JobThrottleSettings + { + /// + /// When `true`, IntervalStartJob will automatically increase if there is no jobs available + /// + public bool UseJobThrottle { get; set; } + + /// + /// After all attempts are exhausted, waiting interval is increased by formula: + /// currentInterval + (currentInterval * intervalMultiplier) + /// + public int JobRetrievalAttempts { get; set; } = 10; + + /// + /// Multiplier to get the next waiting interval + /// + public double IntervalMultiplier { get; set; } = 0.25; + + /// + /// Maximum waiting interval + /// + public TimeSpan MaxJobThrottleInterval { get; set; } = TimeSpan.FromSeconds(30); + } +} \ No newline at end of file From 32a5ecde15ba207a0330dc0d8057a96dde4d6166 Mon Sep 17 00:00:00 2001 From: Sebakov Dmitriy Sergeevich Date: Wed, 22 Apr 2020 14:01:53 +0300 Subject: [PATCH 2/5] Add documentation --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 591e437..cbf522b 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,20 @@ Horarium guarantees that a job would run **exactly once** Every Horarium instance consults MongoDB about new jobs to run every 100ms (default), thus creating some load on the DB server. This interval can be changed in ```HorariumSettings``` +If you want to decrease load, you can use job throttling that will automatically increase interval if there are no jobs available after certain attempts. To enable this feature, pass `JobThrottleSettings` to `HorariumSettings` with property `UseJobThrottle` set to true. + +```csharp +var settings = new HorariumSettings +{ + JobThrottleSettings = new JobThrottleSettings + { + UseJobThrottle = true + } +}; +``` + +For more information about configuration, see `JobThrottleSettings` + ## Using Horarium with SimpleInjector To use Horarium with SimpleInjector one should implement its own `IJobFactory`, using `Container` from `SimpleInjector`. For example: From c803ba9b5cf06ab91921f995d1f8d224b7c31397 Mon Sep 17 00:00:00 2001 From: Sebakov Dmitriy Sergeevich Date: Wed, 22 Apr 2020 14:02:39 +0300 Subject: [PATCH 3/5] Documentation improv --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cbf522b..f6b8b37 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ Horarium guarantees that a job would run **exactly once** Every Horarium instance consults MongoDB about new jobs to run every 100ms (default), thus creating some load on the DB server. This interval can be changed in ```HorariumSettings``` -If you want to decrease load, you can use job throttling that will automatically increase interval if there are no jobs available after certain attempts. To enable this feature, pass `JobThrottleSettings` to `HorariumSettings` with property `UseJobThrottle` set to true. +If you want to decrease load, you can use job throttling that will automatically increase interval if there are no jobs available after certain attempts. To enable this feature, pass `JobThrottleSettings` to `HorariumSettings` with property `UseJobThrottle` set to `true`. ```csharp var settings = new HorariumSettings From 3a5f7d450f1935bc5303080acca0852c8905ce06 Mon Sep 17 00:00:00 2001 From: Sebakov Dmitriy Sergeevich Date: Sat, 27 Nov 2021 15:38:45 +0300 Subject: [PATCH 4/5] Add unit tests --- src/Horarium.Test/RunnerJobTest.cs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/Horarium.Test/RunnerJobTest.cs b/src/Horarium.Test/RunnerJobTest.cs index 7d503b0..e760007 100644 --- a/src/Horarium.Test/RunnerJobTest.cs +++ b/src/Horarium.Test/RunnerJobTest.cs @@ -161,8 +161,12 @@ public async Task Start_ExecutionWithDelay_IncreaseInterval() settings, new JsonSerializerSettings(), Mock.Of(), +<<<<<<< HEAD Mock.Of(), Mock.Of()); +======= + Mock.Of()); +>>>>>>> f47d280 (Add unit tests) // Act runnerJobs.Start(); @@ -176,11 +180,19 @@ public async Task Start_ExecutionWithDelay_IncreaseInterval() await Task.Delay(interval); interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier); await Task.Delay(interval); +<<<<<<< HEAD // Assert jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(3)); } +======= + + // Assert + jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(3)); + } + +>>>>>>> f47d280 (Add unit tests) [Fact] public async Task Start_ExecutionWithDelay_MaxInterval() { @@ -206,8 +218,12 @@ public async Task Start_ExecutionWithDelay_MaxInterval() settings, new JsonSerializerSettings(), Mock.Of(), +<<<<<<< HEAD Mock.Of(), Mock.Of()); +======= + Mock.Of()); +>>>>>>> f47d280 (Add unit tests) // Act runnerJobs.Start(); @@ -215,11 +231,16 @@ public async Task Start_ExecutionWithDelay_MaxInterval() jobRepositoryMock.Invocations.Clear(); await Task.Delay(TimeSpan.FromSeconds(5)); +<<<<<<< HEAD +======= + +>>>>>>> f47d280 (Add unit tests) // Assert jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(5)); } +<<<<<<< HEAD [Fact] public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks() { @@ -284,5 +305,7 @@ public async Task StopAsync_AwaitsWhenAllCompleted() // Assert uncompletedTaskList.Verify(x => x.WhenAllCompleted(cancellationToken), Times.Once); } +======= +>>>>>>> f47d280 (Add unit tests) } } \ No newline at end of file From 1e596a044aaa27b5b91b667699495fca8b77fabb Mon Sep 17 00:00:00 2001 From: Sebakov Dmitriy Sergeevich Date: Sat, 27 Nov 2021 15:53:24 +0300 Subject: [PATCH 5/5] fix conflicts --- src/Horarium.Test/RunnerJobTest.cs | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/Horarium.Test/RunnerJobTest.cs b/src/Horarium.Test/RunnerJobTest.cs index e760007..af12ddd 100644 --- a/src/Horarium.Test/RunnerJobTest.cs +++ b/src/Horarium.Test/RunnerJobTest.cs @@ -161,12 +161,8 @@ public async Task Start_ExecutionWithDelay_IncreaseInterval() settings, new JsonSerializerSettings(), Mock.Of(), -<<<<<<< HEAD Mock.Of(), Mock.Of()); -======= - Mock.Of()); ->>>>>>> f47d280 (Add unit tests) // Act runnerJobs.Start(); @@ -180,19 +176,11 @@ public async Task Start_ExecutionWithDelay_IncreaseInterval() await Task.Delay(interval); interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier); await Task.Delay(interval); -<<<<<<< HEAD // Assert jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(3)); } - -======= - - // Assert - jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(3)); - } ->>>>>>> f47d280 (Add unit tests) [Fact] public async Task Start_ExecutionWithDelay_MaxInterval() { @@ -218,12 +206,8 @@ public async Task Start_ExecutionWithDelay_MaxInterval() settings, new JsonSerializerSettings(), Mock.Of(), -<<<<<<< HEAD Mock.Of(), Mock.Of()); -======= - Mock.Of()); ->>>>>>> f47d280 (Add unit tests) // Act runnerJobs.Start(); @@ -231,16 +215,10 @@ public async Task Start_ExecutionWithDelay_MaxInterval() jobRepositoryMock.Invocations.Clear(); await Task.Delay(TimeSpan.FromSeconds(5)); -<<<<<<< HEAD - -======= - ->>>>>>> f47d280 (Add unit tests) // Assert jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny(), It.IsAny()), Times.Exactly(5)); } -<<<<<<< HEAD [Fact] public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks() { @@ -305,7 +283,5 @@ public async Task StopAsync_AwaitsWhenAllCompleted() // Assert uncompletedTaskList.Verify(x => x.WhenAllCompleted(cancellationToken), Times.Once); } -======= ->>>>>>> f47d280 (Add unit tests) } } \ No newline at end of file