diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index e58c7a2ef..23fb59a60 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -216,6 +216,10 @@ def _drain_errback(exc, interval): self._running.set() try: while self._running.is_set(): + # This seems to be required when failures occur in + # rabbitmq, as the consumer doesn't show up in rabbit + # as a valid consumer without it. (mmontgomery) + consumer.consume() safe_drain(conn, self._drain_events_timeout) if self._on_wait is not None: self._on_wait()