Skip to content

Commit

Permalink
Refactor tests. Add test for minimal downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jan 6, 2020
1 parent 6d1f514 commit 2ae08c1
Showing 1 changed file with 96 additions and 64 deletions.
160 changes: 96 additions & 64 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@pytest.fixture()
def make_test_dirs(tmp_path):
def test_dir_factory(tmp_path):

def create_file(p: Path, total_size: int, pattern=None):
pattern = bytearray(random.getrandbits(8) for _ in range(1234)) if pattern is None else pattern
Expand Down Expand Up @@ -85,18 +85,16 @@ def setup_test_content(base: Path, sub_dir: str, keep_empty=False):
create_file(p / rnd_name('rnd_file', '.bin'), int(random.random() * CHUNK_SIZE * 7))
return str(base.resolve())

print(f"Creating seed dir contents in '{tmp_path}'...")
seed_dir = setup_test_content(tmp_path, 'seed')
peer_dirs = []
for i, name in TEST_PEER_NAMES.items():
print(f"Creating files for '{name}'")
peer_dirs.append(setup_test_content(tmp_path, name, keep_empty=(i == 0)))
return seed_dir, peer_dirs
def _factory(node_name: str, keep_empty=False):
print(f"Creating {'empty ' if keep_empty else ''}test dir for '{node_name}'")
return setup_test_content(tmp_path, node_name, keep_empty=keep_empty)

yield _factory


# To be run in a separate process:
# run syncer and send stdout + exceptions through a pipe.
def sync_proc(conn, is_master, argv):
def _sync_proc(conn, is_master, argv):
try:
class PipeOut(io.RawIOBase):
def write(self, b):
Expand All @@ -113,44 +111,101 @@ def write(self, b):
conn.send((e, traceback.format_exc()))


def _spawn_sync_process(name: str, is_master: bool, sync_dir: str, port: int, master_addr: str):
    """
    Start a master or peer node in a child process and capture its output.

    :param name: Human-readable node name, used in log messages.
    :param is_master: True to launch a master node, False to launch a peer.
    :param sync_dir: Directory the node should serve / synchronize.
    :param port: Port passed to the node via ``--port``.
    :param master_addr: Address of the master; only used for peers.
    :return: SimpleNamespace with ``proc`` (the started process), ``is_master``,
             ``out`` (StringIO collecting everything the child sent), ``name``
             and ``dir``.
    """
    print(f"Spawning process for {'master' if is_master else 'peer'} node '{name}' at port {port}...")
    out = io.StringIO()

    def comm_thread(conn):
        # recv() raises EOFError once every write end of the pipe is closed.
        # OSError covers a message cut short when the child is terminated
        # in the middle of a send.
        with conn, suppress(EOFError, OSError):
            while True:
                o = conn.recv()
                if isinstance(o, tuple):
                    pass  # process exit
                else:
                    out.write(str(o))

    conn_recv, conn_send = mp.Pipe(duplex=False)
    if is_master:
        argv = ['masternode', sync_dir, '--port', str(port), '--concurrent-transfers', '1']
    else:
        argv = ['peernode', master_addr, sync_dir, '--port', str(port), '--rescan-interval', '3']

    proc = mp.Process(target=_sync_proc, name='sync-worker', args=(conn_send, is_master, argv))
    # Daemon thread: a reader that is still blocked in recv() must never keep
    # the test run from exiting.
    threading.Thread(target=comm_thread, args=(conn_recv,), daemon=True).start()
    proc.start()
    # The child owns the write end now. Dropping the parent's copy lets the
    # reader thread see EOF as soon as the child exits or is terminated.
    conn_send.close()
    return SimpleNamespace(proc=proc, is_master=is_master, out=out, name=name, dir=sync_dir)

def _wait_seconds(s):
    """Sleep for ``s`` whole seconds, printing a countdown once per second."""
    for remaining in range(s, 0, -1):
        print(f"Waiting {remaining} seconds...")
        time.sleep(1)

