From d3f5d5a562bfa6fd7137eebfc8e39fe244a85420 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 22 May 2026 11:10:33 +0000 Subject: [PATCH 1/3] Fix split-buffer remap state consistency Co-authored-by: dallison --- client/client_test.cc | 39 +++++++++++++++++++++++++++++++++++ server/server.cc | 5 +++++ server/server_channel.cc | 8 +++----- shadow/shadow_test.cc | 44 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/client/client_test.cc b/client/client_test.cc index 5815be1..fca6f1a 100644 --- a/client/client_test.cc +++ b/client/client_test.cc @@ -843,6 +843,45 @@ TEST_F(ClientTest, PlaceholderSubscriberLearnsSplitBuffersOnReload) { EXPECT_NE(reinterpret_cast(sub_prefix), msg->buffer); } +TEST_F(ClientTest, RejectsMixedSplitBufferPublisherOptions) { + subspace::Client client; + InitClient(client); + + subspace::PublisherOptions split_options; + split_options.SetSlotSize(128).SetNumSlots(4).SetUseSplitBuffers(true); + absl::StatusOr split_pub = + client.CreatePublisher("mixed_split_publishers", split_options); + ASSERT_OK(split_pub); + + subspace::PublisherOptions combined_options; + combined_options.SetSlotSize(128).SetNumSlots(4).SetUseSplitBuffers(false); + absl::StatusOr combined_pub = + client.CreatePublisher("mixed_split_publishers", combined_options); + ASSERT_FALSE(combined_pub.ok()); + EXPECT_THAT(combined_pub.status().message(), + ::testing::HasSubstr("Inconsistent split-buffer mode")); + + subspace::PublisherOptions bridge_split_options; + bridge_split_options.SetSlotSize(128) + .SetNumSlots(4) + .SetUseSplitBuffers(false) + .SetSplitBuffersOverBridge(true); + absl::StatusOr bridge_split_pub = client.CreatePublisher( + "mixed_bridge_split_publishers", bridge_split_options); + ASSERT_OK(bridge_split_pub); + + subspace::PublisherOptions no_bridge_split_options; + no_bridge_split_options.SetSlotSize(128) + .SetNumSlots(4) + .SetUseSplitBuffers(false) + .SetSplitBuffersOverBridge(false); + absl::StatusOr no_bridge_split_pub = client.CreatePublisher( + "mixed_bridge_split_publishers", no_bridge_split_options); + ASSERT_FALSE(no_bridge_split_pub.ok()); + EXPECT_THAT(no_bridge_split_pub.status().message(), + ::testing::HasSubstr("Inconsistent bridge split-buffer mode")); +} + TEST_F(ClientTest, SplitBufferCallbacksAllocateMapUnmapAndFreePayloadSlots) { auto state = std::make_shared(); diff --git a/server/server.cc b/server/server.cc index 3d00b93..ae7d196 100644 --- a/server/server.cc +++ b/server/server.cc @@ -975,6 +975,11 @@ absl::Status Server::RemapChannel(ServerChannel *channel, int slot_size, return fds.status(); } channel->SetSharedMemoryFds(std::move(*fds)); + // Remapping replaces the CCB/BCB FDs; shadow recovery must receive the + // refreshed descriptors instead of retaining the placeholder mappings. + ForEachShadow([channel](const std::unique_ptr &s) { + s->SendCreateChannel(channel); + }); return absl::OkStatus(); } diff --git a/server/server_channel.cc b/server/server_channel.cc index a9e5a98..9dfed7e 100644 --- a/server/server_channel.cc +++ b/server/server_channel.cc @@ -102,15 +102,13 @@ absl::Status ServerChannel::ValidateOrSetSplitBufferOptions( return absl::OkStatus(); } - if (options.use_split_buffers && - split_buffer_options_.use_split_buffers != options.use_split_buffers) { + if (split_buffer_options_.use_split_buffers != options.use_split_buffers) { return absl::InvalidArgumentError(absl::StrFormat( "Inconsistent split-buffer mode for %s on channel %s", user_type, Name())); } - if (options.split_buffers_over_bridge && - split_buffer_options_.split_buffers_over_bridge != - options.split_buffers_over_bridge) { + if (split_buffer_options_.split_buffers_over_bridge != + options.split_buffers_over_bridge) { return absl::InvalidArgumentError(absl::StrFormat( "Inconsistent bridge split-buffer mode for %s on channel %s", user_type, Name())); diff --git a/shadow/shadow_test.cc b/shadow/shadow_test.cc index 2d33ba0..0471f9b 100644 --- a/shadow/shadow_test.cc +++ b/shadow/shadow_test.cc @@ -598,6 +598,50 @@ TEST_F(ShadowRecoveryTest, ServerRecoversSplitBufferStateFromShadow) { StopShadow(); } +TEST_F(ShadowRecoveryTest, ShadowTracksPlaceholderRemap) { + signal(SIGPIPE, SIG_IGN); + + StartShadow(); + StartServer(); + + subspace::Client sub_client; + sub_client.SetThreadSafe(true); + ASSERT_THAT(sub_client.Init(RecoveryServerSocket()), IsOk()); + auto sub = sub_client.CreateSubscriber("shadow_placeholder_remap", + {.max_active_messages = 2}); + ASSERT_THAT(sub, IsOk()); + ASSERT_TRUE(sub->IsPlaceholder()); + + ASSERT_TRUE(WaitForShadowState([this]() { + return shadow_->WithChannels([](auto &channels) { + auto it = channels.find("shadow_placeholder_remap"); + return it != channels.end() && it->second.num_slots == 0 && + it->second.slot_size == 0; + }); + })); + + subspace::Client pub_client; + pub_client.SetThreadSafe(true); + ASSERT_THAT(pub_client.Init(RecoveryServerSocket()), IsOk()); + subspace::PublisherOptions options; + options.SetSlotSize(256).SetNumSlots(4).SetUseSplitBuffers(true); + auto pub = pub_client.CreatePublisher("shadow_placeholder_remap", options); + ASSERT_THAT(pub, IsOk()); + + ASSERT_TRUE(WaitForShadowState([this]() { + return shadow_->WithChannels([](auto &channels) { + auto it = channels.find("shadow_placeholder_remap"); + return it != channels.end() && it->second.num_slots == 4 && + it->second.slot_size == 256 && + it->second.has_split_buffer_options && + it->second.use_split_buffers; + }); + })); + + StopServer(); + StopShadow(); +} + TEST_F(ShadowRecoveryTest, ServerFunctionalAfterRecovery) { signal(SIGPIPE, SIG_IGN); From 09e85f0db8e2714ea01ba1975086a224c15b3365 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 22 May 2026 11:15:47 +0000 Subject: [PATCH 2/3] Preserve slot size when remapping channels Co-authored-by: dallison --- server/server.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/server/server.cc b/server/server.cc index ae7d196..7980e4e 100644 --- a/server/server.cc +++ b/server/server.cc @@ -974,6 +974,7 @@ absl::Status Server::RemapChannel(ServerChannel *channel, int slot_size, if (!fds.ok()) { return fds.status(); } + channel->SetLastKnownSlotSize(slot_size); channel->SetSharedMemoryFds(std::move(*fds)); // Remapping replaces the CCB/BCB FDs; shadow recovery must receive the // refreshed descriptors instead of retaining the placeholder mappings. From 75ace86b20b3a4e7cbf15698f3274f11d5fcb4ed Mon Sep 17 00:00:00 2001 From: Dave Allison Date: Tue, 26 May 2026 15:07:24 -0700 Subject: [PATCH 3/3] Align bridge publisher split-buffer mode --- server/server.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/server.cc b/server/server.cc index 7980e4e..114cd59 100644 --- a/server/server.cc +++ b/server/server.cc @@ -1777,8 +1777,16 @@ void Server::BridgeReceiverCoroutine(std::string channel_name, "Failed to parse Subscribed message"); return; } - const bool bridge_publisher_split_buffers = + bool bridge_publisher_split_buffers = split_buffers_over_bridge || subscribed.split_buffers_over_bridge(); + if (auto it = channels_.find(channel_name); it != channels_.end()) { + const ServerChannel *split_channel = + SplitBufferOptionsChannel(it->second.get()); + if (split_channel->HasSplitBufferOptions()) { + bridge_publisher_split_buffers = + split_channel->GetSplitBufferOptions().use_split_buffers; + } + } // Build a publisher to publish incoming bridge messages to the channel. Client client(co::self);