Skip to content

Commit

Permalink
reduce useless ObjectExists (#28156)
Browse files Browse the repository at this point in the history
replace ListBlobs() with GetProperties()
unified style std::string& / char*
config azure requestTimeoutMs

Signed-off-by: PowderLi <[email protected]>
  • Loading branch information
PowderLi authored Nov 7, 2023
1 parent af1c204 commit 7bb0fa9
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 137 deletions.
138 changes: 110 additions & 28 deletions internal/core/src/storage/AzureChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,33 @@
#include <iostream>
#include <sstream>
#include "common/EasyAssert.h"
#include "log/Log.h"
#include "storage/AzureChunkManager.h"

namespace milvus {
namespace storage {

std::atomic<size_t> AzureChunkManager::init_count_(0);
std::mutex AzureChunkManager::client_mutex_;

void
AzureLogger(std::string const& level, std::string const& msg) {
LOG_SEGCORE_INFO_ << "[AZURE LOG] " << msg;
}

AzureChunkManager::AzureChunkManager(const StorageConfig& storage_config)
: default_bucket_name_(storage_config.bucket_name),
path_prefix_(storage_config.root_path) {
std::scoped_lock lock{client_mutex_};
const size_t initCount = init_count_++;
if (initCount == 0) {
// azure::AzureBlobChunkManager::InitLog(storage_config.log_level, AzureLogger);
}
client_ = std::make_shared<azure::AzureBlobChunkManager>(
storage_config.access_key_id,
storage_config.access_key_value,
storage_config.address,
storage_config.requestTimeoutMs,
storage_config.useIAM);
}

Expand All @@ -52,18 +67,19 @@ AzureChunkManager::Remove(const std::string& filepath) {

std::vector<std::string>
AzureChunkManager::ListWithPrefix(const std::string& filepath) {
return ListObjects(default_bucket_name_.c_str(), filepath.c_str());
return ListObjects(default_bucket_name_, filepath);
}

uint64_t
AzureChunkManager::Read(const std::string& filepath, void* buf, uint64_t size) {
if (!ObjectExists(default_bucket_name_, filepath)) {
try {
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
} catch (const std::exception& e) {
std::stringstream err_msg;
err_msg << "object('" << default_bucket_name_ << "', '" << filepath
<< "') not exists";
err_msg << "read object('" << default_bucket_name_ << "', '" << filepath
<< "' fail: " << e.what();
throw SegcoreError(ObjectNotExist, err_msg.str());
}
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
}

void
Expand All @@ -75,80 +91,146 @@ AzureChunkManager::Write(const std::string& filepath,

bool
AzureChunkManager::BucketExists(const std::string& bucket_name) {
return client_->BucketExists(bucket_name);
bool res;
try {
res = client_->BucketExists(bucket_name);
} catch (std::exception& err) {
ThrowAzureError("BucketExists", err, "params, bucket={}", bucket_name);
}
return res;
}

std::vector<std::string>
AzureChunkManager::ListBuckets() {
return client_->ListBuckets();
std::vector<std::string> res;
try {
res = client_->ListBuckets();
} catch (std::exception& err) {
ThrowAzureError("ListBuckets", err, "params");
}
return res;
}

bool
AzureChunkManager::CreateBucket(const std::string& bucket_name) {
bool res;
try {
client_->CreateBucket(bucket_name);
} catch (std::exception& e) {
throw SegcoreError(BucketInvalid, e.what());
res = client_->CreateBucket(bucket_name);
} catch (std::exception& err) {
ThrowAzureError("CreateBucket", err, "params, bucket={}", bucket_name);
}
return true;
return res;
}

bool
AzureChunkManager::DeleteBucket(const std::string& bucket_name) {
bool res;
try {
client_->DeleteBucket(bucket_name);
} catch (std::exception& e) {
throw SegcoreError(BucketInvalid, e.what());
res = client_->DeleteBucket(bucket_name);
} catch (std::exception& err) {
ThrowAzureError("DeleteBucket", err, "params, bucket={}", bucket_name);
}
return true;
return res;
}

bool
AzureChunkManager::ObjectExists(const std::string& bucket_name,
const std::string& object_name) {
return client_->ObjectExists(bucket_name, object_name);
bool res;
try {
res = client_->ObjectExists(bucket_name, object_name);
} catch (std::exception& err) {
ThrowAzureError("ObjectExists",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}

int64_t
uint64_t
AzureChunkManager::GetObjectSize(const std::string& bucket_name,
const std::string& object_name) {
uint64_t res;
try {
return client_->GetObjectSize(bucket_name, object_name);
} catch (std::exception& e) {
throw SegcoreError(ObjectNotExist, e.what());
res = client_->GetObjectSize(bucket_name, object_name);
} catch (std::exception& err) {
ThrowAzureError("GetObjectSize",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}

bool
AzureChunkManager::DeleteObject(const std::string& bucket_name,
const std::string& object_name) {
bool res;
try {
client_->DeleteObject(bucket_name, object_name);
} catch (std::exception& e) {
throw SegcoreError(ObjectNotExist, e.what());
res = client_->DeleteObject(bucket_name, object_name);
} catch (std::exception& err) {
ThrowAzureError("DeleteObject",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return true;
return res;
}

bool
AzureChunkManager::PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
return client_->PutObjectBuffer(bucket_name, object_name, buf, size);
bool res;
try {
res = client_->PutObjectBuffer(bucket_name, object_name, buf, size);
} catch (std::exception& err) {
ThrowAzureError("PutObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}

uint64_t
AzureChunkManager::GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
return client_->GetObjectBuffer(bucket_name, object_name, buf, size);
uint64_t res;
try {
res = client_->GetObjectBuffer(bucket_name, object_name, buf, size);
} catch (std::exception& err) {
ThrowAzureError("GetObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}

std::vector<std::string>
AzureChunkManager::ListObjects(const char* bucket_name, const char* prefix) {
return client_->ListObjects(bucket_name, prefix);
AzureChunkManager::ListObjects(const std::string& bucket_name,
const std::string& prefix) {
std::vector<std::string> res;
try {
res = client_->ListObjects(bucket_name, prefix);
} catch (std::exception& err) {
ThrowAzureError("ListObjects",
err,
"params, bucket={}, prefix={}",
bucket_name,
prefix);
}
return res;
}

} // namespace storage
Expand Down
21 changes: 19 additions & 2 deletions internal/core/src/storage/AzureChunkManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@
namespace milvus {
namespace storage {

template <typename... Args>

static SegcoreError
ThrowAzureError(const std::string& func,
const std::exception& err,
const std::string& fmtString,
Args&&... args) {
std::ostringstream oss;
const auto& message = fmt::format(fmtString, std::forward<Args>(args)...);
oss << "Error in " << func << "[exception:" << err.what()
<< ", params:" << message << "]";
throw SegcoreError(S3Error, oss.str());
}

/**
* @brief This AzureChunkManager is responsible for read and write file in blob.
*/
Expand Down Expand Up @@ -113,7 +127,7 @@ class AzureChunkManager : public ChunkManager {
bool
ObjectExists(const std::string& bucket_name,
const std::string& object_name);
int64_t
uint64_t
GetObjectSize(const std::string& bucket_name,
const std::string& object_name);
bool
Expand All @@ -130,9 +144,12 @@ class AzureChunkManager : public ChunkManager {
void* buf,
uint64_t size);
std::vector<std::string>
ListObjects(const char* bucket_name, const char* prefix = nullptr);
ListObjects(const std::string& bucket_name,
const std::string& prefix = nullptr);

private:
static std::atomic<size_t> init_count_;
static std::mutex client_mutex_;
std::shared_ptr<azure::AzureBlobChunkManager> client_;
std::string default_bucket_name_;
std::string path_prefix_;
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/MinioChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
return true;
}

int64_t
uint64_t
MinioChunkManager::GetObjectSize(const std::string& bucket_name,
const std::string& object_name) {
Aws::S3::Model::HeadObjectRequest request;
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/MinioChunkManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class MinioChunkManager : public ChunkManager {
bool
ObjectExists(const std::string& bucket_name,
const std::string& object_name);
int64_t
uint64_t
GetObjectSize(const std::string& bucket_name,
const std::string& object_name);
bool
Expand Down
Loading

0 comments on commit 7bb0fa9

Please sign in to comment.