Issue #488 and the broken test found in this ticket made me realize that if a worker with no streams is registered in the engine, it still "uses" one fanout slot, delaying other potential engine clients, and, if the fanout is low, it can even cause the engine run loop to exit prematurely in some cases, as in the test below. This is because pending clients do not retain the run loop (evlooprefcnt is only increased when clients register, not before). With the current code (as of 1.8-1.9), the test case below exits prematurely and the event handlers are not called at all.
I'm opening this ticket to rework the engine and improve the management of active streams, clients, and fanout. Not a super high priority, as this is an edge case involving unusual workers, but it would still be nice to fix in the future.
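For illustration only, here is a toy model of the mechanism described above; this is not ClusterShell's actual Engine code, just a sketch showing how a refcount-driven run loop plus a fixed fanout can exit before pending clients are ever picked up. All names (ToyEngine, ToyClient, start_clients) are made up for this sketch.

# Toy model -- NOT the real Engine. Clients occupy fanout slots, but only
# their registered streams retain the run loop (evlooprefcnt).
class ToyClient:
    def __init__(self, name, nstreams):
        self.name = name
        self.streams = list(range(nstreams))

class ToyEngine:
    def __init__(self, fanout):
        self.fanout = fanout
        self.pending = []       # clients waiting for a fanout slot
        self.registered = []    # clients holding a fanout slot
        self.evlooprefcnt = 0   # run loop exits when this is 0

    def add(self, client):
        self.pending.append(client)

    def start_clients(self):
        # start pending clients while fanout slots are available
        while self.pending and len(self.registered) < self.fanout:
            client = self.pending.pop(0)
            self.registered.append(client)
            # only actual streams retain the run loop
            self.evlooprefcnt += len(client.streams)

    def run(self):
        self.start_clients()
        # a real engine would poll I/O here; the point is the exit condition:
        # with fanout=1 and a streamless client started first, evlooprefcnt
        # stays at 0 and run() returns before the other clients ever start
        return self.evlooprefcnt

engine = ToyEngine(fanout=1)
engine.add(ToyClient("streamless worker", nstreams=0))
engine.add(ToyClient("sleep n1", nstreams=2))
print(engine.run())                      # 0 -> premature exit
print([c.name for c in engine.pending])  # ['sleep n1'] was never picked up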
test case:
# imports needed: EventHandler (ClusterShell.Event), task_self (ClusterShell.Task),
# StreamWorker (ClusterShell.Worker.Worker)
def test_010_worker_with_no_streams_vs_fanout(self):
    """test StreamWorker with no streams vs engine fanout"""
    # Revealed by GH #488
    class TestH(EventHandler):
        def __init__(self):
            self.start_count = 0
            self.close_count = 0
            self.pickup_count = 0
            self.hup_count = 0

        def ev_start(self, worker):
            self.start_count += 1

        def ev_pickup(self, worker, node):
            self.pickup_count += 1

        def ev_hup(self, worker, node, rc):
            self.hup_count += 1

        def ev_close(self, worker, timedout):
            if not timedout:
                self.close_count += 1

    task = task_self()
    task.set_info('debug', True)
    fanout = task.info("fanout")
    try:
        task.set_info("fanout", 1)
        self.assertEqual(len(task._engine._clients), 0)
        # under low fanout, register a StreamWorker with no streams
        worker = StreamWorker(handler=None)
        task.schedule(worker)
        eh = TestH()
        task.shell("/bin/sleep 0.11", handler=eh, key="n1")
        task.shell("/bin/sleep 0.12", handler=eh, key="n2")
        task.shell("/bin/sleep 0.13", handler=eh, key="n3")
        task.run()
        self.assertEqual(eh.start_count, 3)
        self.assertEqual(eh.pickup_count, 3)
        self.assertEqual(eh.hup_count, 3)
        self.assertEqual(eh.close_count, 3)
        self.assertEqual(len(task._engine._clients), 1)
        worker.abort()
        self.assertEqual(len(task._engine._clients), 0)
    finally:
        task.set_info("fanout", fanout)
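Until the engine is reworked, a possible workaround (my sketch, not an official recommendation) is to keep the fanout comfortably above the number of streamless workers and to abort such a worker once it is no longer needed, as the fixed test does after run():

from ClusterShell.Task import task_self
from ClusterShell.Worker.Worker import StreamWorker

task = task_self()

# keep the fanout high enough that the streamless worker does not
# occupy the only available slot
task.set_info("fanout", 64)

worker = StreamWorker(handler=None)
task.schedule(worker)

task.shell("/bin/sleep 0.1", key="n1")
task.run()

# the streamless worker stays registered after run(); abort it to
# leave the engine in a clean state
worker.abort()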
An unconfigured StreamWorker has no streams to register in the
engine, so once registered, it stays there until it is explicitly
aborted. To make matters worse, it uses a 'fanout slot', and if the
Engine's fanout is too low, this can lead to the engine run loop
exiting prematurely. The Engine does not handle this case very well.
See cea-hpc#497 for more details about the problem.
In the meantime, fix the test so it exits with a clean engine state.
Fixes cea-hpc#488.