diff --git a/src/functions/data-preparation.py b/src/functions/data-preparation.py index a6cff34..9251ae3 100644 --- a/src/functions/data-preparation.py +++ b/src/functions/data-preparation.py @@ -7,17 +7,16 @@ import time - - def data_prepare(context): # Set AWS environment variables: _set_envars(context) - region = sagemaker.Session().boto_region_name sm_client = boto3.client("sagemaker") boto_session = boto3.Session(region_name=region) - sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client) + sagemaker_session = sagemaker.session.Session( + boto_session=boto_session, sagemaker_client=sm_client + ) role = os.environ["SAGEMAKER_ROLE"] bucket_prefix = "payment-classification" s3_bucket = sagemaker_session.default_bucket() @@ -74,7 +73,9 @@ def data_prepare(context): feature_group_name = "feature-group-payment-classification" record_identifier_feature_name = "identifier" - feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + feature_group = FeatureGroup( + name=feature_group_name, sagemaker_session=sagemaker_session + ) featurestore_runtime = boto_session.client( service_name="sagemaker-featurestore-runtime", region_name=region @@ -98,7 +99,7 @@ def data_prepare(context): status = feature_group.describe().get("FeatureGroupStatus") - if status!='Created': + if status != "Created": feature_group.create( s3_uri=f"s3://{s3_bucket}/{bucket_prefix}", record_identifier_name=record_identifier_feature_name, @@ -137,7 +138,9 @@ def get_feature_store_values(): ) feature_store_resp["identifier"] = feature_store_resp["identifier"].astype(int) feature_store_resp["count"] = feature_store_resp["count"].astype(int) - feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype(float) + feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype( + float + ) feature_store_resp["EventTime"] = feature_store_resp["EventTime"].astype(float) feature_store_resp = feature_store_resp.sort_values(by="identifier") @@ -146,8 +149,12 @@ def get_feature_store_values(): feature_store_resp = get_feature_store_values() feature_store_data = pd.DataFrame() - feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()["amount"] - feature_store_data["count"] = data.groupby(["transaction_category"]).count()["amount"] + feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()[ + "amount" + ] + feature_store_data["count"] = data.groupby(["transaction_category"]).count()[ + "amount" + ] feature_store_data["identifier"] = feature_store_data.index feature_store_data["EventTime"] = time.time() @@ -157,7 +164,9 @@ def get_feature_store_values(): .apply(lambda x: np.average(x["mean_amount"], weights=x["count"])) ) feature_store_data["count"] = ( - pd.concat([feature_store_resp, feature_store_data]).groupby("identifier").sum()["count"] + pd.concat([feature_store_resp, feature_store_data]) + .groupby("identifier") + .sum()["count"] ) feature_group.ingest(data_frame=feature_store_data, max_workers=3, wait=True) @@ -168,7 +177,9 @@ def get_feature_store_values(): feature_store_data, values=["mean_amount"], index=["identifier"] ).T.add_suffix("_dist") additional_features_columns = list(additional_features.columns) - data = pd.concat([data, pd.DataFrame(columns=additional_features_columns, dtype=object)]) + data = pd.concat( + [data, pd.DataFrame(columns=additional_features_columns, dtype=object)] + ) data[additional_features_columns] = additional_features.values[0] for col in additional_features_columns: data[col] = abs(data[col] - data["amount"])