Skip to content

Commit fd317e7

Browse files
committed
Improved readability with the addition of the SecurityMonitor.Div class and thread names.
1 parent 97e69e4 commit fd317e7

1 file changed

Lines changed: 49 additions & 31 deletions

File tree

security_monitor.py

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import copy
3737
import queue
3838
from enum import IntEnum
39-
from typing import Optional
39+
from typing import Optional, NamedTuple
4040
from dataclasses import dataclass
4141
from dataclasses_json import dataclass_json
4242
import mpv
@@ -232,10 +232,25 @@ def stop(self):
232232
self._sock.close()
233233

234234
class SecurityMonitor():
235-
""" Security Monitor Windowing and Splitting """
235+
"""
236+
Security Monitor Windowing and Splitting
237+
238+
This class spawns and refreshes the video wall.
239+
240+
Through testing, it was found that permanent single players are not
241+
reliable. So, this class uses hot-staging in order to spawn players before
242+
bottom players are stopped and cleaned.
243+
244+
"""
236245
urls = ["rtsp://maglab:magcat@connor.maglab:8554/Camera1_sub",
237246
"rtsp://maglab:magcat@connor.maglab:8554/Camera2_sub"]
238247

248+
class Div(NamedTuple):
249+
""" Storage for calculated video wall geometry """
250+
col: int
251+
row: int
252+
num: int
253+
239254
# initialize with an event and division index
240255
# sample division indices to divisions:
241256
# 0 -> 1x1
@@ -246,11 +261,13 @@ class SecurityMonitor():
246261
def __init__(self, quit_queue, splitter_refresh_rate, div_idx):
247262
self.refresh_rate = splitter_refresh_rate
248263
self._queue_all = quit_queue
264+
# self._div is a three-element list consisting of
265+
# `columns`, `rows`, and `total players`
249266
self._div = self.calc_div(div_idx)
250267

251-
self.que = [multiprocessing.SimpleQueue() for _ in range(self._div[2]*2)]
252-
self.proc = [None] * (self._div[2]*2)
253-
self.url_idx = list(range(self._div[2]))
268+
self.que = [multiprocessing.SimpleQueue() for _ in range(self._div.num*2)]
269+
self.proc = [None] * (self._div.num*2)
270+
self.url_idx = list(range(self._div.num))
254271

255272
# Helper Functions
256273
# generate position string based on divisions and index
@@ -272,33 +289,32 @@ def _gen_pos(self, div, pos):
272289
def _gen_geo_str(self, idx):
273290
# divisions
274291
# must be greater than 0
275-
assert self._div[0] > 0
276-
assert self._div[1] > 0
292+
assert self._div.col > 0
293+
assert self._div.row > 0
277294
# must have columns and rows in division
278295
assert len(self._div) == 3
279296

280297
# position
281298
# calculate column and row
282-
col_div = self._div[0]
283-
row_div = self._div[1]
299+
col_div = self._div.col
300+
row_div = self._div.row
284301
[col_pos, row_pos] = self._idx2pos(idx)
285302
# positions must be less than divisions
286-
assert col_pos < self._div[0]
287-
assert row_pos < self._div[1]
303+
assert col_pos < self._div.col
304+
assert row_pos < self._div.row
288305

289306
# column width calculation
290-
geo_str=f"{100//self._div[0]}%"
307+
geo_str=f"{100//self._div.col}%"
291308
# row width calculation
292-
geo_str=f"{geo_str}x{100//self._div[1]}%"
309+
geo_str=f"{geo_str}x{100//self._div.row}%"
293310
# column position
294311
geo_str += self._gen_pos(col_div, col_pos)
295312
# row position
296313
geo_str += self._gen_pos(row_div, row_pos)
297314

298315
return geo_str
299316

