Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved early detection of invalid syncs in FusionEngineDecoder. #288

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont
* [Running Examples](#running-examples)
* [Bazel](#bazel)
* [Including In Your Bazel Project](#including-in-your-bazel-project)
* [Compiling From Source](#compiling-from-so
* [Compiling From Source](#compiling-from-source)
* [Running Examples](#running-examples-1)
* [Python](#python)
* [Compiling Documentation](#compiling-documentation)
Expand Down Expand Up @@ -74,11 +74,13 @@ One FusionEngine or a Point One device (Atlas, Quectel LG69T, etc.), please cont
#### Example Applications

The `examples/` directory contains example applications demonstrating how to use this library. They are:
- `message_decode` - Print the contents of messages contained in a binary file.
- `external_cmake_project` - Download a copy of the FusionEngine Client library from the public repository and import
it into a CMake project using `FetchContent`.
- `generate_data` - Generate a binary file containing a fixed set of messages.
- `lband_decode` - Example of decoding RTCM corrections from a recorded file containing LBandFrameMessage.
- `message_decode` - Parse and print the contents of messages contained in a binary file using the `FusionEngineFramer`
class.
- `raw_message_decode` - Parse and print the contents of messages directly from a binary file without a helper class.
- `request_version` - Simulate sending a request for a version info message, and waiting for a response.
- `tcp_client` - Connect to a device over TCP and display the received FusionEngine messages.
- `udp_client` - Connect to a device over UDP and display the received FusionEngine messages.
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_subdirectory(common)
add_subdirectory(generate_data)
add_subdirectory(lband_decode)
add_subdirectory(message_decode)
add_subdirectory(raw_message_decode)
add_subdirectory(request_version)
add_subdirectory(tcp_client)
add_subdirectory(udp_client)
Expand Down
7 changes: 6 additions & 1 deletion examples/message_decode/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ cc_binary(
"message_decode.cc",
],
data = [
"example_data.p1log",
":example_data",
],
deps = [
"//common:print_message",
"@fusion_engine_client",
],
)

filegroup(
name = "example_data",
srcs = ["example_data.p1log"],
)
98 changes: 12 additions & 86 deletions examples/message_decode/message_decode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,13 @@
#include <fstream>

#include <point_one/fusion_engine/messages/core.h>
#include <point_one/fusion_engine/messages/crc.h>
#include <point_one/fusion_engine/parsers/fusion_engine_framer.h>

#include "../common/print_message.h"

using namespace point_one::fusion_engine::examples;
using namespace point_one::fusion_engine::messages;

/******************************************************************************/
bool DecodeMessage(std::ifstream& stream, size_t available_bytes) {
static uint32_t expected_sequence_number = 0;

// Enforce a 4-byte aligned address.
alignas(4) uint8_t storage[4096];
char* buffer = reinterpret_cast<char*>(storage);

// Read the message header.
if (available_bytes < sizeof(MessageHeader)) {
printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n",
available_bytes, sizeof(MessageHeader));
return false;
}

stream.read(buffer, sizeof(MessageHeader));
if (!stream) {
printf("Unexpected error reading header.\n");
return false;
}

available_bytes -= sizeof(MessageHeader);

auto& header = *reinterpret_cast<MessageHeader*>(buffer);
buffer += sizeof(MessageHeader);

// Read the message payload.
if (available_bytes < header.payload_size_bytes) {
printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n",
available_bytes, header.payload_size_bytes);
return false;
}

stream.read(buffer, header.payload_size_bytes);
if (!stream) {
printf("Unexpected error reading payload.\n");
return false;
}

// Verify the message checksum.
size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes;
if (!IsValid(storage)) {
printf(
"CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], "
"sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.sequence_number, header.crc,
CalculateCRC(storage));
return false;
}

// Check that the sequence number increments as expected.
if (header.sequence_number != expected_sequence_number) {
printf(
"Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes "
"(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, "
"received_sequence=%u]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.crc, expected_sequence_number,
header.sequence_number);
}

expected_sequence_number = header.sequence_number + 1;

// Interpret the payload.
PrintMessage(header, buffer);

return true;
}
using namespace point_one::fusion_engine::parsers;

/******************************************************************************/
int main(int argc, const char* argv[]) {
Expand All @@ -104,23 +33,20 @@ Decode platform pose messages from a binary file containing FusionEngine data.
return 1;
}

// Determine the file size.
stream.seekg(0, stream.end);
std::streampos file_size_bytes = stream.tellg();
stream.seekg(0, stream.beg);
// Create a decoder and configure it to print when messaes arrive.
FusionEngineFramer framer(MessageHeader::MAX_MESSAGE_SIZE_BYTES);
framer.SetMessageCallback(PrintMessage);

// Decode all messages in the file.
int return_code = 0;
while (stream.tellg() != file_size_bytes) {
if (!DecodeMessage(stream,
static_cast<size_t>(file_size_bytes - stream.tellg()))) {
return_code = 1;
break;
}
// Read the file in chunks and decode any messages that are found.
uint8_t buffer[4096];
while (!stream.eof()) {
stream.read(reinterpret_cast<char*>(buffer), sizeof(buffer));
size_t bytes_read = stream.gcount();
framer.OnData(buffer, bytes_read);
}

// Close the file.
stream.close();

return return_code;
return 0;
}
15 changes: 15 additions & 0 deletions examples/raw_message_decode/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package(default_visibility = ["//visibility:public"])

cc_binary(
name = "raw_message_decode",
srcs = [
"raw_message_decode.cc",
],
data = [
"//message_decode:example_data",
],
deps = [
"//common:print_message",
"@fusion_engine_client",
],
)
3 changes: 3 additions & 0 deletions examples/raw_message_decode/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_executable(raw_message_decode raw_message_decode.cc)
target_link_libraries(raw_message_decode PUBLIC fusion_engine_client)
target_link_libraries(raw_message_decode PUBLIC print_message)
127 changes: 127 additions & 0 deletions examples/raw_message_decode/raw_message_decode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**************************************************************************/ /**
* @brief Example of decoding FusionEngine messages from a recorded file directly
* without the use of the @ref FusionEngineDecoder class.
* @file
******************************************************************************/

#include <cstdint>
#include <cstdio>
#include <fstream>

#include <point_one/fusion_engine/messages/core.h>
#include <point_one/fusion_engine/messages/crc.h>

#include "../common/print_message.h"

using namespace point_one::fusion_engine::examples;
using namespace point_one::fusion_engine::messages;

/******************************************************************************/
bool DecodeMessage(std::ifstream& stream, size_t available_bytes) {
static uint32_t expected_sequence_number = 0;

// Enforce a 4-byte aligned address.
alignas(4) uint8_t storage[4096];
char* buffer = reinterpret_cast<char*>(storage);

// Read the message header.
if (available_bytes < sizeof(MessageHeader)) {
printf("Not enough data: cannot read header. [%zu bytes < %zu bytes]\n",
available_bytes, sizeof(MessageHeader));
return false;
}

stream.read(buffer, sizeof(MessageHeader));
if (!stream) {
printf("Unexpected error reading header.\n");
return false;
}

available_bytes -= sizeof(MessageHeader);

auto& header = *reinterpret_cast<MessageHeader*>(buffer);
buffer += sizeof(MessageHeader);

// Read the message payload.
if (available_bytes < header.payload_size_bytes) {
printf("Not enough data: cannot read payload. [%zu bytes < %u bytes]\n",
available_bytes, header.payload_size_bytes);
return false;
}

stream.read(buffer, header.payload_size_bytes);
if (!stream) {
printf("Unexpected error reading payload.\n");
return false;
}

// Verify the message checksum.
size_t message_size = sizeof(MessageHeader) + header.payload_size_bytes;
if (!IsValid(storage)) {
printf(
"CRC failure. [type=%s (%u), size=%zu bytes (payload size=%u bytes], "
"sequence=%u, expected_crc=0x%08x, calculated_crc=0x%08x]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.sequence_number, header.crc,
CalculateCRC(storage));
return false;
}

// Check that the sequence number increments as expected.
if (header.sequence_number != expected_sequence_number) {
printf(
"Warning: unexpected sequence number. [type=%s (%u), size=%zu bytes "
"(payload size=%u bytes], crc=0x%08x, expected_sequence=%u, "
"received_sequence=%u]\n",
to_string(header.message_type),
static_cast<unsigned>(header.message_type), message_size,
header.payload_size_bytes, header.crc, expected_sequence_number,
header.sequence_number);
}

expected_sequence_number = header.sequence_number + 1;

// Interpret the payload.
PrintMessage(header, buffer);

return true;
}

/******************************************************************************/
int main(int argc, const char* argv[]) {
if (argc != 2) {
printf("Usage: %s FILE\n", argv[0]);
printf(R"EOF(
Decode platform pose messages from a binary file containing FusionEngine data.
)EOF");
return 0;
}

// Open the file.
std::ifstream stream(argv[1], std::ifstream::binary);
Dismissed Show dismissed Hide dismissed
if (!stream) {
printf("Error opening file '%s'.\n", argv[1]);
return 1;
}

// Determine the file size.
stream.seekg(0, stream.end);
std::streampos file_size_bytes = stream.tellg();
stream.seekg(0, stream.beg);

// Decode all messages in the file.
int return_code = 0;
while (stream.tellg() != file_size_bytes) {
if (!DecodeMessage(stream,
static_cast<size_t>(file_size_bytes - stream.tellg()))) {
return_code = 1;
break;
}
}

// Close the file.
stream.close();

return return_code;
}
13 changes: 9 additions & 4 deletions python/fusion_engine_client/messages/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ class MessageHeader:

SYNC = bytes((SYNC0, SYNC1))

_FORMAT = '<BB2xIBBHIII'
_FORMAT = '<BBHIBBHIII'
_SIZE: int = struct.calcsize(_FORMAT)

_MAX_EXPECTED_SIZE_BYTES = (1 << 24)

def __init__(self, message_type: MessageType = MessageType.INVALID):
self.reserved: int = 0
self.crc: int = 0
self.protocol_version: int = 2
self.sequence_number: int = 0
Expand Down Expand Up @@ -285,13 +286,17 @@ def pack(self, buffer: bytes = None, offset: int = 0, payload: bytes = None, ret

@return A `bytes` object containing the serialized message, or the size of the serialized content (in bytes).
"""
if self.reserved != 0:
self.reserved = 0

# If the payload is specified, set the CRC and payload length, and then append the payload to the returned
# result.
if payload is not None:
self.calculate_crc(payload)

args = (MessageHeader.SYNC0, MessageHeader.SYNC1, self.crc, self.protocol_version, self.message_version,
int(self.message_type), self.sequence_number, self.payload_size_bytes, self.source_identifier)
args = (MessageHeader.SYNC0, MessageHeader.SYNC1, self.reserved, self.crc, self.protocol_version,
self.message_version, int(self.message_type), self.sequence_number, self.payload_size_bytes,
self.source_identifier)
if buffer is None:
buffer = struct.pack(MessageHeader._FORMAT, *args)
if payload is not None:
Expand Down Expand Up @@ -323,7 +328,7 @@ def unpack(self, buffer: bytes, offset: int = 0, validate_sync: bool = False, va

@return The size of the serialized header (in bytes).
"""
(sync0, sync1,
(sync0, sync1, self.reserved,
self.crc, self.protocol_version,
self.message_version, message_type_int,
self.sequence_number, self.payload_size_bytes, self.source_identifier) = \
Expand Down
29 changes: 26 additions & 3 deletions python/fusion_engine_client/parsers/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,33 @@ def on_data(self, data: Union[bytes, int]) -> List[Union[MessageTuple, MessageWi
self._bytes_processed, self._bytes_processed))
self._trace_buffer(self._buffer[:MessageHeader.calcsize()])

if self._header.payload_size_bytes > self._max_payload_len_bytes:
# The reserved bytes in the header are currently always set to 0. If the incoming bytes are not zero,
# assume this an invalid sync.
#
# This may change in the future, but for now it prevents us from needing to collect a ton of bytes
# before performing a CRC check if a bogus header from an invalid sync has a very large payload size
# that happens to still be smaller than the buffer size.
drop_candidate = False
if self._header.reserved != 0:
print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug
print_func('Message payload too big. [payload_size=%d B, max=%d B]',
self._header.payload_size_bytes, self._max_payload_len_bytes)
print_func('Reserved bytes nonzero. Dropping suspected invalid sync. [type=%s, payload_size=%d B, '
'max=%d B]' %
(self._header.get_type_string(), self._header.payload_size_bytes,
self._max_payload_len_bytes))
drop_candidate = True
# If the message is too large to fit in the buffer, we cannot parse it.
#
# If this is an invalid sync, the parsed (invalid) payload length may exceed the buffer size and the
# invalid header will be dropped. If it does not exceed the buffer size, it'll get caught later during
# the CRC check.
elif self._header.payload_size_bytes > self._max_payload_len_bytes:
print_func = _logger.warning if self._warn_on_error == self.WarnOnError.ALL else _logger.debug
print_func('Message payload too big. [type=%s, payload_size=%d B, max=%d B]' %
(self._header.get_type_string(), self._header.payload_size_bytes,
self._max_payload_len_bytes))
drop_candidate = True

if drop_candidate:
self._header = None
self._buffer.pop(0)
self._bytes_processed += 1
Expand Down
Loading
Loading