Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 212 additions & 1 deletion backend/app/public_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import json
import logging
import tempfile
import threading
import time
import uuid
from collections import defaultdict
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Annotated
from typing import Annotated, Literal
from urllib.request import Request as UrlRequest
from urllib.request import urlopen

Expand Down Expand Up @@ -58,6 +59,21 @@ class PublicExportResponse(BaseModel):
expires_at: datetime


PublicExportJobStatus = Literal["running", "done", "failed", "canceled"]


class PublicExportJobResponse(BaseModel):
job_id: str
export_id: str
status: PublicExportJobStatus
progress: int = 0
message: str = ""
download_url: str | None = None
report_url: str | None = None
expires_at: datetime
error_code: str | None = None


def client_ip(request: Request) -> str:
forwarded = request.headers.get("x-forwarded-for", "")
if forwarded:
Expand Down Expand Up @@ -195,6 +211,85 @@ def public_export_docx(request: PublicExportDocxRequest) -> PublicExportResponse
)


@router.post("/export-jobs/docx", response_model=PublicExportJobResponse)
def create_public_export_job(request: PublicExportDocxRequest) -> PublicExportJobResponse:
digest = thesis_digest(request.thesis.model_dump_json())
if not verify_export_token(request.export_token, digest):
raise AppError("EXPORT_TOKEN_INVALID", "导出凭证已失效,请重新预检后再导出。", status_code=403)

expires_at = datetime.now(UTC).replace(tzinfo=None) + timedelta(seconds=PUBLIC_EXPORT_RETENTION_SECONDS)
job_id = f"job_{uuid.uuid4().hex[:16]}"
export_id = f"pub_{uuid.uuid4().hex[:16]}"
request_payload = {
"thesis": request.thesis.model_dump(mode="json"),
"export_token": request.export_token,
}
storage.put_bytes(_job_request_key(job_id), json.dumps(request_payload, ensure_ascii=False).encode("utf-8"))
_write_job_meta(
job_id,
{
"job_id": job_id,
"export_id": export_id,
"status": "running",
"progress": 5,
"message": "导出任务已创建。",
"download_url": None,
"report_url": None,
"expires_at": expires_at.isoformat(),
"error_code": None,
"cancel_requested": False,
},
)
thread = threading.Thread(target=_run_public_export_job, args=(job_id,), daemon=True)
thread.start()
return _public_job_response(_read_job_meta(job_id))


@router.get("/export-jobs/{job_id}", response_model=PublicExportJobResponse)
def get_public_export_job(job_id: str) -> PublicExportJobResponse:
return _public_job_response(_read_job_meta(job_id))


@router.post("/export-jobs/{job_id}/cancel", response_model=PublicExportJobResponse)
def cancel_public_export_job(job_id: str) -> PublicExportJobResponse:
meta = _read_job_meta(job_id)
if meta["status"] in {"done", "failed", "canceled"}:
return _public_job_response(meta)
meta["cancel_requested"] = True
meta["status"] = "canceled"
meta["message"] = "导出已取消,可重新导出。"
meta["error_code"] = "EXPORT_CANCELED"
_write_job_meta(job_id, meta)
return _public_job_response(meta)


@router.post("/export-jobs/{job_id}/retry", response_model=PublicExportJobResponse)
def retry_public_export_job(job_id: str) -> PublicExportJobResponse:
meta = _read_job_meta(job_id)
if meta["status"] not in {"failed", "canceled"}:
raise AppError("EXPORT_JOB_NOT_RETRYABLE", "当前导出任务尚未进入可重试状态。", status_code=409)
payload = json.loads(storage.get_bytes(_job_request_key(job_id)).decode("utf-8"))
thesis = NormalizedThesis.model_validate(payload["thesis"])
digest = thesis_digest(thesis.model_dump_json())
if not verify_export_token(payload["export_token"], digest):
raise AppError("EXPORT_TOKEN_INVALID", "导出凭证已失效,请重新预检后再导出。", status_code=403)
meta.update(
{
"status": "running",
"progress": 5,
"message": "正在重新导出。",
"download_url": None,
"report_url": None,
"error_code": None,
"cancel_requested": False,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent canceled job retry from reviving prior worker

retry_public_export_job flips cancel_requested back to False immediately, but the original worker thread may still be executing export_docx after a cancel. In that window, retry starts a second thread on the same job_id, and the first thread will now observe no cancellation and continue writing progress/artifacts, creating a race where two workers update the same job/export state unpredictably. Gate retry until the previous attempt is definitively finished, or version attempts so stale workers cannot resume after a retry.

Useful? React with 👍 / 👎.

}
)
_write_job_meta(job_id, meta)
thread = threading.Thread(target=_run_public_export_job, args=(job_id,), daemon=True)
thread.start()
return _public_job_response(meta)


