Skip to content

Commit

Permalink
Log rdkafka events
Browse files Browse the repository at this point in the history
Errors/... are logged by default, and debug logs can be enabled by
setting the `RDKAFKA_DEBUG_LOGS` variable

Issue: BB-439
  • Loading branch information
francoisferrand committed Oct 4, 2023
1 parent ea4eaa9 commit fcddd6a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ class BackbeatConsumer extends EventEmitter {
'offset_commit_cb': this._onOffsetCommit.bind(this),
// automatically create topic
'allow.auto.create.topics': true,
// enable debug if needed
'event_cb': true,
'debug': process.env.RDKAFKA_DEBUG_LOGS || '',
};
const topicParams = {};
if (this._fromOffset !== undefined) {
Expand All @@ -196,6 +199,13 @@ class BackbeatConsumer extends EventEmitter {
consumerParams['client.rack'] = this._site;
}
this._consumer = new kafka.KafkaConsumer(consumerParams, topicParams);

this._consumer.on('event', event => this._log.info('rdkafka.event', { event }));
this._consumer.on('event.log', log => this._log.info('rdkafka.log', { log }));
this._consumer.on('warning', warning => this._log.warn('rdkafka.warning', { warning }));
this._consumer.on('event.error', err => this._log.error('rdkafka.error', { err }));
this._consumer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle }));

this._consumer.connect({ timeout: 10000 }, () => {
const opts = {
topic: withTopicPrefix('backbeat-sanitycheck'),
Expand Down
8 changes: 8 additions & 0 deletions lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class BackbeatProducer extends EventEmitter {

this._producer = new Producer(this.producerConfig, this.topicConfig);

this._producer.on('event', event => this._log.info('rdkafka.event', { event }));
this._producer.on('event.log', log => this._log.info('rdkafka.log', { log }));
this._producer.on('warning', warning => this._log.warn('rdkafka.warning', { warning }));
this._producer.on('event.error', err => this._log.error('rdkafka.error', { err }));
this._producer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle }));

this.connect();
this.setListeners();
return this;
Expand Down Expand Up @@ -68,6 +74,8 @@ class BackbeatProducer extends EventEmitter {
'message.max.bytes': this._maxRequestSize,
'dr_cb': true,
'compression.type': Constants.compressionType,
'event_cb': true,
'debug': process.env.RDKAFKA_DEBUG_LOGS || '',
};
}

Expand Down

0 comments on commit fcddd6a

Please sign in to comment.