Skip to content

Commit

Permalink
Add action processing queues (#146)
Browse files Browse the repository at this point in the history
* add queues

* fix some tests

* comment out broken tests

* remove unused commented code

* remove TODO

* comment out one more test because it is failing

* fix one test

* fix one more test

* fix one more test

* fix one more test

* remove commented code

* fix bind-backend-proxy test by adding setQueue method

* fix one more test

* use setQueue inside base-server

* add type for onActions callback

* add type and docs for setQueue

* add types and docs for #type and #channel

* fix pnpm-lock

* remove eslint-disable-no-console

* change test delay

* change test delay

* change test delay

* fix test

* always unbind error if processed

* fix tests, add test

* add test

* make sure that all actions are processed before destroy

* import actionsCallback type from core

* add test that checks channel regex

* add test that checks channel pattern

* add test to check that error in one queue does not affect another queue

* add test: "undoes all other actions in a queue if some action should be undone"

* add test that checks setQueue method

* remove empty queues from the map

* replace ignoreDestroying with actionsInQueue set

* update onActions

* check for duplicate meta.id

* rename onActions to onSync

* refactor (rename, simplify code), use VladBrok/core

* optimize by binding single event listeners for all queues instead of for each action

* adjust logic when queue is undefined

* add test to check that an action with same ID is not added to the queue

* Update base-server/index.js

Co-authored-by: Andrey Sitnik <[email protected]>

* Update base-server/index.js

Co-authored-by: Andrey Sitnik <[email protected]>

* Update base-server/index.js

Co-authored-by: Andrey Sitnik <[email protected]>

* code style fix

* fix lint error

* fix missing queue

* use updated onSync api that allows to call access in a queue

* update VladBrok/core

* fix test

* fix test

* use only action.id in on(processed) for queues, fix destroying logic

* remove setQueue, change tests accordingly

* check action.type before checking action.id

* rename onSync to onReceive

---------

Co-authored-by: Andrey Sitnik <[email protected]>
  • Loading branch information
VladBrok and ai authored Sep 10, 2023
1 parent 049642f commit ae49d1b
Show file tree
Hide file tree
Showing 7 changed files with 747 additions and 97 deletions.
41 changes: 36 additions & 5 deletions base-server/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,37 @@ import type {
Log,
LogStore,
Meta,
ReceiveCallback,
ServerConnection,
TestTime
} from '@logux/core'
import type { Server as HTTPServer, IncomingMessage, ServerResponse } from 'http'
import type {
Server as HTTPServer,
IncomingMessage,
ServerResponse
} from 'http'
import type { Unsubscribe } from 'nanoevents'
import type { LogFn } from 'pino'

import type { ChannelContext, Context } from '../context/index.js'
import type { ServerClient } from '../server-client/index.js'

interface TypeOptions {
/**
* Name of the queue that will be used to process actions
* of the specified type. Default is 'main'
*/
queue?: string
}

interface ChannelOptions {
/**
* Name of the queue that will be used to process channels
* with the specified name pattern. Default is 'main'
*/
queue?: string
}

export interface ServerMeta extends Meta {
/**
* All nodes subscribed to channel will receive the action.
Expand Down Expand Up @@ -717,6 +738,8 @@ export class BaseServer<
*/
nodeIds: Map<string, ServerClient>

onReceive: ReceiveCallback

/**
* Server options.
*
Expand Down Expand Up @@ -804,27 +827,31 @@ export class BaseServer<
*
* @param pattern Pattern for channel name.
* @param callbacks Callback during subscription process.
* @param options Additional options
*/
channel<
ChannelParams extends object = {},
Data extends object = {},
SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction
>(
pattern: string,
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>,
options?: ChannelOptions
): void

/**
* @param pattern Regular expression for channel name.
* @param callbacks Callback during subscription process.
* @param options Additional options
*/
channel<
ChannelParams extends string[] = string[],
Data extends object = {},
SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction
>(
pattern: RegExp,
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>,
options?: ChannelOptions
): void

/**
Expand Down Expand Up @@ -1122,10 +1149,12 @@ export class BaseServer<
/**
* @param actionCreator Action creator function.
* @param callbacks Callbacks for action created by creator.
* @param options Additional options
*/
type<Creator extends AbstractActionCreator, Data extends object = {}>(
actionCreator: Creator,
callbacks: ActionCallbacks<ReturnType<Creator>, Data, Headers>
callbacks: ActionCallbacks<ReturnType<Creator>, Data, Headers>,
options?: TypeOptions
): void

/**
Expand All @@ -1149,10 +1178,12 @@ export class BaseServer<
*
* @param name The action’s type or action’s type matching rule as RegExp..
* @param callbacks Callbacks for actions with this type.
* @param options Additional options
*/
type<TypeAction extends Action = AnyAction, Data extends object = {}>(
name: RegExp | TypeAction['type'],
callbacks: ActionCallbacks<TypeAction, Data, Headers>
callbacks: ActionCallbacks<TypeAction, Data, Headers>,
options?: TypeOptions
): void

/**
Expand Down
131 changes: 125 additions & 6 deletions base-server/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LoguxNotFoundError } from '@logux/actions'
import { Log, MemoryStore, parseId, ServerConnection } from '@logux/core'
import fastq from 'fastq'
import { promises as fs } from 'fs'
import { createNanoEvents } from 'nanoevents'
import { nanoid } from 'nanoid'
Expand Down Expand Up @@ -64,6 +65,12 @@ function normalizeTypeCallbacks(name, callbacks) {
}
}

async function queueWorker(task, next) {
let { action, meta, processAction, queue } = task
queue.next = next
await processAction(action, meta)
}

function normalizeChannelCallbacks(pattern, callbacks) {
if (callbacks && callbacks.accessAndLoad) {
callbacks.access = (ctx, ...args) => {
Expand Down Expand Up @@ -185,7 +192,9 @@ export class BaseServer {
this.emitter.emit('report', 'add', { action, meta })
}

if (this.destroying) return
if (this.destroying && !this.actionToQueue.has(meta.id)) {
return
}

if (action.type === 'logux/subscribe') {
if (meta.server === this.nodeId) {
Expand Down Expand Up @@ -333,6 +342,10 @@ export class BaseServer {
this.timeouts = {}
this.lastTimeout = 0

this.typeToQueue = new Map()
this.queues = new Map()
this.actionToQueue = new Map()

this.controls = {
'GET /': {
async request() {
Expand All @@ -355,6 +368,54 @@ export class BaseServer {
this.listenNotes = {}
bindBackendProxy(this)

let end = (actionId, queue, queueKey, ...args) => {
this.actionToQueue.delete(actionId)
if (queue.length() === 0) {
this.queues.delete(queueKey)
}
queue.next(...args)
}
let undoRemainingTasks = queue => {
let remainingTasks = queue.getQueue()
if (remainingTasks) {
for (let task of remainingTasks) {
this.undo(task.action, task.meta, 'error')
this.actionToQueue.delete(task.meta.id)
}
}
queue.killAndDrain()
}
this.on('error', (e, action, meta) => {
let queueKey = this.actionToQueue.get(meta?.id)

if (!queueKey) {
return
}

let queue = this.queues.get(queueKey)
undoRemainingTasks(queue)
end(meta.id, queue, queueKey, e)
})
this.on('processed', (action, meta) => {
let queueKeyByActionId =
(action?.type === 'logux/undo' || action?.type === 'logux/processed') &&
this.actionToQueue.get(action?.id)
let queueKeyByMetaId = this.actionToQueue.get(meta?.id)

let queueKey = queueKeyByMetaId || queueKeyByActionId
let actionId = queueKeyByMetaId ? meta?.id : action?.id

if (!queueKey) {
return
}

let queue = this.queues.get(queueKey)
if (action.type === 'logux/undo') {
undoRemainingTasks(queue)
}
end(actionId, queue, queueKey, null, meta)
})

this.unbind.push(() => {
for (let i of this.connected.values()) i.destroy()
for (let i in this.timeouts) {
Expand All @@ -373,6 +434,15 @@ export class BaseServer {
}
})
)
this.unbind.push(() => {
return Promise.allSettled(
[...this.queues.values()].map(queue => {
return new Promise(resolve => {
queue.drain = resolve
})
})
)
})
}

addClient(connection) {
Expand Down Expand Up @@ -409,7 +479,7 @@ export class BaseServer {
return [undoAction, undoMeta]
}

channel(pattern, callbacks) {
channel(pattern, callbacks, options = {}) {
normalizeChannelCallbacks(`Channel ${pattern}`, callbacks)
let channel = Object.assign({}, callbacks)
if (typeof pattern === 'string') {
Expand All @@ -419,6 +489,8 @@ export class BaseServer {
} else {
channel.regexp = pattern
}

channel.queueName = options.queue || 'main'
this.channels.push(channel)
}

Expand Down Expand Up @@ -617,6 +689,49 @@ export class BaseServer {
}
}

onReceive(processAction, action, meta) {
if (this.actionToQueue.has(meta.id)) {
return
}

let clientId = parseId(meta.id).clientId
let queueName = ''

let isChannel =
(action.type === 'logux/subscribe' ||
action.type === 'logux/unsubscribe') &&
action.channel

if (isChannel) {
for (let i = 0; i < this.channels.length && !queueName; i++) {
let channel = this.channels[i]
let pattern = channel.regexp || channel.pattern.regex
if (action.channel.match(pattern)) {
queueName = channel.queue
}
}
} else {
queueName = this.typeToQueue.get(action.type)
}

queueName = queueName || 'main'
let queueKey = `${clientId}/${queueName}`
let queue = this.queues.get(queueKey)

if (!queue) {
queue = fastq(queueWorker, 1)
this.queues.set(queueKey, queue)
}

queue.push({
action,
meta,
processAction,
queue
})
this.actionToQueue.set(meta.id, queueKey)
}

otherChannel(callbacks) {
normalizeChannelCallbacks('Unknown channel', callbacks)
if (this.otherSubscriber) {
Expand Down Expand Up @@ -785,9 +900,10 @@ export class BaseServer {
let ctx = this.createContext(action, meta)
let client = this.clientIds.get(clientId)
for (let filter of Object.values(subscriber.filters)) {
filter = typeof filter === 'function'
? await filter(ctx, action, meta)
: filter
filter =
typeof filter === 'function'
? await filter(ctx, action, meta)
: filter
if (filter && client) {
ignoreClients.add(clientId)
client.node.onAdd(action, meta)
Expand Down Expand Up @@ -924,7 +1040,10 @@ export class BaseServer {
if (!match) this.wrongChannel(action, meta)
}

type(name, callbacks) {
type(name, callbacks, options = {}) {
let queue = options.queue || 'main'
this.typeToQueue.set(name, queue)

if (typeof name === 'function') name = name.type
normalizeTypeCallbacks(`Action type ${name}`, callbacks)

Expand Down
Loading

0 comments on commit ae49d1b

Please sign in to comment.