Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tests #62

Merged
merged 14 commits into from
Oct 27, 2024
4 changes: 2 additions & 2 deletions airbrakes/data_handling/apogee_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ApogeePredictor:

:param: state: airbrakes state class
:param: data_processor: IMUDataProcessor class
:param: data_points: A sequence of EstimatedDataPacket objects to process.
:param: data_packets: A sequence of EstimatedDataPacket objects to process.
"""

__slots__ = (
Expand Down Expand Up @@ -178,7 +178,7 @@ def _prediction_loop(self) -> None:

for data_packet in data_packets:
self._accelerations.append(data_packet.vertical_acceleration)
self._time_differences.append(data_packet.time_since_last_data_point)
self._time_differences.append(data_packet.time_since_last_data_packet)
self._current_altitude = data_packet.current_altitude
self._current_velocity = data_packet.vertical_velocity

Expand Down
159 changes: 84 additions & 75 deletions airbrakes/data_handling/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ class IMUDataProcessor:
__slots__ = (
"_current_altitudes",
"_current_orientation_quaternions",
"_data_points",
"_first_data_point",
"_data_packets",
"_gravity_axis_index",
"_gravity_direction",
"_gravity_orientation",
"_gravity_upwards_index",
"_initial_altitude",
"_last_data_point",
"_last_data_packet",
"_max_altitude",
"_max_vertical_velocity",
"_previous_vertical_velocity",
"_rotated_accelerations",
"_time_differences",
"_vertical_velocities",
"upside_down",
)

def __init__(self):
Expand All @@ -51,14 +49,13 @@ def __init__(self):
self._previous_vertical_velocity: np.float64 = np.float64(0.0)
self._initial_altitude: np.float64 | None = None
self._current_altitudes: npt.NDArray[np.float64] = np.array([0.0], dtype=np.float64)
self._last_data_point: EstimatedDataPacket | None = None
self._first_data_point: EstimatedDataPacket | None = None
self._last_data_packet: EstimatedDataPacket | None = None
self._current_orientation_quaternions: npt.NDArray[np.float64] | None = None
self._rotated_accelerations: list[npt.NDArray[np.float64]] = [np.array([0.0]), np.array([0.0]), np.array([0.0])]
self._data_points: list[EstimatedDataPacket] = []
self._data_packets: list[EstimatedDataPacket] = []
self._time_differences: npt.NDArray[np.float64] = np.array([0.0])
self._gravity_orientation: npt.NDArray[np.float64] | None = None
self._gravity_upwards_index: int | None = 0
self._gravity_axis_index: int | None = 0
self._gravity_direction: np.float64 | None = None

def __str__(self) -> str:
Expand Down Expand Up @@ -93,56 +90,22 @@ def max_vertical_velocity(self) -> float:
"""The maximum vertical velocity the rocket has attained during the flight, in m/s."""
return float(self._max_vertical_velocity)

def update(self, data_points: list[EstimatedDataPacket]) -> None:
def update(self, data_packets: list[EstimatedDataPacket]) -> None:
"""
Updates the data points to process. This will recompute all information such as altitude,
velocity, etc.
:param data_points: A list of EstimatedDataPacket objects to process
:param data_packets: A list of EstimatedDataPacket objects to process
"""
# If the data points are empty, we don't want to try to process anything
if not data_points:
if not data_packets:
return

self._data_points = data_points
self._data_packets = data_packets

# If we don't have a last data point, we can't calculate the time differences needed
# for velocity calculation:
if self._last_data_point is None:
# setting last data point as the first element, makes it so that the time diff
# automatically becomes 0, and the velocity becomes 0
self._last_data_point = self._data_points[0]
# This is us getting the rocket's initial orientation

self._current_orientation_quaternions = np.array(
[
self._last_data_point.estOrientQuaternionW,
self._last_data_point.estOrientQuaternionX,
self._last_data_point.estOrientQuaternionY,
self._last_data_point.estOrientQuaternionZ,
]
)

# We also get the initial gravity vector to determine which direction is up
# Important to note that when the compensated acceleration reads -9.8 when on the
# ground, the upwards direction of the gravity vector will be positive, not negative
gravity_orientation = np.array(
[
self._last_data_point.estGravityVectorX,
self._last_data_point.estGravityVectorY,
self._last_data_point.estGravityVectorZ,
]
)

# Gets the index for the direction (x, y, or z) that is pointing upwards
self._gravity_upwards_index = np.argmax(np.abs(gravity_orientation))

# on the physical IMU there is a depiction of the orientation. If a negative direction is
# pointing to the sky, by convention, we define the gravity direction as negative. Otherwise if
# a positive direction is pointing to the sky, we define the gravity direction as positive.
# For purposes of standardizing the sign of accelerations that come out of calculate_rotated_accelerations,
# we define acceleration from motor burn as positve, acceleration due to drag as negative, and
# acceleration on the ground to be +9.8.
self._gravity_direction = 1 if gravity_orientation[self._gravity_upwards_index] < 0 else -1
if self._last_data_packet is None:
self._first_update()

self._time_differences = self._calculate_time_differences()

Expand All @@ -154,7 +117,7 @@ def update(self, data_points: list[EstimatedDataPacket]) -> None:
self._max_altitude = max(self._current_altitudes.max(), self._max_altitude)

# Store the last data point for the next update
self._last_data_point = data_points[-1]
self._last_data_packet = data_packets[-1]

def get_processed_data_packets(self) -> deque[ProcessedDataPacket]:
"""
Expand All @@ -169,24 +132,26 @@ def get_processed_data_packets(self) -> deque[ProcessedDataPacket]:
current_altitude=current_alt,
vertical_velocity=vertical_velocity,
vertical_acceleration=vertical_acceleration,
time_since_last_data_point=time_since_last_data_point,
time_since_last_data_packet=time_since_last_data_packet,
)
for (
current_alt,
vertical_velocity,
vertical_acceleration,
time_since_last_data_point,
time_since_last_data_packet,
) in zip(
self._current_altitudes,
self._vertical_velocities,
self._rotated_accelerations[self._gravity_upwards_index],
self._rotated_accelerations[self._gravity_axis_index],
self._time_differences,
strict=True,
)
)

