Skip to content

Commit

Permalink
Add MQTT status update subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
AniruddhaKanhere committed Nov 8, 2024
1 parent fe077ba commit 2ea3d01
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 1 deletion.
12 changes: 12 additions & 0 deletions core-bus-aws-iot-mqtt/include/ggl/core_bus/aws_iot_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ GglError ggl_aws_iot_mqtt_subscribe_parse_resp(
GglObject data, GglBuffer **topic, GglBuffer **payload
);

/// Wrapper for core-bus `aws_iot_mqtt` `connection_status`
GglError ggl_aws_iot_mqtt_connection_status(
GglSubscribeCallback on_response,
GglSubscribeCloseCallback on_close,
void *ctx,
uint32_t *handle
);

GglError ggl_aws_iot_mqtt_connection_status_parse(
GglObject data, bool *connection_status
);

#endif
42 changes: 42 additions & 0 deletions core-bus-aws-iot-mqtt/src/aws_iot_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,45 @@ GglError ggl_aws_iot_mqtt_subscribe_parse_resp(

return GGL_ERR_OK;
}

/// Call this API to subscribe to MQTT connection status. To parse the data
/// received from the subscription, call
/// ggl_aws_iot_mqtt_connection_status_parse function which will return a true
/// for connected and a false for not connected.
///
/// Note that when a subscription is accepted, the current MQTT status is sent
/// to the subscribers.
GglError ggl_aws_iot_mqtt_connection_status(
GglSubscribeCallback on_response,
GglSubscribeCloseCallback on_close,
void *ctx,
uint32_t *handle
) {
// The GGL subscribe API expects a map. Sending a dummy map.
GglMap args = GGL_MAP();
return ggl_subscribe(
GGL_STR("aws_iot_mqtt"),
GGL_STR("connection_status"),
args,
on_response,
on_close,
ctx,
NULL,
handle
);
}

GglError ggl_aws_iot_mqtt_connection_status_parse(
GglObject data, bool *connection_status
) {
if (data.type != GGL_TYPE_BOOLEAN) {
GGL_LOGE(
"MQTT connection status subscription response is not a boolean."
);
return GGL_ERR_FAILURE;
}

*connection_status = data.boolean;

return GGL_ERR_OK;
}
2 changes: 1 addition & 1 deletion core-bus/include/ggl/core_bus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/// Maximum number of core-bus connections.
/// Can be configured with `-DGGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS=<N>`.
#ifndef GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS
#define GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS 50
#define GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS 100
#endif

/// Send a Core Bus notification (call, but don't wait for response).
Expand Down
25 changes: 25 additions & 0 deletions iotcored/src/bus_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

static void rpc_publish(void *ctx, GglMap params, uint32_t handle);
static void rpc_subscribe(void *ctx, GglMap params, uint32_t handle);
static void rpc_get_status(void *ctx, GglMap params, uint32_t handle);

