Skip to content

Commit

Permalink
RONDB-776: RDRS: changed hopsworks.schemas.schema to TEXT (#573)
Browse files Browse the repository at this point in the history
* RONDB-776: RDRS: changed hopsworks.schemas.schema to TEXT

* RONDB-776: Fixed malloc. +1 for \0
  • Loading branch information
smkniazi authored Nov 19, 2024
1 parent 569dbe2 commit 6701a10
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <string>
#include <vector>

#include "NdbBlob.hpp"
#include "NdbRecAttr.hpp"
#include "src/db-operations/pk/common.hpp"
#include "src/error-strings.h"
Expand Down Expand Up @@ -1305,7 +1306,7 @@ RS_Status find_feature_group_schema_id_int(Ndb *ndb_object, const char *subject_

//-------------------------------------------------------------------------------------------------

RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_name, int project_id, char *schema) {
RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_name, int project_id, char **schema) {
int schema_id;
RS_Status status = find_feature_group_schema_id_int(ndb_object, subject_name, project_id, &schema_id);
if (status.http_code != SUCCESS) {
Expand Down Expand Up @@ -1345,16 +1346,14 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_023);
}

NdbRecAttr *schema_attr = ndb_op->getValue("schema", nullptr);
assert(FEATURE_GROUP_SCHEMA_SIZE == (Uint32)table_dict->getColumn("schema")->getSizeInBytes());

if (schema_attr == nullptr) {
NdbBlob *schema_blob = ndb_op->getBlobHandle("schema");
if (schema_blob == nullptr) {
ndb_error = ndb_op->getNdbError();
ndb_object->closeTransaction(tx);
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_019);
}

if (tx->execute(NdbTransaction::Commit) != 0) {
if (tx->execute(NdbTransaction::NoCommit) != 0) {
ndb_error = tx->getNdbError();
ndb_object->closeTransaction(tx);
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_009);
Expand All @@ -1365,15 +1364,52 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
return RS_CLIENT_404_ERROR();
}

Uint32 schema_attr_bytes;
const char *schema_attr_start = nullptr;
if (GetByteArray(schema_attr, &schema_attr_start, &schema_attr_bytes) != 0) {
ndb_object->closeTransaction(tx);
return RS_CLIENT_ERROR(ERROR_019);
Uint64 length = 0;
if (schema_blob->getLength(length) == -1) {
return RS_SERVER_ERROR(ERROR_037 + std::string(" Reading column length failed.") +
std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()));
}

memcpy(schema, schema_attr_start, schema_attr_bytes);
schema[schema_attr_bytes] = '\0';
Uint64 chunk = 0;
Uint64 total_read = 0;
*schema = (char *)malloc(length + 1); // +1 for \0
char *tmp_buffer = static_cast<char *>(*schema);

for (chunk = 0; chunk < (length / (BLOB_MAX_FETCH_SIZE)) + 1; chunk++) {
Uint64 pos = chunk * BLOB_MAX_FETCH_SIZE;
Uint32 bytes = BLOB_MAX_FETCH_SIZE; // NOTE this is bytes to read and also bytes read.
if (pos + bytes > length) {
bytes = length - pos;
}

if (bytes != 0) {
if (-1 == schema_blob->setPos(pos)) {
return RS_RONDB_SERVER_ERROR(
schema_blob->getNdbError(),
ERROR_037 + std::string(" Failed to set read position.") + std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()));
}

if (schema_blob->readData(tmp_buffer, bytes /*to read, also bytes read*/) == -1) {
return RS_RONDB_SERVER_ERROR(
schema_blob->getNdbError(),
ERROR_037 + std::string(" Read data failed .") + std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()) +
" Position: " + std::to_string(pos));
}

if (bytes > 0) {
total_read += bytes;
tmp_buffer += bytes;
}
}
}
assert(total_read == length);
(*schema)[total_read] = '\0';

ndb_object->closeTransaction(tx);

