Skip to content

Commit

Permalink
Merge branch 'main' into ir_fix_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
longbinlai authored Jun 3, 2024
2 parents 1141af3 + fe61616 commit a640569
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 247 deletions.
2 changes: 1 addition & 1 deletion docs/flex/interactive/data_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Within the `graph.yaml` file, vertices are delineated under the `vertex_types` s
Note:
- In the current version, only one single primary key can be specified, but we plan to support multiple primary keys in the future.
- The data type of primary key column must be one of `DT_SIGNED_INT32`, `DT_UNSIGNED_INT32`, `DT_SIGNED_INT64` or `DT_UNSIGNED_INT64`.
- The data type of primary key column must be one of `DT_SIGNED_INT32`, `DT_UNSIGNED_INT32`, `DT_SIGNED_INT64`,`DT_UNSIGNED_INT64`, or string types `var_char`,`long_text`(`fixed_char` is currently not supported).

Edges are defined within the `edge_types` section, characterized by the mandatory fields: `type_name`, `vertex_type_pair_relations`, and `properties`. The type_name and properties fields function similarly to those in vertices. However, the vertex_type_pair_relations field is exclusive to edges, specifying the permissible source and destination vertex types, as well as the relationship detailing how many source and destination vertices can be linked by this edge. Here's an illustrative example:
```yaml
Expand Down
60 changes: 60 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"

#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
#endif // BUILD_HQPS

namespace gs {

ReadTransaction GraphDBSession::GetReadTransaction() const {
Expand Down Expand Up @@ -224,6 +229,61 @@ AppBase* GraphDBSession::GetApp(int type) {

#undef likely // likely

#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
GraphDBSession::parse_query_type_from_cypher_json(
const std::string_view& str_view) {
VLOG(10) << "string view: " << str_view;
nlohmann::json j;
try {
j = nlohmann::json::parse(str_view);
} catch (const nlohmann::json::parse_error& e) {
LOG(ERROR) << "Fail to parse json from input content: " << e.what();
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError,
"Fail to parse json from input content:" + std::string(e.what())));
}
auto query_name = j["query_name"].get<std::string>();
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::NotFound, "Query name is not registered: " + query_name));
}
if (j.contains("arguments")) {
for (auto& arg : j["arguments"]) {
VLOG(10) << "arg: " << arg;
}
}
VLOG(10) << "Query name: " << query_name;
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}

Result<std::pair<uint8_t, std::string_view>>
GraphDBSession::parse_query_type_from_cypher_internal(
const std::string_view& str_view) {
procedure::Query cur_query;
if (!cur_query.ParseFromArray(str_view.data(), str_view.size())) {
LOG(ERROR) << "Fail to parse query from input content";
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError, "Fail to parse query from input content"));
}
auto query_name = cur_query.query_name().name();
if (query_name.empty()) {
LOG(ERROR) << "Query name is empty";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::NotFound, "Query name is not registered: " + query_name));
}
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}
#endif

const AppMetric& GraphDBSession::GetAppMetric(int idx) const {
return app_metrics_[idx];
}
Expand Down
63 changes: 10 additions & 53 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
#include "flex/utils/property/column.h"
#include "flex/utils/result.h"

#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
#endif // BUILD_HQPS

namespace gs {

class GraphDB;
Expand Down Expand Up @@ -114,6 +109,12 @@ class GraphDBSession {
AppBase* GetApp(int idx);

private:
#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_json(const std::string_view& input);
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_internal(const std::string_view& input);
#endif // BUILD_HQPS
/**
* @brief Parse the input format of the query.
* There are four formats:
Expand Down Expand Up @@ -165,58 +166,14 @@ class GraphDBSession {
// For cypherJson there is no query-id provided. The query name is
// provided in the json string.
std::string_view str_view(input.data(), len - 1);
VLOG(10) << "string view: " << str_view;
nlohmann::json j;
try {
j = nlohmann::json::parse(str_view);
} catch (const nlohmann::json::parse_error& e) {
LOG(ERROR) << "Fail to parse json from input content: " << e.what();
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError,
"Fail to parse json from input content:" + std::string(e.what())));
}
auto query_name = j["query_name"].get<std::string>();
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound,
"Query name is not registered: " + query_name));
}
if (j.contains("arguments")) {
for (auto& arg : j["arguments"]) {
VLOG(10) << "arg: " << arg;
}
}
VLOG(10) << "Query name: " << query_name;
return std::make_pair(app_name_to_path_index.at(query_name).second,
std::string_view(str_data, len - 1));
return parse_query_type_from_cypher_json(str_view);
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalProcedure)) {
// For cypher internal procedure, the query_name is
// provided in the protobuf message.
procedure::Query cur_query;
if (!cur_query.ParseFromArray(input.data(), input.size() - 1)) {
LOG(ERROR) << "Fail to parse query from input content";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::InternalError,
"Fail to parse query from input content"));
}
auto query_name = cur_query.query_name().name();
if (query_name.empty()) {
LOG(ERROR) << "Query name is empty";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound,
"Query name is not registered: " + query_name));
}
return std::make_pair(app_name_to_path_index.at(query_name).second,
std::string_view(str_data, len - 1));
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_internal(str_view);

}
#endif // BUILD_HQPS
else {
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/hqps_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ add_library(hqps_plan_proto SHARED ${PROTO_SRCS_GIE})
target_include_directories(hqps_plan_proto PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(hqps_plan_proto PUBLIC protobuf::libprotobuf)
target_link_libraries(hqps_plan_proto PUBLIC ${Protobuf_LIBRARIES})
install_flex_target(hqps_plan_proto)

install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
Expand Down
9 changes: 4 additions & 5 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(

return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([this, input_format
.then([input_format
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
this, outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
Expand Down Expand Up @@ -425,10 +425,9 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path,
std::move(output.content));
});
})
.then([this
.then([
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
this, outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
Expand Down
10 changes: 6 additions & 4 deletions flex/storages/metadata/default_graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ Result<PluginMeta> DefaultGraphMetaStore::GetPluginMeta(
return Result<PluginMeta>(
Status(StatusCode::InValidArgument, "Plugin not belongs to the graph"));
}
meta.id = plugin_id;
if (meta.id != plugin_id) {
return Result<PluginMeta>(
Status(StatusCode::InValidArgument,
"Plugin id not match: " + plugin_id + " vs " + meta.id));
}
return Result<PluginMeta>(meta);
}

Expand All @@ -143,8 +147,6 @@ Result<std::vector<PluginMeta>> DefaultGraphMetaStore::GetAllPluginMeta(
VLOG(10) << "Found plugin metas: " << res.move_value().size();
for (auto& pair : res.move_value()) {
auto plugin_meta = PluginMeta::FromJson(pair.second);
// the key is id.
plugin_meta.id = pair.first;
if (plugin_meta.bound_graph == graph_id) {
metas.push_back(plugin_meta);
}
Expand Down Expand Up @@ -176,7 +178,7 @@ Result<bool> DefaultGraphMetaStore::DeletePluginMetaByGraphId(
}
VLOG(10) << "Found plugin_ids: " << plugin_ids.size();
for (auto& plugin_id : plugin_ids) {
RETURN_IF_NOT_OK(base_store_->DeleteMeta(PLUGIN_META, plugin_id));
RETURN_IF_NOT_OK(DeletePluginMeta(graph_id, plugin_id));
}
return Result<bool>(true);
}
Expand Down
11 changes: 11 additions & 0 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ GraphMeta GraphMeta::FromJson(const nlohmann::json& json) {

if (json.contains("data_update_time")) {
meta.data_update_time = json["data_update_time"].get<int64_t>();
} else {
meta.data_update_time = 0;
}
if (json.contains("data_import_config")) {
meta.data_import_config = json["data_import_config"].dump();
Expand Down Expand Up @@ -306,6 +308,7 @@ JobMeta JobMeta::FromJson(const nlohmann::json& json) {

CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
const std::string& json_str) {
LOG(INFO) << "CreateGraphMetaRequest::FromJson: " << json_str;
CreateGraphMetaRequest request;
nlohmann::json json;
try {
Expand All @@ -326,9 +329,13 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
}
if (json.contains("data_update_time")) {
request.data_update_time = json["data_update_time"].get<int64_t>();
} else {
request.data_update_time = 0;
}
if (json.contains("creation_time")) {
request.creation_time = json["creation_time"].get<int64_t>();
} else {
request.creation_time = GetCurrentTimeStamp();
}
return request;
}
Expand All @@ -340,6 +347,8 @@ std::string CreateGraphMetaRequest::ToString() const {
json["schema"] = nlohmann::json::parse(schema);
if (data_update_time.has_value()) {
json["data_update_time"] = data_update_time.value();
} else {
json["data_update_time"] = 0;
}
json["creation_time"] = creation_time;
return json.dump();
Expand Down Expand Up @@ -441,6 +450,8 @@ CreatePluginMetaRequest CreatePluginMetaRequest::FromJson(
}
if (j.contains("creation_time")) {
request.creation_time = j["creation_time"].get<int64_t>();
} else {
request.creation_time = GetCurrentTimeStamp();
}
if (j.contains("description")) {
request.description = j["description"].get<std::string>();
Expand Down
12 changes: 7 additions & 5 deletions flex/storages/rt_mutable_graph/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,14 @@ static Status parse_vertex_schema(YAML::Node node, Schema& schema) {
property_types[primary_key_inds[i]] != PropertyType::kString &&
property_types[primary_key_inds[i]] != PropertyType::kUInt64 &&
property_types[primary_key_inds[i]] != PropertyType::kInt32 &&
property_types[primary_key_inds[i]] != PropertyType::kUInt32) {
property_types[primary_key_inds[i]] != PropertyType::kUInt32 &&
!property_types[primary_key_inds[i]].IsVarchar()) {
LOG(ERROR) << "Primary key " << primary_key_name
<< " should be int64 or string";
return Status(
StatusCode::InvalidSchema,
"Primary key " + primary_key_name + " should be int64 or string");
<< " should be int64/int32/uint64/uint32 or string/varchar";
return Status(StatusCode::InvalidSchema,
"Primary key " + primary_key_name +
" should be int64/int32/uint64/"
"uint32 or string/varchar");
}
primary_keys.emplace_back(property_types[primary_key_inds[i]],
property_names[primary_key_inds[i]],
Expand Down
2 changes: 1 addition & 1 deletion flex/tests/hqps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ if (BUILD_HQPS)
add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc)
target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES})
endforeach()
endif()
endif()
7 changes: 7 additions & 0 deletions flex/utils/id_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ class LFIndexer {
} else if (type.type_enum == impl::PropertyTypeImpl::kVarChar) {
keys_ = new StringColumn(StorageStrategy::kMem,
type.additional_type_info.max_length);
} else if (type.type_enum == impl::PropertyTypeImpl::kString) {
LOG(WARNING) << "String type is a deprecated type, use varchar instead.";
LOG(WARNING) << "Use default max length"
<< PropertyType::STRING_DEFAULT_MAX_LENGTH
<< " for varchar type.";
keys_ = new StringColumn(StorageStrategy::kMem,
PropertyType::STRING_DEFAULT_MAX_LENGTH);
} else {
LOG(FATAL) << "Not support type [" << type << "] as pk type ..";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* * Copyright 2020 Alibaba Group Holding Limited.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.alibaba.graphscope.common.ir.rex;

import org.apache.calcite.rex.RexNode;

import java.util.Collections;
import java.util.List;

public class ClassifiedFilter {
private final List<RexNode> labelFilters;
private final List<Comparable> labelValues;
private final List<RexNode> uniqueKeyFilters;
private final List<RexNode> extraFilters;

public ClassifiedFilter(
List<RexNode> labelFilters,
List<Comparable> labelValues,
List<RexNode> uniqueKeyFilters,
List<RexNode> extraFilters) {
this.labelFilters = labelFilters;
this.labelValues = labelValues;
this.uniqueKeyFilters = uniqueKeyFilters;
this.extraFilters = extraFilters;
}

public List<RexNode> getLabelFilters() {
return Collections.unmodifiableList(labelFilters);
}

public List<Comparable> getLabelValues() {
return Collections.unmodifiableList(labelValues);
}

public List<RexNode> getUniqueKeyFilters() {
return Collections.unmodifiableList(uniqueKeyFilters);
}

public List<RexNode> getExtraFilters() {
return Collections.unmodifiableList(extraFilters);
}
}
Loading

0 comments on commit a640569

Please sign in to comment.