Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@
* <p><strong>R2: per-key rate limiting.</strong> After successful auth, the
* filter checks the sliding-window limiter. Exceeding
* {@code rateLimitPerMin} returns 429.
*
* <p><strong>R5 / R7: SSE token fallback is scope-limited.</strong> The
* {@code ?token=} query param is accepted <em>only</em> on SSE stream paths
* ({@link #isSseStreamPath}), where browser EventSource cannot set an
* Authorization header (R7). It is rejected on every other path so the API
* key never leaks into access / proxy logs for normal calls (R5). Application
* logging here uses {@code getRequestURI()} (no query string), so the key does
* not reach app logs — but a reverse proxy may still log the query string, so
* keep the fallback as narrow as possible.
*/
@Slf4j
@Component
Expand All @@ -48,9 +57,12 @@ protected void doFilterInternal(HttpServletRequest request,
return;
}

String token = extractBearerToken(request);
boolean sse = isSseStreamPath(request);
String token = extractToken(request, sse);
if (!StringUtils.hasText(token)) {
sendUnauthorized(response, "Missing API key");
sendUnauthorized(response, sse
? "Missing API key (Authorization header or ?token= for EventSource)"
: "Missing API key");
return;
}

Expand All @@ -62,8 +74,12 @@ protected void doFilterInternal(HttpServletRequest request,

KbApiKeyContext context = authResult.get().context();

// R2: rate limit check
if (!rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) {
// R2: rate limit check — but NOT on the SSE stream path. EventSource
// reconnects/heartbeats would otherwise burn the per-minute window and
// can 429 the key's own POST /research start. Rate limiting belongs on
// the cost-producing endpoints (start/status/cancel), not the progress
// subscription. Per-key concurrency is still enforced upstream.
if (!sse && !rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) {
sendTooManyRequests(response, context.rateLimitPerMin());
return;
}
Expand All @@ -82,14 +98,33 @@ private boolean isOpenApiPath(HttpServletRequest request) {
return uri.startsWith("/api/v1/open/kb/");
}

private String extractBearerToken(HttpServletRequest request) {
/**
* SSE progress stream paths — the only place {@code ?token=} is accepted,
* because browser EventSource cannot set an Authorization header (R7).
* Matched on URI suffix + content type so the fallback tracks whichever
* endpoints expose SSE, without hard-coding a single kbId/sessionId.
*/
private boolean isSseStreamPath(HttpServletRequest request) {
String uri = request.getRequestURI();
return uri.startsWith("/api/v1/open/kb/") && uri.endsWith("/stream");
}

/**
* Extract the API key. Header is always accepted; the {@code ?token=}
* query param is accepted <em>only</em> on SSE stream paths ({@code sse}),
* to keep the key out of access/proxy logs on every other request (R5).
*/
private String extractToken(HttpServletRequest request, boolean sse) {
String bearer = request.getHeader("Authorization");
if (StringUtils.hasText(bearer) && bearer.startsWith("Bearer ")) {
return bearer.substring(7).trim();
}
// TODO: add ?token= SSE fallback once Deep Research SSE endpoint is live.
// EventSource can't set custom headers; for now P0-A has no SSE path so
// query param would leak the key into access / proxy logs (R5).
if (sse) {
String queryToken = request.getParameter("token");
if (StringUtils.hasText(queryToken)) {
return queryToken.trim();
}
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package vip.mate.kbopen.controller;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.mate.channel.web.ChatStreamTracker;
import vip.mate.channel.web.Utf8SseEmitter;
import vip.mate.common.result.R;
import vip.mate.exception.MateClawException;
import vip.mate.kbopen.auth.KbApiKeyContext;
import vip.mate.kbopen.auth.RequireKbScope;
import vip.mate.kbopen.research.KbResearchSessionRegistry;
import vip.mate.kbopen.research.KbResearchSessionRegistry.Session;
import vip.mate.kbopen.research.KbResearchSessionRegistry.Status;
import vip.mate.kbopen.research.KbResearchSessionRegistry.TooManyConcurrentException;
import vip.mate.wiki.service.WikiKnowledgeBaseService;
import vip.mate.wiki.service.WikiResearchService;
import vip.mate.wiki.service.WikiResearchService.ResearchResult;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* KB Open API — Deep Research endpoints.
*
* <p>Unlike the synchronous read endpoints, research is async (multi-step LLM
* pipeline) with SSE progress. The start endpoint returns a sessionId; the
* caller subscribes to SSE for progress, or polls status for the final result.
*
* <p>R7: the SSE endpoint uses {@code ?token=} query param because browser
* EventSource cannot set Authorization headers. The {@code KbOpenApiAuthFilter}
* already falls back to query param tokens.
*
* <h3>Cost &amp; lifecycle controls (review #446)</h3>
* <ul>
* <li>Cancel is <b>cooperative</b>: it calls {@link ChatStreamTracker#requestStop}
* so {@link WikiResearchService} bails at the next stage boundary — the
* draft fan-out and compose LLM calls are skipped, not run to completion.</li>
* <li>CANCELLED is a <b>sticky terminal</b>: a late complete() after cancel is
* a no-op in the registry, so the user never sees a COMPLETED report.</li>
* <li>Per-key <b>concurrency cap</b> ({@code mate.kbopen.research.max-concurrent-per-key})
* stops one key from spawning unbounded parallel pipelines → 429.</li>
* </ul>
*/
@Slf4j
@Tag(name = "KB Open API — Deep Research")
@RestController
@RequestMapping("/api/v1/open/kb")
@RequiredArgsConstructor
public class KbOpenResearchController {

private final WikiResearchService researchService;
private final WikiKnowledgeBaseService kbService;
private final ChatStreamTracker streamTracker;
private final KbResearchSessionRegistry sessionRegistry;

private static final ExecutorService RESEARCH_EXEC = Executors.newVirtualThreadPerTaskExecutor();

// ── POST /{kbId}/research — start ─────────────────────────────────────

@RequireKbScope("kb:search")
@PostMapping("/{kbId}/research")
@Operation(summary = "Start Deep Research (async, returns sessionId)")
public R<Map<String, Object>> startResearch(
@PathVariable Long kbId,
@RequestBody ResearchRequest req,
HttpServletRequest request) {
KbApiKeyContext ctx = requireContext(request);
String topic = req.topic();
if (topic == null || topic.isBlank()) {
throw new MateClawException(400, "topic is required");
}
if (kbService.getById(kbId) == null) {
throw new MateClawException(404, "Knowledge base not found: " + kbId);
}

String sessionId = "open-research-" + UUID.randomUUID();
streamTracker.register(sessionId);
streamTracker.incrementFlux(sessionId);
// Per-key concurrency cap (DoS / runaway-cost guard on top of the
// per-minute rate limiter). Throws → 429 below.
try {
sessionRegistry.startIfAllowed(sessionId, ctx.keyId(), kbId, topic);
} catch (TooManyConcurrentException e) {
try { streamTracker.complete(sessionId); } catch (Exception ignored) {}
throw new MateClawException(429, e.getMessage());
}

RESEARCH_EXEC.submit(() -> {
try {
ResearchResult result = researchService.research(kbId, topic, sessionId, req.topKPerQuestion());
// complete() is a no-op if the user already cancelled — the
// sticky CANCELLED terminal wins over a late COMPLETED.
sessionRegistry.complete(sessionId, result);
} catch (Exception e) {
log.error("[KbOpenResearch] Failed sessionId={}: {}", sessionId, e.getMessage(), e);
sessionRegistry.fail(sessionId, e.getMessage());
} finally {
try { streamTracker.broadcast(sessionId, "done", "{}"); } catch (Exception ignored) {}
try { streamTracker.complete(sessionId); } catch (Exception ignored) {}
}
});

return R.ok(Map.of(
"sessionId", sessionId,
"kbId", kbId,
"topic", topic,
"streamUrl", "/api/v1/open/kb/" + kbId + "/research/" + sessionId + "/stream"));
}

public record ResearchRequest(String topic, Integer topKPerQuestion) {}

// ── GET /{kbId}/research/{sessionId}/stream — SSE ─────────────────────

@RequireKbScope("kb:search")
@GetMapping(value = "/{kbId}/research/{sessionId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "Subscribe to research SSE progress (use ?token= for EventSource)")
public SseEmitter stream(
@PathVariable Long kbId,
@PathVariable String sessionId,
HttpServletRequest request) {
requireSessionOwnership(request, sessionId);

SseEmitter emitter = new Utf8SseEmitter(10 * 60 * 1000L);
boolean attached = streamTracker.attach(sessionId, emitter);
if (!attached) {
try {
emitter.send(SseEmitter.event().name("error")
.data("{\"message\":\"session not found or already ended\"}"));
emitter.complete();
} catch (Exception ignored) {}
}
emitter.onCompletion(() -> streamTracker.detach(sessionId, emitter));
emitter.onTimeout(() -> streamTracker.detach(sessionId, emitter));
emitter.onError(err -> streamTracker.detach(sessionId, emitter));
return emitter;
}

// ── GET /{kbId}/research/{sessionId}/status — query result ────────────

@RequireKbScope("kb:search")
@GetMapping("/{kbId}/research/{sessionId}/status")
@Operation(summary = "Query research status / final result")
public R<Map<String, Object>> status(
@PathVariable Long kbId,
@PathVariable String sessionId,
HttpServletRequest request) {
Session session = requireSessionOwnership(request, sessionId);
Status status = session.status();
ResearchResult result = session.result();

Map<String, Object> data = new LinkedHashMap<>();
data.put("sessionId", sessionId);
data.put("status", status.name().toLowerCase());
data.put("topic", session.topic());
if (result != null) {
data.put("report", result.report());
data.put("sections", result.sections().size());
}
if (session.error() != null) {
data.put("error", session.error());
}
return R.ok(data);
}

// ── POST /{kbId}/research/{sessionId}/cancel — cancel ─────────────────

@RequireKbScope("kb:search")
@PostMapping("/{kbId}/research/{sessionId}/cancel")
@Operation(summary = "Cancel a running research session")
public R<Map<String, Object>> cancel(
@PathVariable Long kbId,
@PathVariable String sessionId,
HttpServletRequest request) {
Session session = requireSessionOwnership(request, sessionId);
if (session.status() != Status.RUNNING) {
throw new MateClawException(409, "Session is not running (status: " + session.status() + ")");
}
// Cooperative cancellation: signal the running pipeline to bail at the
// next stage boundary (plan→draft, draft→compose, and inside the draft
// fan-out) rather than running LLM calls to completion.
streamTracker.requestStop(sessionId);
sessionRegistry.cancel(sessionId);
// Close the SSE stream so subscribers detach immediately.
try {
streamTracker.broadcast(sessionId, "cancelled", "{\"message\":\"cancelled by user\"}");
streamTracker.broadcast(sessionId, "done", "{}");
streamTracker.complete(sessionId);
} catch (Exception ignored) {}
return R.ok(Map.of("sessionId", sessionId, "status", "cancelled"));
}

// ── Auth helpers ──────────────────────────────────────────────────────

private KbApiKeyContext requireContext(HttpServletRequest request) {
KbApiKeyContext ctx = (KbApiKeyContext) request.getAttribute(KbApiKeyContext.ATTR);
if (ctx == null) {
throw new MateClawException(401, "Authentication required");
}
return ctx;
}

private Session requireSessionOwnership(HttpServletRequest request, String sessionId) {
KbApiKeyContext ctx = requireContext(request);
Optional<Session> session = sessionRegistry.get(sessionId);
if (session.isEmpty()) {
throw new MateClawException(404, "Research session not found: " + sessionId);
}
// A caller can only access sessions they started
if (!session.get().keyId().equals(ctx.keyId())) {
throw new MateClawException(403, "Session does not belong to this API key");
}
return session.get();
}
}
Loading