Skip to content

Commit

Permalink
RONDB-784: RDRS1 Performance Improvements for Complex Features (#582)
Browse files Browse the repository at this point in the history
* RDRS Performance improvements around complex features

- Bump GO version
- Use hamba avro as a replacement for linkedin avro library to deserialize complex features
- Avoid json.Unmarshal when parsing complex feature field
- Use Sonic library to serialize JSON before sending to the client.

* RONDB-784: RDRS1 Performance Improvements for Complex Features

---------

Co-authored-by: Salman Niazi <[email protected]>
  • Loading branch information
SirOibaf and smkniazi authored Dec 3, 2024
1 parent 0e85bdf commit dd13385
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 191 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.oraclelinux8
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ARG OPEN_SSL_VERSION=3.0.11
ARG BOOST_VERSION_MAJOR=1
ARG BOOST_VERSION_MINOR=77
ARG BOOST_VERSION_PATCH=0
ARG GO_VERSION=1.20.3
ARG GO_VERSION=1.22.9
ARG JSONCPP_VERSION=1.9.5

# Default build threads to 1; max is defined in Docker config (run `nproc` in Docker container)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.oraclelinux9
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ARG OPENSSL_ROOT=system
ARG BOOST_VERSION_MAJOR=1
ARG BOOST_VERSION_MINOR=77
ARG BOOST_VERSION_PATCH=0
ARG GO_VERSION=1.20.3
ARG GO_VERSION=1.22.9
ARG JSONCPP_VERSION=1.9.5

# Default build threads to 1; max is defined in Docker config (run `nproc` in Docker container)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ubuntu22
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ARG OPEN_SSL_VERSION=3.0.11
ARG BOOST_VERSION_MAJOR=1
ARG BOOST_VERSION_MINOR=77
ARG BOOST_VERSION_PATCH=0
ARG GO_VERSION=1.20.3
ARG GO_VERSION=1.22.9
ARG JSONCPP_VERSION=1.9.5

# Default build threads to 1; max is defined in Docker config (run `nproc` in Docker container)
Expand Down
26 changes: 13 additions & 13 deletions storage/ndb/rest-server/rest-api-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
//
module hopsworks.ai/rdrs

go 1.20
go 1.22.0

toolchain go1.23.2

require (
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -33,44 +35,42 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/ugorji/go/codec v1.2.11 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.27.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require (
github.com/hamba/avro/v2 v2.27.0
github.com/prometheus/client_golang v1.17.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
zappem.net/pub/debug/xxd v1.0.0 // indirect
)

require (
github.com/bytedance/sonic v1.10.2 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/bytedance/sonic v1.12.5
github.com/goccy/go-json v0.10.2 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230328201059-365e72989107
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/linkedin/goavro/v2 v2.12.0
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
golang.org/x/arch v0.6.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
127 changes: 28 additions & 99 deletions storage/ndb/rest-server/rest-api-server/go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func newWithDefaults() AllConfigs {
PreAllocatedBuffers: 32,
BatchMaxSize: 256,
OperationIDMaxSize: 256,
EnablePPROF: false,
PPROFPort: 8080,
},
GRPC: GRPC{
Enable: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Internal struct {
GOMAXPROCS int
BatchMaxSize uint32
OperationIDMaxSize uint32
EnablePPROF bool
PPROFPort uint16
}

func (i *Internal) Validate() error {
Expand All @@ -42,6 +44,10 @@ func (i *Internal) Validate() error {
return errors.New("BufferSize is too low")
}

if i.EnablePPROF && !(i.PPROFPort >= 1 && i.PPROFPort <= 65535) {
return errors.New("Invalid PPROFPort")
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"strings"
"time"

"github.com/hamba/avro/v2"
"github.com/patrickmn/go-cache"

"github.com/linkedin/goavro/v2"
"hopsworks.ai/rdrs/internal/dal"
"hopsworks.ai/rdrs/internal/log"
)
Expand All @@ -51,12 +51,12 @@ type FeatureViewMetadata struct {
NumOfFeatures int
FeatureIndexLookup map[string]int // key: joinIndex + fgId + fName, label are excluded. joinIndex is needed because of self-join
// serving key doc: https://hopsworks.atlassian.net/wiki/spaces/FST/pages/173342721/How+to+resolve+the+set+of+serving+key+in+get+feature+vector
PrimaryKeyMap map[string]*dal.ServingKey // key: join index + feature name. Used for constructing rondb request.
ValidPrimaryKeys map[string]bool // key: serving-key-prefix + fName, fName. Used for pk validation.
PrefixJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value.
JoinKeyMap map[string][]string // key: fName, value: list of feature which join on the key. Used for filling in pk value.
RequiredJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value.
ComplexFeatures map[string]*AvroDecoder // key: joinIndex + fgId + fName, label are excluded. joinIndex is needed because of self-join
PrimaryKeyMap map[string]*dal.ServingKey // key: join index + feature name. Used for constructing rondb request.
ValidPrimaryKeys map[string]bool // key: serving-key-prefix + fName, fName. Used for pk validation.
PrefixJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value.
JoinKeyMap map[string][]string // key: fName, value: list of feature which join on the key. Used for filling in pk value.
RequiredJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value.
ComplexFeatures map[string]*avro.Schema // key: joinIndex + fgId + fName, label are excluded. joinIndex is needed because of self-join
}

type FeatureGroupFeatures struct {
Expand All @@ -71,29 +71,17 @@ type FeatureGroupFeatures struct {
}

type FeatureMetadata struct {
FeatureStoreName string
FeatureGroupName string
FeatureGroupVersion int
FeatureGroupId int
Id int
Name string
Type string
Index int
Label bool
Prefix string
JoinIndex int
}

type AvroDecoder struct {
Codec *goavro.Codec
}

func (ad *AvroDecoder) Decode(in []byte) (interface{}, error) {
native, _, err := ad.Codec.NativeFromBinary(in)
if err != nil {
return nil, err
}
return native, nil
FeatureStoreName string
FeatureGroupName string
FeatureGroupVersion int
FeatureGroupId int
Id int
Name string
Type string
Index int
Label bool
Prefix string
JoinIndex int
}

var COMPLEX_FEATURE = map[string]bool{
Expand Down Expand Up @@ -196,7 +184,7 @@ func newFeatureViewMetadata(
featureCount++
}

var complexFeatures = make(map[string]*AvroDecoder)
var complexFeatures = make(map[string]*avro.Schema)
var fgSchemaCache = make(map[int]*dal.FeatureGroupAvroSchema)
for _, fgFeature := range fgFeaturesArray {
for _, feature := range fgFeature.Features {
Expand All @@ -217,16 +205,16 @@ func newFeatureViewMetadata(
}
fgSchemaCache[feature.FeatureGroupId] = newFgSchema
}
schema, err := fgSchemaCache[feature.FeatureGroupId].GetSchemaByFeatureName(feature.Name)
schemaStr, err := fgSchemaCache[feature.FeatureGroupId].GetSchemaByFeatureName(feature.Name)
if err != nil {
return nil, errors.New("Failed to get feature schema for feature: " + feature.Name)
}
codec, err := goavro.NewCodec(string(schema))
schema, err := avro.Parse(string(schemaStr))
if err != nil {
return nil, errors.New("Failed to parse feature schema.")
}
featureIndexKey := GetFeatureIndexKeyByFeature(feature)
complexFeatures[featureIndexKey] = &AvroDecoder{codec}
complexFeatures[featureIndexKey] = &schema
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ func GetFeatureValues(ronDbResult *[]*api.PKReadResponseWithCodeJSON, entries *m
featureIndexKey := feature_store.GetFeatureIndexKeyByFgIndexKey(*response.Body.OperationID, featureName)
// When only primary key is selected, Rondb will return all columns, so not all value from response are needed.
if index, ok := (featureView.FeatureIndexLookup)[featureIndexKey]; ok {
if decoder, ok := (featureView.ComplexFeatures)[featureIndexKey]; ok {
var deser, err1 = DeserialiseComplexFeature(value, decoder)
if schema, ok := (featureView.ComplexFeatures)[featureIndexKey]; ok {
var deser, err1 = DeserialiseComplexFeature(value, schema)
if err1 != nil {
status = api.FEATURE_STATUS_ERROR
err = feature_store.DESERIALISE_FEATURE_FAIL.NewMessage(fmt.Sprintf("Feature name: %s; %s", featureName, err1.Error()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package feature_store
import (
"encoding/base64"
"encoding/json"
"fmt"
"strings"

"hopsworks.ai/rdrs/internal/feature_store"
"github.com/hamba/avro/v2"
"hopsworks.ai/rdrs/internal/log"
)

func DeserialiseComplexFeature(value *json.RawMessage, decoder *feature_store.AvroDecoder) (*interface{}, error) {
var valueString string
err := json.Unmarshal(*value, &valueString)
func DeserialiseComplexFeature(value *json.RawMessage, schema *avro.Schema) (*interface{}, error) {
valueString, err := decodeJSONString(value)
if err != nil {
if log.IsDebug() {
log.Debugf("Failed to unmarshal. Value: %s", valueString)
Expand All @@ -25,11 +25,32 @@ func DeserialiseComplexFeature(value *json.RawMessage, decoder *feature_store.Av
}
return nil, err
}
native, err := decoder.Decode(jsonDecode)
nativeJson := ConvertAvroToJson(native)
var avroDeserialized interface{}
err = avro.Unmarshal(*schema, jsonDecode, &avroDeserialized)
if err != nil {
if log.IsDebug() {
log.Debugf("Failed to deserialize avro")
}
return nil, err
}
nativeJson := ConvertAvroToJson(avroDeserialized)
return &nativeJson, err
}

func decodeJSONString(raw *json.RawMessage) (string, error) {
// Convert the raw message to a string
rawStr := string(*raw)
// Check that the first and last characters are quotes
if len(rawStr) < 2 || rawStr[0] != '"' || rawStr[len(rawStr)-1] != '"' {
return "", fmt.Errorf("invalid JSON string format")
}
// Remove the surrounding quotes
unquotedStr := rawStr[1 : len(rawStr)-1]
// Replace escape sequences with their actual characters
decodedStr := strings.ReplaceAll(unquotedStr, `\"`, `"`)
return decodedStr, nil
}

func ConvertAvroToJson(o interface{}) interface{} {
var out interface{}
switch o.(type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strconv"
"testing"

"github.com/linkedin/goavro/v2"
"github.com/hamba/avro/v2"
fsmetadata "hopsworks.ai/rdrs/internal/feature_store"
"hopsworks.ai/rdrs/internal/handlers/feature_store"
fshelper "hopsworks.ai/rdrs/internal/integrationtests/feature_store"
Expand Down Expand Up @@ -1460,20 +1460,15 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) {
if err != nil {
t.Fatalf("Cannot get sample data with error %s ", err)
}
mapCodec, err := goavro.NewCodec(`["null",{"type":"record","name":"r854762204","namespace":"struct","fields":[{"name":"int1","type":["null","long"]},{"name":"int2","type":["null","long"]}]}]`)
mapSchema, err := avro.Parse(`["null",{"type":"record","name":"r854762204","namespace":"struct","fields":[{"name":"int1","type":["null","long"]},{"name":"int2","type":["null","long"]}]}]`)
if err != nil {
t.Fatal(err.Error())
}
arrayCodec, err := goavro.NewCodec(`["null",{"type":"array","items":["null","long"]}]`)
arraySchema, err := avro.Parse(`["null",{"type":"array","items":["null","long"]}]`)
if err != nil {
t.Fatal(err.Error())
}
mapDecoder := fsmetadata.AvroDecoder{Codec: mapCodec}
arrayDecoder := fsmetadata.AvroDecoder{Codec: arrayCodec}

if err != nil {
t.Fatal(err.Error())
}
var fsReq = CreateFeatureStoreRequest(
fsName,
fvName,
Expand All @@ -1491,7 +1486,7 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) {
if err != nil {
t.Fatalf("Cannot convert to json with error %s ", err)
}
arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &arrayDecoder) // array
arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &arraySchema) // array
row[2] = *arrayPt
if err != nil {
t.Fatalf("Cannot deserailize feature with error %s ", err)
Expand All @@ -1501,7 +1496,7 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) {
if err != nil {
t.Fatalf("Cannot convert to json with error %s ", err)
}
mapPt, err := feature_store.DeserialiseComplexFeature(mapJson, &mapDecoder) // map
mapPt, err := feature_store.DeserialiseComplexFeature(mapJson, &mapSchema) // map
row[3] = *mapPt
if err != nil {
t.Fatalf("Cannot deserailize feature with error %s ", err)
Expand Down Expand Up @@ -2349,7 +2344,7 @@ func Test_IncludeDetailedStatus_JoinedTablePartialKey(t *testing.T) {
t.Fatalf("Detailed status should have two entries")
}
for _, ds := range list_ds {
if (ds.FeatureGroupId ==2076 && ds.HttpStatus != http.StatusOK) || (ds.FeatureGroupId ==2069 && ds.HttpStatus != http.StatusBadRequest) {
if (ds.FeatureGroupId == 2076 && ds.HttpStatus != http.StatusOK) || (ds.FeatureGroupId == 2069 && ds.HttpStatus != http.StatusBadRequest) {
t.Fatalf("HttpStatus should be 200 or 400")
}
if ds.FeatureGroupId == -1 {
Expand Down Expand Up @@ -2381,7 +2376,7 @@ func Test_IncludeDetailedStatus_JoinedTablePartialKeyAndMissingRow(t *testing.T)
t.Fatalf("Detailed status should have two entries")
}
for _, ds := range list_ds {
if (ds.FeatureGroupId ==2069 && ds.HttpStatus != http.StatusNotFound) || (ds.FeatureGroupId ==2076 && ds.HttpStatus != http.StatusBadRequest) {
if (ds.FeatureGroupId == 2069 && ds.HttpStatus != http.StatusNotFound) || (ds.FeatureGroupId == 2076 && ds.HttpStatus != http.StatusBadRequest) {
t.Fatalf("HttpStatus should be 404 or 400")
}
if ds.FeatureGroupId == -1 {
Expand Down
Loading

0 comments on commit dd13385

Please sign in to comment.