Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Introduce explicit barrier for actors when switching query service to a different graph #3395

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/flex/interactive/development/admin_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,9 @@ curl -X DELETE -H "Content-Type: application/json" "http://[host]/v1/graph/{grap

#### Description

Start the query service on a graph.
Start the query service on a graph. The `graph_name` param can be empty, indicating restarting on current running graph.

After the AdminService receives this request, any new requests received by query_service will not be executed and will be rejected (but requests that have already been received will be executed); we will proceed by closing the current graph, then opening the specified new graph, and reloading the stored procedures. The service of query_service will not be reopened until these steps are completed.

#### HTTP Request
- **Method**: POST
Expand Down
4 changes: 2 additions & 2 deletions flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ int main(int argc, char** argv) {
}
std::filesystem::path serial_path = data_dir_path / "schema";
if (std::filesystem::exists(serial_path)) {
LOG(ERROR) << "data directory is not empty";
return -1;
LOG(WARNING) << "data directory is not empty";
return 0;
}

auto loader = gs::LoaderFactory::CreateFragmentLoader(
Expand Down
67 changes: 39 additions & 28 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ seastar::future<query_result> admin_actor::update_procedure(
}
}

// Manage the service of one graph.
// Start service on a graph first means stop all current running actors, then
// switch graph and and create new actors with a unused scope_id.
seastar::future<query_result> admin_actor::start_service(
query_param&& query_param) {
// parse query_param.content as json and get graph_name
Expand All @@ -296,49 +297,59 @@ seastar::future<query_result> admin_actor::start_service(
<< server::WorkDirManipulator::GetRunningGraph();
}
LOG(WARNING) << "Starting service with graph: " << graph_name;
} catch (std::exception& e) {
LOG(ERROR) << "Fail to Start service: ";
return seastar::make_exception_future<query_result>(
std::runtime_error(e.what()));
}

auto schema_result = server::WorkDirManipulator::GetGraphSchema(graph_name);
if (!schema_result.ok()) {
LOG(ERROR) << "Fail to get graph schema: "
<< schema_result.status().error_message() << ", " << graph_name;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get graph schema: " + schema_result.status().error_message() +
", " + graph_name));
}
auto& schema_value = schema_result.value();
auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
if (!data_dir.ok()) {
LOG(ERROR) << "Fail to get data directory: "
<< data_dir.status().error_message();
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get data directory: " + data_dir.status().error_message()));
}
auto data_dir_value = data_dir.value();

// First Stop query_handler's actors.

auto& hqps_service = HQPSService::get();
return hqps_service.stop_query_actors().then([this, graph_name, schema_value,
data_dir_value, &hqps_service] {
LOG(INFO) << "Successfully stopped query handler";

auto schema_result = server::WorkDirManipulator::GetGraphSchema(graph_name);
if (!schema_result.ok()) {
LOG(ERROR) << "Fail to get graph schema: "
<< schema_result.status().error_message() << ", "
<< graph_name;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get graph schema: " +
schema_result.status().error_message() + ", " + graph_name));
}
auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
if (!data_dir.ok()) {
LOG(ERROR) << "Fail to get data directory: "
<< data_dir.status().error_message();
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get data directory: " + data_dir.status().error_message()));
}
{
std::lock_guard<std::mutex> lock(mtx_);
auto& db = gs::GraphDB::get();
LOG(INFO) << "Update service running on graph:" << graph_name;
auto& schema_value = schema_result.value();

// use the previous thread num
auto thread_num = db.SessionNum();
db.Close();
if (!db.Open(schema_value, data_dir.value(), thread_num).ok()) {
if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
LOG(ERROR) << "Fail to load graph from data directory: "
<< data_dir.value();
<< data_dir_value;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to load graph from data directory: " + data_dir.value()));
"Fail to load graph from data directory: " + data_dir_value));
}
server::WorkDirManipulator::SetRunningGraph(graph_name);
}

hqps_service.start_query_actors(); // start on a new scope.
LOG(INFO) << "Successfully restart query actors";
LOG(INFO) << "Successfully started service with graph: " << graph_name;

return seastar::make_ready_future<query_result>(
"Successfully start service");
} catch (std::exception& e) {
LOG(ERROR) << "Fail to Start service: ";
return seastar::make_exception_future<query_result>(
std::runtime_error(e.what()));
}
});
}

// get service status
Expand Down
Loading
Loading