From 6ceac2c0b3a5dad4ee8f7219128205e39b53489a Mon Sep 17 00:00:00 2001 From: Jay Clifford Date: Thu, 25 Jan 2024 13:18:19 +0000 Subject: [PATCH] HOTFIX: Polars dataframe unix timestamp --- .../client/write/dataframe_serializer.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/influxdb_client_3/write_client/client/write/dataframe_serializer.py b/influxdb_client_3/write_client/client/write/dataframe_serializer.py index ced5337..22b0ad0 100644 --- a/influxdb_client_3/write_client/client/write/dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/dataframe_serializer.py @@ -352,7 +352,8 @@ def to_line_protocol(self, row): # add escape symbols for special characters to tags fields = ",".join( - f"{col}=\"{row[self.column_indices[col]]}\"" if isinstance(row[self.column_indices[col]], str) + f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]], str) + else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]], bool) # Check for bool first else f"{col}={row[self.column_indices[col]]}i" if isinstance(row[self.column_indices[col]], int) else f"{col}={row[self.column_indices[col]]}" for col in self.column_indices @@ -360,6 +361,7 @@ def to_line_protocol(self, row): and row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" ) + # Access the Unix timestamp timestamp = row[self.column_indices[self.timestamp_column]] if tags != "": @@ -375,19 +377,22 @@ def serialize(self, chunk_idx: int = None): df = self.data_frame - # Convert timestamp to unix timestamp - if self.precision is None: - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column)) - elif self.precision == 'ns': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column)) - elif self.precision == 'us': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column)) - elif self.precision == 'ms': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column)) - elif self.precision == 's': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column)) + # Check if the timestamp column is already an integer + if df[self.timestamp_column].dtype in [pl.Int32, pl.Int64]: + # The timestamp column is already an integer, assuming it's in Unix format + pass else: - raise ValueError(f"Unsupported precision: {self.precision}") + # Convert timestamp to Unix timestamp based on specified precision + if self.precision in [None, 'ns']: + df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column)) + elif self.precision == 'us': + df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column)) + elif self.precision == 'ms': + df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column)) + elif self.precision == 's': + df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column)) + else: + raise ValueError(f"Unsupported precision: {self.precision}") if chunk_idx is None: chunk = df