Skip to content

Commit

Permalink
feat(interactive): Automatically upload local file when bulk loading (#…
Browse files Browse the repository at this point in the history
…3966)

- In the previous SDK implementation, we assumed the import files were already
available inside the container. However, in reality, users need to make use
of local files. So we need to upload the import files automatically when
invoking bulk loading.
- The documentation will be added later, since we are about to refactor our
documentation for the SDK.

Fix #3965
  • Loading branch information
zhanglei1949 authored Jun 26, 2024
1 parent 1fd9f38 commit 6d2b351
Show file tree
Hide file tree
Showing 14 changed files with 850 additions and 43 deletions.
15 changes: 15 additions & 0 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1336,4 +1336,19 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_statistic(
gs::Result<seastar::sstring>(statistics.ToJson()));
}

// Stores the request payload as a new file via WorkDirManipulator::CreateFile.
// Resolves to the value returned by CreateFile on success, or to the failure
// status otherwise.
seastar::future<admin_query_result> admin_actor::upload_file(
    query_param&& query_param) {
  auto created = WorkDirManipulator::CreateFile(query_param.content);
  if (!created.ok()) {
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(created.status()));
  }
  auto created_path = created.value();
  return seastar::make_ready_future<admin_query_result>(
      gs::Result<seastar::sstring>(
          seastar::sstring(created_path.data(), created_path.size())));
}

} // namespace server
2 changes: 2 additions & 0 deletions flex/engines/http_server/actor/admin_actor.act.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class ANNOTATION(actor:impl) admin_actor : public hiactor::actor {

seastar::future<admin_query_result> ANNOTATION(actor:method) run_get_graph_statistic(query_param&& param);

seastar::future<admin_query_result> ANNOTATION(actor:method) upload_file(query_param&& param);

// DECLARE_RUN_QUERIES;
/// Declare `do_work` func here, no need to implement.
ACTOR_DO_WORK()
Expand Down
193 changes: 187 additions & 6 deletions flex/engines/http_server/handler/admin_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "flex/engines/http_server/generated/actor/admin_actor_ref.act.autogen.h"
#include "flex/engines/http_server/types.h"
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/third_party/httplib.h"

#include <glog/logging.h>

Expand All @@ -39,6 +40,181 @@ std::string trim_slash(const std::string& origin) {
return res;
}

// Collapses the per-file upload results into a single admin_query_result.
// Exactly one result is accepted for now: any other count yields an
// InternalError. A successful result is reported as the JSON object
// {"file_path": <value>}; a failed result is returned as is.
admin_query_result generate_final_result(
    server::payload<std::vector<gs::Result<seastar::sstring>>>& result) {
  auto upload_results = result.content;
  if (upload_results.size() != 1) {
    LOG(INFO) << "Only one file uploading is supported";
    return admin_query_result{gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::InternalError,
                   "Only one file uploading is supported"))};
  }
  auto& only_result = upload_results.front();
  if (!only_result.ok()) {
    return admin_query_result{std::move(only_result)};
  }
  nlohmann::json response;
  response["file_path"] = only_result.value();
  return admin_query_result{gs::Result<seastar::sstring>(response.dump())};
}

// Extracts the multipart boundary from a Content-Type header value such as
//   multipart/form-data; boundary=----abc
//   multipart/form-data; boundary="----abc"; charset=utf-8
//
// @param content_type  the raw Content-Type header value.
// @param boundary      receives the boundary with surrounding double quotes
//                      removed; left untouched when no "boundary=" is present.
// @return true if a non-empty boundary was found.
inline bool parse_multipart_boundary(const seastar::sstring& content_type,
                                     seastar::sstring& boundary) {
  constexpr size_t kKeyLength = sizeof("boundary=") - 1;
  const auto pos = content_type.find("boundary=");
  // Compare against sstring's own npos: sstring uses a 32-bit size type by
  // default, so its "not found" value is not equal to std::string::npos.
  if (pos == seastar::sstring::npos) {
    return false;
  }
  boundary = content_type.substr(pos + kKeyLength);
  // Further header parameters may follow the boundary; ';' is not a legal
  // boundary character, so everything from the first ';' on is dropped.
  const auto param_end = boundary.find(';');
  if (param_end != seastar::sstring::npos) {
    boundary = boundary.substr(0, param_end);
  }
  if (boundary.length() >= 2 && boundary[0] == '"' && boundary.back() == '"') {
    boundary = boundary.substr(1, boundary.size() - 2);
  }
  return !boundary.empty();
}

