-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathapp.py
More file actions
345 lines (279 loc) · 11.2 KB
/
app.py
File metadata and controls
345 lines (279 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
from flask import Flask, jsonify, request, Response
from flask_cors import CORS
import asyncio
import threading
import io
import sys
import argparse
import yaml
import os
import queue
from pathlib import Path
import shutil
from caption_openai import main as caption_main
from hints.registration import get_available_hint_sources, get_hint_source_descriptions
import time
# Flask application object; the route decorators in this file register handlers on it.
app = Flask(__name__)
# Apply flask_cors to the app (no extra options are passed here).
CORS(app)
# True while a captioning run is active; reported by /api/status.
captioning_in_progress = False
captioning_lock = threading.Lock() # Held while captioning_in_progress is checked-and-set or cleared
# Text captured from stdout during a run, drained by the SSE generator.
output_queue = queue.Queue()
# asyncio task of the most recent captioning run (None until a run starts); /api/stop cancels it.
current_task = None
def get_user_config_dir():
    """Return the per-user ``.vlm-caption`` directory, creating it if missing.

    The directory is located directly under ``Path.home()``, which keeps the
    lookup cross-platform.
    """
    user_dir = Path.home() / '.vlm-caption'
    # exist_ok makes repeated calls harmless once the directory is present.
    user_dir.mkdir(exist_ok=True)
    return user_dir
def get_user_config_backup_path(filename: str = "caption.yaml"):
    """Return the location of ``filename`` inside the user configuration directory.

    Resolving the directory via get_user_config_dir() also creates it when absent.
    """
    user_dir = get_user_config_dir()
    return user_dir / filename
def backup_config_to_user_dir(config_path):
    """Copy the config file into the user directory so it survives an app update.

    Args:
        config_path: Path of the config file to back up.

    Returns:
        True if the file was copied; False if ``config_path`` does not exist
        or the copy failed (the failure is printed rather than raised, so the
        return value is the single success signal callers need to check).
    """
    if not os.path.exists(config_path):
        return False
    try:
        backup_path = get_user_config_backup_path()
        shutil.copy2(config_path, backup_path)
    except OSError as e:
        # A backup is best-effort: report the problem and let the caller
        # decide what to do with a False result instead of crashing.
        print(f"Failed to back up config {config_path}: {e}")
        return False
    return True
def backup_config_to_user_dir_with_timestamp(config_path: str = "caption.yaml") -> bool:
    """Copy the config file into the user directory under a timestamped name.

    The copy is named ``caption_<YYYY-MM-DD-HH-MM>.yaml`` (local time), so a
    history of configs is kept; two backups within the same minute share a
    name and the later one overwrites the earlier.

    Args:
        config_path: Path of the config file to back up.

    Returns:
        True if the file was copied; False if ``config_path`` does not exist
        or the copy failed (the failure is printed rather than raised).
    """
    if not os.path.exists(config_path):
        return False
    stamp = time.strftime("%Y-%m-%d-%H-%M", time.localtime())
    try:
        backup_path = get_user_config_backup_path(f"caption_{stamp}.yaml")
        shutil.copy2(config_path, backup_path)
    except OSError as e:
        # Best-effort history copy: report and signal failure via the return
        # value so a filesystem problem does not abort the caller.
        print(f"Failed to back up config {config_path}: {e}")
        return False
    return True
def restore_config_from_user_dir(config_path) -> bool:
    """Copy the user-directory backup over ``config_path`` when a backup exists.

    Returns True if the config was restored. Returns False when there is no
    backup or when anything goes wrong (the error is printed, not raised).
    """
    try:
        saved_copy = get_user_config_backup_path()
        if not saved_copy.exists():
            return False
        shutil.copy2(saved_copy, config_path)
        print(f"Restored config from {saved_copy}")
        return True
    except Exception as e:
        print(f"Failed to restore config from backup: {e}")
    return False
async def run_captioning_task():
    """Await caption_main(), logging a message if the task is cancelled.

    The CancelledError is re-raised after logging so whoever is running the
    task still observes the cancellation.
    """
    try:
        await caption_main()
    except asyncio.CancelledError:
        print("Captioning task was cancelled")
        raise
@app.route('/api/stop', methods=['POST'])
def stop_captioning():
    """Request cancellation of the running captioning task.

    Returns:
        400 with an error if no task is running, 200 with a success message
        once cancellation has been scheduled, or 500 if scheduling failed.
    """
    global current_task
    # Snapshot the global so the check and the cancel act on the same task.
    task = current_task
    if task is None or task.done():
        return jsonify({'error': 'No captioning process is currently running'}), 400
    try:
        # The task belongs to an event loop running in the captioning thread.
        # asyncio objects are not thread-safe, so hand the cancel to that loop
        # instead of calling task.cancel() directly from this request thread;
        # call_soon_threadsafe also wakes the loop if it is idle in select().
        task.get_loop().call_soon_threadsafe(task.cancel)
        return jsonify({
            'success': True,
            'message': 'Captioning cancellation requested'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to cancel captioning: {str(e)}'
        }), 500
@app.route('/api/status', methods=['GET'])
def get_status():
    """Report whether a captioning run is currently marked as in progress."""
    payload = {'captioning_in_progress': captioning_in_progress}
    return jsonify(payload)
@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness endpoint: always answers with a fixed 'healthy' status."""
    payload = {'status': 'healthy'}
    return jsonify(payload)
@app.route('/api/hint_sources', methods=['GET'])
def get_hint_sources():
    """Return every available hint source with its display name and description.

    Sources without a registered description get an empty string. Any failure
    is reported as a 500 response carrying the error text.
    """
    try:
        names_by_source = get_available_hint_sources()
        descriptions = get_hint_source_descriptions()
        # One entry per source, keyed by the source identifier.
        payload = {
            source: {
                'display_name': display_name,
                'description': descriptions.get(source, ''),
            }
            for source, display_name in names_by_source.items()
        }
        return jsonify({
            'success': True,
            'hint_sources': payload
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to get hint sources: {str(e)}'
        }), 500
def config_init_restore_backup(config_path) -> bool:
    """Make sure a config file exists at ``config_path``.

    An app reinstall wipes the config, so when the file is missing this tries,
    in order:
      1. restoring it from the backup in the user directory;
      2. creating it from the ``init.yaml`` template, then backing the new
         file up to the user directory for future restores.

    Returns True if the config exists afterwards, False if neither a backup
    nor the template was available.
    """
    template_path = "init.yaml"

    if os.path.exists(config_path):
        return True

    if restore_config_from_user_dir(config_path):
        print("Restored configuration from user backup")
        return True

    if not os.path.exists(template_path):
        return False

    shutil.copy2(template_path, config_path)
    print(f"Created {config_path} from {template_path}")
    backup_config_to_user_dir(config_path)  # for app reinstall/restore
    return True
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the contents of ``caption.yaml`` as JSON.

    The file is first restored from the user backup or created from the
    template if it is missing; a 404 is returned when neither source exists,
    and a 500 for any other failure.
    """
    try:
        config_path = 'caption.yaml'
        if not config_init_restore_backup(config_path):
            return jsonify({'error': 'Neither prior configuration file nor init template found'}), 404

        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        return jsonify({
            'success': True,
            'config': loaded
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to load configuration: {str(e)}'
        }), 500
@app.route('/api/config', methods=['POST'])
def update_config():
    """Merge the posted ``config`` object into ``caption.yaml`` and save it.

    Expects a JSON body of the form ``{"config": {...}}``. The posted keys are
    merged over the current file at the top level only (nested mappings are
    replaced, not merged). Before overwriting, the existing file is copied to
    ``caption.yaml.backup``; afterwards the saved file is also backed up to
    the user directory so it persists across reinstalls.

    Returns:
        200 with a message naming the saved path (and backup outcome),
        400 if the body or its ``config`` value is missing or not a JSON
        object, 500 for any other failure.
    """
    try:
        data = request.get_json()
        # A non-object body (e.g. a JSON list) has no 'config' key to read.
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No configuration data provided'}), 400
        new_config = data.get('config')
        if not new_config:
            return jsonify({'error': 'No config object provided'}), 400
        # dict.update() would raise on a string and silently accept a list of
        # pairs; reject anything that is not a mapping as a client error.
        if not isinstance(new_config, dict):
            return jsonify({'error': 'Config must be a JSON object'}), 400

        config_path = 'caption.yaml'
        init_config_path = 'init.yaml'
        current_config = {}
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                current_config = yaml.safe_load(f) or {}
            # Keep a sibling copy of the previous file before overwriting it.
            backup_path = f"{config_path}.backup"
            with open(config_path, 'r', encoding='utf-8') as src, open(backup_path, 'w', encoding='utf-8') as dst:
                dst.write(src.read())
        elif os.path.exists(init_config_path):
            # If caption.yaml doesn't exist but init.yaml does, create it first
            shutil.copy2(init_config_path, config_path)
            print(f"Created {config_path} from {init_config_path}")
            with open(config_path, 'r', encoding='utf-8') as f:
                current_config = yaml.safe_load(f) or {}

        merged_config = current_config.copy()
        merged_config.update(new_config)
        with open(config_path, 'w', encoding='utf-8') as f:
            yaml.dump(merged_config, f, default_flow_style=False, sort_keys=False)

        # Backup the updated config to user directory for persistence across reinstalls
        backup_success = backup_config_to_user_dir(config_path)
        actual_saved_path = os.path.abspath(config_path)
        user_backup_path = get_user_config_backup_path()
        message = f'Configuration saved to {actual_saved_path}'
        if backup_success:
            message += f' and backed up to {user_backup_path}'
        else:
            message += ' (warning: backup to user directory failed)'
        return jsonify({
            'success': True,
            'message': message
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to save configuration: {str(e)}'
        }), 500
class StreamingStdout:
    """File-like stdout replacement that tees output to the console and a queue.

    Everything written is forwarded to the wrapped stream immediately; in
    addition, each completed line (terminated by a newline) is placed on the
    queue. Text after the last newline is held in ``buffer`` until a later
    write completes the line.
    """

    def __init__(self, original_stdout, output_queue):
        self.original_stdout = original_stdout
        self.output_queue = output_queue
        self.buffer = ""
        self.encoding = 'utf-8'

    def write(self, text):
        """Echo ``text`` to the wrapped stream and enqueue any completed lines."""
        self.original_stdout.write(text)
        self.original_stdout.flush()

        # Split pending + new text on newlines: every piece except the last is
        # a finished line; the last piece is the still-open remainder.
        pieces = (self.buffer + text).split('\n')
        self.buffer = pieces.pop()
        for line in pieces:
            self.output_queue.put(line + '\n')

    def flush(self):
        """Flush the wrapped stream; the partial-line buffer is left untouched."""
        self.original_stdout.flush()
def run_captioning_with_streaming():
    """Run one captioning job on a private event loop, streaming stdout to the queue.

    Intended to run in its own thread. While the job runs, ``sys.stdout`` is
    replaced with a StreamingStdout so printed lines land on ``output_queue``.
    Control markers are put on the queue as pre-formatted strings:
    ``[CANCELLED]`` if the task was cancelled, ``[COMPLETE]`` when the run
    ends (also after a cancel), and ``[ERROR]`` on any other exception.

    On exit, stdout is always restored, the event loop is always closed, and
    ``captioning_in_progress`` is cleared under the lock.
    """
    global captioning_in_progress, captioning_lock, output_queue, current_task
    original_stdout = sys.stdout
    try:
        # Clear any remaining items in the output queue
        while not output_queue.empty():
            try:
                output_queue.get_nowait()
            except queue.Empty:
                break

        sys.stdout = StreamingStdout(original_stdout, output_queue)
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            current_task = loop.create_task(run_captioning_task())
            try:
                loop.run_until_complete(current_task)
            except asyncio.CancelledError:
                output_queue.put("data: [CANCELLED] Captioning was cancelled by user\n\n")
            output_queue.put("data: [COMPLETE]\n\n")
        finally:
            # Close the loop even when the run raised; otherwise each failed
            # run would leave the loop's selector and wake-up pipe open.
            loop.close()
    except Exception as e:
        output_queue.put(f"data: [ERROR] run_captioning_with_streaming {str(e)}\n\n")
    finally:
        sys.stdout = original_stdout
        with captioning_lock:
            captioning_in_progress = False
def generate_stream():
    """Yield Server-Sent Events for one captioning run.

    Starts the captioning worker in a daemon thread, emits a ``[STARTED]``
    event, then relays queue items as ``data:`` events until a
    ``[COMPLETE]`` or ``[ERROR]`` marker arrives. When the queue is idle for
    a second a ``[KEEPALIVE]`` event is sent, and the stream ends if the
    worker has exited with nothing left to deliver.
    """
    global output_queue
    worker = threading.Thread(target=run_captioning_with_streaming, daemon=True)
    worker.start()
    yield "data: [STARTED] Captioning process started...\n\n"

    # Markers that are already SSE-formatted and terminate the stream.
    terminal_markers = ("data: [COMPLETE]", "data: [ERROR]")
    while True:
        try:
            item = output_queue.get(timeout=1)
            if item.startswith(terminal_markers):
                yield item
                break
            # NOTE(review): any other item is wrapped in "data: ...", which
            # appears to include the pre-formatted "data: [CANCELLED]" marker
            # and so would double its prefix - confirm against the client.
            yield f"data: {item.rstrip()}\n\n"
        except queue.Empty:
            yield "data: [KEEPALIVE]\n\n"
            worker_finished = not worker.is_alive()
            if worker_finished and output_queue.empty():
                break
        except Exception as e:
            yield f"data: [ERROR] Stream error: {str(e)}\n\n"
            break
@app.route('/api/run', methods=['GET'])
def run_captioning_stream():
    """Start captioning process with real-time streaming output.

    Only one run may be active at a time: the in-progress flag is checked and
    set under the lock, and a 400 is returned if a run is already active.
    Before streaming starts, the current config is copied to a timestamped
    backup in the user directory.

    Returns:
        A ``text/event-stream`` response driven by generate_stream(), or a
        400 JSON error if captioning is already in progress.
    """
    global captioning_in_progress, captioning_lock
    with captioning_lock:
        if captioning_in_progress:
            return jsonify({'error': 'Captioning is already in progress'}), 400
        captioning_in_progress = True

    try:
        backup_config_to_user_dir_with_timestamp()
        return Response(generate_stream(), mimetype="text/event-stream")
    except Exception:
        # The worker thread is what normally clears the flag; if we fail
        # before it is ever started, clear it here so later runs are not
        # rejected forever, then let the error propagate as before.
        with captioning_lock:
            captioning_in_progress = False
        raise
if __name__ == '__main__':
    # Command line: optional --port (defaults to 5000).
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=5000)
    cli_args = parser.parse_args()

    # Only enable debug mode if running from source (not packaged):
    # sys.frozen is absent unless a bundler has set it.
    running_from_source = not getattr(sys, 'frozen', False)

    # NOTE(review): when run from source this enables Flask debug mode while
    # listening on all interfaces (0.0.0.0) - confirm that exposure is intended.
    app.run(debug=running_from_source, host='0.0.0.0', port=cli_args.port)