Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

detect when kafkajs is stuck with stale broker metadata #186

Merged
merged 2 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/kafka/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@walmartlabs/cookie-cutter-kafka",
"version": "1.3.0-beta.3",
"version": "1.3.0-beta.4",
"license": "Apache-2.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
23 changes: 23 additions & 0 deletions packages/kafka/src/KafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable {
private offsetCommitIntervalMs: number;
private timer: NodeJS.Timer;
private groupEpoch: number = 0;
private brokerMetadataErrors: number = 0;

constructor(config: KafkaConsumerConfig) {
this.config = config;
Expand Down Expand Up @@ -278,6 +279,7 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable {
// tslint:disable-next-line:no-floating-promises
this.commitOffsetsIfNecessary();
}, 0);
this.timer.unref();
}

await this.consumer.run({
Expand Down Expand Up @@ -407,15 +409,36 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable {
this.metrics.gauge(KafkaMetrics.Lag, lag.toNumber(), tags);
}
}

this.brokerMetadataErrors = 0;
} catch (e) {
this.logger.warn("Unable to retrieve watermarks", e);

// this is a workaround for https://github.com/walmartlabs/cookie-cutter/issues/185
// until a fix for kafkajs is available / the root cause of the problem is confirmed
if (
e &&
e.toString().contains("server is not the leader for that topic-partition")
) {
this.brokerMetadataErrors++;
}

if (this.brokerMetadataErrors > 10) {
this.logger.error("detected stale broker metadata in kafkajs");
this.brokerMetadataErrors = 0;
plameniv marked this conversation as resolved.
Show resolved Hide resolved

// throwing an error here will cause a UnhandledPromiseException
// and terminate the application
throw new Error("stale broker metadata");
}
}
}
} finally {
this.timer = setTimeout(() => {
// tslint:disable-next-line:no-floating-promises
this.commitOffsetsIfNecessary();
}, this.offsetCommitIntervalMs);
this.timer.unref();
}
}

Expand Down