From 9a90b32965ae0117d30322e2e4c51ce8d84d2667 Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Tue, 14 Dec 2021 21:33:29 +0000 Subject: [PATCH 1/4] Reproduce the problem with suspended overrides being delivered simultaneously Signed-off-by: Jakub Zalas --- .../ConcurrentQueueMailboxTest.java | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java index 9094c2ef..abc2df39 100644 --- a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java +++ b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java @@ -15,6 +15,8 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; import static org.junit.Assert.assertEquals; @@ -63,6 +65,54 @@ public void testThatSuspendResumes(){ assertFalse(mailbox.isSuspended()); } + @Test + public void testThatMessagesAreDeliveredInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + + @Test + public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + @Before @Override public void setUp() throws Exception { @@ -81,6 +131,13 @@ public void tearDown() throws Exception { dispatcher.close(); } + private void delay(final int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } + } + public static interface CountTaker { void take(final int count); } @@ -106,7 +163,8 @@ private TestResults(final int happenings) { this.accessSafely = AccessSafely .afterCompleting(happenings) .writingWith("counts", (Consumer) list::add) - .readingWith("counts", (Integer index)-> list.get(index)); + .readingWith("counts", (Integer index)-> list.get(index)) + .readingWith("counts", () -> list); } void addCount(Integer i){ @@ -116,5 +174,9 @@ void addCount(Integer i){ Integer getCount(int index){ return this.accessSafely.readFrom("counts", index); } + + List getCounts() { + return this.accessSafely.readFrom("counts"); + } } } From 6b7439d98c0c427ccba66d1cc0d5631fd1fd5eb7 Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Tue, 14 Dec 2021 21:47:23 +0000 Subject: [PATCH 2/4] Ensure sequencial delivery of suspended override messages Signed-off-by: Jakub Zalas --- .../ConcurrentQueueMailbox.java | 81 +++++++++++++------ 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index 1679004c..74cf50f3 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -12,10 +12,7 @@ import io.vlingo.xoom.actors.Message; import io.vlingo.xoom.actors.ResumingMailbox; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +22,7 @@ public class ConcurrentQueueMailbox implements Mailbox, Runnable { private AtomicBoolean delivering; private final Dispatcher dispatcher; private AtomicReference suspendedDeliveryOverrides; + private AtomicReference suspendedDeliveryQueue; private final Queue queue; private final byte throttlingCount; @@ -52,20 +50,13 @@ public void resume(final String name) { @Override public void send(final Message message) { - if (isSuspended()) { - if (suspendedDeliveryOverrides.get().matchesTop(message.protocol())) { - dispatcher.execute(new ResumingMailbox(message)); - if (!queue.isEmpty()) { - dispatcher.execute(this); - } - return; - } - queue.add(message); + if (isSuspendedExceptFor(message)) { + suspendedDeliveryQueue.get().add(message); } else { queue.add(message); - if (!isDelivering()) { - dispatcher.execute(this); - } + } + if (!isDelivering()) { + dispatcher.execute(this); } } @@ -85,6 +76,10 @@ public boolean isSuspendedFor(String name) { .find(name).isEmpty(); } + private boolean isSuspendedExceptFor(final Message override) { + return isSuspended() && suspendedDeliveryOverrides.get().matchesTop(override.protocol()); + } + @Override public Message receive() { return queue.poll(); @@ -101,17 +96,23 @@ public void run() { final int total = throttlingCount; for (int count = 0; count < total; ++count) { if (isSuspended()) { - break; - } - final Message message = receive(); - if (message != null) { - message.deliver(); + Message message = suspendedDeliveryQueue.get().poll(); + if (message != null) { + message.deliver(); + } else { + break; + } } else { - break; + final Message message = receive(); + if (message != null) { + message.deliver(); + } else { + break; + } } } delivering.set(false); - if (!queue.isEmpty()) { + if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) { dispatcher.execute(this); } } @@ -127,6 +128,7 @@ protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttli this.dispatcher = dispatcher; this.delivering = new AtomicBoolean(false); this.suspendedDeliveryOverrides = new AtomicReference<>(new SuspendedDeliveryOverrides()); + this.suspendedDeliveryQueue = new AtomicReference<>(new SuspendedDeliveryQueue()); this.queue = new ConcurrentLinkedQueue(); this.throttlingCount = (byte) throttlingCount; } @@ -261,4 +263,37 @@ private static class Overrides { this.obsolete = false; } } + + private static class SuspendedDeliveryQueue { + private final AtomicBoolean accessible = new AtomicBoolean(false); + private final LinkedList queue = new LinkedList<>(); + + public void add(final Message message) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.add(message); + accessible.set(false); + break; + } + } + + } + + public Message poll() { + while(true) { + if (accessible.compareAndSet(false, true)) { + Message message = null; + if (!queue.isEmpty()) { + message = queue.pop(); + } + accessible.set(false); + return message; + } + } + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + } } From bc71e9b4889aa468f57414294386f4dbe3fd89d7 Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Tue, 14 Dec 2021 22:50:23 +0000 Subject: [PATCH 3/4] Extract methods for readability Signed-off-by: Jakub Zalas --- .../ConcurrentQueueMailbox.java | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index 74cf50f3..aeb47c6b 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -50,11 +50,7 @@ public void resume(final String name) { @Override public void send(final Message message) { - if (isSuspendedExceptFor(message)) { - suspendedDeliveryQueue.get().add(message); - } else { - queue.add(message); - } + queue(message); if (!isDelivering()) { dispatcher.execute(this); } @@ -82,7 +78,11 @@ private boolean isSuspendedExceptFor(final Message override) { @Override public Message receive() { - return queue.poll(); + if (!isSuspended()) { + return queue.poll(); + } else { + return suspendedDeliveryQueue.get().poll(); + } } @Override @@ -95,24 +95,15 @@ public void run() { if (delivering.compareAndSet(false, true)) { final int total = throttlingCount; for (int count = 0; count < total; ++count) { - if (isSuspended()) { - Message message = suspendedDeliveryQueue.get().poll(); - if (message != null) { - message.deliver(); - } else { - break; - } + final Message message = receive(); + if (message != null) { + message.deliver(); } else { - final Message message = receive(); - if (message != null) { - message.deliver(); - } else { - break; - } + break; } } delivering.set(false); - if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) { + if (!isQueueEmpty()) { dispatcher.execute(this); } } @@ -124,6 +115,18 @@ public int pendingMessages() { return queue.size(); } + private void queue(final Message message) { + if (isSuspendedExceptFor(message)) { + suspendedDeliveryQueue.get().add(message); + } else { + queue.add(message); + } + } + + private boolean isQueueEmpty() { + return queue.isEmpty() && suspendedDeliveryQueue.get().isEmpty(); + } + protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttlingCount) { this.dispatcher = dispatcher; this.delivering = new AtomicBoolean(false); From 3ebaae379358755d075c91cbe17cb8eb62f48e12 Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Mon, 10 Jan 2022 12:58:58 +0000 Subject: [PATCH 4/4] Put any left suspended delivery messages back to the regular queue on resume Signed-off-by: Jakub Zalas --- .../vlingo/xoom/actors/ResumingMailbox.java | 51 ------------------- .../ConcurrentQueueMailbox.java | 35 +++++++++++-- .../ConcurrentQueueMailboxTest.java | 27 ++++++++++ 3 files changed, 57 insertions(+), 56 deletions(-) delete mode 100644 src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java diff --git a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java b/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java deleted file mode 100644 index 294c69d0..00000000 --- a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright © 2012-2021 VLINGO LABS. All rights reserved. -// -// This Source Code Form is subject to the terms of the -// Mozilla Public License, v. 2.0. If a copy of the MPL -// was not distributed with this file, You can obtain -// one at https://mozilla.org/MPL/2.0/. - -package io.vlingo.xoom.actors; - -public class ResumingMailbox implements Mailbox { - private final Message message; - - public ResumingMailbox(final Message message) { - this.message = message; - } - - @Override - public void run() { - message.deliver(); - } - - @Override - public void close() { } - - @Override - public boolean isClosed() { return false; } - - @Override - public boolean isDelivering() { return true; } - - @Override - public int concurrencyCapacity() { return 0; } - - @Override - public void resume(final String name) { } - - @Override - public void send(final Message message) { } - - @Override - public void suspendExceptFor(String name, Class... overrides) { } - - @Override - public boolean isSuspended() { return false; } - - @Override - public Message receive() { return null; } - - @Override - public int pendingMessages() { return 1; } -} diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index aeb47c6b..e7731aeb 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -10,25 +10,26 @@ import io.vlingo.xoom.actors.Dispatcher; import io.vlingo.xoom.actors.Mailbox; import io.vlingo.xoom.actors.Message; -import io.vlingo.xoom.actors.ResumingMailbox; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; public class ConcurrentQueueMailbox implements Mailbox, Runnable { private AtomicBoolean delivering; private final Dispatcher dispatcher; - private AtomicReference suspendedDeliveryOverrides; - private AtomicReference suspendedDeliveryQueue; + private final AtomicReference suspendedDeliveryOverrides; + private final AtomicReference suspendedDeliveryQueue; private final Queue queue; private final byte throttlingCount; @Override public void close() { queue.clear(); + suspendedDeliveryQueue.get().clear(); } @Override @@ -44,6 +45,7 @@ public int concurrencyCapacity() { @Override public void resume(final String name) { if (suspendedDeliveryOverrides.get().pop(name)) { + suspendedDeliveryQueue.get().putBack(this::queue); dispatcher.execute(this); } } @@ -112,7 +114,7 @@ public void run() { /* @see io.vlingo.xoom.actors.Mailbox#pendingMessages() */ @Override public int pendingMessages() { - return queue.size(); + return queue.size() + suspendedDeliveryQueue.get().size(); } private void queue(final Message message) { @@ -279,7 +281,6 @@ public void add(final Message message) { break; } } - } public Message poll() { @@ -295,8 +296,32 @@ public Message poll() { } } + public void putBack(final Consumer consumer) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.forEach(consumer); + queue.clear(); + accessible.set(false); + break; + } + } + } + + public void clear() { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.clear(); + break; + } + } + } + public boolean isEmpty() { return queue.isEmpty(); } + + public int size() { + return queue.size(); + } } } diff --git a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java index abc2df39..037cca06 100644 --- a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java +++ b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java @@ -113,6 +113,33 @@ public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() { assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); } + @Test + public void testThatSuspendedButNotHandledMessagesAreQueued() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + mailbox.resume("paused#"); + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + @Before @Override public void setUp() throws Exception {