From fff8a838d2cc3c4d601d0e7c121040e209dce158 Mon Sep 17 00:00:00 2001 From: san <44360140+esall@users.noreply.github.com> Date: Tue, 23 Apr 2019 13:38:45 +0200 Subject: [PATCH] Timestamp fix Added pull request https://github.com/wintoncode/winton-kafka-streams/pull/58 --- winton_kafka_streams/processor/_record_collector.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/winton_kafka_streams/processor/_record_collector.py b/winton_kafka_streams/processor/_record_collector.py index cf76f71..37a1f1f 100644 --- a/winton_kafka_streams/processor/_record_collector.py +++ b/winton_kafka_streams/processor/_record_collector.py @@ -35,6 +35,11 @@ def send(self, topic, key, value, timestamp, while not produced: try: + if isinstance(timestamp, tuple) and timestamp: + timestamp = timestamp[-1] + if isinstance(timestamp, float): + timestamp = int(timestamp) + self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp) self.producer.poll(0) # Ensure previous message's delivery reports are served produced = True