diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java b/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java index d01777c0c..3328806b7 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java @@ -29,6 +29,15 @@ *

R2: per-key rate limiting. After successful auth, the * filter checks the sliding-window limiter. Exceeding * {@code rateLimitPerMin} returns 429. + * + *

R5 / R7: SSE token fallback is scope-limited. The + * {@code ?token=} query param is accepted only on SSE stream paths + * ({@link #isSseStreamPath}), where browser EventSource cannot set an + * Authorization header (R7). It is rejected on every other path so the API + * key never leaks into access / proxy logs for normal calls (R5). Application + * logging here uses {@code getRequestURI()} (no query string), so the key does + * not reach app logs — but a reverse proxy may still log the query string, so + * keep the fallback as narrow as possible. */ @Slf4j @Component @@ -48,9 +57,12 @@ protected void doFilterInternal(HttpServletRequest request, return; } - String token = extractBearerToken(request); + boolean sse = isSseStreamPath(request); + String token = extractToken(request, sse); if (!StringUtils.hasText(token)) { - sendUnauthorized(response, "Missing API key"); + sendUnauthorized(response, sse + ? "Missing API key (Authorization header or ?token= for EventSource)" + : "Missing API key"); return; } @@ -62,8 +74,12 @@ protected void doFilterInternal(HttpServletRequest request, KbApiKeyContext context = authResult.get().context(); - // R2: rate limit check - if (!rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) { + // R2: rate limit check — but NOT on the SSE stream path. EventSource + // reconnects/heartbeats would otherwise burn the per-minute window and + // can 429 the key's own POST /research start. Rate limiting belongs on + // the cost-producing endpoints (start/status/cancel), not the progress + // subscription. Per-key concurrency is still enforced upstream. + if (!sse && !rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) { sendTooManyRequests(response, context.rateLimitPerMin()); return; } @@ -82,14 +98,33 @@ private boolean isOpenApiPath(HttpServletRequest request) { return uri.startsWith("/api/v1/open/kb/"); } - private String extractBearerToken(HttpServletRequest request) { + /** + * SSE progress stream paths — the only place {@code ?token=} is accepted, + * because browser EventSource cannot set an Authorization header (R7). + * Matched on URI suffix + content type so the fallback tracks whichever + * endpoints expose SSE, without hard-coding a single kbId/sessionId. + */ + private boolean isSseStreamPath(HttpServletRequest request) { + String uri = request.getRequestURI(); + return uri.startsWith("/api/v1/open/kb/") && uri.endsWith("/stream"); + } + + /** + * Extract the API key. Header is always accepted; the {@code ?token=} + * query param is accepted only on SSE stream paths ({@code sse}), + * to keep the key out of access/proxy logs on every other request (R5). + */ + private String extractToken(HttpServletRequest request, boolean sse) { String bearer = request.getHeader("Authorization"); if (StringUtils.hasText(bearer) && bearer.startsWith("Bearer ")) { return bearer.substring(7).trim(); } - // TODO: add ?token= SSE fallback once Deep Research SSE endpoint is live. - // EventSource can't set custom headers; for now P0-A has no SSE path so - // query param would leak the key into access / proxy logs (R5). + if (sse) { + String queryToken = request.getParameter("token"); + if (StringUtils.hasText(queryToken)) { + return queryToken.trim(); + } + } return null; } diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java new file mode 100644 index 000000000..5be1986b8 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java @@ -0,0 +1,224 @@ +package vip.mate.kbopen.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import vip.mate.channel.web.ChatStreamTracker; +import vip.mate.channel.web.Utf8SseEmitter; +import vip.mate.common.result.R; +import vip.mate.exception.MateClawException; +import vip.mate.kbopen.auth.KbApiKeyContext; +import vip.mate.kbopen.auth.RequireKbScope; +import vip.mate.kbopen.research.KbResearchSessionRegistry; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import vip.mate.kbopen.research.KbResearchSessionRegistry.TooManyConcurrentException; +import vip.mate.wiki.service.WikiKnowledgeBaseService; +import vip.mate.wiki.service.WikiResearchService; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * KB Open API — Deep Research endpoints. + * + *

Unlike the synchronous read endpoints, research is async (multi-step LLM + * pipeline) with SSE progress. The start endpoint returns a sessionId; the + * caller subscribes to SSE for progress, or polls status for the final result. + * + *

R7: the SSE endpoint uses {@code ?token=} query param because browser + * EventSource cannot set Authorization headers. The {@code KbOpenApiAuthFilter} + * already falls back to query param tokens. + * + *

Cost & lifecycle controls (review #446)

+ * + */ +@Slf4j +@Tag(name = "KB Open API — Deep Research") +@RestController +@RequestMapping("/api/v1/open/kb") +@RequiredArgsConstructor +public class KbOpenResearchController { + + private final WikiResearchService researchService; + private final WikiKnowledgeBaseService kbService; + private final ChatStreamTracker streamTracker; + private final KbResearchSessionRegistry sessionRegistry; + + private static final ExecutorService RESEARCH_EXEC = Executors.newVirtualThreadPerTaskExecutor(); + + // ── POST /{kbId}/research — start ───────────────────────────────────── + + @RequireKbScope("kb:search") + @PostMapping("/{kbId}/research") + @Operation(summary = "Start Deep Research (async, returns sessionId)") + public R> startResearch( + @PathVariable Long kbId, + @RequestBody ResearchRequest req, + HttpServletRequest request) { + KbApiKeyContext ctx = requireContext(request); + String topic = req.topic(); + if (topic == null || topic.isBlank()) { + throw new MateClawException(400, "topic is required"); + } + if (kbService.getById(kbId) == null) { + throw new MateClawException(404, "Knowledge base not found: " + kbId); + } + + String sessionId = "open-research-" + UUID.randomUUID(); + streamTracker.register(sessionId); + streamTracker.incrementFlux(sessionId); + // Per-key concurrency cap (DoS / runaway-cost guard on top of the + // per-minute rate limiter). Throws → 429 below. + try { + sessionRegistry.startIfAllowed(sessionId, ctx.keyId(), kbId, topic); + } catch (TooManyConcurrentException e) { + try { streamTracker.complete(sessionId); } catch (Exception ignored) {} + throw new MateClawException(429, e.getMessage()); + } + + RESEARCH_EXEC.submit(() -> { + try { + ResearchResult result = researchService.research(kbId, topic, sessionId, req.topKPerQuestion()); + // complete() is a no-op if the user already cancelled — the + // sticky CANCELLED terminal wins over a late COMPLETED. + sessionRegistry.complete(sessionId, result); + } catch (Exception e) { + log.error("[KbOpenResearch] Failed sessionId={}: {}", sessionId, e.getMessage(), e); + sessionRegistry.fail(sessionId, e.getMessage()); + } finally { + try { streamTracker.broadcast(sessionId, "done", "{}"); } catch (Exception ignored) {} + try { streamTracker.complete(sessionId); } catch (Exception ignored) {} + } + }); + + return R.ok(Map.of( + "sessionId", sessionId, + "kbId", kbId, + "topic", topic, + "streamUrl", "/api/v1/open/kb/" + kbId + "/research/" + sessionId + "/stream")); + } + + public record ResearchRequest(String topic, Integer topKPerQuestion) {} + + // ── GET /{kbId}/research/{sessionId}/stream — SSE ───────────────────── + + @RequireKbScope("kb:search") + @GetMapping(value = "/{kbId}/research/{sessionId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @Operation(summary = "Subscribe to research SSE progress (use ?token= for EventSource)") + public SseEmitter stream( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + requireSessionOwnership(request, sessionId); + + SseEmitter emitter = new Utf8SseEmitter(10 * 60 * 1000L); + boolean attached = streamTracker.attach(sessionId, emitter); + if (!attached) { + try { + emitter.send(SseEmitter.event().name("error") + .data("{\"message\":\"session not found or already ended\"}")); + emitter.complete(); + } catch (Exception ignored) {} + } + emitter.onCompletion(() -> streamTracker.detach(sessionId, emitter)); + emitter.onTimeout(() -> streamTracker.detach(sessionId, emitter)); + emitter.onError(err -> streamTracker.detach(sessionId, emitter)); + return emitter; + } + + // ── GET /{kbId}/research/{sessionId}/status — query result ──────────── + + @RequireKbScope("kb:search") + @GetMapping("/{kbId}/research/{sessionId}/status") + @Operation(summary = "Query research status / final result") + public R> status( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + Session session = requireSessionOwnership(request, sessionId); + Status status = session.status(); + ResearchResult result = session.result(); + + Map data = new LinkedHashMap<>(); + data.put("sessionId", sessionId); + data.put("status", status.name().toLowerCase()); + data.put("topic", session.topic()); + if (result != null) { + data.put("report", result.report()); + data.put("sections", result.sections().size()); + } + if (session.error() != null) { + data.put("error", session.error()); + } + return R.ok(data); + } + + // ── POST /{kbId}/research/{sessionId}/cancel — cancel ───────────────── + + @RequireKbScope("kb:search") + @PostMapping("/{kbId}/research/{sessionId}/cancel") + @Operation(summary = "Cancel a running research session") + public R> cancel( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + Session session = requireSessionOwnership(request, sessionId); + if (session.status() != Status.RUNNING) { + throw new MateClawException(409, "Session is not running (status: " + session.status() + ")"); + } + // Cooperative cancellation: signal the running pipeline to bail at the + // next stage boundary (plan→draft, draft→compose, and inside the draft + // fan-out) rather than running LLM calls to completion. + streamTracker.requestStop(sessionId); + sessionRegistry.cancel(sessionId); + // Close the SSE stream so subscribers detach immediately. + try { + streamTracker.broadcast(sessionId, "cancelled", "{\"message\":\"cancelled by user\"}"); + streamTracker.broadcast(sessionId, "done", "{}"); + streamTracker.complete(sessionId); + } catch (Exception ignored) {} + return R.ok(Map.of("sessionId", sessionId, "status", "cancelled")); + } + + // ── Auth helpers ────────────────────────────────────────────────────── + + private KbApiKeyContext requireContext(HttpServletRequest request) { + KbApiKeyContext ctx = (KbApiKeyContext) request.getAttribute(KbApiKeyContext.ATTR); + if (ctx == null) { + throw new MateClawException(401, "Authentication required"); + } + return ctx; + } + + private Session requireSessionOwnership(HttpServletRequest request, String sessionId) { + KbApiKeyContext ctx = requireContext(request); + Optional session = sessionRegistry.get(sessionId); + if (session.isEmpty()) { + throw new MateClawException(404, "Research session not found: " + sessionId); + } + // A caller can only access sessions they started + if (!session.get().keyId().equals(ctx.keyId())) { + throw new MateClawException(403, "Session does not belong to this API key"); + } + return session.get(); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java new file mode 100644 index 000000000..130272fd5 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java @@ -0,0 +1,188 @@ +package vip.mate.kbopen.research; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracks active Deep Research sessions started via the Open API. + * + *

Each session records the owning API key id + kbId so that status/cancel + * endpoints can authorize access (a caller can only query/cancel their own + * sessions). Results are stored on completion for the status endpoint to + * return synchronously. + * + *

This is an in-memory registry (single-node). For multi-node, sessions + * would need to live in a shared store — but research is short-lived (< 1 min + * typical) and the SSE stream must connect to the node running the job, so + * sticky routing is a prerequisite anyway. + * + *

Lifecycle invariants

+ *
    + *
  • {@link Status#CANCELLED} is a sticky terminal state: a late + * {@link #complete}/{@link #fail} arriving after cancel is a no-op, so + * the user who cancelled never sees a COMPLETED report.
  • + *
  • Terminal sessions are evicted by {@link #evictExpired} after + * {@code mate.kbopen.research.session-ttl} (default 30 min) so the map + * cannot grow without bound.
  • + *
  • {@link #startIfAllowed} enforces a per-key concurrency cap + * ({@code mate.kbopen.research.max-concurrent-per-key}, default 3) as a + * DoS / runaway-cost guard on top of the per-minute rate limiter.
  • + *
+ */ +@Slf4j +@Component +public class KbResearchSessionRegistry { + + public enum Status { RUNNING, COMPLETED, FAILED, CANCELLED } + + /** Adds {@code updatedAt} so the eviction sweep can find stale terminals. */ + public record Session(String sessionId, Long keyId, Long kbId, String topic, Status status, + ResearchResult result, String error, Instant updatedAt) { + + /** Convenience for {@link #register} (status=RUNNING, no result). */ + static Session running(String sessionId, Long keyId, Long kbId, String topic) { + return new Session(sessionId, keyId, kbId, topic, Status.RUNNING, null, null, Instant.now()); + } + + private Session with(Status newStatus, ResearchResult res, String err) { + return new Session(sessionId, keyId, kbId, topic, newStatus, res, err, Instant.now()); + } + } + + /** Exception thrown by {@link #startIfAllowed} when the per-key cap is hit. */ + public static class TooManyConcurrentException extends RuntimeException { + public TooManyConcurrentException(String msg) { super(msg); } + } + + private final Map sessions = new ConcurrentHashMap<>(); + + /** + * Per-key count of RUNNING sessions, kept in lock-step with the + * {@code status==RUNNING} sessions in {@link #sessions}. Maintained + * atomically so {@link #startIfAllowed} can enforce the cap without a + * check-then-act race (two concurrent starts could both pass a stream-based + * count and both put). Incremented on start, decremented on each + * RUNNING→terminal transition (complete/fail/cancel). + */ + private final Map runningPerKey = new ConcurrentHashMap<>(); + + private final int maxConcurrentPerKey; + private final Duration sessionTtl; + + public KbResearchSessionRegistry( + @Value("${mate.kbopen.research.max-concurrent-per-key:3}") int maxConcurrentPerKey, + @Value("${mate.kbopen.research.session-ttl:PT30M}") Duration sessionTtl) { + this.maxConcurrentPerKey = maxConcurrentPerKey; + this.sessionTtl = sessionTtl; + } + + /** + * Reserve a slot for a new session, enforcing the per-key concurrency cap. + * + *

Atomic: {@code incrementAndGet} + rollback on overflow, so concurrent + * starts for the same key cannot both slip past the cap. The previous + * stream-and-count impl had a check-then-act race. + * + * @throws TooManyConcurrentException if {@code keyId} already has + * {@code maxConcurrentPerKey} RUNNING sessions. + */ + public void startIfAllowed(String sessionId, Long keyId, Long kbId, String topic) { + AtomicInteger count = runningPerKey.computeIfAbsent(keyId, k -> new AtomicInteger()); + int now = count.incrementAndGet(); + if (now > maxConcurrentPerKey) { + count.decrementAndGet(); // rollback — slot was not granted + throw new TooManyConcurrentException( + "API key already has " + maxConcurrentPerKey + + " running research session(s); limit is " + maxConcurrentPerKey); + } + sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); + } + + public Optional get(String sessionId) { + return Optional.ofNullable(sessions.get(sessionId)); + } + + /** RUNNING → COMPLETED. No-op on a session that was already CANCELLED (sticky terminal). */ + public void complete(String sessionId, ResearchResult result) { + sessions.computeIfPresent(sessionId, (k, s) -> { + if (s.status() == Status.CANCELLED) { + return s; // sticky terminal — no transition, no counter change + } + decrementRunning(s.keyId()); // RUNNING → COMPLETED releases the slot + return s.with(Status.COMPLETED, result, null); + }); + } + + /** RUNNING → FAILED. No-op on a session that was already CANCELLED (sticky terminal). */ + public void fail(String sessionId, String error) { + sessions.computeIfPresent(sessionId, (k, s) -> { + if (s.status() == Status.CANCELLED) { + return s; + } + decrementRunning(s.keyId()); + return s.with(Status.FAILED, null, error); + }); + } + + /** RUNNING → CANCELLED. Returns false if the session is missing or already terminal. */ + public boolean cancel(String sessionId) { + Session[] before = new Session[1]; + sessions.computeIfPresent(sessionId, (k, s) -> { + before[0] = s; + if (s.status() == Status.RUNNING) { + decrementRunning(s.keyId()); + return s.with(Status.CANCELLED, null, null); + } + return s; + }); + return before[0] != null && before[0].status() == Status.RUNNING; + } + + /** Release one running-slot for {@code keyId}, floored at 0. */ + private void decrementRunning(Long keyId) { + AtomicInteger count = runningPerKey.get(keyId); + if (count != null) { + // getAndDeccrement would go negative; clamp instead so repeated + // terminal transitions (e.g. complete after cancel) can't drift. + while (true) { + int cur = count.get(); + if (cur <= 0) break; + if (count.compareAndSet(cur, cur - 1)) break; + } + } + } + + /** + * Drop terminal sessions older than {@code sessionTtl}. Called periodically + * by {@link #evictExpired}; public for testing. + */ + public int evictExpired(Instant now) { + int removed = 0; + for (Map.Entry e : sessions.entrySet()) { + Session s = e.getValue(); + if (s.status() != Status.RUNNING && now.isAfter(s.updatedAt().plus(sessionTtl))) { + if (sessions.remove(e.getKey()) != null) removed++; + } + } + if (removed > 0) { + log.info("[KbResearchSessionRegistry] Evicted {} terminal session(s) older than {}", removed, sessionTtl); + } + return removed; + } + + /** Scheduled sweep — runs every 5 min. */ + @Scheduled(fixedDelay = 5 * 60 * 1000L) + public void evictExpired() { + evictExpired(Instant.now()); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java b/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java index 82cb55d29..30bb1cdac 100644 --- a/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java +++ b/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java @@ -81,6 +81,10 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege broadcast(sessionId, "research.plan", Map.of( "questions", questions.stream().map(q -> Map.of("question", q.question, "intent", q.intent)).toList() )); + // Cooperative cancellation: the Open API cancel endpoint calls + // streamTracker.requestStop(sessionId). Bail before the expensive + // retrieve+draft fan-out so cancel actually halts LLM cost. + ensureNotCancelled(sessionId); // Stage 2: Retrieve + Draft (并行) List

sections = draftStage(kbId, questions, topK, sessionId); @@ -88,6 +92,8 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege broadcast(sessionId, "research.error", Map.of("message", i18n.msg("research.broadcast.draft_all_empty"))); return new ResearchResult(topic, sections, i18n.msg("research.fallback.no_materials")); } + // Second checkpoint before the compose LLM call. + ensureNotCancelled(sessionId); // Stage 3: Compose String report = composeStage(topic, sections); @@ -97,6 +103,11 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege "materialsUsed", sections.stream().flatMap(s -> s.materialRefs.stream()).distinct().count() )); return new ResearchResult(topic, sections, report); + } catch (ResearchCancelledException ce) { + // Expected: caller has already flipped the session to CANCELLED + // and closed the SSE stream. Do not broadcast an error event. + log.info("[Research] Cancelled: kbId={}, topic={}, sessionId={}", kbId, topic, sessionId); + return new ResearchResult(topic, List.of(), "Research cancelled by user"); } catch (Exception e) { log.error("[Research] Failed: kbId={}, topic={}: {}", kbId, topic, e.getMessage(), e); broadcast(sessionId, "research.error", Map.of("message", e.getMessage() != null ? e.getMessage() : i18n.msg("research.broadcast.failed"))); @@ -104,6 +115,17 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege } } + /** + * Throws {@link ResearchCancelledException} if the caller (via the Open API + * cancel endpoint) has called {@link ChatStreamTracker#requestStop} on this + * session. Checked at each pipeline stage boundary. + */ + private void ensureNotCancelled(String sessionId) { + if (streamTracker.isStopRequested(sessionId)) { + throw new ResearchCancelledException(sessionId); + } + } + // ==================== Stage 1: Plan ==================== private List planStage(String topic) { @@ -151,6 +173,12 @@ private List
draftStage(Long kbId, List questions, int top Thread.currentThread().interrupt(); return new Section(q.question, "", List.of()); } + // Re-check cancellation inside the parallel draft loop too — + // draftOneSection issues its own LLM call, which is the main + // cost driver, so skip queued-but-not-started drafts on cancel. + if (streamTracker.isStopRequested(sessionId)) { + return new Section(q.question, "", List.of()); + } try { Section section = draftOneSection(kbId, q, topK); broadcast(sessionId, "research.draft", Map.of( @@ -300,4 +328,16 @@ public record MaterialRef(int index, Long chunkId, Long rawId, String rawTitle) public record Section(String question, String content, List materialRefs) {} public record ResearchResult(String topic, List
sections, String report) {} + + /** + * Thrown when {@link ChatStreamTracker#requestStop(String)} was called on + * the session between pipeline stages. The {@link #research} method catches + * this to short-circuit — the caller (the Open API controller) has already + * flipped the registry to CANCELLED and closed the SSE stream. + */ + public static class ResearchCancelledException extends RuntimeException { + public ResearchCancelledException(String sessionId) { + super("Research cancelled: sessionId=" + sessionId); + } + } } diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java new file mode 100644 index 000000000..0b47d8c26 --- /dev/null +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java @@ -0,0 +1,126 @@ +package vip.mate.kbopen.auth; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.mock.web.MockFilterChain; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpServletResponse; +import vip.mate.kbopen.auth.KbApiKeyService.AuthResult; + +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link KbOpenApiAuthFilter} — focuses on the R7 SSE token + * fallback and R5 scope limitation added in #446: + *
    + *
  • {@code ?token=} is accepted on SSE stream paths (EventSource can't set + * an Authorization header).
  • + *
  • {@code ?token=} is rejected on non-SSE paths so the key doesn't leak + * into access / proxy logs.
  • + *
  • The SSE stream path bypasses the per-minute rate limiter (reconnects / + * heartbeats must not burn the key's start quota).
  • + *
+ */ +class KbOpenApiAuthFilterTest { + + private static final String KEY = "mck_abcd1234"; + private static final KbApiKeyContext CTX = + new KbApiKeyContext(7L, 1L, Set.of(10L), Set.of("kb:search"), 60); + + private KbApiKeyService keyService; + private KbApiKeyRateLimiter rateLimiter; + private KbOpenApiAuthFilter filter; + + @BeforeEach + void setUp() { + keyService = mock(KbApiKeyService.class); + rateLimiter = mock(KbApiKeyRateLimiter.class); + filter = new KbOpenApiAuthFilter(keyService, rateLimiter); + when(keyService.authenticate(KEY)).thenReturn(Optional.of(new AuthResult(CTX, LocalDateTime.now()))); + when(rateLimiter.tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any())).thenReturn(true); + } + + private MockHttpServletRequest startRequest() { + MockHttpServletRequest req = new MockHttpServletRequest(); + req.setRequestURI("/api/v1/open/kb/10/research"); + req.setMethod("POST"); + req.addHeader("Authorization", "Bearer " + KEY); + return req; + } + + private MockHttpServletRequest sseRequest() { + MockHttpServletRequest req = new MockHttpServletRequest(); + req.setRequestURI("/api/v1/open/kb/10/research/open-research-x/stream"); + req.setMethod("GET"); + req.setQueryString("token=" + KEY); + req.addParameter("token", KEY); + return req; + } + + private int run(MockHttpServletRequest req) throws Exception { + MockHttpServletResponse res = new MockHttpServletResponse(); + filter.doFilter(req, res, new MockFilterChain()); + return res.getStatus(); + } + + @Test + @DisplayName("non-SSE path: header auth passes") + void nonSseHeaderAuth() throws Exception { + assertThat(run(startRequest())).isEqualTo(200); + } + + @Test + @DisplayName("non-SSE path: ?token= is rejected even with a valid key (R5 — no log leak)") + void nonSseQueryTokenRejected() throws Exception { + MockHttpServletRequest req = startRequest(); + req.removeHeader("Authorization"); + req.addParameter("token", KEY); + req.setQueryString("token=" + KEY); + + assertThat(run(req)).isEqualTo(401); + verify(keyService, never()).authenticate(KEY); + } + + @Test + @DisplayName("SSE path: ?token= authenticates (R7 — EventSource fallback)") + void sseQueryTokenAccepted() throws Exception { + assertThat(run(sseRequest())).isEqualTo(200); + verify(keyService).authenticate(KEY); + } + + @Test + @DisplayName("SSE path: missing token → 401") + void sseMissingToken() throws Exception { + MockHttpServletRequest req = sseRequest(); + req.removeParameter("token"); + req.setQueryString(null); + assertThat(run(req)).isEqualTo(401); + } + + @Test + @DisplayName("SSE path: bypasses the per-minute rate limiter (reconnects must not burn quota)") + void sseSkipsRateLimit() throws Exception { + run(sseRequest()); + verify(rateLimiter, never()) + .tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any()); + } + + @Test + @DisplayName("non-SSE path: still goes through the rate limiter") + void nonSseHitsRateLimit() throws Exception { + run(startRequest()); + verify(rateLimiter) + .tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any()); + } +} diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java new file mode 100644 index 000000000..fd31b0363 --- /dev/null +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java @@ -0,0 +1,256 @@ +package vip.mate.kbopen.research; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import vip.mate.kbopen.research.KbResearchSessionRegistry.TooManyConcurrentException; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link KbResearchSessionRegistry} — session lifecycle, status + * transitions, sticky CANCELLED terminal, per-key concurrency cap, and TTL + * eviction. + */ +class KbResearchSessionRegistryTest { + + private static final ResearchResult RESULT = new ResearchResult("topic", List.of(), "final report"); + + private KbResearchSessionRegistry newRegistry() { + return new KbResearchSessionRegistry(3, Duration.ofMinutes(30)); + } + + @Test + @DisplayName("startIfAllowed creates a RUNNING session") + void registerCreatesRunning() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "test topic"); + + Optional session = registry.get("s1"); + assertThat(session).isPresent(); + assertThat(session.get().status()).isEqualTo(Status.RUNNING); + assertThat(session.get().keyId()).isEqualTo(100L); + assertThat(session.get().kbId()).isEqualTo(10L); + assertThat(session.get().topic()).isEqualTo("test topic"); + assertThat(session.get().updatedAt()).isNotNull(); + } + + @Test + @DisplayName("complete transitions RUNNING → COMPLETED with result") + void completeTransitionsToCompleted() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + + registry.complete("s1", RESULT); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.COMPLETED); + assertThat(session.result()).isEqualTo(RESULT); + assertThat(session.result().report()).isEqualTo("final report"); + } + + @Test + @DisplayName("fail transitions RUNNING → FAILED with error") + void failTransitionsToFailed() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + + registry.fail("s1", "LLM timeout"); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.FAILED); + assertThat(session.error()).isEqualTo("LLM timeout"); + } + + @Test + @DisplayName("cancel transitions RUNNING → CANCELLED") + void cancelTransitionsToCancelled() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + + boolean cancelled = registry.cancel("s1"); + + assertThat(cancelled).isTrue(); + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.CANCELLED); + } + + @Test + @DisplayName("cancel on non-running session returns false (no-op)") + void cancelOnCompletedIsNoop() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + registry.complete("s1", RESULT); + + boolean cancelled = registry.cancel("s1"); + + assertThat(cancelled).isFalse(); + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.COMPLETED); + } + + @Test + @DisplayName("get on unknown session returns empty") + void getUnknownReturnsEmpty() { + KbResearchSessionRegistry registry = newRegistry(); + assertThat(registry.get("nonexistent")).isEmpty(); + } + + // ── Review #446: sticky CANCELLED terminal ──────────────────────────── + + @Test + @DisplayName("complete after cancel is a no-op — CANCELLED is sticky") + void completeAfterCancelIsNoop() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + registry.cancel("s1"); + + // Late complete() arriving from the async pipeline must NOT overwrite + // the CANCELLED terminal the user explicitly requested. + registry.complete("s1", RESULT); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.CANCELLED); + assertThat(session.result()).isNull(); + } + + @Test + @DisplayName("fail after cancel is a no-op — CANCELLED is sticky") + void failAfterCancelIsNoop() { + KbResearchSessionRegistry registry = newRegistry(); + registry.startIfAllowed("s1", 100L, 10L, "topic"); + registry.cancel("s1"); + + registry.fail("s1", "race condition"); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.CANCELLED); + assertThat(session.error()).isNull(); + } + + // ── Review #446: per-key concurrency cap ────────────────────────────── + + @Test + @DisplayName("startIfAllowed throws once the per-key cap is reached") + void startIfAllowedEnforcesCap() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(2, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.startIfAllowed("s2", 100L, 10L, "t2"); + + // Third running session for the same key should be rejected → 429 upstream. + assertThatThrownBy(() -> registry.startIfAllowed("s3", 100L, 10L, "t3")) + .isInstanceOf(TooManyConcurrentException.class) + .hasMessageContaining("limit is 2"); + + // A different key is unaffected (cap is per-key, not global). + registry.startIfAllowed("s4", 200L, 10L, "t4"); + assertThat(registry.get("s4")).isPresent(); + } + + @Test + @DisplayName("completed sessions do not count toward the running cap") + void completedDoesNotCountTowardCap() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(1, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.complete("s1", RESULT); + + // The terminal session no longer occupies a slot. + registry.startIfAllowed("s2", 100L, 10L, "t2"); + assertThat(registry.get("s2")).isPresent(); + } + + @Test + @DisplayName("cancelled and failed sessions also release their slot (counter consistency)") + void cancelledAndFailedReleaseSlot() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(1, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.cancel("s1"); // RUNNING → CANCELLED releases slot + registry.startIfAllowed("s2", 100L, 10L, "t2"); + assertThat(registry.get("s2")).isPresent(); + + registry.fail("s2", "boom"); // RUNNING → FAILED releases slot + registry.startIfAllowed("s3", 100L, 10L, "t3"); + assertThat(registry.get("s3")).isPresent(); + } + + @Test + @DisplayName("concurrent starts never exceed the per-key cap (no check-then-act race)") + void startIfAllowedIsAtomicUnderConcurrency() throws Exception { + int cap = 3; + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(cap, Duration.ofMinutes(30)); + int threads = cap * 4; // far more contenders than slots + java.util.concurrent.CountDownLatch start = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.atomic.AtomicInteger admitted = new java.util.concurrent.atomic.AtomicInteger(); + java.util.concurrent.atomic.AtomicInteger rejected = new java.util.concurrent.atomic.AtomicInteger(); + java.util.List workers = new java.util.ArrayList<>(); + + for (int i = 0; i < threads; i++) { + String sid = "concurrent-" + i; + Thread t = Thread.ofVirtual().start(() -> { + try { + start.await(); + registry.startIfAllowed(sid, 100L, 10L, "t"); + admitted.incrementAndGet(); + } catch (TooManyConcurrentException e) { + rejected.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + workers.add(t); + } + start.countDown(); + for (Thread t : workers) t.join(); + + // The whole point: exactly `cap` sessions get in, no matter the + // scheduling. The old stream-and-count impl could admit more under + // contention. + assertThat(admitted.get()).isEqualTo(cap); + assertThat(rejected.get()).isEqualTo(threads - cap); + } + + // ── Review #446: TTL eviction ───────────────────────────────────────── + + @Test + @DisplayName("evictExpired removes terminal sessions past TTL but keeps RUNNING + fresh terminals") + void evictExpiredRemovesStaleTerminals() { + // Tiny TTL so a fresh terminal (updatedAt≈now) is clearly within window. + Duration ttl = Duration.ofSeconds(60); + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); + registry.startIfAllowed("s1", 100L, 10L, "running"); // RUNNING — never evicted + registry.startIfAllowed("s2", 100L, 10L, "stale-completed"); + registry.complete("s2", RESULT); // terminal, will be aged + registry.startIfAllowed("s3", 100L, 10L, "fresh-cancelled"); + registry.cancel("s3"); // terminal, fresh + + // Horizon well past TTL: both terminals s2 and s3 are now stale. + Instant far = Instant.now().plus(Duration.ofSeconds(120)); + int removed = registry.evictExpired(far); + assertThat(removed).isEqualTo(2); + assertThat(registry.get("s1")).isPresent(); // running always kept + assertThat(registry.get("s2")).isEmpty(); + assertThat(registry.get("s3")).isEmpty(); + } + + @Test + @DisplayName("evictExpired keeps a fresh terminal within the TTL window") + void evictExpiredKeepsFreshTerminal() { + Duration ttl = Duration.ofHours(1); + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); + registry.startIfAllowed("s1", 100L, 10L, "just-completed"); + registry.complete("s1", RESULT); // updatedAt ≈ now + + // Evict only 5 seconds later — well within the 1h TTL. + int removed = registry.evictExpired(Instant.now().plus(Duration.ofSeconds(5))); + assertThat(removed).isZero(); + assertThat(registry.get("s1")).isPresent(); + } +} diff --git a/rfcs/kb-open-api-design.md b/rfcs/kb-open-api-design.md index c5cbcc147..ce22211c1 100644 --- a/rfcs/kb-open-api-design.md +++ b/rfcs/kb-open-api-design.md @@ -252,7 +252,7 @@ KbOpenApiAuthFilter (OncePerRequestFilter, 仅拦截 /api/v1/open/kb/**) | Scope | 允许的操作 | 说明 | |---|---|---| -| `kb:search` | POST `/search`、POST `/search/chunks` | 检索 | +| `kb:search` | POST `/search`、POST `/search/chunks`、POST `/research/**`(Deep Research 全套)| 检索(含异步深度研究,复用同一 scope)| | `kb:read` | GET `/pages/{slug}`、GET `/pages/{slug}/trace`、POST `/pages/{slug}/traverse` | 读主体画像 + 溯源 + 关系遍历 | | `kb:list` | GET `/pages`、GET `/taxonomy` | 列页面 + 地图 | | `kb:meta` | GET `/stats`、GET `/whats-new` | 元信息 + 时效查询 |