diff --git a/tests/test_integration.py b/tests/test_integration.py
index 32e4aea..afde00a 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -31,7 +31,7 @@
 
 
 @pytest.fixture()
-def make_test_dirs(tmp_path):
+def test_dir_factory(tmp_path):
 
     def create_file(p: Path, total_size: int, pattern=None):
         pattern = bytearray(random.getrandbits(8) for _ in range(1234)) if pattern is None else pattern
@@ -85,18 +85,16 @@ def setup_test_content(base: Path, sub_dir: str, keep_empty=False):
            create_file(p / rnd_name('rnd_file', '.bin'), int(random.random() * CHUNK_SIZE * 7))
        return str(base.resolve())
 
-    print(f"Creating seed dir contents in '{tmp_path}'...")
-    seed_dir = setup_test_content(tmp_path, 'seed')
-    peer_dirs = []
-    for i, name in TEST_PEER_NAMES.items():
-        print(f"Creating files for '{name}'")
-        peer_dirs.append(setup_test_content(tmp_path, name, keep_empty=(i == 0)))
-    return seed_dir, peer_dirs
+    def _factory(node_name: str, keep_empty=False):
+        print(f"Creating {'empty ' if keep_empty else ''}test dir for '{node_name}'")
+        return setup_test_content(tmp_path, node_name, keep_empty=keep_empty)
+
+    yield _factory
 
 
 # To be run in separate process:
 # run syncer and send stdout + exceptions through a pipe.
-def sync_proc(conn, is_master, argv):
+def _sync_proc(conn, is_master, argv):
     try:
         class PipeOut(io.RawIOBase):
             def write(self, b):
@@ -113,44 +111,101 @@ def write(self, b):
         conn.send((e, traceback.format_exc()))
 
 
