Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add action processing queues #146

Merged
merged 58 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
9eefcc3
add queues
VladBrok Aug 3, 2023
279422d
fix some tests
VladBrok Aug 3, 2023
7a37946
comment out broken tests
VladBrok Aug 3, 2023
f245925
remove unused commented code
VladBrok Aug 3, 2023
4823096
remove TODO
VladBrok Aug 3, 2023
6a63781
comment out one more test because it is failing
VladBrok Aug 3, 2023
86e343d
fix one test
VladBrok Aug 3, 2023
75b2f32
fix one more test
VladBrok Aug 3, 2023
de451fc
fix one more test
VladBrok Aug 3, 2023
3cc8d5f
fix one more test
VladBrok Aug 3, 2023
e065beb
remove commented code
VladBrok Aug 3, 2023
f10de20
fix bind-backend-proxy test by adding setQueue method
VladBrok Aug 6, 2023
3d6847f
fix one more test
VladBrok Aug 6, 2023
edf931f
use setQueue inside base-server
VladBrok Aug 6, 2023
3ab59e9
add type for onActions callback
VladBrok Aug 6, 2023
9518518
add type and docs for setQueue
VladBrok Aug 6, 2023
6cff1da
merge with main, fix
VladBrok Aug 6, 2023
51d2623
add types and docs for #type and #channel
VladBrok Aug 6, 2023
a472744
fix pnpm-lock
VladBrok Aug 6, 2023
55cd58b
remove eslint-disable-no-console
VladBrok Aug 6, 2023
4111eed
change test delay
VladBrok Aug 6, 2023
c8dd5df
change test delay
VladBrok Aug 6, 2023
0ffe131
change test delay
VladBrok Aug 6, 2023
8ab1940
fix test
VladBrok Aug 6, 2023
b0f0d69
always unbind error if processed
VladBrok Aug 7, 2023
e9c74c1
fix tests, add test
VladBrok Aug 7, 2023
d04eaeb
add test
VladBrok Aug 7, 2023
5732085
make sure that all actions are processed before destroy
VladBrok Aug 7, 2023
3421325
import actionsCallback type from core
VladBrok Aug 7, 2023
79ea1ff
add test that checks channel regex
VladBrok Aug 8, 2023
13365e7
add test that checks channel pattern
VladBrok Aug 8, 2023
5ea5d90
add test to check that error in one queue does not affect another queue
VladBrok Aug 8, 2023
6a18219
add test: "undoes all other actions in a queue if some action should …
VladBrok Aug 8, 2023
165cc87
add test that checks setQueue method
VladBrok Aug 8, 2023
a068af5
remove empty queues from the map
VladBrok Aug 8, 2023
40de15c
replace ignoreDestroying with actionsInQueue set
VladBrok Aug 23, 2023
f018ab4
update onActions
VladBrok Aug 27, 2023
79b4bf6
check for duplicate meta.id
VladBrok Aug 29, 2023
96ceb63
rename onActions to onSync
VladBrok Aug 29, 2023
0f0b88f
refactor (rename, simplify code), use VladBrok/core
VladBrok Aug 30, 2023
cb01eb3
optimize by binding single event listeners for all queues instead of …
VladBrok Aug 30, 2023
f5a86ac
adjust logic when queue is undefined
VladBrok Aug 30, 2023
6061995
add test to check that an action with same ID is not added to the queue
VladBrok Aug 30, 2023
6296643
Update base-server/index.js
VladBrok Aug 30, 2023
4027724
Update base-server/index.js
VladBrok Aug 30, 2023
13cff3e
Update base-server/index.js
VladBrok Aug 30, 2023
1e41a51
code style fix
VladBrok Aug 30, 2023
7fad4e6
fix lint error
VladBrok Aug 30, 2023
7d7b760
fix missing queue
VladBrok Aug 31, 2023
4434b44
use updated onSync api that allows to call access in a queue
VladBrok Aug 31, 2023
2b94ecb
update VladBrok/core
VladBrok Aug 31, 2023
0bf8f43
fix test
VladBrok Aug 31, 2023
040e23e
fix test
VladBrok Aug 31, 2023
a885f92
use only action.id in on(processed) for queues, fix destroying logic
VladBrok Sep 1, 2023
b80bd7d
remove setQueue, change tests accordingly
VladBrok Sep 1, 2023
5f6f10c
check action.type before checking action.id
VladBrok Sep 1, 2023
34e9bd1
rename onSync to onReceive
VladBrok Sep 1, 2023
8afebda
Merge branch 'next' into feat/54
ai Sep 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions base-server/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,36 @@ import type {
LogStore,
Meta,
ServerConnection,
SyncCallback,
TestTime
} from '@logux/core'
import type { Server as HTTPServer, IncomingMessage, ServerResponse } from 'http'
import type {
Server as HTTPServer,
IncomingMessage,
ServerResponse
} from 'http'
import type { Unsubscribe } from 'nanoevents'
import type { LogFn } from 'pino'