@staticmethod
def _multiply_quaternions(first_quaternion: npt.NDArray[np.float64], second_quaternion: npt.NDArray[np.float64]):
def _multiply_quaternions(
first_quaternion: npt.NDArray[np.float64], second_quaternion: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
"""
Calculates the quaternion multiplication. quaternion multiplication is not commutative, e.g. q1q2 =/= q2q1
:param first_quaternion: numpy array with the first quaternion in row form
Expand All @@ -203,7 +168,7 @@ def _multiply_quaternions(first_quaternion: npt.NDArray[np.float64], second_quat
return np.array([w, x, y, z])

@staticmethod
def _calculate_quaternion_conjugate(quaternion: npt.NDArray[np.float64]):
def _calculate_quaternion_conjugate(quaternion: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""
Calculates the conjugate of a quaternion
:param quaternion: numpy array with a quaternion in row form
Expand All @@ -212,16 +177,60 @@ def _calculate_quaternion_conjugate(quaternion: npt.NDArray[np.float64]):
w, x, y, z = quaternion
return np.array([w, -x, -y, -z])

def _first_update(self) -> None:
"""
Sets up the initial values for the data processor. This includes setting the initial
altitude, the initial orientation of the rocket, and the initial gravity vector. This should
only be called once, when the first data packets are passed in.
"""
# Setting last data point as the first element, makes it so that the time diff
# automatically becomes 0, and the velocity becomes 0
self._last_data_packet = self._data_packets[0]

# This is us getting the rocket's initial altitude from the mean of the first data packets
self._initial_altitude = np.mean(
np.array([data_packet.estPressureAlt for data_packet in self._data_packets], dtype=np.float64)
)

# This is us getting the rocket's initial orientation
self._current_orientation_quaternions = np.array(
[
self._last_data_packet.estOrientQuaternionW,
self._last_data_packet.estOrientQuaternionX,
self._last_data_packet.estOrientQuaternionY,
self._last_data_packet.estOrientQuaternionZ,
]
)

# We also get the initial gravity vector to determine which direction is up
# Important to note that when the compensated acceleration reads -9.8 when on the
# ground, the upwards direction of the gravity vector will be positive, not negative
gravity_orientation = np.array(
[
self._last_data_packet.estGravityVectorX,
self._last_data_packet.estGravityVectorY,
self._last_data_packet.estGravityVectorZ,
]
)

# Gets the index for the direction (x, y, or z) that is pointing upwards
self._gravity_axis_index = np.argmax(np.abs(gravity_orientation))

# on the physical IMU there is a depiction of the orientation. If a negative direction is
# pointing to the sky, by convention, we define the gravity direction as negative. Otherwise, if
# a positive direction is pointing to the sky, we define the gravity direction as positive.
# For purposes of standardizing the sign of accelerations that come out of calculate_rotated_accelerations,
# we define acceleration from motor burn as positive, acceleration due to drag as negative, and
# acceleration on the ground to be +9.8.
self._gravity_direction = 1 if gravity_orientation[self._gravity_axis_index] < 0 else -1

def _calculate_current_altitudes(self) -> npt.NDArray[np.float64]:
"""
Calculates the current altitudes, by zeroing out the initial altitude.
:return: A numpy array of the current altitudes of the rocket at each data point
"""
# Get the pressure altitudes from the data points
altitudes = np.array([data_point.estPressureAlt for data_point in self._data_points], dtype=np.float64)
# Zero the altitude only once, during the first update:
if self._initial_altitude is None:
self._initial_altitude = np.mean(altitudes)
altitudes = np.array([data_packet.estPressureAlt for data_packet in self._data_packets], dtype=np.float64)
# Zero out the initial altitude
return altitudes - self._initial_altitude

Expand All @@ -235,26 +244,26 @@ def _calculate_rotated_accelerations(self) -> list[npt.NDArray[np.float64]]:
"""

# We pre-allocate the space for our accelerations first
len_data_points = len(self._data_points)
len_data_packets = len(self._data_packets)
rotated_accelerations = [
np.zeros(len_data_points),
np.zeros(len_data_points),
np.zeros(len_data_points),
np.zeros(len_data_packets),
np.zeros(len_data_packets),
np.zeros(len_data_packets),
]

# Iterates through the data points and time differences between the data points
for idx, (data_point, dt) in enumerate(zip(self._data_points, self._time_differences, strict=True)):
for idx, (data_packet, dt) in enumerate(zip(self._data_packets, self._time_differences, strict=True)):
# Accelerations are in m/s^2
x_accel = data_point.estCompensatedAccelX
y_accel = data_point.estCompensatedAccelY
z_accel = data_point.estCompensatedAccelZ
x_accel = data_packet.estCompensatedAccelX
y_accel = data_packet.estCompensatedAccelY
z_accel = data_packet.estCompensatedAccelZ
# Angular rates are in rads/s
gyro_x = data_point.estAngularRateX
gyro_y = data_point.estAngularRateY
gyro_z = data_point.estAngularRateZ
gyro_x = data_packet.estAngularRateX
gyro_y = data_packet.estAngularRateY
gyro_z = data_packet.estAngularRateZ

# If we are missing the data points, then say we didn't rotate
if not any([x_accel, y_accel, z_accel, gyro_x, gyro_y, gyro_z]):
# If we are missing the data points, then say accelerations are zero
if any(val is None for val in [x_accel, y_accel, z_accel, gyro_x, gyro_y, gyro_z]):
return rotated_accelerations

# rotation matrix for rate of change quaternion, with epsilon and K used to drive the norm to 1
Expand Down Expand Up @@ -301,7 +310,7 @@ def _calculate_vertical_velocity(self) -> npt.NDArray[np.float64]:
vertical_accelerations = np.array(
[
deadband(vertical_acceleration - GRAVITY, ACCELERATION_NOISE_THRESHOLD)
for vertical_acceleration in self._rotated_accelerations[self._gravity_upwards_index]
for vertical_acceleration in self._rotated_accelerations[self._gravity_axis_index]
]
)

Expand All @@ -318,11 +327,11 @@ def _calculate_vertical_velocity(self) -> npt.NDArray[np.float64]:
def _calculate_time_differences(self) -> npt.NDArray[np.float64]:
"""
Calculates the time difference between each data point and the previous data point. This cannot
be called on the first update as _last_data_point is None.
be called on the first update as _last_data_packet is None.
:return: A numpy array of the time difference between each data point and the previous data point.
"""
# calculate the time differences between each data point
# We are converting from ns to s, since we don't want to have a velocity in m/ns^2
# We are using the last data point to calculate the time difference between the last data point from the
# previous loop, and the first data point from the current loop
return np.diff([data_point.timestamp for data_point in [self._last_data_point, *self._data_points]]) * 1e-9
return np.diff([data_packet.timestamp for data_packet in [self._last_data_packet, *self._data_packets]]) * 1e-9
2 changes: 1 addition & 1 deletion airbrakes/data_handling/processed_data_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ class ProcessedDataPacket(msgspec.Struct):
current_altitude: np.float64 # This is the zeroed-out altitude of the rocket.
vertical_velocity: np.float64 # This is the velocity of the rocket, in the upward axis (whichever way is up)
vertical_acceleration: np.float64 # This is the rotated compensated acceleration of the vertical axis
time_since_last_data_point: np.float64 # dt is the time difference between the current and previous data point
time_since_last_data_packet: np.float64 # dt is the time difference between the current and previous data point
2 changes: 1 addition & 1 deletion tests/test_airbrakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def log(self, state, extension, imu_data_packets, processed_data_packets):

assert mocked_airbrakes.imu._data_queue.qsize() > 0
assert mocked_airbrakes.state.name == "StandByState"
assert mocked_airbrakes.data_processor._last_data_point is None
assert mocked_airbrakes.data_processor._last_data_packet is None

monkeypatch.setattr(data_processor.__class__, "update", update)
monkeypatch.setattr(mocked_airbrakes.state.__class__, "update", state)
Expand Down
Loading
Loading