"""Poll a sensor device over a serial port and plot its IMU readings.

Protocol as used by this script:

* writing ``b'?'`` makes the device list its packet types, one per line
  (see :func:`get_type_list`);
* writing ``b'r'`` makes it answer with a 4-byte little-endian length
  followed by that many bytes of concatenated packets, each introduced by a
  one-byte type code (see :func:`update_with_data`).

IMU packets are decoded into rolling gyroscope/accelerometer buffers which
are re-plotted to ``acc_gyro.png`` whenever new readings arrive.
"""
import time  # noqa: F401  (only referenced by the disabled legacy handlers)

import numpy as np
from scipy import signal  # noqa: F401  (only referenced by the disabled legacy handlers)

# NOTE: ``matplotlib`` and ``serial`` (pyserial) are imported inside the
# functions that need them so that the pure parsing helpers in this module
# can be imported and tested on machines without plotting/hardware support.

IMU_DATA_FIELD = b'data[141]'  # element of packet_imu that carries the readings
IMU_HEADER_BYTES = 1           # leading byte of the field; not interpreted here
IMU_READING_BYTES = 7          # 1 tag byte + three little-endian int16 values
IMU_TYPE_GYRO = 1
IMU_TYPE_ACC = 2
GYRO_SCALE = 250 / (1 << 16)   # raw count -> value plotted on the "dps" axis
ACC_SCALE = 4 / (1 << 16)      # raw count -> value plotted on the "g" axis
HISTORY_LEN = 400              # number of most recent samples kept per sensor

# Rolling sample buffers; every entry is a length-3 array (x, y, z).
accs = []
gyros = []


def arr_sizes(s):
    """Return the array length encoded in a field name.

    ``b'data[141]'`` yields 141.  Names that do not end in ``]`` or contain
    no ``[`` are treated as scalars and yield 1.
    """
    if not s.endswith(b']') or b'[' not in s:
        return 1
    return int(s[s.rindex(b'[') + 1:-1])


def get_type_list(ser, n_lines=13):
    """Ask the device for its packet type table and parse the answer.

    Sends ``b'?'`` and then reads ``n_lines`` lines from ``ser``.  Blank
    lines (including read timeouts, which return ``b''``) are skipped.  Every
    other line must look like::

        <type_name> <type_code> <size> [<elem_type> <elem_name> <offset> <elem_size>]...

    Args:
        ser: object with ``write(bytes)`` and ``readline() -> bytes``.
        n_lines: how many lines to read; defaults to the 13 this script has
            always requested.

    Returns:
        A list of dicts with keys ``type_name`` (bytes), ``type_code`` (int),
        ``size`` (int) and ``elements``.  Each element is a dict with
        ``type_name``, ``name``, ``offset``, ``n_elements`` (array length
        taken from the name) and ``size`` (``elem_size * n_elements``).

    Raises:
        ValueError: if a numeric field is not an integer or an element group
            is incomplete.
        IndexError: if a line has fewer than three fields.
    """
    ser.write(b'?')
    types = []
    for _ in range(n_lines):
        # split() without arguments also discards the line terminator and
        # tolerates repeated or trailing spaces.
        fields = ser.readline().split()
        if not fields:
            continue
        entry = {
            'type_name': fields[0],
            'type_code': int(fields[1]),
            'size': int(fields[2]),
            'elements': [],
        }
        for k in range(3, len(fields), 4):
            elem_type, elem_name, offset, elem_size = fields[k:k + 4]
            count = arr_sizes(elem_name)
            entry['elements'].append({
                'type_name': elem_type,
                'name': elem_name,
                'offset': int(offset),
                'n_elements': count,
                'size': int(elem_size) * count,
            })
        types.append(entry)
    return types


def my_filter(arr, threshold=0.005):
    """Smooth out sudden jumps in ``arr`` in place and return it.

    An index ``i`` is flagged when ``abs(arr[i + 1] - arr[i]) > threshold``.
    Each flagged sample is replaced by the mean of the nearest unflagged
    sample on either side.  Samples are processed left to right, so values
    that were already repaired feed into later averages.

    When a run of flagged samples starts at index 0 there is no left
    neighbour; the right neighbour alone is used instead of wrapping around
    to the end of the array.

    Args:
        arr: mutable 1-D sequence of numbers (NumPy array or list).
        threshold: minimum absolute step that counts as a jump.

    Returns:
        The same ``arr`` object, modified in place.
    """
    flagged = np.where(np.abs(np.diff(arr)) > threshold)[0].tolist()
    flagged_set = set(flagged)  # O(1) membership tests in the scans below
    for ind in flagged:
        prev_ind = ind - 1
        while prev_ind in flagged_set:
            prev_ind -= 1
        # Flagged indices come from diff(), so ind + 1 is always in range
        # and the scan stops at the last sample at the latest.
        next_ind = ind + 1
        while next_ind in flagged_set:
            next_ind += 1
        if prev_ind < 0:
            arr[ind] = arr[next_ind]
        else:
            arr[ind] = 0.5 * arr[prev_ind] + 0.5 * arr[next_ind]
    return arr


