Skip to content

Commit

Permalink
Add exclusive write (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Dec 11, 2024
1 parent 290f572 commit 5c2b70c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 19 deletions.
70 changes: 51 additions & 19 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ def _location():
-------
valid http location
"""
_emulator_location = os.getenv("STORAGE_EMULATOR_HOST", None)
if _emulator_location:
_emulator_location = os.getenv("STORAGE_EMULATOR_HOST", "")
if _emulator_location not in {"default", "", None}:
if not any(
_emulator_location.startswith(scheme) for scheme in ("http://", "https://")
):
Expand Down Expand Up @@ -222,6 +222,10 @@ class GCSFileSystem(asyn.AsyncFileSystem):
In the default case the cache is never expired. This may be controlled via the ``cache_timeout``
GCSFileSystem parameter or via explicit calls to ``GCSFileSystem.invalidate_cache``.
NOTE on "exclusive" mode: mode="create" (in pipe and put) and open(mode="xb") are supported on an
experimental basis. The test harness does not currently support this, so use at your
own risk.
Parameters
----------
project : string
Expand Down Expand Up @@ -1332,13 +1336,14 @@ async def _pipe_file(
content_type="application/octet-stream",
fixed_key_metadata=None,
chunksize=50 * 2**20,
mode="overwrite",
):
# enforce blocksize should be a multiple of 2**18
consistency = consistency or self.consistency
bucket, key, generation = self.split_path(path)
size = len(data)
out = None
if size < 5 * 2**20:
if size < chunksize:
location = await simple_upload(
self,
bucket,
Expand All @@ -1348,6 +1353,7 @@ async def _pipe_file(
consistency,
content_type,
fixed_key_metadata=fixed_key_metadata,
mode=mode,
)
else:
location = await initiate_upload(
Expand All @@ -1357,12 +1363,20 @@ async def _pipe_file(
content_type,
metadata,
fixed_key_metadata=fixed_key_metadata,
mode=mode,
)
for offset in range(0, len(data), chunksize):
bit = data[offset : offset + chunksize]
out = await upload_chunk(
self, location, bit, offset, size, content_type
try:
for offset in range(0, len(data), chunksize):
bit = data[offset : offset + chunksize]
out = await upload_chunk(
self, location, bit, offset, size, content_type
)
except Exception:
await self._call(
"DELETE",
location.replace("&ifGenerationMatch=0", ""),
)
raise

checker = get_consistency_checker(consistency)
checker.update(data)
Expand All @@ -1381,6 +1395,7 @@ async def _put_file(
chunksize=50 * 2**20,
callback=None,
fixed_key_metadata=None,
mode="overwrite",
**kwargs,
):
# enforce blocksize should be a multiple of 2**18
Expand All @@ -1407,6 +1422,7 @@ async def _put_file(
metadatain=metadata,
content_type=content_type,
fixed_key_metadata=fixed_key_metadata,
mode=mode,
)
callback.absolute_update(size)

Expand All @@ -1418,18 +1434,26 @@ async def _put_file(
content_type,
metadata=metadata,
fixed_key_metadata=fixed_key_metadata,
mode=mode,
)
offset = 0
while True:
bit = f0.read(chunksize)
if not bit:
break
out = await upload_chunk(
self, location, bit, offset, size, content_type
try:
while True:
bit = f0.read(chunksize)
if not bit:
break
out = await upload_chunk(
self, location, bit, offset, size, content_type
)
offset += len(bit)
callback.absolute_update(offset)
checker.update(bit)
except Exception:
await self._call(
"DELETE",
location.replace("&ifGenerationMatch=0", ""),
)
offset += len(bit)
callback.absolute_update(offset)
checker.update(bit)
raise

checker.validate_json_response(out)

Expand Down Expand Up @@ -1780,7 +1804,7 @@ def __init__(
self.fixed_key_metadata = _convert_fixed_key_metadata(det, from_google=True)
self.fixed_key_metadata.update(fixed_key_metadata or {})
self.timeout = timeout
if mode == "wb":
if mode in {"wb", "xb"}:
if self.blocksize < GCS_MIN_BLOCK_SIZE:
warnings.warn("Setting block size to minimum value, 2**18")
self.blocksize = GCS_MIN_BLOCK_SIZE
Expand Down Expand Up @@ -1886,6 +1910,7 @@ def _initiate_upload(self):
self.content_type,
self.metadata,
self.fixed_key_metadata,
mode="create" if "x" in self.mode else "overwrite",
timeout=self.timeout,
)

Expand All @@ -1898,7 +1923,7 @@ def discard(self):
return
self.gcsfs.call(
"DELETE",
self.location,
self.location.replace("&ifGenerationMatch=0", ""),
)
self.location = None
self.closed = True
Expand All @@ -1918,6 +1943,7 @@ def _simple_upload(self):
self.consistency,
self.content_type,
self.fixed_key_metadata,
mode="create" if "x" in self.mode else "overwrite",
timeout=self.timeout,
)

Expand Down Expand Up @@ -1989,17 +2015,20 @@ async def initiate_upload(
content_type="application/octet-stream",
metadata=None,
fixed_key_metadata=None,
mode="overwrite",
):
j = {"name": key}
if metadata:
j["metadata"] = metadata
kw = {"ifGenerationMatch": "0"} if mode == "create" else {}
j.update(_convert_fixed_key_metadata(fixed_key_metadata))
headers, _ = await fs._call(
method="POST",
path=f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o",
path=f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o?name={quote(key)}",
uploadType="resumable",
json=j,
headers={"X-Upload-Content-Type": content_type},
**kw,
)
loc = headers["Location"]
out = loc[0] if isinstance(loc, list) else loc # <- for CVR responses
Expand All @@ -2017,12 +2046,14 @@ async def simple_upload(
consistency=None,
content_type="application/octet-stream",
fixed_key_metadata=None,
mode="overwrite",
):
checker = get_consistency_checker(consistency)
path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o"
metadata = {"name": key}
if metadatain is not None:
metadata["metadata"] = metadatain
kw = {"ifGenerationMatch": "0"} if mode == "create" else {}
metadata.update(_convert_fixed_key_metadata(fixed_key_metadata))
metadata = json.dumps(metadata)
template = (
Expand All @@ -2039,6 +2070,7 @@ async def simple_upload(
headers={"Content-Type": 'multipart/related; boundary="==0=="'},
data=UnclosableBytesIO(data),
json_out=True,
**kw,
)
checker.update(datain)
checker.validate_json_response(j)
2 changes: 2 additions & 0 deletions gcsfs/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def validate_response(status, content, path, args=None):

if status == 403:
raise OSError(f"Forbidden: {path}\n{msg}")
elif status == 412:
raise FileExistsError(path)
elif status == 502:
raise requests.exceptions.ProxyError()
elif "invalid" in str(msg):
Expand Down
13 changes: 13 additions & 0 deletions gcsfs/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,3 +1532,16 @@ def test_sign(gcs, monkeypatch):

response = requests.get(result)
assert response.text == "This is a test string"


@pytest.mark.xfail(reason="emulator does not support condition")
def test_write_x_mpu(gcs):
    """Exclusive-create multi-part upload: opening an existing key with
    mode="xb" must raise FileExistsError on the second attempt.

    The block size is set above the simple-upload threshold so the
    resumable (multi-part) code path is exercised.
    """
    path = TEST_BUCKET + "/test.file"
    block = 5 * 2**20
    # First writer: key does not exist yet, so the exclusive open succeeds.
    with gcs.open(path, mode="xb", block_size=block) as stream:
        assert stream.mode == "xb"
        stream.write(b"0" * block)
        stream.write(b"done")
    # Second writer: same key now exists; the conditional upload must fail.
    with pytest.raises(FileExistsError):
        with gcs.open(path, mode="xb", block_size=block) as stream:
            stream.write(b"0" * block)
            stream.write(b"done")

0 comments on commit 5c2b70c

Please sign in to comment.