+def _spawn_sync_process(name: str, is_master: bool, sync_dir: str, port: int, master_addr: str):
+    print(f"Spawning process for {'master' if is_master else 'peer'} node '{name}' at port {port}...")
+    out = io.StringIO()
+    def comm_thread(conn):
+        with suppress(EOFError):
+            while conn:
+                o = conn.recv()
+                if isinstance(o, tuple):
+                    pass  # process exit
+                else:
+                    out.write(str(o))
+    conn_recv, conn_send = mp.Pipe(duplex=False)
+    argv = ['masternode', sync_dir, '--port', str(port), '--concurrent-transfers', '1'] if is_master else \
+           ['peernode', master_addr, sync_dir, '--port', str(port), '--rescan-interval', '3']
+    proc = mp.Process(target=_sync_proc, name='sync-worker', args=(conn_send, is_master, argv))
+    threading.Thread(target=comm_thread, args=(conn_recv,)).start()
+    proc.start()
+    return SimpleNamespace(proc=proc, is_master=is_master, out=out, name=name, dir=sync_dir)
+
+def _wait_seconds(s):
+    for x in range(s):
+        print(f"Waiting {s-x} seconds...")
+        time.sleep(1)
+
+def _kill_procs(*args):
+    for x in args:
+        print(f"Terminating '{x.name}'...")
+        x.proc.terminate()
+
+def _assert_node_basics(p):
+    assert 'Exception' not in p.out.getvalue(), f'Exception(s) on {p.name}'
+    assert 'egmentation fault' not in p.out.getvalue()
+    assert p.is_master or 'Up to date' in p.out.getvalue(), 'Node never reached up-to-date state.'
+    assert 'traceback' not in str(p.out.getvalue()).lower()
+
+def _print_process_stdout(p):
+    print(f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n')
+    print(p.out.getvalue())
+    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
+
+
+def _assert_identical_dir(p1, p2):
+    cmp = filecmp.dircmp(p1.dir, p2.dir)
+    assert not cmp.diff_files, f'Differing files between {p1.name} and {p2.name}: {str(cmp.diff_files)}'
+    assert not cmp.funny_files, f'"Funny" files between {p1.name} and {p2.name}: {str(cmp.funny_files)}'
+    assert not cmp.left_only, f'Files found only from {p1.name}: {str(cmp.left_only)}'
+    assert not cmp.right_only, f'Files not found from {p1.name}: {str(cmp.right_only)}'
+
+
+
+
+def test_minimal_downloads(test_dir_factory):
+    """Test that each chunk is downloaded only once."""
+    master = _spawn_sync_process(f'seed', True, test_dir_factory('seed'), PORT_BASE, '')
+    peer = _spawn_sync_process(f'leecher', False, test_dir_factory('leecher', keep_empty=True), PORT_BASE+1, f'localhost:{PORT_BASE}')
-def test_actual_swarm_on_localhost(make_test_dirs):
+    _wait_seconds(4)
+    _kill_procs(master, peer)
+
+    # Count hash downloads from master's log
+    master_lines = str(master.out.getvalue()).split('\n')
+    dl_lines = [l for l in master_lines if 'GET /blob/' in l]
+    assert len(dl_lines) > 0, "No download log messages on master"
+    downloaded_hashes = [l.split('/')[-1] for l in dl_lines]
+    for h in downloaded_hashes:
+        assert downloaded_hashes.count(h) == 1, f"Hash {h} was downloaded more than once."
+
+    # Also check for basic conditions
+    for p in (master, peer):
+        _print_process_stdout(p)
+        _assert_node_basics(p)
+        _assert_identical_dir(p, master)
+        print(f'Peer {p.name} test ok')
+
+
+
+def test_swarm__corruption__bad_protocol__uptodate__errors(test_dir_factory):
     """
     Integration test. Creates a small local swarm with peers having different initial contents,
     runs it for a bit and checks that all synced up ok.
""" - seed_dir, peer_dirs = make_test_dirs - - def spawn_sync_process(name: str, is_master: bool, sync_dir: str, port: int, master_addr: str): - print(f"Spawning process for '{name}'...") - out = io.StringIO() - def comm_thread(conn): - with suppress(EOFError): - while conn: - o = conn.recv() - if isinstance(o, tuple): - pass # process exit - else: - out.write(str(o)) - conn_recv, conn_send = mp.Pipe(duplex=False) - argv = ['masternode.py', sync_dir, '--port', str(port), '--concurrent-transfers', '1'] if is_master else \ - ['peernode.py', master_addr, sync_dir, '--port', str(port), '--rescan-interval', '3'] - proc = mp.Process(target=sync_proc, name='sync-worker', args=(conn_send, is_master, argv)) - threading.Thread(target=comm_thread, args=(conn_recv,)).start() - proc.start() - return SimpleNamespace(proc=proc, is_master=is_master, out=out, name=name, dir=sync_dir) + seed_dir = test_dir_factory('seed') + peer_dirs = [test_dir_factory(TEST_PEER_NAMES[i], keep_empty=(i == 0)) for i in TEST_PEER_NAMES.keys()] + master = None peers = [] for i, name in TEST_PEER_NAMES.items(): - peers.append(spawn_sync_process(name=f'{name}', is_master=False, sync_dir=peer_dirs[i], port=PORT_BASE+1+i, - master_addr=f'localhost:{PORT_BASE}')) + peers.append(_spawn_sync_process(f'{name}', False, peer_dirs[i], PORT_BASE+1+i, f'localhost:{PORT_BASE}')) time.sleep(0.1) # stagger peer generation a bit if i == 1: # Start server after the first two peers to test start order - master = spawn_sync_process(name=f'master', is_master=True, sync_dir=seed_dir, port=PORT_BASE, master_addr='') + master = _spawn_sync_process(f'master', True, seed_dir, PORT_BASE, '') # Alter files on one peer in the middle of a sync - time.sleep(4) + _wait_seconds(4) print(f"Corrupting some files on '{peers[1].name}'...") try: shutil.rmtree(peers[1].dir + '/dir2') # Delete a dir @@ -201,38 +256,15 @@ async def read_respose_msgs(ws): except asyncio.TimeoutError: assert False, "Request tests timed out." - - # Wait - for x in range(10): - print(f"Waiting {10-x} seconds for nodes before terminating...") - time.sleep(1) - - # Kill processes - for x in (master, *peers): - print(f"Terminating '{x.name}'...") - x.proc.terminate() + _wait_seconds(10) + _kill_procs(master, *peers) print(f"All nodes killed. Testing results...") - for p in (master, *peers): - cmp = filecmp.dircmp(p.dir, master.dir) - try: - assert 'Exception' not in p.out.getvalue(), f'Exception(s) on {p.name}' - assert 'egmentation fault' not in p.out.getvalue() - assert p.is_master or 'Up to date' in p.out.getvalue(), 'Node never reached up-to-date state.' 
-            assert 'traceback' not in str(p.out.getvalue()).lower()
-            assert not cmp.diff_files, f'Differing files between {p.name} and master: {str(cmp.diff_files)}'
-            assert not cmp.funny_files, f'"Funny" files between {p.name} and master: {str(cmp.funny_files)}'
-            assert not cmp.left_only, f'Files found only from {p.name}: {str(cmp.left_only)}'
-            assert not cmp.right_only, f'Files not found from {p.name}: {str(cmp.right_only)}'
-
-            print(f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n')
-            print(p.out.getvalue())
-
-        except:
-            print(f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n')
-            print(p.out.getvalue())
-            print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
-            raise
+    assert any([('GET /blob/' in p.out.getvalue()) for p in peers]), 'No P2P transfers happened'
+    for p in (master, *peers):
+        _print_process_stdout(p)
+        _assert_node_basics(p)
+        _assert_identical_dir(p, master)
         print(f'Peer {p.name} test ok')