Skip to content

Commit

Permalink
kafka_int_tests::cluster_sasl_scram_over_mtls_nodejs_and_python attem…
Browse files Browse the repository at this point in the history
…pt to fix intermittent failure
  • Loading branch information
rukai committed Nov 25, 2024
1 parent 63a8dcd commit d46f137
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions test-helpers/src/connection/kafka/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from kafka import KafkaAdminClient
from kafka import KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import UnknownTopicOrPartitionError
from time import sleep
import sys

def main():
Expand All @@ -17,9 +19,12 @@ def main():
replication_factor=1
)
])

producer = KafkaProducer(**config)
producer.send('python_test_topic', b'some_message_bytes').get(timeout=30)

# send first message with retry since the topic may not be created yet.
retry_if_not_ready(lambda : producer.send('python_test_topic', b'some_message_bytes').get(timeout=30))

# send second message without retry, it has no reason to fail.
producer.send('python_test_topic', b'another_message').get(timeout=30)

consumer = KafkaConsumer('python_test_topic', auto_offset_reset='earliest', **config)
Expand All @@ -36,6 +41,19 @@ def main():

print("kafka-python script passed all test cases")

def retry_if_not_ready(attempt):
tries = 0
while True:
try:
attempt()
return
except UnknownTopicOrPartitionError:
tries += 1
sleep(0.1)
# fail after 10s worth of attempts
if tries > 100:
raise Exception("Timedout, hit UnknownTopicOrPartitionError 100 times in a row")


if __name__ == "__main__":
main()

0 comments on commit d46f137

Please sign in to comment.