Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions client/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,45 @@ TEST_F(ClientTest, PlaceholderSubscriberLearnsSplitBuffersOnReload) {
EXPECT_NE(reinterpret_cast<const void *>(sub_prefix), msg->buffer);
}

TEST_F(ClientTest, RejectsMixedSplitBufferPublisherOptions) {
subspace::Client client;
InitClient(client);

subspace::PublisherOptions split_options;
split_options.SetSlotSize(128).SetNumSlots(4).SetUseSplitBuffers(true);
absl::StatusOr<Publisher> split_pub =
client.CreatePublisher("mixed_split_publishers", split_options);
ASSERT_OK(split_pub);

subspace::PublisherOptions combined_options;
combined_options.SetSlotSize(128).SetNumSlots(4).SetUseSplitBuffers(false);
absl::StatusOr<Publisher> combined_pub =
client.CreatePublisher("mixed_split_publishers", combined_options);
ASSERT_FALSE(combined_pub.ok());
EXPECT_THAT(combined_pub.status().message(),
::testing::HasSubstr("Inconsistent split-buffer mode"));

subspace::PublisherOptions bridge_split_options;
bridge_split_options.SetSlotSize(128)
.SetNumSlots(4)
.SetUseSplitBuffers(false)
.SetSplitBuffersOverBridge(true);
absl::StatusOr<Publisher> bridge_split_pub = client.CreatePublisher(
"mixed_bridge_split_publishers", bridge_split_options);
ASSERT_OK(bridge_split_pub);

subspace::PublisherOptions no_bridge_split_options;
no_bridge_split_options.SetSlotSize(128)
.SetNumSlots(4)
.SetUseSplitBuffers(false)
.SetSplitBuffersOverBridge(false);
absl::StatusOr<Publisher> no_bridge_split_pub = client.CreatePublisher(
"mixed_bridge_split_publishers", no_bridge_split_options);
ASSERT_FALSE(no_bridge_split_pub.ok());
EXPECT_THAT(no_bridge_split_pub.status().message(),
::testing::HasSubstr("Inconsistent bridge split-buffer mode"));
}

TEST_F(ClientTest, SplitBufferCallbacksAllocateMapUnmapAndFreePayloadSlots) {
auto state = std::make_shared<TestSplitBufferState>();

Expand Down
16 changes: 15 additions & 1 deletion server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,13 @@ absl::Status Server::RemapChannel(ServerChannel *channel, int slot_size,
if (!fds.ok()) {
return fds.status();
}
channel->SetLastKnownSlotSize(slot_size);
channel->SetSharedMemoryFds(std::move(*fds));
// Remapping replaces the CCB/BCB FDs; shadow recovery must receive the
// refreshed descriptors instead of retaining the placeholder mappings.
ForEachShadow([channel](const std::unique_ptr<ShadowReplicator> &s) {
s->SendCreateChannel(channel);
});
return absl::OkStatus();
}

Expand Down Expand Up @@ -1771,8 +1777,16 @@ void Server::BridgeReceiverCoroutine(std::string channel_name,
"Failed to parse Subscribed message");
return;
}
const bool bridge_publisher_split_buffers =
bool bridge_publisher_split_buffers =
split_buffers_over_bridge || subscribed.split_buffers_over_bridge();
if (auto it = channels_.find(channel_name); it != channels_.end()) {
const ServerChannel *split_channel =
SplitBufferOptionsChannel(it->second.get());
if (split_channel->HasSplitBufferOptions()) {
bridge_publisher_split_buffers =
split_channel->GetSplitBufferOptions().use_split_buffers;
}
}

// Build a publisher to publish incoming bridge messages to the channel.
Client client(co::self);
Expand Down
8 changes: 3 additions & 5 deletions server/server_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,13 @@ absl::Status ServerChannel::ValidateOrSetSplitBufferOptions(
return absl::OkStatus();
}

if (options.use_split_buffers &&
split_buffer_options_.use_split_buffers != options.use_split_buffers) {
if (split_buffer_options_.use_split_buffers != options.use_split_buffers) {
return absl::InvalidArgumentError(absl::StrFormat(
"Inconsistent split-buffer mode for %s on channel %s", user_type,
Name()));
}
if (options.split_buffers_over_bridge &&
split_buffer_options_.split_buffers_over_bridge !=
options.split_buffers_over_bridge) {
if (split_buffer_options_.split_buffers_over_bridge !=
options.split_buffers_over_bridge) {
return absl::InvalidArgumentError(absl::StrFormat(
"Inconsistent bridge split-buffer mode for %s on channel %s", user_type,
Name()));
Expand Down
44 changes: 44 additions & 0 deletions shadow/shadow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,50 @@ TEST_F(ShadowRecoveryTest, ServerRecoversSplitBufferStateFromShadow) {
StopShadow();
}

TEST_F(ShadowRecoveryTest, ShadowTracksPlaceholderRemap) {
signal(SIGPIPE, SIG_IGN);

StartShadow();
StartServer();

subspace::Client sub_client;
sub_client.SetThreadSafe(true);
ASSERT_THAT(sub_client.Init(RecoveryServerSocket()), IsOk());
auto sub = sub_client.CreateSubscriber("shadow_placeholder_remap",
{.max_active_messages = 2});
ASSERT_THAT(sub, IsOk());
ASSERT_TRUE(sub->IsPlaceholder());

ASSERT_TRUE(WaitForShadowState([this]() {
return shadow_->WithChannels([](auto &channels) {
auto it = channels.find("shadow_placeholder_remap");
return it != channels.end() && it->second.num_slots == 0 &&
it->second.slot_size == 0;
});
}));

subspace::Client pub_client;
pub_client.SetThreadSafe(true);
ASSERT_THAT(pub_client.Init(RecoveryServerSocket()), IsOk());
subspace::PublisherOptions options;
options.SetSlotSize(256).SetNumSlots(4).SetUseSplitBuffers(true);
auto pub = pub_client.CreatePublisher("shadow_placeholder_remap", options);
ASSERT_THAT(pub, IsOk());

ASSERT_TRUE(WaitForShadowState([this]() {
return shadow_->WithChannels([](auto &channels) {
auto it = channels.find("shadow_placeholder_remap");
return it != channels.end() && it->second.num_slots == 4 &&
it->second.slot_size == 256 &&
it->second.has_split_buffer_options &&
it->second.use_split_buffers;
});
}));

StopServer();
StopShadow();
}

TEST_F(ShadowRecoveryTest, ServerFunctionalAfterRecovery) {
signal(SIGPIPE, SIG_IGN);

Expand Down
Loading