Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 82 additions & 3 deletions Server/src/RedisManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

#include "RedisManager.h"

/**
* Default size of the shared RedisPipeline buffer used for BMP state
* writes. Set high enough that a typical BMP UPDATE worth of route
* entries fits in a single round-trip, but bounded so that the pipeline
* auto-flushes if a producer goes quiet without the message-level
* flush hook ever firing.
*/
static constexpr size_t BMP_PIPELINE_SIZE = 1024;


/*********************************************************************//**
* Constructor for class
Expand All @@ -35,10 +44,22 @@ void RedisManager::Setup(Logger *logPtr) {
swss::SonicDBConfig::initialize();
}

// Drop any pre-existing buffered Table writers before we replace the
// pipeline they reference. Today Setup() is only called once from
// MsgBusImpl_redis's constructor, but bufferedTables_ holds raw
// pointers into pipeline_ and would dangle if a second Setup() ever
// reassigned pipeline_ without clearing the map first.
bufferedTables_.clear();

stateDb_ = std::make_shared<swss::DBConnector>(BMP_DB_NAME, 0, false);
separator_ = swss::SonicDBConfig::getSeparator(BMP_DB_NAME);
}

// Build a single shared pipeline backed by the same logical DB. All
// per-table buffered Table writers below share this pipeline so that a
// flush() sends every queued HSET in a single batch instead of one
// round-trip per route entry.
pipeline_ = std::make_shared<swss::RedisPipeline>(stateDb_.get(), BMP_PIPELINE_SIZE);
}


/**
Expand All @@ -51,6 +72,31 @@ std::string RedisManager::GetKeySeparator() {
}


/**
* Lookup-or-create a buffered Table writer for the given table name.
*
* The Table is constructed with buffered=true and bound to pipeline_, so
* each set() call enqueues the underlying HSET on the pipeline rather
* than synchronously talking to redis. The actual round-trip happens on
* FlushBMPTables() (or when the pipeline buffer fills).
*
* Returns nullptr if the table is not enabled via CONFIG_DB.
*/
swss::Table* RedisManager::GetOrCreateBufferedTable(const std::string& table) {
if (enabledTables_.find(table) == enabledTables_.end()) {
return nullptr;
}
auto it = bufferedTables_.find(table);
if (it == bufferedTables_.end()) {
auto t = std::make_unique<swss::Table>(pipeline_.get(), table, /*buffered=*/true);
auto raw = t.get();
bufferedTables_.emplace(table, std::move(t));
return raw;
}
return it->second.get();
}


