Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve speed of TRCReader #1893

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 37 additions & 46 deletions can/io/trc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import os
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, Generator, List, Optional, TextIO, Union
from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, Union

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import (
TextIOMessageReader,
TextIOMessageWriter,
)
from ..util import channel2int, len2dlc
from .generic import TextIOMessageReader, TextIOMessageWriter

logger = logging.getLogger("can.io.trc")

Expand Down Expand Up @@ -58,13 +55,22 @@ def __init__(
"""
super().__init__(file, mode="r")
self.file_version = TRCFileVersion.UNKNOWN
self.start_time: Optional[datetime] = None
self._start_time: float = 0
self.columns: Dict[str, int] = {}
self._num_columns = -1

if not self.file:
raise ValueError("The given file cannot be None")

self._parse_cols: Callable[[List[str]], Optional[Message]] = lambda x: None
self._parse_cols: Callable[[Tuple[str, ...]], Optional[Message]] = (
lambda x: None
)

@property
def start_time(self) -> Optional[datetime]:
if self._start_time:
return datetime.fromtimestamp(self._start_time, timezone.utc)
return None

def _extract_header(self):
line = ""
Expand All @@ -89,16 +95,18 @@ def _extract_header(self):
elif line.startswith(";$STARTTIME"):
logger.debug("TRCReader: Found start time '%s'", line)
try:
self.start_time = datetime(
1899, 12, 30, tzinfo=timezone.utc
) + timedelta(days=float(line.split("=")[1]))
self._start_time = (
datetime(1899, 12, 30, tzinfo=timezone.utc)
+ timedelta(days=float(line.split("=")[1]))
).timestamp()
except IndexError:
logger.debug("TRCReader: Failed to parse start time")
elif line.startswith(";$COLUMNS"):
logger.debug("TRCReader: Found columns '%s'", line)
try:
columns = line.split("=")[1].split(",")
self.columns = {column: columns.index(column) for column in columns}
self._num_columns = len(columns) - 1
except IndexError:
logger.debug("TRCReader: Failed to parse columns")
elif line.startswith(";"):
Expand All @@ -107,7 +115,7 @@ def _extract_header(self):
break

if self.file_version >= TRCFileVersion.V1_1:
if self.start_time is None:
if self._start_time is None:
raise ValueError("File has no start time information")

if self.file_version >= TRCFileVersion.V2_0:
Expand All @@ -132,7 +140,7 @@ def _extract_header(self):

return line

def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[2]
if arbit_id == "FFFFFFFF":
logger.info("TRCReader: Dropping bus info line")
Expand All @@ -147,16 +155,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)])
return msg

def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[3]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self._start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = 1
Expand All @@ -165,16 +168,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[2] == "Rx"
return msg

def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[4]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self._start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = int(cols[2])
Expand All @@ -183,7 +181,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[3] == "Rx"
return msg

def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
type_ = cols[self.columns["T"]]
bus = self.columns.get("B", None)

Expand All @@ -192,50 +190,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
dlc = len2dlc(length)
elif "L" in self.columns:
dlc = int(cols[self.columns["L"]])
length = dlc2len(dlc)
else:
raise ValueError("No length/dlc columns present.")

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[self.columns["O"]]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[self.columns["O"]]) / 1000 + self._start_time
msg.arbitration_id = int(cols[self.columns["I"]], 16)
msg.is_extended_id = len(cols[self.columns["I"]]) > 4
msg.channel = int(cols[bus]) if bus is not None else 1
msg.dlc = dlc
msg.data = bytearray(
[int(cols[i + self.columns["D"]], 16) for i in range(length)]
)
if dlc:
msg.data = bytearray.fromhex(cols[self.columns["D"]])
msg.is_rx = cols[self.columns["d"]] == "Rx"
msg.is_fd = type_ in ["FD", "FB", "FE", "BI"]
msg.bitrate_switch = type_ in ["FB", " FE"]
msg.error_state_indicator = type_ in ["FE", "BI"]
msg.is_fd = type_ in {"FD", "FB", "FE", "BI"}
msg.bitrate_switch = type_ in {"FB", "FE"}
msg.error_state_indicator = type_ in {"FE", "BI"}

return msg

def _parse_cols_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[2]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_1(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[3]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_3(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[self.columns["T"]]
if dtype in ["DT", "FD", "FB"]:
if dtype in {"DT", "FD", "FB", "FE", "BI"}:
return self._parse_msg_v2_x(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
Expand All @@ -244,7 +235,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_line(self, line: str) -> Optional[Message]:
logger.debug("TRCReader: Parse '%s'", line)
try:
cols = line.split()
cols = tuple(line.split(maxsplit=self._num_columns))
zariiii9003 marked this conversation as resolved.
Show resolved Hide resolved
return self._parse_cols(cols)
except IndexError:
logger.warning("TRCReader: Failed to parse message '%s'", line)
Expand Down
Loading