Skip to content

Commit

Permalink
feat: add Worker class & Limiter (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
arusakov authored Oct 7, 2024
1 parent 74247f6 commit d8aa994
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 22 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
"license": "MIT",
"scripts": {
"build": "tsc -p .",
"compile": "tsc --noEmit -p ./test/",
"compile": "tsc --noEmit -p ./test",
"lint": "eslint .",
"test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/*.test.ts",
"test:coverage:html": "c8 --reporter=html --reporter=text yarn test:all",
"test:coverage": "c8 --reporter=lcovonly --reporter=text yarn test:all",
"test": "node --test --test-concurrency=1 --require=ts-node/register"
},
Expand All @@ -30,4 +31,4 @@
"engines": {
"node": ">=20.0.0"
}
}
}
141 changes: 136 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { nanos, NatsConnection, NatsError, type MsgHdrs } from '@nats-io/nats-core'
import EventEmitter from 'events'

import type { JetStreamClient } from '@nats-io/jetstream'
import { jetstreamManager } from '@nats-io/jetstream'
import { AckPolicy } from '@nats-io/jetstream'
import { nanos, NatsError } from '@nats-io/nats-core'
import type { JetStreamClient, Consumer, JsMsg} from '@nats-io/jetstream'
import type { MsgHdrs } from '@nats-io/nats-core'

import { createSubject, sleep } from './utils'
import { FixedWindowLimiter, IntervalLimiter, type Limiter } from './limiter'

