From 94bc52a459cc1259f26a4cadf7ef67b26bec9f8d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 6 Dec 2024 15:46:26 -0500 Subject: [PATCH] Add X mode --- gcsfs/core.py | 62 +++++++++++++++++++++++++++++----------- gcsfs/retry.py | 2 ++ gcsfs/tests/test_core.py | 13 +++++++++ 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 7decb734..f1e1d5da 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1332,13 +1332,14 @@ async def _pipe_file( content_type="application/octet-stream", fixed_key_metadata=None, chunksize=50 * 2**20, + mode="overwrite", ): # enforce blocksize should be a multiple of 2**18 consistency = consistency or self.consistency bucket, key, generation = self.split_path(path) size = len(data) out = None - if size < 5 * 2**20: + if size < chunksize: location = await simple_upload( self, bucket, @@ -1348,6 +1349,7 @@ async def _pipe_file( consistency, content_type, fixed_key_metadata=fixed_key_metadata, + mode=mode, ) else: location = await initiate_upload( @@ -1357,12 +1359,20 @@ async def _pipe_file( content_type, metadata, fixed_key_metadata=fixed_key_metadata, + mode=mode, ) - for offset in range(0, len(data), chunksize): - bit = data[offset : offset + chunksize] - out = await upload_chunk( - self, location, bit, offset, size, content_type + try: + for offset in range(0, len(data), chunksize): + bit = data[offset : offset + chunksize] + out = await upload_chunk( + self, location, bit, offset, size, content_type + ) + except Exception: + await self._call( + "DELETE", + location.replace("&ifGenerationMatch=0", ""), ) + raise checker = get_consistency_checker(consistency) checker.update(data) @@ -1381,6 +1391,7 @@ async def _put_file( chunksize=50 * 2**20, callback=None, fixed_key_metadata=None, + mode="overwrite", **kwargs, ): # enforce blocksize should be a multiple of 2**18 @@ -1407,6 +1418,7 @@ async def _put_file( metadatain=metadata, content_type=content_type, fixed_key_metadata=fixed_key_metadata, + mode=mode, ) callback.absolute_update(size) @@ -1418,18 +1430,26 @@ async def _put_file( content_type, metadata=metadata, fixed_key_metadata=fixed_key_metadata, + mode=mode, ) offset = 0 - while True: - bit = f0.read(chunksize) - if not bit: - break - out = await upload_chunk( - self, location, bit, offset, size, content_type + try: + while True: + bit = f0.read(chunksize) + if not bit: + break + out = await upload_chunk( + self, location, bit, offset, size, content_type + ) + offset += len(bit) + callback.absolute_update(offset) + checker.update(bit) + except Exception: + await self._call( + "DELETE", + self.location.replace("&ifGenerationMatch=0", ""), ) - offset += len(bit) - callback.absolute_update(offset) - checker.update(bit) + raise checker.validate_json_response(out) @@ -1780,7 +1800,7 @@ def __init__( self.fixed_key_metadata = _convert_fixed_key_metadata(det, from_google=True) self.fixed_key_metadata.update(fixed_key_metadata or {}) self.timeout = timeout - if mode == "wb": + if mode in {"wb", "xb"}: if self.blocksize < GCS_MIN_BLOCK_SIZE: warnings.warn("Setting block size to minimum value, 2**18") self.blocksize = GCS_MIN_BLOCK_SIZE @@ -1886,6 +1906,7 @@ def _initiate_upload(self): self.content_type, self.metadata, self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", timeout=self.timeout, ) @@ -1898,7 +1919,7 @@ def discard(self): return self.gcsfs.call( "DELETE", - self.location, + self.location.replace("&ifGenerationMatch=0", ""), ) self.location = None self.closed = True @@ -1918,6 +1939,7 @@ def _simple_upload(self): self.consistency, self.content_type, self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", timeout=self.timeout, ) @@ -1989,17 +2011,20 @@ async def initiate_upload( content_type="application/octet-stream", metadata=None, fixed_key_metadata=None, + mode="overwrie", ): j = {"name": key} if metadata: j["metadata"] = metadata + kw = {"ifGenerationMatch": "0"} if mode == "create" else {} j.update(_convert_fixed_key_metadata(fixed_key_metadata)) headers, _ = await fs._call( method="POST", - path=f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o", + path=f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o?name={quote(key)}", uploadType="resumable", json=j, headers={"X-Upload-Content-Type": content_type}, + **kw, ) loc = headers["Location"] out = loc[0] if isinstance(loc, list) else loc # <- for CVR responses @@ -2017,12 +2042,14 @@ async def simple_upload( consistency=None, content_type="application/octet-stream", fixed_key_metadata=None, + mode="overwrite", ): checker = get_consistency_checker(consistency) path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o" metadata = {"name": key} if metadatain is not None: metadata["metadata"] = metadatain + kw = {"ifGenerationMatch": "0"} if mode == "create" else {} metadata.update(_convert_fixed_key_metadata(fixed_key_metadata)) metadata = json.dumps(metadata) template = ( @@ -2039,6 +2066,7 @@ async def simple_upload( headers={"Content-Type": 'multipart/related; boundary="==0=="'}, data=UnclosableBytesIO(data), json_out=True, + **kw, ) checker.update(datain) checker.validate_json_response(j) diff --git a/gcsfs/retry.py b/gcsfs/retry.py index 0d787a4e..0b02fbc2 100644 --- a/gcsfs/retry.py +++ b/gcsfs/retry.py @@ -109,6 +109,8 @@ def validate_response(status, content, path, args=None): if status == 403: raise OSError(f"Forbidden: {path}\n{msg}") + elif status == 412: + raise FileExistsError(path) elif status == 502: raise requests.exceptions.ProxyError() elif "invalid" in str(msg): diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 2a0b4c0c..d282aa7a 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -1532,3 +1532,16 @@ def test_sign(gcs, monkeypatch): response = requests.get(result) assert response.text == "This is a test string" + + +@pytest.mark.xfail(reason="emulator does not support condition") +def test_write_x_mpu(gcs): + fn = TEST_BUCKET + "/test.file" + with gcs.open(fn, mode="xb", block_size=5 * 2**20) as f: + assert f.mode == "xb" + f.write(b"0" * 5 * 2**20) + f.write(b"done") + with pytest.raises(FileExistsError): + with gcs.open(fn, mode="xb", block_size=5 * 2**20) as f: + f.write(b"0" * 5 * 2**20) + f.write(b"done")