Fixed a segfault when 'commit' or 'store_offsets' consumer method is called incorrectly with an errored Message object (#1754)

pranavrth authored Jun 28, 2024
1 parent 729563f commit a6d2e1e
Showing 3 changed files with 72 additions and 1 deletion.
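In effect, the fix turns a hard interpreter crash into a catchable exception. A minimal sketch of the Python-level behavior after this change (the broker address, topic, and group id are placeholders for illustration, not taken from the commit):

from confluent_kafka import Consumer, KafkaError, KafkaException

# Placeholder configuration for illustration only.
consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'demo'})
consumer.subscribe(['demo-topic'])

msg = consumer.poll(5.0)
if msg is not None and msg.error() is not None:
    try:
        # Previously this could segfault; it now raises a KafkaException
        # carrying a KafkaError with code _INVALID_ARG.
        consumer.commit(msg)
    except KafkaException as e:
        assert e.args[0].code() == KafkaError._INVALID_ARG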
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ v2.4.1 is a maintenance release with the following fixes and enhancements:
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other modules such as mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with an errored Message object

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
20 changes: 20 additions & 0 deletions src/confluent_kafka/src/Consumer.c
@@ -496,6 +496,16 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
        }

        m = (Message *)msg;

        /* An errored message may have unset topic/offset fields;
         * dereferencing them below previously caused a segfault,
         * so reject it with an error instead. */
        if (m->error != Py_None) {
                PyObject *error = Message_error(m, NULL);
                PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                 "Cannot commit offsets for message with error: '%s'",
                                 PyUnicode_AsUTF8(errstr));
                Py_DECREF(error);
                Py_DECREF(errstr);
                return NULL;
        }

        c_offsets = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(
@@ -627,6 +637,16 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,

        m = (Message *)msg;

        /* Same guard as in Consumer_commit: refuse errored messages
         * before reading their topic/offset fields. */
        if (m->error != Py_None) {
                PyObject *error = Message_error(m, NULL);
                PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                 "Cannot store offsets for message with error: '%s'",
                                 PyUnicode_AsUTF8(errstr));
                Py_DECREF(error);
                Py_DECREF(errstr);
                return NULL;
        }

        c_offsets = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(
                c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
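The crash originated just past the new guard: for an errored message, m->topic may be unset, so cfl_PyUnistr_AsUTF8(m->topic, &uo8) dereferenced an invalid object. A rough Python analogue of the new check (check_committable is a hypothetical helper, not part of the library, and it assumes the public KafkaError(code, reason) constructor):

from confluent_kafka import KafkaError, KafkaException

def check_committable(msg, action="commit"):
    # Hypothetical mirror of the C-level guard: reject errored messages
    # before touching their topic/partition/offset fields.
    err = msg.error()
    if err is not None:
        raise KafkaException(KafkaError(
            KafkaError._INVALID_ARG,
            "Cannot {} offsets for message with error: '{}'".format(
                action, err.str())))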
52 changes: 51 additions & 1 deletion tests/integration/consumer/test_consumer_error.py
@@ -17,7 +17,7 @@
#

import pytest
-from confluent_kafka import TopicPartition, OFFSET_END, KafkaError
+from confluent_kafka import TopicPartition, OFFSET_END, KafkaError, KafkaException

from confluent_kafka.error import ConsumeError
from confluent_kafka.serialization import StringSerializer
@@ -44,3 +44,53 @@ def test_consume_error(kafka_cluster):
        consumer.poll()
    assert exc_info.value.args[0].code() == KafkaError._PARTITION_EOF, \
        "Expected _PARTITION_EOF, not {}".format(exc_info)


def test_consume_error_commit(kafka_cluster):
    """
    Tests to ensure that we handle messages with errors when committing.
    """
    topic = kafka_cluster.create_topic("test_commit_transaction")
    consumer_conf = {'group.id': 'pytest',
                     'session.timeout.ms': 100}

    producer = kafka_cluster.producer()
    producer.produce(topic=topic, value="a")
    producer.flush()

    consumer = kafka_cluster.cimpl_consumer(consumer_conf)
    consumer.subscribe([topic])
    try:
        # Since the session timeout value is low, the JoinGroupRequest will
        # fail and poll() returns a message carrying an error.
        m = consumer.poll(1)
        consumer.commit(m)
    except KafkaException as e:
        assert e.args[0].code() == KafkaError._INVALID_ARG, \
            "Expected INVALID_ARG, not {}".format(e)


def test_consume_error_store_offsets(kafka_cluster):
    """
    Tests to ensure that we handle messages with errors when storing offsets.
    """
    topic = kafka_cluster.create_topic("test_commit_transaction")
    consumer_conf = {'group.id': 'pytest',
                     'session.timeout.ms': 100,
                     'enable.auto.offset.store': True,
                     'enable.auto.commit': False}

    producer = kafka_cluster.producer()
    producer.produce(topic=topic, value="a")
    producer.flush()

    consumer = kafka_cluster.cimpl_consumer(consumer_conf)
    consumer.subscribe([topic])
    try:
        # Since the session timeout value is low, the JoinGroupRequest will
        # fail and poll() returns a message carrying an error.
        m = consumer.poll(1)
        consumer.store_offsets(m)
    except KafkaException as e:
        assert e.args[0].code() == KafkaError._INVALID_ARG, \
            "Expected INVALID_ARG, not {}".format(e)
