112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
import serial
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from scipy import signal
|
|
import time
|
|
|
|
# Sampling frequency in Hz handed to the filter design below.
fs = 244

# 2nd-order Butterworth low-pass with a 30 Hz cutoff (cutoff expressed in Hz
# because `fs` is supplied).  SciPy documents `Wn` as a scalar for low-pass
# and high-pass designs; the previous one-element list only worked through
# implicit size-1-array -> scalar conversion, so a plain scalar is passed.
# The coefficients are kept for optional smoothing with
# signal.filtfilt(b, a, samples).
b, a = signal.butter(N=2, Wn=30, btype='lowpass', fs=fs)
|
|
|
|
# Open the device port; the 0.5 s timeout applies to the line-by-line
# descriptor reads that follow the '?' query.
ser = serial.Serial('/dev/ttyACM1', 115200, timeout=0.5)
# NOTE(review): pySerial's flush() waits for pending *output* to be written;
# it does not discard stale input. Confirm that is what is wanted here.
ser.flush()

# Module-level placeholders. `data` is overwritten by every bulk read later
# in the script; `ready` is not read anywhere in this file.
data = b''
ready = False

# Filled with one descriptor dict per packet type reported by the device.
types = []

# Send the '?' query whose reply is parsed into `types`.
ser.write(b'?')
|
|
|
|
def arr_sizes(s):
    """Return the element count encoded in a C-style array name.

    ``s`` is a bytes token such as ``b'readings_cnts[50]'``.  When the token
    ends with ``]`` and contains a ``[``, the integer between the last ``[``
    and the closing ``]`` is returned.  Any other token is treated as a
    scalar and yields 1.

    Raises:
        ValueError: if the text inside the last bracket pair is not an
            integer literal.
    """
    looks_like_array = s[-1:] == b']' and b'[' in s
    if not looks_like_array:
        return 1
    # Everything after the last '[' is "<count>]"; drop the trailing ']'.
    _, _, bracket_tail = s.rpartition(b'[')
    return int(bracket_tail[:-1])
|
|
|
|
# Read the reply to the '?' query: up to 13 lines, each non-empty line
# describing one packet type as space-separated tokens
#   <type_name> <type_code> <size> {<elem_type> <elem_name> <offset> <elem_size>}...
for _ in range(13):
    raw = ser.readline().strip(b'\n').strip(b'\r')
    if raw == b'':
        # Nothing usable on this line (blank line or nothing received).
        continue

    tokens = raw.split(b" ")
    descriptor = {
        'type_name': tokens[0],
        'type_code': int(tokens[1]),
        'size': int(tokens[2]),
        'elements': [],
    }
    types.append(descriptor)

    # After the 3-token header, element fields come in groups of four.
    for pos in range(3, len(tokens), 4):
        elem_type = tokens[pos]
        elem_name = tokens[pos + 1]
        elem_offset = int(tokens[pos + 2])
        # An array element ("name[N]") occupies N times its per-item size.
        count = arr_sizes(elem_name)
        descriptor['elements'].append({
            'type_name': elem_type,
            'name': elem_name,
            'offset': elem_offset,
            'n_elements': count,
            'size': int(tokens[pos + 3]) * count,
            'readings': [],
        })
|
|
|
|
# Switch the port from the short descriptor timeout to 5 s for the bulk
# reads that follow.
ser.flush()
ser.timeout = 5

# Accumulators for decoded samples: scaled ADC values and per-reading
# [x, y, z] lists for accelerometer and gyroscope.
adcs, accs, gyros = [], [], []

# 2x2 grid of axes; the decode path draws on [0][0], [0][1] and [1][1].
fig, axs = plt.subplots(nrows=2, ncols=2)

# Start of the timed section; the final report divides the elapsed time by
# the number of read requests.
then = time.time()
|
|
# ---------------------------------------------------------------------------
# Acquisition: request N_REQUESTS buffers from the device, optionally decode
# and plot them, then print the mean wall-clock time per request.
# ---------------------------------------------------------------------------

N_REQUESTS = 10   # number of 'R' requests; also the divisor of the report
READ_SIZE = 1024  # bytes requested per 'R' command
HEADER_SIZE = 2   # 16-bit little-endian type code in front of each payload

