-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathvcd4reader.py
More file actions
435 lines (363 loc) · 16 KB
/
vcd4reader.py
File metadata and controls
435 lines (363 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
import warnings
from pathlib import Path
import json
import numpy as np
import vcd.core as core
import vcd.types as types
import time
# TODO: get actions and objects per frame
# TODO: function to get frame intervals per action or object presence
# TODO: delete unnecessary code
# Lookup tables translating the long group and session names into their
# short codes. Per the original note, this renaming is only meant for
# external structures.
dmd_struct = dict(
    groups=dict(
        grupo1A="gA",
        grupo2A="gB",
        grupo2M="gC",
        grupo3B="gD",
        grupoE="gE",
        grupo4B="gF",
        grupoZ="gZ",
    ),
    # Every session name also has a "2"-suffixed variant that maps to the
    # same short code.
    sessions=dict(
        attm="s1",
        atts="s2",
        reach="s3",
        attc="s4",
        gaze="s5",
        gazec="s6",
        drow="s7",
        attm2="s1",
        atts2="s2",
        reach2="s3",
        attc2="s4",
        gaze2="s5",
        gazec2="s6",
        drow2="s7",
    ),
)
# Type of annotation, keyed by its numeric code (0, 1, 2)
annotate_dict = dict(enumerate(("unchanged", "manual", "interval")))
def is_string_int(s):
    """Tell whether `s` can be converted to an integer with ``int()``.

    Args:
        s: Value to test, normally a string such as ``"12"``.

    Returns:
        True when ``int(s)`` succeeds, False otherwise.
    """
    try:
        int(s)
    except (TypeError, ValueError):
        # ValueError: text that is not an integer literal ("abc", "1.5", "").
        # TypeError: objects int() cannot handle at all (None, lists, ...);
        # a yes/no predicate should answer False for those, not crash.
        return False
    return True
def keys_exists(element, *keys):
    """
    Check if *keys (nested) exists in `element` (dict).

    The keys are followed one after another: ``keys_exists(d, "a", "b")``
    answers whether ``d["a"]["b"]`` can be read.

    Args:
        element: Dictionary to look into.
        *keys: One or more keys describing the nested path.

    Returns:
        True if the whole path can be followed, False as soon as one step
        is missing or an intermediate value cannot be indexed.

    Raises:
        AttributeError: If `element` is not a dict or no key is given.
    """
    if not isinstance(element, dict):
        raise AttributeError("keys_exists() expects dict as first argument.")
    if not keys:
        raise AttributeError(
            "keys_exists() expects at least two arguments, one given.")

    current = element
    for key in keys:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            # KeyError: key absent from a dict.
            # IndexError: position absent from a list/tuple.
            # TypeError: the intermediate value is a leaf (int, None, ...)
            # or cannot be indexed with this key, so the path does not exist.
            return False
    return True
class VcdHandler():
    """Reader for the actions, objects and streams of an OpenLABEL file.

    The file is loaded once in the constructor through ``vcd.core.VCD``.
    The lookup helpers assume that action and object uids are the
    consecutive integers ``0 .. n-1`` (as strings), because the type lists
    are built by iterating over ``range(number_of_elements)``.
    """

    def __init__(self, vcd_file: Path):
        """Load the OpenLABEL file.

        Args:
            vcd_file: Path of the OpenLABEL json file.

        Raises:
            RuntimeError: If `vcd_file` does not exist.
        """
        # vcd variables
        self._vcd = None
        self._vcd_file = str(vcd_file)
        self.__vcd_loaded = False

        if not vcd_file.exists():
            raise RuntimeError("OpenLABEL file not found.")

        # -- Load OpenLABEL from file --
        # Create a VCD instance and load file
        # OpenLABEL json is in self._vcd.data
        self._vcd = core.VCD()
        self._vcd.load_from_file(file_name=self._vcd_file)

        # Number of frames in video
        self.__full_mosaic_frames = int(
            self._vcd.get_frame_intervals().get_dict()[0]["frame_end"]) + 1
        # Number of actions in OpenLABEL
        self.__num_actions = self._vcd.get_num_actions()
        # Number of objects in OpenLABEL, including the driver
        self.__num_objects = self._vcd.get_num_objects()
        # minus 1 for "driver" object
        print("There are %s actions in this OpenLABEL" % (self.__num_actions+self.__num_objects-1))
        self.__vcd_loaded = True

    def get_frames_intervals_of_action(self, uid):
        """Return the frame intervals of one action.

        Args:
            uid: Integer uid of the action, or its name (either the full
                type or the label accepted by `is_action_type_get_uid`).

        Returns:
            The ``"frame_intervals"`` entry of the action.

        Raises:
            RuntimeError: If no action matches `uid`.
        """
        if isinstance(uid, str):
            uid = self.is_action_type_get_uid(uid)[1]
        # An unknown name resolves to -1. An unknown integer uid is assumed
        # to make get_action() return None (TODO confirm against the vcd
        # library); both cases end in the same RuntimeError.
        action = self._vcd.get_action(str(uid)) if uid >= 0 else None
        if action is None:
            raise RuntimeError("WARNING: OpenLABEL does not have action with uid",uid)
        return action["frame_intervals"]

    def get_frames_intervals_of_object(self, uid):
        """Return the frame intervals of one object.

        Args:
            uid: Integer uid of the object, or its type name (see
                `is_object_type_get_uid`).

        Returns:
            The ``"frame_intervals"`` entry of the object.

        Raises:
            RuntimeError: If no object matches `uid`.
        """
        if isinstance(uid, str):
            uid = self.is_object_type_get_uid(uid)[1]
        # Same handling of unknown names / uids as for actions.
        obj = self._vcd.get_object(str(uid)) if uid >= 0 else None
        if obj is None:
            raise RuntimeError("WARNING: OpenLABEL does not have an object with uid",uid)
        return obj["frame_intervals"]

    def is_action_type_get_uid(self, action_string):
        """Check whether `action_string` names an action and give its uid.

        Action types are composed as ``level_name/label_name``. A match is
        accepted against the full type or against the component that
        follows the first ``/``.

        Args:
            action_string: Full action type or its label.

        Returns:
            ``(True, uid)`` for the first matching action, otherwise
            ``(False, -1)``.
        """
        for uid, action_type in enumerate(self.get_action_type_list()):
            if action_type is None:
                # Action without a "type" value: nothing to compare.
                continue
            if action_string == action_type:
                return True, uid
            parts = action_type.split("/")
            # Types without "/" have no label component; indexing [1]
            # unconditionally would raise IndexError for them.
            if len(parts) > 1 and action_string == parts[1]:
                return True, uid
        return False, -1

    def is_object_type_get_uid(self, object_string):
        """Check whether `object_string` names an object type and give its uid.

        Args:
            object_string: Object type, optionally written as
                ``type/classname``; in that case only the component after
                the first ``/`` is compared.

        Returns:
            ``(True, uid)`` for the first matching object, otherwise
            ``(False, -1)``.
        """
        # The requested name does not depend on the loop, so resolve it once.
        parts = object_string.split("/")
        wanted = parts[1] if len(parts) > 1 else object_string
        for uid, object_type in enumerate(self.get_object_type_list()):
            if wanted == object_type:
                return True, uid
        return False, -1

    def get_object_type_list(self):
        """Return the "type" value of every object, indexed by uid.

        An entry is None when the object is missing or has no "type", so
        list positions keep matching the uids.
        """
        if not self._vcd_file:
            return []
        objects = (self._vcd.get_object(str(uid))
                   for uid in range(self.__num_objects))
        return [obj.get('type') if obj is not None else None
                for obj in objects]

    def get_action_type_list(self):
        """Return the "type" value of every action, indexed by uid.

        An entry is None when the action is missing or has no "type", so
        list positions keep matching the uids.
        """
        if not self._vcd_file:
            return []
        actions = (self._vcd.get_action(str(uid))
                   for uid in range(self.__num_actions))
        return [action.get('type') if action is not None else None
                for action in actions]

    def fileLoaded(self):
        """Return the flag that indicates if OpenLABEL was loaded from file."""
        return self.__vcd_loaded

    def get_videos_uri(self):
        """Return the uri of the "general_camera" stream as a string."""
        streams_data = self._vcd.get_streams()
        return str(streams_data["general_camera"]["uri"])

    def get_frames_number(self):
        """Return the number of frames: "frame_end" of the first interval + 1."""
        return int(self._vcd.get_frame_intervals().get_dict()[0]["frame_end"]) + 1
class VcdDMDHandler(VcdHandler):
    """OpenLABEL reader for files holding face, body and hands camera streams.

    On top of the base handler it reads the recording metadata, the frame
    shifts between the streams, the video uris and the frame counts.
    """

    # Stream names, always handled in the order face, body, hands.
    _CAMERAS = ("face_camera", "body_camera", "hands_camera")

    def __init__(self, vcd_file: Path):
        """Load the file and cache its metadata.

        Args:
            vcd_file: Path of the OpenLABEL json file.

        Raises:
            RuntimeError: If the file does not exist, has no name, or lacks
                the frame shift of the body or hands camera.
        """
        super().__init__(vcd_file)

        # Internal Variables initialization
        self.__uid_driver = None
        self.ont_uid = 0
        self.__group = None
        self.__subject = None
        self.__session = None
        self.__date = None
        self.__bf_shift = None
        self.__hb_shift = None
        self.__hf_shift = None
        self.__face_frames = None
        self.__body_frames = None
        self.__hands_frames = None
        self.__face_uri = None
        self.__body_uri = None
        self.__hands_uri = None

        # Check required essential fields inside to be considered loaded
        vcd_streams = self._vcd.get_streams()
        body_sh_exist = keys_exists(
            vcd_streams, "body_camera", "stream_properties", "sync", "frame_shift")
        hands_sh_exist = keys_exists(
            vcd_streams, "hands_camera", "stream_properties", "sync", "frame_shift")

        # If shifts fields exist then consider the OpenLABEL loaded was valid
        if not (body_sh_exist and hands_sh_exist):
            raise RuntimeError(
                "OpenLABEL doesn't have all necesary information. Not valid."
            )
        # Name mangling makes this an attribute of this class, separate
        # from the flag of the same name kept by the base class.
        self.__vcd_loaded = True

        # -- Get video info --
        # Get video basic metadata
        self.__group, self.__subject, self.__session, self.__date = self.get_basic_metadata()
        # Get stream shifts
        self.__bf_shift, self.__hf_shift, self.__hb_shift = self.get_shifts()
        # Get video uri's
        self.__face_uri, self.__body_uri, self.__hands_uri = self.get_videos_uris()
        # Get frame numbers
        self.__face_frames, self.__body_frames, self.__hands_frames = self.get_frame_numbers()

    def get_basic_metadata(self):
        """Return group, subject, session and date of the recording.

        Group, subject and session are the first three ``_``-separated
        parts of the metadata name (e.g: gA_1_s1_2019-03-08T09;31;15+01;00).
        The date comes from the "recordTime" context data of context 0, or
        from the current local time when that entry is absent.

        Returns:
            Tuple ``(group, subject, session, date)``.

        Raises:
            RuntimeError: If the name is missing, empty or has fewer than
                three ``_``-separated parts.
        """
        if not self._vcd_file:
            return self.__group, self.__subject, self.__session, self.__date

        # .get() so that a missing name reaches the RuntimeError below
        # instead of escaping as a KeyError.
        raw_name = dict(self._vcd.get_metadata()).get("name")
        if not raw_name:
            raise RuntimeError("WARNING: OpenLABEL does not have a name")

        name = str(raw_name).split("_")
        if len(name) < 3:
            raise RuntimeError(
                "WARNING: OpenLABEL name is not in group_subject_session format")
        group, subject, session = name[0], name[1], name[2]

        record = self._vcd.get_context_data(0, "recordTime")
        if record is not None:
            record_time = record["val"].replace(";", ":")
            date = record_time.split("T")
            # Just get day and hour from the full timestamp
            date = date[0]+"-"+date[1].split("+")[0]
        else:
            # current date
            date = time.strftime("%Y-%m-%d-%H;%M;%S", time.localtime())
        return group, subject, session, date

    def get_shifts(self):
        """Return the frame shifts between the streams.

        The body and hands values are read from the "frame_shift" sync
        property of their stream; the hands-body shift is their difference.

        Returns:
            Tuple ``(body_face_shift, hands_face_shift, hands_body_shift)``.
        """
        if not self.__vcd_loaded:
            return self.__bf_shift, self.__hf_shift, self.__hb_shift

        stream = self._vcd.get_stream("body_camera")
        body_face_sh = stream['stream_properties']['sync']['frame_shift']
        stream = self._vcd.get_stream("hands_camera")
        hands_face_sh = stream['stream_properties']['sync']['frame_shift']
        hands_body_sh = hands_face_sh - body_face_sh
        return body_face_sh, hands_face_sh, hands_body_sh

    def get_videos_uris(self):
        """Return the video uri of the face, body and hands streams.

        Returns:
            Tuple ``(face, body, hands)`` of strings.
        """
        if not self.__vcd_loaded:
            return self.__face_uri, self.__body_uri, self.__hands_uri

        face, body, hands = (
            str(self._vcd.get_stream(camera)["uri"])
            for camera in self._CAMERAS
        )
        return face, body, hands

    def get_frame_numbers(self):
        """Return the "total_frames" value of the face, body and hands streams.

        Returns:
            Tuple ``(face, body, hands)`` of integers.
        """
        if not self.__vcd_loaded:
            return self.__face_frames, self.__body_frames, self.__hands_frames

        face, body, hands = (
            int(self._vcd.get_stream(camera)["stream_properties"]["total_frames"])
            for camera in self._CAMERAS
        )
        return face, body, hands

    def get_intrinsics(self):
        """Return the "camera_matrix_3x4" pinhole intrinsics of each stream.

        Returns:
            Tuple ``(face, body, hands)``.
        """
        face, body, hands = (
            self._vcd.get_stream(camera)['stream_properties'][
                'intrinsics_pinhole']['camera_matrix_3x4']
            for camera in self._CAMERAS
        )
        return face, body, hands

    def isNumberOfFrames(self):
        """Return True when none of the three frame counts is 0."""
        face, body, hands = self.get_frame_numbers()
        return 0 not in (face, body, hands)

    def isStaticAnnotation(self, staticDict, obj_id):
        """Tell whether the static annotations exist in the OpenLABEL.

        They exist when the object holds an "object_data" entry for the
        "type" of every annotation and no registered frame count is 0.

        Args:
            staticDict: Iterable of annotation dicts, each with a "type".
            obj_id: uid of the object to inspect.

        Returns:
            True if the static annotations exist, False otherwise.
        """
        vcd_object = self._vcd.get_object(obj_id)
        if vcd_object is None:
            # No such object, hence no annotations on it. Assumes
            # get_object() returns None for an unknown uid (TODO confirm).
            return False
        fields_exist = all(
            keys_exists(vcd_object, "object_data", str(att["type"]))
            for att in staticDict
        )
        return fields_exist and self.isNumberOfFrames()

    def getStaticVector(self, staticDict, ctx_id):
        """Fill `staticDict` with the values stored in the OpenLABEL.

        Used to keep the consistency when the user saves/creates a new
        OpenLABEL. Entries 0-4 are read from the object data of object 0,
        entries 5 and 6 from the first two "text" context data of the
        context, and entry 7 from the "annotator" metadata field.

        Args:
            staticDict: Indexable collection of at least 8 annotation
                dicts, each with a "name"; updated in place.
            ctx_id: id of the context (in this case 0).

        Returns:
            The same `staticDict` with the "val" of each entry set.
        """
        for x in range(5):
            att = staticDict[x]
            # Get each of the static annotations of the directory from the OpenLABEL
            object_vcd = dict(self._vcd.get_object_data(0, att["name"]))
            att.update({"val": object_vcd["val"]})

        # context
        context = dict(self._vcd.get_context(ctx_id))["context_data"]["text"]
        staticDict[5].update({"val": context[0]["val"]})
        staticDict[6].update({"val": context[1]["val"]})

        # Annotator id
        annotator = dict(self._vcd.get_metadata())["annotator"]
        staticDict[7].update({"val": annotator})
        return staticDict

    def getMetadataVector(self, ctx_id):
        """Collect the per-stream metadata stored in the OpenLABEL.

        Used to keep the consistency when the user saves/creates a new
        OpenLABEL.

        Args:
            ctx_id: id of the context holding "recordTime" (in this case 0).

        Returns:
            Three lists:
            face_meta ``[frames, matrix]``,
            body_meta ``[record_time, frames, matrix]``,
            hands_meta ``[frames, matrix]``.
            ``record_time`` is 0 when the context has no "recordTime".
        """
        # context
        record = self._vcd.get_context_data(ctx_id, "recordTime")
        record_time = 0 if record is None else record["val"]
        # frames
        face, body, hands = self.get_frame_numbers()
        # intrinsics
        face_mat, body_mat, hands_mat = self.get_intrinsics()
        return [face, face_mat], [record_time, body, body_mat], [hands, hands_mat]