LruDedup queue policy for Notification Consumer #1191
/azp run
Azure Pipelines successfully started running 1 pipeline(s).
securely1g left a comment
Review: LruDedup Queue Policy for NotificationConsumer (swss-common)
Summary
This is the foundational PR that adds opt-in queue strategy primitives to swss::NotificationConsumer. It introduces:
- NotificationQueueBase abstract interface + FifoNotificationQueue (preserves legacy behavior) + LruDedupNotificationQueue (collapses byte-identical payloads)
- NotificationQueuePolicy enum + new 5-arg constructor
- setOpAllowList(): admission filter that drops messages by op before queueing
- setStatsLabel(): orch-qualified label for syslog disambiguation
- Atomic stats counters + getStats() + self-throttled 5s SWSS_LOG_NOTICE inline from processReply/push
- Feature-test macro SWSS_NOTIFICATIONCONSUMER_HAS_LRU_DEDUP for downstream conditional compilation
- Unit tests for both queue strategies
+620/-17, 5 files, clean single commit.
What's Good
- ABI preservation: the 4-arg constructor symbol is unchanged; old binaries (python3-swsscommon, etc.) continue to resolve against the same mangled name. The 5-arg ctor is a distinct symbol. Well thought out.
- Strategy pattern: the clean NotificationQueueBase abstraction allows adding new policies without touching consumer logic.
- peekOp() is efficient: bounded scan, no JSON parse, returns early on malformed input.
- Self-throttled logging: 5s interval + idle suppression means storm conditions produce steady but bounded log output.
- Atomic stats for cross-thread reads: push()/pop() remain single-threaded, atomics are only for the telemetry reader. Correct use of memory_order_relaxed since these are independent monotonic counters with no ordering dependencies between them.
- Feature-test macro: clean downstream integration pattern; companion PRs can build against either old or new libswsscommon.
- Good unit tests: they cover FIFO ordering, dedup collapse, drain order, HWM monotonicity, memory bounding, and empty-pop safety.
🔴 High Priority Issues
1. LRU reordering changes drain semantics
The LRU-dedup queue reorders entries: when a duplicate arrives, the old position is erased and re-inserted at the tail. This means pop() returns entries in "least-recently-seen" order, NOT arrival order.
For FDB events this is fine (end-state idempotent), but it's a subtle semantic difference that could surprise future consumers who opt in without understanding this. The class comment says "Drain order: last-seen time per unique payload" which is correct, but I'd suggest:
- Adding a stronger warning in the NotificationQueuePolicy::LruDedup enum doc that ordering is not preserved
- Considering whether a simpler "deduplicate but preserve original arrival order" policy (keep first seen position, drop subsequent duplicates) would be safer as the default dedup strategy
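To make the reordering concrete, here is a minimal sketch of a list-plus-index dedup queue. It is not the PR's LruDedupNotificationQueue: the class name, method set, and return values are assumptions for illustration, and it uses splice to move a duplicate to the tail where the review describes erase and re-insert (the resulting order is the same).

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for an LRU-dedup queue; names are illustrative only.
class LruDedupSketch
{
public:
    // Returns true if the payload was new, false if it collapsed onto a
    // byte-identical payload that was already queued.
    bool push(const std::string &msg)
    {
        auto it = m_idx.find(msg);
        if (it != m_idx.end())
        {
            // Duplicate: move the existing node to the tail. splice() keeps
            // the stored iterator valid, so the index needs no update.
            m_list.splice(m_list.end(), m_list, it->second);
            return false;
        }
        m_list.push_back(msg);
        m_idx.emplace(msg, std::prev(m_list.end()));
        return true;
    }

    // Removes and returns the least-recently-seen payload.
    std::string pop()
    {
        assert(!m_list.empty());
        std::string msg = std::move(m_list.front());
        m_list.pop_front();
        m_idx.erase(msg);
        return msg;
    }

    bool empty() const { return m_list.empty(); }
    std::size_t size() const { return m_list.size(); }

private:
    std::list<std::string> m_list;
    std::unordered_map<std::string, std::list<std::string>::iterator> m_idx;
};
```

Pushing A, B, A into this sketch leaves two entries and drains B before A, which is the "least-recently-seen" order described above rather than arrival order.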
2. m_idx uses full message string as key — O(n) hash on large payloads
FDB event JSON payloads can be substantial (especially batch notifications with many FVTs). The unordered_map<string, iterator> hashes the entire message string on every push() and find(). Under storm conditions with many distinct payloads (different MACs), this is O(payload_length) per operation.
For the target use case (FDB storms with many identical payloads), the dedup hit rate is high and this is amortized well. But for consumers with mostly-distinct payloads, the hash overhead on large strings could be significant. Consider:
- Documenting that this policy is most effective when dedup hit rate is high
- Optionally pre-computing a hash (e.g., xxHash) at admission time for large payloads
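The pre-computed hash idea could look roughly like the sketch below. std::hash stands in for xxHash, and HashedPayloadSketch, CachedHashSketch, and countDistinctSketch are hypothetical names, not part of swss-common. The point is only that the payload bytes are hashed once at admission and the cached value is reused by every later container operation on that key; equality on a hit still compares the full bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Payload plus its hash, computed once when the message is admitted.
struct HashedPayloadSketch
{
    std::string bytes;
    std::size_t hash;

    explicit HashedPayloadSketch(std::string s)
        : bytes(std::move(s)), hash(std::hash<std::string>{}(bytes))
    {
    }

    bool operator==(const HashedPayloadSketch &other) const
    {
        // Cheap hash comparison first, full byte comparison only on a match.
        return hash == other.hash && bytes == other.bytes;
    }
};

// Hasher that returns the cached value instead of rescanning the payload.
struct CachedHashSketch
{
    std::size_t operator()(const HashedPayloadSketch &p) const noexcept
    {
        return p.hash;
    }
};

// Counts distinct payloads using the cached-hash key.
inline std::size_t countDistinctSketch(const std::vector<std::string> &msgs)
{
    std::unordered_set<HashedPayloadSketch, CachedHashSketch> seen;
    for (const auto &m : msgs)
        seen.insert(HashedPayloadSketch(m));
    assert(seen.size() <= msgs.size());
    return seen.size();
}
```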
3. push() is defined inline in the header
The LruDedupNotificationQueue::push() method is ~30 lines with multiple branches, atomic operations, and a function call (maybeLogStats()). Defining it inline in the header means:
- Every translation unit that includes notificationconsumer.h gets a copy
- Changes to push logic require recompiling all includers
- The compiler may or may not inline it depending on optimization level
Move to the .cpp file alongside maybeLogStats() for consistency and to reduce header bloat.
🟡 Medium Priority Issues
4. subscribeWithRetry() infinite loop with no backoff
while (true)
{
try { subscribe(); break; }
catch(...) { delete m_subscribe; SWSS_LOG_ERROR(...); }
}
This busy-loops on Redis connection failure with no sleep/backoff. This is pre-existing behavior, but now that it is factored into a named function, it is a good opportunity to add exponential backoff or at least a sleep(1).
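One possible shape for the backoff, as a sketch only: retryWithBackoffSketch is a hypothetical helper, not an swss-common function, and it leaves cleanup (such as the delete m_subscribe in the loop above) to the callable it wraps. The delay doubles after each failure and is capped, so a long Redis outage settles into one attempt per maxDelay instead of a busy loop.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Calls attempt() until it stops throwing, sleeping between failures.
// The sleep doubles after every failure and never exceeds maxDelay.
// Returns the number of failed attempts that preceded the success.
inline int retryWithBackoffSketch(const std::function<void()> &attempt,
                                  std::chrono::milliseconds initialDelay,
                                  std::chrono::milliseconds maxDelay)
{
    assert(initialDelay <= maxDelay);
    int failures = 0;
    std::chrono::milliseconds delay = initialDelay;
    while (true)
    {
        try
        {
            attempt();
            return failures;
        }
        catch (...)
        {
            ++failures;
            std::this_thread::sleep_for(delay);
            delay = std::min(delay * 2, maxDelay);
        }
    }
}
```

A caller would pass the subscribe step as the callable, for example a lambda that calls subscribe() and performs its own cleanup before rethrowing.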
5. maybeLogStats() called on every processReply and every push
Under storm conditions (the exact scenario this targets), processReply is called at event rate. Even though the 5s throttle gate short-circuits quickly, the steady_clock::now() call on every message maps to clock_gettime, which is cheap through the vDSO but a real syscall on platforms without it, and the cost adds up at 100k+ msg/s. Consider:
- Checking only every N-th message (e.g., if (m_received % 1024 == 0))
- Or moving stats logging to a timer-driven path instead of inline
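A sketch of the every-N-th-message gate, under the assumption that a counter check in front of the clock read is acceptable. ThrottledStatsGateSketch and its members are illustrative names, not the PR's code; the clockReads() counter exists only so the saving can be observed.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Decides when a stats line should be emitted. The clock is read only on
// every sampleEvery-th message; all other calls take the counter-only path.
class ThrottledStatsGateSketch
{
public:
    using Clock = std::chrono::steady_clock;

    ThrottledStatsGateSketch(std::uint64_t sampleEvery, Clock::duration interval)
        : m_sampleEvery(sampleEvery), m_interval(interval), m_lastLog(Clock::now())
    {
        assert(sampleEvery > 0);
    }

    // Call once per message. Returns true when the caller should log.
    bool onMessage()
    {
        ++m_received;
        if (m_received % m_sampleEvery != 0)
            return false; // cheap path: no clock read
        ++m_clockReads;
        const Clock::time_point now = Clock::now();
        if (now - m_lastLog < m_interval)
            return false; // sampled, but the interval has not elapsed yet
        m_lastLog = now;
        return true;
    }

    std::uint64_t clockReads() const { return m_clockReads; }

private:
    std::uint64_t m_sampleEvery;
    Clock::duration m_interval;
    Clock::time_point m_lastLog;
    std::uint64_t m_received = 0;
    std::uint64_t m_clockReads = 0;
};
```

The trade-off is that a log line can be late by up to N messages, which matters only when traffic is slow, and at low rates the per-message clock read was not the problem to begin with.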
6. No maximum queue depth enforcement
LruDedup bounds depth to count(distinct payloads) which is better than unbounded FIFO, but under a MAC storm with 100k distinct MACs (different source MACs flapping), the queue still grows to 100k entries. Consider an optional max_depth parameter that drops oldest entries when exceeded (true LRU eviction), or at minimum document this limitation.
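For reference, a depth cap with oldest-first eviction could be sketched as follows. The max_depth parameter is only a suggestion in this review, so BoundedLruDedupSketch and everything in it are hypothetical. Evicting the head silently drops a notification, which is why the sketch counts evictions so that a caller could surface them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <list>
#include <string>
#include <unordered_map>

// Hypothetical dedup queue with a hard depth cap and head eviction.
class BoundedLruDedupSketch
{
public:
    explicit BoundedLruDedupSketch(std::size_t maxDepth) : m_maxDepth(maxDepth)
    {
        assert(maxDepth > 0);
    }

    // Returns how many queued entries were evicted to admit msg (0 or 1).
    std::size_t push(const std::string &msg)
    {
        auto it = m_idx.find(msg);
        if (it != m_idx.end())
        {
            // Duplicate: refresh its position, depth is unchanged.
            m_list.splice(m_list.end(), m_list, it->second);
            return 0;
        }
        std::size_t evicted = 0;
        if (m_list.size() >= m_maxDepth)
        {
            // Full: drop the least-recently-seen payload at the head.
            m_idx.erase(m_list.front());
            m_list.pop_front();
            ++m_evictedTotal;
            evicted = 1;
        }
        m_list.push_back(msg);
        m_idx.emplace(msg, std::prev(m_list.end()));
        return evicted;
    }

    const std::string &front() const { return m_list.front(); }
    std::size_t size() const { return m_list.size(); }
    std::uint64_t evictedTotal() const { return m_evictedTotal; }

private:
    std::size_t m_maxDepth;
    std::uint64_t m_evictedTotal = 0;
    std::list<std::string> m_list;
    std::unordered_map<std::string, std::list<std::string>::iterator> m_idx;
};
```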
7. Thread safety documentation
The header says "Single-threaded; no internal locking required" for the queue, but getStats() is explicitly designed for cross-thread reads. This is correct (relaxed atomics are sufficient for independent counters), but the documentation should be more precise: "push()/pop()/front()/empty()/size() must be called from a single thread. getStats() is safe to call from any thread."
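The split between a single writer thread and any-thread readers can be sketched like this. QueueStatsSketch and StatsSnapshotSketch are illustrative names with a reduced counter set, not the PR's Stats struct. Each field is individually atomic, so a reader never sees a torn value, but the snapshot as a whole is not taken at one instant.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Plain copy of the counters handed to readers.
struct StatsSnapshotSketch
{
    std::uint64_t pushed;
    std::uint64_t dedupHits;
    std::uint64_t highWatermark;
};

class QueueStatsSketch
{
public:
    // Writer side: call only from the thread that owns push()/pop().
    void onPush(bool wasDuplicate, std::uint64_t depthAfter)
    {
        assert(depthAfter > 0); // a push always leaves at least one entry
        if (wasDuplicate)
            m_dedupHits.fetch_add(1, std::memory_order_relaxed);
        else
            m_pushed.fetch_add(1, std::memory_order_relaxed);
        // Single writer, so a load followed by a store cannot race with
        // another update of the high-water mark.
        if (depthAfter > m_highWatermark.load(std::memory_order_relaxed))
            m_highWatermark.store(depthAfter, std::memory_order_relaxed);
    }

    // Reader side: safe from any thread. Relaxed loads are enough because
    // the counters are independent and nothing is published through them.
    StatsSnapshotSketch getStats() const
    {
        return StatsSnapshotSketch{
            m_pushed.load(std::memory_order_relaxed),
            m_dedupHits.load(std::memory_order_relaxed),
            m_highWatermark.load(std::memory_order_relaxed)};
    }

private:
    std::atomic<std::uint64_t> m_pushed{0};
    std::atomic<std::uint64_t> m_dedupHits{0};
    std::atomic<std::uint64_t> m_highWatermark{0};
};
```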
🟢 Minor / Style
8. peekOp() returns std::string with a comment explaining why (C++14, no heterogeneous lookup). Consider adding a TODO for C++17 migration where string_view would avoid the allocation.
9. The Stats struct in LruDedupNotificationQueue has a comment about gcc/C++14 brace-init issues. This is a known quirk but might confuse future maintainers — a one-line reference to the gcc bug number would help.
10. m_label in LruDedupNotificationQueue is not atomic but can be written via setLabel() and read from maybeLogStats(). If setLabel is only called before the push/pop path starts, it's fine — but document this assumption.
11. The kStatsLogInterval is defined twice (once in the anonymous namespace in the .cpp, once implicitly referenced from the queue class). Consider making it a shared constant or at least documenting that both use the same 5s interval.
Questions
- Has the LRU reordering (issue #1) been validated as safe for all currently-opted-in consumers? For FDB it's fine. For port_state_change, an UP→DOWN→UP sequence cannot lose the DOWN, because different states produce different byte strings and only byte-identical payloads collapse. Worth a comment though.
- What's the measured overhead of steady_clock::now() per message on the target platform? On x86 with vDSO it's ~20ns, but on some ARM platforms it can be 200ns+.
- Is there a plan to expose max_depth as a configurable parameter, or is "bounded by distinct payloads" considered sufficient?
Test Coverage
The unit tests are solid for the queue classes themselves. Missing:
- No test for peekOp() edge cases (empty string, malformed JSON, missing closing quote)
- No test for setOpAllowList filtering behavior (would need a mock Redis or the full NotificationConsumer wired up)
- No test for the maybeLogStats throttling behavior
These are nice-to-haves; the core correctness tests are present.
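As a starting point for those edge cases, here is a sketch of a bounded op peek and an allow-list check. The PR's peekOp() is not shown in this thread, so peekOpSketch, admitSketch, the 64-byte bound, and the assumed message shape (a JSON array whose first element is the op string, for example ["fdb_event","..."]) are all assumptions. In particular, treating an unreadable op as "not in the set" is a choice made here, not confirmed behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <set>
#include <string>

// Returns the leading op of a message shaped like ["op", ...], or an empty
// string if the prefix is malformed, contains an escape, or has no closing
// quote within the first maxScan bytes. No JSON parse is performed.
inline std::string peekOpSketch(const std::string &msg, std::size_t maxScan = 64)
{
    assert(maxScan >= 2); // the bound must at least cover the leading ["
    if (msg.size() < 2 || msg[0] != '[' || msg[1] != '"')
        return std::string();
    const std::size_t limit = std::min(msg.size(), maxScan);
    for (std::size_t i = 2; i < limit; ++i)
    {
        if (msg[i] == '\\')
            return std::string(); // escaped op: give up rather than decode
        if (msg[i] == '"')
            return msg.substr(2, i - 2);
    }
    return std::string(); // no closing quote inside the scan bound
}

// Admission check: an empty allow-list admits everything (legacy behavior),
// otherwise the peeked op must be in the set.
inline bool admitSketch(const std::string &msg, const std::set<std::string> &allow)
{
    if (allow.empty())
        return true;
    return allow.count(peekOpSketch(msg)) > 0;
}
```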
Verdict: Well-designed foundational change with good ABI compatibility story and clean abstraction. The LRU reordering semantics (issue #1) should be more prominently documented, and the inline push() definition (issue #3) should move to the .cpp. The steady_clock::now() per-message overhead (issue #5) is worth measuring under load. Overall good to merge with minor fixes — this unblocks the companion sairedis and swss PRs.
@senthil-nexthop, can you check the comments, and also why the PR is failing?
Force-pushed from 2de32d0 to 6d2e378.
Force-pushed from 6d2e378 to 9def60e.
Added a comment in the LruDedup policy explaining the drain semantics.
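Deduplication while preserving original order will lead to incorrect state for FDB events. For example, let's say the events received are: (A) LEARN(mac1, port1), (B) LEARN(mac1, port2) and (C) LEARN(mac1, port1). Keeping the first-seen position would drop (C) as a duplicate of (A) and drain (A), (B), leaving mac1 on port2 although the last event put it on port1. Moving the duplicate to the tail drains (B) and then (A)/(C), which ends on port1.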
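The three-event example can be replayed with two small functions that compute the drain order under each policy. This is an illustrative sketch using vectors rather than the PR's queue classes, and all names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Keep-first dedup: a later duplicate is dropped and the original keeps
// its arrival position.
inline std::vector<std::string> drainKeepFirstSketch(const std::vector<std::string> &arrivals)
{
    std::vector<std::string> out;
    for (const auto &m : arrivals)
        if (std::find(out.begin(), out.end(), m) == out.end())
            out.push_back(m);
    return out;
}

// Move-to-tail dedup (the LruDedup behavior): a duplicate removes the old
// entry and re-queues the payload at the tail.
inline std::vector<std::string> drainMoveToTailSketch(const std::vector<std::string> &arrivals)
{
    std::vector<std::string> out;
    for (const auto &m : arrivals)
    {
        out.erase(std::remove(out.begin(), out.end(), m), out.end());
        out.push_back(m);
    }
    return out;
}

// The last drained event is the one that determines the final FDB state.
inline std::string lastAppliedSketch(const std::vector<std::string> &drained)
{
    assert(!drained.empty());
    return drained.back();
}
```

With the arrivals (A), (B), (C) from the example, the keep-first order ends on LEARN(mac1, port2) while the move-to-tail order ends on LEARN(mac1, port1).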
The amortized cost is lower when dedup hit rate is high, which is observed to be 90%+ in the tests. Added a comment to reflect that.
Good catch, done.
Outside the scope of this PR, we should take this up as a separate issue and add optimizations.
Done, added an N-th message check to minimize the cost of reading the clock.
Added a comment to document this.
Done, updated the comment.
Updated the comments to address the style issues. Promoted peekOp() to a testable function.
Both FDB and link state events are idempotent and safe.
Added an N-th message gate to minimize cost.
This is not strictly necessary for FDB and port events, but it's a good optimization to add for future event types.
Added these tests.
@lguohan I've addressed the comments, added new tests and updated the PR. The DCO failure is fixed, but the PR is still failing because of an upstream issue that is affecting all PRs.
Force-pushed from 9def60e to 87a91f9.
@senthil-nexthop, can you check a little bit about the coverage?
Force-pushed from 87a91f9 to 753bc82.
Signed-off-by: Senthil Krishnamurthy <senthil@nexthop.ai>
Force-pushed from 753bc82 to c87533a.
@lguohan, added more test cases to increase coverage, and all the checks have passed. The other PRs in sonic-swss and sonic-sairedis are failing because they have a dependency on symbols introduced by this PR. Once you approve and merge this, I can monitor the other 2 PRs and make sure the checks pass.
What I did
Adds opt-in primitives to swss::NotificationConsumer for bounding queue growth and reducing cross-fanout cost on shared Redis pubsub channels.
How I did it
- LruDedup queue policy (NotificationQueuePolicy::LruDedup): collapses byte-identical payloads on enqueue (std::list + std::unordered_map<string, iter>). Drain order: "last-seen time per unique payload." Memory bounded by count(distinct in-flight payloads).
- New 5-arg NotificationConsumer constructor takes the policy. The 4-arg constructor's mangled symbol is preserved verbatim (no ABI break for python3-swsscommon, etc.).
- setOpAllowList(ops): admission filter that drops messages whose JSON-array leading op is not in the set, before they consume queue memory. Defends against cross-fanout on shared channels like "NOTIFICATIONS". Empty set (default) preserves legacy behavior.
- setStatsLabel(label): orch-qualified label so syslog can distinguish multiple consumers SUBSCRIBE'd to the same channel.
- Atomic stats counters (received, dropped_allowlist, pushed, dedup_hits, high_watermark, current_depth) + getStats() + a self-throttled 5 s SWSS_LOG_NOTICE summary inline from processReply/push.
How to verify
Unit tests
Related work
HLD: sonic-net/SONiC#2334
sonic-sairedis PR: sonic-net/sonic-sairedis#1899
sonic-swss PR: sonic-net/sonic-swss#4586