300-
@staticmethod
301-
def calc_div(index):
317+
def calc_div(self, index):
302318
"""
303319
calculate number of divisions based on a magic index number
304320
returns : a three-element list consisting of columns, rows, and total players
@@ -316,16 +332,16 @@ def calc_div(index):
316332
row += 1
317333

318334
# final element is the total number of players visible
319-
return [col, row, col * row]
335+
return self.Div(col, row, col * row)
320336

321337
# index to position. position is a tuple.
322338
def _idx2pos(self, idx):
323-
assert idx < self._div[2]
324-
return [idx % self._div[0], idx // self._div[0]]
339+
assert idx < self._div.num
340+
return [idx % self._div.col, idx // self._div.col]
325341

326342
# this process actually contains the mpv stream player
327-
def _play_process(self, queue_in, queue_out, name):
328-
idx = self.url_idx[name % self._div[2]]
343+
def _play_process(self, queue_in, queue_out, p_idx):
344+
idx = self.url_idx[p_idx % self._div.num]
329345
geo_str = self._gen_geo_str(idx)
330346
player = mpv.MPV()
331347
# a series of configuration options that make the player act like a
@@ -343,15 +359,15 @@ def _play_process(self, queue_in, queue_out, name):
343359
# wait until the player is playing
344360
# timeout added here to terminate if the URL is not found
345361
try:
346-
logging.debug(f"Waiting for player {name} to start...")
362+
logging.debug(f"Waiting for player {p_idx} to start...")
347363
player.wait_until_playing(timeout=15)
348364
# set the output event to terminate the player behind this one
349365
# pylint: disable-next=broad-exception-caught
350366
except Exception as exc:
351-
logging.error(f"Player {name} stopped while waiting to start playing: {str(exc)}")
367+
logging.error(f"Player {p_idx} stopped while waiting to start playing: {str(exc)}")
352368
player.terminate()
353369
finally:
354-
logging.debug(f"Asking player below {name} to end.")
370+
logging.debug(f"Asking player below {p_idx} to end.")
355371
queue_out.put(True)
356372

357373
while True:
@@ -371,27 +387,29 @@ def _play_process(self, queue_in, queue_out, name):
371387
# pylint: disable-next=lost-exception
372388
break
373389

374-
logging.info(f"Player {name} stopping.")
390+
logging.info(f"Player {p_idx} stopping.")
375391
player.terminate()
376392
del player
377393

378394
# helper function to spawn a player
379395
def _handle_player(self, last_p, running = True):
380396
# inital player logic
381397
if running:
382-
# self._div[2] is the number of players visible.
383-
# the actual number of players is self._div[2] * 2
384-
i_play = (last_p + self._div[2]) % (self._div[2] * 2)
398+
# spawn a replacement player
399+
# the index calculation is last_p + self._div.num + 1 - 1
400+
# the +1 is for the next player, the -1 is because of zero-indexing
401+
i_play = (last_p + self._div.num) % (self._div.num * 2)
385402
else:
386403
# state where the players are initializing
387404
i_play = last_p
388-
last_p = (last_p + self._div[2]) % (self._div[2] * 2)
405+
last_p = (last_p + self._div.num) % (self._div.num * 2)
389406
logging.debug(f"Starting player: {i_play}")
390407
self.proc[i_play] = multiprocessing.Process(target=self._play_process, args=(
391408
self.que[i_play],
392409
self.que[last_p],
393410
i_play))
394411
self.proc[i_play].daemon = True
412+
self.proc[i_play].name = f"Player {i_play}"
395413
# clear the queue
396414
while not self.que[i_play].empty():
397415
logging.debug(f"Cleaning queue for player {i_play} before starting")
@@ -406,11 +424,11 @@ def _handle_player(self, last_p, running = True):
406424
def main(self):
407425
""" main / run function within the class """
408426
logging.info("Starting security monitor")
409-
assert len(self.urls) >= self._div[2]
427+
assert len(self.urls) >= self._div.num
410428

411429
try:
412430
# start initial players
413-
for i in range(self._div[2]):
431+
for i in range(self._div.num):
414432
self._handle_player(i, False)
415433
time_cnt = 0
416434
p_cnt = 0
@@ -428,7 +446,7 @@ def main(self):
428446
self.proc[p_cnt].kill()
429447
else:
430448
logging.debug(f"Successfully joined {p_cnt}")
431-
p_cnt = (p_cnt + 1) % (self._div[2]*2)
449+
p_cnt = (p_cnt + 1) % (self._div.num*2)
432450
try:
433451
_ = self._queue_all.get(timeout=1)
434452
# if the queue returns data, shut everything down

0 commit comments

Comments
 (0)