Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ class PinnedMap {
absl::flat_hash_map<const void*, PendingRead*> pin_map_;

public:
~PinnedMap() {
// Thread shutdown: free any PendingRead entries that EngineShard::Heartbeat
// never had a chance to drain. Orphan buffers are best-effort — the
// thread's mimalloc heap is going away anyway.
for (auto& [_, pin] : pin_map_)
delete pin;
}
Comment on lines +449 to +455
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Shutdown pin use-after-free 🐞 Bug ☼ Reliability

PinnedMap::~PinnedMap deletes all PendingRead entries even though non-owning threads can still hold
cmn::BorrowedString objects whose destructor calls ReleaseInternal() and dereferences the
PendingRead* pin. This can crash during shard thread teardown / process shutdown if any borrowed GET
reply is still in-flight when the shard thread’s TLS destructors run.
Agent Prompt
### Issue description
`PinnedMap::~PinnedMap()` currently `delete`s every `PendingRead*` in `pin_map_` during thread shutdown. However, `cmn::BorrowedString` can be destroyed on non-owning threads, and its destructor calls `BorrowedStringOps::ReleaseInternal()`, which dereferences the `PendingRead*` pin and decrements its atomic refcount. If the owning thread exits first and runs `~PinnedMap()`, later `BorrowedString` destruction will dereference freed memory (UAF) and can crash.

### Issue Context
The code explicitly documents that non-owning threads unpin by calling `BorrowedString::Unpin()` while the owning thread drains and deletes entries. Destructor-time deletion bypasses the refcount-based safety model.

### Fix Focus Areas
- src/core/compact_object.cc[449-455]

### Suggested fix
Change shutdown cleanup to **never delete pins that might still be referenced**:
- Option A (safest): remove the destructor cleanup entirely (intentionally leak `PendingRead` entries on thread shutdown; the thread heap/process is being torn down anyway).
- Option B: only delete entries whose `refcnt.load(acquire) == 0` (and free orphaned buffers for those), and **leave refcnt>0 entries allocated** to avoid UAF; optionally log/DFATAL in debug builds to catch unexpected outstanding borrows at shutdown.

Also consider (optional hardening): explicitly call `CompactObj::DrainPendingReads()` as part of shard shutdown ordering *before* TLS teardown, but this should be additive—not a substitute for making `~PinnedMap()` safe.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PinnedMap being thread-local destructs during thread destruction.


PendingRead* RegisterPin(const void* ptr);
// Returns true if `ptr` was pinned.
bool Orphan(const void* ptr);
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ template <typename T> enable_if_t<is_integral_v<T>, char*> write_piece(T num, ch
}

char* write_piece(string_view str, char* dest) {
return (char*)memcpy(dest, str.data(), str.size()) + str.size();
return str.empty() ? dest : (char*)memcpy(dest, str.data(), str.size()) + str.size();
}

} // namespace
Expand Down
6 changes: 5 additions & 1 deletion src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ string EngineShard::TxQueueInfo::Format() const {
}

EngineShard::Stats& EngineShard::Stats::operator+=(const Stats& o) {
static_assert(sizeof(Stats) == 152);
static_assert(sizeof(Stats) == 160);

#define ADD(x) x += o.x

Expand All @@ -254,6 +254,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const Stats& o) {
ADD(stream_sequential_accesses);
ADD(stream_random_accesses);
ADD(stream_fetch_all_accesses);
ADD(borrowed_string_views_total);

#undef ADD
return *this;
Expand Down Expand Up @@ -751,6 +752,9 @@ void EngineShard::Heartbeat() {
DVLOG(3) << " Hearbeat";
DCHECK(namespaces);

// Reap zero-copy GET pins whose refcnt has dropped to 0. Cheap and idempotent.
CompactObj::DrainPendingReads();

CacheStats();

// TODO: iterate over all namespaces
Expand Down
1 change: 1 addition & 0 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class EngineShard {
uint64_t stream_sequential_accesses = 0; // head/tail: XADD, XREAD recent, XTRIM, etc.
uint64_t stream_random_accesses = 0; // arbitrary-ID lookups: XRANGE partial, XDEL, XCLAIM
uint64_t stream_fetch_all_accesses = 0; // full stream scan from beginning
uint64_t borrowed_string_views_total = 0;

Stats& operator+=(const Stats&);
};
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3493,6 +3493,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("borrowed_string_views_total", m.shard_stats.borrowed_string_views_total);
append("borrowed_strings_sent_total", m.coordinator_stats.borrowed_strings_sent_total);

// Number of connections that are currently blocked on grabbing interpreter.
append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter);
Expand Down
3 changes: 2 additions & 1 deletion src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ServerState::Stats::Stats(unsigned num_shards)
}

ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
static_assert(sizeof(Stats) == 26 * 8, "Stats size mismatch");
static_assert(sizeof(Stats) == 27 * 8, "Stats size mismatch");

