From e7be239a179028f5093ce0885a2fd3d887d31dfc Mon Sep 17 00:00:00 2001 From: Jun Ki Min <42475935+loomlike@users.noreply.github.com> Date: Sat, 3 Dec 2022 01:22:14 +0000 Subject: [PATCH] Add integration test. TODO: confirm expected behavior of the timestamp range and fix issues Signed-off-by: Jun Ki Min <42475935+loomlike@users.noreply.github.com> --- feathr_project/test/conftest.py | 17 ++- .../test/integration/test_settings.py | 142 +++++++++--------- 2 files changed, 84 insertions(+), 75 deletions(-) diff --git a/feathr_project/test/conftest.py b/feathr_project/test/conftest.py index 5e3a167b3..93f1071a1 100644 --- a/feathr_project/test/conftest.py +++ b/feathr_project/test/conftest.py @@ -29,14 +29,15 @@ def workspace_dir() -> str: return str(Path(__file__).parent.resolve().joinpath("test_user_workspace")) -@pytest.fixture -def mock_data_path(workspace_dir): - return str(Path(workspace_dir).joinpath( - "mockdata", - "feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", - "demo_data", - "green_tripdata_2020-04.csv", - )) +# TODO we can use this later +# @pytest.fixture +# def mock_data_path(workspace_dir): +# return str(Path(workspace_dir).joinpath( +# "mockdata", +# "feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net", +# "demo_data", +# "green_tripdata_2020-04.csv", +# )) @pytest.fixture(scope="function") diff --git a/feathr_project/test/integration/test_settings.py b/feathr_project/test/integration/test_settings.py index 8f7118b74..8fc38fd3f 100644 --- a/feathr_project/test/integration/test_settings.py +++ b/feathr_project/test/integration/test_settings.py @@ -1,5 +1,8 @@ +from datetime import datetime, timedelta from pathlib import Path +from tempfile import TemporaryDirectory +import pandas as pd import pytest from feathr import ( @@ -15,40 +18,59 @@ from feathr.utils.job_utils import get_result_df +TIMESTAMP_COL = "timestamp" +KEY_COL = "location_id" +FEATURE_COL = "fare" + + +@pytest.fixture(scope="session") +def mock_df(): + """Mock data for testing. + """ + # TODO Currently we're using "today" since `useLatestFeatureData` uses now(). + # Once the behavior is changed to use the latest timestamp in the data, we can use fixed test data instead of creating new one everytime. + today = datetime.now().date() + date_range = list(pd.date_range(start=today-timedelta(days=4), end=today, freq="D")) + return pd.DataFrame({ + TIMESTAMP_COL: date_range + date_range, + KEY_COL: [1, 1, 1, 1, 1, 2, 2, 2, 2, 2], + FEATURE_COL: [5.5, 10.0, 6.0, 8.0, 2.5, 38.0, 12.0, 52.0, 180.0, 3.5], + }) + + @pytest.mark.integration -def test__observation_settings(feathr_client, mock_data_path): +def test__observation_settings(feathr_client, mock_df): + tmp_dir = TemporaryDirectory() - # TODO - mock_data_path = str(Path("mock_data.csv").resolve()) - # mock_distance_data_path = str(Path("mock_distance_data.csv").resolve()) + mock_data_path = str(Path(tmp_dir.name, "mock_data.csv")) + mock_df.to_csv(str(mock_data_path), index=False) # Upload data into dbfs or adls if feathr_client.spark_runtime != "local": mock_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_data_path) - # mock_distance_data_path = feathr_client.feathr_spark_launcher.upload_or_get_cloud_path(mock_distance_data_path) # Define agg features source = HdfsSource( name="source", path=mock_data_path, - event_timestamp_column="lpep_pickup_datetime", - timestamp_format="yyyyMMdd", + event_timestamp_column=TIMESTAMP_COL, + timestamp_format="yyyy-MM-dd", # yyyy/MM/dd/HH/mm/ss ) - location_id_key = TypedKey( - key_column="DOLocationID", + key = TypedKey( + key_column=KEY_COL, key_column_type=ValueType.INT32, - description="location id key", - full_name="location_id_key", + description="key", + full_name=KEY_COL, ) agg_features = [ Feature( - name="f_location_max_fare", - key=location_id_key, + name=f"f_{FEATURE_COL}", + key=key, feature_type=FLOAT, transform=WindowAggTransformation( - agg_expr="cast_float(fare_amount)", + agg_expr=f"cast_float({FEATURE_COL})", agg_func="MAX", - window="20d", + window="2d", # 2 days sliding window ), ), ] @@ -58,75 +80,59 @@ def test__observation_settings(feathr_client, mock_data_path): features=agg_features, ) - # distance_source = HdfsSource( - # name="distance_source", - # path=mock_distance_data_path, - # event_timestamp_column="lpep_pickup_datetime", - # timestamp_format="yyyy-MM-dd HH:mm:ss", - # ) - # datetime_key = TypedKey( - # key_column="lpep_pickup_datetime", - # key_column_type=ValueType.INT64, - # description="datetime key", - # full_name="datetime_key", - # ) - # features = [ - # Feature( - # name="f_fare_amount", - # # name="f_trip_distance", - # # key=datetime_key, - # feature_type=FLOAT, - # transform="fare_amount", - # # transform="trip_distance", - # ), - # ] - # anchor = FeatureAnchor( - # name="anchor", - # source=INPUT_CONTEXT, - # # source=distance_source, - # features=features, - # ) - feathr_client.build_features( anchor_list=[ agg_anchor, - # anchor, ] ) query = [ FeatureQuery( feature_list=[ - "f_location_max_fare", - # "f_trip_distance", - # "f_fare_amount", + f"f_{FEATURE_COL}", ], - key=location_id_key, + key=key, ), ] - observation_time_range_values = [ - AbsoluteTimeRange( - start_time="20210102", - end_time="20210103", - time_format="yyyyMMdd", + test_parameters__expected_values = [ + ( + dict(event_timestamp_column=TIMESTAMP_COL), + # Max value by the sliding window '2d' + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict(use_latest_feature_data=True), + # The latest feature values: Time window is '2d' and thus the feature values for each key is 8.0 and 180.0 + [8.0, 8.0, 8.0, 8.0, 8.0, 180.0, 180.0, 180.0, 180.0, 180.0], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + observation_time_range=RelativeTimeRange(offset="3d", window="2d"), + ), + # TODO BUG - RelativeTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], + ), + ( + dict( + event_timestamp_column=TIMESTAMP_COL, + observation_time_range=AbsoluteTimeRange( + start_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + end_time=mock_df[TIMESTAMP_COL].max().date().isoformat(), + time_format="yyyy-MM-dd", + ), + ), + # TODO BUG - AbsoluteTimeRange doesn't have any effect on the result + [5.5, 10., 10., 8., 8., 38., 38., 52., 180., 180.], ), - # RelativeTimeRange(offset="10h", window="1d"), - # None, ] - # event_timestamp_column_values = [ - # "lpep_pickup_datetime", - # None, - # ] - - for obs_time_range in observation_time_range_values: + for test_params, expected_values in test_parameters__expected_values: settings = ObservationSettings( observation_path=mock_data_path, - event_timestamp_column="lpep_pickup_datetime",# TODOevent_timestamp_column, - timestamp_format="yyyyMMdd", # TODO check -- We only support yyyyMMdd format for this. In future, if there is a request, we can - # use_latest_feature_data=True, #use_latest_feature_data, - observation_time_range=obs_time_range, + timestamp_format="yyyy-MM-dd", + **test_params, ) output_path = str(Path(Path(mock_data_path).parent, "output.avro")) @@ -141,4 +147,6 @@ def test__observation_settings(feathr_client, mock_data_path): # download result and assert the returned result res_df = get_result_df(feathr_client) - print(res_df) #[["lpep_pickup_datetime", "f_location_max_fare", "f_trip_distance"]]) + res_df.sort_values(by=[KEY_COL, TIMESTAMP_COL], inplace=True) + assert res_df[f"f_{FEATURE_COL}"].tolist() == expected_values + # print(res_df)