-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
377 lines (306 loc) · 14.2 KB
/
cli.py
File metadata and controls
377 lines (306 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
"""CLI entry point — argparse interface wiring the full splicer pipeline."""
import argparse
import random
import shutil
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from core.config import SplicerConfig
from core.platform import platform_check, PlatformError
from core.probe import probe, ProbeError
from core.normalize import normalize_video, normalize_image
from core.chunk import chunk_video
from core.assemble import assemble
from core.manifest import Manifest
from core.prep import grain_video, greyscale_video, collect_videos
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".tif", ".webp"}
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".mpg", ".mpeg", ".ts", ".gif"}
def main() -> None:
args = parse_args()
config = build_config(args)
# Verify platform and tool availability
try:
platform_check()
except PlatformError as e:
print(f"error: {e}", file=sys.stderr)
sys.exit(1)
# Benchmark mode — run benchmark and exit
if args.benchmark:
from core.bench import run_benchmark, CALIBRATION_FILENAME
cal = run_benchmark(config, verbose=True)
cal_path = Path(CALIBRATION_FILENAME)
cal.save(cal_path)
if not args.dry_run:
return
# If both --benchmark and --dry-run, fall through to dry-run
# Dry-run mode — estimate and exit
if args.dry_run:
from core.estimator import estimate as run_estimate
if not args.inputs:
print("error: --dry-run requires input files or directories", file=sys.stderr)
sys.exit(1)
input_paths = collect_inputs(args.inputs)
if not input_paths:
print("error: no valid input files found", file=sys.stderr)
sys.exit(1)
est = run_estimate(input_paths, config)
est.print_summary()
return
# Prep mode — preprocess and exit
if args.prep:
run_prep(args, config)
return
# Validate inputs for main pipeline (nargs="*" allows empty)
if not args.inputs:
print("error: inputs are required (provide files or directories)", file=sys.stderr)
sys.exit(1)
# Resolve RNG seed
if config.rng_seed is None:
config.rng_seed = random.randint(0, 2**31 - 1)
rng = random.Random(config.rng_seed)
print(f"RNG seed: {config.rng_seed}")
# Collect input files
input_paths = collect_inputs(args.inputs)
if not input_paths:
print("error: no valid input files found", file=sys.stderr)
sys.exit(1)
videos = [p for p in input_paths if p.suffix.lower() in VIDEO_EXTENSIONS]
images = [p for p in input_paths if p.suffix.lower() in IMAGE_EXTENSIONS]
print(f"Inputs: {len(videos)} video(s), {len(images)} image(s)")
# Setup output
output_dir = Path(config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix="splicer_") as tmpdir:
work_dir = Path(tmpdir)
norm_dir = work_dir / "normalized"
chunk_dir = work_dir / "chunks"
norm_dir.mkdir()
chunk_dir.mkdir()
# --- Phase 1: Probe & Normalize ---
print(f"\n--- Probing and normalizing inputs ({config.max_workers} workers) ---")
normalized_videos: list[Path] = []
def _normalize_one(idx: int, vid: Path) -> tuple[int, Path | None, str]:
"""Probe and normalize a single video. Returns (index, output_path, status_msg)."""
try:
info = probe(vid)
vfr_note = f" (VFR->CFR {config.target_fps}fps)" if info.is_vfr else ""
out = norm_dir / f"norm_{idx:04d}.mp4"
normalize_video(vid, out, config, probe_result=info)
return idx, out, f" [{idx+1}/{len(videos)}] {vid.name}{vfr_note}"
except (ProbeError, RuntimeError) as e:
return idx, None, f" [{idx+1}/{len(videos)}] {vid.name} SKIPPED: {e}"
with ThreadPoolExecutor(max_workers=config.max_workers) as pool:
futures = {pool.submit(_normalize_one, i, v): i for i, v in enumerate(videos)}
results: list[tuple[int, Path | None, str]] = []
for future in as_completed(futures):
idx, out, msg = future.result()
print(msg)
results.append((idx, out, msg))
# Sort by original index for deterministic ordering
results.sort(key=lambda r: r[0])
normalized_videos = [out for _, out, _ in results if out is not None]
for i, img in enumerate(images):
print(f" [img {i+1}/{len(images)}] {img.name}")
# Images get normalized during assembly, just validate here
try:
probe(img)
except ProbeError as e:
print(f" SKIPPED: {e}")
images = [p for p in images if p != img]
if not normalized_videos and not images:
print("error: no inputs survived normalization", file=sys.stderr)
sys.exit(1)
# --- Phase 2: Chunk ---
print(f"\n--- Chunking normalized videos ({config.max_workers} workers) ---")
# Pre-generate deterministic per-video sub-RNGs sequentially
sub_seeds = [rng.randint(0, 2**31 - 1) for _ in normalized_videos]
def _chunk_one(idx: int, norm_path: Path, seed: int) -> tuple[int, list]:
"""Chunk a single normalized video with its own sub-RNG."""
sub_rng = random.Random(seed)
# Each video gets its own subdir to avoid filename collisions
vid_chunk_dir = chunk_dir / f"v{idx:04d}"
vid_chunk_dir.mkdir(exist_ok=True)
chunks = chunk_video(norm_path, vid_chunk_dir, config, sub_rng, start_index=0)
print(f" [{idx+1}/{len(normalized_videos)}] {norm_path.name} -> {len(chunks)} chunks")
return idx, chunks
with ThreadPoolExecutor(max_workers=config.max_workers) as pool:
futures = {
pool.submit(_chunk_one, i, p, s): i
for i, (p, s) in enumerate(zip(normalized_videos, sub_seeds))
}
chunk_results: list[tuple[int, list]] = []
for future in as_completed(futures):
chunk_results.append(future.result())
# Reassemble in original order and re-index chunks
chunk_results.sort(key=lambda r: r[0])
all_chunks = []
chunk_idx = 0
for _, chunks in chunk_results:
for c in chunks:
c.chunk_index = chunk_idx
chunk_idx += 1
all_chunks.extend(chunks)
print(f"Total chunks: {len(all_chunks)}")
# --- Phase 3: Assemble ---
print("\n--- Assembling final output ---")
manifest = Manifest(
rng_seed=config.rng_seed,
config_snapshot=config.to_dict(),
)
# Record chunks in manifest
for c in all_chunks:
manifest.add_chunk(c)
output_path = output_dir / "splicer_output.mp4"
assemble(
chunks=all_chunks,
image_paths=[str(p) for p in images],
config=config,
rng=rng,
manifest=manifest,
work_dir=work_dir,
output_path=output_path,
)
# --- Phase 4: Write manifest ---
manifest_path = output_dir / "splicer_manifest.json"
manifest.save(manifest_path)
print(f"\nManifest: {manifest_path}")
print(f"Output: {output_path}")
print(f"Frames: {manifest.actual_frame_count} (expected {manifest.expected_frame_count})")
if manifest.luma_flags:
print(f"Luma warnings: {len(manifest.luma_flags)}")
print("Done.")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
prog="splicer",
description="Semantic threshold video splicer — rapid-cut assembly for perceptual disruption",
)
p.add_argument(
"inputs", nargs="*",
help="Input video/image files or directories containing them",
)
p.add_argument("-o", "--output-dir", default="output", help="Output directory (default: output)")
p.add_argument("--seed", type=int, default=None, help="RNG seed for reproducibility")
# Resolution
res = p.add_mutually_exclusive_group()
res.add_argument("--hd", action="store_true", help="1920x1080 output (default)")
res.add_argument("--ntsc", action="store_true", help="720x480 NTSC CRT output")
res.add_argument("--pal", action="store_true", help="720x576 PAL CRT output")
res.add_argument("--resolution", type=str, default=None, help="Custom WxH (e.g. 1280x720)")
# Aspect
p.add_argument("--aspect", choices=["letterbox", "crop", "stretch"], default="letterbox")
# Timing
p.add_argument("--chunk-min", type=int, default=None, help="Min chunk frames (default: 3)")
p.add_argument("--chunk-max", type=int, default=None, help="Max chunk frames (default: 5)")
p.add_argument("--fps", type=int, default=None, help="Target framerate (default: 24)")
# Anti-strobe
p.add_argument("--no-antistrobe", action="store_true", help="Disable all anti-strobe processing")
p.add_argument("--buffer-frames", type=int, default=None, help="Gray buffer frames between cuts (default: 1)")
p.add_argument("--luma-strength", type=float, default=None, help="Luma normalization strength 0.0-1.0 (default: 0.5)")
p.add_argument("--luma-threshold", type=int, default=None, help="Luma delta warning threshold 0-255 (default: 80)")
# Codec
p.add_argument("--preset", default=None, help="x264 preset (default: fast)")
# Parallelism
p.add_argument("--workers", type=int, default=None, help="Thread pool size for parallel operations (default: 4)")
# Prep mode
p.add_argument("--prep", action="store_true", help="Preprocessing mode — grain/greyscale sources, then exit (no splicer pipeline)")
p.add_argument("--grain", action="store_true", help="(prep) Split long videos into segments")
p.add_argument("--greyscale", action="store_true", help="(prep) Re-encode videos to greyscale")
p.add_argument("--grain-duration", type=int, default=None, help="(prep) Grain segment length in seconds (default: 60)")
# Benchmark & estimation
p.add_argument("--benchmark", action="store_true", help="Run benchmark on synthetic clip, save calibration data, then exit")
p.add_argument("--dry-run", action="store_true", dest="dry_run", help="Probe inputs and estimate pipeline time/size, then exit")
return p.parse_args()
def build_config(args: argparse.Namespace) -> SplicerConfig:
"""Build SplicerConfig from parsed CLI args."""
if args.ntsc:
config = SplicerConfig.ntsc_crt()
elif args.pal:
config = SplicerConfig.pal_crt()
else:
config = SplicerConfig()
if args.resolution:
parts = args.resolution.lower().split("x")
if len(parts) == 2:
config.target_resolution = (int(parts[0]), int(parts[1]))
config.output_dir = args.output_dir
config.aspect_mode = args.aspect
if args.seed is not None:
config.rng_seed = args.seed
if args.chunk_min is not None:
config.chunk_frames_min = args.chunk_min
if args.chunk_max is not None:
config.chunk_frames_max = args.chunk_max
if args.fps is not None:
config.target_fps = args.fps
if args.no_antistrobe:
config.antistrobe_enabled = False
if args.buffer_frames is not None:
config.antistrobe_buffer_frames = args.buffer_frames
if args.luma_strength is not None:
config.antistrobe_luma_strength = args.luma_strength
if args.luma_threshold is not None:
config.antistrobe_delta_threshold = args.luma_threshold
if args.preset is not None:
config.preset = args.preset
if args.grain_duration is not None:
config.grain_duration = args.grain_duration
if args.workers is not None:
config.max_workers = args.workers
return config
def run_prep(args: argparse.Namespace, config: SplicerConfig) -> None:
"""Run preprocessing pipeline: grain and/or greyscale, then exit."""
if not args.grain and not args.greyscale:
print("error: --prep requires at least one of --grain or --greyscale", file=sys.stderr)
sys.exit(1)
videos = collect_videos(args.inputs)
if not videos:
print("error: no video files found in inputs", file=sys.stderr)
sys.exit(1)
output_dir = Path(config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Prep mode: {len(videos)} video(s)")
working_files = videos
if args.grain:
print(f"\n--- Grain: splitting into ~{config.grain_duration}s segments ---")
grain_dir = output_dir / "_grain_tmp" if args.greyscale else output_dir
grain_dir.mkdir(parents=True, exist_ok=True)
grained: list[Path] = []
for vid in working_files:
segments = grain_video(vid, grain_dir, config)
grained.extend(segments)
print(f"Grain complete: {len(grained)} segment(s)")
working_files = grained
if args.greyscale:
print("\n--- Greyscale ---")
grey_dir = output_dir
grey_dir.mkdir(parents=True, exist_ok=True)
greyed: list[Path] = []
for vid in working_files:
out = greyscale_video(vid, grey_dir, config)
greyed.append(out)
print(f"Greyscale complete: {len(greyed)} file(s)")
# Clean up intermediate grain files if both modes ran
if args.grain:
grain_dir = output_dir / "_grain_tmp"
if grain_dir.exists():
shutil.rmtree(grain_dir)
print("\nPrep done.")
print(f"Output: {output_dir}")
def collect_inputs(paths: list[str]) -> list[Path]:
"""Expand directories and filter to supported file types."""
valid_ext = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS
result: list[Path] = []
for p_str in paths:
p = Path(p_str)
if p.is_dir():
for child in sorted(p.iterdir()):
if child.is_file() and child.suffix.lower() in valid_ext:
result.append(child)
elif p.is_file() and p.suffix.lower() in valid_ext:
result.append(p)
else:
print(f" skipping: {p} (not a supported file or directory)")
return result
if __name__ == "__main__":
main()