Skip to content

Commit

Permalink
Merge branch 'feast-dev:master' into remote_offline
Browse files Browse the repository at this point in the history
  • Loading branch information
redhatHameed authored May 31, 2024
2 parents 853f1e2 + e514f66 commit f3f44c3
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 495 deletions.
19 changes: 8 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,16 @@ test-python-unit:
python -m pytest -n 8 --color=yes sdk/python/tests

test-python-integration:
python -m pytest -n 8 --integration -k "(not snowflake or not test_historical_features_main) and not minio_registry" --color=yes --durations=5 --timeout=1200 --timeout_method=thread sdk/python/tests
python -m pytest -n 8 --integration --color=yes --durations=10 --timeout=1200 --timeout_method=thread \
-k "(not snowflake or not test_historical_features_main)" \
sdk/python/tests

test-python-integration-local:
@(docker info > /dev/null 2>&1 && \
FEAST_IS_LOCAL_TEST=True \
FEAST_LOCAL_ONLINE_CONTAINER=True \
python -m pytest -n 8 --color=yes --integration \
-k "not gcs_registry and \
not s3_registry and \
not test_lambda_materialization and \
not test_snowflake_materialization" \
sdk/python/tests \
) || echo "This script uses Docker, and it isn't running - please start the Docker Daemon and try again!";
FEAST_IS_LOCAL_TEST=True \
FEAST_LOCAL_ONLINE_CONTAINER=True \
python -m pytest -n 8 --color=yes --integration --durations=5 --dist loadgroup \
-k "not test_lambda_materialization and not test_snowflake_materialization" \
sdk/python/tests

