Skip to content

Commit

Permalink
HOTFIX: Polars dataframe unix timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Clifford committed Jan 25, 2024
1 parent f027046 commit 6ceac2c
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions influxdb_client_3/write_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,16 @@ def to_line_protocol(self, row):
# add escape symbols for special characters to tags

fields = ",".join(
f"{col}=\"{row[self.column_indices[col]]}\"" if isinstance(row[self.column_indices[col]], str)
f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]], str)
else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]], bool) # Check for bool first
else f"{col}={row[self.column_indices[col]]}i" if isinstance(row[self.column_indices[col]], int)
else f"{col}={row[self.column_indices[col]]}"
for col in self.column_indices
if col not in self.tag_columns + [self.timestamp_column]
and row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
)


# Access the Unix timestamp
timestamp = row[self.column_indices[self.timestamp_column]]
if tags != "":
Expand All @@ -375,19 +377,22 @@ def serialize(self, chunk_idx: int = None):

df = self.data_frame

# Convert timestamp to unix timestamp
if self.precision is None:
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column))
elif self.precision == 'ns':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column))
elif self.precision == 'us':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column))
elif self.precision == 'ms':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column))
elif self.precision == 's':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column))
# Check if the timestamp column is already an integer
if df[self.timestamp_column].dtype in [pl.Int32, pl.Int64]:
# The timestamp column is already an integer, assuming it's in Unix format
pass
else:
raise ValueError(f"Unsupported precision: {self.precision}")
# Convert timestamp to Unix timestamp based on specified precision
if self.precision in [None, 'ns']:
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column))
elif self.precision == 'us':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column))
elif self.precision == 'ms':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column))
elif self.precision == 's':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column))
else:
raise ValueError(f"Unsupported precision: {self.precision}")

if chunk_idx is None:
chunk = df
Expand Down

0 comments on commit 6ceac2c

Please sign in to comment.