Skip to content

Commit

Permalink
protocol: fix stream client to yield heartbeat messages
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek committed Jan 11, 2024
1 parent 8608ba8 commit 97a7c35
Show file tree
Hide file tree
Showing 8 changed files with 2,082 additions and 1,814 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-singers-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@apibara/protocol": patch
---

Fix stream client to yield heartbeat messages
5 changes: 1 addition & 4 deletions biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"formatter": {
"enabled": true,
"formatWithErrors": false,
"indentSize": 2,
"indentWidth": 2,
"indentStyle": "space",
"lineWidth": 80
},
Expand All @@ -22,9 +22,6 @@
"semicolons": "always"
}
},
"organizeImports": {
"enabled": true
},
"files": {
"ignore": [
"**/node_modules",
Expand Down
12 changes: 9 additions & 3 deletions packages/protocol/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,12 @@ export class StreamClient {

clearTimeout(clock);

// only return messages if they are with the most recently configured stream
if (messageTyped.streamId?.toString() === this.stream_id.toString()) {
if (messageTyped.message === "heartbeat") {
yield messageTyped;
} else if (
messageTyped.streamId?.toString() === this.stream_id.toString()
) {
// only return messages if they are with the most recently configured stream
// reset retry count on new message
retryCount = 1;

Expand All @@ -230,7 +234,9 @@ export class StreamClient {
clearTimeout(clock);

const isGrpcError =
err.hasOwn("code") && err.hasOwn("details") && err.hasOwn("metadata");
Object.hasOwn(err, "code") &&
Object.hasOwn(err, "details") &&
Object.hasOwn(err, "metadata");

// non-grpc error, so just bubble it up
if (!isGrpcError) {
Expand Down
6 changes: 3 additions & 3 deletions packages/protocol/src/proto/apibara/node/v1alpha2/Data.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Original file: src/proto/stream.proto

import type { Cursor as _apibara_node_v1alpha2_Cursor, Cursor__Output as _apibara_node_v1alpha2_Cursor__Output } from '../../../apibara/node/v1alpha2/Cursor';
import type { DataFinality as _apibara_node_v1alpha2_DataFinality } from '../../../apibara/node/v1alpha2/DataFinality';
import type { DataFinality as _apibara_node_v1alpha2_DataFinality, DataFinality__Output as _apibara_node_v1alpha2_DataFinality__Output } from '../../../apibara/node/v1alpha2/DataFinality';

export interface Data {
'endCursor'?: (_apibara_node_v1alpha2_Cursor | null);
'finality'?: (_apibara_node_v1alpha2_DataFinality | keyof typeof _apibara_node_v1alpha2_DataFinality);
'finality'?: (_apibara_node_v1alpha2_DataFinality);
'data'?: (Buffer | Uint8Array | string)[];
'cursor'?: (_apibara_node_v1alpha2_Cursor | null);
}

export interface Data__Output {
'endCursor': (_apibara_node_v1alpha2_Cursor__Output | null);
'finality': (keyof typeof _apibara_node_v1alpha2_DataFinality);
'finality': (_apibara_node_v1alpha2_DataFinality__Output);
'data': (Uint8Array)[];
'cursor': (_apibara_node_v1alpha2_Cursor__Output | null);
}
24 changes: 18 additions & 6 deletions packages/protocol/src/proto/apibara/node/v1alpha2/DataFinality.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
// Original file: src/proto/stream.proto

export enum DataFinality {
DATA_STATUS_UNKNOWN = 0,
DATA_STATUS_PENDING = 1,
DATA_STATUS_ACCEPTED = 2,
DATA_STATUS_FINALIZED = 3,
}
export const DataFinality = {
DATA_STATUS_UNKNOWN: 'DATA_STATUS_UNKNOWN',
DATA_STATUS_PENDING: 'DATA_STATUS_PENDING',
DATA_STATUS_ACCEPTED: 'DATA_STATUS_ACCEPTED',
DATA_STATUS_FINALIZED: 'DATA_STATUS_FINALIZED',
} as const;

export type DataFinality =
| 'DATA_STATUS_UNKNOWN'
| 0
| 'DATA_STATUS_PENDING'
| 1
| 'DATA_STATUS_ACCEPTED'
| 2
| 'DATA_STATUS_FINALIZED'
| 3

export type DataFinality__Output = typeof DataFinality[keyof typeof DataFinality]
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Original file: src/proto/stream.proto

import type { Cursor as _apibara_node_v1alpha2_Cursor, Cursor__Output as _apibara_node_v1alpha2_Cursor__Output } from '../../../apibara/node/v1alpha2/Cursor';
import type { DataFinality as _apibara_node_v1alpha2_DataFinality } from '../../../apibara/node/v1alpha2/DataFinality';
import type { DataFinality as _apibara_node_v1alpha2_DataFinality, DataFinality__Output as _apibara_node_v1alpha2_DataFinality__Output } from '../../../apibara/node/v1alpha2/DataFinality';
import type { Long } from '@grpc/proto-loader';

export interface StreamDataRequest {
'streamId'?: (number | string | Long);
'batchSize'?: (number | string | Long);
'startingCursor'?: (_apibara_node_v1alpha2_Cursor | null);
'finality'?: (_apibara_node_v1alpha2_DataFinality | keyof typeof _apibara_node_v1alpha2_DataFinality);
'finality'?: (_apibara_node_v1alpha2_DataFinality);
'filter'?: (Buffer | Uint8Array | string);
'_streamId'?: "streamId";
'_batchSize'?: "batchSize";
Expand All @@ -19,7 +19,7 @@ export interface StreamDataRequest__Output {
'streamId'?: (Long);
'batchSize'?: (Long);
'startingCursor': (_apibara_node_v1alpha2_Cursor__Output | null);
'finality'?: (keyof typeof _apibara_node_v1alpha2_DataFinality);
'finality'?: (_apibara_node_v1alpha2_DataFinality__Output);
'filter': (Uint8Array);
'_streamId': "streamId";
'_batchSize': "batchSize";
Expand Down
Loading

0 comments on commit 97a7c35

Please sign in to comment.