Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cfgmgr/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DBGFLAGS = -g
endif

COMMON_ORCH_SOURCE = $(top_srcdir)/orchagent/orch.cpp \
$(top_srcdir)/orchagent/swssstats.cpp \
$(top_srcdir)/orchagent/request_parser.cpp \
$(top_srcdir)/orchagent/response_publisher.cpp \
$(top_srcdir)/lib/recorder.cpp
Expand Down
1 change: 1 addition & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ orchagent_SOURCES = \
$(top_srcdir)/lib/orch_zmq_config.cpp \
orchdaemon.cpp \
orch.cpp \
swssstats.cpp \
notifications.cpp \
nhgorch.cpp \
nhgbase.cpp \
Expand Down
31 changes: 31 additions & 0 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sys/time.h>
#include "timestamp.h"
#include "orch.h"
#include "swssstats.h"

#include "subscriberstatetable.h"
#include "portsorch.h"
Expand All @@ -16,6 +17,7 @@
using namespace swss;

int gBatchSize = 0;
std::atomic<bool> gSwssStatsRecord(true); // Enable SwssStats by default
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gSwssStatsRecord is enabled by default, but there is no mechanism in this PR to toggle it at runtime (no config/env/CLI hook), and it introduces extra per-task work and a background writer thread. Consider defaulting this to disabled or wiring it to a configuration option so operators can turn it on only when needed.

Copilot uses AI. Check for mistakes.

std::shared_ptr<RingBuffer> Orch::gRingBuffer = nullptr;
std::shared_ptr<RingBuffer> Executor::gRingBuffer = nullptr;
Expand Down Expand Up @@ -248,8 +250,16 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry, bool onRetry)
string op = kfvOp(entry);

if (!onRetry)
{
/* Record incoming tasks */
Recorder::Instance().swss.record(dumpTuple(entry));

/* Record statistics */
if (gSwssStatsRecord)
{
SwssStats::getInstance()->recordTask(getTableName(), op);
}
}
else
Recorder::Instance().retry.record(dumpTuple(entry).append(DECACHE));

Expand Down Expand Up @@ -557,6 +567,8 @@ void Consumer::drain()
{
if (!m_toSync.empty())
{
size_t size_before = gSwssStatsRecord ? m_toSync.size() : 0;
bool threw = false;
try
{
((Orch *)m_orch)->doTask((Consumer&)*this);
Expand All @@ -565,21 +577,39 @@ void Consumer::drain()
{
SWSS_LOG_ERROR("Exception caught: type=invalid_argument, table=%s, error=%s",
getName().c_str(), e.what());
threw = true;
}
catch (const std::logic_error& e)
{
SWSS_LOG_ERROR("Exception caught: type=logic_error, table=%s, error=%s",
getName().c_str(), e.what());
threw = true;
}
catch (const std::exception& e)
{
SWSS_LOG_ERROR("Exception caught: type=exception, table=%s, error=%s",
getName().c_str(), e.what());
threw = true;
}
catch (...)
{
SWSS_LOG_ERROR("Exception caught: type=unknown, table=%s",
getName().c_str());
threw = true;
}
if (gSwssStatsRecord && size_before > 0)
{
if (threw)
{
SwssStats::getInstance()->recordError(getTableName(), 1);
}
else
{
size_t size_after = m_toSync.size();
uint64_t completed = (size_before > size_after) ? (size_before - size_after) : 0;
if (completed > 0)
SwssStats::getInstance()->recordComplete(getTableName(), completed);
}
}
Comment on lines 572 to +613
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordComplete() is called even when doTask() throws and the drain path enters one of the catch blocks. That can inflate “COMPLETE” counts during failures. Consider recording completion only on the success path (e.g., after doTask() returns normally), and use recordError() in the exception paths if you want ERROR to reflect task processing failures.

Copilot uses AI. Check for mistakes.
}
}
Expand Down Expand Up @@ -1235,3 +1265,4 @@ void Orch2::doTask(Consumer &consumer)
}
}
}

