Skip to content

Commit

Permalink
Timestamp fix
Browse files Browse the repository at this point in the history
Added pull request wintoncode#58
  • Loading branch information
esall authored Apr 23, 2019
1 parent 5867a1c commit fff8a83
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions winton_kafka_streams/processor/_record_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def send(self, topic, key, value, timestamp,

while not produced:
try:
if isinstance(timestamp, tuple) and timestamp:
timestamp = timestamp[-1]
if isinstance(timestamp, float):
timestamp = int(timestamp)

self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp)
self.producer.poll(0) # Ensure previous message's delivery reports are served
produced = True
Expand Down

0 comments on commit fff8a83

Please sign in to comment.