From 53848e30e56a820983446f310886797f758c900c Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Wed, 13 Dec 2023 18:41:33 -0500 Subject: [PATCH 1/5] Added C++ message_decode example using FusionEngineFramer. Renamed the previous example to raw_message_decode. --- README.md | 4 +- examples/CMakeLists.txt | 1 + examples/message_decode/BUILD | 7 +- examples/message_decode/message_decode.cc | 98 ++------------ examples/raw_message_decode/BUILD | 15 +++ examples/raw_message_decode/CMakeLists.txt | 3 + .../raw_message_decode/raw_message_decode.cc | 127 ++++++++++++++++++ 7 files changed, 167 insertions(+), 88 deletions(-) create mode 100644 examples/raw_message_decode/BUILD create mode 100644 examples/raw_message_decode/CMakeLists.txt create mode 100644 examples/raw_message_decode/raw_message_decode.cc diff --git a/README.md b/README.md index 48606a23..e9d5a1f5 100644 --- a/README.md +++ b/README.md @@ -74,11 +74,13 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont #### Example Applications The `examples/` directory contains example applications demonstrating how to use this library. They are: -- `message_decode` - Print the contents of messages contained in a binary file. - `external_cmake_project` - Download a copy of the FusionEngine Client library from the public repository and import it into a CMake project using `FetchContent`. - `generate_data` - Generate a binary file containing a fixed set of messages. - `lband_decode` - Example of decoding RTCM corrections from a recorded file containing LBandFrameMessage. +- `message_decode` - Parse and print the contents of messages contained in a binary file using the `FusionEngineFramer` + class. +- `raw_message_decode` - Parse and print the contents of messages directly from a binary file without a helper class. - `request_version` - Simulate sending a request for a version info message, and waiting for a response. - `tcp_client` - Connect to a device over TCP and display the received FusionEngine messages. - `udp_client` - Connect to a device over UDP and display the received FusionEngine messages. diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 84ffc17b..c633a71d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -3,6 +3,7 @@ add_subdirectory(common) add_subdirectory(generate_data) add_subdirectory(lband_decode) add_subdirectory(message_decode) +add_subdirectory(raw_message_decode) add_subdirectory(request_version) add_subdirectory(tcp_client) add_subdirectory(udp_client) diff --git a/examples/message_decode/BUILD b/examples/message_decode/BUILD index b66d8358..6b506067 100644 --- a/examples/message_decode/BUILD +++ b/examples/message_decode/BUILD @@ -6,10 +6,15 @@ cc_binary( "message_decode.cc", ], data = [ - "example_data.p1log", + ":example_data", ], deps = [ "//common:print_message", "@fusion_engine_client", ], ) + +filegroup( + name = "example_data", + srcs = ["example_data.p1log"], +) diff --git a/examples/message_decode/message_decode.cc b/examples/message_decode/message_decode.cc index 56db32fd..1f234c00 100644 --- a/examples/message_decode/message_decode.cc +++ b/examples/message_decode/message_decode.cc @@ -8,84 +8,13 @@ #include #include -#include +#include #include "../common/print_message.h" using namespace point_one::fusion_engine::examples; using namespace point_one::fusion_engine::messages; - -/******************************************************************************/ -bool DecodeMessage(std::ifstream& stream, size_t available_bytes) { - static uint32_t expected_sequence_number = 0; - - // Enforce a 4-byte aligned address. - alignas(4) uint8_t storage[4096]; - char* buffer = reinterpret_cast(storage); - - // Read the message header. - if (available_bytes < sizeof(MessageHeader)) { - printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n", - available_bytes, sizeof(MessageHeader)); - return false; - } - - stream.read(buffer, sizeof(MessageHeader)); - if (!stream) { - printf("Unexpected error reading header.\n"); - return false; - } - - available_bytes -= sizeof(MessageHeader); - - auto& header = *reinterpret_cast(buffer); - buffer += sizeof(MessageHeader); - - // Read the message payload. - if (available_bytes < header.payload_size_bytes) { - printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n", - available_bytes, header.payload_size_bytes); - return false; - } - - stream.read(buffer, header.payload_size_bytes); - if (!stream) { - printf("Unexpected error reading payload.\n"); - return false; - } - - // Verify the message checksum. - size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes; - if (!IsValid(storage)) { - printf( - "CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], " - "sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n", - to_string(header.message_type), - static_cast(header.message_type), message_size, - header.payload_size_bytes, header.sequence_number, header.crc, - CalculateCRC(storage)); - return false; - } - - // Check that the sequence number increments as expected. - if (header.sequence_number != expected_sequence_number) { - printf( - "Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes " - "(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, " - "received_sequence=%u]\n", - to_string(header.message_type), - static_cast(header.message_type), message_size, - header.payload_size_bytes, header.crc, expected_sequence_number, - header.sequence_number); - } - - expected_sequence_number = header.sequence_number + 1; - - // Interpret the payload. - PrintMessage(header, buffer); - - return true; -} +using namespace point_one::fusion_engine::parsers; /******************************************************************************/ int main(int argc, const char* argv[]) { @@ -104,23 +33,20 @@ Decode platform pose messages from a binary file containing FusionEngine data. return 1; } - // Determine the file size. - stream.seekg(0, stream.end); - std::streampos file_size_bytes = stream.tellg(); - stream.seekg(0, stream.beg); + // Create a decoder and configure it to print when messaes arrive. + FusionEngineFramer framer(MessageHeader::MAX_MESSAGE_SIZE_BYTES); + framer.SetMessageCallback(PrintMessage); - // Decode all messages in the file. - int return_code = 0; - while (stream.tellg() != file_size_bytes) { - if (!DecodeMessage(stream, - static_cast(file_size_bytes - stream.tellg()))) { - return_code = 1; - break; - } + // Read the file in chunks and decode any messages that are found. + uint8_t buffer[4096]; + while (!stream.eof()) { + stream.read(reinterpret_cast(buffer), sizeof(buffer)); + size_t bytes_read = stream.gcount(); + framer.OnData(buffer, bytes_read); } // Close the file. stream.close(); - return return_code; + return 0; } diff --git a/examples/raw_message_decode/BUILD b/examples/raw_message_decode/BUILD new file mode 100644 index 00000000..96af1e4e --- /dev/null +++ b/examples/raw_message_decode/BUILD @@ -0,0 +1,15 @@ +package(default_visibility = ["//visibility:public"]) + +cc_binary( + name = "raw_message_decode", + srcs = [ + "raw_message_decode.cc", + ], + data = [ + "//message_decode:example_data", + ], + deps = [ + "//common:print_message", + "@fusion_engine_client", + ], +) diff --git a/examples/raw_message_decode/CMakeLists.txt b/examples/raw_message_decode/CMakeLists.txt new file mode 100644 index 00000000..b3b98845 --- /dev/null +++ b/examples/raw_message_decode/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(raw_message_decode raw_message_decode.cc) +target_link_libraries(raw_message_decode PUBLIC fusion_engine_client) +target_link_libraries(raw_message_decode PUBLIC print_message) diff --git a/examples/raw_message_decode/raw_message_decode.cc b/examples/raw_message_decode/raw_message_decode.cc new file mode 100644 index 00000000..3f0c9b97 --- /dev/null +++ b/examples/raw_message_decode/raw_message_decode.cc @@ -0,0 +1,127 @@ +/**************************************************************************/ /** +* @brief Example of decoding FusionEngine messages from a recorded file directly +* without the use of the @ref FusionEngineDecoder class. +* @file +******************************************************************************/ + +#include +#include +#include + +#include +#include + +#include "../common/print_message.h" + +using namespace point_one::fusion_engine::examples; +using namespace point_one::fusion_engine::messages; + +/******************************************************************************/ +bool DecodeMessage(std::ifstream& stream, size_t available_bytes) { + static uint32_t expected_sequence_number = 0; + + // Enforce a 4-byte aligned address. + alignas(4) uint8_t storage[4096]; + char* buffer = reinterpret_cast(storage); + + // Read the message header. + if (available_bytes < sizeof(MessageHeader)) { + printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n", + available_bytes, sizeof(MessageHeader)); + return false; + } + + stream.read(buffer, sizeof(MessageHeader)); + if (!stream) { + printf("Unexpected error reading header.\n"); + return false; + } + + available_bytes -= sizeof(MessageHeader); + + auto& header = *reinterpret_cast(buffer); + buffer += sizeof(MessageHeader); + + // Read the message payload. + if (available_bytes < header.payload_size_bytes) { + printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n", + available_bytes, header.payload_size_bytes); + return false; + } + + stream.read(buffer, header.payload_size_bytes); + if (!stream) { + printf("Unexpected error reading payload.\n"); + return false; + } + + // Verify the message checksum. + size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes; + if (!IsValid(storage)) { + printf( + "CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], " + "sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n", + to_string(header.message_type), + static_cast(header.message_type), message_size, + header.payload_size_bytes, header.sequence_number, header.crc, + CalculateCRC(storage)); + return false; + } + + // Check that the sequence number increments as expected. + if (header.sequence_number != expected_sequence_number) { + printf( + "Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes " + "(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, " + "received_sequence=%u]\n", + to_string(header.message_type), + static_cast(header.message_type), message_size, + header.payload_size_bytes, header.crc, expected_sequence_number, + header.sequence_number); + } + + expected_sequence_number = header.sequence_number + 1; + + // Interpret the payload. + PrintMessage(header, buffer); + + return true; +} + +/******************************************************************************/ +int main(int argc, const char* argv[]) { + if (argc != 2) { + printf("Usage: %s FILE\n", argv[0]); + printf(R"EOF( +Decode platform pose messages from a binary file containing FusionEngine data. +)EOF"); + return 0; + } + + // Open the file. + std::ifstream stream(argv[1], std::ifstream::binary); + if (!stream) { + printf("Error opening file '%s'.\n", argv[1]); + return 1; + } + + // Determine the file size. + stream.seekg(0, stream.end); + std::streampos file_size_bytes = stream.tellg(); + stream.seekg(0, stream.beg); + + // Decode all messages in the file. + int return_code = 0; + while (stream.tellg() != file_size_bytes) { + if (!DecodeMessage(stream, + static_cast(file_size_bytes - stream.tellg()))) { + return_code = 1; + break; + } + } + + // Close the file. + stream.close(); + + return return_code; +} From f612c4b27a5f4fb291dc4e422e24ad844e63a405 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Wed, 13 Dec 2023 18:41:40 -0500 Subject: [PATCH 2/5] Fixed TOC link in README. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e9d5a1f5..919f3a31 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont * [Running Examples](#running-examples) * [Bazel](#bazel) * [Including In Your Bazel Project](#including-in-your-bazel-project) - * [Compiling From Source](#compiling-from-so + * [Compiling From Source](#compiling-from-source) * [Running Examples](#running-examples-1) * [Python](#python) * [Compiling Documentation](#compiling-documentation) From 2e762525ff9870df19b9114e37673e14544289b3 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Wed, 13 Dec 2023 18:42:21 -0500 Subject: [PATCH 3/5] Explicitly deserialize the reserved bytes in the MessageHeader. --- python/fusion_engine_client/messages/defs.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/python/fusion_engine_client/messages/defs.py b/python/fusion_engine_client/messages/defs.py index 3c65d6b4..c659c944 100644 --- a/python/fusion_engine_client/messages/defs.py +++ b/python/fusion_engine_client/messages/defs.py @@ -221,12 +221,13 @@ class MessageHeader: SYNC = bytes((SYNC0, SYNC1)) - _FORMAT = ' Date: Wed, 13 Dec 2023 18:43:33 -0500 Subject: [PATCH 4/5] Drop invalid syncs if the reserved bytes are not 0. This may change in the future, but for now we assume that the reserved bytes are always 0. This allows us to detect invalid syncs with extremely large (bogus) payload sizes that would otherwise cause us to capture a lot of data before we could detect an invalid CRC. --- .../fusion_engine_client/parsers/decoder.py | 29 +++++- .../parsers/fusion_engine_framer.cc | 92 ++++++++++++++----- 2 files changed, 93 insertions(+), 28 deletions(-) diff --git a/python/fusion_engine_client/parsers/decoder.py b/python/fusion_engine_client/parsers/decoder.py index efc8107a..83bf0bff 100644 --- a/python/fusion_engine_client/parsers/decoder.py +++ b/python/fusion_engine_client/parsers/decoder.py @@ -166,10 +166,33 @@ def on_data(self, data: Union[bytes, int]) -> List[Union[MessageTuple, MessageWi self._bytes_processed, self._bytes_processed)) self._trace_buffer(self._buffer[:MessageHeader.calcsize()]) - if self._header.payload_size_bytes > self._max_payload_len_bytes: + # The reserved bytes in the header are currently always set to 0. If the incoming bytes are not zero, + # assume this an invalid sync. + # + # This may change in the future, but for now it prevents us from needing to collect a ton of bytes + # before performing a CRC check if a bogus header from an invalid sync has a very large payload size + # that happens to still be smaller than the buffer size. + drop_candidate = False + if self._header.reserved != 0: print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug - print_func('Message payload too big. [payload_size=%d B, max=%d B]', - self._header.payload_size_bytes, self._max_payload_len_bytes) + print_func('Reserved bytes nonzero. Dropping suspected invalid sync. [type=%s, payload_size=%d B, ' + 'max=%d B]' % + (self._header.get_type_string(), self._header.payload_size_bytes, + self._max_payload_len_bytes)) + drop_candidate = True + # If the message is too large to fit in the buffer, we cannot parse it. + # + # If this is an invalid sync, the parsed (invalid) payload length may exceed the buffer size and the + # invalid header will be dropped. If it does not exceed the buffer size, it'll get caught later during + # the CRC check. + elif self._header.payload_size_bytes > self._max_payload_len_bytes: + print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug + print_func('Message payload too big. [type=%s, payload_size=%d B, max=%d B]' % + (self._header.get_type_string(), self._header.payload_size_bytes, + self._max_payload_len_bytes)) + drop_candidate = True + + if drop_candidate: self._header = None self._buffer.pop(0) self._bytes_processed += 1 diff --git a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc index 06502f2c..a106e6f0 100644 --- a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc +++ b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc @@ -253,37 +253,67 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { // Check if the header is complete. if (next_byte_index_ == sizeof(MessageHeader)) { - // Compute the full message size. If the message is too large to fit in - // the buffer, we cannot parse it. Otherwise, start collecting the - // message payload. - // - // Note that while we compute the current_message_size_ here, we - // intentionally do the "too big" check below with the payload size. That - // way we implicitly handle cases where the payload is large enough to - // cause current_message_size_ to overflow. Normally, this won't happen - // for legit packets that are just too big for the user's buffer, but it - // could happen on a bogus header if we find the preamble randomly in an - // incoming byte stream. The buffer capacity is always - // >=sizeof(MessageHeader), so the subtraction will never be negative. auto* header = reinterpret_cast(buffer_); current_message_size_ = sizeof(MessageHeader) + header->payload_size_bytes; VLOG(3) << "Header complete. Waiting for payload. [message=" - << header->message_type << " (" << (unsigned)header->message_type - << "), seq=" << header->sequence_number - << ", payload_size=" << header->payload_size_bytes << " B]"; - if (header->payload_size_bytes <= - capacity_bytes_ - sizeof(MessageHeader)) { - // If there's no payload, do the CRC check now. - if (header->payload_size_bytes == 0) { - VLOG(3) << "Message has no payload. Checking CRC."; - crc_check_needed = true; + << header->message_type << ", seq=" << header->sequence_number + << ", payload_size=" << header->payload_size_bytes + << " B, message_size=" << current_message_size_ << " B]"; + + // Check for size overflow. We don't currently expect to have _extremely_ + // large packets, so this should never happen. If it does, assume this is + // not a valid message, and instead the sync pattern likely showed up at + // random in the data stream. + if (current_message_size_ < header->payload_size_bytes) { + if (quiet) { + VLOG(2) << "Message size overflow. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; + } else { + LOG(WARNING) + << "Message size overflow. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; } - // Otherwise, collect the payload, then do the CRC check. - else { - state_ = State::DATA; + + state_ = State::SYNC0; + return -1; + } + // The reserved bytes in the header are currently always set to 0. If the + // incoming bytes are not zero, assume this an invalid sync. + // + // This may change in the future, but for now it prevents us from + // needing to collect a ton of bytes before performing a CRC check if a + // bogus header from an invalid sync has a very large payload size that + // happens to still be smaller than the buffer size. + else if (header->reserved[0] != 0 || header->reserved[1] != 0) { + if (quiet) { + VLOG(2) << "Reserved bytes nonzero. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; + } else { + LOG(WARNING) << "Message too large for buffer. [size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes + << " B), buffer_capacity=" << capacity_bytes_ + << " B (max_payload=" + << capacity_bytes_ - sizeof(MessageHeader) << " B)]"; } - } else { + + state_ = State::SYNC0; + return -1; + } + // If the message is too large to fit in the buffer, we cannot parse it. + // + // If this is an invalid sync, the parsed (invalid) payload length may + // exceed the buffer size and the invalid header will be dropped. If it + // does not exceed the buffer size, it'll get caught later during the CRC + // check. + else if (current_message_size_ > capacity_bytes_) { if (quiet) { VLOG(2) << "Message too large for buffer. [size=" << current_message_size_ @@ -303,6 +333,18 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { state_ = State::SYNC0; return -1; } + // Sanity checks passed. Start collecting the message payload next. + else { + // If there's no payload, do the CRC check now. + if (header->payload_size_bytes == 0) { + VLOG(3) << "Message has no payload. Checking CRC."; + crc_check_needed = true; + } + // Otherwise, collect the payload, then do the CRC check. + else { + state_ = State::DATA; + } + } } } // Collect the message payload. From bc411231602ca77a7db0926c679a8b7f977ec2a7 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Thu, 14 Dec 2023 12:02:33 -0500 Subject: [PATCH 5/5] Minor review cleanup. --- .../parsers/fusion_engine_framer.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc index a106e6f0..6dc7afe1 100644 --- a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc +++ b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc @@ -261,10 +261,11 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { << ", payload_size=" << header->payload_size_bytes << " B, message_size=" << current_message_size_ << " B]"; - // Check for size overflow. We don't currently expect to have _extremely_ - // large packets, so this should never happen. If it does, assume this is - // not a valid message, and instead the sync pattern likely showed up at - // random in the data stream. + // Check for overflow of the messge size uint32_t variable. We don't + // currently expect to have _extremely_ large packets, so this should + // never happen for a valid message. If it does, assume this is not a + // valid message, and instead the sync pattern likely showed up at random + // in the data stream. if (current_message_size_ < header->payload_size_bytes) { if (quiet) { VLOG(2) << "Message size overflow. Dropping suspected invalid sync. " @@ -296,12 +297,11 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { << current_message_size_ << " B (payload=" << header->payload_size_bytes << " B)]"; } else { - LOG(WARNING) << "Message too large for buffer. [size=" + LOG(WARNING) << "Reserved bytes nonzero. Dropping suspected invalid " + "sync. [size=" << current_message_size_ << " B (payload=" << header->payload_size_bytes - << " B), buffer_capacity=" << capacity_bytes_ - << " B (max_payload=" - << capacity_bytes_ - sizeof(MessageHeader) << " B)]"; + << " B)]"; } state_ = State::SYNC0;