From 3f2128061ee75b5e7940b3635eab24344f987029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=80=AA=E7=A8=8B=E4=BC=9F?= Date: Sun, 28 Jun 2026 03:02:53 +0800 Subject: [PATCH 1/5] feat(kb-open): Deep Research open API (start/SSE/status/cancel) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the async Deep Research endpoint for the KB Open API (#443). Research is a multi-step LLM pipeline (plan → retrieve+draft → compose) that runs asynchronously and broadcasts progress via SSE. Endpoints: - POST /{kbId}/research start (returns sessionId + streamUrl) - GET /{kbId}/research/{id}/stream SSE progress (?token= for EventSource) - GET /{kbId}/research/{id}/status query status / final report - POST /{kbId}/research/{id}/cancel cancel running session Components: - KbOpenResearchController: 4 endpoints, @RequireKbScope("kb:search") - KbResearchSessionRegistry: in-memory session tracking with keyId ownership (a caller can only query/cancel their own sessions) Security: - R7: SSE uses ?token= query param (KbOpenApiAuthFilter already supports this fallback for EventSource which can't set Authorization headers) - Session ownership: status/cancel/stream all verify keyId match - Cancel checks session is RUNNING (409 otherwise) Reuses existing WikiResearchService.research() + ChatStreamTracker for the actual research pipeline and SSE broadcasting. 
Tests (6 new, all green): - KbResearchSessionRegistryTest: register/complete/fail/cancel lifecycle, cancel-on-completed no-op, unknown session returns empty Closes #443 --- .../controller/KbOpenResearchController.java | 198 ++++++++++++++++++ .../research/KbResearchSessionRegistry.java | 59 ++++++ .../KbResearchSessionRegistryTest.java | 94 +++++++++ 3 files changed, 351 insertions(+) create mode 100644 mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java create mode 100644 mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java create mode 100644 mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java new file mode 100644 index 000000000..e0dda6d2c --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java @@ -0,0 +1,198 @@ +package vip.mate.kbopen.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import vip.mate.channel.web.ChatStreamTracker; +import vip.mate.channel.web.Utf8SseEmitter; +import vip.mate.common.result.R; +import vip.mate.exception.MateClawException; +import vip.mate.kbopen.auth.KbApiKeyContext; +import vip.mate.kbopen.auth.RequireKbScope; +import vip.mate.kbopen.research.KbResearchSessionRegistry; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import 
vip.mate.wiki.service.WikiKnowledgeBaseService; +import vip.mate.wiki.service.WikiResearchService; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * KB Open API — Deep Research endpoints. + * + *

Unlike the synchronous read endpoints, research is async (multi-step LLM + * pipeline) with SSE progress. The start endpoint returns a sessionId; the + * caller subscribes to SSE for progress, or polls status for the final result. + * + *

R7: the SSE endpoint uses {@code ?token=} query param because browser + * EventSource cannot set Authorization headers. The {@code KbOpenApiAuthFilter} + * already falls back to query param tokens. + */ +@Slf4j +@Tag(name = "KB Open API — Deep Research") +@RestController +@RequestMapping("/api/v1/open/kb") +@RequiredArgsConstructor +public class KbOpenResearchController { + + private final WikiResearchService researchService; + private final WikiKnowledgeBaseService kbService; + private final ChatStreamTracker streamTracker; + private final KbResearchSessionRegistry sessionRegistry; + + private static final ExecutorService RESEARCH_EXEC = Executors.newVirtualThreadPerTaskExecutor(); + + // ── POST /{kbId}/research — start ───────────────────────────────────── + + @RequireKbScope("kb:search") + @PostMapping("/{kbId}/research") + @Operation(summary = "Start Deep Research (async, returns sessionId)") + public R> startResearch( + @PathVariable Long kbId, + @RequestBody ResearchRequest req, + HttpServletRequest request) { + KbApiKeyContext ctx = requireContext(request); + String topic = req.topic(); + if (topic == null || topic.isBlank()) { + throw new MateClawException(400, "topic is required"); + } + if (kbService.getById(kbId) == null) { + throw new MateClawException(404, "Knowledge base not found: " + kbId); + } + + String sessionId = "open-research-" + UUID.randomUUID(); + streamTracker.register(sessionId); + streamTracker.incrementFlux(sessionId); + sessionRegistry.register(sessionId, ctx.keyId(), kbId, topic); + + RESEARCH_EXEC.submit(() -> { + try { + ResearchResult result = researchService.research(kbId, topic, sessionId, req.topKPerQuestion()); + sessionRegistry.complete(sessionId, result); + } catch (Exception e) { + log.error("[KbOpenResearch] Failed sessionId={}: {}", sessionId, e.getMessage(), e); + sessionRegistry.fail(sessionId, e.getMessage()); + } finally { + try { streamTracker.broadcast(sessionId, "done", "{}"); } catch (Exception ignored) {} + try { 
streamTracker.complete(sessionId); } catch (Exception ignored) {} + } + }); + + return R.ok(Map.of( + "sessionId", sessionId, + "kbId", kbId, + "topic", topic, + "streamUrl", "/api/v1/open/kb/" + kbId + "/research/" + sessionId + "/stream")); + } + + public record ResearchRequest(String topic, Integer topKPerQuestion) {} + + // ── GET /{kbId}/research/{sessionId}/stream — SSE ───────────────────── + + @RequireKbScope("kb:search") + @GetMapping(value = "/{kbId}/research/{sessionId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @Operation(summary = "Subscribe to research SSE progress (use ?token= for EventSource)") + public SseEmitter stream( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + requireSessionOwnership(request, sessionId); + + SseEmitter emitter = new Utf8SseEmitter(10 * 60 * 1000L); + boolean attached = streamTracker.attach(sessionId, emitter); + if (!attached) { + try { + emitter.send(SseEmitter.event().name("error") + .data("{\"message\":\"session not found or already ended\"}")); + emitter.complete(); + } catch (Exception ignored) {} + } + emitter.onCompletion(() -> streamTracker.detach(sessionId, emitter)); + emitter.onTimeout(() -> streamTracker.detach(sessionId, emitter)); + emitter.onError(err -> streamTracker.detach(sessionId, emitter)); + return emitter; + } + + // ── GET /{kbId}/research/{sessionId}/status — query result ──────────── + + @RequireKbScope("kb:search") + @GetMapping("/{kbId}/research/{sessionId}/status") + @Operation(summary = "Query research status / final result") + public R> status( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + Session session = requireSessionOwnership(request, sessionId); + Status status = session.status(); + ResearchResult result = session.result(); + + Map data = new java.util.LinkedHashMap<>(); + data.put("sessionId", sessionId); + data.put("status", status.name().toLowerCase()); + data.put("topic", 
session.topic()); + if (result != null) { + data.put("report", result.report()); + data.put("sections", result.sections().size()); + } + if (session.error() != null) { + data.put("error", session.error()); + } + return R.ok(data); + } + + // ── POST /{kbId}/research/{sessionId}/cancel — cancel ───────────────── + + @RequireKbScope("kb:search") + @PostMapping("/{kbId}/research/{sessionId}/cancel") + @Operation(summary = "Cancel a running research session") + public R> cancel( + @PathVariable Long kbId, + @PathVariable String sessionId, + HttpServletRequest request) { + Session session = requireSessionOwnership(request, sessionId); + if (session.status() != Status.RUNNING) { + throw new MateClawException(409, "Session is not running (status: " + session.status() + ")"); + } + sessionRegistry.cancel(sessionId); + // Signal the SSE stream to close + try { + streamTracker.broadcast(sessionId, "cancelled", "{\"message\":\"cancelled by user\"}"); + streamTracker.broadcast(sessionId, "done", "{}"); + streamTracker.complete(sessionId); + } catch (Exception ignored) {} + return R.ok(Map.of("sessionId", sessionId, "status", "cancelled")); + } + + // ── Auth helpers ────────────────────────────────────────────────────── + + private KbApiKeyContext requireContext(HttpServletRequest request) { + KbApiKeyContext ctx = (KbApiKeyContext) request.getAttribute(KbApiKeyContext.ATTR); + if (ctx == null) { + throw new MateClawException(401, "Authentication required"); + } + return ctx; + } + + private Session requireSessionOwnership(HttpServletRequest request, String sessionId) { + KbApiKeyContext ctx = requireContext(request); + Optional session = sessionRegistry.get(sessionId); + if (session.isEmpty()) { + throw new MateClawException(404, "Research session not found: " + sessionId); + } + // A caller can only access sessions they started + if (!session.get().keyId().equals(ctx.keyId())) { + throw new MateClawException(403, "Session does not belong to this API key"); + } + return 
session.get(); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java new file mode 100644 index 000000000..8a1d4ead0 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java @@ -0,0 +1,59 @@ +package vip.mate.kbopen.research; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks active Deep Research sessions started via the Open API. + * + *

