diff --git a/mountpoint-s3-client/CHANGELOG.md b/mountpoint-s3-client/CHANGELOG.md index b0ba63d2b..7fbf0fba1 100644 --- a/mountpoint-s3-client/CHANGELOG.md +++ b/mountpoint-s3-client/CHANGELOG.md @@ -2,6 +2,7 @@ ### Other changes +* Add support for copy object operation. ([#1052](https://github.com/awslabs/mountpoint-s3/pull/1052)) * Address a threading issue in the s2n-tls library that could result in premature cleanup and `NULL pointer` errors. ([aws/s2n-tls#4584](https://github.com/aws/s2n-tls/pull/4584)) * Inaccurate reporting of `s3.client.buffer_pool.primary_allocated` CRT statistic is fixed. ([awslabs/aws-c-s3#453](https://github.com/awslabs/aws-c-s3/pull/453)) * Expose `s3.client.buffer_pool.forced_used` metric which account for buffer allocations that could exceed memory limit in the CRT buffer pool. ([#1025](https://github.com/awslabs/mountpoint-s3/pull/1025)) diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index 339e946c0..3abbe29d3 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -15,10 +15,11 @@ use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use pin_project::pin_project; use crate::object_client::{ - DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, - GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, - ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, - PutObjectRequest, PutObjectResult, PutObjectSingleParams, UploadReview, + CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, + GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, + HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, + ObjectClientResult, 
PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams, + UploadReview, }; // Wrapper for injecting failures into a get stream or a put request @@ -98,6 +99,19 @@ where self.client.delete_object(bucket, key).await } + async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + params: &CopyObjectParams, + ) -> ObjectClientResult { + self.client + .copy_object(source_bucket, source_key, destination_bucket, destination_key, params) + .await + } + async fn get_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/lib.rs b/mountpoint-s3-client/src/lib.rs index ab53cde8c..861ff9739 100644 --- a/mountpoint-s3-client/src/lib.rs +++ b/mountpoint-s3-client/src/lib.rs @@ -72,10 +72,11 @@ pub mod config { /// Types used by all object clients pub mod types { pub use super::object_client::{ - Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts, - GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute, - ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, - PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart, + Checksum, ChecksumAlgorithm, CopyObjectParams, CopyObjectResult, DeleteObjectResult, ETag, GetBodyPart, + GetObjectAttributesParts, GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, + ObjectAttribute, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, + PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, + UploadReviewPart, }; } @@ -86,8 +87,8 @@ pub mod types { /// client errors. See its documentation for more details. 
pub mod error { pub use super::object_client::{ - DeleteObjectError, GetObjectAttributesError, GetObjectError, HeadObjectError, ListObjectsError, - ObjectClientError, PutObjectError, + CopyObjectError, DeleteObjectError, GetObjectAttributesError, GetObjectError, HeadObjectError, + ListObjectsError, ObjectClientError, PutObjectError, }; #[doc(hidden)] pub use super::s3_crt_client::HeadBucketError; diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index 2b1e1a630..c60c00d64 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -25,11 +25,12 @@ use tracing::trace; use crate::checksums::crc32c_to_base64; use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata}; use crate::object_client::{ - Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, - GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, - HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, - ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, - PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart, + Checksum, ChecksumAlgorithm, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, + DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts, + GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, + ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, + PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams, + PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart, }; mod leaky_bucket; @@ -341,6 +342,7 @@ pub enum Operation 
{ GetObjectAttributes, ListObjectsV2, PutObject, + CopyObject, PutObjectSingle, } @@ -603,6 +605,28 @@ impl ObjectClient for MockClient { Ok(DeleteObjectResult {}) } + async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + _params: &CopyObjectParams, + ) -> ObjectClientResult { + if destination_bucket != self.config.bucket && source_bucket != self.config.bucket { + return Err(ObjectClientError::ServiceError(CopyObjectError::NotFound)); + } + + let mut objects = self.objects.write().unwrap(); + if let Some(object) = objects.get(source_key) { + let cloned_object = object.clone(); + objects.insert(destination_key.to_owned(), cloned_object); + Ok(CopyObjectResult {}) + } else { + Err(ObjectClientError::ServiceError(CopyObjectError::NotFound)) + } + } + async fn get_object( &self, bucket: &str, @@ -1159,6 +1183,50 @@ mod tests { let next = get_request.next().await.expect("result should not be empty"); assert_client_error!(next, "empty read window"); } + #[tokio::test] + async fn test_copy_object() { + let bucket = "test_bucket"; + let src_key = "src_copy_key"; + let dst_key = "dst_copy_key"; + let client = MockClient::new(MockClientConfig { + bucket: bucket.to_string(), + part_size: 1024, + unordered_list_seed: None, + ..Default::default() + }); + + client.add_object(src_key, "test_body".into()); + + client + .copy_object(bucket, src_key, bucket, dst_key, &Default::default()) + .await + .expect("Should not fail"); + + client + .get_object(bucket, dst_key, None, None) + .await + .expect("get_object should succeed"); + } + + #[tokio::test] + async fn test_copy_object_non_existing_key() { + let bucket = "test_bucket"; + let src_key = "src_copy_key"; + let dst_key = "dst_copy_key"; + let client = MockClient::new(MockClientConfig { + bucket: bucket.to_string(), + part_size: 1024, + unordered_list_seed: None, + ..Default::default() + }); + + assert!(matches!( + client + .copy_object(bucket, src_key, 
bucket, dst_key, &Default::default()) + .await, + Err(ObjectClientError::ServiceError(CopyObjectError::NotFound)) + )); + } #[tokio::test] async fn list_object_dirs() { diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs index 1634eb4b1..bf651a67c 100644 --- a/mountpoint-s3-client/src/mock_client/throughput_client.rs +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -14,10 +14,10 @@ use crate::mock_client::{ MockClient, MockClientConfig, MockClientError, MockGetObjectRequest, MockObject, MockPutObjectRequest, }; use crate::object_client::{ - DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, - GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, - ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult, - PutObjectSingleParams, + CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, + GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, + HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientResult, + PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams, }; /// A [MockClient] that rate limits overall download throughput to simulate a target network @@ -126,6 +126,19 @@ impl ObjectClient for ThroughputMockClient { self.inner.delete_object(bucket, key).await } + async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + params: &CopyObjectParams, + ) -> ObjectClientResult { + self.inner + .copy_object(source_bucket, source_key, destination_bucket, destination_key, params) + .await + } + async fn get_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/object_client.rs 
b/mountpoint-s3-client/src/object_client.rs index 5cc51314f..ad81bf5cc 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -56,6 +56,22 @@ pub trait ObjectClient { key: &str, ) -> ObjectClientResult; + /// Create a copy of an existing object. Currently, this functionality has the following limitations: + /// - Supported only for copying between matching bucket types: + /// - Standard S3 to Standard S3 buckets. + /// - S3 Express to S3 Express buckets. + /// - Host header must use virtual host addressing style (path style is not supported), and both source and destination buckets must have DNS-compliant names. + /// - Only the {bucket}/{key} format is supported for the source; passing an ARN as the source will not work. + /// - Source bucket is assumed to be in the same region as destination bucket. + async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + params: &CopyObjectParams, + ) -> ObjectClientResult; + /// Get an object from the object store. Returns a stream of body parts of the object. Parts are /// guaranteed to be returned by the stream in order and contiguously. async fn get_object( @@ -226,6 +242,39 @@ pub enum DeleteObjectError { NoSuchBucket, } +/// Result of a [`copy_object`](ObjectClient::copy_object) request +#[derive(Debug)] +#[non_exhaustive] +pub struct CopyObjectResult { + // TODO: Populate this struct with return fields from the S3 API, e.g., etag. 
+} + +/// Errors returned by a [`copy_object`](ObjectClient::copy_object) request +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum CopyObjectError { + /// Note that CopyObject cannot distinguish between NoSuchBucket and NoSuchKey errors + #[error("The object was not found")] + NotFound, + + #[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")] + ObjectNotInActiveTierError, +} + +/// Parameters to a [`copy_object`](ObjectClient::copy_object) request +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct CopyObjectParams { + // TODO: Populate this struct with fields as and when required to satisfy various use cases. +} + +impl CopyObjectParams { + /// Create a default [CopyObjectParams]. + pub fn new() -> Self { + Self::default() + } +} + /// Result of a [`get_object_attributes`](ObjectClient::get_object_attributes) request #[derive(Debug, Default)] pub struct GetObjectAttributesResult { diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index 33f2804fb..6e1b811f8 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -56,6 +56,7 @@ macro_rules! request_span { ($self:expr, $method:expr) => { request_span!($self, $method,) }; } +pub(crate) mod copy_object; pub(crate) mod delete_object; pub(crate) mod get_object; @@ -65,11 +66,10 @@ pub(crate) mod get_object_attributes; pub(crate) mod head_object; pub(crate) mod list_objects; -pub(crate) mod put_object; -pub(crate) use put_object::S3PutObjectRequest; - pub(crate) mod head_bucket; +pub(crate) mod put_object; pub use head_bucket::HeadBucketError; +pub(crate) use put_object::S3PutObjectRequest; /// `tracing` doesn't allow dynamic levels but we want to dynamically choose the log level for /// requests based on their response status. 
https://github.com/tokio-rs/tracing/issues/372 @@ -527,7 +527,6 @@ impl S3CrtClientInner { let options = Self::new_meta_request_options(message, operation); self.make_meta_request_from_options(options, request_span, |_| {}, on_headers, on_body, on_finish) } - /// Make an HTTP request using this S3 client that invokes the given callbacks as the request /// makes progress. See [make_meta_request] for arguments. fn make_meta_request_from_options( @@ -803,6 +802,7 @@ enum S3Operation { HeadObject, ListObjects, PutObject, + CopyObject, PutObjectSingle, } @@ -812,6 +812,7 @@ impl S3Operation { match self { S3Operation::GetObject => MetaRequestType::GetObject, S3Operation::PutObject => MetaRequestType::PutObject, + S3Operation::CopyObject => MetaRequestType::CopyObject, _ => MetaRequestType::Default, } } @@ -827,6 +828,7 @@ impl S3Operation { S3Operation::HeadObject => Some("HeadObject"), S3Operation::ListObjects => Some("ListObjectsV2"), S3Operation::PutObject => None, + S3Operation::CopyObject => None, S3Operation::PutObjectSingle => Some("PutObject"), } } @@ -1092,6 +1094,7 @@ fn request_type_to_metrics_string(request_type: RequestType) -> &'static str { RequestType::AbortMultipartUpload => "AbortMultipartUpload", RequestType::CompleteMultipartUpload => "CompleteMultipartUpload", RequestType::UploadPartCopy => "UploadPartCopy", + RequestType::CopyObject => "CopyObject", RequestType::PutObject => "PutObject", } } @@ -1262,6 +1265,18 @@ impl ObjectClient for S3CrtClient { self.delete_object(bucket, key).await } + async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + params: &CopyObjectParams, + ) -> ObjectClientResult { + self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params) + .await + } + async fn get_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/s3_crt_client/copy_object.rs b/mountpoint-s3-client/src/s3_crt_client/copy_object.rs new file 
mode 100644 index 000000000..422c07079 --- /dev/null +++ b/mountpoint-s3-client/src/s3_crt_client/copy_object.rs @@ -0,0 +1,97 @@ +use std::ops::Deref; +use std::os::unix::prelude::OsStrExt; + +use mountpoint_s3_crt::{http::request_response::Header, s3::client::MetaRequestResult}; + +use crate::object_client::{CopyObjectError, CopyObjectParams, CopyObjectResult, ObjectClientResult}; +use crate::s3_crt_client::{S3CrtClient, S3Operation, S3RequestError}; + +impl S3CrtClient { + /// Create and begin a new CopyObject request. + pub(super) async fn copy_object( + &self, + source_bucket: &str, + source_key: &str, + destination_bucket: &str, + destination_key: &str, + _params: &CopyObjectParams, + ) -> ObjectClientResult { + let request = { + let mut message = self + .inner + .new_request_template("PUT", destination_bucket) + .map_err(S3RequestError::construction_failure)?; + message + .set_request_path(format!("/{destination_key}")) + .map_err(S3RequestError::construction_failure)?; + message + .set_header(&Header::new( + "x-amz-copy-source", + format!("/{source_bucket}/{source_key}"), + )) + .map_err(S3RequestError::construction_failure)?; + + let span = request_span!( + self.inner, + "copy_object", + source_bucket, + source_key, + destination_bucket, + destination_key + ); + + self.inner + .make_simple_http_request(message, S3Operation::CopyObject, span, parse_copy_object_error)? 
+ }; + + let _body = request.await?; + + Ok(CopyObjectResult {}) + } +} +fn parse_copy_object_error(result: &MetaRequestResult) -> Option { + match result.response_status { + 403 => { + let body = result.error_response_body.as_ref()?; + let root = xmltree::Element::parse(body.as_bytes()).ok()?; + let error_code = root.get_child("Code")?; + let error_str = error_code.get_text()?; + + match error_str.deref() { + "ObjectNotInActiveTierError" => Some(CopyObjectError::ObjectNotInActiveTierError), + _ => None, + } + } + 404 => Some(CopyObjectError::NotFound), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::ffi::{OsStr, OsString}; + + fn make_result(response_status: i32, body: impl Into) -> MetaRequestResult { + MetaRequestResult { + response_status, + crt_error: 1i32.into(), + error_response_headers: None, + error_response_body: Some(body.into()), + } + } + #[test] + fn parse_403_object_not_in_active_tier_error() { + let body = br#"ObjectNotInActiveTierErrorThe source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glaciertest-bucketBHCQ0FTYY0HKMV43ntCK1jQfPxY7sSNL/GB13RttgJLjSETfIuOiuRnwImO0dQP2ttj2Qqpn5S/jSLt3Ql0TgHWuYF0="#; + let result = make_result(403, OsStr::from_bytes(&body[..])); + let result = parse_copy_object_error(&result); + assert_eq!(result, Some(CopyObjectError::ObjectNotInActiveTierError)); + } + #[test] + fn parse_404_error() { + let body = br#"test-bucketBHCQ0FTYY0HKMV43ntCK1jQfPxY7sSNL/GB13RttgJLjSETfIuOiuRnwImO0dQP2ttj2Qqpn5S/jSLt3Ql0TgHWuYF0="#; + let result = make_result(404, OsStr::from_bytes(&body[..])); + let result = parse_copy_object_error(&result); + assert_eq!(result, Some(CopyObjectError::NotFound)); + } +} diff --git a/mountpoint-s3-client/tests/copy_object.rs b/mountpoint-s3-client/tests/copy_object.rs new file mode 100644 index 000000000..3c1810949 --- /dev/null +++ b/mountpoint-s3-client/tests/copy_object.rs @@ -0,0 +1,79 @@ +#![cfg(feature = "s3_tests")] + +pub mod 
common; +use aws_sdk_s3::primitives::ByteStream; +use bytes::Bytes; +use common::*; +use mountpoint_s3_client::error::{CopyObjectError, ObjectClientError}; +use mountpoint_s3_client::S3RequestError; +use mountpoint_s3_client::{ObjectClient, S3CrtClient}; + +#[tokio::test] +async fn test_copy_objects() { + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_copy_objects"); + + let key = format!("{prefix}/hello"); + let body = b"hello world!"; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(Bytes::from_static(body))) + .send() + .await + .unwrap(); + + let client: S3CrtClient = get_test_client(); + let copy_key = format!("{prefix}/hello2"); + + let _result = client + .copy_object(&bucket, &key, &bucket, &copy_key, &Default::default()) + .await + .expect("copy_object operation should succeed"); + + sdk_client + .head_object() + .bucket(&bucket) + .key(&copy_key) + .send() + .await + .expect("copied object should exist"); +} + +#[tokio::test] +async fn test_copy_object_no_permission() { + let (_bucket, prefix) = get_test_bucket_and_prefix("test_copy_object_no_permission"); + let bucket = get_test_bucket_without_permissions(); + let key = format!("{prefix}/hello"); + let copy_key = format!("{prefix}/hello2"); + + let client: S3CrtClient = get_test_client(); + let result = client + .copy_object(&bucket, &key, &bucket, &copy_key, &Default::default()) + .await; + + assert!(matches!( + result, + Err(ObjectClientError::ClientError(S3RequestError::Forbidden(_, _))) + )); +} + +#[tokio::test] +async fn test_copy_object_non_existing_key() { + let (bucket, prefix) = get_test_bucket_and_prefix("test_copy_objects"); + let key = format!("{prefix}/hello"); + let copy_key = format!("{prefix}/hello2"); + + let client: S3CrtClient = get_test_client(); + let result = client + .copy_object(&bucket, &key, &bucket, &copy_key, &Default::default()) + .await; + + assert!(matches!( + result, + 
Err(ObjectClientError::ServiceError(CopyObjectError::NotFound)) + )); +} + +// TODO: Add integration test for cross bucket copy but before that need to set up a new environment variable for a new bucket. diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index bb33a0a25..19a22f347 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -1359,6 +1359,8 @@ pub enum RequestType { CompleteMultipartUpload, /// UploadPartCopy: https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html UploadPartCopy, + /// CopyObject: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html + CopyObject, /// PutObject: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html PutObject, } @@ -1375,6 +1377,7 @@ impl From for RequestType { aws_s3_request_type::AWS_S3_REQUEST_TYPE_ABORT_MULTIPART_UPLOAD => RequestType::AbortMultipartUpload, aws_s3_request_type::AWS_S3_REQUEST_TYPE_COMPLETE_MULTIPART_UPLOAD => RequestType::CompleteMultipartUpload, aws_s3_request_type::AWS_S3_REQUEST_TYPE_UPLOAD_PART_COPY => RequestType::UploadPartCopy, + aws_s3_request_type::AWS_S3_REQUEST_TYPE_COPY_OBJECT => RequestType::CopyObject, aws_s3_request_type::AWS_S3_REQUEST_TYPE_PUT_OBJECT => RequestType::PutObject, _ => panic!("unknown request type {:?}", value), }