import numpy as np import matplotlib.pyplot as plt import scipy as sp packet_definitions = b"""packet_rtc 1 28 uint32_t t 0 4 RTC_TimeTypeDef sTime 4 20 RTC_DateTypeDef sDate 24 4 packet_vbatt 2 8 uint32_t t 0 4 uint16_t vbatt_cnts 4 2 packet_imu 3 148 uint32_t t 0 4 uint8_t data[141] 4 1 packet_adc 4 268 uint32_t t 0 4 uint8_t index 4 1 int32_t ekg_readings_cnts[50] 8 4 int32_t str_readings_cnts[5] 208 4 int32_t oT_readings_cnts[5] 228 4 int32_t iT_readings_cnts[5] 248 4 packet_spo2 5 184 uint32_t t 0 4 uint8_t bytes[180] 4 1 packet_msg 6 36 uint32_t t 0 4 char buff[32] 4 1""".split(b'\n') def arr_sizes(s): if s[-1:] != b']' or b'[' not in s: return 1 v = int(s[s.rindex(b'[') + 1:-1]) return v def get_type_list(lines): types = [] for line in lines: if line != b'': L = line.split(b" ") types.append({'type_name' : L[0], 'type_code' : int(L[1]), 'size' : int(L[2]), 'elements' : [] }) i = 3 while i < len(L): types[-1]['elements'].append({'type_name' : L[i], 'name' : L[i + 1], 'offset' : int(L[i + 2]), 'n_elements' : arr_sizes(L[i + 1]), 'size' : int(L[i + 3]) * arr_sizes(L[i + 1])}) i += 4 return types reds = [] irs = [] greens = [] ppg_freq_Hz = 50 def process_ppg(d, t): global greens, reds, irs for e in t['elements']: block = d[e['offset']:e['offset'] + e['size']] element_size = int(len(block) / e['n_elements']) if e['name'] == b'bytes[180]': reds += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(0,60,3)] irs += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(1,60,3)] greens += [int.from_bytes(block[3 * i : 3 * i + 3], byteorder = 'big') for i in range(2,60,3)] # if len(reds) > 400: # reds = reds[-400:] # irs = irs[-400:] # greens = greens[-400:] # fig, axs = plt.subplots(3) # axs[0].set_title('red') # axs[1].set_title('ir') # axs[2].set_title('green') # axs[0].plot(reds) # axs[1].plot(irs) # axs[2].plot(greens) # plt.savefig("ppg.png") # plt.close() accs = [] gyros = [] imu_sparse = 0 def process_imu(d, t): global accs, gyros, imu_sparse for e in t['elements']: block = d[e['offset']:e['offset'] + e['size']] element_size = int(len(block) / e['n_elements']) if e['name'] == b'data[141]': for i in range(20): reading = block[1 + 7 * i : 8 + 7 * i] #print(reading) imu_reading_type = reading[0] >> 3 imu_reading_tag_cnt = (reading[0] >> 1) & 3 data = np.array([int.from_bytes(reading[2 * i + 1 : 2 * i + 3], byteorder = 'little', signed = True) for i in range(3)]) if imu_reading_type == 1: gyros.append(250 / (1<<16) * data) elif imu_reading_type == 2: accs.append(4 / (1<<16) * data) else: pass #assert False # imu_sparse += 1 # if imu_sparse % 5 == 4: # if len(gyros) > 1600: # gyros = gyros[-1600:] # accs = accs[-1600:] # fig, axs = plt.subplots(2) # g = np.array(gyros) # a = np.array(accs) # #np.save("gyros.npy", g) # #np.save("accs.npy", a) # a -= np.mean(a, axis = 0).reshape(1,3) # axs[0].set_ylabel("dps") # axs[0].plot(g[:,0]) # axs[0].plot(g[:,1]) # axs[0].plot(g[:,2]) # axs[1].set_ylabel("g") # axs[1].plot(a[:,0]) # axs[1].plot(a[:,1]) # axs[1].plot(a[:,2]) # plt.savefig("acc_gyro.png") # plt.close() ecgs = [] t1s = [] t2s = [] strains = [] ts = [] adc_sparse = 0 # Should be running at 488Hz freq_Hz = 488.28125 max_hr_Hz = 240 / 60 def process_adc(d, t): global ecgs, t1s, t2s, strains, adc_sparse, ts for e in t['elements']: block = d[e['offset']:e['offset'] + e['size']] element_size = int(len(block) / e['n_elements']) if e['name'] == b't': ts.append((1 / 2000) * int.from_bytes(block[:4], byteorder = 'little', signed = True)) if e['name'] == b'ekg_readings_cnts[50]': ecgs = ecgs + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(50)] # adc_sparse += 1 # if adc_sparse % 10 == 9: # if len(ecgs) > 200: # ecgs = ecgs[-4 * 4096:] # t1s = t1s[-4096:] # t2s = t2s[-4096:] # strains = strains[-4096:] # fig, axs = plt.subplots(2) # axs[0].set_title("ECG") # #axs[2].plot(np.array(ts[-200:-1]), 1000 * np.diff(ts[-200:]),'k.',linestyle='--') # #if len(ts) > 200: # # axs[2].set_title(str((ts[-1] - ts[-200]) / 200)) # #axs[2].set_xlabel('S') # #axs[2].set_ylabel('mS') # #axs[1].set_title("Strain") # #axs[2].set_title("oT") # #axs[3].set_title("iT") # # 0.25 * 244 Hz # #b, a = sp.signal.butter(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass') # #ecgs_ = sp.signal.filtfilt(b, a, ecgs) # #beats = np.where(np.diff(ecgs_) < -0.0003)[0] # #last_beat = -1000000 # #beats_ = [] # #for beat in beats: # # if (beat - last_beat) / 488 > 1 / (max_hr_Hz): # # last_beat = beat # # beats_.append(beat) # axs[0].plot(ecgs, 'k') # r_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, distance = freq_Hz / max_hr_Hz, width = 5, prominence = 0.001)[0] # all_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, width = 10, prominence = 0.0001)[0] # # for peak in all_peaks: # # ls = [e for e in r_peaks if peak - e < 0 and peak - e > -0.2 * freq_Hz] # # if len(ls) >= 1: # # axs.plot(peak, ecgs[peak], 'bo') # P peaks # # for peak in all_peaks: # # ls = [e for e in r_peaks if peak - e > 0 and peak - e < 0.4 * freq_Hz] # # if len(ls) >= 1: # # axs.plot(peak, ecgs[peak], 'yo') # T peaks # for peak in r_peaks: # axs[0].plot(peak, ecgs[peak], 'ro') # if len(r_peaks) > 5: # hr_bpm = np.mean(60 * freq_Hz / np.diff(r_peaks)) # #hrv_ms = 1000 * np.std(np.diff(r_peaks)) / freq_Hz # hrv_rmssd_ms = 1000. * np.power(np.mean(np.power(np.diff(np.diff(r_peaks / freq_Hz)), 2)), 0.5) # axs[0].set_title(f"ECG HR:{np.round(hr_bpm)}BPM HRV (RMSSD):{np.round(hrv_rmssd_ms)}ms") # hist = np.histogram(np.diff(r_peaks), bins = 8) # axs[1].plot(0.5 * (hist[1][1:] + hist[1][:-1]), hist[0]) # #axs[1].plot(strains) # #axs[2].plot(t1s) # #axs[3].plot(t2s) # fig.tight_layout() # plt.savefig("adcs.png") # plt.close() # fig, axs = plt.subplots() # axs.plot(ts[1:], np.diff(ts), 'k.') # plt.savefig("adc_ts.png") if e['name'] == b'str_readings_cnts[5]': strains = strains + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)] if e['name'] == b'oT_readings_cnts[5]': t1s = t1s + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)] if e['name'] == b'iT_readings_cnts[5]': t2s = t2s + [(2.4 / (1<<24)) * int.from_bytes(block[4 * i : 4 * i + 4], byteorder = 'little', signed = True) for i in range(5)] def make_graphs(): fig, axs = plt.subplots(2,2) b, a = sp.signal.butter(2, [1 / (0.5 * freq_Hz), 120 / (0.5 * freq_Hz)], btype = 'bandpass') ecgs_ = sp.signal.filtfilt(b, a, ecgs) r_peaks = sp.signal.find_peaks(ecgs, height = None, threshold = None, distance = freq_Hz / max_hr_Hz, width = 5, prominence = 0.001)[0] axs[0][0].plot(np.arange(len(ecgs)) / freq_Hz, ecgs_, 'k.', linestyle='--') for peak in r_peaks: axs[0][0].plot(peak / freq_Hz, ecgs_[peak], 'ro') axs[1][0].plot(r_peaks[:-1] / freq_Hz, 1 / (np.diff(r_peaks) /freq_Hz), 'k.') b, a = sp.signal.butter(2, [0.05 / (0.5 * ppg_freq_Hz), 4 / (0.5 * ppg_freq_Hz)], btype = 'bandpass') reds_ = sp.signal.filtfilt(b, a, reds) #irs_ = sp.signal.filtfilt(b, a, irs) #greens_ = sp.signal.filtfilt(b, a, greens) P_r = np.log(np.abs(np.fft.rfft(reds_))) P_i = np.log(np.abs(np.fft.rfft(irs))) P_g = np.log(np.abs(np.fft.rfft(greens))) #axs[0][1].plot(np.arange(P_r.shape[0]) * 1 / 200, P_r) #axs[0][1].plot(P_i) #axs[0][1].plot(P_g) axs[0][1].plot(np.arange(len(reds)) / 50, reds_, color = 'red') #ax2 = axs[0][1].twinx() #ax2.plot(np.arange(len(reds)) / 50, irs_, color = 'magenta') #ax3 = ax2.twinx() #ax3.plot(np.arange(len(reds)) / 50, greens_, color = 'green') #axs[1][1].plot(np.array(gyros)[:,0]) #axs[1][1].plot(np.array(gyros)[:,1]) #axs[1][1].plot(np.array(gyros)[:,2]) gs = np.array(gyros) acs = np.array(accs) f_gs = np.log(np.abs(np.fft.rfft(acs, axis = 0))) axs[1][1].plot(f_gs[:,0], alpha = 0.25) axs[1][1].plot(f_gs[:,1], alpha = 0.25) axs[1][1].plot(f_gs[:,2], alpha = 0.25) #axs[1][1].plot(strains) #axs[1][1].plot(t1s) #axs[1][1].plot(t2s) plt.show() def read_and_process(types, cons, size): index = 0 while (index < size): packet_type = cons[index] print(packet_type) try: t = [t for t in types if t['type_code'] == packet_type][0] except: print("HERE") print(cons[index-5:index+5]) quit() return print(index, packet_type, t['type_name']) d = cons[index + 1 : index + 1 + t['size']] if t['type_name'] == b'packet_imu': process_imu(d, t) if t['type_name'] == b'packet_msg': print(d) if t['type_name'] == b'packet_adc': process_adc(d, t) if t['type_name'] == b'packet_spo2': process_ppg(d, t) index += 1 + t['size']