diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 4e5785b30484..101f9ba8ea07 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -181,6 +181,18 @@ jobs: ./tests/hqps/query_test ${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yaml \ /tmp/csr-data-dir/ + - name: Test get graph meta from admin service + env: + GS_TEST_DIR: ${{ github.workspace }}/gstest + INTERACTIVE_WORKSPACE: /tmp/interactive_workspace + run: | + cd ${GITHUB_WORKSPACE}/flex/tests/hqps + sed -i 's/default_graph: ldbc/default_graph: modern_graph/g' ./engine_config_test.yaml + pip3 install argparse + pip3 install neo4j + bash hqps_compiler_get_meta_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml + sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml + - name: Run codegen test. env: GS_TEST_DIR: ${{ github.workspace }}/gstest diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index f810e34679a2..847c6d7b01ca 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -485,7 +485,13 @@ seastar::future admin_actor::run_get_graph_meta( add_runnable_info(plugin_meta); } auto& graph_meta = meta_res.value(); - graph_meta.plugin_metas = all_plugin_metas; + // There can also be procedures that builtin in the graph meta. + for (auto& plugin_meta : graph_meta.plugin_metas) { + add_runnable_info(plugin_meta); + } + graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(), + all_plugin_metas.begin(), + all_plugin_metas.end()); return seastar::make_ready_future( gs::Result(std::move(graph_meta.ToJson()))); } else { @@ -694,6 +700,12 @@ seastar::future admin_actor::get_procedures_by_graph_name( for (auto& plugin_meta : all_plugin_metas) { add_runnable_info(plugin_meta); } + for (auto& plugin_meta : graph_meta_res.value().plugin_metas) { + add_runnable_info(plugin_meta); + } + all_plugin_metas.insert(all_plugin_metas.end(), + graph_meta_res.value().plugin_metas.begin(), + graph_meta_res.value().plugin_metas.end()); return seastar::make_ready_future( gs::Result(to_json_str(all_plugin_metas))); } else { @@ -1123,13 +1135,49 @@ seastar::future admin_actor::service_status( res["bolt_port"] = hqps_service.get_service_config().bolt_port; res["gremlin_port"] = hqps_service.get_service_config().gremlin_port; if (running_graph_res.ok()) { - auto graph_meta = + auto graph_meta_res = metadata_store_->GetGraphMeta(running_graph_res.value()); - if (graph_meta.ok()) { - res["graph"] = nlohmann::json::parse(graph_meta.value().ToJson()); + if (graph_meta_res.ok()) { + auto& graph_meta = graph_meta_res.value(); + // Add the plugin meta. + auto get_all_procedure_res = + metadata_store_->GetAllPluginMeta(running_graph_res.value()); + if (get_all_procedure_res.ok()) { + VLOG(10) << "Successfully get all procedures: " + << get_all_procedure_res.value().size(); + auto& all_plugin_metas = get_all_procedure_res.value(); + VLOG(10) << "original all plugins : " << all_plugin_metas.size(); + for (auto& plugin_meta : all_plugin_metas) { + add_runnable_info(plugin_meta); + } + for (auto& plugin_meta : graph_meta.plugin_metas) { + add_runnable_info(plugin_meta); + } + + VLOG(10) << "original graph meta: " << graph_meta.plugin_metas.size(); + for (auto& plugin_meta : all_plugin_metas) { + if (plugin_meta.runnable) { + graph_meta.plugin_metas.emplace_back(plugin_meta); + } + } + VLOG(10) << "got graph meta: " << graph_meta.ToJson(); + res["graph"] = nlohmann::json::parse(graph_meta.ToJson()); + } else { + LOG(ERROR) << "Fail to get all procedures: " + << get_all_procedure_res.status().error_message(); + return seastar::make_exception_future( + get_all_procedure_res.status()); + } + } else { + LOG(ERROR) << "Fail to get graph meta: " + << graph_meta_res.status().error_message(); + res["graph"] = {}; + return seastar::make_exception_future( + graph_meta_res.status()); } } else { res["graph"] = {}; + LOG(INFO) << "No graph is running"; } res["start_time"] = hqps_service.get_start_time(); } else { diff --git a/flex/engines/http_server/service/hqps_service.cc b/flex/engines/http_server/service/hqps_service.cc index 54865c29212d..342db5fb34e7 100644 --- a/flex/engines/http_server/service/hqps_service.cc +++ b/flex/engines/http_server/service/hqps_service.cc @@ -252,7 +252,8 @@ bool HQPSService::start_compiler_subprocess( std::stringstream ss; ss << "java -cp " << interactive_class_path; if (!graph_schema_path.empty()) { - ss << " -Dgraph.schema=" << graph_schema_path; + ss << " -Dgraph.schema=http://localhost:" << service_config_.admin_port + << "/v1/service/status"; } ss << " " << COMPILER_SERVER_CLASS_NAME; ss << " " << service_config_.engine_config_path; diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index c1d821078bd8..79993c3560b1 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -325,7 +325,7 @@ def __init__(self, uri: str): # get service port service_status = self.get_service_status() if not service_status.is_ok(): - raise Exception("Failed to get service status") + raise Exception("Failed to get service status: ", service_status.get_status_message()) service_port = service_status.get_value().hqps_port # replace the port in uri uri = uri.split(":") diff --git a/flex/interactive/sdk/python/py.typed b/flex/interactive/sdk/python/py.typed new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/flex/interactive/sdk/python/test/test_driver.py b/flex/interactive/sdk/python/test/test_driver.py index d0f010d92277..fbadf86f9770 100644 --- a/flex/interactive/sdk/python/test/test_driver.py +++ b/flex/interactive/sdk/python/test/test_driver.py @@ -94,7 +94,7 @@ def tearDown(self): print("delete graph: ", rep2) def test_example(self): - self.createGraph() + self._graph_id = self.createGraph() self.bulkLoading() self.waitJobFinish() self.list_graph() @@ -103,6 +103,7 @@ def test_example(self): self.createCypherProcedure() self.createCppProcedure() self.restart() + self.restartOnNewGraph() self.getStatistics() self.callProcedure() self.callProcedureWithHttp() @@ -154,8 +155,7 @@ def createGraph(self): create_graph.var_schema = create_schema resp = self._sess.create_graph(create_graph) assert resp.is_ok() - self._graph_id = resp.get_value().graph_id - print("create graph: ", self._graph_id) + return resp.get_value().graph_id def bulkLoading(self): assert os.environ.get("FLEX_DATA_DIR") is not None @@ -263,6 +263,65 @@ def restart(self): print("restart: ", resp.get_value()) # wait 5 seconds time.sleep(5) + # get service status + resp = self._sess.get_service_status() + assert resp.is_ok() + print("get service status: ", resp.get_value()) + + def restartOnNewGraph(self): + original_graph_id = None + status_res = self._sess.get_service_status() + assert status_res.is_ok() + status = status_res.get_value() + if status.status == "Running": + if status.graph is not None and status.graph.id is not None: + original_graph_id = status.graph.id + else: + raise Exception("service status error, graph id is None") + elif status.status == "Stopped": + pass + else: + raise Exception("service status error " + status) + assert original_graph_id is not None + # create new graph + new_graph_id = self.createGraph() + # start service + print("start service on new graph: ", new_graph_id) + start_service_res = self._sess.start_service( + start_service_request=StartServiceRequest(graph_id=new_graph_id) + ) + assert start_service_res.is_ok() + # restart service + print("restart service on new graph: ", new_graph_id) + restart_res = self._sess.restart_service() + assert restart_res.is_ok() + # get status + print("get service status: ") + status_res = self._sess.get_service_status() + assert status_res.is_ok() + print("get service status: ", status_res.get_value().status) + # stop + print("stop service: ") + stop_res = self._sess.stop_service() + assert stop_res.is_ok() + # get status + print("get service status: ") + status_res = self._sess.get_service_status() + assert status_res.is_ok() + print("get service status: ", status_res.get_value().status) + assert status_res.get_value().status == "Stopped" + # after stop, we should be able to delete the graph + print("delete graph: ", new_graph_id) + delete_res = self._sess.delete_graph(new_graph_id) + assert delete_res.is_ok() + # start on original graph + print("start service on original graph: ", original_graph_id) + start_service_res = self._sess.start_service( + start_service_request=StartServiceRequest(graph_id=original_graph_id) + ) + assert start_service_res.is_ok() + print("finish restartOnNewGraph") + time.sleep(5) def getStatistics(self): resp = self._sess.get_graph_statistics(self._graph_id) diff --git a/flex/openapi/openapi_interactive.yaml b/flex/openapi/openapi_interactive.yaml index f8f2df293b39..736409d7a3c2 100644 --- a/flex/openapi/openapi_interactive.yaml +++ b/flex/openapi/openapi_interactive.yaml @@ -1298,6 +1298,8 @@ components: type: boolean creation_time: type: integer + update_time: + type: integer UpdateProcedureRequest: x-body-name: update_procedure_request type: object diff --git a/flex/storages/metadata/graph_meta_store.cc b/flex/storages/metadata/graph_meta_store.cc index d99d17a9817f..44801276ab91 100644 --- a/flex/storages/metadata/graph_meta_store.cc +++ b/flex/storages/metadata/graph_meta_store.cc @@ -134,6 +134,9 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) { } if (json.contains("name")) { meta.name = json["name"].get(); + if (meta.id.empty()) { + meta.id = meta.name; + } } if (json.contains("bound_graph")) { meta.bound_graph = json["bound_graph"].get(); @@ -155,6 +158,8 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) { } if (json.contains("type")) { meta.type = json["type"].get(); + } else { + meta.type = "cpp"; // default is cpp } if (json.contains("option")) { meta.setOptionFromJsonString(json["option"].dump()); @@ -337,6 +342,11 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson( } else { request.creation_time = GetCurrentTimeStamp(); } + if (json.contains("stored_procedures")) { + for (auto& plugin : json["stored_procedures"]) { + request.plugin_metas.push_back(PluginMeta::FromJson(plugin)); + } + } return request; } @@ -351,6 +361,11 @@ std::string CreateGraphMetaRequest::ToString() const { json["data_update_time"] = 0; } json["creation_time"] = creation_time; + json["stored_procedures"] = nlohmann::json::array(); + for (auto& plugin_meta : plugin_metas) { + json["stored_procedures"].push_back( + nlohmann::json::parse(plugin_meta.ToJson())); + } return json.dump(); } diff --git a/flex/storages/metadata/graph_meta_store.h b/flex/storages/metadata/graph_meta_store.h index d42337f13895..f9f710c7e23a 100644 --- a/flex/storages/metadata/graph_meta_store.h +++ b/flex/storages/metadata/graph_meta_store.h @@ -130,6 +130,8 @@ struct CreateGraphMetaRequest { std::optional data_update_time; int64_t creation_time; + std::vector plugin_metas; + static CreateGraphMetaRequest FromJson(const std::string& json_str); std::string ToString() const; diff --git a/flex/tests/hqps/engine_config_test.yaml b/flex/tests/hqps/engine_config_test.yaml index a5d42fca55f2..a11bc928c525 100644 --- a/flex/tests/hqps/engine_config_test.yaml +++ b/flex/tests/hqps/engine_config_test.yaml @@ -23,6 +23,14 @@ compiler: - FilterIntoJoinRule - FilterMatchRule - NotMatchToAntiJoinRule + meta: + reader: + schema: + uri: http://localhost:7777/v1/service/status + interval: 1000 # ms + statistics: + uri: http://localhost:7777/v1/graph/%s/statistics + interval: 86400000 # ms endpoint: default_listen_address: localhost bolt_connector: diff --git a/flex/tests/hqps/hqps_compiler_get_meta_test.sh b/flex/tests/hqps/hqps_compiler_get_meta_test.sh new file mode 100644 index 000000000000..4e752ad05c88 --- /dev/null +++ b/flex/tests/hqps/hqps_compiler_get_meta_test.sh @@ -0,0 +1,99 @@ +#!/bin/bash +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -e +SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) +FLEX_HOME=${SCRIPT_DIR}/../../ +SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server +GIE_HOME=${FLEX_HOME}/../interactive_engine/ +ADMIN_PORT=7777 +QUERY_PORT=10000 + +# +if [ ! $# -eq 2 ]; then + echo "only receives: $# args, need 2" + echo "Usage: $0 " + exit 1 +fi + +INTERACTIVE_WORKSPACE=$1 +ENGINE_CONFIG_PATH=$2 +if [ ! -d ${INTERACTIVE_WORKSPACE} ]; then + echo "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE} not exists" + exit 1 +fi +if [ ! -f ${ENGINE_CONFIG_PATH} ]; then + echo "ENGINE_CONFIG: ${ENGINE_CONFIG_PATH} not exists" + exit 1 +fi + + +RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' # No Color +err() { + echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2 +} + +info() { + echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}" +} + + +kill_service(){ + info "Kill Service first" + ps -ef | grep "interactive_server" | awk '{print $2}' | xargs kill -9 || true + ps -ef | grep "GraphServer" | awk '{print $2}' | xargs kill -9 || true + sleep 3 + # check if service is killed + info "Kill Service success" +} + +# kill service when exit +trap kill_service EXIT + +# start engine service and load ldbc graph +start_engine_service(){ + #check SERVER_BIN exists + if [ ! -f ${SERVER_BIN} ]; then + err "SERVER_BIN not found" + exit 1 + fi + + cmd="${SERVER_BIN} -c ${ENGINE_CONFIG_PATH} --enable-admin-service true " + cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true" + + echo "Start engine service with command: ${cmd}" + ${cmd} & + sleep 10 + #check interactive_server is running, if not, exit + ps -ef | grep "interactive_server" | grep -v grep + + info "Start engine service success" +} + +run_cypher_test() { + # run a simple cypher query: MATCH (n) RETURN count(n) + python3 ./test_count_vertices.py --endpoint neo4j://localhost:7687 +} + +kill_service +start_engine_service +# comiper service will fail to start, if the graph meta can not be retrieved +run_cypher_test +kill_service + + + + diff --git a/flex/tests/hqps/test_count_vertices.py b/flex/tests/hqps/test_count_vertices.py new file mode 100644 index 000000000000..b85761a7cea0 --- /dev/null +++ b/flex/tests/hqps/test_count_vertices.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from neo4j import GraphDatabase +from neo4j import Session as Neo4jSession + +import argparse + +def count_vertices(sess: Neo4jSession): + query = "MATCH (n) RETURN COUNT(n);" + result = sess.run(query) + for record in result: + print(record[0]) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Count the number of vertices in a graph.") + parser.add_argument("--endpoint", type=str, required=True, help="The endpoint to connect.") + args = parser.parse_args() + + driver = GraphDatabase.driver(args.endpoint, auth=None) + with driver.session() as session: + count_vertices(session) + driver.close() \ No newline at end of file diff --git a/flex/tests/interactive/modern_graph_schema_v0_0.yaml b/flex/tests/interactive/modern_graph_schema_v0_0.yaml index 252b29c999b8..0c627425fbb1 100644 --- a/flex/tests/interactive/modern_graph_schema_v0_0.yaml +++ b/flex/tests/interactive/modern_graph_schema_v0_0.yaml @@ -20,7 +20,8 @@ schema: - property_id: 1 property_name: name property_type: - primitive_type: DT_STRING + string: + long_text: - property_id: 2 property_name: age property_type: @@ -39,11 +40,13 @@ schema: - property_id: 1 property_name: name property_type: - primitive_type: DT_STRING + string: + long_text: - property_id: 2 property_name: lang property_type: - primitive_type: DT_STRING + string: + long_text: primary_keys: - id edge_types: diff --git a/flex/tests/interactive/test_plugin_loading.sh b/flex/tests/interactive/test_plugin_loading.sh index 3c376ca6379d..d9b75c5fb48a 100644 --- a/flex/tests/interactive/test_plugin_loading.sh +++ b/flex/tests/interactive/test_plugin_loading.sh @@ -30,13 +30,22 @@ kill_service(){ trap kill_service EXIT start_engine_service(){ + # expect one args + if [ $# -lt 1 ]; then + echo "Receives: $# args, need 1 args" + echo "Usage: $0 " + exit 1 + fi + enable_compiler=$1 #check SERVER_BIN exists if [ ! -f ${SERVER_BIN} ]; then err "SERVER_BIN not found" exit 1 fi cmd="${SERVER_BIN} -w ${WORKSPACE} -c ${ENGINE_CONFIG_PATH} --enable-admin-service true" - cmd="${cmd} --start-compiler true" + if [ "${enable_compiler}" == "true" ]; then + cmd="${cmd} --start-compiler true" + fi echo "Start engine service with command: ${cmd}" ${cmd} & @@ -93,7 +102,7 @@ check_procedure_loading_and_calling_via_encoder() { exit 1 fi cp $1 ${WORKSPACE}/data/${GRAPH_NAME}/graph.yaml - start_engine_service + start_engine_service false python3 test_call_proc.py --endpoint http://localhost:7777 --input-format encoder @@ -108,16 +117,19 @@ check_procedure_loading_and_calling_via_cypher_json() { exit 1 fi cp $1 ${WORKSPACE}/data/${GRAPH_NAME}/graph.yaml - start_engine_service + start_engine_service true + sleep 5 python3 test_call_proc.py --endpoint http://localhost:7777 --input-format json kill_service } echo "Testing for schema file: ${SCHEMA_VERSION_00}" +rm -rf ${WORKSPACE}/METADATA/ check_procedure_loading_and_calling_via_encoder ${SCHEMA_VERSION_00} echo "Testing for schema file: ${SCHEMA_VERSION_01}" +rm -rf ${WORKSPACE}/METADATA/ check_procedure_loading_and_calling_via_cypher_json ${SCHEMA_VERSION_01} echo "Test passed for plugin loading and calling" \ No newline at end of file diff --git a/interactive_engine/compiler/conf/ir.compiler.properties b/interactive_engine/compiler/conf/ir.compiler.properties index 955ba0607a96..992f828a2b2e 100644 --- a/interactive_engine/compiler/conf/ir.compiler.properties +++ b/interactive_engine/compiler/conf/ir.compiler.properties @@ -13,7 +13,10 @@ pegasus.hosts: localhost:1234 # set timeout in milliseconds to connect to hiactor service # hiactor.timeout: 6000000 -# graph.schema +# set schema access uri, two formats are supported: +# 1. local file path: file://, i.e. file:///path/to/your/schema.json +# 2. remote http path: http://:/, i.e. http://localhost:8080/schema +# we specifically support reading from a relative path if it is a local file. graph.schema: ../executor/ir/core/resource/modern_schema.json graph.store: exp @@ -21,7 +24,7 @@ graph.planner.is.on: true graph.planner.opt: RBO graph.planner.rules: FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule, ExpandGetVFusionRule -# set file path of glogue input statistics +# set statistics access uri # graph.statistics: src/test/resources/statistics/modern_statistics.json # set stored procedures directory path @@ -61,3 +64,9 @@ calcite.default.charset: UTF-8 # set the max capacity of the result streaming buffer for each query # per.query.stream.buffer.max.capacity: 256 + +# set the interval in milliseconds to fetch graph schema +# graph.meta.schema.fetch.interval.ms: 1000 + +# set the timeout in milliseconds to fetch graph statistics +# graph.meta.statistics.fetch.interval.ms: 86400000l diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java index d5787d2fae53..946110d660e2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java @@ -25,8 +25,10 @@ import com.alibaba.graphscope.common.config.GraphConfig; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; +import com.alibaba.graphscope.common.ir.meta.fetcher.DynamicIrMetaFetcher; import com.alibaba.graphscope.common.ir.meta.fetcher.IrMetaFetcher; import com.alibaba.graphscope.common.ir.meta.fetcher.StaticIrMetaFetcher; +import com.alibaba.graphscope.common.ir.meta.reader.HttpIrMetaReader; import com.alibaba.graphscope.common.ir.meta.reader.LocalIrMetaReader; import com.alibaba.graphscope.common.ir.planner.GraphRelOptimizer; import com.alibaba.graphscope.common.ir.tools.*; @@ -48,6 +50,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -187,7 +190,14 @@ public static void main(String[] args) throws Exception { private static IrMetaFetcher createIrMetaFetcher(Configs configs, IrMetaTracker tracker) throws IOException { - return new StaticIrMetaFetcher(new LocalIrMetaReader(configs), tracker); + URI schemaUri = URI.create(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); + if (schemaUri.getScheme() == null || schemaUri.getScheme().equals("file")) { + return new StaticIrMetaFetcher(new LocalIrMetaReader(configs), tracker); + } else if (schemaUri.getScheme().equals("http")) { + return new DynamicIrMetaFetcher(configs, new HttpIrMetaReader(configs), tracker); + } + throw new IllegalArgumentException( + "unknown graph meta reader mode: " + schemaUri.getScheme()); } private static GraphProperties getTestGraph(Configs configs) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java index 7ec5e1c48769..deeecba5e5db 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java @@ -17,8 +17,17 @@ package com.alibaba.graphscope.common.config; public class GraphConfig { - public static final Config GRAPH_SCHEMA = Config.stringConfig("graph.schema", "."); - public static final Config GRAPH_STATISTICS = + public static final Config GRAPH_META_SCHEMA_URI = + Config.stringConfig("graph.schema", "."); + + public static final Config GRAPH_META_STATISTICS_URI = Config.stringConfig("graph.statistics", ""); + public static final Config GRAPH_STORE = Config.stringConfig("graph.store", "exp"); + + public static final Config GRAPH_META_SCHEMA_FETCH_INTERVAL_MS = + Config.longConfig("graph.meta.schema.fetch.interval.ms", 1000); + + public static final Config GRAPH_META_STATISTICS_FETCH_INTERVAL_MS = + Config.longConfig("graph.meta.statistics.fetch.interval.ms", 24 * 3600 * 1000l); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PlannerConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PlannerConfig.java index 1512c9301353..d34e8e7a38da 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PlannerConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/PlannerConfig.java @@ -9,7 +9,9 @@ public class PlannerConfig { public static final Config GRAPH_PLANNER_OPT = Config.stringConfig("graph.planner.opt", "RBO"); public static final Config GRAPH_PLANNER_RULES = - Config.stringConfig("graph.planner.rules", ""); + Config.stringConfig( + "graph.planner.rules", + "FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule"); public static final Config GRAPH_PLANNER_CBO_GLOGUE_SIZE = Config.intConfig("graph.planner.cbo.glogue.size", 3); public static final Config JOIN_MIN_PATTERN_SIZE = diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java index 55f481acbf47..4d26b2413529 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java @@ -25,7 +25,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Path; import java.util.Map; public class YamlConfigs extends Configs { @@ -55,16 +54,24 @@ public class YamlConfigs extends Configs { if (schema != null) { return schema; } - String workspace = configs.get("directories.workspace"); - String subdir = configs.get("directories.subdirs.data"); - String graphName = configs.get("default_graph"); - if (workspace != null && subdir != null && graphName != null) { - return Path.of(workspace, subdir, graphName, "graph.yaml") - .toString(); - } else { - return null; + return configs.get("compiler.meta.reader.schema.uri"); + }) + .put( + "graph.statistics", + (Configs configs) -> { + String statistics = System.getProperty("graph.statistics"); + if (statistics != null) { + return statistics; } + return configs.get("compiler.meta.reader.statistics.uri"); }) + .put( + "graph.meta.schema.fetch.interval.ms", + (Configs configs) -> configs.get("compiler.meta.reader.schema.interval")) + .put( + "graph.meta.statistics.fetch.interval.ms", + (Configs configs) -> + configs.get("compiler.meta.reader.statistics.interval")) .put( "graph.store", (Configs configs) -> { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java index bda93af32ab9..8ddf0d75a511 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java @@ -18,6 +18,8 @@ package com.alibaba.graphscope.common.ir.meta; +import com.google.common.base.Objects; + import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -36,4 +38,17 @@ public GraphId(T id) { public @Nullable T getId() { return id; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GraphId graphId = (GraphId) o; + return Objects.equal(id, graphId.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java new file mode 100644 index 000000000000..b89e573c65d4 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -0,0 +1,115 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.meta.fetcher; + +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.GraphConfig; +import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.IrMetaStats; +import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; +import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader; +import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Periodically update IrMeta, with the update frequency controlled by configuration. + * Specifically, for procedures, a remote update will be actively triggered when they are not found locally. + */ +public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(DynamicIrMetaFetcher.class); + private final ScheduledExecutorService scheduler; + private volatile IrMetaStats currentState; + + public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) { + super(dataReader, tracker); + this.scheduler = new ScheduledThreadPoolExecutor(2); + this.scheduler.scheduleAtFixedRate( + () -> syncMeta(), + 0, + GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), + TimeUnit.MILLISECONDS); + this.scheduler.scheduleAtFixedRate( + () -> syncStats(), + 0, + GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), + TimeUnit.MILLISECONDS); + } + + @Override + public Optional fetch() { + return currentState == null ? Optional.empty() : Optional.of(currentState); + } + + private synchronized void syncMeta() { + try { + IrMeta meta = this.reader.readMeta(); + // if the graph id is changed, we need to update the statistics + GraphStatistics curStats = + (this.currentState == null + || !this.currentState.getGraphId().equals(meta.getGraphId())) + ? null + : this.currentState.getStatistics(); + this.currentState = + new IrMetaStats( + meta.getGraphId(), + meta.getSnapshotId(), + meta.getSchema(), + meta.getStoredProcedures(), + curStats); + if (this.currentState.getStatistics() == null) { + syncStats(); + } + } catch (Exception e) { + logger.warn("failed to read meta data, error is {}", e); + } + } + + private synchronized void syncStats() { + try { + if (this.currentState != null) { + GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); + if (stats != null) { + this.currentState = + new IrMetaStats( + this.currentState.getSnapshotId(), + this.currentState.getSchema(), + this.currentState.getStoredProcedures(), + stats); + if (tracker != null) { + tracker.onChanged(this.currentState); + } + } + } + } catch (Exception e) { + logger.warn("failed to read graph statistics, error is {}", e); + } + } + + @Override + public void close() throws Exception { + this.scheduler.shutdown(); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/IrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/IrMetaFetcher.java index 06985f38a3c5..8385fb35d736 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/IrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/IrMetaFetcher.java @@ -26,8 +26,8 @@ /** * This interface primarily describes the strategies to obtaining IrMeta, which are mainly of two types: static and dynamic. - * The static strategy {@code StaticIrMetaFetcher} assumes that IrMeta does not change after initialization, - * while the dynamic strategy {@code DynamicIrMetaFetcher} assumes that IrMeta can change. + * The static strategy {@link StaticIrMetaFetcher} assumes that IrMeta does not change after initialization, + * while the dynamic strategy {@link DynamicIrMetaFetcher} assumes that IrMeta can change. */ public abstract class IrMetaFetcher { protected final IrMetaReader reader; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java new file mode 100644 index 000000000000..05ae98478633 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java @@ -0,0 +1,131 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.meta.reader; + +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.GraphConfig; +import com.alibaba.graphscope.common.ir.meta.GraphId; +import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.SnapshotId; +import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures; +import com.alibaba.graphscope.common.ir.meta.schema.FileFormatType; +import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; +import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics; +import com.alibaba.graphscope.common.ir.meta.schema.SchemaInputStream; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import org.javatuples.Pair; +import org.yaml.snakeyaml.Yaml; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +// read ir meta from a remote http service +public class HttpIrMetaReader implements IrMetaReader { + private static final String CONTENT_TYPE = "Content-Type"; + private static final String APPLICATION_JSON = "application/json; utf-8"; + private final HttpClient httpClient; + private final Configs configs; + + public HttpIrMetaReader(Configs configs) { + this.configs = configs; + this.httpClient = HttpClient.newBuilder().build(); + } + + @Override + public IrMeta readMeta() throws IOException { + try { + HttpResponse response = + sendRequest(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); + String res = response.body(); + Preconditions.checkArgument( + response.statusCode() == 200, + "read meta data fail, status code: %s, error message: %s", + response.statusCode(), + res); + Pair metaPair = convertMetaFromJsonToYaml(res); + String metaInYaml = metaPair.getValue1(); + return new IrMeta( + metaPair.getValue0(), + SnapshotId.createEmpty(), // todo: return snapshot id from http service + new IrGraphSchema( + new SchemaInputStream( + new ByteArrayInputStream( + metaInYaml.getBytes(StandardCharsets.UTF_8)), + FileFormatType.YAML)), + new GraphStoredProcedures( + new ByteArrayInputStream(metaInYaml.getBytes(StandardCharsets.UTF_8)), + this)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public IrGraphStatistics readStats(GraphId graphId) throws IOException { + try { + Preconditions.checkArgument( + graphId.getId() != null, "graph id should not be null in http meta reader"); + HttpResponse response = + sendRequest( + String.format( + GraphConfig.GRAPH_META_STATISTICS_URI.get(configs), + graphId.getId())); + String res = response.body(); + Preconditions.checkArgument( + response.statusCode() == 200, + "read graph statistics fail, status code: %s, error message: %s", + response.statusCode(), + res); + return new IrGraphStatistics( + new ByteArrayInputStream(res.getBytes(StandardCharsets.UTF_8))); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private HttpResponse sendRequest(String requestUri) + throws IOException, InterruptedException { + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(requestUri)) + .headers(CONTENT_TYPE, APPLICATION_JSON) + .GET() + .build(); + return httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + } + + private Pair convertMetaFromJsonToYaml(String metaInJson) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(metaInJson); + Map rootMap = mapper.convertValue(rootNode, Map.class); + Map metaMap = (Map) rootMap.get("graph"); + GraphId graphId = new GraphId(metaMap.get("id")); + Yaml yaml = new Yaml(); + return Pair.with(graphId, yaml.dump(metaMap)); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java index ab667c086bac..7c0db6727816 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java @@ -32,6 +32,8 @@ import java.io.FileInputStream; import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; import java.util.Objects; /** @@ -47,24 +49,33 @@ public LocalIrMetaReader(Configs configs) { @Override public IrMeta readMeta() throws IOException { - String metaPath = + String schemaUri = Objects.requireNonNull( - GraphConfig.GRAPH_SCHEMA.get(configs), "ir meta path not exist"); - FileFormatType formatType = FileUtils.getFormatType(metaPath); + GraphConfig.GRAPH_META_SCHEMA_URI.get(configs), "ir meta path not exist"); + URI schemaURI = URI.create(schemaUri); + Path schemaPath = + (schemaURI.getScheme() == null) ? Path.of(schemaURI.getPath()) : Path.of(schemaURI); + FileFormatType formatType = FileUtils.getFormatType(schemaUri); IrGraphSchema graphSchema = - new IrGraphSchema(new SchemaInputStream(new FileInputStream(metaPath), formatType)); + new IrGraphSchema( + new SchemaInputStream( + new FileInputStream(schemaPath.toFile()), formatType)); IrMeta irMeta = (formatType == FileFormatType.YAML) ? new IrMeta( graphSchema, - new GraphStoredProcedures(new FileInputStream(metaPath), this)) + new GraphStoredProcedures(new FileInputStream(schemaUri), this)) : new IrMeta(graphSchema); return irMeta; } @Override public IrGraphStatistics readStats(GraphId graphId) throws IOException { - String statsPath = GraphConfig.GRAPH_STATISTICS.get(configs); - return statsPath.isEmpty() ? null : new IrGraphStatistics(new FileInputStream(statsPath)); + String statsUri = GraphConfig.GRAPH_META_STATISTICS_URI.get(configs); + if (statsUri.isEmpty()) return null; + URI statsURI = URI.create(statsUri); + Path statsPath = + (statsURI.getScheme() == null) ? Path.of(statsURI.getPath()) : Path.of(statsURI); + return new IrGraphStatistics(new FileInputStream(statsPath.toFile())); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rel/metadata/schema/GlogueSchema.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rel/metadata/schema/GlogueSchema.java index 2d199869c264..9bc470321aba 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rel/metadata/schema/GlogueSchema.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rel/metadata/schema/GlogueSchema.java @@ -58,9 +58,13 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) { edgeTypeCardinality = new HashMap(); for (GraphVertex vertex : graphSchema.getVertexList()) { schemaGraph.addVertex(vertex.getLabelId()); - vertexTypeCardinality.put( - vertex.getLabelId(), - statistics.getVertexTypeCount(vertex.getLabelId()).doubleValue()); + Long vertexTypeCount = statistics.getVertexTypeCount(vertex.getLabelId()); + if (vertexTypeCount == null) { + throw new IllegalArgumentException( + "Vertex type count not found for vertex type: " + vertex.getLabelId()); + } else { + vertexTypeCardinality.put(vertex.getLabelId(), vertexTypeCount.doubleValue()); + } } for (GraphEdge edge : graphSchema.getEdgeList()) { for (EdgeRelation relation : edge.getRelationList()) { @@ -68,14 +72,17 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) { int targetType = relation.getTarget().getLabelId(); EdgeTypeId edgeType = new EdgeTypeId(sourceType, targetType, edge.getLabelId()); schemaGraph.addEdge(sourceType, targetType, edgeType); - edgeTypeCardinality.put( - edgeType, - statistics - .getEdgeTypeCount( - Optional.of(sourceType), - Optional.of(edge.getLabelId()), - Optional.of(targetType)) - .doubleValue()); + Long edgeTypeCount = + statistics.getEdgeTypeCount( + Optional.of(sourceType), + Optional.of(edge.getLabelId()), + Optional.of(targetType)); + if (edgeTypeCount == null) { + throw new IllegalArgumentException( + "Edge type count not found for edge type: " + edge.getLabelId()); + } else { + edgeTypeCardinality.put(edgeType, edgeTypeCount.doubleValue()); + } } } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java index b02324527e1c..fe74f13d63c1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java @@ -18,9 +18,13 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.FrontendConfig; +import com.alibaba.graphscope.common.config.GraphConfig; import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; +import com.alibaba.graphscope.common.ir.meta.fetcher.IrMetaFetcher; import com.alibaba.graphscope.common.ir.meta.fetcher.StaticIrMetaFetcher; import com.alibaba.graphscope.common.ir.meta.procedure.StoredProcedureMeta; +import com.alibaba.graphscope.common.ir.meta.reader.HttpIrMetaReader; import com.alibaba.graphscope.common.ir.meta.reader.LocalIrMetaReader; import com.alibaba.graphscope.common.ir.meta.schema.GraphOptSchema; import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; @@ -49,6 +53,8 @@ import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Objects; @@ -187,6 +193,18 @@ private static Configs createExtraConfigs(@Nullable String keyValues) { return new Configs(keyValueMap); } + private static IrMetaFetcher createIrMetaFetcher(Configs configs, IrMetaTracker tracker) + throws IOException { + URI schemaUri = URI.create(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); + if (schemaUri.getScheme() == null || schemaUri.getScheme().equals("file")) { + return new StaticIrMetaFetcher(new LocalIrMetaReader(configs), tracker); + } else if (schemaUri.getScheme().equals("http")) { + return new StaticIrMetaFetcher(new HttpIrMetaReader(configs), tracker); + } + throw new IllegalArgumentException( + "unknown graph meta reader mode: " + schemaUri.getScheme()); + } + public static void main(String[] args) throws Exception { if (args.length < 4 || args[0].isEmpty() @@ -200,9 +218,7 @@ public static void main(String[] args) throws Exception { } Configs configs = Configs.Factory.create(args[0]); GraphRelOptimizer optimizer = new GraphRelOptimizer(configs); - StaticIrMetaFetcher metaFetcher = - new StaticIrMetaFetcher( - new LocalIrMetaReader(configs), optimizer.getGlogueHolder()); + IrMetaFetcher metaFetcher = createIrMetaFetcher(configs, optimizer.getGlogueHolder()); String query = FileUtils.readFileToString(new File(args[1]), StandardCharsets.UTF_8); GraphPlanner planner = new GraphPlanner( diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/config/YamlConfigTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/config/YamlConfigTest.java index addf56aef907..4914e682ad21 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/config/YamlConfigTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/config/YamlConfigTest.java @@ -63,7 +63,7 @@ public void pegasus_config_test() throws Exception { Assert.assertEquals(18, (int) PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(configs)); Assert.assertEquals( "./target/test-classes/config/modern/graph.yaml", - GraphConfig.GRAPH_SCHEMA.get(configs)); + GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); Assert.assertEquals("pegasus", FrontendConfig.ENGINE_TYPE.get(configs)); Assert.assertEquals(false, FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs)); Assert.assertEquals(8003, (int) FrontendConfig.GREMLIN_SERVER_PORT.get(configs)); diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/Utils.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/Utils.java index 0d94f23b6eea..92f3d969715a 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/Utils.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/Utils.java @@ -99,7 +99,7 @@ public static IrMeta mockSchemaMeta(String schemaPath) { Configs configs = new Configs( ImmutableMap.of( - GraphConfig.GRAPH_SCHEMA.getKey(), + GraphConfig.GRAPH_META_SCHEMA_URI.getKey(), schemaResource.toURI().getPath())); return new StaticIrMetaFetcher(new LocalIrMetaReader(configs), null).fetch().get(); } catch (Exception e) { @@ -117,9 +117,9 @@ public static IrMeta mockIrMeta( Configs configs = new Configs( ImmutableMap.of( - GraphConfig.GRAPH_SCHEMA.getKey(), + GraphConfig.GRAPH_META_SCHEMA_URI.getKey(), schemaResource.toURI().getPath(), - GraphConfig.GRAPH_STATISTICS.getKey(), + GraphConfig.GRAPH_META_STATISTICS_URI.getKey(), statisticsResource.toURI().getPath())); return new StaticIrMetaFetcher(new LocalIrMetaReader(configs), tracker).fetch().get(); } catch (Exception e) { diff --git a/interactive_engine/compiler/src/test/resources/config/gs_interactive_hiactor.yaml b/interactive_engine/compiler/src/test/resources/config/gs_interactive_hiactor.yaml index d6c6d0398bff..6176498157e8 100644 --- a/interactive_engine/compiler/src/test/resources/config/gs_interactive_hiactor.yaml +++ b/interactive_engine/compiler/src/test/resources/config/gs_interactive_hiactor.yaml @@ -20,6 +20,10 @@ compiler: opt: RBO rules: - FilterMatchRule + meta: + reader: + schema: + uri: ./target/test-classes/config/modern/graph.yaml endpoint: default_listen_address: 0.0.0.0 # default localhost bolt_connector: # for cypher, there may be other connectors, such as bolt_connector, https_connector diff --git a/interactive_engine/compiler/src/test/resources/config/gs_interactive_pegasus.yaml b/interactive_engine/compiler/src/test/resources/config/gs_interactive_pegasus.yaml index 6a18c3b86e3c..bfb9dc7e8fe7 100644 --- a/interactive_engine/compiler/src/test/resources/config/gs_interactive_pegasus.yaml +++ b/interactive_engine/compiler/src/test/resources/config/gs_interactive_pegasus.yaml @@ -24,6 +24,10 @@ compiler: opt: RBO rules: - FilterMatchRule + meta: + reader: + schema: + uri: ./target/test-classes/config/modern/graph.yaml endpoint: default_listen_address: 0.0.0.0 # default localhost bolt_connector: # for cypher, there may be other connectors, such as bolt_connector, https_connector diff --git a/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/VineyardIrMetaReader.java b/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/VineyardIrMetaReader.java index 012b373f4c83..fa550eec6571 100644 --- a/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/VineyardIrMetaReader.java +++ b/interactive_engine/frontend/src/main/java/com/alibaba/graphscope/frontend/VineyardIrMetaReader.java @@ -44,7 +44,8 @@ public VineyardIrMetaReader(Configs configs) { public IrMeta readMeta() throws IOException { String schemaString = FileUtils.readFileToString( - new File(GraphConfig.GRAPH_SCHEMA.get(configs)), StandardCharsets.UTF_8); + new File(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)), + StandardCharsets.UTF_8); GraphSchema graphSchema = DefaultGraphSchema.buildSchemaFromJson(schemaString); return new IrMeta(new IrGraphSchema(graphSchema, true)); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java index 646fbad49973..54ffe13606d6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java @@ -43,8 +43,8 @@ public WrappedSchemaFetcher( public Map getSchemaSnapshotPair() { SnapshotWithSchema snapshotSchema = this.snapshotCache.getSnapshotWithSchema(); long MAX_SNAPSHOT_ID = Long.MAX_VALUE - 1; - // Always retrieve the latest result in secondary instance - long snapshotId = isSecondary ? MAX_SNAPSHOT_ID : snapshotSchema.getSnapshotId(); + // Always retrieve the latest snapshot id to avoid inconsistency. + long snapshotId = MAX_SNAPSHOT_ID; GraphSchema schema = snapshotSchema.getGraphDef(); return Map.of(snapshotId, schema); } diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootMetaFetcher.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootMetaFetcher.java deleted file mode 100644 index 027c488bc762..000000000000 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootMetaFetcher.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.graphscope.groot.servers.ir; - -import com.alibaba.graphscope.common.ir.meta.IrMeta; -import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; -import com.alibaba.graphscope.common.ir.meta.fetcher.IrMetaFetcher; -import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class GrootMetaFetcher extends IrMetaFetcher { - private static final Logger logger = LoggerFactory.getLogger(GrootMetaFetcher.class); - - public GrootMetaFetcher(IrMetaReader reader, IrMetaTracker tracker) { - super(reader, tracker); - } - - @Override - public Optional fetch() { - try { - return Optional.of(reader.readMeta()); - } catch (Exception e) { - logger.warn("fetch ir meta from groot failed: {}", e); - return Optional.empty(); - } - } -} diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java index d6a39f6aa4d4..61f5835bf556 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java @@ -18,10 +18,8 @@ import com.alibaba.graphscope.GraphServer; import com.alibaba.graphscope.common.client.channel.ChannelFetcher; -import com.alibaba.graphscope.common.config.AuthConfig; -import com.alibaba.graphscope.common.config.FrontendConfig; -import com.alibaba.graphscope.common.config.PegasusConfig; -import com.alibaba.graphscope.common.config.PlannerConfig; +import com.alibaba.graphscope.common.config.*; +import com.alibaba.graphscope.common.ir.meta.fetcher.DynamicIrMetaFetcher; import com.alibaba.graphscope.common.ir.meta.fetcher.IrMetaFetcher; import com.alibaba.graphscope.common.ir.planner.GraphRelOptimizer; import com.alibaba.graphscope.gremlin.integration.result.TestGraphFactory; @@ -60,8 +58,10 @@ public AbstractService makeGraphService( logger.info("IR configs: {}", irConfigs); GraphRelOptimizer optimizer = new GraphRelOptimizer(irConfigs); IrMetaFetcher irMetaFetcher = - new GrootMetaFetcher( - new GrootIrMetaReader(schemaFetcher), optimizer.getGlogueHolder()); + new DynamicIrMetaFetcher( + irConfigs, + new GrootIrMetaReader(schemaFetcher), + optimizer.getGlogueHolder()); RoleClients updateCommitter = new RoleClients<>(channelManager, RoleType.COORDINATOR, SnapshotUpdateClient::new); int frontendId = CommonConfig.NODE_IDX.get(configs); @@ -130,10 +130,17 @@ private com.alibaba.graphscope.common.config.Configs getConfigs() { addToConfigMapIfExist(FrontendConfig.FRONTEND_SERVER_NUM.getKey(), configMap); // add frontend qps limit addToConfigMapIfExist(FrontendConfig.QUERY_PER_SECOND_LIMIT.getKey(), configMap); + // add graph schema fetch interval + addToConfigMapIfExist(GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.getKey(), configMap); + // add graph statistics fetch interval + addToConfigMapIfExist( + GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.getKey(), configMap); // add graph planner configs addToConfigMapIfExist(PlannerConfig.GRAPH_PLANNER_IS_ON.getKey(), configMap); addToConfigMapIfExist(PlannerConfig.GRAPH_PLANNER_OPT.getKey(), configMap); addToConfigMapIfExist(PlannerConfig.GRAPH_PLANNER_RULES.getKey(), configMap); + addToConfigMapIfExist(FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.getKey(), configMap); + addToConfigMapIfExist(FrontendConfig.GRAPH_PHYSICAL_OPT.getKey(), configMap); return new com.alibaba.graphscope.common.config.Configs(configMap); }