test-python-integration-container:
@(docker info > /dev/null 2>&1 && \
Expand Down
38 changes: 23 additions & 15 deletions java/serving-client/src/main/java/dev/feast/FeastClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
import feast.proto.types.ValueProto;
import io.grpc.CallCredentials;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
Expand All @@ -50,6 +49,7 @@ public class FeastClient implements AutoCloseable {

private final ManagedChannel channel;
private final ServingServiceBlockingStub stub;
private final long requestTimeout;

/**
* Create a client to access Feast Serving.
Expand All @@ -68,12 +68,14 @@ public static FeastClient create(String host, int port) {
*
* @param host hostname or ip address of Feast serving GRPC server
* @param port port number of Feast serving GRPC server
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
* milliseconds, use 0 for no timeout
* @return {@link FeastClient}
*/
public static FeastClient create(String host, int port, Deadline deadline) {
public static FeastClient create(String host, int port, long requestTimeout) {
// configure client with no security config.
return FeastClient.createSecure(host, port, SecurityConfig.newBuilder().build(), deadline);
return FeastClient.createSecure(
host, port, SecurityConfig.newBuilder().build(), requestTimeout);
}

/**
Expand All @@ -86,7 +88,7 @@ public static FeastClient create(String host, int port, Deadline deadline) {
* @return {@link FeastClient}
*/
public static FeastClient createSecure(String host, int port, SecurityConfig securityConfig) {
return createSecure(host, port, securityConfig, null);
return FeastClient.createSecure(host, port, securityConfig, 0);
}

/**
Expand All @@ -96,11 +98,17 @@ public static FeastClient createSecure(String host, int port, SecurityConfig sec
* @param port port number of Feast serving GRPC server
* @param securityConfig security options to configure the Feast client. See {@link
* SecurityConfig} for options.
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
* milliseconds
* @return {@link FeastClient}
*/
public static FeastClient createSecure(
String host, int port, SecurityConfig securityConfig, Deadline deadline) {
String host, int port, SecurityConfig securityConfig, long requestTimeout) {

if (requestTimeout < 0) {
throw new IllegalArgumentException("Request timeout can't be negative");
}

// Configure client TLS
ManagedChannel channel = null;
if (securityConfig.isTLSEnabled()) {
Expand All @@ -127,7 +135,7 @@ public static FeastClient createSecure(
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
}

return new FeastClient(channel, securityConfig.getCredentials(), Optional.ofNullable(deadline));
return new FeastClient(channel, securityConfig.getCredentials(), requestTimeout);
}

/**
Expand Down Expand Up @@ -158,7 +166,10 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities)

requestBuilder.putAllEntities(getEntityValuesMap(entities));

GetOnlineFeaturesResponse response = stub.getOnlineFeatures(requestBuilder.build());
ServingServiceGrpc.ServingServiceBlockingStub timedStub =
requestTimeout != 0 ? stub.withDeadlineAfter(requestTimeout, TimeUnit.MILLISECONDS) : stub;

GetOnlineFeaturesResponse response = timedStub.getOnlineFeatures(requestBuilder.build());

List<Row> results = Lists.newArrayList();
if (response.getResultsCount() == 0) {
Expand Down Expand Up @@ -231,12 +242,13 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, Str
}

protected FeastClient(ManagedChannel channel, Optional<CallCredentials> credentials) {
this(channel, credentials, Optional.empty());
this(channel, credentials, 0);
}

protected FeastClient(
ManagedChannel channel, Optional<CallCredentials> credentials, Optional<Deadline> deadline) {
ManagedChannel channel, Optional<CallCredentials> credentials, long requestTimeout) {
this.channel = channel;
this.requestTimeout = requestTimeout;
TracingClientInterceptor tracingInterceptor =
TracingClientInterceptor.newBuilder().withTracer(GlobalTracer.get()).build();

Expand All @@ -247,10 +259,6 @@ protected FeastClient(
servingStub = servingStub.withCallCredentials(credentials.get());
}

if (deadline.isPresent()) {
servingStub = servingStub.withDeadline(deadline.get());
}

this.stub = servingStub;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class FeastClientTest {
private final String AUTH_TOKEN = "test token";
private final Deadline DEADLINE = Deadline.after(2, TimeUnit.SECONDS);
private final long TIMEOUT_MILLIS = 300;

@Rule public GrpcCleanupRule grpcRule;
private AtomicBoolean isAuthenticated;
Expand Down Expand Up @@ -88,7 +87,7 @@ public void setup() throws Exception {
ManagedChannel channel =
this.grpcRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
this.client = new FeastClient(channel, Optional.empty(), Optional.of(DEADLINE));
this.client = new FeastClient(channel, Optional.empty(), TIMEOUT_MILLIS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def __init__(self, project_name: str, *args, **kwargs):
self.minio = MinioContainer()
self.minio.start()
client = self.minio.get_client()
client.make_bucket("test")
if not client.bucket_exists("test"):
client.make_bucket("test")
host_ip = self.minio.get_container_host_ip()
exposed_port = self.minio.get_exposed_port(self.minio.port)
self.endpoint_url = f"http://{host_ip}:{exposed_port}"
Expand Down
167 changes: 1 addition & 166 deletions sdk/python/tests/integration/registration/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,68 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from datetime import timedelta
from tempfile import mkstemp

import pytest
from pytest_lazyfixture import lazy_fixture

from feast import FileSource
from feast.data_format import ParquetFormat
from feast.entity import Entity
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.field import Field
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.repo_config import RepoConfig
from feast.types import Array, Bytes, Float64, Int64, String
from feast.types import Float64, Int64, String
from tests.utils.data_source_test_creator import prep_file_source


@pytest.mark.integration
@pytest.mark.parametrize(
"test_feature_store",
[
lazy_fixture("feature_store_with_gcs_registry"),
lazy_fixture("feature_store_with_s3_registry"),
],
)
def test_apply_entity_integration(test_feature_store):
entity = Entity(
name="driver_car_id",
description="Car driver id",
tags={"team": "matchmaking"},
)

# Register Entity
test_feature_store.apply([entity])

entities = test_feature_store.list_entities()

entity = entities[0]
assert (
len(entities) == 1
and entity.name == "driver_car_id"
and entity.description == "Car driver id"
and "team" in entity.tags
and entity.tags["team"] == "matchmaking"
)

entity = test_feature_store.get_entity("driver_car_id")
assert (
entity.name == "driver_car_id"
and entity.description == "Car driver id"
and "team" in entity.tags
and entity.tags["team"] == "matchmaking"
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
"test_feature_store",
Expand Down Expand Up @@ -109,81 +62,6 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
"test_feature_store",
[
lazy_fixture("feature_store_with_gcs_registry"),
lazy_fixture("feature_store_with_s3_registry"),
],
)
def test_apply_feature_view_integration(test_feature_store):
# Create Feature Views
batch_source = FileSource(
file_format=ParquetFormat(),
path="file://feast/*",
timestamp_field="ts_col",
created_timestamp_column="timestamp",
)

entity = Entity(name="fs1_my_entity_1", join_keys=["test"])

fv1 = FeatureView(
name="my_feature_view_1",
schema=[
Field(name="fs1_my_feature_1", dtype=Int64),
Field(name="fs1_my_feature_2", dtype=String),
Field(name="fs1_my_feature_3", dtype=Array(String)),
Field(name="fs1_my_feature_4", dtype=Array(Bytes)),
Field(name="test", dtype=Int64),
],
entities=[entity],
tags={"team": "matchmaking"},
source=batch_source,
ttl=timedelta(minutes=5),
)

# Register Feature View
test_feature_store.apply([fv1, entity])

feature_views = test_feature_store.list_feature_views()

# List Feature Views
assert (
len(feature_views) == 1
and feature_views[0].name == "my_feature_view_1"
and feature_views[0].features[0].name == "fs1_my_feature_1"
and feature_views[0].features[0].dtype == Int64
and feature_views[0].features[1].name == "fs1_my_feature_2"
and feature_views[0].features[1].dtype == String
and feature_views[0].features[2].name == "fs1_my_feature_3"
and feature_views[0].features[2].dtype == Array(String)
and feature_views[0].features[3].name == "fs1_my_feature_4"
and feature_views[0].features[3].dtype == Array(Bytes)
and feature_views[0].entities[0] == "fs1_my_entity_1"
)

feature_view = test_feature_store.get_feature_view("my_feature_view_1")
assert (
feature_view.name == "my_feature_view_1"
and feature_view.features[0].name == "fs1_my_feature_1"
and feature_view.features[0].dtype == Int64
and feature_view.features[1].name == "fs1_my_feature_2"
and feature_view.features[1].dtype == String
and feature_view.features[2].name == "fs1_my_feature_3"
and feature_view.features[2].dtype == Array(String)
and feature_view.features[3].name == "fs1_my_feature_4"
and feature_view.features[3].dtype == Array(Bytes)
and feature_view.entities[0] == "fs1_my_entity_1"
)

test_feature_store.delete_feature_view("my_feature_view_1")
feature_views = test_feature_store.list_feature_views()
assert len(feature_views) == 0

test_feature_store.teardown()


@pytest.fixture
def feature_store_with_local_registry():
fd, registry_path = mkstemp()
Expand All @@ -197,46 +75,3 @@ def feature_store_with_local_registry():
entity_key_serialization_version=2,
)
)


@pytest.fixture
def feature_store_with_gcs_registry():
from google.cloud import storage

storage_client = storage.Client()
bucket_name = f"feast-registry-test-{int(time.time() * 1000)}"
bucket = storage_client.bucket(bucket_name)
bucket = storage_client.create_bucket(bucket)
bucket.add_lifecycle_delete_rule(
age=14
) # delete buckets automatically after 14 days
bucket.patch()
bucket.blob("registry.db")

return FeatureStore(
config=RepoConfig(
registry=f"gs://{bucket_name}/registry.db",
project="default",
provider="gcp",
entity_key_serialization_version=2,
)
)


@pytest.fixture
def feature_store_with_s3_registry():
aws_registry_path = os.getenv(
"AWS_REGISTRY_PATH", "s3://feast-int-bucket/registries"
)
return FeatureStore(
config=RepoConfig(
registry=f"{aws_registry_path}/{int(time.time() * 1000)}/registry.db",
project="default",
provider="aws",
online_store=DynamoDBOnlineStoreConfig(
region=os.getenv("AWS_REGION", "us-west-2")
),
offline_store=FileOfflineStoreConfig(),
entity_key_serialization_version=2,
)
)
Loading

0 comments on commit f3f44c3

Please sign in to comment.