diff --git a/config/config.schema.json b/config/config.schema.json index e26e084..70c4755 100644 --- a/config/config.schema.json +++ b/config/config.schema.json @@ -1,6 +1,6 @@ { "$schema": "https://json-schema.org/draft-07/schema", - "$id": "config.schema.json-v0.3.1", + "$id": "config.schema.json-v0.4.0", "title": "Bulletin IT 设置", "type": "object", "properties": { @@ -20,6 +20,23 @@ "type": "number", "default": 90 }, + "fetch": { + "title": "如何获取通知", + "type": "object", + "properties": { + "concurrency": { + "title": "并发数量上限", + "description": "必须是正整数。", + "type": "number", + "default": 5 + }, + "sleep": { + "title": "获取通知后等待的毫秒数", + "type": "number", + "default": 0 + } + } + }, "ding": { "title": "钉钉插件", "type": "object", diff --git a/package-lock.json b/package-lock.json index 4514128..acb5ff0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "cli-progress": "^3.9.1", "jsdom": "^19.0.0", "node-fetch": "^3.2.10", + "p-map": "^6.0.0", "virtual-bit-network": "github:YDX-2147483647/virtual-BIT-network", "winston": "^3.8.1", "xml": "^1.0.1", @@ -1438,6 +1439,17 @@ "node": ">=14.17" } }, + "node_modules/check-links/node_modules/p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmmirror.com/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "dependencies": { + "aggregate-error": "^4.0.0" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/cheerio": { "version": "1.0.0-rc.12", "resolved": "https://registry.npmmirror.com/cheerio/-/cheerio-1.0.0-rc.12.tgz", @@ -4920,14 +4932,11 @@ } }, "node_modules/p-map": { - "version": "5.5.0", - "resolved": "https://registry.npmmirror.com/p-map/-/p-map-5.5.0.tgz", - "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", - "dependencies": { - "aggregate-error": "^4.0.0" - }, + "version": "6.0.0", + "resolved": "https://registry.npmmirror.com/p-map/-/p-map-6.0.0.tgz", + "integrity": "sha512-T8BatKGY+k5rU+Q/GTYgrEf2r4xRMevAN5mtXc2aPc4rS1j3s+vWTaO2Wag94neXuCAUAs8cxBL9EeB5EA6diw==", "engines": { - "node": ">=12" + "node": ">=16" } }, "node_modules/p-memoize": { diff --git a/package.json b/package.json index 6dd0aaa..225b78d 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "cli-progress": "^3.9.1", "jsdom": "^19.0.0", "node-fetch": "^3.2.10", + "p-map": "^6.0.0", "virtual-bit-network": "github:YDX-2147483647/virtual-BIT-network", "winston": "^3.8.1", "xml": "^1.0.1", diff --git a/src/core/config.ts b/src/core/config.ts index a69005b..3ed9338 100644 --- a/src/core/config.ts +++ b/src/core/config.ts @@ -11,6 +11,10 @@ interface Config { sources_by_selectors: string json_path: string save_for: number + fetch: { + concurrency: number + sleep: number + } [propName: string]: any } @@ -18,11 +22,16 @@ const defaults: Config = { sources_by_selectors: 'config/sources_by_selectors.json', json_path: 'output/notices.json', save_for: 90, + fetch: { + concurrency: 5, + sleep: 0, + }, } async function _import_config ({ config_path = 'config/config.yml' } = {}): Promise { const file = await readFile(config_path) const given = parse(file.toString()) + // todo: 目前只支持单层覆盖 return Object.assign({}, defaults, given) as Config } diff --git a/src/core/index.ts b/src/core/index.ts index 2f30b03..e853cc3 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -19,5 +19,7 @@ export function update_notices () { write_json_path: config.json_path, sources_by_selectors_path: config.sources_by_selectors, save_for: config.save_for, + fetch_concurrency: config.fetch.concurrency, + fetch_sleep: config.fetch.sleep, }) } diff --git a/src/core/notices/fetcher.ts b/src/core/notices/fetcher.ts index afedee6..0812a2c 100644 --- a/src/core/notices/fetcher.ts +++ b/src/core/notices/fetcher.ts @@ -1,3 +1,6 @@ +import { setTimeout } from 'node:timers/promises' +import pMap from 'p-map' + import type { HookCollectionType } from '../hooks_type.js' import type { Notice, Source } from '../models.js' @@ -5,6 +8,7 @@ import type { Notice, Source } from '../models.js' * 从一系列来源获取通知 * @param options 选项 * @param options.sources + * @param options.concurrency * @param options._hook (internal usage only) `fetch`, `fetch_each` * * Here there are 2 hooks: `fetch` and `fetch_each`. @@ -12,9 +16,11 @@ import type { Notice, Source } from '../models.js' * - `fetch_each`: several parallel sub-processes, fetching each source. */ export async function fetch_all_sources ({ - sources, _hook, + sources, concurrency, sleep, _hook, }: { sources: Source[], + concurrency: number, + sleep: number, _hook: HookCollectionType, }): Promise<{ notices: Notice[] }> { return await _hook( @@ -24,6 +30,7 @@ export async function fetch_all_sources ({ // First create a non-hook version. async function fetch_each ({ source }: { source: Source }): Promise<{ notices: Notice[] }> { const notices = await source.fetch_notice({ _hook }) + await setTimeout(sleep) return { notices } } // Then wrap it with the hook. @@ -31,8 +38,7 @@ export async function fetch_all_sources ({ return _hook('fetch_each', fetch_each, { source: s, ...options }) } // Call `fetch_each` in parallel. - const notices_grouped = await Promise.all( - sources.map(fetch_each_hooked)) + const notices_grouped = await pMap(sources, fetch_each_hooked, { concurrency }) // Ignore `undefined`. // (If `fetch_each` has an error hook, we may get here even though there's nothing.) diff --git a/src/core/update_notices.ts b/src/core/update_notices.ts index d005ac4..0a329b7 100644 --- a/src/core/update_notices.ts +++ b/src/core/update_notices.ts @@ -13,15 +13,19 @@ type UpdateNoticesOptions = { write_json_path: string sources_by_selectors_path: string save_for: number + fetch_concurrency: number + fetch_sleep: number } async function _update_notices ({ - sources_by_selectors_path, read_json_path, write_json_path, save_for, + sources_by_selectors_path, read_json_path, write_json_path, save_for, fetch_concurrency, fetch_sleep, _hook, ...options }: { _hook: HookCollectionType } & UpdateNoticesOptions) { const sources = await import_sources({ sources_by_selectors_path }) const { notices: latest_notices } = await fetch_all_sources({ sources, + concurrency: fetch_concurrency, + sleep: fetch_sleep, _hook, ...options, })