diff --git a/packages/kafka/package.json b/packages/kafka/package.json index c8aa0e99..04b50a09 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@walmartlabs/cookie-cutter-kafka", - "version": "1.3.0-beta.3", + "version": "1.3.0-beta.4", "license": "Apache-2.0", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/packages/kafka/src/KafkaConsumer.ts b/packages/kafka/src/KafkaConsumer.ts index bdd47077..04308178 100644 --- a/packages/kafka/src/KafkaConsumer.ts +++ b/packages/kafka/src/KafkaConsumer.ts @@ -79,6 +79,7 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable { private offsetCommitIntervalMs: number; private timer: NodeJS.Timer; private groupEpoch: number = 0; + private brokerMetadataErrors: number = 0; constructor(config: KafkaConsumerConfig) { this.config = config; @@ -278,6 +279,7 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable { // tslint:disable-next-line:no-floating-promises this.commitOffsetsIfNecessary(); }, 0); + this.timer.unref(); } await this.consumer.run({ @@ -407,8 +409,28 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable { this.metrics.gauge(KafkaMetrics.Lag, lag.toNumber(), tags); } } + + this.brokerMetadataErrors = 0; } catch (e) { this.logger.warn("Unable to retrieve watermarks", e); + + // this is a workaround for https://github.com/walmartlabs/cookie-cutter/issues/185 + // until a fix for kafkajs is available / the root cause of the problem is confirmed + if ( + e && + e.toString().contains("server is not the leader for that topic-partition") + ) { + this.brokerMetadataErrors++; + } + + if (this.brokerMetadataErrors > 10) { + this.logger.error("detected stale broker metadata in kafkajs"); + this.brokerMetadataErrors = 0; + + // throwing an error here will cause a UnhandledPromiseException + // and terminate the application + throw new Error("stale broker metadata"); + } } } } finally { @@ -416,6 +438,7 @@ export class KafkaConsumer implements IRequireInitialization, IDisposable { // tslint:disable-next-line:no-floating-promises this.commitOffsetsIfNecessary(); }, this.offsetCommitIntervalMs); + this.timer.unref(); } }