/**
* WriteBMPTable
*
Expand All @@ -60,11 +106,11 @@ std::string RedisManager::GetKeySeparator() {
*/
bool RedisManager::WriteBMPTable(const std::string& table, const std::vector<std::string>& keys, const std::vector<swss::FieldValueTuple> fieldValues) {

if (enabledTables_.find(table) == enabledTables_.end()) {
swss::Table* stateBMPTable = GetOrCreateBufferedTable(table);
if (stateBMPTable == nullptr) {
DEBUG("RedisManager %s is disabled", table.c_str());
return false;
}
std::unique_ptr<swss::Table> stateBMPTable = std::make_unique<swss::Table>(stateDb_.get(), table);
std::ostringstream oss;
for (const auto& key : keys) {
oss << key << separator_;
Expand All @@ -89,17 +135,44 @@ bool RedisManager::RemoveEntityFromBMPTable(const std::vector<std::string>& keys
for (const auto& key : keys) {
DEBUG("RedisManager RemoveEntityFromBMPTable key = %s", key.c_str());
}
// Flush any buffered SETs first so that a later DEL on the same key
// cannot be reordered before its preceding SET.
FlushBMPTables();
stateDb_->del(keys);
return true;
}


/**
* FlushBMPTables - flush all buffered table writers' pending operations.
*/
void RedisManager::FlushBMPTables() {
for (auto& kv : bufferedTables_) {
if (kv.second) {
kv.second->flush();
}
}
}


/**
* ExitRedisManager
*
* \param [in] N/A
*/
void RedisManager::ExitRedisManager() {
// Best-effort: drain anything still buffered so we don't lose state
// updates that have already been observed by openbmpd. This runs from
// ~MsgBusImpl_redis, so we must not let a redis I/O error escape into
// the destructor chain - mirror swss::RedisPipeline's own dtor
// pattern and swallow exceptions here.
try {
FlushBMPTables();
} catch (const std::exception& e) {
LOG_INFO("RedisManager ExitRedisManager flush failed: %s", e.what());
} catch (...) {
LOG_INFO("RedisManager ExitRedisManager flush failed with unknown exception");
}
exit_ = true;
}

Expand Down Expand Up @@ -141,6 +214,12 @@ bool RedisManager::InitBMPConfig() {
*/
void RedisManager::ResetBMPTable(const std::string & table) {

// Drain the pipeline before reading the current key set: getKeys() runs
// on stateDb_'s connection and would otherwise miss any SETs that are
// still buffered in pipeline_ (which uses an independent connection),
// leaving stale entries in redis after the reset completes.
FlushBMPTables();

std::unique_ptr<swss::Table> stateBMPTable = std::make_unique<swss::Table>(stateDb_.get(), table);
std::vector<std::string> keys;
stateBMPTable->getKeys(keys);
Expand Down
22 changes: 22 additions & 0 deletions Server/src/RedisManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
#include <swss/dbconnector.h>
#include <swss/table.h>
#include <swss/configdb.h>
#include <swss/redispipeline.h>

#include <string>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <functional>
#include <vector>
Expand Down Expand Up @@ -118,6 +121,16 @@ class RedisManager {
*/
bool RemoveEntityFromBMPTable(const std::vector<std::string>& keys);

/**
* FlushBMPTables - flush any pending buffered HSET operations on the
* pipelined per-table writers. Should be called at natural batch
* boundaries (e.g. after a full BMP UPDATE message is processed) so
* that buffered redis writes are actually delivered.
*
* \param [in] N/A
*/
void FlushBMPTables();

/**
* Get Key separator for deletion
*
Expand All @@ -126,7 +139,16 @@ class RedisManager {
std::string GetKeySeparator();

private:
/**
* Lookup (or lazily create) a buffered swss::Table writer for the
* given table name, backed by pipeline_. Returns nullptr if the table
* is not in enabledTables_.
*/
swss::Table* GetOrCreateBufferedTable(const std::string& table);

std::shared_ptr<swss::DBConnector> stateDb_;
std::shared_ptr<swss::RedisPipeline> pipeline_;
std::unordered_map<std::string, std::unique_ptr<swss::Table>> bufferedTables_;
std::string separator_;
Logger *logger;
std::unordered_set<std::string> enabledTables_;
Expand Down
14 changes: 14 additions & 0 deletions Server/src/redis/MsgBusImpl_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, ob
}

redisMgr_.WriteBMPTable(BMP_TABLE_NEI, keys, fieldValues);
// Peer state transitions are infrequent and BMP consumers expect to
// see them immediately; flush right away rather than waiting for the
// pipeline buffer to fill.
redisMgr_.FlushBMPTables();
}


Expand Down Expand Up @@ -183,7 +187,17 @@ void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, vector<obj_rib>
}

if (!del_keys.empty()) {
// RemoveEntityFromBMPTable already flushes any buffered SETs
// before issuing the DEL, so SET-then-DEL ordering on the same
// key is preserved.
redisMgr_.RemoveEntityFromBMPTable(del_keys);
} else {
// ADD path accumulated buffered set() calls across all rib[i]
// entries above; emit them in a single pipelined round-trip
// instead of one HSET-per-route. Without this flush the buffered
// updates would only be sent when the pipeline buffer fills,
// which can delay BMP state visibility for slow producers.
redisMgr_.FlushBMPTables();
}
}

Expand Down