Skip to content

Commit

Permalink
Logger: Refactor mavlink logging
Browse files Browse the repository at this point in the history
Writing to two separate fifos (normal and delayed)
Delay stopping until all reliable messages are sent
Resending after ack timeout
  • Loading branch information
jnippula committed Feb 16, 2024
1 parent 72070fc commit dbe1996
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 72 deletions.
6 changes: 3 additions & 3 deletions src/modules/logger/log_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void LogWriter::thread_stop()
}
}

int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool reliable, bool wait)
int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, ULogWriteType wrtype)
{
int ret_file = 0, ret_mavlink = 0;

Expand All @@ -169,8 +169,8 @@ int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t drop
if (_log_writer_mavlink_for_write && type == LogType::Full) {
#ifdef LOGGER_PARALLEL_LOGGING

if (reliable) {
ret_mavlink = _log_writer_mavlink_for_write->write_reliable_message(ptr, size, wait);
if (wrtype != ULogWriteType::BEST_EFFORT) {
ret_mavlink = _log_writer_mavlink_for_write->write_reliable_message(ptr, size, wrtype);

} else
#endif
Expand Down
16 changes: 13 additions & 3 deletions src/modules/logger/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "log_writer_file.h"
#include "log_writer_mavlink.h"
#include "messages.h"

namespace px4
{
Expand Down Expand Up @@ -74,10 +75,19 @@ class LogWriter
void stop_log_mavlink();

#ifdef LOGGER_PARALLEL_LOGGING

void allow_delayed_sending()
{
if (_log_writer_mavlink) { _log_writer_mavlink->allow_delayed_sending(); }
}
void stop_log_mavlink_req()
{
if (_log_writer_mavlink) { _log_writer_mavlink->stop_log_req(); }
}
void wait_fifo_empty()
{
if (_log_writer_mavlink) {
_log_writer_mavlink->wait_fifo_count(0);
_log_writer_mavlink->wait_fifos_empty();

while (_log_writer_mavlink->reliable_fifo_is_sending()) {
usleep(10000);
Expand All @@ -103,8 +113,8 @@ class LogWriter
* -1 if not enough space in the buffer left (file backend), -2 mavlink backend failed
* add type -> pass through, but not to mavlink if mission log
*/
int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool reliable = false,
bool wait = false);
int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0,
ULogWriteType wrtype = ULogWriteType::BEST_EFFORT);

/**
* Select a backend, so that future calls to write_message() only write to the selected
Expand Down
145 changes: 102 additions & 43 deletions src/modules/logger/log_writer_mavlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
****************************************************************************/

#include "log_writer_mavlink.h"
#include "messages.h"

#include <drivers/drv_hrt.h>
#include <mathlib/mathlib.h>
Expand All @@ -58,37 +57,60 @@ LogWriterMavlink::LogWriterMavlink()
#ifdef LOGGER_PARALLEL_LOGGING
ReliableMsg *LogWriterMavlink::reliable_fifo_pop()
{
ReliableMsg *node = nullptr;
pthread_mutex_lock(&_fifo.mtx);
PX4_DEBUG("reliable POP - wait..");

while (_fifo.empty() && !_fifo.sender_should_exit.load()) {
while ((_fifo.empty() && (!_delayed_sending_allowed || _fifo_delayed.empty())) && !_fifo.sender_should_exit.load()) {
pthread_cond_wait(&_fifo.cv, &_fifo.mtx);
}

PX4_DEBUG("reliable POP: signalled");
ReliableMsg *node = _fifo.getHead();
_fifo.remove(node);

if (node) {
_fifo.sending = true;
if (!_fifo.empty()) {
node = _fifo.getHead();
_fifo.remove(node);

if (node) {
_fifo.sending = true;
}

} else if (_delayed_sending_allowed && !_fifo_delayed.empty()) {
node = _fifo_delayed.getHead();
_fifo_delayed.remove(node);

if (node) {
PX4_DEBUG("reliable delayed POP - sending");
_fifo.sending = true;
}
}

pthread_mutex_unlock(&_fifo.mtx);
return node;
}

bool LogWriterMavlink::reliable_fifo_push(ReliableMsg *node)
bool LogWriterMavlink::reliable_fifo_push(ReliableMsg *node, bool delayed)
{
if (!node) {
PX4_ERR("[reliable_fifo_push] nullptr");
return false;
}

pthread_mutex_lock(&_fifo.mtx);
_fifo.add(node);
PX4_DEBUG("reliable PUSH - signal sender");
pthread_cond_signal(&_fifo.cv);
pthread_mutex_unlock(&_fifo.mtx);
if (delayed) {
pthread_mutex_lock(&_fifo.mtx);
_fifo_delayed.add(node);
PX4_DEBUG("reliable delayed PUSH - signal sender");
pthread_cond_signal(&_fifo.cv);
pthread_mutex_unlock(&_fifo.mtx);

} else {
pthread_mutex_lock(&_fifo.mtx);
_fifo.add(node);
PX4_DEBUG("reliable PUSH - signal sender");
pthread_cond_signal(&_fifo.cv);
pthread_mutex_unlock(&_fifo.mtx);
}

return true;
}

Expand All @@ -115,6 +137,17 @@ void LogWriterMavlink::wait_fifo_count(size_t count)
}
}

void LogWriterMavlink::wait_fifos_empty()
{
if (!_delayed_sending_allowed) {
PX4_WARN("Parallel-logging - delayed fifo sending not allowed while stopping");
}

while ((_delayed_sending_allowed && !_fifo_delayed.empty()) || !_fifo.empty()) {
usleep(30000);
}
}

size_t LogWriterMavlink::reliable_fifo_count()
{
size_t count = 0;
Expand All @@ -139,7 +172,7 @@ void LogWriterMavlink::mav_reliable_sender()
PX4_DEBUG("[sender] - msg:%p", msg);

if (msg) {
write_message(msg->data, msg->len, true);
write_message(msg->data, msg->len, msg->wrtype);
PX4_DEBUG("[sender] - delete msg");
delete msg;
reliable_fifo_set_sender_idle();
Expand All @@ -154,7 +187,7 @@ bool LogWriterMavlink::init()
pthread_attr_t thr_attr;
pthread_attr_init(&thr_attr);
pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500));
PX4_INFO("create mav_reliable_sender_thread");
PX4_DEBUG("create mav_reliable_sender_thread");
int ret = pthread_create(&_mav_reliable_sender_thread, &thr_attr, &LogWriterMavlink::mav_reliable_sender_helper, this);

if (ret) {
Expand Down Expand Up @@ -204,6 +237,8 @@ void LogWriterMavlink::start_log()
_ulog_stream_acked_data.msg_sequence = 0;
_ulog_stream_acked_data.length = 0;
_ulog_stream_acked_data.first_message_offset = 0;
_stop_log_request = false;
_delayed_sending_allowed = false;
#endif
_is_started = true;
}
Expand All @@ -217,7 +252,7 @@ void LogWriterMavlink::stop_log()
_is_started = false;
}

int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable)
int LogWriterMavlink::write_message(void *ptr, size_t size, ULogWriteType wrtype)
{
if (!is_started()) {
return 0;
Expand All @@ -227,7 +262,7 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable)

#ifdef LOGGER_PARALLEL_LOGGING

if (reliable) {
if (wrtype != ULogWriteType::BEST_EFFORT) {
ulog_s_p = &_ulog_stream_acked_data;

} else {
Expand All @@ -253,7 +288,7 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable)
size -= send_len;

if (ulog_s_p->length >= data_len) {
if (publish_message(reliable)) {
if (publish_message(wrtype)) {
return -2;
}
}
Expand All @@ -263,10 +298,17 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable)
}

#ifdef LOGGER_PARALLEL_LOGGING
int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, bool wait)
int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, ULogWriteType wrtype)
{
if (wait) {
bool delayed;

if (wrtype == ULogWriteType::RELIABLE_START_STEPS) {
wait_fifo_count(LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD);
delayed = false;

} else {
// ULogWriteType::RELIABLE_TOPIC_DATA
delayed = true;
}

uint8_t *p = (uint8_t *) ptr;
Expand All @@ -278,7 +320,8 @@ int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, bool wait)
memcpy(msg->data, p, len);
p += len;
msg->len = len;
reliable_fifo_push(msg);
msg->wrtype = wrtype;
reliable_fifo_push(msg, delayed);
}

return 0;
Expand All @@ -299,13 +342,13 @@ void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable)
}
#endif

