-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDebugServer.py
More file actions
56 lines (46 loc) · 1.62 KB
/
DebugServer.py
File metadata and controls
56 lines (46 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import socket
from threading import Thread
from serial import Serial
class DebugServer:
    """Relay lines read from a serial port to every connected TCP client.

    Constructing an instance opens the serial port, binds the listening
    socket, starts one thread that accepts clients and one that forwards
    serial lines, and then blocks joining those threads.
    """

    def handle_clients(self):
        """Accept TCP clients forever and register them for broadcasts.

        Every accepted socket is appended to ``self.connections``; the
        serial thread sends to whatever is in that list.
        """
        self.sock.listen(5)  # Now wait for client connection.
        while True:
            # Establish connection with client (peer address is not used).
            c, _addr = self.sock.accept()
            self.connections.append(c)
            print("New Connection!")

    def handle_serial(self):
        """Forward each line read from the serial port to all clients.

        Loops forever. An exception raised by the serial port itself is
        not caught here and therefore ends the calling thread.
        """
        while True:
            # readline() waits for data up to the port's read timeout, so
            # an idle port no longer burns CPU polling inWaiting().
            line = self.ser.readline()
            if not line:
                # Timed out with nothing received; try again.
                continue
            print(line)
            self._broadcast(line)

    def _broadcast(self, line):
        """Send *line* to every client, dropping clients whose send fails."""
        # Iterate over a snapshot: removing from the live list while
        # looping over it would skip the client that follows each dead
        # one, and the accept thread may append to the list meanwhile.
        for con in list(self.connections):
            try:
                con.sendall(line)
            except OSError:
                print("client disconnected?")
                self._drop(con)

    def _drop(self, con):
        """Unregister *con* and release its socket."""
        self.connections.remove(con)
        try:
            con.close()
        except OSError:
            # The peer is already gone; nothing more can be done here.
            pass

    def __init__(self, serial_port='COM3', host='localhost', tcp_port=20000):
        """Open the serial port and listening socket, then serve.

        This call blocks: it starts the worker threads and joins them.

        Args:
            serial_port: Name of the serial device to read from.
            host: Interface the TCP listener binds to.
            tcp_port: TCP port the listener binds to.

        Raises:
            OSError: If the listening socket cannot be bound. The serial
                port and the socket are closed before re-raising.
        """
        # Short read timeout so readline() in handle_serial returns
        # periodically even when the device is silent.
        self.ser = Serial(serial_port, timeout=0.1)
        self.writers = []  # Not used by this class; kept for compatibility.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.bind((host, tcp_port))
        except OSError:
            # Do not leave the serial port or socket open on failure.
            self.sock.close()
            self.ser.close()
            raise
        self.connections = []

        # Daemon threads: if the join below is interrupted (e.g. Ctrl-C),
        # the endless worker loops do not keep the process alive.
        threads = [
            Thread(target=self.handle_clients, daemon=True),
            Thread(target=self.handle_serial, daemon=True),
        ]
        for t in threads:
            t.start()
        for t in threads:  # Block until the workers end.
            t.join()

    async def run(self):
        """Print lines from the serial port forever.

        NOTE(review): this coroutine contains no ``await``, so it never
        yields control back to an event loop while it runs.
        """
        while True:
            if self.ser.inWaiting() > 0:
                line = self.ser.readline()
                print(line)
if __name__ == "__main__":
    # Script entry point. Constructing the server is the whole program:
    # DebugServer.__init__ starts the worker threads and joins them, so
    # this statement blocks while they run.
    serv = DebugServer()
    # The asyncio-based entry point is disabled:
    #asyncio.run(serv.run())