From 6d1f514091f9d27064f0bef487ddc06321aff114 Mon Sep 17 00:00:00 2001 From: Jarno Elonen Date: Sun, 5 Jan 2020 23:45:39 +0200 Subject: [PATCH] Use LZ4 for compressible chunks (only). Bump protocol version major. --- README.md | 8 +- lanscatter/chunker.py | 45 +++++------ lanscatter/common.py | 4 +- lanscatter/fileio.py | 154 +++++++++++++++++++++----------------- lanscatter/masternode.py | 6 +- lanscatter/peernode.py | 11 ++- requirements.cli.txt | 1 + requirements.txt | 1 + tests/test_integration.py | 4 +- 9 files changed, 124 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 217ed28..69ff860 100644 --- a/README.md +++ b/README.md @@ -74,14 +74,14 @@ Features and notable differences to Btsync/Resilio, Syncthing and Dropbox-like s * Keeps traffic inside the LAN (doesn't connect to any third party servers). * Resilient against slow individual nodes. Transfers from slow peers are detected, aborted and avoided afterwards. * Does _not_ preserve Unix file attributes (for now), as Windows doesn't support them. -* Master never modifies sync directory - it treat is as _read only_. +* Master never modifies sync directory - it treats it as _read only_. * Supports bandwidth limiting. ## Technologies -Lanscatter is built on _Python 3.7_ using asyncio (aiohttp & aiofiles), -_wxPython_ for cross-platform GUI, _Blake2b_ algorithm for chunk hashing, _pytest_ for unit / integration tests -and _pyinstaller_ for packaging / freezing into exe files. +Lanscatter is built on **Python 3.7** using **asyncio** (aiohttp & aiofiles), +**wxPython** for cross-platform GUI, **Blake2b** algorithm for chunk hashing, **LZ4** for in-flight compression, +**pytest** for unit / integration tests and **pyinstaller** for packaging / freezing into exe files. ## Site-to-site distribution diff --git a/lanscatter/chunker.py b/lanscatter/chunker.py index 9ee6f64..8607724 100644 --- a/lanscatter/chunker.py +++ b/lanscatter/chunker.py @@ -5,7 +5,7 @@ import mmap from concurrent.futures import ThreadPoolExecutor, Future, as_completed, CancelledError from pathlib import Path, PurePosixPath -from contextlib import suppress +import lz4.frame # Tools for scanning files in a directory and splitting them into hashed chunks. # MasterNode and PeerNode both use this for maintaining and syncing their state. 
@@ -29,6 +29,7 @@ class FileChunk(HashableBase):
     path: str   # path + filename
     pos: int    # chunk start position in bytes
     size: int   # chunk size in bytes
+    cmpratio: Optional[float]  # Compression ratio (compressed size / original size), or None if unknown
     hash: HashType  # Hex checksum of data contents (blake2, digest_size=12)
 
@@ -164,6 +165,11 @@ def chunk_diff(self, there: 'SyncBatch'):
             there_only=there_chunks - self.chunks,
             here_only=self.chunks - there_chunks)
 
+    def copy_chunk_compress_ratios_from(self, other):
+        """Copy (replace) chunk compression ratio values from given other SyncBatch."""
+        cmp_ratios = {c.hash: c.cmpratio for c in other.chunks if c.cmpratio is not None}
+        self.chunks = set((FileChunk(path=c.path, pos=c.pos, size=c.size, hash=c.hash, cmpratio=cmp_ratios.get(c.hash)) for c in self.chunks))
+
     def all_hashes(self) -> Set[HashType]:
         """Return set of unique hashes in the batch"""
         return set((c.hash for c in self.chunks))
@@ -187,7 +193,7 @@ def from_dict(data: Dict) -> 'SyncBatch':
         return res
 
 
-def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_func: Callable) -> \
+def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_func: Callable, test_compress: bool) -> \
         Generator[Future, None, None]:
     """
     Split given file into chunks and return multithreading futures that hash them.
@@ -204,28 +210,33 @@ def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_f
     path = Path(basedir) / relpath
     files_size = path.stat().st_size
     if files_size == 0:
-        yield executor.submit(lambda: FileChunk(path=relpath, pos=0, size=0, hash=HashFunc().result()))
+        yield executor.submit(lambda: FileChunk(path=relpath, pos=0, size=0, hash=HashFunc().result(), cmpratio=1.0))
     else:
         with open(path, 'r+b') as f:
             mm = mmap.mmap(f.fileno(), 0)
             for pos in range(0, files_size, chunk_size):
                 def do_hash(p, s):
                     progress_func(relpath, s, p, files_size)
-                    return FileChunk(path=relpath, pos=p, size=s, hash=HashFunc().update(mm[p:(p+s)]).result())
+                    h = HashFunc().update(mm[p:(p+s)]).result()
+                    compress_ratio = 1.0 if not test_compress else \
+                        min(1.0, float("%.2g" % (len(lz4.frame.compress(mm[p:(p+s)])) / s)))
+                    return FileChunk(path=relpath, pos=p, size=s, hash=h, cmpratio=compress_ratio)
                 yield executor.submit(do_hash, pos, min(chunk_size, files_size - pos))
 
 
-async def scan_dir(basedir: str, chunk_size: int, old_batch: Optional[SyncBatch], progress_func: Callable) ->\
+async def scan_dir(basedir: str, chunk_size: int, old_batch: Optional[SyncBatch], progress_func: Callable, test_compress: bool) ->\
         Tuple[SyncBatch, Iterable[str]]:
     """
     Scan given directory and generate a list of FileChunks of its contents. If old_batch is provided,
-    assumes contents haven't changed if mtime and size are identical.
+    assumes contents haven't changed if mtime and size are identical. Optionally tests how compressible chunks are.
     :param basedir: Folders to scan
     :param progress_func: Progress report callback - func(cur_filename, file_progress, total_progress)
     :param chunk_size: Length of chunk to split files into
     :param old_batch: If given, compares the dir against it and skips hashing files with identical size & mtime
-    :return: Tuple(New list of FileChunks or old_chunks if no changes are detected, List[errors])
+    :param test_compress: Test chunks for compressibility while hashing
+    :return: Tuple(New list of FileChunks (with per-chunk compression ratios) or old_batch
+                   if no changes are detected, List[errors])
     """
     errors = []
     fnames = []
@@ -276,7 +287,7 @@ def file_progress(path, just_read, pos, file_size):
         futures = []
         for fn in files_needing_rehash:
             try:
-                futures.extend(_hash_file(basedir, fn, chunk_size, pool, file_progress))
+                futures.extend(_hash_file(basedir, fn, chunk_size, pool, file_progress, test_compress=test_compress))
             except (OSError, IOError) as e:
                 errors.append(f'[{fn}]: ' + str(e))
         for f in as_completed(futures):
@@ -311,21 +322,3 @@ def file_progress(path, just_read, pos, file_size):
             assert '\\' not in x.path, f"Non-posix path: '{x.path}'"
 
     return res, errors
-
-
-# ----------------------------
-
-async def main():
-
-    def progress(cur_filename, file_progress, total_progress):
-        print(cur_filename, file_progress, total_progress)
-
-    src = await scan_dir('sync-source/', 100*1000*1000, old_batch=None, progress_func=progress)
-    trg = await scan_dir('sync-target/', 100*1000*1000, old_batch=None, progress_func=progress)
-    fdiff = trg.file_tree_diff(src)
-    print(fdiff)
-    cdiff = trg.chunk_diff(src)
-    print(cdiff)
-
-if __name__ == "__main__":
-    asyncio.run(main())
diff --git a/lanscatter/common.py b/lanscatter/common.py
index 6cfc9be..54f3926 100644
--- a/lanscatter/common.py
+++ b/lanscatter/common.py
@@ -21,8 +21,8 @@ class Defaults:
     CONCURRENT_TRANSFERS_PEER = 2
     DIR_SCAN_INTERVAL_PEER = 60
 
-    APP_VERSION = '0.1.1'
-    PROTOCOL_VERSION = '1.0.0'
+    APP_VERSION = '0.1.2'
+    PROTOCOL_VERSION = '2.0.0'
 
 
 def drop_process_priority():
diff --git a/lanscatter/fileio.py b/lanscatter/fileio.py
index 77cefe5..b139385 100644
--- a/lanscatter/fileio.py
+++ b/lanscatter/fileio.py
@@ -3,6 +3,7 @@
 from typing import Tuple, Optional
 from contextlib import suppress
 import aiofiles, os, time, asyncio, aiohttp, mmap
+import lz4.frame
 
 from .common import Defaults
 from .chunker import FileChunk, HashFunc
@@ -78,54 +79,66 @@ async def upload_chunk(self, chunk: FileChunk, request: web.Request)\
 
         :param chunk: Chunk to read
         :param request: HTTP request to answer
+        Note: streams LZ4-compressed if the chunk is compressible and the client accepts the 'lz4' encoding.
         :return: Tuple(Aiohttp.response, float(seconds the upload took) or None if no progress was made)
         """
+        use_lz4 = ((chunk.cmpratio or 1.0) < 0.95) and ('lz4' in str(request.headers.get('Accept-Encoding')))
         response = None
         remaining = chunk.size
         start_t = time.time()
         try:
             async with self.open_and_seek(chunk.path, chunk.pos, for_write=False) as f:
-                # Ok, read chunk from file and stream it out
-                response = web.StreamResponse(
-                    status=200,
-                    reason='OK',
-                    headers={'Content-Type': 'application/octet-stream', 'Content-Disposition': 'inline'})
-                await response.prepare(request)
-
-                buff_in, buff_out = bytearray(Defaults.FILE_BUFFER_SIZE), None
-
-                async def read_file():
-                    nonlocal buff_in, remaining, chunk
-                    if remaining > 0:
-                        if remaining < len(buff_in):
-                            buff_in = bytearray(remaining)
-                        cnt = await f.readinto(buff_in)
-                        if cnt != len(buff_in):
-                            raise web.HTTPNotFound(reason=f'Filesize mismatch / "{str(chunk.path)}" changed? Read {cnt} but expected {len(buff_in)}.')
-                        remaining -= cnt
-                    else:
-                        buff_in = None
-
-                async def write_http():
-                    nonlocal buff_out
-                    if not buff_out:
-                        buff_out = bytearray(Defaults.FILE_BUFFER_SIZE)
-                    else:
-                        i, cnt = 0, len(buff_out)
-                        while cnt > 0:
-                            limited_n = int(await self.ul_limiter.acquire(cnt, Defaults.NETWORK_BUFFER_MIN))
-                            await response.write(buff_out[i:(i + limited_n)])
-                            i += limited_n
-                            cnt -= limited_n
-
-                # Double buffered read & write
-                while remaining > 0:
-                    await asyncio.gather(read_file(), write_http())
-                    buff_in, buff_out = buff_out, buff_in  # Swap buffers
-                await write_http()  # Write once more to flush buff_out
-
-                await response.write_eof()
-                return response, (time.time() - start_t)
+                with lz4.frame.LZ4FrameCompressor() as lz:
+                    # Ok, read chunk from file and stream it out
+                    response = web.StreamResponse(
+                        status=200,
+                        reason='OK',
+                        headers={'Content-Type': 'application/octet-stream', 'Content-Disposition': 'inline',
+                                 'Content-Encoding': 'lz4' if use_lz4 else 'None'})
+                    await response.prepare(request)
+                    if use_lz4:
+                        await response.write(lz.begin())
+
+                    buff_in, buff_out = bytearray(Defaults.FILE_BUFFER_SIZE), None
+
+                    async def read_file():
+                        nonlocal buff_in, remaining, chunk
+                        if remaining > 0:
+                            if remaining < len(buff_in):
+                                buff_in = bytearray(remaining)
+                            cnt = await f.readinto(buff_in)
+                            if cnt != len(buff_in):
+                                raise web.HTTPNotFound(reason=f'Filesize mismatch / "{str(chunk.path)}" changed? Read {cnt} but expected {len(buff_in)}.')
+                            remaining -= cnt
+                        else:
+                            buff_in = None
+
+                    async def write_http():
+                        nonlocal buff_out
+                        if not buff_out:
+                            buff_out = bytearray(Defaults.FILE_BUFFER_SIZE)
+                        else:
+                            i, cnt = 0, len(buff_out)
+                            while cnt > 0:
+                                limited_n = int(await self.ul_limiter.acquire(cnt, Defaults.NETWORK_BUFFER_MIN))
+                                raw = memoryview(buff_out)[i:(i + limited_n)]
+                                out = lz.compress(raw) if use_lz4 else raw
+                                await response.write(out)
+                                self.ul_limiter.unspend(limited_n - len(out))
+                                i += limited_n
+                                cnt -= limited_n
+
+                    # Double buffered read & write
+                    while remaining > 0:
+                        await asyncio.gather(read_file(), write_http())
+                        buff_in, buff_out = buff_out, buff_in  # Swap buffers
+                    await write_http()  # Write once more to flush buff_out
+
+                    if use_lz4:
+                        await response.write(lz.flush())
+                    await response.write_eof()
+                    return response, (time.time() - start_t)
 
         except asyncio.CancelledError as e:
             # If client disconnected, predict how long upload would have taken
@@ -180,37 +193,40 @@ async def download_chunk(self, chunk: FileChunk, url: str, http_session: ClientS
 
         :param file_size: Size of complete file (optional). File will be truncated to this size.
""" with suppress(RuntimeError): # Avoid dirty exit in aiofiles when Ctrl^C (RuntimeError('Event loop is closed') - async with http_session.get(url) as resp: + async with http_session.get(url, headers={'Accept-Encoding': 'lz4'}) as resp: if resp.status != 200: # some error raise IOError(f'HTTP status {resp.status}') else: - async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf: - #csum = HashFunc() - buff_in, buff_out = None, b'' - - async def read_http(): - nonlocal buff_in - limited_n = int(await self.dl_limiter.acquire( - Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN)) - buff_in = await resp.content.read(limited_n) - self.dl_limiter.unspend(limited_n - len(buff_in)) - buff_in = buff_in if buff_in else None - - async def write_and_csum(): - nonlocal buff_out - #await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out)) - await outf.write(buff_out) - - while buff_out is not None: - await asyncio.gather(read_http(), write_and_csum()) # Read, write and hash concurrently - buff_in, buff_out = buff_out, buff_in # Swap buffers - - # Checksuming here is actually waste of CPU since we'll rehash sync dir anyway when finished - #if csum.result() != chunk.hash: - # raise IOError(f'Checksum error verifying {chunk.hash} from {url}') - - if file_size >= 0: - await outf.truncate(file_size) + use_lz4 = 'lz4' in str(resp.headers.get('Content-Encoding')) + with lz4.frame.LZ4FrameDecompressor() as lz: + async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf: + #csum = HashFunc() + buff_in, buff_out = None, b'' + + async def read_http(): + nonlocal buff_in + limited_n = int(await self.dl_limiter.acquire( + Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN)) + buff_in = await resp.content.read(limited_n) + self.dl_limiter.unspend(limited_n - len(buff_in)) + buff_in = buff_in if buff_in else None + + async def write_and_csum(): + nonlocal buff_out + #await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out)) + dec = lz.decompress(buff_out) if use_lz4 else buff_out + await outf.write(dec) + + while buff_out is not None: + await asyncio.gather(read_http(), write_and_csum()) # Read, write and hash concurrently + buff_in, buff_out = buff_out, buff_in # Swap buffers + + # Checksuming here is actually waste of CPU since we'll rehash sync dir anyway when finished + #if csum.result() != chunk.hash: + # raise IOError(f'Checksum error verifying {chunk.hash} from {url}') + + if file_size >= 0: + await outf.truncate(file_size) async def change_mtime(self, path, mtime): diff --git a/lanscatter/masternode.py b/lanscatter/masternode.py index ef104bd..17b1aac 100644 --- a/lanscatter/masternode.py +++ b/lanscatter/masternode.py @@ -1,6 +1,6 @@ from aiohttp import web, WSMsgType from pathlib import Path -from typing import Callable, Optional, Awaitable +from typing import Callable, Optional, Awaitable, Dict from json.decoder import JSONDecodeError import asyncio, traceback, html import concurrent.futures @@ -238,7 +238,7 @@ async def send_loop(): with suppress(asyncio.TimeoutError): msg = await asyncio.wait_for(send_queue.get(), timeout=1) if msg and not ws.closed: - await ws.send_json(msg) + await ws.send_json(msg, compress=9) if msg and msg.get('action') == 'fatal': self.status_func(log_info=f'Sent fatal error to client. 
                    await ws.close()
@@ -356,7 +356,7 @@ def progress_func_adapter(cur_filename, file_progress, total_progress):
         while True:  # TODO: integrate with inotify (watchdog package) to avoid frequent rescans
             new_batch, errors = await scan_dir(base_dir, chunk_size=chunk_size, old_batch=server.file_server.batch,
-                                               progress_func=progress_func_adapter)
+                                               progress_func=progress_func_adapter, test_compress=True)
             for i, e in enumerate(errors):
                 status_func(log_error=f'- Dir scan error #{i}: {e}')
             if new_batch != server.file_server.batch:
diff --git a/lanscatter/peernode.py b/lanscatter/peernode.py
index 2333644..dc4b66f 100644
--- a/lanscatter/peernode.py
+++ b/lanscatter/peernode.py
@@ -78,6 +78,7 @@ async def local_file_fixups(self, max_recursions=4):
 
         # Print status report
         path_diff = self.local_batch.file_tree_diff(self.remote_batch)
+
         self.status_func(log_info=f'LOCAL: Difference stats: '
                                   f'{len(chunk_diff.there_only)} missing chunks, '
                                   f'{len(path_diff.with_different_attribs) + len(path_diff.there_only)} altered / '
@@ -99,7 +100,7 @@ async def local_file_fixups(self, max_recursions=4):
                 self.status_func(log_info=f'LOCAL: Copying {missing.hash} from "{dupe.path}"/{dupe.pos}'
                                           f' to "{missing.path}"/{missing.pos}')
                 if await self.file_io.copy_chunk_locally(copy_from=dupe, copy_to=missing):
-                    self.local_batch.add(chunks=(dupe,))
+                    self.local_batch.add(chunks=(missing,))
                 else:
                     self.status_func(log_info=f'LOCAL: Hash {missing.hash} was not in "{dupe.path}"/{dupe.pos} '
                                               'anymore (was probably overwritten). Forgetting it.')
@@ -142,7 +143,7 @@ async def local_file_fixups(self, max_recursions=4):
                     self.local_batch.discard(paths=[f.path])
 
                 elif here.mtime == there.mtime:
-                    self.status_func(log_info=f'LOCAL: File "{here.path}" is has wrong content but was'
+                    self.status_func(log_info=f'LOCAL: File "{here.path}" has wrong content but was'
                                               f' set to target time. 
Resetting it to "now".') here.mtime = time.time() await self.file_io.change_mtime(here.path, here.mtime) @@ -259,6 +260,7 @@ async def error(txt): self.status_func(log_info=f'Chunks size is {int(new_batch.chunk_size/1024/1024+0.5)} MB ' f'({new_batch.chunk_size} bytes).') self.remote_batch = new_batch + # self.status_func(log_debug='Initial sync batch:' + str(self.remote_batch)) self.full_rescan_trigger.set() elif action == 'new_batch': @@ -295,7 +297,7 @@ async def server_connection_loop(self, server_url: str): while not self.exit_trigger.is_set(): try: async with aiohttp.ClientSession() as session: - async with session.ws_connect(server_url) as ws: + async with session.ws_connect(server_url, compress=15) as ws: self.status_func(log_info=f'Master server connected.') # Read send_queue and pass them to websocket @@ -377,11 +379,12 @@ def __hash_dir_progress_func(cur_filename, file_progress, total_progress): self.status_func(log_debug='Rescanning local files.') new_local_batch, errors = await scan_dir( str(self.file_io.basedir), chunk_size=self.remote_batch.chunk_size, - old_batch=self.local_batch, progress_func=__hash_dir_progress_func) + old_batch=self.local_batch, progress_func=__hash_dir_progress_func, test_compress=False) for i,e in enumerate(errors): self.status_func(log_error=f'- Dir scan error #{i}: {e}') self.status_func(log_debug='Rescan finished.') + new_local_batch.copy_chunk_compress_ratios_from(self.remote_batch) different_from_remote = self.remote_batch != new_local_batch if not self.joined_swarm or new_local_batch != self.local_batch or different_from_remote: diff --git a/requirements.cli.txt b/requirements.cli.txt index 53ae4a0..a14e74f 100644 --- a/requirements.cli.txt +++ b/requirements.cli.txt @@ -5,3 +5,4 @@ pytest-timeout packaging async-timeout psutil +lz4 diff --git a/requirements.txt b/requirements.txt index f4b8e03..f74bce8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ appdirs packaging async-timeout psutil +lz4 pytest pytest-timeout diff --git a/tests/test_integration.py b/tests/test_integration.py index 806b611..32e4aea 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -203,8 +203,8 @@ async def read_respose_msgs(ws): # Wait - for x in range(8): - print(f"Waiting {8-x} seconds for nodes before terminating...") + for x in range(10): + print(f"Waiting {10-x} seconds for nodes before terminating...") time.sleep(1) # Kill processes
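Reviewer note, not part of the patch: the compressibility probe that do_hash() now runs per chunk
boils down to the standalone sketch below. Only lz4.frame.compress(), the "%.2g" rounding and the
0.95 serve-compressed threshold used by upload_chunk() come from the patch; probe_ratio() and the
sample data are illustrative. Capping the ratio at 1.0 means incompressible chunks never advertise
compression.

    import lz4.frame

    COMPRESSIBLE_THRESHOLD = 0.95  # upload_chunk() only compresses chunks below this ratio

    def probe_ratio(data: bytes) -> float:
        """Compressed/original size, rounded to 2 significant figures, capped at 1.0."""
        if not data:
            return 1.0
        return min(1.0, float("%.2g" % (len(lz4.frame.compress(data)) / len(data))))

    if __name__ == "__main__":
        text = b"abc" * 100_000         # highly repetitive -> low ratio, gets LZ4
        noise = bytes(range(256)) * 10  # poorly compressible -> ratio near 1.0, sent raw
        for name, blob in (("text", text), ("noise", noise)):
            r = probe_ratio(blob)
            print(f"{name}: ratio={r}, lz4={'yes' if r < COMPRESSIBLE_THRESHOLD else 'no'}")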
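Reviewer note, not part of the patch: upload_chunk() and download_chunk() stream through LZ4's
frame API rather than one-shot compress/decompress, so neither side ever holds a whole chunk in
memory. The standalone roundtrip below (no aiohttp, no rate limiting; the buffer size and payload
are made up) shows the call sequence the patch relies on: begin() -> compress() per buffer ->
flush() on the sender, and incremental decompress() per network read on the receiver.

    import lz4.frame

    payload = b"0123456789" * 50_000
    BUF = 64 * 1024  # stand-in for Defaults.FILE_BUFFER_SIZE

    # Sender side, as in upload_chunk(): frame header, then compressed buffers, then end mark.
    with lz4.frame.LZ4FrameCompressor() as lz:
        wire = [lz.begin()]
        for pos in range(0, len(payload), BUF):
            wire.append(lz.compress(payload[pos:pos + BUF]))
        wire.append(lz.flush())

    # Receiver side, as in download_chunk(): decompress each read as it arrives.
    dec = lz4.frame.LZ4FrameDecompressor()
    out = b"".join(dec.decompress(part) for part in wire)

    assert out == payload
    print(f"raw={len(payload)} bytes, on the wire={sum(map(len, wire))} bytes")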