int LogWriterMavlink::publish_message(bool reliable)
int LogWriterMavlink::publish_message(ULogWriteType wrtype)
{
ulog_stream_s *ulog_s_p;

#ifdef LOGGER_PARALLEL_LOGGING

if (reliable) {
if (wrtype != ULogWriteType::BEST_EFFORT) {
ulog_s_p = &_ulog_stream_acked_data;

} else {
Expand All @@ -321,7 +364,7 @@ int LogWriterMavlink::publish_message(bool reliable)

#ifdef LOGGER_PARALLEL_LOGGING

if (!reliable) {
if (wrtype == ULogWriteType::BEST_EFFORT) {
_ulog_stream_pub.publish(*ulog_s_p);

} else {
Expand All @@ -345,35 +388,51 @@ int LogWriterMavlink::publish_message(bool reliable)
bool got_ack = false;
const int timeout_ms = ulog_stream_ack_s::ACK_TIMEOUT * ulog_stream_ack_s::ACK_MAX_TRIES;

hrt_abstime started = hrt_absolute_time();
while (!got_ack) {

do {
int ret = px4_poll(fds, sizeof(fds) / sizeof(fds[0]), timeout_ms);
hrt_abstime started = hrt_absolute_time();

if (ret <= 0) {
break;
}
do {
int ret = px4_poll(fds, sizeof(fds) / sizeof(fds[0]), timeout_ms);

if (fds[0].revents & POLLIN) {
ulog_stream_ack_s ack;
orb_copy(ORB_ID(ulog_stream_ack), _ulog_stream_ack_sub, &ack);
if (ret <= 0) {
break;
}

if (fds[0].revents & POLLIN) {
ulog_stream_ack_s ack;
orb_copy(ORB_ID(ulog_stream_ack), _ulog_stream_ack_sub, &ack);

if (ack.msg_sequence == ulog_s_p->msg_sequence) {
got_ack = true;
if (ack.msg_sequence == ulog_s_p->msg_sequence) {
got_ack = true;
}

} else {
break;
}
} while (!got_ack && hrt_elapsed_time(&started) / 1000 < timeout_ms);

} else {
break;
if (!got_ack) {
PX4_ERR("Ack timeout");
#ifdef LOGGER_PARALLEL_LOGGING

if (_stop_log_request) {
stop_log();
return -2;

} else {
PX4_ERR("Resending..");
_ulog_stream_acked_pub.publish(*ulog_s_p);
}

#else
stop_log();
return -2;
#endif
}
} while (!got_ack && hrt_elapsed_time(&started) / 1000 < timeout_ms);

if (!got_ack) {
PX4_ERR("Ack timeout. Stopping mavlink log");
stop_log();
return -2;
PX4_DEBUG("got ack in %i ms", (int)(hrt_elapsed_time(&started) / 1000));
}

PX4_DEBUG("got ack in %i ms", (int)(hrt_elapsed_time(&started) / 1000));
}

ulog_s_p->msg_sequence++;
Expand Down
Loading

0 comments on commit dbe1996

Please sign in to comment.