diff --git a/Worker.cs b/Worker.cs index a6b43fa..11f0856 100644 --- a/Worker.cs +++ b/Worker.cs @@ -20,6 +20,7 @@ */ using System.Collections.Concurrent; using System.Text; +using ConcurrentPriorityQueue.Core; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -34,16 +35,7 @@ public class Worker : BackgroundService private readonly ILogger _logger; private readonly ZmqSettings _settings; - private readonly ConcurrentQueue _queueQos1; - private readonly ConcurrentQueue _queueQos2; - private readonly ConcurrentQueue _queueQos3; - private readonly ConcurrentQueue _queueQos4; - private readonly ConcurrentQueue _queueQos5; - private readonly ConcurrentQueue _queueQos6; - private readonly ConcurrentQueue _queueQos7; - private readonly ConcurrentQueue _queueQos8; - private readonly ConcurrentQueue _queueQos9; - private readonly ConcurrentQueue _queueQos10; + private readonly BlockingCollection _queue; private int _sentCount = 0; @@ -51,16 +43,7 @@ public Worker(ILogger logger, IOptions settings) { _logger = logger; _settings = settings.Value; - _queueQos1 = new(); - _queueQos2 = new(); - _queueQos3 = new(); - _queueQos4 = new(); - _queueQos5 = new(); - _queueQos6 = new(); - _queueQos7 = new(); - _queueQos8 = new(); - _queueQos9 = new(); - _queueQos10 = new(); + _queue = new ConcurrentPriorityQueue().ToBlockingCollection(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -87,10 +70,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // 3. Set up a periodic timer which sends a heartbeat message (H) every 30 seconds // ------- await Task.WhenAll( - Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken), - Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken) + Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default), + Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default), + Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, HeartbeatAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default) ); - + // Must call clean up at the end NetMQConfig.Cleanup(); } @@ -116,7 +100,7 @@ async Task ResponderAsync(CancellationToken stoppingToken) // Are we a request for stats? if (message.Equals("stats")) { - string json = GetJsonStats(GetCurrentQueueSize(), _sentCount, stats); + string json = GetJsonStats(_queue.Count, _sentCount, stats); responseSocket.SendFrame(json); _logger.LogDebug("{json}", json); @@ -151,31 +135,14 @@ async Task ResponderAsync(CancellationToken stoppingToken) // Stats stats["total"]++; stats["" + zmqMessage.qos]++; - stats["peak"] = Math.Max(stats["peak"], GetCurrentQueueSize() + 1); + stats["peak"] = Math.Max(stats["peak"], _queue.Count + 1); // Add to the queue _logger.LogDebug("Queuing"); - - if (zmqMessage.qos == 1) { - _queueQos1.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 2) { - _queueQos2.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 3) { - _queueQos3.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 4) { - _queueQos4.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 5) { - _queueQos5.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 6) { - _queueQos6.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 7) { - _queueQos7.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 8) { - _queueQos8.Enqueue(zmqMessage); - } else if (zmqMessage.qos == 9) { - _queueQos9.Enqueue(zmqMessage); - } else { - _queueQos10.Enqueue(zmqMessage); + bool result = _queue.TryAdd(zmqMessage); + if (!result) + { + _logger.LogError("Failed to add message to the queue"); } // Reply @@ -191,7 +158,7 @@ async Task ResponderAsync(CancellationToken stoppingToken) async Task PublisherAsync(CancellationToken stoppingToken) { - _logger.LogInformation("Queue polling every {poll} seconds.", _settings.queuePoll ?? 10); + _logger.LogInformation("Creating a publisher socket."); using var publisherSocket = new PublisherSocket(); @@ -206,56 +173,25 @@ async Task PublisherAsync(CancellationToken stoppingToken) publisherSocket.Bind(pub); } - // Track the poll count - int pollingTime = (_settings.queuePoll ?? 10) * 1000; - int heartbeatDue = 20000; - - while (!stoppingToken.IsCancellationRequested) + // Long running task + await Task.Run(() => { - if (heartbeatDue >= 30000) - { - heartbeatDue = 0; - - _logger.LogDebug("Heartbeat..."); - publisherSocket.SendMultipartMessage(ZmqMessage.Heartbeat()); - } - - int currentQueueSize = GetCurrentQueueSize(); - if (currentQueueSize > 0) + // The queue is never complete + while (!_queue.IsCompleted) { - _logger.LogInformation("Queue Poll - work to be done, queue size: {size}", currentQueueSize); - - // Send up to X messages - int messagesToSend = _settings.queueSize ?? 10; - - ProcessQueue(publisherSocket, _queueQos10, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos9, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos8, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos7, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos6, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos5, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos4, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos3, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos2, ref messagesToSend); - ProcessQueue(publisherSocket, _queueQos1, ref messagesToSend); - } - - heartbeatDue += pollingTime; - - await Task.Delay(pollingTime, stoppingToken); - } - } + _logger.LogDebug("Waiting for message"); - private void ProcessQueue(PublisherSocket publisherSocket, ConcurrentQueue queue, ref int messagesToSend) - { - try { - bool isWork = !queue.IsEmpty; - while (messagesToSend > 0) - { - bool result = queue.TryDequeue(out ZmqMessage message); + bool result = _queue.TryTake(out ZmqMessage message, -1, stoppingToken); if (result && message != null) { - _logger.LogDebug("Sending message, qos {qos}", message.qos); + if (message.isHeartbeat) + { + _logger.LogDebug("Heartbeat..."); + } + else + { + _logger.LogDebug("Sending message, qos {qos}, queue size {size}", message.qos, _queue.Count); + } // Send with a timeout. bool isSent = publisherSocket.TrySendMultipartMessage( @@ -268,22 +204,24 @@ private void ProcessQueue(PublisherSocket publisherSocket, ConcurrentQueue NewStats() { "10", 0 } }; } - - private int GetCurrentQueueSize() - { - return _queueQos1.Count - + _queueQos2.Count - + _queueQos3.Count - + _queueQos4.Count - + _queueQos5.Count - + _queueQos6.Count - + _queueQos7.Count - + _queueQos8.Count - + _queueQos9.Count - + _queueQos10.Count; - } private static string GetJsonStats(int queueSize, int sentCount, Dictionary stats) { diff --git a/ZmqMessage.cs b/ZmqMessage.cs index 44f94ee..eaeb617 100644 --- a/ZmqMessage.cs +++ b/ZmqMessage.cs @@ -18,31 +18,39 @@ * You should have received a copy of the GNU Affero General Public License * along with Xibo. If not, see . */ +using ConcurrentPriorityQueue.Core; using NetMQ; namespace xibo_xmr; -public class ZmqMessage +public class ZmqMessage : IHavePriority { public string? channel { get; set; } public string? key {get; set; } public string? message {get; set; } public int? qos {get; set; } + public bool isHeartbeat { get; set; } - public NetMQMessage AsNetMqMessage() - { - NetMQMessage netMQFrames = new(3); - netMQFrames.Append(channel); - netMQFrames.Append(key); - netMQFrames.Append(message); - return netMQFrames; + public int Priority { + get { + return (qos ?? 10) * -1; + } } - public static NetMQMessage Heartbeat() + public NetMQMessage AsNetMqMessage() { NetMQMessage netMQFrames = new(3); - netMQFrames.Append("H"); - netMQFrames.Append(""); - netMQFrames.Append(""); + if (isHeartbeat) + { + netMQFrames.Append("H"); + netMQFrames.Append(""); + netMQFrames.Append(""); + } + else + { + netMQFrames.Append(channel); + netMQFrames.Append(key); + netMQFrames.Append(message); + } return netMQFrames; } } diff --git a/ZmqSettings.cs b/ZmqSettings.cs index 793ede6..25a4abd 100644 --- a/ZmqSettings.cs +++ b/ZmqSettings.cs @@ -23,8 +23,6 @@ public class ZmqSettings { public string listenOn { get; set; } public List pubOn { get; set; } - public int? queuePoll { get; set; } - public int? queueSize {get; set; } public bool ipv6RespSupport { get; set; } public bool ipv6PubSupport { get; set; } public int? pubSendTimeoutMs { get; set; } diff --git a/appsettings.json b/appsettings.json index 1f8a0fa..ae1ef8e 100644 --- a/appsettings.json +++ b/appsettings.json @@ -16,8 +16,6 @@ "Zmq": { "listenOn": "tcp://*:50001", "pubOn": ["tcp://*:9505"], - "queuePoll": 5, - "queueSize": 10, "ipv6RespSupport": false, "ipv6PubSupport": false, "pubSendTimeoutMs": 500 diff --git a/xibo-xmr.csproj b/xibo-xmr.csproj index 3dc1b75..a04e63a 100644 --- a/xibo-xmr.csproj +++ b/xibo-xmr.csproj @@ -9,6 +9,7 @@ +