Skip to content

Commit

Permalink
Batch updates to not overwhelm web socket
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Jun 30, 2024
1 parent d6d2057 commit 8e23b73
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
16 changes: 6 additions & 10 deletions examples/C/src/synchrophasors/Synchrophasors.html
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,12 @@ <h1>Synchrophasors</h1>
try {
// console.log('"' + message + '"');
var data = JSON.parse(message);
chart.data.datasets[0] = {
label: 'real part',
data: data.real,
borderWidth: 1
};
chart.data.datasets[1] = {
label: 'imaginary part',
data: data.imaginary,
borderWidth: 1
};
for (let i = 0; i < data.length; i++) {
if (data[i][0] < chart.data.datasets[0].data.length) { // Safety check.
chart.data.datasets[0].data[data[i][0]] = data[i][1][0] // Real part.
chart.data.datasets[1].data[data[i][0]] = data[i][1][1] // Imaginary part.
}
}
chart.update();
} catch(error) {
console.error(error);
Expand Down
81 changes: 50 additions & 31 deletions examples/C/src/synchrophasors/Synchrophasors.lf
Original file line number Diff line number Diff line change
Expand Up @@ -82,47 +82,66 @@ reactor PhaseMeasurementUnit(
=}
}

/**
* Upon receiving inputs on the `phasors` port, construct a JSON string to
* convey the update via a web socket to the observing web page, if one is connected.
* The array has the form `[[channel, [real, imag]], ...]`, where `channel` is the index of the
* multiport input providing new data, and `real` and `imag` are the real and imaginary
* parts of the data. The size of the array is the number of present inputs.
*
* To avoid overwhelming the web socket communication, this reactor accumulates all inputs
* that arrive during a period of physical time given by the `period` parameter before sending
* any data out over the web socket. This can result in some data points being updated more than
* once in a single message. It will also send out data whenever the local buffer fills up.
*/
reactor Observer(
// Number of inputs
n: int = 2) {
n: int = 2, // Number of inputs
period: time = 100 ms
) {
input[n] phasors: timestamped_complex_t
state connected: bool = false
state json: char* = {= NULL =}
state p: char* = {= NULL =}
state last_sent: time = 0

w = new WebSocketServerString(
initial_file = {= LF_SOURCE_DIRECTORY LF_FILE_SEPARATOR "Synchrophasors.html" =})

reaction(phasors) -> w.in_dynamic {=
char* json = (char*)malloc(sizeof(char) * (30 * (10 * self->n)));
char* p = json; // pointer to next position to write.
snprintf(p, 10, "{\"real\":[");
p += 9;
for (int n = 0; n < self->n; n++) {
if (n != 0) {
*p++ = ',';
*p++ = ' ';
}
long int height = lround(phasors[n]->value.phasor.real * 100);
int len = snprintf(p, 5, "%3ld", height);
if (len > 0) p += len; // Excluding trailing null terminator.
}
snprintf(p, 16, "],\"imaginary\":[");
p += 15;
for (int n = 0; n < self->n; n++) {
if (n != 0) {
*p++ = ',';
*p++ = ' ';
}
long int height = lround(phasors[n]->value.phasor.imaginary * 100);
int len = snprintf(p, 5, "%3ld", height);
if (len > 0) p += len; // Excluding trailing null terminator.
}
*p++ = ']';
*p++ = '}';
*p = '\0';
if (self->connected) {
lf_set(w.in_dynamic, json);
for (int n = 0; n < self->n; n++) {
if (phasors[n]->is_present) {
if (self->json == NULL) {
// Construct array big enough to hold the maximum possible size string, which
// occurs when all inputs are present. The fact that this buffer always has the
// same size should help prevent memory fragmentation because malloc will repeatedly
// allocate the same memory once it has been processed and freed by the
// WebSocketServerString reactor.
self->json = (char*)malloc(sizeof(char) * (3 + (20 * self->n)));
self->p = self->json; // pointer to next position to write.
*self->p++ = '[';
} else {
*self->p++ = ',';
}
long int real = lround(phasors[n]->value.phasor.real * 100);
long int imag = lround(phasors[n]->value.phasor.imaginary * 100);
int len = snprintf(self->p, 18, "[%3d,[%4ld,%4ld]]", n, real, imag);
if (len > 0) self->p += len; // Excluding trailing null terminator.

interval_t now = lf_time_physical_elapsed();
if (self->p - self->json > 2 + 20 * (self->n - 1) // Not enough room for another entry
|| now - self->last_sent >= self->period // Enough physical time has elapsed.
) {
self->last_sent = now;
*self->p++ = ']';
*self->p++ = '\0';
lf_set(w.in_dynamic, self->json);
// lf_print("%s", self->json);
self->json = NULL; // Mark to reallocate on next arrival.
}
}
}
}
// lf_print("%s", json);
=}

reaction(w.connected) {=
Expand Down

0 comments on commit 8e23b73

Please sign in to comment.