diff --git a/examples/C/src/mqtt/MQTTDistributed.lf b/examples/C/src/mqtt/MQTTDistributed.lf index bdfb90b4..6f74245a 100644 --- a/examples/C/src/mqtt/MQTTDistributed.lf +++ b/examples/C/src/mqtt/MQTTDistributed.lf @@ -9,8 +9,8 @@ * message. You can change the `use_physical_time` parameter of the `MQTTSubscriber` to `true` to * get a (nondeterministic) physical connection, similar to `MQTTPhysical`. * - * The code generator produces three programs, bin/MQTTDistributed_RTI, bin/MQTTDistributed_source, - * and bin/MQTTDistributed_destination, plus a script bin/MQTTDistributed that runs all three. + * The code generator produces two programs, bin/MQTTDistributed_source, and + * bin/MQTTDistributed_destination, plus a script bin/MQTTDistributed that runs all three. * * If the source and destination are running in the same machine, there is no clock synchronization * error. diff --git a/examples/C/src/mqtt/lib/MQTTPublisher.lf b/examples/C/src/mqtt/lib/MQTTPublisher.lf index 9d3ab882..cc46b7b7 100644 --- a/examples/C/src/mqtt/lib/MQTTPublisher.lf +++ b/examples/C/src/mqtt/lib/MQTTPublisher.lf @@ -10,15 +10,21 @@ * * 1. The publisher ensures that the message is null terminated by adding a null terminator if * needed. This ensures that the message can be treated as a string at the receiving end. 2. The - * publisher appends to the end of the message the current logical time at which the publishing - * occurs. + * publisher appends to the end of the message (after the null terminator) a magic string "LFts" + * followed by the current logical time at which the publishing occurs. If the `relative_timestamp` + * parameter is true (the default is `false`), then the logical time is relative to the start time + * of the program. * * This can be useful if the receiving end will be an instance of `MQTTSubscriber` in another Lingua * Franca program. Note that `include_timestamp` *must* be true if an `MQTTSubcriber` that * subscribes to this topic has its `use_physical_time` parameter set to false (its default is * `true`). Otherwise, the subscriber will issue a warning. * + * @param topic The topic name to which to publish. * @param address The IP address of the MQTT broker. + * @param include_timestamp If true, then append the current logical time to the message. + * @param relative_timestamp If true, then the timestamp is relative to the start time of the + * program. * @param timeout Timeout for completion of message sending in milliseconds. * @see MQTTSubscriber. * @@ -55,6 +61,7 @@ reactor MQTTPublisher( topic: string = "DefaultTopic", address: string = "tcp://localhost:1883", include_timestamp: bool = false, + relative_timestamp: bool = false, timeout: time = 10 sec) { preamble {= // Count of instances of this reactor so that unique client IDs are generated. @@ -184,7 +191,7 @@ reactor MQTTPublisher( } // Append the current timestamp to the message. - instant_t timestamp = lf_time_logical(); + instant_t timestamp = (self->relative_timestamp) ? lf_time_logical_elapsed() : lf_time_logical(); encode_int64(timestamp, (unsigned char*)(self->inflight.message + length - sizeof(instant_t)) ); diff --git a/examples/C/src/mqtt/lib/MQTTSubscriber.lf b/examples/C/src/mqtt/lib/MQTTSubscriber.lf index b55d79ce..28782f37 100644 --- a/examples/C/src/mqtt/lib/MQTTSubscriber.lf +++ b/examples/C/src/mqtt/lib/MQTTSubscriber.lf @@ -1,35 +1,36 @@ /** * Reactor that subscribes to a specified MQTT topic on which string messages are published. The - * timestamp of the output will depend on the use_physical_time parameter and (if present) the + * timestamp of the output will depend on the `use_physical_time` parameter and (if present) the * timestamp carried by the incoming message. * - * If `use_physical_time` is `tru`e (the default), then this reactor uses the current physical time + * If `use_physical_time` is `true` (the default), then this reactor uses the current physical time * when the subscription notification arrives, plus the `offset`, as the desired output timestamp. * If the incoming message is carrying a timestamp (the publisher is an instance of `MQTTPublisher` - * with `include_timestamp` set to `true), then this reactor measures the *apparent latency* (the - * physical time of arrival minus the timestamp in the message). At shutdown, this reactor will - * report that maximum and average apparent latencies. + * with `include_timestamp` set to `true`), and if `relative_timestamp` is false, then this reactor + * measures the *apparent latency* (the physical time of arrival minus the timestamp in the + * message). At shutdown, this reactor will report that maximum and average apparent latencies. * * If `use_physical_time` is `false`, then this reactor extracts the publisher's timestamp from the - * message and adds the specified offset to get the desired output timestamp. If there is no - * timestamp on the incoming message, then this prints a warning and uses physical time. If the - * received timestamp equals current logical time, then a microstep is added. If the desired output - * timestamp is in the past, then a warning will be printed and the tag of the message will be one - * microstep later than the current tag when it arrives. + * message and adds the specified offset to get the desired output timestamp. If + * `relative_timestamp` is true (the default is `false`), then the timestamp is assumed to be + * relative to the start time of the program. Note that the start time of the sending program may + * not match the start time of this program, but this parameter can make it easier to synchronize + * the two programs. If there is no timestamp on the incoming message, then this prints a warning + * and uses physical time. If the resulting desired timestamp equals current logical time, then a + * microstep is added. If the desired output timestamp is in the past, then a warning will be + * printed and the tag of the message will be one microstep later than the current tag when it + * arrives. * * Note that if the publisher and subscriber are both Lingua Franca programs, then the communication - * behaves a physical connection if `use_physical_time` is true (the default). The offset is - * equivalent to an `after` delay. - * - * If `use_physical_time` is false, then the communication attempts to behave like a logical - * connection, but this is not always possible. Logical time can advance between when the publisher - * launches a message, sending it to the MQTT broker, and when the subscriber receives it. This may - * make it impossible to match the desired timestamp and will result in warning messages being - * printed. + * behaves like a physical connection if `use_physical_time` is true (the default). The offset is + * then equivalent to an `after` delay. By setting `use_physical_time` to false, you can get more + * control over the synchronization of the two programs. * * @param address The IP address of the MQTT broker. * @param topic The topic name to which to subscribe. * @param use_physical_time If true, then use physical time (the default). + * @param relative_timestamp If true, then the timestamp is relative to the start time of the + * program. * @param offset The offset to add to the publisher's timestamp. * @see MQTTPublisher. * @@ -61,6 +62,7 @@ preamble {= environment_t* environment; interval_t offset; bool use_physical_time; + bool relative_timestamp; interval_t latencies; // Sum of all observed latencies. interval_t max_latency; size_t count; @@ -73,6 +75,7 @@ reactor MQTTSubscriber( address: string = "tcp://localhost:1883", topic: string = "DefaultTopic", use_physical_time: bool = true, + relative_timestamp: bool = false, offset: time = 0) { preamble {= // Count of instances of this reactor so that unique client IDs are generated. @@ -90,6 +93,8 @@ reactor MQTTSubscriber( int topic_length, // If 0, strlen(topic_name) can be trusted. MQTTClient_message *message ) { + instant_t physical_time = lf_time_physical(); + // FIXME: This is assuming that the message string // and topic_name are null terminated. What if they aren't? // Perhaps force them to be? @@ -116,24 +121,32 @@ reactor MQTTSubscriber( // Is the magic string present? && memcmp("LFts", &message->payload[message->payloadlen - sizeof(instant_t) - 4], 4) == 0 ) { - my_info->count++; - instant_t timestamp = extract_int64( (unsigned char*)message->payload + message->payloadlen - sizeof(instant_t) ); - instant_t physical_time = lf_time_physical(); - interval_t latency = physical_time - timestamp; - my_info->latencies += latency; + // Measure the latency, unless the timestamp is relative to the start time. In that case, + // it is not a latency. + if (!my_info->relative_timestamp) { + my_info->count++; - if (latency > my_info->max_latency) { - my_info->max_latency = latency; + interval_t latency = physical_time - timestamp; + my_info->latencies += latency; + + if (latency > my_info->max_latency) { + my_info->max_latency = latency; + } } + if (my_info->use_physical_time) { // Use physical time. delay = physical_time + offset - current_time; } else { // Use logical time. + // If the timestamp is relative to the start time, then we need to add the start time. + if (my_info->relative_timestamp) { + timestamp += lf_time_start(); + } delay = timestamp + offset - current_time; } // Schedule the event. @@ -150,7 +163,6 @@ reactor MQTTSubscriber( lf_print_warning("MQTTSubscriber: Received message with no timestamp!"); } // Use physical time. - instant_t physical_time = lf_time_physical(); delay = physical_time + offset - current_time; // Schedule the event. @@ -204,7 +216,7 @@ reactor MQTTSubscriber( state client: MQTTClient = {= NULL =} /** Struct containing the action and offset. */ - state info: MQTTSubscriber_info_t = {= {NULL, NULL, 0LL, false, 0LL, 0LL, 0} =} + state info: MQTTSubscriber_info_t = {= {NULL, NULL, 0LL, false, false, 0LL, 0LL, 0} =} reaction(startup) -> act {= int rc; // response code. @@ -232,6 +244,7 @@ reactor MQTTSubscriber( self->info.environment = self->base.environment; self->info.offset = self->offset; self->info.use_physical_time = self->use_physical_time; + self->info.relative_timestamp = self->relative_timestamp; // Set up callback functions. // Last argument should be a pointer to a function to