/**
 * Handles file-upload requests: parses the multipart/form-data body of a POST
 * request and forwards the content of every contained part to an admin actor,
 * which stores it as a file.
 */
class admin_file_upload_handler_impl : public seastar::httpd::handler_base {
 public:
  // Builds `shard_concurrency` admin actor references inside actor group
  // `group_id` on the local shard.
  admin_file_upload_handler_impl(uint32_t group_id, uint32_t shard_concurrency)
      : shard_concurrency_(shard_concurrency), executor_idx_(0) {
    admin_actor_refs_.reserve(shard_concurrency_);
    hiactor::scope_builder builder;
    builder.set_shard(hiactor::local_shard_id())
        .enter_sub_scope(hiactor::scope<executor_group>(0))
        .enter_sub_scope(hiactor::scope<hiactor::actor_group>(group_id));
    for (unsigned i = 0; i < shard_concurrency_; ++i) {
      admin_actor_refs_.emplace_back(builder.build_ref<admin_actor_ref>(i));
    }
  }
  ~admin_file_upload_handler_impl() override = default;

  // Uploads the files one after another, starting at index `cur_ind`, through
  // the admin actor `dst_executor`. Each successful result is appended to
  // `results`; the returned future resolves to all results once every file is
  // handled, and fails with std::runtime_error on the first failed upload.
  seastar::future<server::payload<std::vector<gs::Result<seastar::sstring>>>>
  upload_file(std::vector<std::pair<seastar::sstring, seastar::sstring>>&&
                  file_name_and_contents,
              size_t cur_ind, uint32_t dst_executor,
              std::vector<gs::Result<seastar::sstring>>&& results) {
    if (cur_ind >= file_name_and_contents.size()) {
      VLOG(10) << "Successfully uploaded " << file_name_and_contents.size()
               << " files.";
      return seastar::make_ready_future<
          server::payload<std::vector<gs::Result<seastar::sstring>>>>(
          std::move(results));
    } else {
      // Only the content of the current file is moved into the request; the
      // vector itself is moved into the continuation afterwards so that the
      // remaining files stay alive for the next step.
      return admin_actor_refs_[dst_executor]
          .upload_file(
              query_param{std::move(file_name_and_contents[cur_ind].second)})
          .then_wrapped([this, dst_executor, cur_ind,
                         file_name_and_contents =
                             std::move(file_name_and_contents),
                         results =
                             std::move(results)](auto&& result_fut) mutable {
            auto result = result_fut.get0();
            auto result_val = result.content;
            if (result_val.ok()) {
              VLOG(10) << "Upload file success: "
                       << file_name_and_contents[cur_ind].first << ", "
                       << result_val.value();
            } else {
              LOG(ERROR) << "Upload file failed";
              return seastar::make_exception_future<
                  server::payload<std::vector<gs::Result<seastar::sstring>>>>(
                  std::runtime_error("Upload file failed: " +
                                     result_val.status().error_message()));
            }
            results.emplace_back(result_val);
            return upload_file(std::move(file_name_and_contents), cur_ind + 1,
                               dst_executor, std::move(results));
          });
    }
  }

  // Uploads all given files in a chain and folds the individual results into
  // one admin_query_result via generate_final_result().
  seastar::future<server::payload<gs::Result<seastar::sstring>>> upload_files(
      std::vector<std::pair<seastar::sstring, seastar::sstring>>&&
          file_name_and_contents,
      uint32_t dst_executor) {
    // upload each file in chain
    std::vector<gs::Result<seastar::sstring>> results;
    return upload_file(std::move(file_name_and_contents), 0, dst_executor,
                       std::move(results))
        .then([](auto&& results) {
          auto final_res = generate_final_result(results);
          return seastar::make_ready_future<admin_query_result>(
              std::move(final_res));
        });
  }

