diff --git a/test/unittests/test_client.py b/test/unittests/test_client.py index 561617b..0bc0aad 100644 --- a/test/unittests/test_client.py +++ b/test/unittests/test_client.py @@ -226,7 +226,7 @@ def test_message_drop_invalid(self): class TestClientConnections(unittest.TestCase): service_proc: Process = None - num_clients = 128 + num_clients = 256 clients = [] @classmethod @@ -316,3 +316,39 @@ def handler(message): for client in self.clients: client.close() self.clients = [] + + def test_wait_for_response(self): + threads = list() + handlers = list() + + def _handler(message): + self.assertIn(message.data['idx'], message.msg_type) + self.clients[0].emit(message.response()) + + def _await_response(client, idx): + message_type = f"test.message.{idx}" + context = {"test": secrets.token_hex(512)} + resp = client.wait_for_response(Message(message_type, {"idx": idx}, + context)) + self.assertIsInstance(resp, Message) + self.assertEqual(resp.msg_type, f"{message_type}.response") + self.assertEqual(resp.context['test'], context['test']) + + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + message_type = f"test.message.{i}" + handler = Mock(side_effect=_handler) + handlers.append(handler) + client.on(message_type, handler) + + for idx, client in enumerate(self.clients): + t = Thread(target=_await_response, args=(client, idx)) + threads.append(t) + t.start() + + for thread in threads: + thread.join(3) + self.assertFalse(thread.is_alive())