Skip to content

Commit

Permalink
Switch to blocking queue (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
dasgarner authored Oct 16, 2023
1 parent 7c59026 commit 63cc8bf
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 133 deletions.
158 changes: 41 additions & 117 deletions Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
using System.Collections.Concurrent;
using System.Text;
using ConcurrentPriorityQueue.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -34,33 +35,15 @@ public class Worker : BackgroundService
private readonly ILogger<Worker> _logger;
private readonly ZmqSettings _settings;

private readonly ConcurrentQueue<ZmqMessage> _queueQos1;
private readonly ConcurrentQueue<ZmqMessage> _queueQos2;
private readonly ConcurrentQueue<ZmqMessage> _queueQos3;
private readonly ConcurrentQueue<ZmqMessage> _queueQos4;
private readonly ConcurrentQueue<ZmqMessage> _queueQos5;
private readonly ConcurrentQueue<ZmqMessage> _queueQos6;
private readonly ConcurrentQueue<ZmqMessage> _queueQos7;
private readonly ConcurrentQueue<ZmqMessage> _queueQos8;
private readonly ConcurrentQueue<ZmqMessage> _queueQos9;
private readonly ConcurrentQueue<ZmqMessage> _queueQos10;
private readonly BlockingCollection<ZmqMessage> _queue;

private int _sentCount = 0;

public Worker(ILogger<Worker> logger, IOptions<ZmqSettings> settings)
{
_logger = logger;
_settings = settings.Value;
_queueQos1 = new();
_queueQos2 = new();
_queueQos3 = new();
_queueQos4 = new();
_queueQos5 = new();
_queueQos6 = new();
_queueQos7 = new();
_queueQos8 = new();
_queueQos9 = new();
_queueQos10 = new();
_queue = new ConcurrentPriorityQueue<ZmqMessage, int>().ToBlockingCollection();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -87,10 +70,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
// 3. Set up a periodic timer which sends a heartbeat message (H) every 30 seconds
// -------
await Task.WhenAll(
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken),
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken)
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default),
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default),
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, HeartbeatAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)
);

// Must call clean up at the end
NetMQConfig.Cleanup();
}
Expand All @@ -116,7 +100,7 @@ async Task ResponderAsync(CancellationToken stoppingToken)
// Are we a request for stats?
if (message.Equals("stats"))
{
string json = GetJsonStats(GetCurrentQueueSize(), _sentCount, stats);
string json = GetJsonStats(_queue.Count, _sentCount, stats);
responseSocket.SendFrame(json);
_logger.LogDebug("{json}", json);

Expand Down Expand Up @@ -151,31 +135,14 @@ async Task ResponderAsync(CancellationToken stoppingToken)
// Stats
stats["total"]++;
stats["" + zmqMessage.qos]++;
stats["peak"] = Math.Max(stats["peak"], GetCurrentQueueSize() + 1);
stats["peak"] = Math.Max(stats["peak"], _queue.Count + 1);

// Add to the queue
_logger.LogDebug("Queuing");

if (zmqMessage.qos == 1) {
_queueQos1.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 2) {
_queueQos2.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 3) {
_queueQos3.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 4) {
_queueQos4.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 5) {
_queueQos5.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 6) {
_queueQos6.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 7) {
_queueQos7.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 8) {
_queueQos8.Enqueue(zmqMessage);
} else if (zmqMessage.qos == 9) {
_queueQos9.Enqueue(zmqMessage);
} else {
_queueQos10.Enqueue(zmqMessage);
bool result = _queue.TryAdd(zmqMessage);
if (!result)
{
_logger.LogError("Failed to add message to the queue");
}

// Reply
Expand All @@ -191,7 +158,7 @@ async Task ResponderAsync(CancellationToken stoppingToken)

async Task PublisherAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queue polling every {poll} seconds.", _settings.queuePoll ?? 10);
_logger.LogInformation("Creating a publisher socket.");

using var publisherSocket = new PublisherSocket();

Expand All @@ -206,56 +173,25 @@ async Task PublisherAsync(CancellationToken stoppingToken)
publisherSocket.Bind(pub);
}

// Track the poll count
int pollingTime = (_settings.queuePoll ?? 10) * 1000;
int heartbeatDue = 20000;

while (!stoppingToken.IsCancellationRequested)
// Long running task
await Task.Run(() =>
{
if (heartbeatDue >= 30000)
{
heartbeatDue = 0;

_logger.LogDebug("Heartbeat...");
publisherSocket.SendMultipartMessage(ZmqMessage.Heartbeat());
}

int currentQueueSize = GetCurrentQueueSize();
if (currentQueueSize > 0)
// The queue is never complete
while (!_queue.IsCompleted)
{
_logger.LogInformation("Queue Poll - work to be done, queue size: {size}", currentQueueSize);

// Send up to X messages
int messagesToSend = _settings.queueSize ?? 10;

ProcessQueue(publisherSocket, _queueQos10, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos9, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos8, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos7, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos6, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos5, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos4, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos3, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos2, ref messagesToSend);
ProcessQueue(publisherSocket, _queueQos1, ref messagesToSend);
}

heartbeatDue += pollingTime;

await Task.Delay(pollingTime, stoppingToken);
}
}
_logger.LogDebug("Waiting for message");

private void ProcessQueue(PublisherSocket publisherSocket, ConcurrentQueue<ZmqMessage> queue, ref int messagesToSend)
{
try {
bool isWork = !queue.IsEmpty;
while (messagesToSend > 0)
{
bool result = queue.TryDequeue(out ZmqMessage message);
bool result = _queue.TryTake(out ZmqMessage message, -1, stoppingToken);
if (result && message != null)
{
_logger.LogDebug("Sending message, qos {qos}", message.qos);
if (message.isHeartbeat)
{
_logger.LogDebug("Heartbeat...");
}
else
{
_logger.LogDebug("Sending message, qos {qos}, queue size {size}", message.qos, _queue.Count);
}

// Send with a timeout.
bool isSent = publisherSocket.TrySendMultipartMessage(
Expand All @@ -268,22 +204,24 @@ private void ProcessQueue(PublisherSocket publisherSocket, ConcurrentQueue<ZmqMe
_logger.LogError("Timeout sending message for channel: {channel} after {pubSendTimeoutMs}ms", message.channel, _settings.pubSendTimeoutMs ?? 500);
}

messagesToSend--;

// increment sent stat
Interlocked.Increment(ref _sentCount);
} else {
break;
if (!message.isHeartbeat)
{
Interlocked.Increment(ref _sentCount);
}
}
}

if (isWork) {
_logger.LogInformation("Queue empty or reached max send size");
}
}
catch (Exception e)
_logger.LogInformation("Queue completed");
}, stoppingToken);
}