void iotcored_start_server(IotcoredArgs *args) {
GglRpcMethodDesc handlers[] = {
{ GGL_STR("publish"), false, rpc_publish, NULL },
{ GGL_STR("subscribe"), true, rpc_subscribe, NULL },
{ GGL_STR("connection_status"), true, rpc_get_status, NULL },
};
size_t handlers_len = sizeof(handlers) / sizeof(handlers[0]);

Expand Down Expand Up @@ -178,3 +180,26 @@ static void rpc_subscribe(void *ctx, GglMap params, uint32_t handle) {

ggl_sub_accept(handle, sub_close_callback, NULL);
}

static void mqtt_status_sub_close_callback(void *ctx, uint32_t handle) {
(void) ctx;
iotcored_mqtt_status_update_unregister(handle);
}

static void rpc_get_status(void *ctx, GglMap params, uint32_t handle) {
(void) ctx;
(void) params;

GglError ret = iotcored_mqtt_status_update_register(handle);
if (ret != GGL_ERR_OK) {
ggl_return_err(handle, ret);
return;
}

ggl_sub_accept(handle, mqtt_status_sub_close_callback, NULL);

// Send a status update as soon as a subscription is accepted.
iotcored_mqtt_status_update_send(
GGL_OBJ_BOOL(iotcored_mqtt_connection_status())
);
}
14 changes: 14 additions & 0 deletions iotcored/src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ noreturn static void *mqtt_recv_thread_fn(void *arg) {
_Exit(1);
}

// Send status update to indicate mqtt (re)connection.
iotcored_mqtt_status_update_send(GGL_OBJ_BOOL(true));

// Send a fleet status update on reconnection
if (reconnect) {
// Resubscribe to all subscriptions.
Expand Down Expand Up @@ -379,6 +382,9 @@ noreturn static void *mqtt_recv_thread_fn(void *arg) {
(void) MQTT_Disconnect(ctx);
iotcored_tls_cleanup(ctx->transportInterface.pNetworkContext->tls_ctx);

// Send status update to indicate mqtt disconnection.
iotcored_mqtt_status_update_send(GGL_OBJ_BOOL(false));

GGL_LOGE("Removing all IoT core subscriptions");

// Set reconnect flag. Future connections will send a fleet status
Expand Down Expand Up @@ -505,6 +511,14 @@ GglError iotcored_mqtt_connect(const IotcoredArgs *args) {
return GGL_ERR_OK;
}

bool iotcored_mqtt_connection_status(void) {
bool connected = false;
if (MQTT_CheckConnectStatus(&mqtt_ctx) == MQTTStatusConnected) {
connected = true;
}
return connected;
}

GglError iotcored_mqtt_publish(const IotcoredMsg *msg, uint8_t qos) {
assert(msg != NULL);

Expand Down
2 changes: 2 additions & 0 deletions iotcored/src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ typedef struct {

GglError iotcored_mqtt_connect(const IotcoredArgs *args);

bool iotcored_mqtt_connection_status(void);

GglError iotcored_mqtt_publish(const IotcoredMsg *msg, uint8_t qos);
GglError iotcored_mqtt_subscribe(
GglBuffer *topic_filters, size_t count, uint8_t qos
Expand Down
34 changes: 34 additions & 0 deletions iotcored/src/subscription_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ static uint32_t handles[IOTCORED_MAX_SUBSCRIPTIONS];
static uint8_t topic_qos[IOTCORED_MAX_SUBSCRIPTIONS];
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

static uint32_t mqtt_status_handles[IOTCORED_MAX_SUBSCRIPTIONS];
static pthread_mutex_t mqtt_status_mtx = PTHREAD_MUTEX_INITIALIZER;

static GglBuffer topic_filter_buf(size_t index) {
return ggl_buffer_substr(
GGL_BUF(sub_topic_filters[index]), 0, topic_filter_len[index]
Expand Down Expand Up @@ -142,6 +145,37 @@ void iotcored_mqtt_receive(const IotcoredMsg *msg) {
}
}

GglError iotcored_mqtt_status_update_register(uint32_t handle) {
GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx);
for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) {
if (mqtt_status_handles[i] == 0) {
mqtt_status_handles[i] = handle;
return GGL_ERR_OK;
}
}
return GGL_ERR_NOMEM;
}

void iotcored_mqtt_status_update_unregister(uint32_t handle) {
GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx);
for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) {
if (mqtt_status_handles[i] == handle) {
mqtt_status_handles[i] = 0;
return;
}
}
}

void iotcored_mqtt_status_update_send(GglObject status) {
GGL_MTX_SCOPE_GUARD(&mqtt_status_mtx);

for (size_t i = 0; i < IOTCORED_MAX_SUBSCRIPTIONS; i++) {
if (mqtt_status_handles[i] != 0) {
ggl_respond(mqtt_status_handles[i], status);
}
}
}

void iotcored_re_register_all_subs(void) {
GGL_MTX_SCOPE_GUARD(&mtx);

Expand Down
7 changes: 7 additions & 0 deletions iotcored/src/subscription_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <ggl/buffer.h>
#include <ggl/error.h>
#include <ggl/object.h>
#include <stddef.h>
#include <stdint.h>

Expand All @@ -18,4 +19,10 @@ void iotcored_unregister_subscriptions(uint32_t handle);

void iotcored_re_register_all_subs(void);

GglError iotcored_mqtt_status_update_register(uint32_t handle);

void iotcored_mqtt_status_update_unregister(uint32_t handle);

void iotcored_mqtt_status_update_send(GglObject status);

#endif

0 comments on commit 2ea3d01

Please sign in to comment.