diff --git a/base-server/index.d.ts b/base-server/index.d.ts index a00da0de..318da511 100644 --- a/base-server/index.d.ts +++ b/base-server/index.d.ts @@ -10,16 +10,37 @@ import type { Log, LogStore, Meta, + ReceiveCallback, ServerConnection, TestTime } from '@logux/core' -import type { Server as HTTPServer, IncomingMessage, ServerResponse } from 'http' +import type { + Server as HTTPServer, + IncomingMessage, + ServerResponse +} from 'http' import type { Unsubscribe } from 'nanoevents' import type { LogFn } from 'pino' import type { ChannelContext, Context } from '../context/index.js' import type { ServerClient } from '../server-client/index.js' +interface TypeOptions { + /** + * Name of the queue that will be used to process actions + * of the specified type. Default is 'main' + */ + queue?: string +} + +interface ChannelOptions { + /** + * Name of the queue that will be used to process channels + * with the specified name pattern. Default is 'main' + */ + queue?: string +} + export interface ServerMeta extends Meta { /** * All nodes subscribed to channel will receive the action. @@ -717,6 +738,8 @@ export class BaseServer< */ nodeIds: Map + onReceive: ReceiveCallback + /** * Server options. * @@ -804,6 +827,7 @@ export class BaseServer< * * @param pattern Pattern for channel name. * @param callbacks Callback during subscription process. + * @param options Additional options */ channel< ChannelParams extends object = {}, @@ -811,12 +835,14 @@ export class BaseServer< SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction >( pattern: string, - callbacks: ChannelCallbacks + callbacks: ChannelCallbacks, + options?: ChannelOptions ): void /** * @param pattern Regular expression for channel name. * @param callbacks Callback during subscription process. + * @param options Additional options */ channel< ChannelParams extends string[] = string[], @@ -824,7 +850,8 @@ export class BaseServer< SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction >( pattern: RegExp, - callbacks: ChannelCallbacks + callbacks: ChannelCallbacks, + options?: ChannelOptions ): void /** @@ -1122,10 +1149,12 @@ export class BaseServer< /** * @param actionCreator Action creator function. * @param callbacks Callbacks for action created by creator. + * @param options Additional options */ type( actionCreator: Creator, - callbacks: ActionCallbacks, Data, Headers> + callbacks: ActionCallbacks, Data, Headers>, + options?: TypeOptions ): void /** @@ -1149,10 +1178,12 @@ export class BaseServer< * * @param name The action’s type or action’s type matching rule as RegExp.. * @param callbacks Callbacks for actions with this type. + * @param options Additional options */ type( name: RegExp | TypeAction['type'], - callbacks: ActionCallbacks + callbacks: ActionCallbacks, + options?: TypeOptions ): void /** diff --git a/base-server/index.js b/base-server/index.js index 1202e2ba..138b0a3e 100644 --- a/base-server/index.js +++ b/base-server/index.js @@ -1,5 +1,6 @@ import { LoguxNotFoundError } from '@logux/actions' import { Log, MemoryStore, parseId, ServerConnection } from '@logux/core' +import fastq from 'fastq' import { promises as fs } from 'fs' import { createNanoEvents } from 'nanoevents' import { nanoid } from 'nanoid' @@ -64,6 +65,12 @@ function normalizeTypeCallbacks(name, callbacks) { } } +async function queueWorker(task, next) { + let { action, meta, processAction, queue } = task + queue.next = next + await processAction(action, meta) +} + function normalizeChannelCallbacks(pattern, callbacks) { if (callbacks && callbacks.accessAndLoad) { callbacks.access = (ctx, ...args) => { @@ -185,7 +192,9 @@ export class BaseServer { this.emitter.emit('report', 'add', { action, meta }) } - if (this.destroying) return + if (this.destroying && !this.actionToQueue.has(meta.id)) { + return + } if (action.type === 'logux/subscribe') { if (meta.server === this.nodeId) { @@ -333,6 +342,10 @@ export class BaseServer { this.timeouts = {} this.lastTimeout = 0 + this.typeToQueue = new Map() + this.queues = new Map() + this.actionToQueue = new Map() + this.controls = { 'GET /': { async request() { @@ -355,6 +368,54 @@ export class BaseServer { this.listenNotes = {} bindBackendProxy(this) + let end = (actionId, queue, queueKey, ...args) => { + this.actionToQueue.delete(actionId) + if (queue.length() === 0) { + this.queues.delete(queueKey) + } + queue.next(...args) + } + let undoRemainingTasks = queue => { + let remainingTasks = queue.getQueue() + if (remainingTasks) { + for (let task of remainingTasks) { + this.undo(task.action, task.meta, 'error') + this.actionToQueue.delete(task.meta.id) + } + } + queue.killAndDrain() + } + this.on('error', (e, action, meta) => { + let queueKey = this.actionToQueue.get(meta?.id) + + if (!queueKey) { + return + } + + let queue = this.queues.get(queueKey) + undoRemainingTasks(queue) + end(meta.id, queue, queueKey, e) + }) + this.on('processed', (action, meta) => { + let queueKeyByActionId = + (action?.type === 'logux/undo' || action?.type === 'logux/processed') && + this.actionToQueue.get(action?.id) + let queueKeyByMetaId = this.actionToQueue.get(meta?.id) + + let queueKey = queueKeyByMetaId || queueKeyByActionId + let actionId = queueKeyByMetaId ? meta?.id : action?.id + + if (!queueKey) { + return + } + + let queue = this.queues.get(queueKey) + if (action.type === 'logux/undo') { + undoRemainingTasks(queue) + } + end(actionId, queue, queueKey, null, meta) + }) + this.unbind.push(() => { for (let i of this.connected.values()) i.destroy() for (let i in this.timeouts) { @@ -373,6 +434,15 @@ export class BaseServer { } }) ) + this.unbind.push(() => { + return Promise.allSettled( + [...this.queues.values()].map(queue => { + return new Promise(resolve => { + queue.drain = resolve + }) + }) + ) + }) } addClient(connection) { @@ -409,7 +479,7 @@ export class BaseServer { return [undoAction, undoMeta] } - channel(pattern, callbacks) { + channel(pattern, callbacks, options = {}) { normalizeChannelCallbacks(`Channel ${pattern}`, callbacks) let channel = Object.assign({}, callbacks) if (typeof pattern === 'string') { @@ -419,6 +489,8 @@ export class BaseServer { } else { channel.regexp = pattern } + + channel.queueName = options.queue || 'main' this.channels.push(channel) } @@ -617,6 +689,49 @@ export class BaseServer { } } + onReceive(processAction, action, meta) { + if (this.actionToQueue.has(meta.id)) { + return + } + + let clientId = parseId(meta.id).clientId + let queueName = '' + + let isChannel = + (action.type === 'logux/subscribe' || + action.type === 'logux/unsubscribe') && + action.channel + + if (isChannel) { + for (let i = 0; i < this.channels.length && !queueName; i++) { + let channel = this.channels[i] + let pattern = channel.regexp || channel.pattern.regex + if (action.channel.match(pattern)) { + queueName = channel.queue + } + } + } else { + queueName = this.typeToQueue.get(action.type) + } + + queueName = queueName || 'main' + let queueKey = `${clientId}/${queueName}` + let queue = this.queues.get(queueKey) + + if (!queue) { + queue = fastq(queueWorker, 1) + this.queues.set(queueKey, queue) + } + + queue.push({ + action, + meta, + processAction, + queue + }) + this.actionToQueue.set(meta.id, queueKey) + } + otherChannel(callbacks) { normalizeChannelCallbacks('Unknown channel', callbacks) if (this.otherSubscriber) { @@ -785,9 +900,10 @@ export class BaseServer { let ctx = this.createContext(action, meta) let client = this.clientIds.get(clientId) for (let filter of Object.values(subscriber.filters)) { - filter = typeof filter === 'function' - ? await filter(ctx, action, meta) - : filter + filter = + typeof filter === 'function' + ? await filter(ctx, action, meta) + : filter if (filter && client) { ignoreClients.add(clientId) client.node.onAdd(action, meta) @@ -924,7 +1040,10 @@ export class BaseServer { if (!match) this.wrongChannel(action, meta) } - type(name, callbacks) { + type(name, callbacks, options = {}) { + let queue = options.queue || 'main' + this.typeToQueue.set(name, queue) + if (typeof name === 'function') name = name.type normalizeTypeCallbacks(`Action type ${name}`, callbacks) diff --git a/bind-backend-proxy/index.test.ts b/bind-backend-proxy/index.test.ts index f1eb5738..f612ba10 100644 --- a/bind-backend-proxy/index.test.ts +++ b/bind-backend-proxy/index.test.ts @@ -471,13 +471,27 @@ it('notifies about actions and subscriptions', async () => { let client = await app.connect('10', { headers: { lang: 'fr' } }) client.log.add({ type: 'A' }) client.log.add({ channel: 'a', type: 'logux/subscribe' }) - await delay(100) + await delay(280) expect(app.log.actions()).toEqual([ { type: 'A' }, - { channel: 'a', type: 'logux/subscribe' } + { channel: 'a', type: 'logux/subscribe' }, + { + id: '1 10:1:1 0', + type: 'logux/processed' + }, + { + type: 'a/load1' + }, + { + type: 'a/load2' + }, + { + id: '2 10:1:1 0', + type: 'logux/processed' + } ]) - expect(app.log.entries()[0][1].status).toEqual('waiting') + expect(app.log.entries()[0][1].status).toEqual('processed') expect(sent).toEqual([ [ 'POST', @@ -505,7 +519,7 @@ it('notifies about actions and subscriptions', async () => { command: 'action', headers: { lang: 'fr' }, meta: { - added: 1, + added: 3, id: '2 10:1:1 0', reasons: ['test'], server: 'server:uuid', @@ -531,11 +545,11 @@ it('notifies about actions and subscriptions', async () => { ]) expect(app.log.entries()[0][1].status).toEqual('processed') expect(events).toEqual([ - 'backendSent', 'backendSent', 'backendGranted', - 'backendGranted', 'backendProcessed', + 'backendSent', + 'backendGranted', 'backendProcessed' ]) expect(processed).toEqual(0) @@ -609,17 +623,27 @@ it('reacts on wrong backend answer', async () => { app.on('error', e => { errors.push(e.message) }) + let client = await app.connect('10') + client.log.add({ type: 'EMPTY' }) + await delay(20) client.log.add({ type: 'BROKEN1' }) + await delay(20) client.log.add({ type: 'BROKEN2' }) + await delay(20) client.log.add({ type: 'BROKEN3' }) + await delay(20) client.log.add({ type: 'BROKEN4' }) + await delay(20) client.log.add({ type: 'BROKEN5' }) + await delay(20) client.log.add({ type: 'BROKEN6' }) + await delay(20) client.log.add({ type: 'BROKEN7' }) + await delay(20) client.log.add({ channel: 'resend', type: 'logux/subscribe' }) - await delay(100) + await delay(20) expect(errors).toEqual([ 'Empty back-end answer', @@ -633,61 +657,61 @@ it('reacts on wrong backend answer', async () => { 'Resend can be called on subscription' ]) expect(app.log.actions()).toEqual([ - { type: 'BROKEN1' }, - { type: 'BROKEN2' }, - { type: 'BROKEN6' }, - { channel: 'resend', type: 'logux/subscribe' }, { action: { type: 'EMPTY' }, id: '1 10:1:1 0', reason: 'error', type: 'logux/undo' }, + { type: 'BROKEN1' }, { action: { type: 'BROKEN1' }, - id: '2 10:1:1 0', + id: '3 10:1:1 0', reason: 'error', type: 'logux/undo' }, + { type: 'BROKEN2' }, { action: { type: 'BROKEN2' }, - id: '3 10:1:1 0', + id: '5 10:1:1 0', reason: 'error', type: 'logux/undo' }, { action: { type: 'BROKEN3' }, - id: '4 10:1:1 0', + id: '7 10:1:1 0', reason: 'error', type: 'logux/undo' }, { action: { type: 'BROKEN4' }, - id: '5 10:1:1 0', + id: '9 10:1:1 0', reason: 'error', type: 'logux/undo' }, { action: { type: 'BROKEN5' }, - id: '6 10:1:1 0', + id: '11 10:1:1 0', reason: 'error', type: 'logux/undo' }, + { type: 'BROKEN6' }, { action: { type: 'BROKEN6' }, - id: '7 10:1:1 0', + id: '13 10:1:1 0', reason: 'error', type: 'logux/undo' }, { action: { type: 'BROKEN7' }, - id: '8 10:1:1 0', + id: '15 10:1:1 0', reason: 'error', type: 'logux/undo' }, + { channel: 'resend', type: 'logux/subscribe' }, { action: { channel: 'resend', type: 'logux/subscribe' }, - id: '9 10:1:1 0', + id: '17 10:1:1 0', reason: 'error', type: 'logux/undo' } @@ -703,24 +727,25 @@ it('reacts on backend error', async () => { }) let client = await app.connect('10') client.log.add({ type: 'AERROR' }) + await delay(150) client.log.add({ type: 'PERROR' }) - await delay(220) + await delay(150) expect(errors).toEqual([ 'Error on back-end server', 'Error on back-end server' ]) expect(app.log.actions()).toEqual([ - { type: 'PERROR' }, { action: { type: 'AERROR' }, id: '1 10:1:1 0', reason: 'error', type: 'logux/undo' }, + { type: 'PERROR' }, { action: { type: 'PERROR' }, - id: '2 10:1:1 0', + id: '3 10:1:1 0', reason: 'error', type: 'logux/undo' } diff --git a/package.json b/package.json index f1227dda..9fdcaac0 100644 --- a/package.json +++ b/package.json @@ -33,11 +33,12 @@ }, "dependencies": { "@logux/actions": "^0.3.1", - "@logux/core": "^0.8.5", + "@logux/core": "VladBrok/core#feat/54", "JSONStream": "^1.3.5", "cookie": "^0.5.0", "dotenv": "^16.3.1", "fast-glob": "^3.3.1", + "fastq": "^1.15.0", "ip": "^1.1.8", "nanodelay": "^2.0.2", "nanoevents": "^8.0.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 06acb544..10aa9983 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -9,8 +9,8 @@ dependencies: specifier: ^0.3.1 version: 0.3.1(@logux/core@0.8.5) '@logux/core': - specifier: ^0.8.5 - version: 0.8.5 + specifier: VladBrok/core#feat/54 + version: github.com/VladBrok/core/bf66502c715ff0e2dc72e894cbdc960d85408cc8 JSONStream: specifier: ^1.3.5 version: 1.3.5 @@ -23,6 +23,9 @@ dependencies: fast-glob: specifier: ^3.3.1 version: 3.3.1 + fastq: + specifier: ^1.15.0 + version: 1.15.0 ip: specifier: ^1.1.8 version: 1.1.8 @@ -456,14 +459,7 @@ packages: peerDependencies: '@logux/core': ^0.8.0 dependencies: - '@logux/core': 0.8.5 - dev: false - - /@logux/core@0.8.5: - resolution: {integrity: sha512-KHNcr1hyaPiU4k407CXegyGz945Imwrelz6XnZpbpFdUvC8PaA43fZiIoZ4y2x8Cwfh8gzWUjFP9JiKXlO2yrA==} - engines: {node: ^14.0.0 || ^16.0.0 || >=18.0.0} - dependencies: - nanoevents: 7.0.1 + '@logux/core': github.com/VladBrok/core/bf66502c715ff0e2dc72e894cbdc960d85408cc8 dev: false /@logux/eslint-config@51.0.1(eslint-config-standard@17.1.0)(eslint-plugin-import@2.28.0)(eslint-plugin-n@16.0.1)(eslint-plugin-perfectionist@1.5.1)(eslint-plugin-prefer-let@3.0.1)(eslint-plugin-promise@6.1.1)(eslint@8.47.0): @@ -3146,3 +3142,12 @@ packages: /yyyy-mm-dd@1.0.2: resolution: {integrity: sha512-xHhOFKT1Y29Jc4/6To1hmIUswKhCKplPSbkCyIabAAZ5q/9GmZvlN3WNcafcq3zze3CzYU66XNeSBLCD8Oickw==} dev: false + + github.com/VladBrok/core/bf66502c715ff0e2dc72e894cbdc960d85408cc8: + resolution: {tarball: https://codeload.github.com/VladBrok/core/tar.gz/bf66502c715ff0e2dc72e894cbdc960d85408cc8} + name: '@logux/core' + version: 0.8.4 + engines: {node: ^14.0.0 || ^16.0.0 || >=18.0.0} + dependencies: + nanoevents: 7.0.1 + dev: false diff --git a/server-client/index.js b/server-client/index.js index d1ed3d2c..d4c85574 100644 --- a/server-client/index.js +++ b/server-client/index.js @@ -35,6 +35,7 @@ export class ServerClient { auth: this.auth.bind(this), inFilter: this.filter.bind(this), inMap: this.inMap.bind(this), + onReceive: app.onReceive.bind(app), outMap: this.outMap.bind(this), ping: app.options.ping, subprotocol: app.options.subprotocol, diff --git a/server-client/index.test.ts b/server-client/index.test.ts index 291888b7..74044b85 100644 --- a/server-client/index.test.ts +++ b/server-client/index.test.ts @@ -620,22 +620,22 @@ it('checks action creator', async () => { expect(test.names).toEqual([ 'connect', 'authenticated', + 'denied', 'add', 'add', - 'denied', 'add' ]) - expect(test.reports[4]).toEqual(['denied', { actionId: '2 1:uuid 0' }]) - expect(test.reports[2][1].meta.id).toEqual('1 10:uuid 0') + expect(test.reports[2]).toEqual(['denied', { actionId: '2 1:uuid 0' }]) + expect(test.reports[4][1].meta.id).toEqual('1 10:uuid 0') expect(test.app.log.actions()).toEqual([ { type: 'GOOD' }, - { id: '1 10:uuid 0', type: 'logux/processed' }, { action: { type: 'BAD' }, id: '2 1:uuid 0', reason: 'denied', type: 'logux/undo' - } + }, + { id: '1 10:uuid 0', type: 'logux/processed' } ]) }) @@ -663,8 +663,8 @@ it('allows subscribe and unsubscribe actions', async () => { it('checks action meta', async () => { let test = createReporter() - test.app.type('GOOD', { access: () => true }) - test.app.type('BAD', { access: () => true }) + test.app.type('GOOD', { access: () => true }, { queue: '1' }) + test.app.type('BAD', { access: () => true }, { queue: '2' }) test.app.log.generateId() test.app.log.generateId() @@ -753,35 +753,35 @@ it('checks user access for action', async () => { await sendTo(client, [ 'sync', 2, - { type: 'FOO' }, - { id: [1, '10:uuid', 0], time: 1 }, { bar: true, type: 'FOO' }, + { id: [1, '10:uuid', 0], time: 1 }, + { type: 'FOO' }, { id: [1, '10:uuid', 1], time: 1 } ]) await delay(50) expect(test.app.log.actions()).toEqual([ { bar: true, type: 'FOO' }, + { id: '1 10:uuid 0', type: 'logux/processed' }, { action: { type: 'FOO' }, - id: '1 10:uuid 0', + id: '1 10:uuid 1', reason: 'denied', type: 'logux/undo' - }, - { id: '1 10:uuid 1', type: 'logux/processed' } + } ]) expect(test.names).toEqual([ 'connect', 'authenticated', - 'denied', 'add', 'add', + 'denied', 'add' ]) - expect(test.reports[2][1].actionId).toEqual('1 10:uuid 0') + expect(test.reports[4][1].actionId).toEqual('1 10:uuid 1') expect(sent(client).find(i => i[0] === 'debug')).toEqual([ 'debug', 'error', - 'Action "1 10:uuid 0" was denied' + 'Action "1 10:uuid 1" was denied' ]) }) @@ -1345,46 +1345,66 @@ it('has finally callback', async () => { app.on('error', e => { errors.push(e.message) }) - app.type('A', { - access: () => true, - finally() { - calls.push('A') - } - }) - app.type('B', { - access: () => true, - finally() { - calls.push('B') + app.type( + 'A', + { + access: () => true, + finally() { + calls.push('A') + } }, - process: () => {} - }) - app.type('C', { - access: () => true, - finally() { - calls.push('C') + { queue: 'A' } + ) + app.type( + 'B', + { + access: () => true, + finally() { + calls.push('B') + }, + process: () => {} }, - resend() { - throw new Error('C') - } - }) - app.type('D', { - access() { - throw new Error('D') + { queue: 'B' } + ) + app.type( + 'C', + { + access: () => true, + finally() { + calls.push('C') + }, + resend() { + throw new Error('C') + } }, - finally() { - calls.push('D') - } - }) - app.type('E', { - access: () => true, - finally() { - calls.push('E') - throw new Error('EE') + { queue: 'C' } + ) + app.type( + 'D', + { + access() { + throw new Error('D') + }, + finally() { + calls.push('D') + } }, - process() { - throw new Error('E') - } - }) + { queue: 'D' } + ) + app.type( + 'E', + { + access: () => true, + finally() { + calls.push('E') + throw new Error('EE') + }, + process() { + throw new Error('E') + } + }, + { queue: 'E' } + ) let client = await connectClient(app, '10:client:uuid') await sendTo(client, [ 'sync', @@ -1401,8 +1421,8 @@ it('has finally callback', async () => { { id: [5, '10:client:other', 0], time: 1 } ]) - expect(calls).toEqual(['A', 'B', 'C', 'D', 'E']) - expect(errors).toEqual(['C', 'D', 'E', 'EE']) + expect(calls).toEqual(['D', 'C', 'A', 'E', 'B']) + expect(errors).toEqual(['D', 'C', 'E', 'EE']) }) it('sends error to author', async () => { @@ -2019,3 +2039,451 @@ it('allows to throws LoguxNotFoundError', async () => { } ]) }) + +it('undoes all other actions in a queue if error in one action occurs', async () => { + let app = createServer() + let calls: string[] = [] + let errors: string[] = [] + app.on('error', e => { + errors.push(e.message) + }) + app.type( + 'GOOD 0', + { + access: () => true, + process() { + calls.push('GOOD 0') + } + }, + { queue: '1' } + ) + app.type( + 'BAD', + { + access: () => true, + async process() { + await delay(50) + calls.push('BAD') + throw new Error('BAD') + } + }, + { queue: '1' } + ) + app.type( + 'GOOD 1', + { + access: () => true, + process() { + calls.push('GOOD 1') + } + }, + { queue: '1' } + ) + app.type( + 'GOOD 2', + { + access: () => true, + process() { + calls.push('GOOD 2') + } + }, + { queue: '1' } + ) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 3, + { type: 'GOOD 0' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'BAD' }, + { id: [2, '10:client:other', 0], time: 1 }, + { type: 'GOOD 1' }, + { id: [3, '10:client:other', 0], time: 1 }, + { type: 'GOOD 2' }, + { id: [4, '10:client:other', 0], time: 1 } + ]) + await delay(50) + + expect(errors).toEqual(['BAD']) + expect(calls).toEqual(['GOOD 0', 'BAD']) +}) + +it('does not add action with same ID to the queue', async () => { + let app = createServer() + let errors: string[] = [] + let calls: string[] = [] + app.on('error', e => { + errors.push(e.message) + }) + app.type( + 'FOO', + { + access: () => true, + process: () => { + calls.push('FOO') + } + }, + { queue: '1' } + ) + app.type( + 'BAR', + { + access: () => true, + process: () => { + calls.push('BAR') + } + }, + { queue: '1' } + ) + app.type( + 'BAZ', + { + access: () => true, + process: () => { + calls.push('BAZ') + } + }, + { queue: '2' } + ) + app.type( + 'BOM', + { + access: () => true, + process: () => { + calls.push('BOM') + } + }, + { queue: '2' } + ) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 4, + { type: 'FOO' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'BAR' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'BAZ' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'BOM' }, + { id: [2, '10:client:other', 0], time: 1 } + ]) + + expect(errors).toEqual([]) + expect(calls).toEqual(['FOO', 'BOM']) +}) + +it('does not undo actions in one queue if error occurs in another queue', async () => { + let app = createServer() + let calls: string[] = [] + let errors: string[] = [] + app.on('error', e => { + errors.push(e.message) + }) + app.type( + 'BAD', + { + access: () => true, + process() { + calls.push('BAD') + throw new Error('BAD') + } + }, + { queue: '1' } + ) + app.type( + 'GOOD 1', + { + access: () => true, + async process() { + await delay(30) + calls.push('GOOD 1') + } + }, + { queue: '2' } + ) + app.type( + 'GOOD 2', + { + access: () => true, + process() { + calls.push('GOOD 2') + } + }, + { queue: '2' } + ) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 3, + { type: 'BAD' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'GOOD 1' }, + { id: [2, '10:client:other', 0], time: 1 }, + { type: 'GOOD 2' }, + { id: [3, '10:client:other', 0], time: 1 } + ]) + await delay(50) + + expect(errors).toEqual(['BAD']) + expect(calls).toEqual(['BAD', 'GOOD 1', 'GOOD 2']) +}) + +it('calls access, resend and process in a queue', async () => { + let app = createServer() + let calls: string[] = [] + app.type('FOO', { + async access() { + await delay(50) + calls.push('FOO ACCESS') + return true + }, + async process() { + await delay(50) + calls.push('FOO PROCESS') + }, + async resend() { + await delay(50) + calls.push('FOO RESEND') + return '' + } + }) + app.type('BAR', { + async access() { + calls.push('BAR ACCESS') + return true + }, + async process() { + calls.push('BAR PROCESS') + }, + async resend() { + calls.push('BAR RESEND') + return '' + } + }) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 2, + { type: 'FOO' }, + { id: [1, '10:client:other', 0], time: 1 }, + { type: 'BAR' }, + { id: [2, '10:client:other', 0], time: 1 } + ]) + await delay(200) + + expect(calls).toEqual([ + 'FOO ACCESS', + 'FOO RESEND', + 'FOO PROCESS', + 'BAR ACCESS', + 'BAR RESEND', + 'BAR PROCESS' + ]) +}) + +it('undoes all other actions in a queue if some action should be undone', async () => { + let test = createReporter() + test.app.type('FOO', { + access: () => false + }) + test.app.type('BAR', { + access: () => true + }) + + let client = await connectClient(test.app) + await sendTo(client, [ + 'sync', + 3, + { type: 'FOO' }, + { id: [1, '10:uuid', 0], time: 1 }, + { type: 'BAR' }, + { id: [2, '10:uuid', 0], time: 1 } + ]) + + expect(test.names).toEqual([ + 'connect', + 'authenticated', + 'denied', + 'add', + 'add' + ]) + expect(test.app.log.actions()).toEqual([ + { + action: { type: 'FOO' }, + id: '1 10:uuid 0', + reason: 'denied', + type: 'logux/undo' + }, + { + action: { type: 'BAR' }, + id: '2 10:uuid 0', + reason: 'error', + type: 'logux/undo' + } + ]) +}) + +it('all actions are processed before destroy', async () => { + let app = createServer() + let calls: string[] = [] + app.type( + 'task 1.1', + { + async access() { + return true + }, + async process() { + await delay(30) + calls.push('task 1.1') + } + }, + { queue: '1' } + ) + app.type( + 'task 2.1', + { + async access() { + return true + }, + async process() { + await delay(30) + calls.push('task 2.1') + } + }, + { queue: '1' } + ) + app.type( + 'task 1.2', + { + async access() { + await delay(50) + return true + }, + async process() { + calls.push('task 1.2') + } + }, + { queue: '2' } + ) + app.type( + 'task 2.2', + { + async access() { + return true + }, + async process() { + calls.push('task 2.2') + } + }, + { queue: '2' } + ) + app.type('during destroy', { + async access() { + return true + }, + async process() { + calls.push('during destroy') + } + }) + + let client = await connectClient(app, '10:client:uuid') + sendTo(client, [ + 'sync', + 4, + { type: 'task 1.1' }, + { id: [1, client.nodeId || '', 0], time: 1 }, + { type: 'task 2.1' }, + { id: [2, client.nodeId || '', 0], time: 1 }, + { type: 'task 1.2' }, + { id: [3, client.nodeId || '', 0], time: 1 }, + { type: 'task 2.2' }, + { id: [4, client.nodeId || '', 0], time: 1 } + ]) + await delay(10) + await app.destroy() + + expect(calls).toEqual(['task 1.1', 'task 1.2', 'task 2.2', 'task 2.1']) +}) + +it('recognizes channel regex', async () => { + let app = createServer() + let calls: string[] = [] + app.channel(/ba./, { + access: () => true, + load: (_, action) => { + calls.push(action.channel) + } + }) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 3, + { channel: 'bar', type: 'logux/subscribe' }, + { id: [1, client.nodeId || '', 0], time: 1 }, + { channel: 'baz', type: 'logux/subscribe' }, + { id: [2, client.nodeId || '', 0], time: 1 }, + { channel: 'bom', type: 'logux/subscribe' }, + { id: [3, client.nodeId || '', 0], time: 1 } + ]) + + expect(calls).toEqual(['bar', 'baz']) +}) + +it('recognizes channel pattern', async () => { + let app = createServer() + let calls: string[] = [] + app.channel('/api/users/:id', { + access: () => true, + load: (_, action) => { + calls.push(action.channel) + } + }) + + let client = await connectClient(app, '10:client:uuid') + await sendTo(client, [ + 'sync', + 3, + { channel: '/api/users/5', type: 'logux/subscribe' }, + { id: [1, client.nodeId || '', 0], time: 1 }, + { channel: '/api/users/10', type: 'logux/subscribe' }, + { id: [2, client.nodeId || '', 0], time: 1 }, + { channel: '/api/users/10/9/8', type: 'logux/subscribe' }, + { id: [3, client.nodeId || '', 0], time: 1 } + ]) + + expect(calls).toEqual(['/api/users/5', '/api/users/10']) +}) + +it('removes empty queues', async () => { + let app = createServer() + app.type('FOO', { + access: () => true, + process: async () => { + await delay(50) + } + }) + app.type('BAR', { + access: () => true + }) + + let client = await connectClient(app, '10:client:uuid') + sendTo(client, [ + 'sync', + 2, + { type: 'FOO' }, + { id: [1, '10:client:uuid', 0], time: 1 }, + { type: 'BAR' }, + { id: [2, '10:client:uuid', 0], time: 1 } + ]) + + await delay(10) + expect(privateMethods(app).queues.size).toEqual(1) + await delay(50) + expect(privateMethods(app).queues.size).toEqual(0) +})