Skip to content

Commit

Permalink
protocol: add timeout while waiting for messages
Browse files Browse the repository at this point in the history
Add a maximum timeout between messages. Since the server sends an
heartbeat message every 30 seconds, not receiving any message for 45
seconds means the client and server are disconnected.
  • Loading branch information
fracek committed Mar 4, 2023
1 parent d70a28f commit 51c3c9c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/yellow-spies-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@apibara/protocol': patch
---

Handle client-server connection hanging
22 changes: 20 additions & 2 deletions packages/protocol/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ export { ChannelCredentials, StatusObject } from '@grpc/grpc-js'

const StreamService = v1alpha2.protoDescriptor.apibara.node.v1alpha2.Stream

// Server produces an heartbeat every 30 seconds, so we use 45 seconds as a timeout.
const MESSAGE_TIMEOUT_MS = 45_000

export type DataStream = ClientDuplexStream<
v1alpha2.IStreamDataRequest,
v1alpha2.IStreamDataResponse
Expand Down Expand Up @@ -157,14 +160,27 @@ export class StreamClient {
while (true) {
let retryCount = 1
let cursor = null
let clock
try {
// this check is to make ts happy
if (!this.stream) {
throw new Error('Stream disconnected unexpectedly')
}

for await (const message of this.stream) {
const messageTyped = message as v1alpha2.IStreamDataResponse
const streamIter = this.stream[Symbol.asyncIterator]()
while (true) {
const timeout = new Promise((_, reject) => {
clock = setTimeout(() => {
reject(new Error('Stream timed out'))
}, MESSAGE_TIMEOUT_MS)
})

const message = <IteratorResult<v1alpha2.IStreamDataResponse>>(
await Promise.race([streamIter.next(), timeout])
)
const messageTyped = message.value as v1alpha2.IStreamDataResponse

clearTimeout(clock)

// only return messages if they are with the most recently configured stream
if (messageTyped.streamId?.toString() == this.stream_id.toString()) {
Expand All @@ -182,6 +198,8 @@ export class StreamClient {
}
}
} catch (err: any) {
clearTimeout(clock)

const isGrpcError =
err.hasOwnProperty('code') &&
err.hasOwnProperty('details') &&
Expand Down

0 comments on commit 51c3c9c

Please sign in to comment.