# The original loop had an unconditional `continue` right after the read, so
# the decode/plot code below it never ran and the script only timed the raw
# reads.  That stays the default; set this to True to decode and plot.
# NOTE(review): this copy of the file lost its indentation; the decode code is
# assumed to have been inside the loop body, after the `continue`.
DECODE_PACKETS = False


def _decode_ekg(block, element_size):
    """Return the scaled values of the signed little-endian samples in block."""
    # 2.4 / 2**24 per count -- presumably a 2.4 V reference over a 24-bit
    # range; TODO confirm against the ADC configuration.
    scale = 2.4 / (1 << 24)
    return [
        scale * int.from_bytes(block[k:k + element_size],
                               byteorder='little', signed=True)
        for k in range(0, len(block), element_size)
    ]


def _decode_imu(block):
    """Return (reading_type, [x, y, z]) decoded from one IMU element block."""
    # Upper five bits of the first byte select the reading type.  Bits 2:1
    # hold a tag count ("sensor time slow" in the original note); unused here.
    reading_type = block[0] >> 3
    xyz = [int.from_bytes(block[k:k + 2], byteorder='big', signed=True)
           for k in (2, 4, 6)]
    return reading_type, xyz


def _redraw():
    """Redraw the ADC, ACC and GYRO axes from the most recent samples."""
    adc_ax, acc_ax, gyro_ax = axs[0][0], axs[0][1], axs[1][1]
    for ax, title in ((adc_ax, "ADC"), (acc_ax, "ACC"), (gyro_ax, "GYRO")):
        ax.cla()
        ax.set_title(title)
    # Raw samples are plotted; signal.filtfilt(b, a, ...) could be applied
    # here for smoothing.
    adc_ax.plot(np.array(adcs[-1000:]), 'k.', linestyle='--')
    acc_ax.plot(accs[-30:], 'k.', linestyle='--')
    gyro_ax.plot(gyros[-30:], 'k.', linestyle='--')
    plt.pause(0.01)


def _process_buffer(buf):
    """Walk buf packet by packet and store/plot the decoded readings.

    Each packet is a 2-byte little-endian type code followed by the payload
    whose layout comes from the matching entry in `types`.  Parsing stops at
    the first type code that does not match exactly one descriptor, or when
    the remaining bytes cannot hold a complete packet.
    """
    index = 0
    # Bound by the bytes actually received: a timed-out read may return fewer
    # than READ_SIZE bytes, and a header must not straddle the buffer end.
    while index + HEADER_SIZE <= len(buf):
        packet_type = (buf[index + 1] << 8) + buf[index]
        matches = [t for t in types if t['type_code'] == packet_type]
        if len(matches) != 1:
            break
        ptype = matches[0]

        start = index + HEADER_SIZE
        payload = buf[start:start + ptype['size']]
        if len(payload) < ptype['size']:
            # Packet is cut off by the end of the buffer.
            break

        for elem in ptype['elements']:
            block = payload[elem['offset']:elem['offset'] + elem['size']]

            if (ptype['type_name'] == b'packet_ekg'
                    and elem['name'] == b'readings_cnts[50]'):
                element_size = len(block) // elem['n_elements']
                chunked = _decode_ekg(block, element_size)
                adcs.extend(chunked)
                _redraw()
                elem['readings'].append(chunked)

            if (ptype['type_name'] == b'packet_imu'
                    and elem['name'] == b'readings_cnts[4]'):
                reading_type, xyz = _decode_imu(block)
                # Scale factors kept from the original script; presumably
                # full-scale ranges over a 16-bit span -- TODO confirm.
                if reading_type == 1:
                    gyros.append([250 * v / (1 << 16) for v in xyz])
                elif reading_type == 2:
                    accs.append([4 * v / (1 << 16) for v in xyz])
                else:
                    print("ERR")

        index += HEADER_SIZE + ptype['size']


for _ in range(N_REQUESTS):
    ser.write(b'R')
    data = ser.read(READ_SIZE)
    if not DECODE_PACKETS:
        continue
    _process_buffer(data)

# Mean time per request in seconds.
print((time.time() - then) / N_REQUESTS)
|