Files
chest_strap/code/l452_code/packet_parser_helpers.py
T
2026-05-26 08:11:29 -07:00

296 lines
12 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
import scipy as sp
import time
packet_definitions = b"""packet_rtc 1 28 uint32_t t 0 4 RTC_TimeTypeDef sTime 4 20 RTC_DateTypeDef sDate 24 4
packet_vbatt 2 8 uint32_t t 0 4 uint16_t vbatt_cnts 4 2
packet_imu 3 148 uint32_t t 0 4 uint8_t data[141] 4 1
packet_adc 4 268 uint32_t t 0 4 uint8_t index 4 1 int32_t ekg_readings_cnts[50] 8 4 int32_t str_readings_cnts[5] 208 4 int32_t oT_readings_cnts[5] 228 4 int32_t iT_readings_cnts[5] 248 4
packet_spo2 5 184 uint32_t t 0 4 uint8_t bytes[180] 4 1
packet_msg 6 36 uint32_t t 0 4 char buff[32] 4 1""".split(b'\n')
def arr_sizes(s):
if s[-1:] != b']' or b'[' not in s:
return 1
v = int(s[s.rindex(b'[') + 1:-1])
return v
def get_type_list(lines):
types = []
for line in lines:
if line != b'':
L = line.split(b" ")
types.append({'type_name' : L[0],
'type_code' : int(L[1]),
'size' : int(L[2]),
'elements' : []
})
i = 3
while i < len(L):
types[-1]['elements'].append({'type_name' : L[i], 'name' : L[i + 1], 'offset' : int(L[i + 2]), 'n_elements' : arr_sizes(L[i + 1]), 'size' : int(L[i + 3]) * arr_sizes(L[i + 1])})
i += 4
return types
reds = []
irs = []
greens = []
ppg_freq_Hz = 50
def process_ppg(d, t):
global greens, reds, irs
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b'bytes[180]':
reds += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(0,60,3)]
irs += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(1,60,3)]
greens += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(2,60,3)]
if len(reds) > 400:
reds = reds[-400:]
irs = irs[-400:]
greens = greens[-400:]
fig, axs = plt.subplots(3)
axs[0].set_title('red')
axs[1].set_title('ir')
axs[2].set_title('green')
axs[0].plot(reds)
axs[1].plot(irs)
axs[2].plot(greens)
plt.savefig("ppg.png")
plt.close()
accs = []
gyros = []
imu_sparse = 0
imu_freq_Hz = 240
def process_imu(d, t):
global accs, gyros, imu_sparse
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b'data[141]':
for i in range(20):
reading = block[1 + 7 * i : 8 + 7 * i]
#print(reading)
imu_reading_type = reading[0] >> 3
imu_reading_tag_cnt = (reading[0] >> 1) & 3
data = np.array([int.from_bytes(reading[2 * i + 1 : 2 * i + 3], byteorder = 'little', signed = True) for i in range(3)])
if imu_reading_type == 1:
gyros.append(250 / (1<<16) * data)
elif imu_reading_type == 2:
accs.append(4 / (1<<16) * data)
else:
pass
#assert False
# imu_sparse += 1
# if imu_sparse % 5 == 4:
# tt = int(5 * imu_freq_Hz)
# if len(gyros) > 480:
# gyros = gyros[-1600:]
# accs = accs[-1600:]
# else:
# return
# fig, axs = plt.subplots(2)
# b, a = sp.signal.butter(2, 20 / (0.5 * imu_freq_Hz), btype = 'lowpass')
# g = sp.signal.filtfilt(b,a,np.abs(np.diff(np.array(gyros), axis = 0)), axis = 0)
# a = sp.signal.filtfilt(b,a,np.abs(np.diff(np.array(accs), axis = 0)), axis = 0)
# #np.save("gyros.npy", g)
# #np.save("accs.npy", a)
# a -= np.mean(a, axis = 0).reshape(1,3)
# axs[0].set_ylabel("dps")
# axs[0].plot(g[-tt:,0])
# axs[0].plot(g[-tt:,1])
# axs[0].plot(g[-tt:,2])
# axs[1].set_ylabel("g")
# axs[1].plot(a[-tt:,0])
# axs[1].plot(a[-tt:,1])
# axs[1].plot(a[-tt:,2])
# plt.savefig("acc_gyro.png")
# plt.close()
ecgs = []
t1s = []
t2s = []
strains = []
ts = []
adc_sparse = 0
# Should be running at 488Hz
freq_Hz = 488.28125
max_hr_Hz = 120 / 60
last_adc_graph = 0
r_peaks = []
def process_adc(d, t):
global ecgs, t1s, t2s, strains, adc_sparse, ts, last_adc_graph, r_peaks
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b't':
ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
if e['name'] == b'ekg_readings_cnts[50]':
ecgs = ecgs + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(50)]
if e['name'] == b'str_readings_cnts[5]':
strains = strains + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)]
if e['name'] == b'oT_readings_cnts[5]':
t1s = t1s + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)]
if e['name'] == b'iT_readings_cnts[5]':
t2s = t2s + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)]
if True and time.time() - last_adc_graph > 0.5:# and len(ecgs) > 500:
last_adc_graph = time.time()
#ecgs = ecgs[- int(5 * 60 * freq_Hz):]
#b, a = sp.signal.bessel(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass')
#ecgs_ = sp.signal.filtfilt(b, a, ecgs)
if len(r_peaks) > 0:
start = r_peaks[-1]
else:
start = 0
v = np.convolve(np.abs(np.diff(ecgs[start:])), np.ones(5) / 5, mode = 'valid')
inds = [start + e for e in np.where(v > 0.0002)[0]]
last_ind = start
for ind in inds:
if ind - last_ind < freq_Hz / max_hr_Hz:
continue
region_start = ind - int(0.1 * freq_Hz)
region_end = ind + int(0.1 * freq_Hz)
peak = region_start + np.argmax(ecgs[region_start : region_end])
r_peaks.append(peak)
last_ind = ind
fig, axs = plt.subplots(3, 1, height_ratios = [1,1,1])
axs[0].plot(np.arange(len(ecgs))[int(- 5 * freq_Hz):] / freq_Hz, ecgs[int(- 5 * freq_Hz):], 'k.', linestyle='--', alpha = 0.5)
strain_ax = axs[1].twinx()
strain_ax.plot(np.arange(len(t2s)) * 10 / freq_Hz, t2s, 'g', alpha = 0.5)
strain_ax.set_yticks([])
axs[1].plot([],'k.',label = 'R-R int')
#alsho can do poincare plot
# for fft, resample RR interpolation to a 4Hz grid wich cubic spline
# welches method or FFT->PSD
# there's also Lomb-Scargle periodogram
axs[1].plot([],'b.',label = '|delta(R-R int)|')
axs[1].plot([],'g',label = 'chest')
for peak in r_peaks:
axs[0].plot(peak / freq_Hz, ecgs[peak], 'ro')
#fn = sp.interpolate.CubicSpline(0.5 * (r_peaks[1:] + r_peaks[:-1]), np.diff(r_peaks)
# Xs = np.linspace(r_peaks[0], r_peaks[-1], 1000)
#np.log(np.abs(np.fft.rfft()))
axs[1].plot(np.array(r_peaks[:-1]) / freq_Hz, 1000 * np.diff(r_peaks) / freq_Hz, 'k.')
axs[0].set_xlim((len(ecgs) / freq_Hz) - 5, len(ecgs) / freq_Hz)
hrv_ax = axs[1].twinx()
hrv = np.abs(1000 * np.diff(np.diff(r_peaks)) / freq_Hz)
hrv_ax.plot(np.array(r_peaks[2:]) / freq_Hz, hrv, 'b.')
if len(r_peaks) > 10:
N = int(np.ceil(len(r_peaks) / 30))
for i in range(N):
block = 1000 * np.array(r_peaks[30 * i : min(len(r_peaks), 30 * i + 30)]) / freq_Hz
rmssd = np.power(np.sum(np.power(np.diff(np.diff(block)), 2.0)) / (len(block) - 2), 0.5)
sdnn = np.std(np.diff(block))
pNN50 = 100 * np.mean(np.abs(np.diff(np.diff(block))) > 50)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [rmssd, rmssd], 'r', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [sdnn, sdnn], 'm', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [pNN50, pNN50], color = 'orange', alpha = 0.5)
hrv_ax.minorticks_on()
hrv_ax.grid(alpha = 0.5)
hrv_ax.tick_params(axis = 'y', colors = 'blue')
hrv_ax.yaxis.tick_right()
axs[1].set_ylim(0, 1200)
hrv_ax.set_ylim(0, 240)
axs[0].set_xlabel("t (s)")
axs[1].set_xlabel("t (s)")
axs[0].set_ylabel("V (mV)")
axs[1].set_ylabel("beat delta T (ms)")
hrv_ax.set_ylabel("BPS delta T (ms)", color = 'blue')
axs[1].legend(loc='upper left', framealpha = 1)
hrv_ax.set_axisbelow(True)
plt.tight_layout()
#if (len(ecgs) / freq_Hz) > 6:
# plt.show()
# quit()
plt.savefig("hrv_biofeedback.png")
plt.close()
def make_graphs():
fig, axs = plt.subplots(2,2)
b, a = sp.signal.butter(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass')
ecgs_ = sp.signal.filtfilt(b, a, ecgs)
r_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, distance = freq_Hz / max_hr_Hz, width = 5, prominence = 0.001)[0]
axs[0][0].plot(np.arange(len(ecgs)) / freq_Hz, ecgs_, 'k.', linestyle='--')
for peak in r_peaks:
axs[0][0].plot(peak / freq_Hz, ecgs_[peak], 'ro')
axs[1][0].plot(r_peaks[:-1] / freq_Hz, 1 / (np.diff(r_peaks) /freq_Hz), 'k.')
b, a = sp.signal.butter(2, [0.05 / (0.5 * ppg_freq_Hz), 4 / (0.5 * ppg_freq_Hz)], btype = 'bandpass')
reds_ = sp.signal.filtfilt(b, a, reds)
#irs_ = sp.signal.filtfilt(b, a, irs)
#greens_ = sp.signal.filtfilt(b, a, greens)
P_r = np.log(np.abs(np.fft.rfft(reds_)))
P_i = np.log(np.abs(np.fft.rfft(irs)))
P_g = np.log(np.abs(np.fft.rfft(greens)))
#axs[0][1].plot(np.arange(P_r.shape[0]) * 1 / 200, P_r)
#axs[0][1].plot(P_i)
#axs[0][1].plot(P_g)
axs[0][1].plot(np.arange(len(reds)) / 50, reds_, color = 'red')
#ax2 = axs[0][1].twinx()
#ax2.plot(np.arange(len(reds)) / 50, irs_, color = 'magenta')
#ax3 = ax2.twinx()
#ax3.plot(np.arange(len(reds)) / 50, greens_, color = 'green')
#axs[1][1].plot(np.array(gyros)[:,0])
#axs[1][1].plot(np.array(gyros)[:,1])
#axs[1][1].plot(np.array(gyros)[:,2])
gs = np.array(gyros)
acs = np.array(accs)
f_gs = np.log(np.abs(np.fft.rfft(acs, axis = 0)))
axs[1][1].plot(f_gs[:,0], alpha = 0.25)
axs[1][1].plot(f_gs[:,1], alpha = 0.25)
axs[1][1].plot(f_gs[:,2], alpha = 0.25)
#axs[1][1].plot(strains)
#axs[1][1].plot(t1s)
#axs[1][1].plot(t2s)
#plt.show()
def read_and_process(types, cons, size):
index = 0
while (index < size):
packet_type = cons[index]
print(packet_type)
try:
t = [t for t in types if t['type_code'] == packet_type][0]
except:
print("HERE")
print(cons[index-5:index+5])
quit()
return
print(index, packet_type, t['type_name'])
d = cons[index + 1 : index + 1 + t['size']]
if t['type_name'] == b'packet_imu':
process_imu(d, t)
if t['type_name'] == b'packet_msg':
print(d)
if t['type_name'] == b'packet_adc':
process_adc(d, t)
if t['type_name'] == b'packet_spo2':
process_ppg(d, t)
index += 1 + t['size']