Skip to content

Commit

Permalink
feat: Fetch with limited concurrency
Browse files Browse the repository at this point in the history
- `fetch_all_sources`时,不再一下子发起所有请求,而一点一点发起,限制并发数量上限。
- 配置文件新增`fetch_concurrency: number`。

Resolves #45
  • Loading branch information
YDX-2147483647 committed Oct 9, 2023
1 parent 6046f36 commit f2e20f7
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 12 deletions.
8 changes: 7 additions & 1 deletion config/config.schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft-07/schema",
"$id": "config.schema.json-v0.3.1",
"$id": "config.schema.json-v0.4.0",
"title": "Bulletin IT 设置",
"type": "object",
"properties": {
Expand All @@ -20,6 +20,12 @@
"type": "number",
"default": 90
},
"fetch_concurrency": {
"title": "获取通知时并发数量上限",
"description": "必须是正整数。",
"type": "number",
"default": 5
},
"ding": {
"title": "钉钉插件",
"type": "object",
Expand Down
23 changes: 16 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"cli-progress": "^3.9.1",
"jsdom": "^19.0.0",
"node-fetch": "^3.2.10",
"p-map": "^6.0.0",
"virtual-bit-network": "github:YDX-2147483647/virtual-BIT-network",
"winston": "^3.8.1",
"xml": "^1.0.1",
Expand Down
2 changes: 2 additions & 0 deletions src/core/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ interface Config {
sources_by_selectors: string
json_path: string
save_for: number
fetch_concurrency: number
[propName: string]: any
}

const defaults: Config = {
sources_by_selectors: 'config/sources_by_selectors.json',
json_path: 'output/notices.json',
save_for: 90,
fetch_concurrency: 5,
}

async function _import_config ({ config_path = 'config/config.yml' } = {}): Promise<Config> {
Expand Down
1 change: 1 addition & 0 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ export function update_notices () {
write_json_path: config.json_path,
sources_by_selectors_path: config.sources_by_selectors,
save_for: config.save_for,
fetch_concurrency: config.fetch_concurrency,
})
}
9 changes: 6 additions & 3 deletions src/core/notices/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import pMap from 'p-map'

import type { HookCollectionType } from '../hooks_type.js'
import type { Notice, Source } from '../models.js'

/**
* 从一系列来源获取通知
* @param options 选项
* @param options.sources
* @param options.concurrency
* @param options._hook (internal usage only) `fetch`, `fetch_each`
*
* Here there are 2 hooks: `fetch` and `fetch_each`.
* - `fetch`: the whole process, fetching all of the sources.
* - `fetch_each`: several parallel sub-processes, fetching each source.
*/
export async function fetch_all_sources ({
sources, _hook,
sources, concurrency, _hook,
}: {
sources: Source[],
concurrency: number,
_hook: HookCollectionType,
}): Promise<{ notices: Notice[] }> {
return await _hook(
Expand All @@ -31,8 +35,7 @@ export async function fetch_all_sources ({
return _hook('fetch_each', fetch_each, { source: s, ...options })
}
// Call `fetch_each` in parallel.
const notices_grouped = await Promise.all(
sources.map(fetch_each_hooked))
const notices_grouped = await pMap(sources, fetch_each_hooked, { concurrency })

// Ignore `undefined`.
// (If `fetch_each` has an error hook, we may get here even though there's nothing.)
Expand Down
4 changes: 3 additions & 1 deletion src/core/update_notices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ type UpdateNoticesOptions = {
write_json_path: string
sources_by_selectors_path: string
save_for: number
fetch_concurrency: number
}

async function _update_notices ({
sources_by_selectors_path, read_json_path, write_json_path, save_for,
sources_by_selectors_path, read_json_path, write_json_path, save_for, fetch_concurrency,
_hook, ...options
}: { _hook: HookCollectionType } & UpdateNoticesOptions) {
const sources = await import_sources({ sources_by_selectors_path })
const { notices: latest_notices } = await fetch_all_sources({
sources,
concurrency: fetch_concurrency,
_hook,
...options,
})
Expand Down

0 comments on commit f2e20f7

Please sign in to comment.