1 change: 1 addition & 0 deletions orchagent/p4orch/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ CFLAGS_GTEST =
LDADD_GTEST = -lgtest -lgtest_main -lgmock -lgmock_main

p4orch_tests_SOURCES = $(ORCHAGENT_DIR)/orch.cpp \
$(ORCHAGENT_DIR)/swssstats.cpp \
$(ORCHAGENT_DIR)/vrforch.cpp \
$(ORCHAGENT_DIR)/vxlanorch.cpp \
$(ORCHAGENT_DIR)/copporch.cpp \
Expand Down
217 changes: 217 additions & 0 deletions orchagent/swssstats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#include "swssstats.h"
#include "dbconnector.h"
#include "table.h"
#include "logger.h"
#include <chrono>
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file uses unordered_map, piecewise_construct, and forward_as_tuple, but the required standard headers (<unordered_map>, <utility>, <tuple>) aren’t included. This may fail to compile depending on transitive includes; add the missing includes explicitly.

Suggested change
#include <chrono>
#include <chrono>
#include <tuple>
#include <unordered_map>
#include <utility>

Copilot uses AI. Check for mistakes.
#include <tuple>
#include <unordered_map>
#include <utility>

using namespace std;
using namespace swss;

#define SWSS_STATS_TABLE "SWSS_STATS"

// Returns a pointer to the process-wide SwssStats object.
// The object is a function-local static, so it is constructed the first
// time this function is called and the same address is returned afterwards.
SwssStats* SwssStats::getInstance()
{
    static SwssStats singleton;
    return &singleton;
}

// Constructs the stats collector and launches its background writer thread.
//
// interval: number of seconds the writer thread waits between flushes.
SwssStats::SwssStats(uint32_t interval)
    : m_running(true),
      m_interval_sec(interval)
{
    SWSS_LOG_ENTER();

    // No DBConnector is created here on purpose: the singleton may be
    // constructed during early orchagent init, when COUNTERS_DB may not be
    // reachable yet (which leads to a waitForGetResponse crash). The
    // connection is opened by writerThread() instead.
    m_thread = make_unique<thread>([this] { writerThread(); });

    SWSS_LOG_NOTICE("SwssStats initialized (interval: %u sec)", m_interval_sec);
}

// Stops the writer thread and waits for it to exit.
SwssStats::~SwssStats()
{
    SWSS_LOG_ENTER();

    // Flip the run flag while holding the same mutex the writer thread waits
    // on, so the change cannot slip in between its predicate check and its
    // going to sleep.
    {
        lock_guard<mutex> guard(m_statsMutex);
        m_running.store(false);
    }

    // Wake the writer right away rather than letting it sleep for up to
    // m_interval_sec before it notices the shutdown request.
    m_cv.notify_all();

    if (m_thread != nullptr && m_thread->joinable())
    {
        m_thread->join();
    }

    SWSS_LOG_NOTICE("SwssStats stopped");
}

// Counts one incoming task for a table.
//
// table_name: table whose counters are updated (its stats entry is created
//             on first use, even if the op turns out to be unrecognised).
// op:         "SET" bumps the SET counter, "DEL" bumps the DEL counter;
//             any other value leaves the counters and version untouched.
void SwssStats::recordTask(const string &table_name, const string &op)
{
    auto& counters = getOrCreateStats(table_name);

    if (op == "SET")
    {
        counters.set_count.fetch_add(1, memory_order_relaxed);
    }
    else if (op == "DEL")
    {
        counters.del_count.fetch_add(1, memory_order_relaxed);
    }
    else
    {
        // Unknown op: nothing changed, so do not bump the version. Otherwise
        // the writer thread would see a "change" and issue a pointless
        // Redis write.
        return;
    }

    // Release pairs with the acquire load in the writer thread, so the
    // counter increment above is visible once the new version is observed.
    counters.version.fetch_add(1, memory_order_release);
}

