Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -58,10 +57,6 @@ public void execute(Runnable command) {
// so no need to declare it as type AtomicBoolean for thread safety.
final boolean[] locking = {true};
try {
if (syncRunnerCount >= maxConcurrency) throw new RejectedExecutionException("reject new task:"
+ " synchronous running task(s) (i.e. CallerRunsPolicy) already occupy"
+ " all concurrency slot(s) of " + ConcurrencyLimitExecutor.this);

queue.add(command);
if (workerCount >= maxConcurrency) return;

Expand Down Expand Up @@ -93,13 +88,19 @@ private void syncRun() {
try {
command.run();
} finally {
boolean scheduleAsyncWorker = false;
lock.lock();
try {
workerCount--;
syncRunnerCount--;
if (!queue.isEmpty() && workerCount < maxConcurrency) {
incrementWorkerCount();
scheduleAsyncWorker = true;
}
} finally {
lock.unlock();
}
if (scheduleAsyncWorker) submitAsyncWorkerAfterSyncRun();
}
}

Expand Down Expand Up @@ -154,6 +155,20 @@ private void asyncWork() {
}
}

private void submitAsyncWorkerAfterSyncRun() {
try {
executor.execute(this::asyncWork);
} catch (Throwable e) {
lock.lock();
try {
workerCount--;
} finally {
lock.unlock();
}
logUncaughtException(ERROR, super.toString() + "#submitAsyncWorkerAfterSyncRun", e);
}
}

@GuardedBy("lock")
private void incrementWorkerCount() {
workerCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,10 @@ class ConcurrencyLimitExecutorTest : FunSpec({
}

/**
* ❗❗ TODO: Due to the limitation in the current ConcurrencyLimitExecutor implementation,
* if all tasks execute synchronously,
* - the remaining tasks in the work queue cannot be executed!
* - the task execution is only triggered by task submission.
* Regression test: when all submitted tasks execute synchronously at the base executor,
* queued tasks must still be drained after synchronous runners complete.
*/
test("sync execution at MoreExecutors.directExecutor(), multi-threaded submission").config(enabled = false) {
test("sync execution at MoreExecutors.directExecutor(), multi-threaded submission") {
val concurrencyLimitExecutor = ConcurrencyLimitExecutor(3, MoreExecutors.directExecutor())

val concurrencyChecker = ConcurrencyChecker(3)
Expand Down