Skip to content

Commit

Permalink
Add s3 bucket (flyteorg#420)
Browse files Browse the repository at this point in the history
* Add s3 bucket

Signed-off-by: Felix Wang <[email protected]>

* Switch from minio to s3 client

Signed-off-by: Felix Wang <[email protected]>

* Small fixes

Signed-off-by: Felix Wang <[email protected]>
  • Loading branch information
felixwang9817 authored Sep 24, 2021
1 parent a8e90a2 commit c76d55b
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
* Generate prediction
"""

import boto3
import logging
import os
import random
import typing

Expand All @@ -31,6 +33,7 @@
from feast_dataobjects import FeatureStore, FeatureStoreConfig
from feature_eng_tasks import mean_median_imputer, univariate_selection
from flytekit import task, workflow
from flytekit.configuration import aws
from flytekit.core.node_creation import create_node
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.types.file import JoblibSerializedFile
Expand Down Expand Up @@ -60,6 +63,23 @@
DATA_CLASS = "surgical lesion"


@task
def create_bucket(bucket_name: str):
client = boto3.client(
's3',
aws_access_key_id=aws.S3_ACCESS_KEY_ID.get(),
aws_secret_access_key=aws.S3_SECRET_ACCESS_KEY.get(),
use_ssl=False,
endpoint_url=aws.S3_ENDPOINT.get(),
)

try:
client.create_bucket(Bucket=bucket_name)
except client.exceptions.BucketAlreadyOwnedByYou:
logger.info(f"Bucket {bucket_name} has already been created by you.")
pass


sql_task = SQLite3Task(
name="sqlite3.horse_colic",
query_template="select * from data",
Expand Down Expand Up @@ -248,6 +268,8 @@ def feast_workflow(
registry_path: str = "registry.db",
online_store_path: str = "online.db",
) -> typing.List[str]:
# Create bucket if it does not already exist
create_bucket(bucket_name=s3_bucket)

# Load parquet file from sqlite task
df = sql_task()
Expand Down

0 comments on commit c76d55b

Please sign in to comment.