import type { ChannelContext, Context } from '../context/index.js'
import type { ServerClient } from '../server-client/index.js'

interface TypeOptions {
/**
* Name of the queue that will be used to process actions
* of the specified type. Default is 'main'
*/
queue?: string
}

interface ChannelOptions {
/**
* Name of the queue that will be used to process channels
* with the specified name pattern. Default is 'main'
*/
queue?: string
}

export interface ServerMeta extends Meta {
/**
* All nodes subscribed to channel will receive the action.
Expand Down Expand Up @@ -716,6 +737,8 @@ export class BaseServer<
*/
nodeIds: Map<string, ServerClient>

onSync: SyncCallback
ai marked this conversation as resolved.
Show resolved Hide resolved

/**
* Server options.
*
Expand Down Expand Up @@ -803,27 +826,31 @@ export class BaseServer<
*
* @param pattern Pattern for channel name.
* @param callbacks Callback during subscription process.
* @param options Additional options
*/
channel<
ChannelParams extends object = {},
Data extends object = {},
SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction
>(
pattern: string,
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>,
options?: ChannelOptions
): void

/**
* @param pattern Regular expression for channel name.
* @param callbacks Callback during subscription process.
* @param options Additional options
*/
channel<
ChannelParams extends string[] = string[],
Data extends object = {},
SubscribeAction extends LoguxSubscribeAction = LoguxSubscribeAction
>(
pattern: RegExp,
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>
callbacks: ChannelCallbacks<SubscribeAction, Data, ChannelParams, Headers>,
options?: ChannelOptions
): void

/**
Expand Down Expand Up @@ -1120,10 +1147,12 @@ export class BaseServer<
/**
* @param actionCreator Action creator function.
* @param callbacks Callbacks for action created by creator.
* @param options Additional options
*/
type<Creator extends AbstractActionCreator, Data extends object = {}>(
actionCreator: Creator,
callbacks: ActionCallbacks<ReturnType<Creator>, Data, Headers>
callbacks: ActionCallbacks<ReturnType<Creator>, Data, Headers>,
options?: TypeOptions
): void

/**
Expand All @@ -1147,10 +1176,12 @@ export class BaseServer<
*
* @param name The action’s type or action’s type matching rule as RegExp..
* @param callbacks Callbacks for actions with this type.
* @param options Additional options
*/
type<TypeAction extends Action = AnyAction, Data extends object = {}>(
name: RegExp | TypeAction['type'],
callbacks: ActionCallbacks<TypeAction, Data, Headers>
callbacks: ActionCallbacks<TypeAction, Data, Headers>,
options?: TypeOptions
): void

/**
Expand Down
126 changes: 120 additions & 6 deletions base-server/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LoguxNotFoundError } from '@logux/actions'
import { Log, MemoryStore, parseId, ServerConnection } from '@logux/core'
import fastq from 'fastq'
import { promises as fs } from 'fs'
import { createNanoEvents } from 'nanoevents'
import { nanoid } from 'nanoid'
Expand Down Expand Up @@ -64,6 +65,12 @@ function normalizeTypeCallbacks(name, callbacks) {
}
}

async function queueWorker(task, next) {
let { action, meta, processAction, queue } = task
queue.next = next
await processAction(action, meta)
}

function normalizeChannelCallbacks(pattern, callbacks) {
if (callbacks && callbacks.accessAndLoad) {
callbacks.access = (ctx, ...args) => {
Expand Down Expand Up @@ -185,7 +192,13 @@ export class BaseServer {
this.emitter.emit('report', 'add', { action, meta })
}

if (this.destroying) return
if (
this.destroying &&
!this.actionToQueue.has(meta.id) &&
!this.actionToQueue.has(action.id)
) {
return
}

if (action.type === 'logux/subscribe') {
if (meta.server === this.nodeId) {
Expand Down Expand Up @@ -333,6 +346,10 @@ export class BaseServer {
this.timeouts = {}
this.lastTimeout = 0

this.typeToQueue = new Map()
this.queues = new Map()
this.actionToQueue = new Map()

this.controls = {
'GET /': {
async request() {
Expand All @@ -355,6 +372,45 @@ export class BaseServer {
this.listenNotes = {}
bindBackendProxy(this)

let end = (actionId, queue, queueKey, ...args) => {
this.actionToQueue.delete(actionId)
if (queue.length() === 0) {
this.queues.delete(queueKey)
}
queue.next(...args)
}
let undoRemainingTasks = queue => {
let remainingTasks = queue.getQueue()
if (remainingTasks) {
for (let task of remainingTasks) {
this.undo(task.action, task.meta, 'error')
this.actionToQueue.delete(task.meta.id)
}
}
queue.killAndDrain()
}
this.on('error', (e, action, meta) => {
let queueKey = this.actionToQueue.get(meta?.id)
if (!queueKey) {
return
}
let queue = this.queues.get(queueKey)
undoRemainingTasks(queue)
end(meta.id, queue, queueKey, e)
})
this.on('processed', (action, meta) => {
let queueKey = this.actionToQueue.get(action?.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can’t check action.id until you check action.type. Diferent actions could have a very dfferent data in action with possible collisions.

So add logux/processed or something here. But I think we should check meta.id to mark tasks as processed on their own processed event, not on processed event of logux/processed action for that action.

let actionId = action?.id
if (!queueKey) {
return
}
let queue = this.queues.get(queueKey)
if (action.type === 'logux/undo') {
undoRemainingTasks(queue)
}
end(actionId, queue, queueKey, null, meta)
})

this.unbind.push(() => {
for (let i of this.connected.values()) i.destroy()
for (let i in this.timeouts) {
Expand All @@ -373,6 +429,15 @@ export class BaseServer {
}
})
)
this.unbind.push(() => {
return Promise.allSettled(
[...this.queues.values()].map(queue => {
return new Promise(resolve => {
queue.drain = resolve
})
})
)
})
}

addClient(connection) {
Expand Down Expand Up @@ -409,7 +474,7 @@ export class BaseServer {
return [undoAction, undoMeta]
}

channel(pattern, callbacks) {
channel(pattern, callbacks, options = {}) {
normalizeChannelCallbacks(`Channel ${pattern}`, callbacks)
let channel = Object.assign({}, callbacks)
if (typeof pattern === 'string') {
Expand All @@ -419,6 +484,8 @@ export class BaseServer {
} else {
channel.regexp = pattern
}

channel.queueName = options.queue || 'main'
this.channels.push(channel)
}

Expand Down Expand Up @@ -617,6 +684,49 @@ export class BaseServer {
}
}

onSync(processAction, action, meta) {
if (this.actionToQueue.has(meta.id)) {
return
}

let clientId = parseId(meta.id).clientId
let queueName = ''

let isChannel =
(action.type === 'logux/subscribe' ||
action.type === 'logux/unsubscribe') &&
action.channel

if (isChannel) {
for (let i = 0; i < this.channels.length && !queueName; i++) {
let channel = this.channels[i]
let pattern = channel.regexp || channel.pattern.regex
if (action.channel.match(pattern)) {
queueName = channel.queue
}
}
} else {
queueName = this.typeToQueue.get(action.type)
}

queueName = queueName || 'main'
let queueKey = `${clientId}/${queueName}`
let queue = this.queues.get(queueKey)

if (!queue) {
queue = fastq(queueWorker, 1)
this.queues.set(queueKey, queue)
}

queue.push({
action,
meta,
processAction,
queue
})
this.actionToQueue.set(meta.id, queueKey)
}

otherChannel(callbacks) {
normalizeChannelCallbacks('Unknown channel', callbacks)
if (this.otherSubscriber) {
Expand Down Expand Up @@ -785,9 +895,10 @@ export class BaseServer {
let ctx = this.createContext(action, meta)
let client = this.clientIds.get(clientId)
for (let filter of Object.values(subscriber.filters)) {
filter = typeof filter === 'function'
? await filter(ctx, action, meta)
: filter
filter =
typeof filter === 'function'
? await filter(ctx, action, meta)
: filter
if (filter && client) {
ignoreClients.add(clientId)
client.node.onAdd(action, meta)
Expand Down Expand Up @@ -924,7 +1035,10 @@ export class BaseServer {
if (!match) this.wrongChannel(action, meta)
}

type(name, callbacks) {
type(name, callbacks, options = {}) {
let queue = options.queue || 'main'
this.typeToQueue.set(name, queue)

if (typeof name === 'function') name = name.type
normalizeTypeCallbacks(`Action type ${name}`, callbacks)

Expand Down
Loading