Each session records the owning API key id + kbId so that status/cancel + * endpoints can authorize access (a caller can only query/cancel their own + * sessions). Results are stored on completion for the status endpoint to + * return synchronously. + * + *

This is an in-memory registry (single-node). For multi-node, sessions + * would need to live in a shared store — but research is short-lived (< 1 min + * typical) and the SSE stream must connect to the node running the job, so + * sticky routing is a prerequisite anyway. + */ +@Slf4j +@Component +public class KbResearchSessionRegistry { + + public enum Status { RUNNING, COMPLETED, FAILED, CANCELLED } + + public record Session(String sessionId, Long keyId, Long kbId, String topic, Status status, + ResearchResult result, String error) {} + + private final Map sessions = new ConcurrentHashMap<>(); + + public void register(String sessionId, Long keyId, Long kbId, String topic) { + sessions.put(sessionId, new Session(sessionId, keyId, kbId, topic, Status.RUNNING, null, null)); + } + + public Optional get(String sessionId) { + return Optional.ofNullable(sessions.get(sessionId)); + } + + public void complete(String sessionId, ResearchResult result) { + sessions.computeIfPresent(sessionId, (k, s) -> + new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.COMPLETED, result, null)); + } + + public void fail(String sessionId, String error) { + sessions.computeIfPresent(sessionId, (k, s) -> + new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.FAILED, null, error)); + } + + public boolean cancel(String sessionId) { + return sessions.computeIfPresent(sessionId, (k, s) -> + s.status() == Status.RUNNING + ? 
new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.CANCELLED, null, null) + : s) != null; + } +} diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java new file mode 100644 index 000000000..2ef3b1bc4 --- /dev/null +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java @@ -0,0 +1,94 @@ +package vip.mate.kbopen.research; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; +import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import vip.mate.wiki.service.WikiResearchService.ResearchResult; + +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link KbResearchSessionRegistry} — session lifecycle and status + * transitions. 
+ */ +class KbResearchSessionRegistryTest { + + @Test + @DisplayName("register creates a RUNNING session") + void registerCreatesRunning() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + registry.register("s1", 100L, 10L, "test topic"); + + Optional session = registry.get("s1"); + assertThat(session).isPresent(); + assertThat(session.get().status()).isEqualTo(Status.RUNNING); + assertThat(session.get().keyId()).isEqualTo(100L); + assertThat(session.get().kbId()).isEqualTo(10L); + assertThat(session.get().topic()).isEqualTo("test topic"); + } + + @Test + @DisplayName("complete transitions RUNNING → COMPLETED with result") + void completeTransitionsToCompleted() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + registry.register("s1", 100L, 10L, "topic"); + + ResearchResult result = new ResearchResult("topic", List.of(), "final report"); + registry.complete("s1", result); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.COMPLETED); + assertThat(session.result()).isEqualTo(result); + assertThat(session.result().report()).isEqualTo("final report"); + } + + @Test + @DisplayName("fail transitions RUNNING → FAILED with error") + void failTransitionsToFailed() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + registry.register("s1", 100L, 10L, "topic"); + + registry.fail("s1", "LLM timeout"); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.FAILED); + assertThat(session.error()).isEqualTo("LLM timeout"); + } + + @Test + @DisplayName("cancel transitions RUNNING → CANCELLED") + void cancelTransitionsToCancelled() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + registry.register("s1", 100L, 10L, "topic"); + + boolean cancelled = registry.cancel("s1"); + + assertThat(cancelled).isTrue(); + Session session = registry.get("s1").get(); + 
assertThat(session.status()).isEqualTo(Status.CANCELLED); + } + + @Test + @DisplayName("cancel on non-running session returns false (no-op)") + void cancelOnCompletedIsNoop() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + registry.register("s1", 100L, 10L, "topic"); + registry.complete("s1", new ResearchResult("topic", List.of(), "report")); + + // cancel on COMPLETED should not change status + registry.cancel("s1"); + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.COMPLETED); + } + + @Test + @DisplayName("get on unknown session returns empty") + void getUnknownReturnsEmpty() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + assertThat(registry.get("nonexistent")).isEmpty(); + } +} From f7908458d7eed830748d6fe3db25ea583a775e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=80=AA=E7=A8=8B=E4=BC=9F?= Date: Mon, 29 Jun 2026 03:24:49 +0800 Subject: [PATCH 2/5] fix(kb-open-research): cooperative cancel, sticky terminal, TTL, concurrency cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review #446 — address all 4 job-lifecycle/cost blockers + nits: 1. Cooperative cancellation (was: cancel only flipped status, pipeline ran to completion). Cancel endpoint now calls streamTracker.requestStop(); WikiResearchService.ensureNotCancelled() checks isStopRequested at each stage boundary (plan→draft, draft→compose) and inside the parallel draft fan-out — so cancel actually halts the expensive LLM calls, not just the SSE stream. Throws ResearchCancelledException (caught locally, no error broadcast). 2. Sticky CANCELLED terminal. complete()/fail() now no-op on a CANCELLED session, so a user who cancelled never sees a COMPLETED report surface via /status. 3. Session registry TTL. Terminal sessions get an updatedAt timestamp and are evicted by a @Scheduled sweep after mate.kbopen.research.session-ttl (default 30m). 
RUNNING sessions are never evicted. Prevents unbounded memory growth. 4. Per-key concurrency cap. startIfAllowed() rejects new research when a key already has mate.kbopen.research.max-concurrent-per-key (default 3) RUNNING sessions → 429. Stops one key from spawning ~60 parallel multi-step LLM pipelines per minute under the per-min rate limiter. 5. Inline FQN → import (controller LinkedHashMap, test List.of). Nits (inherited from P0-A rebase): - V162→V164, prefix VARCHAR(12), design doc moved to rfcs/. - Design doc: kb:search scope row now documents it covers /research/**. 31 tests pass (12 registry incl. sticky-cancel/concurrency/TTL + 13 service + 4 rate limiter + 4 controller + ...). --- .../controller/KbOpenResearchController.java | 32 +++- .../research/KbResearchSessionRegistry.java | 103 ++++++++++++- .../wiki/service/WikiResearchService.java | 40 +++++ .../KbResearchSessionRegistryTest.java | 140 ++++++++++++++++-- rfcs/kb-open-api-design.md | 2 +- 5 files changed, 291 insertions(+), 26 deletions(-) diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java index e0dda6d2c..5be1986b8 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/controller/KbOpenResearchController.java @@ -17,10 +17,12 @@ import vip.mate.kbopen.research.KbResearchSessionRegistry; import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import vip.mate.kbopen.research.KbResearchSessionRegistry.TooManyConcurrentException; import vip.mate.wiki.service.WikiKnowledgeBaseService; import vip.mate.wiki.service.WikiResearchService; import vip.mate.wiki.service.WikiResearchService.ResearchResult; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import 
java.util.UUID; @@ -37,6 +39,17 @@ *

R7: the SSE endpoint uses {@code ?token=} query param because browser * EventSource cannot set Authorization headers. The {@code KbOpenApiAuthFilter} * already falls back to query param tokens. + * + *

Cost & lifecycle controls (review #446)

+ * */ @Slf4j @Tag(name = "KB Open API — Deep Research") @@ -73,11 +86,20 @@ public R> startResearch( String sessionId = "open-research-" + UUID.randomUUID(); streamTracker.register(sessionId); streamTracker.incrementFlux(sessionId); - sessionRegistry.register(sessionId, ctx.keyId(), kbId, topic); + // Per-key concurrency cap (DoS / runaway-cost guard on top of the + // per-minute rate limiter). Throws → 429 below. + try { + sessionRegistry.startIfAllowed(sessionId, ctx.keyId(), kbId, topic); + } catch (TooManyConcurrentException e) { + try { streamTracker.complete(sessionId); } catch (Exception ignored) {} + throw new MateClawException(429, e.getMessage()); + } RESEARCH_EXEC.submit(() -> { try { ResearchResult result = researchService.research(kbId, topic, sessionId, req.topKPerQuestion()); + // complete() is a no-op if the user already cancelled — the + // sticky CANCELLED terminal wins over a late COMPLETED. sessionRegistry.complete(sessionId, result); } catch (Exception e) { log.error("[KbOpenResearch] Failed sessionId={}: {}", sessionId, e.getMessage(), e); @@ -136,7 +158,7 @@ public R> status( Status status = session.status(); ResearchResult result = session.result(); - Map data = new java.util.LinkedHashMap<>(); + Map data = new LinkedHashMap<>(); data.put("sessionId", sessionId); data.put("status", status.name().toLowerCase()); data.put("topic", session.topic()); @@ -163,8 +185,12 @@ public R> cancel( if (session.status() != Status.RUNNING) { throw new MateClawException(409, "Session is not running (status: " + session.status() + ")"); } + // Cooperative cancellation: signal the running pipeline to bail at the + // next stage boundary (plan→draft, draft→compose, and inside the draft + // fan-out) rather than running LLM calls to completion. + streamTracker.requestStop(sessionId); sessionRegistry.cancel(sessionId); - // Signal the SSE stream to close + // Close the SSE stream so subscribers detach immediately. 
try { streamTracker.broadcast(sessionId, "cancelled", "{\"message\":\"cancelled by user\"}"); streamTracker.broadcast(sessionId, "done", "{}"); diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java index 8a1d4ead0..023f9b240 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java @@ -1,9 +1,13 @@ package vip.mate.kbopen.research; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import vip.mate.wiki.service.WikiResearchService.ResearchResult; +import java.time.Duration; +import java.time.Instant; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -20,6 +24,19 @@ * would need to live in a shared store — but research is short-lived (< 1 min * typical) and the SSE stream must connect to the node running the job, so * sticky routing is a prerequisite anyway. + * + *

Lifecycle invariants

+ *
    + *
  • {@link Status#CANCELLED} is a sticky terminal state: a late + * {@link #complete}/{@link #fail} arriving after cancel is a no-op, so + * the user who cancelled never sees a COMPLETED report.
  • + *
  • Terminal sessions are evicted by {@link #evictExpired} after + * {@code mate.kbopen.research.session-ttl} (default 30 min) so the map + * cannot grow without bound.
  • + *
  • {@link #startIfAllowed} enforces a per-key concurrency cap + * ({@code mate.kbopen.research.max-concurrent-per-key}, default 3) as a + * DoS / runaway-cost guard on top of the per-minute rate limiter.
  • + *
*/ @Slf4j @Component @@ -27,33 +44,103 @@ public class KbResearchSessionRegistry { public enum Status { RUNNING, COMPLETED, FAILED, CANCELLED } + /** Adds {@code updatedAt} so the eviction sweep can find stale terminals. */ public record Session(String sessionId, Long keyId, Long kbId, String topic, Status status, - ResearchResult result, String error) {} + ResearchResult result, String error, Instant updatedAt) { + + /** Convenience for {@link #register} (status=RUNNING, no result). */ + static Session running(String sessionId, Long keyId, Long kbId, String topic) { + return new Session(sessionId, keyId, kbId, topic, Status.RUNNING, null, null, Instant.now()); + } + + private Session with(Status newStatus, ResearchResult res, String err) { + return new Session(sessionId, keyId, kbId, topic, newStatus, res, err, Instant.now()); + } + } + + /** Exception thrown by {@link #startIfAllowed} when the per-key cap is hit. */ + public static class TooManyConcurrentException extends RuntimeException { + public TooManyConcurrentException(String msg) { super(msg); } + } private final Map sessions = new ConcurrentHashMap<>(); + private final int maxConcurrentPerKey; + private final Duration sessionTtl; + + public KbResearchSessionRegistry( + @Value("${mate.kbopen.research.max-concurrent-per-key:3}") int maxConcurrentPerKey, + @Value("${mate.kbopen.research.session-ttl:PT30M}") Duration sessionTtl) { + this.maxConcurrentPerKey = maxConcurrentPerKey; + this.sessionTtl = sessionTtl; + } + + /** + * Reserve a slot for a new session, enforcing the per-key concurrency cap. + * + * @throws TooManyConcurrentException if {@code keyId} already has + * {@code maxConcurrentPerKey} RUNNING sessions. 
+ */ + public void startIfAllowed(String sessionId, Long keyId, Long kbId, String topic) { + long running = sessions.values().stream() + .filter(s -> keyId.equals(s.keyId()) && s.status() == Status.RUNNING) + .count(); + if (running >= maxConcurrentPerKey) { + throw new TooManyConcurrentException( + "API key already has " + running + " running research session(s); limit is " + maxConcurrentPerKey); + } + sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); + } + + /** Back-compat with existing callers / tests. Equivalent to {@code startIfAllowed} without the cap check. */ public void register(String sessionId, Long keyId, Long kbId, String topic) { - sessions.put(sessionId, new Session(sessionId, keyId, kbId, topic, Status.RUNNING, null, null)); + sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); } public Optional get(String sessionId) { return Optional.ofNullable(sessions.get(sessionId)); } + /** RUNNING → COMPLETED. No-op on a session that was already CANCELLED (sticky terminal). */ public void complete(String sessionId, ResearchResult result) { sessions.computeIfPresent(sessionId, (k, s) -> - new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.COMPLETED, result, null)); + s.status() == Status.CANCELLED ? s : s.with(Status.COMPLETED, result, null)); } + /** RUNNING → FAILED. No-op on a session that was already CANCELLED (sticky terminal). */ public void fail(String sessionId, String error) { sessions.computeIfPresent(sessionId, (k, s) -> - new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.FAILED, null, error)); + s.status() == Status.CANCELLED ? s : s.with(Status.FAILED, null, error)); } + /** RUNNING → CANCELLED. Returns false if the session is missing or already terminal. */ public boolean cancel(String sessionId) { - return sessions.computeIfPresent(sessionId, (k, s) -> - s.status() == Status.RUNNING - ? 
new Session(s.sessionId(), s.keyId(), s.kbId(), s.topic(), Status.CANCELLED, null, null) - : s) != null; + Session before = sessions.computeIfPresent(sessionId, (k, s) -> + s.status() == Status.RUNNING ? s.with(Status.CANCELLED, null, null) : s); + return before != null && before.status() == Status.CANCELLED; + } + + /** + * Drop terminal sessions older than {@code sessionTtl}. Called periodically + * by {@link #evictExpired()}; public for testing. + */ + public int evictExpired(Instant now) { + int removed = 0; + for (Map.Entry e : sessions.entrySet()) { + Session s = e.getValue(); + if (s.status() != Status.RUNNING && now.isAfter(s.updatedAt().plus(sessionTtl))) { + if (sessions.remove(e.getKey()) != null) removed++; + } + } + if (removed > 0) { + log.info("[KbResearchSessionRegistry] Evicted {} terminal session(s) older than {}", removed, sessionTtl); + } + return removed; + } + + /** Scheduled sweep — runs every 5 min. */ + @Scheduled(fixedDelay = 5 * 60 * 1000L) + public void evictExpired() { + evictExpired(Instant.now()); } } diff --git a/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java b/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java index 82cb55d29..30bb1cdac 100644 --- a/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java +++ b/mateclaw-server/src/main/java/vip/mate/wiki/service/WikiResearchService.java @@ -81,6 +81,10 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege broadcast(sessionId, "research.plan", Map.of( "questions", questions.stream().map(q -> Map.of("question", q.question, "intent", q.intent)).toList() )); + // Cooperative cancellation: the Open API cancel endpoint calls + // streamTracker.requestStop(sessionId). Bail before the expensive + // retrieve+draft fan-out so cancel actually halts LLM cost. + ensureNotCancelled(sessionId); // Stage 2: Retrieve + Draft (并行) List
sections = draftStage(kbId, questions, topK, sessionId); @@ -88,6 +92,8 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege broadcast(sessionId, "research.error", Map.of("message", i18n.msg("research.broadcast.draft_all_empty"))); return new ResearchResult(topic, sections, i18n.msg("research.fallback.no_materials")); } + // Second checkpoint before the compose LLM call. + ensureNotCancelled(sessionId); // Stage 3: Compose String report = composeStage(topic, sections); @@ -97,6 +103,11 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege "materialsUsed", sections.stream().flatMap(s -> s.materialRefs.stream()).distinct().count() )); return new ResearchResult(topic, sections, report); + } catch (ResearchCancelledException ce) { + // Expected: caller has already flipped the session to CANCELLED + // and closed the SSE stream. Do not broadcast an error event. + log.info("[Research] Cancelled: kbId={}, topic={}, sessionId={}", kbId, topic, sessionId); + return new ResearchResult(topic, List.of(), "Research cancelled by user"); } catch (Exception e) { log.error("[Research] Failed: kbId={}, topic={}: {}", kbId, topic, e.getMessage(), e); broadcast(sessionId, "research.error", Map.of("message", e.getMessage() != null ? e.getMessage() : i18n.msg("research.broadcast.failed"))); @@ -104,6 +115,17 @@ public ResearchResult research(Long kbId, String topic, String sessionId, Intege } } + /** + * Throws {@link ResearchCancelledException} if the caller (via the Open API + * cancel endpoint) has called {@link ChatStreamTracker#requestStop} on this + * session. Checked at each pipeline stage boundary. + */ + private void ensureNotCancelled(String sessionId) { + if (streamTracker.isStopRequested(sessionId)) { + throw new ResearchCancelledException(sessionId); + } + } + // ==================== Stage 1: Plan ==================== private List planStage(String topic) { @@ -151,6 +173,12 @@ private List
draftStage(Long kbId, List questions, int top Thread.currentThread().interrupt(); return new Section(q.question, "", List.of()); } + // Re-check cancellation inside the parallel draft loop too — + // draftOneSection issues its own LLM call, which is the main + // cost driver, so skip queued-but-not-started drafts on cancel. + if (streamTracker.isStopRequested(sessionId)) { + return new Section(q.question, "", List.of()); + } try { Section section = draftOneSection(kbId, q, topK); broadcast(sessionId, "research.draft", Map.of( @@ -300,4 +328,16 @@ public record MaterialRef(int index, Long chunkId, Long rawId, String rawTitle) public record Section(String question, String content, List materialRefs) {} public record ResearchResult(String topic, List
sections, String report) {} + + /** + * Thrown when {@link ChatStreamTracker#requestStop(String)} was called on + * the session between pipeline stages. The {@link #research} method catches + * this to short-circuit — the caller (the Open API controller) has already + * flipped the registry to CANCELLED and closed the SSE stream. + */ + public static class ResearchCancelledException extends RuntimeException { + public ResearchCancelledException(String sessionId) { + super("Research cancelled: sessionId=" + sessionId); + } + } } diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java index 2ef3b1bc4..924badb92 100644 --- a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java @@ -4,23 +4,34 @@ import org.junit.jupiter.api.Test; import vip.mate.kbopen.research.KbResearchSessionRegistry.Session; import vip.mate.kbopen.research.KbResearchSessionRegistry.Status; +import vip.mate.kbopen.research.KbResearchSessionRegistry.TooManyConcurrentException; import vip.mate.wiki.service.WikiResearchService.ResearchResult; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** - * Tests for {@link KbResearchSessionRegistry} — session lifecycle and status - * transitions. + * Tests for {@link KbResearchSessionRegistry} — session lifecycle, status + * transitions, sticky CANCELLED terminal, per-key concurrency cap, and TTL + * eviction. 
*/ class KbResearchSessionRegistryTest { + private static final ResearchResult RESULT = new ResearchResult("topic", List.of(), "final report"); + + private KbResearchSessionRegistry newRegistry() { + return new KbResearchSessionRegistry(3, Duration.ofMinutes(30)); + } + @Test @DisplayName("register creates a RUNNING session") void registerCreatesRunning() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); registry.register("s1", 100L, 10L, "test topic"); Optional session = registry.get("s1"); @@ -29,27 +40,27 @@ void registerCreatesRunning() { assertThat(session.get().keyId()).isEqualTo(100L); assertThat(session.get().kbId()).isEqualTo(10L); assertThat(session.get().topic()).isEqualTo("test topic"); + assertThat(session.get().updatedAt()).isNotNull(); } @Test @DisplayName("complete transitions RUNNING → COMPLETED with result") void completeTransitionsToCompleted() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); registry.register("s1", 100L, 10L, "topic"); - ResearchResult result = new ResearchResult("topic", List.of(), "final report"); - registry.complete("s1", result); + registry.complete("s1", RESULT); Session session = registry.get("s1").get(); assertThat(session.status()).isEqualTo(Status.COMPLETED); - assertThat(session.result()).isEqualTo(result); + assertThat(session.result()).isEqualTo(RESULT); assertThat(session.result().report()).isEqualTo("final report"); } @Test @DisplayName("fail transitions RUNNING → FAILED with error") void failTransitionsToFailed() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); registry.register("s1", 100L, 10L, "topic"); registry.fail("s1", "LLM timeout"); @@ -62,7 +73,7 @@ void failTransitionsToFailed() { @Test @DisplayName("cancel transitions RUNNING → CANCELLED") void 
cancelTransitionsToCancelled() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); registry.register("s1", 100L, 10L, "topic"); boolean cancelled = registry.cancel("s1"); @@ -75,12 +86,13 @@ void cancelTransitionsToCancelled() { @Test @DisplayName("cancel on non-running session returns false (no-op)") void cancelOnCompletedIsNoop() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); registry.register("s1", 100L, 10L, "topic"); - registry.complete("s1", new ResearchResult("topic", List.of(), "report")); + registry.complete("s1", RESULT); - // cancel on COMPLETED should not change status - registry.cancel("s1"); + boolean cancelled = registry.cancel("s1"); + + assertThat(cancelled).isFalse(); Session session = registry.get("s1").get(); assertThat(session.status()).isEqualTo(Status.COMPLETED); } @@ -88,7 +100,107 @@ void cancelOnCompletedIsNoop() { @Test @DisplayName("get on unknown session returns empty") void getUnknownReturnsEmpty() { - KbResearchSessionRegistry registry = new KbResearchSessionRegistry(); + KbResearchSessionRegistry registry = newRegistry(); assertThat(registry.get("nonexistent")).isEmpty(); } + + // ── Review #446: sticky CANCELLED terminal ──────────────────────────── + + @Test + @DisplayName("complete after cancel is a no-op — CANCELLED is sticky") + void completeAfterCancelIsNoop() { + KbResearchSessionRegistry registry = newRegistry(); + registry.register("s1", 100L, 10L, "topic"); + registry.cancel("s1"); + + // Late complete() arriving from the async pipeline must NOT overwrite + // the CANCELLED terminal the user explicitly requested. 
+ registry.complete("s1", RESULT); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.CANCELLED); + assertThat(session.result()).isNull(); + } + + @Test + @DisplayName("fail after cancel is a no-op — CANCELLED is sticky") + void failAfterCancelIsNoop() { + KbResearchSessionRegistry registry = newRegistry(); + registry.register("s1", 100L, 10L, "topic"); + registry.cancel("s1"); + + registry.fail("s1", "race condition"); + + Session session = registry.get("s1").get(); + assertThat(session.status()).isEqualTo(Status.CANCELLED); + assertThat(session.error()).isNull(); + } + + // ── Review #446: per-key concurrency cap ────────────────────────────── + + @Test + @DisplayName("startIfAllowed throws once the per-key cap is reached") + void startIfAllowedEnforcesCap() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(2, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.startIfAllowed("s2", 100L, 10L, "t2"); + + // Third running session for the same key should be rejected → 429 upstream. + assertThatThrownBy(() -> registry.startIfAllowed("s3", 100L, 10L, "t3")) + .isInstanceOf(TooManyConcurrentException.class) + .hasMessageContaining("limit is 2"); + + // A different key is unaffected (cap is per-key, not global). + registry.startIfAllowed("s4", 200L, 10L, "t4"); + assertThat(registry.get("s4")).isPresent(); + } + + @Test + @DisplayName("completed sessions do not count toward the running cap") + void completedDoesNotCountTowardCap() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(1, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.complete("s1", RESULT); + + // The terminal session no longer occupies a slot. 
+ registry.startIfAllowed("s2", 100L, 10L, "t2"); + assertThat(registry.get("s2")).isPresent(); + } + + // ── Review #446: TTL eviction ───────────────────────────────────────── + + @Test + @DisplayName("evictExpired removes terminal sessions past TTL but keeps RUNNING + fresh terminals") + void evictExpiredRemovesStaleTerminals() { + // Tiny TTL so a fresh terminal (updatedAt≈now) is clearly within window. + Duration ttl = Duration.ofSeconds(60); + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); + registry.register("s1", 100L, 10L, "running"); // RUNNING — never evicted + registry.register("s2", 100L, 10L, "stale-completed"); + registry.complete("s2", RESULT); // terminal, will be aged + registry.register("s3", 100L, 10L, "fresh-cancelled"); + registry.cancel("s3"); // terminal, fresh + + // Horizon well past TTL: both terminals s2 and s3 are now stale. + Instant far = Instant.now().plus(Duration.ofSeconds(120)); + int removed = registry.evictExpired(far); + assertThat(removed).isEqualTo(2); + assertThat(registry.get("s1")).isPresent(); // running always kept + assertThat(registry.get("s2")).isEmpty(); + assertThat(registry.get("s3")).isEmpty(); + } + + @Test + @DisplayName("evictExpired keeps a fresh terminal within the TTL window") + void evictExpiredKeepsFreshTerminal() { + Duration ttl = Duration.ofHours(1); + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); + registry.register("s1", 100L, 10L, "just-completed"); + registry.complete("s1", RESULT); // updatedAt ≈ now + + // Evict only 5 seconds later — well within the 1h TTL. 
+ int removed = registry.evictExpired(Instant.now().plus(Duration.ofSeconds(5))); + assertThat(removed).isZero(); + assertThat(registry.get("s1")).isPresent(); + } } diff --git a/rfcs/kb-open-api-design.md b/rfcs/kb-open-api-design.md index c5cbcc147..ce22211c1 100644 --- a/rfcs/kb-open-api-design.md +++ b/rfcs/kb-open-api-design.md @@ -252,7 +252,7 @@ KbOpenApiAuthFilter (OncePerRequestFilter, 仅拦截 /api/v1/open/kb/**) | Scope | 允许的操作 | 说明 | |---|---|---| -| `kb:search` | POST `/search`、POST `/search/chunks` | 检索 | +| `kb:search` | POST `/search`、POST `/search/chunks`、POST `/research/**`(Deep Research 全套)| 检索(含异步深度研究,复用同一 scope)| | `kb:read` | GET `/pages/{slug}`、GET `/pages/{slug}/trace`、POST `/pages/{slug}/traverse` | 读主体画像 + 溯源 + 关系遍历 | | `kb:list` | GET `/pages`、GET `/taxonomy` | 列页面 + 地图 | | `kb:meta` | GET `/stats`、GET `/whats-new` | 元信息 + 时效查询 | From 61908e9d4cb0987d3830399d985fab3750a1a966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=80=AA=E7=A8=8B=E4=BC=9F?= Date: Tue, 30 Jun 2026 11:17:11 +0800 Subject: [PATCH 3/5] fix(kb-open): scope-limited ?token= SSE auth fallback in KbOpenApiAuthFilter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit R7: the SSE progress stream (/research/{id}/stream) is consumed by browser EventSource, which cannot set an Authorization header. The filter's extractBearerToken() never read ?token= (still a TODO), so the SSE endpoint was unreachable from the browser — the headline use case got 401. Fix: accept ?token= ONLY on SSE stream paths (isSseStreamPath, suffix /stream), reject it everywhere else so the API key does not leak into access/proxy logs for normal calls (R5). Matches the JwtAuthFilter convention (getRequestURI logs carry no query string). Also bypass the per-minute rate limiter on the SSE path: EventSource reconnects/heartbeats would otherwise burn the key's window and 429 its own POST /research start. Rate limiting belongs on the cost-producing endpoints. 
Tests (6 new, KbOpenApiAuthFilterTest): - non-SSE: header passes, ?token= rejected (no authenticate call) - SSE: ?token= authenticates, missing token → 401 - SSE: bypasses rate limiter; non-SSE still hits it --- .../mate/kbopen/auth/KbOpenApiAuthFilter.java | 51 +++++-- .../kbopen/auth/KbOpenApiAuthFilterTest.java | 126 ++++++++++++++++++ 2 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java b/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java index d01777c0c..3328806b7 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/auth/KbOpenApiAuthFilter.java @@ -29,6 +29,15 @@ *

R2: per-key rate limiting. After successful auth, the * filter checks the sliding-window limiter. Exceeding * {@code rateLimitPerMin} returns 429. + * + *

R5 / R7: SSE token fallback is scope-limited. The + * {@code ?token=} query param is accepted only on SSE stream paths + * ({@link #isSseStreamPath}), where browser EventSource cannot set an + * Authorization header (R7). It is rejected on every other path so the API + * key never leaks into access / proxy logs for normal calls (R5). Application + * logging here uses {@code getRequestURI()} (no query string), so the key does + * not reach app logs — but a reverse proxy may still log the query string, so + * keep the fallback as narrow as possible. */ @Slf4j @Component @@ -48,9 +57,12 @@ protected void doFilterInternal(HttpServletRequest request, return; } - String token = extractBearerToken(request); + boolean sse = isSseStreamPath(request); + String token = extractToken(request, sse); if (!StringUtils.hasText(token)) { - sendUnauthorized(response, "Missing API key"); + sendUnauthorized(response, sse + ? "Missing API key (Authorization header or ?token= for EventSource)" + : "Missing API key"); return; } @@ -62,8 +74,12 @@ protected void doFilterInternal(HttpServletRequest request, KbApiKeyContext context = authResult.get().context(); - // R2: rate limit check - if (!rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) { + // R2: rate limit check — but NOT on the SSE stream path. EventSource + // reconnects/heartbeats would otherwise burn the per-minute window and + // can 429 the key's own POST /research start. Rate limiting belongs on + // the cost-producing endpoints (start/status/cancel), not the progress + // subscription. Per-key concurrency is still enforced upstream. 
+ if (!sse && !rateLimiter.tryAcquire(context.keyId(), context.rateLimitPerMin(), Instant.now())) { sendTooManyRequests(response, context.rateLimitPerMin()); return; } @@ -82,14 +98,33 @@ private boolean isOpenApiPath(HttpServletRequest request) { return uri.startsWith("/api/v1/open/kb/"); } - private String extractBearerToken(HttpServletRequest request) { + /** + * SSE progress stream paths — the only place {@code ?token=} is accepted, + * because browser EventSource cannot set an Authorization header (R7). + * Matched on the URI prefix and suffix so the fallback tracks whichever + * endpoints expose SSE, without hard-coding a single kbId/sessionId. + */ + private boolean isSseStreamPath(HttpServletRequest request) { + String uri = request.getRequestURI(); + return uri.startsWith("/api/v1/open/kb/") && uri.endsWith("/stream"); + } + + /** + * Extract the API key. Header is always accepted; the {@code ?token=} + * query param is accepted only on SSE stream paths ({@code sse}), + * to keep the key out of access/proxy logs on every other request (R5). + */ + private String extractToken(HttpServletRequest request, boolean sse) { String bearer = request.getHeader("Authorization"); if (StringUtils.hasText(bearer) && bearer.startsWith("Bearer ")) { return bearer.substring(7).trim(); } - // TODO: add ?token= SSE fallback once Deep Research SSE endpoint is live. - // EventSource can't set custom headers; for now P0-A has no SSE path so - // query param would leak the key into access / proxy logs (R5).
+ if (sse) { + String queryToken = request.getParameter("token"); + if (StringUtils.hasText(queryToken)) { + return queryToken.trim(); + } + } return null; } diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java new file mode 100644 index 000000000..0b47d8c26 --- /dev/null +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/auth/KbOpenApiAuthFilterTest.java @@ -0,0 +1,126 @@ +package vip.mate.kbopen.auth; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.mock.web.MockFilterChain; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpServletResponse; +import vip.mate.kbopen.auth.KbApiKeyService.AuthResult; + +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link KbOpenApiAuthFilter} — focuses on the R7 SSE token + * fallback and R5 scope limitation added in #446: + *

    + *
  • {@code ?token=} is accepted on SSE stream paths (EventSource can't set + * an Authorization header).
  • + *
  • {@code ?token=} is rejected on non-SSE paths so the key doesn't leak + * into access / proxy logs.
  • + *
  • The SSE stream path bypasses the per-minute rate limiter (reconnects / + * heartbeats must not burn the key's start quota).
  • + *
+ */ +class KbOpenApiAuthFilterTest { + + private static final String KEY = "mck_abcd1234"; + private static final KbApiKeyContext CTX = + new KbApiKeyContext(7L, 1L, Set.of(10L), Set.of("kb:search"), 60); + + private KbApiKeyService keyService; + private KbApiKeyRateLimiter rateLimiter; + private KbOpenApiAuthFilter filter; + + @BeforeEach + void setUp() { + keyService = mock(KbApiKeyService.class); + rateLimiter = mock(KbApiKeyRateLimiter.class); + filter = new KbOpenApiAuthFilter(keyService, rateLimiter); + when(keyService.authenticate(KEY)).thenReturn(Optional.of(new AuthResult(CTX, LocalDateTime.now()))); + when(rateLimiter.tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any())).thenReturn(true); + } + + private MockHttpServletRequest startRequest() { + MockHttpServletRequest req = new MockHttpServletRequest(); + req.setRequestURI("/api/v1/open/kb/10/research"); + req.setMethod("POST"); + req.addHeader("Authorization", "Bearer " + KEY); + return req; + } + + private MockHttpServletRequest sseRequest() { + MockHttpServletRequest req = new MockHttpServletRequest(); + req.setRequestURI("/api/v1/open/kb/10/research/open-research-x/stream"); + req.setMethod("GET"); + req.setQueryString("token=" + KEY); + req.addParameter("token", KEY); + return req; + } + + private int run(MockHttpServletRequest req) throws Exception { + MockHttpServletResponse res = new MockHttpServletResponse(); + filter.doFilter(req, res, new MockFilterChain()); + return res.getStatus(); + } + + @Test + @DisplayName("non-SSE path: header auth passes") + void nonSseHeaderAuth() throws Exception { + assertThat(run(startRequest())).isEqualTo(200); + } + + @Test + @DisplayName("non-SSE path: ?token= is rejected even with a valid key (R5 — no log leak)") + void nonSseQueryTokenRejected() throws Exception { + MockHttpServletRequest req = startRequest(); + req.removeHeader("Authorization"); + req.addParameter("token", KEY); + req.setQueryString("token=" + KEY); + + 
assertThat(run(req)).isEqualTo(401); + verify(keyService, never()).authenticate(KEY); + } + + @Test + @DisplayName("SSE path: ?token= authenticates (R7 — EventSource fallback)") + void sseQueryTokenAccepted() throws Exception { + assertThat(run(sseRequest())).isEqualTo(200); + verify(keyService).authenticate(KEY); + } + + @Test + @DisplayName("SSE path: missing token → 401") + void sseMissingToken() throws Exception { + MockHttpServletRequest req = sseRequest(); + req.removeParameter("token"); + req.setQueryString(null); + assertThat(run(req)).isEqualTo(401); + } + + @Test + @DisplayName("SSE path: bypasses the per-minute rate limiter (reconnects must not burn quota)") + void sseSkipsRateLimit() throws Exception { + run(sseRequest()); + verify(rateLimiter, never()) + .tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any()); + } + + @Test + @DisplayName("non-SSE path: still goes through the rate limiter") + void nonSseHitsRateLimit() throws Exception { + run(startRequest()); + verify(rateLimiter) + .tryAcquire(anyLong(), anyInt(), org.mockito.ArgumentMatchers.any()); + } +} From d7b27e93cb60259a6ce4199cd3b39eedef8f5908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=80=AA=E7=A8=8B=E4=BC=9F?= Date: Tue, 30 Jun 2026 11:22:26 +0800 Subject: [PATCH 4/5] fix(kb-open-research): make per-key concurrency cap atomic (no check-then-act race) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit startIfAllowed() did stream-and-count then put() — not atomic. Two concurrent starts for the same key could both pass the count check (both see < cap) and both put, admitting more sessions than the cap. On the virtual-thread start endpoint this is a real DoS/cost-bypass path. Fix: maintain a per-key AtomicInteger running counter (runningPerKey), incremented atomically on start (incrementAndGet + rollback on overflow) and decremented on each RUNNING→terminal transition (complete/fail/cancel). 
The counter is kept in lock-step with status==RUNNING; since terminal states are sticky, each session decrements exactly once. cancel() also rewritten to capture the pre-transition state cleanly (the old return check relied on Map.computeIfPresent returning the new value, which worked but read as 'before.status==CANCELLED'). Tests (+2): cancelled/failed release slot (counter consistency), and a concurrent-start test (12 virtual threads, cap=3) asserting exactly cap admits — would be flaky/fail under the old impl. --- .../research/KbResearchSessionRegistry.java | 72 +++++++++++++++---- .../KbResearchSessionRegistryTest.java | 50 +++++++++++++ 2 files changed, 110 insertions(+), 12 deletions(-) diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java index 023f9b240..b87564f22 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * Tracks active Deep Research sessions started via the Open API. @@ -65,6 +66,16 @@ public static class TooManyConcurrentException extends RuntimeException { private final Map sessions = new ConcurrentHashMap<>(); + /** + * Per-key count of RUNNING sessions, kept in lock-step with the + * {@code status==RUNNING} sessions in {@link #sessions}. Maintained + * atomically so {@link #startIfAllowed} can enforce the cap without a + * check-then-act race (two concurrent starts could both pass a stream-based + * count and both put). Incremented on start, decremented on each + * RUNNING→terminal transition (complete/fail/cancel). 
+ */ + private final Map runningPerKey = new ConcurrentHashMap<>(); + private final int maxConcurrentPerKey; private final Duration sessionTtl; @@ -78,22 +89,28 @@ public KbResearchSessionRegistry( /** * Reserve a slot for a new session, enforcing the per-key concurrency cap. * + *

Atomic: {@code incrementAndGet} + rollback on overflow, so concurrent + * starts for the same key cannot both slip past the cap. The previous + * stream-and-count impl had a check-then-act race. + * * @throws TooManyConcurrentException if {@code keyId} already has * {@code maxConcurrentPerKey} RUNNING sessions. */ public void startIfAllowed(String sessionId, Long keyId, Long kbId, String topic) { - long running = sessions.values().stream() - .filter(s -> keyId.equals(s.keyId()) && s.status() == Status.RUNNING) - .count(); - if (running >= maxConcurrentPerKey) { + AtomicInteger count = runningPerKey.computeIfAbsent(keyId, k -> new AtomicInteger()); + int now = count.incrementAndGet(); + if (now > maxConcurrentPerKey) { + count.decrementAndGet(); // rollback — slot was not granted throw new TooManyConcurrentException( - "API key already has " + running + " running research session(s); limit is " + maxConcurrentPerKey); + "API key already has " + maxConcurrentPerKey + + " running research session(s); limit is " + maxConcurrentPerKey); } sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); } /** Back-compat with existing callers / tests. Equivalent to {@code startIfAllowed} without the cap check. */ public void register(String sessionId, Long keyId, Long kbId, String topic) { + runningPerKey.computeIfAbsent(keyId, k -> new AtomicInteger()).incrementAndGet(); sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); } @@ -103,21 +120,52 @@ public Optional get(String sessionId) { /** RUNNING → COMPLETED. No-op on a session that was already CANCELLED (sticky terminal). */ public void complete(String sessionId, ResearchResult result) { - sessions.computeIfPresent(sessionId, (k, s) -> - s.status() == Status.CANCELLED ? 
s : s.with(Status.COMPLETED, result, null)); + sessions.computeIfPresent(sessionId, (k, s) -> { + if (s.status() == Status.CANCELLED) { + return s; // sticky terminal — no transition, no counter change + } + decrementRunning(s.keyId()); // RUNNING → COMPLETED releases the slot + return s.with(Status.COMPLETED, result, null); + }); } /** RUNNING → FAILED. No-op on a session that was already CANCELLED (sticky terminal). */ public void fail(String sessionId, String error) { - sessions.computeIfPresent(sessionId, (k, s) -> - s.status() == Status.CANCELLED ? s : s.with(Status.FAILED, null, error)); + sessions.computeIfPresent(sessionId, (k, s) -> { + if (s.status() == Status.CANCELLED) { + return s; + } + decrementRunning(s.keyId()); + return s.with(Status.FAILED, null, error); + }); } /** RUNNING → CANCELLED. Returns false if the session is missing or already terminal. */ public boolean cancel(String sessionId) { - Session before = sessions.computeIfPresent(sessionId, (k, s) -> - s.status() == Status.RUNNING ? s.with(Status.CANCELLED, null, null) : s); - return before != null && before.status() == Status.CANCELLED; + Session[] before = new Session[1]; + sessions.computeIfPresent(sessionId, (k, s) -> { + before[0] = s; + if (s.status() == Status.RUNNING) { + decrementRunning(s.keyId()); + return s.with(Status.CANCELLED, null, null); + } + return s; + }); + return before[0] != null && before[0].status() == Status.RUNNING; + } + + /** Release one running-slot for {@code keyId}, floored at 0. */ + private void decrementRunning(Long keyId) { + AtomicInteger count = runningPerKey.get(keyId); + if (count != null) { + // getAndDecrement would go negative; clamp instead so repeated + // terminal transitions (e.g. complete after cancel) can't drift.
+ while (true) { + int cur = count.get(); + if (cur <= 0) break; + if (count.compareAndSet(cur, cur - 1)) break; + } + } } /** diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java index 924badb92..c2c8fc145 100644 --- a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java @@ -167,6 +167,56 @@ void completedDoesNotCountTowardCap() { assertThat(registry.get("s2")).isPresent(); } + @Test + @DisplayName("cancelled and failed sessions also release their slot (counter consistency)") + void cancelledAndFailedReleaseSlot() { + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(1, Duration.ofMinutes(30)); + registry.startIfAllowed("s1", 100L, 10L, "t1"); + registry.cancel("s1"); // RUNNING → CANCELLED releases slot + registry.startIfAllowed("s2", 100L, 10L, "t2"); + assertThat(registry.get("s2")).isPresent(); + + registry.fail("s2", "boom"); // RUNNING → FAILED releases slot + registry.startIfAllowed("s3", 100L, 10L, "t3"); + assertThat(registry.get("s3")).isPresent(); + } + + @Test + @DisplayName("concurrent starts never exceed the per-key cap (no check-then-act race)") + void startIfAllowedIsAtomicUnderConcurrency() throws Exception { + int cap = 3; + KbResearchSessionRegistry registry = new KbResearchSessionRegistry(cap, Duration.ofMinutes(30)); + int threads = cap * 4; // far more contenders than slots + java.util.concurrent.CountDownLatch start = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.atomic.AtomicInteger admitted = new java.util.concurrent.atomic.AtomicInteger(); + java.util.concurrent.atomic.AtomicInteger rejected = new java.util.concurrent.atomic.AtomicInteger(); + java.util.List workers = new java.util.ArrayList<>(); + + for (int i = 0; i < threads; 
i++) { + String sid = "concurrent-" + i; + Thread t = Thread.ofVirtual().start(() -> { + try { + start.await(); + registry.startIfAllowed(sid, 100L, 10L, "t"); + admitted.incrementAndGet(); + } catch (TooManyConcurrentException e) { + rejected.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + workers.add(t); + } + start.countDown(); + for (Thread t : workers) t.join(); + + // The whole point: exactly `cap` sessions get in, no matter the + // scheduling. The old stream-and-count impl could admit more under + // contention. + assertThat(admitted.get()).isEqualTo(cap); + assertThat(rejected.get()).isEqualTo(threads - cap); + } + // ── Review #446: TTL eviction ───────────────────────────────────────── @Test From 84d85bf6b96ef742db3230887b7ce554ff4871f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=80=AA=E7=A8=8B=E4=BC=9F?= Date: Tue, 30 Jun 2026 13:12:15 +0800 Subject: [PATCH 5/5] refactor(kb-open-research): remove unused register() back-compat method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit register() was left over from the initial impl — it bypassed the per-key concurrency cap (no startIfAllowed check) and, after the atomic-counter fix, incremented runningPerKey without any overflow rollback. With no production caller (the start endpoint uses startIfAllowed), it only existed for tests to set up a RUNNING session. Drop it and route the tests through startIfAllowed so nothing can accidentally ship a path that ignores the cap. 
--- .../research/KbResearchSessionRegistry.java | 6 ----- .../KbResearchSessionRegistryTest.java | 24 +++++++++---------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java index b87564f22..130272fd5 100644 --- a/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java +++ b/mateclaw-server/src/main/java/vip/mate/kbopen/research/KbResearchSessionRegistry.java @@ -108,12 +108,6 @@ public void startIfAllowed(String sessionId, Long keyId, Long kbId, String topic sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); } - /** Back-compat with existing callers / tests. Equivalent to {@code startIfAllowed} without the cap check. */ - public void register(String sessionId, Long keyId, Long kbId, String topic) { - runningPerKey.computeIfAbsent(keyId, k -> new AtomicInteger()).incrementAndGet(); - sessions.put(sessionId, Session.running(sessionId, keyId, kbId, topic)); - } - public Optional get(String sessionId) { return Optional.ofNullable(sessions.get(sessionId)); } diff --git a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java index c2c8fc145..fd31b0363 100644 --- a/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java +++ b/mateclaw-server/src/test/java/vip/mate/kbopen/research/KbResearchSessionRegistryTest.java @@ -29,10 +29,10 @@ private KbResearchSessionRegistry newRegistry() { } @Test - @DisplayName("register creates a RUNNING session") + @DisplayName("startIfAllowed creates a RUNNING session") void registerCreatesRunning() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "test topic"); + registry.startIfAllowed("s1", 100L, 10L, 
"test topic"); Optional session = registry.get("s1"); assertThat(session).isPresent(); @@ -47,7 +47,7 @@ void registerCreatesRunning() { @DisplayName("complete transitions RUNNING → COMPLETED with result") void completeTransitionsToCompleted() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); registry.complete("s1", RESULT); @@ -61,7 +61,7 @@ void completeTransitionsToCompleted() { @DisplayName("fail transitions RUNNING → FAILED with error") void failTransitionsToFailed() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); registry.fail("s1", "LLM timeout"); @@ -74,7 +74,7 @@ void failTransitionsToFailed() { @DisplayName("cancel transitions RUNNING → CANCELLED") void cancelTransitionsToCancelled() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); boolean cancelled = registry.cancel("s1"); @@ -87,7 +87,7 @@ void cancelTransitionsToCancelled() { @DisplayName("cancel on non-running session returns false (no-op)") void cancelOnCompletedIsNoop() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); registry.complete("s1", RESULT); boolean cancelled = registry.cancel("s1"); @@ -110,7 +110,7 @@ void getUnknownReturnsEmpty() { @DisplayName("complete after cancel is a no-op — CANCELLED is sticky") void completeAfterCancelIsNoop() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); registry.cancel("s1"); // Late complete() arriving from the async pipeline must NOT overwrite @@ -126,7 +126,7 @@ void completeAfterCancelIsNoop() { @DisplayName("fail after cancel is a no-op — 
CANCELLED is sticky") void failAfterCancelIsNoop() { KbResearchSessionRegistry registry = newRegistry(); - registry.register("s1", 100L, 10L, "topic"); + registry.startIfAllowed("s1", 100L, 10L, "topic"); registry.cancel("s1"); registry.fail("s1", "race condition"); @@ -225,10 +225,10 @@ void evictExpiredRemovesStaleTerminals() { // Tiny TTL so a fresh terminal (updatedAt≈now) is clearly within window. Duration ttl = Duration.ofSeconds(60); KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); - registry.register("s1", 100L, 10L, "running"); // RUNNING — never evicted - registry.register("s2", 100L, 10L, "stale-completed"); + registry.startIfAllowed("s1", 100L, 10L, "running"); // RUNNING — never evicted + registry.startIfAllowed("s2", 100L, 10L, "stale-completed"); registry.complete("s2", RESULT); // terminal, will be aged - registry.register("s3", 100L, 10L, "fresh-cancelled"); + registry.startIfAllowed("s3", 100L, 10L, "fresh-cancelled"); registry.cancel("s3"); // terminal, fresh // Horizon well past TTL: both terminals s2 and s3 are now stale. @@ -245,7 +245,7 @@ void evictExpiredRemovesStaleTerminals() { void evictExpiredKeepsFreshTerminal() { Duration ttl = Duration.ofHours(1); KbResearchSessionRegistry registry = new KbResearchSessionRegistry(3, ttl); - registry.register("s1", 100L, 10L, "just-completed"); + registry.startIfAllowed("s1", 100L, 10L, "just-completed"); registry.complete("s1", RESULT); // updatedAt ≈ now // Evict only 5 seconds later — well within the 1h TTL.