export type QueueOpts = {

export type QueueOpts
= {
client: JetStreamClient
name: string

Expand Down Expand Up @@ -65,11 +71,136 @@ export class Queue {

}

async add(name: string, data: unknown, options?: AddOptions) {
async add(name: string, data?: unknown, options?: AddOptions) {
const payload = JSON.stringify(data)
return this.client.publish(`${this.name}.${name}`, payload, options && {
msgID: options.id,
headers: options.headers
})
}
}

export type RateLimit = {
duration: number
max: number
}

export type WorkerOpts = {
client: JetStreamClient
name: string
processor: (job: JsMsg) => Promise<void>
concurrency?: number
rateLimit?: RateLimit
}

export class Worker extends EventEmitter {
protected readonly client: JetStreamClient
protected readonly name: string
protected readonly processor: (job: JsMsg) => Promise<void>
protected readonly concurrency: number
protected readonly limiter: Limiter
protected readonly fetchInterval: number
protected readonly fetchTimeout: number

protected consumer: Consumer | null = null
protected running = false
protected processingNow = 0
protected loopPromise: Promise<void> | null = null

constructor(opts: WorkerOpts) {
super()

this.client = opts.client
this.name = opts.name
this.processor = opts.processor
this.concurrency = opts.concurrency || 1

this.fetchInterval = 150
this.fetchTimeout = 3_000
this.limiter = opts.rateLimit ?
new FixedWindowLimiter(opts.rateLimit.max, opts.rateLimit.duration, this.fetchInterval) :
new IntervalLimiter(this.fetchInterval)
}

async setup() {
const manager = await this.client.jetstreamManager()

try {
await manager.streams.add({
name: this.name,
subjects: [createSubject(this.name)],
})
} catch (e) {
if (!(e instanceof NatsError && e.api_error?.err_code === 10058)) {
throw e
}
}

await manager.consumers.add(this.name, {
durable_name: this.name,
ack_policy: AckPolicy.All,
})

this.consumer = await this.client.consumers.get(this.name, this.name)
}

async stop() {
this.running = false

if (this.loopPromise) {
await this.loopPromise
}
while (this.processingNow > 0) {
await sleep(this.fetchInterval)
}
}

start() {
if (!this.consumer) {
throw new Error('call setup() before start()')
}

if (!this.loopPromise) {
this.running = true
this.loopPromise = this.loop()
}
}

protected async loop() {
while (this.running) {
const max = this.limiter.get(this.concurrency - this.processingNow)
const jobs = await this.fetch(max)

for await (const j of jobs) {
this.limiter.inc()
this.process(j) // without await!
}

await sleep(this.limiter.timeout())
}
}

protected async process(j: JsMsg) {
this.processingNow += 1
try {
this.process(j)
await j.ackAck()
} catch (e) {
await j.term()
} finally {
this.processingNow -= 1
}
}

protected async fetch(count: number) {
try {
return this.consumer!.fetch({
max_messages: count,
expires: this.fetchTimeout
})
} catch (e) {
// TODO
return []
}
}
}
59 changes: 59 additions & 0 deletions src/limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
export type Limiter = {
inc(): void
timeout(): number
get(max: number): number
}

export class IntervalLimiter implements Limiter {
constructor(
protected readonly interval: number,
) {}

timeout() {
return this.interval
}

inc() {
// NOOP
}

get(max: number) {
return max
}
}

export class FixedWindowLimiter implements Limiter {
protected count = 0
protected timestamp = 0

constructor(
protected readonly max: number,
protected readonly duration: number,
protected readonly interval: number,
) {}

timeout() {
const now = Date.now()
const timestamp = now - (now % this.duration)

if (timestamp !== this.timestamp) {
this.count = 0
this.timestamp = timestamp
}

if (this.count >= this.max) {
this.count = 0
this.timestamp = timestamp + this.duration
return Math.max(this.timestamp - now, this.interval)
}
return this.interval
}

inc() {
this.count += 1
}

get(max: number) {
return Math.min(max, this.max - this.count)
}
}
7 changes: 7 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const sleep = (ms: number) => new Promise<void>((resolve) => {
setTimeout(resolve, ms)
})

export const createSubject = (name: string) => {
return `${name}.*`
}
97 changes: 97 additions & 0 deletions test/queue-add.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { equal } from 'assert/strict'
import { describe, it, before, after, afterEach, beforeEach } from 'node:test'

import { connect } from '@nats-io/transport-node'
import { jetstream } from '@nats-io/jetstream'
import { NatsConnection } from '@nats-io/nats-core'
import type { JetStreamClient, JetStreamManager } from '@nats-io/jetstream'

import { Queue, Worker } from '../src'

describe('Queue.add()', () => {
let connection: NatsConnection
let client: JetStreamClient
let manager: JetStreamManager
let queue: Queue

const QUEUE_NAME_1 = 'queue1'
const JOB_NAME_1 = 'job1'

before(async () => {
connection = await connect({
servers: '127.0.0.1:4222',
})
client = jetstream(connection)
manager = await client.jetstreamManager()
})

beforeEach(async () => {
queue = new Queue({
client,
name: QUEUE_NAME_1,
})

await queue.setup()
})

afterEach(async () => {
await manager.streams.delete(QUEUE_NAME_1)
})

after(async () => {
await connection.close()
})

it('OK', async () => {
const ack = await queue.add(JOB_NAME_1)

equal(ack.duplicate, false)
equal(ack.seq, 1)

const stream = await client.streams.get(QUEUE_NAME_1)

const { state: { messages } } = await stream.info()
equal(messages, 1)
})

it('OK with payload', async () => {
const ack = await queue.add(JOB_NAME_1, { x: 1 })

equal(ack.duplicate, false)
equal(ack.seq, 1)
})

it('OK with different IDs', async () => {
const ack1 = await queue.add(JOB_NAME_1, 'data', {
id: 'id1',
})

equal(ack1.duplicate, false)
equal(ack1.seq, 1)

const ack2 = await queue.add(JOB_NAME_1, 'data', {
id: 'id2',
})

equal(ack2.duplicate, false)
equal(ack2.seq, 2)
})

it('OK duplicated IDs', async () => {
const id = '12345'

const ack1 = await queue.add(JOB_NAME_1, 'data', {
id,
})

equal(ack1.duplicate, false)
equal(ack1.seq, 1)

const ack2 = await queue.add(JOB_NAME_1, 'data', {
id,
})

equal(ack2.duplicate, true)
equal(ack2.seq, 1)
})
})
Loading

0 comments on commit d8aa994

Please sign in to comment.