From 6d2b35123ba72be41ca854ca255e40d5e1bd60a6 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 26 Jun 2024 20:43:36 +0800 Subject: [PATCH] feat(interactive): Automatically upload local file when bulk loading (#3966) - In previous sdk implementation, we assume the import files are already available inside the container. However in reality, users need to make use of local files. So we need to upload the import files automatically when invoking bulk loading. - The documentation will be added later, since we are about to refactor our documentation for sdk. Fix #3965 --- .../http_server/actor/admin_actor.act.cc | 15 + .../http_server/actor/admin_actor.act.h | 2 + .../http_server/handler/admin_http_handler.cc | 193 +++++++++++- flex/engines/http_server/handler/http_utils.h | 17 ++ .../http_server/workdir_manipulator.cc | 33 +++ .../engines/http_server/workdir_manipulator.h | 12 + .../interactive/client/Session.java | 3 +- .../interactive/client/UtilsInterface.java | 25 ++ .../interactive/client/common/Status.java | 10 +- .../client/impl/DefaultSession.java | 274 +++++++++++++++++- .../interactive/client/DriverTest.java | 74 ++++- .../python/gs_interactive/client/session.py | 158 +++++++++- .../sdk/python/test/test_driver.py | 35 +++ flex/openapi/openapi_interactive.yaml | 42 +++ 14 files changed, 850 insertions(+), 43 deletions(-) create mode 100644 flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/UtilsInterface.java diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index 847c6d7b01ca..77c6c142c7cc 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -1336,4 +1336,19 @@ seastar::future admin_actor::run_get_graph_statistic( gs::Result(statistics.ToJson())); } +seastar::future admin_actor::upload_file( + query_param&& query_param) { + auto& content = query_param.content; + auto upload_res = 
WorkDirManipulator::CreateFile(content); + if (upload_res.ok()) { + auto value = upload_res.value(); + return seastar::make_ready_future( + gs::Result( + seastar::sstring(value.data(), value.size()))); + } else { + return seastar::make_ready_future( + gs::Result(upload_res.status())); + } +} + } // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/actor/admin_actor.act.h b/flex/engines/http_server/actor/admin_actor.act.h index 0b30475be074..878988ea4304 100644 --- a/flex/engines/http_server/actor/admin_actor.act.h +++ b/flex/engines/http_server/actor/admin_actor.act.h @@ -72,6 +72,8 @@ class ANNOTATION(actor:impl) admin_actor : public hiactor::actor { seastar::future ANNOTATION(actor:method) run_get_graph_statistic(query_param&& param); + seastar::future ANNOTATION(actor:method) upload_file(query_param&& param); + // DECLARE_RUN_QUERIES; /// Declare `do_work` func here, no need to implement. ACTOR_DO_WORK() diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index f7c7ad3707c2..e0e874640b14 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -23,6 +23,7 @@ #include "flex/engines/http_server/generated/actor/admin_actor_ref.act.autogen.h" #include "flex/engines/http_server/types.h" #include "flex/engines/http_server/workdir_manipulator.h" +#include "flex/third_party/httplib.h" #include @@ -39,6 +40,181 @@ std::string trim_slash(const std::string& origin) { return res; } +// Only returns success if all results are success +// But currently, only one file uploading is supported. 
+admin_query_result generate_final_result( + server::payload>>& result) { + auto result_val = result.content; + nlohmann::json json_res; + if (result_val.size() != 1) { + LOG(INFO) << "Only one file uploading is supported"; + return admin_query_result{gs::Result( + gs::Status(gs::StatusCode::InternalError, + "Only one file uploading is supported"))}; + } + for (auto& res : result_val) { + if (res.ok()) { + json_res["file_path"] = res.value(); + } else { + return admin_query_result{std::move(res)}; + } + } + return admin_query_result{gs::Result(json_res.dump())}; +} + +inline bool parse_multipart_boundary(const seastar::sstring& content_type, + seastar::sstring& boundary) { + auto pos = content_type.find("boundary="); + if (pos == std::string::npos) { + return false; + } + boundary = content_type.substr(pos + 9); + if (boundary.length() >= 2 && boundary[0] == '"' && boundary.back() == '"') { + boundary = boundary.substr(1, boundary.size() - 2); + } + return !boundary.empty(); +} + +class admin_file_upload_handler_impl : public seastar::httpd::handler_base { + public: + admin_file_upload_handler_impl(uint32_t group_id, uint32_t shard_concurrency) + : shard_concurrency_(shard_concurrency), executor_idx_(0) { + admin_actor_refs_.reserve(shard_concurrency_); + hiactor::scope_builder builder; + builder.set_shard(hiactor::local_shard_id()) + .enter_sub_scope(hiactor::scope(0)) + .enter_sub_scope(hiactor::scope(group_id)); + for (unsigned i = 0; i < shard_concurrency_; ++i) { + admin_actor_refs_.emplace_back(builder.build_ref(i)); + } + } + ~admin_file_upload_handler_impl() override = default; + + seastar::future>>> + upload_file(std::vector>&& + file_name_and_contents, + size_t cur_ind, uint32_t dst_executor, + std::vector>&& results) { + if (cur_ind >= file_name_and_contents.size()) { + VLOG(10) << "Successfully uploaded " << file_name_and_contents.size() + << " files."; + return seastar::make_ready_future< + server::payload>>>( + std::move(results)); + } else { + return 
admin_actor_refs_[dst_executor] + .upload_file( + query_param{std::move(file_name_and_contents[cur_ind].second)}) + .then_wrapped([this, dst_executor, cur_ind, + file_name_and_contents = + std::move(file_name_and_contents), + results = + std::move(results)](auto&& result_fut) mutable { + auto result = result_fut.get0(); + auto result_val = result.content; + if (result_val.ok()) { + VLOG(10) << "Upload file success: " + << file_name_and_contents[cur_ind].first << ", " + << result_val.value(); + } else { + LOG(ERROR) << "Upload file failed"; + return seastar::make_exception_future< + server::payload>>>( + std::runtime_error("Upload file failed: " + + result_val.status().error_message())); + } + results.emplace_back(result_val); + return upload_file(std::move(file_name_and_contents), cur_ind + 1, + dst_executor, std::move(results)); + }); + } + } + + seastar::future>> upload_files( + std::vector>&& + file_name_and_contents, + uint32_t dst_executor) { + // upload each file in chain + std::vector> results; + return upload_file(std::move(file_name_and_contents), 0, dst_executor, + std::move(results)) + .then([](auto&& results) { + auto final_res = generate_final_result(results); + return seastar::make_ready_future( + std::move(final_res)); + }); + } + + std::vector> + parse_multipart_form_data(const seastar::sstring& content, + const seastar::sstring& boundary) { + std::vector names, filenames, content_types, contents; + httplib::detail::MultipartFormDataParser parser; + parser.set_boundary(boundary); + httplib::MultipartContentHeader header_callback = + [&names, &filenames, + &content_types](const httplib::MultipartFormData& header) { + names.push_back(header.name); + filenames.push_back(header.filename); + content_types.push_back(header.content_type); + return true; + }; + httplib::ContentReceiver content_callback = + [&contents](const char* data, size_t data_length) { + contents.emplace_back(data, data_length); + return true; + }; + parser.parse(content.data(), 
content.size(), content_callback, + header_callback); + VLOG(10) << "filestorage names:" << gs::to_string(names); + VLOG(10) << "filenames: " << gs::to_string(filenames); + VLOG(10) << "content types" << gs::to_string(content_types); + std::vector> res; + for (size_t i = 0; i < names.size(); ++i) { + res.emplace_back(names[i], contents[i]); + } + return res; + } + + seastar::future> handle( + const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) override { + auto dst_executor = executor_idx_; + + executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + LOG(INFO) << "Handling path:" << path << ", method: " << req->_method; + auto& method = req->_method; + if (method == "POST") { + seastar::sstring boundary; + if (!parse_multipart_boundary(req->_headers["Content-Type"], boundary)) { + LOG(ERROR) << "Failed to parse boundary"; + return seastar::make_exception_future< + std::unique_ptr>( + std::runtime_error("Failed to parse boundary")); + } + std::vector> + file_name_and_contents = + parse_multipart_form_data(req->content, boundary); + // upload for each file + return upload_files(std::move(file_name_and_contents), dst_executor) + .then_wrapped([rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); + }); + } else { + return seastar::make_exception_future< + std::unique_ptr>( + std::runtime_error("Unsupported method" + method)); + } + } + + private: + const uint32_t shard_concurrency_; + uint32_t executor_idx_; + std::vector admin_actor_refs_; +}; + /** * Handle all request for graph management. 
*/ @@ -559,8 +735,7 @@ seastar::future<> admin_http_handler::set_routes() { .add_str("/procedure") .add_param("procedure_id"); // Delete a procedure - r.add(new seastar::httpd::match_rule(*match_rule), - seastar::httpd::operation_type::DELETE); + r.add(new seastar::httpd::match_rule(*match_rule), SEASTAR_DELETE); } { // Each procedure's handling @@ -587,10 +762,17 @@ seastar::future<> admin_http_handler::set_routes() { shard_admin_graph_concurrency)); // Delete a graph - r.add(seastar::httpd::operation_type::DELETE, + r.add(SEASTAR_DELETE, seastar::httpd::url("/v1/graph").remainder("graph_id"), new admin_http_graph_handler_impl(interactive_admin_group_id, shard_admin_graph_concurrency)); + { + // uploading file to server + r.add(seastar::httpd::operation_type::POST, + seastar::httpd::url("/v1/file/upload"), + new admin_file_upload_handler_impl(interactive_admin_group_id, + shard_admin_graph_concurrency)); + } // Get graph metadata { @@ -707,7 +889,7 @@ seastar::future<> admin_http_handler::set_routes() { CHECK(params.at("procedure_id") == "/proce1") << params.at("procedure_id"); params.clear(); - test_handler = r.get_handler(seastar::httpd::operation_type::DELETE, + test_handler = r.get_handler(SEASTAR_DELETE, "/v1/graph/abc/procedure/proce1", params); CHECK(test_handler); test_handler = r.get_handler(seastar::httpd::operation_type::PUT, @@ -727,8 +909,7 @@ seastar::future<> admin_http_handler::set_routes() { match_rule->add_str("/v1/job").add_param("job_id"); r.add(match_rule, seastar::httpd::operation_type::GET); - r.add(seastar::httpd::operation_type::DELETE, - seastar::httpd::url("/v1/job").remainder("job_id"), + r.add(SEASTAR_DELETE, seastar::httpd::url("/v1/job").remainder("job_id"), new admin_http_job_handler_impl(interactive_admin_group_id, shard_admin_job_concurrency)); } diff --git a/flex/engines/http_server/handler/http_utils.h b/flex/engines/http_server/handler/http_utils.h index e303beb68d35..376537eb3132 100644 --- 
a/flex/engines/http_server/handler/http_utils.h +++ b/flex/engines/http_server/handler/http_utils.h @@ -14,6 +14,7 @@ */ #include "flex/engines/http_server/types.h" #include "flex/utils/result.h" +#include "seastar/http/common.hh" #include "seastar/http/reply.hh" #ifndef ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ @@ -35,6 +36,22 @@ seastar::future> return_reply_with_result(std::unique_ptr rep, seastar::future&& fut); +// To avoid macro conflict between /usr/include/arpa/nameser_compact.h#120(which +// is included by httplib.h) and seastar/http/common.hh#61 +static constexpr seastar::httpd::operation_type SEASTAR_DELETE = + seastar::httpd::operation_type::DELETE; + } // namespace server +namespace gs { + +template +struct to_string_impl; + +template <> +struct to_string_impl { + static std::string to_string(const seastar::sstring& t) { return t.c_str(); } +}; +} // namespace gs + #endif // ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ diff --git a/flex/engines/http_server/workdir_manipulator.cc b/flex/engines/http_server/workdir_manipulator.cc index 51ea88bcb047..9f13005cc394 100644 --- a/flex/engines/http_server/workdir_manipulator.cc +++ b/flex/engines/http_server/workdir_manipulator.cc @@ -615,6 +615,14 @@ std::string WorkDirManipulator::GetLogDir() { return log_dir; } +std::string WorkDirManipulator::GetUploadDir() { + auto upload_dir = workspace + UPLOAD_DIR; + if (!std::filesystem::exists(upload_dir)) { + std::filesystem::create_directory(upload_dir); + } + return upload_dir; +} + std::string WorkDirManipulator::GetCompilerLogFile() { // with timestamp auto time_stamp = std::to_string( @@ -645,6 +653,30 @@ gs::Result WorkDirManipulator::CommitTempIndices( return indices_dir; } +gs::Result WorkDirManipulator::CreateFile( + const seastar::sstring& content) { + if (content.size() == 0) { + return {gs::Status(gs::StatusCode::InValidArgument, "Content is empty")}; + } + if (content.size() > MAX_CONTENT_SIZE) { + return { + gs::Status(gs::StatusCode::InValidArgument, + 
"Content is too large" + std::to_string(content.size()))}; + } + + // get the timestamp as the file name + auto time_stamp = std::to_string(gs::GetCurrentTimeStamp()); + auto file_name = GetUploadDir() + "/" + time_stamp; + std::ofstream fout(file_name); + if (!fout.is_open()) { + return {gs::Status(gs::StatusCode::PermissionError, "Fail to open file")}; + } + fout << content; + fout.close(); + LOG(INFO) << "Successfully create file: " << file_name; + return file_name; +} + // graph_name can be a path, first try as it is absolute path, or // relative path std::string WorkDirManipulator::get_graph_dir(const std::string& graph_name) { @@ -1244,5 +1276,6 @@ const std::string WorkDirManipulator::CONF_ENGINE_CONFIG_FILE_NAME = const std::string WorkDirManipulator::RUNNING_GRAPH_FILE_NAME = "RUNNING"; const std::string WorkDirManipulator::TMP_DIR = "/tmp"; const std::string WorkDirManipulator::GRAPH_LOADER_BIN = "bulk_loader"; +const std::string WorkDirManipulator::UPLOAD_DIR = "upload"; } // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/workdir_manipulator.h b/flex/engines/http_server/workdir_manipulator.h index 98766b437aa0..520633e6c4b7 100644 --- a/flex/engines/http_server/workdir_manipulator.h +++ b/flex/engines/http_server/workdir_manipulator.h @@ -60,6 +60,8 @@ class WorkDirManipulator { static const std::string RUNNING_GRAPH_FILE_NAME; static const std::string TMP_DIR; static const std::string GRAPH_LOADER_BIN; + static const std::string UPLOAD_DIR; + static constexpr int32_t MAX_CONTENT_SIZE = 100 * 1024 * 1024; // 100MB static void SetWorkspace(const std::string& workspace_path); @@ -159,6 +161,8 @@ class WorkDirManipulator { static std::string GetLogDir(); + static std::string GetUploadDir(); + static std::string GetCompilerLogFile(); // Return a unique temp dir for the graph. 
static std::string GetTempIndicesDir(const std::string& graph_name); @@ -169,6 +173,14 @@ class WorkDirManipulator { static gs::Result CommitTempIndices( const std::string& graph_name); + // Create a file which contains the content, in binary, returns the filename. + // NOTE: Creating new files under a directory. Limit the size of the content. + // The uploaded file are mainly used for bulk loading, we will clear + // them after the loading process. + // TODO(zhanglei): Consider the bulk loading may fail, so we will + // automatically clear the uploaded files after a period of time. + static gs::Result CreateFile(const seastar::sstring& content); + private: static std::string get_tmp_bulk_loading_job_log_path( const std::string& graph_name); diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/Session.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/Session.java index b5400b885b0f..a5af8904382f 100644 --- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/Session.java +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/Session.java @@ -22,4 +22,5 @@ public interface Session JobInterface, ProcedureInterface, QueryServiceInterface, - AutoCloseable {} + AutoCloseable, + UtilsInterface {} diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/UtilsInterface.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/UtilsInterface.java new file mode 100644 index 000000000000..def0a4f21cb3 --- /dev/null +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/UtilsInterface.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.interactive.client; + +import com.alibaba.graphscope.interactive.client.common.Result; +import com.alibaba.graphscope.interactive.models.UploadFileResponse; + +import java.io.File; + +public interface UtilsInterface { + Result uploadFile(File fileStorage); +} diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/common/Status.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/common/Status.java index 0073530cd22c..4aff035737a0 100644 --- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/common/Status.java +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/common/Status.java @@ -53,7 +53,15 @@ public Status(StatusCode code, String message) { this.message = message; } - public static Status ServerInternalError(String message) { + public static Status ok(String message) { + return new Status(StatusCode.kOk, message); + } + + public static Status badRequest(String message) { + return new Status(StatusCode.kBadRequest, message); + } + + public static Status serverInternalError(String message) { return new Status(StatusCode.kServerInternalError, message); } diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java index c33d121a4c74..82dda3f359f1 100644 --- 
a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java @@ -22,10 +22,13 @@ import com.alibaba.graphscope.interactive.api.*; import com.alibaba.graphscope.interactive.client.Session; import com.alibaba.graphscope.interactive.client.common.Result; +import com.alibaba.graphscope.interactive.client.common.Status; import com.alibaba.graphscope.interactive.models.*; import com.google.protobuf.InvalidProtocolBufferException; import java.io.Closeable; +import java.io.File; +import java.util.ArrayList; import java.util.List; /*** @@ -33,6 +36,11 @@ * Based on the code generated by OpenAPI Generator. */ public class DefaultSession implements Session { + private static final int DEFAULT_READ_TIMEOUT = 30000; + private static final int DEFAULT_WRITE_TIMEOUT = 30000; + private static String JSON_FORMAT_STRING = "json"; + private static String PROTO_FORMAT_STRING = "proto"; + private static String ENCODER_FORMAT_STRING = "encoder"; private final AdminServiceGraphManagementApi graphApi; private final AdminServiceJobManagementApi jobApi; private final AdminServiceProcedureManagementApi procedureApi; @@ -40,23 +48,9 @@ public class DefaultSession implements Session { private final GraphServiceVertexManagementApi vertexApi; private final GraphServiceEdgeManagementApi edgeApi; private final QueryServiceApi queryApi; - - private static final int DEFAULT_READ_TIMEOUT = 30000; - private static final int DEFAULT_WRITE_TIMEOUT = 30000; - private static String JSON_FORMAT_STRING = "json"; - private static String PROTO_FORMAT_STRING = "proto"; - private static String ENCODER_FORMAT_STRING = "encoder"; - + private final UtilsApi utilsApi; private final ApiClient client, queryClient; - public static DefaultSession newInstance(String uri) { - return new DefaultSession(uri); - } - - public static DefaultSession newInstance(String 
host, int port) { - return new DefaultSession("http://" + host + ":" + port); - } - /** * Create a default GraphScope Interactive Session. * @@ -75,6 +69,8 @@ private DefaultSession(String uri) { vertexApi = new GraphServiceVertexManagementApi(client); edgeApi = new GraphServiceEdgeManagementApi(client); + utilsApi = new UtilsApi(client); + Result status = getServiceStatus(); if (!status.isOk()) { throw new RuntimeException( @@ -93,6 +89,229 @@ private DefaultSession(String uri) { queryApi = new QueryServiceApi(queryClient); } + public static DefaultSession newInstance(String uri) { + return new DefaultSession(uri); + } + + public static DefaultSession newInstance(String host, int port) { + return new DefaultSession("http://" + host + ":" + port); + } + + /** + * Try to upload the input files if they are specified with a starting @ + * for input files in schema_mapping. Replace the path to the uploaded file with the + * path returned from the server. + *

+ * The @ can be added to the beginning of data_source.location in schema_mapping.loading_config + * or added to each file in vertex_mappings and edge_mappings. + *

+ * 1. location: @/path/to/dir + * inputs: + * - @/path/to/file1 + * - @/path/to/file2 + * 2. location: /path/to/dir + * inputs: + * - @/path/to/file1 + * - @/path/to/file2 + * 3. location: @/path/to/dir + * inputs: + * - /path/to/file1 + * - /path/to/file2 + * 4. location: /path/to/dir + * inputs: + * - /path/to/file1 + * - /path/to/file2 + * 4. location: None + * inputs: + * - @/path/to/file1 + * - @/path/to/file2 + * Among the above 4 cases, only the 1, 3, 5 case are valid, for 2,4 the file will not be uploaded + * + * @param schemaMapping schema mapping object + * @return true if all files are uploaded successfully, false otherwise + */ + private static Result validateSchemaMapping(SchemaMapping schemaMapping) { + if (schemaMapping == null) { + return new Result(Status.badRequest("Schema mapping is null"), null); + } + boolean rootLocationMarkedUploaded = false; + String location = null; + SchemaMappingLoadingConfig config = schemaMapping.getLoadingConfig(); + if (config != null) { + if (config.getDataSource() != null && config.getDataSource().getScheme() != null) { + if (config.getDataSource() + .getScheme() + .equals(SchemaMappingLoadingConfigDataSource.SchemeEnum.FILE)) { + location = config.getDataSource().getLocation(); + } else { + return new Result( + Status.ok("Only FILE scheme is supported"), schemaMapping); + } + } + } + if (location != null && location.startsWith("@")) { + rootLocationMarkedUploaded = true; + } + + List extractedFiles = new ArrayList<>(); + if (schemaMapping.getVertexMappings() != null) { + for (VertexMapping item : schemaMapping.getVertexMappings()) { + if (item.getInputs() != null) { + for (int i = 0; i < item.getInputs().size(); ++i) { + String input = item.getInputs().get(i); + if (location != null && !rootLocationMarkedUploaded) { + if (input.startsWith("@")) { + return new Result( + Status.badRequest( + "Root location given without @, but the input file" + + " starts with @" + + input), + null); + } + } + if (location != null) 
{ + input = location + "/" + trimPath(input); + } + item.getInputs().set(i, input); + extractedFiles.add(input); + } + } + } + } + if (schemaMapping.getEdgeMappings() != null) { + for (EdgeMapping item : schemaMapping.getEdgeMappings()) { + if (item.getInputs() != null) { + for (int i = 0; i < item.getInputs().size(); ++i) { + String input = item.getInputs().get(i); + if (location != null && !rootLocationMarkedUploaded) { + if (input.startsWith("@")) { + return new Result( + Status.badRequest( + "Root location given without @, but the input file" + + " starts with @" + + input), + null); + } + } + if (location != null) { + input = location + "/" + trimPath(input); + } + item.getInputs().set(i, input); + extractedFiles.add(input); + } + } + } + } + { + int count = 0; + for (String file : extractedFiles) { + if (file.startsWith("@")) { + count += 1; + } + } + if (count == 0) { + System.out.println("No files to upload"); + return Result.ok(schemaMapping); + } else if (count != extractedFiles.size()) { + System.err.println("Can not mix uploading file and not uploading file"); + return Result.error("Can not mix uploading file and not uploading file"); + } + } + return Result.ok(schemaMapping); + } + + private Result uploadFilesAndUpdate(SchemaMapping schemaMapping) { + if (schemaMapping.getVertexMappings() != null) { + for (VertexMapping item : schemaMapping.getVertexMappings()) { + if (item.getInputs() != null) { + for (int i = 0; i < item.getInputs().size(); ++i) { + String input = item.getInputs().get(i); + if (input.startsWith("@")) { + input = input.substring(1); + File file = new File(input); + if (!file.exists()) { + return new Result( + Status.badRequest("File does not exist: " + input), + schemaMapping); + } + + Result uploadedFile = uploadFile(file); + if (!uploadedFile.isOk()) { + return new Result( + Status.badRequest( + "Failed to upload file: " + + input + + ", " + + uploadedFile.getStatusMessage()), + schemaMapping); + } + item.getInputs().set(i, 
uploadedFile.getValue().getFilePath()); + } + } + } + } + } + if (schemaMapping.getEdgeMappings() != null) { + for (EdgeMapping item : schemaMapping.getEdgeMappings()) { + if (item.getInputs() != null) { + for (int i = 0; i < item.getInputs().size(); ++i) { + String input = item.getInputs().get(i); + if (input.startsWith("@")) { + input = input.substring(1); + File file = new File(input); + if (!file.exists()) { + return new Result( + Status.badRequest("File does not exist: " + input), + schemaMapping); + } + Result uploadedFile = uploadFile(file); + if (!uploadedFile.isOk()) { + return new Result( + Status.badRequest( + "Failed to upload file: " + + input + + ", " + + uploadedFile.getStatusMessage()), + schemaMapping); + } + item.getInputs().set(i, uploadedFile.getValue().getFilePath()); + } + } + } + } + } + return Result.ok(schemaMapping); + } + + /** + * Remove the @ if it is present in the path + * + * @param path + * @return + */ + private static String trimPath(String path) { + if (path == null) { + throw new IllegalArgumentException("path cannot be null"); + } + return path.startsWith("@") ? 
path.substring(1) : path; + } + + private Result tryUploadFile(SchemaMapping schemaMapping) { + Result validateResult = validateSchemaMapping(schemaMapping); + if (!validateResult.isOk()) { + return new Result( + Status.badRequest("validation failed for schema mapping"), schemaMapping); + } + SchemaMapping validatedSchemaMapping = validateResult.getValue(); + System.out.println("Schema mapping validated successfully"); + Result uploadResult = uploadFilesAndUpdate(validatedSchemaMapping); + if (!uploadResult.isOk()) { + return new Result( + Status.badRequest("upload failed for schema mapping"), schemaMapping); + } + return uploadResult; + } + @Override public Result getEdge( String graphName, @@ -159,9 +378,14 @@ public Result updateEdge(String graphName, EdgeRequest edgeRequest) { @Override public Result bulkLoading(String graphId, SchemaMapping mapping) { + Result result = tryUploadFile(mapping); + if (!result.isOk()) { + System.out.println("Failed to upload files" + result.getStatusMessage()); + return new Result(result.getStatus(), null); + } try { ApiResponse response = - graphApi.createDataloadingJobWithHttpInfo(graphId, mapping); + graphApi.createDataloadingJobWithHttpInfo(graphId, result.getValue()); return Result.fromResponse(response); } catch (ApiException e) { e.printStackTrace(); @@ -559,4 +783,22 @@ public Result deleteVertex(String graphId, String label, Object primaryK */ @Override public void close() throws Exception {} + + /** + * Upload a file to the server. 
+ * + * @param fileStorage the file to upload + * (required) + * @return the response from the server + */ + @Override + public Result uploadFile(File fileStorage) { + try { + ApiResponse response = utilsApi.uploadFileWithHttpInfo(fileStorage); + return Result.fromResponse(response); + } catch (ApiException e) { + e.printStackTrace(); + return Result.fromException(e); + } + } } diff --git a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java index d2499bc6cccc..0931a4641e10 100644 --- a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java +++ b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java @@ -329,30 +329,50 @@ public void test1BulkLoading() { assertOk(rep); jobId = rep.getValue().getJobId(); logger.info("job id: " + jobId); + waitJobFinished(jobId); } @Test @Order(3) - public void test2waitJobFinished() { - if (jobId == null) { - return; + public void test1BulkLoadingUploading() { + SchemaMapping schemaMapping = new SchemaMapping(); + { + SchemaMappingLoadingConfig loadingConfig = new SchemaMappingLoadingConfig(); + loadingConfig.setImportOption(SchemaMappingLoadingConfig.ImportOptionEnum.INIT); + loadingConfig.setFormat(new SchemaMappingLoadingConfigFormat().type("csv")); + schemaMapping.setLoadingConfig(loadingConfig); } - while (true) { - Result rep = session.getJobStatus(jobId); - assertOk(rep); - JobStatus job = rep.getValue(); - if (job.getStatus() == JobStatus.StatusEnum.SUCCESS) { - logger.info("job finished"); - break; - } else if (job.getStatus() == JobStatus.StatusEnum.FAILED) { - throw new RuntimeException("job failed"); + { + // get env var FLEX_DATA_DIR + if (System.getenv("FLEX_DATA_DIR") == null) { + logger.info("FLEX_DATA_DIR is not set"); + return; } - try { - Thread.sleep(1000); - } catch 
(InterruptedException e) { - e.printStackTrace(); + // The file will be uploaded to the server + String personPath = "@" + System.getenv("FLEX_DATA_DIR") + "/person.csv"; + String knowsPath = "@" + System.getenv("FLEX_DATA_DIR") + "/person_knows_person.csv"; + { + VertexMapping vertexMapping = new VertexMapping(); + vertexMapping.setTypeName("person"); + vertexMapping.addInputsItem(personPath); + schemaMapping.addVertexMappingsItem(vertexMapping); + } + { + EdgeMapping edgeMapping = new EdgeMapping(); + edgeMapping.setTypeTriplet( + new EdgeMappingTypeTriplet() + .edge("knows") + .sourceVertex("person") + .destinationVertex("person")); + edgeMapping.addInputsItem(knowsPath); + schemaMapping.addEdgeMappingsItem(edgeMapping); } } + Result rep = session.bulkLoading(graphId, schemaMapping); + assertOk(rep); + jobId = rep.getValue().getJobId(); + logger.info("job id: " + jobId); + waitJobFinished(jobId); } @Test @@ -578,4 +598,26 @@ private static boolean assertOk(Result result) { } return true; } + + private void waitJobFinished(String jobId) { + if (jobId == null) { + return; + } + while (true) { + Result rep = session.getJobStatus(jobId); + assertOk(rep); + JobStatus job = rep.getValue(); + if (job.getStatus() == JobStatus.StatusEnum.SUCCESS) { + logger.info("job finished"); + break; + } else if (job.getStatus() == JobStatus.StatusEnum.FAILED) { + throw new RuntimeException("job failed"); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } } diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index 79993c3560b1..fd132a1a280c 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -19,9 +19,10 @@ from abc import ABCMeta, abstractmethod from typing import Annotated, Any, Dict, List, Optional, Union -from pydantic import Field, StrictStr -from 
gs_interactive.client.status import Status +from pydantic import Field, StrictStr, StrictBytes + +from gs_interactive.client.status import Status, StatusCode from gs_interactive.api.admin_service_graph_management_api import ( AdminServiceGraphManagementApi, ) @@ -40,6 +41,7 @@ from gs_interactive.api.graph_service_vertex_management_api import ( GraphServiceVertexManagementApi, ) +from gs_interactive.api.utils_api import UtilsApi from gs_interactive.api.query_service_api import QueryServiceApi from gs_interactive.api_client import ApiClient from gs_interactive.client.result import Result @@ -70,7 +72,7 @@ from gs_interactive.models.query_request import QueryRequest from gs_interactive.models.vertex_request import VertexRequest from gs_interactive.client.generated.results_pb2 import CollectiveResults - +from gs_interactive.models.upload_file_response import UploadFileResponse class EdgeInterface(metaclass=ABCMeta): @abstractmethod @@ -296,6 +298,11 @@ def list_jobs(self) -> Result[List[JobResponse]]: def cancel_job(self, job_id: StrictStr) -> Result[str]: raise NotImplementedError +class UiltsInterface(metaclass=ABCMeta): + @abstractmethod + def upload_file(self, filestorage: Optional[Union[StrictBytes, StrictStr]]) -> Result[UploadFileResponse]: + raise NotImplementedError + class Session( VertexInterface, @@ -304,6 +311,7 @@ class Session( ProcedureInterface, JobInterface, QueryServiceInterface, + UiltsInterface, ): pass @@ -321,6 +329,7 @@ def __init__(self, uri: str): self._service_api = AdminServiceServiceManagementApi(self._client) self._edge_api = GraphServiceEdgeManagementApi(self._client) self._vertex_api = GraphServiceVertexManagementApi(self._client) + self._utils_api = UtilsApi(self._client) # TODO(zhanglei): Get endpoint from service, current implementation is adhoc. 
# get service port service_status = self.get_service_status() @@ -481,6 +490,13 @@ def bulk_loading( graph_id: Annotated[StrictStr, Field(description="The id of graph to load")], schema_mapping: SchemaMapping, ) -> Result[JobResponse]: + # First try to upload the input files if they are specified with a starting @ + # return a new schema_mapping with the uploaded files + upload_res = self.try_upload_files(schema_mapping) + if not upload_res.is_ok(): + return upload_res + schema_mapping = upload_res.get_value() + print("new schema_mapping: ", schema_mapping) try: response = self._graph_api.create_dataloading_job_with_http_info( graph_id, schema_mapping @@ -665,3 +681,139 @@ def cancel_job(self, job_id: StrictStr) -> Result[str]: return Result.from_response(response) except Exception as e: return Result.from_exception(e) + + def upload_file(self, filestorage: Optional[Union[StrictBytes, StrictStr]]) -> Result[UploadFileResponse]: + try: + print("uploading file: ", filestorage) + response = self._utils_api.upload_file_with_http_info(filestorage) + print("response: ", response) + if response.status_code == 200: + # the response is the path of the uploaded file on server. 
+ return Result.from_response(response) + else: + print("Failed to upload file: ", input) + return Result.from_response(response) + except Exception as e: + print("got exception: ", e) + return Result.from_exception(e) + + def trim_path(self, path: str) -> str: + return path[1:] if path.startswith('@') else path + + def check_file_mixup(self, schema_mapping: SchemaMapping) -> Result[SchemaMapping]: + root_dir_marked_with_at = False # Can not mix uploading file and not uploading file + location=None + if schema_mapping.loading_config and schema_mapping.loading_config.data_source: + if schema_mapping.loading_config.data_source.scheme != 'file': + print("Only check mixup for file scheme") + return Result.ok(schema_mapping) + location = schema_mapping.loading_config.data_source.location + if location and location.startswith('@'): + root_dir_marked_with_at = True + extracted_files = [] + if schema_mapping.vertex_mappings: + for vertex_mapping in schema_mapping.vertex_mappings: + if vertex_mapping.inputs: + for i, input in enumerate(vertex_mapping.inputs): + # First check whether input is valid + if location and not root_dir_marked_with_at: + if input.startswith('@'): + print("Root location given without @, but the input file starts with @" + input) + return Result.error(Status(StatusCode.BAD_REQUEST, "Root location given without @, but the input file starts with @" + input), schema_mapping) + if location: + vertex_mapping.inputs[i] = location + '/' + self.trim_path(input) + extracted_files.append(vertex_mapping.inputs[i]) + if schema_mapping.edge_mappings: + for edge_mapping in schema_mapping.edge_mappings: + if edge_mapping.inputs: + for i, input in enumerate(edge_mapping.inputs): + if location and not root_dir_marked_with_at: + if input.startswith('@'): + print("Root location given without @, but the input file starts with @" + input) + return Result.error(Status(StatusCode.BAD_REQUEST, "Root location given without @, but the input file starts with @" + input), 
schema_mapping) + if location: + edge_mapping.inputs[i] = location + '/' + self.trim_path(input) + extracted_files.append(edge_mapping.inputs[i]) + if extracted_files: + #count the number of files start with @ + count = 0 + for file in extracted_files: + if file.startswith('@'): + count += 1 + if count == 0: + print("No file to upload") + return Result.ok(schema_mapping) + elif count != len(extracted_files): + print("Can not mix uploading file and not uploading file") + return Result.error("Can not mix uploading file and not uploading file") + return Result.ok(schema_mapping) + + def upload_and_replace_input_inplace(self, schema_mapping: SchemaMapping) -> Result[SchemaMapping]: + """ + For each input file in schema_mapping, if the file starts with @, upload the file to the server + and replace the path with the path returned from the server. + """ + if schema_mapping.vertex_mappings: + for vertex_mapping in schema_mapping.vertex_mappings: + if vertex_mapping.inputs: + for i, input in enumerate(vertex_mapping.inputs): + if input.startswith('@'): + res = self.upload_file(input[1:]) + if not res.is_ok(): + return Result.error(res.status, schema_mapping) + vertex_mapping.inputs[i] = res.get_value().file_path + if schema_mapping.edge_mappings: + for edge_mapping in schema_mapping.edge_mappings: + if edge_mapping.inputs: + for i, input in enumerate(edge_mapping.inputs): + if input.startswith('@'): + res = self.upload_file(input[1:]) + if not res.is_ok(): + return Result.error(res.status, schema_mapping) + edge_mapping.inputs[i] = res.get_value().file_path + return Result.ok(schema_mapping) + + def try_upload_files(self, schema_mapping: SchemaMapping) -> Result[SchemaMapping]: + """ + Try to upload the input files if they are specified with a starting @ + for input files in schema_mapping. Replace the path to the uploaded file with the + path returned from the server. 
+ + The @ can be added to the beginning of data_source.location in schema_mapping.loading_config + or added to each file in vertex_mappings and edge_mappings. + + 1. location: @/path/to/dir + inputs: + - @/path/to/file1 + - @/path/to/file2 + 2. location: /path/to/dir + inputs: + - @/path/to/file1 + - @/path/to/file2 + 3. location: @/path/to/dir + inputs: + - /path/to/file1 + - /path/to/file2 + 4. location: /path/to/dir + inputs: + - /path/to/file1 + - /path/to/file2 + 5. location: None + inputs: + - @/path/to/file1 + - @/path/to/file2 + Among the above 5 cases, only the 1, 3, 5 case are valid, for 2,4 the file will not be uploaded + """ + + check_mixup_res = self.check_file_mixup(schema_mapping) + if not check_mixup_res.is_ok(): + return check_mixup_res + schema_mapping = check_mixup_res.get_value() + # now try upload the replace inplace + print("after check_mixup_res: ") + upload_res = self.upload_and_replace_input_inplace(schema_mapping) + if not upload_res.is_ok(): + return upload_res + print("new schema_mapping: ", upload_res.get_value()) + return Result.ok(upload_res.get_value()) + \ No newline at end of file diff --git a/flex/interactive/sdk/python/test/test_driver.py b/flex/interactive/sdk/python/test/test_driver.py index fbadf86f9770..bc36dbb71086 100644 --- a/flex/interactive/sdk/python/test/test_driver.py +++ b/flex/interactive/sdk/python/test/test_driver.py @@ -96,6 +96,7 @@ def tearDown(self): def test_example(self): self._graph_id = self.createGraph() self.bulkLoading() + self.bulkLoadingUploading() self.waitJobFinish() self.list_graph() self.runCypherQuery() @@ -187,6 +188,40 @@ def bulkLoading(self): assert resp.is_ok() self._job_id = resp.get_value().job_id + + def bulkLoadingUploading(self): + """ + Test bulk loading with uploading files + """ + assert os.environ.get("FLEX_DATA_DIR") is not None + location = os.environ.get("FLEX_DATA_DIR") + person_csv_path = "@/{}/person.csv".format(location) + knows_csv_path = 
"@/{}/person_knows_person.csv".format(location) + print("test bulk loading: ", self._graph_id, person_csv_path, knows_csv_path) + schema_mapping = SchemaMapping( + loading_config=SchemaMappingLoadingConfig( + data_source=SchemaMappingLoadingConfigDataSource(scheme="file"), + import_option="init", + format=SchemaMappingLoadingConfigFormat(type="csv"), + ), + vertex_mappings=[ + VertexMapping(type_name="person", inputs=[person_csv_path]) + ], + edge_mappings=[ + EdgeMapping( + type_triplet=EdgeMappingTypeTriplet( + edge="knows", + source_vertex="person", + destination_vertex="person", + ), + inputs=[knows_csv_path], + ) + ], + ) + resp = self._sess.bulk_loading(self._graph_id, schema_mapping) + assert resp.is_ok() + self._job_id = resp.get_value().job_id + def waitJobFinish(self): assert self._job_id is not None while True: diff --git a/flex/openapi/openapi_interactive.yaml b/flex/openapi/openapi_interactive.yaml index 736409d7a3c2..af3c5aaf721a 100644 --- a/flex/openapi/openapi_interactive.yaml +++ b/flex/openapi/openapi_interactive.yaml @@ -998,6 +998,38 @@ paths: format: byte '500': description: Server internal error + /v1/file/upload: + post: + tags: [Utils] + operationId: uploadFile + requestBody: + required: true + content: + multipart/form-data: + schema: + type: object + properties: + filestorage: + type: string + format: binary + responses: + 200: + description: successful operation + content: + application/json: + schema: + $ref: '#/components/schemas/UploadFileResponse' + example: + file_path: /home/graphscope/path/to/file.csv + # metadata: + # datasource: file + # file_type: csv + 500: + description: Server Internal Error + content: + application/json: + schema: + $ref: '#/components/schemas/APIResponse' components: schemas: AnyValue: {} @@ -1744,5 +1776,15 @@ components: type: string description: URL or log string detail: + type: object + additionalProperties: true + UploadFileResponse: + required: + - file_path + # - metadata + properties: + file_path: 
+ type: string + metadata: type: object additionalProperties: true \ No newline at end of file