Skip to content

Commit

Permalink
fix: Set the transformation service endpoint defintion to feature_sto…
Browse files Browse the repository at this point in the history
…re.yaml file instead of OS env.

Signed-off-by: Shuchu Han <[email protected]>
  • Loading branch information
shuchu committed Nov 25, 2024
1 parent 953d62e commit 63dcefa
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
16 changes: 10 additions & 6 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package feast
import (
"context"
"errors"
"os"

"fmt"
"github.com/apache/arrow/go/v17/arrow/memory"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

Expand Down Expand Up @@ -61,11 +60,16 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
return nil, err
}

// Use a scalable transformation service like Python Transformation Service as the Python version of transformation service is
// better for data calculation than the Go version
transformationServerEndpoint := os.Getenv("TRANSFORM_SERVER_ENDPOINT")
// Use a scalable transformation service like Python Transformation Service.
// Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file
// under the "feature_server" section.
transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]
if !ok {
fmt.Println("Errors while reading transformation_service_endpoint info")
panic("No transformation service endpoint provided in the feature_store.yaml file.")
}

transformationService, _ := transformation.NewGrpcTransformationService(config, transformationServerEndpoint)
transformationService, _ := transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string))

return &FeatureStore{
config: config,
Expand Down
53 changes: 53 additions & 0 deletions go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,59 @@ func TestNewFeatureStore(t *testing.T) {
fs, err := NewFeatureStore(&config, nil)
assert.Nil(t, err)
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)

t.Run("valid config", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
},
FeatureServer: map[string]interface{}{
"transformation_service_endpoint": "localhost:50051",
},
}
fs, err := NewFeatureStore(config, nil)
assert.Nil(t, err)
assert.NotNil(t, fs)
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)
assert.NotNil(t, fs.transformationService)
})

t.Run("missing transformation service endpoint", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
},
}
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
NewFeatureStore(config, nil)
})

t.Run("invalid online store config", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "invalid_store",
},
FeatureServer: map[string]interface{}{
"transformation_service_endpoint": "localhost:50051",
},
}
fs, err := NewFeatureStore(config, nil)
assert.NotNil(t, err)
assert.Nil(t, fs)
})
}

func TestGetOnlineFeaturesRedis(t *testing.T) {
Expand Down

0 comments on commit 63dcefa

Please sign in to comment.