Skip to content

Commit

Permalink
Copy object operation (#1052)
Browse files Browse the repository at this point in the history
* Copy operation

Signed-off-by: rajdchak <[email protected]>

* Rebased from main

Signed-off-by: rajdchak <[email protected]>

* Addressed some comments

Signed-off-by: rajdchak <[email protected]>

* Addressing commentds

Signed-off-by: rajdchak <[email protected]>

* Updated changelog

Signed-off-by: rajdchak <[email protected]>

* Updated changelog comment

Signed-off-by: rajdchak <[email protected]>

---------

Signed-off-by: rajdchak <[email protected]>
  • Loading branch information
rajdchak authored Oct 16, 2024
1 parent e411e02 commit de6c1bc
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 23 deletions.
1 change: 1 addition & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Other changes

* Add support for copy object operation. ([#1052](https://github.com/awslabs/mountpoint-s3/pull/1052))
* Address a threading issue in the s2n-tls library that could result in premature cleanup and `NULL pointer` errors. ([aws/s2n-tls#4584](https://github.com/aws/s2n-tls/pull/4584))
* Inaccurate reporting of `s3.client.buffer_pool.primary_allocated` CRT statistic is fixed. ([awslabs/aws-c-s3#453](https://github.com/awslabs/aws-c-s3/pull/453))
* Expose `s3.client.buffer_pool.forced_used` metric which account for buffer allocations that could exceed memory limit in the CRT buffer pool. ([#1025](https://github.com/awslabs/mountpoint-s3/pull/1025))
Expand Down
22 changes: 18 additions & 4 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::object_client::{
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
PutObjectRequest, PutObjectResult, PutObjectSingleParams, UploadReview,
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
UploadReview,
};

// Wrapper for injecting failures into a get stream or a put request
Expand Down Expand Up @@ -98,6 +99,19 @@ where
self.client.delete_object(bucket, key).await
}

async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError> {
self.client
.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
.await
}

async fn get_object(
&self,
bucket: &str,
Expand Down
13 changes: 7 additions & 6 deletions mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ pub mod config {
/// Types used by all object clients
pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, CopyObjectParams, CopyObjectResult, DeleteObjectResult, ETag, GetBodyPart,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult,
ObjectAttribute, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult,
PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview,
UploadReviewPart,
};
}

Expand All @@ -86,8 +87,8 @@ pub mod types {
/// client errors. See its documentation for more details.
pub mod error {
pub use super::object_client::{
DeleteObjectError, GetObjectAttributesError, GetObjectError, HeadObjectError, ListObjectsError,
ObjectClientError, PutObjectError,
CopyObjectError, DeleteObjectError, GetObjectAttributesError, GetObjectError, HeadObjectError,
ListObjectsError, ObjectClientError, PutObjectError,
};
#[doc(hidden)]
pub use super::s3_crt_client::HeadBucketError;
Expand Down
78 changes: 73 additions & 5 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use tracing::trace;
use crate::checksums::crc32c_to_base64;
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError,
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError,
ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart,
PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -341,6 +342,7 @@ pub enum Operation {
GetObjectAttributes,
ListObjectsV2,
PutObject,
CopyObject,
PutObjectSingle,
}

Expand Down Expand Up @@ -603,6 +605,28 @@ impl ObjectClient for MockClient {
Ok(DeleteObjectResult {})
}

async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
_params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError> {
if destination_bucket != self.config.bucket && source_bucket != self.config.bucket {
return Err(ObjectClientError::ServiceError(CopyObjectError::NotFound));
}

let mut objects = self.objects.write().unwrap();
if let Some(object) = objects.get(source_key) {
let cloned_object = object.clone();
objects.insert(destination_key.to_owned(), cloned_object);
Ok(CopyObjectResult {})
} else {
Err(ObjectClientError::ServiceError(CopyObjectError::NotFound))
}
}

async fn get_object(
&self,
bucket: &str,
Expand Down Expand Up @@ -1159,6 +1183,50 @@ mod tests {
let next = get_request.next().await.expect("result should not be empty");
assert_client_error!(next, "empty read window");
}
#[tokio::test]
async fn test_copy_object() {
let bucket = "test_bucket";
let src_key = "src_copy_key";
let dst_key = "dst_copy_key";
let client = MockClient::new(MockClientConfig {
bucket: bucket.to_string(),
part_size: 1024,
unordered_list_seed: None,
..Default::default()
});

client.add_object(src_key, "test_body".into());

client
.copy_object(bucket, src_key, bucket, dst_key, &Default::default())
.await
.expect("Should not fail");

client
.get_object(bucket, dst_key, None, None)
.await
.expect("get_object should succeed");
}

#[tokio::test]
async fn test_copy_object_non_existing_key() {
let bucket = "test_bucket";
let src_key = "src_copy_key";
let dst_key = "dst_copy_key";
let client = MockClient::new(MockClientConfig {
bucket: bucket.to_string(),
part_size: 1024,
unordered_list_seed: None,
..Default::default()
});

assert!(matches!(
client
.copy_object(bucket, src_key, bucket, dst_key, &Default::default())
.await,
Err(ObjectClientError::ServiceError(CopyObjectError::NotFound))
));
}

#[tokio::test]
async fn list_object_dirs() {
Expand Down
21 changes: 17 additions & 4 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use crate::mock_client::{
MockClient, MockClientConfig, MockClientError, MockGetObjectRequest, MockObject, MockPutObjectRequest,
};
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult,
PutObjectSingleParams,
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientResult,
PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams,
};

/// A [MockClient] that rate limits overall download throughput to simulate a target network
Expand Down Expand Up @@ -126,6 +126,19 @@ impl ObjectClient for ThroughputMockClient {
self.inner.delete_object(bucket, key).await
}

async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError> {
self.inner
.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
.await
}

async fn get_object(
&self,
bucket: &str,
Expand Down
49 changes: 49 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ pub trait ObjectClient {
key: &str,
) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError>;

/// Create a copy of an existing object. Currently, this functionality has the following limitations:
/// - Supported only for copying between matching bucket types:
/// - Standard S3 to Standard S3 buckets.
/// - S3 Express to S3 Express buckets.
/// - Host header must use virtual host addressing style (path style is not supported) and both source and dest buckets must have dns compliant name.
/// - Only {bucket}/{key} format is supported for source and passing arn as source will not work.
/// - Source bucket is assumed to be in the same region as destination bucket.
async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError>;

/// Get an object from the object store. Returns a stream of body parts of the object. Parts are
/// guaranteed to be returned by the stream in order and contiguously.
async fn get_object(
Expand Down Expand Up @@ -226,6 +242,39 @@ pub enum DeleteObjectError {
NoSuchBucket,
}

/// Result of a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug)]
#[non_exhaustive]
pub struct CopyObjectResult {
// TODO: Populate this struct with return fields from the S3 API, e.g., etag.
}

/// Errors returned by a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum CopyObjectError {
/// Note that CopyObject cannot distinguish between NoSuchBucket and NoSuchKey errors
#[error("The object was not found")]
NotFound,

#[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")]
ObjectNotInActiveTierError,
}

/// Parameters to a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct CopyObjectParams {
// TODO: Populate this struct with fields as and when required to satisfy various use cases.
}

impl CopyObjectParams {
/// Create a default [CopyObjectParams].
pub fn new() -> Self {
Self::default()
}
}

/// Result of a [`get_object_attributes`](ObjectClient::get_object_attributes) request
#[derive(Debug, Default)]
pub struct GetObjectAttributesResult {
Expand Down
23 changes: 19 additions & 4 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ macro_rules! request_span {
($self:expr, $method:expr) => { request_span!($self, $method,) };
}

pub(crate) mod copy_object;
pub(crate) mod delete_object;
pub(crate) mod get_object;

Expand All @@ -65,11 +66,10 @@ pub(crate) mod get_object_attributes;
pub(crate) mod head_object;
pub(crate) mod list_objects;

pub(crate) mod put_object;
pub(crate) use put_object::S3PutObjectRequest;

pub(crate) mod head_bucket;
pub(crate) mod put_object;
pub use head_bucket::HeadBucketError;
pub(crate) use put_object::S3PutObjectRequest;

/// `tracing` doesn't allow dynamic levels but we want to dynamically choose the log level for
/// requests based on their response status. https://github.com/tokio-rs/tracing/issues/372
Expand Down Expand Up @@ -527,7 +527,6 @@ impl S3CrtClientInner {
let options = Self::new_meta_request_options(message, operation);
self.make_meta_request_from_options(options, request_span, |_| {}, on_headers, on_body, on_finish)
}

/// Make an HTTP request using this S3 client that invokes the given callbacks as the request
/// makes progress. See [make_meta_request] for arguments.
fn make_meta_request_from_options<T: Send + 'static, E: std::error::Error + Send + 'static>(
Expand Down Expand Up @@ -803,6 +802,7 @@ enum S3Operation {
HeadObject,
ListObjects,
PutObject,
CopyObject,
PutObjectSingle,
}

Expand All @@ -812,6 +812,7 @@ impl S3Operation {
match self {
S3Operation::GetObject => MetaRequestType::GetObject,
S3Operation::PutObject => MetaRequestType::PutObject,
S3Operation::CopyObject => MetaRequestType::CopyObject,
_ => MetaRequestType::Default,
}
}
Expand All @@ -827,6 +828,7 @@ impl S3Operation {
S3Operation::HeadObject => Some("HeadObject"),
S3Operation::ListObjects => Some("ListObjectsV2"),
S3Operation::PutObject => None,
S3Operation::CopyObject => None,
S3Operation::PutObjectSingle => Some("PutObject"),
}
}
Expand Down Expand Up @@ -1092,6 +1094,7 @@ fn request_type_to_metrics_string(request_type: RequestType) -> &'static str {
RequestType::AbortMultipartUpload => "AbortMultipartUpload",
RequestType::CompleteMultipartUpload => "CompleteMultipartUpload",
RequestType::UploadPartCopy => "UploadPartCopy",
RequestType::CopyObject => "CopyObject",
RequestType::PutObject => "PutObject",
}
}
Expand Down Expand Up @@ -1262,6 +1265,18 @@ impl ObjectClient for S3CrtClient {
self.delete_object(bucket, key).await
}

async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, S3RequestError> {
self.copy_object(source_bucket, source_key, destination_bucket, destination_key, params)
.await
}

async fn get_object(
&self,
bucket: &str,
Expand Down
Loading

0 comments on commit de6c1bc

Please sign in to comment.