-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_tcp_server.py
More file actions
executable file
·57 lines (53 loc) · 2.32 KB
/
example_tcp_server.py
File metadata and controls
executable file
·57 lines (53 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""Example TCP socket server."""
import socket
import json
def _receive_all(clientsocket):
    """Read from clientsocket until the peer closes; return the bytes read.

    A recv() timeout does not end the message: the read is retried, so this
    function returns only once recv() yields empty data. Like the rest of
    this example, it makes the simplifying assumption that the client always
    closes the connection cleanly.
    """
    message_chunks = []
    while True:
        try:
            data = clientsocket.recv(4096)
        except socket.timeout:
            # Nothing arrived within the socket's timeout; keep waiting.
            continue
        if not data:
            # Empty data means the client closed its end of the connection.
            break
        message_chunks.append(data)
    return b"".join(message_chunks)


def main():
    """Run an example TCP server that prints JSON messages from clients.

    Binds to localhost:8000 and serves one connection at a time. For each
    connection, every byte the client sends before closing is collected,
    decoded as UTF-8, parsed as JSON, and printed. A message that is not
    valid UTF-8 or not valid JSON is dropped without output and the server
    keeps accepting connections. This function loops forever and does not
    return.
    """
    # Create an INET, STREAMing socket, this is TCP.
    # Note: the context manager closes the socket automatically when an
    # exception is raised or control flow leaves the block.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Set SO_REUSEADDR before binding so the address can be reused
        # promptly when the server is restarted.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("localhost", 8000))
        sock.listen()

        # accept() will block for a maximum of 1 second. Without a timeout
        # it would block indefinitely, waiting for a connection.
        sock.settimeout(1)

        while True:
            # Wait up to 1s for a connection. The socket library avoids
            # consuming CPU while waiting.
            try:
                clientsocket, address = sock.accept()
            except socket.timeout:
                continue
            print("Connection from", address[0])

            # recv() will block for a maximum of 1 second. Without a timeout
            # it would block indefinitely, waiting for packets.
            clientsocket.settimeout(1)

            # The client socket is closed as soon as the full message has
            # been received, before any decoding or parsing happens.
            with clientsocket:
                message_bytes = _receive_all(clientsocket)

            # Decode the bytes as UTF-8 and parse them as JSON. Undecodable
            # bytes are handled exactly like malformed JSON: skip this
            # message instead of letting one bad client crash the server.
            try:
                message_dict = json.loads(message_bytes.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                continue
            print(message_dict)
# Start the server only when this file is executed as a script; importing it
# as a module must not open a socket.
if __name__ == "__main__":
    main()