From 8e23b737b8a24976a816a3a9f9e0ad88cb2f763c Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 30 Jun 2024 11:53:14 -0700 Subject: [PATCH] Batch updates to not overwhelm web socket --- .../C/src/synchrophasors/Synchrophasors.html | 16 ++-- .../C/src/synchrophasors/Synchrophasors.lf | 81 ++++++++++++------- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/examples/C/src/synchrophasors/Synchrophasors.html b/examples/C/src/synchrophasors/Synchrophasors.html index ee20d993..08846703 100644 --- a/examples/C/src/synchrophasors/Synchrophasors.html +++ b/examples/C/src/synchrophasors/Synchrophasors.html @@ -63,16 +63,12 @@

Synchrophasors

try { // console.log('"' + message + '"'); var data = JSON.parse(message); - chart.data.datasets[0] = { - label: 'real part', - data: data.real, - borderWidth: 1 - }; - chart.data.datasets[1] = { - label: 'imaginary part', - data: data.imaginary, - borderWidth: 1 - }; + for (let i = 0; i < data.length; i++) { + if (data[i][0] < chart.data.datasets[0].data.length) { // Safety check. + chart.data.datasets[0].data[data[i][0]] = data[i][1][0] // Real part. + chart.data.datasets[1].data[data[i][0]] = data[i][1][1] // Imaginary part. + } + } chart.update(); } catch(error) { console.error(error); diff --git a/examples/C/src/synchrophasors/Synchrophasors.lf b/examples/C/src/synchrophasors/Synchrophasors.lf index 72a60112..1e2e851a 100644 --- a/examples/C/src/synchrophasors/Synchrophasors.lf +++ b/examples/C/src/synchrophasors/Synchrophasors.lf @@ -82,47 +82,66 @@ reactor PhaseMeasurementUnit( =} } +/** + * Upon receiving inputs on the `phasors` port, construct a JSON string to + * convey the update via a web socket to the observing web page, if one is connected. + * The array has the form `[[channel, [real, imag]], ...]`, where `channel` is the index of the + * multiport input providing new data, and `real` and `imag` are the real and imaginary + * parts of the data. The size of the array is the number of present inputs. + * + * To avoid overwhelming the web socket communication, this reactor accumulates all inputs + * that arrive during a period of physical time given by the `period` parameter before sending + * any data out over the web socket. This can result in some data points being updated more than + * once in a single message. It will also send out data whenever the local buffer fills up. + */ reactor Observer( - // Number of inputs - n: int = 2) { + n: int = 2, // Number of inputs + period: time = 100 ms +) { input[n] phasors: timestamped_complex_t state connected: bool = false + state json: char* = {= NULL =} + state p: char* = {= NULL =} + state last_sent: time = 0 w = new WebSocketServerString( initial_file = {= LF_SOURCE_DIRECTORY LF_FILE_SEPARATOR "Synchrophasors.html" =}) reaction(phasors) -> w.in_dynamic {= - char* json = (char*)malloc(sizeof(char) * (30 * (10 * self->n))); - char* p = json; // pointer to next position to write. - snprintf(p, 10, "{\"real\":["); - p += 9; - for (int n = 0; n < self->n; n++) { - if (n != 0) { - *p++ = ','; - *p++ = ' '; - } - long int height = lround(phasors[n]->value.phasor.real * 100); - int len = snprintf(p, 5, "%3ld", height); - if (len > 0) p += len; // Excluding trailing null terminator. - } - snprintf(p, 16, "],\"imaginary\":["); - p += 15; - for (int n = 0; n < self->n; n++) { - if (n != 0) { - *p++ = ','; - *p++ = ' '; - } - long int height = lround(phasors[n]->value.phasor.imaginary * 100); - int len = snprintf(p, 5, "%3ld", height); - if (len > 0) p += len; // Excluding trailing null terminator. - } - *p++ = ']'; - *p++ = '}'; - *p = '\0'; if (self->connected) { - lf_set(w.in_dynamic, json); + for (int n = 0; n < self->n; n++) { + if (phasors[n]->is_present) { + if (self->json == NULL) { + // Construct array big enough to hold the maximum possible size string, which + // occurs when all inputs are present. The fact that this buffer always has the + // same size should help prevent memory fragmentation because malloc will repeatedly + // allocate the same memory once it has been processed and freed by the + // WebSocketServerString reactor. + self->json = (char*)malloc(sizeof(char) * (3 + (20 * self->n))); + self->p = self->json; // pointer to next position to write. + *self->p++ = '['; + } else { + *self->p++ = ','; + } + long int real = lround(phasors[n]->value.phasor.real * 100); + long int imag = lround(phasors[n]->value.phasor.imaginary * 100); + int len = snprintf(self->p, 18, "[%3d,[%4ld,%4ld]]", n, real, imag); + if (len > 0) self->p += len; // Excluding trailing null terminator. + + interval_t now = lf_time_physical_elapsed(); + if (self->p - self->json > 2 + 20 * (self->n - 1) // Not enough room for another entry + || now - self->last_sent >= self->period // Enough physical time has elapsed. + ) { + self->last_sent = now; + *self->p++ = ']'; + *self->p++ = '\0'; + lf_set(w.in_dynamic, self->json); + // lf_print("%s", self->json); + self->json = NULL; // Mark to reallocate on next arrival. + } + } + } } - // lf_print("%s", json); =} reaction(w.connected) {=