diff --git a/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.cpp b/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.cpp
index 6acf44b8c9c1..67a6fb0b422c 100644
--- a/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.cpp
+++ b/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.cpp
@@ -26,6 +26,7 @@
 #include
 #include
+#include "NdbBlob.hpp"
 #include "NdbRecAttr.hpp"
 #include "src/db-operations/pk/common.hpp"
 #include "src/error-strings.h"
@@ -1305,7 +1306,7 @@ RS_Status find_feature_group_schema_id_int(Ndb *ndb_object, const char *subject_
 //-------------------------------------------------------------------------------------------------
 
-RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_name, int project_id, char *schema) {
+RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_name, int project_id, char **schema) {
   int schema_id;
   RS_Status status = find_feature_group_schema_id_int(ndb_object, subject_name, project_id, &schema_id);
   if (status.http_code != SUCCESS) {
@@ -1345,16 +1346,14 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
     return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_023);
   }
 
-  NdbRecAttr *schema_attr = ndb_op->getValue("schema", nullptr);
-  assert(FEATURE_GROUP_SCHEMA_SIZE == (Uint32)table_dict->getColumn("schema")->getSizeInBytes());
-
-  if (schema_attr == nullptr) {
+  NdbBlob *schema_blob = ndb_op->getBlobHandle("schema");
+  if (schema_blob == nullptr) {
     ndb_error = ndb_op->getNdbError();
     ndb_object->closeTransaction(tx);
     return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_019);
   }
 
-  if (tx->execute(NdbTransaction::Commit) != 0) {
+  if (tx->execute(NdbTransaction::NoCommit) != 0) {
     ndb_error = tx->getNdbError();
     ndb_object->closeTransaction(tx);
     return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_009);
@@ -1365,15 +1364,52 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
     return RS_CLIENT_404_ERROR();
   }
 
-  Uint32 schema_attr_bytes;
-  const char *schema_attr_start = nullptr;
-  if (GetByteArray(schema_attr, &schema_attr_start, &schema_attr_bytes) != 0) {
-    ndb_object->closeTransaction(tx);
-    return RS_CLIENT_ERROR(ERROR_019);
+  Uint64 length = 0;
+  if (schema_blob->getLength(length) == -1) {
+    return RS_SERVER_ERROR(ERROR_037 + std::string(" Reading column length failed.") +
+                           std::string(" Column: ") +
+                           std::string(schema_blob->getColumn()->getName()) +
+                           " Type: " + std::to_string(schema_blob->getColumn()->getType()));
   }
-  memcpy(schema, schema_attr_start, schema_attr_bytes);
-  schema[schema_attr_bytes] = '\0';
+  Uint64 chunk = 0;
+  Uint64 total_read = 0;
+  *schema = (char *)malloc(length + 1);  // +1 for \0
+  char *tmp_buffer = static_cast<char *>(*schema);
+
+  for (chunk = 0; chunk < (length / (BLOB_MAX_FETCH_SIZE)) + 1; chunk++) {
+    Uint64 pos = chunk * BLOB_MAX_FETCH_SIZE;
+    Uint32 bytes = BLOB_MAX_FETCH_SIZE;  // NOTE this is bytes to read and also bytes read.
+    if (pos + bytes > length) {
+      bytes = length - pos;
+    }
+
+    if (bytes != 0) {
+      if (-1 == schema_blob->setPos(pos)) {
+        return RS_RONDB_SERVER_ERROR(
+            schema_blob->getNdbError(),
+            ERROR_037 + std::string(" Failed to set read position.") + std::string(" Column: ") +
+                std::string(schema_blob->getColumn()->getName()) +
+                " Type: " + std::to_string(schema_blob->getColumn()->getType()));
+      }
+
+      if (schema_blob->readData(tmp_buffer, bytes /*to read, also bytes read*/) == -1) {
+        return RS_RONDB_SERVER_ERROR(
+            schema_blob->getNdbError(),
+            ERROR_037 + std::string(" Read data failed.") + std::string(" Column: ") +
+                std::string(schema_blob->getColumn()->getName()) +
+                " Type: " + std::to_string(schema_blob->getColumn()->getType()) +
+                " Position: " + std::to_string(pos));
+      }
+
+      if (bytes > 0) {
+        total_read += bytes;
+        tmp_buffer += bytes;
+      }
+    }
+  }
+  assert(total_read == length);
+  (*schema)[total_read] = '\0';
 
   ndb_object->closeTransaction(tx);
 
@@ -1388,13 +1424,13 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
  * get the schema id of the max version
  * SELECT schema from schemas WHERE id = {schema_id}
  */
-RS_Status find_feature_group_schema(const char *subject_name, int project_id, char *schema) {
+RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/) {
   Ndb *ndb_object = nullptr;
   RS_Status status = rdrsRonDBConnectionPool->GetMetadataNdbObject(&ndb_object);
   if (status.http_code != SUCCESS) {
     return status;
   }
-  /* clang-format off */
+  /* clang-format off */
   METADATA_OP_RETRY_HANDLER(
     status = find_feature_group_schema_int(ndb_object, subject_name, project_id, schema);
     HandleSchemaErrors(ndb_object, status, {std::make_tuple(HOPSWORKS, SCHEMAS)});
diff --git a/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.h b/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.h
index 994ce4f2088f..55bfe92a535d 100644
--- a/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.h
+++ b/storage/ndb/rest-server/data-access-rondb/src/feature_store/feature_store.h
@@ -124,7 +124,7 @@ RS_Status find_serving_key_data(int feature_view_id, Serving_Key **serving_keys,
  * get the schema id of the max version
  * SELECT schema from schemas WHERE id = {schema_id}
  */
-RS_Status find_feature_group_schema(const char *subject_name, int project_id, char *schema);
+RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/);
 #endif
 
 #ifdef __cplusplus
diff --git a/storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h b/storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h
index 8a69d0a49e51..a46d4c51ae36 100644
--- a/storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h
+++ b/storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h
@@ -116,7 +116,6 @@ static inline int bytes_for_ndb_str_len(int ndb_str_len) {
 #define SERVING_KEY_JOIN_ON_SIZE 1000 + 2 /* +2 for ndb len or '\0'*/
 #define SERVING_KEY_JOIN_PREFIX_SIZE 63 + 1 /* +1 for ndb len or '\0'*/
 #define FEATURE_GROUP_SUBJECT_SIZE 255 + 1 /* +1 for ndb len or '\0'*/
-#define FEATURE_GROUP_SCHEMA_SIZE 29000 + 2 /* +2 for ndb len or '\0'*/
 // Data types
 #define DECIMAL_MAX_SIZE_IN_BYTES 9 * 4 /*4 bytes per 9 digits.
                                           65/9 + 1 * 4*/
diff --git a/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go b/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go
index d758913278ac..3dc78d312e0c 100644
--- a/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go
+++ b/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go
@@ -36,15 +36,15 @@ import (
 )
 
 type TrainingDatasetFeature struct {
-	FeatureID       int
-	TrainingDataset int
-	FeatureGroupID  int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
-	Name            string
-	Type            string
-	TDJoinID        int
-	IDX             int
-	Label           int
-	FeatureViewID   int
+	FeatureID       int
+	TrainingDataset int
+	FeatureGroupID  int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
+	Name            string
+	Type            string
+	TDJoinID        int
+	IDX             int
+	Label           int
+	FeatureViewID   int
 }
 
 type TrainingDatasetJoin struct {
@@ -180,15 +180,15 @@ func GetTrainingDatasetFeature(featureViewID int) ([]TrainingDatasetFeature, *Da
 	retTdfs := make([]TrainingDatasetFeature, int(tdfsSize))
 	for i, tdf := range tdfsSlice {
 		retTdf := TrainingDatasetFeature{
-			FeatureID:       int(tdf.feature_id),
-			TrainingDataset: int(tdf.training_dataset),
-			FeatureGroupID:  int(tdf.feature_group_id),
-			Name:            C.GoString(&tdf.name[0]),
-			Type:            C.GoString(&tdf.data_type[0]),
-			TDJoinID:        int(tdf.td_join_id),
-			IDX:             int(tdf.idx),
-			Label:           int(tdf.label),
-			FeatureViewID:   int(tdf.feature_view_id),
+			FeatureID:       int(tdf.feature_id),
+			TrainingDataset: int(tdf.training_dataset),
+			FeatureGroupID:  int(tdf.feature_group_id),
+			Name:            C.GoString(&tdf.name[0]),
+			Type:            C.GoString(&tdf.data_type[0]),
+			TDJoinID:        int(tdf.td_join_id),
+			IDX:             int(tdf.idx),
+			Label:           int(tdf.label),
+			FeatureViewID:   int(tdf.feature_view_id),
 		}
 		retTdfs[i] = retTdf
 	}
@@ -286,19 +286,27 @@ func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*Fe
 	cSubjectName := C.CString(subjectName)
 	defer C.free(unsafe.Pointer(cSubjectName))
 
-	var schemaBuff = C.malloc(C.size_t(C.FEATURE_GROUP_SCHEMA_SIZE))
-	defer C.free(schemaBuff)
+	// memory allocated on C side and freed here
+	var schemaBuff *C.char
 
 	ret := C.find_feature_group_schema(
 		(*C.char)(unsafe.Pointer(cSubjectName)),
 		C.int(projectId),
-		(*C.char)(unsafe.Pointer(schemaBuff)))
+		&schemaBuff)
 
 	if ret.http_code != http.StatusOK {
+		if unsafe.Pointer(schemaBuff) != nil {
+			C.free(unsafe.Pointer(schemaBuff))
+		}
 		return nil, cToGoRet(&ret)
 	}
 
-	var schema = C.GoString((*C.char)(unsafe.Pointer(schemaBuff)))
+	var schema = C.GoString(schemaBuff)
+
+	if unsafe.Pointer(schemaBuff) != nil {
+		C.free(unsafe.Pointer(schemaBuff))
+	}
+
 	var avroSchema FeatureGroupAvroSchema
 	err := json.Unmarshal([]byte(schema), &avroSchema)
 	if err != nil {
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/embeddings.go b/storage/ndb/rest-server/rest-api-server/resources/testdbs/embeddings.go
index 5d4cf790244b..970b267eb3bf 100644
--- a/storage/ndb/rest-server/rest-api-server/resources/testdbs/embeddings.go
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/embeddings.go
@@ -44,7 +44,33 @@ var HopsworksData string
 
 //go:embed fixed/hopsworks_40_schema.sql
 var HopsworksSchema string
 
-var HopsworksScheme string = HopsworksSchema + HopsworksData
+//Upgrades / patches
+//V5-FSTORE-1537-managed_feature_group.sql
+//V6-FSTORE-1507-python_udfs.sql
+//V7-HWORKS-1627-kube_labels_priorityclasses.sql
+//V8-HWORKS-1670-ray_integration.sql
+//V9-FSTORE-1592-type_column_size.sql
+//V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
+
+//go:embed fixed/V5-FSTORE-1537-managed_feature_group.sql
+var V5 string
+
+//go:embed fixed/V6-FSTORE-1507-python_udfs.sql
+var V6 string
+
+//go:embed fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
+var V7 string
+
+//go:embed fixed/V8-HWORKS-1670-ray_integration.sql
+var V8 string
+
+//go:embed fixed/V9-FSTORE-1592-type_column_size.sql
+var V9 string
+
+//go:embed fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
+var V10 string
+
+var HopsworksScheme string = HopsworksSchema + HopsworksData + V5 + V6 + V7 + V8 + V9 + V10
 
 const HOPSWORKS_DB_NAME = "hopsworks"
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
new file mode 100644
index 000000000000..f77a93d649ec
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
@@ -0,0 +1,2 @@
+ALTER TABLE `hopsworks`.`schemas` DROP FOREIGN KEY project_idx_schemas;
+ALTER TABLE `hopsworks`.`schemas` MODIFY COLUMN `schema` TEXT CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL;
\ No newline at end of file
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V5-FSTORE-1537-managed_feature_group.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V5-FSTORE-1537-managed_feature_group.sql
new file mode 100644
index 000000000000..2f35a032d4bf
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V5-FSTORE-1537-managed_feature_group.sql
@@ -0,0 +1,23 @@
+ALTER TABLE `hopsworks`.`cached_feature`
+    ADD `type` varchar(1000) COLLATE latin1_general_cs NULL,
+    ADD `partition_key` BOOLEAN NULL DEFAULT FALSE,
+    ADD `default_value` VARCHAR(400) NULL,
+    MODIFY `description` varchar(256) NULL DEFAULT '';
+
+ALTER TABLE `hopsworks`.`feature_store_s3_connector`
+    ADD `region` VARCHAR(50) DEFAULT NULL;
+
+ALTER TABLE `hopsworks`.`feature_group`
+    ADD `path` VARCHAR(1000) NULL,
+    ADD `connector_id` INT(11) NULL,
+    ADD CONSTRAINT `connector_fk` FOREIGN KEY (`connector_id`) REFERENCES `feature_store_connector` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION;
+
+UPDATE `hopsworks`.`feature_group` AS fg
+JOIN `hopsworks`.`on_demand_feature_group` AS on_demand_fg ON fg.`on_demand_feature_group_id` = on_demand_fg.`id`
+SET fg.`path` = on_demand_fg.`path`,
+    fg.`connector_id` = on_demand_fg.`connector_id`;
+
+ALTER TABLE `hopsworks`.`on_demand_feature_group`
+    DROP FOREIGN KEY `on_demand_conn_fk`,
+    DROP COLUMN `path`,
+    DROP COLUMN `connector_id`;
\ No newline at end of file
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V6-FSTORE-1507-python_udfs.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V6-FSTORE-1507-python_udfs.sql
new file mode 100644
index 000000000000..54a564340165
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V6-FSTORE-1507-python_udfs.sql
@@ -0,0 +1,2 @@
+ALTER TABLE `hopsworks`.`transformation_function`
+    ADD `execution_mode` VARCHAR(255);
\ No newline at end of file
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
new file mode 100644
index 000000000000..590918426012
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
@@ -0,0 +1,33 @@
+ALTER TABLE `hopsworks`.`serving`
+    ADD `scheduling_config` varchar(2000) COLLATE latin1_general_cs DEFAULT NULL;
+
+CREATE TABLE IF NOT EXISTS `kube_priority_class` (
+    `id` INT(11) AUTO_INCREMENT PRIMARY KEY,
+    `name` VARCHAR(253) NOT NULL,
+    UNIQUE KEY `name_idx` (`name`)
+  ) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
+
+CREATE TABLE IF NOT EXISTS `kube_project_priority_class` (
+    `id` INT(11) AUTO_INCREMENT PRIMARY KEY,
+    `priority_class_id` INT(11) NOT NULL,
+    `project_id` INT(11), -- NULL key used for default project settings
+    `base` INT(0) NOT NULL DEFAULT 0,
+    CONSTRAINT `priority_class_fkc` FOREIGN KEY (`priority_class_id`) REFERENCES `hopsworks`.`kube_priority_class` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
+    CONSTRAINT `priority_class_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
+  ) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
+
+CREATE TABLE IF NOT EXISTS `kube_label` (
+    `id` INT(11) AUTO_INCREMENT PRIMARY KEY,
+    `name` VARCHAR(317) NOT NULL,
+    `value` VARCHAR(63),
+    UNIQUE KEY `name_value_idx` (`name`,`value`)
+  ) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
+
+CREATE TABLE IF NOT EXISTS `kube_project_label` (
+    `id` INT(11) AUTO_INCREMENT PRIMARY KEY,
+    `label_id` INT(11) NOT NULL,
+    `project_id` INT(11), -- NULL key used for default project settings
+    `base` INT(0) NOT NULL DEFAULT 0,
+    CONSTRAINT `label_fkc` FOREIGN KEY (`label_id`) REFERENCES `hopsworks`.`kube_label` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
+    CONSTRAINT `label_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
+  ) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
\ No newline at end of file
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V8-HWORKS-1670-ray_integration.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V8-HWORKS-1670-ray_integration.sql
new file mode 100644
index 000000000000..4655a5267217
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V8-HWORKS-1670-ray_integration.sql
@@ -0,0 +1,19 @@
+ALTER TABLE `hopsworks`.`executions` ADD COLUMN `ray_dashboard_url` varchar(255) CHARACTER SET latin1 COLLATE latin1_general_cs DEFAULT NULL;
+ALTER TABLE `hopsworks`.`executions` ADD COLUMN `ray_cluster_name` varchar(255) CHARACTER SET latin1 COLLATE latin1_general_cs DEFAULT NULL;
+ALTER TABLE `hopsworks`.`jupyter_settings` ADD COLUMN `ray_config` varchar(5000) COLLATE latin1_general_cs DEFAULT NULL;
+
+CREATE TABLE `hopsworks`.`jupyter_ray_session` (
+    `id` int NOT NULL AUTO_INCREMENT,
+    `creation_time` bigint DEFAULT NULL,
+    `jupyter_project_id` int NOT NULL,
+    `application_id` varchar(255) NOT NULL,
+    `kernel_id` varchar(255) NOT NULL,
+    `ray_head_node_service` varchar(255) DEFAULT NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `kernel_id_unique` (`kernel_id`),
+    UNIQUE KEY `application_id_unique` (`application_id`),
+    KEY `fk_ray_session_jupyter_project` (`jupyter_project_id`),
+    CONSTRAINT `fk_ray_session_jupyter_project` FOREIGN KEY (`jupyter_project_id`) REFERENCES `jupyter_project` (`id`) ON DELETE CASCADE
+) ENGINE=ndbcluster
+    DEFAULT CHARSET=latin1
+    COLLATE=latin1_general_cs;
\ No newline at end of file
diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V9-FSTORE-1592-type_column_size.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V9-FSTORE-1592-type_column_size.sql
new file mode 100644
index 000000000000..38a62fad8a84
--- /dev/null
+++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/V9-FSTORE-1592-type_column_size.sql
@@ -0,0 +1,2 @@
+ALTER TABLE `hopsworks`.`cached_feature`
+    MODIFY COLUMN `type` VARCHAR(20000) COLLATE latin1_general_cs NULL;
\ No newline at end of file
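
For review context, a minimal standalone sketch of the chunked read pattern used by the new find_feature_group_schema_int follows: walk the blob in BLOB_MAX_FETCH_SIZE steps, shrink the final chunk to the remaining bytes (the last iteration reads 0 bytes and is skipped when the length is an exact multiple), and NUL-terminate a malloc'd buffer that the caller frees. The NDB API is not involved here; fake_blob_read, the 8 KiB value for BLOB_MAX_FETCH_SIZE, and the in-memory string standing in for the `schema` TEXT column are assumptions for illustration only, not the patch's actual constants or API.

// Standalone sketch (not part of the patch) of the chunked blob-read arithmetic.
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>

using Uint32 = std::uint32_t;
using Uint64 = std::uint64_t;

static const Uint64 BLOB_MAX_FETCH_SIZE = 8 * 1024;  // assumed value, for illustration

// Stand-in for NdbBlob::setPos()/readData(): copies `bytes` bytes starting at `pos`.
static int fake_blob_read(const std::string &blob, Uint64 pos, char *dst, Uint32 bytes) {
  if (pos + bytes > blob.size()) return -1;
  std::memcpy(dst, blob.data() + pos, bytes);
  return 0;
}

int main() {
  std::string blob(20000, 'x');  // pretend this is the `schema` TEXT column
  Uint64 length = blob.size();   // in the patch this comes from NdbBlob::getLength()

  char *schema = static_cast<char *>(std::malloc(length + 1));  // +1 for '\0'
  char *tmp_buffer = schema;
  Uint64 total_read = 0;

  // Full chunks of BLOB_MAX_FETCH_SIZE, then one final (possibly empty) partial chunk.
  for (Uint64 chunk = 0; chunk < (length / BLOB_MAX_FETCH_SIZE) + 1; chunk++) {
    Uint64 pos = chunk * BLOB_MAX_FETCH_SIZE;
    Uint32 bytes = BLOB_MAX_FETCH_SIZE;
    if (pos + bytes > length) {
      bytes = static_cast<Uint32>(length - pos);  // clamp the last chunk
    }
    if (bytes != 0) {
      if (fake_blob_read(blob, pos, tmp_buffer, bytes) == -1) {
        std::free(schema);
        return 1;
      }
      total_read += bytes;
      tmp_buffer += bytes;
    }
  }

  assert(total_read == length);
  schema[total_read] = '\0';
  std::cout << "read " << total_read << " bytes\n";
  std::free(schema);  // in the patch, the Go caller frees the buffer after C.GoString()
  return 0;
}

This mirrors why the patch drops the fixed FEATURE_GROUP_SCHEMA_SIZE buffer: once the column is a TEXT blob, the schema length is only known at read time, so the buffer is sized from the blob length on the C++ side and handed to Go, which copies it with C.GoString and then frees it.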