diff --git a/Server/src/RedisManager.cpp b/Server/src/RedisManager.cpp index 0f7e2dd..de31d67 100644 --- a/Server/src/RedisManager.cpp +++ b/Server/src/RedisManager.cpp @@ -9,6 +9,15 @@ #include "RedisManager.h" +/** + * Default size of the shared RedisPipeline buffer used for BMP state + * writes. Set high enough that a typical BMP UPDATE worth of route + * entries fits in a single round-trip, but bounded so that the pipeline + * auto-flushes if a producer goes quiet without the message-level + * flush hook ever firing. + */ +static constexpr size_t BMP_PIPELINE_SIZE = 1024; + /*********************************************************************//** * Constructor for class @@ -35,10 +44,22 @@ void RedisManager::Setup(Logger *logPtr) { swss::SonicDBConfig::initialize(); } + // Drop any pre-existing buffered Table writers before we replace the + // pipeline they reference. Today Setup() is only called once from + // MsgBusImpl_redis's constructor, but bufferedTables_ holds raw + // pointers into pipeline_ and would dangle if a second Setup() ever + // reassigned pipeline_ without clearing the map first. + bufferedTables_.clear(); + stateDb_ = std::make_shared(BMP_DB_NAME, 0, false); separator_ = swss::SonicDBConfig::getSeparator(BMP_DB_NAME); -} + // Build a single shared pipeline backed by the same logical DB. All + // per-table buffered Table writers below share this pipeline so that a + // flush() sends every queued HSET in a single batch instead of one + // round-trip per route entry. + pipeline_ = std::make_shared(stateDb_.get(), BMP_PIPELINE_SIZE); +} /** @@ -51,6 +72,31 @@ std::string RedisManager::GetKeySeparator() { } +/** + * Lookup-or-create a buffered Table writer for the given table name. + * + * The Table is constructed with buffered=true and bound to pipeline_, so + * each set() call enqueues the underlying HSET on the pipeline rather + * than synchronously talking to redis. The actual round-trip happens on + * FlushBMPTables() (or when the pipeline buffer fills). + * + * Returns nullptr if the table is not enabled via CONFIG_DB. + */ +swss::Table* RedisManager::GetOrCreateBufferedTable(const std::string& table) { + if (enabledTables_.find(table) == enabledTables_.end()) { + return nullptr; + } + auto it = bufferedTables_.find(table); + if (it == bufferedTables_.end()) { + auto t = std::make_unique(pipeline_.get(), table, /*buffered=*/true); + auto raw = t.get(); + bufferedTables_.emplace(table, std::move(t)); + return raw; + } + return it->second.get(); +} + + /** * WriteBMPTable * @@ -60,11 +106,11 @@ std::string RedisManager::GetKeySeparator() { */ bool RedisManager::WriteBMPTable(const std::string& table, const std::vector& keys, const std::vector fieldValues) { - if (enabledTables_.find(table) == enabledTables_.end()) { + swss::Table* stateBMPTable = GetOrCreateBufferedTable(table); + if (stateBMPTable == nullptr) { DEBUG("RedisManager %s is disabled", table.c_str()); return false; } - std::unique_ptr stateBMPTable = std::make_unique(stateDb_.get(), table); std::ostringstream oss; for (const auto& key : keys) { oss << key << separator_; @@ -89,17 +135,44 @@ bool RedisManager::RemoveEntityFromBMPTable(const std::vector& keys for (const auto& key : keys) { DEBUG("RedisManager RemoveEntityFromBMPTable key = %s", key.c_str()); } + // Flush any buffered SETs first so that a later DEL on the same key + // cannot be reordered before its preceding SET. + FlushBMPTables(); stateDb_->del(keys); return true; } +/** + * FlushBMPTables - flush all buffered table writers' pending operations. + */ +void RedisManager::FlushBMPTables() { + for (auto& kv : bufferedTables_) { + if (kv.second) { + kv.second->flush(); + } + } +} + + /** * ExitRedisManager * * \param [in] N/A */ void RedisManager::ExitRedisManager() { + // Best-effort: drain anything still buffered so we don't lose state + // updates that have already been observed by openbmpd. This runs from + // ~MsgBusImpl_redis, so we must not let a redis I/O error escape into + // the destructor chain - mirror swss::RedisPipeline's own dtor + // pattern and swallow exceptions here. + try { + FlushBMPTables(); + } catch (const std::exception& e) { + LOG_INFO("RedisManager ExitRedisManager flush failed: %s", e.what()); + } catch (...) { + LOG_INFO("RedisManager ExitRedisManager flush failed with unknown exception"); + } exit_ = true; } @@ -141,6 +214,12 @@ bool RedisManager::InitBMPConfig() { */ void RedisManager::ResetBMPTable(const std::string & table) { + // Drain the pipeline before reading the current key set: getKeys() runs + // on stateDb_'s connection and would otherwise miss any SETs that are + // still buffered in pipeline_ (which uses an independent connection), + // leaving stale entries in redis after the reset completes. + FlushBMPTables(); + std::unique_ptr stateBMPTable = std::make_unique(stateDb_.get(), table); std::vector keys; stateBMPTable->getKeys(keys); diff --git a/Server/src/RedisManager.h b/Server/src/RedisManager.h index 05a437a..80211bd 100644 --- a/Server/src/RedisManager.h +++ b/Server/src/RedisManager.h @@ -13,11 +13,14 @@ #include #include #include +#include #include #include #include +#include #include +#include #include #include #include @@ -118,6 +121,16 @@ class RedisManager { */ bool RemoveEntityFromBMPTable(const std::vector& keys); + /** + * FlushBMPTables - flush any pending buffered HSET operations on the + * pipelined per-table writers. Should be called at natural batch + * boundaries (e.g. after a full BMP UPDATE message is processed) so + * that buffered redis writes are actually delivered. + * + * \param [in] N/A + */ + void FlushBMPTables(); + /** * Get Key separator for deletion * @@ -126,7 +139,16 @@ class RedisManager { std::string GetKeySeparator(); private: + /** + * Lookup (or lazily create) a buffered swss::Table writer for the + * given table name, backed by pipeline_. Returns nullptr if the table + * is not in enabledTables_. + */ + swss::Table* GetOrCreateBufferedTable(const std::string& table); + std::shared_ptr stateDb_; + std::shared_ptr pipeline_; + std::unordered_map> bufferedTables_; std::string separator_; Logger *logger; std::unordered_set enabledTables_; diff --git a/Server/src/redis/MsgBusImpl_redis.cpp b/Server/src/redis/MsgBusImpl_redis.cpp index 1e00203..c1cb977 100644 --- a/Server/src/redis/MsgBusImpl_redis.cpp +++ b/Server/src/redis/MsgBusImpl_redis.cpp @@ -96,6 +96,10 @@ void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, ob } redisMgr_.WriteBMPTable(BMP_TABLE_NEI, keys, fieldValues); + // Peer state transitions are infrequent and BMP consumers expect to + // see them immediately; flush right away rather than waiting for the + // pipeline buffer to fill. + redisMgr_.FlushBMPTables(); } @@ -183,7 +187,17 @@ void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, vector } if (!del_keys.empty()) { + // RemoveEntityFromBMPTable already flushes any buffered SETs + // before issuing the DEL, so SET-then-DEL ordering on the same + // key is preserved. redisMgr_.RemoveEntityFromBMPTable(del_keys); + } else { + // ADD path accumulated buffered set() calls across all rib[i] + // entries above; emit them in a single pipelined round-trip + // instead of one HSET-per-route. Without this flush the buffered + // updates would only be sent when the pipeline buffer fills, + // which can delay BMP state visibility for slow producers. + redisMgr_.FlushBMPTables(); } }