diff --git a/docs/flex/interactive/development/admin_service.md b/docs/flex/interactive/development/admin_service.md index 9d0fa8dfd230..21e8faf31beb 100644 --- a/docs/flex/interactive/development/admin_service.md +++ b/docs/flex/interactive/development/admin_service.md @@ -1085,4 +1085,30 @@ To start admin service in development, use the command line argument `--enable-a The Compiler service could be started as a subprocess of the AdminService. This ensures that when switching graphs in the AdminService, the Compiler service also switches to the corresponding graph's schema. This is the default behavior in the current Interactive. ```bash -./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true --start-compiler true \ No newline at end of file +./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true --start-compiler true +``` + + +## Http error code + +Internally we use [`StatusCode`](https://github.com/alibaba/GraphScope/blob/main/flex/utils/result.h) to record the runtime errors. +The mapping between statusCode and http code is shown in the following table. + +| Code | HTTP Code | +| ----------------------------------- | ----------- | +| gs::StatusCode::OK | 200 | +| gs::StatusCode::InValidArgument | 400 | +| gs::StatusCode::UnsupportedOperator | 400 | +| gs::StatusCode::AlreadyExists | 409 | +| gs::StatusCode::NotExists | 404 | +| gs::StatusCode::CodegenError | 500 | +| gs::StatusCode::UninitializedStatus | 500 | +| gs::StatusCode::InvalidSchema | 400 | +| gs::StatusCode::PermissionError | 403 | +| gs::StatusCode::IllegalOperation | 400 | +| gs::StatusCode::InternalError | 500 | +| gs::StatusCode::InvalidImportFile | 400 | +| gs::StatusCode::IOError | 500 | +| gs::StatusCode::NotFound | 404 | +| gs::StatusCode::QueryFailed | 500 | +| default | 500 | \ No newline at end of file diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index ca77032016c6..8c4121ab6cd6 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -40,7 +40,7 @@ admin_actor::admin_actor(hiactor::actor_base* exec_ctx, } // Create a new Graph with the passed graph config. -seastar::future admin_actor::run_create_graph( +seastar::future admin_actor::run_create_graph( query_param&& query_param) { LOG(INFO) << "Creating Graph: " << query_param.content; @@ -53,82 +53,54 @@ seastar::future admin_actor::run_create_graph( yaml = YAML::Load(json_ss); } catch (std::exception& e) { LOG(ERROR) << "Fail to parse json: " << e.what(); - return seastar::make_exception_future( - std::runtime_error("Fail to parse json: " + std::string(e.what()))); + return seastar::make_ready_future( + gs::Result( + gs::StatusCode::InvalidSchema, + "Fail to parse json: " + std::string(e.what()))); } catch (...) { LOG(ERROR) << "Fail to parse json: " << query_param.content; - return seastar::make_exception_future( - std::runtime_error("Fail to parse json: " + query_param.content)); + return seastar::make_ready_future( + gs::Result(gs::StatusCode::InvalidSchema, + "Fail to parse json: ")); } auto result = server::WorkDirManipulator::CreateGraph(yaml); - - if (result.ok()) { - VLOG(10) << "Successfully created graph"; - return seastar::make_ready_future(std::move(result.value())); - } else { - LOG(ERROR) << "Fail to create graph: " << result.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to create graph: " + result.status().error_message())); - } + return seastar::make_ready_future(std::move(result)); } // get graph schema // query_param is the graph name -seastar::future admin_actor::run_get_graph_schema( +seastar::future admin_actor::run_get_graph_schema( query_param&& query_param) { LOG(INFO) << "Get Graph schema for graph: " << query_param.content; auto schema_result = server::WorkDirManipulator::GetGraphSchemaString(query_param.content); - if (schema_result.ok()) { - return seastar::make_ready_future( - std::move(schema_result.value())); - } else { - LOG(ERROR) << "Fail to get graph schema: " - << schema_result.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to get graph schema: " + schema_result.status().error_message())); - } + return seastar::make_ready_future( + std::move(schema_result)); } // list all graphs -seastar::future admin_actor::run_list_graphs( +seastar::future admin_actor::run_list_graphs( query_param&& query_param) { LOG(INFO) << "List all graphs."; auto list_result = server::WorkDirManipulator::ListGraphs(); - if (!list_result.ok()) { - LOG(ERROR) << "Fail to list graphs: " - << list_result.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to list graphs: " + list_result.status().error_message())); - } else { - VLOG(10) << "Successfully list graphs"; - return seastar::make_ready_future( - std::move(list_result.value())); - } + return seastar::make_ready_future(std::move(list_result)); } // delete one graph -seastar::future admin_actor::run_delete_graph( +seastar::future admin_actor::run_delete_graph( query_param&& query_param) { LOG(INFO) << "Delete graph: " << query_param.content; auto delete_res = server::WorkDirManipulator::DeleteGraph(query_param.content); - if (delete_res.ok()) { - return seastar::make_ready_future( - std::move(delete_res.value())); - } else { - LOG(ERROR) << "Fail to delete graph: " - << delete_res.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to delete graph: " + delete_res.status().error_message())); - } + + return seastar::make_ready_future(std::move(delete_res)); } // load the graph. -seastar::future admin_actor::run_graph_loading( +seastar::future admin_actor::run_graph_loading( graph_management_param&& query_param) { // query_param contains two parameter, first for graph name, second for graph // config @@ -146,13 +118,17 @@ seastar::future admin_actor::run_graph_loading( yaml = YAML::Load(json_ss); } catch (std::exception& e) { LOG(ERROR) << "Fail to parse json: " << e.what(); - return seastar::make_exception_future( - std::runtime_error("Fail to parse json: " + std::string(e.what()))); + return seastar::make_ready_future( + gs::Result( + gs::StatusCode::InvalidImportFile, + "Fail to parse json: " + std::string(e.what()))); } catch (...) { - LOG(ERROR) << "Fail to parse json: " << graph_config; - return seastar::make_exception_future(std::runtime_error( - "Fail to parse json when running dataloading for : " + graph_name)); + LOG(ERROR) << "Fail to parse json: "; + return seastar::make_ready_future( + gs::Result(gs::StatusCode::InvalidImportFile, + "Fail to parse json: ")); } + int32_t loading_thread_num = 1; if (yaml["loading_thread_num"]) { loading_thread_num = yaml["loading_thread_num"].as(); @@ -160,21 +136,13 @@ seastar::future admin_actor::run_graph_loading( auto graph_loading_res = server::WorkDirManipulator::LoadGraph( graph_name, yaml, loading_thread_num); - - if (graph_loading_res.ok()) { - VLOG(10) << "Successfully loaded graph"; - return seastar::make_ready_future( - std::move(graph_loading_res.value())); - } else { - LOG(ERROR) << "Fail to load graph: " - << graph_loading_res.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to load graph: " + graph_loading_res.status().error_message())); - } + return seastar::make_ready_future( + std::move(graph_loading_res)); } // Get all procedure with graph_name and procedure_name -seastar::future admin_actor::get_procedure_by_procedure_name( +seastar::future +admin_actor::get_procedure_by_procedure_name( procedure_query_param&& query_param) { auto& graph_name = query_param.content.first; auto& procedure_name = query_param.content.second; @@ -183,41 +151,23 @@ seastar::future admin_actor::get_procedure_by_procedure_name( auto get_procedure_res = server::WorkDirManipulator::GetProcedureByGraphAndProcedureName( graph_name, procedure_name); - if (get_procedure_res.ok()) { - VLOG(10) << "Successfully get procedure procedures"; - return seastar::make_ready_future( - std::move(get_procedure_res.value())); - } else { - LOG(ERROR) << "Fail to get procedure for graph: " << graph_name - << " and procedure: " << procedure_name << ", error message: " - << get_procedure_res.status().error_message(); - return seastar::make_exception_future( - std::runtime_error("Fail to get procedure: " + - get_procedure_res.status().error_message())); - } + + return seastar::make_ready_future( + std::move(get_procedure_res)); } // Get all procedures of one graph. -seastar::future admin_actor::get_procedures_by_graph_name( +seastar::future admin_actor::get_procedures_by_graph_name( query_param&& query_param) { auto& graph_name = query_param.content; auto get_all_procedure_res = server::WorkDirManipulator::GetProceduresByGraphName(graph_name); - if (get_all_procedure_res.ok()) { - VLOG(10) << "Successfully get all procedures: " - << get_all_procedure_res.value(); - return seastar::make_ready_future( - std::move(get_all_procedure_res.value())); - } else { - LOG(ERROR) << "Fail to get all procedures: " - << get_all_procedure_res.status().error_message(); - return seastar::make_exception_future( - std::runtime_error("Fail to get all procedures: " + - get_all_procedure_res.status().error_message())); - } + + return seastar::make_ready_future( + std::move(get_all_procedure_res)); } -seastar::future admin_actor::create_procedure( +seastar::future admin_actor::create_procedure( create_procedure_query_param&& query_param) { auto& graph_name = query_param.content.first; auto& parameter = query_param.content.second; @@ -228,11 +178,11 @@ seastar::future admin_actor::create_procedure( .then_wrapped([](auto&& f) { try { auto res = f.get(); - return seastar::make_ready_future( - query_result{std::move(res)}); + return seastar::make_ready_future( + admin_query_result{std::move(res)}); } catch (std::exception& e) { LOG(ERROR) << "Fail to create procedure: " << e.what(); - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Fail to create procedure: " + std::string(e.what()))); } @@ -240,49 +190,32 @@ seastar::future admin_actor::create_procedure( } // Delete a procedure by graph name and procedure name -seastar::future admin_actor::delete_procedure( +seastar::future admin_actor::delete_procedure( create_procedure_query_param&& query_param) { auto& graph_name = query_param.content.first; auto& procedure_name = query_param.content.second; auto delete_procedure_res = server::WorkDirManipulator::DeleteProcedure(graph_name, procedure_name); - if (delete_procedure_res.ok()) { - VLOG(10) << "Successfully get all procedures"; - return seastar::make_ready_future( - std::move(delete_procedure_res.value())); - } else { - LOG(ERROR) << "Fail to create procedure: " - << delete_procedure_res.status().error_message(); - return seastar::make_exception_future( - std::runtime_error("Fail to create procedures: " + - delete_procedure_res.status().error_message())); - } + return seastar::make_ready_future( + std::move(delete_procedure_res)); } // update a procedure by graph name and procedure name -seastar::future admin_actor::update_procedure( +seastar::future admin_actor::update_procedure( update_procedure_query_param&& query_param) { auto& graph_name = std::get<0>(query_param.content); auto& procedure_name = std::get<1>(query_param.content); auto& parameter = std::get<2>(query_param.content); auto update_procedure_res = server::WorkDirManipulator::UpdateProcedure( graph_name, procedure_name, parameter); - if (update_procedure_res.ok()) { - VLOG(10) << "Successfully update procedure: " << procedure_name; - return seastar::make_ready_future( - std::move(update_procedure_res.value())); - } else { - LOG(ERROR) << "Fail to create procedure: " - << update_procedure_res.status().error_message(); - return seastar::make_exception_future( - std::runtime_error("Fail to create procedures: " + - update_procedure_res.status().error_message())); - } + + return seastar::make_ready_future( + std::move(update_procedure_res)); } // Start service on a graph first means stop all current running actors, then // switch graph and create new actors with a unused scope_id. -seastar::future admin_actor::start_service( +seastar::future admin_actor::start_service( query_param&& query_param) { // parse query_param.content as json and get graph_name auto& content = query_param.content; @@ -302,25 +235,26 @@ seastar::future admin_actor::start_service( LOG(WARNING) << "Starting service with graph: " << graph_name; } catch (std::exception& e) { LOG(ERROR) << "Fail to Start service: "; - return seastar::make_exception_future( - std::runtime_error(e.what())); + return seastar::make_ready_future( + gs::Result( + gs::StatusCode::InvalidSchema, + "Fail to parse json: " + std::string(e.what()))); } auto schema_result = server::WorkDirManipulator::GetGraphSchema(graph_name); if (!schema_result.ok()) { LOG(ERROR) << "Fail to get graph schema: " << schema_result.status().error_message() << ", " << graph_name; - return seastar::make_exception_future(std::runtime_error( - "Fail to get graph schema: " + schema_result.status().error_message() + - ", " + graph_name)); + return seastar::make_ready_future( + gs::Result(schema_result.status())); } auto& schema_value = schema_result.value(); auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name); if (!data_dir.ok()) { LOG(ERROR) << "Fail to get data directory: " << data_dir.status().error_message(); - return seastar::make_exception_future(std::runtime_error( - "Fail to get data directory: " + data_dir.status().error_message())); + return seastar::make_ready_future( + gs::Result(data_dir.status())); } auto data_dir_value = data_dir.value(); @@ -342,8 +276,10 @@ seastar::future admin_actor::start_service( if (!db.Open(schema_value, data_dir_value, thread_num).ok()) { LOG(ERROR) << "Fail to load graph from data directory: " << data_dir_value; - return seastar::make_exception_future(std::runtime_error( - "Fail to load graph from data directory: " + data_dir_value)); + return seastar::make_ready_future( + gs::Result( + gs::StatusCode::InternalError, + "Fail to open graph from data directory: " + data_dir_value)); } server::WorkDirManipulator::SetRunningGraph(graph_name); } @@ -354,37 +290,38 @@ seastar::future admin_actor::start_service( server::WorkDirManipulator::GetGraphSchemaPath(graph_name); if (!hqps_service.start_compiler_subprocess(schema_path)) { LOG(ERROR) << "Fail to start compiler"; - return seastar::make_exception_future( + return seastar::make_exception_future( seastar::sstring("Fail to start compiler")); } LOG(INFO) << "Successfully started service with graph: " << graph_name; - return seastar::make_ready_future( - "Successfully start service"); + return seastar::make_ready_future( + gs::Result("Successfully start service")); }); } // Stop service. // Actually stop the query_handler's actors. // The port is still connectable. -seastar::future admin_actor::stop_service( +seastar::future admin_actor::stop_service( query_param&& query_param) { auto& hqps_service = HQPSService::get(); return hqps_service.stop_query_actors().then([&hqps_service] { LOG(INFO) << "Successfully stopped query handler"; if (hqps_service.stop_compiler_subprocess()) { LOG(INFO) << "Successfully stop compiler"; - return seastar::make_ready_future( - seastar::sstring("Successfully stop service")); + return seastar::make_ready_future( + gs::Result("Successfully stop service")); } else { LOG(ERROR) << "Fail to stop compiler"; - return seastar::make_ready_future( - seastar::sstring("Fail to stop compiler")); + return seastar::make_ready_future( + gs::Result(gs::StatusCode::InternalError, + "Fail to stop compiler")); } }); } // get service status -seastar::future admin_actor::service_status( +seastar::future admin_actor::service_status( query_param&& query_param) { auto& hqps_service = HQPSService::get(); auto query_port = hqps_service.get_query_port(); @@ -397,11 +334,12 @@ seastar::future admin_actor::service_status( LOG(INFO) << "Query service has not been inited!"; res["status"] = "Query service has not been inited!"; } - return seastar::make_ready_future(res.dump()); + return seastar::make_ready_future( + gs::Result(res.dump())); } // get node status. -seastar::future admin_actor::node_status( +seastar::future admin_actor::node_status( query_param&& query_param) { // get current host' cpu usage and memory usage auto cpu_usage = gs::get_current_cpu_usage(); @@ -423,7 +361,8 @@ seastar::future admin_actor::node_status( << gs::memory_to_mb_str(mem_usage.second); json["memory_usage"] = ss.str(); } - return seastar::make_ready_future(json.dump()); + return seastar::make_ready_future( + gs::Result(json.dump())); } } // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/actor/admin_actor.act.h b/flex/engines/http_server/actor/admin_actor.act.h index ed0eb2852655..daaa9cbf2266 100644 --- a/flex/engines/http_server/actor/admin_actor.act.h +++ b/flex/engines/http_server/actor/admin_actor.act.h @@ -31,33 +31,33 @@ class ANNOTATION(actor:impl) admin_actor : public hiactor::actor { admin_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr); ~admin_actor() override; - seastar::future ANNOTATION(actor:method) run_create_graph(query_param&& param); + seastar::future ANNOTATION(actor:method) run_create_graph(query_param&& param); - seastar::future ANNOTATION(actor:method) run_get_graph_schema(query_param&& param); + seastar::future ANNOTATION(actor:method) run_get_graph_schema(query_param&& param); - seastar::future ANNOTATION(actor:method) run_list_graphs(query_param&& param); + seastar::future ANNOTATION(actor:method) run_list_graphs(query_param&& param); - seastar::future ANNOTATION(actor:method) run_delete_graph(query_param&& param); + seastar::future ANNOTATION(actor:method) run_delete_graph(query_param&& param); - seastar::future ANNOTATION(actor:method) run_graph_loading(graph_management_param&& param); + seastar::future ANNOTATION(actor:method) run_graph_loading(graph_management_param&& param); - seastar::future ANNOTATION(actor:method) start_service(query_param&& param); + seastar::future ANNOTATION(actor:method) start_service(query_param&& param); - seastar::future ANNOTATION(actor:method) stop_service(query_param&& param); + seastar::future ANNOTATION(actor:method) stop_service(query_param&& param); - seastar::future ANNOTATION(actor:method) service_status(query_param&& param); + seastar::future ANNOTATION(actor:method) service_status(query_param&& param); - seastar::future ANNOTATION(actor:method) get_procedure_by_procedure_name(procedure_query_param&& param); + seastar::future ANNOTATION(actor:method) get_procedure_by_procedure_name(procedure_query_param&& param); - seastar::future ANNOTATION(actor:method) get_procedures_by_graph_name(query_param&& param); + seastar::future ANNOTATION(actor:method) get_procedures_by_graph_name(query_param&& param); - seastar::future ANNOTATION(actor:method) create_procedure(create_procedure_query_param&& param); + seastar::future ANNOTATION(actor:method) create_procedure(create_procedure_query_param&& param); - seastar::future ANNOTATION(actor:method) delete_procedure(procedure_query_param&& param); + seastar::future ANNOTATION(actor:method) delete_procedure(procedure_query_param&& param); - seastar::future ANNOTATION(actor:method) update_procedure(update_procedure_query_param&& param); + seastar::future ANNOTATION(actor:method) update_procedure(update_procedure_query_param&& param); - seastar::future ANNOTATION(actor:method) node_status(query_param&& param); + seastar::future ANNOTATION(actor:method) node_status(query_param&& param); // DECLARE_RUN_QUERIES; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index ebba9dba514e..8a044d5a651d 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -59,46 +59,31 @@ class admin_http_graph_handler_impl : public seastar::httpd::handler_base { if (path.find("dataloading") != seastar::sstring::npos) { LOG(INFO) << "Route to loading graph"; if (!req->param.exists("graph_name")) { - return seastar::make_exception_future< - std::unique_ptr>( - std::runtime_error("graph_name not exists")); + return new_bad_request_reply(std::move(rep), + "expect field 'graph_name' in request"); } else { auto graph_name = req->param.at("graph_name"); LOG(INFO) << "Graph name: " << graph_name; auto pair = std::make_pair(graph_name, std::move(req->content)); return admin_actor_refs_[dst_executor] .run_graph_loading(graph_management_param{std::move(pair)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } } else { LOG(INFO) << "Route to creating graph"; return admin_actor_refs_[dst_executor] .run_create_graph(query_param{std::move(req->content)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } } else if (method == "GET") { if (req->param.exists("graph_name") && @@ -106,35 +91,21 @@ class admin_http_graph_handler_impl : public seastar::httpd::handler_base { auto graph_name = req->param.at("graph_name"); return admin_actor_refs_[dst_executor] .run_get_graph_schema(query_param{std::move(graph_name)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } else { return admin_actor_refs_[dst_executor] .run_list_graphs(query_param{std::move(req->content)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } } else if (method == "DELETE") { if (!req->param.exists("graph_name")) { @@ -146,16 +117,8 @@ class admin_http_graph_handler_impl : public seastar::httpd::handler_base { return admin_actor_refs_[dst_executor] .run_delete_graph(query_param{std::move(graph_name)}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } else { return seastar::make_exception_future< @@ -220,37 +183,23 @@ class admin_http_procedure_handler_impl : public seastar::httpd::handler_base { return admin_actor_refs_[dst_executor] .get_procedure_by_procedure_name( procedure_query_param{std::move(pair)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } else { // get all procedures. LOG(INFO) << "Get all procedures for: " << graph_name; return admin_actor_refs_[dst_executor] .get_procedures_by_graph_name(query_param{std::move(graph_name)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } } else if (req->_method == "POST") { if (!req->param.exists("graph_name")) { @@ -267,16 +216,8 @@ class admin_http_procedure_handler_impl : public seastar::httpd::handler_base { .create_procedure(create_procedure_query_param{ std::make_pair(graph_name, std::move(req->content))}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } else if (req->_method == "DELETE") { // delete must give graph_name and procedure_name @@ -299,16 +240,8 @@ class admin_http_procedure_handler_impl : public seastar::httpd::handler_base { .delete_procedure( procedure_query_param{std::make_pair(graph_name, procedure_name)}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } else if (req->_method == "PUT") { if (!req->param.exists("graph_name") || @@ -330,16 +263,8 @@ class admin_http_procedure_handler_impl : public seastar::httpd::handler_base { .update_procedure(update_procedure_query_param{ std::make_tuple(graph_name, procedure_name, req->content)}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } else { return seastar::make_exception_future< @@ -395,35 +320,21 @@ class admin_http_service_handler_impl : public seastar::httpd::handler_base { if (action == "start" || action == "restart") { return admin_actor_refs_[dst_executor] .start_service(query_param{std::move(req->content)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } else if (action == "stop") { return admin_actor_refs_[dst_executor] .stop_service(query_param{std::move(req->content)}) - .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>( - fut.get_exception()); - } - auto result = fut.get0(); - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); - }); + .then_wrapped( + [rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), + std::move(fut)); + }); } else { return seastar::make_exception_future< std::unique_ptr>( @@ -435,17 +346,8 @@ class admin_http_service_handler_impl : public seastar::httpd::handler_base { return admin_actor_refs_[dst_executor] .service_status(query_param{std::move(req->content)}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - LOG(INFO) << "Service status: " << result.content; - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } } @@ -485,17 +387,8 @@ class admin_http_node_handler_impl : public seastar::httpd::handler_base { return admin_actor_refs_[dst_executor] .node_status(query_param{std::move(req->content)}) .then_wrapped([rep = std::move(rep)]( - seastar::future&& fut) mutable { - if (__builtin_expect(fut.failed(), false)) { - return seastar::make_exception_future< - std::unique_ptr>(fut.get_exception()); - } - auto result = fut.get0(); - LOG(INFO) << "Node status: " << result.content; - rep->write_body("application/json", std::move(result.content)); - rep->done(); - return seastar::make_ready_future< - std::unique_ptr>(std::move(rep)); + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); }); } else { return seastar::make_exception_future< diff --git a/flex/engines/http_server/handler/admin_http_handler.h b/flex/engines/http_server/handler/admin_http_handler.h index 3d5c3f92a42a..14465e459a95 100644 --- a/flex/engines/http_server/handler/admin_http_handler.h +++ b/flex/engines/http_server/handler/admin_http_handler.h @@ -19,6 +19,7 @@ #include #include #include +#include "flex/engines/http_server/handler/http_utils.h" #include "flex/engines/http_server/types.h" #include "flex/utils/service_utils.h" diff --git a/flex/engines/http_server/handler/http_utils.cc b/flex/engines/http_server/handler/http_utils.cc new file mode 100644 index 000000000000..6db4c95975f1 --- /dev/null +++ b/flex/engines/http_server/handler/http_utils.cc @@ -0,0 +1,105 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/handler/http_utils.h" + +namespace server { + +seastar::future> new_bad_request_reply( + std::unique_ptr rep, const std::string& msg) { + rep->set_status(seastar::httpd::reply::status_type::bad_request); + rep->write_body("application/json", seastar::sstring(msg)); + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); +} + +seastar::httpd::reply::status_type status_code_to_http_code( + gs::StatusCode code) { + switch (code) { + case gs::StatusCode::OK: + return seastar::httpd::reply::status_type::ok; + case gs::StatusCode::InValidArgument: + return seastar::httpd::reply::status_type::bad_request; + case gs::StatusCode::UnsupportedOperator: + return seastar::httpd::reply::status_type::bad_request; + case gs::StatusCode::AlreadyExists: + return seastar::httpd::reply::status_type::conflict; + case gs::StatusCode::NotExists: + return seastar::httpd::reply::status_type::not_found; + case gs::StatusCode::CodegenError: + return seastar::httpd::reply::status_type::internal_server_error; + case gs::StatusCode::UninitializedStatus: + return seastar::httpd::reply::status_type::internal_server_error; + case gs::StatusCode::InvalidSchema: + return seastar::httpd::reply::status_type::bad_request; + case gs::StatusCode::PermissionError: + return seastar::httpd::reply::status_type::forbidden; + case gs::StatusCode::IllegalOperation: + return seastar::httpd::reply::status_type::bad_request; + case gs::StatusCode::InternalError: + return seastar::httpd::reply::status_type::internal_server_error; + case gs::StatusCode::InvalidImportFile: + return seastar::httpd::reply::status_type::bad_request; + case gs::StatusCode::IOError: + return seastar::httpd::reply::status_type::internal_server_error; + case gs::StatusCode::NotFound: + return seastar::httpd::reply::status_type::not_found; + case gs::StatusCode::QueryFailed: + return seastar::httpd::reply::status_type::internal_server_error; + default: + return seastar::httpd::reply::status_type::internal_server_error; + } +} + +seastar::future> +catch_exception_and_return_reply(std::unique_ptr rep, + std::exception_ptr ex) { + try { + std::rethrow_exception(ex); + } catch (std::exception& e) { + LOG(ERROR) << "Exception: " << e.what(); + seastar::sstring what = e.what(); + rep->write_body("application/json", std::move(what)); + rep->set_status(seastar::httpd::reply::status_type::bad_request); + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); + } +} + +seastar::future> +return_reply_with_result(std::unique_ptr rep, + seastar::future&& fut) { + if (__builtin_expect(fut.failed(), false)) { + return catch_exception_and_return_reply(std::move(rep), + fut.get_exception()); + } + auto&& result = fut.get0(); + auto status_code = + status_code_to_http_code(result.content.status().error_code()); + rep->set_status(status_code); + if (status_code == seastar::httpd::reply::status_type::ok) { + rep->write_body("application/json", std::move(result.content.value())); + } else { + rep->write_body("application/json", + seastar::sstring(result.content.status().error_message())); + } + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); +} + +} // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/handler/http_utils.h b/flex/engines/http_server/handler/http_utils.h new file mode 100644 index 000000000000..e303beb68d35 --- /dev/null +++ b/flex/engines/http_server/handler/http_utils.h @@ -0,0 +1,40 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flex/engines/http_server/types.h" +#include "flex/utils/result.h" +#include "seastar/http/reply.hh" + +#ifndef ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ +#define ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ + +namespace server { + +seastar::future> new_bad_request_reply( + std::unique_ptr rep, const std::string& msg); + +seastar::httpd::reply::status_type status_code_to_http_code( + gs::StatusCode code); + +seastar::future> +catch_exception_and_return_reply(std::unique_ptr rep, + std::exception_ptr ex); + +seastar::future> +return_reply_with_result(std::unique_ptr rep, + seastar::future&& fut); + +} // namespace server + +#endif // ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_