def _decode_imu_block(block):
    """Append every reading found in ``block`` to ``gyros`` / ``accs``.

    After ``IMU_HEADER_BYTES`` the block holds consecutive 7-byte readings:
    byte 0 is a tag whose upper five bits select the sensor (bits 2..1 hold
    a tag counter that is not used here) and bytes 1..6 are x, y, z as
    signed little-endian 16-bit integers.

    Returns the number of readings decoded.  Raises ``ValueError`` on a tag
    that is neither gyroscope nor accelerometer.
    """
    n_readings = max((len(block) - IMU_HEADER_BYTES) // IMU_READING_BYTES, 0)
    for k in range(n_readings):
        first = IMU_HEADER_BYTES + IMU_READING_BYTES * k
        reading = block[first:first + IMU_READING_BYTES]
        reading_type = reading[0] >> 3
        xyz = np.frombuffer(reading, dtype='<i2', count=3, offset=1)
        if reading_type == IMU_TYPE_GYRO:
            gyros.append(GYRO_SCALE * xyz)
        elif reading_type == IMU_TYPE_ACC:
            accs.append(ACC_SCALE * xyz)
        else:
            raise ValueError(
                "unknown IMU reading type {} in reading {}".format(reading_type, k))
    return n_readings


def _plot_imu(gyro_samples, acc_samples, path="acc_gyro.png"):
    """Plot the x/y/z traces of both sensors to ``path`` (two stacked axes)."""
    import matplotlib.pyplot as plt

    g = np.array(gyro_samples)
    a = np.array(acc_samples)
    fig, axs = plt.subplots(2)
    try:
        axs[0].set_ylabel("dps")
        axs[1].set_ylabel("g")
        for axis in range(3):
            axs[0].plot(g[:, axis])
            axs[1].plot(a[:, axis])
        fig.savefig(path)
    finally:
        # Always release the figure; this function runs once per packet.
        plt.close(fig)


def process_imu(d, t):
    """Decode one IMU packet payload and refresh the plot.

    Args:
        d: payload bytes of the packet (without the type-code byte).
        t: the packet's type description as produced by ``get_type_list``;
            only elements named ``b'data[141]'`` are decoded.

    Side effects:
        Appends to the module-level ``gyros`` and ``accs`` lists, trims both
        to the latest ``HISTORY_LEN`` samples and, once both hold data,
        rewrites ``acc_gyro.png``.

    Raises:
        ValueError: if a reading carries an unknown sensor type.
    """
    decoded = 0
    for e in t['elements']:
        if e['name'] != IMU_DATA_FIELD:
            continue
        block = d[e['offset']:e['offset'] + e['size']]
        decoded += _decode_imu_block(block)
    if not decoded:
        return
    # Trim in place so other references to the buffers stay valid.
    del gyros[:-HISTORY_LEN]
    del accs[:-HISTORY_LEN]
    # Plotting needs a 2-D array from each sensor; skip until both have data.
    if gyros and accs:
        _plot_imu(gyros, accs)


def _dispatch_packets(buf, types):
    """Walk the concatenated packets in ``buf`` and hand each to its handler.

    Every packet is one type-code byte followed by ``size`` payload bytes,
    where ``size`` comes from the matching entry in ``types``.  Processing
    stops with a printed warning at an unknown type code (its length cannot
    be known) or at a packet that runs past the end of ``buf``.
    """
    by_code = {}
    for entry in types:
        by_code.setdefault(entry['type_code'], entry)  # first match wins

    index = 0
    while index < len(buf):
        packet_type = buf[index]
        t = by_code.get(packet_type)
        if t is None:
            print("unknown packet type {} at offset {}; dropping rest of buffer"
                  .format(packet_type, index))
            return
        end = index + 1 + t['size']
        if end > len(buf):
            print("truncated {} packet at offset {}; dropping it"
                  .format(t['type_name'], index))
            return
        d = buf[index + 1:end]
        if t['type_name'] == b'packet_imu':
            process_imu(d, t)
        if t['type_name'] == b'packet_msg':
            print(d)
        index = end


def update_with_data(ser, types):
    """Poll the device forever and process every packet it returns.

    Each iteration writes ``b'r'``, reads a 4-byte unsigned little-endian
    length and then that many bytes of packets.  A header that times out is
    discarded together with any pending input and the request is repeated;
    a short payload is processed as far as it contains complete packets.

    Args:
        ser: serial-port-like object (``write``, ``read``,
            ``reset_input_buffer``, settable ``timeout``).
        types: packet type table from ``get_type_list``.

    This function does not return; it ends only through an exception.
    """
    ser.timeout = 3
    ser.reset_input_buffer()
    while True:
        ser.write(b'r')
        header = ser.read(4)
        if len(header) < 4:
            # read() returns fewer bytes on timeout; a partial header would
            # decode to a bogus length, so resynchronise and ask again.
            ser.reset_input_buffer()
            continue
        size = int.from_bytes(header, byteorder='little', signed=False)
        cons = ser.read(size)
        _dispatch_packets(cons, types)


# Disabled legacy per-element handlers, kept for reference.  They rely on
# names (adcs, b, a, axs, block, element_size, e, start) that the current
# code does not define.
#
# if t['type_name'] == b'packet_ekg' and e['name'] == b'readings_cnts[50]':
#     chunked = [(2.4 / (1<<24)) * int.from_bytes(block[i:i + element_size], byteorder='little', signed = True) for i in range(0, len(block), element_size)]
#     adcs += chunked
#     if len(adcs) > 500:
#         dat = np.array(adcs[-2048:])
#         if len(adcs) > 2048:
#             adcs = adcs[-2048:]
#         #filtered_dat = my_filter(dat)
#         filtered_dat = dat
#         filtered_dat = signal.filtfilt(b, a, filtered_dat)
#         dd = np.sort(dat)
#         center = 0.5 * (dd[-100] + dd[100])
#         range_ = dd[-100] - dd[100]
#         if len(dat) == 2048:
#             print("Freq:", 2048 / (time.time() - start))
#         axs[0].cla()
#         axs[1].cla()
#         axs[0].set_title("ADC {:3.3f}mV".format(1000 * (range_)))
#         #axs[0].plot(dat, 'r.', linestyle = '--')
#         axs[0].plot(filtered_dat, 'k.', linestyle = '--')
#         #axs[0].set_ylim(center - range_, center + range_)
#         _fft = np.log(np.abs(np.fft.fft(filtered_dat)[1:]))
#         axs[1].plot(250 * np.fft.fftfreq(dat.shape[-1])[1:], _fft, 'k.')
#         axs[1].axvline(x = 60)
#         plt.savefig("image.png")
# if t['type_name'] == b'packet_imu' and e['name'] == b'readings_cnts[4]':
#     imu_reading_type = block[0] >> 3
#     imu_reading_tag_cnt = (block[0] >> 1) & 3
#     reading_xyz = [int.from_bytes(block[2:4], byteorder = 'big', signed = True),
#                    int.from_bytes(block[4:6], byteorder = 'big', signed = True),
#                    int.from_bytes(block[6:8], byteorder = 'big', signed = True)]
#     if imu_reading_type == 1:
#         #print("{:02x} {:d}".format(imu_reading_type, imu_reading_tag_cnt))
#         gyros.append([250 * e / (1<<16) for e in reading_xyz])
#     elif imu_reading_type == 2:
#         accs.append([4 * e / (1<<16) for e in reading_xyz])
#     else:
#         print("ERR")


def main(port='/dev/ttyACM1'):
    """Open the serial port, fetch the type table and poll until interrupted.

    Args:
        port: serial device to open; defaults to the port this script has
            always used.
    """
    import serial

    # The context manager closes the port however the polling loop ends.
    with serial.Serial(port=port, timeout=0.5) as ser:
        ser.flush()
        ser.reset_input_buffer()
        types = get_type_list(ser)
        # ser.write(b'1')
        # ser.write(b'3')
        # ser.write(b'5')
        ser.flush()
        update_with_data(ser, types)


if __name__ == "__main__":
    main()