Better analsysi

This commit is contained in:
ggw
2026-05-29 12:17:37 -05:00
parent b4e5c9e291
commit bcc934d06a
+345 -263
View File
@@ -1,9 +1,11 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import matplotlib.pyplot as plt
import scipy as sp
import time
matplotlib.rcParams['legend.framealpha'] = 1.0
packet_definitions = b"""packet_rtc 1 28 uint32_t t 0 4 RTC_TimeTypeDef sTime 4 20 RTC_DateTypeDef sDate 24 4
packet_vbatt 2 8 uint32_t t 0 4 uint16_t vbatt_cnts 4 2
packet_imu 3 148 uint32_t t 0 4 uint8_t data[141] 4 1
@@ -35,70 +37,110 @@ def get_type_list(lines):
i += 4
return types
RIG_amp = np.zeros((0,3))
ppg_freq_Hz = 50
class ppg_vars():
# Red, IR, Green values
RIGs = np.zeros((0,3))
ts = []
freq_Hz = 50
def process_ppg(d, t):
global RIG_amp
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b'bytes[180]':
RIG_amp = np.concatenate((RIG_amp, np.array([int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(0,60)]).reshape(20,3)))
if len(ecgs) < ecg_freq_Hz * 100: # 500
return
# top is short again, bottom is long
fig, axs = plt.subplots(2)
for i, ax in enumerate(axs):
b, a = sp.signal.butter(2, 0.05 / (0.5 * ppg_freq_Hz), btype = 'highpass')
b1, a1 = sp.signal.butter(2, 0.05 / (0.5 * 0.1 * ecg_freq_Hz), btype = 'highpass')
if i == 0:
lookback_s = 5
else:
lookback_s = 60
breath_points = int(lookback_s * (0.1 * ecg_freq_Hz))
if i == 1:
ax.plot((len(ecgs) - 10 * np.arange(breath_points)[::-1]) / (ecg_freq_Hz), sp.signal.filtfilt(b1, a1, t2s[-breath_points:]), color = 'black')
r_ax = ax.twinx()
i_ax = ax.twinx()
g_ax = ax.twinx()
tt = - ppg_freq_Hz * lookback_s
xs = len(ecgs) / ecg_freq_Hz - np.arange(len(RIG_amp))[::-1] / ppg_freq_Hz
r_ax.plot(xs[tt:], sp.signal.filtfilt(b, a, RIG_amp[tt:,0]), 'r', alpha = 0.5)
i_ax.plot(xs[tt:], sp.signal.filtfilt(b, a, RIG_amp[tt:,1]), 'm', alpha = 0.5)
g_ax.plot(xs[tt:], sp.signal.filtfilt(b, a, RIG_amp[tt:,2]), 'g', alpha = 0.5)
r_ax.set_ylim(-750,750)
i_ax.set_ylim(-750,750)
g_ax.set_ylim(-750,750)
if i == 0:
for peak in r_peaks:
peak_s = peak / ecg_freq_Hz
if peak_s < xs[-1] and peak_s > xs[tt]:
r_ax.axvline(x = peak_s, color = 'black', alpha = 0.5)
plt.tight_layout()
plt.savefig("ppg.png")
plt.close()
accs = np.zeros((0,3))
gyros = np.zeros((0,3))
imu_freq_Hz = 240
last_imu_graph = 0
imu_ts = []
def process_imu(d, t):
global accs, gyros, imu_sparse, last_imu_graph, imu_ts
global ppg_vars
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b't':
imu_ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
ppg_vars.ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
if e['name'] == b'bytes[180]':
ppg_vars.RIGs = np.concatenate((ppg_vars.RIGs, np.array([int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(0,60)]).reshape(20,3)))
if len(ppg_vars.ts) % 3 == 2:
ppg_graph()
def ppg_graph():
#ratio of pulsatile / DC per wavelength gives the SPO2 measure
# Plots of short term PPG vs R peaks and long term PPG vs breathing
fig, axs = plt.subplots(2, 1, height_ratios = [1,2])
for i, ax in enumerate(axs):
if i == 0:
b_ppg, a_ppg = sp.signal.butter(2, [1 / (0.5 * ppg_vars.freq_Hz)], btype = 'highpass')
if i == 1:
b_ppg, a_ppg = sp.signal.butter(2, 0.05 / (0.5 * ppg_vars.freq_Hz), btype = 'highpass')
b_breath, a_breath = sp.signal.butter(2, 0.05 / (0.5 * 0.1 * adc_vars.freq_Hz), btype = 'highpass')
if i == 0:
lookback_s = 5
if i == 1:
lookback_s = 60
breath_points = int((0.1 * adc_vars.freq_Hz) * lookback_s)
ppg_points = int(ppg_vars.freq_Hz * lookback_s)
ts = len(adc_vars.ecgs) / adc_vars.freq_Hz - np.arange(len(ppg_vars.RIGs))[::-1] / ppg_vars.freq_Hz
if i == 0: # plot R wave peaks
for peak in adc_vars.r_peaks:
peak_s = peak / adc_vars.freq_Hz
if peak_s < ts[-1] and peak_s > ts[max(-ppg_points, -len(ts))]:
ax.axvline(x = peak_s, color = 'black', alpha = 0.5)
if i == 1: # plot chest strap strain
ax.plot((len(adc_vars.ecgs) - 10 * np.arange(min(breath_points, len(adc_vars.t2s)))[::-1]) / (adc_vars.freq_Hz), sp.signal.filtfilt(b_breath, a_breath, adc_vars.t2s[-breath_points:]), color = 'black')
ppg_ax = ax.twinx()
if False:
smoothed_ppg = sp.signal.filtfilt(b_ppg, a_ppg, ppg_vars.RIGs[-ppg_points:,:], axis = 0)
else:
smoothed_ppg = ppg_vars.RIGs[-ppg_points:,:] - np.mean(ppg_vars.RIGs[-ppg_points:,:],axis = 0)
ppg_ax.plot(ts[-ppg_points:], smoothed_ppg[:,0], 'r', alpha = 0.5)
ppg_ax.plot(ts[-ppg_points:], smoothed_ppg[:,1], 'm', alpha = 0.5)
ppg_ax.plot(ts[-ppg_points:], smoothed_ppg[:,2], 'g', alpha = 0.5)
#if i == 0:
# lim = 75
#else:
# lim = 250
#ppg_ax.set_ylim(-lim, lim)
ax.set_yticks([])
ppg_ax.set_yticks([])
ax.set_xlabel("t (s)")
if i == 1:
ax.plot([],[],'r',label='Red Val', alpha = 0.5)
ax.plot([],[],'m',label='IR Val', alpha = 0.5)
ax.plot([],[],'g',label='Green Val', alpha = 0.5)
ax.plot([],[],'k',label='Chest Diameter')
ax.legend(loc = 'lower center',
bbox_to_anchor = (0.5, 1.02),
ncol = 4)
plt.tight_layout()
plt.savefig("ppg.png")
plt.close()
class imu_vars:
accs = np.zeros((0,3))
gyros = np.zeros((0,3))
ts = []
freq_Hz = 240
def process_imu(d, t):
global imu_vars
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
if e['name'] == b't':
imu_vars.ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
if e['name'] == b'data[141]':
for i in range(20):
reading = block[1 + 7 * i : 8 + 7 * i]
@@ -107,72 +149,69 @@ def process_imu(d, t):
imu_reading_tag_cnt = (reading[0] >> 1) & 3
data = np.array([int.from_bytes(reading[2 * i + 1 : 2 * i + 3], byteorder = 'little', signed = True) for i in range(3)]).reshape(1,3)
if imu_reading_type == 1:
gyros = np.concatenate((gyros, (250 / (1<<16) * data)))
imu_vars.gyros = np.concatenate((imu_vars.gyros, (250 / (1<<16) * data)))
elif imu_reading_type == 2:
accs = np.concatenate((accs, (4 / (1<<16) * data)))
imu_vars.accs = np.concatenate((imu_vars.accs, (4 / (1<<16) * data)))
else:
pass
#assert False
last_imu_graph += 1
if len(ecgs) < ecg_freq_Hz * 100:
assert False
def imu_graph():
return
tt = int(5 * imu_vars.freq_Hz)
if len(gyros) < 480:
return
if (last_imu_graph % 12 == 11):#True and time.time() - last_imu_graph > 0.5:
tt = int(5 * imu_freq_Hz)
if len(gyros) < 480:
return
fig, axs = plt.subplots(2)
b, a = sp.signal.butter(2, 15 / (0.5 * imu_freq_Hz), btype = 'lowpass')
g = sp.signal.filtfilt(b, a, np.abs(np.diff(np.array(gyros), axis = 0)), axis = 0)
a = sp.signal.filtfilt(b, a, np.abs(np.diff(np.array(accs), axis = 0)), axis = 0)
axs[0].set_ylabel("dps")
xs_g = len(ecgs) / ecg_freq_Hz - (np.arange(len(gyros) - 1) / imu_freq_Hz)[::-1]
xs_a = len(ecgs) / ecg_freq_Hz - (np.arange(len(accs) - 1) / imu_freq_Hz)[::-1]
axs[0].plot(xs_g[-tt:], np.sum(g[-tt:,:], axis = 1))
#axs[0].plot(xs_g[-tt:], g[-tt:,1])
#axs[0].plot(xs_g[-tt:], g[-tt:,2])
axs[1].set_ylabel("g")
axs[1].plot(xs_a[-tt:], np.sum(np.power(a[-tt:,:], 2.0), axis = 1))
#axs[1].plot(xs_a[-tt:], a[-tt:,1])
#axs[1].plot(xs_a[-tt:], a[-tt:,2])
for peak in r_peaks:
peak_s = peak / ecg_freq_Hz
if peak_s < xs_a[-1] and peak_s > xs_a[-tt]:
axs[1].axvline(x = peak_s, color = 'black', alpha = 0.5)
axs[0].axvline(x = peak_s, color = 'black', alpha = 0.5)
fig, axs = plt.subplots(2)
b, a = sp.signal.butter(2, 15 / (0.5 * imu_vars.freq_Hz), btype = 'lowpass')
g = sp.signal.filtfilt(b, a, np.abs(np.diff(np.array(imu_vars.gyros), axis = 0)), axis = 0)
a = sp.signal.filtfilt(b, a, np.abs(np.diff(np.array(imu_vars.accs), axis = 0)), axis = 0)
axs[0].set_ylabel("dps")
xs_g = len(adc_vars.ecgs) / adc_vars.freq_Hz - (np.arange(len(imu_vars.gyros) - 1) / imu_freq_Hz)[::-1]
xs_a = len(adc_vars.ecgs) / adc_vars.freq_Hz - (np.arange(len(imu_vars.accs) - 1) / imu_freq_Hz)[::-1]
axs[0].plot(xs_g[-tt:], np.sum(g[-tt:,:], axis = 1))
#axs[0].plot(xs_g[-tt:], g[-tt:,1])
#axs[0].plot(xs_g[-tt:], g[-tt:,2])
axs[1].set_ylabel("g")
axs[1].plot(xs_a[-tt:], np.sum(np.power(a[-tt:,:], 2.0), axis = 1))
#axs[1].plot(xs_a[-tt:], a[-tt:,1])
#axs[1].plot(xs_a[-tt:], a[-tt:,2])
for peak in adc_vars.r_peaks:
peak_s = peak / adc_vars.freq_Hz
if peak_s < xs_a[-1] and peak_s > xs_a[-tt]:
axs[1].axvline(x = peak_s, color = 'black', alpha = 0.5)
axs[0].axvline(x = peak_s, color = 'black', alpha = 0.5)
#axs[0].set_ylim(0,1.0)
#axs[1].set_ylim(0,0.05)
plt.tight_layout()
plt.savefig("acc_gyro.png")
plt.close()
if len(r_peaks) > 30:
if len(adc_vars.r_peaks) > 30:
fig, axs = plt.subplots(2)
for i in range(1,30):
xs_g = len(ecgs) / ecg_freq_Hz - (np.arange(len(gyros)) / imu_freq_Hz)[::-1]
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
xs_g = len(adc_vars.ecgs) / adc_vars.freq_Hz - (np.arange(len(gyros)) / imu_freq_Hz)[::-1]
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_g,
gyros[:,0])
axs[0].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'k.', alpha = 0.05)
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_g,
gyros[:,1])
axs[0].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'b.', alpha = 0.05)
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_g,
gyros[:,2])
axs[0].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'r.', alpha = 0.05)
for i in range(1,30):
xs_a = len(ecgs) / ecg_freq_Hz - (np.arange(len(accs)) / imu_freq_Hz)[::-1]
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
xs_a = len(adc_vars.ecgs) / adc_vars.freq_Hz - (np.arange(len(accs)) / imu_freq_Hz)[::-1]
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_a,
accs[:,0])
axs[1].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'k.', alpha = 0.05)
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_a,
accs[:,1])
axs[1].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'b.', alpha = 0.05)
ys = np.interp(np.linspace(r_peaks[-i] / ecg_freq_Hz - 1.0, r_peaks[-i] / ecg_freq_Hz + 1.0, 240),
ys = np.interp(np.linspace(adc_vars.r_peaks[-i] / adc_vars.freq_Hz - 1.0, adc_vars.r_peaks[-i] / adc_vars.freq_Hz + 1.0, 240),
xs_a,
accs[:,2])
axs[1].plot(np.convolve(np.abs(np.diff(ys)),np.ones(10),mode='valid'), 'r.', alpha = 0.05)
@@ -181,198 +220,241 @@ def process_imu(d, t):
plt.close()
ecgs = np.zeros(0)
t1s = np.zeros(0)
t2s = np.zeros(0)
strains = np.zeros(0)
ts = []
adc_sparse = 0
# Should be running at 488Hz
ecg_freq_Hz = 488.28125
max_hr_Hz = 120 / 60
last_adc_graph = 0
r_peaks = []
f_pwrs = []
class adc_vars():
ecgs = np.zeros(0)
t1s = np.zeros(0)
t2s = np.zeros(0)
strains = np.zeros(0)
ts = []
freq_Hz = 488.28125
max_hr_Hz = 120 / 60
r_peaks = []
#dead_times = []
def process_adc(d, t):
global ecgs, t1s, t2s, strains, adc_sparse, ts, last_adc_graph, r_peaks, f_pwrs
global adc_vars
for e in t['elements']:
block = d[e['offset']:e['offset'] + e['size']]
element_size = int(len(block) / e['n_elements'])
#if e['name'] == b't':
# ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
if e['name'] == b't':
adc_vars.ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True))
if e['name'] == b'ekg_readings_cnts[50]':
ecgs = np.concatenate((ecgs, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(50)])))
adc_vars.ecgs = np.concatenate((adc_vars.ecgs, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(50)])))
if e['name'] == b'str_readings_cnts[5]':
strains = np.concatenate((strains, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
adc_vars.strains = np.concatenate((adc_vars.strains, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
if e['name'] == b'oT_readings_cnts[5]':
t1s = np.concatenate((t1s, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
adc_vars.t1s = np.concatenate((adc_vars.t1s, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
if e['name'] == b'iT_readings_cnts[5]':
t2s = np.concatenate((t2s, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
if len(ecgs) < ecg_freq_Hz * 100:
return
last_adc_graph += 1
# About every second
if last_adc_graph % 10 == 9: #True and time.time() - last_adc_graph > 0.5:# and len(ecgs) > 500:
#last_adc_graph = time.time()
#ecgs = ecgs[- int(5 * 60 * ecg_freq_Hz):]
#b, a = sp.signal.bessel(2, [1 / (0.5 * ecg_freq_Hz), 120 / (0.5 * ecg_freq_Hz)], btype = 'bandpass')
#ecgs_ = sp.signal.filtfilt(b, a, ecgs)
adc_vars.t2s = np.concatenate((adc_vars.t2s, np.array([(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)])))
if len(r_peaks) > 0:
start = r_peaks[-1]
else:
start = 0
v = np.convolve(np.abs(np.diff(ecgs[start:])), np.ones(5) / 5, mode = 'valid')
inds = [start + e for e in np.where(v > 0.0002)[0]]
update_ecg_peaks()
last_ind = start
for ind in inds:
if ind - last_ind < ecg_freq_Hz / max_hr_Hz:
continue
region_start = ind - int(0.1 * ecg_freq_Hz)
region_end = ind + int(0.1 * ecg_freq_Hz)
peak = region_start + np.argmax(ecgs[region_start : region_end])
r_peaks.append(peak)
last_ind = ind
if len(adc_vars.ts) % 10 == 9:
adc_graphs()
fig, axs = plt.subplots(2, 1, height_ratios = [1,2])
axs[0].plot(np.arange(len(ecgs))[int(- 5 * ecg_freq_Hz):] / ecg_freq_Hz, ecgs[int(- 5 * ecg_freq_Hz):], 'k.', linestyle='--', alpha = 0.5)
axs[0].set_ylim(np.amin(ecgs[int(- 5 * ecg_freq_Hz):]) - 0.001, np.amax(ecgs[int(- 5 * ecg_freq_Hz):]) + 0.001)
strain_ax = axs[1].twinx()
strain_ax.plot(np.arange(len(t2s)) * 10 / ecg_freq_Hz, t2s, color = 'orange', alpha = 0.5)
strain_ax.set_yticks([])
axs[1].plot([],'k.',label = 'R-R int')
#alsho can do poincare plot
# for fft, resample RR interpolation to a 4Hz grid wich cubic spline
# welches method or FFT->PSD
# there's also Lomb-Scargle periodogram
def update_ecg_peaks():
global adc_vars
if len(adc_vars.r_peaks) > 0:
start = adc_vars.r_peaks[-1]
else:
start = 0
axs[1].plot([],'b.',label = '|delta(R-R int)|')
axs[1].plot([],'g',label = 'chest')
for peak in r_peaks:
axs[0].plot(peak / ecg_freq_Hz, ecgs[peak], 'ro')
# TODO: add dead time recognition
#if np.amax(adc_vars.ecgs[start:]) - np.amin(adc_vars.ecgs[start:start + 60]) > 0.005:
# Poincare
#R = np.diff(r_peaks[-180:]) / freq_Hz
#axs[2].plot(R[:-1], R[1:], 'k.', alpha = 0.2)
# lombscargle
#Ys = sp.signal.lombscargle(np.diff(r_peaks[-240:])[1:] / freq_Hz,
# np.diff(np.diff(r_peaks[-240:])) / freq_Hz,
# np.linspace(0.01,2.0,100),
# floating_mean = False)
#axs[2].plot(np.linspace(0.01,2.0,100), Ys)
v = np.diff(adc_vars.ecgs[start:])
inds = [start + e for e in np.where(v < -0.00025)[0]]
last_ind = start
for ind in inds:
if ind - last_ind < adc_vars.freq_Hz / adc_vars.max_hr_Hz:
continue
region_start = ind - int(0.1 * adc_vars.freq_Hz)
region_end = ind + int(0.1 * adc_vars.freq_Hz)
peak = region_start + np.argmax(adc_vars.ecgs[region_start : region_end])
adc_vars.r_peaks.append(peak)
last_ind = ind
# Power spectrum
# if len(r_peaks) > 120:
# R = np.array(r_peaks[-120:]) / freq_Hz
# fn = sp.interpolate.CubicSpline(0.5 * (R[1:] + R[:-1]), np.diff(R))
# end = 0.5 * (R[-2] + R[-1])
# Xs = np.linspace(end - 60, end, 1000)
# #np.log(np.abs(np.fft.fft(fn(Xs))))[1:])
# X, Y = sp.signal.welch(fn(Xs), fs = 1000 / 60)
# f_pwrs.append(Y[1:10])
# if len(f_pwrs) > 60:
# f_pwrs = f_pwrs[-60:]
# axs[2].imshow(f_pwrs, aspect = 'auto', extent = (X[1], X[9], 0, len(f_pwrs)))
def adc_graphs():
fig, axs = plt.subplots(2, 1, height_ratios = [2,1])
ecg_lookback_s = 5
RR_lookback_s = 600
ecg_points = int(ecg_lookback_s * adc_vars.freq_Hz)
ts = np.arange(max(len(adc_vars.ecgs) - ecg_points, 0), len(adc_vars.ecgs)) / adc_vars.freq_Hz
axs[0].plot(ts,
adc_vars.ecgs[-ecg_points:], 'k.', linestyle='--', alpha = 0.5)
axs[0].set_ylim(np.amin(adc_vars.ecgs[-ecg_points:]) - 0.001, np.amax(adc_vars.ecgs[-ecg_points:]) + 0.001)
for peak in adc_vars.r_peaks:
if peak < ts[-1] * adc_vars.freq_Hz and peak > ts[0] * adc_vars.freq_Hz:
axs[0].plot(peak / adc_vars.freq_Hz, adc_vars.ecgs[peak], 'ro')
axs[0].set_ylabel("V")
axs[0].set_xlabel("t (s)")
axs[1].plot(ts[:-1], np.diff(adc_vars.ecgs[-ecg_points:]), 'k.', linestyle='--', alpha = 0.25)
for peak in adc_vars.r_peaks:
if peak < ts[-1] * adc_vars.freq_Hz and peak > ts[0] * adc_vars.freq_Hz:
axs[1].axvline(peak / adc_vars.freq_Hz, color = 'red', alpha = 0.1)
plt.tight_layout()
plt.savefig("hrv_biofeedback.png", dpi = 150)
plt.close()
return
strain_ax = axs[1].twinx()
b_breath, a_breath = sp.signal.butter(2, 0.05 / (0.5 * 0.1 * adc_vars.freq_Hz), btype = 'highpass')
filtered_strain = sp.signal.filtfilt(b_breath, a_breath, adc_vars.t2s[-int(RR_lookback_s * adc_vars.freq_Hz * 0.1):])
strain_ax.plot(np.arange(max(len(adc_vars.t2s) - len(filtered_strain), 0), len(adc_vars.t2s)) * 10 / adc_vars.freq_Hz, filtered_strain, color = 'orange', alpha = 0.5)
strain_ax.set_yticks([])
first_valid_RR = len(adc_vars.ecgs) - RR_lookback_s * adc_vars.freq_Hz
r_peaks = [e for e in adc_vars.r_peaks if e > first_valid_RR]
axs[1].plot(np.array(r_peaks[:-1]) / adc_vars.freq_Hz, 1000 * np.diff(r_peaks) / adc_vars.freq_Hz, 'k.', alpha = 0.5)
hrv_ax = axs[1].twinx()
hrv = np.abs(1000 * np.diff(np.diff(r_peaks)) / adc_vars.freq_Hz)
hrv_ax.plot(np.array(r_peaks[2:]) / adc_vars.freq_Hz, hrv, 'b.', alpha = 0.5)
if len(r_peaks) > 10:
N = int(np.ceil(len(r_peaks) / 30))
for i in range(N):
block = 1000 * np.array(r_peaks[30 * i : min(len(r_peaks), 30 * i + 30)]) / adc_vars.freq_Hz
rmssd = np.power(np.sum(np.power(np.diff(np.diff(block)), 2.0)) / (len(block) - 2), 0.5)
sdnn = np.std(np.diff(block))
pNN50 = 100 * np.mean(np.abs(np.diff(np.diff(block))) > 50)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [rmssd, rmssd], 'r', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [sdnn, sdnn], 'm', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [pNN50, pNN50], color = 'pink', alpha = 0.5)
hrv_ax.minorticks_on()
hrv_ax.grid(alpha = 0.5)
hrv_ax.tick_params(axis = 'y', colors = 'blue')
hrv_ax.yaxis.tick_right()
axs[1].set_ylim(0, 1200)
hrv_ax.set_ylim(0, 240)
axs[1].set_ylabel("R-R delta T (ms)")
hrv_ax.set_ylabel("delta R-R (ms)", color = 'blue')
axs[1].plot([],'k.',label = 'R-R int', alpha = 0.5)
axs[1].plot([],'b.',label = '|delta(R-R int)|', alpha = 0.5)
axs[1].plot([],color = 'orange', label = 'Breath (tension)', alpha = 0.5)
axs[1].plot([],color = 'red', label = 'RMSSD', alpha = 0.5)
axs[1].plot([],color = 'magenta', label = 'SSDN', alpha = 0.5)
axs[1].plot([],color = 'pink', label = 'pNN50', alpha = 0.5)
axs[1].legend(loc = 'lower center', bbox_to_anchor = (0.5, 1.02), ncol = 4)
axs[1].set_xlabel("t (s)")
hrv_ax.set_axisbelow(True)
plt.tight_layout()
plt.savefig("hrv_biofeedback.png", dpi = 150)
plt.close()
fig, ax = plt.subplots(1)
for i in range(1, min(len(adc_vars.r_peaks), 30)):
dat = adc_vars.ecgs[adc_vars.r_peaks[-i] - int(0.3 * adc_vars.freq_Hz) :
adc_vars.r_peaks[-i] + int(0.9 * adc_vars.freq_Hz)]
dat = dat - np.mean(np.sort(dat[40:-40]))
ax.plot(dat, '.', alpha = 0.02, color = matplotlib.colormaps['viridis'](i / 30.))
ax.set_ylim(-0.001, 0.002)
plt.savefig("pqrs_ensemble.png")
plt.close()
return
#also can do poincare plot
# for fft, resample RR interpolation to a 4Hz grid wich cubic spline
# welches method or FFT->PSD
# there's also Lomb-Scargle periodogram
# Poincare
#R = np.diff(r_peaks[-180:]) / freq_Hz
#axs[2].plot(R[:-1], R[1:], 'k.', alpha = 0.2)
# lombscargle
#Ys = sp.signal.lombscargle(np.diff(r_peaks[-240:])[1:] / freq_Hz,
# np.diff(np.diff(r_peaks[-240:])) / freq_Hz,
# np.linspace(0.01,2.0,100),
# floating_mean = False)
#axs[2].plot(np.linspace(0.01,2.0,100), Ys)
# Power spectrum
# if len(r_peaks) > 120:
# R = np.array(r_peaks[-120:]) / freq_Hz
# fn = sp.interpolate.CubicSpline(0.5 * (R[1:] + R[:-1]), np.diff(R))
# end = 0.5 * (R[-2] + R[-1])
# Xs = np.linspace(end - 60, end, 1000)
# #np.log(np.abs(np.fft.fft(fn(Xs))))[1:])
# X, Y = sp.signal.welch(fn(Xs), fs = 1000 / 60)
# f_pwrs.append(Y[1:10])
# if len(f_pwrs) > 60:
# f_pwrs = f_pwrs[-60:]
# axs[2].imshow(f_pwrs, aspect = 'auto', extent = (X[1], X[9], 0, len(f_pwrs)))
axs[1].plot(np.array(r_peaks[:-1]) / ecg_freq_Hz, 1000 * np.diff(r_peaks) / ecg_freq_Hz, 'k.', alpha = 0.25)
axs[0].set_xlim((len(ecgs) / ecg_freq_Hz) - 5, len(ecgs) / ecg_freq_Hz)
hrv_ax = axs[1].twinx()
hrv = np.abs(1000 * np.diff(np.diff(r_peaks)) / ecg_freq_Hz)
hrv_ax.plot(np.array(r_peaks[2:]) / ecg_freq_Hz, hrv, 'b.', alpha = 0.25)
if len(r_peaks) > 10:
N = int(np.ceil(len(r_peaks) / 30))
for i in range(N):
block = 1000 * np.array(r_peaks[30 * i : min(len(r_peaks), 30 * i + 30)]) / ecg_freq_Hz
rmssd = np.power(np.sum(np.power(np.diff(np.diff(block)), 2.0)) / (len(block) - 2), 0.5)
sdnn = np.std(np.diff(block))
pNN50 = 100 * np.mean(np.abs(np.diff(np.diff(block))) > 50)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [rmssd, rmssd], 'r', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [sdnn, sdnn], 'm', alpha = 0.5)
hrv_ax.plot([block[0] / 1000, block[-1] / 1000], [pNN50, pNN50], color = 'orange', alpha = 0.5)
hrv_ax.minorticks_on()
hrv_ax.grid(alpha = 0.5)
hrv_ax.tick_params(axis = 'y', colors = 'blue')
hrv_ax.yaxis.tick_right()
axs[1].set_ylim(0, 1200)
hrv_ax.set_ylim(0, 240)
axs[0].set_xlabel("t (s)")
axs[1].set_xlabel("t (s)")
axs[0].set_ylabel("V (mV)")
axs[1].set_ylabel("beat delta T (ms)")
hrv_ax.set_ylabel("BPS delta T (ms)", color = 'blue')
axs[1].legend(loc='upper left', framealpha = 1)
hrv_ax.set_axisbelow(True)
plt.tight_layout()
#if (len(ecgs) / freq_Hz) > 6:
# plt.show()
# quit()
plt.savefig("hrv_biofeedback.png", dpi = 150)
plt.close()
# def make_graphs():
fig, ax = plt.subplots(1)
for i in range(1,30):
dat = ecgs[r_peaks[-i] - int(0.3 * ecg_freq_Hz) :
r_peaks[-i] + int(0.9 * ecg_freq_Hz)]
dat = dat - np.mean(np.sort(dat[40:-40]))
ax.plot(dat, '.', alpha = 0.02, color = matplotlib.colormaps['viridis'](i / 30.))
ax.set_ylim(-0.001, 0.002)
plt.savefig("pqrs_ensemble.png")
plt.close()
def make_graphs():
# fig, axs = plt.subplots(2,2)
fig, axs = plt.subplots(2,2)
b, a = sp.signal.butter(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass')
ecgs_ = sp.signal.filtfilt(b, a, ecgs)
# b, a = sp.signal.butter(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass')
# ecgs_ = sp.signal.filtfilt(b, a, ecgs)
r_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, distance = freq_Hz / max_hr_Hz, width = 5, prominence = 0.001)[0]
axs[0][0].plot(np.arange(len(ecgs)) / freq_Hz, ecgs_, 'k.', linestyle='--')
for peak in r_peaks:
axs[0][0].plot(peak / freq_Hz, ecgs_[peak], 'ro')
axs[1][0].plot(r_peaks[:-1] / freq_Hz, 1 / (np.diff(r_peaks) /freq_Hz), 'k.')
# r_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, distance = freq_Hz / max_hr_Hz, width = 5, prominence = 0.001)[0]
# axs[0][0].plot(np.arange(len(ecgs)) / freq_Hz, ecgs_, 'k.', linestyle='--')
# for peak in r_peaks:
# axs[0][0].plot(peak / freq_Hz, ecgs_[peak], 'ro')
# axs[1][0].plot(r_peaks[:-1] / freq_Hz, 1 / (np.diff(r_peaks) /freq_Hz), 'k.')
b, a = sp.signal.butter(2, [0.05 / (0.5 * ppg_freq_Hz), 4 / (0.5 * ppg_freq_Hz)], btype = 'bandpass')
reds_ = sp.signal.filtfilt(b, a, reds)
#irs_ = sp.signal.filtfilt(b, a, irs)
#greens_ = sp.signal.filtfilt(b, a, greens)
# b, a = sp.signal.butter(2, [0.05 / (0.5 * ppg_freq_Hz), 4 / (0.5 * ppg_freq_Hz)], btype = 'bandpass')
# reds_ = sp.signal.filtfilt(b, a, reds)
# #irs_ = sp.signal.filtfilt(b, a, irs)
# #greens_ = sp.signal.filtfilt(b, a, greens)
P_r = np.log(np.abs(np.fft.rfft(reds_)))
P_i = np.log(np.abs(np.fft.rfft(irs)))
P_g = np.log(np.abs(np.fft.rfft(greens)))
# P_r = np.log(np.abs(np.fft.rfft(reds_)))
# P_i = np.log(np.abs(np.fft.rfft(irs)))
# P_g = np.log(np.abs(np.fft.rfft(greens)))
#axs[0][1].plot(np.arange(P_r.shape[0]) * 1 / 200, P_r)
#axs[0][1].plot(P_i)
#axs[0][1].plot(P_g)
# #axs[0][1].plot(np.arange(P_r.shape[0]) * 1 / 200, P_r)
# #axs[0][1].plot(P_i)
# #axs[0][1].plot(P_g)
axs[0][1].plot(np.arange(len(reds)) / 50, reds_, color = 'red')
#ax2 = axs[0][1].twinx()
#ax2.plot(np.arange(len(reds)) / 50, irs_, color = 'magenta')
#ax3 = ax2.twinx()
#ax3.plot(np.arange(len(reds)) / 50, greens_, color = 'green')
# axs[0][1].plot(np.arange(len(reds)) / 50, reds_, color = 'red')
# #ax2 = axs[0][1].twinx()
# #ax2.plot(np.arange(len(reds)) / 50, irs_, color = 'magenta')
# #ax3 = ax2.twinx()
# #ax3.plot(np.arange(len(reds)) / 50, greens_, color = 'green')
#axs[1][1].plot(np.array(gyros)[:,0])
#axs[1][1].plot(np.array(gyros)[:,1])
#axs[1][1].plot(np.array(gyros)[:,2])
gs = np.array(gyros)
acs = np.array(accs)
f_gs = np.log(np.abs(np.fft.rfft(acs, axis = 0)))
axs[1][1].plot(f_gs[:,0], alpha = 0.25)
axs[1][1].plot(f_gs[:,1], alpha = 0.25)
axs[1][1].plot(f_gs[:,2], alpha = 0.25)
# #axs[1][1].plot(np.array(gyros)[:,0])
# #axs[1][1].plot(np.array(gyros)[:,1])
# #axs[1][1].plot(np.array(gyros)[:,2])
# gs = np.array(gyros)
# acs = np.array(accs)
# f_gs = np.log(np.abs(np.fft.rfft(acs, axis = 0)))
# axs[1][1].plot(f_gs[:,0], alpha = 0.25)
# axs[1][1].plot(f_gs[:,1], alpha = 0.25)
# axs[1][1].plot(f_gs[:,2], alpha = 0.25)
#axs[1][1].plot(strains)
#axs[1][1].plot(t1s)
#axs[1][1].plot(t2s)
# #axs[1][1].plot(strains)
# #axs[1][1].plot(t1s)
# #axs[1][1].plot(t2s)
#plt.show()
# #plt.show()
def read_and_process(types, cons, size):
index = 0