Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cc_test(
"@abseil-cpp//absl/status",
"@abseil-cpp//absl/status:status_matchers",
"@abseil-cpp//absl/status:statusor",
"@abseil-cpp//absl/strings:str_format",
"@googletest//:gtest",
"@coroutines//:co",
] + select({
Expand Down
2 changes: 1 addition & 1 deletion client/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <chrono>
#include <optional>
#include <sys/stat.h>
#if defined(__ANDROID__)
#if SUBSPACE_SHMEM_MODE == SUBSPACE_SHMEM_MODE_ANDROID
#include <sys/syscall.h>
#ifndef MFD_CLOEXEC
#define MFD_CLOEXEC 0x0001U
Expand Down
91 changes: 91 additions & 0 deletions client/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,26 @@
#include "absl/flags/parse.h"
#include "absl/hash/hash_testing.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "common/system_info.h"
#include "toolbelt/clock.h"
#include "toolbelt/hexdump.h"
#include "toolbelt/pipe.h"
#include <array>
#include <atomic>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <inttypes.h>
#include <memory>
#include <sys/resource.h>
#if SUBSPACE_SHMEM_MODE == SUBSPACE_SHMEM_MODE_ANDROID
#include <sys/syscall.h>
#ifndef MFD_CLOEXEC
#define MFD_CLOEXEC 0x0001U
#endif
#endif
#include <unordered_map>

ABSL_FLAG(bool, start_server, true, "Start the subspace server");
Expand Down Expand Up @@ -81,6 +89,28 @@ uint64_t ExpectedSplitBufferVirtualMemoryUsage(int num_slots,
AlignPage(slot_size) * static_cast<uint64_t>(num_slots);
}

#if SUBSPACE_SHMEM_MODE == SUBSPACE_SHMEM_MODE_ANDROID
absl::StatusOr<toolbelt::FileDescriptor> CreateTestMemfd(const char *name,
size_t size) {
#ifdef __NR_memfd_create
int fd = static_cast<int>(
syscall(__NR_memfd_create, name, static_cast<unsigned int>(MFD_CLOEXEC)));
if (fd == -1) {
return absl::InternalError(absl::StrFormat(
"Failed to create test memfd %s: %s", name, strerror(errno)));
}
toolbelt::FileDescriptor result(fd);
if (ftruncate(result.Fd(), static_cast<off_t>(size)) == -1) {
return absl::InternalError(absl::StrFormat(
"Failed to size test memfd %s: %s", name, strerror(errno)));
}
return result;
#else
return absl::UnimplementedError("memfd_create is not available");
#endif
}
#endif

subspace::SplitBufferCallbacks MakeTestSplitBufferCallbacks(
std::shared_ptr<TestSplitBufferState> state) {
subspace::SplitBufferCallbacks callbacks;
Expand Down Expand Up @@ -198,6 +228,67 @@ TEST_F(ClientTest, Resize1) {
ASSERT_EQ(512, pub->SlotSize());
}

#if SUBSPACE_SHMEM_MODE == SUBSPACE_SHMEM_MODE_ANDROID
TEST(AndroidBufferRegistrationTest, FailedRegistrationRollsBackNumBuffers) {
constexpr int kNumSlots = 2;
absl::StatusOr<toolbelt::FileDescriptor> scb_fd =
CreateTestMemfd("subspace_test_scb", sizeof(subspace::SystemControlBlock));
if (absl::IsUnimplemented(scb_fd.status())) {
GTEST_SKIP() << "memfd_create is not available on this platform";
}
ASSERT_OK(scb_fd);
absl::StatusOr<toolbelt::FileDescriptor> ccb_fd =
CreateTestMemfd("subspace_test_ccb", subspace::CcbSize(kNumSlots));
ASSERT_OK(ccb_fd);
absl::StatusOr<toolbelt::FileDescriptor> bcb_fd = CreateTestMemfd(
"subspace_test_bcb", sizeof(subspace::BufferControlBlock));
ASSERT_OK(bcb_fd);

subspace::PublisherOptions options;
subspace::details::PublisherImpl publisher(
"android_registration_rollback", kNumSlots, /*channel_id=*/0,
/*publisher_id=*/0, /*vchan_id=*/-1, /*session_id=*/123, "",
options, [](subspace::Channel *) { return false; },
/*user_id=*/0, /*group_id=*/0);
ASSERT_OK(publisher.Map(
subspace::SharedMemoryFds(std::move(*ccb_fd), std::move(*bcb_fd)),
*scb_fd));

int failed_registration_attempts = 0;
publisher.SetClientBufferRegistrationCallback(
[&](const subspace::ClientBufferHandleMetadata &metadata,
const toolbelt::FileDescriptor *fd) {
failed_registration_attempts++;
EXPECT_EQ(0u, metadata.buffer_index);
EXPECT_NE(nullptr, fd);
EXPECT_TRUE(fd->Valid());
return absl::InternalError("injected registration failure");
});

absl::Status status = publisher.CreateOrAttachBuffers(/*slot_size=*/128);
EXPECT_FALSE(status.ok());
EXPECT_EQ(1, failed_registration_attempts);
EXPECT_EQ(0, publisher.GetCcb()->num_buffers.load(std::memory_order_relaxed));
EXPECT_TRUE(publisher.GetBuffers().empty());

std::vector<uint32_t> registered_indices;
publisher.SetClientBufferRegistrationCallback(
[&](const subspace::ClientBufferHandleMetadata &metadata,
const toolbelt::FileDescriptor *fd) {
EXPECT_NE(nullptr, fd);
EXPECT_TRUE(fd->Valid());
registered_indices.push_back(metadata.buffer_index);
return absl::OkStatus();
});

ASSERT_OK(publisher.CreateOrAttachBuffers(/*slot_size=*/128));
ASSERT_EQ(1u, registered_indices.size());
EXPECT_EQ(0u, registered_indices[0]);
EXPECT_EQ(1, publisher.GetCcb()->num_buffers.load(std::memory_order_relaxed));
EXPECT_EQ(1u, publisher.GetBuffers().size());
}
#endif

TEST_F(ClientTest, ResizeCallback) {
subspace::Client client;
InitClient(client);
Expand Down
17 changes: 17 additions & 0 deletions client/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "client_channel.h"
#include "common/client_buffer.h"
#include "toolbelt/clock.h"
#include <algorithm>
#include <atomic>
#include <thread>
namespace subspace {
Expand Down Expand Up @@ -121,6 +122,22 @@ absl::Status PublisherImpl::CreateOrAttachBuffers(uint64_t final_slot_size) {
if (absl::Status status =
client_buffer_registration_callback_(metadata, &buffer.fd);
!status.ok()) {
int expected_num_buffers = new_num_buffers;
if (ccb_->num_buffers.compare_exchange_strong(
expected_num_buffers, old_num_buffers,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
for (int j = old_num_buffers; j < new_num_buffers; j++) {
bcb_->sizes[j].store(0, std::memory_order_relaxed);
}
}
const size_t rollback_to =
std::min(static_cast<size_t>(old_num_buffers),
buffers_.size());
for (size_t j = rollback_to; j < buffers_.size(); j++) {
UnmapBufferSet(j, *buffers_[j],
/*destroy_owned_buffers=*/false);
}
buffers_.resize(rollback_to);
return status;
}
}
Expand Down
5 changes: 4 additions & 1 deletion common/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ namespace subspace {
#define SUBSPACE_SHMEM_MODE_LINUX 2
#define SUBSPACE_SHMEM_MODE_ANDROID 3

// Change this if you want to use a different shared memory mode.
// Change this if you want to use a different shared memory mode. Builds may
// define SUBSPACE_SHMEM_MODE explicitly to exercise a non-default backend.
#ifndef SUBSPACE_SHMEM_MODE
#if defined(__ANDROID__)
// Android does not have /dev/shm; use anonymous fd-backed shared memory.
#define SUBSPACE_SHMEM_MODE SUBSPACE_SHMEM_MODE_ANDROID
Expand All @@ -39,6 +41,7 @@ namespace subspace {
// memory.
#define SUBSPACE_SHMEM_MODE SUBSPACE_SHMEM_MODE_POSIX
#endif
#endif

// Flag for flags field in MessagePrefix.
constexpr int kMessageActivate = 1; // This is a reliable activation message.
Expand Down
6 changes: 3 additions & 3 deletions common/split_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ CreateSplitSharedMemoryBuffer(const SplitBufferMetadata &metadata) {
"Failed to create split buffer object %s: %s", metadata.object_name,
strerror(errno)));
}
#else
return absl::UnimplementedError("memfd_create is not available");
#endif
toolbelt::FileDescriptor shm_fd(fd);
if (GetSyscallShim().ftruncate_fn(
shm_fd.Fd(), static_cast<off_t>(PageAlignedSize(allocation_size))) ==
Expand All @@ -204,6 +201,9 @@ CreateSplitSharedMemoryBuffer(const SplitBufferMetadata &metadata) {
strerror(errno)));
}
return shm_fd;
#else
return absl::UnimplementedError("memfd_create is not available");
#endif
#else
int fd = GetSyscallShim().shm_open_fn(metadata.object_name.c_str(),
O_RDWR | O_CREAT | O_EXCL, 0666);
Expand Down
2 changes: 1 addition & 1 deletion server/server_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "server/server.h"
#include <utility>
#include <sys/mman.h>
#if defined(__ANDROID__)
#if SUBSPACE_SHMEM_MODE == SUBSPACE_SHMEM_MODE_ANDROID
#include <sys/syscall.h>
#ifndef MFD_CLOEXEC
#define MFD_CLOEXEC 0x0001U
Expand Down
Loading