put_file: support concurrent multipart uploads with max_concurrency #848

Merged 2 commits on Feb 27, 2024.

Changes from all commits
95 changes: 76 additions & 19 deletions s3fs/core.py
@@ -235,6 +235,14 @@ class S3FileSystem(AsyncFileSystem):
    session : aiobotocore AioSession object to be used for all connections.
        This session will be used inplace of creating a new session inside S3FileSystem.
        For example: aiobotocore.session.AioSession(profile='test_user')
    max_concurrency : int (1)
        The maximum number of concurrent transfers to use per file for multipart
        upload (``put()``) operations. Defaults to 1 (sequential). When used in
        conjunction with ``S3FileSystem.put(batch_size=...)`` the maximum number of
        simultaneous connections is ``max_concurrency * batch_size``. We may extend
        this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing this
        value will result in higher memory usage during multipart upload operations (by
        ``max_concurrency * chunksize`` bytes per file).

    The following parameters are passed on to fsspec:

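As a usage sketch (not part of this diff; bucket and file names are placeholders), the new parameter is set on the filesystem instance, and per-file memory use during multipart uploads grows roughly as max_concurrency * chunksize:

import s3fs

# Upload one large file with up to 4 part uploads in flight at a time.
# With the default 50 MiB chunksize this buffers up to ~200 MiB for the file.
fs = s3fs.S3FileSystem(max_concurrency=4)
fs.put("big-file.bin", "my-bucket/big-file.bin")

# With a batched put over several files, the number of simultaneous
# connections is bounded by max_concurrency * batch_size (here 4 * 2 = 8).
fs.put(["a.bin", "b.bin", "c.bin"], "my-bucket/", batch_size=2)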
@@ -282,6 +290,7 @@ def __init__(
        cache_regions=False,
        asynchronous=False,
        loop=None,
        max_concurrency=1,
        **kwargs,
    ):
        if key and username:
@@ -319,6 +328,9 @@ def __init__(
        self.cache_regions = cache_regions
        self._s3 = None
        self.session = session
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")
        self.max_concurrency = max_concurrency

    @property
    def s3(self):
@@ -1140,7 +1152,13 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
        self.invalidate_cache(path)

    async def _put_file(
        self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, **kwargs
        self,
        lpath,
        rpath,
        callback=_DEFAULT_CALLBACK,
        chunksize=50 * 2**20,
        max_concurrency=None,
        **kwargs,
    ):
        bucket, key, _ = self.split_path(rpath)
        if os.path.isdir(lpath):
@@ -1169,24 +1187,15 @@ async def _put_file(
                mpu = await self._call_s3(
                    "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
                )

                out = []
                while True:
                    chunk = f0.read(chunksize)
                    if not chunk:
                        break
                    out.append(
                        await self._call_s3(
                            "upload_part",
                            Bucket=bucket,
                            PartNumber=len(out) + 1,
                            UploadId=mpu["UploadId"],
                            Body=chunk,
                            Key=key,
                        )
                    )
                    callback.relative_update(len(chunk))

                out = await self._upload_file_part_concurrent(
                    bucket,
                    key,
                    mpu,
                    f0,
                    callback=callback,
                    chunksize=chunksize,
                    max_concurrency=max_concurrency,
                )
                parts = [
                    {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
                ]
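For readers unfamiliar with the S3 multipart API, a minimal synchronous boto3 sketch (not part of this PR; bucket, key, file name and chunk size are placeholders) shows the protocol behind the PartNumber/ETag bookkeeping above. Each uploaded part returns an ETag, and the collected pairs are handed back when the upload is completed:

import boto3

s3 = boto3.client("s3")
bucket, key, chunksize = "my-bucket", "big-file.bin", 50 * 2**20

# Start the multipart upload and remember its UploadId.
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts = []
with open("big-file.bin", "rb") as f:
    part_number = 1
    while True:
        chunk = f.read(chunksize)
        if not chunk:
            break
        # Each part upload returns an ETag that must be echoed back on completion.
        resp = s3.upload_part(
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=mpu["UploadId"],
            Body=chunk,
        )
        parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
        part_number += 1

# Completion stitches the uploaded parts together into the final object.
s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": parts},
)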
@@ -1201,6 +1210,54 @@ async def _put_file(
            self.invalidate_cache(rpath)
            rpath = self._parent(rpath)

    async def _upload_file_part_concurrent(
        self,
        bucket,
        key,
        mpu,
        f0,
        callback=_DEFAULT_CALLBACK,
        chunksize=50 * 2**20,
        max_concurrency=None,
    ):
        max_concurrency = max_concurrency or self.max_concurrency
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")

        async def _upload_chunk(chunk, part_number):
            result = await self._call_s3(
                "upload_part",
                Bucket=bucket,
                PartNumber=part_number,
                UploadId=mpu["UploadId"],
                Body=chunk,
                Key=key,
            )
            callback.relative_update(len(chunk))
            return result

        out = []
        while True:
            chunks = []
            for i in range(max_concurrency):
                chunk = f0.read(chunksize)
Review comment: Somewhere we need a caveat that increasing concurrency will lead to high memory use.

                if chunk:
                    chunks.append(chunk)
            if not chunks:
                break
            if len(chunks) > 1:
                out.extend(
                    await asyncio.gather(
                        *[
                            _upload_chunk(chunk, len(out) + i)
                            for i, chunk in enumerate(chunks, 1)
                        ]
                    )
                )
            else:
                out.append(await _upload_chunk(chunk, len(out) + 1))
        return out
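Since put() passes extra keyword arguments through to _put_file, the concurrency can also be raised for a single transfer without changing the instance default. A sketch (file and bucket names are placeholders):

import s3fs

# Instance default stays sequential; this one upload uses 8 concurrent
# part uploads, buffering up to 8 * chunksize bytes while it runs.
fs = s3fs.S3FileSystem()
fs.put("huge-archive.tar", "my-bucket/huge-archive.tar", max_concurrency=8)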

    async def _get_file(
        self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
    ):