Merge pull request #20 from mlrun/development
dev to main: update docs and github action
gilad-shaham authored Mar 11, 2024
2 parents a56e83a + 618fbd9 commit c3b1fe1
Showing 9 changed files with 134 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy_workflow.yaml
@@ -6,7 +6,7 @@ on:
- closed
branches:
- staging
- master
- main

jobs:
deploy-workflow:
46 changes: 45 additions & 1 deletion README.md
@@ -1 +1,45 @@
# demo-sagemaker
# AWS SageMaker Demo with MLRun

This demo showcases how to build, manage, and deploy machine learning models using AWS SageMaker and MLRun. It emphasizes the automation of ML workflows from development to production.

This demo is based on the SageMaker Payment Classification use case from the SageMaker examples repository (https://github.com/aws/amazon-sagemaker-examples/blob/main/use-cases/financial_payment_classification/financial_payment_classification.ipynb).

## Key Components

- **AWS SageMaker**: A comprehensive service that enables developers and data scientists to build, train, and deploy machine learning (ML) models efficiently.

- **MLRun**: An open-source MLOps framework designed to manage and automate your machine learning and data science lifecycle. In this demo, it is used to automate ML deployment and workflows.

## Running the Demo

1. **Prerequisites**: Ensure you have an AWS account with SageMaker enabled and MLRun installed in your environment.

2. **Clone the repository**: Clone this repository to your SageMaker notebook environment.

3. **Set the environment variables in `mlrun.env`**: Copy the `mlrun.env` file to your workspace and fill in the necessary environment variables such as `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_DEFAULT_REGION`, `SAGEMAKER_ROLE`, `MLRUN_DBPATH`, `V3IO_USERNAME`, and `V3IO_ACCESS_KEY` (a minimal setup sketch appears after this list).

4. **Run the Jupyter notebook**: Open and run the `financial-payment-pipeline.ipynb` notebook. This notebook contains the code for the financial payment classification pipeline.

5. **Monitor your runs**: Track your runs in the MLRun dashboard. The dashboard provides a graphical interface for tracking your MLRun projects, functions, runs, and artifacts.

You can also open `financial-payment-classification.ipynb` to review the SageMaker code and the MLRun code segments cell by cell. This notebook walks through the individual steps rather than the automated workflow.
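
For orientation, here is a minimal sketch of what steps 3-4 boil down to in code, assuming you run it from the repository root; the project name is illustrative, and the notebook performs the equivalent setup itself:

```python
import mlrun

# Load the credentials and MLRun/V3IO settings filled in during step 3.
mlrun.set_env_from_file("mlrun.env")

# Load (or create) the demo project from the current directory.
# "sagemaker-demo" is a placeholder -- the notebook defines the actual project name.
project = mlrun.get_or_create_project("sagemaker-demo", context="./")
```

From that point the notebook registers the project functions and runs the pipeline against the MLRun server configured in `MLRUN_DBPATH`.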

## CI/CD using GitHub Actions
This demo also includes a GitHub Actions workflow that automates execution of the machine learning pipeline. To set this up:

1. **Fork this repository**: Create a copy of this repository in your own GitHub account by forking it.

2. **Add Secrets to Your Repository**: Navigate to the "Settings" tab in your GitHub repository, then click on "Secrets". Here, you need to add the following secrets, which will be used as environment variables in your workflow:

- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_DEFAULT_REGION`
- `SAGEMAKER_ROLE`
- `MLRUN_DBPATH`
- `V3IO_ACCESS_KEY`

   Additionally, set the `V3IO_USERNAME` environment variable to your username (a pre-flight check for these variables is sketched after this list).

3. **Commit and Push Your Changes**: Make any necessary changes to the code, then commit and push these changes to your repository.

4. **Create a Pull Request**: Create a pull request to either the `staging` or `main` branch. Once the pull request is merged, it will trigger the GitHub action. You can review the pipeline execution in the MLRun UI, a link to which can be found in the workflow steps.
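
Misnamed or missing secrets are the most common cause of a failing run, so it can help to add a small pre-flight check at the start of the CI job. This is an optional sketch, not part of the original workflow; the variable names match the secrets listed above:

```python
import os

# Variables the GitHub Actions workflow is expected to expose to the job.
REQUIRED_VARS = [
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
    "SAGEMAKER_ROLE",
    "MLRUN_DBPATH",
    "V3IO_USERNAME",
    "V3IO_ACCESS_KEY",
]

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing required environment variables: {missing}")
```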
17 changes: 16 additions & 1 deletion mlrun.env
@@ -1,8 +1,23 @@
# This file contains environment variables for configuring the SageMaker and MLRun environments.


# AWS_ACCESS_KEY_ID: This is the access key for your AWS account. It is used to authenticate and authorize AWS API requests.
AWS_ACCESS_KEY_ID=

# AWS_SECRET_ACCESS_KEY: This is the secret key for your AWS account. It is used along with the access key to sign programmatic AWS API requests.
AWS_SECRET_ACCESS_KEY=

# AWS_DEFAULT_REGION: This is the default region that AWS CLI and SDKs will interact with when no other region is specified.
AWS_DEFAULT_REGION=

# SAGEMAKER_ROLE: This is the AWS IAM role that Amazon SageMaker can assume to perform tasks on your behalf (e.g., reading training results, writing model artifacts).
SAGEMAKER_ROLE=

# MLRUN_DBPATH: This is the path to the database that MLRun uses to store metadata for MLRun projects, functions, runs, and artifacts.
MLRUN_DBPATH=

# V3IO_USERNAME: This is the username for the V3IO data platform, which is a high-performance data storage and processing platform.
V3IO_USERNAME=
V3IO_ACCESS_KEY=

# V3IO_ACCESS_KEY: This is the access key for the V3IO data platform. It is used to authenticate requests to the V3IO API.
V3IO_ACCESS_KEY=
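
For context on how these values reach the pipeline code: the functions under `src/functions/` read them with `context.get_secret(...)`, so they must also be registered as MLRun project secrets. A minimal sketch, assuming the `project` object from the notebook and an MLRun version whose `set_secrets` accepts an env file:

```python
# Register the values from mlrun.env as project secrets so that calls such as
# context.get_secret("SAGEMAKER_ROLE") and context.get_secret("AWS_ACCESS_KEY_ID")
# resolve inside the pipeline functions.
project.set_secrets(file_path="mlrun.env")
```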
4 changes: 2 additions & 2 deletions project_run.py
@@ -20,7 +20,7 @@
)
@click.option(
"--branch",
type=click.Choice(["development", "staging", "master"]
type=click.Choice(["development", "staging", "main"]
),
required=True,
help="Specify the branch - only relevant when using git source.",
@@ -37,7 +37,7 @@ def main(
) -> None:

user_project = (
True if single_cluster_mode and branch in ["staging", "master"] else False
True if single_cluster_mode and branch in ["staging", "main"] else False
)

source = f"{repo}#{branch}"
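
For readers unfamiliar with the `source = f"{repo}#{branch}"` line above: `<repo-url>#<branch>` is the standard MLRun git-source reference. The remainder of this script is not shown here; the sketch below only illustrates, with hypothetical values, how such a source is typically attached to a project:

```python
import mlrun

# Hypothetical values -- in project_run.py they come from the CLI options.
repo = "git://github.com/<your-fork>/demo-sagemaker.git"
branch = "main"
source = f"{repo}#{branch}"

project = mlrun.get_or_create_project("sagemaker-demo", context="./")
# Pull the project code from the git source at run time rather than from local files.
project.set_source(source, pull_at_runtime=True)
```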
46 changes: 29 additions & 17 deletions src/functions/data-preparation.py
@@ -1,24 +1,24 @@
import os
import time

import boto3
import numpy as np
import sagemaker
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
import time




def data_prepare(context):
# Set AWS environment variables:
_set_envars(context)


region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)
role = os.environ["SAGEMAKER-ROLE"]
sagemaker_session = sagemaker.session.Session(
boto_session=boto_session, sagemaker_client=sm_client
)
role = os.environ["SAGEMAKER_ROLE"]
bucket_prefix = "payment-classification"
s3_bucket = sagemaker_session.default_bucket()

@@ -74,13 +74,15 @@ def data_prepare(context):
feature_group_name = "feature-group-payment-classification"
record_identifier_feature_name = "identifier"

feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
feature_group = FeatureGroup(
name=feature_group_name, sagemaker_session=sagemaker_session
)

featurestore_runtime = boto_session.client(
service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = sagemaker.Session(
sagemaker.Session(
boto_session=boto_session,
sagemaker_client=sm_client,
sagemaker_featurestore_runtime_client=featurestore_runtime,
@@ -98,7 +100,7 @@

status = feature_group.describe().get("FeatureGroupStatus")

if status!='Created':
if status != "Created":
feature_group.create(
s3_uri=f"s3://{s3_bucket}/{bucket_prefix}",
record_identifier_name=record_identifier_feature_name,
@@ -137,7 +139,9 @@ def get_feature_store_values():
)
feature_store_resp["identifier"] = feature_store_resp["identifier"].astype(int)
feature_store_resp["count"] = feature_store_resp["count"].astype(int)
feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype(float)
feature_store_resp["mean_amount"] = feature_store_resp["mean_amount"].astype(
float
)
feature_store_resp["EventTime"] = feature_store_resp["EventTime"].astype(float)
feature_store_resp = feature_store_resp.sort_values(by="identifier")

@@ -146,8 +150,12 @@ def get_feature_store_values():
feature_store_resp = get_feature_store_values()

feature_store_data = pd.DataFrame()
feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()["amount"]
feature_store_data["count"] = data.groupby(["transaction_category"]).count()["amount"]
feature_store_data["mean_amount"] = data.groupby(["transaction_category"]).mean()[
"amount"
]
feature_store_data["count"] = data.groupby(["transaction_category"]).count()[
"amount"
]
feature_store_data["identifier"] = feature_store_data.index
feature_store_data["EventTime"] = time.time()

@@ -157,7 +165,9 @@ def get_feature_store_values():
.apply(lambda x: np.average(x["mean_amount"], weights=x["count"]))
)
feature_store_data["count"] = (
pd.concat([feature_store_resp, feature_store_data]).groupby("identifier").sum()["count"]
pd.concat([feature_store_resp, feature_store_data])
.groupby("identifier")
.sum()["count"]
)

feature_group.ingest(data_frame=feature_store_data, max_workers=3, wait=True)
@@ -168,7 +178,9 @@ def get_feature_store_values():
feature_store_data, values=["mean_amount"], index=["identifier"]
).T.add_suffix("_dist")
additional_features_columns = list(additional_features.columns)
data = pd.concat([data, pd.DataFrame(columns=additional_features_columns, dtype=object)])
data = pd.concat(
[data, pd.DataFrame(columns=additional_features_columns, dtype=object)]
)
data[additional_features_columns] = additional_features.values[0]
for col in additional_features_columns:
data[col] = abs(data[col] - data["amount"])
@@ -191,7 +203,7 @@ def get_feature_store_values():
sagemaker_session = sagemaker.session.Session(
boto_session=boto_session, sagemaker_client=sm_client
)
role = context.get_secret("SAGEMAKER-ROLE")
role = context.get_secret("SAGEMAKER_ROLE")
bucket_prefix = "payment-classification"
s3_bucket = sagemaker_session.default_bucket()

@@ -216,4 +228,4 @@ def _set_envars(context):
os.environ["AWS_ACCESS_KEY_ID"] = context.get_secret("AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = context.get_secret("AWS_SECRET_ACCESS_KEY")
os.environ["AWS_DEFAULT_REGION"] = context.get_secret("AWS_DEFAULT_REGION")
os.environ["SAGEMAKER-ROLE"] = context.get_secret("SAGEMAKER-ROLE")
os.environ["SAGEMAKER_ROLE"] = context.get_secret("SAGEMAKER_ROLE")
16 changes: 16 additions & 0 deletions src/functions/evaluate.py
@@ -38,6 +38,22 @@ def evaluate(
label_column: str,
factorize_key: dict = None,
) -> pd.DataFrame:
"""
Evaluate a trained model on a test set.
Parameters:
model_path (str): The path to the trained model.
model_name (str): The name of the model file.
test_set (Union[mlrun.DataItem, str]): The test set data. It can be either a mlrun.DataItem object or a string
representing the path to the test set file.
label_column (str): The name of the label column in the test set.
factorize_key (dict, optional): A dictionary mapping the target labels to their corresponding names. Defaults to
the transaction categories.
Returns:
pd.DataFrame: The classification report as a pandas DataFrame.
"""

# download model from s3:
if model_path.startswith("store://"):
model_path = mlrun.get_dataitem(model_path).get().decode("utf-8")
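
The new docstring above maps directly onto the arguments that the workflow passes when it invokes this handler. A hedged, standalone example with placeholder values (in the pipeline, the real `model_path` and `test_set` come from the training step):

```python
# Placeholder values -- in the pipeline these are outputs of the train step.
evaluate_run = project.run_function(
    function="evaluate",
    handler="evaluate",
    params={
        "model_path": "s3://<bucket>/payment-classification/model.tar.gz",
        "model_name": "xgboost-model",
        "label_column": "transaction_category",
    },
    inputs={"test_set": "s3://<bucket>/payment-classification/test.csv"},
)
```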
24 changes: 15 additions & 9 deletions src/functions/serving.py
@@ -6,8 +6,6 @@
import numpy as np
import xgboost as xgb
from cloudpickle import load
import mlrun.feature_store as fstore


warnings.filterwarnings("ignore")

@@ -28,14 +26,15 @@ def load(self):
self.model = load(open("xgboost-model", "rb"))

def predict(self, body: dict) -> List:
"""Generate model predictions from sample."""

print(body)
# body['inputs'][0] = body['inputs'][0][1:]

# print(body)
"""
Generate model predictions from data input.
Args:
body (dict): The input data for prediction.
Returns:
The model predictions as a list.
"""

# Convert input to numpy array:
data = np.asarray(body["inputs"])
@@ -64,7 +63,14 @@ def _set_model_path(self):
def postprocess(inputs: dict) -> dict:
"""
Postprocessing the output of the model
Args:
inputs (dict): The input dictionary containing the model outputs.
Returns:
dict: The postprocessed output dictionary with predictions and confidences.
"""

# Read the prediction:
print(inputs)
outputs = np.asarray(inputs.pop("outputs"))
@@ -80,4 +86,4 @@ def postprocess(inputs: dict) -> dict:

inputs["predictions"] = predictions
inputs["confidences"] = confidences
return inputs
return inputs
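
Once the serving function defined in this file is deployed (the last step of the workflow), it can be exercised through MLRun's `invoke` helper. A sketch with a hypothetical model route and placeholder feature vector; the actual route depends on how the model is registered in the notebook:

```python
# serving_fn is the deployed MLRun serving-function object from the notebook.
response = serving_fn.invoke(
    path="/v2/models/xgboost-model/infer",      # hypothetical model route
    body={"inputs": [[0.1, 0.2, 0.3, 0.4]]},    # placeholder feature vector
)

# postprocess() above attaches these two keys to the response.
print(response["predictions"], response["confidences"])
```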
6 changes: 2 additions & 4 deletions src/functions/train.py
@@ -1,8 +1,6 @@
import os

import boto3
import mlrun.feature_store as fs
import numpy as np
import sagemaker


@@ -17,7 +15,7 @@ def train(context):
sagemaker_session = sagemaker.session.Session(
boto_session=boto_session, sagemaker_client=sm_client
)
role = context.get_secret("SAGEMAKER-ROLE")
role = context.get_secret("SAGEMAKER_ROLE")
bucket_prefix = "payment-classification"
s3_bucket = sagemaker_session.default_bucket()

@@ -74,4 +72,4 @@ def _set_envars(context):
os.environ["AWS_ACCESS_KEY_ID"] = context.get_secret("AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = context.get_secret("AWS_SECRET_ACCESS_KEY")
os.environ["AWS_DEFAULT_REGION"] = context.get_secret("AWS_DEFAULT_REGION")
os.environ["SAGEMAKER-ROLE"] = context.get_secret("SAGEMAKER-ROLE")
os.environ["SAGEMAKER_ROLE"] = context.get_secret("SAGEMAKER_ROLE")
11 changes: 8 additions & 3 deletions src/workflows/workflow.py
@@ -3,10 +3,15 @@


@dsl.pipeline(
name="Fraud Detection Pipeline",
description="Detecting fraud from a transactions dataset",
name="Classify payment Pipeline",
description="Classify payment from a transactions dataset",
)
def kfpipeline():
"""
This function defines a Kubeflow Pipeline for payment classification.
It trains a model using SageMaker, evaluates its performance, sets up a serving function,
and deploys the serving function.
"""
project = mlrun.get_current_project()

# Train
@@ -19,7 +24,7 @@ def kfpipeline():
)

# Evaluate
evaluate_run = project.run_function(
project.run_function(
function="evaluate",
name="evaluate",
handler="evaluate",
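
To tie the pieces together: the pipeline defined above is executed through the project object, either from the notebook or from the CI job. A sketch, with the workflow name shown as a placeholder (use the name under which `workflow.py` is registered in the project):

```python
# "main" is a placeholder workflow name; pass the name registered for workflow.py.
run_id = project.run(
    name="main",
    watch=True,   # block until the pipeline finishes and report its status
    dirty=True,   # allow running with uncommitted local changes
)
```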
