diff --git a/README.md b/README.md index 48606a23..919f3a31 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont * [Running Examples](#running-examples) * [Bazel](#bazel) * [Including In Your Bazel Project](#including-in-your-bazel-project) - * [Compiling From Source](#compiling-from-so + * [Compiling From Source](#compiling-from-source) * [Running Examples](#running-examples-1) * [Python](#python) * [Compiling Documentation](#compiling-documentation) @@ -74,11 +74,13 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont #### Example Applications The `examples/` directory contains example applications demonstrating how to use this library. They are: -- `message_decode` - Print the contents of messages contained in a binary file. - `external_cmake_project` - Download a copy of the FusionEngine Client library from the public repository and import it into a CMake project using `FetchContent`. - `generate_data` - Generate a binary file containing a fixed set of messages. - `lband_decode` - Example of decoding RTCM corrections from a recorded file containing LBandFrameMessage. +- `message_decode` - Parse and print the contents of messages contained in a binary file using the `FusionEngineFramer` + class. +- `raw_message_decode` - Parse and print the contents of messages directly from a binary file without a helper class. - `request_version` - Simulate sending a request for a version info message, and waiting for a response. - `tcp_client` - Connect to a device over TCP and display the received FusionEngine messages. - `udp_client` - Connect to a device over UDP and display the received FusionEngine messages. diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 84ffc17b..c633a71d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -3,6 +3,7 @@ add_subdirectory(common) add_subdirectory(generate_data) add_subdirectory(lband_decode) add_subdirectory(message_decode) +add_subdirectory(raw_message_decode) add_subdirectory(request_version) add_subdirectory(tcp_client) add_subdirectory(udp_client) diff --git a/examples/message_decode/BUILD b/examples/message_decode/BUILD index b66d8358..6b506067 100644 --- a/examples/message_decode/BUILD +++ b/examples/message_decode/BUILD @@ -6,10 +6,15 @@ cc_binary( "message_decode.cc", ], data = [ - "example_data.p1log", + ":example_data", ], deps = [ "//common:print_message", "@fusion_engine_client", ], ) + +filegroup( + name = "example_data", + srcs = ["example_data.p1log"], +) diff --git a/examples/message_decode/message_decode.cc b/examples/message_decode/message_decode.cc index 56db32fd..1f234c00 100644 --- a/examples/message_decode/message_decode.cc +++ b/examples/message_decode/message_decode.cc @@ -8,84 +8,13 @@ #include #include -#include +#include #include "../common/print_message.h" using namespace point_one::fusion_engine::examples; using namespace point_one::fusion_engine::messages; - -/******************************************************************************/ -bool DecodeMessage(std::ifstream& stream, size_t available_bytes) { - static uint32_t expected_sequence_number = 0; - - // Enforce a 4-byte aligned address. - alignas(4) uint8_t storage[4096]; - char* buffer = reinterpret_cast(storage); - - // Read the message header. - if (available_bytes < sizeof(MessageHeader)) { - printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n", - available_bytes, sizeof(MessageHeader)); - return false; - } - - stream.read(buffer, sizeof(MessageHeader)); - if (!stream) { - printf("Unexpected error reading header.\n"); - return false; - } - - available_bytes -= sizeof(MessageHeader); - - auto& header = *reinterpret_cast(buffer); - buffer += sizeof(MessageHeader); - - // Read the message payload. - if (available_bytes < header.payload_size_bytes) { - printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n", - available_bytes, header.payload_size_bytes); - return false; - } - - stream.read(buffer, header.payload_size_bytes); - if (!stream) { - printf("Unexpected error reading payload.\n"); - return false; - } - - // Verify the message checksum. - size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes; - if (!IsValid(storage)) { - printf( - "CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], " - "sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n", - to_string(header.message_type), - static_cast(header.message_type), message_size, - header.payload_size_bytes, header.sequence_number, header.crc, - CalculateCRC(storage)); - return false; - } - - // Check that the sequence number increments as expected. - if (header.sequence_number != expected_sequence_number) { - printf( - "Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes " - "(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, " - "received_sequence=%u]\n", - to_string(header.message_type), - static_cast(header.message_type), message_size, - header.payload_size_bytes, header.crc, expected_sequence_number, - header.sequence_number); - } - - expected_sequence_number = header.sequence_number + 1; - - // Interpret the payload. - PrintMessage(header, buffer); - - return true; -} +using namespace point_one::fusion_engine::parsers; /******************************************************************************/ int main(int argc, const char* argv[]) { @@ -104,23 +33,20 @@ Decode platform pose messages from a binary file containing FusionEngine data. return 1; } - // Determine the file size. - stream.seekg(0, stream.end); - std::streampos file_size_bytes = stream.tellg(); - stream.seekg(0, stream.beg); + // Create a decoder and configure it to print when messaes arrive. + FusionEngineFramer framer(MessageHeader::MAX_MESSAGE_SIZE_BYTES); + framer.SetMessageCallback(PrintMessage); - // Decode all messages in the file. - int return_code = 0; - while (stream.tellg() != file_size_bytes) { - if (!DecodeMessage(stream, - static_cast(file_size_bytes - stream.tellg()))) { - return_code = 1; - break; - } + // Read the file in chunks and decode any messages that are found. + uint8_t buffer[4096]; + while (!stream.eof()) { + stream.read(reinterpret_cast(buffer), sizeof(buffer)); + size_t bytes_read = stream.gcount(); + framer.OnData(buffer, bytes_read); } // Close the file. stream.close(); - return return_code; + return 0; } diff --git a/examples/raw_message_decode/BUILD b/examples/raw_message_decode/BUILD new file mode 100644 index 00000000..96af1e4e --- /dev/null +++ b/examples/raw_message_decode/BUILD @@ -0,0 +1,15 @@ +package(default_visibility = ["//visibility:public"]) + +cc_binary( + name = "raw_message_decode", + srcs = [ + "raw_message_decode.cc", + ], + data = [ + "//message_decode:example_data", + ], + deps = [ + "//common:print_message", + "@fusion_engine_client", + ], +) diff --git a/examples/raw_message_decode/CMakeLists.txt b/examples/raw_message_decode/CMakeLists.txt new file mode 100644 index 00000000..b3b98845 --- /dev/null +++ b/examples/raw_message_decode/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(raw_message_decode raw_message_decode.cc) +target_link_libraries(raw_message_decode PUBLIC fusion_engine_client) +target_link_libraries(raw_message_decode PUBLIC print_message) diff --git a/examples/raw_message_decode/raw_message_decode.cc b/examples/raw_message_decode/raw_message_decode.cc new file mode 100644 index 00000000..3f0c9b97 --- /dev/null +++ b/examples/raw_message_decode/raw_message_decode.cc @@ -0,0 +1,127 @@ +/**************************************************************************/ /** +* @brief Example of decoding FusionEngine messages from a recorded file directly +* without the use of the @ref FusionEngineDecoder class. +* @file +******************************************************************************/ + +#include +#include +#include + +#include +#include + +#include "../common/print_message.h" + +using namespace point_one::fusion_engine::examples; +using namespace point_one::fusion_engine::messages; + +/******************************************************************************/ +bool DecodeMessage(std::ifstream& stream, size_t available_bytes) { + static uint32_t expected_sequence_number = 0; + + // Enforce a 4-byte aligned address. + alignas(4) uint8_t storage[4096]; + char* buffer = reinterpret_cast(storage); + + // Read the message header. + if (available_bytes < sizeof(MessageHeader)) { + printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n", + available_bytes, sizeof(MessageHeader)); + return false; + } + + stream.read(buffer, sizeof(MessageHeader)); + if (!stream) { + printf("Unexpected error reading header.\n"); + return false; + } + + available_bytes -= sizeof(MessageHeader); + + auto& header = *reinterpret_cast(buffer); + buffer += sizeof(MessageHeader); + + // Read the message payload. + if (available_bytes < header.payload_size_bytes) { + printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n", + available_bytes, header.payload_size_bytes); + return false; + } + + stream.read(buffer, header.payload_size_bytes); + if (!stream) { + printf("Unexpected error reading payload.\n"); + return false; + } + + // Verify the message checksum. + size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes; + if (!IsValid(storage)) { + printf( + "CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], " + "sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n", + to_string(header.message_type), + static_cast(header.message_type), message_size, + header.payload_size_bytes, header.sequence_number, header.crc, + CalculateCRC(storage)); + return false; + } + + // Check that the sequence number increments as expected. + if (header.sequence_number != expected_sequence_number) { + printf( + "Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes " + "(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, " + "received_sequence=%u]\n", + to_string(header.message_type), + static_cast(header.message_type), message_size, + header.payload_size_bytes, header.crc, expected_sequence_number, + header.sequence_number); + } + + expected_sequence_number = header.sequence_number + 1; + + // Interpret the payload. + PrintMessage(header, buffer); + + return true; +} + +/******************************************************************************/ +int main(int argc, const char* argv[]) { + if (argc != 2) { + printf("Usage: %s FILE\n", argv[0]); + printf(R"EOF( +Decode platform pose messages from a binary file containing FusionEngine data. +)EOF"); + return 0; + } + + // Open the file. + std::ifstream stream(argv[1], std::ifstream::binary); + if (!stream) { + printf("Error opening file '%s'.\n", argv[1]); + return 1; + } + + // Determine the file size. + stream.seekg(0, stream.end); + std::streampos file_size_bytes = stream.tellg(); + stream.seekg(0, stream.beg); + + // Decode all messages in the file. + int return_code = 0; + while (stream.tellg() != file_size_bytes) { + if (!DecodeMessage(stream, + static_cast(file_size_bytes - stream.tellg()))) { + return_code = 1; + break; + } + } + + // Close the file. + stream.close(); + + return return_code; +} diff --git a/python/fusion_engine_client/messages/defs.py b/python/fusion_engine_client/messages/defs.py index 3c65d6b4..c659c944 100644 --- a/python/fusion_engine_client/messages/defs.py +++ b/python/fusion_engine_client/messages/defs.py @@ -221,12 +221,13 @@ class MessageHeader: SYNC = bytes((SYNC0, SYNC1)) - _FORMAT = ' List[Union[MessageTuple, MessageWi self._bytes_processed, self._bytes_processed)) self._trace_buffer(self._buffer[:MessageHeader.calcsize()]) - if self._header.payload_size_bytes > self._max_payload_len_bytes: + # The reserved bytes in the header are currently always set to 0. If the incoming bytes are not zero, + # assume this an invalid sync. + # + # This may change in the future, but for now it prevents us from needing to collect a ton of bytes + # before performing a CRC check if a bogus header from an invalid sync has a very large payload size + # that happens to still be smaller than the buffer size. + drop_candidate = False + if self._header.reserved != 0: print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug - print_func('Message payload too big. [payload_size=%d B, max=%d B]', - self._header.payload_size_bytes, self._max_payload_len_bytes) + print_func('Reserved bytes nonzero. Dropping suspected invalid sync. [type=%s, payload_size=%d B, ' + 'max=%d B]' % + (self._header.get_type_string(), self._header.payload_size_bytes, + self._max_payload_len_bytes)) + drop_candidate = True + # If the message is too large to fit in the buffer, we cannot parse it. + # + # If this is an invalid sync, the parsed (invalid) payload length may exceed the buffer size and the + # invalid header will be dropped. If it does not exceed the buffer size, it'll get caught later during + # the CRC check. + elif self._header.payload_size_bytes > self._max_payload_len_bytes: + print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug + print_func('Message payload too big. [type=%s, payload_size=%d B, max=%d B]' % + (self._header.get_type_string(), self._header.payload_size_bytes, + self._max_payload_len_bytes)) + drop_candidate = True + + if drop_candidate: self._header = None self._buffer.pop(0) self._bytes_processed += 1 diff --git a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc index 06502f2c..6dc7afe1 100644 --- a/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc +++ b/src/point_one/fusion_engine/parsers/fusion_engine_framer.cc @@ -253,37 +253,67 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { // Check if the header is complete. if (next_byte_index_ == sizeof(MessageHeader)) { - // Compute the full message size. If the message is too large to fit in - // the buffer, we cannot parse it. Otherwise, start collecting the - // message payload. - // - // Note that while we compute the current_message_size_ here, we - // intentionally do the "too big" check below with the payload size. That - // way we implicitly handle cases where the payload is large enough to - // cause current_message_size_ to overflow. Normally, this won't happen - // for legit packets that are just too big for the user's buffer, but it - // could happen on a bogus header if we find the preamble randomly in an - // incoming byte stream. The buffer capacity is always - // >=sizeof(MessageHeader), so the subtraction will never be negative. auto* header = reinterpret_cast(buffer_); current_message_size_ = sizeof(MessageHeader) + header->payload_size_bytes; VLOG(3) << "Header complete. Waiting for payload. [message=" - << header->message_type << " (" << (unsigned)header->message_type - << "), seq=" << header->sequence_number - << ", payload_size=" << header->payload_size_bytes << " B]"; - if (header->payload_size_bytes <= - capacity_bytes_ - sizeof(MessageHeader)) { - // If there's no payload, do the CRC check now. - if (header->payload_size_bytes == 0) { - VLOG(3) << "Message has no payload. Checking CRC."; - crc_check_needed = true; + << header->message_type << ", seq=" << header->sequence_number + << ", payload_size=" << header->payload_size_bytes + << " B, message_size=" << current_message_size_ << " B]"; + + // Check for overflow of the messge size uint32_t variable. We don't + // currently expect to have _extremely_ large packets, so this should + // never happen for a valid message. If it does, assume this is not a + // valid message, and instead the sync pattern likely showed up at random + // in the data stream. + if (current_message_size_ < header->payload_size_bytes) { + if (quiet) { + VLOG(2) << "Message size overflow. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; + } else { + LOG(WARNING) + << "Message size overflow. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; } - // Otherwise, collect the payload, then do the CRC check. - else { - state_ = State::DATA; + + state_ = State::SYNC0; + return -1; + } + // The reserved bytes in the header are currently always set to 0. If the + // incoming bytes are not zero, assume this an invalid sync. + // + // This may change in the future, but for now it prevents us from + // needing to collect a ton of bytes before performing a CRC check if a + // bogus header from an invalid sync has a very large payload size that + // happens to still be smaller than the buffer size. + else if (header->reserved[0] != 0 || header->reserved[1] != 0) { + if (quiet) { + VLOG(2) << "Reserved bytes nonzero. Dropping suspected invalid sync. " + "[size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes << " B)]"; + } else { + LOG(WARNING) << "Reserved bytes nonzero. Dropping suspected invalid " + "sync. [size=" + << current_message_size_ + << " B (payload=" << header->payload_size_bytes + << " B)]"; } - } else { + + state_ = State::SYNC0; + return -1; + } + // If the message is too large to fit in the buffer, we cannot parse it. + // + // If this is an invalid sync, the parsed (invalid) payload length may + // exceed the buffer size and the invalid header will be dropped. If it + // does not exceed the buffer size, it'll get caught later during the CRC + // check. + else if (current_message_size_ > capacity_bytes_) { if (quiet) { VLOG(2) << "Message too large for buffer. [size=" << current_message_size_ @@ -303,6 +333,18 @@ int32_t FusionEngineFramer::OnByte(bool quiet) { state_ = State::SYNC0; return -1; } + // Sanity checks passed. Start collecting the message payload next. + else { + // If there's no payload, do the CRC check now. + if (header->payload_size_bytes == 0) { + VLOG(3) << "Message has no payload. Checking CRC."; + crc_check_needed = true; + } + // Otherwise, collect the payload, then do the CRC check. + else { + state_ = State::DATA; + } + } } } // Collect the message payload.