feat: Add delta format to FileSource, add support for it in ibis/duckdb #4123

Merged · 2 commits · Apr 20, 2024
4 changes: 4 additions & 0 deletions protos/feast/core/DataFormat.proto
@@ -27,8 +27,12 @@ message FileFormat {
// Defines options for the Parquet data format
message ParquetFormat {}

// Defines options for the Delta data format
message DeltaFormat {}

oneof format {
ParquetFormat parquet_format = 1;
DeltaFormat delta_format = 2;
}
}

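For context (not part of this diff), a minimal sketch of how a repo definition could opt into the new format through the Python SDK once this change lands; the source name, path, and timestamp field below are hypothetical.

from feast import FileSource
from feast.data_format import DeltaFormat

# Hypothetical data source backed by a Delta table; the path should point at the
# directory containing the table's _delta_log/.
driver_stats_source = FileSource(
    name="driver_hourly_stats",
    path="data/driver_stats",
    file_format=DeltaFormat(),
    timestamp_field="event_timestamp",
)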
14 changes: 14 additions & 0 deletions sdk/python/feast/data_format.py
@@ -43,6 +43,8 @@ def from_proto(cls, proto):
fmt = proto.WhichOneof("format")
if fmt == "parquet_format":
return ParquetFormat()
elif fmt == "delta_format":
return DeltaFormat()
if fmt is None:
return None
raise NotImplementedError(f"FileFormat is unsupported: {fmt}")
@@ -66,6 +68,18 @@ def __str__(self):
return "parquet"


class DeltaFormat(FileFormat):
"""
Defines the Delta data format
"""

def to_proto(self):
return FileFormatProto(delta_format=FileFormatProto.DeltaFormat())

def __str__(self):
return "delta"


class StreamFormat(ABC):
"""
Defines an abstract streaming data format used to encode feature data in streams
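A quick sketch of the proto round-trip these two additions enable, matching the from_proto branch and __str__ implementation above:

from feast.data_format import DeltaFormat, FileFormat

fmt = DeltaFormat()
proto = fmt.to_proto()                  # FileFormat proto with delta_format set
restored = FileFormat.from_proto(proto)

assert isinstance(restored, DeltaFormat)
assert str(restored) == "delta"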
39 changes: 23 additions & 16 deletions sdk/python/feast/infra/offline_stores/file_source.py
@@ -8,7 +8,7 @@
from typeguard import typechecked

from feast import type_map
from feast.data_format import FileFormat, ParquetFormat
from feast.data_format import DeltaFormat, FileFormat, ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
@@ -157,24 +157,31 @@ def get_table_column_names_and_types(
filesystem, path = FileSource.create_filesystem_and_path(
self.path, self.file_options.s3_endpoint_override
)
# Adding support for different file format path
# based on S3 filesystem
if filesystem is None:
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow don't have this method,
# but this field is good enough.
pass
# TODO: why is the None check necessary?
if self.file_format is None or isinstance(self.file_format, ParquetFormat):
if filesystem is None:
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow don't have this method,
# but this field is good enough.
pass
else:
schema = schema.to_arrow_schema()
else:
schema = schema.to_arrow_schema()
schema = ParquetDataset(path, filesystem=filesystem).schema
elif isinstance(self.file_format, DeltaFormat):
from deltalake import DeltaTable

schema = DeltaTable(self.path).schema().to_pyarrow()
else:
schema = ParquetDataset(path, filesystem=filesystem).schema
raise Exception(f"Unknown FileFormat -> {self.file_format}")

return zip(schema.names, map(str, schema.types))

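Outside of Feast, the Delta branch above boils down to the following; a minimal sketch assuming deltalake is installed and the (illustrative) path points at a local Delta table.

from deltalake import DeltaTable

table = DeltaTable("data/driver_stats")        # illustrative path
arrow_schema = table.schema().to_pyarrow()     # pyarrow.Schema

# Same shape that get_table_column_names_and_types() returns:
columns = list(zip(arrow_schema.names, map(str, arrow_schema.types)))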
74 changes: 51 additions & 23 deletions sdk/python/feast/infra/offline_stores/ibis.py
@@ -13,6 +13,7 @@
from ibis.expr.types import Table
from pytz import utc

from feast.data_format import DeltaFormat, ParquetFormat
from feast.data_source import DataSource
from feast.errors import SavedDatasetLocationAlreadyExists
from feast.feature_logging import LoggingConfig, LoggingSource
@@ -105,6 +106,15 @@ def _generate_row_id(

return entity_table

@staticmethod
def _read_data_source(data_source: DataSource) -> Table:
assert isinstance(data_source, FileSource)

if isinstance(data_source.file_format, ParquetFormat):
return ibis.read_parquet(data_source.path)
elif isinstance(data_source.file_format, DeltaFormat):
return ibis.read_delta(data_source.path)

@staticmethod
def get_historical_features(
config: RepoConfig,
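The new _read_data_source helper dispatches on the source's file_format; a standalone sketch of the same idea (with an explicit error for unsupported formats, which the helper above leaves implicit):

import ibis
from ibis.expr.types import Table

from feast.data_format import DeltaFormat, ParquetFormat


def read_source(path: str, file_format) -> Table:
    # Illustrative restatement of _read_data_source, not the PR's code verbatim.
    if isinstance(file_format, ParquetFormat):
        return ibis.read_parquet(path)   # lazy scan of Parquet file(s)
    if isinstance(file_format, DeltaFormat):
        return ibis.read_delta(path)     # latest snapshot of a Delta table
    raise ValueError(f"Unsupported file format: {file_format}")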
@@ -137,7 +147,9 @@ def read_fv(
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)
fv_table: Table = IbisOfflineStore._read_data_source(
feature_view.batch_source
)

for old_name, new_name in feature_view.batch_source.field_mapping.items():
if old_name in fv_table.columns:
@@ -227,7 +239,7 @@ def pull_all_from_table_or_query(
start_date = start_date.astimezone(tz=utc)
end_date = end_date.astimezone(tz=utc)

table = ibis.read_parquet(data_source.path)
table = IbisOfflineStore._read_data_source(data_source)

table = table.select(*fields)

@@ -260,10 +272,9 @@ def write_logged_features(
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)

if isinstance(data, Path):
table = ibis.read_parquet(data)
else:
table = ibis.memtable(data)
table = (
ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data)
)

if destination.partition_by:
kwargs = {"partition_by": destination.partition_by}
@@ -294,12 +305,21 @@ def offline_write_batch(
)

file_options = feature_view.batch_source.file_options
prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
if table.schema != prev_table.schema:
table = table.cast(prev_table.schema)
new_table = pyarrow.concat_tables([table, prev_table])

ibis.memtable(new_table).to_parquet(file_options.uri)
if isinstance(feature_view.batch_source.file_format, ParquetFormat):
prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
if table.schema != prev_table.schema:
table = table.cast(prev_table.schema)
new_table = pyarrow.concat_tables([table, prev_table])

ibis.memtable(new_table).to_parquet(file_options.uri)
elif isinstance(feature_view.batch_source.file_format, DeltaFormat):
from deltalake import DeltaTable

prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
if table.schema != prev_schema:
table = table.cast(prev_schema)
ibis.memtable(table).to_delta(file_options.uri, mode="append")


class IbisRetrievalJob(RetrievalJob):
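The Delta branch of offline_write_batch above appends to an existing table after aligning schemas; a minimal sketch of that path, assuming a Delta table already exists at the (illustrative) uri:

import ibis
import pyarrow as pa
from deltalake import DeltaTable


def append_rows(uri: str, rows: pa.Table) -> None:
    # Align the incoming batch with the existing table's schema, then append.
    prev_schema = DeltaTable(uri).schema().to_pyarrow()
    if rows.schema != prev_schema:
        rows = rows.cast(prev_schema)
    ibis.memtable(rows).to_delta(uri, mode="append")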
@@ -338,20 +358,28 @@ def persist(
if not allow_overwrite and os.path.exists(storage.file_options.uri):
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
storage.file_options.s3_endpoint_override,
)

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
if isinstance(storage.file_options.file_format, ParquetFormat):
filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
storage.file_options.s3_endpoint_override,
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem
)
elif isinstance(storage.file_options.file_format, DeltaFormat):
mode = (
"overwrite"
if allow_overwrite and os.path.exists(storage.file_options.uri)
else "error"
)
self.table.to_delta(storage.file_options.uri, mode=mode)

@property
def metadata(self) -> Optional[RetrievalMetadata]:
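For persist, the Delta branch derives the write mode from allow_overwrite; a small sketch of that decision, assuming to_delta accepts deltalake's standard modes ("error" fails if the destination already exists):

import os

from ibis.expr.types import Table


def persist_delta(table: Table, uri: str, allow_overwrite: bool = False) -> None:
    # Overwrite only when explicitly allowed and something is already there;
    # otherwise let deltalake raise instead of silently replacing data.
    mode = "overwrite" if allow_overwrite and os.path.exists(uri) else "error"
    table.to_delta(uri, mode=mode)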
47 changes: 26 additions & 21 deletions sdk/python/requirements/py3.10-ci-requirements.txt
@@ -59,11 +59,11 @@ bidict==0.23.1
# via ibis-framework
bleach==6.1.0
# via nbconvert
boto3==1.34.85
boto3==1.34.88
# via
# feast (setup.py)
# moto
botocore==1.34.85
botocore==1.34.88
# via
# boto3
# moto
Expand Down Expand Up @@ -134,11 +134,11 @@ cryptography==42.0.5
# snowflake-connector-python
# types-pyopenssl
# types-redis
dask[array,dataframe]==2024.4.1
dask[array,dataframe]==2024.4.2
# via
# dask-expr
# feast (setup.py)
dask-expr==1.0.11
dask-expr==1.0.12
# via dask
db-dtypes==1.2.0
# via google-cloud-bigquery
@@ -148,6 +148,8 @@ decorator==5.1.1
# via ipython
defusedxml==0.7.1
# via nbconvert
deltalake==0.16.4
# via feast (setup.py)
dill==0.3.8
# via feast (setup.py)
distlib==0.3.8
@@ -158,15 +160,15 @@ docker==7.0.0
# testcontainers
docutils==0.19
# via sphinx
duckdb==0.10.1
duckdb==0.10.2
# via
# duckdb-engine
# ibis-framework
duckdb-engine==0.11.5
# via ibis-framework
entrypoints==0.4
# via altair
exceptiongroup==1.2.0
exceptiongroup==1.2.1
# via
# anyio
# ipython
@@ -175,7 +177,7 @@ execnet==2.1.1
# via pytest-xdist
executing==2.0.1
# via stack-data
fastapi==0.110.1
fastapi==0.110.2
# via feast (setup.py)
fastjsonschema==2.19.1
# via nbformat
@@ -263,7 +265,7 @@ greenlet==3.0.3
# via sqlalchemy
grpc-google-iam-v1==0.13.0
# via google-cloud-bigtable
grpcio==1.62.1
grpcio==1.62.2
# via
# feast (setup.py)
# google-api-core
@@ -275,15 +277,15 @@ grpcio==1.62.1
# grpcio-status
# grpcio-testing
# grpcio-tools
grpcio-health-checking==1.62.1
grpcio-health-checking==1.62.2
# via feast (setup.py)
grpcio-reflection==1.62.1
grpcio-reflection==1.62.2
# via feast (setup.py)
grpcio-status==1.62.1
grpcio-status==1.62.2
# via google-api-core
grpcio-testing==1.62.1
grpcio-testing==1.62.2
# via feast (setup.py)
grpcio-tools==1.62.1
grpcio-tools==1.62.2
# via feast (setup.py)
gunicorn==22.0.0 ; platform_system != "Windows"
# via feast (setup.py)
@@ -482,13 +484,13 @@ nest-asyncio==1.6.0
# via ipykernel
nodeenv==1.8.0
# via pre-commit
notebook==7.1.2
notebook==7.1.3
# via great-expectations
notebook-shim==0.2.4
# via
# jupyterlab
# notebook
numpy==1.24.4
numpy==1.26.4
# via
# altair
# dask
@@ -615,12 +617,15 @@ pyarrow==15.0.2
# via
# dask-expr
# db-dtypes
# deltalake
# feast (setup.py)
# google-cloud-bigquery
# ibis-framework
# snowflake-connector-python
pyarrow-hotfix==0.6
# via ibis-framework
# via
# deltalake
# ibis-framework
pyasn1==0.6.0
# via
# pyasn1-modules
@@ -692,7 +697,7 @@ pytest-ordering==0.6
# via feast (setup.py)
pytest-timeout==1.4.2
# via feast (setup.py)
pytest-xdist==3.5.0
pytest-xdist==3.6.0
# via feast (setup.py)
python-dateutil==2.9.0.post0
# via
@@ -728,7 +733,7 @@ pyyaml==6.0.1
# pre-commit
# responses
# uvicorn
pyzmq==26.0.0
pyzmq==26.0.2
# via
# ipykernel
# jupyter-client
@@ -785,7 +790,7 @@ rsa==4.9
# via google-auth
ruamel-yaml==0.17.17
# via great-expectations
ruff==0.3.7
ruff==0.4.1
# via feast (setup.py)
s3transfer==0.10.1
# via boto3
@@ -812,7 +817,7 @@ sniffio==1.3.1
# httpx
snowballstemmer==2.2.0
# via sphinx
snowflake-connector-python[pandas]==3.8.1
snowflake-connector-python[pandas]==3.9.0
# via feast (setup.py)
sortedcontainers==2.4.0
# via snowflake-connector-python
@@ -895,7 +900,7 @@ tqdm==4.66.2
# via
# feast (setup.py)
# great-expectations
traitlets==5.14.2
traitlets==5.14.3
# via
# comm
# ipykernel