Skip to content

Commit

Permalink
Improved early detection of invalid syncs in FusionEngineDecoder. (#…
Browse files Browse the repository at this point in the history
…288)

# Changes
- Added C++ `message_decode` example app using `FusionEngineFramer` class, and renamed previous app to `raw_message_decode`

# Fixes
- Use the header reserved bytes to detect invalid syncs early when the (bogus) payload length is very large but smaller than the max packet size
  • Loading branch information
adamshapiro0 authored Dec 14, 2023
2 parents cf6d76c + bc41123 commit 187ed5a
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 121 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont
* [Running Examples](#running-examples)
* [Bazel](#bazel)
* [Including In Your Bazel Project](#including-in-your-bazel-project)
* [Compiling From Source](#compiling-from-so
* [Compiling From Source](#compiling-from-source)
* [Running Examples](#running-examples-1)
* [Python](#python)
* [Compiling Documentation](#compiling-documentation)
Expand Down Expand Up @@ -74,11 +74,13 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont
#### Example Applications

The `examples/` directory contains example applications demonstrating how to use this library. They are:
- `message_decode` - Print the contents of messages contained in a binary file.
- `external_cmake_project` - Download a copy of the FusionEngine Client library from the public repository and import
it into a CMake project using `FetchContent`.
- `generate_data` - Generate a binary file containing a fixed set of messages.
- `lband_decode` - Example of decoding RTCM corrections from a recorded file containing LBandFrameMessage.
- `message_decode` - Parse and print the contents of messages contained in a binary file using the `FusionEngineFramer`
class.
- `raw_message_decode` - Parse and print the contents of messages directly from a binary file without a helper class.
- `request_version` - Simulate sending a request for a version info message, and waiting for a response.
- `tcp_client` - Connect to a device over TCP and display the received FusionEngine messages.
- `udp_client` - Connect to a device over UDP and display the received FusionEngine messages.
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_subdirectory(common)
add_subdirectory(generate_data)
add_subdirectory(lband_decode)
add_subdirectory(message_decode)
add_subdirectory(raw_message_decode)
add_subdirectory(request_version)
add_subdirectory(tcp_client)
add_subdirectory(udp_client)
Expand Down
7 changes: 6 additions & 1 deletion examples/message_decode/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ cc_binary(
"message_decode.cc",
],
data = [
"example_data.p1log",
":example_data",
],
deps = [
"//common:print_message",
"@fusion_engine_client",
],
)

filegroup(
name = "example_data",
srcs = ["example_data.p1log"],
)
98 changes: 12 additions & 86 deletions examples/message_decode/message_decode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,13 @@
#include <fstream>

#include <point_one/fusion_engine/messages/core.h>
#include <point_one/fusion_engine/messages/crc.h>
#include <point_one/fusion_engine/parsers/fusion_engine_framer.h>

#include "../common/print_message.h"

using namespace point_one::fusion_engine::examples;
using namespace point_one::fusion_engine::messages;

/******************************************************************************/
bool DecodeMessage(std::ifstream& stream, size_t available_bytes) {
static uint32_t expected_sequence_number = 0;

// Enforce a 4-byte aligned address.
alignas(4) uint8_t storage[4096];
char* buffer = reinterpret_cast<char*>(storage);

// Read the message header.
if (available_bytes < sizeof(MessageHeader)) {
printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n",
available_bytes, sizeof(MessageHeader));
return false;
}

stream.read(buffer, sizeof(MessageHeader));
if (!stream) {
printf("Unexpected error reading header.\n");
return false;
}

available_bytes -= sizeof(MessageHeader);

auto& header = *reinterpret_cast<MessageHeader*>(buffer);
buffer += sizeof(MessageHeader);

// Read the message payload.
if (available_bytes < header.payload_size_bytes) {
printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n",
available_bytes, header.payload_size_bytes);
return false;
}

stream.read(buffer, header.payload_size_bytes);
if (!stream) {
printf("Unexpected error reading payload.\n");
return false;
}

// Verify the message checksum.
size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes;
if (!IsValid(storage)) {
printf(
"CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], "
"sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.sequence_number, header.crc,
CalculateCRC(storage));
return false;
}

// Check that the sequence number increments as expected.
if (header.sequence_number != expected_sequence_number) {
printf(
"Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes "
"(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, "
"received_sequence=%u]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.crc, expected_sequence_number,
header.sequence_number);
}

expected_sequence_number = header.sequence_number + 1;

// Interpret the payload.
PrintMessage(header, buffer);

return true;
}
using namespace point_one::fusion_engine::parsers;

/******************************************************************************/
int main(int argc, const char* argv[]) {
Expand All @@ -104,23 +33,20 @@ Decode platform pose messages from a binary file containing FusionEngine data.
return 1;
}

// Determine the file size.
stream.seekg(0, stream.end);
std::streampos file_size_bytes = stream.tellg();
stream.seekg(0, stream.beg);
// Create a decoder and configure it to print when messaes arrive.
FusionEngineFramer framer(MessageHeader::MAX_MESSAGE_SIZE_BYTES);
framer.SetMessageCallback(PrintMessage);

// Decode all messages in the file.
int return_code = 0;
while (stream.tellg() != file_size_bytes) {
if (!DecodeMessage(stream,
static_cast<size_t>(file_size_bytes - stream.tellg()))) {
return_code = 1;
break;
}
// Read the file in chunks and decode any messages that are found.
uint8_t buffer[4096];
while (!stream.eof()) {
stream.read(reinterpret_cast<char*>(buffer), sizeof(buffer));
size_t bytes_read = stream.gcount();
framer.OnData(buffer, bytes_read);
}

// Close the file.
stream.close();

return return_code;
return 0;
}
15 changes: 15 additions & 0 deletions examples/raw_message_decode/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package(default_visibility = ["//visibility:public"])

cc_binary(
name = "raw_message_decode",
srcs = [
"raw_message_decode.cc",
],
data = [
"//message_decode:example_data",
],
deps = [
"//common:print_message",
"@fusion_engine_client",
],
)
3 changes: 3 additions & 0 deletions examples/raw_message_decode/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_executable(raw_message_decode raw_message_decode.cc)
target_link_libraries(raw_message_decode PUBLIC fusion_engine_client)
target_link_libraries(raw_message_decode PUBLIC print_message)
127 changes: 127 additions & 0 deletions examples/raw_message_decode/raw_message_decode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**************************************************************************/ /**
* @brief Example of decoding FusionEngine messages from a recorded file directly
* without the use of the @ref FusionEngineDecoder class.
* @file
******************************************************************************/

#include <cstdint>
#include <cstdio>
#include <fstream>

#include <point_one/fusion_engine/messages/core.h>
#include <point_one/fusion_engine/messages/crc.h>

#include "../common/print_message.h"

using namespace point_one::fusion_engine::examples;
using namespace point_one::fusion_engine::messages;

/******************************************************************************/
bool DecodeMessage(std::ifstream& stream, size_t available_bytes) {
static uint32_t expected_sequence_number = 0;

// Enforce a 4-byte aligned address.
alignas(4) uint8_t storage[4096];
char* buffer = reinterpret_cast<char*>(storage);

// Read the message header.
if (available_bytes < sizeof(MessageHeader)) {
printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n",
available_bytes, sizeof(MessageHeader));
return false;
}

stream.read(buffer, sizeof(MessageHeader));
if (!stream) {
printf("Unexpected error reading header.\n");
return false;
}

available_bytes -= sizeof(MessageHeader);

auto& header = *reinterpret_cast<MessageHeader*>(buffer);
buffer += sizeof(MessageHeader);

// Read the message payload.
if (available_bytes < header.payload_size_bytes) {
printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n",
available_bytes, header.payload_size_bytes);
return false;
}

stream.read(buffer, header.payload_size_bytes);
if (!stream) {
printf("Unexpected error reading payload.\n");
return false;
}

// Verify the message checksum.
size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes;
if (!IsValid(storage)) {
printf(
"CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], "
"sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.sequence_number, header.crc,
CalculateCRC(storage));
return false;
}

// Check that the sequence number increments as expected.
if (header.sequence_number != expected_sequence_number) {
printf(
"Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes "
"(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, "
"received_sequence=%u]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.crc, expected_sequence_number,
header.sequence_number);
}

expected_sequence_number = header.sequence_number + 1;

// Interpret the payload.
PrintMessage(header, buffer);

return true;
}

/******************************************************************************/
int main(int argc, const char* argv[]) {
if (argc != 2) {
printf("Usage: %s FILE\n", argv[0]);
printf(R"EOF(
Decode platform pose messages from a binary file containing FusionEngine data.
)EOF");
return 0;
}

// Open the file.
std::ifstream stream(argv[1], std::ifstream::binary);
if (!stream) {
printf("Error opening file '%s'.\n", argv[1]);
return 1;
}

// Determine the file size.
stream.seekg(0, stream.end);
std::streampos file_size_bytes = stream.tellg();
stream.seekg(0, stream.beg);

// Decode all messages in the file.
int return_code = 0;
while (stream.tellg() != file_size_bytes) {
if (!DecodeMessage(stream,
static_cast<size_t>(file_size_bytes - stream.tellg()))) {
return_code = 1;
break;
}
}

// Close the file.
stream.close();

return return_code;
}
13 changes: 9 additions & 4 deletions python/fusion_engine_client/messages/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ class MessageHeader:

SYNC = bytes((SYNC0, SYNC1))

_FORMAT = '<BB2xIBBHIII'
_FORMAT = '<BBHIBBHIII'
_SIZE: int = struct.calcsize(_FORMAT)

_MAX_EXPECTED_SIZE_BYTES = (1 << 24)

def __init__(self, message_type: MessageType = MessageType.INVALID):
self.reserved: int = 0
self.crc: int = 0
self.protocol_version: int = 2
self.sequence_number: int = 0
Expand Down Expand Up @@ -285,13 +286,17 @@ def pack(self, buffer: bytes = None, offset: int = 0, payload: bytes = None, ret
@return A `bytes` object containing the serialized message, or the size of the serialized content (in bytes).
"""
if self.reserved != 0:
self.reserved = 0

# If the payload is specified, set the CRC and payload length, and then append the payload to the returned
# result.
if payload is not None:
self.calculate_crc(payload)

args = (MessageHeader.SYNC0, MessageHeader.SYNC1, self.crc, self.protocol_version, self.message_version,
int(self.message_type), self.sequence_number, self.payload_size_bytes, self.source_identifier)
args = (MessageHeader.SYNC0, MessageHeader.SYNC1, self.reserved, self.crc, self.protocol_version,
self.message_version, int(self.message_type), self.sequence_number, self.payload_size_bytes,
self.source_identifier)
if buffer is None:
buffer = struct.pack(MessageHeader._FORMAT, *args)
if payload is not None:
Expand Down Expand Up @@ -323,7 +328,7 @@ def unpack(self, buffer: bytes, offset: int = 0, validate_sync: bool = False, va
@return The size of the serialized header (in bytes).
"""
(sync0, sync1,
(sync0, sync1, self.reserved,
self.crc, self.protocol_version,
self.message_version, message_type_int,
self.sequence_number, self.payload_size_bytes, self.source_identifier) = \
Expand Down
29 changes: 26 additions & 3 deletions python/fusion_engine_client/parsers/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,33 @@ def on_data(self, data: Union[bytes, int]) -> List[Union[MessageTuple, MessageWi
self._bytes_processed, self._bytes_processed))
self._trace_buffer(self._buffer[:MessageHeader.calcsize()])

if self._header.payload_size_bytes > self._max_payload_len_bytes:
# The reserved bytes in the header are currently always set to 0. If the incoming bytes are not zero,
# assume this an invalid sync.
#
# This may change in the future, but for now it prevents us from needing to collect a ton of bytes
# before performing a CRC check if a bogus header from an invalid sync has a very large payload size
# that happens to still be smaller than the buffer size.
drop_candidate = False
if self._header.reserved != 0:
print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug
print_func('Message payload too big. [payload_size=%d B, max=%d B]',
self._header.payload_size_bytes, self._max_payload_len_bytes)
print_func('Reserved bytes nonzero. Dropping suspected invalid sync. [type=%s, payload_size=%d B, '
'max=%d B]' %
(self._header.get_type_string(), self._header.payload_size_bytes,
self._max_payload_len_bytes))
drop_candidate = True
# If the message is too large to fit in the buffer, we cannot parse it.
#
# If this is an invalid sync, the parsed (invalid) payload length may exceed the buffer size and the
# invalid header will be dropped. If it does not exceed the buffer size, it'll get caught later during
# the CRC check.
elif self._header.payload_size_bytes > self._max_payload_len_bytes:
print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug
print_func('Message payload too big. [type=%s, payload_size=%d B, max=%d B]' %
(self._header.get_type_string(), self._header.payload_size_bytes,
self._max_payload_len_bytes))
drop_candidate = True

if drop_candidate:
self._header = None
self._buffer.pop(0)
self._bytes_processed += 1
Expand Down
Loading

0 comments on commit 187ed5a

Please sign in to comment.