diff --git a/flex/planner/graph_planner.cc b/flex/planner/graph_planner.cc index deca274d3f69..3605722cd21c 100644 --- a/flex/planner/graph_planner.cc +++ b/flex/planner/graph_planner.cc @@ -12,10 +12,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include +#include +#include +#include #include namespace gs { +#if (GRAPH_PLANNER_JNI_INVOKER) namespace jni { static JavaVM* _jvm = NULL; @@ -139,11 +144,72 @@ JNIEnvMark::~JNIEnvMark() { JNIEnv* JNIEnvMark::env() { return _env; } -physical::PhysicalPlan GraphPlannerWrapper::CompilePlan( - const std::string& compiler_config_path, - const std::string& cypher_query_string) { +} // namespace jni + +#endif + +std::vector list_files(const std::string& path) { + // list all files in the directory + std::vector files; + for (const auto& entry : std::filesystem::directory_iterator(path)) { + files.push_back(entry.path().string()); + } + return files; +} + +std::string GraphPlannerWrapper::expand_directory(const std::string& path) { + std::vector paths; + std::string::size_type start = 0; + std::string::size_type end = path.find(':'); + while (end != std::string::npos) { + auto sub_path = path.substr(start, end - start); + if (!sub_path.empty()) { + if (std::filesystem::is_directory(sub_path)) { + auto files = list_files(sub_path); + paths.insert(paths.end(), files.begin(), files.end()); + } else { + paths.push_back(sub_path); + } + } + start = end + 1; + end = path.find(':', start); + } + auto sub_path = path.substr(start); + if (!sub_path.empty()) { + auto files = list_files(sub_path); + paths.insert(paths.end(), files.begin(), files.end()); + } + std::stringstream ss; + for (const auto& p : paths) { + ss << p << ":"; + } + return ss.str(); +} + +#if (GRAPH_PLANNER_JNI_INVOKER) + +std::string GraphPlannerWrapper::generate_jvm_options( + const std::string java_path, const std::string& jna_path, + const std::string& graph_schema_yaml, + const std::string& graph_statistic_json) { + auto expanded_java_path = expand_directory(java_path); + VLOG(10) << "Expanded java path: " << expanded_java_path; + std::string jvm_options = "-Djava.class.path=" + expanded_java_path; + jvm_options += " -Djna.library.path=" + jna_path; + jvm_options += " -Dgraph.schema=" + graph_schema_yaml; + if (!graph_statistic_json.empty()) { + jvm_options += " -Dgraph.statistic=" + graph_statistic_json; + } + return jvm_options; +} + +physical::PhysicalPlan compilePlanJNI(jclass graph_planner_clz_, + jmethodID graph_planner_method_id_, + jni::JNIEnvMark& jni_wrapper_, + const std::string& compiler_config_path, + const std::string& cypher_query_string) { physical::PhysicalPlan physical_plan; - if (!is_valid()) { + if (graph_planner_clz_ == NULL || graph_planner_method_id_ == NULL) { LOG(ERROR) << "Invalid GraphPlannerWrapper."; return physical_plan; } @@ -176,60 +242,101 @@ physical::PhysicalPlan GraphPlannerWrapper::CompilePlan( return physical_plan; } +#endif -std::string GraphPlannerWrapper::generate_jvm_options( - const std::string java_path, const std::string& jna_path, - const std::string& graph_schema_yaml, - const std::string& graph_statistic_json) { - auto expanded_java_path = expand_directory(java_path); - VLOG(10) << "Expanded java path: " << expanded_java_path; - std::string jvm_options = "-Djava.class.path=" + expanded_java_path; - jvm_options += " -Djna.library.path=" + jna_path; - jvm_options += " -Dgraph.schema=" + graph_schema_yaml; - if (!graph_statistic_json.empty()) { - jvm_options += " -Dgraph.statistic=" + graph_statistic_json; +#if (!GRAPH_PLANNER_JNI_INVOKER) + +physical::PhysicalPlan readPhysicalPlan(const std::string& output_file) { + std::ifstream file(output_file, std::ios::binary); + + if (!file.is_open()) { + LOG(ERROR) << "Fail to open file: " << output_file; + return physical::PhysicalPlan(); } - return jvm_options; + + file.seekg(0, std::ios::end); + size_t size = file.tellg(); + file.seekg(0, std::ios::beg); + + std::string buffer; + buffer.resize(size); + + file.read(&buffer[0], size); + + file.close(); + physical::PhysicalPlan plan; + if (!plan.ParseFromString(std::string(buffer))) { + LOG(ERROR) << "Fail to parse physical plan."; + return physical::PhysicalPlan(); + } + return plan; } -std::string GraphPlannerWrapper::expand_directory(const std::string& path) { - std::vector paths; - std::string::size_type start = 0; - std::string::size_type end = path.find(':'); - while (end != std::string::npos) { - auto sub_path = path.substr(start, end - start); - if (!sub_path.empty()) { - if (std::filesystem::is_directory(sub_path)) { - auto files = list_files(sub_path); - paths.insert(paths.end(), files.begin(), files.end()); - } else { - paths.push_back(sub_path); - } +physical::PhysicalPlan compilePlanSubprocess( + const std::string& class_path, const std::string& jna_path, + const std::string& graph_schema_yaml, + const std::string& graph_statistic_json, + const std::string& compiler_config_path, + const std::string& cypher_query_string) { + // use execvp to run the command + auto random_prefix = std::to_string( + std::chrono::system_clock::now().time_since_epoch().count()); + std::string dst_query_path = "/tmp/temp_query_" + random_prefix + ".cypher"; + std::string dst_output_file = "/tmp/temp_output_" + random_prefix + ".pb"; + std::ofstream query_file(dst_query_path); + query_file << cypher_query_string; + query_file.close(); + pid_t pid = fork(); + if (pid == 0) { + // generate a random file name for query + const char* const command_string_array[] = { + "java", + "-cp", + class_path.c_str(), + jna_path.c_str(), + graph_schema_yaml.c_str(), + graph_statistic_json.c_str(), + "com.alibaba.graphscope.common.ir.tools.GraphPlanner", + compiler_config_path.c_str(), + dst_query_path.c_str(), + dst_output_file.c_str(), + "/tmp/temp.cypher.yaml", + NULL}; + LOG(INFO) << command_string_array[0] << " " << command_string_array[1] + << " " << command_string_array[2] << " " + << command_string_array[3] << " " << command_string_array[4] + << " " << command_string_array[5] << " " + << command_string_array[6] << " " << command_string_array[7] + << " " << command_string_array[8] << " " + << command_string_array[9]; + execvp(command_string_array[0], + const_cast(command_string_array)); + } else if (pid < 0) { + LOG(ERROR) << "Error in fork."; + } else { + int status; + waitpid(pid, &status, 0); + if (status != 0) { + LOG(ERROR) << "Error in running command."; } - start = end + 1; - end = path.find(':', start); + return readPhysicalPlan(dst_output_file); } - auto sub_path = path.substr(start); - if (!sub_path.empty()) { - auto files = list_files(sub_path); - paths.insert(paths.end(), files.begin(), files.end()); - } - std::stringstream ss; - for (const auto& p : paths) { - ss << p << ":"; - } - return ss.str(); + return physical::PhysicalPlan(); } +#endif -std::vector GraphPlannerWrapper::list_files( - const std::string& path) { - // list all files in the directory - std::vector files; - for (const auto& entry : std::filesystem::directory_iterator(path)) { - files.push_back(entry.path().string()); - } - return files; +physical::PhysicalPlan GraphPlannerWrapper::CompilePlan( + const std::string& compiler_config_path, + const std::string& cypher_query_string) { +#if (GRAPH_PLANNER_JNI_INVOKER) + return compilePlanJNI(graph_planner_clz_, graph_planner_method_id_, + jni_wrapper_, compiler_config_path, + cypher_query_string); +#else + return compilePlanSubprocess(class_path_, jna_path_, graph_schema_yaml_, + graph_statistic_json_, compiler_config_path, + cypher_query_string); +#endif } -} // namespace jni } // namespace gs diff --git a/flex/planner/graph_planner.h b/flex/planner/graph_planner.h index 1372990758ff..68e2325b61a2 100644 --- a/flex/planner/graph_planner.h +++ b/flex/planner/graph_planner.h @@ -26,7 +26,15 @@ limitations under the License. #include "glog/logging.h" +// #define GRAPH_PLANNER_JNI_INVOKER JNI +#ifndef GRAPH_PLANNER_JNI_INVOKER +#define GRAPH_PLANNER_JNI_INVOKER 0 // 1: JNI, 0: subprocess +#endif +// Could be JNI or subprocess + namespace gs { + +#if (GRAPH_PLANNER_JNI_INVOKER) namespace jni { struct JNIEnvMark { JNIEnv* _env; @@ -37,6 +45,9 @@ struct JNIEnvMark { JNIEnv* env(); }; +} // namespace jni +#endif + class GraphPlannerWrapper { public: static constexpr const char* kGraphPlannerClass = @@ -48,6 +59,7 @@ class GraphPlannerWrapper { GraphPlannerWrapper(const std::string java_path, const std::string& jna_path, const std::string& graph_schema_yaml, const std::string& graph_statistic_json = "") +#if (GRAPH_PLANNER_JNI_INVOKER) : jni_wrapper_(generate_jvm_options( java_path, jna_path, graph_schema_yaml, graph_statistic_json)) { jclass clz = jni_wrapper_.env()->FindClass(kGraphPlannerClass); @@ -64,15 +76,29 @@ class GraphPlannerWrapper { } graph_planner_method_id_ = j_method_id; } +#else + : jna_path_("-Djna.library.path=" + jna_path), + graph_schema_yaml_("-Dgraph.schema=" + graph_schema_yaml), + graph_statistic_json_("-Dgraph.statistic=" + graph_statistic_json) { + class_path_ = expand_directory(java_path); + } +#endif ~GraphPlannerWrapper() { +#if (GRAPH_PLANNER_JNI_INVOKER) if (graph_planner_clz_ != NULL) { jni_wrapper_.env()->DeleteGlobalRef(graph_planner_clz_); } +#endif } inline bool is_valid() { +#if (GRAPH_PLANNER_JNI_INVOKER) return graph_planner_clz_ != NULL && graph_planner_method_id_ != NULL; +#else + return true; // just return true, since we don't have a way to check the + // validity when calling via subprocess. +#endif } /** @@ -90,21 +116,28 @@ class GraphPlannerWrapper { const std::string& jna_path, const std::string& graph_schema_yaml, const std::string& graph_statistic_json); - + // physical::PhysicalPlan compilePlanJNI(const std::string& + // compiler_config_path, + // const std::string& + // cypher_query_string); + std::string expand_directory(const std::string& path); +#if (GRAPH_PLANNER_JNI_INVOKER) // We need to list all files in the directory, if exists. // The reason why we need to list all files in the directory is that // java -Djava.class.path=dir/* (in jni, which we are using)will not load all // jar files in the directory, While java -cp dir/* will load all jar files in // the directory. - std::string expand_directory(const std::string& path); - std::vector list_files(const std::string& path); - JNIEnvMark jni_wrapper_; + gs::jni::JNIEnvMark jni_wrapper_; jclass graph_planner_clz_; jmethodID graph_planner_method_id_; +#else + std::string class_path_; + std::string jna_path_; + std::string graph_schema_yaml_; + std::string graph_statistic_json_; +#endif }; - -} // namespace jni } // namespace gs #endif // PLANNER_GRAPH_PLANNER_H_ \ No newline at end of file diff --git a/flex/tests/planner/graph_planner_test.cc b/flex/tests/planner/graph_planner_test.cc index fa4804ad9c88..46231d9150f0 100644 --- a/flex/tests/planner/graph_planner_test.cc +++ b/flex/tests/planner/graph_planner_test.cc @@ -39,7 +39,7 @@ int main(int argc, char** argv) { LOG(ERROR) << "Invalid compiler config path."; return 1; } - gs::jni::GraphPlannerWrapper planner(java_path, jna_path, graph_schema_path); + gs::GraphPlannerWrapper planner(java_path, jna_path, graph_schema_path); if (!planner.is_valid()) { LOG(ERROR) << "Invalid GraphPlannerWrapper."; return 1;