From d4772c79300bea9a59d1d3a8c88d52b9afed9b10 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 20:52:40 +0530 Subject: [PATCH 01/41] util: always clear timeouts on errors --- src/commons/util.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/commons/util.js b/src/commons/util.js index 05cbafdb23..95517150c1 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -142,6 +142,7 @@ export function timedOp(op, ms, cleanup = () => {}) { } }); } catch (e) { + clearTimeout(tid); if (!timedout) reject(e); } }); @@ -180,6 +181,7 @@ export function timedSafeAsyncOp(promisedOp, ms, defaultOp) { } }) .catch((ignored) => { + clearTimeout(tid); if (!timedout) deferredOp(); // else: handled by timeout }); From bf96425b37132143f8a85b176516a12132b47a89 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 20:54:38 +0530 Subject: [PATCH 02/41] log: rmv log timers post arb delay (2m) to avoid leak --- src/commons/util.js | 7 +++++++ src/core/log.js | 28 +++++++++++++++++++++------- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/commons/util.js b/src/commons/util.js index 95517150c1..405baf2f18 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -519,6 +519,13 @@ export function stub(...args) { }; } +export function stubr(r, ...args) { + return (...args) => { + /* no-op */ + return r; + }; +} + export function stubAsync(...args) { return async (...args) => { /* no-op */ diff --git a/src/core/log.js b/src/core/log.js index 7a22b580a8..12d8f32ee7 100644 --- a/src/core/log.js +++ b/src/core/log.js @@ -9,7 +9,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -import { uid, stub } from "../commons/util.js"; +import { uid, stub, stubr } from "../commons/util.js"; /** * @typedef {'error'|'logpush'|'warn'|'info'|'timer'|'debug'} LogLevels @@ -83,8 +83,8 @@ export default class Log { this.d = stub(); this.debug = stub(); this.lapTime = stub(); - this.startTime = stub(); - this.endTime = stub(); + this.startTime = stubr(""); + this.endTime = stubr(false); this.i = stub(); this.info = stub(); this.w = stub(); @@ -97,16 +97,22 @@ export default class Log { const that = this; return { lapTime: (n, ...r) => { + // returns void return that.lapTime(n, ...tags, ...r); }, startTime: (n, ...r) => { const tid = that.startTime(n); that.d(that.now() + " T", ...tags, "create", tid, ...r); + const tim = setTimeout(() => { + that.endTime(tid); + }, /* 2mins*/ 2 * 60 * 1000); + if (typeof tim.unref === "function") tim.unref(); return tid; }, endTime: (n, ...r) => { - that.d(that.now() + " T", ...tags, "end", n, ...r); - return that.endTime(n); + if (that.endTime(n)) { + that.d(that.now() + " T", ...tags, "end", n, ...r); + } // else: already ended or invalid timer }, d: (...args) => { that.d(that.now() + " D", ...tags, ...args); @@ -163,13 +169,21 @@ export default class Log { this.d = console.debug; this.debug = console.debug; case "timer": - this.lapTime = console.timeLog || stub(); // Stubbing required for Fastly as they do not currently support this method. + // stub() for Fastly as it does not support console timers. + this.lapTime = console.timeLog || stub(); this.startTime = function (name) { name = uid(name); if (console.time) console.time(name); return name; }; - this.endTime = console.timeEnd || stub(); // Stubbing required for Fastly as they do not currently support this method. + // stub() for Fastly as it does not support console timers. + this.endTime = function (uid) { + try { + if (console.timeEnd) console.timeEnd(uid); + return true; + } catch (ignore) {} + return false; + }; case "info": this.i = console.info; this.info = console.info; From b4c13cbcd2d34651130fbe570237c6c397d03f4a Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 20:56:36 +0530 Subject: [PATCH 03/41] node: add on demand heap measurements --- src/server-node.js | 48 ++++++++++++++++------------------------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/src/server-node.js b/src/server-node.js index 379cdc3473..c47cb307f4 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -24,9 +24,6 @@ import * as util from "./commons/util.js"; import "./core/node/config.js"; import { finished } from "node:stream"; import * as nodecrypto from "./commons/crypto.js"; -// webpack can't handle node-bindings, a dependency of node-memwatch -// github.com/webpack/webpack/issues/16029 -// import * as memwatch from "@airbnb/node-memwatch"; /** * @typedef {net.Socket} Socket @@ -50,6 +47,7 @@ class Stats { this.nofconns = 0; this.openconns = 0; this.noftimeouts = 0; + this.nofheapsnaps = 0; // avg1, avg5, avg15, adj, maxconns this.bp = [0, 0, 0, 0, 0]; } @@ -169,9 +167,8 @@ const tracker = new Tracker(); const stats = new Stats(); const cpucount = os.cpus().length || 1; const adjPeriodSec = 5; +const maxHeapSnaps = 10; let adjTimer = null; -/** @type {memwatch.HeapDiff} */ -let heapdiff = null; ((main) => { // listen for "go" and start the server @@ -340,7 +337,6 @@ function systemUp() { trapServerEvents(hcheck); }); - // if (envutil.measureHeap()) heapdiff = new memwatch.HeapDiff(); heartbeat(); } @@ -1061,6 +1057,7 @@ function trapRequestResponseEvents(req, res) { } function heartbeat() { + const isNode = envutil.isNode(); const maxc = envutil.maxconns(); const minc = envutil.minconns(); const measureHeap = envutil.measureHeap(); @@ -1068,18 +1065,19 @@ function heartbeat() { // increment no of requests stats.noreqs += 1; - if (!measureHeap) { - endHeapDiffIfNeeded(heapdiff); - heapdiff = null; - } else if (heapdiff == null) { - // heapdiff = new memwatch.HeapDiff(); - } else if (stats.noreqs % (maxc * 10) === 0) { - endHeapDiffIfNeeded(heapdiff); - // heapdiff = new memwatch.HeapDiff(); - } if (stats.noreqs % (minc * 2) === 0) { log.i(stats.str(), "in", (uptime() / 60000) | 0, "mins"); } + + if (measureHeap && stats.noreqs !== 0 && stats.noreqs % (maxc * 10) === 0) { + if (isNode && stats.nofheapsnaps < maxHeapSnaps) { + stats.nofheapsnaps += 1; + const nom = + "snap" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; + log.i("heap snapshot #", stats.nofheapsnaps, nom); + v8.writeHeapSnapshot(nom); + } + } } function adjustMaxConns(n) { @@ -1160,27 +1158,13 @@ function adjustMaxConns(n) { } } -/** - * @param {memwatch.HeapDiff} h - * @returns void - */ -function endHeapDiffIfNeeded(h) { - // disabled; memwatch is not bundled due to a webpack bug - if (!h || true) return; - try { - const diff = h.end(); - log.i("heap before", diff.before); - log.i("heap after", diff.after); - log.i("heap details", diff.change.details); - } catch (ex) { - log.w("heap-diff err", ex.message); - } -} - function bye() { // in some cases, node stops listening but the process doesn't exit because // of other unreleased resources (see: svc.js#systemStop); and so exit with // success (exit code 0) regardless; ref: community.fly.io/t/4547/6 console.warn("W game over"); + + if (envutil.isNode()) v8.writeHeapSnapshot("snap.end.heapsnapshot"); + process.exit(0); } From 051b8d2557d614ad5a8d4184e41e39464d3a7cd3 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 21:59:40 +0530 Subject: [PATCH 04/41] env: measure heap in prod for select Fly regions --- src/commons/envutil.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/commons/envutil.js b/src/commons/envutil.js index 834fb3c31e..f308a5128d 100644 --- a/src/commons/envutil.js +++ b/src/commons/envutil.js @@ -227,12 +227,10 @@ export function shutdownTimeoutMs() { } export function measureHeap() { - // disable; webpack can't bundle memwatch; see: server-node.js - return false; if (!envManager) return false; const reg = region(); if ( - reg === "maa" || + reg === "bom" || reg === "sin" || reg === "fra" || reg === "ams" || From 9e0b52bd9f21d03cc58be60bd3e9005478492bb1 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 22:00:07 +0530 Subject: [PATCH 05/41] node: incr total heapsnapshots from 10 to 20 --- src/server-node.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server-node.js b/src/server-node.js index c47cb307f4..92c5244976 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -167,7 +167,7 @@ const tracker = new Tracker(); const stats = new Stats(); const cpucount = os.cpus().length || 1; const adjPeriodSec = 5; -const maxHeapSnaps = 10; +const maxHeapSnaps = 20; let adjTimer = null; ((main) => { From 356beac548c37c3192bab0807e7ff36601ee44c2 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 22:01:05 +0530 Subject: [PATCH 06/41] node: measure heap on low ram --- src/server-node.js | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/server-node.js b/src/server-node.js index 92c5244976..301141c9f1 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -1057,10 +1057,7 @@ function trapRequestResponseEvents(req, res) { } function heartbeat() { - const isNode = envutil.isNode(); - const maxc = envutil.maxconns(); const minc = envutil.minconns(); - const measureHeap = envutil.measureHeap(); // increment no of requests stats.noreqs += 1; @@ -1068,22 +1065,13 @@ function heartbeat() { if (stats.noreqs % (minc * 2) === 0) { log.i(stats.str(), "in", (uptime() / 60000) | 0, "mins"); } - - if (measureHeap && stats.noreqs !== 0 && stats.noreqs % (maxc * 10) === 0) { - if (isNode && stats.nofheapsnaps < maxHeapSnaps) { - stats.nofheapsnaps += 1; - const nom = - "snap" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; - log.i("heap snapshot #", stats.nofheapsnaps, nom); - v8.writeHeapSnapshot(nom); - } - } } function adjustMaxConns(n) { const isNode = envutil.isNode(); const maxc = envutil.maxconns(); const minc = envutil.minconns(); + const measureHeap = envutil.measureHeap(); const adjsPerSec = 60 / adjPeriodSec; // caveats: @@ -1095,6 +1083,11 @@ function adjustMaxConns(n) { avg5 = ((avg5 * 100) / cpucount) | 0; avg15 = ((avg15 * 100) / cpucount) | 0; + const freemem = os.freemem(); + const totmem = os.totalmem(); + const lowram = freemem < 0.1 * totmem; + const verylowram = freemem < 0.025 * totmem; + let adj = stats.bp[3] || 0; // increase in load if (avg5 > 90) { @@ -1109,7 +1102,7 @@ function adjustMaxConns(n) { n = maxc; if (avg1 > 100) { n = minc; - } else if (avg1 > 90 || avg5 > 80) { + } else if (avg1 > 90 || avg5 > 80 || lowram) { n = Math.max((n * 0.2) | 0, minc); } else if (avg1 > 80 || avg5 > 75) { n = Math.max((n * 0.4) | 0, minc); @@ -1133,8 +1126,8 @@ function adjustMaxConns(n) { const breakpoint = 6 * adjsPerSec; // 6 mins const stresspoint = 4 * adjsPerSec; // 4 mins const nstr = stats.openconns + "/" + n; - if (adj > breakpoint) { - log.w("load: stopping; n:", nstr, "adjs:", adj); + if (adj > breakpoint || verylowram) { + log.w("load: stopping lowram?", verylowram, "; n:", nstr, "adjs:", adj); stopAfter(0); return; } else if (adj > stresspoint) { @@ -1144,6 +1137,12 @@ function adjustMaxConns(n) { log.d("load: high; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); } + stats.bp = [avg1, avg5, avg15, adj, n]; + for (const s of tracker.servers()) { + if (!s || !s.listening) continue; + s.maxConnections = n; + } + // nodejs.org/en/docs/guides/diagnostics/memory/using-gc-traces if (adj > 0) { if (isNode) v8.setFlagsFromString("--trace-gc"); @@ -1151,10 +1150,15 @@ function adjustMaxConns(n) { if (isNode) v8.setFlagsFromString("--notrace-gc"); } - stats.bp = [avg1, avg5, avg15, adj, n]; - for (const s of tracker.servers()) { - if (!s || !s.listening) continue; - s.maxConnections = n; + const ramthres = freemem < 0.2 * totmem; + const reqthres = stats.noreqs > 0 && stats.noreqs % (maxc * 10) === 0; + const withinLimit = stats.nofheapsnaps < maxHeapSnaps; + if (isNode && measureHeap && withinLimit && reqthres && ramthres) { + stats.nofheapsnaps += 1; + const nom = "s" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; + log.i("heap snapshot #", stats.nofheapsnaps, nom); + // nodejs.org/en/learn/diagnostics/memory/using-heap-snapshot + v8.writeHeapSnapshot(nom); // blocks event loop! } } From 472bc8c09102adddfbe531a0b80803f8a0828e7a Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 24 Sep 2024 22:12:24 +0530 Subject: [PATCH 07/41] node: log elapsed time to write a heapsnapshot --- src/server-node.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server-node.js b/src/server-node.js index 301141c9f1..b1de922cf9 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -1156,9 +1156,11 @@ function adjustMaxConns(n) { if (isNode && measureHeap && withinLimit && reqthres && ramthres) { stats.nofheapsnaps += 1; const nom = "s" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; - log.i("heap snapshot #", stats.nofheapsnaps, nom); + const start = Date.now(); // nodejs.org/en/learn/diagnostics/memory/using-heap-snapshot v8.writeHeapSnapshot(nom); // blocks event loop! + const end = Date.now(); + log.i("heap snapshot #", stats.nofheapsnaps, nom, "in", end - start, "ms"); } } From 99898d50a0955bff5226d68c3b791123bfaf912e Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 05:15:17 +0530 Subject: [PATCH 08/41] dnsop/resolver: m warn log --- src/plugins/dns-op/resolver.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index edf04cae9a..e9f6af0191 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -232,6 +232,7 @@ export default class DNSResolver { } // developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/allSettled#return_value + /** @type{Response} */ const res = promisedTasks[1].value; if (fromMax) { @@ -251,8 +252,8 @@ export default class DNSResolver { if (!res.ok) { const txt = res.text && (await res.text()); - this.log.d(rxid, "!OK", res.status, txt); - throw new Error(txt + " http err: " + res); + this.log.w(rxid, "!OK", res.status, txt); + throw new Error(txt + " http err: " + res.status + " " + res.statusText); } const ans = await res.arrayBuffer(); From d336c4642979f26796e0806437881a4294b165ea Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 05:16:22 +0530 Subject: [PATCH 09/41] node: lower heap snapshot thresholds for local env --- src/server-node.js | 60 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/src/server-node.js b/src/server-node.js index b1de922cf9..855d925d47 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -1058,20 +1058,40 @@ function trapRequestResponseEvents(req, res) { function heartbeat() { const minc = envutil.minconns(); - + const maxc = envutil.maxconns(); + const isNode = envutil.isNode(); + const notCloud = envutil.onLocal(); + const measureHeap = envutil.measureHeap(); + const freemem = os.freemem() / (1024 * 1024); // in mb + const totmem = os.totalmem() / (1024 * 1024); // in mb // increment no of requests stats.noreqs += 1; if (stats.noreqs % (minc * 2) === 0) { log.i(stats.str(), "in", (uptime() / 60000) | 0, "mins"); } + + const mul = notCloud ? 2 : 10; + const writeSnap = notCloud || measureHeap; + const ramthres = notCloud || freemem < 0.2 * totmem; + const reqthres = stats.noreqs > 0 && stats.noreqs % (maxc * mul) === 0; + const withinLimit = stats.nofheapsnaps < maxHeapSnaps; + if (isNode && writeSnap && withinLimit && reqthres && ramthres) { + stats.nofheapsnaps += 1; + const n = "s" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; + const start = Date.now(); + // nodejs.org/en/learn/diagnostics/memory/using-heap-snapshot + v8.writeHeapSnapshot(n); // blocks event loop! + const elapsed = (Date.now() - start) / 1000; + log.i("heap snapshot #", stats.nofheapsnaps, n, "in", elapsed, "s"); + } } function adjustMaxConns(n) { const isNode = envutil.isNode(); + const notCloud = envutil.onLocal(); const maxc = envutil.maxconns(); const minc = envutil.minconns(); - const measureHeap = envutil.measureHeap(); const adjsPerSec = 60 / adjPeriodSec; // caveats: @@ -1083,8 +1103,8 @@ function adjustMaxConns(n) { avg5 = ((avg5 * 100) / cpucount) | 0; avg15 = ((avg15 * 100) / cpucount) | 0; - const freemem = os.freemem(); - const totmem = os.totalmem(); + const freemem = os.freemem() / (1024 * 1024); // in mb + const totmem = os.totalmem() / (1024 * 1024); // in mb const lowram = freemem < 0.1 * totmem; const verylowram = freemem < 0.025 * totmem; @@ -1126,14 +1146,31 @@ function adjustMaxConns(n) { const breakpoint = 6 * adjsPerSec; // 6 mins const stresspoint = 4 * adjsPerSec; // 4 mins const nstr = stats.openconns + "/" + n; - if (adj > breakpoint || verylowram) { + if (adj > breakpoint || (verylowram && !notCloud)) { + log.w("load: verylowram! freemem:", freemem, "totmem:", totmem); log.w("load: stopping lowram?", verylowram, "; n:", nstr, "adjs:", adj); stopAfter(0); return; } else if (adj > stresspoint) { + log.w( + "load: stress; lowram?", + lowram, + "freemem:", + freemem, + "totmem:", + totmem + ); log.w("load: stress; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); n = (minc / 2) | 0; } else if (adj > 0) { + log.d( + "load: high; lowram?", + lowram, + "freemem:", + freemem, + "totmem:", + totmem + ); log.d("load: high; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); } @@ -1149,19 +1186,6 @@ function adjustMaxConns(n) { } else { if (isNode) v8.setFlagsFromString("--notrace-gc"); } - - const ramthres = freemem < 0.2 * totmem; - const reqthres = stats.noreqs > 0 && stats.noreqs % (maxc * 10) === 0; - const withinLimit = stats.nofheapsnaps < maxHeapSnaps; - if (isNode && measureHeap && withinLimit && reqthres && ramthres) { - stats.nofheapsnaps += 1; - const nom = "s" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; - const start = Date.now(); - // nodejs.org/en/learn/diagnostics/memory/using-heap-snapshot - v8.writeHeapSnapshot(nom); // blocks event loop! - const end = Date.now(); - log.i("heap snapshot #", stats.nofheapsnaps, nom, "in", end - start, "ms"); - } } function bye() { From f4452565c77d75b78b9af836c5a5da05468bba1c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 05:48:55 +0530 Subject: [PATCH 10/41] fly: disable swap, enable auto suspend --- fly.tls.toml | 5 +++-- fly.toml | 8 +++++--- node.Dockerfile | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/fly.tls.toml b/fly.tls.toml index b2fbd69fc4..098b00b126 100644 --- a/fly.tls.toml +++ b/fly.tls.toml @@ -24,7 +24,8 @@ auto_rollback = true [[services]] internal_port = 8055 protocol = "tcp" - auto_stop_machines = true + # community.fly.io/t/20672 + auto_stop_machines = "suspend" auto_start_machines = true [services.concurrency] @@ -57,7 +58,7 @@ auto_rollback = true [[services]] internal_port = 10555 protocol = "tcp" - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [services.concurrency] diff --git a/fly.toml b/fly.toml index 38fc89f208..fb98996262 100644 --- a/fly.toml +++ b/fly.toml @@ -2,7 +2,9 @@ app = "" kill_signal = "SIGINT" kill_timeout = "15s" -swap_size_mb = 152 +# swap cannot be used with "suspend" +# community.fly.io/t/20672 +# swap_size_mb = 152 [build] dockerfile = "node.Dockerfile" @@ -21,7 +23,7 @@ swap_size_mb = 152 [[services]] protocol = "tcp" internal_port = 8080 - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [[services.ports]] @@ -50,7 +52,7 @@ swap_size_mb = 152 [[services]] protocol = "tcp" internal_port = 10000 - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [[services.ports]] diff --git a/node.Dockerfile b/node.Dockerfile index 0680f23584..6f91660e13 100644 --- a/node.Dockerfile +++ b/node.Dockerfile @@ -19,7 +19,7 @@ FROM node:22-alpine AS runner # env vals persist even at run-time: archive.is/QpXp2 # and overrides fly.toml env values ENV NODE_ENV production -ENV NODE_OPTIONS="--max-old-space-size=320 --heapsnapshot-signal=SIGUSR2" +ENV NODE_OPTIONS="--max-old-space-size=200 --heapsnapshot-signal=SIGUSR2" # get working dir in order WORKDIR /app # external deps not bundled by webpack From 68597f2aaff96a7025f5da41e8081e981f51bec4 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 05:54:47 +0530 Subject: [PATCH 11/41] plugin: rmv log timers for execute() --- src/core/plugin.js | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/core/plugin.js b/src/core/plugin.js index 12c6bdbeea..b12e4c6fbf 100644 --- a/src/core/plugin.js +++ b/src/core/plugin.js @@ -142,10 +142,8 @@ export default class RethinkPlugin { async execute() { const io = this.io; - const rxid = this.ctx.get("rxid"); - - const t = this.log.startTime("exec-plugin-" + rxid); - + // const rxid = this.ctx.get("rxid"); + // const t = this.log.startTime("exec-plugin-" + rxid); for (const p of this.plugin) { if (io.stopProcessing && !p.continueOnStopProcess) { continue; @@ -154,19 +152,16 @@ export default class RethinkPlugin { continue; } - this.log.lapTime(t, rxid, p.name, "send-io"); - + // this.log.lapTime(t, rxid, p.name, "send-io"); const res = await p.module.exec(makectx(this.ctx, p.pctx)); - - this.log.lapTime(t, rxid, p.name, "got-res"); + // this.log.lapTime(t, rxid, p.name, "got-res"); if (typeof p.callback === "function") { await p.callback.call(this, res, io); } - - this.log.lapTime(t, rxid, p.name, "post-callback"); + // this.log.lapTime(t, rxid, p.name, "post-callback"); } - this.log.endTime(t); + // this.log.endTime(t); } /** From 716f21bbe7d1bc40ad1d1256906f9e46b5d6fa30 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 06:29:29 +0530 Subject: [PATCH 12/41] fly: rmv non-functional swapon --- src/core/linux/swap.js | 65 ----------------------------------------- src/core/node/config.js | 10 ++----- 2 files changed, 3 insertions(+), 72 deletions(-) delete mode 100644 src/core/linux/swap.js diff --git a/src/core/linux/swap.js b/src/core/linux/swap.js deleted file mode 100644 index 724cfa0e12..0000000000 --- a/src/core/linux/swap.js +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2021 RethinkDNS and its authors. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -import { spawnSync } from "node:child_process"; - -const swapfile = "swap__"; -const swapsize = "152M"; - -// linuxize.com/post/create-a-linux-swap-file -export function mkswap() { - return ( - !hasanyswap() && - sh("fallocate", ["-l", swapsize, swapfile]) && - sh("chmod", ["600", swapfile]) && - sh("mkswap", [swapfile]) && - sh("swapon", [swapfile]) && - sh("sysctl", ["vm.swappiness=20"]) - ); -} - -export function rmswap() { - return hasswap() && sh("swapoff", ["-v", swapfile]) && sh("rm", [swapfile]); -} - -function hasanyswap() { - // cat /proc/swaps - // Filename Type Size Used Priority - // /swap__ file 155644 99968 -2 - const pswaps = shout("cat", ["/proc/swaps"]); - const lines = pswaps && pswaps.split("\n"); - return lines && lines.length > 1; -} - -// stackoverflow.com/a/53222213 -function hasswap() { - return sh("test", ["-e", swapfile]); -} - -function shout(cmd, args) { - return shx(cmd, args, true); -} - -function sh(cmd, args) { - return shx(cmd, args) === 0; -} - -function shx(cmd, args, out = false) { - if (!cmd) return false; - args = args || []; - const opts = { - cwd: "/", - uid: 0, - shell: true, - encoding: "utf8", - }; - const proc = spawnSync(cmd, args, opts); - if (proc.error) log.i(cmd, args, opts, "error", proc.error); - if (proc.stderr) log.e(cmd, args, opts, proc.stderr); - if (proc.stdout) log.l(proc.stdout); - return !out ? proc.status : proc.stdout; -} diff --git a/src/core/node/config.js b/src/core/node/config.js index 59efdfa4fa..d1b7e9bc26 100644 --- a/src/core/node/config.js +++ b/src/core/node/config.js @@ -20,7 +20,6 @@ import Log from "../log.js"; import * as system from "../../system.js"; import { services, stopAfter } from "../svc.js"; import EnvManager from "../env.js"; -import * as swap from "../linux/swap.js"; import * as dnst from "../../core/node/dns-transport.js"; // some of the cjs node globals aren't available in esm @@ -121,14 +120,11 @@ async function prep() { // flydns is always ipv6 (fdaa::53) const plainOldDnsIp = onFly ? "fdaa::3" : "1.1.1.2"; let dns53 = null; - /** swap space and recursive resolver on Fly */ - if (onFly || true) { - const ok = swap.mkswap(); - log.i("mkswap done?", ok); + if (onFly) { + // recursive resolver on Fly + // swapon won't work on fly: community.fly.io/t/19196/13 dns53 = dnst.makeTransport(plainOldDnsIp); log.i("imported udp/tcp dns transport", plainOldDnsIp); - } else { - log.i("no swap required"); } /** signal ready */ From e5246c0223eddf5d3d5e6feda7a22372522b5e7d Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 07:01:43 +0530 Subject: [PATCH 13/41] fly: restart policy; on-fail --- fly.tls.toml | 6 ++++++ fly.toml | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/fly.tls.toml b/fly.tls.toml index 098b00b126..592e042ea9 100644 --- a/fly.tls.toml +++ b/fly.tls.toml @@ -20,6 +20,12 @@ swap_size_mb = 152 [experimental] auto_rollback = true +# community.fly.io/t/19180 +# fly.io/docs/machines/guides-examples/machine-restart-policy +[[restart]] + policy = "on-fail" + retries = 3 + # DNS over HTTPS (well, h2c and http1.1) [[services]] internal_port = 8055 diff --git a/fly.toml b/fly.toml index fb98996262..d4f921fe54 100644 --- a/fly.toml +++ b/fly.toml @@ -19,6 +19,12 @@ kill_timeout = "15s" NODE_ENV = "production" LOG_LEVEL = "info" +# community.fly.io/t/19180 +# fly.io/docs/machines/guides-examples/machine-restart-policy +[[restart]] + policy = "on-fail" + retries = 3 + # DNS over HTTPS [[services]] protocol = "tcp" From bf3443873a7999afe1441858cbe8d6f4388c2e47 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 09:24:46 +0530 Subject: [PATCH 14/41] plugins: do not err out user-op on malformed blockstamp --- src/plugins/users/user-op.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/plugins/users/user-op.js b/src/plugins/users/user-op.js index 6f879311ad..6259d82ff5 100644 --- a/src/plugins/users/user-op.js +++ b/src/plugins/users/user-op.js @@ -48,7 +48,7 @@ export class UserOp { * @returns {pres.RResp} */ loadUser(ctx) { - let response = pres.emptyResponse(); + const response = pres.emptyResponse(); if (!ctx.isDnsMsg) { this.log.w(ctx.rxid, "not a dns-msg, ignore"); @@ -84,7 +84,8 @@ export class UserOp { response.data.dnsResolverUrl = null; } catch (e) { this.log.e(ctx.rxid, "loadUser", e); - response = pres.errResponse("UserOp:loadUser", e); + // avoid erroring out on invalid blocklist info & flag + // response = pres.errResponse("UserOp:loadUser", e); } return response; From 84a7ee12969c9384203b12d26eca43724b442d4e Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 21:06:36 +0530 Subject: [PATCH 15/41] all: m jsdoc --- src/commons/dnsutil.js | 9 ++ src/core/plugin.js | 2 +- src/plugins/dns-op/blocker.js | 18 +++ src/plugins/dns-op/cache-resolver.js | 12 ++ src/plugins/dns-op/resolver.js | 2 +- src/plugins/plugin-response.js | 23 +++- src/plugins/rdns-util.js | 176 +++++++++++++++++---------- 7 files changed, 175 insertions(+), 67 deletions(-) diff --git a/src/commons/dnsutil.js b/src/commons/dnsutil.js index 4449b6e4ee..c40fab6020 100644 --- a/src/commons/dnsutil.js +++ b/src/commons/dnsutil.js @@ -377,9 +377,14 @@ export function isAnswerQuad0(packet) { return isAnswerBlocked(packet.answers); } +/** + * @param {any} dnsPacket + * @returns {string[]} + */ export function extractDomains(dnsPacket) { if (!hasSingleQuestion(dnsPacket)) return []; + /** @type {string} */ const names = new Set(); const answers = dnsPacket.answers; @@ -535,6 +540,10 @@ export function getQueryType(packet) { return util.emptyString(qt) ? false : qt; } +/** + * @param {string?} n + * @returns {string} + */ export function normalizeName(n) { if (util.emptyString(n)) return n; diff --git a/src/core/plugin.js b/src/core/plugin.js index b12e4c6fbf..ec39e20f1b 100644 --- a/src/core/plugin.js +++ b/src/core/plugin.js @@ -182,7 +182,7 @@ export default class RethinkPlugin { /** * Adds "userBlocklistInfo", "userBlocklistInfo", and "dnsResolverUrl" * to RethinkPlugin ctx. - * @param {RResp} response - Contains data: userBlocklistInfo / userBlockstamp + * @param {RResp} response * @param {IOState} io */ async userOpCallback(response, io) { diff --git a/src/plugins/dns-op/blocker.js b/src/plugins/dns-op/blocker.js index 2265250898..0852f74520 100644 --- a/src/plugins/dns-op/blocker.js +++ b/src/plugins/dns-op/blocker.js @@ -15,6 +15,12 @@ export class DnsBlocker { this.log = log.withTags("DnsBlocker"); } + /** + * @param {string} rxid + * @param {pres.RespData} req + * @param {pres.BlockstampInfo} blockInfo + * @returns {pres.RespData} + */ blockQuestion(rxid, req, blockInfo) { const dnsPacket = req.dnsPacket; const stamps = req.stamps; @@ -40,6 +46,12 @@ export class DnsBlocker { return pres.copyOnlyBlockProperties(req, bres); } + /** + * @param {string} rxid + * @param {pres.RespData} res + * @param {pres.BlockstampInfo} blockInfo + * @returns {pres.RespData} + */ blockAnswer(rxid, res, blockInfo) { const dnsPacket = res.dnsPacket; const stamps = res.stamps; @@ -71,6 +83,12 @@ export class DnsBlocker { return pres.copyOnlyBlockProperties(res, bres); } + /** + * @param {string[]} names + * @param {pres.BlockstampInfo} blockInfo + * @param {pres.BStamp} blockstamps + * @returns {pres.RespData} + */ block(names, blockInfo, blockstamps) { let r = pres.rdnsNoBlockResponse(); for (const n of names) { diff --git a/src/plugins/dns-op/cache-resolver.js b/src/plugins/dns-op/cache-resolver.js index 61240e1870..41617f68ac 100644 --- a/src/plugins/dns-op/cache-resolver.js +++ b/src/plugins/dns-op/cache-resolver.js @@ -48,6 +48,12 @@ export class DNSCacheResponder { return response; } + /** + * @param {string} rxid + * @param {any} packet + * @param {pres.BStamp} blockInfo + * @returns {Promise} + */ async resolveFromCache(rxid, packet, blockInfo) { const noAnswer = pres.rdnsNoBlockResponse(); // if blocklist-filter is setup, then there's no need to query http-cache @@ -101,6 +107,12 @@ export class DNSCacheResponder { return pres.dnsResponse(res.dnsPacket, reencoded, res.stamps); } + /** + * @param {string} rxid + * @param {pres.RespData} r + * @param {pres.BStamp} blockInfo + * @returns {pres.RespData} + */ makeCacheResponse(rxid, r, blockInfo) { // check incoming dns request against blocklists in cache-metadata this.blocker.blockQuestion(rxid, /* out*/ r, blockInfo); diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index e9f6af0191..6d85529933 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -137,7 +137,7 @@ export default class DNSResolver { * @param {Request} ctx.request * @param {ArrayBuffer} ctx.requestBodyBuffer * @param {Object} ctx.requestDecodedDnsPacket - * @param {Object} ctx.userBlocklistInfo + * @param {pres.BlockstampInfo} ctx.userBlocklistInfo * @param {String} ctx.userDnsResolverUrl * @param {string} ctx.userBlockstamp * @param {pres.BStamp?} ctx.domainBlockstamp diff --git a/src/plugins/plugin-response.js b/src/plugins/plugin-response.js index c11c185fad..adebaf2cb0 100644 --- a/src/plugins/plugin-response.js +++ b/src/plugins/plugin-response.js @@ -9,6 +9,8 @@ import * as util from "../commons/util.js"; import * as bufutil from "../commons/bufutil.js"; +/** @typedef {import("./users/auth-token.js").Outcome} AuthOutcome */ + export class RResp { constructor(data = null, hasex = false, exfrom = "", exstack = "") { /** @type {RespData?} */ @@ -30,10 +32,27 @@ export class RespData { this.flag = flag || ""; /** @type {Object} */ this.dnsPacket = packet || null; - /** @type {ArrayBuffer} */ + /** @type {ArrayBuffer?} */ this.dnsBuffer = raw || null; - /** @type {BStamp?} */ + /** @type {BStamp|boolean} */ this.stamps = stamps || {}; + /** @type {AuthOutcome?} */ + this.userAuth = null; + /** @type {BlockstampInfo?} */ + this.userBlocklistInfo = null; + /** @type {String} */ + this.dnsResolverUrl = ""; + /** @type {string} */ + this.userBlocklistFlag = ""; + } +} + +export class BlockstampInfo { + constructor() { + /** @type {Uint16Array} */ + this.userBlocklistFlagUint = null; + /** @type {String} - mosty 0 or 1 */ + this.flagVersion = "0"; } } diff --git a/src/plugins/rdns-util.js b/src/plugins/rdns-util.js index f9d93677c7..f1c1f89873 100644 --- a/src/plugins/rdns-util.js +++ b/src/plugins/rdns-util.js @@ -27,11 +27,6 @@ const emptystr = ""; // delim, version, blockstamp (flag), accesskey const emptystamp = [emptystr, emptystr, emptystr, emptystr]; -// deprecated: all lists are trreated as wildcards -const _wildcardUint16 = new Uint16Array([ - 64544, 18431, 8191, 65535, 64640, 1, 128, 16320, -]); - // pec: parental control, rec: recommended, sec: security const recBlockstamps = new Map(); // oisd, 1hosts:mini, cpbl:light, anudeep, yhosts, tiuxo, adguard @@ -57,28 +52,46 @@ recBlockstamps.set("pr", "1:eMYB-ACgAQAgARAwIABhUgCA"); // pec, sec recBlockstamps.set("ps", "1:GNwB-ACgeQKr7cg3YXoAgA=="); +/** + * @param {BlocklistFilter} blf + * @returns {boolean} + */ export function isBlocklistFilterSetup(blf) { return blf && !util.emptyObj(blf.ftrie); } +/** + * @param {string} p + * @returns {boolean} + */ export function isStampQuery(p) { return stampPrefix.test(p); } +/** + * @param {string} p + * @returns {boolean} + */ export function isLogQuery(p) { return logPrefix.test(p); } -// dn -> domain name, ex: you.and.i.example.com -// userBlInfo -> user-selected blocklist-stamp -// {userBlocklistFlagUint, userServiceListUint} -// dnBlInfo -> obj of blocklists stamps for dn and all its subdomains -// {string(sub/domain-name) : u16(blocklist-stamp) } -// FIXME: return block-dnspacket depending on altsvc/https/svcb or cname/a/aaaa +/** + * dn -> domain name, ex: you.and.i.example.com + * userBlInfo -> user-selected blocklist-stamp + * {BlockstampInfo} + * dnBlInfo -> obj of blocklists stamps for dn and all its subdomains + * {string(sub/domain-name) : u16(blocklist-stamp) } + * FIXME: return block-dnspacket depending on altsvc/https/svcb or cname/a/aaaa + * @param {string} dn domain name + * @param {pres.BlockstampInfo} userBlInfo user blocklist info + * @param {pres.BStamp} dnBlInfo domain blockstamp map + */ export function doBlock(dn, userBlInfo, dnBlInfo) { const blockSubdomains = envutil.blockSubdomains(); const version = userBlInfo.flagVersion; const noblock = pres.rdnsNoBlockResponse(); + const userUint = userBlInfo.userBlocklistFlagUint; if ( util.emptyString(dn) || util.emptyObj(dnBlInfo) || @@ -89,32 +102,14 @@ export function doBlock(dn, userBlInfo, dnBlInfo) { // treat every blocklist as a wildcard blocklist if (blockSubdomains) { - return applyWildcardBlocklists( - userBlInfo.userBlocklistFlagUint, - version, - dnBlInfo, - dn - ); + return applyWildcardBlocklists(dn, version, userUint, dnBlInfo); } - const dnUint = new Uint16Array(dnBlInfo[dn]); + const dnUint = dnBlInfo[dn]; // if the domain isn't in block-info, we're done if (util.emptyArray(dnUint)) return noblock; // else, determine if user selected blocklist intersect with the domain's - const r = applyBlocklists(userBlInfo.userBlocklistFlagUint, dnUint, version); - - // if response is blocked, we're done - if (r.isBlocked) return r; - // if user-blockstamp doesn't contain any wildcard blocklists, we're done - if (util.emptyArray(userBlInfo.userServiceListUint)) return r; - - // check if any subdomain is in blocklists that is also in user-blockstamp - return applyWildcardBlocklists( - userBlInfo.userServiceListUint, - version, - dnBlInfo, - dn - ); + return applyBlocklists(version, userUint, dnUint); } /** @@ -131,7 +126,7 @@ export function blockstampFromCache(cr) { } /** - * @param {*} dnsPacket + * @param {any} dnsPacket * @param {BlocklistFilter} blocklistFilter * @returns {pres.BStamp|boolean} */ @@ -156,7 +151,14 @@ export function blockstampFromBlocklistFilter(dnsPacket, blocklistFilter) { return util.emptyMap(m) ? false : util.objOf(m); } -function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { +/** + * @param {string} dn domain name + * @param {Uint16Array} usrUint user blocklist flags + * @param {string} flagVersion mosty 0 or 1 + * @param {pres.BStamp} dnBlInfo subdomain blocklist flag group + * @returns {pres.RespData} + */ +function applyWildcardBlocklists(dn, flagVersion, usrUint, dnBlInfo) { const dnSplit = dn.split("."); // iterate through all subdomains one by one, for ex: a.b.c.ex.com: @@ -170,7 +172,7 @@ function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { // the subdomain isn't present in any current blocklists if (util.emptyArray(subdomainUint)) continue; - const response = applyBlocklists(uint1, subdomainUint, flagVersion); + const response = applyBlocklists(flagVersion, usrUint, subdomainUint); // if any subdomain is in any blocklist, block the current request if (!util.emptyObj(response) && response.isBlocked) { @@ -181,7 +183,13 @@ function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { return pres.rdnsNoBlockResponse(); } -function applyBlocklists(uint1, uint2, flagVersion) { +/** + * @param {string} flagVersion + * @param {Uint16Array} uint1 + * @param {Uint16Array} uint2 + * @returns {pres.RespData} + */ +function applyBlocklists(flagVersion, uint1, uint2) { // uint1 -> user blocklists; uint2 -> blocklists including sub/domains const blockedUint = intersect(uint1, uint2); @@ -194,6 +202,11 @@ function applyBlocklists(uint1, uint2, flagVersion) { } } +/** + * @param {Uint16Array} flag1 + * @param {Uint16Array} flag2 + * @returns {Uint16Array|null} + */ function intersect(flag1, flag2) { if (util.emptyArray(flag1) || util.emptyArray(flag2)) return null; @@ -253,6 +266,11 @@ function intersect(flag1, flag2) { return Uint16Array.of(commonHeader, ...commonBody.reverse()); } +/** + * @param {int} uint + * @param {int} pos + * @returns + */ function clearbit(uint, pos) { return uint & ~(1 << pos); } @@ -326,7 +344,7 @@ export function recBlockstampFrom(url) { /** * @param {string} u - Request URL string - * @returns {Array} s - delim, version, blockstamp (flag), accesskey + * @returns {string[]} s - delim, version, blockstamp (flag), accesskey */ export function extractStamps(u) { const url = new URL(u); @@ -373,6 +391,10 @@ export function extractStamps(u) { return emptystamp; } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base64ToUintV0(b64Flag) { // TODO: v0 not in use, remove all occurences // FIXME: Impl not accurate @@ -381,32 +403,45 @@ export function base64ToUintV0(b64Flag) { return bufutil.base64ToUint16(f); } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base64ToUintV1(b64Flag) { // TODO: check for empty b64Flag return bufutil.base64ToUint16(b64Flag); } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base32ToUintV1(flag) { // TODO: check for empty flag const b32 = decodeURI(flag); return bufutil.decodeFromBinaryArray(rbase32(b32)); } +/** + * @param {string} s + * @returns {string[]} [delim, ver, blockstamp, accesskey] + */ function splitBlockstamp(s) { - // delim, version, blockstamp, accesskey - if (util.emptyString(s)) return emptystamp; if (!isStampQuery(s)) return emptystamp; if (isB32Stamp(s)) { + // delim, version, blockstamp, accesskey return [_b32delim, ...s.split(_b32delim)]; } else { return [_b64delim, ...s.split(_b64delim)]; } - - return out; } +/** + * @param {string} s + * @returns {boolean} + */ export function isB32Stamp(s) { const idx32 = s.indexOf(_b32delim); const idx64 = s.indexOf(_b64delim); @@ -416,21 +451,27 @@ export function isB32Stamp(s) { else return idx32 < idx64; } -// s[0] is version field, if it doesn't exist -// then treat it as if version 0. +/** + * + * @param {string[]} s + * @returns {string} + */ export function stampVersion(s) { + // s[0] is version field, if it doesn't exist + // then treat it as if version 0. if (!util.emptyArray(s)) return s[0]; else return "0"; } // TODO: The logic to parse stamps must be kept in sync with: // github.com/celzero/website-dns/blob/8e6056bb/src/js/flag.js#L260-L425 +/** + * + * @param {string} flag + * @returns {pres.BlockstampInfo} + */ export function unstamp(flag) { - const r = { - userBlocklistFlagUint: null, - flagVersion: "0", - userServiceListUint: null, - }; + const r = new pres.BlockstampInfo(); if (util.emptyString(flag)) return r; @@ -440,29 +481,26 @@ export function unstamp(flag) { const isFlagB32 = isB32Stamp(flag); // "v:b64" or "v-b32" or "uriencoded(b64)", where v is uint version const s = flag.split(isFlagB32 ? _b32delim : _b64delim); - let convertor = (x) => ""; // empty convertor - let f = ""; // stamp flag const v = stampVersion(s); + r.flagVersion = v; if (v === "0") { - // version 0 - convertor = base64ToUintV0; - f = s[0]; + const f = s[0]; + r.userBlocklistFlagUint = base64ToUintV0(f) || null; } else if (v === "1") { - convertor = isFlagB32 ? base32ToUintV1 : base64ToUintV1; - f = s[1]; + const convertor = isFlagB32 ? base32ToUintV1 : base64ToUintV1; + const f = s[1]; + r.userBlocklistFlagUint = convertor(f) || null; } else { log.w("Rdns:unstamp", "unknown blocklist stamp version in " + s); - return r; } - - r.flagVersion = v; - r.userBlocklistFlagUint = convertor(f) || null; - r.userServiceListUint = intersect(r.userBlocklistFlagUint, _wildcardUint16); - return r; } +/** + * @param {pres.BlockstampInfo} blockInfo + * @returns {boolean} + */ export function hasBlockstamp(blockInfo) { return ( !util.emptyObj(blockInfo) && @@ -470,13 +508,21 @@ export function hasBlockstamp(blockInfo) { ); } -// returns true if tstamp is of form yyyy/epochMs +/** + * returns true if tstamp is of form yyyy/epochMs + * @param {string} tstamp + * @returns {boolean} + */ function isValidFullTimestamp(tstamp) { if (typeof tstamp !== "string") return false; return tstamp.indexOf("/") === 4; } -// from: github.com/celzero/downloads/blob/main/src/timestamp.js +/** + * from: github.com/celzero/downloads/blob/main/src/timestamp.js + * @param {string} tstamp + * @returns {int} epoch + */ export function bareTimestampFrom(tstamp) { // strip out "/" if tstamp is of form yyyy/epochMs if (isValidFullTimestamp(tstamp)) { @@ -490,6 +536,10 @@ export function bareTimestampFrom(tstamp) { return t; } +/** + * @param {string} strflag + * @returns {string[]} blocklist names + */ export function blocklists(strflag) { const { userBlocklistFlagUint, flagVersion } = unstamp(strflag); const blocklists = []; From 151af4c001ae033b9761112f71736c7ecd799235 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 22:39:05 +0530 Subject: [PATCH 16/41] dns-op: always delegate ipv4only.arpa to doh upstreams --- src/core/plugin.js | 2 +- src/plugins/dns-op/prefilter.js | 3 ++- src/plugins/dns-op/resolver.js | 3 ++- src/plugins/users/user-op.js | 42 +++++++++++++++++++++++---------- 4 files changed, 34 insertions(+), 16 deletions(-) diff --git a/src/core/plugin.js b/src/core/plugin.js index ec39e20f1b..9be0d79021 100644 --- a/src/core/plugin.js +++ b/src/core/plugin.js @@ -53,7 +53,7 @@ export default class RethinkPlugin { this.registerPlugin( "userOp", services.userOp, - ["rxid", "request", "isDnsMsg"], + ["rxid", "request", "requestDecodedDnsPacket", "isDnsMsg"], this.userOpCallback ); diff --git a/src/plugins/dns-op/prefilter.js b/src/plugins/dns-op/prefilter.js index 806f9c81f0..122dea2b6f 100644 --- a/src/plugins/dns-op/prefilter.js +++ b/src/plugins/dns-op/prefilter.js @@ -197,7 +197,8 @@ export class DNSPrefilter { const subdomains = d.split("."); do { if (util.emptyArray(subdomains)) break; - if (undelegated.has(subdomains.join("."))) { + const fqdn = subdomains.join("."); + if (undelegated.has(fqdn)) { return block; } } while (subdomains.shift() != null); diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index 6d85529933..20dfe3cc98 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -151,6 +151,7 @@ export default class DNSResolver { const rawpacket = ctx.requestBodyBuffer; const decodedpacket = ctx.requestDecodedDnsPacket; const userDns = ctx.userDnsResolverUrl; + const forceUserDns = this.forceDoh || !util.emptyString(userDns); const dispatcher = ctx.dispatcher; const userBlockstamp = ctx.userBlockstamp; // may be null or empty-obj (stamp then needs to be got from blf) @@ -212,7 +213,7 @@ export default class DNSResolver { this.resolveDnsUpstream( rxid, req, - this.determineDohResolvers(userDns), + this.determineDohResolvers(userDns, forceUserDns), rawpacket, decodedpacket ), diff --git a/src/plugins/users/user-op.js b/src/plugins/users/user-op.js index 6259d82ff5..1012113dbf 100644 --- a/src/plugins/users/user-op.js +++ b/src/plugins/users/user-op.js @@ -8,13 +8,18 @@ import { UserCache } from "./user-cache.js"; import * as pres from "../plugin-response.js"; import * as util from "../../commons/util.js"; +import * as envutil from "../../commons/envutil.js"; import * as rdnsutil from "../rdns-util.js"; import * as token from "./auth-token.js"; -import * as bufutil from "../../commons/bufutil.js"; +import * as dnsutil from "../../commons/dnsutil.js"; // TODO: determine an approp cache-size const cacheSize = 20000; +// use fixed doh upstream for these domains, +// instead of either recursing (on Fly.io) +const delegated = new Set(["ipv4only.arpa"]); + export class UserOp { constructor() { this.userConfigCache = new UserCache(cacheSize); @@ -22,7 +27,7 @@ export class UserOp { } /** - * @param {{request: Request, isDnsMsg: Boolean, rxid: string}} ctx + * @param {{request: Request, requestDecodedDnsPacket: any, isDnsMsg: Boolean, rxid: string}} ctx * @returns {Promise} */ async exec(ctx) { @@ -56,18 +61,28 @@ export class UserOp { } try { - const blocklistFlag = rdnsutil.blockstampFromUrl(ctx.request.url); + const domains = dnsutil.extractDomains(dnsPacket); + for (const d of domains) { + if (delegated.has(d)) { + // may be overriden by user-preferred doh upstream + response.data.dnsResolverUrl = envutil.primaryDohResolver(); + } + } - if (util.emptyString(blocklistFlag)) { + const blocklistFlag = rdnsutil.blockstampFromUrl(ctx.request.url); + const hasflag = !util.emptyString(blocklistFlag); + if (!hasflag) { this.log.d(ctx.rxid, "empty blocklist-flag", ctx.request.url); } - // blocklistFlag may be invalid, ref rdnsutil.blockstampFromUrl let r = this.userConfigCache.get(blocklistFlag); - if (!util.emptyString(blocklistFlag) && util.emptyObj(r)) { - r = rdnsutil.unstamp(blocklistFlag); + let hasdata = rdnsutil.hasBlockstamp(r); + if (hasflag && !hasdata) { + // r not in cache + r = rdnsutil.unstamp(blocklistFlag); // r is never null, may throw ex + hasdata = rdnsutil.hasBlockstamp(r); - if (!bufutil.emptyBuf(r.userBlocklistFlagUint)) { + if (hasdata) { this.log.d(ctx.rxid, "new cfg cache kv", blocklistFlag, r); // TODO: blocklistFlag is not normalized, ie b32 used for dot isn't // converted to its b64 form (which doh and rethinkdns modules use) @@ -75,13 +90,14 @@ export class UserOp { this.userConfigCache.put(blocklistFlag, r); } } else { - this.log.d(ctx.rxid, "cfg cache hit?", r != null, blocklistFlag, r); + this.log.d(ctx.rxid, "cfg cache hit?", hasdata, blocklistFlag, r); } - response.data.userBlocklistInfo = r; - response.data.userBlocklistFlag = blocklistFlag; - // sets user-preferred doh upstream - response.data.dnsResolverUrl = null; + if (hasdata) { + response.data.userBlocklistInfo = r; + response.data.userBlocklistFlag = blocklistFlag; + // TODO: override response.data.dnsResolverUrl + } } catch (e) { this.log.e(ctx.rxid, "loadUser", e); // avoid erroring out on invalid blocklist info & flag From b032f571e43e132787de11abfeb1eb392928c025 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 22:51:45 +0530 Subject: [PATCH 17/41] node: m logs fmt --- src/server-node.js | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/src/server-node.js b/src/server-node.js index 855d925d47..66cc421d8b 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -1152,25 +1152,11 @@ function adjustMaxConns(n) { stopAfter(0); return; } else if (adj > stresspoint) { - log.w( - "load: stress; lowram?", - lowram, - "freemem:", - freemem, - "totmem:", - totmem - ); + log.w("load: stress; lowram?", lowram, "mem:", freemem, " / ", totmem); log.w("load: stress; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); n = (minc / 2) | 0; } else if (adj > 0) { - log.d( - "load: high; lowram?", - lowram, - "freemem:", - freemem, - "totmem:", - totmem - ); + log.d("load: high; lowram?", lowram, "mem:", freemem, " / ", totmem); log.d("load: high; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); } From 9cec6c2ce7c7315ede376b0ee3ee29d44449e87c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 22:53:58 +0530 Subject: [PATCH 18/41] fly: m fix invalid restart policy --- fly.tls.toml | 2 +- fly.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fly.tls.toml b/fly.tls.toml index 592e042ea9..1e3b420251 100644 --- a/fly.tls.toml +++ b/fly.tls.toml @@ -23,7 +23,7 @@ auto_rollback = true # community.fly.io/t/19180 # fly.io/docs/machines/guides-examples/machine-restart-policy [[restart]] - policy = "on-fail" + policy = "on-failure" retries = 3 # DNS over HTTPS (well, h2c and http1.1) diff --git a/fly.toml b/fly.toml index d4f921fe54..e548706e8e 100644 --- a/fly.toml +++ b/fly.toml @@ -22,7 +22,7 @@ kill_timeout = "15s" # community.fly.io/t/19180 # fly.io/docs/machines/guides-examples/machine-restart-policy [[restart]] - policy = "on-fail" + policy = "on-failure" retries = 3 # DNS over HTTPS From b9735502a7ccd9df9e32c36d886f09076579cc9c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 23:34:29 +0530 Subject: [PATCH 19/41] user-op: m assign missing var --- src/plugins/users/user-op.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/plugins/users/user-op.js b/src/plugins/users/user-op.js index 1012113dbf..3247072af0 100644 --- a/src/plugins/users/user-op.js +++ b/src/plugins/users/user-op.js @@ -49,7 +49,7 @@ export class UserOp { } /** - * @param {{request: Request, isDnsMsg: Boolean, rxid: string}} ctx + * @param {{request: Request, requestDecodedDnsPacket: any, isDnsMsg: Boolean, rxid: string}} ctx * @returns {pres.RResp} */ loadUser(ctx) { @@ -61,6 +61,7 @@ export class UserOp { } try { + const dnsPacket = ctx.requestDecodedDnsPacket; const domains = dnsutil.extractDomains(dnsPacket); for (const d of domains) { if (delegated.has(d)) { From a17874230e589dc536e30e5cc0b9a08c8b6c74cd Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Wed, 25 Sep 2024 23:34:48 +0530 Subject: [PATCH 20/41] all: rmv log timers --- src/core/log.js | 37 +--------------------------------- src/core/node/dns-transport.js | 13 ------------ src/core/plugin.js | 5 ----- src/server-node.js | 17 ---------------- 4 files changed, 1 insertion(+), 71 deletions(-) diff --git a/src/core/log.js b/src/core/log.js index 12d8f32ee7..13df003294 100644 --- a/src/core/log.js +++ b/src/core/log.js @@ -82,9 +82,6 @@ export default class Log { _resetLevel() { this.d = stub(); this.debug = stub(); - this.lapTime = stub(); - this.startTime = stubr(""); - this.endTime = stubr(false); this.i = stub(); this.info = stub(); this.w = stub(); @@ -96,24 +93,6 @@ export default class Log { withTags(...tags) { const that = this; return { - lapTime: (n, ...r) => { - // returns void - return that.lapTime(n, ...tags, ...r); - }, - startTime: (n, ...r) => { - const tid = that.startTime(n); - that.d(that.now() + " T", ...tags, "create", tid, ...r); - const tim = setTimeout(() => { - that.endTime(tid); - }, /* 2mins*/ 2 * 60 * 1000); - if (typeof tim.unref === "function") tim.unref(); - return tid; - }, - endTime: (n, ...r) => { - if (that.endTime(n)) { - that.d(that.now() + " T", ...tags, "end", n, ...r); - } // else: already ended or invalid timer - }, d: (...args) => { that.d(that.now() + " D", ...tags, ...args); }, @@ -169,21 +148,7 @@ export default class Log { this.d = console.debug; this.debug = console.debug; case "timer": - // stub() for Fastly as it does not support console timers. - this.lapTime = console.timeLog || stub(); - this.startTime = function (name) { - name = uid(name); - if (console.time) console.time(name); - return name; - }; - // stub() for Fastly as it does not support console timers. - this.endTime = function (uid) { - try { - if (console.timeEnd) console.timeEnd(uid); - return true; - } catch (ignore) {} - return false; - }; + // deprecated; fallthrough case "info": this.i = console.info; this.info = console.info; diff --git a/src/core/node/dns-transport.js b/src/core/node/dns-transport.js index 12ff0067ae..a351d21092 100644 --- a/src/core/node/dns-transport.js +++ b/src/core/node/dns-transport.js @@ -49,22 +49,15 @@ export class Transport { let sock = this.udpconns.take(); this.log.d(rxid, "udp pooled?", sock !== null); - const t = this.log.startTime("udp-query"); let ans = null; try { sock = sock || (await this.makeConn("udp")); - this.log.lapTime(t, rxid, "make-conn"); - ans = await UdpTx.begin(sock).exchange(rxid, q, this.ioTimeout); - this.log.lapTime(t, rxid, "get-ans"); - this.parkConn(sock, "udp"); } catch (ex) { this.closeUdp(sock); this.log.e(rxid, ex); } - this.log.endTime(t); - return ans; } @@ -72,21 +65,15 @@ export class Transport { let sock = this.tcpconns.take(); this.log.d(rxid, "tcp pooled?", sock !== null); - const t = this.log.startTime("tcp-query"); let ans = null; try { sock = sock || (await this.makeConn("tcp")); - log.lapTime(t, rxid, "make-conn"); - ans = await TcpTx.begin(sock).exchange(rxid, q, this.ioTimeout); - log.lapTime(t, rxid, "get-ans"); - this.parkConn(sock, "tcp"); } catch (ex) { this.closeTcp(sock); this.log.e(rxid, ex); } - this.log.endTime(t); return ans; } diff --git a/src/core/plugin.js b/src/core/plugin.js index 9be0d79021..a64d05dd8e 100644 --- a/src/core/plugin.js +++ b/src/core/plugin.js @@ -143,7 +143,6 @@ export default class RethinkPlugin { async execute() { const io = this.io; // const rxid = this.ctx.get("rxid"); - // const t = this.log.startTime("exec-plugin-" + rxid); for (const p of this.plugin) { if (io.stopProcessing && !p.continueOnStopProcess) { continue; @@ -152,16 +151,12 @@ export default class RethinkPlugin { continue; } - // this.log.lapTime(t, rxid, p.name, "send-io"); const res = await p.module.exec(makectx(this.ctx, p.pctx)); - // this.log.lapTime(t, rxid, p.name, "got-res"); if (typeof p.callback === "function") { await p.callback.call(this, res, io); } - // this.log.lapTime(t, rxid, p.name, "post-callback"); } - // this.log.endTime(t); } /** diff --git a/src/server-node.js b/src/server-node.js index 66cc421d8b..fc61a0436b 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -841,7 +841,6 @@ async function handleTCPQuery(q, socket, host, flag) { if (bufutil.emptyBuf(q) || !tcpOkay(socket)) return; const rxid = util.xid(); - const t = log.startTime("handle-tcp-query-" + rxid); try { const r = await resolveQuery(rxid, q, host, flag); if (bufutil.emptyBuf(r)) { @@ -856,7 +855,6 @@ async function handleTCPQuery(q, socket, host, flag) { ok = false; log.w(rxid, "tcp: send fail, err", e); } - log.endTime(t); // close socket when !ok if (!ok) { @@ -941,8 +939,6 @@ async function serveHTTPS(req, res) { const buffers = []; - const t = log.startTime("recv-https"); - // if using for await loop, then it must be wrapped in a // try-catch block: stackoverflow.com/questions/69169226 // if not, errors from reading req escapes unhandled. @@ -954,8 +950,6 @@ async function serveHTTPS(req, res) { const b = bufutil.concatBuf(buffers); const bLen = b.byteLength; - log.endTime(t); - if (util.isPostRequest(req) && !dnsutil.validResponseSize(b)) { res.writeHead(dnsutil.dohStatusCode(b), util.corsHeadersIfNeeded(ua)); res.end(); @@ -976,7 +970,6 @@ async function handleHTTPRequest(b, req, res) { heartbeat(); const rxid = util.xid(); - const t = log.startTime("handle-http-req-" + rxid); try { let host = req.headers.host || req.headers[":authority"]; if (isIPv6(host)) host = `[${host}]`; @@ -995,26 +988,18 @@ async function handleHTTPRequest(b, req, res) { body: req.method === "POST" ? b : null, }); - log.lapTime(t, "upstream-start"); - const fRes = await handleRequest(util.mkFetchEvent(fReq)); - log.lapTime(t, "upstream-end"); - if (!resOkay(res)) { throw new Error("res not writable 1"); } res.writeHead(fRes.status, util.copyHeaders(fRes)); - log.lapTime(t, "send-head"); - // ans may be null on non-2xx responses, such as redirects (3xx) by cc.js // or 4xx responses on timeouts or 5xx on invalid http method const ans = await fRes.arrayBuffer(); - log.lapTime(t, "recv-ans"); - if (!resOkay(res)) { throw new Error("res not writable 2"); } else if (!bufutil.emptyBuf(ans)) { @@ -1030,8 +1015,6 @@ async function handleHTTPRequest(b, req, res) { if (!ok) resClose(res); log.w(e); } - - log.endTime(t); } /** From 4f25b7463aeb40d4f2ec120848f6de35ddf13e3e Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Thu, 26 Sep 2024 22:04:59 +0530 Subject: [PATCH 21/41] node: m server event logs --- src/server-node.js | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/server-node.js b/src/server-node.js index fc61a0436b..5be9f9b1b3 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -356,7 +356,7 @@ function trapServerEvents(s) { const id = tracker.trackConn(s, socket); if (!tracker.valid(id)) { - log.i("tcp: not tracking; server shutting down?"); + log.i("tcp: not tracking; server shutting down?", id); close(socket); return; } @@ -368,7 +368,7 @@ function trapServerEvents(s) { }); socket.on("error", (err) => { - log.d("tcp: incoming conn closed with err; " + err.message); + log.d("tcp: incoming conn", id, "closed:", err.message); close(socket); }); @@ -411,7 +411,7 @@ function trapSecureServerEvents(s) { const id = tracker.trackConn(s, socket); if (!tracker.valid(id)) { - log.i("tls: not tracking; server shutting down?"); + log.i("tls: not tracking; server shutting down?", id); close(socket); return; } @@ -458,11 +458,23 @@ function trapSecureServerEvents(s) { s.on("tlsClientError", (err, /** @type {TLSSocket} */ tlsSocket) => { stats.tlserr += 1; // fly tcp healthchecks also trigger tlsClientErrors - log.d("tls: client err; " + err.message); + log.d("tls: client err;", err.message, addrstr(tlsSocket)); close(tlsSocket); }); } +/** + * @param {TLSSocket|Socket} sock + */ +function addrstr(sock) { + if (!sock) return ""; + return ( + `[${sock.localAddress}]:${sock.localPort}` + + "->" + + `[${sock.remoteAddress}]:${sock.remotePort}` + ); +} + /** * @param {tls.Server} s * @returns {void} From 36b178ad50353df617bb03e5e67de4aaaa62b38f Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Thu, 26 Sep 2024 22:06:26 +0530 Subject: [PATCH 22/41] core/dns: m handle transacts for null sockets --- src/core/dns/transact.js | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/src/core/dns/transact.js b/src/core/dns/transact.js index 53debb5db9..8145229ff9 100644 --- a/src/core/dns/transact.js +++ b/src/core/dns/transact.js @@ -9,25 +9,44 @@ import * as bufutil from "../../commons/bufutil.js"; import * as util from "../../commons/util.js"; import * as dnsutil from "../../commons/dnsutil.js"; +/** + * @typedef {import("net").Socket} TcpSock + * @typedef {import("dgram").Socket} UdpSock + * @typedef {import("net").AddressInfo} AddrInfo + */ + // TcpTx implements a single DNS question-answer exchange over TCP. It doesn't // multiplex multiple DNS questions over the same socket. It doesn't take the // ownership of the socket, but requires exclusive use of it. The socket may // close itself on errors, however. export class TcpTx { + /** @param {TcpSock} socket */ constructor(socket) { + /** @type {TcpSock} */ this.sock = socket; - // only one transaction allowed - // then done gates all other requests - this.done = false; + /** @type {boolean} */ + this.done = false || socket == null; // done gates all other requests + /** @type {ScratchBuffer} */ // reads from the socket is buffered into scratch this.readBuffer = this.makeReadBuffer(); + /** @type {function(Buffer)} */ + this.resolve = null; + /** @type {function(string?)} */ + this.reject = null; this.log = log.withTags("TcpTx"); } + /** @param {TcpSock} sock */ static begin(sock) { return new TcpTx(sock); } + /** + * @param {string} rxid + * @param {Buffer} query + * @param {int} timeout + * @returns {Promise|null} + */ async exchange(rxid, query, timeout) { if (this.done) { this.log.w(rxid, "no exchange, tx is done"); @@ -200,19 +219,32 @@ export class TcpTx { // ownership of the socket, but requires exclusive access to it. The socket // may close itself on errors, however. export class UdpTx { + /** @param {UdpSock} socket */ constructor(socket) { + /** @type {UdpSock} */ this.sock = socket; - // only one transaction allowed - this.done = false; - // ticks socket io timeout - this.timeoutTimerId = null; + /** @type {boolean} */ + this.done = false || socket == null; // only one transaction allowed + /** @type {NodeJS.Timeout|-1} */ + this.timeoutTimerId = null; // ticks socket io timeout + /** @type {function(Buffer)} */ + this.resolve = null; + /** @type {function(string)} */ + this.reject = null; this.log = log.withTags("UdpTx"); } + /** @param {UdpSock} sock */ static begin(sock) { return new UdpTx(sock); } + /** + * @param {string} rxid + * @param {Buffer} query + * @param {int} timeout + * @returns {Promise|null} + */ async exchange(rxid, query, timeout) { if (this.done) { this.log.w(rxid, "no exchange, tx is done"); From 433e493f5085ce295cf2ba0ce64ef3aaa449998c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Thu, 26 Sep 2024 22:07:10 +0530 Subject: [PATCH 23/41] core/dns: m jsdocs --- src/commons/bufutil.js | 8 +++ src/commons/util.js | 8 ++- src/core/dns/conns.js | 96 +++++++++++++++++++++++++++++++--- src/core/dns/transact.js | 59 +++++++++++++++++---- src/core/node/dns-transport.js | 48 +++++++++++++++-- 5 files changed, 199 insertions(+), 20 deletions(-) diff --git a/src/commons/bufutil.js b/src/commons/bufutil.js index 56eb33625c..8897091476 100644 --- a/src/commons/bufutil.js +++ b/src/commons/bufutil.js @@ -169,11 +169,19 @@ export function bufferOf(arrayBuf) { return Buffer.from(new Uint8Array(arrayBuf)); } +/** + * @param {Buffer} b + * @returns {int} + */ export function recycleBuffer(b) { b.fill(0); return 0; } +/** + * @param {int} size + * @returns {Buffer} + */ export function createBuffer(size) { return Buffer.allocUnsafe(size); } diff --git a/src/commons/util.js b/src/commons/util.js index 405baf2f18..a639e13908 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -117,7 +117,13 @@ export function objOf(map) { return map.entries ? Object.fromEntries(map) : {}; } -export function timedOp(op, ms, cleanup = () => {}) { +/** + * @param {(function((out, err) => void))} op + * @param {int} ms + * @param {function(any)} cleanup + * @returns {Promise} + */ +export function timedOp(op, ms, cleanup = (x) => {}) { return new Promise((resolve, reject) => { let timedout = false; const tid = timeout(ms, () => { diff --git a/src/core/dns/conns.js b/src/core/dns/conns.js index d79b5c92d9..af022cee7b 100644 --- a/src/core/dns/conns.js +++ b/src/core/dns/conns.js @@ -13,20 +13,36 @@ import * as util from "../../commons/util.js"; */ export class TcpConnPool { + /** + * @param {int} size + * @param {int} ttl + */ constructor(size, ttl) { + /** @type {int} */ this.size = size; - // max sweeps per give/take + /** + * max sweeps per give/take + * @type {int} + */ this.maxsweep = Math.max((size / 4) | 0, 20); + /** @type {int} */ this.ttl = ttl; // ms const quarterttl = (ttl / 4) | 0; + /** @type {int} */ this.keepalive = Math.min(/* 60s*/ 60000, quarterttl); // ms + /** @type {int} */ this.lastSweep = 0; + /** @type {int} */ this.sweepGapMs = Math.max(/* 10s*/ 10000, quarterttl); // ms /** @type {Map} */ this.pool = new Map(); log.d("tcp-pool psz:", size, "msw:", this.maxsweep, "t:", ttl); } + /** + * @param {AnySock} socket + * @returns {boolean} + */ give(socket) { if (socket.pending) return false; if (!socket.writable) return false; @@ -40,6 +56,9 @@ export class TcpConnPool { return this.checkin(socket); } + /** + * @returns {AnySock?} + */ take() { const thres = this.maxsweep / 2; let out = null; @@ -68,9 +87,9 @@ export class TcpConnPool { } /** - * @param {import("net").Socket} sock + * @param {AnySock} sock * @param {Report} report - * @returns {import("net").Socket} + * @returns {AnySock} */ checkout(sock, report) { log.d(report.id, "checkout, size:", this.pool.size); @@ -88,6 +107,10 @@ export class TcpConnPool { return sock; } + /** + * @param {AnySock} socket + * @returns {boolean} + */ checkin(sock) { const report = this.mkreport(); @@ -102,6 +125,10 @@ export class TcpConnPool { return true; } + /** + * @param {boolean} clear + * @returns {boolean} + */ sweep(clear = false) { const sz = this.pool.size; if (sz <= 0) return false; @@ -122,11 +149,21 @@ export class TcpConnPool { return sz > this.pool.size; // size decreased post-sweep? } + /** + * @param {AnySock?} socket + * @returns {boolean} + */ ready(sock) { - return sock.readyState === "open"; + return sock && sock.readyState === "open"; } + /** + * @param {AnySock?} sock + * @param {Report} report + * @returns {boolean} + */ healthy(sock, report) { + if (!sock) return false; const destroyed = !sock.writable; const open = this.ready(sock); const fresh = report.fresh(this.ttl); @@ -136,10 +173,18 @@ export class TcpConnPool { return fresh; // healthy if not expired } + /** + * @param {AnySock} sock + * @param {Report} report + * @returns {boolean} + */ dead(sock, report) { return !this.healthy(sock, report); } + /** + * @param {AnySock?} sock + */ evict(sock) { this.pool.delete(sock); @@ -148,6 +193,7 @@ export class TcpConnPool { } catch (ignore) {} } + /** @return {Report} */ mkreport() { return new Report(util.uid("tcp")); } @@ -164,24 +210,39 @@ class Report { this.lastuse = Date.now(); } + /** @param {number} since */ fresh(since) { return this.lastuse + since >= Date.now(); } } export class UdpConnPool { + /** + * @param {int} size + * @param {int} ttl + */ constructor(size, ttl) { + /** @type {int} */ this.size = size; + /** @type {int} */ this.maxsweep = Math.max((size / 4) | 0, 20); + /** @type {int} */ this.ttl = Math.max(/* 60s*/ 60000, ttl); // no more than 60s + /** @type {int} */ this.lastSweep = 0; + /** @type {int} */ this.sweepGapMs = Math.max(/* 10s*/ 10000, (ttl / 2) | 0); // ms /** @type {Map} */ this.pool = new Map(); log.d("udp-pool psz:", size, "msw:", this.maxsweep, "t:", ttl); } + /** + * @param {AnySock?} socket + * @returns {boolean} + */ give(socket) { + if (!socket) return false; if (this.pool.has(socket)) return true; const free = this.pool.size < this.size || this.sweep(); @@ -190,6 +251,9 @@ export class UdpConnPool { return this.checkin(socket); } + /** + * @returns {AnySock?} + */ take() { const thres = this.maxsweep / 2; let out = null; @@ -217,9 +281,9 @@ export class UdpConnPool { } /** - * @param {import("dgram").Socket} sock + * @param {AnySock} sock * @param {Report} report - * @returns {import("dgram").Socket} + * @returns {AnySock} */ checkout(sock, report) { log.d(report.id, "checkout, size:", this.pool.size); @@ -231,6 +295,10 @@ export class UdpConnPool { return sock; } + /** + * @param {AnySock} socket + * @returns {boolean} + */ checkin(sock) { const report = this.mkreport(); @@ -243,6 +311,10 @@ export class UdpConnPool { return true; } + /** + * @param {boolean} clear + * @returns {boolean} + */ sweep(clear = false) { const sz = this.pool.size; if (sz <= 0) return false; @@ -263,6 +335,10 @@ export class UdpConnPool { return sz > this.pool.size; // size decreased post-sweep? } + /** + * @param {Report} report + * @returns {boolean} + */ healthy(report) { const fresh = report.fresh(this.ttl); const id = report.id; @@ -270,10 +346,17 @@ export class UdpConnPool { return fresh; // healthy if not expired } + /** + * @param {Report} report + * @returns {boolean} + */ dead(report) { return !this.healthy(report); } + /** + * @param {AnySock?} sock + */ evict(sock) { if (!sock) return; this.pool.delete(sock); @@ -282,6 +365,7 @@ export class UdpConnPool { sock.close(); } + /** @return {Report} */ mkreport() { return new Report(util.uid("udp")); } diff --git a/src/core/dns/transact.js b/src/core/dns/transact.js index 8145229ff9..79b122101d 100644 --- a/src/core/dns/transact.js +++ b/src/core/dns/transact.js @@ -87,8 +87,13 @@ export class TcpTx { } } - // TODO: Same code as in server.js, merge them + /** + * @param {string} rxid + * @param {Buffer} chunk + * @returns + */ onData(rxid, chunk) { + // TODO: Same code as in server.js, merge them if (this.done) { this.log.w(rxid, "on reads, tx is closed for business"); return chunk; @@ -165,6 +170,7 @@ export class TcpTx { this.no("timeout"); } + /** @returns {Promise} */ promisedRead() { const that = this; return new Promise((resolve, reject) => { @@ -173,6 +179,10 @@ export class TcpTx { }); } + /** + * @param {string} rxid + * @param {Buffer} query + */ write(rxid, query) { if (this.done) { this.log.w(rxid, "no writes, tx is done working"); @@ -191,26 +201,38 @@ export class TcpTx { }); } + /** + * @param {Buffer} val + */ yes(val) { this.done = true; this.resolve(val); } + /** + * @param {string?} reason + */ no(reason) { this.done = true; this.reject(reason); } + /** @returns {ScratchBuffer} */ makeReadBuffer() { - const qlenBuf = bufutil.createBuffer(dnsutil.dnsHeaderSize); - const qlenBufOffset = bufutil.recycleBuffer(qlenBuf); - - return { - qlenBuf: qlenBuf, - qlenBufOffset: qlenBufOffset, - qBuf: null, - qBufOffset: 0, - }; + return new ScratchBuffer(); + } +} + +class ScratchBuffer { + constructor() { + /** @type {Buffer} */ + this.qlenBuf = bufutil.createBuffer(dnsutil.dnsHeaderSize); + /** @type {int} */ + this.qlenBufOffset = bufutil.recycleBuffer(this.qlenBuf); + /** @type {Buffer} */ + this.qBuf = null; + /** @type {int} */ + this.qBufOffset = 0; } } @@ -278,12 +300,23 @@ export class UdpTx { } } + /** + * @param {string} rxid + * @param {Buffer} query + * @returns + */ write(rxid, query) { if (this.done) return; // discard this.log.d(rxid, "udp write"); this.sock.send(query); // err-on-write handled by onError } + /** + * @param {string} rxid + * @param {Buffer} b + * @param {AddrInfo} addrinfo + * @returns + */ onMessage(rxid, b, addrinfo) { if (this.done) return; // discard this.log.d(rxid, "udp read"); @@ -302,6 +335,10 @@ export class UdpTx { return err ? this.no("error") : this.no("close"); } + /** + * @param {int} timeout + * @returns {Promise} + */ promisedRead(timeout = 0) { const that = this; if (timeout > 0) { @@ -315,6 +352,7 @@ export class UdpTx { }); } + /** @param {Buffer} val */ yes(val) { if (this.done) return; @@ -323,6 +361,7 @@ export class UdpTx { this.resolve(val); } + /** @param {string} reason */ no(reason) { if (this.done) return; diff --git a/src/core/node/dns-transport.js b/src/core/node/dns-transport.js index a351d21092..061eea73ba 100644 --- a/src/core/node/dns-transport.js +++ b/src/core/node/dns-transport.js @@ -11,6 +11,19 @@ import * as util from "../../commons/util.js"; import { TcpConnPool, UdpConnPool } from "../dns/conns.js"; import { TcpTx, UdpTx } from "../dns/transact.js"; +/** + * @typedef {import("net").Socket | import("dgram").Socket} AnySock + * @typedef {import("net").Socket} TcpSock + * @typedef {import("dgram").Socket} UdpSock + */ + +/** + * + * @param {string} host + * @param {int} port + * @param {any} opts + * @returns {Transport} + */ export function makeTransport(host, port = 53, opts = {}) { return new Transport(host, port, opts); } @@ -25,14 +38,21 @@ export function makeTransport(host, port = 53, opts = {}) { export class Transport { constructor(host, port, opts = {}) { if (util.emptyString(host)) throw new Error("invalid host" + host); + /** @type {string} */ this.host = host; + /** @type {int} */ this.port = port || 53; + /** @type {int} */ this.connectTimeout = opts.connectTimeout || 3000; // 3s + /** @type {int} */ this.ioTimeout = opts.ioTimeout || 10000; // 10s + /** @type {int} */ this.ipproto = net.isIP(host); // 4, 6, or 0 const sz = opts.poolSize || 500; // conns const ttl = opts.poolTtl || 60000; // 1m + /** @type {TcpConnPool} */ this.tcpconns = new TcpConnPool(sz, ttl); + /** @type {UdpConnPool} */ this.udpconns = new UdpConnPool(sz, ttl); this.log = log.withTags("DnsTransport"); @@ -45,10 +65,16 @@ export class Transport { this.log.i("transport teardown (tcp | udp) done?", r1, "|", r2); } + /** + * @param {string} rxid + * @param {Buffer} q + * @returns {Promise|null} + */ async udpquery(rxid, q) { let sock = this.udpconns.take(); this.log.d(rxid, "udp pooled?", sock !== null); + /** @type {Buffer?} */ let ans = null; try { sock = sock || (await this.makeConn("udp")); @@ -61,10 +87,16 @@ export class Transport { return ans; } + /** + * @param {string} rxid + * @param {Buffer} q + * @returns {Promise|null} + */ async tcpquery(rxid, q) { let sock = this.tcpconns.take(); this.log.d(rxid, "tcp pooled?", sock !== null); + /** @type {Buffer?} */ let ans = null; try { sock = sock || (await this.makeConn("tcp")); @@ -78,6 +110,10 @@ export class Transport { return ans; } + /** + * @param {AnySock} sock + * @param {string} proto + */ parkConn(sock, proto) { if (proto === "tcp") { const ok = this.tcpconns.give(sock); @@ -88,6 +124,11 @@ export class Transport { } } + /** + * @param {string} proto + * @returns {Promise} + * @throws {Error} + */ makeConn(proto) { if (proto === "tcp") { const tcpconnect = (cb) => { @@ -115,17 +156,18 @@ export class Transport { } /** - * @param {import("net").Socket} sock + * @param {TcpSock?} sock */ closeTcp(sock) { + if (!sock) return; // the socket is not expected to have any error-listeners // so we add one to avoid unhandled errors sock.on("error", util.stub); - if (sock && !sock.destroyed) sock.destroySoon(); + if (!sock.destroyed) sock.destroySoon(); } /** - * @param {import("dgram").Socket} sock + * @param {UdpSock?} sock */ closeUdp(sock) { if (!sock || sock.destroyed) return; From cc6e7988aaf453f10d16fac080a592204adb5996 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 06:50:26 +0530 Subject: [PATCH 24/41] doh: add cors for user-agent=dohjs --- src/commons/util.js | 3 ++- src/core/doh.js | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/commons/util.js b/src/commons/util.js index a639e13908..5c53da34b4 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -13,7 +13,8 @@ // musn't import any non-std modules export function fromBrowser(ua) { - return ua && ua.startsWith("Mozilla/5.0"); + if (emptyString(ua)) return false; + return ua.startsWith("Mozilla/5.0") || ua.startsWith("dohjs/"); } export function jsonHeaders() { diff --git a/src/core/doh.js b/src/core/doh.js index 619815c3af..fc0749409b 100644 --- a/src/core/doh.js +++ b/src/core/doh.js @@ -61,11 +61,20 @@ function optionsRequest(request) { return request.method === "OPTIONS"; } +/** + * @param {IOState} io + * @param {Error} err + */ function errorResponse(io, err = null) { const eres = pres.errResponse("doh.js", err); io.dnsExceptionResponse(eres); } +/** + * @param {IOState} io + * @param {string} ua + * @returns {Response} + */ function withCors(io, ua) { if (util.fromBrowser(ua)) io.setCorsHeadersIfNeeded(); return io.httpResponse; From 45c079957f09ce86f03f2f00c32f0bcf7612d60e Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 07:03:05 +0530 Subject: [PATCH 25/41] core/io: doh cache headers --- src/commons/dnsutil.js | 5 +++++ src/core/io-state.js | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/commons/dnsutil.js b/src/commons/dnsutil.js index c40fab6020..5967327c6d 100644 --- a/src/commons/dnsutil.js +++ b/src/commons/dnsutil.js @@ -377,6 +377,11 @@ export function isAnswerQuad0(packet) { return isAnswerBlocked(packet.answers); } +export function ttl(packet) { + if (!hasAnswers(packet)) return 0; + return packet.answers[0].ttl || 0; +} + /** * @param {any} dnsPacket * @returns {string[]} diff --git a/src/core/io-state.js b/src/core/io-state.js index 1c56ff5494..45d3ac822b 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -191,6 +191,7 @@ export default class IOState { return util.concatHeaders( util.dnsHeaders(), util.contentLengthHeader(b), + this.cacheHeaders(), xNileRegion, xNileFlags, xNileFlagsOk @@ -215,6 +216,16 @@ export default class IOState { } } + // set cache from ttl in decoded-dns-packet + cacheHeaders() { + const ttl = dnsutil.ttl(this.decodedDnsPacket); + if (ttl <= 0) return null; + + return { + "cache-control": "public, max-age=" + ttl, + }; + } + assignBlockResponse() { let done = this.initFlagsAndAnswers(); done = done && this.addData(); From ad46d2e1aea53fb4f838cdb24fc1a8b85ea5232f Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 07:08:12 +0530 Subject: [PATCH 26/41] core/io: add block flag headers iff non-empty --- src/core/io-state.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/io-state.js b/src/core/io-state.js index 45d3ac822b..56199b020a 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -182,8 +182,11 @@ export default class IOState { } headers(b = null) { - const xNileFlags = this.isDnsBlock ? { "x-nile-flags": this.flag } : null; - const xNileFlagsOk = !xNileFlags ? { "x-nile-flags-dn": this.flag } : null; + const hasBlockFlag = !util.emptyString(this.flag); + const isBlocked = hasBlockFlag && this.isDnsBlock; + const couldBlock = hasBlockFlag && !this.isDnsBlock; + const xNileFlags = isBlocked ? { "x-nile-flags": this.flag } : null; + const xNileFlagsOk = couldBlock ? { "x-nile-flags-dn": this.flag } : null; const xNileRegion = !util.emptyString(this.region) ? { "x-nile-region": this.region } : null; From 138cbc5a6434672bbb96d5ed4e8695f2619e6d5a Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 18:46:58 +0530 Subject: [PATCH 27/41] core/io: add debug headers iff not prod --- src/commons/envutil.js | 6 ++++++ src/commons/util.js | 7 ------- src/core/io-state.js | 29 ++++++++++++++++++++++------- src/core/log.js | 2 +- src/core/node/config.js | 2 +- 5 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/commons/envutil.js b/src/commons/envutil.js index f308a5128d..9d94412154 100644 --- a/src/commons/envutil.js +++ b/src/commons/envutil.js @@ -8,6 +8,12 @@ // musn't import /depend on anything. +export function isProd() { + if (!envManager) return false; + + return envManager.determineEnvStage() === "production"; +} + export function onFly() { if (!envManager) return false; diff --git a/src/commons/util.js b/src/commons/util.js index 5c53da34b4..bc0c51934b 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -526,13 +526,6 @@ export function stub(...args) { }; } -export function stubr(r, ...args) { - return (...args) => { - /* no-op */ - return r; - }; -} - export function stubAsync(...args) { return async (...args) => { /* no-op */ diff --git a/src/core/io-state.js b/src/core/io-state.js index 56199b020a..57dae0b25f 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -8,22 +8,36 @@ import * as bufutil from "../commons/bufutil.js"; import * as dnsutil from "../commons/dnsutil.js"; +import * as envutil from "../commons/envutil.js"; import * as util from "../commons/util.js"; export default class IOState { constructor() { + /** @type {string} */ this.flag = ""; + /** @type {any} */ this.decodedDnsPacket = this.emptyDecodedDnsPacket(); - /** @type {Response} */ - this.httpResponse = undefined; + /** @type {Response?} */ + this.httpResponse = null; + /** @type {boolean} */ + this.isProd = envutil.isProd(); + /** @type {boolean} */ this.isException = false; - this.exceptionStack = undefined; + /** @type {string} */ + this.exceptionStack = null; + /** @type {string} */ this.exceptionFrom = ""; + /** @type {boolean} */ this.isDnsBlock = false; + /** @type {boolean} */ this.alwaysGatewayAnswer = false; + /** @type {string} */ this.gwip4 = ""; + /** @type {string} */ this.gwip6 = ""; + /** @type {string} */ this.region = ""; + /** @type {boolean} */ this.stopProcessing = false; this.log = log.withTags("IOState"); } @@ -84,7 +98,7 @@ export default class IOState { this.httpResponse = new Response(servfail, { headers: util.concatHeaders( this.headers(servfail), - this.additionalHeader(JSON.stringify(ex)) + this.debugHeaders(JSON.stringify(ex)) ), status: servfail ? 200 : 408, // rfc8484 section-4.2.1 }); @@ -148,7 +162,7 @@ export default class IOState { this.httpResponse = new Response(null, { headers: util.concatHeaders( this.headers(), - this.additionalHeader(JSON.stringify(this.exceptionStack)) + this.debugHeaders(JSON.stringify(this.exceptionStack)) ), status: 503, }); @@ -174,7 +188,7 @@ export default class IOState { this.httpResponse = new Response(null, { headers: util.concatHeaders( this.headers(), - this.additionalHeader(JSON.stringify(this.exceptionStack)) + this.debugHeaders(JSON.stringify(this.exceptionStack)) ), status: 503, }); @@ -201,7 +215,8 @@ export default class IOState { ); } - additionalHeader(json) { + debugHeaders(json) { + if (this.isProd) return null; if (!json) return null; return { diff --git a/src/core/log.js b/src/core/log.js index 13df003294..f841cbec26 100644 --- a/src/core/log.js +++ b/src/core/log.js @@ -9,7 +9,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -import { uid, stub, stubr } from "../commons/util.js"; +import { uid, stub } from "../commons/util.js"; /** * @typedef {'error'|'logpush'|'warn'|'info'|'timer'|'debug'} LogLevels diff --git a/src/core/node/config.js b/src/core/node/config.js index d1b7e9bc26..31328fad71 100644 --- a/src/core/node/config.js +++ b/src/core/node/config.js @@ -127,7 +127,7 @@ async function prep() { log.i("imported udp/tcp dns transport", plainOldDnsIp); } - /** signal ready */ + // signal ready system.pub("ready", [dns53]); } From 7ecb04b3a0e8b674a917273958751966357e831c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 22:05:09 +0530 Subject: [PATCH 28/41] core/dns: missing socket callback args --- src/core/dns/transact.js | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/core/dns/transact.js b/src/core/dns/transact.js index 79b122101d..3a57d9deb6 100644 --- a/src/core/dns/transact.js +++ b/src/core/dns/transact.js @@ -63,7 +63,7 @@ export class TcpTx { this.onTimeout(rxid); }; const onError = (err) => { - this.onError(rxid); + this.onError(rxid, err); }; try { @@ -93,15 +93,16 @@ export class TcpTx { * @returns */ onData(rxid, chunk) { + const cl = bufutil.len(chunk); + // TODO: Same code as in server.js, merge them if (this.done) { - this.log.w(rxid, "on reads, tx is closed for business"); + this.log.w(rxid, "on reads, tx closed; discard", cl); return chunk; } const sb = this.readBuffer; - const cl = chunk.byteLength; if (cl <= 0) return; // read header first which contains length(dns-query) @@ -154,18 +155,18 @@ export class TcpTx { } // continue reading from socket } - onClose(err) { + onClose(rxid, err) { if (this.done) return; // no-op - return err ? this.no("error") : this.no("close"); + return err ? this.no(err.message) : this.no("close"); } - onError(err) { + onError(rxid, err) { if (this.done) return; // no-op this.log.e(rxid, "udp err", err.message); this.no(err.message); } - onTimeout() { + onTimeout(rxid) { if (this.done) return; // no-op this.no("timeout"); } @@ -184,20 +185,22 @@ export class TcpTx { * @param {Buffer} query */ write(rxid, query) { + const qlen = bufutil.len(query); + const hlen = bufutil.len(header); if (this.done) { - this.log.w(rxid, "no writes, tx is done working"); + this.log.w(rxid, "no writes, tx is done; discard", qlen); return query; } const header = bufutil.createBuffer(dnsutil.dnsHeaderSize); bufutil.recycleBuffer(header); - header.writeUInt16BE(query.byteLength); + header.writeUInt16BE(qlen); this.sock.write(header, () => { - this.log.d(rxid, "len(header):", header.byteLength); + this.log.d(rxid, "tcp write hdr:", hlen); }); this.sock.write(query, () => { - this.log.d(rxid, "len(query):", query.byteLength); + this.log.d(rxid, "tcp write q:", qlen); }); } @@ -210,7 +213,7 @@ export class TcpTx { } /** - * @param {string?} reason + * @param {string?|Error} reason */ no(reason) { this.done = true; @@ -307,7 +310,7 @@ export class UdpTx { */ write(rxid, query) { if (this.done) return; // discard - this.log.d(rxid, "udp write"); + this.log.d(rxid, "udp write", bufutil.len(query)); this.sock.send(query); // err-on-write handled by onError } @@ -319,7 +322,7 @@ export class UdpTx { */ onMessage(rxid, b, addrinfo) { if (this.done) return; // discard - this.log.d(rxid, "udp read"); + this.log.d(rxid, "udp read", bufutil.len(b)); this.yes(b); } @@ -361,7 +364,7 @@ export class UdpTx { this.resolve(val); } - /** @param {string} reason */ + /** @param {string|Error} reason */ no(reason) { if (this.done) return; From 1d7101e85d0bf262c424d9435c619b2af9d22aea Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 22:05:35 +0530 Subject: [PATCH 29/41] core/io: m debug log dns response --- src/core/io-state.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/core/io-state.js b/src/core/io-state.js index 57dae0b25f..06ad5d19f3 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -95,6 +95,7 @@ export default class IOState { exceptionStack: this.exceptionStack, }; + this.logDnsPkt(); this.httpResponse = new Response(servfail, { headers: util.concatHeaders( this.headers(servfail), @@ -137,11 +138,23 @@ export default class IOState { this.decodedDnsPacket = dnsPacket || dnsutil.decode(arrayBuffer); } + this.logDnsPkt(); this.httpResponse = new Response(arrayBuffer, { headers: this.headers(arrayBuffer), }); } + logDnsPkt() { + this.log.d( + "domains", + dnsutil.extractDomains(this.decodedDnsPacket), + "data", + dnsutil.getInterestingAnswerData(this.decodedDnsPacket), + "ttl", + dnsutil.ttl(this.decodedDnsPacket) + ); + } + dnsBlockResponse(blockflag) { this.initDecodedDnsPacketIfNeeded(); this.stopProcessing = true; From 73f8d2035f0530c7fd0f228b997f99f20065136c Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Fri, 27 Sep 2024 22:06:06 +0530 Subject: [PATCH 30/41] node: sock addrs may be undefined --- src/server-node.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server-node.js b/src/server-node.js index 5be9f9b1b3..dd687e4772 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -468,6 +468,7 @@ function trapSecureServerEvents(s) { */ function addrstr(sock) { if (!sock) return ""; + if (sock.localAddress == null || sock.remoteAddress == null) return ""; return ( `[${sock.localAddress}]:${sock.localPort}` + "->" + From 953e0d19fd8a2049df53a2632244ab2d36b44bdf Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sat, 28 Sep 2024 06:10:45 +0530 Subject: [PATCH 31/41] Fly: disable swap when using auto suspend --- fly.tls.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fly.tls.toml b/fly.tls.toml index 1e3b420251..30240c3e91 100644 --- a/fly.tls.toml +++ b/fly.tls.toml @@ -2,7 +2,8 @@ app = "" kill_signal = "SIGINT" kill_timeout = "15s" -swap_size_mb = 152 +# swap must be disabled when using "suspend" +# swap_size_mb = 152 [build] dockerfile = "node.Dockerfile" From a03377662c878e15ac0fd6d066efd9e8dea3e472 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sat, 28 Sep 2024 06:12:00 +0530 Subject: [PATCH 32/41] dnsutil: distinct err values for getInterestingAnswerData --- src/commons/dnsutil.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commons/dnsutil.js b/src/commons/dnsutil.js index 5967327c6d..37b9006db3 100644 --- a/src/commons/dnsutil.js +++ b/src/commons/dnsutil.js @@ -426,7 +426,7 @@ export function extractDomains(dnsPacket) { export function getInterestingAnswerData(packet, maxlen = 80, delim = "|") { if (!hasAnswers(packet)) { - return !util.emptyObj(packet) ? packet.rcode || "WTF" : "WTF"; + return !util.emptyObj(packet) ? packet.rcode || "WTF1" : "WTF2"; } // set to true if at least one ip has been captured from ans From 71bf7a09d8c5b1a8b8446dd5ccb30fdf4558d35b Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sat, 28 Sep 2024 06:12:51 +0530 Subject: [PATCH 33/41] various: m logs --- src/core/io-state.js | 3 ++- src/plugins/dns-op/resolver.js | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/io-state.js b/src/core/io-state.js index 06ad5d19f3..686a7898b8 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -145,12 +145,13 @@ export default class IOState { } logDnsPkt() { + if (this.isProd) return; this.log.d( "domains", dnsutil.extractDomains(this.decodedDnsPacket), + dnsutil.getQueryType(this.decodedDnsPacket) || "", "data", dnsutil.getInterestingAnswerData(this.decodedDnsPacket), - "ttl", dnsutil.ttl(this.decodedDnsPacket) ); } diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index 20dfe3cc98..a4b3ac0f7d 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -265,7 +265,7 @@ export default class DNSResolver { // check outgoing cached dns-packet against blocklists this.blocker.blockAnswer(rxid, /* out*/ r, blInfo); const fromCache = cacheutil.hasCacheHeader(res.headers); - this.log.d(rxid, "ans block?", r.isBlocked, "from cache?", fromCache); + this.log.d(rxid, "ansblock?", r.isBlocked, "fromcache?", fromCache); // if res was got from caches or if res was got from max doh (ie, blf // wasn't used to retrieve stamps), then skip hydrating the cache @@ -311,7 +311,7 @@ export default class DNSResolver { this.log.d(rxid, "primeCache: block?", blocked, "k", k.href); if (!k) { - this.log.d(rxid, "no cache-key, url/query missing?", k, r.stamps); + this.log.d(rxid, "primeCache: no key, url/query missing?", k, r.stamps); return; } From 2905c688157046a82637abfc0a3f608ee961ea8b Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 02:18:25 +0530 Subject: [PATCH 34/41] core/dns: m uninit var --- src/core/dns/transact.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/dns/transact.js b/src/core/dns/transact.js index 3a57d9deb6..cb525b6503 100644 --- a/src/core/dns/transact.js +++ b/src/core/dns/transact.js @@ -186,13 +186,13 @@ export class TcpTx { */ write(rxid, query) { const qlen = bufutil.len(query); - const hlen = bufutil.len(header); if (this.done) { this.log.w(rxid, "no writes, tx is done; discard", qlen); return query; } const header = bufutil.createBuffer(dnsutil.dnsHeaderSize); + const hlen = bufutil.len(header); bufutil.recycleBuffer(header); header.writeUInt16BE(qlen); From 953059059816bbe3277d133658a97ede8b36df08 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 02:19:10 +0530 Subject: [PATCH 35/41] core/node: dns53 default on node --- src/core/node/config.js | 5 ++--- src/core/node/dns-transport.js | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/core/node/config.js b/src/core/node/config.js index 31328fad71..1210a48b48 100644 --- a/src/core/node/config.js +++ b/src/core/node/config.js @@ -119,12 +119,11 @@ async function prep() { // TODO: move dns* related settings to env // flydns is always ipv6 (fdaa::53) const plainOldDnsIp = onFly ? "fdaa::3" : "1.1.1.2"; - let dns53 = null; + const dns53 = dnst.makeTransport(plainOldDnsIp); + log.i("imported udp/tcp dns transport", plainOldDnsIp); if (onFly) { // recursive resolver on Fly // swapon won't work on fly: community.fly.io/t/19196/13 - dns53 = dnst.makeTransport(plainOldDnsIp); - log.i("imported udp/tcp dns transport", plainOldDnsIp); } // signal ready diff --git a/src/core/node/dns-transport.js b/src/core/node/dns-transport.js index 061eea73ba..490c181060 100644 --- a/src/core/node/dns-transport.js +++ b/src/core/node/dns-transport.js @@ -94,7 +94,7 @@ export class Transport { */ async tcpquery(rxid, q) { let sock = this.tcpconns.take(); - this.log.d(rxid, "tcp pooled?", sock !== null); + this.log.d(rxid, "tcp pooled?", sock != null); /** @type {Buffer?} */ let ans = null; From 781083667c9bc63ca24427f3f97adabc8cff458f Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 08:43:05 +0530 Subject: [PATCH 36/41] system: sticky parcel & jsdocs --- src/system.js | 93 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 18 deletions(-) diff --git a/src/system.js b/src/system.js index 91997f239f..622d11d7ab 100644 --- a/src/system.js +++ b/src/system.js @@ -9,6 +9,10 @@ import * as util from "./commons/util.js"; // Evaluate if EventTarget APIs can replace this hand-rolled impl // developers.cloudflare.com/workers/platform/changelog#2021-09-24 + +/** @typedef {any[]?} parcel */ +/** @typedef {function(parcel)} listenfn */ + // once emitted, they stick; firing off new listeners forever, just the once. const stickyEvents = new Set([ // when process bring-up is done @@ -21,12 +25,17 @@ const stickyEvents = new Set([ "go", ]); +/** @type {Map} */ +const stickyParcels = new Map(); + const events = new Set([ // when server should cease "stop", ]); +/** @type {Map>} */ const listeners = new Map(); +/** @type {Map>} */ const waitGroup = new Map(); (() => { @@ -41,40 +50,65 @@ const waitGroup = new Map(); } })(); -// fires an event +/** + * Fires event. + * @param {string} event + * @param {parcel} parcel + */ export function pub(event, parcel = undefined) { awaiters(event, parcel); callbacks(event, parcel); } -// invokes cb when event is fired -export function sub(event, cb) { +/** + * Invokes cb when event is fired. + * @param {string} event + * @param {listenfn} cb + * @param {int} timeout + * @returns {boolean} + */ +export function sub(event, cb, timeout = 0) { const eventCallbacks = listeners.get(event); // if such even callbacks don't exist if (!eventCallbacks) { // but event is sticky, fire off the listener at once if (stickyEvents.has(event)) { - microtaskBox(cb); + const parcel = stickyParcels.get(event); // may be null + microtaskBox(cb, parcel); return true; } // but event doesn't exist, then there's nothing to do return false; } - eventCallbacks.add(cb); + const tid = timeout > 0 ? util.timeout(timeout, cb) : -2; + const fulfiller = + tid > 0 + ? (parcel) => { + clearTimeout(tid); + cb(parcel); + } + : cb; + eventCallbacks.add(fulfiller); return true; } -// waits till event fires or timesout +/** + * Waits till event fires or timesout. + * @param {string} event + * @param {int} timeout + * @returns {Promise} + */ export function when(event, timeout = 0) { const wg = waitGroup.get(event); if (!wg) { // if stick event, fulfill promise right away if (stickyEvents.has(event)) { - return Promise.resolve(event); + const parcel = stickyParcels.get(event); // may be null + return Promise.resolve(parcel); } // no such event return Promise.reject(new Error(event + " missing")); @@ -87,15 +121,20 @@ export function when(event, timeout = 0) { reject(new Error(event + " elapsed " + timeout)); }) : -2; - const fulfiller = function (parcel) { + /** @type {listenfn} */ + const fulfiller = (parcel) => { if (tid >= 0) clearTimeout(tid); - accept(parcel, event); + accept(parcel); }; wg.add(fulfiller); }); } -function awaiters(event, parcel) { +/** + * @param {string} event + * @param {parcel} parcel + */ +function awaiters(event, parcel = null) { const g = waitGroup.get(event); if (!g) return; @@ -103,12 +142,17 @@ function awaiters(event, parcel) { // listeners valid just the once for stickyEvents if (stickyEvents.has(event)) { waitGroup.delete(event); + stickyParcels.set(event, parcel); } safeBox(g, parcel); } -function callbacks(event, parcel) { +/** + * @param {string} event + * @param {parcel} parcel + */ +function callbacks(event, parcel = null) { const cbs = listeners.get(event); if (!cbs) return; @@ -116,6 +160,7 @@ function callbacks(event, parcel) { // listeners valid just the once for stickyEvents if (stickyEvents.has(event)) { listeners.delete(event); + stickyParcels.set(event, parcel); } // callbacks are queued async and don't block the caller. On Workers, @@ -126,18 +171,25 @@ function callbacks(event, parcel) { microtaskBox(cbs, parcel); } -// TODO: could be replaced with scheduler.wait -// developers.cloudflare.com/workers/platform/changelog#2021-12-10 -// queues fn in a macro-task queue of the event-loop -// exec order: github.com/nodejs/node/issues/22257 +/** + * Queues fn in a macro-task queue of the event-loop + * exec order: github.com/nodejs/node/issues/22257 + * @param {listenfn} fn + */ export function taskBox(fn) { + // TODO: could be replaced with scheduler.wait + // developers.cloudflare.com/workers/platform/changelog#2021-12-10 util.timeout(/* with 0ms delay*/ 0, () => safeBox(fn)); } -// queues fn in a micro-task queue // ref: MDN: Web/API/HTML_DOM_API/Microtask_guide/In_depth // queue-task polyfill: stackoverflow.com/a/61605098 const taskboxPromise = { p: Promise.resolve() }; +/** + * Queues fns in a micro-task queue + * @param {listenfn[]} fns + * @param {parcel} arg + */ function microtaskBox(fns, arg) { let enqueue = null; if (typeof queueMicrotask === "function") { @@ -149,9 +201,14 @@ function microtaskBox(fns, arg) { enqueue(() => safeBox(fns, arg)); } -// TODO: safeBox for async fns with r.push(await f())? -// stackoverflow.com/questions/38508420 +/** + * stackoverflow.com/questions/38508420 + * @param {listenfn[]|listenfn?} fns + * @param {parcel} arg + * @returns {any[]} + */ function safeBox(fns, arg) { + // TODO: safeBox for async fns with r.push(await f())? if (typeof fns === "function") { fns = [fns]; } From 10dbc94bdc287149118a0bf9bb3546759e380b1a Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 12:29:09 +0530 Subject: [PATCH 37/41] commons/envutil: m jsdoc --- src/commons/envutil.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/commons/envutil.js b/src/commons/envutil.js index 9d94412154..e931590256 100644 --- a/src/commons/envutil.js +++ b/src/commons/envutil.js @@ -95,6 +95,9 @@ export function isDeno() { return envManager.r() === "deno"; } +/** + * in milliseconds + */ export function workersTimeout(missing = 0) { if (!envManager) return missing; return envManager.get("WORKER_TIMEOUT") || missing; From 8b1bc5c1a8f03633a56e605e33a22c92e7ad18f1 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 12:29:45 +0530 Subject: [PATCH 38/41] core/io: m decode synth servfails --- src/core/io-state.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/io-state.js b/src/core/io-state.js index 686a7898b8..bd33e9f7a2 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -94,6 +94,7 @@ export default class IOState { exceptionFrom: this.exceptionFrom, exceptionStack: this.exceptionStack, }; + this.decodedDnsPacket = dnsutil.decode(servfail); this.logDnsPkt(); this.httpResponse = new Response(servfail, { From 180d5c9dc0dddf19987b941dc8fda1d7e3b5838b Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 12:30:09 +0530 Subject: [PATCH 39/41] core/node: rmv no-op code --- src/core/node/config.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/core/node/config.js b/src/core/node/config.js index 1210a48b48..3caacb6cbb 100644 --- a/src/core/node/config.js +++ b/src/core/node/config.js @@ -121,10 +121,6 @@ async function prep() { const plainOldDnsIp = onFly ? "fdaa::3" : "1.1.1.2"; const dns53 = dnst.makeTransport(plainOldDnsIp); log.i("imported udp/tcp dns transport", plainOldDnsIp); - if (onFly) { - // recursive resolver on Fly - // swapon won't work on fly: community.fly.io/t/19196/13 - } // signal ready system.pub("ready", [dns53]); From bd725aba29f1f308c921751b4fa03c6f0ed8e33f Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 29 Sep 2024 12:51:07 +0530 Subject: [PATCH 40/41] resolver: coalesce requests --- src/plugins/cache-util.js | 2 +- src/plugins/dns-op/resolver.js | 62 +++++++++++++++++++++++++--------- src/system.js | 61 +++++++++++++++++++++++++-------- 3 files changed, 93 insertions(+), 32 deletions(-) diff --git a/src/plugins/cache-util.js b/src/plugins/cache-util.js index 5bdc5800c7..561d76597b 100644 --- a/src/plugins/cache-util.js +++ b/src/plugins/cache-util.js @@ -118,7 +118,7 @@ function updateTtl(packet, end) { } } -function makeId(packet) { +export function makeId(packet) { // multiple questions are kind of an undefined behaviour // stackoverflow.com/a/55093896 if (!dnsutil.hasSingleQuestion(packet)) return null; diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index a4b3ac0f7d..d37d0c543e 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -13,6 +13,7 @@ import * as dnsutil from "../../commons/dnsutil.js"; import * as bufutil from "../../commons/bufutil.js"; import * as util from "../../commons/util.js"; import * as envutil from "../../commons/envutil.js"; +import * as system from "../../system.js"; import { BlocklistFilter } from "../rethinkdns/filter.js"; export default class DNSResolver { @@ -33,9 +34,11 @@ export default class DNSResolver { this.log = log.withTags("DnsResolver"); this.measurements = []; + this.coalstats = { tot: 0, pub: 0, empty: 0, try: 0 }; this.profileResolve = envutil.profileDnsResolves(); // only valid on nodejs this.forceDoh = envutil.forceDoh(); + this.timeout = (envutil.workersTimeout() / 2) | 0; // only valid on workers // bg-bw-init results in higher io-wait, not lower @@ -342,24 +345,39 @@ DNSResolver.prototype.resolveDnsUpstream = async function ( query, packet ) { - // Promise.any on promisedPromises[] only works if there are - // zero awaits in this function or any of its downstream calls. - // Otherwise, the first reject in promisedPromises[], before - // any statement in the call-stack awaits, would throw unhandled - // error, since the event loop would have 'ticked' and Promise.any - // on promisedPromises[] would still not have been executed, as it - // is the last statement of this function (which would have eaten up - // all rejects as long as there was one resolved promise). - const promisedPromises = []; - // if no doh upstreams set, resolve over plain-old dns if (util.emptyArray(resolverUrls)) { + const eid = cacheutil.makeId(packet); + /** @type {ArrayBuffer[]?} */ + let parcel = null; + + try { + const g = await system.when(eid, this.timeout); + this.coalstats.tot += 1; + if (!util.emptyArray(g) && g[0] != null) { + const sz = bufutil.len(g[0]); + this.log.d(rxid, "coalesced", eid, sz, this.coalstats); + if (sz > 0) return Promise.resolve(new Response(g[0])); + } + this.coalstats.empty += 1; + this.log.e(rxid, "empty coalesced", eid, this.coalstats); + return Promise.resolve(util.respond503()); + } catch (reason) { + // happens on timeout or if new event, eid + this.coalstats.try += 1; + this.log.d(rxid, "not coalesced", eid, reason, this.coalstats); + } + if (this.transport == null) { this.log.e(rxid, "plain dns transport not set"); + this.coalstats.pub += 1; + system.pub(eid, parcel); return Promise.reject(new Error("plain dns transport not set")); } - // do not let exceptions passthrough to the caller + + let promisedResponse = null; try { + // do not let exceptions passthrough to the caller const q = bufutil.bufferOf(query); let ans = await this.transport.udpquery(rxid, q); @@ -369,19 +387,31 @@ DNSResolver.prototype.resolveDnsUpstream = async function ( } if (ans) { - const r = new Response(bufutil.arrayBufferOf(ans)); - promisedPromises.push(Promise.resolve(r)); + const ab = bufutil.arrayBufferOf(ans); + parcel = [ab]; + promisedResponse = Promise.resolve(new Response(ab)); } else { - promisedPromises.push(Promise.resolve(util.respond503())); + promisedResponse = Promise.resolve(util.respond503()); } } catch (e) { this.log.e(rxid, "err when querying plain old dns", e.stack); - promisedPromises.push(Promise.reject(e)); + promisedResponse = Promise.reject(e); } - return Promise.any(promisedPromises); + this.coalstats.pub += 1; + system.pub(eid, parcel); + return promisedResponse; } + // Promise.any on promisedPromises[] only works if there are + // zero awaits in this function or any of its downstream calls. + // Otherwise, the first reject in promisedPromises[], before + // any statement in the call-stack awaits, would throw unhandled + // error, since the event loop would have 'ticked' and Promise.any + // on promisedPromises[] would still not have been executed, as it + // is the last statement of this function (which would have eaten up + // all rejects as long as there was one resolved promise). + const promisedPromises = []; try { // upstream to cache this.log.d(rxid, "upstream cache"); diff --git a/src/system.js b/src/system.js index 622d11d7ab..62481fe266 100644 --- a/src/system.js +++ b/src/system.js @@ -33,6 +33,9 @@ const events = new Set([ "stop", ]); +/** @type {Set} */ +const ephemeralEvents = new Set(); + /** @type {Map>} */ const listeners = new Map(); /** @type {Map>} */ @@ -54,10 +57,13 @@ const waitGroup = new Map(); * Fires event. * @param {string} event * @param {parcel} parcel + * @returns {int} */ -export function pub(event, parcel = undefined) { - awaiters(event, parcel); - callbacks(event, parcel); +export function pub(event, parcel = null) { + if (util.emptyString(event)) return; + + const tot = awaiters(event, parcel); + return tot + callbacks(event, parcel); } /** @@ -68,17 +74,21 @@ export function pub(event, parcel = undefined) { * @returns {boolean} */ export function sub(event, cb, timeout = 0) { + if (util.emptyString(event)) return; + if (typeof cb !== "function") return; + const eventCallbacks = listeners.get(event); - // if such even callbacks don't exist if (!eventCallbacks) { - // but event is sticky, fire off the listener at once + // event is sticky, fire off the listener at once if (stickyEvents.has(event)) { const parcel = stickyParcels.get(event); // may be null microtaskBox(cb, parcel); return true; } - // but event doesn't exist, then there's nothing to do + // event doesn't exist so make it ephemeral + ephemeralEvents.add(event); + listeners.set(event, new Set()); return false; } @@ -102,6 +112,10 @@ export function sub(event, cb, timeout = 0) { * @returns {Promise} */ export function when(event, timeout = 0) { + if (util.emptyString(event)) { + return Promise.reject(new Error("empty event")); + } + const wg = waitGroup.get(event); if (!wg) { @@ -110,15 +124,17 @@ export function when(event, timeout = 0) { const parcel = stickyParcels.get(event); // may be null return Promise.resolve(parcel); } - // no such event - return Promise.reject(new Error(event + " missing")); + // no such event so make it ephemeral + ephemeralEvents.add(event); + waitGroup.set(event, new Set()); + return Promise.reject(new Error(event + " missing event")); } return new Promise((accept, reject) => { const tid = timeout > 0 ? util.timeout(timeout, () => { - reject(new Error(event + " elapsed " + timeout)); + reject(new Error(event + " event elapsed " + timeout)); }) : -2; /** @type {listenfn} */ @@ -133,34 +149,47 @@ export function when(event, timeout = 0) { /** * @param {string} event * @param {parcel} parcel + * @returns {int} */ function awaiters(event, parcel = null) { - const g = waitGroup.get(event); + if (util.emptyString(event)) return 0; + const wg = waitGroup.get(event); - if (!g) return; + if (!wg) return 0; - // listeners valid just the once for stickyEvents + // listeners valid just the once for stickyEvents & ephemeralEvents if (stickyEvents.has(event)) { waitGroup.delete(event); stickyParcels.set(event, parcel); + } else if (ephemeralEvents.has(event)) { + // log.d("sys: wg ephemeralEvents", event, parcel); + waitGroup.delete(event); + ephemeralEvents.delete(event); } - safeBox(g, parcel); + safeBox(wg, parcel); + return wg.size; } /** * @param {string} event * @param {parcel} parcel + * @returns {int} */ function callbacks(event, parcel = null) { + if (util.emptyString(event)) return 0; const cbs = listeners.get(event); - if (!cbs) return; + if (!cbs) return 0; - // listeners valid just the once for stickyEvents + // listeners valid just the once for stickyEvents & ephemeralEvents if (stickyEvents.has(event)) { listeners.delete(event); stickyParcels.set(event, parcel); + } else if (ephemeralEvents.has(event)) { + // log.d("sys: cb ephemeralEvents", event, parcel); + listeners.delete(event); + ephemeralEvents.delete(event); } // callbacks are queued async and don't block the caller. On Workers, @@ -169,6 +198,7 @@ function callbacks(event, parcel = null) { // incoming request (through the fetch event handler), such callbacks // may not even fire. Instead use: awaiters and not callbacks. microtaskBox(cbs, parcel); + return cbs.size; } /** @@ -226,6 +256,7 @@ function safeBox(fns, arg) { try { r.push(f(arg)); } catch (ignore) { + // log.e("sys: safeBox err", ignore); r.push(null); } } From 3fcf47d754391198b594c0c56d6d90fbd79622c2 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Tue, 1 Oct 2024 03:00:56 +0530 Subject: [PATCH 41/41] system: ephemeral event common for both listeners & awaiters --- src/system.js | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/system.js b/src/system.js index 62481fe266..89871b53be 100644 --- a/src/system.js +++ b/src/system.js @@ -62,8 +62,10 @@ const waitGroup = new Map(); export function pub(event, parcel = null) { if (util.emptyString(event)) return; - const tot = awaiters(event, parcel); - return tot + callbacks(event, parcel); + const hadEphemeralEvent = ephemeralEvents.delete(event); + + const tot = awaiters(event, parcel, hadEphemeralEvent); + return tot + callbacks(event, parcel, hadEphemeralEvent); } /** @@ -89,6 +91,7 @@ export function sub(event, cb, timeout = 0) { // event doesn't exist so make it ephemeral ephemeralEvents.add(event); listeners.set(event, new Set()); + waitGroup.set(event, new Set()); return false; } @@ -126,6 +129,7 @@ export function when(event, timeout = 0) { } // no such event so make it ephemeral ephemeralEvents.add(event); + listeners.set(event, new Set()); waitGroup.set(event, new Set()); return Promise.reject(new Error(event + " missing event")); } @@ -149,9 +153,10 @@ export function when(event, timeout = 0) { /** * @param {string} event * @param {parcel} parcel + * @param {boolean} ephemeralEvent * @returns {int} */ -function awaiters(event, parcel = null) { +function awaiters(event, parcel = null, ephemeralEvent = false) { if (util.emptyString(event)) return 0; const wg = waitGroup.get(event); @@ -161,12 +166,13 @@ function awaiters(event, parcel = null) { if (stickyEvents.has(event)) { waitGroup.delete(event); stickyParcels.set(event, parcel); - } else if (ephemeralEvents.has(event)) { - // log.d("sys: wg ephemeralEvents", event, parcel); + } else if (ephemeralEvent) { + // log.d("sys: wg ephemeralEvent", event, parcel); waitGroup.delete(event); - ephemeralEvents.delete(event); } + if (wg.size === 0) return 0; + safeBox(wg, parcel); return wg.size; } @@ -174,9 +180,10 @@ function awaiters(event, parcel = null) { /** * @param {string} event * @param {parcel} parcel + * @param {boolean} ephemeralEvent * @returns {int} */ -function callbacks(event, parcel = null) { +function callbacks(event, parcel = null, ephemeralEvent = false) { if (util.emptyString(event)) return 0; const cbs = listeners.get(event); @@ -186,12 +193,12 @@ function callbacks(event, parcel = null) { if (stickyEvents.has(event)) { listeners.delete(event); stickyParcels.set(event, parcel); - } else if (ephemeralEvents.has(event)) { - // log.d("sys: cb ephemeralEvents", event, parcel); + } else if (ephemeralEvent) { + // log.d("sys: cb ephemeralEvent", event, parcel); listeners.delete(event); - ephemeralEvents.delete(event); } + if (cbs.size === 0) return 0; // callbacks are queued async and don't block the caller. On Workers, // where IOs or timers require event-context aka network-context, // which is only available when fns are invoked in response to an