-
Notifications
You must be signed in to change notification settings - Fork 0
/
MTK_DataHandler.py
193 lines (150 loc) · 7.63 KB
/
MTK_DataHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import zipfile
import numpy as np
import matplotlib.pyplot as plt
import os
class FlightDataHandler:
def __init__(self, launch_date = None, board_version = None) -> None:
self.file_path = None
self.launch_date = launch_date
self.board_version = board_version
self.packet_data = None
self.state_data = None
self.state_activations = {}
self.state_column_to_index = {'arming_time':2,
'apogee_time':7, 'drogue_time': 8, 'main_time': 9,
'sus_time':10}
self.config = None
self.column_to_index = {'magic':0, 'status':1, 'time_us':2, 'main_voltage_v':3, 'pyro_voltage_v':4, 'numSatellites':5, 'gpsFixType':6, 'latitude_degrees':7,
'longitude_degrees':8, 'gps_hMSL_m':9, 'barometer_hMSL_m':10, 'temperature_c':11, 'acceleration_x_mss':12, 'acceleration_y_mss':13, 'acceleration_z_mss':14,
'angular_velocity_x_rads':15, 'angular_velocity_y_rads':16, 'angular_velocity_z_rads':17, 'gauss_x':18, 'gauss_y':19, 'gauss_z':20,
'kf_acceleration_mss':21, 'kf_velocity_ms':22, 'kf_position_m':23, 'w':24, 'x':25, 'y':26, 'z':27, 'checksum':28}
def update_column_to_index(self,updated):
self.column_to_index = updated
def update_board_version(self,board_version):
self.board_version = board_version
def update_launch_data(self, launch_date):
self.launch_date = launch_date
def from_np(self, file_path, launch_date=None, board_version=None, packet_data=True):
# Check if the file exists
if not os.path.isfile(file_path):
raise FileNotFoundError(f"Error: The file '{file_path}' does not exist.")
self.launch_date = launch_date if launch_date is not None else print("Warning. No launch date provided.")
self.board_version = board_version if board_version is not None else print("Warning. No board version provided.")
try:
# Load packet or state data from the .npy file
if packet_data:
self.packet_data = np.load(file_path)
else:
self.state_data = np.load(file_path)
except Exception as e:
raise IOError(f"Error loading data from file '{file_path}': {e}")
def from_zip(self, file_path, launch_date=None, board_version=None, csv=False):
self.launch_date = launch_date if launch_date is not None else print("Warning. No launch date provided.")
self.board_version = board_version if board_version is not None else print("Warning. No board version provided.")
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
packet_file = "packet_data.csv" if csv else "packet_data.npy"
state_file = "state_data.csv" if csv else "state_data.npy"
if packet_file in file_list:
with zip_ref.open(packet_file) as data_file:
if csv:
self.packet_data = np.loadtxt(data_file, delimiter=',', skiprows=1)
else:
self.packet_data = np.load(data_file)
else:
raise FileNotFoundError(f"Error: '{packet_file}' is missing in the ZIP file.")
# Check and load 'state_data'
if state_file in file_list:
with zip_ref.open(state_file) as data_file:
if csv:
self.state_data = np.loadtxt(data_file, delimiter=',', skiprows=1)
else:
self.state_data = np.load(data_file)
else:
raise FileNotFoundError(f"Error: '{state_file}' is missing in the ZIP file.")
# Check and load '.config' file
config_file_name = next((f for f in file_list if f.endswith('.config')), None)
if config_file_name:
with zip_ref.open(config_file_name) as config_file:
self.config = config_file.read().decode('utf-8')
else:
raise FileNotFoundError("Error: '.config' file is missing in the ZIP file.")
except zipfile.BadZipFile:
print("Error: The provided file is not a valid ZIP file.")
except FileNotFoundError as e:
print(e)
def plot_column(self, column_name, title=None, start=None, end=None):
# Check if packet_data is loaded
if self.packet_data is None:
print("No data loaded.")
return
# Check if the column name exists
if column_name not in self.column_to_index:
print(f"Column: '{column_name}' not found in data.")
return
# Obtaine the index and intialize start and end plotting index if not specified
index = self.column_to_index[column_name]
if start is None:
start = 0
if end is None:
end = self.packet_data.shape[0]
# Obtain indexed data
column_data = self.packet_data[start:end, index]
plt.plot(range(start, end), column_data, label=column_name)
plt.title(title or f"{column_name} over Time")
plt.xlabel("Index")
plt.ylabel(column_name)
plt.legend()
plt.grid(True) # Add grid for better visualization
plt.show()
def display_metadata(self):
print(f"Launch Date: {self.launch_date}")
print(f"Board Version: {self.board_version}")
def find_stages(self):
#populates state_activations with column_name value pairings
if self.state_data is None:
print("No state data loaded.")
return
for stage, col in self.state_column_to_index.items():
column = self.state_data[:, col]
column = (column > 0.5).astype(int)
activation = np.where(np.diff(column) ==1)[0]
if len(activation) > 0:
self.state_activations[stage] = activation[0]
else:
self.state_activations[stage] = None
print(stage, activation[0])
def plot_column_with_states(self, column_name, title=None, start=None, end=None):
# Check if packet_data is loaded
if self.packet_data is None:
print("No data loaded.")
return
# Check if the column name exists
if column_name not in self.column_to_index:
print(f"Column: '{column_name}' not found in data.")
return
# Obtaine the index and intialize start and end plotting index if not specified
index = self.column_to_index[column_name]
if start is None:
start = 0
if end is None:
end = self.packet_data.shape[0]
# Obtain indexed data
column_data = self.packet_data[start:end, index]
#staging additions
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', '#FF5733']
if not self.state_activations:
print("No state data loaded/parsed.")
return
for i, (stage_name, stage_time) in enumerate(self.state_activations.items()):
if start <= stage_time < end:
color = colors[i % len(colors)]
plt.axvline(x=stage_time, color=color, linestyle='--', label=stage_name)
plt.plot(range(start, end), column_data, label=column_name)
plt.title(title or f"{column_name} over Time")
plt.xlabel("Index")
plt.ylabel(column_name)
plt.legend()
plt.grid(True) # Add grid for better visualization
plt.show()