From 92551e8c8ad09b3bcfec88668a848c0ac74ae7ef Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 11 Feb 2026 18:18:42 +0000 Subject: [PATCH] fix: drain queued tasks after synchronous executor runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 李鼎 --- .../cffu2/ConcurrencyLimitExecutor.java | 25 +++++++++++++++---- .../cffu2/ConcurrencyLimitExecutorTest.kt | 8 +++--- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu2/ConcurrencyLimitExecutor.java b/cffu-core/src/main/java/io/foldright/cffu2/ConcurrencyLimitExecutor.java index 810c52c9..981cdb70 100644 --- a/cffu-core/src/main/java/io/foldright/cffu2/ConcurrencyLimitExecutor.java +++ b/cffu-core/src/main/java/io/foldright/cffu2/ConcurrencyLimitExecutor.java @@ -7,7 +7,6 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -58,10 +57,6 @@ public void execute(Runnable command) { // so no need to declare it as type AtomicBoolean for thread safety. final boolean[] locking = {true}; try { - if (syncRunnerCount >= maxConcurrency) throw new RejectedExecutionException("reject new task:" - + " synchronous running task(s) (i.e. CallerRunsPolicy) already occupy" - + " all concurrency slot(s) of " + ConcurrencyLimitExecutor.this); - queue.add(command); if (workerCount >= maxConcurrency) return; @@ -93,13 +88,19 @@ private void syncRun() { try { command.run(); } finally { + boolean scheduleAsyncWorker = false; lock.lock(); try { workerCount--; syncRunnerCount--; + if (!queue.isEmpty() && workerCount < maxConcurrency) { + incrementWorkerCount(); + scheduleAsyncWorker = true; + } } finally { lock.unlock(); } + if (scheduleAsyncWorker) submitAsyncWorkerAfterSyncRun(); } } @@ -154,6 +155,20 @@ private void asyncWork() { } } + private void submitAsyncWorkerAfterSyncRun() { + try { + executor.execute(this::asyncWork); + } catch (Throwable e) { + lock.lock(); + try { + workerCount--; + } finally { + lock.unlock(); + } + logUncaughtException(ERROR, super.toString() + "#submitAsyncWorkerAfterSyncRun", e); + } + } + @GuardedBy("lock") private void incrementWorkerCount() { workerCount++; diff --git a/cffu-core/src/test/java/io/foldright/cffu2/ConcurrencyLimitExecutorTest.kt b/cffu-core/src/test/java/io/foldright/cffu2/ConcurrencyLimitExecutorTest.kt index b0b1f909..c3a40d51 100644 --- a/cffu-core/src/test/java/io/foldright/cffu2/ConcurrencyLimitExecutorTest.kt +++ b/cffu-core/src/test/java/io/foldright/cffu2/ConcurrencyLimitExecutorTest.kt @@ -157,12 +157,10 @@ class ConcurrencyLimitExecutorTest : FunSpec({ } /** - * ❗❗ TODO: Due to the limitation in the current ConcurrencyLimitExecutor implementation, - * if all tasks execute synchronously, - * - the remaining tasks in the work queue cannot be executed! - * - the task execution is only triggered by task submission. + * Regression test: when all submitted tasks execute synchronously at the base executor, + * queued tasks must still be drained after synchronous runners complete. */ - test("sync execution at MoreExecutors.directExecutor(), multi-threaded submission").config(enabled = false) { + test("sync execution at MoreExecutors.directExecutor(), multi-threaded submission") { val concurrencyLimitExecutor = ConcurrencyLimitExecutor(3, MoreExecutors.directExecutor()) val concurrencyChecker = ConcurrencyChecker(3)