diff --git a/client/bridge_test.cc b/client/bridge_test.cc index 1385cae..67c0da7 100644 --- a/client/bridge_test.cc +++ b/client/bridge_test.cc @@ -24,6 +24,7 @@ #include #include #include +#include ABSL_FLAG(bool, start_server, true, "Start the subspace servers"); ABSL_FLAG(std::string, server, "", "Path to server executable"); @@ -195,8 +196,8 @@ void WaitForSubscribedMessage(toolbelt::FileDescriptor &bridge_pipe, << subscribed.channel_name() << std::endl; } -absl::StatusOr ReadMessageEventually(Subscriber &sub) { - for (int i = 0; i < 100; i++) { +absl::StatusOr ReadMessageEventually(Subscriber &sub, int attempts) { + for (int i = 0; i < attempts; i++) { absl::StatusOr msg = sub.ReadMessage(); if (!msg.ok()) { return msg.status(); @@ -212,6 +213,153 @@ absl::StatusOr ReadMessageEventually(Subscriber &sub) { return absl::DeadlineExceededError("Timed out waiting for bridge message"); } +absl::StatusOr ReadMessageEventually(Subscriber &sub) { + return ReadMessageEventually(sub, 100); +} + +struct BridgeRangeConfig { + int first_port = 0; + int last_port = 0; + bool fallback_to_ephemeral = false; +}; + +std::vector ReserveTcpPortRange(int num_ports, + int *first_port, + int *last_port) { + for (int attempt = 0; attempt < 100; ++attempt) { + std::vector sockets; + toolbelt::StreamSocket first_socket; + absl::Status status = + first_socket.Bind(toolbelt::InetAddress::AnyAddress(/*port=*/0), true); + if (!status.ok()) { + continue; + } + + int candidate_first = first_socket.BoundAddress().Port(); + if (candidate_first + num_ports - 1 > 65535) { + continue; + } + sockets.push_back(std::move(first_socket)); + + bool reserved = true; + for (int port = candidate_first + 1; + port < candidate_first + num_ports; ++port) { + toolbelt::StreamSocket socket; + status = socket.Bind(toolbelt::InetAddress::AnyAddress(port), true); + if (!status.ok()) { + reserved = false; + break; + } + sockets.push_back(std::move(socket)); + } + + if (reserved) { + *first_port = candidate_first; + *last_port = candidate_first + num_ports - 1; + return sockets; + } + } + + ADD_FAILURE() << "Unable to reserve a contiguous TCP port range"; + return {}; +} + +class ScopedBridgeServerPair { +public: + ~ScopedBridgeServerPair() { Stop(); } + + void Start(const std::array &ranges) { + int lock_fd = ::open("/tmp/subspace_bridge_test_port.lock", + O_CREAT | O_RDWR, 0666); + ASSERT_NE(-1, lock_fd); + ASSERT_EQ(0, ::flock(lock_fd, LOCK_EX)); + + std::array disc_ports; + std::array reserved_ports; + for (int i = 0; i < 2; i++) { + ASSERT_OK(reserved_ports[i].Bind( + toolbelt::InetAddress::AnyAddress(/*port=*/0))); + disc_ports[i] = reserved_ports[i].BoundAddress().Port(); + } + + for (int i = 0; i < 2; i++) { + char socket_name_template[] = "/tmp/subspaceXXXXXX"; // NOLINT + ::close(mkstemp(&socket_name_template[0])); + socket_[i] = &socket_name_template[0]; + + (void)pipe(server_pipe_[i]); + int peer_port = disc_ports[(i + 1) % 2]; + server_[i] = std::make_unique( + scheduler_[i], socket_[i], "", disc_ports[i], peer_port, + /*local=*/false, server_pipe_[i][1]); + server_[i]->SetLogLevel(absl::GetFlag(FLAGS_log_level)); + + if (ranges[i].first_port != 0) { + ASSERT_OK(server_[i]->SetBridgePortRange( + ranges[i].first_port, ranges[i].last_port, + ranges[i].fallback_to_ephemeral)); + } + + auto bridge_pipe = server_[i]->CreateBridgeNotificationPipe(); + ASSERT_OK(bridge_pipe); + bridge_notification_pipe_[i] = *bridge_pipe; + } + + for (auto &port : reserved_ports) { + port.Close(); + } + + for (int i = 0; i < 2; i++) { + server_thread_[i] = std::thread([this, i]() { + absl::Status s = server_[i]->Run(); + if (!s.ok()) { + fprintf(stderr, "Error running Subspace server: %s\n", + s.ToString().c_str()); + exit(1); + } + }); + + char buf[8]; + (void)::read(server_pipe_[i][0], buf, 8); + } + running_ = true; + + ASSERT_EQ(0, ::flock(lock_fd, LOCK_UN)); + ::close(lock_fd); + } + + void Stop() { + if (!running_) { + return; + } + + char buf[8]; + for (int i = 0; i < 2; i++) { + server_[i]->Stop(); + (void)::read(server_pipe_[i][0], buf, 8); + server_thread_[i].join(); + } + running_ = false; + } + + void InitClient(subspace::Client &client, int server) { + ASSERT_OK(client.Init(socket_[server])); + } + + toolbelt::FileDescriptor &BridgeNotificationPipe(int server) { + return bridge_notification_pipe_[server]; + } + +private: + co::CoroutineScheduler scheduler_[2]; + std::string socket_[2]; + int server_pipe_[2][2]; + std::unique_ptr server_[2]; + std::thread server_thread_[2]; + toolbelt::FileDescriptor bridge_notification_pipe_[2]; + bool running_ = false; +}; + TEST_F(BridgeTest, Basic) { subspace::Client client1; InitClient(client1, 0); @@ -248,6 +396,140 @@ TEST_F(BridgeTest, Basic) { ASSERT_EQ(256, sub->SlotSize()); } +TEST(BridgePortRangeTest, StaticBridgePortRangeSucceeds) { + int first_port0 = 0; + int last_port0 = 0; + auto reserved0 = + ReserveTcpPortRange(/*num_ports=*/4, &first_port0, &last_port0); + ASSERT_FALSE(reserved0.empty()); + int first_port1 = 0; + int last_port1 = 0; + auto reserved1 = + ReserveTcpPortRange(/*num_ports=*/4, &first_port1, &last_port1); + ASSERT_FALSE(reserved1.empty()); + reserved0.clear(); + reserved1.clear(); + + ScopedBridgeServerPair servers; + servers.Start({BridgeRangeConfig{first_port0, last_port0}, + BridgeRangeConfig{first_port1, last_port1}}); + + subspace::Client client1; + servers.InitClient(client1, 0); + subspace::Client client2; + servers.InitClient(client2, 1); + + absl::StatusOr pub = + client1.CreatePublisher("/bridged_static_range", + {.slot_size = 256, + .num_slots = 10, + .local = false}); + ASSERT_OK(pub); + + absl::StatusOr sub = + client2.CreateSubscriber("/bridged_static_range", + {.max_active_messages = 2}); + ASSERT_OK(sub); + + WaitForSubscribedMessage(servers.BridgeNotificationPipe(0), + "/bridged_static_range"); + + absl::StatusOr buffer = pub->GetMessageBuffer(); + ASSERT_OK(buffer); + memcpy(*buffer, "static", 6); + ASSERT_OK(pub->PublishMessage(6)); + + WaitForSubscribedMessage(servers.BridgeNotificationPipe(1), + "/bridged_static_range"); + + absl::StatusOr msg = ReadMessageEventually(*sub); + ASSERT_OK(msg); + ASSERT_EQ(6, msg->length); +} + +TEST(BridgePortRangeTest, FallsBackWhenConfiguredRangeIsBusy) { + int occupied_port = 0; + int last_port = 0; + auto occupied = + ReserveTcpPortRange(/*num_ports=*/1, &occupied_port, &last_port); + ASSERT_FALSE(occupied.empty()); + + ScopedBridgeServerPair servers; + servers.Start({BridgeRangeConfig{}, + BridgeRangeConfig{occupied_port, occupied_port, + /*fallback_to_ephemeral=*/true}}); + + subspace::Client client1; + servers.InitClient(client1, 0); + subspace::Client client2; + servers.InitClient(client2, 1); + + absl::StatusOr pub = + client1.CreatePublisher("/bridged_fallback_range", + {.slot_size = 256, + .num_slots = 10, + .local = false}); + ASSERT_OK(pub); + + absl::StatusOr sub = + client2.CreateSubscriber("/bridged_fallback_range", + {.max_active_messages = 2}); + ASSERT_OK(sub); + + WaitForSubscribedMessage(servers.BridgeNotificationPipe(0), + "/bridged_fallback_range"); + + absl::StatusOr buffer = pub->GetMessageBuffer(); + ASSERT_OK(buffer); + memcpy(*buffer, "fallback", 8); + ASSERT_OK(pub->PublishMessage(8)); + + WaitForSubscribedMessage(servers.BridgeNotificationPipe(1), + "/bridged_fallback_range"); + + absl::StatusOr msg = ReadMessageEventually(*sub); + ASSERT_OK(msg); + ASSERT_EQ(8, msg->length); +} + +TEST(BridgePortRangeTest, FailsWhenConfiguredRangeIsBusyAndFallbackDisabled) { + int occupied_port = 0; + int last_port = 0; + auto occupied = + ReserveTcpPortRange(/*num_ports=*/1, &occupied_port, &last_port); + ASSERT_FALSE(occupied.empty()); + + ScopedBridgeServerPair servers; + servers.Start({BridgeRangeConfig{}, + BridgeRangeConfig{occupied_port, occupied_port, + /*fallback_to_ephemeral=*/false}}); + + subspace::Client client1; + servers.InitClient(client1, 0); + subspace::Client client2; + servers.InitClient(client2, 1); + + absl::StatusOr pub = + client1.CreatePublisher("/bridged_exhausted_range", + {.slot_size = 256, + .num_slots = 10, + .local = false}); + ASSERT_OK(pub); + + absl::StatusOr sub = + client2.CreateSubscriber("/bridged_exhausted_range", + {.max_active_messages = 2}); + ASSERT_OK(sub); + + absl::StatusOr buffer = pub->GetMessageBuffer(); + ASSERT_OK(buffer); + memcpy(*buffer, "blocked", 7); + ASSERT_OK(pub->PublishMessage(7)); + + absl::StatusOr msg = ReadMessageEventually(*sub, 5); + ASSERT_FALSE(msg.ok()); +} + TEST_F(BridgeTest, TwoSubs) { subspace::Client client1; InitClient(client1, 0); diff --git a/docs/server-architecture.md b/docs/server-architecture.md index ea6c5aa..62cdbd8 100644 --- a/docs/server-architecture.md +++ b/docs/server-architecture.md @@ -140,6 +140,13 @@ Created when the local server wants to subscribe to a remote channel: 4. Creates a local publisher. 5. Receives messages over TCP and publishes them locally. +Bridge TCP listener ports are separate from the UDP discovery ports. By +default each bridge listener uses an ephemeral TCP port, but deployments with +firewalls can configure a fixed TCP bridge port or inclusive range with +`--bridge_ports=PORT` or `--bridge_ports=START-END`. If the configured range is +busy, `--bridge_ports_fallback_ephemeral` controls whether the server falls back +to an ephemeral TCP port or fails that bridge setup. + ### Gratuitous Advertise Every 5 seconds, broadcasts an `Advertise` for all local channels so late-joining subscribers can discover them. diff --git a/server/main.cc b/server/main.cc index 91ddb08..51722b5 100644 --- a/server/main.cc +++ b/server/main.cc @@ -5,7 +5,9 @@ #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "server.h" +#include #include +#include #include static co::CoroutineScheduler *g_scheduler; @@ -21,6 +23,12 @@ ABSL_FLAG(std::string, socket, "/tmp/subspace", ABSL_FLAG(int, disc_port, 6502, "Discovery UDP port"); ABSL_FLAG(int, peer_port, 6502, "Discovery peer UDP port"); ABSL_FLAG(std::string, peer_address, "", "Bridge peer hostname or IP address"); +ABSL_FLAG(std::string, bridge_ports, "", + "TCP bridge port or inclusive port range START-END. Empty uses an " + "ephemeral port for each bridge listener."); +ABSL_FLAG(bool, bridge_ports_fallback_ephemeral, false, + "If true, use an ephemeral TCP bridge port when --bridge_ports is " + "configured but unavailable."); ABSL_FLAG(std::string, log_level, "info", "Log level"); ABSL_FLAG(std::string, interface, "", "Discovery network interface"); ABSL_FLAG(bool, local, false, "Use local computer only"); @@ -43,6 +51,48 @@ ABSL_FLAG(std::string, shadow_socket, "", ABSL_FLAG(std::string, secondary_shadow_socket, "", "Secondary shadow process Unix socket (empty = disabled)"); +static bool ParsePort(const std::string &value, int *port) { + if (value.empty()) { + return false; + } + + char *end = nullptr; + errno = 0; + long parsed = std::strtol(value.c_str(), &end, 10); + if (errno != 0 || end == value.c_str() || *end != '\0' || parsed <= 0 || + parsed > 65535) { + return false; + } + + *port = static_cast(parsed); + return true; +} + +static bool ParseBridgePorts(const std::string &value, int *first_port, + int *last_port) { + if (value.empty()) { + *first_port = 0; + *last_port = 0; + return true; + } + + size_t dash = value.find('-'); + if (dash == std::string::npos) { + if (!ParsePort(value, first_port)) { + return false; + } + *last_port = *first_port; + return true; + } + if (value.find('-', dash + 1) != std::string::npos) { + return false; + } + + return ParsePort(value.substr(0, dash), first_port) && + ParsePort(value.substr(dash + 1), last_port) && + *first_port <= *last_port; +} + int main(int argc, char **argv) { absl::ParseCommandLine(argc, argv); @@ -72,6 +122,25 @@ int main(int argc, char **argv) { } server->SetLogLevel(absl::GetFlag(FLAGS_log_level)); + int bridge_first_port = 0; + int bridge_last_port = 0; + if (!ParseBridgePorts(absl::GetFlag(FLAGS_bridge_ports), &bridge_first_port, + &bridge_last_port)) { + fprintf(stderr, + "Invalid --bridge_ports value '%s'; expected PORT or START-END\n", + absl::GetFlag(FLAGS_bridge_ports).c_str()); + exit(1); + } + if (bridge_first_port != 0) { + absl::Status status = server->SetBridgePortRange( + bridge_first_port, bridge_last_port, + absl::GetFlag(FLAGS_bridge_ports_fallback_ephemeral)); + if (!status.ok()) { + fprintf(stderr, "Invalid bridge port range: %s\n", + status.ToString().c_str()); + exit(1); + } + } if (absl::GetFlag(FLAGS_cleanup_filesystem)) { server->SetCleanupFilesystem(true); } diff --git a/server/server.cc b/server/server.cc index 3d00b93..de264f0 100644 --- a/server/server.cc +++ b/server/server.cc @@ -327,6 +327,20 @@ static absl::Status FindIPAddresses(const std::string &interface, return absl::OkStatus(); } +static absl::StatusOr +BridgeAddressWithPort(const toolbelt::SocketAddress &addr, int port) { + switch (addr.Type()) { + case toolbelt::SocketAddress::kAddressInet: + return toolbelt::SocketAddress( + addr.GetInetAddress().IpAddressInNetworkOrder(), port); + case toolbelt::SocketAddress::kAddressVirtual: + return toolbelt::SocketAddress(addr.GetVirtualAddress().Cid(), port); + default: + return absl::InvalidArgumentError(absl::StrFormat( + "unsupported bridge listener address: %s", addr.ToString())); + } +} + Server::Server(co::CoroutineScheduler &scheduler, const std::string &socket_name, const std::string &interface, int disc_port, int peer_port, bool local, int notify_fd, @@ -355,6 +369,24 @@ Server::Server(co::CoroutineScheduler &scheduler, CreateShutdownTrigger(); } +absl::Status Server::SetBridgePortRange(int first_port, int last_port, + bool fallback_to_ephemeral) { + if (first_port == 0 && last_port == 0) { + bridge_port_range_ = {}; + bridge_ports_fallback_to_ephemeral_ = fallback_to_ephemeral; + return absl::OkStatus(); + } + if (first_port <= 0 || last_port <= 0 || first_port > 65535 || + last_port > 65535 || first_port > last_port) { + return absl::InvalidArgumentError(absl::StrFormat( + "invalid bridge port range %d-%d", first_port, last_port)); + } + + bridge_port_range_ = {.first_port = first_port, .last_port = last_port}; + bridge_ports_fallback_to_ephemeral_ = fallback_to_ephemeral; + return absl::OkStatus(); +} + Server::~Server() { // Clear this before other data members get destroyed. client_handlers_.clear(); @@ -394,6 +426,39 @@ void Server::CreateShutdownTrigger() { shutdown_trigger_fd_ = std::move(*fd); } +absl::Status Server::BindBridgeListener(toolbelt::StreamSocket &listener) { + if (!bridge_port_range_.Enabled()) { + return listener.Bind(toolbelt::SocketAddress::AnyPort(my_address_), true); + } + + absl::Status last_status = absl::UnavailableError("no ports tried"); + for (int port = bridge_port_range_.first_port; + port <= bridge_port_range_.last_port; ++port) { + absl::StatusOr addr = + BridgeAddressWithPort(my_address_, port); + if (!addr.ok()) { + return addr.status(); + } + + toolbelt::StreamSocket candidate; + absl::Status status = candidate.Bind(*addr, true); + if (status.ok()) { + listener = std::move(candidate); + return absl::OkStatus(); + } + last_status = status; + } + + if (bridge_ports_fallback_to_ephemeral_) { + return listener.Bind(toolbelt::SocketAddress::AnyPort(my_address_), true); + } + + return absl::UnavailableError(absl::StrFormat( + "unable to bind bridge listener in configured port range %d-%d: %s", + bridge_port_range_.first_port, bridge_port_range_.last_port, + last_status.ToString())); +} + absl::StatusOr Server::CreateBridgeNotificationPipe() { auto status = toolbelt::Pipe::Create(); @@ -1395,8 +1460,7 @@ void Server::BridgeTransmitterCoroutine(ServerChannel *channel, // Allocate a listen socket to wait for an incoming connection from the // other server. - absl::Status s = retirement_listener.Bind( - toolbelt::SocketAddress::AnyPort(my_address_), true); + absl::Status s = BindBridgeListener(retirement_listener); if (!s.ok()) { logger_.Log(toolbelt::LogLevel::kError, "Unable to bind socket for retirement receiver for %s: %s", @@ -1717,13 +1781,12 @@ void Server::BridgeReceiverCoroutine(std::string channel_name, } } bridge_guard{this, channel_name, publisher, sub_reliable}; - // Open a listening TCP socket on a free port. + // Open a listening TCP socket for the incoming bridge connection. logger_.Log(toolbelt::LogLevel::kDebug, "BridgeReceiverCoroutine running"); char buffer[kDiscoveryBufferSize]; toolbelt::StreamSocket receiver_listener; - auto addr = toolbelt::SocketAddress::AnyPort(my_address_); - absl::Status s = receiver_listener.Bind(addr, true); + absl::Status s = BindBridgeListener(receiver_listener); if (!s.ok()) { logger_.Log(toolbelt::LogLevel::kError, "Unable to bind socket for bridge receiver for %s: %s", diff --git a/server/server.h b/server/server.h index eac7c62..8695a64 100644 --- a/server/server.h +++ b/server/server.h @@ -101,6 +101,16 @@ class Server { absl::StatusOr CreateBridgeNotificationPipe(); + struct BridgePortRange { + int first_port = 0; + int last_port = 0; + + bool Enabled() const { return first_port != 0 || last_port != 0; } + }; + + absl::Status SetBridgePortRange(int first_port, int last_port, + bool fallback_to_ephemeral = false); + void SetCleanupFilesystem(bool v) { cleanup_filesystem_ = v; } void CleanupFilesystem(); @@ -207,6 +217,7 @@ class Server { const toolbelt::InetAddress &sender, const std::string &server_id); void GratuitousAdvertiseCoroutine(); + absl::Status BindBridgeListener(toolbelt::StreamSocket &listener); absl::Status RegisterPlugin(const std::string &name, void *handle, std::unique_ptr interface); absl::Status SendSubscribeMessage(const std::string &channel_name, @@ -276,6 +287,8 @@ class Server { toolbelt::TriggerFd shutdown_trigger_fd_; std::string machine_name_; bool publish_server_channels_ = true; + BridgePortRange bridge_port_range_; + bool bridge_ports_fallback_to_ephemeral_ = false; std::unique_ptr primary_shadow_replicator_; std::unique_ptr secondary_shadow_replicator_;