Merge pull request #12 from amit-elbaz/development
Added a function for preprocessing the inference data
aviaIguazio authored Feb 1, 2024
2 parents 10f7ee7 + d31418c commit abde3f5
Showing 2 changed files with 84 additions and 20 deletions.
70 changes: 50 additions & 20 deletions financial_payment_classification_v2.ipynb
@@ -153,7 +153,8 @@
"from sklearn.metrics import classification_report\n",
"from sagemaker.feature_store.feature_group import FeatureGroup\n",
"import pandas as pd\n",
"import numpy as np"
"import numpy as np\n",
"from datetime import datetime, timedelta"
]
},
{
@@ -482,23 +483,6 @@
"Next, we extract the year, month, day, hour, minute, second from the timestamp and remove the timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "24f6090e",
"metadata": {},
"outputs": [],
"source": [
"data[\"year\"] = data[\"timestamp\"].dt.year\n",
"data[\"month\"] = data[\"timestamp\"].dt.month\n",
"data[\"day\"] = data[\"timestamp\"].dt.day\n",
"data[\"hour\"] = data[\"timestamp\"].dt.hour\n",
"data[\"minute\"] = data[\"timestamp\"].dt.minute\n",
"data[\"second\"] = data[\"timestamp\"].dt.second\n",
"\n",
"del data[\"timestamp\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -647,7 +631,53 @@
" del df_merged[\"timestamp\"]\n",
" del df_merged[\"date\"] \n",
" \n",
" return df_merged"
" return df_merged\n",
"\n",
"\n",
"\n",
"\n",
"# Function that updates the timestamps so each transaction category has rows with timestamps from the last 5 days (2 per day)\n",
"def update_timestamps(data):\n",
"\n",
" # Get today's date\n",
" today = datetime.today()\n",
"\n",
" # Calculate the dates for the last 5 days\n",
" last_5_days = [today - timedelta(days=i) for i in range(4, -1, -1)] # Reverse for chronological order\n",
"\n",
" # Extract year, month, and day from each date object\n",
" years = [d.year for d in last_5_days]\n",
" months = [d.month for d in last_5_days]\n",
" days = [d.day for d in last_5_days]\n",
"\n",
" hours = [10, 15]\n",
"\n",
" # Create a list of timestamps of the last 5 days, 2 timestamps per day.\n",
" times = []\n",
" for year, month, day in zip(years, months, days):\n",
" for hour in hours:\n",
" times.append(datetime(year, month, day, hour))\n",
"\n",
"\n",
" # Iterate over each transaction category\n",
" for i in range(len(data[\"transaction_category\"].unique())):\n",
" # Extract all the rows for each category\n",
" category_data = data[data['transaction_category'] == str(i)]\n",
"\n",
" # Ensure timestamp is a datetime object\n",
" category_data['timestamp'] = pd.to_datetime(category_data['timestamp'])\n",
"\n",
" # Sort DataFrame by timestamp in descending order\n",
" category_data_sorted = category_data.sort_values(by='timestamp', ascending=False)\n",
"\n",
" # Select the latest rows and update their timestamp\n",
" latest_rows = category_data_sorted.head(len(times))\n",
" latest_rows['timestamp'] = times\n",
"\n",
" # Update the initial dataframe to include those updated rows\n",
" data.update(latest_rows)\n",
" \n",
" return data"
]
},
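A quick way to sanity-check update_timestamps in isolation is to run it on a small frame. This is a hypothetical sketch (the frame below is made up; in the notebook the data comes from the transactions dataset loaded earlier):

import pandas as pd

# Hypothetical frame: two categories with stale, evenly spaced timestamps.
df = pd.DataFrame({
    "transaction_category": ["0"] * 12 + ["1"] * 12,
    "timestamp": pd.date_range("2023-01-01", periods=24, freq="h"),
    "amount": range(24),
})

df = update_timestamps(df)
# Each category now has up to 10 rows re-stamped across the last 5 days
# (10:00 and 15:00 on each day).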
{
@@ -739,7 +769,7 @@
"# setting up the graph\n",
"# setting up the graph\n",
"extended_transactions_set.graph \\\n",
" .to(name='create_aggregations', handler='create_aggregations').to(name=\"add_grouped_features\", handler=\"add_grouped_features\")\n",
" .to(name='update_timestamps', handler='update_timestamps').to(name='create_aggregations', handler='create_aggregations').to(name=\"add_grouped_features\", handler=\"add_grouped_features\")\n",
" # Add aggregations for 2, 12, and 24 hour time windows\n",
" \n",
" \n",
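For readability, the chained graph set up in the cell above is equivalent to the following sketch (it uses the same MLRun feature-set graph API the notebook already uses):

extended_transactions_set.graph \
    .to(name="update_timestamps", handler="update_timestamps") \
    .to(name="create_aggregations", handler="create_aggregations") \
    .to(name="add_grouped_features", handler="add_grouped_features")

Placing update_timestamps first means the aggregation windows computed in create_aggregations operate on the refreshed timestamps.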
34 changes: 34 additions & 0 deletions src/functions/serving.py
@@ -4,6 +4,7 @@

import mlrun
import numpy as np
import pandas as pd
import xgboost as xgb
from cloudpickle import load

@@ -50,6 +51,39 @@ def _set_model_path(self):

# set model path:
self.model_path = model_path

# Function that preprocesses the inference data.
# Note: this relies on an `fstore` alias being in scope (e.g.
# `import mlrun.feature_store as fstore` alongside the imports above).
def preprocess(data: pd.DataFrame):
unique_categories = data.transaction_category.unique()
# Create a feature vector that gets the average amount
vector = fstore.FeatureVector("transactions_vector", ["aggregations.amount_avg_1d"], with_indexes=True)

# Use online feature service to get the latest average amount per category
with vector.get_online_feature_service() as online_feature_service:
resp = online_feature_service.get(
[{"transaction_category":cat} for cat in unique_categories]
)

for cat in resp:
transaction_category = cat['transaction_category']
amount_avg = cat['amount_avg_1d']
data["dist_" + transaction_category] = abs(amount_avg - data["amount"])

# convert timestamp to components
data["year"] = data["timestamp"].dt.year
data["month"] = data["timestamp"].dt.month
data["day"] = data["timestamp"].dt.day
data["hour"] = data["timestamp"].dt.hour
data["minute"] = data["timestamp"].dt.minute
data["second"] = data["timestamp"].dt.second

del data["timestamp"]
del data["transaction_category"]

    return data


def postprocess(inputs: dict) -> dict:
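A hypothetical smoke test for preprocess (column names are taken from the function body; it assumes preprocess is imported from this module and that an online feature service is reachable behind the "transactions_vector" feature vector):

import pandas as pd

# Hypothetical single-row inference frame.
raw = pd.DataFrame({
    "amount": [42.5],
    "timestamp": [pd.Timestamp.now()],
    "transaction_category": ["3"],
})

features = preprocess(raw)
# `features` now carries a dist_<category> column per known category plus
# year/month/day/hour/minute/second components; the raw timestamp and
# transaction_category columns have been dropped.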
