fix data-preparation lint issues
gilad-shaham committed Mar 8, 2024
1 parent 4aa039c commit 6b87c40
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions src/functions/data-preparation.py
@@ -7,17 +7,16 @@
import time




def data_prepare(context):
# Set AWS environment variables:
_set_envars(context)


region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
-sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)
+sagemaker_session = sagemaker.session.Session(
+    boto_session=boto_session, sagemaker_client=sm_client
+)
role = os.environ["SAGEMAKER_ROLE"]
bucket_prefix = "payment-classification"
s3_bucket = sagemaker_session.default_bucket()
@@ -74,7 +73,9 @@ def data_prepare(context):
feature_group_name = "feature-group-payment-classification"
record_identifier_feature_name = "identifier"

-feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
+feature_group = FeatureGroup(
+    name=feature_group_name, sagemaker_session=sagemaker_session
+)

featurestore_runtime = boto_session.client(
service_name="sagemaker-featurestore-runtime", region_name=region
@@ -98,7 +99,7 @@

status = feature_group.describe().get("FeatureGroupStatus")

-if status!='Created':
+if status != "Created":
feature_group.create(
s3_uri=f"s3://{s3_bucket}/{bucket_prefix}",
record_identifier_name=record_identifier_feature_name,
@@ -137,7 +138,9 @@ def get_feature_store_values():
)
feature_store_resp["identifier"] = feature_store_resp["identifier"].astype(int)
feature_store_resp["count"] = feature_store_resp["count"].astype(int)
feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype(float)
feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype(
float
)
feature_store_resp["EventTime"] = feature_store_resp["EventTime"].astype(float)
feature_store_resp = feature_store_resp.sort_values(by="identifier")

@@ -146,8 +149,12 @@ def get_feature_store_values():
feature_store_resp = get_feature_store_values()

feature_store_data = pd.DataFrame()
feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()["amount"]
feature_store_data["count"] = data.groupby(["transaction_category"]).count()["amount"]
feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()[
"amount"
]
feature_store_data["count"] = data.groupby(["transaction_category"]).count()[
"amount"
]
feature_store_data["identifier"] = feature_store_data.index
feature_store_data["EventTime"] = time.time()

@@ -157,7 +164,9 @@ def get_feature_store_values():
.apply(lambda x: np.average(x["mean_amount"], weights=x["count"]))
)
feature_store_data["count"] = (
-    pd.concat([feature_store_resp, feature_store_data]).groupby("identifier").sum()["count"]
+    pd.concat([feature_store_resp, feature_store_data])
+    .groupby("identifier")
+    .sum()["count"]
)

feature_group.ingest(data_frame=feature_store_data, max_workers=3, wait=True)
@@ -168,7 +177,9 @@ def get_feature_store_values():
feature_store_data, values=["mean_amount"], index=["identifier"]
).T.add_suffix("_dist")
additional_features_columns = list(additional_features.columns)
-data = pd.concat([data, pd.DataFrame(columns=additional_features_columns, dtype=object)])
+data = pd.concat(
+    [data, pd.DataFrame(columns=additional_features_columns, dtype=object)]
+)
data[additional_features_columns] = additional_features.values[0]
for col in additional_features_columns:
data[col] = abs(data[col] - data["amount"])

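Note on the feature-group logic touched by this diff: the file follows a describe-then-create flow for a SageMaker Feature Group and only ingests records once the group exists. The sketch below is a generic, self-contained version of that pattern using only public sagemaker SDK calls (FeatureGroup.describe / load_feature_definitions / create / ingest); the helper name ensure_feature_group and its parameters are hypothetical and are not code from this commit.

import time

from sagemaker.feature_store.feature_group import FeatureGroup


def ensure_feature_group(name, sagemaker_session, sample_df, s3_uri, role_arn):
    # Hypothetical helper: return a usable FeatureGroup named `name`,
    # creating it first when it does not already exist.
    feature_group = FeatureGroup(name=name, sagemaker_session=sagemaker_session)

    try:
        status = feature_group.describe().get("FeatureGroupStatus")
    except Exception:
        # describe() raises if the group has never been created.
        status = None

    if status != "Created":
        # The SDK derives the schema from the DataFrame's columns and dtypes;
        # sample_df must include the "identifier" and "EventTime" columns.
        feature_group.load_feature_definitions(data_frame=sample_df)
        feature_group.create(
            s3_uri=s3_uri,
            record_identifier_name="identifier",
            event_time_feature_name="EventTime",
            role_arn=role_arn,
            enable_online_store=True,
        )
        # Poll until creation finishes before any ingest() call.
        while feature_group.describe().get("FeatureGroupStatus") == "Creating":
            time.sleep(5)

    return feature_group

A caller would build sample_df with the same four columns the diff writes (identifier, count, mean_amount, EventTime) and then call feature_group.ingest(data_frame=..., max_workers=3, wait=True), as the file already does.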