async Task HeartbeatAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
_logger.LogError("Process Queue: failed {e}", e.Message);
_queue.TryAdd(new ZmqMessage { isHeartbeat = true, qos = 5});
await Task.Delay(30000, stoppingToken);
}
}

Expand All @@ -305,20 +243,6 @@ private static Dictionary<string, int> NewStats()
{ "10", 0 }
};
}

private int GetCurrentQueueSize()
{
return _queueQos1.Count
+ _queueQos2.Count
+ _queueQos3.Count
+ _queueQos4.Count
+ _queueQos5.Count
+ _queueQos6.Count
+ _queueQos7.Count
+ _queueQos8.Count
+ _queueQos9.Count
+ _queueQos10.Count;
}

private static string GetJsonStats(int queueSize, int sentCount, Dictionary<string, int> stats)
{
Expand Down
32 changes: 20 additions & 12 deletions ZmqMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,39 @@
* You should have received a copy of the GNU Affero General Public License
* along with Xibo. If not, see <http://www.gnu.org/licenses/>.
*/
using ConcurrentPriorityQueue.Core;
using NetMQ;

namespace xibo_xmr;
public class ZmqMessage
public class ZmqMessage : IHavePriority<int>
{
public string? channel { get; set; }
public string? key {get; set; }
public string? message {get; set; }
public int? qos {get; set; }
public bool isHeartbeat { get; set; }

public NetMQMessage AsNetMqMessage()
{
NetMQMessage netMQFrames = new(3);
netMQFrames.Append(channel);
netMQFrames.Append(key);
netMQFrames.Append(message);
return netMQFrames;
public int Priority {
get {
return (qos ?? 10) * -1;
}
}

public static NetMQMessage Heartbeat()
public NetMQMessage AsNetMqMessage()
{
NetMQMessage netMQFrames = new(3);
netMQFrames.Append("H");
netMQFrames.Append("");
netMQFrames.Append("");
if (isHeartbeat)
{
netMQFrames.Append("H");
netMQFrames.Append("");
netMQFrames.Append("");
}
else
{
netMQFrames.Append(channel);
netMQFrames.Append(key);
netMQFrames.Append(message);
}
return netMQFrames;
}
}
2 changes: 0 additions & 2 deletions ZmqSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public class ZmqSettings
{
public string listenOn { get; set; }
public List<string> pubOn { get; set; }
public int? queuePoll { get; set; }
public int? queueSize {get; set; }
public bool ipv6RespSupport { get; set; }
public bool ipv6PubSupport { get; set; }
public int? pubSendTimeoutMs { get; set; }
Expand Down
2 changes: 0 additions & 2 deletions appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
"Zmq": {
"listenOn": "tcp://*:50001",
"pubOn": ["tcp://*:9505"],
"queuePoll": 5,
"queueSize": 10,
"ipv6RespSupport": false,
"ipv6PubSupport": false,
"pubSendTimeoutMs": 500
Expand Down
1 change: 1 addition & 0 deletions xibo-xmr.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ConcurrentPriorityQueue" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="7.0.1" />
Expand Down

0 comments on commit 63cc8bf

Please sign in to comment.