diff --git a/main/devices/Device.hpp b/main/devices/Device.hpp index 693fecbe..15f021b3 100644 --- a/main/devices/Device.hpp +++ b/main/devices/Device.hpp @@ -18,12 +18,15 @@ #include #include #include +#include +#include using namespace std::chrono; using namespace std::chrono_literals; using std::shared_ptr; using namespace farmhub::kernel; using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; #if defined(MK4) #include @@ -213,7 +216,7 @@ class MemoryTelemetryProvider : public TelemetryProvider { class MqttTelemetryPublisher : public TelemetryPublisher { public: - MqttTelemetryPublisher(shared_ptr mqttRoot, TelemetryCollector& telemetryCollector) + MqttTelemetryPublisher(shared_ptr mqttRoot, TelemetryCollector& telemetryCollector) : mqttRoot(mqttRoot) , telemetryCollector(telemetryCollector) { } @@ -223,7 +226,7 @@ class MqttTelemetryPublisher : public TelemetryPublisher { } private: - shared_ptr mqttRoot; + shared_ptr mqttRoot; TelemetryCollector& telemetryCollector; }; @@ -248,12 +251,13 @@ class ConfiguredKernel { }); } - LOGI(" ______ _ _ _"); - LOGI(" | ____| | | | | | |"); - LOGI(" | |__ __ _ _ __ _ __ ___ | |__| |_ _| |__"); - LOGI(" | __/ _` | '__| '_ ` _ \\| __ | | | | '_ \\"); - LOGI(" | | | (_| | | | | | | | | | | | |_| | |_) |"); - LOGI(" |_| \\__,_|_| |_| |_| |_|_| |_|\\__,_|_.__/ " FARMHUB_VERSION); + LOGD(" ______ _ _ _"); + LOGD(" | ____| | | | | | |"); + LOGD(" | |__ __ _ _ __ _ __ ___ | |__| |_ _| |__"); + LOGD(" | __/ _` | '__| '_ ` _ \\| __ | | | | '_ \\"); + LOGD(" | | | (_| | | | | | | | | | | | |_| | |_) |"); + LOGD(" |_| \\__,_|_| |_| |_| |_|_| |_|\\__,_|_.__/ " FARMHUB_VERSION); + LOGD(" "); } void registerShutdownListener(std::function listener) { @@ -388,13 +392,21 @@ class Device { if (record.level > deviceConfig.publishLogs.get()) { return; } + auto length = record.message.length(); + // Remove the level prefix + auto messageStart = 2; + // Remove trailing newline + auto messageEnd = record.message.charAt(length - 1) == '\n' + ? length - 1 + : length; + String message = record.message.substring(messageStart, messageEnd); mqttDeviceRoot->publish( "log", [&](JsonObject& json) { json["level"] = record.level; - json["message"] = record.message; + json["message"] = message; }, - MqttDriver::Retention::NoRetain, MqttDriver::QoS::AtLeastOnce, ticks::zero(), MqttDriver::LogPublish::Silent); + Retention::NoRetain, QoS::AtLeastOnce, 2s, LogPublish::Silent); }); }); @@ -444,7 +456,7 @@ class Device { json["peripherals"].to().set(peripheralsInitJson); json["sleepWhenIdle"] = kernel.sleepManager.sleepWhenIdle; }, - MqttDriver::Retention::NoRetain, MqttDriver::QoS::AtLeastOnce, ticks::max()); + Retention::NoRetain, QoS::AtLeastOnce, ticks::max()); Task::loop("telemetry", 8192, [this](Task& task) { publishTelemetry(); @@ -496,7 +508,7 @@ class Device { TDeviceDefinition& deviceDefinition = configuredKernel.deviceDefinition; TDeviceConfiguration& deviceConfig = deviceDefinition.config; - shared_ptr mqttDeviceRoot = kernel.mqtt.forRoot(locationPrefix() + "devices/ugly-duckling/" + deviceConfig.instance.get()); + shared_ptr mqttDeviceRoot = kernel.mqtt.forRoot(locationPrefix() + "devices/ugly-duckling/" + deviceConfig.instance.get()); PeripheralManager peripheralManager { kernel.i2c, deviceDefinition.pcnt, deviceDefinition.pwm, kernel.sleepManager, kernel.switches, mqttDeviceRoot }; TelemetryCollector deviceTelemetryCollector; diff --git a/main/kernel/Component.hpp b/main/kernel/Component.hpp index 5ea582be..54837437 100644 --- a/main/kernel/Component.hpp +++ b/main/kernel/Component.hpp @@ -1,20 +1,20 @@ #pragma once #include -#include +#include -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; namespace farmhub::kernel { class Component : public Named { protected: - Component(const String& name, shared_ptr mqttRoot) + Component(const String& name, shared_ptr mqttRoot) : Named(name) , mqttRoot(mqttRoot) { } - shared_ptr mqttRoot; + shared_ptr mqttRoot; }; } // namespace farmhub::kernel diff --git a/main/kernel/Console.hpp b/main/kernel/Console.hpp index a3bda882..c01b4392 100644 --- a/main/kernel/Console.hpp +++ b/main/kernel/Console.hpp @@ -42,11 +42,7 @@ class ConsoleProvider { int processLog(const char* format, va_list args) { Level level = getLevel(format[0]); - if (level <= recordedLevel) { - std::lock_guard lock(bufferMutex); - vsnprintf(buffer, BUFFER_SIZE, format, args); - logRecords.offer(level, buffer); - } + recordLog(level, format, args); int count = 0; #ifdef FARMHUB_DEBUG @@ -87,6 +83,31 @@ class ConsoleProvider { return count; } + void recordLog(Level level, const char* format, va_list args) { + if (level > recordedLevel) { + return; + } + + int length; + { + std::lock_guard lock(bufferMutex); + length = vsnprintf(buffer, BUFFER_SIZE, format, args); + if (length < 0) { + printf("Encountered an encoding error"); + } else if (length < BUFFER_SIZE) { + logRecords.offer(level, buffer); + return; + } + } + + // The buffer was too small, try again with a heap-allocated buffer instead, but still limit length + length = std::min(length, 2048); + char* heapBuffer = new char[length + 1]; + vsnprintf(heapBuffer, length + 1, format, args); + logRecords.offer(level, String(heapBuffer, length)); + delete[] heapBuffer; + } + static inline Level getLevel(char c) { switch (c) { case 'E': diff --git a/main/kernel/Kernel.hpp b/main/kernel/Kernel.hpp index d7cbdcce..17cfb326 100644 --- a/main/kernel/Kernel.hpp +++ b/main/kernel/Kernel.hpp @@ -19,14 +19,15 @@ #include #include #include -#include #include #include #include +#include using namespace std::chrono; using namespace std::chrono_literals; using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; namespace farmhub::kernel { diff --git a/main/kernel/Log.hpp b/main/kernel/Log.hpp index 525c33b8..a51c6bc6 100644 --- a/main/kernel/Log.hpp +++ b/main/kernel/Log.hpp @@ -9,12 +9,13 @@ namespace farmhub::kernel { enum class Level { - None = ESP_LOG_NONE, - Error = ESP_LOG_ERROR, - Warning = ESP_LOG_WARN, - Info = ESP_LOG_INFO, - Debug = ESP_LOG_DEBUG, - Verbose = ESP_LOG_VERBOSE, + None = 0, + // Fatal = 1, + Error = 2, + Warning = 3, + Info = 4, + Debug = 5, + Verbose = 6, }; #define FARMHUB_LOG(level, format, ...) \ diff --git a/main/kernel/drivers/RtcDriver.hpp b/main/kernel/drivers/RtcDriver.hpp index 7f46825d..1d70bb0b 100644 --- a/main/kernel/drivers/RtcDriver.hpp +++ b/main/kernel/drivers/RtcDriver.hpp @@ -7,10 +7,11 @@ #include "esp_netif_sntp.h" #include "esp_sntp.h" +#include #include #include - #include +#include using namespace std::chrono; using namespace std::chrono_literals; diff --git a/main/kernel/drivers/MqttDriver.hpp b/main/kernel/mqtt/MqttDriver.hpp similarity index 78% rename from main/kernel/drivers/MqttDriver.hpp rename to main/kernel/mqtt/MqttDriver.hpp index 3c430d78..51dca8d6 100644 --- a/main/kernel/drivers/MqttDriver.hpp +++ b/main/kernel/mqtt/MqttDriver.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -22,160 +23,64 @@ using namespace farmhub::kernel; using std::make_shared; using std::shared_ptr; -namespace farmhub::kernel::drivers { +using namespace farmhub::kernel::drivers; -class MqttDriver { -public: - enum class Retention { - NoRetain, - Retain - }; - - enum class QoS { - AtMostOnce = 0, - AtLeastOnce = 1, - ExactlyOnce = 2 - }; - - enum class LogPublish { - Log, - Silent - }; - - enum class PublishStatus { - TimeOut = 0, - Success = 1, - Failed = 2, - Pending = 3, - QueueFull = 4 - }; - - typedef std::function CommandHandler; - - typedef std::function SubscriptionHandler; - - class MqttRoot { - public: - MqttRoot(MqttDriver& mqtt, const String& rootTopic) - : mqtt(mqtt) - , rootTopic(rootTopic) { - } - - shared_ptr forSuffix(const String& suffix) { - return make_shared(mqtt, rootTopic + "/" + suffix); - } - - PublishStatus publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), LogPublish log = LogPublish::Log) { - return mqtt.publish(fullTopic(suffix), json, retain, qos, timeout, log); - } - - PublishStatus publish(const String& suffix, std::function populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), LogPublish log = LogPublish::Log) { - JsonDocument doc; - JsonObject root = doc.to(); - populate(root); - return publish(suffix, doc, retain, qos, timeout, log); - } - - PublishStatus clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) { - return mqtt.clear(fullTopic(suffix), retain, qos, timeout); - } +namespace farmhub::kernel::mqtt { - bool subscribe(const String& suffix, SubscriptionHandler handler) { - return subscribe(suffix, QoS::ExactlyOnce, handler); - } +enum class Retention { + NoRetain, + Retain +}; - bool registerCommand(const String& name, CommandHandler handler) { - String suffix = "commands/" + name; - return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, handler](const String&, const JsonObject& request) { - // TODO Do exponential backoff when clear cannot be finished - // Clear topic and wait for it to be cleared - auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 }, MQTT_ALERT_AFTER_INCOMING); - if (clearStatus != PublishStatus::Success) { - LOGE("MQTT: Failed to clear retained command topic '%s', status: %d", - suffix.c_str(), static_cast(clearStatus)); - } +enum class QoS { + AtMostOnce = 0, + AtLeastOnce = 1, + ExactlyOnce = 2 +}; - JsonDocument responseDoc; - auto response = responseDoc.to(); - handler(request, response); - if (response.size() > 0) { - publish("responses/" + name, responseDoc, Retention::NoRetain, QoS::ExactlyOnce); - } - }); - } +enum class LogPublish { + Log, + Silent +}; - void registerCommand(Command& command) { - registerCommand(command.name, [&](const JsonObject& request, JsonObject& response) { - command.handle(request, response); - }); - } +enum class PublishStatus { + TimeOut = 0, + Success = 1, + Failed = 2, + Pending = 3, + QueueFull = 4 +}; - /** - * @brief Subscribes to the given topic under the topic prefix. - * - * Note that subscription does not support wildcards. - */ - bool subscribe(const String& suffix, QoS qos, SubscriptionHandler handler) { - return mqtt.subscribe(fullTopic(suffix), qos, handler); - } +typedef std::function CommandHandler; - private: - String fullTopic(const String& suffix) const { - return rootTopic + "/" + suffix; - } +typedef std::function SubscriptionHandler; - MqttDriver& mqtt; - const String rootTopic; - }; +class MqttRoot; +class MqttDriver { private: struct OutgoingMessage { - String topic; - String payload; - Retention retain; - QoS qos; - TaskHandle_t waitingTask; - LogPublish log; - milliseconds extendAlert; + const String topic; + const String payload; + const Retention retain; + const QoS qos; + const TaskHandle_t waitingTask; + const LogPublish log; + const milliseconds extendAlert; static const uint32_t PUBLISH_SUCCESS = 1; static const uint32_t PUBLISH_FAILED = 2; - - OutgoingMessage(const String& topic, const String& payload, Retention retention, QoS qos, TaskHandle_t waitingTask, LogPublish log, milliseconds extendAlert) - : topic(topic) - , payload(payload) - , retain(retention) - , qos(qos) - , waitingTask(waitingTask) - , log(log) - , extendAlert(extendAlert) { - } }; struct IncomingMessage { - String topic; - String payload; - - IncomingMessage(const String& topic, const String& payload) - : topic(topic) - , payload(payload) { - } + const String topic; + const String payload; }; struct Subscription { const String topic; const QoS qos; const SubscriptionHandler handle; - - Subscription(const String& topic, QoS qos, SubscriptionHandler handle) - : topic(topic) - , qos(qos) - , handle(handle) { - } - - Subscription(const Subscription& other) - : Subscription(other.topic, other.qos, other.handle) { - } }; public: @@ -236,6 +141,8 @@ class MqttDriver { #endif String payload; serializeJson(json, payload); + // Stay alert until the message is sent + extendAlert = std::max(duration_cast(timeout), extendAlert); return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { return outgoingQueue.offerIn(MQTT_QUEUE_TIMEOUT, OutgoingMessage { topic, payload, retain, qos, waitingTask, log, extendAlert }); }); @@ -244,6 +151,8 @@ class MqttDriver { PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero(), milliseconds extendAlert = MQTT_ALERT_AFTER_OUTGOING) { LOGD("MQTT: Clearing topic '%s'", topic.c_str()); + // Stay alert until the message is sent + extendAlert = std::max(duration_cast(timeout), extendAlert); return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { return outgoingQueue.offerIn(MQTT_QUEUE_TIMEOUT, OutgoingMessage { topic, "", retain, qos, waitingTask, LogPublish::Log, extendAlert }); }); @@ -260,13 +169,20 @@ class MqttDriver { } uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count()); switch (status) { - case 0: + case 0: { + Lock lock(pendingMessagesMutex); + pendingMessages.remove_if([waitingTask](const auto& message) { + return message.waitingTask == waitingTask; + }); return PublishStatus::TimeOut; - case OutgoingMessage::PUBLISH_SUCCESS: + } + case OutgoingMessage::PUBLISH_SUCCESS: { return PublishStatus::Success; + } case OutgoingMessage::PUBLISH_FAILED: - default: + default: { return PublishStatus::Failed; + } } } @@ -495,6 +411,8 @@ class MqttDriver { case MQTT_EVENT_DISCONNECTED: { LOGV("MQTT: Disconnected from MQTT server"); mqttReady.clear(); + Lock lock(pendingMessagesMutex); + pendingMessages.clear(); break; } case MQTT_EVENT_SUBSCRIBED: { @@ -507,6 +425,7 @@ class MqttDriver { } case MQTT_EVENT_PUBLISHED: { LOGV("MQTT: Published, message ID %d", event->msg_id); + notifyPendingTask(event, true); break; } case MQTT_EVENT_DATA: { @@ -524,6 +443,7 @@ class MqttDriver { logErrorIfNonZero("reported from tls stack", event->error_handle->esp_tls_stack_err); logErrorIfNonZero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); } + notifyPendingTask(event, false); break; } default: { @@ -533,6 +453,25 @@ class MqttDriver { } } + void notifyPendingTask(esp_mqtt_event_handle_t event, bool success) { + Lock lock(pendingMessagesMutex); + pendingMessages.remove_if([&](const auto& message) { + if (message.messageId == event->msg_id) { + notifyWaitingTask(message.waitingTask, success); + return true; + } else { + return false; + } + }); + } + + void notifyWaitingTask(TaskHandle_t task, bool success) { + if (task != nullptr) { + uint32_t status = success ? OutgoingMessage::PUBLISH_SUCCESS : OutgoingMessage::PUBLISH_FAILED; + xTaskNotify(task, status, eSetValueWithOverwrite); + } + } + static void logErrorIfNonZero(const char* message, int error) { if (error != 0) { LOGE(" - %s: 0x%x", message, error); @@ -540,38 +479,47 @@ class MqttDriver { } void processOutgoingMessage(const OutgoingMessage message) { - int ret = esp_mqtt_client_publish( + Lock lock(pendingMessagesMutex); + int ret = esp_mqtt_client_enqueue( client, message.topic.c_str(), message.payload.c_str(), message.payload.length(), static_cast(message.qos), - message.retain == Retention::Retain); + message.retain == Retention::Retain, + true); #ifdef DUMP_MQTT if (message.log == LogPublish::Log) { LOGV("MQTT: Published to '%s' (size: %d), result: %d", message.topic.c_str(), message.payload.length(), ret); } #endif - bool success; switch (ret) { - case -1: + case -1: { LOGD("MQTT: Error publishing to '%s'", message.topic.c_str()); - success = false; + notifyWaitingTask(message.waitingTask, false); break; - case -2: + } + case -2: { LOGD("MQTT: Outbox full, message to '%s' dropped", message.topic.c_str()); - success = false; + notifyWaitingTask(message.waitingTask, false); break; - default: - success = true; - break; - } - if (message.waitingTask != nullptr) { - uint32_t status = success ? OutgoingMessage::PUBLISH_SUCCESS : OutgoingMessage::PUBLISH_FAILED; - xTaskNotify(message.waitingTask, status, eSetValueWithOverwrite); + } + default: { + auto messageId = ret; + if (message.waitingTask != nullptr) { + if (messageId == 0) { + // Notify tasks waiting on QoS 0 messages immediately + notifyWaitingTask(message.waitingTask, true); + } else { + // Record + pendingMessages.push_back({ messageId, message.waitingTask }); + } + break; + } + } } } @@ -664,15 +612,22 @@ class MqttDriver { // TODO Use a map instead std::list subscriptions; + struct PendingMessage { + const int messageId; + const TaskHandle_t waitingTask; + }; + Mutex pendingMessagesMutex; + std::list pendingMessages; + // TODO Review these values static constexpr milliseconds MQTT_CONNECTION_TIMEOUT = 10s; static constexpr milliseconds MQTT_LOOP_INTERVAL = 1s; static constexpr milliseconds MQTT_DISCONNECTED_CHECK_INTERVAL = 5s; static constexpr milliseconds MQTT_QUEUE_TIMEOUT = 1s; static constexpr milliseconds MQTT_ALERT_AFTER_OUTGOING = 1s; - static constexpr milliseconds MQTT_ALERT_AFTER_INCOMING = 30s; - static constexpr milliseconds MQTT_MAX_TIMEOUT_POWER_SAVE = 1h; + + friend class MqttRoot; }; -} // namespace farmhub::kernel::drivers +} // namespace farmhub::kernel::mqtt diff --git a/main/kernel/mqtt/MqttRoot.hpp b/main/kernel/mqtt/MqttRoot.hpp new file mode 100644 index 00000000..70b45ae5 --- /dev/null +++ b/main/kernel/mqtt/MqttRoot.hpp @@ -0,0 +1,85 @@ +#pragma once + +#include + +#include + +namespace farmhub::kernel::mqtt { + +class MqttRoot { +public: + MqttRoot(MqttDriver& mqtt, const String& rootTopic) + : mqtt(mqtt) + , rootTopic(rootTopic) { + } + + shared_ptr forSuffix(const String& suffix) { + return make_shared(mqtt, rootTopic + "/" + suffix); + } + + PublishStatus publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), LogPublish log = LogPublish::Log) { + return mqtt.publish(fullTopic(suffix), json, retain, qos, timeout, log); + } + + PublishStatus publish(const String& suffix, std::function populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), LogPublish log = LogPublish::Log) { + JsonDocument doc; + JsonObject root = doc.to(); + populate(root); + return publish(suffix, doc, retain, qos, timeout, log); + } + + PublishStatus clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) { + return mqtt.clear(fullTopic(suffix), retain, qos, timeout); + } + + bool subscribe(const String& suffix, SubscriptionHandler handler) { + return subscribe(suffix, QoS::ExactlyOnce, handler); + } + + bool registerCommand(const String& name, CommandHandler handler) { + String suffix = "commands/" + name; + return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, handler](const String&, const JsonObject& request) { + // TODO Do exponential backoff when clear cannot be finished + // Clear topic and wait for it to be cleared + auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 }, MQTT_ALERT_AFTER_INCOMING); + if (clearStatus != PublishStatus::Success) { + LOGE("MQTT: Failed to clear retained command topic '%s', status: %d", + suffix.c_str(), static_cast(clearStatus)); + } + + JsonDocument responseDoc; + auto response = responseDoc.to(); + handler(request, response); + if (response.size() > 0) { + publish("responses/" + name, responseDoc, Retention::NoRetain, QoS::ExactlyOnce); + } + }); + } + + void registerCommand(Command& command) { + registerCommand(command.name, [&](const JsonObject& request, JsonObject& response) { + command.handle(request, response); + }); + } + + /** + * @brief Subscribes to the given topic under the topic prefix. + * + * Note that subscription does not support wildcards. + */ + bool subscribe(const String& suffix, QoS qos, SubscriptionHandler handler) { + return mqtt.subscribe(fullTopic(suffix), qos, handler); + } + +private: + String fullTopic(const String& suffix) const { + return rootTopic + "/" + suffix; + } + + MqttDriver& mqtt; + const String rootTopic; + + static constexpr milliseconds MQTT_ALERT_AFTER_INCOMING = 30s; +}; + +} // namespace farmhub::kernel::mqtt diff --git a/main/peripherals/Peripheral.hpp b/main/peripherals/Peripheral.hpp index 3fcb57c7..2d9a53d5 100644 --- a/main/peripherals/Peripheral.hpp +++ b/main/peripherals/Peripheral.hpp @@ -10,8 +10,8 @@ #include #include #include -#include #include +#include using std::move; using std::shared_ptr; @@ -19,6 +19,7 @@ using std::unique_ptr; using namespace farmhub::kernel; using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; namespace farmhub::peripherals { @@ -28,7 +29,7 @@ class PeripheralBase : public TelemetryProvider, public Named { public: - PeripheralBase(const String& name, shared_ptr mqttRoot, size_t telemetrySize = 2048) + PeripheralBase(const String& name, shared_ptr mqttRoot, size_t telemetrySize = 2048) : Named(name) , mqttRoot(mqttRoot) , telemetrySize(telemetrySize) { @@ -64,7 +65,7 @@ class PeripheralBase } protected: - shared_ptr mqttRoot; + shared_ptr mqttRoot; private: const size_t telemetrySize; @@ -74,7 +75,7 @@ template class Peripheral : public PeripheralBase { public: - Peripheral(const String& name, shared_ptr mqttRoot) + Peripheral(const String& name, shared_ptr mqttRoot) : PeripheralBase(name, mqttRoot) { } @@ -114,7 +115,7 @@ class PeripheralFactoryBase { , peripheralType(peripheralType) { } - virtual unique_ptr createPeripheral(const String& name, const String& jsonConfig, shared_ptr mqttRoot, PeripheralServices& services, JsonObject& initConfigJson) = 0; + virtual unique_ptr createPeripheral(const String& name, const String& jsonConfig, shared_ptr mqttRoot, PeripheralServices& services, JsonObject& initConfigJson) = 0; const String factoryType; const String peripheralType; @@ -133,7 +134,7 @@ class PeripheralFactory : public PeripheralFactoryBase { , deviceConfigArgs(std::forward(deviceConfigArgs)...) { } - unique_ptr createPeripheral(const String& name, const String& jsonConfig, shared_ptr mqttRoot, PeripheralServices& services, JsonObject& initConfigJson) override { + unique_ptr createPeripheral(const String& name, const String& jsonConfig, shared_ptr mqttRoot, PeripheralServices& services, JsonObject& initConfigJson) override { // Use short prefix because SPIFFS has a 32 character limit ConfigurationFile* configFile = new ConfigurationFile(FileSystem::get(), "/p/" + name); mqttRoot->subscribe("config", [name, configFile](const String&, const JsonObject& configJson) { @@ -154,7 +155,7 @@ class PeripheralFactory : public PeripheralFactoryBase { return peripheral; } - virtual unique_ptr> createPeripheral(const String& name, const TDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) = 0; + virtual unique_ptr> createPeripheral(const String& name, const TDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) = 0; private: std::tuple deviceConfigArgs; @@ -171,7 +172,7 @@ class PeripheralManager PwmManager& pwmManager, SleepManager& sleepManager, SwitchManager& switchManager, - const shared_ptr mqttDeviceRoot) + const shared_ptr mqttDeviceRoot) : services({ i2c, pcntManager, pwmManager, sleepManager, switchManager }) , mqttDeviceRoot(mqttDeviceRoot) { } @@ -268,7 +269,7 @@ class PeripheralManager throw PeripheralCreationException("Factory not found: '" + factoryType + "'"); } const String& peripheralType = it->second.get().peripheralType; - shared_ptr mqttRoot = mqttDeviceRoot->forSuffix("peripherals/" + peripheralType + "/" + name); + shared_ptr mqttRoot = mqttDeviceRoot->forSuffix("peripherals/" + peripheralType + "/" + name); PeripheralFactoryBase& factory = it->second.get(); return factory.createPeripheral(name, configJson, mqttRoot, services, initConfigJson); } @@ -281,7 +282,7 @@ class PeripheralManager // TODO Make this immutable somehow PeripheralServices services; - const shared_ptr mqttDeviceRoot; + const shared_ptr mqttDeviceRoot; // TODO Use an unordered_map? std::map> factories; diff --git a/main/peripherals/chicken_door/ChickenDoor.hpp b/main/peripherals/chicken_door/ChickenDoor.hpp index ce1e8823..7aa93baa 100644 --- a/main/peripherals/chicken_door/ChickenDoor.hpp +++ b/main/peripherals/chicken_door/ChickenDoor.hpp @@ -89,7 +89,7 @@ class ChickenDoorComponent public: ChickenDoorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, SleepManager& sleepManager, SwitchManager& switches, PwmMotorDriver& motor, @@ -355,7 +355,7 @@ class ChickenDoor public: ChickenDoor( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, uint8_t lightSensorAddress, SleepManager& sleepManager, @@ -403,7 +403,7 @@ class NoLightSensorComponent : public LightSensorComponent { public: NoLightSensorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config, seconds measurementFrequency, @@ -427,7 +427,7 @@ class ChickenDoorFactory , Motorized(motors) { } - unique_ptr> createPeripheral(const String& name, const ChickenDoorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const ChickenDoorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { PwmMotorDriver& motor = findMotor(deviceConfig.motor.get()); auto lightSensorType = deviceConfig.lightSensor.get().type.get(); try { diff --git a/main/peripherals/environment/Ds18B20SoilSensor.hpp b/main/peripherals/environment/Ds18B20SoilSensor.hpp index d5e6c6fb..5cf649fd 100644 --- a/main/peripherals/environment/Ds18B20SoilSensor.hpp +++ b/main/peripherals/environment/Ds18B20SoilSensor.hpp @@ -10,11 +10,10 @@ #include #include #include -#include #include using namespace farmhub::kernel; -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; using namespace farmhub::peripherals; using std::make_unique; using std::unique_ptr; @@ -31,7 +30,7 @@ class Ds18B20SoilSensorComponent public: Ds18B20SoilSensorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, InternalPinPtr pin) : Component(name, mqttRoot) { @@ -90,7 +89,7 @@ class Ds18B20SoilSensorComponent class Ds18B20SoilSensor : public Peripheral { public: - Ds18B20SoilSensor(const String& name, shared_ptr mqttRoot, InternalPinPtr pin) + Ds18B20SoilSensor(const String& name, shared_ptr mqttRoot, InternalPinPtr pin) : Peripheral(name, mqttRoot) , sensor(name, mqttRoot, pin) { } @@ -110,7 +109,7 @@ class Ds18B20SoilSensorFactory : PeripheralFactory("environment:ds18b20", "environment") { } - unique_ptr> createPeripheral(const String& name, const SinglePinDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const SinglePinDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { return make_unique(name, mqttRoot, deviceConfig.pin.get()); } }; diff --git a/main/peripherals/environment/Environment.hpp b/main/peripherals/environment/Environment.hpp index e3777f0e..678f7a6d 100644 --- a/main/peripherals/environment/Environment.hpp +++ b/main/peripherals/environment/Environment.hpp @@ -4,13 +4,12 @@ #include #include -#include #include #include using namespace farmhub::kernel; -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; using namespace farmhub::peripherals; using std::make_unique; using std::unique_ptr; @@ -24,7 +23,7 @@ class Environment Environment( const String& name, const String& sensorType, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) : Peripheral(name, mqttRoot) @@ -49,7 +48,7 @@ class I2CEnvironmentFactory , defaultAddress(defaultAddress) { } - unique_ptr> createPeripheral(const String& name, const I2CDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const I2CDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { auto i2cConfig = deviceConfig.parse(defaultAddress); LOGI("Creating %s sensor %s with %s", sensorType.c_str(), name.c_str(), i2cConfig.toString().c_str()); diff --git a/main/peripherals/environment/Sht2xComponent.hpp b/main/peripherals/environment/Sht2xComponent.hpp index 847a3b72..3682838f 100644 --- a/main/peripherals/environment/Sht2xComponent.hpp +++ b/main/peripherals/environment/Sht2xComponent.hpp @@ -27,7 +27,7 @@ class Sht2xComponent Sht2xComponent( const String& name, const String& sensorType, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) : Component(name, mqttRoot) diff --git a/main/peripherals/environment/Sht31Component.hpp b/main/peripherals/environment/Sht31Component.hpp index 6a24cd6d..18377343 100644 --- a/main/peripherals/environment/Sht31Component.hpp +++ b/main/peripherals/environment/Sht31Component.hpp @@ -23,7 +23,7 @@ class Sht31Component Sht31Component( const String& name, const String& sensorType, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) : Component(name, mqttRoot) diff --git a/main/peripherals/environment/SoilMoistureSensor.hpp b/main/peripherals/environment/SoilMoistureSensor.hpp index bd1ab284..7590e2b3 100644 --- a/main/peripherals/environment/SoilMoistureSensor.hpp +++ b/main/peripherals/environment/SoilMoistureSensor.hpp @@ -30,7 +30,7 @@ class SoilMoistureSensorComponent public: SoilMoistureSensorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, const SoilMoistureSensorDeviceConfig& config) : Component(name, mqttRoot) , airValue(config.air.get()) @@ -65,7 +65,7 @@ class SoilMoistureSensorComponent class SoilMoistureSensor : public Peripheral { public: - SoilMoistureSensor(const String& name, shared_ptr mqttRoot, const SoilMoistureSensorDeviceConfig& config) + SoilMoistureSensor(const String& name, shared_ptr mqttRoot, const SoilMoistureSensorDeviceConfig& config) : Peripheral(name, mqttRoot) , sensor(name, mqttRoot, config) { } @@ -85,7 +85,7 @@ class SoilMoistureSensorFactory : PeripheralFactory("environment:soil-moisture", "environment") { } - unique_ptr> createPeripheral(const String& name, const SoilMoistureSensorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const SoilMoistureSensorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { return std::make_unique(name, mqttRoot, deviceConfig); } }; diff --git a/main/peripherals/fence/ElectricFenceMonitor.hpp b/main/peripherals/fence/ElectricFenceMonitor.hpp index ca0f1f4b..e6fd0324 100644 --- a/main/peripherals/fence/ElectricFenceMonitor.hpp +++ b/main/peripherals/fence/ElectricFenceMonitor.hpp @@ -46,7 +46,7 @@ class ElectricFenceMonitorComponent public: ElectricFenceMonitorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, PcntManager& pcnt, const ElectricFenceMonitorDeviceConfig& config) : Component(name, mqttRoot) { @@ -107,7 +107,7 @@ class ElectricFenceMonitorComponent class ElectricFenceMonitor : public Peripheral { public: - ElectricFenceMonitor(const String& name, shared_ptr mqttRoot, PcntManager& pcnt, const ElectricFenceMonitorDeviceConfig& config) + ElectricFenceMonitor(const String& name, shared_ptr mqttRoot, PcntManager& pcnt, const ElectricFenceMonitorDeviceConfig& config) : Peripheral(name, mqttRoot) , monitor(name, mqttRoot, pcnt, config) { } @@ -127,7 +127,7 @@ class ElectricFenceMonitorFactory : PeripheralFactory("electric-fence") { } - unique_ptr> createPeripheral(const String& name, const ElectricFenceMonitorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const ElectricFenceMonitorDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { return std::make_unique(name, mqttRoot, services.pcntManager, deviceConfig); } }; diff --git a/main/peripherals/flow_control/FlowControl.hpp b/main/peripherals/flow_control/FlowControl.hpp index cc8de60c..0e914f80 100644 --- a/main/peripherals/flow_control/FlowControl.hpp +++ b/main/peripherals/flow_control/FlowControl.hpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include @@ -14,7 +14,7 @@ #include #include -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; using namespace farmhub::peripherals; using namespace farmhub::peripherals::flow_meter; using namespace farmhub::peripherals::valve; @@ -31,7 +31,7 @@ class FlowControl : public Peripheral { public: FlowControl( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, PcntManager& pcnt, SleepManager& sleepManager, ValveControlStrategy& strategy, @@ -86,7 +86,7 @@ class FlowControlFactory , Motorized(motors) { } - unique_ptr> createPeripheral(const String& name, const FlowControlDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const FlowControlDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { auto strategy = deviceConfig.valve.get().createValveControlStrategy(this); auto flowMeterConfig = deviceConfig.flowMeter.get(); diff --git a/main/peripherals/flow_meter/FlowMeter.hpp b/main/peripherals/flow_meter/FlowMeter.hpp index bc56125f..1c9af51e 100644 --- a/main/peripherals/flow_meter/FlowMeter.hpp +++ b/main/peripherals/flow_meter/FlowMeter.hpp @@ -5,12 +5,12 @@ #include #include #include -#include +#include #include #include using namespace farmhub::kernel; -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; using namespace farmhub::peripherals; using std::make_unique; using std::unique_ptr; @@ -19,7 +19,7 @@ namespace farmhub::peripherals::flow_meter { class FlowMeter : public Peripheral { public: - FlowMeter(const String& name, shared_ptr mqttRoot, PcntManager& pcnt, InternalPinPtr pin, double qFactor, milliseconds measurementFrequency) + FlowMeter(const String& name, shared_ptr mqttRoot, PcntManager& pcnt, InternalPinPtr pin, double qFactor, milliseconds measurementFrequency) : Peripheral(name, mqttRoot) , flowMeter(name, mqttRoot, pcnt, pin, qFactor, measurementFrequency) { } @@ -39,7 +39,7 @@ class FlowMeterFactory : PeripheralFactory("flow-meter") { } - unique_ptr> createPeripheral(const String& name, const FlowMeterDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const FlowMeterDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { return make_unique(name, mqttRoot, services.pcntManager, deviceConfig.pin.get(), deviceConfig.qFactor.get(), deviceConfig.measurementFrequency.get()); } }; diff --git a/main/peripherals/flow_meter/FlowMeterComponent.hpp b/main/peripherals/flow_meter/FlowMeterComponent.hpp index 407cff06..24bf48fe 100644 --- a/main/peripherals/flow_meter/FlowMeterComponent.hpp +++ b/main/peripherals/flow_meter/FlowMeterComponent.hpp @@ -11,10 +11,9 @@ #include #include #include +#include -#include - -using namespace farmhub::kernel::drivers; +using namespace farmhub::kernel::mqtt; namespace farmhub::peripherals::flow_meter { @@ -24,7 +23,7 @@ class FlowMeterComponent public: FlowMeterComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, PcntManager& pcnt, InternalPinPtr pin, double qFactor, diff --git a/main/peripherals/light_sensor/Bh1750.hpp b/main/peripherals/light_sensor/Bh1750.hpp index e50b3bc5..cb462fa3 100644 --- a/main/peripherals/light_sensor/Bh1750.hpp +++ b/main/peripherals/light_sensor/Bh1750.hpp @@ -38,7 +38,7 @@ class Bh1750Component public: Bh1750Component( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config, seconds measurementFrequency, @@ -75,7 +75,7 @@ class Bh1750 public: Bh1750( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, const I2CConfig& config, seconds measurementFrequency, @@ -99,7 +99,7 @@ class Bh1750Factory : PeripheralFactory("light-sensor:bh1750", "light-sensor") { } - unique_ptr> createPeripheral(const String& name, const Bh1750DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const Bh1750DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { I2CConfig i2cConfig = deviceConfig.parse(0x23); return std::make_unique(name, mqttRoot, services.i2c, i2cConfig, deviceConfig.measurementFrequency.get(), deviceConfig.latencyInterval.get()); } diff --git a/main/peripherals/light_sensor/LightSensor.hpp b/main/peripherals/light_sensor/LightSensor.hpp index da5051d9..e42e6b00 100644 --- a/main/peripherals/light_sensor/LightSensor.hpp +++ b/main/peripherals/light_sensor/LightSensor.hpp @@ -32,7 +32,7 @@ class LightSensorComponent public: LightSensorComponent( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, seconds measurementFrequency, seconds latencyInterval) : Component(name, mqttRoot) diff --git a/main/peripherals/light_sensor/Tsl2591.hpp b/main/peripherals/light_sensor/Tsl2591.hpp index dae00d97..ec77daad 100644 --- a/main/peripherals/light_sensor/Tsl2591.hpp +++ b/main/peripherals/light_sensor/Tsl2591.hpp @@ -39,7 +39,7 @@ class Tsl2591Component public: Tsl2591Component( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config, seconds measurementFrequency, @@ -80,7 +80,7 @@ class Tsl2591 public: Tsl2591( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, const I2CConfig& config, seconds measurementFrequency, @@ -104,7 +104,7 @@ class Tsl2591Factory : PeripheralFactory("light-sensor:tsl2591", "light-sensor") { } - unique_ptr> createPeripheral(const String& name, const Tsl2591DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const Tsl2591DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { I2CConfig i2cConfig = deviceConfig.parse(TSL2591_ADDR); return std::make_unique(name, mqttRoot, services.i2c, i2cConfig, deviceConfig.measurementFrequency.get(), deviceConfig.latencyInterval.get()); } diff --git a/main/peripherals/multiplexer/Xl9535.hpp b/main/peripherals/multiplexer/Xl9535.hpp index 31d8005c..d13c0ce6 100644 --- a/main/peripherals/multiplexer/Xl9535.hpp +++ b/main/peripherals/multiplexer/Xl9535.hpp @@ -19,7 +19,7 @@ class Xl9535Component public: Xl9535Component( const String& name, - shared_ptr mqttRoot, + shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) : Component(name, mqttRoot) @@ -122,7 +122,7 @@ class Xl9535Pin : public Pin { class Xl9535 : public Peripheral { public: - Xl9535(const String& name, shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) + Xl9535(const String& name, shared_ptr mqttRoot, I2CManager& i2c, I2CConfig config) : Peripheral(name, mqttRoot) , component(name, mqttRoot, i2c, config) { @@ -147,7 +147,7 @@ class Xl9535Factory : PeripheralFactory("multiplexer:xl9535") { } - unique_ptr> createPeripheral(const String& name, const Xl9535DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const Xl9535DeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { return make_unique(name, mqttRoot, services.i2c, deviceConfig.parse()); } }; diff --git a/main/peripherals/valve/Valve.hpp b/main/peripherals/valve/Valve.hpp index 157a7054..d51d0645 100644 --- a/main/peripherals/valve/Valve.hpp +++ b/main/peripherals/valve/Valve.hpp @@ -37,7 +37,7 @@ class Valve const String& name, SleepManager& sleepManager, ValveControlStrategy& strategy, - shared_ptr mqttRoot) + shared_ptr mqttRoot) : Peripheral(name, mqttRoot) , valve(name, sleepManager, strategy, mqttRoot, [this]() { publishTelemetry(); @@ -71,7 +71,7 @@ class ValveFactory , Motorized(motors) { } - unique_ptr> createPeripheral(const String& name, const ValveDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { + unique_ptr> createPeripheral(const String& name, const ValveDeviceConfig& deviceConfig, shared_ptr mqttRoot, PeripheralServices& services) override { auto strategy = deviceConfig.createValveControlStrategy(this); return make_unique(name, services.sleepManager, *strategy, mqttRoot); } diff --git a/main/peripherals/valve/ValveComponent.hpp b/main/peripherals/valve/ValveComponent.hpp index d24e6e72..688379bf 100644 --- a/main/peripherals/valve/ValveComponent.hpp +++ b/main/peripherals/valve/ValveComponent.hpp @@ -204,7 +204,7 @@ class ValveComponent : public Component { const String& name, SleepManager& sleepManager, ValveControlStrategy& strategy, - shared_ptr mqttRoot, + shared_ptr mqttRoot, std::function publishTelemetry) : Component(name, mqttRoot) , sleepManager(sleepManager)