Skip to content

Commit

Permalink
fix: skip infinite values during serialization (#99)
Browse files Browse the repository at this point in the history
* fix: skip infinite values during serialization to line protocol

* fix: deprecated method call

* fix: use apply() with lambda instead of replace() for future Pandas 3.x compatibility

* refactor: backport tests from v2 client

---------

Co-authored-by: Jakub Bednar <[email protected]>
  • Loading branch information
alespour and bednar authored Jul 8, 2024
1 parent 1fe1460 commit 66776f1
Show file tree
Hide file tree
Showing 4 changed files with 468 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

1. [#95](https://github.com/InfluxCommunity/influxdb3-python/pull/95): `Polars` is optional dependency
1. [#99](https://github.com/InfluxCommunity/influxdb3-python/pull/99): Skip infinite values during serialization to line protocol

## 0.6.1 [2024-06-25]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
keys = []
# tags holds a list of tag f-string segments ordered alphabetically by tag key.
tags = []
# fields holds a list of field f-string segments ordered alphebetically by field key
# fields holds a list of field f-string segments ordered alphabetically by field key
fields = []
# field_indexes holds the index into each row of all the fields.
field_indexes = []
Expand Down Expand Up @@ -160,6 +160,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# null_columns has a bool value for each column holding
# whether that column contains any null (NaN or None) values.
null_columns = data_frame.isnull().any()

# inf_columns has a bool value for each column holding
# whether that column contains any Inf values.
inf_columns = data_frame.isin([np.inf, -np.inf]).any()

timestamp_index = 0

# Iterate through the columns building up the expression for each column.
Expand All @@ -175,9 +180,10 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

if key in data_frame_tag_columns:
# This column is a tag column.
if null_columns.iloc[index]:
if null_columns.iloc[index] or inf_columns.iloc[index]:
key_value = f"""{{
'' if {val_format} == '' or pd.isna({val_format}) else
'' if {val_format} == '' or pd.isna({val_format}) or
({inf_columns.iloc[index]} and np.isinf({val_format})) else
f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
}}"""
else:
Expand All @@ -199,16 +205,17 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
if (issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or
issubclass(value.type, np.bool_)):
suffix = 'i' if issubclass(value.type, np.integer) else ''
if null_columns.iloc[index]:
if null_columns.iloc[index] or inf_columns.iloc[index]:
field_value = (
f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
f"""{{"" if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
)
else:
field_value = f'{sep}{key_format}={{{val_format}}}{suffix}'
else:
if null_columns.iloc[index]:
if null_columns.iloc[index] or inf_columns.iloc[index]:
field_value = f"""{{
'' if pd.isna({val_format}) else
'' if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
}}"""
else:
Expand All @@ -234,11 +241,12 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
'_ESCAPE_STRING': _ESCAPE_STRING,
'keys': keys,
'pd': pd,
'np': np,
})

for k, v in dict(data_frame.dtypes).items():
if k in data_frame_tag_columns:
data_frame[k].replace('', np.nan, inplace=True)
data_frame[k] = data_frame[k].apply(lambda x: np.nan if x == '' else x)

self.data_frame = data_frame
self.f = f
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def serialize(self, chunk_idx: int = None):
chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size]

# Apply the UDF to each row
line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object)
line_protocol_expr = chunk.map_rows(self.to_line_protocol, return_dtype=pl.Object)

lp = line_protocol_expr['map'].to_list()

Expand Down
Loading

0 comments on commit 66776f1

Please sign in to comment.