Use LZ4 for compressible chunks (only). Bump protocol version major.
elonen committed Jan 6, 2020
1 parent dd9494b commit 6d1f514
Showing 9 changed files with 124 additions and 110 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -74,14 +74,14 @@ Features and notable differences to Btsync/Resilio, Syncthing and Dropbox-like s
* Keeps traffic inside the LAN (doesn't connect to any third party servers).
* Resilient against slow individual nodes. Transfers from slow peers are detected, aborted and avoided afterwards.
* Does _not_ preserve Unix file attributes (for now), as Windows doesn't support them.
* Master never modifies sync directory - it treat is as _read only_.
* Master never modifies sync directory - it treats it as _read only_.
* Supports bandwidth limiting.

## Technologies

Lanscatter is built on _Python 3.7_ using asyncio (aiohttp & aiofiles),
_wxPython_ for cross-platform GUI, _Blake2b_ algorithm for chunk hashing, _pytest_ for unit / integration tests
and _pyinstaller_ for packaging / freezing into exe files.
Lanscatter is built on **Python 3.7** using **asyncio** (aiohttp & aiofiles),
**wxPython** for cross-platform GUI, **Blake2b** algorithm for chunk hashing, **LZ4** for in-flight compression,
**pytest** for unit / integration tests and **pyinstaller** for packaging / freezing into exe files.

## Site-to-site distribution

45 changes: 19 additions & 26 deletions lanscatter/chunker.py
Expand Up @@ -5,7 +5,7 @@
import mmap
from concurrent.futures import ThreadPoolExecutor, Future, as_completed, CancelledError
from pathlib import Path, PurePosixPath
from contextlib import suppress
import lz4.frame

# Tools for scanning files in a directory and splitting them into hashed chunks.
# MasterNode and PeerNode both use this for maintaining and syncing their state.
@@ -29,6 +29,7 @@ class FileChunk(HashableBase):
path: str # path + filename
pos: int # chunk start position in bytes
size: int # chunk size in bytes
cmpratio: float # Compression ratio (compressed size / original size)
hash: HashType # Hex checksum of data contents (blake2, digest_size=12)


@@ -164,6 +165,11 @@ def chunk_diff(self, there: 'SyncBatch'):
there_only=there_chunks - self.chunks,
here_only=self.chunks - there_chunks)

def copy_chunk_compress_ratios_from(self, other):
"""Copy (replace) chunk compression ratio values from given other FileBatch."""
cmp_ratios = {c.hash: c.cmpratio for c in other.chunks if c.cmpratio is not None}
self.chunks = set((FileChunk(path=c.path, pos=c.pos, size=c.size, hash=c.hash, cmpratio=cmp_ratios.get(c.hash)) for c in self.chunks))

def all_hashes(self) -> Set[HashType]:
"""Return set of unique hashes in the batch"""
return set((c.hash for c in self.chunks))
@@ -187,7 +193,7 @@ def from_dict(data: Dict) -> 'SyncBatch':
return res


def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_func: Callable) -> \
def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_func: Callable, test_compress: bool) -> \
Generator[Future, None, None]:
"""
Split given file into chunks and return multithreading futures that hash them.
@@ -204,28 +210,33 @@ def _hash_file(basedir: str, relpath: str, chunk_size: int, executor, progress_f
path = Path(basedir) / relpath
files_size = path.stat().st_size
if files_size == 0:
yield executor.submit(lambda: FileChunk(path=relpath, pos=0, size=0, hash=HashFunc().result()))
yield executor.submit(lambda: FileChunk(path=relpath, pos=0, size=0, hash=HashFunc().result(), cmpratio=1.0))
else:
with open(path, 'r+b') as f:
mm = mmap.mmap(f.fileno(), 0)
for pos in range(0, files_size, chunk_size):
def do_hash(p, s):
progress_func(relpath, s, p, files_size)
return FileChunk(path=relpath, pos=p, size=s, hash=HashFunc().update(mm[p:(p+s)]).result())
h = HashFunc().update(mm[p:(p+s)]).result()
compress_ratio = 1.0 if not test_compress else \
min(1.0, float("%.2g" % (len(lz4.frame.compress(mm[p:(p+s)])) / s)))
return FileChunk(path=relpath, pos=p, size=s, hash=h, cmpratio=compress_ratio)
yield executor.submit(do_hash, pos, min(chunk_size, files_size - pos))


async def scan_dir(basedir: str, chunk_size: int, old_batch: Optional[SyncBatch], progress_func: Callable) ->\
async def scan_dir(basedir: str, chunk_size: int, old_batch: Optional[SyncBatch], progress_func: Callable, test_compress: bool) ->\
Tuple[SyncBatch, Iterable[str]]:
"""
Scan given directory and generate a list of FileChunks of its contents. If old_chunks is provided,
assumes contents haven't changed if mtime and size are identical.
assumes contents haven't changed if mtime and size are identical. Optionally tests how compressible chunks are.
:param basedir: Folders to scan
:param progress_func: Progress report callback - func(cur_filename, file_progress, total_progress)
:param chunk_size: Length of chunk to split files into
:param old_batch: If given, compares dir it and skips hashing files with identical size & mtime
:return: Tuple(New list of FileChunks or old_chunks if no changes are detected, List[errors])
:param test_compress: Test for compressibility while hashing
:return: Tuple(New list of FileChunks or old_chunks if no changes are detected, List[errors],
Dict[<hash>: compress_ratio, ...])
"""
errors = []
fnames = []
@@ -276,7 +287,7 @@ def file_progress(path, just_read, pos, file_size):
futures = []
for fn in files_needing_rehash:
try:
futures.extend(_hash_file(basedir, fn, chunk_size, pool, file_progress))
futures.extend(_hash_file(basedir, fn, chunk_size, pool, file_progress, test_compress=test_compress))
except (OSError, IOError) as e:
errors.append(f'[{fn}]: ' + str(e))
for f in as_completed(futures):
@@ -311,21 +322,3 @@ def file_progress(path, just_read, pos, file_size):
assert '\\' not in x.path, f"Non-posix path: '{x.path}'"

return res, errors


# ----------------------------

async def main():

def progress(cur_filename, file_progress, total_progress):
print(cur_filename, file_progress, total_progress)

src = await scan_dir('sync-source/', 100*1000*1000, old_batch=None, progress_func=progress)
trg = await scan_dir('sync-target/', 100*1000*1000, old_batch=None, progress_func=progress)
fdiff = trg.file_tree_diff(src)
print(fdiff)
cdiff = trg.chunk_diff(src)
print(cdiff)

if __name__ == "__main__":
asyncio.run(main())
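
For reference, the value carried by the new `cmpratio` field is just a one-shot LZ4 test of each chunk: compressed size divided by original size, rounded to two significant digits and capped at 1.0. Below is a minimal standalone sketch of that arithmetic (illustrative only; it reuses the same `lz4.frame.compress` call as `do_hash` above, and the example buffers are made up):

```python
import os
import lz4.frame

def compress_ratio(chunk_bytes: bytes) -> float:
    """compressed size / original size, capped at 1.0 (same formula as do_hash above)."""
    if not chunk_bytes:
        return 1.0
    ratio = len(lz4.frame.compress(chunk_bytes)) / len(chunk_bytes)
    return min(1.0, float("%.2g" % ratio))

print(compress_ratio(b"A" * 65536))       # highly repetitive data -> ratio near 0, worth compressing
print(compress_ratio(os.urandom(65536)))  # random data -> ratio capped at 1.0, sent uncompressed
```

The upload side (fileio.py below) only enables LZ4 when this ratio is below 0.95, so incompressible chunks skip the extra CPU cost entirely.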
4 changes: 2 additions & 2 deletions lanscatter/common.py
@@ -21,8 +21,8 @@ class Defaults:
CONCURRENT_TRANSFERS_PEER = 2
DIR_SCAN_INTERVAL_PEER = 60

APP_VERSION = '0.1.1'
PROTOCOL_VERSION = '1.0.0'
APP_VERSION = '0.1.2'
PROTOCOL_VERSION = '2.0.0'


def drop_process_priority():
154 changes: 85 additions & 69 deletions lanscatter/fileio.py
@@ -3,6 +3,7 @@
from typing import Tuple, Optional
from contextlib import suppress
import aiofiles, os, time, asyncio, aiohttp, mmap
import lz4.frame

from .common import Defaults
from .chunker import FileChunk, HashFunc
@@ -78,54 +79,66 @@ async def upload_chunk(self, chunk: FileChunk, request: web.Request)\
:param chunk: Chunk to read
:param request: HTTP request to answer
:param use_lz4: Compress with LZ4 if client accepts it
:return: Tuple(Aiohttp.response, float(seconds the upload took) or None if it no progress was made)
"""
use_lz4 = (chunk.cmpratio < 0.95) and ('lz4' in str(request.headers.get('Accept-Encoding')))
response = None
remaining = chunk.size
start_t = time.time()
try:
async with self.open_and_seek(chunk.path, chunk.pos, for_write=False) as f:
# Ok, read chunk from file and stream it out
response = web.StreamResponse(
status=200,
reason='OK',
headers={'Content-Type': 'application/octet-stream', 'Content-Disposition': 'inline'})
await response.prepare(request)

buff_in, buff_out = bytearray(Defaults.FILE_BUFFER_SIZE), None

async def read_file():
nonlocal buff_in, remaining, chunk
if remaining > 0:
if remaining < len(buff_in):
buff_in = bytearray(remaining)
cnt = await f.readinto(buff_in)
if cnt != len(buff_in):
raise web.HTTPNotFound(reason=f'Filesize mismatch / "{str(chunk.path)}" changed? Read {cnt} but expected {len(buff_in)}.')
remaining -= cnt
else:
buff_in = None

async def write_http():
nonlocal buff_out
if not buff_out:
buff_out = bytearray(Defaults.FILE_BUFFER_SIZE)
else:
i, cnt = 0, len(buff_out)
while cnt > 0:
limited_n = int(await self.ul_limiter.acquire(cnt, Defaults.NETWORK_BUFFER_MIN))
await response.write(buff_out[i:(i + limited_n)])
i += limited_n
cnt -= limited_n

# Double buffered read & write
while remaining > 0:
await asyncio.gather(read_file(), write_http())
buff_in, buff_out = buff_out, buff_in # Swap buffers
await write_http() # Write once more to flush buff_out

await response.write_eof()
return response, (time.time() - start_t)
with lz4.frame.LZ4FrameCompressor() as lz:
# Ok, read chunk from file and stream it out
response = web.StreamResponse(
status=200,
reason='OK',
headers={'Content-Type': 'application/octet-stream', 'Content-Disposition': 'inline',
'Content-Encoding': 'lz4' if use_lz4 else 'None'})
await response.prepare(request)
if use_lz4:
await response.write(lz.begin())

buff_in, buff_out = bytearray(Defaults.FILE_BUFFER_SIZE), None

async def read_file():
nonlocal buff_in, remaining, chunk
if remaining > 0:
if remaining < len(buff_in):
buff_in = bytearray(remaining)
cnt = await f.readinto(buff_in)
if cnt != len(buff_in):
raise web.HTTPNotFound(reason=f'Filesize mismatch / "{str(chunk.path)}" changed? Read {cnt} but expected {len(buff_in)}.')
remaining -= cnt
else:
buff_in = None

async def write_http():
nonlocal buff_out
if not buff_out:
buff_out = bytearray(Defaults.FILE_BUFFER_SIZE)
else:
i, cnt = 0, len(buff_out)
while cnt > 0:
limited_n = int(await self.ul_limiter.acquire(cnt, Defaults.NETWORK_BUFFER_MIN))
raw = memoryview(buff_out)[i:(i + limited_n)]
out = lz.compress(raw) if use_lz4 else raw
await response.write(out)
# await response.write(buff_out[i:(i + limited_n)])
self.dl_limiter.unspend(limited_n - len(out))
i += limited_n
cnt -= limited_n

# Double buffered read & write
while remaining > 0:
await asyncio.gather(read_file(), write_http())
buff_in, buff_out = buff_out, buff_in # Swap buffers
await write_http() # Write once more to flush buff_out

if use_lz4:
await response.write(lz.flush())
await response.write_eof()
return response, (time.time() - start_t)

except asyncio.CancelledError as e:
# If client disconnected, predict how long upload would have taken
@@ -180,37 +193,40 @@ async def download_chunk(self, chunk: FileChunk, url: str, http_session: ClientS
:param file_size: Size of complete file (optional). File will be truncated to this size.
"""
with suppress(RuntimeError): # Avoid dirty exit in aiofiles when Ctrl^C (RuntimeError('Event loop is closed')
async with http_session.get(url) as resp:
async with http_session.get(url, headers={'Accept-Encoding': 'lz4'}) as resp:
if resp.status != 200: # some error
raise IOError(f'HTTP status {resp.status}')
else:
async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf:
#csum = HashFunc()
buff_in, buff_out = None, b''

async def read_http():
nonlocal buff_in
limited_n = int(await self.dl_limiter.acquire(
Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN))
buff_in = await resp.content.read(limited_n)
self.dl_limiter.unspend(limited_n - len(buff_in))
buff_in = buff_in if buff_in else None

async def write_and_csum():
nonlocal buff_out
#await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out))
await outf.write(buff_out)

while buff_out is not None:
await asyncio.gather(read_http(), write_and_csum()) # Read, write and hash concurrently
buff_in, buff_out = buff_out, buff_in # Swap buffers

# Checksuming here is actually waste of CPU since we'll rehash sync dir anyway when finished
#if csum.result() != chunk.hash:
# raise IOError(f'Checksum error verifying {chunk.hash} from {url}')

if file_size >= 0:
await outf.truncate(file_size)
use_lz4 = 'lz4' in str(resp.headers.get('Content-Encoding'))
with lz4.frame.LZ4FrameDecompressor() as lz:
async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf:
#csum = HashFunc()
buff_in, buff_out = None, b''

async def read_http():
nonlocal buff_in
limited_n = int(await self.dl_limiter.acquire(
Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN))
buff_in = await resp.content.read(limited_n)
self.dl_limiter.unspend(limited_n - len(buff_in))
buff_in = buff_in if buff_in else None

async def write_and_csum():
nonlocal buff_out
#await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out))
dec = lz.decompress(buff_out) if use_lz4 else buff_out
await outf.write(dec)

while buff_out is not None:
await asyncio.gather(read_http(), write_and_csum()) # Read, write and hash concurrently
buff_in, buff_out = buff_out, buff_in # Swap buffers

# Checksuming here is actually waste of CPU since we'll rehash sync dir anyway when finished
#if csum.result() != chunk.hash:
# raise IOError(f'Checksum error verifying {chunk.hash} from {url}')

if file_size >= 0:
await outf.truncate(file_size)


async def change_mtime(self, path, mtime):
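
The rewritten `upload_chunk`/`download_chunk` pair streams one LZ4 *frame* per chunk: the sender emits the frame header from `begin()`, one `compress()` output per rate-limited buffer, and finally the end mark from `flush()`; the receiver feeds whatever sized pieces arrive off the socket into `LZ4FrameDecompressor.decompress()`. A self-contained round-trip sketch of that pattern (no aiohttp or rate limiting; buffer sizes chosen arbitrarily, not the project's defaults):

```python
import lz4.frame

payload = b"lanscatter " * 100_000        # stand-in for one file chunk
parts = []

# Sender: frame header, compressed pieces, frame end mark.
with lz4.frame.LZ4FrameCompressor() as comp:
    parts.append(comp.begin())
    for i in range(0, len(payload), 64 * 1024):       # pretend file-buffer-sized reads
        parts.append(comp.compress(payload[i:i + 64 * 1024]))
    parts.append(comp.flush())
wire = b"".join(parts)

# Receiver: decompress in whatever sized pieces "arrive" from the network.
restored = bytearray()
with lz4.frame.LZ4FrameDecompressor() as decomp:
    for i in range(0, len(wire), 16 * 1024):          # pretend socket reads
        restored += decomp.decompress(wire[i:i + 16 * 1024])

assert bytes(restored) == payload
print(f"{len(payload)} bytes -> {len(wire)} bytes on the wire")
```

Whether the frame is used at all is negotiated per request: the downloader advertises `Accept-Encoding: lz4`, and the uploader answers with `Content-Encoding: lz4` only for chunks whose measured `cmpratio` is below 0.95.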
6 changes: 3 additions & 3 deletions lanscatter/masternode.py
@@ -1,6 +1,6 @@
from aiohttp import web, WSMsgType
from pathlib import Path
from typing import Callable, Optional, Awaitable
from typing import Callable, Optional, Awaitable, Dict
from json.decoder import JSONDecodeError
import asyncio, traceback, html
import concurrent.futures
@@ -238,7 +238,7 @@ async def send_loop():
with suppress(asyncio.TimeoutError):
msg = await asyncio.wait_for(send_queue.get(), timeout=1)
if msg and not ws.closed:
await ws.send_json(msg)
await ws.send_json(msg, compress=9)
if msg and msg.get('action') == 'fatal':
self.status_func(log_info=f'Sent fatal error to client. Kicking them out: {str(msg)}"')
await ws.close()
@@ -356,7 +356,7 @@ def progress_func_adapter(cur_filename, file_progress, total_progress):
while True:
# TODO: integrate with inotify (watchdog package) to avoid frequent rescans
new_batch, errors = await scan_dir(base_dir, chunk_size=chunk_size, old_batch=server.file_server.batch,
progress_func=progress_func_adapter)
progress_func=progress_func_adapter, test_compress=True)
for i, e in enumerate(errors):
status_func(log_error=f'- Dir scan error #{i}: {e}')
if new_batch != server.file_server.batch:
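
Only the master pays for the compressibility test (`test_compress=True` above); the peer-side code is not in this excerpt, but given the new `SyncBatch.copy_chunk_compress_ratios_from()` helper in chunker.py, peers presumably scan without the LZ4 pass and then adopt the master's measured ratios. A hypothetical usage sketch (directory names and the peer-side calls are illustrative, not taken from the commit):

```python
import asyncio
from lanscatter.chunker import scan_dir

def progress(cur_filename, file_progress, total_progress):
    print(cur_filename, file_progress, total_progress)

async def demo():
    # Master: hash chunks and measure LZ4 compressibility while at it.
    master_batch, errors = await scan_dir(
        'sync-source/', chunk_size=100 * 1000 * 1000, old_batch=None,
        progress_func=progress, test_compress=True)

    # Peer: skip the LZ4 pass, then copy the ratios received from the master.
    peer_batch, _ = await scan_dir(
        'sync-target/', chunk_size=100 * 1000 * 1000, old_batch=None,
        progress_func=progress, test_compress=False)
    peer_batch.copy_chunk_compress_ratios_from(master_batch)

asyncio.run(demo())
```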
