-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathImageServer.py
More file actions
155 lines (129 loc) · 4.4 KB
/
Copy pathImageServer.py
File metadata and controls
155 lines (129 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import socket
import threading
import time
from dataclasses import dataclass
from queue import Queue
import cv2
import numpy as np
@dataclass
class ImageInfo:
size: int
width: int
height: int
channels: int
@dataclass
class Image:
width: int
height: int
channels: int
data_type: np.dtype
data: bytearray
class ImageServer:
def __init__(self, host, port, queue):
self.host = host
self.port = port
self.queue = queue
def accecpt_client(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# let's the socket to be reused
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.host, self.port))
s.listen()
connection, addr = s.accept()
print("Connected by", addr)
return connection
def start(self):
# keep the tcp listener up if the client diesconnects
while True:
print("Starting server...")
print("Waiting for connection...")
self.con = self.accecpt_client()
print("Client connected")
image_size = self.receive_bytes(4)
image_width = self.receive_bytes(4)
image_height = self.receive_bytes(4)
image_channels = self.receive_bytes(4)
image_datatype = int.from_bytes(self.receive_bytes(1), byteorder="big")
print("Received image info...")
data_type = np.uint8
match image_datatype:
case 0:
data_type = np.uint8
case 1:
data_type = np.uint16
case 2:
data_type = np.float32
case 3:
data_type = np.float64
case _:
data_type = np.uint8
img_info = ImageInfo(
size=int.from_bytes(image_size, byteorder="big"),
width=int.from_bytes(image_width, byteorder="big"),
height=int.from_bytes(image_height, byteorder="big"),
channels=int.from_bytes(image_channels, byteorder="big"),
)
# get image data as long as connection is open
while True:
data = self.receive_bytes(img_info.size)
if not data:
print("Connection closed")
break
img = Image(
width=img_info.width,
height=img_info.height,
channels=img_info.channels,
data_type=np.dtype(data_type),
data=data,
)
self.queue.put(img)
def receive_bytes(self, len_bytes):
data = bytearray()
while len(data) < len_bytes:
net_data = self.con.recv(len_bytes - len(data))
if not net_data:
return net_data
data.extend(net_data)
return data
def display_img(img_queue):
last_tick = time.time()
while True:
try:
img_bytes = img_queue.get(False)
print(f"Len received data: {len(img_bytes.data)}")
bayer_im = np.frombuffer(img_bytes.data, dtype=img_bytes.data_type)
bayer_im = bayer_im.reshape(
(img_bytes.height, img_bytes.width, img_bytes.channels)
)
frame_period = time.time() - last_tick
last_tick = time.time()
frame_rate = 1 / frame_period
cv2.putText(
img=bayer_im,
text="{:10.2f}fps".format(frame_rate),
org=(20, 30),
fontFace=cv2.FONT_HERSHEY_TRIPLEX,
fontScale=1,
color=(255, 0, 0),
thickness=1,
)
bgr = cv2.cvtColor(bayer_im, cv2.IMREAD_COLOR)
cv2.imshow("image", bgr)
cv2.waitKey(1)
except:
# print("No new image")
cv2.waitKey(1)
if __name__ == "__main__":
image_queue = Queue(maxsize=0)
host = "127.0.0.1" # Standard loopback interface address (localhost)
port = 3333
img_receiver = ImageServer(host, port, image_queue)
receive_thread = threading.Thread(
target=img_receiver.start,
)
display_thread = threading.Thread(
target=display_img,
args=(image_queue,),
)
receive_thread.start()
display_thread.start()