From 12fa5950b855b8dbd0caf6263e8014424db437c8 Mon Sep 17 00:00:00 2001 From: TBXark Date: Sun, 8 Oct 2023 14:13:47 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20#216=20=E4=BD=BF=E7=94=A8=E6=9B=B4?= =?UTF-8?q?=E5=8A=A0=E7=B2=BE=E5=87=86=E7=9A=84=E6=B5=81=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dist/buildinfo.json | 2 +- dist/index.js | 247 +++++++++++++++++++++++++++++++++++------- dist/timestamp | 2 +- src/openai.js | 54 ++------- src/vendors/stream.js | 224 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 441 insertions(+), 88 deletions(-) create mode 100644 src/vendors/stream.js diff --git a/dist/buildinfo.json b/dist/buildinfo.json index f385c236..a2c45d01 100644 --- a/dist/buildinfo.json +++ b/dist/buildinfo.json @@ -1 +1 @@ -{"sha": "0ab2830", "timestamp": 1696670444} +{"sha": "9d714ee", "timestamp": 1696745505} diff --git a/dist/index.js b/dist/index.js index 2517ddec..8d2fbdc9 100644 --- a/dist/index.js +++ b/dist/index.js @@ -41,9 +41,9 @@ var ENV = { // 检查更新的分支 UPDATE_BRANCH: "master", // 当前版本 - BUILD_TIMESTAMP: 1696670444, + BUILD_TIMESTAMP: 1696745505, // 当前版本 commit id - BUILD_VERSION: "0ab2830", + BUILD_VERSION: "9d714ee", I18N: null, LANGUAGE: "zh-cn", // 使用流模式 @@ -516,28 +516,208 @@ async function getBot(token) { } } -// src/openai.js -function extractContentFromStreamData(stream) { - const line = stream.split("\n"); - let remainingStr = ""; - let contentStr = ""; - for (const l of line) { +// src/vendors/stream.js +var Stream = class { + constructor(response, controller) { + this.response = response; + this.controller = controller; + this.decoder = new SSEDecoder(); + } + async *iterMessages() { + if (!this.response.body) { + this.controller.abort(); + throw new Error(`Attempted to iterate over a response with no body`); + } + const lineDecoder = new LineDecoder(); + const iter = readableStreamAsyncIterable(this.response.body); + for await (const chunk of iter) { + for (const line of lineDecoder.decode(chunk)) { + const sse = this.decoder.decode(line); + if (sse) + yield sse; + } + } + for (const line of lineDecoder.flush()) { + const sse = this.decoder.decode(line); + if (sse) + yield sse; + } + } + async *[Symbol.asyncIterator]() { + let done = false; try { - if (l.startsWith("data:") && l.endsWith("}")) { - const data = JSON.parse(l.substring(5)); - contentStr += data.choices[0].delta?.content || ""; - } else { - remainingStr = l; + for await (const sse of this.iterMessages()) { + if (done) + continue; + if (sse.data.startsWith("[DONE]")) { + done = true; + continue; + } + if (sse.event === null) { + try { + yield JSON.parse(sse.data); + } catch (e) { + console.error(`Could not parse message into JSON:`, sse.data); + console.error(`From chunk:`, sse.raw); + throw e; + } + } } + done = true; } catch (e) { - remainingStr = l; + if (e instanceof Error && e.name === "AbortError") + return; + throw e; + } finally { + if (!done) + this.controller.abort(); + } + } +}; +var SSEDecoder = class { + constructor() { + this.event = null; + this.data = []; + this.chunks = []; + } + decode(line) { + if (line.endsWith("\r")) { + line = line.substring(0, line.length - 1); + } + if (!line) { + if (!this.event && !this.data.length) + return null; + const sse = { + event: this.event, + data: this.data.join("\n"), + raw: this.chunks + }; + this.event = null; + this.data = []; + this.chunks = []; + return sse; + } + this.chunks.push(line); + if (line.startsWith(":")) { + return null; + } + let [fieldname, _, value] = partition(line, ":"); + if (value.startsWith(" ")) { + value = value.substring(1); + } + if (fieldname === "event") { + this.event = value; + } else if (fieldname === "data") { + this.data.push(value); + } + return null; + } +}; +var LineDecoder = class { + constructor() { + this.buffer = []; + this.trailingCR = false; + } + decode(chunk) { + let text = this.decodeText(chunk); + if (this.trailingCR) { + text = "\r" + text; + this.trailingCR = false; + } + if (text.endsWith("\r")) { + this.trailingCR = true; + text = text.slice(0, -1); + } + if (!text) { + return []; + } + const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || ""); + let lines = text.split(LineDecoder.NEWLINE_REGEXP); + if (lines.length === 1 && !trailingNewline) { + this.buffer.push(lines[0]); + return []; + } + if (this.buffer.length > 0) { + lines = [this.buffer.join("") + lines[0], ...lines.slice(1)]; + this.buffer = []; + } + if (!trailingNewline) { + this.buffer = [lines.pop() || ""]; + } + return lines; + } + decodeText(bytes) { + var _a; + if (bytes == null) + return ""; + if (typeof bytes === "string") + return bytes; + if (typeof Buffer !== "undefined") { + if (bytes instanceof Buffer) { + return bytes.toString(); + } + if (bytes instanceof Uint8Array) { + return Buffer.from(bytes).toString(); + } + throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`); + } + if (typeof TextDecoder !== "undefined") { + if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { + (_a = this.textDecoder) !== null && _a !== void 0 ? _a : this.textDecoder = new TextDecoder("utf8"); + return this.textDecoder.decode(bytes); + } + throw new Error(`Unexpected: received non-Uint8Array/ArrayBuffer (${bytes.constructor.name}) in a web platform. Please report this error.`); + } + throw new Error(`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`); + } + flush() { + if (!this.buffer.length && !this.trailingCR) { + return []; } + const lines = [this.buffer.join("")]; + this.buffer = []; + this.trailingCR = false; + return lines; } +}; +LineDecoder.NEWLINE_CHARS = /* @__PURE__ */ new Set(["\n", "\r", "\v", "\f", "", "", "", "\x85", "\u2028", "\u2029"]); +LineDecoder.NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g; +function partition(str, delimiter) { + const index = str.indexOf(delimiter); + if (index !== -1) { + return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; + } + return [str, "", ""]; +} +function readableStreamAsyncIterable(stream) { + if (stream[Symbol.asyncIterator]) + return stream; + const reader = stream.getReader(); return { - content: contentStr, - pending: remainingStr + async next() { + try { + const result = await reader.read(); + if (result === null || result === void 0 ? void 0 : result.done) + reader.releaseLock(); + return result; + } catch (e) { + reader.releaseLock(); + throw e; + } + }, + async return() { + const cancelPromise = reader.cancel(); + reader.releaseLock(); + await cancelPromise; + return { done: true, value: void 0 }; + }, + [Symbol.asyncIterator]() { + return this; + } }; } + +// src/openai.js function isOpenAIEnable(context) { const key = context.openAIKeyFromContext(); return key && key.length > 0; @@ -572,34 +752,19 @@ async function requestCompletionsFromOpenAI(message, history, context, onStream) signal }); if (onStream && resp.ok && resp.headers.get("content-type").indexOf("text/event-stream") !== -1) { - const reader = resp.body.getReader({ mode: "byob" }); - const decoder = new TextDecoder("utf-8"); - let data = { done: false }; - let pendingText = ""; + const stream = new Stream(resp, controller); let contentFull = ""; let lengthDelta = 0; let updateStep = 20; - while (data.done === false) { - try { - data = await reader.readAtLeast(4096, new Uint8Array(5e3)); - pendingText += decoder.decode(data.value); - const content = extractContentFromStreamData(pendingText); - pendingText = content.pending; - lengthDelta += content.content.length; - contentFull = contentFull + content.content; - if (lengthDelta > updateStep) { - lengthDelta = 0; - updateStep += 5; - await onStream(`${contentFull} + for await (const data of stream) { + const c = data.choices[0].delta?.content || ""; + lengthDelta += c.length; + contentFull = contentFull + c; + if (lengthDelta > updateStep) { + lengthDelta = 0; + updateStep += 5; + await onStream(`${contentFull} ${ENV.I18N.message.loading}...`); - } - } catch (e) { - contentFull += ` - -[ERROR]: ${e.message} - -`; - break; } } return contentFull; @@ -1341,7 +1506,7 @@ var Ai = class { // src/workers-ai.js function isWorkersAIEnable(context) { - return AI !== null; + return AI && AI.fetch; } async function requestCompletionsFromWorkersAI(message, history, context, onStream) { const ai = new Ai(AI); diff --git a/dist/timestamp b/dist/timestamp index 5a60800f..f1c01cab 100644 --- a/dist/timestamp +++ b/dist/timestamp @@ -1 +1 @@ -1696670444 +1696745505 diff --git a/src/openai.js b/src/openai.js index 6ebf5d52..9ad67fae 100644 --- a/src/openai.js +++ b/src/openai.js @@ -1,33 +1,8 @@ /* eslint-disable no-unused-vars */ import {Context} from './context.js'; import {DATABASE, ENV} from './env.js'; +import {Stream} from "./vendors/stream.js"; -/** - * 从流数据中提取内容 - * @param {string} stream - * @return {{pending: string, content: string}} - */ -function extractContentFromStreamData(stream) { - const line = stream.split('\n'); - let remainingStr = ''; - let contentStr = ''; - for (const l of line) { - try { - if (l.startsWith('data:') && l.endsWith('}')) { - const data = JSON.parse(l.substring(5)); - contentStr += data.choices[0].delta?.content || ''; - } else { - remainingStr = l; - } - } catch (e) { - remainingStr = l; - } - } - return { - content: contentStr, - pending: remainingStr, - }; -} /** * @return {boolean} @@ -80,30 +55,19 @@ export async function requestCompletionsFromOpenAI(message, history, context, on signal, }); if (onStream && resp.ok && resp.headers.get('content-type').indexOf('text/event-stream') !== -1) { - const reader = resp.body.getReader({mode: 'byob'}); - const decoder = new TextDecoder('utf-8'); - let data = {done: false}; - let pendingText = ''; + const stream = new Stream(resp, controller) let contentFull = ''; let lengthDelta = 0; let updateStep = 20; - while (data.done === false) { - try { - data = await reader.readAtLeast(4096, new Uint8Array(5000)); - pendingText += decoder.decode(data.value); - const content = extractContentFromStreamData(pendingText); - pendingText = content.pending; - lengthDelta += content.content.length; - contentFull = contentFull + content.content; + for await (const data of stream) { + const c = data.choices[0].delta?.content || '' + lengthDelta += c.length; + contentFull = contentFull + c; if (lengthDelta > updateStep) { - lengthDelta = 0; - updateStep += 5; - await onStream(`${contentFull}\n${ENV.I18N.message.loading}...`); + lengthDelta = 0; + updateStep += 5; + await onStream(`${contentFull}\n${ENV.I18N.message.loading}...`); } - } catch (e) { - contentFull += `\n\n[ERROR]: ${e.message}\n\n`; - break; - } } return contentFull; } diff --git a/src/vendors/stream.js b/src/vendors/stream.js new file mode 100644 index 00000000..b77d9095 --- /dev/null +++ b/src/vendors/stream.js @@ -0,0 +1,224 @@ +/* eslint-disable */ + +export class Stream { + constructor(response, controller) { + this.response = response; + this.controller = controller; + this.decoder = new SSEDecoder(); + } + async *iterMessages() { + if (!this.response.body) { + this.controller.abort(); + throw new Error(`Attempted to iterate over a response with no body`); + } + const lineDecoder = new LineDecoder(); + const iter = readableStreamAsyncIterable(this.response.body); + for await (const chunk of iter) { + for (const line of lineDecoder.decode(chunk)) { + const sse = this.decoder.decode(line); + if (sse) + yield sse; + } + } + for (const line of lineDecoder.flush()) { + const sse = this.decoder.decode(line); + if (sse) + yield sse; + } + } + async *[Symbol.asyncIterator]() { + let done = false; + try { + for await (const sse of this.iterMessages()) { + if (done) + continue; + if (sse.data.startsWith('[DONE]')) { + done = true; + continue; + } + if (sse.event === null) { + try { + yield JSON.parse(sse.data); + } + catch (e) { + console.error(`Could not parse message into JSON:`, sse.data); + console.error(`From chunk:`, sse.raw); + throw e; + } + } + } + done = true; + } + catch (e) { + // If the user calls `stream.controller.abort()`, we should exit without throwing. + if (e instanceof Error && e.name === 'AbortError') + return; + throw e; + } + finally { + // If the user `break`s, abort the ongoing request. + if (!done) + this.controller.abort(); + } + } +} +class SSEDecoder { + constructor() { + this.event = null; + this.data = []; + this.chunks = []; + } + decode(line) { + if (line.endsWith('\r')) { + line = line.substring(0, line.length - 1); + } + if (!line) { + // empty line and we didn't previously encounter any messages + if (!this.event && !this.data.length) + return null; + const sse = { + event: this.event, + data: this.data.join('\n'), + raw: this.chunks, + }; + this.event = null; + this.data = []; + this.chunks = []; + return sse; + } + this.chunks.push(line); + if (line.startsWith(':')) { + return null; + } + let [fieldname, _, value] = partition(line, ':'); + if (value.startsWith(' ')) { + value = value.substring(1); + } + if (fieldname === 'event') { + this.event = value; + } + else if (fieldname === 'data') { + this.data.push(value); + } + return null; + } +} +/** + * A re-implementation of httpx's `LineDecoder` in Python that handles incrementally + * reading lines from text. + * + * https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258 + */ +class LineDecoder { + constructor() { + this.buffer = []; + this.trailingCR = false; + } + decode(chunk) { + let text = this.decodeText(chunk); + if (this.trailingCR) { + text = '\r' + text; + this.trailingCR = false; + } + if (text.endsWith('\r')) { + this.trailingCR = true; + text = text.slice(0, -1); + } + if (!text) { + return []; + } + const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || ''); + let lines = text.split(LineDecoder.NEWLINE_REGEXP); + if (lines.length === 1 && !trailingNewline) { + this.buffer.push(lines[0]); + return []; + } + if (this.buffer.length > 0) { + lines = [this.buffer.join('') + lines[0], ...lines.slice(1)]; + this.buffer = []; + } + if (!trailingNewline) { + this.buffer = [lines.pop() || '']; + } + return lines; + } + decodeText(bytes) { + var _a; + if (bytes == null) + return ''; + if (typeof bytes === 'string') + return bytes; + // Node: + if (typeof Buffer !== 'undefined') { + if (bytes instanceof Buffer) { + return bytes.toString(); + } + if (bytes instanceof Uint8Array) { + return Buffer.from(bytes).toString(); + } + throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`); + } + // Browser + if (typeof TextDecoder !== 'undefined') { + if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { + (_a = this.textDecoder) !== null && _a !== void 0 ? _a : (this.textDecoder = new TextDecoder('utf8')); + return this.textDecoder.decode(bytes); + } + throw new Error(`Unexpected: received non-Uint8Array/ArrayBuffer (${bytes.constructor.name}) in a web platform. Please report this error.`); + } + throw new Error(`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`); + } + flush() { + if (!this.buffer.length && !this.trailingCR) { + return []; + } + const lines = [this.buffer.join('')]; + this.buffer = []; + this.trailingCR = false; + return lines; + } +} +// prettier-ignore +LineDecoder.NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']); +LineDecoder.NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g; +function partition(str, delimiter) { + const index = str.indexOf(delimiter); + if (index !== -1) { + return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; + } + return [str, '', '']; +} +/** + * Most browsers don't yet have async iterable support for ReadableStream, + * and Node has a very different way of reading bytes from its "ReadableStream". + * + * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 + */ +function readableStreamAsyncIterable(stream) { + if (stream[Symbol.asyncIterator]) + return stream; + const reader = stream.getReader(); + return { + async next() { + try { + const result = await reader.read(); + if (result === null || result === void 0 ? void 0 : result.done) + reader.releaseLock(); // release lock when stream becomes closed + return result; + } + catch (e) { + reader.releaseLock(); // release lock when stream becomes errored + throw e; + } + }, + async return() { + const cancelPromise = reader.cancel(); + reader.releaseLock(); + await cancelPromise; + return { done: true, value: undefined }; + }, + [Symbol.asyncIterator]() { + return this; + }, + }; +}