@router.get("/exports/{export_id}/download")
def download_public_export(export_id: str) -> Response:
meta = _read_valid_meta(export_id)
Expand Down Expand Up @@ -240,3 +335,119 @@ def _read_valid_meta(export_id: str) -> dict:
storage.delete_prefix(f"public/exports/{export_id}")
raise AppError("EXPORT_EXPIRED", "导出文件已超过 30 分钟保留期,请重新生成。", status_code=410)
return meta


def _job_meta_key(job_id: str) -> str:
return f"public/export-jobs/{job_id}/meta.json"


def _job_request_key(job_id: str) -> str:
return f"public/export-jobs/{job_id}/request.json"


def _write_job_meta(job_id: str, meta: dict) -> None:
storage.put_bytes(_job_meta_key(job_id), json.dumps(meta, ensure_ascii=False).encode("utf-8"))


def _read_job_meta(job_id: str) -> dict:
try:
meta = json.loads(storage.get_bytes(_job_meta_key(job_id)).decode("utf-8"))
except FileNotFoundError as exc:
raise AppError("EXPORT_JOB_NOT_FOUND", "导出任务不存在或已删除。", status_code=404) from exc
expires_at = datetime.fromisoformat(meta["expires_at"])
if expires_at < datetime.now(UTC).replace(tzinfo=None):
storage.delete_prefix(f"public/export-jobs/{job_id}")
if meta.get("export_id"):
storage.delete_prefix(f"public/exports/{meta['export_id']}")
raise AppError("EXPORT_EXPIRED", "导出任务已超过 30 分钟保留期,请重新生成。", status_code=410)
return meta


def _public_job_response(meta: dict) -> PublicExportJobResponse:
return PublicExportJobResponse(
job_id=meta["job_id"],
export_id=meta["export_id"],
status=meta["status"],
progress=max(0, min(100, int(meta.get("progress", 0)))),
message=meta.get("message", ""),
download_url=meta.get("download_url"),
report_url=meta.get("report_url"),
expires_at=datetime.fromisoformat(meta["expires_at"]),
error_code=meta.get("error_code"),
)


def _job_cancel_requested(job_id: str) -> bool:
try:
meta = _read_job_meta(job_id)
except AppError:
return True
return bool(meta.get("cancel_requested")) or meta.get("status") == "canceled"


def _update_job(job_id: str, **patch: object) -> dict:
meta = _read_job_meta(job_id)
if meta.get("status") in {"done", "failed", "canceled"} and patch.get("status") not in {meta.get("status"), None}:
return meta
meta.update(patch)
_write_job_meta(job_id, meta)
return meta