// Adds `count` to the COMPLETE counter of `table_name` (creating the table's
// stats entry if needed) and bumps the entry's version so the writer thread
// picks up the change.
void SwssStats::recordComplete(const string &table_name, uint64_t count)
{
    auto& counters = getOrCreateStats(table_name);

    counters.complete_count.fetch_add(count, memory_order_relaxed);

    // Release: publish the counter update before the version change.
    counters.version.fetch_add(1, memory_order_release);
}

// Adds `count` to the ERROR counter of `table_name` (creating the table's
// stats entry if needed) and bumps the entry's version so the writer thread
// picks up the change.
void SwssStats::recordError(const string &table_name, uint64_t count)
{
    auto& counters = getOrCreateStats(table_name);

    counters.error_count.fetch_add(count, memory_order_relaxed);

    // Release: publish the counter update before the version change.
    counters.version.fetch_add(1, memory_order_release);
}

// Returns a copy of the four counters (SET, DEL, COMPLETE, ERROR) recorded
// for `table_name`. If the table has no stats entry, a default-constructed
// snapshot is returned and no entry is created.
SwssStats::CounterSnapshot SwssStats::getCounters(const string &table_name)
{
    lock_guard<mutex> guard(m_statsMutex);

    CounterSnapshot result;

    const auto found = m_stats.find(table_name);
    if (found == m_stats.end())
    {
        return result;
    }

    const auto& counters = found->second;
    result.set_count = counters.set_count.load(memory_order_relaxed);
    result.del_count = counters.del_count.load(memory_order_relaxed);
    result.complete_count = counters.complete_count.load(memory_order_relaxed);
    result.error_count = counters.error_count.load(memory_order_relaxed);
    return result;
}

// Looks up the stats entry for `table_name`, default-constructing one if it
// does not exist yet, and returns a reference to it.
//
// The lookup/insert is done under m_statsMutex; the returned reference is
// used by callers after the lock has been released.
SwssStats::TableStats& SwssStats::getOrCreateStats(const string &table_name)
{
    lock_guard<mutex> guard(m_statsMutex);

    auto pos = m_stats.find(table_name);
    if (pos != m_stats.end())
    {
        return pos->second;
    }

    // piecewise_construct builds the value in place from an empty argument
    // list, so TableStats never has to be copied or moved.
    auto inserted = m_stats.emplace(piecewise_construct,
                                    forward_as_tuple(table_name),
                                    forward_as_tuple());

    SWSS_LOG_INFO("Created stats for table: %s", table_name.c_str());

    // Handing out the reference past the lock is safe as long as entries are
    // never erased: the standard associative containers (std::map and
    // std::unordered_map alike) keep references to existing elements valid
    // across insertions; an unordered_map rehash only invalidates iterators.
    // NOTE(review): m_stats is declared in the header, which is not visible
    // here; confirm it is one of those node-based containers.
    return inserted.first->second;
}

// File-local helper that converts the counters of one TableStats into the
// field/value list written to the stats table.
//
// stats:  counters to read (each is loaded once, with relaxed ordering).
// values: output list; any previous content is discarded and replaced by
//         the fields "SET", "DEL", "COMPLETE" and "ERROR", in that order,
//         each holding the counter formatted as a decimal string.
//
// This lives in the .cpp rather than the header so that the header does not
// need to forward-declare FieldValueTuple (a typedef, which cannot be
// forward-declared with 'class').
static void dumpStats(const SwssStats::TableStats &stats,
                      vector<FieldValueTuple> &values)
{
    const auto set_total = stats.set_count.load(memory_order_relaxed);
    const auto del_total = stats.del_count.load(memory_order_relaxed);
    const auto complete_total = stats.complete_count.load(memory_order_relaxed);
    const auto error_total = stats.error_count.load(memory_order_relaxed);

    values.clear();
    values.emplace_back("SET", to_string(set_total));
    values.emplace_back("DEL", to_string(del_total));
    values.emplace_back("COMPLETE", to_string(complete_total));
    values.emplace_back("ERROR", to_string(error_total));
}
Comment on lines +131 to +142
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions latency metrics and queue depth (and “10 metrics per table”), but the implementation currently only serializes 4 counters (SET/DEL/COMPLETE/ERROR). Either implement the additional metrics or update the PR description and any in-code documentation so they match what’s actually exported to COUNTERS_DB.