  // Splits a multipart/form-data body into (form field name, part content)
  // pairs, one pair per part and in the order the parts appear.
  // Returns an empty vector when the parser rejects the body.
  std::vector<std::pair<seastar::sstring, seastar::sstring>>
  parse_multipart_form_data(const seastar::sstring& content,
                            const seastar::sstring& boundary) {
    std::vector<seastar::sstring> names, filenames, content_types, contents;
    httplib::detail::MultipartFormDataParser parser;
    parser.set_boundary(boundary);
    httplib::MultipartContentHeader header_callback =
        [&names, &filenames, &content_types,
         &contents](const httplib::MultipartFormData& header) {
          names.push_back(header.name);
          filenames.push_back(header.filename);
          content_types.push_back(header.content_type);
          // Open the body slot together with the header, so that names and
          // contents always have the same length, even when a part delivers
          // no body data or delivers it in several chunks.
          contents.emplace_back();
          return true;
        };
    httplib::ContentReceiver content_callback =
        [&contents](const char* data, size_t data_length) {
          if (contents.empty()) {
            // Body data that belongs to no part header: reject the input.
            return false;
          }
          contents.back().append(data, data_length);
          return true;
        };
    const bool parsed = parser.parse(content.data(), content.size(),
                                     content_callback, header_callback);
    VLOG(10) << "filestorage names:" << gs::to_string(names);
    VLOG(10) << "filenames: " << gs::to_string(filenames);
    VLOG(10) << "content types" << gs::to_string(content_types);
    if (!parsed) {
      // Do not hand partially parsed parts on for uploading.
      LOG(ERROR) << "Failed to parse multipart form data";
      return {};
    }
    std::vector<std::pair<seastar::sstring, seastar::sstring>> res;
    res.reserve(names.size());
    for (size_t i = 0; i < names.size(); ++i) {
      res.emplace_back(std::move(names[i]), std::move(contents[i]));
    }
    return res;
  }

  // Entry point for an HTTP request. Only POST is supported: the multipart
  // body is parsed and its files are uploaded through the next admin actor in
  // round-robin order. Any other method, or a Content-Type without a usable
  // boundary, results in a failed future carrying std::runtime_error.
  seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
      const seastar::sstring& path,
      std::unique_ptr<seastar::httpd::request> req,
      std::unique_ptr<seastar::httpd::reply> rep) override {
    auto dst_executor = executor_idx_;

    executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
    LOG(INFO) << "Handling path:" << path << ", method: " << req->_method;
    auto& method = req->_method;
    if (method == "POST") {
      seastar::sstring boundary;
      if (!parse_multipart_boundary(req->_headers["Content-Type"], boundary)) {
        LOG(ERROR) << "Failed to parse boundary";
        return seastar::make_exception_future<
            std::unique_ptr<seastar::httpd::reply>>(
            std::runtime_error("Failed to parse boundary"));
      }
      std::vector<std::pair<seastar::sstring, seastar::sstring>>
          file_name_and_contents =
              parse_multipart_form_data(req->content, boundary);
      // upload for each file
      return upload_files(std::move(file_name_and_contents), dst_executor)
          .then_wrapped([rep = std::move(rep)](
                            seastar::future<admin_query_result>&& fut) mutable {
            return return_reply_with_result(std::move(rep), std::move(fut));
          });
    } else {
      return seastar::make_exception_future<
          std::unique_ptr<seastar::httpd::reply>>(
          std::runtime_error("Unsupported method" + method));
    }
  }

 private:
  const uint32_t shard_concurrency_;  // number of admin actor references
  uint32_t executor_idx_;             // index of the actor used next
  std::vector<admin_actor_ref> admin_actor_refs_;
};

