-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathvisualization.py
More file actions
128 lines (104 loc) · 5.59 KB
/
visualization.py
File metadata and controls
128 lines (104 loc) · 5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Dependencies: TF1-style graph API plus the project's compressor network and
# PNG/warping helpers (`utils` is project-local, not a pip package).
import tensorflow.compat.v1 as tf
from utils import VideoCompressor, write_png, warp
import numpy as np
from PIL import Image
import pickle as pkl
import cv2
import argparse
import os
# Silence TensorFlow's Python-side logging.
tf.logging.set_verbosity(tf.logging.ERROR)
# Suppress the C++ backend's log spam. NOTE(review): this env var is normally
# read at import time, so setting it after `import tensorflow` may be too late
# for some messages — confirm if TF logs still appear.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
def flow_to_img(flow, normalize=True, info=None, flow_mag_max=None):
    """Convert a dense optical-flow field to a viewable RGB image.

    Color hue encodes flow vector orientation and color saturation encodes
    vector length. This is similar to the OpenCV dense-optical-flow tutorial,
    except that tutorial maps vector length to the value plane of the HSV
    color model instead of the saturation plane, as done here.

    Args:
        flow: optical flow, shape (1, H, W, 2) — the leading batch axis of
            size 1 is squeezed away.
        normalize: If truthy, scale flow magnitude into 0..255.
            (Previously compared with `is True`, which silently skipped
            normalization for truthy non-bool values like 1.)
        info: Text to superimpose on the image (typically the EPE of the
            predicted flow).
        flow_mag_max: Max flow magnitude to map to 255; if None, min-max
            normalize instead.

    Returns:
        A (1, H, W, 3) uint8 RGB visualization of the flow.

    Ref:
        OpenCV-Python Tutorials » Video Analysis » Optical Flow
        https://docs.opencv.org/3.0-beta/doc/py_tutorials/py_video/py_lucas_kanade/py_lucas_kanade.html
    """
    flow = np.squeeze(flow, axis=0)
    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    flow_magnitude, flow_angle = cv2.cartToPolar(
        flow[..., 0].astype(np.float32), flow[..., 1].astype(np.float32))

    # cartToPolar has occasionally produced NaNs; zero them so the
    # normalization below stays finite. (Direct boolean-mask assignment
    # replaces the redundant np.any + np.where round-trip.)
    flow_magnitude[np.isnan(flow_magnitude)] = 0.

    # Hue encodes direction: radians -> degrees / 2 fits OpenCV's 0..179
    # uint8 hue range.
    hsv[..., 0] = flow_angle * 180 / np.pi / 2
    # Saturation encodes magnitude.
    if normalize:
        if flow_mag_max is None:
            hsv[..., 1] = cv2.normalize(flow_magnitude, None, 0, 255, cv2.NORM_MINMAX)
        else:
            hsv[..., 1] = flow_magnitude * 255 / flow_mag_max
    else:
        hsv[..., 1] = flow_magnitude
    hsv[..., 2] = 255
    img = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)

    # Superimpose caller-supplied text, if requested.
    if info is not None:
        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(img, info, (20, 20), font, 0.8, (0, 0, 0), 2, cv2.LINE_AA)
    return np.expand_dims(img, axis=0)
def parse_args():
    """Parse the visualization script's command-line options.

    Returns:
        argparse.Namespace with `model`, `input`, and `output` attributes.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--model", "-m",
        default="checkpoints/videocompressor8192.pkl",
        help="Saved model that you want to analyse\n"
             "Default=`checkpoints/videocompressor8192.pkl`")
    cli.add_argument(
        "--input", "-i",
        default="demo/input/",
        help="Directory where uncompressed frames lie and what you want to analyze\n"
             "Default=`demo/input/`")
    cli.add_argument(
        "--output", "-o",
        default="demo/visualization/",
        help="Directory where you want the analyzed files to be saved\n"
             "Default=`demo/visualization/`")
    return cli.parse_args()
if __name__ == "__main__":
args = parse_args()
if not os.path.exists(args.output):
os.mkdir(args.output)
w, h, _ = np.array(Image.open(os.path.join(args.input, 'im1.png'))).shape
if w % 16 != 0 or h % 16 != 0:
raise ValueError('Height and Width must be mutiples of 16.')
testnet = VideoCompressor(training=False)
testtfprvs = tf.placeholder(tf.float32, shape=[1, w, h, 3], name="testfirst_frame")
testtfnext = tf.placeholder(tf.float32, shape=[1, w, h, 3], name="testsecond_frame")
#
_, _, _ = testnet(testtfprvs, testtfnext)
flow = testnet.ofnet(testtfprvs, testtfnext)
reconflow, _ = testnet.ofcomp(flow)
motionCompensated = warp(testtfprvs, reconflow)
res = testtfnext - motionCompensated
reconres, _ = testnet.rescomp(res)
recon_image = motionCompensated + reconres
testinit = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(testinit)
with open(args.model, "rb") as f:
testnet.set_weights(pkl.load(f))
tenFirst = np.array(Image.open(os.path.join(args.input,'im1.png'))).astype(np.float32) * (1.0 / 255.0)
tenFirst = np.expand_dims(tenFirst, axis=0)
tenSecond = np.array(Image.open(os.path.join(args.input,'im1.png'))).astype(np.float32) * (1.0 / 255.0)
tenSecond = np.expand_dims(tenSecond, axis=0)
realflow, realreconflow, realmotcom, realres, realreconres, realimage = sess.run([flow, reconflow, motionCompensated,
res, reconres, recon_image],
feed_dict={testtfprvs: tenFirst,
testtfnext: tenSecond})
realflow = flow_to_img(realflow)
realreconflow = flow_to_img(realreconflow)
sess.run(write_png(os.path.join(args.output, 'first.png'), tenFirst))
sess.run(write_png(os.path.join(args.output , 'second.png'), tenSecond))
sess.run(write_png(os.path.join(args.output,'flow.png'), realflow))
sess.run(write_png(os.path.join(args.output,'reconflow.png'), realreconflow))
sess.run(write_png(os.path.join(args.output , 'motioncompensated.png'), realmotcom))
sess.run(write_png(os.path.join(args.output , 'residue.png'), realres))
sess.run(write_png(os.path.join(args.output , 'reconresidue.png'), realreconres))
sess.run(write_png(os.path.join(args.output , 'reconstructed.png'), realimage))