Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 284 additions & 2 deletions client/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <sys/resource.h>
#include <thread>
#include <unistd.h>
#include <vector>

ABSL_FLAG(bool, start_server, true, "Start the subspace servers");
ABSL_FLAG(std::string, server, "", "Path to server executable");
Expand Down Expand Up @@ -195,8 +196,8 @@ void WaitForSubscribedMessage(toolbelt::FileDescriptor &bridge_pipe,
<< subscribed.channel_name() << std::endl;
}

absl::StatusOr<Message> ReadMessageEventually(Subscriber &sub) {
for (int i = 0; i < 100; i++) {
absl::StatusOr<Message> ReadMessageEventually(Subscriber &sub, int attempts) {
for (int i = 0; i < attempts; i++) {
absl::StatusOr<Message> msg = sub.ReadMessage();
if (!msg.ok()) {
return msg.status();
Expand All @@ -212,6 +213,153 @@ absl::StatusOr<Message> ReadMessageEventually(Subscriber &sub) {
return absl::DeadlineExceededError("Timed out waiting for bridge message");
}

absl::StatusOr<Message> ReadMessageEventually(Subscriber &sub) {
return ReadMessageEventually(sub, 100);
}

struct BridgeRangeConfig {
int first_port = 0;
int last_port = 0;
bool fallback_to_ephemeral = false;
};

std::vector<toolbelt::StreamSocket> ReserveTcpPortRange(int num_ports,
int *first_port,
int *last_port) {
for (int attempt = 0; attempt < 100; ++attempt) {
std::vector<toolbelt::StreamSocket> sockets;
toolbelt::StreamSocket first_socket;
absl::Status status =
first_socket.Bind(toolbelt::InetAddress::AnyAddress(/*port=*/0), true);
if (!status.ok()) {
continue;
}

int candidate_first = first_socket.BoundAddress().Port();
if (candidate_first + num_ports - 1 > 65535) {
continue;
}
sockets.push_back(std::move(first_socket));

bool reserved = true;
for (int port = candidate_first + 1;
port < candidate_first + num_ports; ++port) {
toolbelt::StreamSocket socket;
status = socket.Bind(toolbelt::InetAddress::AnyAddress(port), true);
if (!status.ok()) {
reserved = false;
break;
}
sockets.push_back(std::move(socket));
}

if (reserved) {
*first_port = candidate_first;
*last_port = candidate_first + num_ports - 1;
return sockets;
}
}

ADD_FAILURE() << "Unable to reserve a contiguous TCP port range";
return {};
}

class ScopedBridgeServerPair {
public:
~ScopedBridgeServerPair() { Stop(); }

void Start(const std::array<BridgeRangeConfig, 2> &ranges) {
int lock_fd = ::open("/tmp/subspace_bridge_test_port.lock",
O_CREAT | O_RDWR, 0666);
ASSERT_NE(-1, lock_fd);
ASSERT_EQ(0, ::flock(lock_fd, LOCK_EX));

std::array<int, 2> disc_ports;
std::array<toolbelt::UDPSocket, 2> reserved_ports;
for (int i = 0; i < 2; i++) {
ASSERT_OK(reserved_ports[i].Bind(
toolbelt::InetAddress::AnyAddress(/*port=*/0)));
disc_ports[i] = reserved_ports[i].BoundAddress().Port();
}

for (int i = 0; i < 2; i++) {
char socket_name_template[] = "/tmp/subspaceXXXXXX"; // NOLINT
::close(mkstemp(&socket_name_template[0]));
socket_[i] = &socket_name_template[0];

(void)pipe(server_pipe_[i]);
int peer_port = disc_ports[(i + 1) % 2];
server_[i] = std::make_unique<subspace::Server>(
scheduler_[i], socket_[i], "", disc_ports[i], peer_port,
/*local=*/false, server_pipe_[i][1]);
server_[i]->SetLogLevel(absl::GetFlag(FLAGS_log_level));

if (ranges[i].first_port != 0) {
ASSERT_OK(server_[i]->SetBridgePortRange(
ranges[i].first_port, ranges[i].last_port,
ranges[i].fallback_to_ephemeral));
}

auto bridge_pipe = server_[i]->CreateBridgeNotificationPipe();
ASSERT_OK(bridge_pipe);
bridge_notification_pipe_[i] = *bridge_pipe;
}

for (auto &port : reserved_ports) {
port.Close();
}

for (int i = 0; i < 2; i++) {
server_thread_[i] = std::thread([this, i]() {
absl::Status s = server_[i]->Run();
if (!s.ok()) {
fprintf(stderr, "Error running Subspace server: %s\n",
s.ToString().c_str());
exit(1);
}
});

char buf[8];
(void)::read(server_pipe_[i][0], buf, 8);
}
running_ = true;

ASSERT_EQ(0, ::flock(lock_fd, LOCK_UN));
::close(lock_fd);
}

void Stop() {
if (!running_) {
return;
}

char buf[8];
for (int i = 0; i < 2; i++) {
server_[i]->Stop();
(void)::read(server_pipe_[i][0], buf, 8);
server_thread_[i].join();
}
running_ = false;
}

void InitClient(subspace::Client &client, int server) {
ASSERT_OK(client.Init(socket_[server]));
}

toolbelt::FileDescriptor &BridgeNotificationPipe(int server) {
return bridge_notification_pipe_[server];
}

private:
co::CoroutineScheduler scheduler_[2];
std::string socket_[2];
int server_pipe_[2][2];
std::unique_ptr<subspace::Server> server_[2];
std::thread server_thread_[2];
toolbelt::FileDescriptor bridge_notification_pipe_[2];
bool running_ = false;
};

TEST_F(BridgeTest, Basic) {
subspace::Client client1;
InitClient(client1, 0);
Expand Down Expand Up @@ -248,6 +396,140 @@ TEST_F(BridgeTest, Basic) {
ASSERT_EQ(256, sub->SlotSize());
}

TEST(BridgePortRangeTest, StaticBridgePortRangeSucceeds) {
int first_port0 = 0;
int last_port0 = 0;
auto reserved0 =
ReserveTcpPortRange(/*num_ports=*/4, &first_port0, &last_port0);
ASSERT_FALSE(reserved0.empty());
int first_port1 = 0;
int last_port1 = 0;
auto reserved1 =
ReserveTcpPortRange(/*num_ports=*/4, &first_port1, &last_port1);
ASSERT_FALSE(reserved1.empty());
reserved0.clear();
reserved1.clear();

ScopedBridgeServerPair servers;
servers.Start({BridgeRangeConfig{first_port0, last_port0},
BridgeRangeConfig{first_port1, last_port1}});

subspace::Client client1;
servers.InitClient(client1, 0);
subspace::Client client2;
servers.InitClient(client2, 1);

absl::StatusOr<Publisher> pub =
client1.CreatePublisher("/bridged_static_range",
{.slot_size = 256,
.num_slots = 10,
.local = false});
ASSERT_OK(pub);

absl::StatusOr<Subscriber> sub =
client2.CreateSubscriber("/bridged_static_range",
{.max_active_messages = 2});
ASSERT_OK(sub);

WaitForSubscribedMessage(servers.BridgeNotificationPipe(0),
"/bridged_static_range");

absl::StatusOr<void *> buffer = pub->GetMessageBuffer();
ASSERT_OK(buffer);
memcpy(*buffer, "static", 6);
ASSERT_OK(pub->PublishMessage(6));

WaitForSubscribedMessage(servers.BridgeNotificationPipe(1),
"/bridged_static_range");

absl::StatusOr<Message> msg = ReadMessageEventually(*sub);
ASSERT_OK(msg);
ASSERT_EQ(6, msg->length);
}

TEST(BridgePortRangeTest, FallsBackWhenConfiguredRangeIsBusy) {
int occupied_port = 0;
int last_port = 0;
auto occupied =
ReserveTcpPortRange(/*num_ports=*/1, &occupied_port, &last_port);
ASSERT_FALSE(occupied.empty());

ScopedBridgeServerPair servers;
servers.Start({BridgeRangeConfig{},
BridgeRangeConfig{occupied_port, occupied_port,
/*fallback_to_ephemeral=*/true}});

subspace::Client client1;
servers.InitClient(client1, 0);
subspace::Client client2;
servers.InitClient(client2, 1);

absl::StatusOr<Publisher> pub =
client1.CreatePublisher("/bridged_fallback_range",
{.slot_size = 256,
.num_slots = 10,
.local = false});
ASSERT_OK(pub);

absl::StatusOr<Subscriber> sub =
client2.CreateSubscriber("/bridged_fallback_range",
{.max_active_messages = 2});
ASSERT_OK(sub);

WaitForSubscribedMessage(servers.BridgeNotificationPipe(0),
"/bridged_fallback_range");

absl::StatusOr<void *> buffer = pub->GetMessageBuffer();
ASSERT_OK(buffer);
memcpy(*buffer, "fallback", 8);
ASSERT_OK(pub->PublishMessage(8));

WaitForSubscribedMessage(servers.BridgeNotificationPipe(1),
"/bridged_fallback_range");

absl::StatusOr<Message> msg = ReadMessageEventually(*sub);
ASSERT_OK(msg);
ASSERT_EQ(8, msg->length);
}

TEST(BridgePortRangeTest, FailsWhenConfiguredRangeIsBusyAndFallbackDisabled) {
int occupied_port = 0;
int last_port = 0;
auto occupied =
ReserveTcpPortRange(/*num_ports=*/1, &occupied_port, &last_port);
ASSERT_FALSE(occupied.empty());

ScopedBridgeServerPair servers;
servers.Start({BridgeRangeConfig{},
BridgeRangeConfig{occupied_port, occupied_port,
/*fallback_to_ephemeral=*/false}});

subspace::Client client1;
servers.InitClient(client1, 0);
subspace::Client client2;
servers.InitClient(client2, 1);

absl::StatusOr<Publisher> pub =
client1.CreatePublisher("/bridged_exhausted_range",
{.slot_size = 256,
.num_slots = 10,
.local = false});
ASSERT_OK(pub);

absl::StatusOr<Subscriber> sub =
client2.CreateSubscriber("/bridged_exhausted_range",
{.max_active_messages = 2});
ASSERT_OK(sub);

absl::StatusOr<void *> buffer = pub->GetMessageBuffer();
ASSERT_OK(buffer);
memcpy(*buffer, "blocked", 7);
ASSERT_OK(pub->PublishMessage(7));

absl::StatusOr<Message> msg = ReadMessageEventually(*sub, 5);
ASSERT_FALSE(msg.ok());
}

TEST_F(BridgeTest, TwoSubs) {
subspace::Client client1;
InitClient(client1, 0);
Expand Down
7 changes: 7 additions & 0 deletions docs/server-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ Created when the local server wants to subscribe to a remote channel:
4. Creates a local publisher.
5. Receives messages over TCP and publishes them locally.

Bridge TCP listener ports are separate from the UDP discovery ports. By
default each bridge listener uses an ephemeral TCP port, but deployments with
firewalls can configure a fixed TCP bridge port or inclusive range with
`--bridge_ports=PORT` or `--bridge_ports=START-END`. If the configured range is
busy, `--bridge_ports_fallback_ephemeral` controls whether the server falls back
to an ephemeral TCP port or fails that bridge setup.

### Gratuitous Advertise

Every 5 seconds, broadcasts an `Advertise` for all local channels so late-joining subscribers can discover them.
Expand Down
Loading
Loading