Skip to content

Commit

Permalink
Improve docs and add relative_timestamp option
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Dec 19, 2024
1 parent e170f05 commit f3a809f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
4 changes: 2 additions & 2 deletions examples/C/src/mqtt/MQTTDistributed.lf
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
* message. You can change the `use_physical_time` parameter of the `MQTTSubscriber` to `true` to
* get a (nondeterministic) physical connection, similar to `MQTTPhysical`.
*
* The code generator produces three programs, bin/MQTTDistributed_RTI, bin/MQTTDistributed_source,
* and bin/MQTTDistributed_destination, plus a script bin/MQTTDistributed that runs all three.
* The code generator produces two programs, bin/MQTTDistributed_source, and
* bin/MQTTDistributed_destination, plus a script bin/MQTTDistributed that runs all three.
*
* If the source and destination are running in the same machine, there is no clock synchronization
* error.
Expand Down
13 changes: 10 additions & 3 deletions examples/C/src/mqtt/lib/MQTTPublisher.lf
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
*
* 1. The publisher ensures that the message is null terminated by adding a null terminator if
* needed. This ensures that the message can be treated as a string at the receiving end. 2. The
* publisher appends to the end of the message the current logical time at which the publishing
* occurs.
* publisher appends to the end of the message (after the null terminator) a magic string "LFts"
* followed by the current logical time at which the publishing occurs. If the `relative_timestamp`
* parameter is true (the default is `false`), then the logical time is relative to the start time
* of the program.
*
* This can be useful if the receiving end will be an instance of `MQTTSubscriber` in another Lingua
* Franca program. Note that `include_timestamp` *must* be true if an `MQTTSubcriber` that
* subscribes to this topic has its `use_physical_time` parameter set to false (its default is
* `true`). Otherwise, the subscriber will issue a warning.
*
* @param topic The topic name to which to publish.
* @param address The IP address of the MQTT broker.
* @param include_timestamp If true, then append the current logical time to the message.
* @param relative_timestamp If true, then the timestamp is relative to the start time of the
* program.
* @param timeout Timeout for completion of message sending in milliseconds.
* @see MQTTSubscriber.
*
Expand Down Expand Up @@ -55,6 +61,7 @@ reactor MQTTPublisher(
topic: string = "DefaultTopic",
address: string = "tcp://localhost:1883",
include_timestamp: bool = false,
relative_timestamp: bool = false,
timeout: time = 10 sec) {
preamble {=
// Count of instances of this reactor so that unique client IDs are generated.
Expand Down Expand Up @@ -184,7 +191,7 @@ reactor MQTTPublisher(
}

// Append the current timestamp to the message.
instant_t timestamp = lf_time_logical();
instant_t timestamp = (self->relative_timestamp) ? lf_time_logical_elapsed() : lf_time_logical();
encode_int64(timestamp,
(unsigned char*)(self->inflight.message + length - sizeof(instant_t))
);
Expand Down
67 changes: 40 additions & 27 deletions examples/C/src/mqtt/lib/MQTTSubscriber.lf
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
/**
* Reactor that subscribes to a specified MQTT topic on which string messages are published. The
* timestamp of the output will depend on the use_physical_time parameter and (if present) the
* timestamp of the output will depend on the `use_physical_time` parameter and (if present) the
* timestamp carried by the incoming message.
*
* If `use_physical_time` is `tru`e (the default), then this reactor uses the current physical time
* If `use_physical_time` is `true` (the default), then this reactor uses the current physical time
* when the subscription notification arrives, plus the `offset`, as the desired output timestamp.
* If the incoming message is carrying a timestamp (the publisher is an instance of `MQTTPublisher`
* with `include_timestamp` set to `true), then this reactor measures the *apparent latency* (the
* physical time of arrival minus the timestamp in the message). At shutdown, this reactor will
* report that maximum and average apparent latencies.
* with `include_timestamp` set to `true`), and if `relative_timestamp` is false, then this reactor
* measures the *apparent latency* (the physical time of arrival minus the timestamp in the
* message). At shutdown, this reactor will report that maximum and average apparent latencies.
*
* If `use_physical_time` is `false`, then this reactor extracts the publisher's timestamp from the
* message and adds the specified offset to get the desired output timestamp. If there is no
* timestamp on the incoming message, then this prints a warning and uses physical time. If the
* received timestamp equals current logical time, then a microstep is added. If the desired output
* timestamp is in the past, then a warning will be printed and the tag of the message will be one
* microstep later than the current tag when it arrives.
* message and adds the specified offset to get the desired output timestamp. If
* `relative_timestamp` is true (the default is `false`), then the timestamp is assumed to be
* relative to the start time of the program. Note that the start time of the sending program may
* not match the start time of this program, but this parameter can make it easier to synchronize
* the two programs. If there is no timestamp on the incoming message, then this prints a warning
* and uses physical time. If the resulting desired timestamp equals current logical time, then a
* microstep is added. If the desired output timestamp is in the past, then a warning will be
* printed and the tag of the message will be one microstep later than the current tag when it
* arrives.
*
* Note that if the publisher and subscriber are both Lingua Franca programs, then the communication
* behaves a physical connection if `use_physical_time` is true (the default). The offset is
* equivalent to an `after` delay.
*
* If `use_physical_time` is false, then the communication attempts to behave like a logical
* connection, but this is not always possible. Logical time can advance between when the publisher
* launches a message, sending it to the MQTT broker, and when the subscriber receives it. This may
* make it impossible to match the desired timestamp and will result in warning messages being
* printed.
* behaves like a physical connection if `use_physical_time` is true (the default). The offset is
* then equivalent to an `after` delay. By setting `use_physical_time` to false, you can get more
* control over the synchronization of the two programs.
*
* @param address The IP address of the MQTT broker.
* @param topic The topic name to which to subscribe.
* @param use_physical_time If true, then use physical time (the default).
* @param relative_timestamp If true, then the timestamp is relative to the start time of the
* program.
* @param offset The offset to add to the publisher's timestamp.
* @see MQTTPublisher.
*
Expand Down Expand Up @@ -61,6 +62,7 @@ preamble {=
environment_t* environment;
interval_t offset;
bool use_physical_time;
bool relative_timestamp;
interval_t latencies; // Sum of all observed latencies.
interval_t max_latency;
size_t count;
Expand All @@ -73,6 +75,7 @@ reactor MQTTSubscriber(
address: string = "tcp://localhost:1883",
topic: string = "DefaultTopic",
use_physical_time: bool = true,
relative_timestamp: bool = false,
offset: time = 0) {
preamble {=
// Count of instances of this reactor so that unique client IDs are generated.
Expand All @@ -90,6 +93,8 @@ reactor MQTTSubscriber(
int topic_length, // If 0, strlen(topic_name) can be trusted.
MQTTClient_message *message
) {
instant_t physical_time = lf_time_physical();

// FIXME: This is assuming that the message string
// and topic_name are null terminated. What if they aren't?
// Perhaps force them to be?
Expand All @@ -116,24 +121,32 @@ reactor MQTTSubscriber(
// Is the magic string present?
&& memcmp("LFts", &message->payload[message->payloadlen - sizeof(instant_t) - 4], 4) == 0
) {
my_info->count++;

instant_t timestamp = extract_int64(
(unsigned char*)message->payload + message->payloadlen - sizeof(instant_t)
);
instant_t physical_time = lf_time_physical();

interval_t latency = physical_time - timestamp;
my_info->latencies += latency;
// Measure the latency, unless the timestamp is relative to the start time. In that case,
// it is not a latency.
if (!my_info->relative_timestamp) {
my_info->count++;

if (latency > my_info->max_latency) {
my_info->max_latency = latency;
interval_t latency = physical_time - timestamp;
my_info->latencies += latency;

if (latency > my_info->max_latency) {
my_info->max_latency = latency;
}
}

if (my_info->use_physical_time) {
// Use physical time.
delay = physical_time + offset - current_time;
} else {
// Use logical time.
// If the timestamp is relative to the start time, then we need to add the start time.
if (my_info->relative_timestamp) {
timestamp += lf_time_start();
}
delay = timestamp + offset - current_time;
}
// Schedule the event.
Expand All @@ -150,7 +163,6 @@ reactor MQTTSubscriber(
lf_print_warning("MQTTSubscriber: Received message with no timestamp!");
}
// Use physical time.
instant_t physical_time = lf_time_physical();
delay = physical_time + offset - current_time;

// Schedule the event.
Expand Down Expand Up @@ -204,7 +216,7 @@ reactor MQTTSubscriber(
state client: MQTTClient = {= NULL =}

/** Struct containing the action and offset. */
state info: MQTTSubscriber_info_t = {= {NULL, NULL, 0LL, false, 0LL, 0LL, 0} =}
state info: MQTTSubscriber_info_t = {= {NULL, NULL, 0LL, false, false, 0LL, 0LL, 0} =}

reaction(startup) -> act {=
int rc; // response code.
Expand Down Expand Up @@ -232,6 +244,7 @@ reactor MQTTSubscriber(
self->info.environment = self->base.environment;
self->info.offset = self->offset;
self->info.use_physical_time = self->use_physical_time;
self->info.relative_timestamp = self->relative_timestamp;

// Set up callback functions.
// Last argument should be a pointer to a function to
Expand Down

0 comments on commit f3a809f

Please sign in to comment.