Merge pull request #587 from svenssonaxel/RONDB-782
RONDB-782 Merge 22.10-dev into 24.10-main
svenssonaxel authored Dec 3, 2024
2 parents dba074c + 364f599 commit 6fddb75
Showing 13 changed files with 219 additions and 51 deletions.
10 changes: 6 additions & 4 deletions build_scripts/release_scripts/build_all.sh
@@ -98,6 +98,7 @@ fi

source $SRC_DIR_ABS/MYSQL_VERSION
RONDB_VERSION="$MYSQL_VERSION_MAJOR.$MYSQL_VERSION_MINOR.$MYSQL_VERSION_PATCH"
RONDB_VERSION_EXTRA="$MYSQL_VERSION_EXTRA"

if [[ "$OUTPUT_DIR" == "" ]]; then
echo "Output directory not specified"
@@ -135,8 +136,9 @@ echo "Build Params:
Release final clusterj $RELEASE_FINAL_CLUSTERJ
Is public release $IS_PUBLIC_RELEASE
Number of build threads: $CORES
RonDB version: $RONDB_VERSION"

RonDB version: $RONDB_VERSION
RonDB version Extra: $RONDB_VERSION_EXTRA"

if [ "$RELEASE_BUILD" = true ]; then
echo "_____________ BUILDING RONDB. RELEASE: TRUE _____________"
cd $TEMP_BUILD_DIR_ABS
@@ -195,9 +197,9 @@ if [ "$DEPLOY" = true ]; then

else
echo "Not a public release. Skip deploying clusterj."
CLUSTERJ_VERSION=""
CLUSTERJ_VERSION="DO_NOT_DEPLOY"
fi


$SRC_DIR_ABS/build_scripts/release_scripts/deploy.sh $RONDB_VERSION $TARBALL_NAME $OUTPUT_DIR_ABS $SRC_DIR_ABS/id_rsa "$TARBALL_COPY_LOCATION" "$CLUSTERJ_VERSION"
$SRC_DIR_ABS/build_scripts/release_scripts/deploy.sh $RONDB_VERSION $TARBALL_NAME $OUTPUT_DIR_ABS $SRC_DIR_ABS/id_rsa "$TARBALL_COPY_LOCATION" "$CLUSTERJ_VERSION" "$RONDB_VERSION_EXTRA"
fi
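
For context: MYSQL_VERSION is the standard version file at the root of the MySQL/RonDB source tree, and the script composes the RonDB version from its fields. A sketch of its layout, with illustrative values only (the actual contents of this branch are not shown in the diff):

# MYSQL_VERSION (sketch; values are illustrative)
MYSQL_VERSION_MAJOR=22
MYSQL_VERSION_MINOR=10
MYSQL_VERSION_PATCH=6
MYSQL_VERSION_EXTRA=LTS

With values like these, RONDB_VERSION expands to 22.10.6 and RONDB_VERSION_EXTRA to LTS, which is exactly the clusterj-22.10.6LTS.jar naming case the deploy.sh change below handles.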
25 changes: 19 additions & 6 deletions build_scripts/release_scripts/deploy.sh
@@ -7,7 +7,8 @@ TARBALL_NAME=$2
OUTPUT_DIR_ABS=$3
ABS_PATH_RSA_KEY=$4
TARBALL_COPY_LOCATION=$5
CLUSTERJ_VERSION=$6
RONDB_VERSION_EXTRA=$7

TAR_FILE="$TARBALL_NAME.tar.gz"

@@ -33,19 +34,31 @@ if [ "$CPU_ARCH" != "x86_64" ]; then
exit 0
fi

if [ -z "$CLUSTERJ_VERSION" ]; then
if [ "$CLUSTERJ_VERSION" = "DO_NOT_DEPLOY" ]; then
echo "Skip deploying clusterj"
exit 0
fi

echo "Extracting ClusterJ JAR file from tarball again"
set +e

# First attempt: clusterj-$RONDB_VERSION.jar, e.g. clusterj-22.10.6.jar
echo "Extracting ClusterJ JAR file from tarball"
JAR_FILE="$TARBALL_NAME/share/java/clusterj-$RONDB_VERSION.jar"
tar xf $TAR_FILE_ABS $JAR_FILE
if [[ ! -f "$JAR_FILE" ]]; then
echo "Error: Unable to find cluster file '$JAR_FILE'"
exit 1
echo "Error: Unable to find cluster file '$JAR_FILE'. Retrying ..."

# Second attempt: clusterj-$RONDB_VERSION$RONDB_VERSION_EXTRA.jar, e.g. clusterj-22.10.6LTS.jar
JAR_FILE="$TARBALL_NAME/share/java/clusterj-$RONDB_VERSION$RONDB_VERSION_EXTRA.jar"
tar xf $TAR_FILE_ABS $JAR_FILE
if [[ ! -f "$JAR_FILE" ]]; then
echo "Error: Unable to find cluster file '$JAR_FILE'"
exit 1
fi
fi

set -e

mvn deploy:deploy-file -Dfile=$JAR_FILE -DgroupId=com.mysql.ndb -DartifactId=clusterj-rondb \
-Dversion=$CLUSTERJ_VERSION -Dpackaging=jar -DrepositoryId=Hops \
-Durl=https://archiva.hops.works/repository/Hops \
@@ -60,7 +73,7 @@ mvn deploy:deploy-file -Dfile=$JAR_FILE -DgroupId=com.mysql.ndb -DartifactId=clu
-DJenkinsHops.User=$EE_USER \
-DJenkinsHops.Password=$EE_PASS

echo "Extracting libndbclient.so.6.1.0 file from tarball again"
echo "Extracting libndbclient.so.6.1.0 file from tarball"
LIBNDB_FILE="$TARBALL_NAME/lib/libndbclient.so.6.1.0"
tar xf $TAR_FILE_ABS $LIBNDB_FILE
if [[ ! -f "$LIBNDB_FILE" ]]; then
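Taken together, deploy.sh now expects seven positional arguments. A hypothetical invocation, with every value illustrative rather than taken from the real pipeline:

# Hypothetical invocation of deploy.sh; all values below are illustrative.
RONDB_VERSION=22.10.6                                   # $1
TARBALL_NAME=rondb-22.10.6-linux-glibc2.28-x86_64       # $2 (name format assumed)
OUTPUT_DIR_ABS=/tmp/rondb-output                        # $3
ABS_PATH_RSA_KEY=/tmp/rondb-src/id_rsa                  # $4
TARBALL_COPY_LOCATION="user@repo.example.com:/tarballs" # $5
CLUSTERJ_VERSION=22.10.6                                # $6; "DO_NOT_DEPLOY" skips clusterj
RONDB_VERSION_EXTRA=LTS                                 # $7
build_scripts/release_scripts/deploy.sh "$RONDB_VERSION" "$TARBALL_NAME" \
    "$OUTPUT_DIR_ABS" "$ABS_PATH_RSA_KEY" "$TARBALL_COPY_LOCATION" \
    "$CLUSTERJ_VERSION" "$RONDB_VERSION_EXTRA"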
@@ -26,6 +26,7 @@
#include <string>
#include <vector>

#include "NdbBlob.hpp"
#include "NdbRecAttr.hpp"
#include "src/db-operations/pk/common.hpp"
#include "src/error-strings.h"
@@ -1272,7 +1273,7 @@ RS_Status find_feature_group_schema_id_int(Ndb *ndb_object,
RS_Status find_feature_group_schema_int(Ndb *ndb_object,
const char *subject_name,
int project_id,
char *schema) {
char **schema) {
int schema_id;
RS_Status status = find_feature_group_schema_id_int(ndb_object,
subject_name,
@@ -1309,15 +1310,15 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object,
ndb_object->closeTransaction(tx);
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_023);
}
NdbRecAttr *schema_attr = ndb_op->getValue("schema", nullptr);
assert(FEATURE_GROUP_SCHEMA_SIZE ==
(Uint32)table_dict->getColumn("schema")->getSizeInBytes());
if (schema_attr == nullptr) {

NdbBlob *schema_blob = ndb_op->getBlobHandle("schema");
if (schema_blob == nullptr) {
ndb_error = ndb_op->getNdbError();
ndb_object->closeTransaction(tx);
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_019);
}
if (tx->execute(NdbTransaction::Commit) != 0) {

if (tx->execute(NdbTransaction::NoCommit) != 0) {
ndb_error = tx->getNdbError();
ndb_object->closeTransaction(tx);
return RS_RONDB_SERVER_ERROR(ndb_error, ERROR_009);
@@ -1326,14 +1327,54 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object,
ndb_object->closeTransaction(tx);
return RS_CLIENT_404_ERROR();
}
Uint32 schema_attr_bytes;
const char *schema_attr_start = nullptr;
if (GetByteArray(schema_attr, &schema_attr_start, &schema_attr_bytes) != 0) {
ndb_object->closeTransaction(tx);
return RS_CLIENT_ERROR(ERROR_019);

Uint64 length = 0;
if (schema_blob->getLength(length) == -1) {
return RS_SERVER_ERROR(ERROR_037 + std::string(" Reading column length failed.") +
std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()));
}
memcpy(schema, schema_attr_start, schema_attr_bytes);
schema[schema_attr_bytes] = '\0';

Uint64 chunk = 0;
Uint64 total_read = 0;
*schema = (char *)malloc(length + 1); // +1 for \0
char *tmp_buffer = static_cast<char *>(*schema);

for (chunk = 0; chunk < (length / (BLOB_MAX_FETCH_SIZE)) + 1; chunk++) {
Uint64 pos = chunk * BLOB_MAX_FETCH_SIZE;
Uint32 bytes = BLOB_MAX_FETCH_SIZE; // NOTE this is bytes to read and also bytes read.
if (pos + bytes > length) {
bytes = length - pos;
}

if (bytes != 0) {
if (-1 == schema_blob->setPos(pos)) {
return RS_RONDB_SERVER_ERROR(
schema_blob->getNdbError(),
ERROR_037 + std::string(" Failed to set read position.") + std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()));
}

if (schema_blob->readData(tmp_buffer, bytes /*to read, also bytes read*/) == -1) {
return RS_RONDB_SERVER_ERROR(
schema_blob->getNdbError(),
ERROR_037 + std::string(" Read data failed.") + std::string(" Column: ") +
std::string(schema_blob->getColumn()->getName()) +
" Type: " + std::to_string(schema_blob->getColumn()->getType()) +
" Position: " + std::to_string(pos));
}

if (bytes > 0) {
total_read += bytes;
tmp_buffer += bytes;
}
}
}
assert(total_read == length);
(*schema)[total_read] = '\0';

ndb_object->closeTransaction(tx);
return RS_OK;
}
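
The loop above is the usual NDB blob access pattern: obtain a blob handle instead of an NdbRecAttr, execute the transaction with NoCommit so the handle stays usable, read the total length, then pull the value in BLOB_MAX_FETCH_SIZE chunks via setPos/readData. A condensed sketch of the same pattern as a standalone helper (the helper name and the chunk constant's value are assumptions; error reporting and transaction setup are elided):

#include <algorithm>
#include <cstdlib>
#include <NdbApi.hpp>

// Read an entire blob column into a freshly malloc'ed, NUL-terminated buffer.
// Caller frees. The enclosing transaction must still be open (executed with
// NdbTransaction::NoCommit), as in find_feature_group_schema_int above.
static int read_blob_to_cstring(NdbBlob *blob, char **out) {
  Uint64 length = 0;
  if (blob->getLength(length) == -1) return -1;  // total blob size in bytes
  *out = (char *)malloc(length + 1);             // +1 for '\0'
  if (*out == nullptr) return -1;
  char *dst = *out;
  for (Uint64 pos = 0; pos < length; ) {
    // 8192 stands in for BLOB_MAX_FETCH_SIZE; the real value is defined elsewhere.
    Uint32 bytes = (Uint32)std::min<Uint64>(8192, length - pos);
    if (blob->setPos(pos) == -1) return -1;        // position the read cursor
    if (blob->readData(dst, bytes) == -1) return -1; // bytes is in/out: bytes actually read
    dst += bytes;
    pos += bytes;
  }
  (*out)[length] = '\0';
  return 0;
}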
@@ -1345,9 +1386,7 @@ RS_Status find_feature_group_schema_int(Ndb *ndb_object,
* get the schema id of the max version
* SELECT schema from schemas WHERE id = {schema_id}
*/
RS_Status find_feature_group_schema(const char *subject_name,
int project_id,
char *schema) {
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/) {
Ndb *ndb_object = nullptr;
RS_Status status = rdrsRonDBConnectionPool->GetMetadataNdbObject(&ndb_object);
if (status.http_code != SUCCESS) {
@@ -124,7 +124,7 @@ RS_Status find_serving_key_data(int feature_view_id, Serving_Key **serving_keys,
* get the schema id of the max version
* SELECT schema from schemas WHERE id = {schema_id}
*/
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char *schema);
RS_Status find_feature_group_schema(const char *subject_name, int project_id, char **schema /*out, freed by golang*/);

#endif
#ifdef __cplusplus
1 change: 0 additions & 1 deletion storage/ndb/rest-server/data-access-rondb/src/rdrs-const.h
@@ -116,7 +116,6 @@ static inline int bytes_for_ndb_str_len(int ndb_str_len) {
#define SERVING_KEY_JOIN_ON_SIZE 1000 + 2 /* +2 for ndb len or '\0'*/
#define SERVING_KEY_JOIN_PREFIX_SIZE 63 + 1 /* +1 for ndb len or '\0'*/
#define FEATURE_GROUP_SUBJECT_SIZE 255 + 1 /* +1 for ndb len or '\0'*/
#define FEATURE_GROUP_SCHEMA_SIZE 29000 + 2 /* +2 for ndb len or '\0'*/

// Data types
#define DECIMAL_MAX_SIZE_IN_BYTES 9 * 4 /*4 bytes per 9 digits. 65/9 + 1 * 4*/
@@ -36,15 +36,15 @@ import (
)

type TrainingDatasetFeature struct {
FeatureID int
TrainingDataset int
FeatureGroupID int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
Name string
Type string
TDJoinID int
IDX int
Label int
FeatureViewID int
FeatureID int
TrainingDataset int
FeatureGroupID int // When FG Id is null in DB, the value here is 0. Fg Id starts with 1.
Name string
Type string
TDJoinID int
IDX int
Label int
FeatureViewID int
}

type TrainingDatasetJoin struct {
@@ -180,15 +180,15 @@ func GetTrainingDatasetFeature(featureViewID int) ([]TrainingDatasetFeature, *Da
retTdfs := make([]TrainingDatasetFeature, int(tdfsSize))
for i, tdf := range tdfsSlice {
retTdf := TrainingDatasetFeature{
FeatureID: int(tdf.feature_id),
TrainingDataset: int(tdf.training_dataset),
FeatureGroupID: int(tdf.feature_group_id),
Name: C.GoString(&tdf.name[0]),
Type: C.GoString(&tdf.data_type[0]),
TDJoinID: int(tdf.td_join_id),
IDX: int(tdf.idx),
Label: int(tdf.label),
FeatureViewID: int(tdf.feature_view_id),
FeatureID: int(tdf.feature_id),
TrainingDataset: int(tdf.training_dataset),
FeatureGroupID: int(tdf.feature_group_id),
Name: C.GoString(&tdf.name[0]),
Type: C.GoString(&tdf.data_type[0]),
TDJoinID: int(tdf.td_join_id),
IDX: int(tdf.idx),
Label: int(tdf.label),
FeatureViewID: int(tdf.feature_view_id),
}
retTdfs[i] = retTdf
}
@@ -291,19 +291,27 @@ func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*Fe
cSubjectName := C.CString(subjectName)
defer C.free(unsafe.Pointer(cSubjectName))

var schemaBuff = C.malloc(C.size_t(C.FEATURE_GROUP_SCHEMA_SIZE))
defer C.free(schemaBuff)
// memory allocated on C side and freed here
var schemaBuff *C.char

ret := C.find_feature_group_schema(
(*C.char)(unsafe.Pointer(cSubjectName)),
C.int(projectId),
(*C.char)(unsafe.Pointer(schemaBuff)))
&schemaBuff)

if ret.http_code != http.StatusOK {
if unsafe.Pointer(schemaBuff) != nil {
C.free(unsafe.Pointer(schemaBuff))
}
return nil, cToGoRet(&ret)
}

var schema = C.GoString((*C.char)(unsafe.Pointer(schemaBuff)))
var schema = C.GoString(schemaBuff)

if unsafe.Pointer(schemaBuff) != nil {
C.free(unsafe.Pointer(schemaBuff))
}

var avroSchema FeatureGroupAvroSchema
err := json.Unmarshal([]byte(schema), &avroSchema)
if err != nil {
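The new contract is: the C side allocates the schema buffer with malloc, the Go side copies it into Go-managed memory with C.GoString and then releases the C allocation with C.free. A minimal self-contained sketch of that ownership pattern (the C function here is a stand-in, not the real find_feature_group_schema):

package main

/*
#include <stdlib.h>
#include <string.h>
// Stand-in for a C callee that allocates the out-buffer itself.
static void fill_schema(char **out) {
    *out = (char *)malloc(6);
    strcpy(*out, "hello");
}
*/
import "C"

import (
    "fmt"
    "unsafe"
)

func main() {
    var buf *C.char
    C.fill_schema(&buf)         // C side allocates
    s := C.GoString(buf)        // copy into Go-managed memory
    C.free(unsafe.Pointer(buf)) // release the C allocation exactly once
    fmt.Println(s)
}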
@@ -44,7 +44,7 @@ var HopsworksData string
//go:embed fixed/hopsworks_40_schema.sql
var HopsworksSchema string

var HopsworksScheme string = HopsworksSchema + HopsworksData
//Upgrades / patches
//V5-FSTORE-1537-managed_feature_group.sql
//V6-FSTORE-1507-python_udfs.sql
//V7-HWORKS-1627-kube_labels_priorityclasses.sql
//V8-HWORKS-1670-ray_integration.sql
//V9-FSTORE-1592-type_column_size.sql
//V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql

//go:embed fixed/V5-FSTORE-1537-managed_feature_group.sql
var V5 string

//go:embed fixed/V6-FSTORE-1507-python_udfs.sql
var V6 string

//go:embed fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
var V7 string

//go:embed fixed/V8-HWORKS-1670-ray_integration.sql
var V8 string

//go:embed fixed/V9-FSTORE-1592-type_column_size.sql
var V9 string

//go:embed fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
var V10 string

var HopsworksScheme string = HopsworksSchema + HopsworksData + V5 + V6 + V7 + V8 + V9 + V10

const HOPSWORKS_DB_NAME = "hopsworks"

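As a general note on the mechanism used above (not specific to this commit): each //go:embed directive must sit immediately above the variable it fills, and the file needs the embed package imported, e.g.:

package testdbs // package name here is an assumption

import _ "embed" // blank import is required for //go:embed with plain string vars

//go:embed fixed/V5-FSTORE-1537-managed_feature_group.sql
var V5 string // file contents are embedded at build time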
2 changes: 2 additions & 0 deletions fixed/V10-FSTORE-1598-FSTORE-1595-avro_schema_fixes.sql
@@ -0,0 +1,2 @@
ALTER TABLE `hopsworks`.`schemas` DROP FOREIGN KEY project_idx_schemas;
ALTER TABLE `hopsworks`.`schemas` MODIFY COLUMN `schema` TEXT CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL;
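
After running this migration, the column change can be verified against INFORMATION_SCHEMA; an illustrative check, not part of the migration itself:

-- Illustrative verification query (not part of the migration).
SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_SET_NAME, COLLATION_NAME
  FROM INFORMATION_SCHEMA.COLUMNS
 WHERE TABLE_SCHEMA = 'hopsworks'
   AND TABLE_NAME = 'schemas'
   AND COLUMN_NAME = 'schema';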
23 changes: 23 additions & 0 deletions fixed/V5-FSTORE-1537-managed_feature_group.sql
@@ -0,0 +1,23 @@
ALTER TABLE `hopsworks`.`cached_feature`
ADD `type` varchar(1000) COLLATE latin1_general_cs NULL,
ADD `partition_key` BOOLEAN NULL DEFAULT FALSE,
ADD `default_value` VARCHAR(400) NULL,
MODIFY `description` varchar(256) NULL DEFAULT '';

ALTER TABLE `hopsworks`.`feature_store_s3_connector`
ADD `region` VARCHAR(50) DEFAULT NULL;

ALTER TABLE `hopsworks`.`feature_group`
ADD `path` VARCHAR(1000) NULL,
ADD `connector_id` INT(11) NULL,
ADD CONSTRAINT `connector_fk` FOREIGN KEY (`connector_id`) REFERENCES `feature_store_connector` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION;

UPDATE `hopsworks`.`feature_group` AS fg
JOIN `hopsworks`.`on_demand_feature_group` AS on_demand_fg ON fg.`on_demand_feature_group_id` = on_demand_fg.`id`
SET fg.`path` = on_demand_fg.`path`,
fg.`connector_id` = on_demand_fg.`connector_id`;

ALTER TABLE `hopsworks`.`on_demand_feature_group`
DROP FOREIGN KEY `on_demand_conn_fk`,
DROP COLUMN `path`,
DROP COLUMN `connector_id`;
2 changes: 2 additions & 0 deletions fixed/V6-FSTORE-1507-python_udfs.sql
@@ -0,0 +1,2 @@
ALTER TABLE `hopsworks`.`transformation_function`
ADD `execution_mode` VARCHAR(255);
33 changes: 33 additions & 0 deletions fixed/V7-HWORKS-1627-kube_labels_priorityclasses.sql
@@ -0,0 +1,33 @@
ALTER TABLE `hopsworks`.`serving`
ADD `scheduling_config` varchar(2000) COLLATE latin1_general_cs DEFAULT NULL;

CREATE TABLE IF NOT EXISTS `kube_priority_class` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(253) NOT NULL,
UNIQUE KEY `name_idx` (`name`)
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_project_priority_class` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`priority_class_id` INT(11) NOT NULL,
`project_id` INT(11), -- NULL key used for default project settings
`base` INT(0) NOT NULL DEFAULT 0,
CONSTRAINT `priority_class_fkc` FOREIGN KEY (`priority_class_id`) REFERENCES `hopsworks`.`kube_priority_class` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT `priority_class_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_label` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(317) NOT NULL,
`value` VARCHAR(63),
UNIQUE KEY `name_value_idx` (`name`,`value`)
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;

CREATE TABLE IF NOT EXISTS `kube_project_label` (
`id` INT(11) AUTO_INCREMENT PRIMARY KEY,
`label_id` INT(11) NOT NULL,
`project_id` INT(11), -- NULL key used for default project settings
`base` INT(0) NOT NULL DEFAULT 0,
CONSTRAINT `label_fkc` FOREIGN KEY (`label_id`) REFERENCES `hopsworks`.`kube_label` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT `label_project_fkc` FOREIGN KEY (`project_id`) REFERENCES `hopsworks`.`project` (`id`) ON DELETE CASCADE ON UPDATE NO ACTION
) ENGINE = ndbcluster DEFAULT CHARSET = latin1 COLLATE = latin1_general_cs;
