From c76d55bda9afccc6d4af397ef56dc9fe05d4420e Mon Sep 17 00:00:00 2001
From: Felix Wang
Date: Fri, 24 Sep 2021 13:17:20 -0700
Subject: [PATCH] Add s3 bucket (#420)

* Add s3 bucket

Signed-off-by: Felix Wang

* Switch from minio to s3 client

Signed-off-by: Felix Wang

* Small fixes

Signed-off-by: Felix Wang
---
 .../feast_integration/feast_workflow.py      | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
index de1fc7a8ed..229b95fd5c 100644
--- a/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
+++ b/cookbook/case_studies/feature_engineering/feast_integration/feast_workflow.py
@@ -17,7 +17,9 @@
 * Generate prediction
 """
 
+import boto3
 import logging
+import os
 import random
 import typing
 
@@ -31,6 +33,7 @@
 from feast_dataobjects import FeatureStore, FeatureStoreConfig
 from feature_eng_tasks import mean_median_imputer, univariate_selection
 from flytekit import task, workflow
+from flytekit.configuration import aws
 from flytekit.core.node_creation import create_node
 from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
 from flytekit.types.file import JoblibSerializedFile
@@ -60,6 +63,23 @@
 DATA_CLASS = "surgical lesion"
 
 
+@task
+def create_bucket(bucket_name: str):
+    client = boto3.client(
+        's3',
+        aws_access_key_id=aws.S3_ACCESS_KEY_ID.get(),
+        aws_secret_access_key=aws.S3_SECRET_ACCESS_KEY.get(),
+        use_ssl=False,
+        endpoint_url=aws.S3_ENDPOINT.get(),
+    )
+
+    try:
+        client.create_bucket(Bucket=bucket_name)
+    except client.exceptions.BucketAlreadyOwnedByYou:
+        logger.info(f"Bucket {bucket_name} has already been created by you.")
+        pass
+
+
 sql_task = SQLite3Task(
     name="sqlite3.horse_colic",
     query_template="select * from data",
@@ -248,6 +268,8 @@ def feast_workflow(
     registry_path: str = "registry.db",
     online_store_path: str = "online.db",
 ) -> typing.List[str]:
+    # Create bucket if it does not already exist
+    create_bucket(bucket_name=s3_bucket)
+    # Load parquet file from sqlite task
     df = sql_task()
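
Note: for anyone who wants to exercise the idempotent bucket-creation logic this
patch adds, without a Flyte cluster or the flytekit `aws` config plugin, here is a
minimal standalone sketch of the same pattern. The endpoint URL, credentials, and
bucket name below are placeholder assumptions for a local S3-compatible store
(e.g. MinIO); they are not values taken from the patch, which resolves them via
aws.S3_ACCESS_KEY_ID, aws.S3_SECRET_ACCESS_KEY, and aws.S3_ENDPOINT at runtime.

    import logging

    import boto3

    logger = logging.getLogger(__name__)

    def create_bucket_if_missing(bucket_name: str) -> None:
        # Assumed local credentials and endpoint for an S3-compatible store;
        # in the patch these come from flytekit's aws configuration instead.
        client = boto3.client(
            "s3",
            aws_access_key_id="minio",
            aws_secret_access_key="miniostorage",
            use_ssl=False,
            endpoint_url="http://localhost:9000",
        )
        try:
            client.create_bucket(Bucket=bucket_name)
        except client.exceptions.BucketAlreadyOwnedByYou:
            # Same bucket, same owner: safe to treat creation as a no-op,
            # which is what makes the task safe to re-run in the workflow.
            logger.info("Bucket %s already exists and is owned by you.", bucket_name)

    if __name__ == "__main__":
        create_bucket_if_missing("feast-demo-bucket")  # hypothetical bucket name

Catching BucketAlreadyOwnedByYou (rather than checking existence first) avoids a
race between concurrent workflow runs creating the same bucket.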