diff --git a/test-helpers/src/connection/kafka/python/main.py b/test-helpers/src/connection/kafka/python/main.py index 57d62e2a2..4b11a7d17 100644 --- a/test-helpers/src/connection/kafka/python/main.py +++ b/test-helpers/src/connection/kafka/python/main.py @@ -2,6 +2,8 @@ from kafka import KafkaAdminClient from kafka import KafkaProducer from kafka.admin import NewTopic +from kafka.errors import UnknownTopicOrPartitionError +from time import sleep import sys def main(): @@ -17,9 +19,12 @@ def main(): replication_factor=1 ) ]) - producer = KafkaProducer(**config) - producer.send('python_test_topic', b'some_message_bytes').get(timeout=30) + + # send first message with retry since the topic may not be created yet. + retry_if_not_ready(lambda : producer.send('python_test_topic', b'some_message_bytes').get(timeout=30)) + + # send second message without retry, it has no reason to fail. producer.send('python_test_topic', b'another_message').get(timeout=30) consumer = KafkaConsumer('python_test_topic', auto_offset_reset='earliest', **config) @@ -36,6 +41,19 @@ def main(): print("kafka-python script passed all test cases") +def retry_if_not_ready(attempt): + tries = 0 + while True: + try: + attempt() + return + except UnknownTopicOrPartitionError: + tries += 1 + sleep(0.1) + # fail after 10s worth of attempts + if tries > 100: + raise Exception("Timedout, hit UnknownTopicOrPartitionError 100 times in a row") + if __name__ == "__main__": main()