def _run_public_export_job(job_id: str) -> None:
try:
if _job_cancel_requested(job_id):
return
_update_job(job_id, progress=18, message="正在准备导出参数。")
payload = json.loads(storage.get_bytes(_job_request_key(job_id)).decode("utf-8"))
thesis = NormalizedThesis.model_validate(payload["thesis"])
digest = thesis_digest(thesis.model_dump_json())
if not verify_export_token(payload["export_token"], digest):
_update_job(
job_id,
status="failed",
progress=100,
message="导出凭证已失效,请重新预检后再导出。",
error_code="EXPORT_TOKEN_INVALID",
)
return
if _job_cancel_requested(job_id):
return
_update_job(job_id, progress=46, message="正在生成 Word 文件。")
payload_bytes = export_docx(thesis)
if _job_cancel_requested(job_id):
return
_update_job(job_id, progress=82, message="正在保存导出文件。")
meta = _read_job_meta(job_id)
expires_at = datetime.fromisoformat(meta["expires_at"])
export_id = meta["export_id"]
safe_title = (thesis.cover.title.strip() or "SC-TH-export").replace("/", "-")[:40]
filename = f"{safe_title}.docx"
docx_key = f"public/exports/{export_id}/{filename}"
report_key = f"public/exports/{export_id}/self-check-report.json"
storage.put_bytes(docx_key, payload_bytes)
storage.put_bytes(report_key, _public_report_payload(export_id, thesis, expires_at))
_write_meta(export_id, {"expires_at": expires_at.isoformat(), "docx_key": docx_key, "report_key": report_key, "filename": filename})
if _job_cancel_requested(job_id):
storage.delete_prefix(f"public/exports/{export_id}")
return
_update_job(
job_id,
status="done",
progress=100,
message="导出完成。",
download_url=f"/api/public/exports/{export_id}/download",
report_url=f"/api/public/exports/{export_id}/report",
error_code=None,
)
except AppError as exc:
try:
_update_job(job_id, status="failed", progress=100, message=exc.message, error_code=exc.code)
except AppError:
return
except Exception:
logger.exception("public export job failed job_id=%s", job_id)
try:
_update_job(job_id, status="failed", progress=100, message="导出失败,请稍后重试。", error_code="EXPORT_FAILED")
except AppError:
return
23 changes: 23 additions & 0 deletions backend/app/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def cleanup_expired_exports() -> int:
db.commit()
storage.delete_prefix("public/uploads")
cleanup_public_exports()
cleanup_public_export_jobs()
return deleted


Expand All @@ -50,11 +51,33 @@ def cleanup_public_exports() -> int:
return deleted


def cleanup_public_export_jobs() -> int:
storage_root = Path(os.getenv("SCNU_STORAGE_DIR", str(OUTPUTS_DIR / "storage")))
root = storage_root / "public" / "export-jobs"
if not root.exists():
return 0
now = datetime.now(UTC).replace(tzinfo=None)
deleted = 0
for meta_path in root.glob("*/meta.json"):
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
expires_at = datetime.fromisoformat(meta["expires_at"])
except Exception:
continue
if expires_at < now:
storage.delete_prefix(f"public/export-jobs/{meta_path.parent.name}")
if meta.get("export_id"):
storage.delete_prefix(f"public/exports/{meta['export_id']}")
deleted += 1
return deleted


def main() -> None:
init_db()
print("SCNU Workbench worker ready. Queue integration is reserved for Celery/Redis jobs.")
while True:
cleanup_expired_exports()
cleanup_public_export_jobs()
time.sleep(60)


Expand Down
52 changes: 52 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ When `SCNU_ACCESS_CODE` is set, all `/api/*` routes require the access cookie ex
- `POST /api/public/precheck/docx`
- `POST /api/public/precheck/text`
- `POST /api/public/exports/docx`
- `POST /api/public/export-jobs/docx`
- `GET /api/public/export-jobs/{id}`
- `POST /api/public/export-jobs/{id}/cancel`
- `POST /api/public/export-jobs/{id}/retry`
- `GET /api/public/exports/{id}/download`
- `GET /api/public/exports/{id}/report`
- `GET /api/access-code/status`
Expand Down Expand Up @@ -54,6 +58,54 @@ JSON request:

Returns a retained export with `download_url`, `report_url`, and `expires_at`. Public exports are kept for 30 minutes.

This synchronous endpoint is retained for compatibility. The public UI uses the Job endpoints below.

### `POST /api/public/export-jobs/docx`

JSON request:

```json
{
"thesis": {},
"export_token": "..."
}
```

Creates a background export job and returns:

```json
{
"job_id": "job_...",
"export_id": "pub_...",
"status": "running",
"progress": 5,
"message": "导出任务已创建。",
"download_url": null,
"report_url": null,
"expires_at": "2026-04-21T12:30:00",
"error_code": null
}
```

Job status values:

- `running`
- `done`
- `failed`
- `canceled`

### `GET /api/public/export-jobs/{id}`

Returns the latest persisted job status. When `status=done`, `download_url` and `report_url` are available.

### `POST /api/public/export-jobs/{id}/cancel`

Requests cancellation and returns the persisted job status. Cancellation is cooperative: if the export has already completed, the completed state is returned.

### `POST /api/public/export-jobs/{id}/retry`

Retries a failed or canceled job with the original request payload while the original `export_token` remains valid.

### `GET /api/access-code/status`

Returns:
Expand Down
Loading
Loading