def _kill_procs(*args):
    """
    Terminate the process of every given node, in the order passed.

    Each argument must expose ``name`` and ``proc`` (an object with ``terminate()``).
    """
    for node in args:
        print(f"Terminating '{node.name}'...")
        worker = node.proc
        worker.terminate()

def _assert_node_basics(p):
    """
    Sanity-check the captured output of a node.

    Fails if the output mentions an exception, a segmentation fault or a
    traceback, or if a non-master node never printed 'Up to date'.
    """
    log = p.out.getvalue()
    assert 'Exception' not in log, f'Exception(s) on {p.name}'
    assert 'egmentation fault' not in log
    if not p.is_master:
        assert 'Up to date' in log, 'Node never reached up-to-date state.'
    assert 'traceback' not in str(log).lower()

def _print_process_stdout(p):
    """Print everything captured from node ``p`` between header and footer banners."""
    header = f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n'
    footer = "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
    for section in (header, p.out.getvalue(), footer):
        print(section)


def _assert_identical_dir(p1, p2):
    """
    Assert that the sync directories of two nodes have identical contents.

    :param p1: Node under test; must expose ``dir`` and ``name``.
    :param p2: Reference node; must expose ``dir`` and ``name``.
    :raises AssertionError: If files differ, cannot be compared, or exist on
                            one side only.

    NOTE: the ``filecmp.dircmp`` attributes checked here describe only the top
    level of the two directories; nested directories are not descended into.
    """
    cmp = filecmp.dircmp(p1.dir, p2.dir)
    # The messages must name p1/p2: referencing any other name would raise
    # NameError while building the message and hide the real mismatch.
    assert not cmp.diff_files, f'Differing files between {p1.name} and {p2.name}: {str(cmp.diff_files)}'
    assert not cmp.funny_files, f'"Funny" files between {p1.name} and {p2.name}: {str(cmp.funny_files)}'
    assert not cmp.left_only, f'Files found only from {p1.name}: {str(cmp.left_only)}'
    assert not cmp.right_only, f'Files not found from {p1.name}: {str(cmp.right_only)}'




def test_minimal_downloads(test_dir_factory):
    """Test that each chunk is downloaded only once."""
    master_addr = f'localhost:{PORT_BASE}'
    master = _spawn_sync_process('seed', True, test_dir_factory('seed'), PORT_BASE, '')
    peer = _spawn_sync_process(
        'leecher', False, test_dir_factory('leecher', keep_empty=True), PORT_BASE+1, master_addr)

    _wait_seconds(4)
    _kill_procs(master, peer)

    # Collect the hash from every blob request in the master's log.
    downloaded_hashes = [
        line.split('/')[-1]
        for line in str(master.out.getvalue()).split('\n')
        if 'GET /blob/' in line
    ]
    assert len(downloaded_hashes) > 0, "No download log messages on master"

    # Tally the requests per hash, then check them in log order so the first
    # offending hash is the one reported.
    occurrences = {}
    for h in downloaded_hashes:
        occurrences[h] = occurrences.get(h, 0) + 1
    for h in downloaded_hashes:
        assert occurrences[h] == 1, f"Hash {h} was downloaded more than once."

    # Also check for basic conditions
    for p in (master, peer):
        _print_process_stdout(p)
        _assert_node_basics(p)
        _assert_identical_dir(p, master)
        print(f'Peer {p.name} test ok')



def test_swarm__corruption__bad_protocol__uptodate__errors(test_dir_factory):
"""
Integration test. Creates a small local swarm with peers having
different initial contents, runs it for a bit and checks that all synced up ok.
"""
seed_dir, peer_dirs = make_test_dirs