Copilot uses AI. Check for mistakes.

// Background thread body: periodically flushes changed per-table counters to
// the SWSS_STATS table in COUNTERS_DB.
//
// Lifecycle:
//   1. Connect to COUNTERS_DB. On failure an error is logged and the thread
//      exits; counters keep accumulating in memory but are never written.
//   2. Loop: sleep for m_interval_sec (or until the destructor signals
//      shutdown), snapshot every table whose version changed since its last
//      successful write, then write those snapshots to Redis.
//
// No exception is allowed to leave this function: an exception escaping a
// std::thread entry point calls std::terminate, which would take the whole
// process down because of a statistics write.
void SwssStats::writerThread()
{
    SWSS_LOG_ENTER();
    SWSS_LOG_NOTICE("SwssStats writer thread started");

    // Connect to COUNTERS_DB here (not in the constructor) so that
    // singleton creation during early orchagent startup is safe.
    try
    {
        m_db = make_shared<DBConnector>("COUNTERS_DB", 0);
        m_table = make_unique<Table>(m_db.get(), SWSS_STATS_TABLE);
    }
    catch (const exception& e)
    {
        SWSS_LOG_ERROR("SwssStats: failed to connect to COUNTERS_DB: %s. "
                       "Stats will not be written.", e.what());
        return;
    }

    // Version of each table at the time of its last *successful* write.
    unordered_map<string, uint64_t> last_versions;

    while (true)
    {
        {
            unique_lock<mutex> lock(m_statsMutex);
            // Wait for either the interval to elapse or a shutdown signal
            m_cv.wait_for(lock, chrono::seconds(m_interval_sec),
                          [this]{ return !m_running.load(memory_order_relaxed); });

            if (!m_running.load(memory_order_relaxed))
            {
                break;
            }
        }

        // Parallel arrays: name, snapshotted version and serialized counters
        // of every table that needs to be written this cycle.
        vector<string> table_names;
        vector<uint64_t> table_versions;
        vector<vector<FieldValueTuple>> table_values;

        {
            lock_guard<mutex> lock(m_statsMutex);

            for (const auto& entry : m_stats)
            {
                const string& name = entry.first;
                const TableStats& stats = entry.second;

                // Acquire ordering pairs with the release in record* methods,
                // ensuring we see all counter updates made before version++
                const uint64_t current_ver = stats.version.load(memory_order_acquire);

                const auto ver_it = last_versions.find(name);
                if (ver_it != last_versions.end() && ver_it->second == current_ver)
                {
                    continue; // No changes since last write
                }

                table_names.push_back(name);
                table_versions.push_back(current_ver);
                table_values.emplace_back();
                dumpStats(stats, table_values.back());
            }
        }

        // Write to Redis outside the lock so record*() callers are never
        // blocked behind network I/O.
        for (size_t i = 0; i < table_names.size(); i++)
        {
            try
            {
                m_table->set(table_names[i], table_values[i]);
            }
            catch (const exception& e)
            {
                // Do not let a Redis failure escape the thread. The version
                // is not committed, so this table (and the ones not reached
                // yet) are retried on the next interval. Stop this cycle
                // after the first failure to avoid one error log per table.
                SWSS_LOG_ERROR("SwssStats: failed to write stats for %s: %s",
                               table_names[i].c_str(), e.what());
                break;
            }

            // Commit the version only once the write has succeeded.
            last_versions[table_names[i]] = table_versions[i];
            SWSS_LOG_DEBUG("Updated stats for: %s", table_names[i].c_str());
        }
    }

    SWSS_LOG_NOTICE("SwssStats writer thread stopped");
}
Loading
Loading