Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate lru-cache library for basis of async cache logic #34

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@ aborted, it will stay in the cache.

```js
import AbortablePromiseCache from '@gmod/abortable-promise-cache'
import QuickLRU from 'quick-lru'

const cache = new AbortablePromiseCache({
// QuickLRU is a good backing cache to use, but you can use any
// cache as long as it supports `get`, `set`, `delete`, and `keys`.
cache: new QuickLRU({ maxSize: 1000 }),
max: 1000,

// the `fill` callback will be called for a cache miss
async fill(requestData, abortSignal) {
async fetchMethod(requestData, abortSignal) {
// do some long-running thing
return longRunningThing(requestData, abortSignal)
},
Expand All @@ -40,7 +37,10 @@ const cache = new AbortablePromiseCache({
// Fill requests will be signaled to abort if all the requests for them
// so far have been aborted.
const aborter = new AbortController()
const result = await cache.get('some key', { ...anyStuff }, aborter.signal)
const result = await cache.fetch('some key', {
context: { anyStuff },
signal: aborter.signal,
})

// deleting and clearing will abort any outstanding requests
cache.delete('some key')
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
"prepublishOnly": "npm run lint && npm test && npm run build",
"postversion": "git push --follow-tags"
},
"dependencies": {
"lru-cache": "^11.0.0"
},
"devDependencies": {
"@types/node": "^20.14.11",
"@typescript-eslint/eslint-plugin": "^8.0.1",
Expand All @@ -37,7 +40,6 @@
"eslint": "^9.9.0",
"eslint-plugin-unicorn": "^55.0.0",
"prettier": "^3.3.3",
"quick-lru": "^4.0.0",
"rimraf": "^6.0.1",
"typescript": "^5.5.4",
"typescript-eslint": "^8.4.0",
Expand Down
311 changes: 83 additions & 228 deletions src/AbortablePromiseCache.ts
Original file line number Diff line number Diff line change
@@ -1,245 +1,100 @@
import AggregateAbortController from './AggregateAbortController'
import { LRUCache } from 'lru-cache'
import AggregateStatusReporter from './AggregateStatusReporter'

type Callback = (arg: unknown) => void

interface Cache<U> {
delete: (key: string) => void
keys: () => Iterator<string>
get: (key: string) => U | undefined
set: (key: string, value: U) => void
has: (key: string) => boolean
}
type FillCallback<T, U> = (
data: T,
signal?: AbortSignal,
statusCallback?: Callback,
) => Promise<U>

interface Entry<U> {
aborter: AggregateAbortController
settled: boolean
readonly aborted: boolean
statusReporter: AggregateStatusReporter
promise: Promise<U>
}
export default class AbortablePromiseCache<T, U> {
/**
* @param {object} args constructor args
* @param {Function} args.fill fill callback, will be called with sig `fill(data, signal)`
* @param {object} args.cache backing store to use, must implement `get(key)`, `set(key, val)`,
* `delete(key)`, and `keys() -> iterator`
*/

private cache: Cache<Entry<U>>
private fillCallback: FillCallback<T, U>

constructor({
fill,
cache,
}: {
fill: FillCallback<T, U>
cache: Cache<Entry<U>>
}) {
if (typeof fill !== 'function') {
throw new TypeError('must pass a fill function')
export default class AbortablePromiseCache<
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
K extends {},
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
V extends {},
FC = unknown,
> extends LRUCache<K, V, FC> {
inflight = new Map<
K,
{
count: number
abortController: AbortController
statusReporter: AggregateStatusReporter
}
if (typeof cache !== 'object') {
throw new TypeError('must pass a cache object')
>()

async fetch(
k: K,
fetchOptions: LRUCache.FetchOptions<K, V, FC> = {},
): Promise<undefined | V> {
const val = this.inflight.get(k)
const val2 =
val === undefined
? {
count: 1,
abortController: new AbortController(),
statusReporter: new AggregateStatusReporter(),
}
: {
...val,
count: val.count + 1,
}
this.inflight.set(k, val2)
const { signal, ...rest } = fetchOptions

if (signal?.aborted) {
throw new Error('aborted')
}
if (
typeof cache.get !== 'function' ||
typeof cache.set !== 'function' ||
typeof cache.delete !== 'function'
) {
throw new TypeError(
'cache must implement get(key), set(key, val), and and delete(key)',
)
}

this.cache = cache
this.fillCallback = fill
}

static isAbortException(exception: Error) {
return (
// DOMException
exception.name === 'AbortError' ||
// standard-ish non-DOM abort exception
//@ts-ignore
exception.code === 'ERR_ABORTED' ||
// stringified DOMException
exception.message === 'AbortError: aborted' ||
// stringified standard-ish exception
exception.message === 'Error: aborted'
)
}

evict(key: string, entry: Entry<U>) {
if (this.cache.get(key) === entry) {
this.cache.delete(key)
}
}

fill(key: string, data: T, signal?: AbortSignal, statusCallback?: Callback) {
const aborter = new AggregateAbortController()
const statusReporter = new AggregateStatusReporter()
statusReporter.addCallback(statusCallback)
const newEntry: Entry<U> = {
aborter: aborter,
promise: this.fillCallback(data, aborter.signal, (message: unknown) => {
statusReporter.callback(message)
}),
settled: false,
statusReporter,
get aborted() {
return this.aborter.signal.aborted
},
}
newEntry.aborter.addSignal(signal)

// remove the fill from the cache when its abortcontroller fires, if still in there
newEntry.aborter.signal.addEventListener('abort', () => {
if (!newEntry.settled) {
this.evict(key, newEntry)
signal?.addEventListener('abort', () => {
const val = this.inflight.get(k)
if (val === undefined) {
// unknown
return
}
})

// chain off the cached promise to record when it settles
newEntry.promise
.then(
() => {
newEntry.settled = true
},
() => {
newEntry.settled = true

// if the fill throws an error (including abort) and is still in the cache, remove it
this.evict(key, newEntry)
},
)
.catch((error: unknown) => {
// this will only be reached if there is some kind of
// bad bug in this library
console.error(error)
throw error
})

this.cache.set(key, newEntry)
}

static checkSinglePromise<U>(promise: Promise<U>, signal?: AbortSignal) {
// check just this signal for having been aborted, and abort the
// promise if it was, regardless of what happened with the cached
// response
function checkForSingleAbort() {
if (signal?.aborted) {
throw Object.assign(new Error('aborted'), { code: 'ERR_ABORTED' })
}
}

return promise.then(
result => {
checkForSingleAbort()
return result
},
(error: unknown) => {
checkForSingleAbort()
throw error
},
)
}

has(key: string): boolean {
return this.cache.has(key)
}

/**
* Callback for getting status of the pending async
*
* @callback statusCallback
* @param {any} status, current status string or message object
*/

/**
* @param {any} key cache key to use for this request
* @param {any} data data passed as the first argument to the fill callback
* @param {AbortSignal} [signal] optional AbortSignal object that aborts the request
* @param {statusCallback} a callback to get the current status of a pending async operation
*/
get(
key: string,
data: T,
signal?: AbortSignal,
statusCallback?: Callback,
): Promise<U> {
if (!signal && data instanceof AbortSignal) {
throw new TypeError(
'second get argument appears to be an AbortSignal, perhaps you meant to pass `null` for the fill data?',
)
}
const cacheEntry = this.cache.get(key)

if (cacheEntry) {
if (cacheEntry.aborted && !cacheEntry.settled) {
// if it's aborted but has not realized it yet, evict it and redispatch
this.evict(key, cacheEntry)
return this.get(key, data, signal, statusCallback)
}

if (cacheEntry.settled) {
// too late to abort, just return it
return cacheEntry.promise
const currentCount = val.count - 1
if (currentCount === 0) {
val.abortController.abort()
this.inflight.delete(k)
}

// request is in-flight, add this signal to its list of signals,
// or if there is no signal, the aborter will become non-abortable
cacheEntry.aborter.addSignal(signal)
cacheEntry.statusReporter.addCallback(statusCallback)

return AbortablePromiseCache.checkSinglePromise(
cacheEntry.promise,
signal,
)
})
// @ts-expect-error
if (rest.context?.statusCallback) {
// @ts-expect-error
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
val2.statusReporter.addCallback(rest.context.statusCallback)
}

// if we got here, it is not in the cache. fill.
this.fill(key, data, signal, statusCallback)
return AbortablePromiseCache.checkSinglePromise(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.cache.get(key)!.promise,
signal,
)
return Promise.race([
// @ts-expect-error
super.fetch(k, {
...rest,
signal: val2.abortController.signal,
context: {
...rest.context,
statusCallback: (arg: unknown) => {
const val = this.inflight.get(k)
if (val) {
val.statusReporter.callback(arg)
}
},
},
}),
new Promise<V | undefined>((_resolve, reject) => {
signal?.addEventListener('abort', () => {
reject(new Error('aborted'))
})
}),
])
}

/**
* delete the given entry from the cache. if it exists and its fill request has
* not yet settled, the fill will be signaled to abort.
*
* @param {any} key
*/
delete(key: string) {
const cachedEntry = this.cache.get(key)
if (cachedEntry) {
if (!cachedEntry.settled) {
cachedEntry.aborter.abort()
}
this.cache.delete(key)
delete(key: K) {
const val = this.inflight.get(key)
if (val) {
val.abortController.abort()
}
this.inflight.delete(key)
return super.delete(key)
}

/**
* Clear all requests from the cache. Aborts any that have not settled.
* @returns {number} count of entries deleted
*/
clear() {
// iterate without needing regenerator-runtime
const keyIter = this.cache.keys()
let deleteCount = 0
for (let result = keyIter.next(); !result.done; result = keyIter.next()) {
this.delete(result.value)
deleteCount += 1
for (const val of this.inflight.values()) {
val.abortController.abort()
}
return deleteCount
this.inflight.clear()
super.clear()
}
}
Loading
Loading