def spawn_sync_process(name: str, is_master: bool, sync_dir: str, port: int, master_addr: str):
print(f"Spawning process for '{name}'...")
out = io.StringIO()
def comm_thread(conn):
with suppress(EOFError):
while conn:
o = conn.recv()
if isinstance(o, tuple):
pass # process exit
else:
out.write(str(o))
conn_recv, conn_send = mp.Pipe(duplex=False)
argv = ['masternode.py', sync_dir, '--port', str(port), '--concurrent-transfers', '1'] if is_master else \
['peernode.py', master_addr, sync_dir, '--port', str(port), '--rescan-interval', '3']
proc = mp.Process(target=sync_proc, name='sync-worker', args=(conn_send, is_master, argv))
threading.Thread(target=comm_thread, args=(conn_recv,)).start()
proc.start()
return SimpleNamespace(proc=proc, is_master=is_master, out=out, name=name, dir=sync_dir)
seed_dir = test_dir_factory('seed')
peer_dirs = [test_dir_factory(TEST_PEER_NAMES[i], keep_empty=(i == 0)) for i in TEST_PEER_NAMES.keys()]

master = None
peers = []
for i, name in TEST_PEER_NAMES.items():
peers.append(spawn_sync_process(name=f'{name}', is_master=False, sync_dir=peer_dirs[i], port=PORT_BASE+1+i,
master_addr=f'localhost:{PORT_BASE}'))
peers.append(_spawn_sync_process(f'{name}', False, peer_dirs[i], PORT_BASE+1+i, f'localhost:{PORT_BASE}'))
time.sleep(0.1) # stagger peer generation a bit
if i == 1:
# Start server after the first two peers to test start order
master = spawn_sync_process(name=f'master', is_master=True, sync_dir=seed_dir, port=PORT_BASE, master_addr='')
master = _spawn_sync_process(f'master', True, seed_dir, PORT_BASE, '')

# Alter files on one peer in the middle of a sync
time.sleep(4)
_wait_seconds(4)
print(f"Corrupting some files on '{peers[1].name}'...")
try:
shutil.rmtree(peers[1].dir + '/dir2') # Delete a dir
Expand Down Expand Up @@ -201,38 +256,15 @@ async def read_respose_msgs(ws):
except asyncio.TimeoutError:
assert False, "Request tests timed out."


# Wait
for x in range(10):
print(f"Waiting {10-x} seconds for nodes before terminating...")
time.sleep(1)

# Kill processes
for x in (master, *peers):
print(f"Terminating '{x.name}'...")
x.proc.terminate()
_wait_seconds(10)
_kill_procs(master, *peers)

print(f"All nodes killed. Testing results...")

for p in (master, *peers):
cmp = filecmp.dircmp(p.dir, master.dir)
try:
assert 'Exception' not in p.out.getvalue(), f'Exception(s) on {p.name}'
assert 'egmentation fault' not in p.out.getvalue()
assert p.is_master or 'Up to date' in p.out.getvalue(), 'Node never reached up-to-date state.'
assert 'traceback' not in str(p.out.getvalue()).lower()
assert not cmp.diff_files, f'Differing files between {p.name} and master: {str(cmp.diff_files)}'
assert not cmp.funny_files, f'"Funny" files between {p.name} and master: {str(cmp.funny_files)}'
assert not cmp.left_only, f'Files found only from {p.name}: {str(cmp.left_only)}'
assert not cmp.right_only, f'Files not found from {p.name}: {str(cmp.right_only)}'

print(f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n')
print(p.out.getvalue())

except:
print(f'\n>>>>>>>>>>>>>>>>>>>> stdout for {p.name} >>>>>>>>>>>>>>>>>>>>\n')
print(p.out.getvalue())
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
raise
assert any([('GET /blob/' in p.out.getvalue()) for p in peers]), 'No P2P transfers happened'

for p in (master, *peers):
_print_process_stdout(p)
_assert_node_basics(p)
_assert_identical_dir(p, master)
print(f'Peer {p.name} test ok')

0 comments on commit 2ae08c1

Please sign in to comment.