#define ADD(x) this->x += (other.x)

Expand Down Expand Up @@ -88,6 +88,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
ADD(oom_error_cmd_cnt);
ADD(conn_timeout_events);
ADD(psync_requests_total);
ADD(borrowed_strings_sent_total);

if (this->tx_width_freq_arr.size() > 0) {
DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size());
Expand Down
9 changes: 9 additions & 0 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ class ServerState { // public struct - to allow initialization.
uint64_t oom_error_cmd_cnt = 0;
uint32_t conn_timeout_events = 0;
uint64_t psync_requests_total = 0;

// Number of times SendBulkStringBorrowed reached the reply builder — i.e.,
// a borrowed CompactObj view flowed through the reply path without an
// intermediate std::string copy. Counts both the direct sink path
// (non-squashed) and the CapturingReplyBuilder path
// (squashed / EXEC / pipelined). Should match EngineShard's
// borrowed_string_views_total under read-only workloads.
uint64_t borrowed_strings_sent_total = 0;

std::valarray<uint64_t> tx_width_freq_arr, squash_width_freq_arr;

// Memory size of stored commands during multi-exec in connections
Expand Down
85 changes: 83 additions & 2 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@
#include "server/generic_family.h"
#include "server/journal/journal.h"
#include "server/search/doc_index.h"
#include "server/server_state.h"
#include "server/table.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "util/fibers/future.h"

ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys");

ABSL_FLAG(bool, get_zero_copy, true,
"If true, GET returns a borrowed view into the CompactObj raw payload "
"(zero-copy) for large raw strings; if false, falls back to the "
"materializing path. Toggle to A/B benchmark the zero-copy GET path.");

namespace dfly {

namespace {
Expand All @@ -57,13 +63,51 @@ constexpr uint32_t kMaxStrLen = 1 << 28;

// Either immediately available value or tiering future + result
template <typename T> using TResultOrT = variant<T, TieredStorage::TResult<T>>;
using StringResult = TResultOrT<string>;

// StringResult adds a borrowed-view alternative on top of the generic
// 2-variant shape. Used by read-only commands (GET) that can borrow the
// value directly from the shard's CompactObj instead of materializing an
// owned std::string. Mutating commands (GETDEL/GETEX/GETSET) must NEVER
// produce the cmn::BorrowedString alternative because they free/replace
// the underlying storage.
//
// cmn::BorrowedString carries its own pin + release fn; ownership flows
// from CompactObj::TryBorrow through the variant into SendBulkStringBorrowed,
// which parks the borrow until writev (direct sink) or replay (capture).
using StringResult =
std::variant<std::string, cmn::BorrowedString, TieredStorage::TResult<std::string>>;

StringResult ReadString(DbIndex dbid, string_view key, const PrimeValue& pv, EngineShard* es) {
return pv.IsExternal() ? StringResult{ReadTieredString(dbid, key, pv, es->tiered_storage())}
: StringResult{pv.ToString()};
}

// Read-only fast path for GET: returns a borrowed view directly into the
// shard's CompactObj storage when the value is a raw (NONE_ENC) or ASCII-
// packed (ASCII1_ENC / ASCII2_ENC) large string; otherwise falls back to
// the materializing ReadString.
//
// For NONE_ENC: caller streams the bytes directly.
// For ASCII1/2_ENC: caller's reply path decodes chunk-by-chunk into its own
// scratch (no full decoded buffer is ever held). Either way the borrowed
// bytes remain valid only as long as the underlying storage is not mutated,
// freed, or relocated — see CompactObj::TryBorrow and the
// facade::SinkReplyBuilder::ReplyScope contract.
StringResult ReadStringBorrow(DbIndex dbid, string_view key, const PrimeValue& pv,
EngineShard* es) {
static thread_local bool zero_copy_enabled = absl::GetFlag(FLAGS_get_zero_copy);
Copy link
Copy Markdown

@augmentcode augmentcode Bot May 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadStringBorrow caches FLAGS_get_zero_copy in a static thread_local, so runtime changes (e.g. via CONFIG SET if this ever becomes mutable) won’t propagate consistently across shard threads. Consider clarifying that get_zero_copy is startup-only, or otherwise ensuring updates are observed.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

if (zero_copy_enabled && !pv.IsExternal()) {
if (auto raw = pv.TryBorrow()) {
// TryBorrow already registered the pin and stamped read_pending. We
// move the whole borrow through; SendBulkStringBorrowed parks the pin
// until the reply's writev completes (or replay, for captures).
++es->stats().borrowed_string_views_total;
return StringResult{std::move(*raw)};
}
}
return ReadString(dbid, key, pv, es);
}

// Helper for performing SET operations with various options
class SetCmd {
public:
Expand Down Expand Up @@ -678,6 +722,31 @@ struct GetReplies {
Send(iores.error().message());
}

// Specialized overload for StringResult (3-variant: string / string_view /
Copy link
Copy Markdown

@augmentcode augmentcode Bot May 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says StringResult is a 3-variant string / string_view / tiering future, but the variant is now std::string / cmn::BorrowedString / tiering future. Keeping this accurate matters because BorrowedString has pin/lifetime semantics that differ from a plain string_view.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

// tiering future). The string_view alternative carries a borrowed view into
// the shard's storage and must be sent before any mutation can race —
// ReplyScope provides the lifetime contract during reply construction.
void Send(StringResult&& res) const {
if (holds_alternative<std::string>(res))
return Send(get<std::string>(res));
if (holds_alternative<cmn::BorrowedString>(res)) {
// Move the BorrowedString into SendBulkStringBorrowed; the reply
// builder dispatches on encoding (NONE_ENC iovec ref, ASCII chunked
// decode) and parks the pin until writev completes — or, for a
// CapturingReplyBuilder, moves it into the captured payload so the
// pin lives through capture/replay.
++ServerState::tlocal()->stats.borrowed_strings_sent_total;
rb->SendBulkStringBorrowed(std::move(get<cmn::BorrowedString>(res)));
return;
Comment thread
romange marked this conversation as resolved.
}
auto fut = get<TieredStorage::TResult<std::string>>(std::move(res));
io::Result<std::string> iores = fut.Get();
if (iores.has_value())
Send(*iores);
else
Send(iores.error().message());
}

void Send(size_t val) const {
rb->SendLong(val);
}
Expand All @@ -686,6 +755,13 @@ struct GetReplies {
rb->SendBulkString(str);
}

// Explicit overload to disambiguate against Send(StringResult&&): std::string
// could otherwise resolve either way (string -> variant alternative or
// string -> string_view), making Send(std::string) ambiguous.
void Send(const std::string& str) const {
rb->SendBulkString(str);
}

RedisReplyBuilder* rb;
};

Expand Down Expand Up @@ -1199,7 +1275,12 @@ cmd::CmdR CmdGet(CmdArgList args, CommandContext* cmd_cntx) {
if (!it_res.ok())
return it_res.status();

return ReadString(tx->GetDbIndex(), key, (*it_res)->second, es);
// GET is read-only: prefer a borrowed view into the shard storage for
// raw (NONE_ENC) large strings, skipping the per-call std::string
// allocation. Falls back to ReadString for inline / int / small /
// encoded / external values. The borrowed view is consumed within the
// reply scope before any mutation can race.
return ReadStringBorrow(tx->GetDbIndex(), key, (*it_res)->second, es);
};

GetReplies{cmd_cntx->rb()}.Send(co_await cmd::SingleHopT(cb));
Expand Down
Loading
Loading