-
Notifications
You must be signed in to change notification settings - Fork 10
/
clientSSE.go.tmpl
127 lines (118 loc) · 3.64 KB
/
clientSSE.go.tmpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
{{ define "sse" }}
/**
 * Consumes a streaming HTTP response made of newline-delimited JSON messages
 * and dispatches each one to the callbacks in `options`.
 *
 * Flow:
 *  - A non-OK response is passed to `buildResponse`; whatever it throws is
 *    forwarded to `onError`.
 *  - A response without a body is reported as `WebrpcBadResponseError`.
 *  - Otherwise `onOpen` is invoked (if provided) and the body is read chunk by
 *    chunk. Every complete, non-empty line is parsed as JSON. A payload that
 *    has a `webrpcError` property is converted to the matching error class and
 *    reported through `onError`; any other payload goes to `onMessage`.
 *  - `onClose` is invoked (if provided) when the stream ends normally.
 *
 * Every path that reports through `onError` stops processing the stream.
 *
 * @param res        The fetch `Response` to consume.
 * @param options    Stream callbacks: `onMessage`, `onError`, and the optional
 *                   `onOpen` / `onClose`.
 * @param retryFetch Handed to `onError` so the caller can reconnect. It is
 *                   replaced by a throwing stub when the read was aborted.
 * @returns A promise that resolves once the stream has ended or an error has
 *          been reported.
 */
const sseResponse = async (
  res: Response,
  options: WebrpcStreamOptions<any>,
  retryFetch: () => Promise<void>
) => {
  const {onMessage, onOpen, onClose, onError} = options;

  if (!res.ok) {
    try {
      await buildResponse(res);
    } catch (error) {
      // @ts-ignore
      onError(error, retryFetch);
    }
    return;
  }

  if (!res.body) {
    onError(
      WebrpcBadResponseError.new({
        status: res.status,
        cause: "Invalid response, missing body",
      }),
      retryFetch
    );
    return;
  }

  onOpen && onOpen();

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  const timeoutMessage = "Timeout, no data or heartbeat received";
  let buffer = "";
  let lastReadTime = Date.now();
  // NOTE(review): presumably a 10s server heartbeat period plus 1s of grace —
  // confirm against the server implementation.
  const timeout = (10 + 1) * 1000;
  let timeoutError = false;

  // Watchdog: if no chunk arrived within `timeout`, flag the timeout and
  // release the reader lock so the pending read() stops waiting.
  const intervalId = setInterval(() => {
    if (Date.now() - lastReadTime > timeout) {
      timeoutError = true;
      clearInterval(intervalId);
      reader.releaseLock();
    }
  }, timeout);

  try {
    while (true) {
      let value;
      let done;
      try {
        ({value, done} = await reader.read());
        if (timeoutError) throw new Error(timeoutMessage);
        lastReadTime = Date.now();
        buffer += decoder.decode(value, {stream: true});
      } catch (error) {
        let message = "";
        if (error instanceof Error) {
          message = error.message;
        }
        if (error instanceof DOMException && error.name === "AbortError") {
          onError(
            WebrpcRequestFailedError.new({
              message: "AbortError",
              cause: `AbortError: ${message}`,
            }),
            () => {
              throw new Error("Abort signal cannot be used to reconnect");
            }
          );
        } else {
          // When the watchdog released the lock, read() rejects with a generic
          // error; report the timeout as the cause instead of that message.
          const cause = timeoutError ? timeoutMessage : message;
          onError(
            WebrpcStreamLostError.new({
              cause: `reader.read(): ${cause}`,
            }),
            retryFetch
          );
        }
        return;
      }

      // The last element is either "" (chunk ended on a newline) or a partial
      // line that must wait for the next chunk, so it is not parsed here.
      const lines = buffer.split("\n");
      for (let i = 0; i < lines.length - 1; i++) {
        if (lines[i].length === 0) {
          continue;
        }
        let data: any;
        try {
          data = JSON.parse(lines[i]);
          if (data.hasOwnProperty("webrpcError")) {
            const error = data.webrpcError;
            const code: number =
              typeof error.code === "number" ? error.code : 0;
            onError(
              (webrpcErrorByCode[code] || WebrpcError).new(error),
              retryFetch
            );
            return;
          }
        } catch (error) {
          if (
            error instanceof Error &&
            error.message === "Abort signal cannot be used to reconnect"
          ) {
            throw error;
          }
          const detail = error instanceof Error ? error.message : String(error);
          onError(
            WebrpcBadResponseError.new({
              status: res.status,
              cause: `JSON.parse(): ${detail}`,
            }),
            retryFetch
          );
          // Stop here like every other error path; never deliver an
          // undefined payload to onMessage.
          return;
        }
        onMessage(data);
      }

      if (!done) {
        buffer = lines[lines.length - 1];
        continue;
      }

      onClose && onClose();
      return;
    }
  } finally {
    // Always stop the watchdog so it cannot outlive the stream and later
    // release the lock on a reader that is no longer in use.
    clearInterval(intervalId);
  }
};
{{ end }}