99 lines
3.4 KiB
Python
99 lines
3.4 KiB
Python
|
|
packet_definitions = """packet_rtc 1 28 uint32_t t 0 4 RTC_TimeTypeDef sTime 4 20 RTC_DateTypeDef sDate 24 4
|
|
packet_vbatt 2 8 uint32_t t 0 4 uint16_t vbatt_cnts 4 2
|
|
packet_imu 3 148 uint32_t t 0 4 uint8_t data[141] 4 1
|
|
packet_adc 4 268 uint32_t t 0 4 uint8_t index 4 1 int32_t ekg_readings_cnts[50] 8 4 int32_t str_readings_cnts[5] 208 4 int32_t oT_readings_cnts[5] 228 4 int32_t iT_readings_cnts[5] 248 4
|
|
packet_spo2 5 184 uint32_t t 0 4 uint8_t bytes[180] 4 1
|
|
packet_msg 6 36 uint32_t t 0 4 char buff[32] 4 1"""
|
|
|
|
def arr_sizes(s):
|
|
if s[-1:] != b']' or b'[' not in s:
|
|
return 1
|
|
v = int(s[s.rindex(b'[') + 1:-1])
|
|
return v
|
|
|
|
def get_type_list(lines):
|
|
|
|
types = []
|
|
|
|
for line in lines:
|
|
if line != b'':
|
|
L = line.split(b" ")
|
|
types.append({'type_name' : L[0],
|
|
'type_code' : int(L[1]),
|
|
'size' : int(L[2]),
|
|
'elements' : []
|
|
})
|
|
i = 3
|
|
while i < len(L):
|
|
types[-1]['elements'].append({'type_name' : L[i], 'name' : L[i + 1], 'offset' : int(L[i + 2]), 'n_elements' : arr_sizes(L[i + 1]), 'size' : int(L[i + 3]) * arr_sizes(L[i + 1])})
|
|
i += 4
|
|
return types
|
|
|
|
def my_filter(arr):
|
|
inds = np.where(np.abs(np.diff(arr)) > 0.005)[0]
|
|
for ind in inds:
|
|
prev_ind = ind - 1
|
|
while prev_ind in inds:
|
|
prev_ind -= 1
|
|
next_ind = ind + 1
|
|
while next_ind in inds:
|
|
next_ind += 1
|
|
arr[ind] = 0.5 * arr[prev_ind] + 0.5 * arr[next_ind]
|
|
return arr
|
|
|
|
accs = []
|
|
gyros = []
|
|
def process_imu(d, t):
|
|
global accs
|
|
global gyros
|
|
for e in t['elements']:
|
|
block = d[e['offset']:e['offset'] + e['size']]
|
|
element_size = int(len(block) / e['n_elements'])
|
|
if e['name'] == b'data[141]':
|
|
for i in range(20):
|
|
reading = block[1 + 7 * i : 8 + 7 * i]
|
|
#print(reading)
|
|
imu_reading_type = reading[0] >> 3
|
|
imu_reading_tag_cnt = (reading[0] >> 1) & 3
|
|
data = np.array([int.from_bytes(reading[2 * i + 1 : 2 * i + 3], byteorder = 'little', signed = True) for i in range(3)])
|
|
if imu_reading_type == 1:
|
|
gyros.append(250 / (1<<16) * data)
|
|
elif imu_reading_type == 2:
|
|
accs.append(4 / (1<<16) * data)
|
|
else:
|
|
assert False
|
|
if len(gyros) > 400:
|
|
gyros = gyros[-400:]
|
|
if len(accs) > 400:
|
|
accs = accs[-400:]
|
|
fig, axs = plt.subplots(2)
|
|
g = np.array(gyros)
|
|
a = np.array(accs)
|
|
axs[0].set_ylabel("dps")
|
|
axs[0].plot(g[:,0])
|
|
axs[0].plot(g[:,1])
|
|
axs[0].plot(g[:,2])
|
|
axs[1].set_ylabel("g")
|
|
axs[1].plot(a[:,0])
|
|
axs[1].plot(a[:,1])
|
|
axs[1].plot(a[:,2])
|
|
plt.savefig("acc_gyro.png")
|
|
plt.close()
|
|
|
|
def read_and_process(types, cons, size):
|
|
index = 0
|
|
while (index < size):
|
|
packet_type = cons[index]
|
|
t = [t for t in types if t['type_code'] == packet_type][0]
|
|
print(index, packet_type, t['type_name'])
|
|
d = cons[index + 1 : index + 1 + t['size']]
|
|
if t['type_name'] == b'packet_imu':
|
|
pass
|
|
#process_imu(d, t)
|
|
if t['type_name'] == b'packet_msg':
|
|
print(d)
|
|
index += 1 + t['size']
|
|
|
|
|