-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
161 lines (132 loc) · 5.07 KB
/
app.py
File metadata and controls
161 lines (132 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import base64
import os
from urllib.parse import quote as urlquote
from flask import Flask, send_from_directory, Response
import dash
import dash_bootstrap_components as dbc
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import numpy as np
import cv2
UPLOAD_DIRECTORY = "/Users/luffy/Downloads/GitHub/video-to-frame/upload-files"
if not os.path.exists(UPLOAD_DIRECTORY):
os.makedirs(UPLOAD_DIRECTORY)
# Normally, Dash creates its own Flask server internally. By creating our own,
# we can create a route for downloading files directly:
server = Flask(__name__)
app = dash.Dash(server=server, external_stylesheets=[dbc.themes.BOOTSTRAP])
class video_object(object):
def __init__(self, URL):
self.video = cv2.VideoCapture(URL)
def __del__(self):
self.video.release()
def get_frame(self):
success, image = self.video.read()
ret, jpeg = cv2.imencode('.jpg', image)
return jpeg.tobytes()
def get_video_frame_nums(self):
return self.video.get(cv2.CAP_PROP_FRAME_COUNT)
progress_percentage = 0
def get_every_frame_from_video(video_object, folder_path):
global progress_percentage
success,image = video_object.video.read()
count = 0
amount_of_frames = video_object.get_video_frame_nums()
while success:
file_path = os.path.join(folder_path, "frame%d.jpg" % count)
cv2.imwrite(file_path, image) # save frame as JPEG file
success,image = video_object.video.read()
progress_percentage = count / amount_of_frames * 100
count += 1
@server.route("/download/<path:path>")
def download(path):
"""Serve a file from the upload directory."""
return send_from_directory(UPLOAD_DIRECTORY, path, as_attachment=True)
progress = html.Div(
[
dcc.Interval(id="progress-interval", n_intervals=0),
dbc.Progress(id="progress", striped=True, animated=True),
]
)
app.layout = html.Div(
[
html.H1("Video to Frame"),
html.H2("Upload"),
dcc.Upload(
id="upload-data",
children=html.Div(
["Drag and drop or click to select a video to upload."]
),
style={
"width": "100%",
"height": "60px",
"lineHeight": "60px",
"borderWidth": "1px",
"borderStyle": "dashed",
"borderRadius": "5px",
"textAlign": "center",
"margin": "10px",
},
multiple=True,
),
html.H2("Video"),
html.Ul(id="video-preview-label"),
progress,
],
style={"max-width": "500px"},
)
def save_file(name, content):
"""Decode and store a file uploaded with Plotly Dash."""
data = content.encode("utf8").split(b";base64,")[1]
with open(os.path.join(UPLOAD_DIRECTORY, name), "wb") as fp:
fp.write(base64.decodebytes(data))
def uploaded_files():
"""List the files in the upload directory."""
files = []
for filename in os.listdir(UPLOAD_DIRECTORY):
path = os.path.join(UPLOAD_DIRECTORY, filename)
if os.path.isfile(path):
files.append(filename)
return files
def file_download_link(filename):
"""Create a Plotly Dash 'A' element that downloads a file from the app."""
location = "/download/{}".format(urlquote(filename))
return html.A(filename, href=location)
def load_video(filename):
path = os.path.join(UPLOAD_DIRECTORY, filename)
video = video_object(path)
return video
@app.callback(
Output("video-preview-label", "children"),
[Input("upload-data", "filename"), Input("upload-data", "contents")],
)
def update_output(uploaded_filenames, uploaded_file_contents):
"""Save uploaded files and regenerate the file list."""
if uploaded_filenames is not None and uploaded_file_contents is not None:
if len(uploaded_filenames) > 1:
return [html.Li("Choose one video")]
save_file(uploaded_filenames[0], uploaded_file_contents[0])
if uploaded_filenames[0].lower().endswith((".avi", ".mp4", ".mov", ".3gp")):
folder_path = os.path.join(UPLOAD_DIRECTORY, os.path.splitext(uploaded_filenames[0])[0])
if not os.path.exists(folder_path):
os.makedirs(folder_path)
video = load_video(uploaded_filenames[0])
amount_of_frames = video.get_video_frame_nums()
get_every_frame_from_video(video, folder_path)
return [html.Li("Total frames: %d" % amount_of_frames)]
files = uploaded_files()
if len(files) == 0:
return [html.Li("No video yet!")]
else:
return [html.Li(file_download_link(filename)) for filename in files]
@app.callback(
[Output("progress", "value"), Output("progress", "children")],
[Input("progress-interval", "n_intervals")],
)
def update_progress(_):
global progress_percentage
progress = int(progress_percentage)
return progress, f"{progress} %" if progress >= 5 else ""
if __name__ == "__main__":
app.run_server(debug=True, port=8080)