fix(interactive): Implement HttpIrMetaReader to Get Meta Data From Remote Http Service #3908

Merged
merged 44 commits into from
Jun 26, 2024
44 commits
6336844
[GIE Compiler] support reading ir meta from meta service by http api
shirly121 May 29, 2024
5582553
remove useless files
shirly121 May 29, 2024
d35c5bb
minor fix
shirly121 May 29, 2024
f785414
[GIE Compiler] support reading graph statistics using meta sdk
shirly121 May 30, 2024
3513782
minor fix
shirly121 May 30, 2024
6c3a32a
refine reading statistics interface
shirly121 May 31, 2024
e9e7b5a
todo: after merge the get_statistics pr, fix the ci
zhanglei1949 Jun 3, 2024
49d7d5f
Merge branch 'main' into ir_get_meta
shirly121 Jun 6, 2024
83e3a31
add necessary config for irConfigs
BingqingLyu Jun 6, 2024
e1292e2
add necessary config for irConfigs
BingqingLyu Jun 6, 2024
675db21
Merge remote-tracking branch 'origin/main' into ir_get_meta
shirly121 Jun 11, 2024
0e72f83
Merge branch 'ir_get_meta' of github.com:shirly121/GraphScope into ir…
shirly121 Jun 11, 2024
dc6ba31
fix ci
zhanglei1949 Jun 12, 2024
906766f
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 12, 2024
53eab57
[GIE Compiler] refine according to reviews
shirly121 Jun 13, 2024
6996274
fix ci issues
shirly121 Jun 13, 2024
063fa0e
minor fix
shirly121 Jun 13, 2024
dcf0f8d
minor fix
shirly121 Jun 13, 2024
229d92e
minor fix
shirly121 Jun 13, 2024
fb6c702
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 13, 2024
77f2bb0
fixing ci
zhanglei1949 Jun 13, 2024
c719c21
Merge branch 'main' into ir_get_meta
longbinlai Jun 13, 2024
867cdde
Merge branch 'main' into ir_get_meta
BingqingLyu Jun 14, 2024
c53a009
minor refine config
BingqingLyu Jun 14, 2024
3072239
fix ci tests
shirly121 Jun 14, 2024
7e9d687
Merge branch 'ir_get_meta' of github.com:shirly121/GraphScope into ir…
shirly121 Jun 14, 2024
8ec0447
fixing hqps ci
zhanglei1949 Jun 14, 2024
076ffec
minor fix
shirly121 Jun 14, 2024
3e22d82
Merge branch 'ir_get_meta' of github.com:shirly121/GraphScope into ir…
shirly121 Jun 14, 2024
ba4d2d7
minor refine
BingqingLyu Jun 14, 2024
1912d5f
Merge branch 'main' into ir_get_meta
longbinlai Jun 14, 2024
23a0509
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 14, 2024
64dd138
Merge branch 'main' into ir_get_meta
BingqingLyu Jun 14, 2024
aa256f8
fix packaging
zhanglei1949 Jun 14, 2024
767303b
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 14, 2024
4a6c36d
minor update in values.yaml
BingqingLyu Jun 17, 2024
5cb7940
Merge branch 'main' into ir_get_meta
BingqingLyu Jun 19, 2024
e5b41a7
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 20, 2024
9d14d68
Merge branch 'main' into ir_get_meta
BingqingLyu Jun 24, 2024
eea8a78
fix: set snapshot id as the latest in frontend cache, to avoid incons…
BingqingLyu Jun 25, 2024
0b94c17
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 25, 2024
bca39bb
fixing ci
zhanglei1949 Jun 25, 2024
ef92dc9
Merge branch 'main' into ir_get_meta
zhanglei1949 Jun 25, 2024
4736052
fix
zhanglei1949 Jun 26, 2024
12 changes: 12 additions & 0 deletions .github/workflows/interactive.yml
@@ -181,6 +181,18 @@ jobs:
./tests/hqps/query_test ${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yaml \
/tmp/csr-data-dir/

- name: Test get graph meta from admin service
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
sed -i 's/default_graph: ldbc/default_graph: modern_graph/g' ./engine_config_test.yaml
pip3 install argparse
pip3 install neo4j
bash hqps_compiler_get_meta_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml
sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml

- name: Run codegen test.
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
56 changes: 52 additions & 4 deletions flex/engines/http_server/actor/admin_actor.act.cc
@@ -485,7 +485,13 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
add_runnable_info(plugin_meta);
}
auto& graph_meta = meta_res.value();
graph_meta.plugin_metas = all_plugin_metas;
// The graph meta may also contain built-in procedures.
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
}
graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(),
all_plugin_metas.begin(),
all_plugin_metas.end());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(std::move(graph_meta.ToJson())));
} else {
@@ -694,6 +700,12 @@ seastar::future<admin_query_result> admin_actor::get_procedures_by_graph_name(
for (auto& plugin_meta : all_plugin_metas) {
add_runnable_info(plugin_meta);
}
for (auto& plugin_meta : graph_meta_res.value().plugin_metas) {
add_runnable_info(plugin_meta);
}
all_plugin_metas.insert(all_plugin_metas.end(),
graph_meta_res.value().plugin_metas.begin(),
graph_meta_res.value().plugin_metas.end());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(to_json_str(all_plugin_metas)));
} else {
@@ -1123,13 +1135,49 @@ seastar::future<admin_query_result> admin_actor::service_status(
res["bolt_port"] = hqps_service.get_service_config().bolt_port;
res["gremlin_port"] = hqps_service.get_service_config().gremlin_port;
if (running_graph_res.ok()) {
auto graph_meta =
auto graph_meta_res =
metadata_store_->GetGraphMeta(running_graph_res.value());
if (graph_meta.ok()) {
res["graph"] = nlohmann::json::parse(graph_meta.value().ToJson());
if (graph_meta_res.ok()) {
auto& graph_meta = graph_meta_res.value();
// Add the plugin meta.
auto get_all_procedure_res =
metadata_store_->GetAllPluginMeta(running_graph_res.value());
if (get_all_procedure_res.ok()) {
VLOG(10) << "Successfully get all procedures: "
<< get_all_procedure_res.value().size();
auto& all_plugin_metas = get_all_procedure_res.value();
VLOG(10) << "original all plugins : " << all_plugin_metas.size();
for (auto& plugin_meta : all_plugin_metas) {
add_runnable_info(plugin_meta);
}
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
}

VLOG(10) << "original graph meta: " << graph_meta.plugin_metas.size();
for (auto& plugin_meta : all_plugin_metas) {
if (plugin_meta.runnable) {
graph_meta.plugin_metas.emplace_back(plugin_meta);
}
}
VLOG(10) << "got graph meta: " << graph_meta.ToJson();
res["graph"] = nlohmann::json::parse(graph_meta.ToJson());
} else {
LOG(ERROR) << "Fail to get all procedures: "
<< get_all_procedure_res.status().error_message();
return seastar::make_exception_future<admin_query_result>(
get_all_procedure_res.status());
}
} else {
LOG(ERROR) << "Fail to get graph meta: "
<< graph_meta_res.status().error_message();
res["graph"] = {};
return seastar::make_exception_future<admin_query_result>(
graph_meta_res.status());
}
} else {
res["graph"] = {};
LOG(INFO) << "No graph is running";
}
res["start_time"] = hqps_service.get_start_time();
} else {
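The three hunks in `admin_actor.act.cc` merge plugin metas in slightly different ways: `run_get_graph_meta` and `get_procedures_by_graph_name` concatenate the built-in procedures with all stored ones, while `service_status` appends only the stored procedures that are runnable. A minimal Python sketch of the `service_status` variant follows. Plain dicts stand in for `PluginMeta`, and `merge_plugins_for_status`, the `is_runnable` callback (a stand-in for `add_runnable_info`), and the procedure names are hypothetical.

```python
from typing import Callable, Dict, List

Plugin = Dict[str, object]


def merge_plugins_for_status(
    builtin: List[Plugin],
    stored: List[Plugin],
    is_runnable: Callable[[Plugin], bool],
) -> List[Plugin]:
    """Return built-in plugins followed by the stored plugins that are runnable.

    Hypothetical helper; it mirrors the loop order of the service_status hunk.
    """
    # Annotate every plugin with its runnable flag (copies keep the inputs untouched).
    annotated_builtin = [{**p, "runnable": is_runnable(p)} for p in builtin]
    annotated_stored = [{**p, "runnable": is_runnable(p)} for p in stored]
    # Built-in plugins are always kept; stored ones only when runnable.
    return annotated_builtin + [p for p in annotated_stored if p["runnable"]]


if __name__ == "__main__":
    builtin = [{"name": "builtin_proc"}]                          # hypothetical names
    stored = [{"name": "ready_proc"}, {"name": "pending_proc"}]
    merged = merge_plugins_for_status(
        builtin, stored, lambda p: p["name"] != "pending_proc"
    )
    print([p["name"] for p in merged])
```

Filtering on the runnable flag keeps procedures that are still compiling out of the status payload, which is the payload the compiler reads as its schema source.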
3 changes: 2 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
@@ -252,7 +252,8 @@ bool HQPSService::start_compiler_subprocess(
std::stringstream ss;
ss << "java -cp " << interactive_class_path;
if (!graph_schema_path.empty()) {
ss << " -Dgraph.schema=" << graph_schema_path;
ss << " -Dgraph.schema=http://localhost:" << service_config_.admin_port
<< "/v1/service/status";
}
ss << " " << COMPILER_SERVER_CLASS_NAME;
ss << " " << service_config_.engine_config_path;
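With this hunk the compiler's `graph.schema` property points at the admin service's status endpoint instead of a local schema file. The sketch below shows how a client could build that URL and pull the `graph` entry out of a status response. It is an illustration only: `schema_uri` and `extract_graph_meta` are hypothetical helpers, and the sample response bodies are assumptions modelled on the `res["graph"]` field set in `service_status`.

```python
import json
from typing import Optional


def schema_uri(admin_port: int, host: str = "localhost") -> str:
    """Build the status URL passed via -Dgraph.schema (hypothetical helper)."""
    return f"http://{host}:{admin_port}/v1/service/status"


def extract_graph_meta(status_body: str) -> Optional[dict]:
    """Return the `graph` object of a status response, or None if absent."""
    status = json.loads(status_body)
    graph = status.get("graph")
    # Treat a missing, null, or empty `graph` entry as "no graph running".
    return graph if graph else None


if __name__ == "__main__":
    print(schema_uri(7777))
    # Assumed response shape, for illustration only.
    body = '{"status": "Running", "graph": {"name": "modern_graph"}}'
    print(extract_graph_meta(body))
```

No HTTP request is made here; fetching the body is left to whichever client the caller already uses.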
@@ -325,7 +325,7 @@ def __init__(self, uri: str):
# get service port
service_status = self.get_service_status()
if not service_status.is_ok():
raise Exception("Failed to get service status")
raise Exception(f"Failed to get service status: {service_status.get_status_message()}")
service_port = service_status.get_value().hqps_port
# replace the port in uri
uri = uri.split(":")
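The driver swaps the admin port in the URI for the query port reported by the service. Splitting on `":"` works for simple `scheme://host:port` strings; a more defensive variant can be sketched with the standard `urllib.parse` module. This is an alternative illustration, not the driver's code, and `replace_port` is a hypothetical name. Note that this sketch drops any user-info component of the URI.

```python
from urllib.parse import urlsplit, urlunsplit


def replace_port(uri: str, new_port: int) -> str:
    """Return `uri` with its port replaced by `new_port` (hypothetical helper)."""
    parts = urlsplit(uri)
    if parts.hostname is None:
        raise ValueError(f"URI has no host: {uri!r}")
    host = parts.hostname
    # urlsplit strips the brackets from IPv6 literals; put them back.
    if ":" in host:
        host = f"[{host}]"
    netloc = f"{host}:{new_port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


if __name__ == "__main__":
    print(replace_port("http://localhost:7777", 10000))
```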
Empty file.
65 changes: 62 additions & 3 deletions flex/interactive/sdk/python/test/test_driver.py
@@ -94,7 +94,7 @@ def tearDown(self):
print("delete graph: ", rep2)

def test_example(self):
self.createGraph()
self._graph_id = self.createGraph()
self.bulkLoading()
self.waitJobFinish()
self.list_graph()
@@ -103,6 +103,7 @@ def test_example(self):
self.createCypherProcedure()
self.createCppProcedure()
self.restart()
self.restartOnNewGraph()
self.getStatistics()
self.callProcedure()
self.callProcedureWithHttp()
@@ -154,8 +155,7 @@ def createGraph(self):
create_graph.var_schema = create_schema
resp = self._sess.create_graph(create_graph)
assert resp.is_ok()
self._graph_id = resp.get_value().graph_id
print("create graph: ", self._graph_id)
return resp.get_value().graph_id

def bulkLoading(self):
assert os.environ.get("FLEX_DATA_DIR") is not None
@@ -263,6 +263,65 @@ def restart(self):
        print("restart: ", resp.get_value())
        # wait 5 seconds
        time.sleep(5)
        # get service status
        resp = self._sess.get_service_status()
        assert resp.is_ok()
        print("get service status: ", resp.get_value())

    def restartOnNewGraph(self):
        original_graph_id = None
        status_res = self._sess.get_service_status()
        assert status_res.is_ok()
        status = status_res.get_value()
        if status.status == "Running":
            if status.graph is not None and status.graph.id is not None:
                original_graph_id = status.graph.id
            else:
                raise Exception("service status error, graph id is None")
        elif status.status == "Stopped":
            pass
        else:
            # str() is required: concatenating the status object itself raises TypeError
            raise Exception("service status error " + str(status))
        assert original_graph_id is not None
        # create new graph
        new_graph_id = self.createGraph()
        # start service
        print("start service on new graph: ", new_graph_id)
        start_service_res = self._sess.start_service(
            start_service_request=StartServiceRequest(graph_id=new_graph_id)
        )
        assert start_service_res.is_ok()
        # restart service
        print("restart service on new graph: ", new_graph_id)
        restart_res = self._sess.restart_service()
        assert restart_res.is_ok()
        # get status
        print("get service status: ")
        status_res = self._sess.get_service_status()
        assert status_res.is_ok()
        print("get service status: ", status_res.get_value().status)
        # stop
        print("stop service: ")
        stop_res = self._sess.stop_service()
        assert stop_res.is_ok()
        # get status
        print("get service status: ")
        status_res = self._sess.get_service_status()
        assert status_res.is_ok()
        print("get service status: ", status_res.get_value().status)
        assert status_res.get_value().status == "Stopped"
        # after stop, we should be able to delete the graph
        print("delete graph: ", new_graph_id)
        delete_res = self._sess.delete_graph(new_graph_id)
        assert delete_res.is_ok()
        # start on original graph
        print("start service on original graph: ", original_graph_id)
        start_service_res = self._sess.start_service(
            start_service_request=StartServiceRequest(graph_id=original_graph_id)
        )
        assert start_service_res.is_ok()
        print("finish restartOnNewGraph")
        time.sleep(5)

    def getStatistics(self):
        resp = self._sess.get_graph_statistics(self._graph_id)
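`restartOnNewGraph` follows a fixed order: start on the new graph, stop, delete the new graph, then start on the original graph again. If an assertion fails partway through, the service stays on the temporary graph. A minimal sketch of the same order wrapped in a context manager is shown below, so the restore step always runs. The name `temporarily_serving` and the three callables are hypothetical; in the test they would wrap `self._sess.start_service`, `self._sess.stop_service` and `self._sess.delete_graph`.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def temporarily_serving(
    start: Callable[[str], None],
    stop: Callable[[], None],
    delete: Callable[[str], None],
    new_graph_id: str,
    original_graph_id: str,
) -> Iterator[str]:
    """Serve `new_graph_id` inside the block, then restore `original_graph_id`."""
    start(new_graph_id)
    try:
        yield new_graph_id
    finally:
        # Same order as the test: stop, delete the temporary graph, restart the original.
        stop()
        delete(new_graph_id)
        start(original_graph_id)


if __name__ == "__main__":
    calls = []
    with temporarily_serving(
        start=lambda g: calls.append(("start", g)),
        stop=lambda: calls.append(("stop",)),
        delete=lambda g: calls.append(("delete", g)),
        new_graph_id="2",
        original_graph_id="1",
    ):
        calls.append(("body",))
    print(calls)
```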
2 changes: 2 additions & 0 deletions flex/openapi/openapi_interactive.yaml
@@ -1298,6 +1298,8 @@ components:
type: boolean
creation_time:
type: integer
update_time:
type: integer
UpdateProcedureRequest:
x-body-name: update_procedure_request
type: object
15 changes: 15 additions & 0 deletions flex/storages/metadata/graph_meta_store.cc
@@ -134,6 +134,9 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) {
}
if (json.contains("name")) {
meta.name = json["name"].get<std::string>();
if (meta.id.empty()) {
meta.id = meta.name;
}
}
if (json.contains("bound_graph")) {
meta.bound_graph = json["bound_graph"].get<GraphId>();
@@ -155,6 +158,8 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) {
}
if (json.contains("type")) {
meta.type = json["type"].get<std::string>();
} else {
meta.type = "cpp"; // default is cpp
}
if (json.contains("option")) {
meta.setOptionFromJsonString(json["option"].dump());
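The two `PluginMeta::FromJson` hunks add defaults: the id falls back to the name when no id was set, and the type falls back to `"cpp"` when the key is missing. A small Python sketch of the same defaulting rules follows. It uses a plain dict rather than the real `PluginMeta` struct, `plugin_meta_from_json` is a hypothetical name, and it assumes the id is parsed before the name, as the first hunk suggests.

```python
import json


def plugin_meta_from_json(raw: str) -> dict:
    """Parse a plugin description, applying the id and type defaults (sketch)."""
    data = json.loads(raw)
    meta = {"id": data.get("id", ""), "name": data.get("name", "")}
    # Fall back to the name when no explicit id is present.
    if "name" in data and not meta["id"]:
        meta["id"] = meta["name"]
    # A missing type defaults to "cpp".
    meta["type"] = data.get("type", "cpp")
    return meta


if __name__ == "__main__":
    print(plugin_meta_from_json('{"name": "my_proc"}'))
    print(plugin_meta_from_json('{"id": "7", "name": "my_proc", "type": "cypher"}'))
```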
@@ -337,6 +342,11 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
} else {
request.creation_time = GetCurrentTimeStamp();
}
if (json.contains("stored_procedures")) {
for (auto& plugin : json["stored_procedures"]) {
request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
return request;
}

@@ -351,6 +361,11 @@ std::string CreateGraphMetaRequest::ToString() const {
json["data_update_time"] = 0;
}
json["creation_time"] = creation_time;
json["stored_procedures"] = nlohmann::json::array();
for (auto& plugin_meta : plugin_metas) {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
}
return json.dump();
}
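`FromJson` and `ToString` are asymmetric in one respect: `ToString` always emits a `stored_procedures` array, even when it is empty, while `FromJson` tolerates a missing key. The Python sketch below illustrates that round trip with plain dicts. The function names are hypothetical and only the two fields relevant here are modelled.

```python
import json
from typing import List


def request_to_string(creation_time: int, plugin_metas: List[dict]) -> str:
    """Serialize a request; `stored_procedures` is always present (sketch)."""
    payload = {
        "creation_time": creation_time,
        "stored_procedures": list(plugin_metas),
    }
    return json.dumps(payload)


def request_from_json(raw: str) -> dict:
    """Parse a request; a missing `stored_procedures` key yields an empty list."""
    data = json.loads(raw)
    return {
        "creation_time": data.get("creation_time"),
        "plugin_metas": list(data.get("stored_procedures", [])),
    }


if __name__ == "__main__":
    text = request_to_string(1718000000, [{"name": "my_proc", "type": "cpp"}])
    print(request_from_json(text))
```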

2 changes: 2 additions & 0 deletions flex/storages/metadata/graph_meta_store.h
@@ -130,6 +130,8 @@ struct CreateGraphMetaRequest {
std::optional<uint64_t> data_update_time;
int64_t creation_time;

std::vector<PluginMeta> plugin_metas;

static CreateGraphMetaRequest FromJson(const std::string& json_str);

std::string ToString() const;
8 changes: 8 additions & 0 deletions flex/tests/hqps/engine_config_test.yaml
@@ -23,6 +23,14 @@ compiler:
      - FilterIntoJoinRule
      - FilterMatchRule
      - NotMatchToAntiJoinRule
  meta:
    reader:
      schema:
        uri: http://localhost:7777/v1/service/status
        interval: 1000 # ms
      statistics:
        uri: http://localhost:7777/v1/graph/%s/statistics
        interval: 86400000 # ms
  endpoint:
    default_listen_address: localhost
    bolt_connector:
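The `meta.reader` block gives the compiler two polling targets: the schema endpoint, refreshed every 1000 ms, and the statistics endpoint, refreshed every 86400000 ms (24 hours). The sketch below shows how such a configuration could be interpreted, assuming the `%s` placeholder in the statistics URI is replaced by the graph id. `ReaderConfig` and the helper functions are hypothetical names, not the compiler's classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReaderConfig:
    """One `uri` / `interval` pair from the meta.reader block (hypothetical type)."""
    uri: str
    interval_ms: int


SCHEMA = ReaderConfig("http://localhost:7777/v1/service/status", 1000)
STATISTICS = ReaderConfig("http://localhost:7777/v1/graph/%s/statistics", 86_400_000)


def statistics_uri(cfg: ReaderConfig, graph_id: str) -> str:
    # Assumption: the single %s placeholder stands for the graph id.
    return cfg.uri % graph_id


def interval_seconds(cfg: ReaderConfig) -> float:
    return cfg.interval_ms / 1000


if __name__ == "__main__":
    print(statistics_uri(STATISTICS, "1"))
    print(interval_seconds(SCHEMA), interval_seconds(STATISTICS) / 3600)
```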
99 changes: 99 additions & 0 deletions flex/tests/hqps/hqps_compiler_get_meta_test.sh
@@ -0,0 +1,99 @@
#!/bin/bash
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
FLEX_HOME=${SCRIPT_DIR}/../../
SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server
GIE_HOME=${FLEX_HOME}/../interactive_engine/
ADMIN_PORT=7777
QUERY_PORT=10000

#
if [ ! $# -eq 2 ]; then
echo "only receives: $# args, need 2"
echo "Usage: $0 <INTERACTIVE_WORKSPACE> <ENGINE_CONFIG>"
exit 1
fi

INTERACTIVE_WORKSPACE=$1
ENGINE_CONFIG_PATH=$2
if [ ! -d ${INTERACTIVE_WORKSPACE} ]; then
echo "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE} not exists"
exit 1
fi
if [ ! -f ${ENGINE_CONFIG_PATH} ]; then
echo "ENGINE_CONFIG: ${ENGINE_CONFIG_PATH} not exists"
exit 1
fi


RED='\033[0;31m'
GREEN='\033[0;32m'
NC='\033[0m' # No Color
err() {
echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2
}

info() {
echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}"
}


kill_service(){
info "Kill Service first"
ps -ef | grep "interactive_server" | awk '{print $2}' | xargs kill -9 || true
ps -ef | grep "GraphServer" | awk '{print $2}' | xargs kill -9 || true
sleep 3
# check if service is killed
info "Kill Service success"
}

# kill service when exit
trap kill_service EXIT

# start engine service and load ldbc graph
start_engine_service(){
#check SERVER_BIN exists
if [ ! -f ${SERVER_BIN} ]; then
err "SERVER_BIN not found"
exit 1
fi

cmd="${SERVER_BIN} -c ${ENGINE_CONFIG_PATH} --enable-admin-service true "
cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true"

echo "Start engine service with command: ${cmd}"
${cmd} &
sleep 10
#check interactive_server is running, if not, exit
ps -ef | grep "interactive_server" | grep -v grep

info "Start engine service success"
}

run_cypher_test() {
# run a simple cypher query: MATCH (n) RETURN count(n)
python3 ./test_count_vertices.py --endpoint neo4j://localhost:7687
}

kill_service
start_engine_service
# The compiler service will fail to start if the graph meta cannot be retrieved
run_cypher_test
kill_service
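The script waits a fixed `sleep 10` after launching the server and then only checks that the process exists. Since the compiler now depends on the admin endpoint being reachable, a readiness check on the admin port is one possible alternative. The sketch below polls a TCP port with the standard `socket` module; `wait_for_port` is a hypothetical helper and not part of this PR.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0,
                  interval_s: float = 0.5) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, else False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            time.sleep(interval_s)
    return False


if __name__ == "__main__":
    # 7777 is the ADMIN_PORT used by the script above.
    ready = wait_for_port("localhost", 7777, timeout_s=2.0)
    print("admin service reachable:", ready)
```

A successful connection only shows that the port is open; it does not guarantee that the graph meta is already being served.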



