Skip to content

Commit

Permalink
Updated InfluxDB write options to reduce jitter_interval and client handling.
Browse files Browse the repository at this point in the history
When resampling is enabled, the code now restructures each log file prior to writing and writes the entire resampled log file to InfluxDB in one go, instead of signal-by-signal.
  • Loading branch information
MatinF committed Jan 1, 2023
1 parent f25ed4b commit 5a6cc06
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ build.bat
*j1939-engine.dbc
*j1939-speed.dbc
*test_new.py
*env/*
*env/*
*_test*
2 changes: 1 addition & 1 deletion inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
dbc_paths = ["dbc_files/canmod-gps.dbc"]
signals = []

# specify resampling frequency ("": no resampling)
# specify resampling frequency. Setting this to "" means no resampling (much slower)
res = "5S"

# -----------------------------------------------
Expand Down
7 changes: 6 additions & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ def restructure_data(df_phys, res, full_col_names=False, pgn_names=False):
from J1939_PGN import J1939_PGN

df_phys_join = pd.DataFrame({"TimeStamp": []})

if res == "":
print("Warning: You must set a resampling frequency (e.g. 5S)")
return df_phys_join

if not df_phys.empty:
for message, df_phys_message in df_phys.groupby("CAN ID"):
for signal, data in df_phys_message.groupby("Signal"):
Expand All @@ -95,7 +100,7 @@ def restructure_data(df_phys, res, full_col_names=False, pgn_names=False):

df_phys_join = pd.merge_ordered(
df_phys_join,
data["Physical Value"].rename(col_name).resample(res).pad().dropna(),
data["Physical Value"].rename(col_name).resample(res).ffill().dropna(),
on="TimeStamp",
fill_method="none",
).set_index("TimeStamp")
Expand Down
45 changes: 26 additions & 19 deletions utils_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,26 @@ def write_signals(self, device_id, df_phys):
"""
tag_columns = []

if not df_phys.empty:
for signal, group in df_phys.groupby("Signal")["Physical Value"]:
df_signal = group.to_frame().rename(columns={"Physical Value": signal})
if df_phys.empty:
print("Warning: Dataframe is empty, no data written")
return
else:
if self.res != "":
self.write_influx(device_id, df_phys, [])

if self.res != "":
df_signal = df_signal.resample(self.res).ffill().dropna()
else:
for signal, group in df_phys.groupby("Signal")["Physical Value"]:
df_signal = group.to_frame().rename(columns={"Physical Value": signal})

if self.verbose:
print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})")
if self.res != "":
df_signal = df_signal.resample(self.res).ffill().dropna()

if self.verbose:
print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})")

# tag_columns, df_signal = self.add_signal_tags(df_signal)
# tag_columns, df_signal = self.add_signal_tags(df_signal)

self.write_influx(device_id, df_signal, tag_columns)
self.write_influx(device_id, df_signal, tag_columns)

def write_influx(self, name, df, tag_columns):
"""Helper function to write signal dataframes to InfluxDB"""
Expand All @@ -94,16 +101,16 @@ def write_influx(self, name, df, tag_columns):
print("Please check your InfluxDB credentials")
return

_write_client = self.client.write_api(
write_options=WriteOptions(
batch_size=5000,
flush_interval=1_000,
jitter_interval=2_000,
retry_interval=5_000,
)
)

_write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name, data_frame_tag_columns=tag_columns)
with self.client.write_api(
write_options=WriteOptions(
batch_size=20_000,
flush_interval=1_000,
jitter_interval=0,
retry_interval=5_000,
)
) as _write_client:
_write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name,
data_frame_tag_columns=tag_columns)

if self.verbose:
print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n")
Expand Down

0 comments on commit 5a6cc06

Please sign in to comment.