/**
* Handle all request for graph management.
*/
Expand Down Expand Up @@ -559,8 +735,7 @@ seastar::future<> admin_http_handler::set_routes() {
.add_str("/procedure")
.add_param("procedure_id");
// Delete a procedure
r.add(new seastar::httpd::match_rule(*match_rule),
seastar::httpd::operation_type::DELETE);
r.add(new seastar::httpd::match_rule(*match_rule), SEASTAR_DELETE);
}
{
// Each procedure's handling
Expand All @@ -587,10 +762,17 @@ seastar::future<> admin_http_handler::set_routes() {
shard_admin_graph_concurrency));

// Delete a graph
r.add(seastar::httpd::operation_type::DELETE,
r.add(SEASTAR_DELETE,
seastar::httpd::url("/v1/graph").remainder("graph_id"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
{
// uploading file to server
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/file/upload"),
new admin_file_upload_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
}

// Get graph metadata
{
Expand Down Expand Up @@ -707,7 +889,7 @@ seastar::future<> admin_http_handler::set_routes() {
CHECK(params.at("procedure_id") == "/proce1")
<< params.at("procedure_id");
params.clear();
test_handler = r.get_handler(seastar::httpd::operation_type::DELETE,
test_handler = r.get_handler(SEASTAR_DELETE,
"/v1/graph/abc/procedure/proce1", params);
CHECK(test_handler);
test_handler = r.get_handler(seastar::httpd::operation_type::PUT,
Expand All @@ -727,8 +909,7 @@ seastar::future<> admin_http_handler::set_routes() {
match_rule->add_str("/v1/job").add_param("job_id");
r.add(match_rule, seastar::httpd::operation_type::GET);

r.add(seastar::httpd::operation_type::DELETE,
seastar::httpd::url("/v1/job").remainder("job_id"),
r.add(SEASTAR_DELETE, seastar::httpd::url("/v1/job").remainder("job_id"),
new admin_http_job_handler_impl(interactive_admin_group_id,
shard_admin_job_concurrency));
}
Expand Down
17 changes: 17 additions & 0 deletions flex/engines/http_server/handler/http_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
#include "flex/engines/http_server/types.h"
#include "flex/utils/result.h"
#include "seastar/http/common.hh"
#include "seastar/http/reply.hh"

#ifndef ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_
Expand All @@ -35,6 +36,22 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>>
return_reply_with_result(std::unique_ptr<seastar::httpd::reply> rep,
seastar::future<admin_query_result>&& fut);

// To avoid the macro conflict between /usr/include/arpa/nameser_compat.h#120
// (which is included by httplib.h) and seastar/http/common.hh#61, code that
// needs seastar's DELETE operation type refers to it through this alias.
static constexpr seastar::httpd::operation_type SEASTAR_DELETE =
seastar::httpd::operation_type::DELETE;

} // namespace server

namespace gs {

// Primary template; only declared here, specializations supply to_string().
template <typename T>
struct to_string_impl;

// Converts a seastar::sstring into a std::string.
template <>
struct to_string_impl<seastar::sstring> {
  static std::string to_string(const seastar::sstring& t) {
    // Build from data()/size() rather than c_str(): this keeps bytes after an
    // embedded '\0' and avoids rescanning the string for its length.
    return std::string(t.data(), t.size());
  }
};
}  // namespace gs

#endif // ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_
33 changes: 33 additions & 0 deletions flex/engines/http_server/workdir_manipulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,14 @@ std::string WorkDirManipulator::GetLogDir() {
return log_dir;
}

std::string WorkDirManipulator::GetUploadDir() {
auto upload_dir = workspace + UPLOAD_DIR;
if (!std::filesystem::exists(upload_dir)) {
std::filesystem::create_directory(upload_dir);
}
return upload_dir;
}

std::string WorkDirManipulator::GetCompilerLogFile() {
// with timestamp
auto time_stamp = std::to_string(
Expand Down Expand Up @@ -645,6 +653,30 @@ gs::Result<std::string> WorkDirManipulator::CommitTempIndices(
return indices_dir;
}

// Writes `content` byte-for-byte into a new file under the upload directory
// and returns the path of that file.
//
// Returns an error status when the content is empty or larger than
// MAX_CONTENT_SIZE, when the file cannot be opened, or when writing fails.
gs::Result<std::string> WorkDirManipulator::CreateFile(
    const seastar::sstring& content) {
  if (content.size() == 0) {
    return {gs::Status(gs::StatusCode::InValidArgument, "Content is empty")};
  }
  if (content.size() > static_cast<size_t>(MAX_CONTENT_SIZE)) {
    return {
        gs::Status(gs::StatusCode::InValidArgument,
                   "Content is too large: " + std::to_string(content.size()))};
  }

  // The file is named after the current timestamp. If that name is already
  // taken (two uploads with the same timestamp), add a numeric suffix instead
  // of overwriting the earlier upload.
  const auto base_name =
      GetUploadDir() + "/" + std::to_string(gs::GetCurrentTimeStamp());
  auto file_name = base_name;
  std::error_code ec;
  for (size_t suffix = 1; std::filesystem::exists(file_name, ec); ++suffix) {
    file_name = base_name + "_" + std::to_string(suffix);
  }

  // Binary mode: the content must be stored unmodified.
  std::ofstream fout(file_name, std::ios::out | std::ios::binary);
  if (!fout.is_open()) {
    return {gs::Status(gs::StatusCode::PermissionError, "Fail to open file")};
  }
  fout.write(content.data(), static_cast<std::streamsize>(content.size()));
  fout.close();
  if (fout.fail()) {
    // E.g. the disk is full: do not report a truncated file as a success.
    LOG(ERROR) << "Fail to write file: " << file_name;
    std::filesystem::remove(file_name, ec);
    return {gs::Status(gs::StatusCode::InternalError, "Fail to write file")};
  }
  LOG(INFO) << "Successfully create file: " << file_name;
  return file_name;
}

// graph_name can be a path, first try as it is absolute path, or
// relative path
std::string WorkDirManipulator::get_graph_dir(const std::string& graph_name) {
Expand Down Expand Up @@ -1244,5 +1276,6 @@ const std::string WorkDirManipulator::CONF_ENGINE_CONFIG_FILE_NAME =
const std::string WorkDirManipulator::RUNNING_GRAPH_FILE_NAME = "RUNNING";
const std::string WorkDirManipulator::TMP_DIR = "/tmp";
const std::string WorkDirManipulator::GRAPH_LOADER_BIN = "bulk_loader";
const std::string WorkDirManipulator::UPLOAD_DIR = "upload";

} // namespace server
12 changes: 12 additions & 0 deletions flex/engines/http_server/workdir_manipulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class WorkDirManipulator {
static const std::string RUNNING_GRAPH_FILE_NAME;
static const std::string TMP_DIR;
static const std::string GRAPH_LOADER_BIN;
static const std::string UPLOAD_DIR;
static constexpr int32_t MAX_CONTENT_SIZE = 100 * 1024 * 1024; // 100MB

static void SetWorkspace(const std::string& workspace_path);

Expand Down Expand Up @@ -159,6 +161,8 @@ class WorkDirManipulator {

static std::string GetLogDir();

static std::string GetUploadDir();

static std::string GetCompilerLogFile();
// Return a unique temp dir for the graph.
static std::string GetTempIndicesDir(const std::string& graph_name);
Expand All @@ -169,6 +173,14 @@ class WorkDirManipulator {
static gs::Result<std::string> CommitTempIndices(
const std::string& graph_name);

// Creates a file which contains the content, in binary, and returns the
// filename.
// NOTE: New files are created under a directory, and the size of the content
// is limited. The uploaded files are mainly used for bulk loading; we will
// clear them after the loading process.
// TODO(zhanglei): Bulk loading may fail, so uploaded files should also be
// cleared automatically after a period of time.
static gs::Result<std::string> CreateFile(const seastar::sstring& content);

private:
static std::string get_tmp_bulk_loading_job_log_path(
const std::string& graph_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public interface Session
JobInterface,
ProcedureInterface,
QueryServiceInterface,
AutoCloseable {}
AutoCloseable,
UtilsInterface {}
Loading

0 comments on commit 6d2b351

Please sign in to comment.