Expand All @@ -1388,13 +1424,13 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object, const char *subject_nam
* get the schema id of the max version
* SELECT schema from schemas WHERE id = {schema_id}
*/
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char *schema) {
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/) {
Ndb *ndb_object = nullptr;
RS_Status status = rdrsRonDBConnectionPool->GetMetadataNdbObject(&ndb_object);
if (status.http_code != SUCCESS) {
return status;
}
/* clang-format off */
/* clang-format off */
METADATA_OP_RETRY_HANDLER(
status = find_feature_group_schema_int(ndb_object, subject_name, project_id, schema);
HandleSchemaErrors(ndb_object, status, {std::make_tuple(HOPSWORKS, SCHEMAS)});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ RS_Status find_serving_key_data(int feature_view_id, Serving_Key **serving_keys,
* get the schema id of the max version
* SELECT schema from schemas WHERE id = {schema_id}
*/
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char *schema);
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/);

#endif
#ifdef __cplusplus
Expand Down
1 change: 0 additions & 1 deletion storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ static inline int bytes_for_ndb_str_len(int ndb_str_len) {
#define SERVING_KEY_JOIN_ON_SIZE 1000 + 2 /* +2 for ndb len or '\0'*/
#define SERVING_KEY_JOIN_PREFIX_SIZE 63 + 1 /* +1 for ndb len or '\0'*/
#define FEATURE_GROUP_SUBJECT_SIZE 255 + 1 /* +1 for ndb len or '\0'*/
#define FEATURE_GROUP_SCHEMA_SIZE 29000 + 2 /* +2 for ndb len or '\0'*/

// Data types
#define DECIMAL_MAX_SIZE_IN_BYTES 9 * 4 /*4 bytes per 9 digits. 65/9 + 1 * 4*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ import (
)

type TrainingDatasetFeature struct {
FeatureID int
TrainingDataset int
FeatureGroupID int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
Name string
Type string
TDJoinID int
IDX int
Label int
FeatureViewID int
FeatureID int
TrainingDataset int
FeatureGroupID int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
Name string
Type string
TDJoinID int
IDX int
Label int
FeatureViewID int
}

type TrainingDatasetJoin struct {
Expand Down Expand Up @@ -180,15 +180,15 @@ func GetTrainingDatasetFeature(featureViewID int) ([]TrainingDatasetFeature, *Da
retTdfs := make([]TrainingDatasetFeature, int(tdfsSize))
for i, tdf := range tdfsSlice {
retTdf := TrainingDatasetFeature{
FeatureID: int(tdf.feature_id),
TrainingDataset: int(tdf.training_dataset),
FeatureGroupID: int(tdf.feature_group_id),
Name: C.GoString(&tdf.name[0]),
Type: C.GoString(&tdf.data_type[0]),
TDJoinID: int(tdf.td_join_id),
IDX: int(tdf.idx),
Label: int(tdf.label),
FeatureViewID: int(tdf.feature_view_id),
FeatureID: int(tdf.feature_id),
TrainingDataset: int(tdf.training_dataset),
FeatureGroupID: int(tdf.feature_group_id),
Name: C.GoString(&tdf.name[0]),
Type: C.GoString(&tdf.data_type[0]),
TDJoinID: int(tdf.td_join_id),
IDX: int(tdf.idx),
Label: int(tdf.label),
FeatureViewID: int(tdf.feature_view_id),
}
retTdfs[i] = retTdf
}
Expand Down Expand Up @@ -286,19 +286,27 @@ func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*Fe
cSubjectName := C.CString(subjectName)
defer C.free(unsafe.Pointer(cSubjectName))

var schemaBuff = C.malloc(C.size_t(C.FEATURE_GROUP_SCHEMA_SIZE))
defer C.free(schemaBuff)
// memory allocated on C side and freed here
var schemaBuff *C.char

ret := C.find_feature_group_schema(
(*C.char)(unsafe.Pointer(cSubjectName)),
C.int(projectId),
(*C.char)(unsafe.Pointer(schemaBuff)))
&schemaBuff)

if ret.http_code != http.StatusOK {
if unsafe.Pointer(schemaBuff) != nil {
C.free(unsafe.Pointer(schemaBuff))
}
return nil, cToGoRet(&ret)
}

var schema = C.GoString((*C.char)(unsafe.Pointer(schemaBuff)))
var schema = C.GoString(schemaBuff)

if unsafe.Pointer(schemaBuff) != nil {
C.free(unsafe.Pointer(schemaBuff))
}

var avroSchema FeatureGroupAvroSchema
err := json.Unmarshal([]byte(schema), &avroSchema)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,33 @@ var HopsworksData string
//go:embed fixed/hopsworks_40_schema.sql
var HopsworksSchema string

var HopsworksScheme string = HopsworksSchema + HopsworksData
//Upgrades / patches
//V5-FSTORE-1537-managed_feature_group.sql
//V6-FSTORE-1507-python_udfs.sql
//V7-HWORKS-1627-kube_labels_priorityclasses.sql
//V8-HWORKS-1670-ray_integration.sql
//V9-FSTORE-1592-type_column_size.sql
//V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql

//go:embed fixed/V5-FSTORE-1537-managed_feature_group.sql
var V5 string

//go:embed fixed/V6-FSTORE-1507-python_udfs.sql
var V6 string

//go:embed fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
var V7 string

//go:embed fixed/V8-HWORKS-1670-ray_integration.sql
var V8 string

//go:embed fixed/V9-FSTORE-1592-type_column_size.sql
var V9 string

//go:embed fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
var V10 string

var HopsworksScheme string = HopsworksSchema + HopsworksData + V5 + V6 + V7 + V8 + V9 + V10

const HOPSWORKS_DB_NAME = "hopsworks"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `hopsworks`.`schemas` DROP FOREIGN KEY project_idx_schemas;
ALTER TABLE `hopsworks`.`schemas` MODIFY COLUMN `schema` TEXT CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
ALTER TABLE `hopsworks`.`cached_feature`
ADD `type` varchar(1000) COLLATE latin1_general_cs NULL,
ADD `partition_key` BOOLEAN NULL DEFAULT FALSE,
ADD `default_value` VARCHAR(400) NULL,
MODIFY `description` varchar(256) NULL DEFAULT '';

ALTER TABLE `hopsworks`.`feature_store_s3_connector`
ADD `region` VARCHAR(50) DEFAULT NULL;

ALTER TABLE `hopsworks`.`feature_group`
ADD `path` VARCHAR(1000) NULL,
ADD `connector_id` INT(11) NULL,
ADD CONSTRAINT `connector_fk` FOREIGN KEY (`connector_id`) REFERENCES `feature_store_connector` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION;

UPDATE `hopsworks`.`feature_group` AS fg
JOIN `hopsworks`.`on_demand_feature_group` AS on_demand_fg ON fg.`on_demand_feature_group_id` = on_demand_fg.`id`
SET fg.`path` = on_demand_fg.`path`,
fg.`connector_id` = on_demand_fg.`connector_id`;

ALTER TABLE `hopsworks`.`on_demand_feature_group`
DROP FOREIGN KEY `on_demand_conn_fk`,
DROP COLUMN `path`,
DROP COLUMN `connector_id`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `hopsworks`.`transformation_function`
ADD `execution_mode` VARCHAR(255);
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
ALTER TABLE `hopsworks`.`serving`
ADD `scheduling_config` varchar(2000) COLLATE latin1_general_cs DEFAULT NULL;

CREATE TABLE IF NOT EXISTS `kube_priority_class` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(253) NOT NULL,
UNIQUE KEY `name_idx` (`name`)
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_project_priority_class` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`priority_class_id` INT(11) NOT NULL,
`project_id` INT(11), -- NULL key used for default project settings
`base` INT(0) NOT NULL DEFAULT 0,
CONSTRAINT `priority_class_fkc` FOREIGN KEY (`priority_class_id`) REFERENCES `hopsworks`.`kube_priority_class` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT `priority_class_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_label` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(317) NOT NULL,
`value` VARCHAR(63),
UNIQUE KEY `name_value_idx` (`name`,`value`)
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_project_label` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`label_id` INT(11) NOT NULL,
`project_id` INT(11), -- NULL key used for default project settings
`base` INT(0) NOT NULL DEFAULT 0,
CONSTRAINT `label_fkc` FOREIGN KEY (`label_id`) REFERENCES `hopsworks`.`kube_label` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT `label_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ALTER TABLE `hopsworks`.`executions` ADD COLUMN `ray_dashboard_url` varchar(255) CHARACTER SET latin1 COLLATE latin1_general_cs DEFAULT NULL;
ALTER TABLE `hopsworks`.`executions` ADD COLUMN `ray_cluster_name` varchar(255) CHARACTER SET latin1 COLLATE latin1_general_cs DEFAULT NULL;
ALTER TABLE `hopsworks`.`jupyter_settings` ADD COLUMN `ray_config` varchar(5000) COLLATE latin1_general_cs DEFAULT NULL;

CREATE TABLE `hopsworks`.`jupyter_ray_session` (
`id` int NOT NULL AUTO_INCREMENT,
`creation_time` bigint DEFAULT NULL,
`jupyter_project_id` int NOT NULL,
`application_id` varchar(255) NOT NULL,
`kernel_id` varchar(255) NOT NULL,
`ray_head_node_service` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `kernel_id_unique` (`kernel_id`),
UNIQUE KEY `application_id_unique` (`application_id`),
KEY `fk_ray_session_jupyter_project` (`jupyter_project_id`),
CONSTRAINT `fk_ray_session_jupyter_project` FOREIGN KEY (`jupyter_project_id`) REFERENCES `jupyter_project` (`id`) ON DELETE CASCADE
) ENGINE=ndbcluster
DEFAULT CHARSET=latin1
COLLATE=latin1_general_cs;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `hopsworks`.`cached_feature`
MODIFY COLUMN `type` VARCHAR(20000) COLLATE latin1_general_cs NULL;

0 comments on commit 6701a10

Please sign in to comment.