-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbrainflow_simple_example.py
More file actions
82 lines (70 loc) · 2.92 KB
/
brainflow_simple_example.py
File metadata and controls
82 lines (70 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from pickle import dump, load
from brainflow.data_filter import DataFilter, FilterTypes, DetrendOperations
from brainflow.board_shim import BoardShim
from pyqtgraph.Qt import QtGui, QtCore, QtWidgets
import logging
import brainflow_tools
import pyqtgraph as pg
'''
This script plots EEG data that has been collected and filtered using BrainFlow.
'''
class Graph:
    """Live time-series plot of a board's EXG channels.

    Constructing an instance opens the plot window, starts a repeating timer
    that redraws the curves, and then runs the Qt event loop. The constructor
    therefore does not return until that event loop exits.
    """

    def __init__(self, board_shim):
        """Build the plot window for ``board_shim`` and run the Qt event loop.

        Args:
            board_shim: Board object that provides ``get_board_id()`` and
                ``get_current_board_data()``. Nothing here starts the stream;
                the caller is expected to have done so already.
        """
        self.board_id = board_shim.get_board_id()
        self.board_shim = board_shim
        self.exg_channels = BoardShim.get_exg_channels(self.board_id)
        self.sampling_rate = BoardShim.get_sampling_rate(self.board_id)
        self.update_speed_ms = 50  # timer period between redraws
        # Window length; multiplied by the sampling rate to get the number of
        # samples requested per redraw (so presumably seconds).
        self.window_size = 4
        self.num_points = self.window_size * self.sampling_rate

        self.app = QtWidgets.QApplication([])
        self.win = pg.GraphicsLayoutWidget(
            show=True, title='BrainFlow Plot', size=(800, 600)
        )
        self._init_timeseries()

        # Keep the timer on the instance so its lifetime does not depend on a
        # local variable of this (blocking) constructor.
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update)
        self.timer.start(self.update_speed_ms)
        QtWidgets.QApplication.instance().exec_()

    def _init_timeseries(self):
        """Create one plot row and one curve per EXG channel.

        Fills ``self.plots`` and ``self.curves`` in channel order; only the
        first row gets a title.
        """
        self.plots = []
        self.curves = []
        for row, _channel in enumerate(self.exg_channels):
            plot = self.win.addPlot(row=row, col=0)
            for axis in ('left', 'bottom'):
                plot.showAxis(axis, False)
                # NOTE(review): arguments are passed exactly as before; confirm
                # against pyqtgraph's PlotItem.setMenuEnabled signature that an
                # axis name is meaningful as the first argument.
                plot.setMenuEnabled(axis, False)
            if row == 0:
                plot.setTitle('TimeSeries Plot')
            self.plots.append(plot)
            self.curves.append(plot.plot())

    def update(self):
        """Fetch the newest samples, filter every EXG channel and redraw it.

        Connected to the timer's ``timeout`` signal. Does nothing when the
        board has not delivered any samples yet.
        """
        data = self.board_shim.get_current_board_data(self.num_points)
        if data.size == 0:
            # No samples buffered yet: skip this tick rather than handing
            # empty arrays to the filters.
            return

        for count, channel in enumerate(self.exg_channels):
            samples = data[channel]
            # The DataFilter results are not assigned, so these calls are
            # relied on to modify ``samples`` in place before it is plotted.
            DataFilter.detrend(samples, DetrendOperations.CONSTANT.value)
            # 3.0 / 45.0: presumably the pass-band edges in Hz - confirm
            # against the installed BrainFlow version's signature.
            DataFilter.perform_bandpass(samples, self.sampling_rate, 3.0, 45.0, 2,
                                        FilterTypes.BUTTERWORTH.value, 0)
            # Two band-stops, 48-52 and 58-62 (presumably 50 Hz and 60 Hz
            # mains interference).
            DataFilter.perform_bandstop(samples, self.sampling_rate, 48.0, 52.0, 2,
                                        FilterTypes.BUTTERWORTH.value, 0)
            DataFilter.perform_bandstop(samples, self.sampling_rate, 58.0, 62.0, 2,
                                        FilterTypes.BUTTERWORTH.value, 0)
            self.curves[count].setData(samples.tolist())

        self.app.processEvents()
def _release_session_if_prepared(board):
    """Release ``board``'s session (logging first) when one is prepared."""
    if not board.is_prepared():
        return
    logging.info('Releasing session')
    board.release_session()


def main():
    """Connect to the EEG board, start streaming and show the live plot.

    Anything raised while preparing the session, streaming or plotting is
    logged as a warning rather than propagated (``BaseException`` is caught,
    so this includes ``KeyboardInterrupt``). Afterwards the session is
    released if it was prepared.
    """
    # Connect to EEG
    shim, description, cli_args = brainflow_tools.connect_board()
    try:
        print(description)
        shim.prepare_session()
        # 450000: presumably the streaming buffer size - confirm against the
        # BoardShim.start_stream documentation.
        shim.start_stream(450000, cli_args.streamer_params)
        # Constructing Graph runs the Qt event loop, so this call blocks
        # until the loop exits.
        Graph(shim)
    except BaseException:
        logging.warning('Exception', exc_info=True)
    finally:
        logging.info('End')
        _release_session_if_prepared(shim)


if __name__ == "__main__":
    main()