diff --git a/fly.tls.toml b/fly.tls.toml index b2fbd69fc4..30240c3e91 100644 --- a/fly.tls.toml +++ b/fly.tls.toml @@ -2,7 +2,8 @@ app = "" kill_signal = "SIGINT" kill_timeout = "15s" -swap_size_mb = 152 +# swap must be disabled when using "suspend" +# swap_size_mb = 152 [build] dockerfile = "node.Dockerfile" @@ -20,11 +21,18 @@ swap_size_mb = 152 [experimental] auto_rollback = true +# community.fly.io/t/19180 +# fly.io/docs/machines/guides-examples/machine-restart-policy +[[restart]] + policy = "on-failure" + retries = 3 + # DNS over HTTPS (well, h2c and http1.1) [[services]] internal_port = 8055 protocol = "tcp" - auto_stop_machines = true + # community.fly.io/t/20672 + auto_stop_machines = "suspend" auto_start_machines = true [services.concurrency] @@ -57,7 +65,7 @@ auto_rollback = true [[services]] internal_port = 10555 protocol = "tcp" - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [services.concurrency] diff --git a/fly.toml b/fly.toml index 38fc89f208..e548706e8e 100644 --- a/fly.toml +++ b/fly.toml @@ -2,7 +2,9 @@ app = "" kill_signal = "SIGINT" kill_timeout = "15s" -swap_size_mb = 152 +# swap cannot be used with "suspend" +# community.fly.io/t/20672 +# swap_size_mb = 152 [build] dockerfile = "node.Dockerfile" @@ -17,11 +19,17 @@ swap_size_mb = 152 NODE_ENV = "production" LOG_LEVEL = "info" +# community.fly.io/t/19180 +# fly.io/docs/machines/guides-examples/machine-restart-policy +[[restart]] + policy = "on-failure" + retries = 3 + # DNS over HTTPS [[services]] protocol = "tcp" internal_port = 8080 - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [[services.ports]] @@ -50,7 +58,7 @@ swap_size_mb = 152 [[services]] protocol = "tcp" internal_port = 10000 - auto_stop_machines = true + auto_stop_machines = "suspend" auto_start_machines = true [[services.ports]] diff --git a/node.Dockerfile b/node.Dockerfile index 0680f23584..6f91660e13 100644 --- a/node.Dockerfile +++ b/node.Dockerfile @@ -19,7 +19,7 @@ FROM node:22-alpine AS runner # env vals persist even at run-time: archive.is/QpXp2 # and overrides fly.toml env values ENV NODE_ENV production -ENV NODE_OPTIONS="--max-old-space-size=320 --heapsnapshot-signal=SIGUSR2" +ENV NODE_OPTIONS="--max-old-space-size=200 --heapsnapshot-signal=SIGUSR2" # get working dir in order WORKDIR /app # external deps not bundled by webpack diff --git a/src/commons/bufutil.js b/src/commons/bufutil.js index 56eb33625c..8897091476 100644 --- a/src/commons/bufutil.js +++ b/src/commons/bufutil.js @@ -169,11 +169,19 @@ export function bufferOf(arrayBuf) { return Buffer.from(new Uint8Array(arrayBuf)); } +/** + * @param {Buffer} b + * @returns {int} + */ export function recycleBuffer(b) { b.fill(0); return 0; } +/** + * @param {int} size + * @returns {Buffer} + */ export function createBuffer(size) { return Buffer.allocUnsafe(size); } diff --git a/src/commons/dnsutil.js b/src/commons/dnsutil.js index 4449b6e4ee..37b9006db3 100644 --- a/src/commons/dnsutil.js +++ b/src/commons/dnsutil.js @@ -377,9 +377,19 @@ export function isAnswerQuad0(packet) { return isAnswerBlocked(packet.answers); } +export function ttl(packet) { + if (!hasAnswers(packet)) return 0; + return packet.answers[0].ttl || 0; +} + +/** + * @param {any} dnsPacket + * @returns {string[]} + */ export function extractDomains(dnsPacket) { if (!hasSingleQuestion(dnsPacket)) return []; + /** @type {string} */ const names = new Set(); const answers = dnsPacket.answers; @@ -416,7 +426,7 @@ export function extractDomains(dnsPacket) { export function getInterestingAnswerData(packet, maxlen = 80, delim = "|") { if (!hasAnswers(packet)) { - return !util.emptyObj(packet) ? packet.rcode || "WTF" : "WTF"; + return !util.emptyObj(packet) ? packet.rcode || "WTF1" : "WTF2"; } // set to true if at least one ip has been captured from ans @@ -535,6 +545,10 @@ export function getQueryType(packet) { return util.emptyString(qt) ? false : qt; } +/** + * @param {string?} n + * @returns {string} + */ export function normalizeName(n) { if (util.emptyString(n)) return n; diff --git a/src/commons/envutil.js b/src/commons/envutil.js index 834fb3c31e..e931590256 100644 --- a/src/commons/envutil.js +++ b/src/commons/envutil.js @@ -8,6 +8,12 @@ // musn't import /depend on anything. +export function isProd() { + if (!envManager) return false; + + return envManager.determineEnvStage() === "production"; +} + export function onFly() { if (!envManager) return false; @@ -89,6 +95,9 @@ export function isDeno() { return envManager.r() === "deno"; } +/** + * in milliseconds + */ export function workersTimeout(missing = 0) { if (!envManager) return missing; return envManager.get("WORKER_TIMEOUT") || missing; @@ -227,12 +236,10 @@ export function shutdownTimeoutMs() { } export function measureHeap() { - // disable; webpack can't bundle memwatch; see: server-node.js - return false; if (!envManager) return false; const reg = region(); if ( - reg === "maa" || + reg === "bom" || reg === "sin" || reg === "fra" || reg === "ams" || diff --git a/src/commons/util.js b/src/commons/util.js index 05cbafdb23..bc0c51934b 100644 --- a/src/commons/util.js +++ b/src/commons/util.js @@ -13,7 +13,8 @@ // musn't import any non-std modules export function fromBrowser(ua) { - return ua && ua.startsWith("Mozilla/5.0"); + if (emptyString(ua)) return false; + return ua.startsWith("Mozilla/5.0") || ua.startsWith("dohjs/"); } export function jsonHeaders() { @@ -117,7 +118,13 @@ export function objOf(map) { return map.entries ? Object.fromEntries(map) : {}; } -export function timedOp(op, ms, cleanup = () => {}) { +/** + * @param {(function((out, err) => void))} op + * @param {int} ms + * @param {function(any)} cleanup + * @returns {Promise} + */ +export function timedOp(op, ms, cleanup = (x) => {}) { return new Promise((resolve, reject) => { let timedout = false; const tid = timeout(ms, () => { @@ -142,6 +149,7 @@ export function timedOp(op, ms, cleanup = () => {}) { } }); } catch (e) { + clearTimeout(tid); if (!timedout) reject(e); } }); @@ -180,6 +188,7 @@ export function timedSafeAsyncOp(promisedOp, ms, defaultOp) { } }) .catch((ignored) => { + clearTimeout(tid); if (!timedout) deferredOp(); // else: handled by timeout }); diff --git a/src/core/dns/conns.js b/src/core/dns/conns.js index d79b5c92d9..af022cee7b 100644 --- a/src/core/dns/conns.js +++ b/src/core/dns/conns.js @@ -13,20 +13,36 @@ import * as util from "../../commons/util.js"; */ export class TcpConnPool { + /** + * @param {int} size + * @param {int} ttl + */ constructor(size, ttl) { + /** @type {int} */ this.size = size; - // max sweeps per give/take + /** + * max sweeps per give/take + * @type {int} + */ this.maxsweep = Math.max((size / 4) | 0, 20); + /** @type {int} */ this.ttl = ttl; // ms const quarterttl = (ttl / 4) | 0; + /** @type {int} */ this.keepalive = Math.min(/* 60s*/ 60000, quarterttl); // ms + /** @type {int} */ this.lastSweep = 0; + /** @type {int} */ this.sweepGapMs = Math.max(/* 10s*/ 10000, quarterttl); // ms /** @type {Map} */ this.pool = new Map(); log.d("tcp-pool psz:", size, "msw:", this.maxsweep, "t:", ttl); } + /** + * @param {AnySock} socket + * @returns {boolean} + */ give(socket) { if (socket.pending) return false; if (!socket.writable) return false; @@ -40,6 +56,9 @@ export class TcpConnPool { return this.checkin(socket); } + /** + * @returns {AnySock?} + */ take() { const thres = this.maxsweep / 2; let out = null; @@ -68,9 +87,9 @@ export class TcpConnPool { } /** - * @param {import("net").Socket} sock + * @param {AnySock} sock * @param {Report} report - * @returns {import("net").Socket} + * @returns {AnySock} */ checkout(sock, report) { log.d(report.id, "checkout, size:", this.pool.size); @@ -88,6 +107,10 @@ export class TcpConnPool { return sock; } + /** + * @param {AnySock} socket + * @returns {boolean} + */ checkin(sock) { const report = this.mkreport(); @@ -102,6 +125,10 @@ export class TcpConnPool { return true; } + /** + * @param {boolean} clear + * @returns {boolean} + */ sweep(clear = false) { const sz = this.pool.size; if (sz <= 0) return false; @@ -122,11 +149,21 @@ export class TcpConnPool { return sz > this.pool.size; // size decreased post-sweep? } + /** + * @param {AnySock?} socket + * @returns {boolean} + */ ready(sock) { - return sock.readyState === "open"; + return sock && sock.readyState === "open"; } + /** + * @param {AnySock?} sock + * @param {Report} report + * @returns {boolean} + */ healthy(sock, report) { + if (!sock) return false; const destroyed = !sock.writable; const open = this.ready(sock); const fresh = report.fresh(this.ttl); @@ -136,10 +173,18 @@ export class TcpConnPool { return fresh; // healthy if not expired } + /** + * @param {AnySock} sock + * @param {Report} report + * @returns {boolean} + */ dead(sock, report) { return !this.healthy(sock, report); } + /** + * @param {AnySock?} sock + */ evict(sock) { this.pool.delete(sock); @@ -148,6 +193,7 @@ export class TcpConnPool { } catch (ignore) {} } + /** @return {Report} */ mkreport() { return new Report(util.uid("tcp")); } @@ -164,24 +210,39 @@ class Report { this.lastuse = Date.now(); } + /** @param {number} since */ fresh(since) { return this.lastuse + since >= Date.now(); } } export class UdpConnPool { + /** + * @param {int} size + * @param {int} ttl + */ constructor(size, ttl) { + /** @type {int} */ this.size = size; + /** @type {int} */ this.maxsweep = Math.max((size / 4) | 0, 20); + /** @type {int} */ this.ttl = Math.max(/* 60s*/ 60000, ttl); // no more than 60s + /** @type {int} */ this.lastSweep = 0; + /** @type {int} */ this.sweepGapMs = Math.max(/* 10s*/ 10000, (ttl / 2) | 0); // ms /** @type {Map} */ this.pool = new Map(); log.d("udp-pool psz:", size, "msw:", this.maxsweep, "t:", ttl); } + /** + * @param {AnySock?} socket + * @returns {boolean} + */ give(socket) { + if (!socket) return false; if (this.pool.has(socket)) return true; const free = this.pool.size < this.size || this.sweep(); @@ -190,6 +251,9 @@ export class UdpConnPool { return this.checkin(socket); } + /** + * @returns {AnySock?} + */ take() { const thres = this.maxsweep / 2; let out = null; @@ -217,9 +281,9 @@ export class UdpConnPool { } /** - * @param {import("dgram").Socket} sock + * @param {AnySock} sock * @param {Report} report - * @returns {import("dgram").Socket} + * @returns {AnySock} */ checkout(sock, report) { log.d(report.id, "checkout, size:", this.pool.size); @@ -231,6 +295,10 @@ export class UdpConnPool { return sock; } + /** + * @param {AnySock} socket + * @returns {boolean} + */ checkin(sock) { const report = this.mkreport(); @@ -243,6 +311,10 @@ export class UdpConnPool { return true; } + /** + * @param {boolean} clear + * @returns {boolean} + */ sweep(clear = false) { const sz = this.pool.size; if (sz <= 0) return false; @@ -263,6 +335,10 @@ export class UdpConnPool { return sz > this.pool.size; // size decreased post-sweep? } + /** + * @param {Report} report + * @returns {boolean} + */ healthy(report) { const fresh = report.fresh(this.ttl); const id = report.id; @@ -270,10 +346,17 @@ export class UdpConnPool { return fresh; // healthy if not expired } + /** + * @param {Report} report + * @returns {boolean} + */ dead(report) { return !this.healthy(report); } + /** + * @param {AnySock?} sock + */ evict(sock) { if (!sock) return; this.pool.delete(sock); @@ -282,6 +365,7 @@ export class UdpConnPool { sock.close(); } + /** @return {Report} */ mkreport() { return new Report(util.uid("udp")); } diff --git a/src/core/dns/transact.js b/src/core/dns/transact.js index 53debb5db9..cb525b6503 100644 --- a/src/core/dns/transact.js +++ b/src/core/dns/transact.js @@ -9,25 +9,44 @@ import * as bufutil from "../../commons/bufutil.js"; import * as util from "../../commons/util.js"; import * as dnsutil from "../../commons/dnsutil.js"; +/** + * @typedef {import("net").Socket} TcpSock + * @typedef {import("dgram").Socket} UdpSock + * @typedef {import("net").AddressInfo} AddrInfo + */ + // TcpTx implements a single DNS question-answer exchange over TCP. It doesn't // multiplex multiple DNS questions over the same socket. It doesn't take the // ownership of the socket, but requires exclusive use of it. The socket may // close itself on errors, however. export class TcpTx { + /** @param {TcpSock} socket */ constructor(socket) { + /** @type {TcpSock} */ this.sock = socket; - // only one transaction allowed - // then done gates all other requests - this.done = false; + /** @type {boolean} */ + this.done = false || socket == null; // done gates all other requests + /** @type {ScratchBuffer} */ // reads from the socket is buffered into scratch this.readBuffer = this.makeReadBuffer(); + /** @type {function(Buffer)} */ + this.resolve = null; + /** @type {function(string?)} */ + this.reject = null; this.log = log.withTags("TcpTx"); } + /** @param {TcpSock} sock */ static begin(sock) { return new TcpTx(sock); } + /** + * @param {string} rxid + * @param {Buffer} query + * @param {int} timeout + * @returns {Promise|null} + */ async exchange(rxid, query, timeout) { if (this.done) { this.log.w(rxid, "no exchange, tx is done"); @@ -44,7 +63,7 @@ export class TcpTx { this.onTimeout(rxid); }; const onError = (err) => { - this.onError(rxid); + this.onError(rxid, err); }; try { @@ -68,16 +87,22 @@ export class TcpTx { } } - // TODO: Same code as in server.js, merge them + /** + * @param {string} rxid + * @param {Buffer} chunk + * @returns + */ onData(rxid, chunk) { + const cl = bufutil.len(chunk); + + // TODO: Same code as in server.js, merge them if (this.done) { - this.log.w(rxid, "on reads, tx is closed for business"); + this.log.w(rxid, "on reads, tx closed; discard", cl); return chunk; } const sb = this.readBuffer; - const cl = chunk.byteLength; if (cl <= 0) return; // read header first which contains length(dns-query) @@ -130,22 +155,23 @@ export class TcpTx { } // continue reading from socket } - onClose(err) { + onClose(rxid, err) { if (this.done) return; // no-op - return err ? this.no("error") : this.no("close"); + return err ? this.no(err.message) : this.no("close"); } - onError(err) { + onError(rxid, err) { if (this.done) return; // no-op this.log.e(rxid, "udp err", err.message); this.no(err.message); } - onTimeout() { + onTimeout(rxid) { if (this.done) return; // no-op this.no("timeout"); } + /** @returns {Promise} */ promisedRead() { const that = this; return new Promise((resolve, reject) => { @@ -154,44 +180,62 @@ export class TcpTx { }); } + /** + * @param {string} rxid + * @param {Buffer} query + */ write(rxid, query) { + const qlen = bufutil.len(query); if (this.done) { - this.log.w(rxid, "no writes, tx is done working"); + this.log.w(rxid, "no writes, tx is done; discard", qlen); return query; } const header = bufutil.createBuffer(dnsutil.dnsHeaderSize); + const hlen = bufutil.len(header); bufutil.recycleBuffer(header); - header.writeUInt16BE(query.byteLength); + header.writeUInt16BE(qlen); this.sock.write(header, () => { - this.log.d(rxid, "len(header):", header.byteLength); + this.log.d(rxid, "tcp write hdr:", hlen); }); this.sock.write(query, () => { - this.log.d(rxid, "len(query):", query.byteLength); + this.log.d(rxid, "tcp write q:", qlen); }); } + /** + * @param {Buffer} val + */ yes(val) { this.done = true; this.resolve(val); } + /** + * @param {string?|Error} reason + */ no(reason) { this.done = true; this.reject(reason); } + /** @returns {ScratchBuffer} */ makeReadBuffer() { - const qlenBuf = bufutil.createBuffer(dnsutil.dnsHeaderSize); - const qlenBufOffset = bufutil.recycleBuffer(qlenBuf); - - return { - qlenBuf: qlenBuf, - qlenBufOffset: qlenBufOffset, - qBuf: null, - qBufOffset: 0, - }; + return new ScratchBuffer(); + } +} + +class ScratchBuffer { + constructor() { + /** @type {Buffer} */ + this.qlenBuf = bufutil.createBuffer(dnsutil.dnsHeaderSize); + /** @type {int} */ + this.qlenBufOffset = bufutil.recycleBuffer(this.qlenBuf); + /** @type {Buffer} */ + this.qBuf = null; + /** @type {int} */ + this.qBufOffset = 0; } } @@ -200,19 +244,32 @@ export class TcpTx { // ownership of the socket, but requires exclusive access to it. The socket // may close itself on errors, however. export class UdpTx { + /** @param {UdpSock} socket */ constructor(socket) { + /** @type {UdpSock} */ this.sock = socket; - // only one transaction allowed - this.done = false; - // ticks socket io timeout - this.timeoutTimerId = null; + /** @type {boolean} */ + this.done = false || socket == null; // only one transaction allowed + /** @type {NodeJS.Timeout|-1} */ + this.timeoutTimerId = null; // ticks socket io timeout + /** @type {function(Buffer)} */ + this.resolve = null; + /** @type {function(string)} */ + this.reject = null; this.log = log.withTags("UdpTx"); } + /** @param {UdpSock} sock */ static begin(sock) { return new UdpTx(sock); } + /** + * @param {string} rxid + * @param {Buffer} query + * @param {int} timeout + * @returns {Promise|null} + */ async exchange(rxid, query, timeout) { if (this.done) { this.log.w(rxid, "no exchange, tx is done"); @@ -246,15 +303,26 @@ export class UdpTx { } } + /** + * @param {string} rxid + * @param {Buffer} query + * @returns + */ write(rxid, query) { if (this.done) return; // discard - this.log.d(rxid, "udp write"); + this.log.d(rxid, "udp write", bufutil.len(query)); this.sock.send(query); // err-on-write handled by onError } + /** + * @param {string} rxid + * @param {Buffer} b + * @param {AddrInfo} addrinfo + * @returns + */ onMessage(rxid, b, addrinfo) { if (this.done) return; // discard - this.log.d(rxid, "udp read"); + this.log.d(rxid, "udp read", bufutil.len(b)); this.yes(b); } @@ -270,6 +338,10 @@ export class UdpTx { return err ? this.no("error") : this.no("close"); } + /** + * @param {int} timeout + * @returns {Promise} + */ promisedRead(timeout = 0) { const that = this; if (timeout > 0) { @@ -283,6 +355,7 @@ export class UdpTx { }); } + /** @param {Buffer} val */ yes(val) { if (this.done) return; @@ -291,6 +364,7 @@ export class UdpTx { this.resolve(val); } + /** @param {string|Error} reason */ no(reason) { if (this.done) return; diff --git a/src/core/doh.js b/src/core/doh.js index 619815c3af..fc0749409b 100644 --- a/src/core/doh.js +++ b/src/core/doh.js @@ -61,11 +61,20 @@ function optionsRequest(request) { return request.method === "OPTIONS"; } +/** + * @param {IOState} io + * @param {Error} err + */ function errorResponse(io, err = null) { const eres = pres.errResponse("doh.js", err); io.dnsExceptionResponse(eres); } +/** + * @param {IOState} io + * @param {string} ua + * @returns {Response} + */ function withCors(io, ua) { if (util.fromBrowser(ua)) io.setCorsHeadersIfNeeded(); return io.httpResponse; diff --git a/src/core/io-state.js b/src/core/io-state.js index 1c56ff5494..bd33e9f7a2 100644 --- a/src/core/io-state.js +++ b/src/core/io-state.js @@ -8,22 +8,36 @@ import * as bufutil from "../commons/bufutil.js"; import * as dnsutil from "../commons/dnsutil.js"; +import * as envutil from "../commons/envutil.js"; import * as util from "../commons/util.js"; export default class IOState { constructor() { + /** @type {string} */ this.flag = ""; + /** @type {any} */ this.decodedDnsPacket = this.emptyDecodedDnsPacket(); - /** @type {Response} */ - this.httpResponse = undefined; + /** @type {Response?} */ + this.httpResponse = null; + /** @type {boolean} */ + this.isProd = envutil.isProd(); + /** @type {boolean} */ this.isException = false; - this.exceptionStack = undefined; + /** @type {string} */ + this.exceptionStack = null; + /** @type {string} */ this.exceptionFrom = ""; + /** @type {boolean} */ this.isDnsBlock = false; + /** @type {boolean} */ this.alwaysGatewayAnswer = false; + /** @type {string} */ this.gwip4 = ""; + /** @type {string} */ this.gwip6 = ""; + /** @type {string} */ this.region = ""; + /** @type {boolean} */ this.stopProcessing = false; this.log = log.withTags("IOState"); } @@ -80,11 +94,13 @@ export default class IOState { exceptionFrom: this.exceptionFrom, exceptionStack: this.exceptionStack, }; + this.decodedDnsPacket = dnsutil.decode(servfail); + this.logDnsPkt(); this.httpResponse = new Response(servfail, { headers: util.concatHeaders( this.headers(servfail), - this.additionalHeader(JSON.stringify(ex)) + this.debugHeaders(JSON.stringify(ex)) ), status: servfail ? 200 : 408, // rfc8484 section-4.2.1 }); @@ -123,11 +139,24 @@ export default class IOState { this.decodedDnsPacket = dnsPacket || dnsutil.decode(arrayBuffer); } + this.logDnsPkt(); this.httpResponse = new Response(arrayBuffer, { headers: this.headers(arrayBuffer), }); } + logDnsPkt() { + if (this.isProd) return; + this.log.d( + "domains", + dnsutil.extractDomains(this.decodedDnsPacket), + dnsutil.getQueryType(this.decodedDnsPacket) || "", + "data", + dnsutil.getInterestingAnswerData(this.decodedDnsPacket), + dnsutil.ttl(this.decodedDnsPacket) + ); + } + dnsBlockResponse(blockflag) { this.initDecodedDnsPacketIfNeeded(); this.stopProcessing = true; @@ -148,7 +177,7 @@ export default class IOState { this.httpResponse = new Response(null, { headers: util.concatHeaders( this.headers(), - this.additionalHeader(JSON.stringify(this.exceptionStack)) + this.debugHeaders(JSON.stringify(this.exceptionStack)) ), status: 503, }); @@ -174,7 +203,7 @@ export default class IOState { this.httpResponse = new Response(null, { headers: util.concatHeaders( this.headers(), - this.additionalHeader(JSON.stringify(this.exceptionStack)) + this.debugHeaders(JSON.stringify(this.exceptionStack)) ), status: 503, }); @@ -182,8 +211,11 @@ export default class IOState { } headers(b = null) { - const xNileFlags = this.isDnsBlock ? { "x-nile-flags": this.flag } : null; - const xNileFlagsOk = !xNileFlags ? { "x-nile-flags-dn": this.flag } : null; + const hasBlockFlag = !util.emptyString(this.flag); + const isBlocked = hasBlockFlag && this.isDnsBlock; + const couldBlock = hasBlockFlag && !this.isDnsBlock; + const xNileFlags = isBlocked ? { "x-nile-flags": this.flag } : null; + const xNileFlagsOk = couldBlock ? { "x-nile-flags-dn": this.flag } : null; const xNileRegion = !util.emptyString(this.region) ? { "x-nile-region": this.region } : null; @@ -191,13 +223,15 @@ export default class IOState { return util.concatHeaders( util.dnsHeaders(), util.contentLengthHeader(b), + this.cacheHeaders(), xNileRegion, xNileFlags, xNileFlagsOk ); } - additionalHeader(json) { + debugHeaders(json) { + if (this.isProd) return null; if (!json) return null; return { @@ -215,6 +249,16 @@ export default class IOState { } } + // set cache from ttl in decoded-dns-packet + cacheHeaders() { + const ttl = dnsutil.ttl(this.decodedDnsPacket); + if (ttl <= 0) return null; + + return { + "cache-control": "public, max-age=" + ttl, + }; + } + assignBlockResponse() { let done = this.initFlagsAndAnswers(); done = done && this.addData(); diff --git a/src/core/linux/swap.js b/src/core/linux/swap.js deleted file mode 100644 index 724cfa0e12..0000000000 --- a/src/core/linux/swap.js +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2021 RethinkDNS and its authors. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -import { spawnSync } from "node:child_process"; - -const swapfile = "swap__"; -const swapsize = "152M"; - -// linuxize.com/post/create-a-linux-swap-file -export function mkswap() { - return ( - !hasanyswap() && - sh("fallocate", ["-l", swapsize, swapfile]) && - sh("chmod", ["600", swapfile]) && - sh("mkswap", [swapfile]) && - sh("swapon", [swapfile]) && - sh("sysctl", ["vm.swappiness=20"]) - ); -} - -export function rmswap() { - return hasswap() && sh("swapoff", ["-v", swapfile]) && sh("rm", [swapfile]); -} - -function hasanyswap() { - // cat /proc/swaps - // Filename Type Size Used Priority - // /swap__ file 155644 99968 -2 - const pswaps = shout("cat", ["/proc/swaps"]); - const lines = pswaps && pswaps.split("\n"); - return lines && lines.length > 1; -} - -// stackoverflow.com/a/53222213 -function hasswap() { - return sh("test", ["-e", swapfile]); -} - -function shout(cmd, args) { - return shx(cmd, args, true); -} - -function sh(cmd, args) { - return shx(cmd, args) === 0; -} - -function shx(cmd, args, out = false) { - if (!cmd) return false; - args = args || []; - const opts = { - cwd: "/", - uid: 0, - shell: true, - encoding: "utf8", - }; - const proc = spawnSync(cmd, args, opts); - if (proc.error) log.i(cmd, args, opts, "error", proc.error); - if (proc.stderr) log.e(cmd, args, opts, proc.stderr); - if (proc.stdout) log.l(proc.stdout); - return !out ? proc.status : proc.stdout; -} diff --git a/src/core/log.js b/src/core/log.js index 7a22b580a8..f841cbec26 100644 --- a/src/core/log.js +++ b/src/core/log.js @@ -82,9 +82,6 @@ export default class Log { _resetLevel() { this.d = stub(); this.debug = stub(); - this.lapTime = stub(); - this.startTime = stub(); - this.endTime = stub(); this.i = stub(); this.info = stub(); this.w = stub(); @@ -96,18 +93,6 @@ export default class Log { withTags(...tags) { const that = this; return { - lapTime: (n, ...r) => { - return that.lapTime(n, ...tags, ...r); - }, - startTime: (n, ...r) => { - const tid = that.startTime(n); - that.d(that.now() + " T", ...tags, "create", tid, ...r); - return tid; - }, - endTime: (n, ...r) => { - that.d(that.now() + " T", ...tags, "end", n, ...r); - return that.endTime(n); - }, d: (...args) => { that.d(that.now() + " D", ...tags, ...args); }, @@ -163,13 +148,7 @@ export default class Log { this.d = console.debug; this.debug = console.debug; case "timer": - this.lapTime = console.timeLog || stub(); // Stubbing required for Fastly as they do not currently support this method. - this.startTime = function (name) { - name = uid(name); - if (console.time) console.time(name); - return name; - }; - this.endTime = console.timeEnd || stub(); // Stubbing required for Fastly as they do not currently support this method. + // deprecated; fallthrough case "info": this.i = console.info; this.info = console.info; diff --git a/src/core/node/config.js b/src/core/node/config.js index 59efdfa4fa..3caacb6cbb 100644 --- a/src/core/node/config.js +++ b/src/core/node/config.js @@ -20,7 +20,6 @@ import Log from "../log.js"; import * as system from "../../system.js"; import { services, stopAfter } from "../svc.js"; import EnvManager from "../env.js"; -import * as swap from "../linux/swap.js"; import * as dnst from "../../core/node/dns-transport.js"; // some of the cjs node globals aren't available in esm @@ -120,18 +119,10 @@ async function prep() { // TODO: move dns* related settings to env // flydns is always ipv6 (fdaa::53) const plainOldDnsIp = onFly ? "fdaa::3" : "1.1.1.2"; - let dns53 = null; - /** swap space and recursive resolver on Fly */ - if (onFly || true) { - const ok = swap.mkswap(); - log.i("mkswap done?", ok); - dns53 = dnst.makeTransport(plainOldDnsIp); - log.i("imported udp/tcp dns transport", plainOldDnsIp); - } else { - log.i("no swap required"); - } + const dns53 = dnst.makeTransport(plainOldDnsIp); + log.i("imported udp/tcp dns transport", plainOldDnsIp); - /** signal ready */ + // signal ready system.pub("ready", [dns53]); } diff --git a/src/core/node/dns-transport.js b/src/core/node/dns-transport.js index 12ff0067ae..490c181060 100644 --- a/src/core/node/dns-transport.js +++ b/src/core/node/dns-transport.js @@ -11,6 +11,19 @@ import * as util from "../../commons/util.js"; import { TcpConnPool, UdpConnPool } from "../dns/conns.js"; import { TcpTx, UdpTx } from "../dns/transact.js"; +/** + * @typedef {import("net").Socket | import("dgram").Socket} AnySock + * @typedef {import("net").Socket} TcpSock + * @typedef {import("dgram").Socket} UdpSock + */ + +/** + * + * @param {string} host + * @param {int} port + * @param {any} opts + * @returns {Transport} + */ export function makeTransport(host, port = 53, opts = {}) { return new Transport(host, port, opts); } @@ -25,14 +38,21 @@ export function makeTransport(host, port = 53, opts = {}) { export class Transport { constructor(host, port, opts = {}) { if (util.emptyString(host)) throw new Error("invalid host" + host); + /** @type {string} */ this.host = host; + /** @type {int} */ this.port = port || 53; + /** @type {int} */ this.connectTimeout = opts.connectTimeout || 3000; // 3s + /** @type {int} */ this.ioTimeout = opts.ioTimeout || 10000; // 10s + /** @type {int} */ this.ipproto = net.isIP(host); // 4, 6, or 0 const sz = opts.poolSize || 500; // conns const ttl = opts.poolTtl || 60000; // 1m + /** @type {TcpConnPool} */ this.tcpconns = new TcpConnPool(sz, ttl); + /** @type {UdpConnPool} */ this.udpconns = new UdpConnPool(sz, ttl); this.log = log.withTags("DnsTransport"); @@ -45,52 +65,55 @@ export class Transport { this.log.i("transport teardown (tcp | udp) done?", r1, "|", r2); } + /** + * @param {string} rxid + * @param {Buffer} q + * @returns {Promise|null} + */ async udpquery(rxid, q) { let sock = this.udpconns.take(); this.log.d(rxid, "udp pooled?", sock !== null); - const t = this.log.startTime("udp-query"); + /** @type {Buffer?} */ let ans = null; try { sock = sock || (await this.makeConn("udp")); - this.log.lapTime(t, rxid, "make-conn"); - ans = await UdpTx.begin(sock).exchange(rxid, q, this.ioTimeout); - this.log.lapTime(t, rxid, "get-ans"); - this.parkConn(sock, "udp"); } catch (ex) { this.closeUdp(sock); this.log.e(rxid, ex); } - this.log.endTime(t); - return ans; } + /** + * @param {string} rxid + * @param {Buffer} q + * @returns {Promise|null} + */ async tcpquery(rxid, q) { let sock = this.tcpconns.take(); - this.log.d(rxid, "tcp pooled?", sock !== null); + this.log.d(rxid, "tcp pooled?", sock != null); - const t = this.log.startTime("tcp-query"); + /** @type {Buffer?} */ let ans = null; try { sock = sock || (await this.makeConn("tcp")); - log.lapTime(t, rxid, "make-conn"); - ans = await TcpTx.begin(sock).exchange(rxid, q, this.ioTimeout); - log.lapTime(t, rxid, "get-ans"); - this.parkConn(sock, "tcp"); } catch (ex) { this.closeTcp(sock); this.log.e(rxid, ex); } - this.log.endTime(t); return ans; } + /** + * @param {AnySock} sock + * @param {string} proto + */ parkConn(sock, proto) { if (proto === "tcp") { const ok = this.tcpconns.give(sock); @@ -101,6 +124,11 @@ export class Transport { } } + /** + * @param {string} proto + * @returns {Promise} + * @throws {Error} + */ makeConn(proto) { if (proto === "tcp") { const tcpconnect = (cb) => { @@ -128,17 +156,18 @@ export class Transport { } /** - * @param {import("net").Socket} sock + * @param {TcpSock?} sock */ closeTcp(sock) { + if (!sock) return; // the socket is not expected to have any error-listeners // so we add one to avoid unhandled errors sock.on("error", util.stub); - if (sock && !sock.destroyed) sock.destroySoon(); + if (!sock.destroyed) sock.destroySoon(); } /** - * @param {import("dgram").Socket} sock + * @param {UdpSock?} sock */ closeUdp(sock) { if (!sock || sock.destroyed) return; diff --git a/src/core/plugin.js b/src/core/plugin.js index 12c6bdbeea..a64d05dd8e 100644 --- a/src/core/plugin.js +++ b/src/core/plugin.js @@ -53,7 +53,7 @@ export default class RethinkPlugin { this.registerPlugin( "userOp", services.userOp, - ["rxid", "request", "isDnsMsg"], + ["rxid", "request", "requestDecodedDnsPacket", "isDnsMsg"], this.userOpCallback ); @@ -142,10 +142,7 @@ export default class RethinkPlugin { async execute() { const io = this.io; - const rxid = this.ctx.get("rxid"); - - const t = this.log.startTime("exec-plugin-" + rxid); - + // const rxid = this.ctx.get("rxid"); for (const p of this.plugin) { if (io.stopProcessing && !p.continueOnStopProcess) { continue; @@ -154,19 +151,12 @@ export default class RethinkPlugin { continue; } - this.log.lapTime(t, rxid, p.name, "send-io"); - const res = await p.module.exec(makectx(this.ctx, p.pctx)); - this.log.lapTime(t, rxid, p.name, "got-res"); - if (typeof p.callback === "function") { await p.callback.call(this, res, io); } - - this.log.lapTime(t, rxid, p.name, "post-callback"); } - this.log.endTime(t); } /** @@ -187,7 +177,7 @@ export default class RethinkPlugin { /** * Adds "userBlocklistInfo", "userBlocklistInfo", and "dnsResolverUrl" * to RethinkPlugin ctx. - * @param {RResp} response - Contains data: userBlocklistInfo / userBlockstamp + * @param {RResp} response * @param {IOState} io */ async userOpCallback(response, io) { diff --git a/src/plugins/cache-util.js b/src/plugins/cache-util.js index 5bdc5800c7..561d76597b 100644 --- a/src/plugins/cache-util.js +++ b/src/plugins/cache-util.js @@ -118,7 +118,7 @@ function updateTtl(packet, end) { } } -function makeId(packet) { +export function makeId(packet) { // multiple questions are kind of an undefined behaviour // stackoverflow.com/a/55093896 if (!dnsutil.hasSingleQuestion(packet)) return null; diff --git a/src/plugins/dns-op/blocker.js b/src/plugins/dns-op/blocker.js index 2265250898..0852f74520 100644 --- a/src/plugins/dns-op/blocker.js +++ b/src/plugins/dns-op/blocker.js @@ -15,6 +15,12 @@ export class DnsBlocker { this.log = log.withTags("DnsBlocker"); } + /** + * @param {string} rxid + * @param {pres.RespData} req + * @param {pres.BlockstampInfo} blockInfo + * @returns {pres.RespData} + */ blockQuestion(rxid, req, blockInfo) { const dnsPacket = req.dnsPacket; const stamps = req.stamps; @@ -40,6 +46,12 @@ export class DnsBlocker { return pres.copyOnlyBlockProperties(req, bres); } + /** + * @param {string} rxid + * @param {pres.RespData} res + * @param {pres.BlockstampInfo} blockInfo + * @returns {pres.RespData} + */ blockAnswer(rxid, res, blockInfo) { const dnsPacket = res.dnsPacket; const stamps = res.stamps; @@ -71,6 +83,12 @@ export class DnsBlocker { return pres.copyOnlyBlockProperties(res, bres); } + /** + * @param {string[]} names + * @param {pres.BlockstampInfo} blockInfo + * @param {pres.BStamp} blockstamps + * @returns {pres.RespData} + */ block(names, blockInfo, blockstamps) { let r = pres.rdnsNoBlockResponse(); for (const n of names) { diff --git a/src/plugins/dns-op/cache-resolver.js b/src/plugins/dns-op/cache-resolver.js index 61240e1870..41617f68ac 100644 --- a/src/plugins/dns-op/cache-resolver.js +++ b/src/plugins/dns-op/cache-resolver.js @@ -48,6 +48,12 @@ export class DNSCacheResponder { return response; } + /** + * @param {string} rxid + * @param {any} packet + * @param {pres.BStamp} blockInfo + * @returns {Promise} + */ async resolveFromCache(rxid, packet, blockInfo) { const noAnswer = pres.rdnsNoBlockResponse(); // if blocklist-filter is setup, then there's no need to query http-cache @@ -101,6 +107,12 @@ export class DNSCacheResponder { return pres.dnsResponse(res.dnsPacket, reencoded, res.stamps); } + /** + * @param {string} rxid + * @param {pres.RespData} r + * @param {pres.BStamp} blockInfo + * @returns {pres.RespData} + */ makeCacheResponse(rxid, r, blockInfo) { // check incoming dns request against blocklists in cache-metadata this.blocker.blockQuestion(rxid, /* out*/ r, blockInfo); diff --git a/src/plugins/dns-op/prefilter.js b/src/plugins/dns-op/prefilter.js index 806f9c81f0..122dea2b6f 100644 --- a/src/plugins/dns-op/prefilter.js +++ b/src/plugins/dns-op/prefilter.js @@ -197,7 +197,8 @@ export class DNSPrefilter { const subdomains = d.split("."); do { if (util.emptyArray(subdomains)) break; - if (undelegated.has(subdomains.join("."))) { + const fqdn = subdomains.join("."); + if (undelegated.has(fqdn)) { return block; } } while (subdomains.shift() != null); diff --git a/src/plugins/dns-op/resolver.js b/src/plugins/dns-op/resolver.js index edf04cae9a..d37d0c543e 100644 --- a/src/plugins/dns-op/resolver.js +++ b/src/plugins/dns-op/resolver.js @@ -13,6 +13,7 @@ import * as dnsutil from "../../commons/dnsutil.js"; import * as bufutil from "../../commons/bufutil.js"; import * as util from "../../commons/util.js"; import * as envutil from "../../commons/envutil.js"; +import * as system from "../../system.js"; import { BlocklistFilter } from "../rethinkdns/filter.js"; export default class DNSResolver { @@ -33,9 +34,11 @@ export default class DNSResolver { this.log = log.withTags("DnsResolver"); this.measurements = []; + this.coalstats = { tot: 0, pub: 0, empty: 0, try: 0 }; this.profileResolve = envutil.profileDnsResolves(); // only valid on nodejs this.forceDoh = envutil.forceDoh(); + this.timeout = (envutil.workersTimeout() / 2) | 0; // only valid on workers // bg-bw-init results in higher io-wait, not lower @@ -137,7 +140,7 @@ export default class DNSResolver { * @param {Request} ctx.request * @param {ArrayBuffer} ctx.requestBodyBuffer * @param {Object} ctx.requestDecodedDnsPacket - * @param {Object} ctx.userBlocklistInfo + * @param {pres.BlockstampInfo} ctx.userBlocklistInfo * @param {String} ctx.userDnsResolverUrl * @param {string} ctx.userBlockstamp * @param {pres.BStamp?} ctx.domainBlockstamp @@ -151,6 +154,7 @@ export default class DNSResolver { const rawpacket = ctx.requestBodyBuffer; const decodedpacket = ctx.requestDecodedDnsPacket; const userDns = ctx.userDnsResolverUrl; + const forceUserDns = this.forceDoh || !util.emptyString(userDns); const dispatcher = ctx.dispatcher; const userBlockstamp = ctx.userBlockstamp; // may be null or empty-obj (stamp then needs to be got from blf) @@ -212,7 +216,7 @@ export default class DNSResolver { this.resolveDnsUpstream( rxid, req, - this.determineDohResolvers(userDns), + this.determineDohResolvers(userDns, forceUserDns), rawpacket, decodedpacket ), @@ -232,6 +236,7 @@ export default class DNSResolver { } // developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/allSettled#return_value + /** @type{Response} */ const res = promisedTasks[1].value; if (fromMax) { @@ -251,8 +256,8 @@ export default class DNSResolver { if (!res.ok) { const txt = res.text && (await res.text()); - this.log.d(rxid, "!OK", res.status, txt); - throw new Error(txt + " http err: " + res); + this.log.w(rxid, "!OK", res.status, txt); + throw new Error(txt + " http err: " + res.status + " " + res.statusText); } const ans = await res.arrayBuffer(); @@ -263,7 +268,7 @@ export default class DNSResolver { // check outgoing cached dns-packet against blocklists this.blocker.blockAnswer(rxid, /* out*/ r, blInfo); const fromCache = cacheutil.hasCacheHeader(res.headers); - this.log.d(rxid, "ans block?", r.isBlocked, "from cache?", fromCache); + this.log.d(rxid, "ansblock?", r.isBlocked, "fromcache?", fromCache); // if res was got from caches or if res was got from max doh (ie, blf // wasn't used to retrieve stamps), then skip hydrating the cache @@ -309,7 +314,7 @@ export default class DNSResolver { this.log.d(rxid, "primeCache: block?", blocked, "k", k.href); if (!k) { - this.log.d(rxid, "no cache-key, url/query missing?", k, r.stamps); + this.log.d(rxid, "primeCache: no key, url/query missing?", k, r.stamps); return; } @@ -340,24 +345,39 @@ DNSResolver.prototype.resolveDnsUpstream = async function ( query, packet ) { - // Promise.any on promisedPromises[] only works if there are - // zero awaits in this function or any of its downstream calls. - // Otherwise, the first reject in promisedPromises[], before - // any statement in the call-stack awaits, would throw unhandled - // error, since the event loop would have 'ticked' and Promise.any - // on promisedPromises[] would still not have been executed, as it - // is the last statement of this function (which would have eaten up - // all rejects as long as there was one resolved promise). - const promisedPromises = []; - // if no doh upstreams set, resolve over plain-old dns if (util.emptyArray(resolverUrls)) { + const eid = cacheutil.makeId(packet); + /** @type {ArrayBuffer[]?} */ + let parcel = null; + + try { + const g = await system.when(eid, this.timeout); + this.coalstats.tot += 1; + if (!util.emptyArray(g) && g[0] != null) { + const sz = bufutil.len(g[0]); + this.log.d(rxid, "coalesced", eid, sz, this.coalstats); + if (sz > 0) return Promise.resolve(new Response(g[0])); + } + this.coalstats.empty += 1; + this.log.e(rxid, "empty coalesced", eid, this.coalstats); + return Promise.resolve(util.respond503()); + } catch (reason) { + // happens on timeout or if new event, eid + this.coalstats.try += 1; + this.log.d(rxid, "not coalesced", eid, reason, this.coalstats); + } + if (this.transport == null) { this.log.e(rxid, "plain dns transport not set"); + this.coalstats.pub += 1; + system.pub(eid, parcel); return Promise.reject(new Error("plain dns transport not set")); } - // do not let exceptions passthrough to the caller + + let promisedResponse = null; try { + // do not let exceptions passthrough to the caller const q = bufutil.bufferOf(query); let ans = await this.transport.udpquery(rxid, q); @@ -367,19 +387,31 @@ DNSResolver.prototype.resolveDnsUpstream = async function ( } if (ans) { - const r = new Response(bufutil.arrayBufferOf(ans)); - promisedPromises.push(Promise.resolve(r)); + const ab = bufutil.arrayBufferOf(ans); + parcel = [ab]; + promisedResponse = Promise.resolve(new Response(ab)); } else { - promisedPromises.push(Promise.resolve(util.respond503())); + promisedResponse = Promise.resolve(util.respond503()); } } catch (e) { this.log.e(rxid, "err when querying plain old dns", e.stack); - promisedPromises.push(Promise.reject(e)); + promisedResponse = Promise.reject(e); } - return Promise.any(promisedPromises); + this.coalstats.pub += 1; + system.pub(eid, parcel); + return promisedResponse; } + // Promise.any on promisedPromises[] only works if there are + // zero awaits in this function or any of its downstream calls. + // Otherwise, the first reject in promisedPromises[], before + // any statement in the call-stack awaits, would throw unhandled + // error, since the event loop would have 'ticked' and Promise.any + // on promisedPromises[] would still not have been executed, as it + // is the last statement of this function (which would have eaten up + // all rejects as long as there was one resolved promise). + const promisedPromises = []; try { // upstream to cache this.log.d(rxid, "upstream cache"); diff --git a/src/plugins/plugin-response.js b/src/plugins/plugin-response.js index c11c185fad..adebaf2cb0 100644 --- a/src/plugins/plugin-response.js +++ b/src/plugins/plugin-response.js @@ -9,6 +9,8 @@ import * as util from "../commons/util.js"; import * as bufutil from "../commons/bufutil.js"; +/** @typedef {import("./users/auth-token.js").Outcome} AuthOutcome */ + export class RResp { constructor(data = null, hasex = false, exfrom = "", exstack = "") { /** @type {RespData?} */ @@ -30,10 +32,27 @@ export class RespData { this.flag = flag || ""; /** @type {Object} */ this.dnsPacket = packet || null; - /** @type {ArrayBuffer} */ + /** @type {ArrayBuffer?} */ this.dnsBuffer = raw || null; - /** @type {BStamp?} */ + /** @type {BStamp|boolean} */ this.stamps = stamps || {}; + /** @type {AuthOutcome?} */ + this.userAuth = null; + /** @type {BlockstampInfo?} */ + this.userBlocklistInfo = null; + /** @type {String} */ + this.dnsResolverUrl = ""; + /** @type {string} */ + this.userBlocklistFlag = ""; + } +} + +export class BlockstampInfo { + constructor() { + /** @type {Uint16Array} */ + this.userBlocklistFlagUint = null; + /** @type {String} - mosty 0 or 1 */ + this.flagVersion = "0"; } } diff --git a/src/plugins/rdns-util.js b/src/plugins/rdns-util.js index f9d93677c7..f1c1f89873 100644 --- a/src/plugins/rdns-util.js +++ b/src/plugins/rdns-util.js @@ -27,11 +27,6 @@ const emptystr = ""; // delim, version, blockstamp (flag), accesskey const emptystamp = [emptystr, emptystr, emptystr, emptystr]; -// deprecated: all lists are trreated as wildcards -const _wildcardUint16 = new Uint16Array([ - 64544, 18431, 8191, 65535, 64640, 1, 128, 16320, -]); - // pec: parental control, rec: recommended, sec: security const recBlockstamps = new Map(); // oisd, 1hosts:mini, cpbl:light, anudeep, yhosts, tiuxo, adguard @@ -57,28 +52,46 @@ recBlockstamps.set("pr", "1:eMYB-ACgAQAgARAwIABhUgCA"); // pec, sec recBlockstamps.set("ps", "1:GNwB-ACgeQKr7cg3YXoAgA=="); +/** + * @param {BlocklistFilter} blf + * @returns {boolean} + */ export function isBlocklistFilterSetup(blf) { return blf && !util.emptyObj(blf.ftrie); } +/** + * @param {string} p + * @returns {boolean} + */ export function isStampQuery(p) { return stampPrefix.test(p); } +/** + * @param {string} p + * @returns {boolean} + */ export function isLogQuery(p) { return logPrefix.test(p); } -// dn -> domain name, ex: you.and.i.example.com -// userBlInfo -> user-selected blocklist-stamp -// {userBlocklistFlagUint, userServiceListUint} -// dnBlInfo -> obj of blocklists stamps for dn and all its subdomains -// {string(sub/domain-name) : u16(blocklist-stamp) } -// FIXME: return block-dnspacket depending on altsvc/https/svcb or cname/a/aaaa +/** + * dn -> domain name, ex: you.and.i.example.com + * userBlInfo -> user-selected blocklist-stamp + * {BlockstampInfo} + * dnBlInfo -> obj of blocklists stamps for dn and all its subdomains + * {string(sub/domain-name) : u16(blocklist-stamp) } + * FIXME: return block-dnspacket depending on altsvc/https/svcb or cname/a/aaaa + * @param {string} dn domain name + * @param {pres.BlockstampInfo} userBlInfo user blocklist info + * @param {pres.BStamp} dnBlInfo domain blockstamp map + */ export function doBlock(dn, userBlInfo, dnBlInfo) { const blockSubdomains = envutil.blockSubdomains(); const version = userBlInfo.flagVersion; const noblock = pres.rdnsNoBlockResponse(); + const userUint = userBlInfo.userBlocklistFlagUint; if ( util.emptyString(dn) || util.emptyObj(dnBlInfo) || @@ -89,32 +102,14 @@ export function doBlock(dn, userBlInfo, dnBlInfo) { // treat every blocklist as a wildcard blocklist if (blockSubdomains) { - return applyWildcardBlocklists( - userBlInfo.userBlocklistFlagUint, - version, - dnBlInfo, - dn - ); + return applyWildcardBlocklists(dn, version, userUint, dnBlInfo); } - const dnUint = new Uint16Array(dnBlInfo[dn]); + const dnUint = dnBlInfo[dn]; // if the domain isn't in block-info, we're done if (util.emptyArray(dnUint)) return noblock; // else, determine if user selected blocklist intersect with the domain's - const r = applyBlocklists(userBlInfo.userBlocklistFlagUint, dnUint, version); - - // if response is blocked, we're done - if (r.isBlocked) return r; - // if user-blockstamp doesn't contain any wildcard blocklists, we're done - if (util.emptyArray(userBlInfo.userServiceListUint)) return r; - - // check if any subdomain is in blocklists that is also in user-blockstamp - return applyWildcardBlocklists( - userBlInfo.userServiceListUint, - version, - dnBlInfo, - dn - ); + return applyBlocklists(version, userUint, dnUint); } /** @@ -131,7 +126,7 @@ export function blockstampFromCache(cr) { } /** - * @param {*} dnsPacket + * @param {any} dnsPacket * @param {BlocklistFilter} blocklistFilter * @returns {pres.BStamp|boolean} */ @@ -156,7 +151,14 @@ export function blockstampFromBlocklistFilter(dnsPacket, blocklistFilter) { return util.emptyMap(m) ? false : util.objOf(m); } -function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { +/** + * @param {string} dn domain name + * @param {Uint16Array} usrUint user blocklist flags + * @param {string} flagVersion mosty 0 or 1 + * @param {pres.BStamp} dnBlInfo subdomain blocklist flag group + * @returns {pres.RespData} + */ +function applyWildcardBlocklists(dn, flagVersion, usrUint, dnBlInfo) { const dnSplit = dn.split("."); // iterate through all subdomains one by one, for ex: a.b.c.ex.com: @@ -170,7 +172,7 @@ function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { // the subdomain isn't present in any current blocklists if (util.emptyArray(subdomainUint)) continue; - const response = applyBlocklists(uint1, subdomainUint, flagVersion); + const response = applyBlocklists(flagVersion, usrUint, subdomainUint); // if any subdomain is in any blocklist, block the current request if (!util.emptyObj(response) && response.isBlocked) { @@ -181,7 +183,13 @@ function applyWildcardBlocklists(uint1, flagVersion, dnBlInfo, dn) { return pres.rdnsNoBlockResponse(); } -function applyBlocklists(uint1, uint2, flagVersion) { +/** + * @param {string} flagVersion + * @param {Uint16Array} uint1 + * @param {Uint16Array} uint2 + * @returns {pres.RespData} + */ +function applyBlocklists(flagVersion, uint1, uint2) { // uint1 -> user blocklists; uint2 -> blocklists including sub/domains const blockedUint = intersect(uint1, uint2); @@ -194,6 +202,11 @@ function applyBlocklists(uint1, uint2, flagVersion) { } } +/** + * @param {Uint16Array} flag1 + * @param {Uint16Array} flag2 + * @returns {Uint16Array|null} + */ function intersect(flag1, flag2) { if (util.emptyArray(flag1) || util.emptyArray(flag2)) return null; @@ -253,6 +266,11 @@ function intersect(flag1, flag2) { return Uint16Array.of(commonHeader, ...commonBody.reverse()); } +/** + * @param {int} uint + * @param {int} pos + * @returns + */ function clearbit(uint, pos) { return uint & ~(1 << pos); } @@ -326,7 +344,7 @@ export function recBlockstampFrom(url) { /** * @param {string} u - Request URL string - * @returns {Array} s - delim, version, blockstamp (flag), accesskey + * @returns {string[]} s - delim, version, blockstamp (flag), accesskey */ export function extractStamps(u) { const url = new URL(u); @@ -373,6 +391,10 @@ export function extractStamps(u) { return emptystamp; } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base64ToUintV0(b64Flag) { // TODO: v0 not in use, remove all occurences // FIXME: Impl not accurate @@ -381,32 +403,45 @@ export function base64ToUintV0(b64Flag) { return bufutil.base64ToUint16(f); } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base64ToUintV1(b64Flag) { // TODO: check for empty b64Flag return bufutil.base64ToUint16(b64Flag); } +/** + * @param {string} b64Flag + * @returns {Uint16Array} + */ export function base32ToUintV1(flag) { // TODO: check for empty flag const b32 = decodeURI(flag); return bufutil.decodeFromBinaryArray(rbase32(b32)); } +/** + * @param {string} s + * @returns {string[]} [delim, ver, blockstamp, accesskey] + */ function splitBlockstamp(s) { - // delim, version, blockstamp, accesskey - if (util.emptyString(s)) return emptystamp; if (!isStampQuery(s)) return emptystamp; if (isB32Stamp(s)) { + // delim, version, blockstamp, accesskey return [_b32delim, ...s.split(_b32delim)]; } else { return [_b64delim, ...s.split(_b64delim)]; } - - return out; } +/** + * @param {string} s + * @returns {boolean} + */ export function isB32Stamp(s) { const idx32 = s.indexOf(_b32delim); const idx64 = s.indexOf(_b64delim); @@ -416,21 +451,27 @@ export function isB32Stamp(s) { else return idx32 < idx64; } -// s[0] is version field, if it doesn't exist -// then treat it as if version 0. +/** + * + * @param {string[]} s + * @returns {string} + */ export function stampVersion(s) { + // s[0] is version field, if it doesn't exist + // then treat it as if version 0. if (!util.emptyArray(s)) return s[0]; else return "0"; } // TODO: The logic to parse stamps must be kept in sync with: // github.com/celzero/website-dns/blob/8e6056bb/src/js/flag.js#L260-L425 +/** + * + * @param {string} flag + * @returns {pres.BlockstampInfo} + */ export function unstamp(flag) { - const r = { - userBlocklistFlagUint: null, - flagVersion: "0", - userServiceListUint: null, - }; + const r = new pres.BlockstampInfo(); if (util.emptyString(flag)) return r; @@ -440,29 +481,26 @@ export function unstamp(flag) { const isFlagB32 = isB32Stamp(flag); // "v:b64" or "v-b32" or "uriencoded(b64)", where v is uint version const s = flag.split(isFlagB32 ? _b32delim : _b64delim); - let convertor = (x) => ""; // empty convertor - let f = ""; // stamp flag const v = stampVersion(s); + r.flagVersion = v; if (v === "0") { - // version 0 - convertor = base64ToUintV0; - f = s[0]; + const f = s[0]; + r.userBlocklistFlagUint = base64ToUintV0(f) || null; } else if (v === "1") { - convertor = isFlagB32 ? base32ToUintV1 : base64ToUintV1; - f = s[1]; + const convertor = isFlagB32 ? base32ToUintV1 : base64ToUintV1; + const f = s[1]; + r.userBlocklistFlagUint = convertor(f) || null; } else { log.w("Rdns:unstamp", "unknown blocklist stamp version in " + s); - return r; } - - r.flagVersion = v; - r.userBlocklistFlagUint = convertor(f) || null; - r.userServiceListUint = intersect(r.userBlocklistFlagUint, _wildcardUint16); - return r; } +/** + * @param {pres.BlockstampInfo} blockInfo + * @returns {boolean} + */ export function hasBlockstamp(blockInfo) { return ( !util.emptyObj(blockInfo) && @@ -470,13 +508,21 @@ export function hasBlockstamp(blockInfo) { ); } -// returns true if tstamp is of form yyyy/epochMs +/** + * returns true if tstamp is of form yyyy/epochMs + * @param {string} tstamp + * @returns {boolean} + */ function isValidFullTimestamp(tstamp) { if (typeof tstamp !== "string") return false; return tstamp.indexOf("/") === 4; } -// from: github.com/celzero/downloads/blob/main/src/timestamp.js +/** + * from: github.com/celzero/downloads/blob/main/src/timestamp.js + * @param {string} tstamp + * @returns {int} epoch + */ export function bareTimestampFrom(tstamp) { // strip out "/" if tstamp is of form yyyy/epochMs if (isValidFullTimestamp(tstamp)) { @@ -490,6 +536,10 @@ export function bareTimestampFrom(tstamp) { return t; } +/** + * @param {string} strflag + * @returns {string[]} blocklist names + */ export function blocklists(strflag) { const { userBlocklistFlagUint, flagVersion } = unstamp(strflag); const blocklists = []; diff --git a/src/plugins/users/user-op.js b/src/plugins/users/user-op.js index 6f879311ad..3247072af0 100644 --- a/src/plugins/users/user-op.js +++ b/src/plugins/users/user-op.js @@ -8,13 +8,18 @@ import { UserCache } from "./user-cache.js"; import * as pres from "../plugin-response.js"; import * as util from "../../commons/util.js"; +import * as envutil from "../../commons/envutil.js"; import * as rdnsutil from "../rdns-util.js"; import * as token from "./auth-token.js"; -import * as bufutil from "../../commons/bufutil.js"; +import * as dnsutil from "../../commons/dnsutil.js"; // TODO: determine an approp cache-size const cacheSize = 20000; +// use fixed doh upstream for these domains, +// instead of either recursing (on Fly.io) +const delegated = new Set(["ipv4only.arpa"]); + export class UserOp { constructor() { this.userConfigCache = new UserCache(cacheSize); @@ -22,7 +27,7 @@ export class UserOp { } /** - * @param {{request: Request, isDnsMsg: Boolean, rxid: string}} ctx + * @param {{request: Request, requestDecodedDnsPacket: any, isDnsMsg: Boolean, rxid: string}} ctx * @returns {Promise} */ async exec(ctx) { @@ -44,11 +49,11 @@ export class UserOp { } /** - * @param {{request: Request, isDnsMsg: Boolean, rxid: string}} ctx + * @param {{request: Request, requestDecodedDnsPacket: any, isDnsMsg: Boolean, rxid: string}} ctx * @returns {pres.RResp} */ loadUser(ctx) { - let response = pres.emptyResponse(); + const response = pres.emptyResponse(); if (!ctx.isDnsMsg) { this.log.w(ctx.rxid, "not a dns-msg, ignore"); @@ -56,18 +61,29 @@ export class UserOp { } try { - const blocklistFlag = rdnsutil.blockstampFromUrl(ctx.request.url); + const dnsPacket = ctx.requestDecodedDnsPacket; + const domains = dnsutil.extractDomains(dnsPacket); + for (const d of domains) { + if (delegated.has(d)) { + // may be overriden by user-preferred doh upstream + response.data.dnsResolverUrl = envutil.primaryDohResolver(); + } + } - if (util.emptyString(blocklistFlag)) { + const blocklistFlag = rdnsutil.blockstampFromUrl(ctx.request.url); + const hasflag = !util.emptyString(blocklistFlag); + if (!hasflag) { this.log.d(ctx.rxid, "empty blocklist-flag", ctx.request.url); } - // blocklistFlag may be invalid, ref rdnsutil.blockstampFromUrl let r = this.userConfigCache.get(blocklistFlag); - if (!util.emptyString(blocklistFlag) && util.emptyObj(r)) { - r = rdnsutil.unstamp(blocklistFlag); + let hasdata = rdnsutil.hasBlockstamp(r); + if (hasflag && !hasdata) { + // r not in cache + r = rdnsutil.unstamp(blocklistFlag); // r is never null, may throw ex + hasdata = rdnsutil.hasBlockstamp(r); - if (!bufutil.emptyBuf(r.userBlocklistFlagUint)) { + if (hasdata) { this.log.d(ctx.rxid, "new cfg cache kv", blocklistFlag, r); // TODO: blocklistFlag is not normalized, ie b32 used for dot isn't // converted to its b64 form (which doh and rethinkdns modules use) @@ -75,16 +91,18 @@ export class UserOp { this.userConfigCache.put(blocklistFlag, r); } } else { - this.log.d(ctx.rxid, "cfg cache hit?", r != null, blocklistFlag, r); + this.log.d(ctx.rxid, "cfg cache hit?", hasdata, blocklistFlag, r); } - response.data.userBlocklistInfo = r; - response.data.userBlocklistFlag = blocklistFlag; - // sets user-preferred doh upstream - response.data.dnsResolverUrl = null; + if (hasdata) { + response.data.userBlocklistInfo = r; + response.data.userBlocklistFlag = blocklistFlag; + // TODO: override response.data.dnsResolverUrl + } } catch (e) { this.log.e(ctx.rxid, "loadUser", e); - response = pres.errResponse("UserOp:loadUser", e); + // avoid erroring out on invalid blocklist info & flag + // response = pres.errResponse("UserOp:loadUser", e); } return response; diff --git a/src/server-node.js b/src/server-node.js index 379cdc3473..dd687e4772 100644 --- a/src/server-node.js +++ b/src/server-node.js @@ -24,9 +24,6 @@ import * as util from "./commons/util.js"; import "./core/node/config.js"; import { finished } from "node:stream"; import * as nodecrypto from "./commons/crypto.js"; -// webpack can't handle node-bindings, a dependency of node-memwatch -// github.com/webpack/webpack/issues/16029 -// import * as memwatch from "@airbnb/node-memwatch"; /** * @typedef {net.Socket} Socket @@ -50,6 +47,7 @@ class Stats { this.nofconns = 0; this.openconns = 0; this.noftimeouts = 0; + this.nofheapsnaps = 0; // avg1, avg5, avg15, adj, maxconns this.bp = [0, 0, 0, 0, 0]; } @@ -169,9 +167,8 @@ const tracker = new Tracker(); const stats = new Stats(); const cpucount = os.cpus().length || 1; const adjPeriodSec = 5; +const maxHeapSnaps = 20; let adjTimer = null; -/** @type {memwatch.HeapDiff} */ -let heapdiff = null; ((main) => { // listen for "go" and start the server @@ -340,7 +337,6 @@ function systemUp() { trapServerEvents(hcheck); }); - // if (envutil.measureHeap()) heapdiff = new memwatch.HeapDiff(); heartbeat(); } @@ -360,7 +356,7 @@ function trapServerEvents(s) { const id = tracker.trackConn(s, socket); if (!tracker.valid(id)) { - log.i("tcp: not tracking; server shutting down?"); + log.i("tcp: not tracking; server shutting down?", id); close(socket); return; } @@ -372,7 +368,7 @@ function trapServerEvents(s) { }); socket.on("error", (err) => { - log.d("tcp: incoming conn closed with err; " + err.message); + log.d("tcp: incoming conn", id, "closed:", err.message); close(socket); }); @@ -415,7 +411,7 @@ function trapSecureServerEvents(s) { const id = tracker.trackConn(s, socket); if (!tracker.valid(id)) { - log.i("tls: not tracking; server shutting down?"); + log.i("tls: not tracking; server shutting down?", id); close(socket); return; } @@ -462,11 +458,24 @@ function trapSecureServerEvents(s) { s.on("tlsClientError", (err, /** @type {TLSSocket} */ tlsSocket) => { stats.tlserr += 1; // fly tcp healthchecks also trigger tlsClientErrors - log.d("tls: client err; " + err.message); + log.d("tls: client err;", err.message, addrstr(tlsSocket)); close(tlsSocket); }); } +/** + * @param {TLSSocket|Socket} sock + */ +function addrstr(sock) { + if (!sock) return ""; + if (sock.localAddress == null || sock.remoteAddress == null) return ""; + return ( + `[${sock.localAddress}]:${sock.localPort}` + + "->" + + `[${sock.remoteAddress}]:${sock.remotePort}` + ); +} + /** * @param {tls.Server} s * @returns {void} @@ -845,7 +854,6 @@ async function handleTCPQuery(q, socket, host, flag) { if (bufutil.emptyBuf(q) || !tcpOkay(socket)) return; const rxid = util.xid(); - const t = log.startTime("handle-tcp-query-" + rxid); try { const r = await resolveQuery(rxid, q, host, flag); if (bufutil.emptyBuf(r)) { @@ -860,7 +868,6 @@ async function handleTCPQuery(q, socket, host, flag) { ok = false; log.w(rxid, "tcp: send fail, err", e); } - log.endTime(t); // close socket when !ok if (!ok) { @@ -945,8 +952,6 @@ async function serveHTTPS(req, res) { const buffers = []; - const t = log.startTime("recv-https"); - // if using for await loop, then it must be wrapped in a // try-catch block: stackoverflow.com/questions/69169226 // if not, errors from reading req escapes unhandled. @@ -958,8 +963,6 @@ async function serveHTTPS(req, res) { const b = bufutil.concatBuf(buffers); const bLen = b.byteLength; - log.endTime(t); - if (util.isPostRequest(req) && !dnsutil.validResponseSize(b)) { res.writeHead(dnsutil.dohStatusCode(b), util.corsHeadersIfNeeded(ua)); res.end(); @@ -980,7 +983,6 @@ async function handleHTTPRequest(b, req, res) { heartbeat(); const rxid = util.xid(); - const t = log.startTime("handle-http-req-" + rxid); try { let host = req.headers.host || req.headers[":authority"]; if (isIPv6(host)) host = `[${host}]`; @@ -999,26 +1001,18 @@ async function handleHTTPRequest(b, req, res) { body: req.method === "POST" ? b : null, }); - log.lapTime(t, "upstream-start"); - const fRes = await handleRequest(util.mkFetchEvent(fReq)); - log.lapTime(t, "upstream-end"); - if (!resOkay(res)) { throw new Error("res not writable 1"); } res.writeHead(fRes.status, util.copyHeaders(fRes)); - log.lapTime(t, "send-head"); - // ans may be null on non-2xx responses, such as redirects (3xx) by cc.js // or 4xx responses on timeouts or 5xx on invalid http method const ans = await fRes.arrayBuffer(); - log.lapTime(t, "recv-ans"); - if (!resOkay(res)) { throw new Error("res not writable 2"); } else if (!bufutil.emptyBuf(ans)) { @@ -1034,8 +1028,6 @@ async function handleHTTPRequest(b, req, res) { if (!ok) resClose(res); log.w(e); } - - log.endTime(t); } /** @@ -1061,29 +1053,39 @@ function trapRequestResponseEvents(req, res) { } function heartbeat() { - const maxc = envutil.maxconns(); const minc = envutil.minconns(); + const maxc = envutil.maxconns(); + const isNode = envutil.isNode(); + const notCloud = envutil.onLocal(); const measureHeap = envutil.measureHeap(); - + const freemem = os.freemem() / (1024 * 1024); // in mb + const totmem = os.totalmem() / (1024 * 1024); // in mb // increment no of requests stats.noreqs += 1; - if (!measureHeap) { - endHeapDiffIfNeeded(heapdiff); - heapdiff = null; - } else if (heapdiff == null) { - // heapdiff = new memwatch.HeapDiff(); - } else if (stats.noreqs % (maxc * 10) === 0) { - endHeapDiffIfNeeded(heapdiff); - // heapdiff = new memwatch.HeapDiff(); - } if (stats.noreqs % (minc * 2) === 0) { log.i(stats.str(), "in", (uptime() / 60000) | 0, "mins"); } + + const mul = notCloud ? 2 : 10; + const writeSnap = notCloud || measureHeap; + const ramthres = notCloud || freemem < 0.2 * totmem; + const reqthres = stats.noreqs > 0 && stats.noreqs % (maxc * mul) === 0; + const withinLimit = stats.nofheapsnaps < maxHeapSnaps; + if (isNode && writeSnap && withinLimit && reqthres && ramthres) { + stats.nofheapsnaps += 1; + const n = "s" + stats.nofheapsnaps + "." + stats.noreqs + ".heapsnapshot"; + const start = Date.now(); + // nodejs.org/en/learn/diagnostics/memory/using-heap-snapshot + v8.writeHeapSnapshot(n); // blocks event loop! + const elapsed = (Date.now() - start) / 1000; + log.i("heap snapshot #", stats.nofheapsnaps, n, "in", elapsed, "s"); + } } function adjustMaxConns(n) { const isNode = envutil.isNode(); + const notCloud = envutil.onLocal(); const maxc = envutil.maxconns(); const minc = envutil.minconns(); const adjsPerSec = 60 / adjPeriodSec; @@ -1097,6 +1099,11 @@ function adjustMaxConns(n) { avg5 = ((avg5 * 100) / cpucount) | 0; avg15 = ((avg15 * 100) / cpucount) | 0; + const freemem = os.freemem() / (1024 * 1024); // in mb + const totmem = os.totalmem() / (1024 * 1024); // in mb + const lowram = freemem < 0.1 * totmem; + const verylowram = freemem < 0.025 * totmem; + let adj = stats.bp[3] || 0; // increase in load if (avg5 > 90) { @@ -1111,7 +1118,7 @@ function adjustMaxConns(n) { n = maxc; if (avg1 > 100) { n = minc; - } else if (avg1 > 90 || avg5 > 80) { + } else if (avg1 > 90 || avg5 > 80 || lowram) { n = Math.max((n * 0.2) | 0, minc); } else if (avg1 > 80 || avg5 > 75) { n = Math.max((n * 0.4) | 0, minc); @@ -1135,45 +1142,31 @@ function adjustMaxConns(n) { const breakpoint = 6 * adjsPerSec; // 6 mins const stresspoint = 4 * adjsPerSec; // 4 mins const nstr = stats.openconns + "/" + n; - if (adj > breakpoint) { - log.w("load: stopping; n:", nstr, "adjs:", adj); + if (adj > breakpoint || (verylowram && !notCloud)) { + log.w("load: verylowram! freemem:", freemem, "totmem:", totmem); + log.w("load: stopping lowram?", verylowram, "; n:", nstr, "adjs:", adj); stopAfter(0); return; } else if (adj > stresspoint) { + log.w("load: stress; lowram?", lowram, "mem:", freemem, " / ", totmem); log.w("load: stress; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); n = (minc / 2) | 0; } else if (adj > 0) { + log.d("load: high; lowram?", lowram, "mem:", freemem, " / ", totmem); log.d("load: high; n:", nstr, "adjs:", adj, "avgs:", avg1, avg5, avg15); } - // nodejs.org/en/docs/guides/diagnostics/memory/using-gc-traces - if (adj > 0) { - if (isNode) v8.setFlagsFromString("--trace-gc"); - } else { - if (isNode) v8.setFlagsFromString("--notrace-gc"); - } - stats.bp = [avg1, avg5, avg15, adj, n]; for (const s of tracker.servers()) { if (!s || !s.listening) continue; s.maxConnections = n; } -} -/** - * @param {memwatch.HeapDiff} h - * @returns void - */ -function endHeapDiffIfNeeded(h) { - // disabled; memwatch is not bundled due to a webpack bug - if (!h || true) return; - try { - const diff = h.end(); - log.i("heap before", diff.before); - log.i("heap after", diff.after); - log.i("heap details", diff.change.details); - } catch (ex) { - log.w("heap-diff err", ex.message); + // nodejs.org/en/docs/guides/diagnostics/memory/using-gc-traces + if (adj > 0) { + if (isNode) v8.setFlagsFromString("--trace-gc"); + } else { + if (isNode) v8.setFlagsFromString("--notrace-gc"); } } @@ -1182,5 +1175,8 @@ function bye() { // of other unreleased resources (see: svc.js#systemStop); and so exit with // success (exit code 0) regardless; ref: community.fly.io/t/4547/6 console.warn("W game over"); + + if (envutil.isNode()) v8.writeHeapSnapshot("snap.end.heapsnapshot"); + process.exit(0); } diff --git a/src/system.js b/src/system.js index 91997f239f..89871b53be 100644 --- a/src/system.js +++ b/src/system.js @@ -9,6 +9,10 @@ import * as util from "./commons/util.js"; // Evaluate if EventTarget APIs can replace this hand-rolled impl // developers.cloudflare.com/workers/platform/changelog#2021-09-24 + +/** @typedef {any[]?} parcel */ +/** @typedef {function(parcel)} listenfn */ + // once emitted, they stick; firing off new listeners forever, just the once. const stickyEvents = new Set([ // when process bring-up is done @@ -21,12 +25,20 @@ const stickyEvents = new Set([ "go", ]); +/** @type {Map} */ +const stickyParcels = new Map(); + const events = new Set([ // when server should cease "stop", ]); +/** @type {Set} */ +const ephemeralEvents = new Set(); + +/** @type {Map>} */ const listeners = new Map(); +/** @type {Map>} */ const waitGroup = new Map(); (() => { @@ -41,103 +53,180 @@ const waitGroup = new Map(); } })(); -// fires an event -export function pub(event, parcel = undefined) { - awaiters(event, parcel); - callbacks(event, parcel); +/** + * Fires event. + * @param {string} event + * @param {parcel} parcel + * @returns {int} + */ +export function pub(event, parcel = null) { + if (util.emptyString(event)) return; + + const hadEphemeralEvent = ephemeralEvents.delete(event); + + const tot = awaiters(event, parcel, hadEphemeralEvent); + return tot + callbacks(event, parcel, hadEphemeralEvent); } -// invokes cb when event is fired -export function sub(event, cb) { +/** + * Invokes cb when event is fired. + * @param {string} event + * @param {listenfn} cb + * @param {int} timeout + * @returns {boolean} + */ +export function sub(event, cb, timeout = 0) { + if (util.emptyString(event)) return; + if (typeof cb !== "function") return; + const eventCallbacks = listeners.get(event); - // if such even callbacks don't exist if (!eventCallbacks) { - // but event is sticky, fire off the listener at once + // event is sticky, fire off the listener at once if (stickyEvents.has(event)) { - microtaskBox(cb); + const parcel = stickyParcels.get(event); // may be null + microtaskBox(cb, parcel); return true; } - // but event doesn't exist, then there's nothing to do + // event doesn't exist so make it ephemeral + ephemeralEvents.add(event); + listeners.set(event, new Set()); + waitGroup.set(event, new Set()); return false; } - eventCallbacks.add(cb); + const tid = timeout > 0 ? util.timeout(timeout, cb) : -2; + const fulfiller = + tid > 0 + ? (parcel) => { + clearTimeout(tid); + cb(parcel); + } + : cb; + eventCallbacks.add(fulfiller); return true; } -// waits till event fires or timesout +/** + * Waits till event fires or timesout. + * @param {string} event + * @param {int} timeout + * @returns {Promise} + */ export function when(event, timeout = 0) { + if (util.emptyString(event)) { + return Promise.reject(new Error("empty event")); + } + const wg = waitGroup.get(event); if (!wg) { // if stick event, fulfill promise right away if (stickyEvents.has(event)) { - return Promise.resolve(event); + const parcel = stickyParcels.get(event); // may be null + return Promise.resolve(parcel); } - // no such event - return Promise.reject(new Error(event + " missing")); + // no such event so make it ephemeral + ephemeralEvents.add(event); + listeners.set(event, new Set()); + waitGroup.set(event, new Set()); + return Promise.reject(new Error(event + " missing event")); } return new Promise((accept, reject) => { const tid = timeout > 0 ? util.timeout(timeout, () => { - reject(new Error(event + " elapsed " + timeout)); + reject(new Error(event + " event elapsed " + timeout)); }) : -2; - const fulfiller = function (parcel) { + /** @type {listenfn} */ + const fulfiller = (parcel) => { if (tid >= 0) clearTimeout(tid); - accept(parcel, event); + accept(parcel); }; wg.add(fulfiller); }); } -function awaiters(event, parcel) { - const g = waitGroup.get(event); +/** + * @param {string} event + * @param {parcel} parcel + * @param {boolean} ephemeralEvent + * @returns {int} + */ +function awaiters(event, parcel = null, ephemeralEvent = false) { + if (util.emptyString(event)) return 0; + const wg = waitGroup.get(event); - if (!g) return; + if (!wg) return 0; - // listeners valid just the once for stickyEvents + // listeners valid just the once for stickyEvents & ephemeralEvents if (stickyEvents.has(event)) { waitGroup.delete(event); + stickyParcels.set(event, parcel); + } else if (ephemeralEvent) { + // log.d("sys: wg ephemeralEvent", event, parcel); + waitGroup.delete(event); } - safeBox(g, parcel); + if (wg.size === 0) return 0; + + safeBox(wg, parcel); + return wg.size; } -function callbacks(event, parcel) { +/** + * @param {string} event + * @param {parcel} parcel + * @param {boolean} ephemeralEvent + * @returns {int} + */ +function callbacks(event, parcel = null, ephemeralEvent = false) { + if (util.emptyString(event)) return 0; const cbs = listeners.get(event); - if (!cbs) return; + if (!cbs) return 0; - // listeners valid just the once for stickyEvents + // listeners valid just the once for stickyEvents & ephemeralEvents if (stickyEvents.has(event)) { listeners.delete(event); + stickyParcels.set(event, parcel); + } else if (ephemeralEvent) { + // log.d("sys: cb ephemeralEvent", event, parcel); + listeners.delete(event); } + if (cbs.size === 0) return 0; // callbacks are queued async and don't block the caller. On Workers, // where IOs or timers require event-context aka network-context, // which is only available when fns are invoked in response to an // incoming request (through the fetch event handler), such callbacks // may not even fire. Instead use: awaiters and not callbacks. microtaskBox(cbs, parcel); + return cbs.size; } -// TODO: could be replaced with scheduler.wait -// developers.cloudflare.com/workers/platform/changelog#2021-12-10 -// queues fn in a macro-task queue of the event-loop -// exec order: github.com/nodejs/node/issues/22257 +/** + * Queues fn in a macro-task queue of the event-loop + * exec order: github.com/nodejs/node/issues/22257 + * @param {listenfn} fn + */ export function taskBox(fn) { + // TODO: could be replaced with scheduler.wait + // developers.cloudflare.com/workers/platform/changelog#2021-12-10 util.timeout(/* with 0ms delay*/ 0, () => safeBox(fn)); } -// queues fn in a micro-task queue // ref: MDN: Web/API/HTML_DOM_API/Microtask_guide/In_depth // queue-task polyfill: stackoverflow.com/a/61605098 const taskboxPromise = { p: Promise.resolve() }; +/** + * Queues fns in a micro-task queue + * @param {listenfn[]} fns + * @param {parcel} arg + */ function microtaskBox(fns, arg) { let enqueue = null; if (typeof queueMicrotask === "function") { @@ -149,9 +238,14 @@ function microtaskBox(fns, arg) { enqueue(() => safeBox(fns, arg)); } -// TODO: safeBox for async fns with r.push(await f())? -// stackoverflow.com/questions/38508420 +/** + * stackoverflow.com/questions/38508420 + * @param {listenfn[]|listenfn?} fns + * @param {parcel} arg + * @returns {any[]} + */ function safeBox(fns, arg) { + // TODO: safeBox for async fns with r.push(await f())? if (typeof fns === "function") { fns = [fns]; } @@ -169,6 +263,7 @@ function safeBox(fns, arg) { try { r.push(f(arg)); } catch (ignore) { + // log.e("sys: safeBox err", ignore); r.push(null); } }