
I call removeTopics on a consumer and then add back that topic; I then receive duplicate messages #1452

Open
davesargrad opened this issue Aug 5, 2021 · 0 comments


davesargrad commented Aug 5, 2021

My code is simple. Here I initialize the client:

const kafka = require('kafka-node');

const init = (broker_address) => {
    // Create the client, an Offset instance for fetching offsets, and a consumer with no initial topics.
    const client = new kafka.KafkaClient(broker_address);
    const offset = new kafka.Offset(client);
    const consumer = new kafka.Consumer(client, [ ], { autoCommit: false, fromOffset: true });

    return { client: client, offset: offset, consumer: consumer };
}

I use this to join a topic:

const join = (client_offset_consumer, topic, on_msg_cb) => {
    console.log("stream_from_server join: ", topic);
    client_offset_consumer.offset.fetchLatestOffsets([topic], (err, offsets) => {
        if (err) {
            console.log(`error fetching latest offsets ${err}`);
            return;
        }

        // Take the highest offset across all partitions of the topic.
        let latest = 1;
        Object.keys(offsets[topic]).forEach(o => {
            latest = offsets[topic][o] > latest ? offsets[topic][o] : latest;
        });

        // Add the topic starting at the latest offset (the trailing true is the fromOffset flag).
        client_offset_consumer.consumer.addTopics([ { topic: topic, partition: 0, offset: latest } ], (err, added) => {
            console.log("stream_from_server joined: ", added);
            console.log("stream_from_server joined consumer payloads: ", client_offset_consumer.consumer.payloads);
            // Register a 'message' handler that forwards each message to the application callback.
            client_offset_consumer.consumer.on('message', (message) => {
                console.log("my message from kafka ", "offset - ", message.offset, "highwateroffset - ", message.highWaterOffset);
                on_msg_cb(message);
            });
        }, true);
    });
}

I use this to leave a topic:

const leave = (client_offset_consumer, topic) => {
    console.log("stream_from_server leave: ", topic);
    client_offset_consumer.consumer.removeTopics([ topic ], (err, removed) => {
        console.log("stream_from_server - left: ", removed);
    });
}

In the following experiment I have a topic named "avl-vam". I join it, leave it, join a second, non-existent topic "avl-vama", leave that topic, then join the original topic again.
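Roughly, the whole sequence in call form is the following (the broker options and the on_msg callback here are just placeholders for illustration, not my actual values):

// Sketch of the experiment's call sequence; the kafkaHost value and on_msg are placeholders.
const ctx = init({ kafkaHost: "localhost:9092" });
const on_msg = (message) => console.log("app got message at offset", message.offset);

join(ctx, "avl-vam", on_msg);   // offsets 865, 866, 867 arrive once each
leave(ctx, "avl-vam");
join(ctx, "avl-vama", on_msg);  // "avl-vama" does not exist
leave(ctx, "avl-vama");
join(ctx, "avl-vam", on_msg);   // rejoin the original topic
// The next message (offset 868) is now logged three times.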

In the following diagnostic output you can see that I receive three messages (offsets 865, 866, 867) on the topic before leaving it.
[screenshot: console output]

I then leave that first topic, and join and then leave a second topic, "avl-vama".
[screenshot: console output]

I then rejoin the original topic "avl-vam".

Then the very next message that arrives (offset 868) is seen in triplicate!
[screenshot: console output]

Each subsequent message is also seen in triplicate.
[screenshot: console output]

If I leave the topic again (call removeTopics), join a second non-existent topic "avl-vamb", leave that, and then rejoin the original topic, I see five messages at the same offset.

[screenshot: console output]
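In call form, that second round is (reusing the placeholder context and callback from the sketch above):

leave(ctx, "avl-vam");
join(ctx, "avl-vamb", on_msg);  // another topic that does not exist
leave(ctx, "avl-vamb");
join(ctx, "avl-vam", on_msg);   // rejoin again
// The next message now appears five times at the same offset.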

This seems like a bug. Please advise.

I am using kafka-node 5.0.0.

I am using kafka 2.8.0.
