From 08e3f3359dcb3ee7fb40badde3220cd697fcf30a Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Mon, 22 Jun 2026 10:22:01 +0200 Subject: [PATCH] Bound publisher EAGAIN retry: stress harness no longer hangs on pool exhaustion A tight pool can stay exhausted permanently -- slots recycle only on eviction or teardown, never on consume, so once every publisher is blocked in allocate() nothing frees a slot. The stress publishers retried -EAGAIN forever, so pool_exhaustion / tiny-pool mpmc could deadlock? --- tests/stress/common.cc | 395 +++++++++++++++++++++++++++++++ tests/stress/common.h | 403 +++++--------------------------- tests/stress/fairness.cc | 17 +- tests/stress/mpmc.cc | 14 +- tests/stress/pool_exhaustion.cc | 28 +-- tests/unit/publisher-t.cc | 45 ++++ 6 files changed, 539 insertions(+), 363 deletions(-) diff --git a/tests/stress/common.cc b/tests/stress/common.cc index 8a10606..fa9e234 100644 --- a/tests/stress/common.cc +++ b/tests/stress/common.cc @@ -13,3 +13,398 @@ uint16_t contention_count() uint32_t per_side = std::max(2, (total + 1) / 2); return static_cast(std::min(per_side, UINT16_MAX)); } + +std::atomic g_all_publishers_done{false}; + +// Readiness barrier: subscribers signal after construction, publishers wait. +// Every ring is then Live for the whole run, making per-subscriber +// accounting exact. Scenarios must reset both before spawning threads. +std::atomic g_subscribers_ready{0}; +int g_subscribers_expected{0}; + +void wait_subscribers_ready() +{ + while (g_subscribers_ready.load(std::memory_order_acquire) < g_subscribers_expected) + { + kickmsg::yield(); + } +} + +// Messages a send_bounded() actually committed, and publishers that gave up. +// Conservation oracles use g_published (not the nominal count) because a +// publisher may stop early under sustained backpressure. Reset per scenario. +std::atomic g_published{0}; +std::atomic g_publisher_giveups{0}; + +// A tight pool can stay exhausted indefinitely: slots recycle only on +// eviction or teardown, never on consume, so once every publisher is blocked +// in allocate() nothing can free a slot. A publisher must therefore bound +// its EAGAIN retry rather than spin forever. Returns false if the pool +// stayed full past the deadline; the caller stops and books a giveup. +// 5 s of unbroken EAGAIN never happens on a healthy run (sends clear in +// microseconds) -- only a genuine deadlock reaches it. Override via +// KICKMSG_SEND_GIVEUP_MS (the test suite uses a small value to exercise the +// giveup path deterministically). +milliseconds send_giveup_deadline() +{ + static milliseconds const deadline = []() -> milliseconds + { + char const* env = std::getenv("KICKMSG_SEND_GIVEUP_MS"); + if (env != nullptr and *env != '\0') + { + long v = std::atol(env); + if (v > 0) + { + return milliseconds{v}; + } + } + return 5s; + }(); + return deadline; +} + +bool send_bounded(kickmsg::Publisher& pub, Payload const& msg, int pub_id) +{ + nanoseconds start = kickmsg::monotonic_ns(); + int32_t rc; + while ((rc = pub.send(&msg, sizeof(msg))) < 0) + { + if (rc != -EAGAIN) + { + std::fprintf(stderr, " [FATAL] publisher %d: send() returned %d\n", pub_id, rc); + std::abort(); + } + if (kickmsg::elapsed_time(start) >= send_giveup_deadline()) + { + g_publisher_giveups.fetch_add(1, std::memory_order_relaxed); + return false; + } + kickmsg::sleep(0ns); + } + g_published.fetch_add(1, std::memory_order_relaxed); + return true; +} + +void publisher_thread(kickmsg::SharedRegion& region, int pub_id, uint32_t count) +{ + kickmsg::Publisher pub{region}; + + wait_subscribers_ready(); + + for (uint32_t i = 0; i < count; ++i) + { + Payload msg; + msg.magic = Payload::MAGIC; + msg.pub_id = static_cast(pub_id); + msg.seq = i; + msg.checksum = compute_checksum(msg); + + if (not send_bounded(pub, msg, pub_id)) + { + break; + } + } +} + +void validate_payload(Payload const& msg, int num_pubs, uint64_t ring_pos, + std::vector& last_seq, + std::vector& last_pos, + SubResult& result, + MsgTrace* trace, std::size_t& trace_pos) +{ + if (msg.magic != Payload::MAGIC) + { + ++result.corrupted; + return; + } + if (msg.checksum != compute_checksum(msg)) + { + ++result.corrupted; + return; + } + if (msg.pub_id >= static_cast(num_pubs)) + { + ++result.bad_pub_id; + return; + } + + trace[trace_pos % TRACE_SIZE] = {msg.pub_id, msg.seq, ring_pos}; + ++trace_pos; + + auto& prev_seq = last_seq[msg.pub_id]; + auto& prev_pos = last_pos[msg.pub_id]; + + if (prev_seq != UINT32_MAX and msg.seq <= prev_seq) + { + auto delta = static_cast(prev_seq) - static_cast(msg.seq); + std::fprintf(stderr, " [REORDER] sub%d: pub %u seq %u @pos %" PRIu64 + " after prev seq %u @pos %" PRIu64 + " (delta=%d, lost=%" PRIu64 ", recv=%" PRIu64 ")\n", + result.sub_id, msg.pub_id, msg.seq, ring_pos, + prev_seq, prev_pos, + delta, result.lost, result.received); + + if (result.reordered == 0) + { + std::fprintf(stderr, " Recent messages (oldest first):\n"); + std::size_t start = 0; + if (trace_pos > TRACE_SIZE) + { + start = trace_pos - TRACE_SIZE; + } + for (std::size_t i = start; i < trace_pos; ++i) + { + auto& t = trace[i % TRACE_SIZE]; + char const* marker = ""; + if (t.pub_id == msg.pub_id) + { + marker = " <--"; + } + std::fprintf(stderr, " [%zu] pub %u seq %u @pos %" PRIu64 "%s\n", + i, t.pub_id, t.seq, t.ring_pos, marker); + } + } + + ++result.reordered; + return; + } + prev_seq = msg.seq; + prev_pos = ring_pos; + + ++result.received; +} + +SubResult subscriber_thread_copy(kickmsg::SharedRegion& region, int sub_id, + int num_pubs, uint32_t /*msgs_per_pub*/) +{ + kickmsg::Subscriber sub{region}; + g_subscribers_ready.fetch_add(1, std::memory_order_release); + + SubResult result{}; + result.sub_id = sub_id; + + std::vector last_seq(static_cast(num_pubs), UINT32_MAX); + std::vector last_pos(static_cast(num_pubs), UINT64_MAX); + MsgTrace trace[TRACE_SIZE]{}; + std::size_t trace_pos = 0; + + auto const timeout = milliseconds{500}; + + while (true) + { + auto sample = sub.receive(timeout); + if (not sample) + { + if (g_all_publishers_done) + { + // Null can mean retry budget burned on evicted entries; + // only null with no lost() progress proves ring-empty. + uint64_t lost_before = sub.lost(); + sample = sub.try_receive(); + if (not sample) + { + if (sub.lost() == lost_before) + { + break; + } + continue; + } + } + else + { + continue; + } + } + + if (sample->len() != sizeof(Payload)) + { + ++result.corrupted; + continue; + } + + Payload msg; + std::memcpy(&msg, sample->data(), sizeof(msg)); + validate_payload(msg, num_pubs, sample->ring_pos(), + last_seq, last_pos, result, trace, trace_pos); + } + + result.lost = sub.lost(); + return result; +} + +SubResult subscriber_thread_zerocopy(kickmsg::SharedRegion& region, int sub_id, + int num_pubs, uint32_t /*msgs_per_pub*/) +{ + kickmsg::Subscriber sub{region}; + g_subscribers_ready.fetch_add(1, std::memory_order_release); + + SubResult result{}; + result.sub_id = sub_id; + + std::vector last_seq(static_cast(num_pubs), UINT32_MAX); + std::vector last_pos(static_cast(num_pubs), UINT64_MAX); + MsgTrace trace[TRACE_SIZE]{}; + std::size_t trace_pos = 0; + + auto const timeout = milliseconds{500}; + + while (true) + { + auto view = sub.receive_view(timeout); + if (not view) + { + if (g_all_publishers_done) + { + // Same caveat as the copy path. + uint64_t lost_before = sub.lost(); + view = sub.try_receive_view(); + if (not view) + { + if (sub.lost() == lost_before) + { + break; + } + continue; + } + } + else + { + continue; + } + } + + if (view->len() != sizeof(Payload)) + { + ++result.corrupted; + continue; + } + + Payload msg; + std::memcpy(&msg, view->data(), sizeof(msg)); + validate_payload(msg, num_pubs, view->ring_pos(), + last_seq, last_pos, result, trace, trace_pos); + } + + result.lost = sub.lost(); + return result; +} + +// No-crash oracle: GC work not explained by observed publisher drops means +// the normal path leaked and GC masked it -- fail. Each stall-steal leaks +// at most one slot ref and books at least one drop, so reclaimed <= drops +// is the exact budget. +bool verify_gc_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) +{ + bool ok = true; + + uint64_t dropped = 0; + for (uint32_t i = 0; i < cfg.max_subscribers; ++i) + { + auto* ring = kickmsg::sub_ring_at(region.base(), region.header(), i); + dropped += ring->dropped_count.load(std::memory_order_acquire); + } + + std::size_t repaired = region.repair_locked_entries(); + if (repaired != 0) + { + std::fprintf(stderr, " [FAIL] repair_locked_entries fixed %zu entries on a no-crash run\n", + repaired); + ok = false; + } + + std::size_t reclaimed = region.reclaim_orphaned_slots(); + if (reclaimed > dropped) + { + std::fprintf(stderr, " [FAIL] reclaim_orphaned_slots recovered %zu slots on a no-crash run " + "(only %" PRIu64 " publisher drops can account for steal residue)\n", + reclaimed, dropped); + ok = false; + } + else if (reclaimed != 0) + { + std::printf(" [NOTE] %zu slot(s) reclaimed, within the %" PRIu64 + "-drop steal budget (stalled-publisher steal residue)\n", + reclaimed, dropped); + } + + return ok; +} + +bool verify_pool_free(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) +{ + auto* base = region.base(); + auto* hdr = region.header(); + + std::vector seen(cfg.pool_size, false); + uint32_t count = 0; + uint32_t top = kickmsg::tagged_idx(hdr->free_top.load(std::memory_order_acquire)); + + while (top != kickmsg::INVALID_SLOT) + { + if (top >= cfg.pool_size) + { + std::fprintf(stderr, " [FAIL] free stack contains out-of-range index %u\n", top); + return false; + } + if (seen[top]) + { + std::fprintf(stderr, " [FAIL] free stack contains duplicate slot %u\n", top); + return false; + } + seen[top] = true; + ++count; + + auto* slot = kickmsg::slot_at(base, hdr, top); + top = slot->next_free; + } + + if (count != cfg.pool_size) + { + std::fprintf(stderr, " [FAIL] free stack has %u slots, expected %zu (leak!)\n", + count, cfg.pool_size); + return false; + } + return true; +} + +bool verify_rings_inactive(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) +{ + auto* base = region.base(); + auto* hdr = region.header(); + + for (uint32_t i = 0; i < cfg.max_subscribers; ++i) + { + auto* ring = kickmsg::sub_ring_at(base, hdr, i); + uint32_t packed = ring->state_flight.load(std::memory_order_acquire); + if (kickmsg::ring::get_state(packed) != kickmsg::ring::Free) + { + std::fprintf(stderr, " [FAIL] ring %u not Free after test\n", i); + return false; + } + if (kickmsg::ring::get_in_flight(packed) != 0) + { + std::fprintf(stderr, " [FAIL] ring %u has in_flight=%u after test\n", + i, kickmsg::ring::get_in_flight(packed)); + return false; + } + } + return true; +} + +bool verify_refcounts_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) +{ + auto* base = region.base(); + auto* hdr = region.header(); + + for (uint32_t i = 0; i < cfg.pool_size; ++i) + { + auto* slot = kickmsg::slot_at(base, hdr, i); + uint32_t rc = slot->refcount; + if (rc != 0) + { + std::fprintf(stderr, " [FAIL] slot %u has refcount %u (expected 0)\n", i, rc); + return false; + } + } + return true; +} diff --git a/tests/stress/common.h b/tests/stress/common.h index 8211ba9..83f133b 100644 --- a/tests/stress/common.h +++ b/tests/stress/common.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -87,351 +88,63 @@ struct MsgTrace static constexpr std::size_t TRACE_SIZE = 16; -inline std::atomic g_all_publishers_done{false}; - -// Readiness barrier: subscribers signal after construction, publishers wait. -// Every ring is then Live for the whole run, making per-subscriber -// accounting exact. Scenarios must reset both before spawning threads. -inline std::atomic g_subscribers_ready{0}; -inline int g_subscribers_expected{0}; - -inline void wait_subscribers_ready() -{ - while (g_subscribers_ready.load(std::memory_order_acquire) < g_subscribers_expected) - { - kickmsg::yield(); - } -} - -inline void publisher_thread(kickmsg::SharedRegion& region, int pub_id, uint32_t count) -{ - kickmsg::Publisher pub{region}; - - wait_subscribers_ready(); - - for (uint32_t i = 0; i < count; ++i) - { - Payload msg; - msg.magic = Payload::MAGIC; - msg.pub_id = static_cast(pub_id); - msg.seq = i; - msg.checksum = compute_checksum(msg); - - int32_t rc; - while ((rc = pub.send(&msg, sizeof(msg))) < 0) - { - if (rc != -EAGAIN) - { - std::fprintf(stderr, " [FATAL] publisher %d: send() returned %d\n", pub_id, rc); - std::abort(); - } - kickmsg::sleep(0ns); - } - } -} - -inline void validate_payload(Payload const& msg, int num_pubs, uint64_t ring_pos, - std::vector& last_seq, - std::vector& last_pos, - SubResult& result, - MsgTrace* trace, std::size_t& trace_pos) -{ - if (msg.magic != Payload::MAGIC) - { - ++result.corrupted; - return; - } - if (msg.checksum != compute_checksum(msg)) - { - ++result.corrupted; - return; - } - if (msg.pub_id >= static_cast(num_pubs)) - { - ++result.bad_pub_id; - return; - } - - trace[trace_pos % TRACE_SIZE] = {msg.pub_id, msg.seq, ring_pos}; - ++trace_pos; - - auto& prev_seq = last_seq[msg.pub_id]; - auto& prev_pos = last_pos[msg.pub_id]; - - if (prev_seq != UINT32_MAX and msg.seq <= prev_seq) - { - auto delta = static_cast(prev_seq) - static_cast(msg.seq); - std::fprintf(stderr, " [REORDER] sub%d: pub %u seq %u @pos %" PRIu64 - " after prev seq %u @pos %" PRIu64 - " (delta=%d, lost=%" PRIu64 ", recv=%" PRIu64 ")\n", - result.sub_id, msg.pub_id, msg.seq, ring_pos, - prev_seq, prev_pos, - delta, result.lost, result.received); - - if (result.reordered == 0) - { - std::fprintf(stderr, " Recent messages (oldest first):\n"); - std::size_t start = 0; - if (trace_pos > TRACE_SIZE) - { - start = trace_pos - TRACE_SIZE; - } - for (std::size_t i = start; i < trace_pos; ++i) - { - auto& t = trace[i % TRACE_SIZE]; - char const* marker = ""; - if (t.pub_id == msg.pub_id) - { - marker = " <--"; - } - std::fprintf(stderr, " [%zu] pub %u seq %u @pos %" PRIu64 "%s\n", - i, t.pub_id, t.seq, t.ring_pos, marker); - } - } - - ++result.reordered; - return; - } - prev_seq = msg.seq; - prev_pos = ring_pos; - - ++result.received; -} - -inline SubResult subscriber_thread_copy(kickmsg::SharedRegion& region, int sub_id, - int num_pubs, uint32_t /*msgs_per_pub*/) -{ - kickmsg::Subscriber sub{region}; - g_subscribers_ready.fetch_add(1, std::memory_order_release); - - SubResult result{}; - result.sub_id = sub_id; - - std::vector last_seq(static_cast(num_pubs), UINT32_MAX); - std::vector last_pos(static_cast(num_pubs), UINT64_MAX); - MsgTrace trace[TRACE_SIZE]{}; - std::size_t trace_pos = 0; - - auto const timeout = milliseconds{500}; - - while (true) - { - auto sample = sub.receive(timeout); - if (not sample) - { - if (g_all_publishers_done) - { - // Null can mean retry budget burned on evicted entries; - // only null with no lost() progress proves ring-empty. - uint64_t lost_before = sub.lost(); - sample = sub.try_receive(); - if (not sample) - { - if (sub.lost() == lost_before) - { - break; - } - continue; - } - } - else - { - continue; - } - } - - if (sample->len() != sizeof(Payload)) - { - ++result.corrupted; - continue; - } - - Payload msg; - std::memcpy(&msg, sample->data(), sizeof(msg)); - validate_payload(msg, num_pubs, sample->ring_pos(), - last_seq, last_pos, result, trace, trace_pos); - } - - result.lost = sub.lost(); - return result; -} - -inline SubResult subscriber_thread_zerocopy(kickmsg::SharedRegion& region, int sub_id, - int num_pubs, uint32_t /*msgs_per_pub*/) -{ - kickmsg::Subscriber sub{region}; - g_subscribers_ready.fetch_add(1, std::memory_order_release); - - SubResult result{}; - result.sub_id = sub_id; - - std::vector last_seq(static_cast(num_pubs), UINT32_MAX); - std::vector last_pos(static_cast(num_pubs), UINT64_MAX); - MsgTrace trace[TRACE_SIZE]{}; - std::size_t trace_pos = 0; - - auto const timeout = milliseconds{500}; - - while (true) - { - auto view = sub.receive_view(timeout); - if (not view) - { - if (g_all_publishers_done) - { - // Same caveat as the copy path. - uint64_t lost_before = sub.lost(); - view = sub.try_receive_view(); - if (not view) - { - if (sub.lost() == lost_before) - { - break; - } - continue; - } - } - else - { - continue; - } - } - - if (view->len() != sizeof(Payload)) - { - ++result.corrupted; - continue; - } - - Payload msg; - std::memcpy(&msg, view->data(), sizeof(msg)); - validate_payload(msg, num_pubs, view->ring_pos(), - last_seq, last_pos, result, trace, trace_pos); - } - - result.lost = sub.lost(); - return result; -} - -// No-crash oracle: GC work not explained by observed publisher drops means -// the normal path leaked and GC masked it -- fail. Each stall-steal leaks -// at most one slot ref and books at least one drop, so reclaimed <= drops -// is the exact budget. -inline bool verify_gc_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) -{ - bool ok = true; - - uint64_t dropped = 0; - for (uint32_t i = 0; i < cfg.max_subscribers; ++i) - { - auto* ring = kickmsg::sub_ring_at(region.base(), region.header(), i); - dropped += ring->dropped_count.load(std::memory_order_acquire); - } - - std::size_t repaired = region.repair_locked_entries(); - if (repaired != 0) - { - std::fprintf(stderr, " [FAIL] repair_locked_entries fixed %zu entries on a no-crash run\n", - repaired); - ok = false; - } - - std::size_t reclaimed = region.reclaim_orphaned_slots(); - if (reclaimed > dropped) - { - std::fprintf(stderr, " [FAIL] reclaim_orphaned_slots recovered %zu slots on a no-crash run " - "(only %" PRIu64 " publisher drops can account for steal residue)\n", - reclaimed, dropped); - ok = false; - } - else if (reclaimed != 0) - { - std::printf(" [NOTE] %zu slot(s) reclaimed, within the %" PRIu64 - "-drop steal budget (stalled-publisher steal residue)\n", - reclaimed, dropped); - } - - return ok; -} - -inline bool verify_pool_free(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) -{ - auto* base = region.base(); - auto* hdr = region.header(); - - std::vector seen(cfg.pool_size, false); - uint32_t count = 0; - uint32_t top = kickmsg::tagged_idx(hdr->free_top.load(std::memory_order_acquire)); - - while (top != kickmsg::INVALID_SLOT) - { - if (top >= cfg.pool_size) - { - std::fprintf(stderr, " [FAIL] free stack contains out-of-range index %u\n", top); - return false; - } - if (seen[top]) - { - std::fprintf(stderr, " [FAIL] free stack contains duplicate slot %u\n", top); - return false; - } - seen[top] = true; - ++count; - - auto* slot = kickmsg::slot_at(base, hdr, top); - top = slot->next_free; - } - - if (count != cfg.pool_size) - { - std::fprintf(stderr, " [FAIL] free stack has %u slots, expected %zu (leak!)\n", - count, cfg.pool_size); - return false; - } - return true; -} - -inline bool verify_rings_inactive(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) -{ - auto* base = region.base(); - auto* hdr = region.header(); - - for (uint32_t i = 0; i < cfg.max_subscribers; ++i) - { - auto* ring = kickmsg::sub_ring_at(base, hdr, i); - uint32_t packed = ring->state_flight.load(std::memory_order_acquire); - if (kickmsg::ring::get_state(packed) != kickmsg::ring::Free) - { - std::fprintf(stderr, " [FAIL] ring %u not Free after test\n", i); - return false; - } - if (kickmsg::ring::get_in_flight(packed) != 0) - { - std::fprintf(stderr, " [FAIL] ring %u has in_flight=%u after test\n", - i, kickmsg::ring::get_in_flight(packed)); - return false; - } - } - return true; -} - -inline bool verify_refcounts_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg) -{ - auto* base = region.base(); - auto* hdr = region.header(); - - for (uint32_t i = 0; i < cfg.pool_size; ++i) - { - auto* slot = kickmsg::slot_at(base, hdr, i); - uint32_t rc = slot->refcount; - if (rc != 0) - { - std::fprintf(stderr, " [FAIL] slot %u has refcount %u (expected 0)\n", i, rc); - return false; - } - } - return true; -} +// ---- Shared scenario state + helpers (defined in common.cc) ---- +// Scenarios reset the globals before spawning threads. + +extern std::atomic g_all_publishers_done; + +// Readiness barrier: subscribers signal after construction, publishers wait, +// so every ring is Live for the whole run and per-subscriber accounting is exact. +extern std::atomic g_subscribers_ready; +extern int g_subscribers_expected; + +// Messages send_bounded() actually committed, and publishers that gave up. +// Conservation oracles use g_published (not the nominal count) because a +// publisher may stop early under sustained backpressure. +extern std::atomic g_published; +extern std::atomic g_publisher_giveups; + +void wait_subscribers_ready(); + +// EAGAIN retry deadline for send_bounded; 5 s default, override via +// KICKMSG_SEND_GIVEUP_MS (the suite uses a small value to exercise the giveup +// path deterministically). +milliseconds send_giveup_deadline(); + +// A tight pool can stay exhausted forever: slots recycle only on eviction or +// teardown, never on consume, so once every publisher blocks in allocate() +// nothing frees a slot. send_bounded caps the EAGAIN retry rather than +// spinning: returns false (booking a giveup) if the pool stayed full past +// send_giveup_deadline(), true after a committed send. +bool send_bounded(kickmsg::Publisher& pub, Payload const& msg, int pub_id); + +// Publish `count` checksummed messages, stopping early if send_bounded gives up. +void publisher_thread(kickmsg::SharedRegion& region, int pub_id, uint32_t count); + +// Validate one received message against per-publisher sequence monotonicity +// and its checksum, updating SubResult counters and the reorder trace. +void validate_payload(Payload const& msg, int num_pubs, uint64_t ring_pos, + std::vector& last_seq, + std::vector& last_pos, + SubResult& result, + MsgTrace* trace, std::size_t& trace_pos); + +// Copy / zero-copy subscriber loops: consume until publishers are done and the +// ring is drained, validating every sample. +SubResult subscriber_thread_copy(kickmsg::SharedRegion& region, int sub_id, + int num_pubs, uint32_t msgs_per_pub); +SubResult subscriber_thread_zerocopy(kickmsg::SharedRegion& region, int sub_id, + int num_pubs, uint32_t msgs_per_pub); + +// No-crash GC oracle: repair must fix nothing and reclaim must stay within the +// observed publisher-drop budget, else the normal path leaked and GC masked it. +bool verify_gc_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg); + +// Post-run structural checks: full free stack (no leak / dup / range error), +// every ring Free with in_flight 0, every slot refcount 0. +bool verify_pool_free(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg); +bool verify_rings_inactive(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg); +bool verify_refcounts_zero(kickmsg::SharedRegion& region, kickmsg::channel::Config const& cfg); struct TestRunner { diff --git a/tests/stress/fairness.cc b/tests/stress/fairness.cc index 755b4dc..3fbfe29 100644 --- a/tests/stress/fairness.cc +++ b/tests/stress/fairness.cc @@ -26,6 +26,8 @@ bool run_fairness_test() g_subscribers_ready = 0; g_subscribers_expected = NUM_SUBS; + g_published = 0; + g_publisher_giveups = 0; std::vector results(NUM_SUBS); std::vector sub_threads; @@ -52,6 +54,15 @@ bool run_fairness_test() uint64_t min_recv = UINT64_MAX; uint64_t max_recv = 0; + // One publisher into a 512-slot pool has ample headroom -- a giveup here + // would be a real backpressure regression, not legitimate exhaustion. + uint64_t published = g_published.load(std::memory_order_relaxed); + if (g_publisher_giveups.load(std::memory_order_relaxed) != 0) + { + std::fprintf(stderr, " [FAIL] publisher gave up despite ample pool headroom\n"); + ok = false; + } + for (auto const& r : results) { min_recv = std::min(min_recv, r.received); @@ -68,11 +79,11 @@ bool run_fairness_test() // Exact conservation: the readiness barrier means every ring is Live // before the first send, and subscribers drain to completion. uint64_t accounted = r.received + r.lost + r.corrupted + r.bad_pub_id + r.reordered; - if (accounted != NUM_MSGS) + if (accounted != published) { std::fprintf(stderr, " [FAIL] sub%d: received+lost+corrupt+bad_pid+reorder (%" PRIu64 - ") != total_sent (%u)!\n", - r.sub_id, accounted, NUM_MSGS); + ") != published (%" PRIu64 ")!\n", + r.sub_id, accounted, published); ok = false; } } diff --git a/tests/stress/mpmc.cc b/tests/stress/mpmc.cc index 912df4d..83b2a84 100644 --- a/tests/stress/mpmc.cc +++ b/tests/stress/mpmc.cc @@ -15,6 +15,8 @@ bool run_stress_test(TestConfig const& tc) g_all_publishers_done = false; g_subscribers_ready = 0; g_subscribers_expected = tc.num_subscribers; + g_published = 0; + g_publisher_giveups = 0; kickmsg::channel::Config cfg; cfg.max_subscribers = tc.max_subs; @@ -66,7 +68,12 @@ bool run_stress_test(TestConfig const& tc) nanoseconds t1 = kickmsg::monotonic_ns(); int64_t elapsed_ms = std::chrono::duration_cast(t1 - t0).count(); - uint64_t total_sent = static_cast(tc.num_publishers) * tc.msgs_per_pub; + // Actual commits, not the nominal target: under a tight pool a publisher + // may give up on sustained backpressure (see send_bounded). Every + // committed message reaches every Live ring, so each subscriber accounts + // for exactly g_published of them. + uint64_t total_sent = g_published.load(std::memory_order_relaxed); + uint64_t giveups = g_publisher_giveups.load(std::memory_order_relaxed); bool all_ok = true; @@ -79,6 +86,11 @@ bool run_stress_test(TestConfig const& tc) tc.num_publishers, tc.num_subscribers, mode_label); std::printf(" Elapsed: %" PRId64 " ms, total published: %" PRIu64 "\n", elapsed_ms, total_sent); + if (giveups > 0) + { + std::printf(" [NOTE] %" PRIu64 " publisher giveup(s) under sustained pool exhaustion\n", + giveups); + } std::printf(" %-6s %10s %10s %10s %10s %10s\n", "sub", "received", "lost", "corrupt", "bad_pid", "reorder"); diff --git a/tests/stress/pool_exhaustion.cc b/tests/stress/pool_exhaustion.cc index ec30f80..79c709b 100644 --- a/tests/stress/pool_exhaustion.cc +++ b/tests/stress/pool_exhaustion.cc @@ -23,8 +23,9 @@ bool run_pool_exhaustion() g_subscribers_ready = 0; g_subscribers_expected = NUM_SUBS; + g_published = 0; + g_publisher_giveups = 0; - std::atomic eagain_count{0}; std::atomic corruption_count{0}; struct SlowSubStats @@ -35,7 +36,10 @@ bool run_pool_exhaustion() }; std::vector sub_stats(NUM_SUBS); - // Publishers that track EAGAIN + // Publishers face deliberate exhaustion (pool=8, 8 pubs). send_bounded + // gives up after sustained EAGAIN rather than spinning forever: this pool + // CAN deadlock permanently -- slots recycle only on eviction, and once + // every publisher is blocked in allocate() nothing can evict. auto pub_worker = [&](int pub_id) { kickmsg::Publisher pub{region}; @@ -50,17 +54,9 @@ bool run_pool_exhaustion() msg.seq = i; msg.checksum = compute_checksum(msg); - int32_t rc; - while ((rc = pub.send(&msg, sizeof(msg))) < 0) + if (not send_bounded(pub, msg, pub_id)) { - if (rc != -EAGAIN) - { - std::fprintf(stderr, " [FATAL] publisher %d: send() returned %d\n", - pub_id, rc); - std::abort(); - } - eagain_count.fetch_add(1, std::memory_order_relaxed); - kickmsg::yield(); + break; } } }; @@ -153,7 +149,9 @@ bool run_pool_exhaustion() t.join(); } - std::printf(" EAGAIN count: %" PRIu64 "\n", eagain_count.load()); + std::printf(" published: %" PRIu64 ", publisher giveups: %" PRIu64 "\n", + g_published.load(std::memory_order_relaxed), + g_publisher_giveups.load(std::memory_order_relaxed)); bool ok = true; @@ -164,7 +162,9 @@ bool run_pool_exhaustion() ok = false; } - uint64_t const total_sent = static_cast(NUM_PUBS) * NUM_MSGS; + // Actual commits, not the nominal target: publishers may give up under + // sustained exhaustion, and every committed message reaches all 4 rings. + uint64_t const total_sent = g_published.load(std::memory_order_relaxed); for (int i = 0; i < NUM_SUBS; ++i) { auto const& s = sub_stats[static_cast(i)]; diff --git a/tests/unit/publisher-t.cc b/tests/unit/publisher-t.cc index fd9320e..a6b8f6b 100644 --- a/tests/unit/publisher-t.cc +++ b/tests/unit/publisher-t.cc @@ -119,6 +119,51 @@ TEST_F(PublisherTest, PublishOversizedLenReturnsZeroAndRecyclesSlot) EXPECT_EQ(pub.publish(sizeof(uint32_t)), 1u); } +// Pins the slot-recycling contract that caused a multi-hour soak hang: a slot +// returns to the pool ONLY via eviction (a publisher overwriting an old ring +// entry) or teardown -- NEVER via subscriber consumption. So with a pool +// smaller than the ring capacity, once every slot is committed the pool stays +// exhausted FOREVER: positions below `capacity` have no previous occupant to +// evict, and consuming them frees nothing. A publisher that retries -EAGAIN +// without bound (as the stress harness once did) then spins forever. +TEST_F(PublisherTest, ConsumptionDoesNotRecycleSlotsPoolStaysExhausted) +{ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 1; + cfg.sub_ring_capacity = 8; // capacity > pool: no eviction can occur yet + cfg.pool_size = 4; + cfg.max_payload_size = 8; + + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Fill the pool: 4 sends commit 4 slots (positions 0..3, each refcount 1 + // held by the single ring). The 5th targets position 4 -- below capacity, + // so no previous occupant to evict -- and the pool is empty. + uint32_t val = 0; + for (int i = 0; i < 4; ++i) + { + ASSERT_EQ(pub.send(&val, sizeof(val)), static_cast(sizeof(val))); + } + EXPECT_EQ(pub.send(&val, sizeof(val)), -EAGAIN); + + // Consume everything. Consumption is pin/unpin (net-zero); it does NOT + // release the ring's reference, so no slot returns to the pool. + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(sub.try_receive().has_value()); + } + + // Still exhausted, permanently: draining changed nothing. A bounded + // publisher must give up here; an unbounded one hangs. + for (int i = 0; i < 1000; ++i) + { + ASSERT_EQ(pub.send(&val, sizeof(val)), -EAGAIN) + << "consumption must not recycle slots (iter " << i << ")"; + } +} + TEST_F(PublisherTest, SendReturnsEagainOnPoolExhaustion) { kickmsg::channel::Config cfg;