diff --git a/.gitignore b/.gitignore index 4356279b..18e1fbf4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ dist_deb/ .DS_Store .claude -.idea/ \ No newline at end of file +.idea/ +.local-tmp/ diff --git a/Dockerfile.server b/Dockerfile.server index 3b8ffb66..ffd991d9 100644 --- a/Dockerfile.server +++ b/Dockerfile.server @@ -1,4 +1,4 @@ -FROM rust:1.87-slim-bookworm AS rust-bookworm-slim +FROM rust:1.88-slim-bookworm AS rust-bookworm-slim # ---------------------------------- @@ -47,6 +47,15 @@ RUN apt-get -qy install libsqlite3-dev >/dev/null RUN apt-get -qy install protobuf-compiler >/dev/null RUN apt-get -qy install python3 python3.11 python3.11-venv >/dev/null +# Install MinIO for S3 integration tests +RUN apt-get -qy install curl >/dev/null +RUN ARCH=$(dpkg --print-architecture) && \ + if [ "$ARCH" = "amd64" ]; then MINIO_ARCH="amd64"; \ + elif [ "$ARCH" = "arm64" ]; then MINIO_ARCH="arm64"; \ + else echo "Unsupported architecture: $ARCH" && exit 1; fi && \ + curl -fsSL https://dl.min.io/server/minio/release/linux-${MINIO_ARCH}/minio -o /usr/local/bin/minio && \ + chmod +x /usr/local/bin/minio + # Switch to regular user RUN mkdir -p /app RUN chown -R ${UID}:${GID} /app diff --git a/README.md b/README.md index afe51784..e1486ef1 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Clapshot is an open-source, self-hosted tool for collaborative video/media revie - **File Organization**: Hierarchical folder system with drag-and-drop, admin user management interface - **Media Processing**: FFmpeg transcoding with configurable quality, thumbnail generation - **Authentication**: Reverse proxy integration supporting OAuth, JWT, Kerberos, SAML, etc. 
-- **Storage**: SQLite database with automatic migrations, file-based media storage +- **Storage**: SQLite database with automatic migrations, local filesystem or S3-compatible object storage - **Extensibility**: Plugin system for custom workflows and integrations *For a comprehensive feature list, see [FEATURES.md](FEATURES.md).* @@ -125,6 +125,39 @@ See [Upgrading Guide](doc/upgrading.md) for instructions on installing a new rel **Want to customize media processing?** See the [Transcoding and Thumbnailing Guide](doc/transcoding.md) for configuring hardware acceleration, custom encoders, and specialized processing workflows. +### Object Storage (S3-compatible) + +Clapshot can upload processed media and thumbnails to an S3-compatible object store while still staging files locally under `/videos`. + +**Required settings** (CLI flags, `clapshot-server.conf`, or `CLAPSHOT_SERVER__*` env vars): +- `storage-backend = s3` +- `s3-endpoint = https://s3.example.com` +- `s3-region = us-east-1` +- `s3-bucket = clapshot-media` + +**Authentication** uses the standard AWS SDK credential chain (in order of precedence): +1. Environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` +2. AWS credentials file (`~/.aws/credentials`) +3. 
IAM instance roles (for EC2/ECS deployments) + +**Optional settings:** +- `s3-prefix` – path inside the bucket (default: `videos`) +- `s3-public-url` – base URL used in playback links (defaults to `s3-endpoint/bucket`; set to your CDN/domain if different) + +**Docker env example:** +```bash +-e CLAPSHOT_SERVER__STORAGE_BACKEND=s3 \ +-e CLAPSHOT_SERVER__S3_ENDPOINT=https://s3.example.com \ +-e CLAPSHOT_SERVER__S3_REGION=us-east-1 \ +-e CLAPSHOT_SERVER__S3_BUCKET=clapshot-media \ +-e AWS_ACCESS_KEY_ID=YOUR_KEY \ +-e AWS_SECRET_ACCESS_KEY=YOUR_SECRET \ +-e CLAPSHOT_SERVER__S3_PUBLIC_URL=https://cdn.example.com/clapshot-media +``` + +Ensure the bucket/prefix is readable at `s3-public-url` for playback, and keep enough local disk for staging uploads under `data_dir/videos`. + + ## Architecture Overview Main components: diff --git a/client/src/App.svelte b/client/src/App.svelte index 41651573..dd85b64d 100644 --- a/client/src/App.svelte +++ b/client/src/App.svelte @@ -434,6 +434,7 @@ else let uploadUrl: string = $state(""); +let transcodePreferred: boolean = $state(true); // ------------------------------------------------------------- @@ -1133,11 +1134,19 @@ function onMediaFileListPopupAction(e: { detail: { action: Proto3.ActionDef, ite
{#if pit.folderListing.allowUpload} +
+ Transcode + +
diff --git a/client/src/__tests__/lib/NavBar.test.ts b/client/src/__tests__/lib/NavBar.test.ts index 6f38134e..9d9137a7 100644 --- a/client/src/__tests__/lib/NavBar.test.ts +++ b/client/src/__tests__/lib/NavBar.test.ts @@ -431,6 +431,26 @@ describe('NavBar.svelte', () => { expect(matchingReport?.progress).toBe(0.7); }); + it('renders a progress bar for the active video', () => { + const report: MediaProgressReport = { + mediaFileId: 'video123', + msg: 'Uploading to storage…', + progress: 0.4, + received_ts: Date.now() + }; + mediaFileId.set('video123'); + curVideo.set(createMinimalMediaFile({ + id: 'video123', + title: 'Test Video' + })); + latestProgressReports.set([report]); + + const { container } = render(NavBar); + expect(screen.getAllByText('Uploading to storage…').length).toBeGreaterThan(0); + const bars = container.querySelectorAll('.bg-amber-500'); + expect(bars.length).toBeGreaterThan(0); + }); + it('should return undefined when no matching progress report', () => { const reports: MediaProgressReport[] = [ { @@ -523,4 +543,4 @@ describe('NavBar.svelte', () => { expect(() => render(NavBar)).not.toThrow(); }); }); -}); \ No newline at end of file +}); diff --git a/client/src/__tests__/lib/asset_browser/VideoTile.test.ts b/client/src/__tests__/lib/asset_browser/VideoTile.test.ts index f118173b..06776389 100644 --- a/client/src/__tests__/lib/asset_browser/VideoTile.test.ts +++ b/client/src/__tests__/lib/asset_browser/VideoTile.test.ts @@ -102,6 +102,19 @@ describe('VideoTile', () => { expect(container.querySelector('.flex-grow')).toBeInTheDocument(); }); + it('shows a default icon when no preview or visualization exists', () => { + const mediaFile = createMediaFile({ + id: 'no-preview', + title: 'No Preview Video', + previewData: undefined, + }); + + const { container } = render(VideoTile, { item: mediaFile }); + + const icon = container.querySelector('i.fa-video'); + expect(icon).toBeInTheDocument(); + }); + it('should render with visualization override when 
no preview data', () => { const mediaFile = createMediaFile({ id: 'test-video-3', @@ -354,4 +367,4 @@ describe('VideoTile', () => { expect(titleElement).toBeInTheDocument(); expect(titleElement.textContent).toBe(mediaFile.title); }); -}); \ No newline at end of file +}); diff --git a/client/src/lib/NavBar.svelte b/client/src/lib/NavBar.svelte index 3c55a950..31648a98 100644 --- a/client/src/lib/NavBar.svelte +++ b/client/src/lib/NavBar.svelte @@ -23,13 +23,17 @@ let { onbasicauthlogout, onaddcomments }: Props = $props(); let loggedOut = $state(false); let localeOptions = $state(availableLocales); -// Watch for (transcoding) progress reports from server, and update progress bar if one matches this item. +// Watch for (transcoding/upload) progress reports from server, and show a quick status bar for the current video. let videoProgressMsg: string | undefined = $state(undefined); +let videoProgressVal: number | undefined = $state(undefined); onMount(async () => { - latestProgressReports.subscribe((reports: MediaProgressReport[]) => { - videoProgressMsg = reports.find((r: MediaProgressReport) => r.mediaFileId === $mediaFileId)?.msg; + const unsubscribe = latestProgressReports.subscribe((reports: MediaProgressReport[]) => { + const match = reports.find((r: MediaProgressReport) => r.mediaFileId === $mediaFileId); + videoProgressMsg = match?.msg; + videoProgressVal = match?.progress; }); + return () => unsubscribe(); }); $effect(() => { @@ -160,6 +164,17 @@ function addEDLComments(comments: Proto3.Comment[]) {
+ + {#if videoProgressVal !== undefined} +
+
+ {videoProgressMsg || 'Processing...'} +
+
+
+
+
+ {/if} {$curVideo?.title} {#if videoProgressMsg} {videoProgressMsg} diff --git a/client/src/lib/asset_browser/FileUpload.svelte b/client/src/lib/asset_browser/FileUpload.svelte index 9861b53f..9d1e1143 100644 --- a/client/src/lib/asset_browser/FileUpload.svelte +++ b/client/src/lib/asset_browser/FileUpload.svelte @@ -15,6 +15,7 @@ let files = { // Passed to HTTP POST request: listingData: Object; mediaFileAddedAction: string|undefined; + transcodePreferred?: boolean; children?: import('svelte').Snippet; } @@ -22,6 +23,7 @@ let files = { postUrl, listingData, mediaFileAddedAction, + transcodePreferred = true, children }: Props = $props(); @@ -80,11 +82,13 @@ function upload() { ajax.addEventListener("abort", abortHandler, false); ajax.open("POST", postUrl); ajax.setRequestHeader("X-FILE-NAME", encodeURIComponent(file.name)); + ajax.setRequestHeader("X-CLAPSHOT-TRANSCODE", transcodePreferred ? "true" : "false"); let upload_cookies = { ...LocalStorageCookies.getAllNonExpired() }; if (mediaFileAddedAction) upload_cookies["media_file_added_action"] = mediaFileAddedAction; upload_cookies["listing_data_json"] = JSON.stringify(listingData); + upload_cookies["transcode_preference"] = transcodePreferred ? "true" : "false"; ajax.setRequestHeader("X-CLAPSHOT-COOKIES", JSON.stringify(upload_cookies)); ajax.send(formdata); diff --git a/client/src/lib/asset_browser/VideoTile.svelte b/client/src/lib/asset_browser/VideoTile.svelte index 9f204a84..6751e7d4 100644 --- a/client/src/lib/asset_browser/VideoTile.svelte +++ b/client/src/lib/asset_browser/VideoTile.svelte @@ -67,6 +67,10 @@ function fmt_date(d: Date | undefined) {
+ {:else} +
+ +
{/if} @@ -119,4 +123,3 @@ function fmt_date(d: Date | undefined) { } - diff --git a/server/Cargo.lock b/server/Cargo.lock index 11cbfcd5..f624f616 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -38,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -296,6 +302,379 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-config" +version = "1.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96571e6996817bf3d58f6b569e4b9fd2e9d2fcf9f7424eed07b2ce9bb87535e5" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 1.3.1", + "time", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-lc-rs" +version = "1.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" 
+dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + +[[package]] +name = "aws-runtime" +version = "1.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d81b5b2898f6798ad58f484856768bca817e3cd9de0974c24ae0f1113fe88f1b" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.119.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "lru", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55542378e419558e6b1f398ca70adb0b2088077e79ad9f14eb09441f2f7b2164" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ee19095c7c4dda59f1697d028ce704c24b2d33c6718790c7f1d5a3015b4107c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.63.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87294a084b43d649d967efe58aa1f9e0adc260e13a6938eb904c0ae9b45824ae" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc12f8b310e38cad85cf3bef45ad236f470717393c613266ce0a89512286b650" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"59e62db736db19c488966c8d787f52e6270be565727236fd5579eaa301e7bc4a" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.13", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.8.1", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.6", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.36", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.2", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17f616c3f2260612fe44cede278bafa18e73e6479c4e393e2c4518cf2a9a228a" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae5d689cf437eae90460e944a58b5668530d433b4ff85789e69d2f2a556e057d" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ab0d43d899f9e508300e587bf582ba54c27a452dd0a9ea294690669138ae14a2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.3.1", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "905cb13a9895626d49cf2ced759b062d913834c7482c38e49557eac4e6193f01" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.8.4" @@ -306,10 +685,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "itoa", "matchit", @@ -338,8 +717,8 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -356,6 +735,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" 
+[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "bitflags" version = "2.9.1" @@ -478,12 +867,25 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cc" -version = "1.2.24" +version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7" +checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ + "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -574,6 +976,9 @@ dependencies = [ "aspasia", "assert_fs", "async-std", + "aws-config", + "aws-sdk-s3", + "aws-types", "base64", "bytes", "chrono", @@ -587,6 +992,7 @@ dependencies = [ "futures", "futures-util", "hex", + "http 0.2.12", "http-body-util", "hyper-util", "indoc", @@ -641,6 +1047,15 @@ dependencies = [ "whoami", ] +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -656,6 +1071,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + 
"libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -671,6 +1096,34 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc-fast" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" +dependencies = [ + "crc", + "digest", + "rand 0.9.1", + "regex", + "rustversion", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -842,6 +1295,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -881,6 +1335,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "either" version = "1.15.0" @@ -996,6 +1456,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" + [[package]] name = "fixedbitset" version = "0.5.7" @@ -1018,6 +1484,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1027,6 +1499,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "funty" version = "2.0.0" @@ -1210,16 +1688,35 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.10" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" dependencies = [ "atomic-waker", "bytes", "fnv", "futures-core", "futures-sink", - "http", + "http 1.3.1", "indexmap", "slab", "tokio", @@ -1241,6 +1738,11 @@ name = "hashbrown" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "headers" @@ -1251,7 +1753,7 @@ dependencies = [ "base64", "bytes", "headers-core", - "http", + "http 1.3.1", "httpdate", "mime", "sha1", @@ -1263,7 +1765,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.3.1", ] [[package]] @@ -1290,6 +1792,26 @@ version = "0.4.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.3.1" @@ -1301,6 +1823,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1308,7 +1841,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.3.1", ] [[package]] @@ -1319,8 +1852,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "pin-project-lite", ] @@ -1338,38 +1871,80 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "0.14.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" dependencies = [ "bytes", "futures-channel", + "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.27", + 
"http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2 0.4.13", + "http 1.3.1", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a01595e11bdcec50946522c32dde3fc6914743000a68b93000965f2f02406d" dependencies = [ - "http", - "hyper", + "http 1.3.1", + "hyper 1.8.1", "hyper-util", - "rustls", + "rustls 0.23.36", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots 1.0.0", ] @@ -1380,7 +1955,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", @@ -1389,20 +1964,23 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.13" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c293b6b3d21eca78250dc7dbebd6b9210ec5530e038cbfe0661b5c47ab06e8" +checksum = 
"727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ + "base64", "bytes", "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.8.1", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "tokio", "tower-service", "tracing", @@ -1613,6 +2191,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.3", + "libc", +] + [[package]] name = "js-sys" version = "0.3.82" @@ -1726,6 +2314,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.3", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1747,6 +2344,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -1826,7 +2433,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", + "http 1.3.1", "httparse", "log", "memchr", @@ -1882,6 +2489,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" 
+[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1922,6 +2538,18 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "openssl-probe" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391" + +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "parking" version = "2.2.1" @@ -2298,7 +2926,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.36", "socket2 0.5.10", "thiserror 2.0.17", "tokio", @@ -2318,7 +2946,7 @@ dependencies = [ "rand 0.9.1", "ring", "rustc-hash", - "rustls", + "rustls 0.23.36", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -2443,9 +3071,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.1" +version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" dependencies = [ "aho-corasick", "memchr", @@ -2455,15 +3083,21 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.9" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" dependencies = [ "aho-corasick", "memchr", 
"regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d942b98df5e658f56f20d592c7f868833fe38115e65c33003d8cd224b0155da" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -2490,11 +3124,11 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.8.1", + "hyper-rustls 0.27.6", "hyper-util", "ipnet", "js-sys", @@ -2505,7 +3139,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.36", "rustls-pemfile", "rustls-pki-types", "serde", @@ -2513,7 +3147,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-util", "tower", "tower-service", @@ -2591,6 +3225,15 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "1.0.7" @@ -2606,18 +3249,43 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.27" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + +[[package]] +name = "rustls" +version = "0.23.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +dependencies = [ + "aws-lc-rs", "once_cell", 
"ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -2639,10 +3307,21 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.3" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" +checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -2678,6 +3357,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scheduled-thread-pool" version = "0.2.7" @@ -2699,6 +3387,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sdd" version = "3.0.8" @@ -2711,6 +3409,29 @@ version = "4.1.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.27" @@ -3214,13 +3935,23 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.36", "tokio", ] @@ -3343,11 +4074,11 @@ dependencies = [ "axum", "base64", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-timeout", "hyper-util", "percent-encoding", @@ -3589,7 +4320,7 @@ checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" dependencies = [ "bytes", "data-encoding", - "http", + "http 1.3.1", "httparse", "log", "rand 0.9.1", @@ -3606,7 +4337,7 @@ checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" dependencies = [ "bytes", "data-encoding", - "http", + "http 1.3.1", "httparse", "log", "rand 0.9.1", 
@@ -3719,6 +4450,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "vte" version = "0.14.1" @@ -3765,10 +4502,10 @@ dependencies = [ "bytes", "futures-util", "headers", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "log", "mime", @@ -4251,6 +4988,12 @@ dependencies = [ "rustix", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.8.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index 53359d12..14f7db4d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -123,6 +123,10 @@ indoc = "2.0.5" Inflector = "0.11.4" serial_test = "3.1.1" aspasia = "0.2.0" +aws-config = { version = "1.8", default-features = false, features = ["rustls", "rt-tokio"] } +aws-sdk-s3 = { version = "1.115", default-features = false, features = ["rustls"] } +aws-types = "1.3" +http = "0.2" [dev-dependencies] assert_fs = "1.0.13" diff --git a/server/src/api_server/file_upload.rs b/server/src/api_server/file_upload.rs index 529fdf89..a70f5524 100644 --- a/server/src/api_server/file_upload.rs +++ b/server/src/api_server/file_upload.rs @@ -7,6 +7,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use crate::video_pipeline::IncomingFile; +use crate::video_pipeline::TranscodePreference; use super::parse_auth_headers; use super::server_state::ServerState; use super::user_session::{org_authz_with_default, AuthzTopic, AuthzError}; @@ -33,7 +34,7 @@ pub async fn handle_multipart_upload( body: impl 
warp::Stream> + Unpin) -> Result, Infallible> { - let (user_id, user_name, is_admin, cookies, filtered_headers, remote_error) = parse_auth_headers(&hdrs, &server.default_user, &server.org_http_headers_regex); + let (user_id, user_name, is_admin, mut cookies, filtered_headers, remote_error) = parse_auth_headers(&hdrs, &server.default_user, &server.org_http_headers_regex); // If X-Remote-Error is set, return error response if let Some(error_msg) = remote_error { @@ -73,6 +74,22 @@ pub async fn handle_multipart_upload( } } + // Determine transcoding preference from header + let transcode_preference = hdrs + .get("x-clapshot-transcode") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_ascii_lowercase()) + .map(|s| match s.as_str() { + "true" | "1" | "yes" => TranscodePreference::Force, + "false" | "0" | "no" => TranscodePreference::Skip, + _ => TranscodePreference::Auto, + }) + .unwrap_or(TranscodePreference::Auto); + cookies.insert( + "transcode_preference".into(), + format!("{:?}", transcode_preference), + ); + // Parse the multipart stream let boundary = mime.get_param("boundary").map(|v| v.to_string()); let boundary = match boundary { @@ -165,7 +182,7 @@ pub async fn handle_multipart_upload( } } - if let Err(e) = upload_done.send(IncomingFile{ file_path: uploaded_file, user_id: user_id, cookies }) { + if let Err(e) = upload_done.send(IncomingFile{ file_path: uploaded_file, user_id, cookies, transcode_preference }) { tracing::error!("Failed to send upload ok signal: {:?}", e); return Ok(warp::reply::with_status("Internal error: failed to send upload ok signal".into(), warp::http::StatusCode::INTERNAL_SERVER_ERROR)); } diff --git a/server/src/api_server/mod.rs b/server/src/api_server/mod.rs index 5a5b626f..13b5c506 100644 --- a/server/src/api_server/mod.rs +++ b/server/src/api_server/mod.rs @@ -549,7 +549,7 @@ async fn run_api_server_async( tracing::info!("Allowed CORS origins: {:?}", cors_origins); let cors_methods = ["GET", "POST", "HEAD", "OPTIONS"]; - let 
cors_headers = ["x-file-name", "x-clapshot-cookies", "content-type", "upgrade", "sec-websocket-protocol", "sec-websocket-version"]; + let cors_headers = ["x-file-name", "x-clapshot-cookies", "x-clapshot-transcode", "content-type", "upgrade", "sec-websocket-protocol", "sec-websocket-version"]; let routes = if cors_origins.contains(&"*") { tracing::warn!(concat!( diff --git a/server/src/api_server/server_state.rs b/server/src/api_server/server_state.rs index 0757bfeb..313a758f 100644 --- a/server/src/api_server/server_state.rs +++ b/server/src/api_server/server_state.rs @@ -1,3 +1,4 @@ +use crate::storage::StorageBackend; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -26,6 +27,8 @@ pub struct ServerState { pub db: Arc, pub media_files_dir: PathBuf, pub upload_dir: PathBuf, + pub media_base_url: String, + pub storage: StorageBackend, pub url_base: String, pub default_user: String, pub org_http_headers_regex: Regex, @@ -48,6 +51,7 @@ impl ServerState { media_files_dir: &Path, upload_dir: &Path, url_base: &str, + storage: StorageBackend, organizer_uri: Option, grpc_srv_listening_flag: Arc, default_user: String, @@ -58,6 +62,8 @@ impl ServerState { db, media_files_dir: media_files_dir.to_path_buf(), upload_dir: upload_dir.to_path_buf(), + media_base_url: storage.media_base_url().to_string(), + storage, grpc_srv_listening_flag, terminate_flag, url_base: url_base.to_string(), diff --git a/server/src/api_server/test_utils.rs b/server/src/api_server/test_utils.rs index 0f7ab700..202180a6 100644 --- a/server/src/api_server/test_utils.rs +++ b/server/src/api_server/test_utils.rs @@ -190,12 +190,14 @@ macro_rules! 
api_test { let ws_url = url_base.replace("http", "ws") + "/api/ws"; let media_files_dir = data_dir.join("videos"); let upload_dir = data_dir.join("upload"); + let storage = StorageBackend::local(media_files_dir.clone(), &url_base); let test_regex = validate_org_http_headers_regex("^X[-_]REMOTE[-_]").expect("Test regex failed"); let server_state = ServerState::new( db.clone(), &media_files_dir.clone(), &upload_dir.clone(), &url_base.clone(), + storage, None, grpc_srv_listening_flag.clone(), "anonymous".to_string(), diff --git a/server/src/api_server/tests.rs b/server/src/api_server/tests.rs index d9191bd2..5d06ade7 100644 --- a/server/src/api_server/tests.rs +++ b/server/src/api_server/tests.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +use crate::storage::StorageBackend; use std::sync::Arc; use std::str::FromStr; use std::sync::atomic::AtomicBool; @@ -13,10 +14,9 @@ use crate::database::DbBasicQuery; use crate::database::error::DBError; use crate::api_server::{parse_auth_headers, run_api_server_async, validate_org_http_headers_regex, UserMessage, UserMessageTopic}; use crate::api_server::server_state::ServerState; +use crate::api_server::test_utils::{ApiTestState, expect_msg, expect_no_msg, write, open_media_file, connect_client_ws}; use crate::database::models::{self}; use crate::database::tests::make_test_db; - -use crate::api_server::test_utils::{ApiTestState, expect_msg, expect_no_msg, write, open_media_file, connect_client_ws}; use crate::grpc::db_models::proto_msg_type_to_event_name; use warp::http::{HeaderMap, HeaderValue}; @@ -163,10 +163,30 @@ async fn test_api_open_bad_media_file() pub async fn expect_user_msg(ws: &mut crate::api_server::test_utils::WsClient, evt_type: proto::user_message::Type ) -> proto::UserMessage { println!(" --expect_user_msg of type {:?} ....", evt_type); - let cmd = expect_client_cmd!(ws, ShowMessages); - assert_eq!(cmd.msgs.len(), 1); - assert_eq!(cmd.msgs[0].r#type, evt_type as i32); - cmd.msgs[0].clone() + // Loop to skip PROGRESS 
messages when waiting for other message types + loop { + let cmd = expect_client_cmd!(ws, ShowMessages); + // Filter out PROGRESS messages if we're not looking for them + let non_progress_msgs: Vec<_> = cmd.msgs.iter() + .filter(|m| { + if evt_type != proto::user_message::Type::Progress { + m.r#type != proto::user_message::Type::Progress as i32 + } else { + true + } + }) + .collect(); + + if non_progress_msgs.is_empty() { + // Only got PROGRESS messages, keep waiting + println!(" (skipping PROGRESS message, waiting for {:?})", evt_type); + continue; + } + + assert_eq!(non_progress_msgs.len(), 1, "Expected 1 message of type {:?}, got {} non-progress messages", evt_type, non_progress_msgs.len()); + assert_eq!(non_progress_msgs[0].r#type, evt_type as i32, "Expected message type {:?}, got type {}", evt_type, non_progress_msgs[0].r#type); + return non_progress_msgs[0].clone(); + } } #[tokio::test] diff --git a/server/src/api_server/user_session.rs b/server/src/api_server/user_session.rs index 64b66424..a90c3615 100644 --- a/server/src/api_server/user_session.rs +++ b/server/src/api_server/user_session.rs @@ -151,7 +151,7 @@ pub async fn org_authz<'a>( AuthzTopic::MediaFile(v, op) => authz_op::Op::MediaFileOp( authz_op::MediaFileOp { op: op.into(), - media_file: Some(v.to_proto3(&server.url_base, vec![])) }), // omit subtitles for authz check + media_file: Some(v.to_proto3(&server.media_base_url, vec![])) }), // omit subtitles for authz check AuthzTopic::Comment(c, op) => authz_op::Op::CommentOp( authz_op::CommentOp { op: op.into(), diff --git a/server/src/api_server/ws_handers.rs b/server/src/api_server/ws_handers.rs index cda60ee8..58004997 100644 --- a/server/src/api_server/ws_handers.rs +++ b/server/src/api_server/ws_handers.rs @@ -102,7 +102,7 @@ pub async fn msg_open_navigation_page(data: &OpenNavigationPage , ses: &mut User let mut media_files: Vec = Vec::new(); for m in models::MediaFile::get_by_user(&mut server.db.conn()?, &ses.user_id, DBPaging::default())? 
{ let subs = models::Subtitle::get_by_media_file(&mut server.db.conn()?, &m.id, DBPaging::default())?; - media_files.push(m.to_proto3(&server.url_base, subs)); + media_files.push(m.to_proto3(&server.media_base_url, subs)); } let h_txt = if media_files.is_empty() { "

You have no media yet.

" } else { "

All your media files

" }; @@ -137,7 +137,7 @@ pub async fn send_open_media_file_cmd(server: &ServerState, session_id: &str, me let conn = &mut server.db.conn()?; let v_db = models::MediaFile::get(conn, &media_file_id.into())?; let subs = models::Subtitle::get_by_media_file(conn, media_file_id, DBPaging::default())?; - let v = v_db.to_proto3(&server.url_base, subs); + let v = v_db.to_proto3(&server.media_base_url, subs); if v.playback_url.is_none() { return Err(anyhow!("No playback file")); } @@ -425,6 +425,7 @@ pub async fn msg_add_subtitle(data: &AddSubtitle, ses: &mut UserSession, server: }; tokio::fs::write(&orig_sub_file, file_contents).await.context("Failed to write orig subtitle file")?; + server.storage.upload_if_exists(&orig_sub_file); // Convert to WebVTT if needed let playback_filename ={ @@ -461,6 +462,7 @@ pub async fn msg_add_subtitle(data: &AddSubtitle, ses: &mut UserSession, server: } temp_workaround_aspasia_webvtt_bug(&vtt_path)?; + server.storage.upload_if_exists(&vtt_path); Some(vtt_path.file_name().context("Bad filename")?.to_str().context("Bad filename")?.to_string()) }, Err(e) => return Err(anyhow!("Failed to parse subtitle file: {:?}", e)), diff --git a/server/src/grpc/db_models.rs b/server/src/grpc/db_models.rs index 6c1e7458..30882286 100644 --- a/server/src/grpc/db_models.rs +++ b/server/src/grpc/db_models.rs @@ -53,7 +53,7 @@ impl models::MediaFile }) } - pub fn to_proto3(&self, url_base: &str, subtitles: Vec) -> proto::MediaFile + pub fn to_proto3(&self, media_base_url: &str, subtitles: Vec) -> proto::MediaFile { let duration = match (self.duration, self.total_frames, &self.fps) { (Some(dur), Some(total_frames), Some(fps)) => Some(proto::MediaFileDuration { @@ -75,12 +75,12 @@ impl models::MediaFile // Make preview data (thumb sheet and/or thumb url) let thumb_url = if matches!(self.has_thumbnail, Some(true)) { - Some(format!("{}/videos/{}/thumbs/thumb.webp", &url_base, &self.id)) + Some(format!("{}/thumbs/thumb.webp", format!("{}/{}", media_base_url, 
&self.id))) } else { None }; let thumb_sheet = match (self.thumb_sheet_cols, self.thumb_sheet_rows) { (Some(cols), Some(rows)) => Some(proto::media_file_preview_data::ThumbSheet { - url: format!("{}/videos/{}/thumbs/sheet-{}x{}.webp", &url_base, &self.id, cols, rows), + url: format!("{}/thumbs/sheet-{}x{}.webp", format!("{}/{}", media_base_url, &self.id), cols, rows), rows: rows as u32, cols: cols as u32, }), @@ -110,10 +110,10 @@ impl models::MediaFile added_time: Some(datetime_to_proto3(&self.added_time)), preview_data, processing_metadata, - subtitles: subtitles.into_iter().map(|s| s.to_proto3(url_base)).collect(), + subtitles: subtitles.into_iter().map(|s| s.to_proto3(media_base_url)).collect(), default_subtitle_id: self.default_subtitle_id.map(|id| id.to_string()), - playback_url: playback_uri.map(|uri| format!("{}/videos/{}/{}", url_base, &self.id, uri)), - orig_url: orig_uri.map(|uri| format!("{}/videos/{}/{}", url_base, &self.id, uri)) + playback_url: playback_uri.map(|uri| format!("{}/{}/{}", media_base_url, &self.id, uri)), + orig_url: orig_uri.map(|uri| format!("{}/{}/{}", media_base_url, &self.id, uri)) } } @@ -165,11 +165,12 @@ impl models::Subtitle }) } - pub fn to_proto3(&self, url_base: &str) -> proto::Subtitle + pub fn to_proto3(&self, media_base_url: &str) -> proto::Subtitle { - let orig_url = format!("{}/videos/{}/subs/orig/{}", url_base, &self.media_file_id, &self.orig_filename); + let base = format!("{}/{}", media_base_url, &self.media_file_id); + let orig_url = format!("{}/subs/orig/{}", base, &self.orig_filename); let playback_url = match &self.filename { - Some(f) => format!("{}/videos/{}/subs/{}", url_base, &self.media_file_id, f), + Some(f) => format!("{}/subs/{}", base, f), None => orig_url.clone() }; proto::Subtitle { diff --git a/server/src/grpc/grpc_server.rs b/server/src/grpc/grpc_server.rs index e616cdd2..68bab841 100644 --- a/server/src/grpc/grpc_server.rs +++ b/server/src/grpc/grpc_server.rs @@ -131,7 +131,7 @@ impl 
org::organizer_outbound_server::OrganizerOutbound for OrganizerOutboundImpl }; let mut proto_items = Vec::with_capacity(items.len()); - for mf in items { proto_items.push(mf.to_proto3(&self.server.url_base, mf.get_subtitles(conn)?)); } + for mf in items { proto_items.push(mf.to_proto3(&self.server.media_base_url, mf.get_subtitles(conn)?)); } Ok(Response::new(org::DbMediaFileList { items: proto_items, @@ -231,7 +231,7 @@ impl org::organizer_outbound_server::OrganizerOutbound for OrganizerOutboundImpl media_files: upsert_type!([ conn, req.media_files, models::MediaFile, models::MediaFileInsert, |it: &proto::MediaFile| it.id.is_empty(), - |it: &models::MediaFile| Ok(it.to_proto3(self.server.url_base.as_str(), it.get_subtitles(conn)?))])?, + |it: &models::MediaFile| Ok(it.to_proto3(self.server.media_base_url.as_str(), it.get_subtitles(conn)?))])?, comments: upsert_type!([ conn, req.comments, models::Comment, models::CommentInsert, |it: &proto::Comment| it.id.is_empty(), @@ -243,7 +243,7 @@ impl org::organizer_outbound_server::OrganizerOutbound for OrganizerOutboundImpl subtitles: upsert_type!([ conn, req.subtitles, models::Subtitle, models::SubtitleInsert, |it: &proto::Subtitle| it.id.is_empty(), - |it: &models::Subtitle| Ok(it.to_proto3(self.server.url_base.as_str()))])?, + |it: &models::Subtitle| Ok(it.to_proto3(self.server.media_base_url.as_str()))])?, })) } diff --git a/server/src/lib.rs b/server/src/lib.rs index b6b6a2f9..93f2fe06 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -12,6 +12,7 @@ pub mod api_server; pub mod database; pub mod tests; pub mod grpc; +pub mod storage; pub const PKG_VERSION: &'static str = env!("CARGO_PKG_VERSION"); pub const PKG_NAME: &'static str = env!("CARGO_PKG_NAME"); @@ -47,6 +48,7 @@ impl ClapshotInit { thumbnail_script: String, transcode_decision_script: String, org_http_headers_regex: regex::Regex, + storage: crate::storage::StorageBackend, terminate_flag: Arc) -> anyhow::Result { @@ -62,7 +64,7 @@ impl ClapshotInit { } 
// Create subdirectories - for d in &["videos", "incoming", "videos"] { + for d in &["videos", "incoming", "upload"] { std::fs::create_dir_all(&data_dir.join(d))?; } @@ -85,6 +87,7 @@ impl ClapshotInit { &data_dir.join("videos"), &data_dir.join("upload"), &url_base, + storage.clone(), organizer_uri.clone(), grpc_srv_listening_flag.clone(), default_user, @@ -126,7 +129,7 @@ impl ClapshotInit { let vpp_thread = Some({ let db = db.clone(); thread::spawn(move || { video_pipeline::run_forever( - db, tf.clone(), dd, user_msg_tx, poll_interval, resubmit_delay, target_bitrate, upload_rx, n_workers, ingest_username_from, ts, ths, tds)}) + db, tf.clone(), dd, storage.clone(), user_msg_tx, poll_interval, resubmit_delay, target_bitrate, upload_rx, n_workers, ingest_username_from, ts, ths, tds)}) }); @@ -366,6 +369,7 @@ pub fn run_clapshot( thumbnail_script: String, transcode_decision_script: String, org_http_headers_regex: regex::Regex, + storage: crate::storage::StorageBackend, ) -> anyhow::Result<()> { let terminate_flag = Arc::new(AtomicBool::new(false)); @@ -390,6 +394,7 @@ pub fn run_clapshot( thumbnail_script, transcode_decision_script, org_http_headers_regex, + storage, terminate_flag.clone() )?; diff --git a/server/src/main.rs b/server/src/main.rs index 61d26131..55b183b5 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,9 +4,10 @@ use clapshot_server::{ api_server::validate_org_http_headers_regex, grpc::{grpc_client::prepare_organizer, grpc_server::make_grpc_server_bind}, run_clapshot, PKG_NAME, PKG_VERSION, + storage::StorageBackend, video_pipeline::IngestUsernameFrom, }; -use std::{path::PathBuf, sync::Arc, str::FromStr}; +use std::{path::PathBuf, str::FromStr, sync::Arc}; use tracing::error; use indoc::indoc; @@ -146,6 +147,28 @@ struct Args { /// Case-insensitive matching. Default is disabled for security. 
#[arg(long, value_name="REGEX", default_value="^$")] org_http_headers: String, + + /// Storage backend (local or s3-compatible object storage) + #[arg(long, value_name="BACKEND", default_value="local")] + storage_backend: String, + + /// S3-compatible endpoint URL (only needed for non-AWS S3, e.g. MinIO). + /// For AWS S3, leave unset and configure AWS_REGION instead. + #[arg(long, value_name="URL")] + s3_endpoint: Option, + + /// S3 bucket (required for S3 backend) + #[arg(long, value_name="BUCKET")] + s3_bucket: Option, + + /// Path/prefix inside the bucket where media files are stored + #[arg(long, value_name="PREFIX", default_value="videos")] + s3_prefix: String, + + /// Public base URL for accessing the bucket/prefix (used for playback URLs). + /// If not set, defaults to endpoint/bucket or virtual-hosted style URL. + #[arg(long, value_name="URL")] + s3_public_url: Option, } fn main() -> anyhow::Result<()> { @@ -203,6 +226,35 @@ fn main() -> anyhow::Result<()> { // Validate and compile the org_http_headers regex let org_http_headers_regex = validate_org_http_headers_regex(&args.org_http_headers)?; + let storage = match args.storage_backend.as_str() { + "local" => StorageBackend::local(args.data_dir.join("videos"), &url_base), + "s3" => { + let bucket = args.s3_bucket.clone() + .ok_or_else(|| anyhow::anyhow!("--s3-bucket is required for S3 backend"))?; + + // Compute public URL for playback. If not specified, derive from endpoint/bucket. + // For custom endpoints (MinIO etc), default to path-style URLs. + let public_base_url = args.s3_public_url.clone() + .or_else(|| { + args.s3_endpoint.as_ref().map(|ep| + format!("{}/{}", ep.trim_end_matches('/'), &bucket) + ) + }) + .ok_or_else(|| anyhow::anyhow!( + "--s3-public-url is required when --s3-endpoint is not set (AWS S3 requires explicit public URL)" + ))?; + + StorageBackend::s3( + args.data_dir.join("videos"), + bucket, + args.s3_endpoint.clone(), + args.s3_prefix.clone(), + public_base_url, + )? 
+ } + other => bail!("Unknown storage backend '{}'. Valid options: local, s3", other), + }; + // Run the server (blocking) if let Err(e) = run_clapshot( args.data_dir.to_path_buf(), @@ -223,6 +275,7 @@ fn main() -> anyhow::Result<()> { args.thumbnail_script, args.transcode_decision_script, org_http_headers_regex, + storage, ) { error!("run_clapshot() failed: {}", e); } diff --git a/server/src/storage.rs b/server/src/storage.rs new file mode 100644 index 00000000..7cda258b --- /dev/null +++ b/server/src/storage.rs @@ -0,0 +1,375 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; +use aws_sdk_s3::{primitives::ByteStream, Client}; +use tokio::fs; +use tokio::io::AsyncReadExt; +use tracing; + +pub type ProgressCallback = Arc; + +const MULTIPART_MIN_SIZE: u64 = 5 * 1024 * 1024; +const MULTIPART_CHUNK_SIZE: usize = 8 * 1024 * 1024; +/// Simple content type guessing for a handful of formats we serve. 
+fn guess_content_type(path: &Path) -> &'static str { + match path + .extension() + .and_then(|e| e.to_str()) + .map(|s| s.to_ascii_lowercase()) + { + Some(ext) if ext == "mp4" => "video/mp4", + Some(ext) if ext == "mkv" => "video/x-matroska", + Some(ext) if ext == "webm" => "video/webm", + Some(ext) if ext == "mov" => "video/quicktime", + Some(ext) if ext == "webp" => "image/webp", + Some(ext) if ext == "png" => "image/png", + Some(ext) if ext == "jpg" || ext == "jpeg" => "image/jpeg", + Some(ext) if ext == "vtt" => "text/vtt", + Some(ext) if ext == "srt" => "application/x-subrip", + _ => "application/octet-stream", + } +} + +#[derive(Clone)] +pub enum StorageBackend { + LocalFs(LocalFsBackend), + S3(ObjectStorageBackend), +} + +impl StorageBackend { + pub fn local(media_root: PathBuf, url_base: &str) -> Self { + let prefix = "videos".to_string(); + let media_base_url = format!("{}/{}", url_base.trim_end_matches('/'), prefix); + StorageBackend::LocalFs(LocalFsBackend { + media_root, + prefix, + media_base_url, + }) + } + + /// Create an S3 storage backend using the AWS SDK default credential chain. + /// + /// Credentials are resolved automatically in this order: + /// 1. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY + /// 2. Shared credentials file: ~/.aws/credentials + /// 3. AWS config file: ~/.aws/config (with profiles) + /// 4. ECS container credentials + /// 5. EC2 instance metadata (IAM role) + /// + /// For MinIO or other S3-compatible storage, set endpoint to the service URL. + /// For AWS S3, leave endpoint as None and set AWS_REGION environment variable. + pub fn s3( + media_root: PathBuf, + bucket: String, + endpoint: Option, + prefix: String, + public_base_url: String, + ) -> anyhow::Result { + let media_base_url = format!( + "{}/{}", + public_base_url.trim_end_matches('/'), + prefix.trim_end_matches('/') + ); + + // Create a temporary runtime just for client initialization. + // The client survives after the runtime is dropped. 
+ // We don't persist the runtime to avoid "cannot drop runtime in async context" panics + // when the storage is dropped inside another tokio runtime (e.g., during server shutdown). + let client = { + let rt = tokio::runtime::Runtime::new().context("create tokio runtime for S3 client init")?; + let client = rt.block_on(async { + let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest()); + + // Only override endpoint for non-AWS S3 (MinIO, etc.) + if let Some(ref ep) = endpoint { + config_loader = config_loader.endpoint_url(ep); + } + + let sdk_config = config_loader.load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) + // Force path-style for MinIO compatibility + .force_path_style(endpoint.is_some()) + .build(); + Client::from_conf(s3_config) + }); + // rt is dropped here, but client survives + client + }; + + Ok(StorageBackend::S3(ObjectStorageBackend { + media_root, + prefix, + media_base_url, + client: Arc::new(client), + bucket, + })) + } + + pub fn media_base_url(&self) -> &str { + match self { + StorageBackend::LocalFs(b) => &b.media_base_url, + StorageBackend::S3(b) => &b.media_base_url, + } + } + + pub fn media_root(&self) -> &Path { + match self { + StorageBackend::LocalFs(b) => &b.media_root, + StorageBackend::S3(b) => &b.media_root, + } + } + + pub fn needs_remote_upload(&self) -> bool { + matches!(self, StorageBackend::S3(_)) + } + + /// Upload a file that lives under the media root. No-op for LocalFS. + pub fn upload_local_path(&self, abs_path: &Path) -> anyhow::Result<()> { + self.upload_with_progress(abs_path, None) + } + + /// Upload file if it exists and log an error instead of bailing. 
+ pub fn upload_if_exists(&self, abs_path: &Path) { + if !self.needs_remote_upload() { + return; + } + if !abs_path.exists() { + tracing::debug!(path=?abs_path, "Skipping upload for missing file"); + return; + } + if let Err(e) = self.upload_local_path(abs_path) { + tracing::error!(path=?abs_path, details=%e, "Failed to upload asset to object storage"); + } + } + + /// Upload a file when object storage is enabled, and propagate failures. + pub fn upload_required(&self, abs_path: &Path) -> anyhow::Result<()> { + if !self.needs_remote_upload() { + return Ok(()); + } + self.upload_with_progress(abs_path, None) + } + + /// Upload a file, optionally reporting progress (0.0 - 1.0) while streaming to object storage. + pub fn upload_with_progress( + &self, + abs_path: &Path, + progress: Option, + ) -> anyhow::Result<()> { + match self { + StorageBackend::LocalFs(_) => { + if let Some(cb) = progress { + cb(1.0); + } + Ok(()) + } + StorageBackend::S3(backend) => backend.upload_with_progress(abs_path, progress), + } + } + + fn key_for_path(&self, abs_path: &Path) -> anyhow::Result { + let root = self.media_root(); + let rel = abs_path + .strip_prefix(root) + .with_context(|| format!("Path '{:?}' not under media root '{:?}'", abs_path, root))?; + let rel = rel.to_string_lossy().replace('\\', "/"); + let prefix = match self { + StorageBackend::LocalFs(b) => &b.prefix, + StorageBackend::S3(b) => &b.prefix, + } + .trim_end_matches('/'); + + if prefix.is_empty() { + Ok(rel) + } else { + Ok(format!("{}/{}", prefix, rel)) + } + } +} + +#[derive(Clone)] +pub struct LocalFsBackend { + pub media_root: PathBuf, + pub prefix: String, + pub media_base_url: String, +} + +#[derive(Clone)] +pub struct ObjectStorageBackend { + pub media_root: PathBuf, + pub prefix: String, + pub media_base_url: String, + pub bucket: String, + pub client: Arc, +} + +impl ObjectStorageBackend { + fn upload_with_progress( + &self, + abs_path: &Path, + progress: Option, + ) -> anyhow::Result<()> { + let key = 
StorageBackend::S3(self.clone()).key_for_path(abs_path)?; + let ct = guess_content_type(abs_path); + let bucket = self.bucket.clone(); + let client = self.client.clone(); + let path = abs_path.to_path_buf(); + + // Create a fresh runtime for each upload to avoid "cannot drop runtime in async context" panics. + // This is slightly less efficient than reusing a runtime, but much safer when the storage + // is held by code that runs inside another tokio runtime (like the api_server). + let rt = tokio::runtime::Runtime::new().context("create tokio runtime for S3 upload")?; + rt.block_on(async move { + let mut file = fs::File::open(&path) + .await + .with_context(|| format!("Open file {:?}", path))?; + let meta = file.metadata().await?; + let total_len = meta.len(); + + if total_len == 0 { + if let Some(cb) = progress.as_ref() { + cb(1.0); + } + client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(Vec::new())) + .content_type(ct) + .send() + .await + .context("upload empty object to storage")?; + return Ok(()); + } + + if total_len <= MULTIPART_MIN_SIZE { + let mut buffer = Vec::with_capacity(total_len as usize); + file.read_to_end(&mut buffer).await?; + + client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(buffer)) + .content_type(ct) + .send() + .await + .context("upload small object to storage")?; + + if let Some(cb) = progress { + cb(1.0); + } + return Ok(()); + } + + let upload = client + .create_multipart_upload() + .bucket(&bucket) + .key(&key) + .content_type(ct) + .send() + .await + .context("initiate multipart upload")?; + + let upload_id = upload + .upload_id() + .ok_or(anyhow!("Missing upload id from multipart upload"))? 
+ .to_string(); + + let mut parts = Vec::new(); + let mut buf = vec![0u8; MULTIPART_CHUNK_SIZE]; + let mut part_number = 1; + let mut uploaded: u64 = 0; + + loop { + // Read a complete chunk (read() may return short reads with async I/O) + // S3 requires all parts except the last to be >= 5MB + let mut chunk_size = 0; + loop { + let bytes_read = file.read(&mut buf[chunk_size..]).await?; + if bytes_read == 0 { + break; // EOF + } + chunk_size += bytes_read; + if chunk_size >= MULTIPART_CHUNK_SIZE { + break; // Full chunk + } + } + + if chunk_size == 0 { + break; // No more data + } + + let body = ByteStream::from(buf[..chunk_size].to_vec()); + let res = client + .upload_part() + .bucket(&bucket) + .key(&key) + .upload_id(&upload_id) + .part_number(part_number) + .body(body) + .send() + .await + .with_context(|| format!("upload part {part_number}"))?; + + let etag = res + .e_tag() + .ok_or(anyhow!("Missing etag for uploaded part {part_number}"))? + .to_string(); + + parts.push( + CompletedPart::builder() + .e_tag(etag) + .part_number(part_number) + .build(), + ); + + uploaded += chunk_size as u64; + if let Some(cb) = progress.as_ref() { + cb((uploaded as f32 / total_len as f32).clamp(0.0, 1.0)); + } + + part_number += 1; + } + + let multipart = CompletedMultipartUpload::builder() + .set_parts(Some(parts)) + .build(); + + if let Err(e) = client + .complete_multipart_upload() + .bucket(&bucket) + .key(&key) + .upload_id(&upload_id) + .multipart_upload(multipart) + .send() + .await + { + tracing::error!( + details=%e, + upload_id=%upload_id, + key=%key, + "Completing multipart upload failed, aborting" + ); + // Best-effort abort; ignore abort error to bubble the original failure. 
+ let _ = client + .abort_multipart_upload() + .bucket(&bucket) + .key(&key) + .upload_id(upload_id) + .send() + .await; + return Err(anyhow!("complete multipart upload: {e}")); + } + + if let Some(cb) = progress { + cb(1.0); + } + Ok::<(), anyhow::Error>(()) + })?; + + Ok(()) + } +} diff --git a/server/src/tests/integration_test.rs b/server/src/tests/integration_test.rs index a06ac3fd..ff8d756e 100644 --- a/server/src/tests/integration_test.rs +++ b/server/src/tests/integration_test.rs @@ -24,6 +24,7 @@ mod integration_test use crate::api_server::tests::expect_user_msg; use crate::api_server::validate_org_http_headers_regex; + use crate::storage::StorageBackend; use crate::database::schema::media_files::{thumb_sheet_cols, thumb_sheet_rows}; use crate::{expect_client_cmd, send_server_cmd}; @@ -65,6 +66,7 @@ mod integration_test file_path: PathBuf::from_str(data_dir.join("NASA_Red_Lettuce_excerpt.mov").to_str().unwrap())?, user_id: "nobody".to_string(), cookies: HashMap::new(), + transcode_preference: crate::video_pipeline::TranscodePreference::Auto, }; arg_sender.send(args.clone())?; @@ -105,10 +107,23 @@ mod integration_test } macro_rules! 
cs_main_test { + // 8-param variant: default storage, no ws_user_override ([$ws:ident, $data_dir:ident, $incoming_dir:ident, $org_conn:ident, $bitrate:expr, $org_cmd:expr, $custom_assertfs:expr, $ingest_username_from:expr] $($body:tt)*) => { - cs_main_test!([$ws, $data_dir, $incoming_dir, $org_conn, $bitrate, $org_cmd, $custom_assertfs, $ingest_username_from, None] $($body)*) + cs_main_test!(@impl [$ws, $data_dir, $incoming_dir, $org_conn, $bitrate, $org_cmd, $custom_assertfs, $ingest_username_from, None, + |media_root: std::path::PathBuf, url_base: &str| crate::storage::StorageBackend::local(media_root, url_base)] $($body)*) }; + // 9-param variant: default storage, with ws_user_override ([$ws:ident, $data_dir:ident, $incoming_dir:ident, $org_conn:ident, $bitrate:expr, $org_cmd:expr, $custom_assertfs:expr, $ingest_username_from:expr, $ws_user_override:expr] $($body:tt)*) => { + cs_main_test!(@impl [$ws, $data_dir, $incoming_dir, $org_conn, $bitrate, $org_cmd, $custom_assertfs, $ingest_username_from, $ws_user_override, + |media_root: std::path::PathBuf, url_base: &str| crate::storage::StorageBackend::local(media_root, url_base)] $($body)*) + }; + // 10-param variant: custom storage factory (receives media_root only, url_base captured by caller) + ([$ws:ident, $data_dir:ident, $incoming_dir:ident, $org_conn:ident, $bitrate:expr, $org_cmd:expr, $custom_assertfs:expr, $ingest_username_from:expr, $ws_user_override:expr, $storage_factory:expr] $($body:tt)*) => { + cs_main_test!(@impl [$ws, $data_dir, $incoming_dir, $org_conn, $bitrate, $org_cmd, $custom_assertfs, $ingest_username_from, $ws_user_override, + |media_root: std::path::PathBuf, _url_base: &str| { let f = $storage_factory; f(media_root) }] $($body)*) + }; + // Single implementation - storage_factory takes (media_root, url_base) + (@impl [$ws:ident, $data_dir:ident, $incoming_dir:ident, $org_conn:ident, $bitrate:expr, $org_cmd:expr, $custom_assertfs:expr, $ingest_username_from:expr, $ws_user_override:expr, 
$storage_factory:expr] $($body:tt)*) => { { let $data_dir = $custom_assertfs.unwrap_or(assert_fs::TempDir::new().unwrap()); let $incoming_dir = $data_dir.join("incoming"); @@ -131,9 +146,11 @@ mod integration_test let data_dir = $data_dir.path().to_path_buf(); let url_base = url_base.clone(); let org_uri = org_uri.clone(); + let media_root = data_dir.join("videos"); + let storage = { let f = $storage_factory; f(media_root, &url_base) }; let tf = terminate_flag.clone(); thread::spawn(move || { - let mut clapshot = crate::ClapshotInit::init_and_spawn_workers(data_dir, true, url_base, vec![], "127.0.0.1".into(), port, org_uri.clone(), grpc_server_bind, 4, target_bitrate, poll_interval, "anonymous".to_string(), poll_interval*5.0, $ingest_username_from, "scripts/clapshot-transcode".to_string(), "scripts/clapshot-thumbnail".to_string(), "scripts/clapshot-transcode-decision".to_string(), regex, tf)?; + let mut clapshot = crate::ClapshotInit::init_and_spawn_workers(data_dir, true, url_base, vec![], "127.0.0.1".into(), port, org_uri.clone(), grpc_server_bind, 4, target_bitrate, poll_interval, "anonymous".to_string(), poll_interval*5.0, $ingest_username_from, "scripts/clapshot-transcode".to_string(), "scripts/clapshot-thumbnail".to_string(), "scripts/clapshot-transcode-decision".to_string(), regex, storage, tf)?; clapshot.wait_for_termination() })}; @@ -155,7 +172,7 @@ mod integration_test tracing::info!("Waiting for run_clapshot() to terminate..."); let _ = th.join().unwrap(); } - } + }; } #[test] @@ -245,6 +262,7 @@ mod integration_test // --- Transcoding tests --- pub struct WaitForReportResults { + pub media_id: String, pub transcode_complete: bool, pub thumbs_complete: bool, pub got_progress_report: bool, @@ -262,6 +280,7 @@ mod integration_test check_file_outputs: Option<(PathBuf, String)>) -> WaitForReportResults { let mut res = WaitForReportResults { + media_id: String::new(), transcode_complete: false, thumbs_complete: false, got_progress_report: false, 
got_transcode_report: false, got_thumbnail_report: false, ts_cols: String::new(), ts_rows: String::new(), @@ -273,6 +292,7 @@ mod integration_test thread::sleep(Duration::from_secs_f32(0.5)); let msg = expect_user_msg(&mut ws, proto::user_message::Type::MediaFileAdded).await; // notification to client (with upload folder info etc) let vid = msg.refs.unwrap().media_file_id.unwrap(); + res.media_id = vid.clone(); thread::sleep(Duration::from_secs_f32(0.5)); let msg = expect_user_msg(&mut ws, proto::user_message::Type::Ok).await; // notification to user (in text) @@ -590,6 +610,7 @@ mod integration_test file_path: test_file.clone(), user_id: "test_user".to_string(), cookies: HashMap::new(), + transcode_preference: crate::video_pipeline::TranscodePreference::Auto, }; let (tx, rx) = crossbeam_channel::unbounded(); @@ -953,4 +974,303 @@ mod integration_test Ok(()) } + // ==================== S3/MinIO Integration Tests ==================== + + const TEST_BUCKET: &str = "clapshot-test"; + + /// Helper to manage a temporary MinIO instance for testing. + /// Spawns MinIO on a free port with a temp data directory. + /// Automatically cleans up when dropped. + struct TempMinIO { + process: std::process::Child, + endpoint: String, + port: u16, + _data_dir: assert_fs::TempDir, + } + + impl TempMinIO { + /// Start a new MinIO instance. Returns None if `minio` is not in PATH. 
+ fn start() -> Option { + // Check if minio binary is available + if std::process::Command::new("minio") + .arg("--version") + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .is_err() + { + tracing::warn!("MinIO binary not found in PATH - skipping S3 tests"); + return None; + } + + let port = portpicker::pick_unused_port().expect("No TCP ports free"); + let console_port = portpicker::pick_unused_port().expect("No TCP ports free for console"); + let data_dir = assert_fs::TempDir::new().expect("Failed to create temp dir for MinIO"); + + tracing::info!("Starting MinIO on port {} with data dir {:?}", port, data_dir.path()); + + let process = std::process::Command::new("minio") + .arg("server") + .arg(data_dir.path()) + .arg("--address") + .arg(format!("127.0.0.1:{}", port)) + .arg("--console-address") + .arg(format!("127.0.0.1:{}", console_port)) + .env("MINIO_ROOT_USER", "minioadmin") + .env("MINIO_ROOT_PASSWORD", "minioadmin") + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .expect("Failed to start MinIO"); + + let endpoint = format!("http://127.0.0.1:{}", port); + + // Wait for MinIO to be ready + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(10); + loop { + if start.elapsed() > timeout { + tracing::error!("MinIO failed to start within timeout"); + return None; + } + if reqwest::blocking::get(format!("{}/minio/health/live", endpoint)) + .map(|r| r.status().is_success()) + .unwrap_or(false) + { + tracing::info!("MinIO is ready on {}", endpoint); + break; + } + thread::sleep(Duration::from_millis(100)); + } + + Some(TempMinIO { + process, + endpoint, + port, + _data_dir: data_dir, + }) + } + + /// Create an S3 client for this MinIO instance (for test verification) + fn s3_client(&self) -> aws_sdk_s3::Client { + use aws_sdk_s3::config::Region; + + let rt = tokio::runtime::Runtime::new().unwrap(); + let endpoint = self.endpoint.clone(); + rt.block_on(async 
move { + let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .endpoint_url(&endpoint) + .region(Region::new("us-east-1")) + .load() + .await; + let s3_config = aws_sdk_s3::config::Builder::from(&config) + .force_path_style(true) + .build(); + aws_sdk_s3::Client::from_conf(s3_config) + }) + } + + /// Create the test bucket + fn create_bucket(&self) -> anyhow::Result<()> { + let client = self.s3_client(); + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async { + client + .create_bucket() + .bucket(TEST_BUCKET) + .send() + .await + .map_err(|e| anyhow::anyhow!("Failed to create bucket: {}", e))?; + Ok(()) + }) + } + + /// Create a StorageBackend for this MinIO instance + fn storage_backend(&self, media_root: PathBuf, prefix: &str) -> anyhow::Result { + StorageBackend::s3( + media_root, + TEST_BUCKET.to_string(), + Some(self.endpoint.clone()), + prefix.to_string(), + format!("{}/{}", self.endpoint, TEST_BUCKET), + ) + } + + /// Check if an object exists in S3 + fn object_exists(&self, key: &str) -> bool { + let client = self.s3_client(); + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + client + .head_object() + .bucket(TEST_BUCKET) + .key(key) + .send() + .await + .is_ok() + }) + } + + /// List all objects under a prefix (blocking version for non-async tests) + fn list_objects(&self, prefix: &str) -> Vec { + let client = self.s3_client(); + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(Self::list_objects_async(&client, prefix)) + } + + /// List all objects under a prefix (async version for use inside async contexts) + async fn list_objects_async(client: &aws_sdk_s3::Client, prefix: &str) -> Vec { + client + .list_objects_v2() + .bucket(TEST_BUCKET) + .prefix(prefix) + .send() + .await + .map(|r| r.contents().iter().filter_map(|o| o.key().map(String::from)).collect()) + .unwrap_or_default() + } + + /// Set up AWS env vars for SDK credential chain + fn setup_env_vars() { + 
std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); + std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); + std::env::set_var("AWS_REGION", "us-east-1"); + } + + /// Set up a complete test environment. Returns (storage, data_dir, prefix). + fn setup_test(&self) -> anyhow::Result<(StorageBackend, assert_fs::TempDir, String)> { + TempMinIO::setup_env_vars(); + self.create_bucket()?; + + let data_dir = assert_fs::TempDir::new()?; + let media_root = data_dir.path().join("videos"); + std::fs::create_dir_all(&media_root)?; + + let prefix = format!("test-{}", uuid::Uuid::new_v4()); + let storage = self.storage_backend(media_root, &prefix)?; + + Ok((storage, data_dir, prefix)) + } + } + + impl Drop for TempMinIO { + fn drop(&mut self) { + tracing::info!("Stopping MinIO on port {}", self.port); + let _ = self.process.kill(); + let _ = self.process.wait(); + } + } + + /// Tests S3 upload for both small files (simple PUT) and large files (multipart). + /// Also verifies progress callback and media_base_url. 
+ #[test] + #[serial] + #[traced_test] + fn test_s3_storage_upload() -> anyhow::Result<()> { + let minio = match TempMinIO::start() { + Some(m) => m, + None => return Ok(()), // Skip if MinIO not available + }; + let (storage, data_dir, prefix) = minio.setup_test()?; + let media_root = data_dir.path().join("videos"); + + // Verify media_base_url is correct + let expected_url = format!("{}/{}/{}", minio.endpoint, TEST_BUCKET, prefix); + assert_eq!(storage.media_base_url(), expected_url); + + // Test 1: Small file upload (simple PUT, below 5MB threshold) + let small_dir = media_root.join("small-file"); + std::fs::create_dir_all(&small_dir)?; + let small_file = small_dir.join("small.mp4"); + std::fs::write(&small_file, b"small test content")?; + storage.upload_local_path(&small_file)?; + assert!(minio.object_exists(&format!("{}/small-file/small.mp4", prefix)), + "Small file should exist in S3"); + + // Test 2: Large file upload (multipart, 10MB > 5MB threshold) with progress + let large_dir = media_root.join("large-file"); + std::fs::create_dir_all(&large_dir)?; + let large_file = large_dir.join("large.mp4"); + std::fs::write(&large_file, vec![0u8; 10 * 1024 * 1024])?; + + let progress_values = Arc::new(std::sync::Mutex::new(Vec::new())); + let pv = progress_values.clone(); + let progress_cb: crate::storage::ProgressCallback = Arc::new(move |p| { + pv.lock().unwrap().push(p); + }); + storage.upload_with_progress(&large_file, Some(progress_cb))?; + + let progress = progress_values.lock().unwrap(); + assert!(!progress.is_empty(), "Progress should have been reported"); + assert!((progress.last().unwrap() - 1.0).abs() < 0.001, "Final progress should be ~1.0"); + assert!(minio.object_exists(&format!("{}/large-file/large.mp4", prefix)), + "Large file should exist in S3"); + + Ok(()) + } + + /// Full E2E test: ingest video → transcode → upload to S3 + #[test] + #[serial] + #[traced_test] + #[cfg(feature = "include_slow_tests")] + fn test_s3_video_ingest_transcode_upload() -> 
anyhow::Result<()> { + TempMinIO::setup_env_vars(); + let minio = match TempMinIO::start() { + Some(m) => m, + None => return Ok(()), + }; + minio.create_bucket()?; + + let test_prefix = format!("test-e2e-{}", uuid::Uuid::new_v4()); + let minio_endpoint = minio.endpoint.clone(); + let prefix_clone = test_prefix.clone(); + + // Get S3 client before entering async context (avoids nested runtime) + let s3_client = minio.s3_client(); + + let storage_factory = move |media_root: PathBuf| -> StorageBackend { + StorageBackend::s3( + media_root, TEST_BUCKET.to_string(), Some(minio_endpoint.clone()), + prefix_clone.clone(), format!("{}/{}", minio_endpoint, TEST_BUCKET), + ).expect("Failed to create S3 storage backend") + }; + + cs_main_test! {[ws, data_dir, incoming_dir, _org_conn, 500_000, None, None, IngestUsernameFrom::FileOwner, None, storage_factory] + // Ingest test video + let video_file_name = "NASA_Red_Lettuce_excerpt.mov"; + data_dir.copy_from("src/tests/assets/", &[video_file_name]).unwrap(); + std::fs::rename(data_dir.join(video_file_name), incoming_dir.join(video_file_name)).unwrap(); + + // Wait for processing (transcode + thumbnails) + let wait_res = wait_for_reports(&mut ws, true, true, true, None).await; + assert!(wait_res.transcode_complete, "Transcode did not complete"); + assert!(wait_res.thumbs_complete, "Thumbnails did not complete"); + + // Give S3 upload time to finish + thread::sleep(Duration::from_secs(2)); + + // Verify files in S3 (use async version to avoid nested runtime) + let objects = TempMinIO::list_objects_async(&s3_client, &format!("{}/{}/", test_prefix, wait_res.media_id)).await; + tracing::info!("S3 objects: {:?}", objects); + + assert!(objects.iter().any(|k| k.contains("video.mp4")), "Transcoded video missing: {:?}", objects); + assert!(objects.iter().any(|k| k.contains("/thumbs/")), "Thumbnails missing: {:?}", objects); + } + + Ok(()) + } + + #[test] + fn test_s3_storage_needs_remote_upload() -> anyhow::Result<()> { + let data_dir = 
assert_fs::TempDir::new()?; + let media_root = data_dir.path().join("videos"); + + // Local storage should not need remote upload + let local_storage = StorageBackend::local(media_root.clone(), "http://localhost:8080"); + assert!(!local_storage.needs_remote_upload()); + + Ok(()) + } + } diff --git a/server/src/video_pipeline/incoming_monitor.rs b/server/src/video_pipeline/incoming_monitor.rs index a2b44ba9..186af68a 100644 --- a/server/src/video_pipeline/incoming_monitor.rs +++ b/server/src/video_pipeline/incoming_monitor.rs @@ -119,7 +119,7 @@ pub fn run_forever( tracing::info!("Submitting for processing."); submission_time.insert(path.clone(), std::time::Instant::now()); if let Err(e) = incoming_sender.send( - super::IncomingFile {file_path: path.clone(), user_id: username, cookies: HashMap::new()}) { + super::IncomingFile {file_path: path.clone(), user_id: username, cookies: HashMap::new(), transcode_preference: super::TranscodePreference::Auto}) { tracing::error!(details=%e, "Failed to send incoming file to processing queue."); } }, diff --git a/server/src/video_pipeline/metadata_reader.rs b/server/src/video_pipeline/metadata_reader.rs index 31a08f15..1b5f36f0 100644 --- a/server/src/video_pipeline/metadata_reader.rs +++ b/server/src/video_pipeline/metadata_reader.rs @@ -49,7 +49,8 @@ pub struct Metadata { pub fps: Decimal, pub bitrate: u32, pub metadata_all: String, - pub upload_cookies: HashMap // Cookies from the upload, not read from the file + pub upload_cookies: HashMap, // Cookies from the upload, not read from the file + pub transcode_preference: super::TranscodePreference, } pub type MetadataResult = Result; @@ -165,7 +166,8 @@ fn extract_variables(json: serde_json::Value, args: &IncomingFile, get_file_s fps: Decimal::from_str(video_track["FrameRate"].as_str().ok_or("FPS not found")?).map_err(|_| "Invalid FPS".to_string())?, bitrate, metadata_all: json.to_string(), - upload_cookies: args.cookies.clone() + upload_cookies: args.cookies.clone(), + 
transcode_preference: args.transcode_preference, }) } @@ -181,7 +183,8 @@ fn extract_variables(json: serde_json::Value, args: &IncomingFile, get_file_s fps: Decimal::from_u8(0).unwrap(), bitrate: audio_track["BitRate"].as_str().ok_or("Bitrate not found")?.parse().map_err(|_| "Invalid bitrate".to_string())?, metadata_all: json.to_string(), - upload_cookies: args.cookies.clone() + upload_cookies: args.cookies.clone(), + transcode_preference: args.transcode_preference, }) } @@ -197,7 +200,8 @@ fn extract_variables(json: serde_json::Value, args: &IncomingFile, get_file_s fps: Decimal::from_u8(0).unwrap(), bitrate: 0, metadata_all: json.to_string(), - upload_cookies: args.cookies.clone() + upload_cookies: args.cookies.clone(), + transcode_preference: args.transcode_preference, }) } else { return Err("No video, audio or image track found".to_string()); @@ -280,7 +284,8 @@ fn test_fixture(has_bitrate: bool, has_fps: bool) -> (IncomingFile, serde_json:: let args = IncomingFile { file_path: PathBuf::from("test.mp4"), user_id: "test_user".to_string(), - cookies: Default::default() + cookies: Default::default(), + transcode_preference: super::TranscodePreference::Auto, }; (args, json) diff --git a/server/src/video_pipeline/mod.rs b/server/src/video_pipeline/mod.rs index 235f1cbc..b9ecea13 100644 --- a/server/src/video_pipeline/mod.rs +++ b/server/src/video_pipeline/mod.rs @@ -4,24 +4,24 @@ #![allow(unused_parens)] -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::Read; +use std::path::{Path, PathBuf}; use std::process::Command; use std::str::FromStr; -use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::thread; -use std::path::{PathBuf, Path}; use crossbeam_channel; -use crossbeam_channel::{Receiver, unbounded, select}; +use crossbeam_channel::{select, unbounded, Receiver}; use rust_decimal::prelude::{FromPrimitive, ToPrimitive}; use rust_decimal::Decimal; use tracing; -use anyhow::{anyhow, Context, bail}; 
-use sha2::{Sha256, Digest}; +use anyhow::{anyhow, bail, Context}; use hex; +use sha2::{Digest, Sha256}; pub mod incoming_monitor; pub mod metadata_reader; @@ -29,12 +29,13 @@ pub mod metadata_reader; mod cleanup_rejected; mod script_processor; -use metadata_reader::MetadataResult; use crate::api_server::{UserMessage, UserMessageTopic}; use crate::database::error::DBError; +use crate::database::{models, DbBasicQuery, DB}; +use crate::storage::StorageBackend; use crate::video_pipeline::metadata_reader::MediaType; use cleanup_rejected::clean_up_rejected_file; -use crate::database::{DB, models, DbBasicQuery}; +use metadata_reader::MetadataResult; #[derive(Debug, Clone)] pub enum IngestUsernameFrom { @@ -49,7 +50,10 @@ impl std::str::FromStr for IngestUsernameFrom { match s { "file-owner" => Ok(IngestUsernameFrom::FileOwner), "folder-name" => Ok(IngestUsernameFrom::FolderName), - _ => Err(format!("Invalid value '{}', must be 'file-owner' or 'folder-name'", s)), + _ => Err(format!( + "Invalid value '{}', must be 'file-owner' or 'folder-name'", + s + )), } } } @@ -59,12 +63,19 @@ pub const THUMB_SHEET_ROWS: u32 = 10; pub const THUMB_W: u32 = 160; pub const THUMB_H: u32 = 90; +#[derive(Debug, Clone, Copy)] +pub enum TranscodePreference { + Auto, + Force, + Skip, +} -#[derive (Clone, Debug)] +#[derive(Clone, Debug)] pub struct IncomingFile { pub file_path: PathBuf, pub user_id: String, - pub cookies: HashMap // Cookies from client, if this was an HTTP upload + pub cookies: HashMap, // Cookies from client, if this was an HTTP upload + pub transcode_preference: TranscodePreference, } #[derive(Debug, Clone)] @@ -75,13 +86,129 @@ pub struct DetailedMsg { pub user_id: String, } +fn send_progress_update( + user_msg_tx: &crossbeam_channel::Sender, + user_id: &str, + media_file_id: &str, + msg: &str, + progress: f32, +) { + let _ = user_msg_tx.send(UserMessage { + topic: UserMessageTopic::Progress, + msg: msg.to_string(), + details: None, + user_id: Some(user_id.to_string()), + 
media_file_id: Some(media_file_id.to_string()), + subtitle_id: None, + progress: Some(progress.clamp(0.0, 1.0)), + }); +} + +fn upload_to_storage_with_progress( + storage: &StorageBackend, + abs_path: &Path, + user_msg_tx: &crossbeam_channel::Sender, + user_id: &str, + media_file_id: &str, + label: &str, +) -> anyhow::Result<()> { + if !storage.needs_remote_upload() { + return storage.upload_local_path(abs_path); + } + + let tx = user_msg_tx.clone(); + let uid = user_id.to_string(); + let mid = media_file_id.to_string(); + let label = label.to_string(); + let callback = Arc::new(move |ratio: f32| { + send_progress_update(&tx, &uid, &mid, &label, ratio); + }); + + // Kick off the bar right away so the client shows it while we stream to S3. + callback(0.0); + storage.upload_with_progress(abs_path, Some(callback)) +} + +fn cleanup_local_media_dir(videos_dir: &Path, media_id: &str) -> anyhow::Result<()> { + let media_dir = videos_dir.join(media_id); + if !media_dir.exists() { + return Ok(()); + } + + // Remove main video files/symlink in the media root + let video_exts = ["mp4", "mkv", "webm", "mov", "avi"]; + if let Ok(entries) = std::fs::read_dir(&media_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_file() { + let remove = path + .extension() + .and_then(|e| e.to_str()) + .map(|ext| video_exts.contains(&ext.to_ascii_lowercase().as_str())) + .unwrap_or(false) + || path.file_name().and_then(|n| n.to_str()) == Some("video.mp4"); + + if remove { + if let Err(e) = std::fs::remove_file(&path) { + tracing::warn!(details=%e, file=?path, "Failed to remove local media file after upload"); + } + } + } + } + } + + for sub in ["orig", "thumbs"] { + let dir = media_dir.join(sub); + if dir.exists() { + if let Err(e) = std::fs::remove_dir_all(&dir) { + tracing::warn!(details=%e, dir=?dir, "Failed to remove local media directory after upload"); + } + } + } + + Ok(()) +} + +fn maybe_cleanup_local_media( + storage: &StorageBackend, + videos_dir: 
&Path, + media_id: &str, + transcode_pending: bool, + db: &DB, +) { + if !storage.needs_remote_upload() { + return; + } + + let ready = db + .conn() + .and_then(|mut conn| models::MediaFile::get(&mut conn, &media_id.to_string())) + .map(|mf| { + let thumbs_done = mf.thumbs_done.is_some(); + let transcode_done = mf.recompression_done.is_some() || !transcode_pending; + thumbs_done && transcode_done + }) + .unwrap_or(false); + + if ready { + if let Err(e) = cleanup_local_media_dir(videos_dir, media_id) { + tracing::warn!(details=%e, media_file_id=%media_id, "Failed to clean up local media after upload"); + } + } +} /// Calculate hash identifier (media_file_id) for the submitted files, /// based on filename, user_id, size and sample of the file contents. -fn calc_media_file_id(file_path: &PathBuf, user_id: &str, upload_cookies: HashMap) -> anyhow::Result { +fn calc_media_file_id( + file_path: &PathBuf, + user_id: &str, + upload_cookies: HashMap, +) -> anyhow::Result { let mut file_hash = Sha256::new(); - let fname = file_path.file_name() - .ok_or(anyhow!("Bad filename: {:?}", file_path))?.to_str() + let fname = file_path + .file_name() + .ok_or(anyhow!("Bad filename: {:?}", file_path))? 
+ .to_str() .ok_or(anyhow!("Bad filename encoding {:?}", file_path))?; file_hash.update(fname.as_bytes()); @@ -101,7 +228,7 @@ fn calc_media_file_id(file_path: &PathBuf, user_id: &str, upload_cookies: HashMa // Read max 32k of contents let file = std::fs::File::open(file_path)?; - let mut buf = Vec::with_capacity(32*1024); + let mut buf = Vec::with_capacity(32 * 1024); file.take(32768u64).read_to_end(&mut buf)?; file_hash.update(&buf); @@ -118,22 +245,26 @@ fn ingest_media_file( md: &metadata_reader::Metadata, data_dir: &Path, media_files_dir: &Path, + storage: &StorageBackend, target_bitrate: u32, db: &DB, user_msg_tx: &crossbeam_channel::Sender, - cmpr_tx: &crossbeam_channel::Sender, - transcode_decision_script: &str) + transcode_decision_script: &str, + cmpr_tx: &crossbeam_channel::Sender) -> anyhow::Result { let _span = tracing::info_span!("INGEST_MEDIA", media_id = %media_id, user=md.user_id, - filename=%md.src_file.file_name().unwrap_or_default().to_string_lossy()).entered(); + filename=%md.src_file.file_name().unwrap_or_default().to_string_lossy()) + .entered(); tracing::info!("Ingesting file."); let src = PathBuf::from(&md.src_file); - if !src.is_file() { bail!("Source file not found: {:?}", src) } + if !src.is_file() { + bail!("Source file not found: {:?}", src) + } let dir_for_media_file = media_files_dir.join(&media_id); tracing::debug!("Media dir = {:?}", dir_for_media_file); @@ -146,23 +277,27 @@ fn ingest_media_file( let new_owner = &md.user_id; if &v.user_id == new_owner { tracing::info!("User already has this media file."); - user_msg_tx.send(UserMessage { - topic: UserMessageTopic::Ok, - msg: "Media file already exists".to_string(), - user_id: Some(new_owner.clone()), - media_file_id: None, // Don't pass media file id here, otherwise the pre-existing media would be deleted! 
- ..Default::default() - }).ok(); - - clean_up_rejected_file(&data_dir, &src, Some(media_id.into())).unwrap_or_else(|e| { - tracing::error!(details=?e, "Cleanup failed."); - }); + user_msg_tx + .send(UserMessage { + topic: UserMessageTopic::Ok, + msg: "Media file already exists".to_string(), + user_id: Some(new_owner.clone()), + media_file_id: None, // Don't pass media file id here, otherwise the pre-existing media would be deleted! + ..Default::default() + }) + .ok(); + + clean_up_rejected_file(&data_dir, &src, Some(media_id.into())).unwrap_or_else( + |e| { + tracing::error!(details=?e, "Cleanup failed."); + }, + ); return Ok(false); } else { bail!("Hash collision?!? Media '{media_id}' already owned by another user '{new_owner}'.") } - }, + } Err(DBError::NotFound()) => { // File exists, but not in DB. Remove files and reprocess. tracing::info!("Dir for '{media_id}' exists, but not in DB. Deleting old dir and reprocessing."); @@ -201,28 +336,43 @@ fn ingest_media_file( if !src_moved.exists() { bail!("Failed to move {:?} file to orig/", src_moved) } - let orig_filename = src.file_name().ok_or(anyhow!("Bad filename: {:?}", src))?.to_string_lossy().into_owned(); + let orig_filename = src + .file_name() + .ok_or(anyhow!("Bad filename: {:?}", src))? 
+ .to_string_lossy() + .into_owned(); // Add to DB tracing::debug!("Adding media file to DB."); - models::MediaFile::insert(&mut db.conn()?, &models::MediaFileInsert { - id: media_id.to_string(), - user_id: md.user_id.clone(), - media_type: Some(md.media_type.as_ref().into()), - recompression_done: None, - thumbs_done: None, - has_thumbnail: None, - thumb_sheet_cols: None, - thumb_sheet_rows: None, - orig_filename: Some(orig_filename.clone()), - title: Some(orig_filename), - total_frames: Some(md.total_frames as i32), - duration: md.duration.to_f32(), - fps: Some(md.fps.to_string()), - raw_metadata_all: Some(md.metadata_all.clone()), - default_subtitle_id: None, - })?; + models::MediaFile::insert( + &mut db.conn()?, + &models::MediaFileInsert { + id: media_id.to_string(), + user_id: md.user_id.clone(), + media_type: Some(md.media_type.as_ref().into()), + recompression_done: None, + thumbs_done: None, + has_thumbnail: None, + thumb_sheet_cols: None, + thumb_sheet_rows: None, + orig_filename: Some(orig_filename.clone()), + title: Some(orig_filename), + total_frames: Some(md.total_frames as i32), + duration: md.duration.to_f32(), + fps: Some(md.fps.to_string()), + raw_metadata_all: Some(md.metadata_all.clone()), + default_subtitle_id: None, + }, + )?; + upload_to_storage_with_progress( + storage, + &src_moved, + user_msg_tx, + &md.user_id, + media_id, + "Uploading to storage", + )?; // Check if it needs recompressing by running the decision script fn run_transcode_decision_script( @@ -289,19 +439,44 @@ fn ingest_media_file( duration: md.duration, }; - let transcode_req = match run_transcode_decision_script(md, target_bitrate, transcode_decision_script)? 
{ - Some((reason, new_bitrate)) => { - let video_dst_prefix = format!("transcoded_br{}_{}", new_bitrate, uuid::Uuid::new_v4()); - cmpr_tx.send(script_processor::CmprInput::Transcode { - video_dst_dir: dir_for_media_file.clone(), - video_dst_prefix, - video_bitrate: new_bitrate, - src: src.clone() - }).map(|_| (true, reason)).context("Error sending file to transcoding") - }, - None => { - tracing::info!("Media OK already, not transcoding."); - Ok((false, "".to_string())) + let transcode_req = match md.transcode_preference { + TranscodePreference::Skip => { + tracing::info!("Transcode preference is Skip, not transcoding."); + Ok((false, "User preference: skip transcode".to_string())) + } + TranscodePreference::Force => { + tracing::info!("Transcode preference is Force, transcoding."); + let video_dst_prefix = format!("transcoded_br{}_{}", target_bitrate, uuid::Uuid::new_v4()); + cmpr_tx + .send(script_processor::CmprInput::Transcode { + video_dst_dir: dir_for_media_file.clone(), + video_dst_prefix, + video_bitrate: target_bitrate, + src: src.clone(), + }) + .map(|_| (true, "User preference: force transcode".to_string())) + .context("Error sending file to transcoding") + } + TranscodePreference::Auto => { + match run_transcode_decision_script(md, target_bitrate, &transcode_decision_script) { + Ok(Some((reason, new_bitrate))) => { + let video_dst_prefix = format!("transcoded_br{}_{}", new_bitrate, uuid::Uuid::new_v4()); + cmpr_tx + .send(script_processor::CmprInput::Transcode { + video_dst_dir: dir_for_media_file.clone(), + video_dst_prefix, + video_bitrate: new_bitrate, + src: src.clone(), + }) + .map(|_| (true, reason)) + .context("Error sending file to transcoding") + } + Ok(None) => { + tracing::info!("Media OK already, not transcoding."); + Ok((false, "".to_string())) + } + Err(e) => Err(e), + } } }; @@ -318,18 +493,20 @@ fn ingest_media_file( media_type: md.media_type.clone(), path: src_moved.clone(), duration: md.duration, - } + }, }) { 
tracing::error!(details=?e, "Failed to send file to thumbnailing"); if let Err(e) = user_msg_tx.send(UserMessage { - topic: UserMessageTopic::Error, - msg: "Thumbnailing failed.".to_string(), - details: Some(format!("Error sending file to thumbnailing: {}", e)), - user_id: Some(md.user_id.clone()), - media_file_id: Some(media_id.to_string()), - subtitle_id: None, - progress: None - }) { tracing::error!(details=?e, "Failed to send user message") }; + topic: UserMessageTopic::Error, + msg: "Thumbnailing failed.".to_string(), + details: Some(format!("Error sending file to thumbnailing: {}", e)), + user_id: Some(md.user_id.clone()), + media_file_id: Some(media_id.to_string()), + subtitle_id: None, + progress: None, + }) { + tracing::error!(details=?e, "Failed to send user message") + }; }; }; @@ -340,25 +517,36 @@ fn ingest_media_file( user_msg_tx.send(UserMessage { topic: UserMessageTopic::MediaFileAdded, msg: String::new(), - details: Some(serde_json::to_string(&md.upload_cookies).map_err(|e| anyhow!("Error serializing cookies: {}", e))?), + details: Some( + serde_json::to_string(&md.upload_cookies) + .map_err(|e| anyhow!("Error serializing cookies: {}", e))?, + ), user_id: Some(md.user_id.clone()), media_file_id: Some(media_id.to_string()), subtitle_id: None, progress: None, })?; // Tell user in text also - tracing::debug!(transcode=do_transcode, reason=reason, "Media added to DB. Transcode"); + tracing::debug!( + transcode = do_transcode, + reason = reason, + "Media added to DB. Transcode" + ); user_msg_tx.send(UserMessage { topic: UserMessageTopic::Ok, - msg: "Media added.".to_string() + if do_transcode {" Transcoding..."} else {""}, - details: if do_transcode { Some(format!("Transcoding because {reason}")) } else { None }, + msg: "Media file added.".to_string() + if do_transcode { " Transcoding..." 
} else { "" }, + details: if do_transcode { + Some(format!("Transcoding because {reason}")) + } else { + None + }, user_id: Some(md.user_id.clone()), media_file_id: Some(media_id.to_string()), subtitle_id: None, progress: if do_transcode { Some(0.0) } else { None }, })?; Ok(do_transcode) - }, + } Err(e) => { tracing::error!(details=?e, "Media added to DB, but failed to send to transcoding."); user_msg_tx.send(UserMessage { @@ -375,13 +563,11 @@ fn ingest_media_file( } } - - - pub fn run_forever( db: Arc, terminate_flag: Arc, data_dir: PathBuf, + storage: StorageBackend, user_msg_tx: crossbeam_channel::Sender, poll_interval: f32, resubmit_delay: f32, @@ -404,14 +590,14 @@ pub fn run_forever( // Thread for incoming folder scanner let (_md_thread, from_md, to_md) = { - let (arg_sender, arg_recvr) = unbounded::(); - let (res_sender, res_recvr) = unbounded::(); + let (arg_sender, arg_recvr) = unbounded::(); + let (res_sender, res_recvr) = unbounded::(); - let th = thread::spawn(move || { - metadata_reader::run_forever(arg_recvr, res_sender, 4); - }); - (th, res_recvr, arg_sender) - }; + let th = thread::spawn(move || { + metadata_reader::run_forever(arg_recvr, res_sender, 4); + }); + (th, res_recvr, arg_sender) + }; // Thread for metadata reader let (mon_thread, from_mon, mon_exit) = { @@ -420,15 +606,18 @@ pub fn run_forever( let data_dir = data_dir.clone(); let th = thread::spawn(move || { - if let Err(e) = incoming_monitor::run_forever( - data_dir.clone(), - (data_dir.join("incoming") ).clone(), - poll_interval, resubmit_delay, - incoming_sender, - exit_recvr, - ingest_username_from) { - tracing::error!(details=?e, "Error from incoming monitor."); - }}); + if let Err(e) = incoming_monitor::run_forever( + data_dir.clone(), + (data_dir.join("incoming")).clone(), + poll_interval, + resubmit_delay, + incoming_sender, + exit_recvr, + ingest_username_from, + ) { + tracing::error!(details=?e, "Error from incoming monitor."); + } + }); (th, incoming_recvr, exit_sender) }; @@ 
-439,29 +628,48 @@ pub fn run_forever( let transcode_script_clone = transcode_script.clone(); let thumbnail_script_clone = thumbnail_script.clone(); thread::spawn(move || { - script_processor::run_forever(cmpr_in_rx, cmpr_out_tx, cmpr_prog_tx, n_workers, transcode_script_clone, thumbnail_script_clone); + script_processor::run_forever( + cmpr_in_rx, + cmpr_out_tx, + cmpr_prog_tx, + n_workers, + transcode_script_clone, + thumbnail_script_clone, + ); }); - // Migration from older version: find a media file that is missing thumbnail sheet - fn legacy_thumbnail_next_media_file(db: &DB, videos_dir: &PathBuf, cmpr_in: &mut crossbeam_channel::Sender) -> Option { + let mut transcode_pending: HashSet = HashSet::new(); - let candidates = db.conn() + // Migration from older version: find a media file that is missing thumbnail sheet + fn legacy_thumbnail_next_media_file( + db: &DB, + videos_dir: &PathBuf, + cmpr_in: &mut crossbeam_channel::Sender, + ) -> Option { + let candidates = db + .conn() .and_then(|mut conn| models::MediaFile::get_all_with_missing_thumbnails(&mut conn)) - .map_err(|e| { tracing::error!(details=?e, "DB: Failed to get media files without thumbnails."); }).ok()?; + .map_err(|e| { + tracing::error!(details=?e, "DB: Failed to get media files without thumbnails."); + }) + .ok()?; if let Some(v) = candidates.first() { tracing::info!(id=%v.id, "Found legacy media file that needs thumbnailing."); let media_file_path = if v.recompression_done.is_some() { - Some(videos_dir.join(&v.id).join("video.mp4")) - } else { - match v.orig_filename { - Some(ref orig_filename) => Some(videos_dir.join(&v.id).join("orig").join(orig_filename)), - None => { - tracing::error!(media_file_id=%v.id, "Legacy thumbnailing failed. 
Original filename missing and not recompressed."); - None - }} - }; + Some(videos_dir.join(&v.id).join("video.mp4")) + } else { + match v.orig_filename { + Some(ref orig_filename) => { + Some(videos_dir.join(&v.id).join("orig").join(orig_filename)) + } + None => { + tracing::error!(media_file_id=%v.id, "Legacy thumbnailing failed. Original filename missing and not recompressed."); + None + } + } + }; match media_file_path { Some(file_path) => { @@ -477,23 +685,24 @@ pub fn run_forever( media_file_id: v.id.clone(), media_type, path: file_path, - duration: Decimal::from_f32(v.duration.unwrap_or(0.0)).unwrap_or_default(), + duration: Decimal::from_f32(v.duration.unwrap_or(0.0)) + .unwrap_or_default(), }, }; cmpr_in.send(req).unwrap_or_else(|e| { tracing::error!(details=?e, "Error sending legacy thumbnailing request to compressor."); }); return Some(v.id.clone()); - }, + } _ => { tracing::error!(media_file_id=%v.id, "Legacy thumbnailing failed. User ID or orig filename missing."); - }, + } } } None } - let mut legacy_media_file_now_thumnailing = legacy_thumbnail_next_media_file(&db, &media_files_dir, &mut cmpr_in_tx.clone()); - + let mut legacy_media_file_now_thumnailing = + legacy_thumbnail_next_media_file(&db, &media_files_dir, &mut cmpr_in_tx.clone()); let _span = tracing::info_span!("PIPELINE").entered(); loop { @@ -503,7 +712,12 @@ pub fn run_forever( match msg { Ok(msg) => { tracing::debug!("Got upload result. Submitting it for processing. 
{:?}", msg); - to_md.send(IncomingFile {file_path: msg.file_path.clone(),user_id: msg.user_id, cookies: msg.cookies }).unwrap_or_else(|e| { + to_md.send(IncomingFile { + file_path: msg.file_path.clone(), + user_id: msg.user_id, + cookies: msg.cookies, + transcode_preference: msg.transcode_preference, + }).unwrap_or_else(|e| { tracing::error!("Error sending file to metadata reader: {:?}", e); clean_up_rejected_file(&data_dir, &msg.file_path, None).unwrap_or_else(|e| { tracing::error!("Cleanup of '{:?}' failed: {:?}", &msg.file_path, e); @@ -531,7 +745,7 @@ pub fn run_forever( })) }, Ok(vid) => { - let ing_res = ingest_media_file(&vid, &md, &data_dir, &media_files_dir, target_bitrate, &db, &user_msg_tx, &cmpr_in_tx, &transcode_decision_script).map_err(|e| { + let ing_res = ingest_media_file(&vid, &md, &data_dir, &media_files_dir, &storage, target_bitrate, &db, &user_msg_tx, &transcode_decision_script, &cmpr_in_tx).map_err(|e| { DetailedMsg { msg: "Media ingestion failed".into(), details: e.to_string(), @@ -544,6 +758,13 @@ pub fn run_forever( } MetadataResult::Err(e) => (None, Err(e)) }; + if let (Some(vid), Ok(do_transcode)) = (&vid, &ing_res) { + if *do_transcode { + transcode_pending.insert(vid.clone()); + } else { + transcode_pending.remove(vid); + } + } // Relay errors, if any. // No need to send ok message here, variations of it are sent from ingest_media_file(). 
if let Err(e) = ing_res { @@ -616,6 +837,12 @@ pub fn run_forever( { let videos_dir = media_files_dir.clone(); let vid = logs.media_file_id.clone(); + let storage = storage.clone(); + let cleanup_vid = vid.clone(); + let cleanup_storage = storage.clone(); + let cleanup_videos_dir = videos_dir.clone(); + let cleanup_db = db.clone(); + transcode_pending.remove(&vid); tracing::info!(media_file=%vid, log_info=%logs.stdout, "Transcoding completed"); @@ -644,6 +871,17 @@ pub fn run_forever( tracing::error!(details=%e, "Failed to create symlink {:?} -> {:?}", symlink_path, video_dst); return false; } + if let Err(e) = upload_to_storage_with_progress( + &storage, + &symlink_path, + &utx, + &user_id, + &vid, + "Uploading transcoded video", + ) { + tracing::error!(details=%e, "Failed to upload transcoded file to object storage"); + return false; + } if let Err(e) = db.conn().and_then(|mut conn| models::MediaFile::set_recompressed(&mut conn, &vid)) { tracing::error!(details=%e, "Error marking media file as recompressed in DB"); @@ -673,12 +911,21 @@ pub fn run_forever( subtitle_id: None, progress: Some(1.0) }).unwrap_or_else(|e| { tracing::error!(details=%e, "Error sending user message"); }); + + maybe_cleanup_local_media( + &cleanup_storage, + &cleanup_videos_dir, + &cleanup_vid, + transcode_pending.contains(&cleanup_vid), + &cleanup_db, + ); }, ThumbsSuccess { thumb_dir, thumb_sheet_dims, logs } => { let videos_dir = media_files_dir.clone(); let vid = logs.media_file_id.clone(); + let storage = storage.clone(); let mut db_errors = false; // Thumbnails (and/or sheet) done? 
@@ -704,6 +951,12 @@ pub fn run_forever( } } } + if let Some(dir) = thumb_dir { + storage.upload_if_exists(&dir.join("thumb.webp")); + if let Some((sheet_cols, sheet_rows)) = thumb_sheet_dims { + storage.upload_if_exists(&dir.join(format!("sheet-{}x{}.webp", sheet_cols, sheet_rows))); + } + } // Send MediaFileUpdated message to user user_msg_tx.send(UserMessage { @@ -724,6 +977,14 @@ pub fn run_forever( tracing::error!(details=%e, "Error storing thumbs_done in DB"); } } + + maybe_cleanup_local_media( + &storage, + &videos_dir, + &vid, + transcode_pending.contains(&vid), + &db, + ); }, TranscodeFailure { logs, .. } | @@ -761,7 +1022,7 @@ pub fn run_forever( drop(mon_exit); terminate_flag.store(true, std::sync::atomic::Ordering::Relaxed); match mon_thread.join() { - Ok(_) => {}, + Ok(_) => {} Err(e) => { tracing::error!("Error waiting for monitor thread to exit: {:?}", e); }