import serial import matplotlib.pyplot as plt import numpy as np from scipy import signal import time fs = 244 b, a = signal.butter( N = 2, Wn = [30], btype = 'lowpass', fs = fs) ser = serial.Serial(port='/dev/ttyACM1', baudrate=115200, timeout = 0.5) ser.flush() data = bytes() ready = False ser.write(b'?') types = [] def arr_sizes(s): if s[-1:] != b']' or b'[' not in s: return 1 v = int(s[s.rindex(b'[') + 1:-1]) return v for i in range(13): line = ser.readline().strip(b'\n').strip(b'\r') if line != b'': L = line.split(b" ") types.append({'type_name' : L[0], 'type_code' : int(L[1]), 'size' : int(L[2]), 'elements' : [] }) i = 3 while i < len(L): types[-1]['elements'].append({'type_name' : L[i], 'name' : L[i + 1], 'offset' : int(L[i + 2]), 'n_elements' : arr_sizes(L[i + 1]), 'size' : int(L[i + 3]) * arr_sizes(L[i + 1]), 'readings' : []}) i += 4 ser.flush() ser.timeout = 5 adcs = [] accs = [] gyros = [] fig, axs = plt.subplots(2,2) then = time.time() for i in range(10): ser.write(b'R') data = ser.read(1024) continue index = 0 while (index < 1024): packet_type = (data[index + 1]<<8) + data[index] #print(packet_type) ind = [i for i,t in enumerate(types) if t['type_code'] == packet_type] if len(ind) != 1: break t = types[ind[0]] d = data[index + 2 : index + 2 + t['size']] for e in t['elements']: block = d[e['offset']:e['offset'] + e['size']] element_size = int(len(block) / e['n_elements']) if t['type_name'] == b'packet_ekg' and e['name'] == b'readings_cnts[50]': chunked = [(2.4 / (1<<24)) * int.from_bytes(block[i:i + element_size], byteorder='little', signed = True) for i in range(0, len(block), element_size)] adcs += chunked axs[0][0].cla() axs[0][1].cla() axs[1][1].cla() dat = np.array(adcs[-1000:]) axs[0][0].set_title("ADC") axs[0][1].set_title("ACC") axs[1][1].set_title("GYRO") axs[0][0].plot(dat, 'k.', linestyle = '--')#signal.filtfilt(b, a, dat)) axs[0][1].plot(accs[-30:], 'k.', linestyle = '--')#signal.filtfilt(b, a, dat)) axs[1][1].plot(gyros[-30:], 'k.', linestyle = '--')#signal.filtfilt(b, a, dat)) plt.pause(0.01) ##print(BB) e['readings'].append(chunked) if t['type_name'] == b'packet_imu' and e['name'] == b'readings_cnts[4]': imu_reading_type = block[0] >> 3 imu_reading_tag_cnt = (block[0] >> 1) & 3 # tag count is sensor time slow reading_xyz = [int.from_bytes(block[2:4], byteorder = 'big', signed = True), int.from_bytes(block[4:6], byteorder = 'big', signed = True), int.from_bytes(block[6:8], byteorder = 'big', signed = True)] if imu_reading_type == 1: #print("{:02x} {:d}".format(imu_reading_type, imu_reading_tag_cnt)) gyros.append([250 * e / (1<<16) for e in reading_xyz]) elif imu_reading_type == 2: accs.append([4 * e / (1<<16) for e in reading_xyz]) else: print("ERR") #chunked = [int.from_bytes(block[i:i + element_size], byteorder='little', signed = True) for i in range(0, len(block), element_size)] #print(chunked) #print(t['type_name']) index += 2 + t['size'] #for t in types: # if t['type_name'] == b'packet_ekg': # print(t) print((time.time() - then) / 10)