Skip to content

Commit

Permalink
Merge pull request #1 from shivamsanju/feature-notebook-lineage
Browse files Browse the repository at this point in the history
added documentation for using callback function in client.py
  • Loading branch information
chinmaytredence authored Jun 27, 2022
2 parents 493eff1 + e20914b commit 4394440
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 0 deletions.
40 changes: 40 additions & 0 deletions docs/how-to-guides/client-callback-function.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
layout: default
title: How to use callback function in feathr client
parent: Feathr How-to Guides
---

## What is a callback function

A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per the user needs.

## How to use callback functions

Currently the below functions in feathr client support passing a callback as an argument:

- get_online_features
- multi_get_online_features
- get_offline_features
- monitor_features
- materialize_features

These functions accept two optional parameters named **callback** and **params**.
callback is of type function and params is a dictionary where user can pass the arguments for the callback function.

An example on how to use it:

```python
# inside notebook
client = FeathrClient(config_path)
client.get_offline_features(observation_settings,feature_query,output_path, callback, params)

# users can define their own callback function and params
params = {"param1":"value1", "param2":"value2"}

async def callback(params):
import httpx
async with httpx.AsyncClient() as requestHandler:
response = await requestHandler.post('https://some-endpoint', json = params)
return response

```
124 changes: 124 additions & 0 deletions feathr_project/test/test_client_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import asyncio
import unittest.mock as mock
import time
from subprocess import call
from datetime import datetime, timedelta

from pathlib import Path
from feathr import ValueType
from feathr import FeatureQuery
from feathr import ObservationSettings
from feathr import TypedKey
from test_fixture import basic_test_setup
from test_fixture import get_online_test_table_name
from feathr.definition._materialization_utils import _to_materialization_config
from feathr import (BackfillTime, MaterializationSettings)
from feathr import (BackfillTime, MaterializationSettings, FeatureQuery,
ObservationSettings, SparkExecutionConfiguration)
from feathr import RedisSink, HdfsSink


params = {"wait" : 0.1}
async def sample_callback(params):
print(params)
await asyncio.sleep(0.1)

callback = mock.MagicMock(return_value=sample_callback(params))

def test_client_callback_offline_feature():
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"
client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")
feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=location_id)

settings = ObservationSettings(
observation_path="wasbs://[email protected]/sample_data/green_tripdata_2020-04.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

now = datetime.now()
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"])

res = client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path,
callback=callback,
params=params)
callback.assert_called_with(params)


def test_client_callback_materialization():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings("nycTaxiTable",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings, callback=callback, params=params)
callback.assert_called_with(params)

def test_client_callback_monitor_features():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings("nycTaxiTable",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.monitor_features(settings, callback=callback, params=params)
callback.assert_called_with(params)

def test_client_callback_get_online_features():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings("nycTaxiTable",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings)
callback.assert_called_with(params)
client.wait_job_to_finish(timeout_sec=900)
# wait for a few secs for the data to come in redis
time.sleep(5)
client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params)
callback.assert_called_with(params)


def test_client_callback_multi_get_online_features():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings("nycTaxiTable",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings)
callback.assert_called_with(params)
client.wait_job_to_finish(timeout_sec=900)
# wait for a few secs for the data to come in redis
time.sleep(5)
client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params)
callback.assert_called_with(params)

0 comments on commit 4394440

Please sign in to comment.